Scala Notes - Futures - 1
Almost all modern programming languages have a Future-Promise idiom for concurrent programming. I don't intend to bore you with why we need higher level of concurrency abstractions. Instead, in this post, we'll cut to the chase and discuss only about Scala's approach to Futures.
A scala.concurrent.Future
is a representation of a value that is yet to be realized. This value is generally a result of a longer and/or parallel computation.
In crude terms, a block of code which runs synchronously, when wrapped in a Future
runs asynchronously. All implementations of Future
gives us a handle through which we could can get retrieve the resulting value of the computation of the block (or what's the point in it!).
In this post, we'll look at the basics of how to construct a Future
and extract the value out of it through blocking wait and callbacks. In the next part, we'll talk about composing Futures and other advanced constructs such as recover
and fallback
for exception handling.
Creating an asynchronous computation
Creating a computation that runs asynchronously using Future
is super easy. We just need to throw in our logic into the apply
function of the Future
val aFuture: Future[Int] = Future {
//Some massively huge super important computation
}
As an example, let's create a oneFuture
that returns a 1 after a delay of one second.
val oneFuture: Future[Int] = Future {
Thread.sleep(1000)
1
}
Let's pause and explore the apply
function
def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T]
Yup. You got me there. The method also accepts something called an ExecutionContext
as an implicit argument. When a block of code is used to construct a Future, the computation runs on an ExecutionContext
. The ExecutionContext
is synonymous with a ThreadPool
and when a Future is started, it runs on a separate Thread.
Scala provides a ready-made static global ExecutionContext in scala.concurrent.ExecutionContext.global
which we plan to use now. (More on this in a moment)
So, the entire code actually looks like this
class PromisingFutures{
import scala.concurrent.ExecutionContext.Implicits.global
val oneFuture: Future[Int] = Future {
Thread.sleep(1000)
1
}
...
...
On using scala.concurrent.ExecutionContext.global
The global
ExecutionContext is very convenient to use. However, the underlying ThreadPool is a ForkJoinPool
. Nothing wrong with this and ForkJoinPool is amazing for short-lived computations but is strongly not recommended for Blocking IO such as database or webservice calls (or for that sake even long running computations or simply talking outside the JVM).
The good news is that we still can use Future but the workaround is to simply use a separate threadpool that is not ForkJoinPool
- a fixed ThreadPool, for example, is a good option.
implicit lazy val fixedThreadPoolExecutionContext: ExecutionContext = {
val fixedThreadPool: ExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors * 2) //or some fixed number
ExecutionContext.fromExecutor(fixedThreadPool)
}
Future States and values
Before we go into the extracting the values out of Future, let's see what are the various states of Future and when and what value is actually available during those states.
The Future just has two states - Not Completed and Completed.
Consider the following image
When given a computation, the resulting value that we could get from the Future is an Option[Try[T]]
.
- In Not Complete state, the result of the computation is not yet realized. The value therefore would be a
None
. - Once Completed, the result is a
Some(Try[T])
which means it could be one of the following :- A positive outcome of the computation or
- An Exception
Let's look at the Success
case with an example
def checkState(): Unit = {
println("Before the job finishes")
Thread.sleep(500)
println(s"Completed : ${oneFuture.isCompleted}, Value : ${oneFuture.value}")
println("After the job finishes")
Thread.sleep(1100)
println(s"Completed : ${oneFuture.isCompleted}, Value : ${oneFuture.value}")
}
The time taken for the delayedStringFuture Future to complete is 1 seconds. So, we first check after 500 milliseconds, whether the Future is complete and print its current value. The value
function of the Future
returns an Option[Try[T]]
. Not surprisingly, we get a false for the isCompleted
check and a None
as the value itself.
Output for 500 ms:
Before the job finishes
Completed : false, Value : None
Let's check back again after 1100 milliseconds giving some leeway after that 1000 millisecond sleep. The output now is the result of the computation itself and the completion status is true now..
Output for 1100 ms:
After the job finishes
Completed : true, Value : Some(Success(1))
Now that we've got that out of our way, let's see how to extract a value out of a Future
Extracting value out of Future
Other than composing futures, which we'll be seeing next week, there are two ways to extract just the value out of a Future - Blocking wait and callback
1. Blocking wait using Await.result :
The scala.concurrent.Await
's result function has the following syntax :
@throws(classOf[Exception])
def result[T](awaitable: Awaitable[T], atMost: Duration): T =
blocking(awaitable.result(atMost)(AwaitPermission))
It accepts an implementation of the Awaitable
trait, which a Future
is. It also accepts a second parameter atMost
which indicates the maximum time duration the caller thread has to block for the result. Upon expiry of the atMost
duration, if the Future still didn't complete, then a java.util.concurrent.TimeoutException
would be thrown.
If the Future is complete, Await.result will extract us the actual value. If the Future is complete and if the Future's result is a Throwable, the Exception gets propagated to the caller.
Using Await.result
in production code is highly discouraged but this construct comes in handy for running testcases against Future.
Let's consider two Futures, the original oneFuture
and a oneDangerousFuture
which throws an Exception.
Code
Here's how they look :
val oneFuture: Future[Int] = Future {
Thread.sleep(1000)
1
}
val oneDangerousFuture=Future{
Thread.sleep(2000)
throw new SomeComputationException("Welcome to the Dark side !")
}
case class SomeComputationException(msg: String) extends Exception(msg)
Testcases
We have three testcases here :
-
The first one is our happy day scenario - The computation takes 1 second, we wait for result for a maximum duration of 2 seconds. We obviously will have our result in hand. Our assertion that a value should be returned becomes true.
-
In the second testcase, we spawn a Future that throws a
SomeComputationException
and we assert that the exception gets propagated to the caller when we await for the result. -
In the last testcase, we wait only for 500 milliseconds while the computation itself takes 1 second. As we saw from the implementation of the
Await.result
, this call throws aTimeoutException
.
class PromisingFutureTest extends FunSpec with Matchers {
describe("A PromisingFuture") {
it("should hold a Int value if the Await.result is called after the Future completes") {
val promisingFuture = new PromisingFutures()
val oneFuture = promisingFuture.oneFuture //Takes 1 second to compute
val intValue = Await.result(oneFuture, 2 seconds)
intValue should be(1)
}
it("should propagate the Exception to the callee if the computation threw an exception") {
val promisingFuture = new PromisingFutures()
val oneDangerousFuture = promisingFuture.oneDangerousFuture //throws exception
intercept[SomeComputationException] {
val intValue = Await.result(oneDangerousFuture, 2 seconds)
}
}
it("should throw a TimeOutException exception when an Await.result's atMost parameter is lesser than the time taken for the Future to complete") {
val promisingFuture = new PromisingFutures()
val oneDelayedFuture = promisingFuture.oneFuture //Takes 1 second to compute
intercept[TimeoutException] {
Await.result(oneDelayedFuture, 500 millis)
}
}
}
}
2. Callback :
The alternative and the clean way to extract a value from the Future
(other than composing) is by way of callbacks. There are three different callbacks available on the Future - the onSuccess
, the onFailure
and the combined onComplete
.
- The
onSuccess
callback gets called only when the Future completes successfully with a result. - The
onFailure
callback gets called only when there is an Exception. - The
onComplete
is a combination ofonSuccess
andonFailure
. It accepts a function that works on aTry[T]
after getting theOption
around the Future's result unwrapped.
def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit
Note that all callbacks return a Unit
which means that they can't be composed and are side-effecting.
Now let's see how we could use the onComplete
callback. I have this little method called printFuture
which just writes to the console the contents of the Future once it is complete. Let's try to pass in both the oneFuture
and the oneDangerousFuture
into it.
class PromisingFutures {
...
...
def printFuture[T](future: Future[T]): Unit = future.onComplete {
case Success(result) => println(s"Success $result")
case Failure(throwable) => println(s"Failure $throwable")
}
...
object PromisingFutures{
def main(args: Array[String]) {
val promisingFutures=new PromisingFutures
promisingFutures.printFuture(promisingFutures.oneFuture)
promisingFutures.printFuture(promisingFutures.oneDangerousFuture)
synchronized(wait(3000))
}
}
Output :
Success 1
Failure SomeComputationException: Welcome to the Dark side !
As expected, the oneFuture
goes into the Success
case and yields 1 while the oneDangerousFuture
goes into the Failure
case and prints the Exception :
TimeUnit
Scala provides a very convenient DSL-like syntax to represent TimeUnit eg. 2 seconds, 5 minutes etc. Three implicit classes scala.concurrent.duration
package - DurationInt
, DurationLong
and DurationDouble
and the trait DurationConversions
do this magic. All we need to do is to import scala.concurrent.duration._
.
Also, the "500 millis" in our example could be represented as "500 milli", "500 milliseconds" or "500 millisecond". The entire list of aliases for various time units could be found in the sourcecode or Scaladoc of the trait DurationConversions.
Code
The complete code backing this blog is available in github
Addendum
@lightspeed7 pointed out that with Scala 2.12, the partial versions of onComplete
, namely onSuccess
and onFailure
is deprecated. The deprecation is primarily to favor total versions such as onComplete
and foreach
in place of partials that handles either Success or Failure cases such as onSuccess
and onFailure
.
Let's explore a little more on Future.foreach
with the following printing function.
def printWithForEach[T](future: Future[T]): Unit = future.foreach(println)
With foreach
, the Future.foreach
delegates itself to the Try.foreach
. So, if the Future
returns a Success
, a Success(value)
would be printed but for a Failure
, nothing would be printed simply because the implementation of Failure
goes like this :
def foreach[U](f: T => U): Unit = ()