Working with Futures

Back to home page

Contents

Function cheat sheet

Name Description
Map Transform the result of a Future
FlatMap Transform the result of a Future to another Future
For Comprehensions An easy way to use Map, FlatMap and Filter
Sequence Transform a collection of Futures to a Future of a collection
Fold Collapse a collection of Futures to a Future of a single value
Traverse A generalised version of Sequence
Recover/RecoverWith Recover from fatal exceptions
Filter Only succeed if the result matches a predicate
Collect Transform the successful result of a filter
Transform Most powerful method on a Future - use if all else fails
OnComplete/OnSuccess/OnFailure For callbacks from Futures

Code examples

Asynchronous programming introduction

A simple synchronous application will run all commands one after the other on the main app thread.

This is okay in simple cases but:

Asynchronous programming allows us to get around these issues by allowing multiple operations to be run in parallel

Examples of when asynchronous programming can be useful:

Traditionally writing asynchronous code has been tedious and difficult - which is why we have Futures!

Using Futures

In Scala a Future is an abstraction that makes it easy to write async code.

Any block of code can be made to run asynchronously by wrapping it in a Future. For example:

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global
 
scala> import scala.concurrent.Future
import scala.concurrent.Future
 
scala> val fut = Future{ 21 - 1 }
fut: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> fut.value
res1: Option[scala.util.Try[Int]] = Some(Success(20))

A few things to note:

There may be some situations you want to construct a Future that has already been completed (mainly in testing). This can be done with the Future.successful and Future.failed methods:

scala> Future.successful(21+1).value
res8: Option[scala.util.Try[Int]] = Some(Success(22))
 
scala> Future.failed(new Throwable("Test")).value
res9: Option[scala.util.Try[Nothing]] = Some(Failure(java.lang.Throwable: Test))

This has the same result as just using Future but is slightly more efficient since it doesn’t have to manage the asynchronous calculation. (Note the second case didn’t throw a fatal exception. See Error handling)

Awaiting

In general you should never have an Await.result (or Await.ready) in your production code (it actually says so in the documentation).

Doing this will block the main thread to wait for the result of the Future, defeating the point of using it in the first place.

Even if you have a good reason to there is usually a better way. Most simple cases should hopefully be covered in here somewhere.

Map

In most cases we will want to do something with the result of the future. Using a map makes it possible to do this without having to block and wait for the result of the Future

Map has the following signature:

 def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S]

So for a map all we need is to provide a function f: T => S. Here’s a simple example:

scala> def double(x: Int): Int = x*2
double: (x: Int)Int
 
scala> fut
res2: scala.concurrent.Future[Int] = Future(Success(20))
 
scala> fut2 = fut.map(double)
res3: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> fut2.value
res4: Option[scala.util.Try[Int]] = Some(Success(40))
 
scala> val fut3 = fut2.map(res => "Result:" + res.toString)
fut3: scala.concurrent.Future[String] = Future(<not completed>)
 
scala> fut3.value
res5: Option[scala.util.Try[String]] = Some(Success(Result:40))

Mapping will actually return a new Future containing the result of mapping on the first.

FlatMap

What if we want to run one bit of asynchronous code after another. To chain two methods returning Futures you can use a flatMap.

FlatMap has the following signature:

 def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S]

Which is almost exactly the same as map except the function f returns a Future[S] instead of just S. Here’s another example:

scala> def doubleAsync(x: Int): Future[Int] = Future{x*2}
doubleAsync: (x: Int)scala.concurrent.Future[Int]
 
scala> val fut1 = Future{21 + 1}
fut1: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> val fut2 = fut1.flatMap(doubleAsync)
fut2: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> fut2.value
res6: Option[scala.util.Try[Int]] = Some(Success(44))

Map and FlatMap can be combined to create more complex methods:

scala> val fut1 = Future{21 + 1}
fut1: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> val fut2 = Future{31 + 2}
fut2: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> val res = fut1.flatMap{ res1 => fut2.map { res2 => res1 + res2 } }
res: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> res.value
res7: Option[scala.util.Try[Int]] = Some(Success(55))

For-Comprehensions

Map and FlatMap are great, but they can get messy to use in complex flows. For example:

scala> val fut9 = Future(21)
fut9: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> val fut10 = Future(32)
fut10: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> val fut11 = Future(43)
fut11: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> val result = fut9.flatMap { res1 =>
     |   fut10.flatMap { res2 =>
     |     fut11.map { res3 =>
     |       (res1 + res2) * res3
     |     }
     |   }
     | }
result: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> result.value
res23: Option[scala.util.Try[Int]] = Some(Success(2279))

This can be cleaned up using a for-comprehension:

scala> val result2 = for {
     |   res1 <- fut9
     |   res2 <- fut10
     |   res3 <- fut11
     | } yield {
     |   (res1 + res2) * res3
     | }
result2: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> result2.value
res24: Option[scala.util.Try[Int]] = Some(Success(2279))

Internally this will “flatten” the Futures using map and flatMap as appropriate, but can be much cleaner and easier to read.

Important note:

In this example the 3 Futures were defined before the for-comprehension.

This means a thread will start computing the result(s) as soon as each value is assigned and all 3 will run in parallel.

If instead they were defined inside the for-comprehension they would actually be run in series (i.e. work wouldn’t start on computing fut10 until fut9 had finished and so-on). The same applies to using map and flatMap.

If one Future depends on the result of another they will be run in series automatically so you don’t need to worry. For example:

scala> val result3 = for {
     |   res1 <- fut11
     |   res2 <- doubleAsync(res1)
     | } yield {
     |   res2
     | }
result3: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> result3.value
res25: Option[scala.util.Try[Int]] = Some(Success(86))

would always wait for fut11 to complete before computing doubleAsync(res1) wherever it was defined.

Combining collections of Futures

Quite often you can end up dealing with a lot of Futures running at the same time. To keep track of them all Scala has a few functions to operate on entire collections of Futures

Sequence

By far the most common (in my experience anyway) is Future.sequence. This lets you transform a collection of Futures into a Future of a collection, which is usually much easier to work with.

scala> val seqOfFutures: Seq[Future[Int]] = Seq(
     |   Future(11 + 2),
     |   Future(22 + 3),
     |   Future(33 + 4),
     |   Future(44 + 5)
     | )
seqOfFutures: Seq[scala.concurrent.Future[Int]] = List(Future(Success(13)), Future(Success(25)), Future(Success(37)), Future(Success(49)))
 
scala> val result4 = Future.sequence(seqOfFutures)
result4: scala.concurrent.Future[Seq[Int]] = Future(<not completed>)
 
scala> result4.value
res26: Option[scala.util.Try[Seq[Int]]] = Some(Success(List(13, 25, 37, 49)))

This will evaluate every Future in the collection at the same time on different threads (or at least as many as it can) and return a Future containing the result of them all in a collection. If any of them fail the whole Future will fail.

Fold

The Future.fold method will collapse or “fold” a collection of Futures down to a single Future

scala> val result5 = Future.fold(seqOfFutures)(10) { (x, y) =>
     |   x + y
     | }
result5: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> result5.value
res27: Option[scala.util.Try[Int]] = Some(Success(134))

This will run all Futures serially, using the result of one to feed into the next.

Note that this is deprecated for foldLeft in Scala-2.12. It works basically the same but requires an immutable collection

scala> val seqOfFutures2: immutable.Iterable[Future[Int]] = scala.collection.immutable.Iterable(
     |   Future(11 + 2),
     |   Future(22 + 3),
     |   Future(33 + 4),
     |   Future(44 + 5)
     | )
seqOfFutures2: scala.collection.immutable.Iterable[scala.concurrent.Future[Int]] = List(Future(Success(13)), Future(Success(25)), Future(Success(37)), Future(Success(49)))
 
scala> val result6 = Future.foldLeft(seqOfFutures2)(10){ (x, y) =>
     |   x + y
     | }
result6: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> result6.value
res30: Option[scala.util.Try[Int]] = Some(Success(134))

Traverse

This one is less likely to be useful, but is basically a generalised Future.sequence. The difference is you start with a collection of whatever you like and give a function to turn each into a Future. Then it sequences the result:

scala> val seqOfInt = Seq(
     |   11 + 2,
     |   22 + 3,
     |   33 + 4,
     |   44 + 5
     | )
seqOfInt: Seq[Int] = List(13, 25, 37, 49)
 
scala> val result7 = Future.traverse(seqOfInt)(x => Future.apply(x))
result7: scala.concurrent.Future[Seq[Int]] = Future(<not completed>)
 
scala> result7.value
res31: Option[scala.util.Try[Seq[Int]]] = Some(Success(List(13, 25, 37, 49)))

Note that result4 (the sequence) is the same as result7 (the traverse). That’s because:

Future.sequence(seqOfFutures) === Future.traverse(seqOfFutures)(identity)

Other methods on Futures

Filter

This will check the result of a Future, either returning a successful result as-is if it matches the filter, or throwning a NoSuchElementException otherwise.

scala> val anotherFuture = Future(22 + 10)
anotherFuture: scala.concurrent.Future[Int] = Future(Success(32))
 
scala> val result9 = anotherFuture.filter(_ >= 0)
result9: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> result9.value
res0: Option[scala.util.Try[Int]] = Some(Success(32))
 
scala> val result10 = anotherFuture.filter(_ < 0)
result10: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> result10.value
res1: Option[scala.util.Try[Int]] = Some(Failure(java.util.NoSuchElementException: Future.filter predicate is not satisfied))

Since Futures have a withFilter method defined this will also work in for-comprehensions:

scala> val result11 = for {
     |   res <- anotherFuture if res > 0
     | } yield {
     |   res
     | }
result11: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> result11.value
res2: Option[scala.util.Try[Int]] = Some(Success(32))

Though in practice I haven’t really found a use for filter in either form since there are so many other ways to handle this.

Collect

Collect applies a partial function to the result of the Future, either returning the result if defined or again returning a NoSuchElementException otherwise.

This is basically a filter with a map on the positive case

scala> val result12 = anotherFuture.collect {
     |   case x if x > 0 => 2 * x
     | }
result12: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> result12.value
res3: Option[scala.util.Try[Int]] = Some(Success(64))
 
scala> val result13 = anotherFuture.collect {
     |   case x if x < 0 => 2 * x
     | }
result13: scala.concurrent.Future[Int] = Future(Failure(java.util.NoSuchElementException: Future.collect partial function is not defined at: 32))
 
scala> result13.value
res4: Option[scala.util.Try[Int]] = Some(Failure(java.util.NoSuchElementException: Future.collect partial function is not defined at: 32))

Transform

Transform is the generalised version of pretty much every other method on Futures. You can use it to transform results, filter, change Success to Failure or vice-versa… Here’s a slightly more complex example to show this off:

scala> val result14 = anotherFuture.transform {
     |   case Success(x) if x < 0 => Failure(new Throwable(s"$x must not be negative"))
     |   case Success(x) => Success(x * 2)
     |   case e: Failure[Int] => Success(s"Caught an error ${e.exception.getMessage}")
     | }
result14: scala.concurrent.Future[Any] = Future(<not completed>)
 
scala> result14.value
res6: Option[scala.util.Try[Any]] = Some(Success(64))

As a rule don’t use this - there is probably a less powerful method that does what you want. But if writing something with simpler functions is complicated or messy transform might be the way to go.

Callbacks

You can also register callbacks from Futures using onComplete, onSuccess and onFailure. These can be useful in some use cases but should generally be avoided in functional code.

scala> var caller: String = null
caller: String = null
 
scala> val called = Future { 11 + 12 }
called: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> called.onComplete(_ => caller = "Done")
 
scala> called.value
res8: Option[scala.util.Try[Int]] = Some(Success(23))
 
scala> caller
res9: String = Done

One important thing to remember about callbacks is there is no guarentee on the ordering of callbacks on a single Future (this is why the interface was designed to return Unit so they couldn’t be easily chained).

If you need to order multiple callbacks make sure to carefully create a new Future that runs after the first, though in general I’d say just avoid this situation and write the code another way.

Error handling

If a fatal error occurs while a Future is running it won’t just throw an exception (mainly because if it was on another thread we’d never see it). Instead it will return a Failure, which can then be handled to get back to a Success state.

You can do this with the recover method, or recoverWith if you want to recover with another Future:

scala> val failedFuture = Future { 11 / 0 }
failedFuture: scala.concurrent.Future[Int] = Future(<not completed>)
 
scala> val result8 = failedFuture.recover {
     |   case _: ArithmeticException => "Cannot divide by 0"
     |   case _ => "Help!"
     | }
result8: scala.concurrent.Future[Any] = Future(<not completed>)
 
scala> result8.value
res1: Option[scala.util.Try[Any]] = Some(Success(Cannot divide by 0))

Testing with futures

Awaiting the result

The standard and easiest way to test async code is to Await the result. After awaiting the completion of a future this is just like a normal test.

val testFuture: Future[Int] = Future.successful(32)

Await.result(testFuture, 10 seconds) mustBe 32

Async testing with ScalaTest

As of version 3 ScalaTest now supports testing Future code. You can enable it by importing the Async version of your favourite test spec. E.g AsyncWordSpec. Using that you can test Futures using map:

val testFuture: Future[Int] = Future.successful(32)

testFuture.map(_ mustBe 32)

The Async test suites also work for synchronous tests. As far as I know theres no performance benefit to this, just neater test code.

If you use this be careful to only have one assertion per test. In a “normal” test you can get away with multiple mustBe/shouldBe/assert blocks, but with the Async test suites only the last test will run!

Something else to be aware of is that ScalaTests Async suites use a custom serial execution context. This shouldn’t really matter, but I’ve had issues in some tests where Futures don’t complete because of this. It’s easily fixed by explicitly passing the usual execution context to the class being tested.

Execution contexts and thread pools

When working with Futures you generally don’t have to think about the threads your code is running on. This is because it is managed by an ExecutionContext.

The ExecutionContext (EC) tells any Future using it which threads to run on. In general the default Scala EC is fine

import scala.concurrent.ExecutionContext.Implicits.global

When using the Play! framework you should instead use the default Play! EC (and it should be injected using the DI framework).

The only common reason to use a custom EC would be for a very long computation, as the Scala default is not designed for this.

Other notes

References