June 12, 2016 · scala concurrency

Scala notes - Futures - 3 (Combinators and Async)

In the previous parts of this series, we discussed Futures and Promises. In this last part, we'll compose Futures using their powerful combinators.

Composing Futures :

In the first post, we saw how to extract a value from Future using onComplete, foreach and in testcases using Await.result. Extracting a value from a single Future is good but many a time we spawn more than one asynchronous operation and wait on multiple Futures to arrive at the final result. Even better, sometimes the result of one Future would be fed into another or a chain of Futures.

Future is a Monad. (I am sorry to drop the M-bomb here; I will take a stab later at explaining my understanding of what a Monoid, Functor, Monad and Applicative are.) But for now, let's live with this crude explanation :

  1. Future is a container of a value of some type (i.e. it accepts a type as an argument and can't exist without it). You can have a Future[Int] or Future[String] or Future[AwesomeClass] - you can't just have a plain Future. A fancy term for this is type constructor. To compare, List is a type constructor (and a Monad as well). A List is a container of values of type Int, String or any other type. A List/Future without a contained type does not exist.

  2. Future has flatMap and unit functions (and consequently a map function too).

The reason I brought this up is that instead of using the onComplete callback or the foreach, we could simply map or flatMap the result of the Future just like we would do it against an Option or a List.
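
As a small sketch of that analogy (the object and method names here are illustrative and not part of the original code), the same map and flatMap calls have the same shape on an Option and on a Future. Future.successful, which wraps an already computed value, is one way to think of the unit function mentioned above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapFlatMapSketch {

  // Option: map transforms the value, flatMap chains another Option.
  def viaOption(): Option[Int] =
    Some(20).map(_ + 1).flatMap(n => Some(n * 2))

  // Future: exactly the same shape. Future.successful wraps a value that
  // is already available; Future(...) schedules the computation.
  def viaFuture(): Future[Int] =
    Future.successful(20).map(_ + 1).flatMap(n => Future(n * 2))

  def main(args: Array[String]): Unit = {
    println(viaOption())
    // Await.result is used here only to print the value in a demo.
    println(Await.result(viaFuture(), 1.second))
  }
}
```

Neither version needs a callback: the transformation is described up front and applied whenever the value becomes available.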

Now, let's look at the map and the flatMap combinators.

Mapping Futures that execute sequentially

Let's consider this simple task, which adds three numbers that are each calculated asynchronously after some interval.

Warning : The following code is messy and executes the Futures sequentially

Code

class FutureCombinators {

  def sumOfThreeNumbersSequentialMap(): Future[Int] = {
    Future {
      Thread.sleep(1000)
      1
    }.flatMap { oneValue =>
      Future {
        Thread.sleep(2000)
        2
      }.flatMap { twoValue =>
        Future {
          Thread.sleep(3000)
          3
        }.map { thirdValue =>
          oneValue + twoValue + thirdValue
        }
      }
    }
  }
...
...

The first Future returns a 1 after 1 second, the second Future returns a 2 after 2 seconds and the third Future returns a 3 after 3 seconds. The nested block finally calculates the sum of three values and returns one single Future[Int].

Testcase

For the sake of calculating the time taken to compute the values, we have a small utility function (inside the ConcurrentUtils trait) called timed which calculates and prints the time taken by a block.

We Await.result to do a blocking wait on the result of futureCombinators.sumOfThreeNumbersSequentialMap. We also time the total execution and print it.

class FutureCombinatorsTest extends FunSpec with Matchers with ConcurrentUtils {

  describe("Futures") {
    it("could be composed using map") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialMap(), 7 seconds))
      result shouldBe 6
    }
  }
...
...
}

trait ConcurrentUtils {
  def timed[T](block: => T): T = {
    val start = System.currentTimeMillis()
    val result = block
    val duration = System.currentTimeMillis() - start
    println(s"Time taken : $duration")
    result
  }
}

Output

Time taken : 6049

The function took a little over 6 seconds to execute which indicates that the Futures are executed in sequence.

Using for-comprehension syntactic sugar instead of Map

Scala gives us a great way to work with classes that have map and flatMap (Monads): for-comprehensions. For-comprehensions are just syntactic sugar that gets de-sugared into flatMap and map calls.

The following code means exactly the same as the above, except that the uglification is done by the Scala compiler.

Code

def sumOfThreeNumbersSequentialForComprehension(): Future[Int] = {
    for {
      localOne <- Future {
        Thread.sleep(1000)
        1
      }
      localTwo <- Future {
        Thread.sleep(2000)
        2
      }
      localThree <- Future {
        Thread.sleep(3000)
        3
      }
    } yield localOne + localTwo + localThree
}

Testcase

It's the same as above.

  it("could be composed using for comprehensions") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialForComprehension(), 7 seconds))
      result shouldBe 6
    }

Output

Time taken : 6012

Executing Futures in parallel

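As we saw, the previous block of code runs the three Futures sequentially and therefore takes a total of 6 seconds to finish the computation. That isn't good. Our Futures need to run in parallel. In order to achieve this, all we need to do is extract the Future blocks out of the for-comprehension and declare them separately. A Future starts executing as soon as it is created, so three Futures declared up front run concurrently and the for-comprehension merely combines their results.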

Code

 val oneFuture: Future[Int] = Future {
    Thread.sleep(1000)
    1
  }

  val twoFuture: Future[Int] = Future {
    Thread.sleep(2000)
    2
  }

  val threeFuture: Future[Int] = Future {
    Thread.sleep(3000)
    3
  }

Now, let's use for-comprehension to calculate the value.

def sumOfThreeNumbersParallel(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture
    threeValue <- threeFuture
} yield oneValue + twoValue + threeValue

Testcase

Let's time the computation and assert the correct value using the following testcase.

 describe("Futures that are executed in parallel") {
    it("could be composed using for comprehensions") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallel(), 4 seconds))
      result shouldBe 6
    }
  }

Output

Time taken : 3005

As we see, sumOfThreeNumbersParallel takes almost the same time as the longest Future (threeFuture), which is 3 seconds.

Just for the sake of comparison, the above code could be written without using for-comprehension as :


def sumOfThreeNumbersParallelMap(): Future[Int] = oneFuture.flatMap { oneValue =>
    twoFuture.flatMap { twoValue =>
      threeFuture.map { threeValue =>
        oneValue + twoValue + threeValue
      }
    }
}
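
The difference between the sequential and the parallel versions comes down to when each Future is created, since a Future is scheduled for execution as soon as it is constructed. The following is a minimal sketch of that point; the slow and timedMillis helpers and the 300 ms delays are assumptions made for this illustration and are not part of the original code. The blocking wrapper is also an addition: it hints to the global execution context that the thread is about to block, so the demo overlaps even on a machine with very few cores.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerStartSketch {

  // Hypothetical helper: a Future that sleeps for `millis`, then yields `value`.
  def slow(value: Int, millis: Long): Future[Int] = Future {
    blocking(Thread.sleep(millis))
    value
  }

  // The Futures are created inside the generators, so the second one is
  // not even created until the first has completed.
  def sequential(): Future[Int] = for {
    a <- slow(1, 300)
    b <- slow(2, 300)
  } yield a + b

  // Both Futures are created (and therefore started) before the
  // for-comprehension, so their sleeps overlap.
  def parallel(): Future[Int] = {
    val fa = slow(1, 300)
    val fb = slow(2, 300)
    for {
      a <- fa
      b <- fb
    } yield a + b
  }

  // Runs the block and returns its result with the elapsed milliseconds.
  def timedMillis[T](block: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = block
    (result, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    val (seqSum, seqMs) = timedMillis(Await.result(sequential(), 5.seconds))
    val (parSum, parMs) = timedMillis(Await.result(parallel(), 5.seconds))
    println(s"sequential: $seqSum in $seqMs ms")
    println(s"parallel:   $parSum in $parMs ms")
  }
}
```

Both methods use an identical for-comprehension; only the place where the Futures are constructed differs.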

Guards in for-comprehensions

Just like we add a guarded if clause in for-comprehensions over List and other collections (aka other Monadic types), we could add guards against the generators of Future as well. The following if guard checks if the value returned by the twoFuture is more than 1, which it is.

  def sumOfThreeNumbersParallelWithGuard(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 1
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue

These guards get de-sugared as withFilter like so (I am 90% sure nobody wants to write this way):

def sumOfThreeNumbersMapAndFlatMapWithFilter(): Future[Int] = oneFuture.flatMap { oneValue =>
    twoFuture.withFilter(_ > 1).flatMap { twoValue =>
      threeFuture.map { threeValue =>
        oneValue + twoValue + threeValue
      }
    }
  }

Guards in for-comprehensions - Failure case

If the guard evaluates to false, the generator yields a failed Future holding a NoSuchElementException, which is thrown at the caller when we Await.result. Let's change the guard condition so that it evaluates to false.

  def sumOfThreeNumbersParallelWithGuardAndFailure(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 2
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue

Output

Future.filter predicate is not satisfied
java.util.NoSuchElementException: Future.filter predicate is not satisfied
	at scala.concurrent.Future$$anonfun$filter$1.apply(Future.scala:280)
	at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
	at scala.util.Try$.apply(Try.scala:192)
	at scala.util.Success.map(Try.scala:237)
	at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
	at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
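
The failure can also be inspected as a value instead of being thrown at the caller. The sketch below is an illustration under assumptions (the object and method names are made up for this example): it waits with Await.ready, which does not rethrow, and then pattern matches on the Try held by the completed Future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object GuardFailureSketch {

  // The guard rejects the value 2, so the resulting Future fails.
  def guarded(): Future[Int] = for {
    two <- Future.successful(2) if two > 2
  } yield two

  // Await.ready waits for completion without rethrowing the exception;
  // `value` then exposes the outcome as a Try.
  def outcome(): Try[Int] = {
    val f = guarded()
    Await.ready(f, 1.second)
    f.value.get
  }

  def describe(): String = outcome() match {
    case Success(v)                         => s"succeeded with $v"
    case Failure(_: NoSuchElementException) => "guard rejected the value"
    case Failure(other)                     => s"failed with $other"
  }

  def main(args: Array[String]): Unit = println(describe())
}
```

As with Await.result, Await.ready blocks the calling thread, so it belongs in demos and testcases rather than in production code.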

Exception handling

Just like the NoSuchElementException thrown by the guard, code executing asynchronously inside a Future could throw a variety of exceptions. While one may argue that Exceptions are not very FP-like, chances are that with a distributed application or through usage of Java libraries inside your Future, exceptions do happen.

Code

Both functions below throw exceptions: the first throws a NoSuchElementException and the second a LegacyException.


  //NoSuchElementException
  def throwsNoSuchElementIfGuardFails(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 2
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue

  //LegacyException

  val futureCallingLegacyCode: Future[Int] = Future {
    Thread.sleep(1000)
    throw new LegacyException("Danger! Danger!")
  }

  def throwsExceptionFromComputation(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCode
  } yield oneValue + futureThrowingException

case class LegacyException(msg: String) extends Exception(msg)

Testcases

 describe("Futures that throw exception") {
    it("could blow up on the caller code when guard fails") {
      val futureCombinators = new FutureCombinators
      intercept[NoSuchElementException] {
        val result = timed(Await.result(futureCombinators.throwsNoSuchElementIfGuardFails(), 4 seconds))
      }
    }

    it("could blow up on the caller code when exception comes from a computation executed inside the Future") {
      val futureCombinators = new FutureCombinators
      intercept[LegacyException] {
        val result = timed(Await.result(futureCombinators.throwsExceptionFromComputation(), 4 seconds))
      }
    }
...
...

Note that if even one of the Futures fails with an exception, the whole composed computation fails and that exception is propagated to the caller.

Recovering from Exception :

Using recover

If a Future fails with a non-fatal exception (one that the scala.util.control.NonFatal extractor matches) and we want a default fallback value instead of propagating the error to the caller, we could use the recover function. recover is much like a catch block.

Let's modify the above function throwsExceptionFromComputation which throws a LegacyException. The recover function accepts a PartialFunction that maps from Throwable to the type that the Future wraps.

Code

In the below code, if the futureCallingLegacyCode throws an Exception (which it does), the value that is the result of this computation is set to be 200. If it hadn't thrown an Exception, the resulting value would be the result of that computation itself.

  val futureCallingLegacyCodeWithRecover: Future[Int] = futureCallingLegacyCode.recover {
    case LegacyException(msg) => 200
  }

  def recoversFromExceptionUsingRecover(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCodeWithRecover
  } yield oneValue + futureThrowingException

To reiterate, if the original Future yields a successful value, the recover block is never executed. Also, if the PartialFunction inside the recover function does not handle the exception that is originally thrown, the exception gets propagated to the caller.
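
Those three cases (success, handled failure, unhandled failure) can be seen side by side in the following sketch. The KnownProblem and UnknownProblem exceptions and the helper names are hypothetical, introduced only for this illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object RecoverSketch {

  // Hypothetical exception types used only for this illustration.
  case class KnownProblem(msg: String) extends Exception(msg)
  case class UnknownProblem(msg: String) extends Exception(msg)

  // The PartialFunction handles KnownProblem and nothing else.
  def withFallback(f: Future[Int]): Future[Int] = f.recover {
    case KnownProblem(_) => 200
  }

  // Waits for the recovered Future and returns its outcome as a Try,
  // so that a failure can be inspected instead of being thrown.
  def settle(f: Future[Int]): Try[Int] = {
    val recovered = withFallback(f)
    Await.ready(recovered, 1.second)
    recovered.value.get
  }

  def main(args: Array[String]): Unit = {
    println(settle(Future.successful(1)))                  // recover is skipped
    println(settle(Future.failed(KnownProblem("known"))))  // fallback value is used
    println(settle(Future.failed(UnknownProblem("other")))) // failure passes through
  }
}
```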

Testcase

The testcase asserts that the result of the computation is the sum of values that are returned by oneFuture (which is 1) and the futureCallingLegacyCodeWithRecover (which is 200).

   it("could be recovered with a recovery value") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecover(), 2 seconds))
      result shouldBe 201
    }

Output

Time taken : 1004

Using recoverWith

Instead of recovering with a value when a Future results in an Exception, we might want to recover with the result of some other Future in some circumstances. Say, a failed HTTP call to Server1 due to a network failure could be recovered with an HTTP call to the same service running on Server2.

Similar to recover, the recoverWith accepts a PartialFunction. However, the PartialFunction maps a Throwable to a Future of the same type as the original Future.

Just like recover, if the main Future on which recoverWith is called fails, the PartialFunction is applied to the exception and the Future it returns takes over. If this second Future yields a successful value, that value becomes the result.

Code

  val futureCallingLegacyCodeWithRecoverWith: Future[Int] = futureCallingLegacyCode.recoverWith {
    case LegacyException(msg) =>
      println("Exception occurred. Recovering with a Future that wraps 1000")
      // Keep the slow work inside the recovery Future so that the callback itself returns immediately
      Future {
        Thread.sleep(2000)
        1000
      }
  }

  def recoversFromExceptionUsingRecoverWith(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCodeWithRecoverWith
  } yield oneValue + futureThrowingException

Testcase

futureCallingLegacyCode fails after 1 second (while oneFuture runs alongside it) and the recovery takes another 2 seconds, so the whole computation takes about 3 seconds and we set the Await.result timeout to 4 seconds. The final result, 1001, is the sum of the results of oneFuture and futureCallingLegacyCodeWithRecoverWith.

    it("could be recovered with a recovery Future") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWith(), 4 seconds))
      result shouldBe 1001
    }

Output

Time taken : 3006

Note that, just as an exception that recover does not handle reaches the caller, if the second Future also fails, the error thrown by the second Future gets propagated to the caller.

Code

In the following code, we create another Future that throws an Exception with message Dieded!! and we recover the first Future with this error-throwing-Future. The testcase would reveal that the exception from the second future (recovery one) gets thrown back to the caller.

 val anotherErrorThrowingFuture: Future[Int] = Future {
    Thread.sleep(1000)
    throw new LegacyException("Dieded!!")
  }

  val futureRecoveringWithAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.recoverWith {
    case LegacyException(msg) =>
      anotherErrorThrowingFuture
  }

  def recoversFromExceptionUsingRecoverWithThatFails(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureRecoveringWithAnotherErrorThrowingFuture
  } yield oneValue + futureThrowingException

Testcase

  it("when recovered with another Future that throws Exception would throw the error from the second Future") {
      val futureCombinators = new FutureCombinators
      val exception = intercept[LegacyException] {
        timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWithThatFails(), 4 seconds))
      }
      exception.msg shouldBe "Dieded!!"
    }

Using fallbackTo :

fallbackTo works just like recoverWith when it comes to successful values: it uses the first Future's value if it succeeds, or else falls back to the second Future's value. However, if both the first and the second Futures fail, the error that is propagated to the caller is that of the first Future and not the second.

Code

Let's use the same Futures that we used in the recoverWith.


  val futureFallingBackToAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.fallbackTo (anotherErrorThrowingFuture)

  def recoversFromExceptionUsingFallbackTo(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureFallingBackToAnotherErrorThrowingFuture
  } yield oneValue + futureThrowingException

Notice that the fallbackTo function just accepts another Future and not a PartialFunction like recoverWith.

Testcase

   it("when fallen back to another Future that throws Exception would throw the error from the first Future") {
      val futureCombinators = new FutureCombinators
      val exception = intercept[LegacyException] {
        timed(Await.result(futureCombinators.recoversFromExceptionUsingFallbackTo(), 4 seconds))
      }
      exception.msg shouldBe "Danger! Danger!"
    }
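
To see the two behaviours next to each other, here is a compact sketch. The Boom exception and the helper names are hypothetical and exist only for this comparison; both Futures fail immediately so that the example runs quickly.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FallbackSketch {

  // Hypothetical exception type used only for this illustration.
  case class Boom(msg: String) extends Exception(msg)

  // Waits for the Future and reports either its value or its error message.
  def outcome(f: Future[Int]): String = {
    Await.ready(f, 1.second)
    f.value.get match {
      case Success(v) => s"value $v"
      case Failure(e) => e.getMessage
    }
  }

  // recoverWith: when both fail, the error of the recovery Future wins.
  def viaRecoverWith(): String = outcome(
    Future.failed[Int](Boom("first")).recoverWith {
      case _: Boom => Future.failed[Int](Boom("second"))
    }
  )

  // fallbackTo: when both fail, the error of the original Future wins.
  def viaFallbackTo(): String = outcome(
    Future.failed[Int](Boom("first")).fallbackTo(Future.failed[Int](Boom("second")))
  )

  def main(args: Array[String]): Unit = {
    println(s"recoverWith reports: ${viaRecoverWith()}")
    println(s"fallbackTo reports:  ${viaFallbackTo()}")
  }
}
```

One more difference follows from the signatures: fallbackTo receives a Future that has already been created, so the fallback is started regardless of whether the first Future fails, while the Future inside a recoverWith PartialFunction is created only when a failure actually occurs.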

Other interesting and useful combinators

The following is a super-brief list of other Future combinators which I find very useful.

zip

zip works much like List.zip: it combines two Futures and yields a Future of a Tuple holding both results.

def zipTwoFutures: Future[(Int, Int)] = oneFuture zip twoFuture

firstCompletedOf

Ah! The firstCompletedOf comes in really handy when you have two equivalent services and you want to proceed once the fastest service returns a value.

  val listOfFutures = List(oneFuture, twoFuture, threeFuture)
  def getFirstResult(): Future[Int] = Future.firstCompletedOf(listOfFutures)

In the above case, the oneFuture returns the fastest.
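
A self-contained sketch of the same idea follows. The delayed helper and the delays are assumptions for this illustration, standing in for calls to equivalent services with different response times.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstCompletedSketch {

  // Hypothetical stand-in for a service call that answers after `millis`.
  def delayed(value: Int, millis: Long): Future[Int] = Future {
    blocking(Thread.sleep(millis))
    value
  }

  // All three calls are started up front; the result is whichever
  // Future completes first.
  def fastest(): Future[Int] = {
    val candidates = List(delayed(1, 100), delayed(2, 400), delayed(3, 700))
    Future.firstCompletedOf(candidates)
  }

  def main(args: Array[String]): Unit =
    println(Await.result(fastest(), 2.seconds))
}
```

Keep in mind that "first completed" includes failures: if the quickest Future fails, the combined Future fails too, even if a slower one would have succeeded.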

sequence

The sequence function is pure magic. Say you have a List[Future[Int]], just like List(oneFuture, twoFuture, threeFuture), and you require that all the values are given back to you as a List[Int] instead of each Int wrapped inside a Future. The sequence function takes your List[Future[Int]] and transforms it into a Future[List[Int]].

def getResultsAsList(): Future[List[Int]] = Future.sequence(listOfFutures)

The last time I used it was for batching, where I executed logic against chunks of data in parallel and combined the results with sequence.
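
A batching job of that kind might look roughly like the sketch below. The processChunk function (which simply sums a chunk) and the chunk size are placeholders invented for this example; the real per-chunk logic would go in their place.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BatchingSketch {

  // Hypothetical per-chunk work: here, just summing the chunk.
  def processChunk(chunk: Seq[Int]): Future[Int] = Future(chunk.sum)

  // Split the data into chunks, start one Future per chunk, then use
  // Future.sequence to turn List[Future[Int]] into Future[List[Int]].
  def total(data: Seq[Int], chunkSize: Int): Future[Int] = {
    val perChunk: List[Future[Int]] =
      data.grouped(chunkSize).map(processChunk).toList
    Future.sequence(perChunk).map(_.sum)
  }

  def main(args: Array[String]): Unit =
    println(Await.result(total(1 to 100, 10), 5.seconds))
}
```

Calling toList forces the iterator, so every chunk's Future is created before sequence is applied and the chunks are free to run concurrently.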


Scala-async library

The Scala Async library is an external project and can be added to our project by declaring the dependency in our build.sbt

"org.scala-lang.modules" %% "scala-async" % "0.9.6-RC2"

The Async library has just two powerful functions in its scala.async.Async object - async and await.

async

The async function is very similar to the Future.apply function. In fact their signatures are very much the same and we could comfortably replace the Future.apply with async wherever it is available.

Future.apply


def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T]

Async


def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T]

The foremost advantage of using async over Future.apply combined with a for-comprehension, other than the general ease of readability, is in the generated code: the compiler produces a separate anonymous class for each Future generator in a for-comprehension, whereas an async block compiles down to one single anonymous class.

Therefore, we could re-write our oneFuture as,

val oneFuture: Future[Int] = async {
    Thread.sleep(1000)
    1
}

await

The await function accepts a Future and returns its result. But isn't that the same as Await.result, which also accepts a Future and returns the result? Nope. The key difference is that Await.result blocks the calling thread and is strongly discouraged in production code (testcases being the usual exception). The await function, on the other hand, is implemented using Scala macros: the code that follows an await is rewritten to run in an onComplete callback on the Future, so no thread is blocked. Note that await can only be called inside an async block.

Since the async function returns a Future, all other error handling and recovery mechanisms stay the same as before.

Code

Let's rewrite the previous sum-of-three-numbers with async/await :

  def sumOfThreeNumbersParallelWithAsyncAwait(): Future[Int] = async {
    await(oneFuture) + await(twoFuture) + await(threeFuture)
  }

Testcase

  it("could be composed using async/await") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallelWithAsyncAwait(), 4 seconds))
      result shouldBe 6
    }

As we see, the code written this way is not only asynchronous but also looks natural (in fact, it looks synchronous). We could argue that for-comprehensions are a huge leap from using map and flatMap but async/await goes one big step further.

Code

The code and its corresponding testcases are on GitHub.