Scala notes - Futures - 3 (Combinators and Async)
In the previous parts of this post, we discussed Futures and Promises. In this last part, we'll compose Futures using their powerful combinators.
Composing Futures :
In the first post, we saw how to extract a value from a Future using onComplete, foreach and, in testcases, Await.result. Extracting a value from a single Future is good, but often we spawn more than one asynchronous operation and wait on multiple Futures to arrive at the final result. Even better, sometimes the result of one Future is fed into another Future or into a chain of Futures.
Future is a Monad. (I am sorry to drop the M-bomb here. I will take a stab at explaining my understanding of what a Monoid, a Functor, a Monad and an Applicative are later.) For now, let's live with this crude explanation:
- Future is a container of a value of some type (i.e. it accepts a type as an argument and can't exist without it). You can have a Future[Int], a Future[String] or a Future[AwesomeClass], but you can't just have a plain Future. A fancy term for this is type constructor. To compare, List is a type constructor (and a Monad as well): a List is a container of values of type Int, String or any other type. A List or a Future without a contained type does not exist.
- Future has flatMap and unit functions (and consequently a map function too). For Future, the role of unit is played by Future.successful, which wraps a plain value in an already completed Future.
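To make the second point concrete, here is a small sketch that builds map out of nothing but flatMap and Future.successful (standing in for unit). The object and method names are made up for illustration, and the global ExecutionContext is assumed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapFromFlatMap {
  // "unit": lift a plain value into the container; for Future, Future.successful does this
  def unit[A](a: A): Future[A] = Future.successful(a)

  // map derived only from flatMap and unit: apply f, then re-wrap the result
  def mapViaFlatMap[A, B](fa: Future[A])(f: A => B): Future[B] =
    fa.flatMap(a => unit(f(a)))

  def main(args: Array[String]): Unit = {
    val derived = mapViaFlatMap(Future(21))(_ * 2)
    val builtIn = Future(21).map(_ * 2)
    println(Await.result(derived, 1.second)) // 42
    println(Await.result(builtIn, 1.second)) // 42
  }
}
```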
The reason I brought this up is that instead of using the onComplete callback or foreach, we could simply map or flatMap the result of the Future, just like we would with an Option or a List.
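As a quick illustration of that similarity, the sketch below applies the same function with map to an Option, a List and a Future, and then chains a Future-returning step with flatMap. The names are hypothetical and the global ExecutionContext is assumed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SameShape {
  val addOne: Int => Int = _ + 1

  // The same map call works on Option, List and Future alike
  val option: Option[Int] = Some(1).map(addOne)        // Some(2)
  val list: List[Int] = List(1, 2, 3).map(addOne)      // List(2, 3, 4)
  val future: Future[Int] = Future(1).map(addOne)      // eventually 2

  // flatMap chains a step that itself returns a Future
  val chained: Future[Int] = Future(1).flatMap(x => Future(x + 10)) // eventually 11

  def main(args: Array[String]): Unit = {
    println(option)
    println(list)
    println(Await.result(future, 1.second))
    println(Await.result(chained, 1.second))
  }
}
```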
Now, let's look at the map and flatMap combinators.
Mapping Futures that execute sequentially
Let's consider a simple task: adding three numbers, each of which is calculated asynchronously after some delay.
Warning: the following code is messy and executes the Futures sequentially.
Code
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
class FutureCombinators {
  def sumOfThreeNumbersSequentialMap(): Future[Int] = {
    Future {
      Thread.sleep(1000)
      1
    }.flatMap { oneValue =>
      Future {
        Thread.sleep(2000)
        2
      }.flatMap { twoValue =>
        Future {
          Thread.sleep(3000)
          3
        }.map { thirdValue =>
          oneValue + twoValue + thirdValue
        }
      }
    }
  }
  ...
  ...
The first Future returns 1 after 1 second, the second Future returns 2 after 2 seconds, and the third Future returns 3 after 3 seconds. The innermost block finally calculates the sum of the three values, and the whole expression yields a single Future[Int].
Testcase
For the sake of measuring the time taken to compute the values, we have a small utility function called timed (inside the ConcurrentUtils trait), which measures and prints the time taken by a block.
We use Await.result to do a blocking wait on the result of futureCombinators.sumOfThreeNumbersSequentialMap. We also time the total execution and print it.
import org.scalatest.{FunSpec, Matchers}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
class FutureCombinatorsTest extends FunSpec with Matchers with ConcurrentUtils {
  describe("Futures") {
    it("could be composed using map") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialMap(), 7 seconds))
      result shouldBe 6
    }
  }
  ...
  ...
}
trait ConcurrentUtils {
  // Runs the block, prints how long it took in milliseconds and returns its result
  def timed[T](block: => T): T = {
    val start = System.currentTimeMillis()
    val result = block
    val duration = System.currentTimeMillis() - start
    println(s"Time taken : $duration")
    result
  }
}
Output
Time taken : 6049
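The function took a little over 6 seconds to execute, which indicates that the Futures are executed in sequence. This is because each inner Future is only created inside the closure passed to flatMap (or map), and that closure runs only after the previous Future has completed.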
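The following sketch makes the ordering visible. It reproduces the nested flatMap shape with shorter delays, using a hypothetical step helper that logs when it starts and when it ends. The sleep is wrapped in scala.concurrent.blocking, which lets the global pool add threads while one is parked.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequentialOrder {
  private var events = Vector.empty[String]
  // Synchronized because the Futures log from different pool threads
  def record(e: String): Unit = synchronized { events :+= e }
  def log: Vector[String] = synchronized { events }

  // Hypothetical helper: a Future that logs its start and end around a short sleep
  def step(name: String, value: Int): Future[Int] = Future {
    record(s"$name start")
    blocking(Thread.sleep(100))
    record(s"$name end")
    value
  }

  // Same shape as sumOfThreeNumbersSequentialMap: each step is created inside the previous closure
  def sum(): Future[Int] =
    step("one", 1).flatMap { a =>
      step("two", 2).flatMap { b =>
        step("three", 3).map(c => a + b + c)
      }
    }

  def main(args: Array[String]): Unit = {
    println(Await.result(sum(), 2.seconds)) // 6
    println(log) // every step starts only after the previous one has ended
  }
}
```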
Using for-comprehension syntactic sugar instead of map
Scala gives us a great way to work with classes that have map and flatMap (Monads): for comprehensions. For comprehensions are just syntactic sugar, which gets de-sugared into flatMap and map.
The following code means exactly the same as the above, except that the uglification is done by the Scala compiler.
Code
def sumOfThreeNumbersSequentialForComprehension(): Future[Int] = {
  for {
    localOne <- Future {
      Thread.sleep(1000)
      1
    }
    localTwo <- Future {
      Thread.sleep(2000)
      2
    }
    localThree <- Future {
      Thread.sleep(3000)
      3
    }
  } yield localOne + localTwo + localThree
}
Testcase
It's the same as above.
it("could be composed using for comprehensions") {
  val futureCombinators = new FutureCombinators
  val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialForComprehension(), 7 seconds))
  result shouldBe 6
}
Output
Time taken : 6012
Executing Futures in parallel
As we saw, the previous block of code runs the three Futures sequentially and therefore takes a total of 6 seconds to finish the computation. That isn't good: our Futures need to run in parallel. To achieve this, all we need to do is extract the Future blocks and declare them separately, so that all three are created (and therefore start running) before we compose them.
Code
val oneFuture: Future[Int] = Future {
  Thread.sleep(1000)
  1
}
val twoFuture: Future[Int] = Future {
  Thread.sleep(2000)
  2
}
val threeFuture: Future[Int] = Future {
  Thread.sleep(3000)
  3
}
Now, let's use for-comprehension to calculate the value.
def sumOfThreeNumbersParallel(): Future[Int] = for {
  oneValue <- oneFuture
  twoValue <- twoFuture
  threeValue <- threeFuture
} yield oneValue + twoValue + threeValue
Testcase
Let's time the computation and assert the correct value using the following testcase.
describe("Futures that are executed in parallel") {
  it("could be composed using for comprehensions") {
    val futureCombinators = new FutureCombinators
    val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallel(), 4 seconds))
    result shouldBe 6
  }
}
Output
Time taken : 3005
As we can see, sumOfThreeNumbersParallel takes almost the same time as the longest Future (threeFuture), which is 3 seconds.
Just for the sake of comparison, the above code could be written without a for comprehension as:
def sumOfThreeNumbersParallelMap(): Future[Int] = oneFuture.flatMap { oneValue =>
  twoFuture.flatMap { twoValue =>
    threeFuture.map { threeValue =>
      oneValue + twoValue + threeValue
    }
  }
}
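One subtlety: a Future starts running as soon as it is created. Because oneFuture, twoFuture and threeFuture are vals of FutureCombinators, they start when the class is instantiated, not when the method is called. Here is a sketch that creates the Futures as local vals inside the method instead, so every call starts a fresh set in parallel. The slow helper and the delays are assumptions for illustration.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelLocalVals {
  // Hypothetical helper: completes with `value` after roughly `millis` ms.
  // blocking(...) tells the global pool this thread is parked, so it may add threads.
  def slow(millis: Long, value: Int): Future[Int] = Future {
    blocking(Thread.sleep(millis))
    value
  }

  def sumInParallel(): Future[Int] = {
    // All three Futures are created, and start running, right here
    val one = slow(200, 1)
    val two = slow(400, 2)
    val three = slow(600, 3)
    // The for comprehension only waits on results that are already being computed
    for {
      a <- one
      b <- two
      c <- three
    } yield a + b + c
  }

  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    val result = Await.result(sumInParallel(), 3.seconds)
    // Expect roughly the slowest delay (about 600 ms), not the 1200 ms total
    println(s"$result after ${System.currentTimeMillis() - start} ms")
  }
}
```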
Guards in for-comprehensions
Just like we add an if guard in for-comprehensions over List and other collections (i.e. other monadic types), we can add guards to Future generators as well. The following if guard checks whether the value returned by twoFuture is greater than 1, which it is.
def sumOfThreeNumbersParallelWithGuard(): Future[Int] = for {
  oneValue <- oneFuture
  twoValue <- twoFuture if twoValue > 1
  threeValue <- threeFuture
} yield oneValue + twoValue + threeValue
These guards get de-sugared into withFilter calls, like so (I am 90% sure nobody wants to write it this way):
def sumOfThreeNumbersMapAndFlatMapWithFilter(): Future[Int] = oneFuture.flatMap { oneValue =>
  twoFuture.withFilter(_ > 1).flatMap { twoValue =>
    threeFuture.map { threeValue =>
      oneValue + twoValue + threeValue
    }
  }
}
Guards in for-comprehensions - Failure case
If the guard evaluates to false, the filtered Future fails with a NoSuchElementException, and the composed Future fails with it. Let's change the guard condition so that it evaluates to false.
def sumOfThreeNumbersParallelWithGuardAndFailure(): Future[Int] = for {
  oneValue <- oneFuture
  twoValue <- twoFuture if twoValue > 2
  threeValue <- threeFuture
} yield oneValue + twoValue + threeValue
Output
Future.filter predicate is not satisfied
java.util.NoSuchElementException: Future.filter predicate is not satisfied
at scala.concurrent.Future$$anonfun$filter$1.apply(Future.scala:280)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Success.map(Try.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
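As the stack trace shows, the failure comes from Future.filter, which is what withFilter does for a Future: when the predicate is false, the resulting Future fails with a NoSuchElementException. A small sketch with arbitrary values, using the failed projection to inspect the exception without a try/catch:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FilterFailure {
  // Predicate holds: the value passes through unchanged
  val passes: Future[Int] = Future.successful(2).filter(_ > 1)

  // Predicate fails: the Future fails with NoSuchElementException
  val rejected: Future[Int] = Future.successful(2).filter(_ > 2)

  def main(args: Array[String]): Unit = {
    println(Await.result(passes, 1.second)) // 2
    // .failed turns a failed Future[Int] into a successful Future[Throwable]
    println(Await.result(rejected.failed, 1.second)) // a NoSuchElementException
  }
}
```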
Exception handling
Just like the NoSuchElementException caused by the guard, code executing asynchronously inside a Future could throw a variety of exceptions. While one may argue that exceptions are not very FP-like, chances are that in a distributed application, or when you use Java libraries inside your Future, exceptions do happen.
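An exception thrown inside a Future body is not thrown at the call site: it is captured, and the Future completes as a Failure. A minimal sketch, reusing the shape of the post's LegacyException and assuming the global ExecutionContext (risky is a hypothetical name):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object CapturedException {
  // Same shape as the post's LegacyException
  case class LegacyException(msg: String) extends Exception(msg)

  // Calling this does not throw: the exception is stored inside the returned Future
  def risky(): Future[Int] = Future { throw LegacyException("Danger! Danger!") }

  def main(args: Array[String]): Unit = {
    val f = risky()          // no exception at this point
    Await.ready(f, 1.second) // wait for completion without rethrowing
    f.value match {
      case Some(Failure(e)) => println(s"Failed with $e")
      case Some(Success(v)) => println(s"Succeeded with $v")
      case None             => println("Not completed yet")
    }
  }
}
```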
Code
Both functions below throw exceptions: the first one throws a NoSuchElementException and the second one throws a LegacyException.
// NoSuchElementException
def throwsNoSuchElementIfGuardFails(): Future[Int] = for {
  oneValue <- oneFuture
  twoValue <- twoFuture if twoValue > 2
  threeValue <- threeFuture
} yield oneValue + twoValue + threeValue
// LegacyException
val futureCallingLegacyCode: Future[Int] = Future {
  Thread.sleep(1000)
  throw new LegacyException("Danger! Danger!")
}
def throwsExceptionFromComputation(): Future[Int] = for {
  oneValue <- oneFuture
  futureThrowingException <- futureCallingLegacyCode
} yield oneValue + futureThrowingException
case class LegacyException(msg: String) extends Exception(msg)
Testcases
describe("Futures that throw exception") {
  it("could blow up on the caller code when guard fails") {
    val futureCombinators = new FutureCombinators
    intercept[NoSuchElementException] {
      val result = timed(Await.result(futureCombinators.throwsNoSuchElementIfGuardFails(), 4 seconds))
    }
  }
  it("could blow up on the caller code when exception comes from a computation executed inside the Future") {
    val futureCombinators = new FutureCombinators
    intercept[LegacyException] {
      val result = timed(Await.result(futureCombinators.throwsExceptionFromComputation(), 4 seconds))
    }
  }
  ...
  ...
Note that if any one of the composed Futures fails, the entire composed computation fails and the exception is propagated to the caller.
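The propagation also short-circuits: once a step fails, the later flatMap/map closures are never run. (Futures that were already created elsewhere, such as class-level vals, still run to completion; only the continuations are skipped.) The sketch below uses a hypothetical AtomicBoolean flag to show that the second generator's block is never evaluated when the first Future fails.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ShortCircuit {
  // Records whether the second step was ever evaluated
  val secondStepRan = new AtomicBoolean(false)

  def composed(): Future[Int] = for {
    a <- Future[Int](throw new IllegalStateException("first step failed"))
    b <- { secondStepRan.set(true); Future(2) } // only evaluated if the first step succeeds
  } yield a + b

  def main(args: Array[String]): Unit = {
    val error = Await.result(composed().failed, 1.second)
    println(error.getMessage)    // first step failed
    println(secondStepRan.get()) // false
  }
}
```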
Recovering from Exception :
Using recover
If a Future fails with a non-fatal exception (one matched by scala.util.control.NonFatal) and we would rather have a default fallback value than propagate the error to the caller, we can use the recover function. recover is much like a catch block.
Let's modify the above function throwsExceptionFromComputation, which throws a LegacyException. The recover function accepts a PartialFunction that maps from Throwable to the type that the Future wraps.
Code
In the code below, if futureCallingLegacyCode throws a LegacyException (which it does), the result of this computation is set to 200. If it hadn't thrown, the resulting value would be the result of that computation itself.
val futureCallingLegacyCodeWithRecover: Future[Int] = futureCallingLegacyCode.recover {
  case LegacyException(msg) => 200
}
def recoversFromExceptionUsingRecover(): Future[Int] = for {
  oneValue <- oneFuture
  futureThrowingException <- futureCallingLegacyCodeWithRecover
} yield oneValue + futureThrowingException
To reiterate, if the original Future yields a successful value, the recover block is never executed. Also, if the PartialFunction passed to recover does not handle the exception that was originally thrown, the exception is propagated to the caller.
Testcase
The testcase asserts that the result of the computation is the sum of the values returned by oneFuture (which is 1) and futureCallingLegacyCodeWithRecover (which is 200).
it("could be recovered with a recovery value") {
  val futureCombinators = new FutureCombinators
  val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecover(), 2 seconds))
  result shouldBe 201
}
Output
Time taken : 1004
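The two rules stated earlier (a successful Future skips recover entirely, and an exception the PartialFunction does not match propagates unchanged) can be checked with a small sketch. LegacyException mirrors the post's class; the other names and values are illustrative.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverCases {
  case class LegacyException(msg: String) extends Exception(msg)

  // Only LegacyException is handled
  val handler: PartialFunction[Throwable, Int] = {
    case LegacyException(_) => 200
  }

  // Success: the handler is never consulted
  val succeeded: Future[Int] = Future(1).recover(handler)
  // Matching failure: replaced by the fallback value
  val recovered: Future[Int] = Future[Int](throw LegacyException("Danger! Danger!")).recover(handler)
  // Non-matching failure: the original exception propagates
  val unhandled: Future[Int] = Future[Int](throw new IllegalArgumentException("other")).recover(handler)

  def main(args: Array[String]): Unit = {
    println(Await.result(succeeded, 1.second))        // 1
    println(Await.result(recovered, 1.second))        // 200
    println(Await.result(unhandled.failed, 1.second)) // java.lang.IllegalArgumentException: other
  }
}
```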
Using recoverWith
Instead of recovering with a value when a Future fails with an exception, in some circumstances we might want to recover with the result of some other Future. Say, the unavailability of an HTTP call to Server1 due to a network failure could be recovered with an HTTP call to another service running on Server2.
Similar to recover, recoverWith accepts a PartialFunction. However, this PartialFunction maps a Throwable to a Future of the same type as the original Future.
Just like with recover, the PartialFunction is only applied if the Future on which recoverWith is called fails. In that case, the Future returned by the PartialFunction takes its place: if that second Future succeeds, its value becomes the new result.
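For the Server1/Server2 scenario, a sketch might look like the following. callServer1 and callServer2 are hypothetical stand-ins for real HTTP calls. Because the recovery Future is produced inside the PartialFunction, the second call is only made when the first one fails; the counter checks that.

```scala
import java.io.IOException
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverWithFallback {
  // Counts how many times the backup service was called
  val server2Calls = new AtomicInteger(0)

  // Hypothetical stand-in for an HTTP call to Server1; fails when `up` is false
  def callServer1(up: Boolean): Future[String] =
    if (up) Future("from server1")
    else Future.failed[String](new IOException("server1 unreachable"))

  // Hypothetical stand-in for an HTTP call to Server2
  def callServer2(): Future[String] = {
    server2Calls.incrementAndGet()
    Future("from server2")
  }

  // callServer2() is only invoked when the PartialFunction is applied, i.e. on an IOException
  def fetch(server1Up: Boolean): Future[String] =
    callServer1(server1Up).recoverWith {
      case _: IOException => callServer2()
    }

  def main(args: Array[String]): Unit = {
    println(Await.result(fetch(server1Up = true), 1.second))  // from server1
    println(Await.result(fetch(server1Up = false), 1.second)) // from server2
    println(server2Calls.get())                               // 1
  }
}
```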
Code
val futureCallingLegacyCodeWithRecoverWith: Future[Int] = futureCallingLegacyCode.recoverWith {
  case LegacyException(msg) =>
    println("Exception occurred. Recovering with a Future that wraps 1000")
    // Sleep inside the recovery Future rather than in the callback itself
    Future {
      Thread.sleep(2000)
      1000
    }
}
def recoversFromExceptionUsingRecoverWith(): Future[Int] = for {
  oneValue <- oneFuture
  futureThrowingException <- futureCallingLegacyCodeWithRecoverWith
} yield oneValue + futureThrowingException
Testcase
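futureCallingLegacyCode fails after 1 second, and the recovering Future then takes another 2 seconds, while oneFuture's 1 second runs concurrently with all this. So the whole computation takes about 3 seconds, and we set the Await.result timeout to 4 seconds. The final result, 1001, is the sum of the results of oneFuture and futureCallingLegacyCodeWithRecoverWith.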
it("could be recovered with a recovery Future") {
  val futureCombinators = new FutureCombinators
  val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWith(), 4 seconds))
  result shouldBe 1001
}
Output
Time taken : 3006
Note that if the recovery Future also fails, the error from that second Future (not the original one) is propagated to the caller.
Code
In the following code, we create another Future that throws an exception with the message Dieded!!, and we recover the first Future with this error-throwing Future. The testcase reveals that the exception from the second (recovery) Future is thrown back to the caller.
val anotherErrorThrowingFuture: Future[Int] = Future {
  Thread.sleep(1000)
  throw new LegacyException("Dieded!!")
}
val futureRecoveringWithAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.recoverWith {
  case LegacyException(msg) =>
    anotherErrorThrowingFuture
}
def recoversFromExceptionUsingRecoverWithThatFails(): Future[Int] = for {
  oneValue <- oneFuture
  futureThrowingException <- futureRecoveringWithAnotherErrorThrowingFuture
} yield oneValue + futureThrowingException
Testcase
it("when recovered with another Future that throws Exception would throw the error from the second Future") {
  val futureCombinators = new FutureCombinators
  val exception = intercept[LegacyException] {
    timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWithThatFails(), 4 seconds))
  }
  exception.msg shouldBe "Dieded!!"
}
Using fallbackTo :
fallbackTo behaves just like recoverWith when it comes to successful values: it uses the first Future's value if it succeeds, or falls back to the second Future's value otherwise. However, if both the first and the second Futures fail, the error propagated to the caller is that of the first Future, not the second one.
Code
Let's use the same Futures that we used with recoverWith.
val futureFallingBackToAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.fallbackTo(anotherErrorThrowingFuture)
def recoversFromExceptionUsingFallbackTo(): Future[Int] = for {
  oneValue <- oneFuture
  futureThrowingException <- futureFallingBackToAnotherErrorThrowingFuture
} yield oneValue + futureThrowingException
Notice that fallbackTo just accepts another Future, not a PartialFunction like recoverWith does.
Testcase
it("when fallen back to another Future that throws Exception would throw the error from the first Future") {
  val futureCombinators = new FutureCombinators
  val exception = intercept[LegacyException] {
    timed(Await.result(futureCombinators.recoversFromExceptionUsingFallbackTo(), 4 seconds))
  }
  exception.msg shouldBe "Danger! Danger!"
}
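One more difference worth knowing: fallbackTo takes its argument as an ordinary (by-value) Future, so the fallback computation is already running by the time fallbackTo is called, even if the first Future ends up succeeding. In the sketch below, the hypothetical backup function records that it was started; bothFail reproduces the both-fail case from the testcase above.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FallbackToEager {
  case class LegacyException(msg: String) extends Exception(msg)

  val backupStarted = new AtomicBoolean(false)

  // Hypothetical backup computation that records that it was started
  def backup(): Future[Int] = {
    backupStarted.set(true)
    Future(2)
  }

  // backup() is evaluated before fallbackTo is even called, although the primary succeeds
  val primaryWins: Future[Int] = Future.successful(1).fallbackTo(backup())

  // When both fail, the first Future's exception is the one reported
  val first: Future[Int] = Future.failed[Int](LegacyException("Danger! Danger!"))
  val second: Future[Int] = Future.failed[Int](LegacyException("Dieded!!"))
  val bothFail: Future[Int] = first.fallbackTo(second)

  def main(args: Array[String]): Unit = {
    println(Await.result(primaryWins, 1.second))                // 1
    println(backupStarted.get())                                // true
    println(Await.result(bothFail.failed, 1.second).getMessage) // Danger! Danger!
  }
}
```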
Other interesting and useful combinators
The following is a super-brief list of other Future combinators which I find very useful.
zip
zip works just like List.zip: it combines two Futures and yields a Future of a Tuple of their values.
def zipTwoFutures: Future[(Int, Int)] = oneFuture zip twoFuture
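A slightly fuller sketch of zip, assuming the global ExecutionContext and arbitrary values: it pairs the results of two Futures, fails if either side fails, and is often followed by a map over the tuple.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipSketch {
  // The two results can have different types
  val zipped: Future[(Int, String)] = Future(1) zip Future("two")

  // If either side fails, the zipped Future fails with that exception
  val zippedWithFailure: Future[(Int, Int)] =
    Future(1) zip Future[Int](throw new IllegalStateException("boom"))

  // Combine the pair by mapping over the tuple
  val summed: Future[Int] = (Future(1) zip Future(2)).map { case (a, b) => a + b }

  def main(args: Array[String]): Unit = {
    println(Await.result(zipped, 1.second))                              // (1,two)
    println(Await.result(zippedWithFailure.failed, 1.second).getMessage) // boom
    println(Await.result(summed, 1.second))                              // 3
  }
}
```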
firstCompletedOf
Ah! firstCompletedOf comes in really handy when you have two equivalent services and you want to proceed as soon as the fastest one returns a value.
val listOfFutures = List(oneFuture, twoFuture, threeFuture)
def getFirstResult(): Future[Int] = Future.firstCompletedOf(listOfFutures)
In the above case, oneFuture completes first, so the result is 1.
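Note that firstCompletedOf returns the first Future to complete, whether it succeeded or failed, so a quick failure beats a slower success. A sketch with a hypothetical service helper that simulates services with different latencies:

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstCompleted {
  // Hypothetical stand-in for a service that answers with its name after a delay
  def service(name: String, delayMillis: Long): Future[String] = Future {
    blocking(Thread.sleep(delayMillis))
    name
  }

  def fastest(): Future[String] =
    Future.firstCompletedOf(List(service("slow", 500), service("fast", 50)))

  // "Completed" includes failures: an immediate failure wins over a slower success
  def fastFailure(): Future[String] = {
    val slow = service("slow", 500)
    val failed = Future.failed[String](new RuntimeException("fast failure"))
    Future.firstCompletedOf(List(slow, failed))
  }

  def main(args: Array[String]): Unit = {
    println(Await.result(fastest(), 2.seconds))                           // fast
    println(Await.result(fastFailure().failed, 2.seconds).getMessage)     // fast failure
  }
}
```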
sequence
sequence is pure magic. Say you have a List[Future[Int]], like List(oneFuture, twoFuture, threeFuture), and you need all the values back as a List[Int] instead of each Int wrapped inside a Future. sequence takes your List[Future[Int]] and transforms it into a Future[List[Int]].
def getResultsAsList(): Future[List[Int]] = Future.sequence(listOfFutures)
The last time I used it was for batching: I ran logic against chunks of data in parallel and combined the results with sequence.
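A minimal sketch of that batching pattern, assuming a hypothetical processChunk that simply sums its chunk: split the data with grouped, start one Future per chunk, then use Future.sequence to gather the partial results. If any chunk's Future fails, the sequenced Future fails as well.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BatchWithSequence {
  // Hypothetical per-chunk work: here, just summing the chunk
  def processChunk(chunk: List[Int]): Future[Int] = Future(chunk.sum)

  def batchedSum(data: List[Int], chunkSize: Int): Future[Int] = {
    // One Future per chunk; they all start running immediately
    val perChunk: List[Future[Int]] = data.grouped(chunkSize).toList.map(processChunk)
    // List[Future[Int]] => Future[List[Int]], then combine the partial sums
    Future.sequence(perChunk).map(_.sum)
  }

  def main(args: Array[String]): Unit = {
    println(Await.result(batchedSum((1 to 100).toList, 10), 2.seconds)) // 5050
  }
}
```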
Scala-async library
The Scala Async library is an external project and can be added to our project with the following dependency in build.sbt:
libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.9.6-RC2"
The Async library has just two powerful functions in its scala.async.Async object: async and await.
async
The async function is very similar to Future.apply. In fact, their signatures are nearly identical, and we could comfortably replace Future.apply with async wherever the library is available.
Future.apply
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T]
Async
def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T]
The foremost advantage of async/await over composing Future.apply blocks in a for comprehension, apart from general readability, is that the for comprehension makes the compiler generate a separate anonymous class for each generator's closure, while an async block compiles into just one anonymous class.
For example, we could rewrite our oneFuture as:
import scala.async.Async.async
val oneFuture: Future[Int] = async {
  Thread.sleep(1000)
  1
}
await
The await function accepts a Future and returns its result. But isn't that the same as Await.result, which also accepts a Future and returns its result? Nope. The key difference is that Await.result is blocking and is strongly discouraged in production code outside of testcases. await, on the other hand, can only be used inside an async block: it is implemented with Scala macros, and the async macro rewrites the block so that the code following each await runs as an onComplete callback on the awaited Future instead of blocking a thread.
Since the async function returns a Future, all the other error handling and recovery mechanisms stay the same as before.
Code
Let's rewrite the previous sum-of-three-numbers with async/await :
import scala.async.Async.{async, await}
def sumOfThreeNumbersParallelWithAsyncAwait(): Future[Int] = async {
  await(oneFuture) + await(twoFuture) + await(threeFuture)
}
Testcase
it("could be composed using async/await") {
  val futureCombinators = new FutureCombinators
  val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallelWithAsyncAwait(), 4 seconds))
  result shouldBe 6
}
As we can see, code written this way is not only asynchronous but also looks natural (in fact, it looks synchronous). We could argue that for-comprehensions are a huge leap from using map and flatMap, but async/await goes one big step further.
Code
The code and its corresponding testcase are on GitHub.