Scala notes - Futures - 2 (Promises)
In the last post, we saw how to extract values from the Future upon onComplete and their counterparts - onSuccess and onFailure. We also saw how to use Await.result in Testcases to block and get the value from Future. In this post, we'll discuss briefly about the relationship between a Promise and a Future.
Promise
The concepts Promise and a Future go hand in hand. A scala.concurrent.Promise is the one which sets a value for the Future. In other words, the Promise is the brain behind executing the computation asynchronously and Future is just a handle for reading the result when it becomes available. Crudely put, the Promise is the setter and the Future is the getter.
Most often, we won't need to explicitly create a Promise. However, we are required to understand what a Promise is, in order to truly understand how a Future works.
Let's use the following examples to understand how to create a Promise.
1. Completing a Promise
In the following piece of code, we will see how a value is set in a promise and how it is read on the other side.
We
- create a
Promise completethePromiseby setting a successful value- then return the read side of the Promise - the
Futureback to the caller by using thepromise.future
There is no time consuming process happening behind the scenes. The value is set to the Promise immediately and therefore the value is immediately available via the Future.
Code
class PromiseInternals {
...
def aCompletedPromiseUsingSuccess(num:Int): Future[Int] = {
val promise=Promise[Int]()
promise.success(num)
promise.future
}
...
...
Testcase
When we run the Testcase code, the onComplete callback gets called immediately after the promise.success(100) is called.
class PromiseInternalsTest extends FunSpec with Matchers {
describe("A Future") {
it("gives out the correct value when a Promise is completed") {
val promiseInternals = new PromiseInternals
val aCompletedPromise=promiseInternals.aCompletedPromiseUsingSuccess(100)
assertValue(aCompletedPromise, 100)
}
...
...
def assertValueUsingOnComplete(future: Future[Int], expectedValue: Int): Unit = {
future.onComplete {
case Success(result) => {
println (s"Result is $result and expectedValue is $expectedValue")
result shouldBe expectedValue
}
case Failure (msg) => fail(msg)
}
}
The promise.success is just a shortcut for using promise.complete which accepts a Try[T] as an argument. So, we could have actually written the above function as :
def aCompletedPromiseUsingComplete(num:Int): Future[Int] = {
val promise=Promise[Int]()
promise.complete(Success(num))
promise.future
}
Alternatively, if we would like to indicate a failure in computation, we could either use a promise.complete(Failure(throwable)) or
def aCompletedPromiseUsingFailure(num:Int): Future[Int] = {
val promise=Promise[Int]()
promise.failure(new RuntimeException("Evil Exception"))
promise.future
}
Let's summarize the above in a picture :

2. Running a block asynchronously
Now that we saw how to complete a Promise by setting a successful value or an exception, we'll see how to execute a block of code asynchronously.
In the following Testcase, we pass a block of code to the someExternalDelayedCalculation to be executed asynchronously.
Testcase
Let's look at the testcase first.
- We pass a block as argument. The block of code simply sleeps for 2 seconds and then returns a 100.
- Assert the value after 3 seconds.
Simple enough.
it("gives out the correct value when an asynchronous block is submitted and is completed through a Promise") {
val promiseInternals = new PromiseInternals
val longCalculationFuture = promiseInternals.someExternalDelayedCalculation{()=>
Thread.sleep(2000)
100
}
println (s"We have submitted a block to be executed asynchronously ${longCalculationFuture.isCompleted}") //false at this point
assertValue(longCalculationFuture, 100)
}
def assertValue(future: Future[Int], expectedValue: Int): Unit = {
val resultVal=Await.result(future, 3000 seconds)
resultVal shouldBe expectedValue
}
Code
The implementation of the someExternalDelayedCalculation is interesting :
We
- create a
FixedThreadPoolto execute our asynchronous code. - create a Promise
- create a
Runnableand wrap the block to be run asynchronously in therunmethod - close the promise and complete the promise using the result of the
run - execute the
Runnablein thesomePoolthreadpool. - return the
promise.futurefrom which the caller can read the value.
val somePool=Executors.newFixedThreadPool(2)
def someExternalDelayedCalculation(f:()=>Int): Future[Int] = {
val promise=Promise[Int]()
val thisIsWhereWeCallSomeExternalComputation = new Runnable {
override def run(): Unit ={
promise.complete{
try(Success(f()))
catch {
case NonFatal (msg)=> Failure(msg)
}
}
}
}
somePool.execute(thisIsWhereWeCallSomeExternalComputation)
promise.future
}
That's it !!
3. How is the Future.apply() actually implemented?
Well, I cheated. The code in bullet 2 is actually stolen from the actual implementation of the Future.apply itself.
Remember in the previous post, we saw that when a block of code is passed into the Future's apply function, it gets executed asynchronously.
Now, compare the code above in someExternalDelayedCalculation with the actual implementation of Future.apply and the Runnable that it wraps.
def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
val runnable = new PromiseCompletingRunnable(body)
executor.prepare.execute(runnable)
runnable.promise.future
}
class PromiseCompletingRunnable[T](body: => T) extends Runnable {
val promise = new Promise.DefaultPromise[T]()
override def run() = {
promise complete {
try Success(body) catch { case NonFatal(e) => Failure(e) }
}
}
}
To repeat the same steps as above, the apply function
- holds the
ThreadPoolthat we provide as the implicitExecutionContext - creates a
Promiseby creating aPromiseCompletingRunnablethat is aRunnable - wraps the block to be run asynchronously in the
runmethod - closes the promise and completes the promise using the result of the
run - executes the
Runnableusing theExecutionContext - returns the
promise.futurefrom which the caller can read the value.
4. Once written, twice error
Once the promise gets completed either with a Success or a Failure, all we could do after that is to extract the value from its Future. Also, the onComplete callback of the Future gets called. The value wrapped inside the Future of the Promise is set in stone and cannot be changed.
If we attempt to set a new value by completing an already completed Promise, an IllegalStateException is thrown.
Code
Let's look at this using a snippet. In the following code, we create a Promise and complete it with a value of 100. We then attempt to complete it with a failure.
def alreadyCompletedPromise(): Future[Int] = {
val promise = Promise[Int]()
promise.success(100) //completed
promise.failure(new RuntimeException("Will never be set because an IllegalStateException will be thrown beforehand"))
promise.future
}
Testcase
The testcase just asserts that the IllegalStateException gets thrown when an attempt to complete the Promise with a Failure.
it("should throw an error if a Promise is attempted to be completed more than once") {
val promiseInternals = new PromiseInternals
intercept[IllegalStateException] {
promiseInternals.alreadyCompletedPromise()
}
}
Code
The complete code backing this blog is available in github