May 29, 2016 · scala concurrency

Scala Notes - Futures - 1

Almost all modern programming languages have a Future-Promise idiom for concurrent programming. I don't intend to bore you with why we need higher-level concurrency abstractions. Instead, in this post, we'll cut to the chase and discuss only Scala's approach to Futures.

A scala.concurrent.Future is a representation of a value that is yet to be realized. This value is generally a result of a longer and/or parallel computation.

In crude terms, a block of code that would otherwise run synchronously runs asynchronously when wrapped in a Future. Every implementation of Future gives us a handle through which we can retrieve the resulting value of the block's computation (or what's the point of it!).

In this post, we'll look at the basics of how to construct a Future and extract the value out of it through blocking wait and callbacks. In the next part, we'll talk about composing Futures and other advanced constructs such as recover and fallback for exception handling.

Creating an asynchronous computation

Creating a computation that runs asynchronously using Future is super easy. We just need to pass our logic to the apply function of Future:

val aFuture: Future[Int] = Future {
    //Some massively huge super important computation
}

As an example, let's create a oneFuture that returns a 1 after a delay of one second.

val oneFuture: Future[Int] = Future {
    Thread.sleep(1000)
    1
}

Let's pause and explore the apply function:

def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T]

Yup. You got me there. The method also accepts something called an ExecutionContext as an implicit argument. When a block of code is used to construct a Future, the computation runs on that ExecutionContext. An ExecutionContext is, for practical purposes, a wrapper around a thread pool: when a Future is started, its body is scheduled on one of the pool's threads rather than on the calling thread.

Scala provides a ready-made static global ExecutionContext in scala.concurrent.ExecutionContext.global which we plan to use now. (More on this in a moment)
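
To make the implicit parameter concrete, here is a minimal sketch that passes the ExecutionContext by hand instead of importing it. The names ExplicitContextExample and workerThreadName are made up for illustration, and Await.result (covered later in this post) is used only to bring the answer back to the calling thread.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of the post's code base
object ExplicitContextExample {

  // The same global context that the implicit import would supply
  private val context: ExecutionContext = ExecutionContext.global

  // Returns the name of the thread that actually ran the Future's body
  def workerThreadName(): String = {
    val threadName: Future[String] = Future {
      Thread.currentThread.getName
    }(context) // second parameter list filled in by hand
    Await.result(threadName, 2.seconds)
  }
}
```

Calling ExplicitContextExample.workerThreadName() from a main method should give back the name of a pool thread rather than the caller's own thread, which is the "runs on a separate Thread" part in action.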

So, the entire code actually looks like this

class PromisingFutures{

  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global

  val oneFuture: Future[Int] = Future {
    Thread.sleep(1000)
    1
  }
  ...
  ...

On using scala.concurrent.ExecutionContext.global

The global ExecutionContext is very convenient to use. However, the underlying thread pool is a ForkJoinPool. There is nothing wrong with this, and ForkJoinPool is amazing for short-lived computations, but it is strongly discouraged for blocking IO such as database or webservice calls (or, for that matter, long-running computations or anything that talks outside the JVM).

The good news is that we can still use Future; the workaround is simply to use a separate thread pool that is not a ForkJoinPool. A fixed thread pool, for example, is a good option.

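import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.ExecutionContext

implicit lazy val fixedThreadPoolExecutionContext: ExecutionContext = {
    // Twice the number of cores, or some other fixed number
    val fixedThreadPool: ExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors * 2)
    ExecutionContext.fromExecutor(fixedThreadPool)
}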
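
As a rough sketch of how such a dedicated pool might be used for one blocking call and then released, consider the following. The object name, the pool size of 4 and the sleep standing in for a database query are all assumptions made for illustration.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical example object; the pool size and the fake "query" are assumptions
object BlockingPoolExample {

  def runBlockingCall(): Int = {
    val pool: ExecutorService = Executors.newFixedThreadPool(4)
    implicit val blockingContext: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val queryResult: Future[Int] = Future {
        Thread.sleep(100) // stands in for a blocking database or webservice call
        42
      }
      Await.result(queryResult, 2.seconds)
    } finally {
      // The pool's threads are non-daemon, so shut it down or the JVM will not exit
      pool.shutdown()
    }
  }
}
```

In a long-running application the pool would normally live as long as the application and be shut down once at exit, rather than being created per call as in this sketch.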

Future States and values

Before we go into extracting values out of a Future, let's look at the various states of a Future, and when and what value is actually available in each of those states.

The Future just has two states - Not Completed and Completed.

Consider the following image:

[Image: Future states and their values]

At any point in time, the value that we can read from a Future[T] is an Option[Try[T]].

  1. In the Not Completed state, the result of the computation is not yet realized. The value is therefore None.
  2. Once Completed, the value is a Some(Try[T]), which means it is one of the following:
    1. A Success wrapping the positive outcome of the computation, or
    2. A Failure wrapping an Exception

Let's look at the Success case with an example


def checkState(): Unit = {
    println("Before the job finishes")
    Thread.sleep(500)
    println(s"Completed : ${oneFuture.isCompleted}, Value : ${oneFuture.value}")

    println("After the job finishes")
    Thread.sleep(1100)
    println(s"Completed : ${oneFuture.isCompleted}, Value : ${oneFuture.value}")

}

The oneFuture takes 1 second to complete. So, we first check after 500 milliseconds whether the Future is complete and print its current value. The value function of the Future returns an Option[Try[T]]. Not surprisingly, we get false for the isCompleted check and None as the value itself.

Output for 500 ms:

Before the job finishes
Completed : false, Value : None

Let's check back after another 1100 milliseconds, which gives some leeway beyond the 1000 millisecond sleep. The completion status is now true and the value is the result of the computation itself.

Output for 1100 ms:

After the job finishes
Completed : true, Value : Some(Success(1))
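
The same inspection can also be written as a pattern match that covers all three cases, including the Failure one that checkState does not exercise. This is only a sketch: describe is a hypothetical helper, and a Promise that is never fulfilled is used purely as a convenient way to obtain a Future that stays incomplete.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

// Hypothetical helper, named here only for illustration
object FutureStateExample {

  def describe[T](future: Future[T]): String = future.value match {
    case None                     => "Not completed"
    case Some(Success(result))    => s"Completed with value $result"
    case Some(Failure(throwable)) => s"Completed with exception ${throwable.getMessage}"
  }

  // A Promise that is never fulfilled gives us a Future that stays incomplete
  val pending: Future[Int] = Promise[Int]().future
  // Future.successful and Future.failed create already-completed Futures
  val succeeded: Future[Int] = Future.successful(1)
  val failed: Future[Int] = Future.failed(new IllegalStateException("boom"))
}
```

Calling describe on each of the three Futures walks through the None, Some(Success) and Some(Failure) branches in turn.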

Now that we've got that out of our way, let's see how to extract a value out of a Future

Extracting value out of Future

Other than composing futures, which we'll be seeing next week, there are two ways to extract just the value out of a Future: a blocking wait and a callback.

1. Blocking wait using Await.result :

The scala.concurrent.Await's result function has the following signature:

    @throws(classOf[Exception])
    def result[T](awaitable: Awaitable[T], atMost: Duration): T =
      blocking(awaitable.result(atMost)(AwaitPermission))

It accepts an implementation of the Awaitable trait, which a Future is. It also accepts a second parameter, atMost, which is the maximum duration for which the calling thread will block waiting for the result. If the Future has still not completed when the atMost duration expires, a java.util.concurrent.TimeoutException is thrown.

If the Future completes successfully within that time, Await.result returns the actual value. If the Future completes with a Throwable, that exception is rethrown to the caller.

Using Await.result in production code is highly discouraged but this construct comes in handy for running testcases against Future.

Let's consider two Futures, the original oneFuture and a oneDangerousFuture which throws an Exception.

Code

Here's how they look :

  val oneFuture: Future[Int] = Future {
    Thread.sleep(1000)
    1
  }

  val oneDangerousFuture = Future {
    Thread.sleep(2000)
    throw new SomeComputationException("Welcome to the Dark side !")
  }

  case class SomeComputationException(msg: String) extends Exception(msg)

Testcases

We have three testcases here :

  1. The first one is our happy day scenario - The computation takes 1 second, we wait for result for a maximum duration of 2 seconds. We obviously will have our result in hand. Our assertion that a value should be returned becomes true.

  2. In the second testcase, we spawn a Future that throws a SomeComputationException and we assert that the exception gets propagated to the caller when we await the result.

  3. In the last testcase, we wait only for 500 milliseconds while the computation itself takes 1 second. As we saw from the implementation of the Await.result, this call throws a TimeoutException.

import java.util.concurrent.TimeoutException

import org.scalatest.{FunSpec, Matchers} // ScalaTest 2.x/3.0 style imports
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps // enables the "2 seconds" syntax

class PromisingFutureTest extends FunSpec with Matchers {

  describe("A PromisingFuture") {

    it("should hold a Int value if the Await.result is called after the Future completes") {
      val promisingFuture = new PromisingFutures()
      val oneFuture = promisingFuture.oneFuture //Takes 1 second to compute
      val intValue = Await.result(oneFuture, 2 seconds)
      intValue should be(1)
    }

    it("should propagate the Exception to the caller if the computation threw an exception") {
      val promisingFuture = new PromisingFutures()
      val oneDangerousFuture = promisingFuture.oneDangerousFuture //throws exception after 2 seconds
      intercept[SomeComputationException] {
        // Wait longer than the 2 second sleep, otherwise we race against a TimeoutException
        Await.result(oneDangerousFuture, 3 seconds)
      }
    }

    it("should throw a TimeoutException when Await.result's atMost parameter is less than the time taken for the Future to complete") {
      val promisingFuture = new PromisingFutures()
      val oneDelayedFuture = promisingFuture.oneFuture //Takes 1 second to compute
      intercept[TimeoutException] {
        Await.result(oneDelayedFuture, 500 millis)
      }
    }
  }

}


2. Callback :

The alternative, cleaner way to extract a value from the Future (other than composing) is by way of callbacks. There are three different callbacks available on the Future: onSuccess, onFailure and the combined onComplete.

  1. The onSuccess callback gets called only when the Future completes successfully with a result.
  2. The onFailure callback gets called only when there is an Exception.
  3. The onComplete is a combination of onSuccess and onFailure. It accepts a function that works on the Try[T], that is, the Future's result with the surrounding Option already unwrapped.

def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit

Note that all callbacks return a Unit which means that they can't be composed and are side-effecting.

Now let's see how we could use the onComplete callback. I have this little method called printFuture which just writes to the console the contents of the Future once it is complete. Let's try to pass in both the oneFuture and the oneDangerousFuture into it.

import scala.util.{Failure, Success}

class PromisingFutures {
...
...
def printFuture[T](future: Future[T]): Unit = future.onComplete {
    case Success(result) => println(s"Success $result")
    case Failure(throwable) => println(s"Failure $throwable")
}
...


object PromisingFutures {

  def main(args: Array[String]): Unit = {
    val promisingFutures = new PromisingFutures
    promisingFutures.printFuture(promisingFutures.oneFuture)
    promisingFutures.printFuture(promisingFutures.oneDangerousFuture)

    // Keep the main thread alive long enough for both callbacks to fire
    synchronized(wait(3000))
  }
}

Output :

Success 1
Failure SomeComputationException: Welcome to the Dark side !

As expected, the oneFuture goes into the Success case and yields 1 while the oneDangerousFuture goes into the Failure case and prints the Exception.

TimeUnit

Scala provides a very convenient DSL-like syntax to represent time durations, e.g. 2 seconds, 5 minutes etc. Three implicit classes in the scala.concurrent.duration package (DurationInt, DurationLong and DurationDouble) and the trait DurationConversions do this magic. All we need to do is to import scala.concurrent.duration._.

Also, the "500 millis" in our example could equally be written as "500 milli", "500 milliseconds" or "500 millisecond". The entire list of aliases for the various time units can be found in the source code or Scaladoc of the trait DurationConversions.
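
A small sketch of a few equivalent spellings follows; DurationExample and its field names are made up for illustration. It uses the dotted form (500.millis) rather than the spaced form (500 millis), since the spaced form relies on postfix operators, for which the compiler asks you to import scala.language.postfixOps.

```scala
import scala.concurrent.duration._

// Hypothetical example object showing a few equivalent spellings
object DurationExample {
  val halfSecond: FiniteDuration = 500.millis
  val sameHalfSecond: FiniteDuration = 500.milliseconds
  val twoSeconds: FiniteDuration = 2.seconds
  val fiveMinutes: FiniteDuration = 5.minutes
}
```

Any of these values can be passed directly as the atMost argument of Await.result.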

Code

The complete code backing this blog is available on GitHub.


Addendum

@lightspeed7 pointed out that with Scala 2.12, the partial versions of onComplete, namely onSuccess and onFailure, are deprecated. The deprecation is primarily to favor total versions such as onComplete and foreach in place of callbacks that take partial functions and handle only the Success or only the Failure case, such as onSuccess and onFailure.

Let's explore a little more on Future.foreach with the following printing function.

def printWithForEach[T](future: Future[T]): Unit = future.foreach(println)

Future.foreach delegates to Try.foreach. So, if the Future completes with a Success, the wrapped value is passed to the function and printed (1 for our oneFuture, not Success(1)), but for a Failure nothing is printed, simply because the implementation of foreach on Failure goes like this :

def foreach[U](f: T => U): Unit = ()
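
To see this behaviour side by side, here is a minimal sketch. The object name, the messages and the latch are assumptions made for illustration. It also uses the failed projection of a Future, which is one way to observe the exception with foreach alone; onComplete remains the direct way to handle both cases in one place.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical example object; the messages and the latch are only for illustration
object ForeachExample {

  def observed(): Set[String] = {
    val seen = new ConcurrentLinkedQueue[String]()
    val callbacksDone = new CountDownLatch(2) // we expect exactly two callbacks to run

    val good: Future[Int] = Future.successful(1)
    val bad: Future[Int] = Future.failed(new IllegalStateException("boom"))

    good.foreach { value => seen.add(s"value $value"); callbacksDone.countDown() }
    // Never invoked: foreach on a Failure is a no-op
    bad.foreach { value => seen.add(s"value $value"); callbacksDone.countDown() }
    // The failed projection turns the exception into the value, so foreach sees it
    bad.failed.foreach { error => seen.add(s"error ${error.getMessage}"); callbacksDone.countDown() }

    // Callbacks run asynchronously, so wait for the two we expect
    callbacksDone.await(2, TimeUnit.SECONDS)
    seen.toArray(new Array[String](0)).toSet
  }
}
```

Only two messages are collected: one from the successful Future and one from the failed projection. The plain foreach on the failed Future contributes nothing.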