Futures

Rohit edited this page Jan 19, 2017 · 4 revisions

Latency as an effect:

The Four Essential Effects In Programming

| | One | Many |
|---|---|---|
| Synchronous | T/Try[T] | Iterable[T] |
| Asynchronous | Future[T] | Observable[T] |

Future

Here we treat latency as an effect. For example, sending a data packet from one place to another takes time, and that time shows up as latency in the program we are executing. We want a monad that makes this latency visible in the types.

Here is a Monad that handles exceptions and latency.

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

trait Future[T] {
  // Takes two callbacks: one invoked when the computation completes successfully, one invoked when it fails
  def onComplete( success: (T => Unit), failed: (Throwable => Unit) ): Unit
  def onComplete( callback: Observer[T] ): Unit
}

trait Observer[T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
}

Usage:

trait Socket {
    def readFromMemory(): Future[Array[Byte]]
    def sendToEurope(packet: Array[Byte]): Future[Array[Byte]]
}

Both methods return a Future. For readFromMemory(), this means the computation may take a long time, so the call returns a Future right away instead of the bytes themselves. What do we do with that Future? We give it a callback, which is called once the Future completes, either with an exception or with a regular value. The same holds for sending a packet from the US to Europe, which takes a really long time, so its return type is also a Future of an array of bytes.
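The callback style described above can be sketched as follows. This is only an illustration: it keeps just the two-callback onComplete from the simplified trait, and succeeded, failedWith, StubSocket and describe are hypothetical names that are not part of the original notes. The stub futures are already completed, so the callback runs immediately; a real future would invoke it later, on another thread.

```scala
object CallbackSketch {
  // Simplified Future from the text: one callback per outcome.
  trait Future[T] {
    def onComplete(success: T => Unit, failed: Throwable => Unit): Unit
  }

  // Hypothetical helper: a future that already holds a value.
  def succeeded[T](value: T): Future[T] = new Future[T] {
    def onComplete(success: T => Unit, failed: Throwable => Unit): Unit = success(value)
  }

  // Hypothetical helper: a future that already holds an error.
  def failedWith[T](error: Throwable): Future[T] = new Future[T] {
    def onComplete(success: T => Unit, failed: Throwable => Unit): Unit = failed(error)
  }

  trait Socket {
    def readFromMemory(): Future[Array[Byte]]
  }

  // Hypothetical stub standing in for a real socket.
  object StubSocket extends Socket {
    def readFromMemory(): Future[Array[Byte]] = succeeded(Array[Byte](1, 2, 3))
  }

  // Registers both callbacks and reports which one was called.
  def describe(future: Future[Array[Byte]]): String = {
    var outcome = "no callback has run yet"
    future.onComplete(
      bytes => { outcome = s"read ${bytes.length} bytes" },
      error => { outcome = s"failed: ${error.getMessage}" }
    )
    outcome
  }
}
```

Calling CallbackSketch.describe(CallbackSketch.StubSocket.readFromMemory()) goes through the success callback, while a future built with failedWith goes through the failure callback.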

Combinators on Futures:

Sending Packets using Futures:

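import scala.util.{Success, Failure}

// Assumes a Socket companion object with an apply() factory
val socket = Socket()
val packet: Future[Array[Byte]] = socket.readFromMemory()
packet onComplete {
    case Success(p) => {
      // confirmation is only visible inside this callback
      val confirmation: Future[Array[Byte]] = socket.sendToEurope(p)
    }
    case Failure(t)  …
}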

This can be written more cleanly using flatMap, which also makes the confirmation available outside the callback:

val socket = Socket()
val packet: Future[Array[Byte]] = socket.readFromMemory()
val confirmation: Future[Array[Byte]] = packet.flatMap(p => socket.sendToEurope(p))
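To try the flatMap version with the standard library's scala.concurrent.Future, here is a minimal sketch. StubSocket, relay and relayFor are hypothetical names, and the "confirmation" (a single byte holding the packet length) is made up purely for illustration. The for-comprehension is just another way of writing the same flatMap chain.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FlatMapSketch {
  // Hypothetical in-memory socket; both operations complete immediately.
  class StubSocket {
    def readFromMemory(): Future[Array[Byte]] =
      Future.successful(Array[Byte](1, 2, 3))

    // Pretends the remote side confirms by sending back the packet length.
    def sendToEurope(packet: Array[Byte]): Future[Array[Byte]] =
      Future.successful(Array(packet.length.toByte))
  }

  // Read a packet, then send it; the result is the confirmation future.
  def relay(socket: StubSocket): Future[Array[Byte]] =
    socket.readFromMemory().flatMap(p => socket.sendToEurope(p))

  // The same chain written as a for-comprehension.
  def relayFor(socket: StubSocket): Future[Array[Byte]] =
    for {
      p <- socket.readFromMemory()
      c <- socket.sendToEurope(p)
    } yield c
}
```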

Never block on an asynchronous operation.
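A small sketch of what this rule means in practice, using the standard library's Await and Future.map. The method names lengthBlocking and lengthAsync are hypothetical. The first parks the calling thread until the packet arrives; the second only describes what should happen next and hands back another Future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object BlockingSketch {
  // Discouraged: the calling thread waits (up to 2 seconds) for the result.
  def lengthBlocking(packet: Future[Array[Byte]]): Int =
    Await.result(packet, 2.seconds).length

  // Preferred: stay asynchronous and transform the eventual value.
  def lengthAsync(packet: Future[Array[Byte]]): Future[Int] =
    packet.map(_.length)
}
```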

Composing Futures

The implementations are not reproduced here.

Implementation of flatMap on Futures

Let’s take a closer look at flatMap:

trait Future[T] {
    def onComplete(callback: Try[T] => Unit): Unit
    def flatMap[S](f: T => Future[S]): Future[S] = ???
}

How can we implement flatMap in terms of onComplete? Below is a simplified implementation. In fact, that implementation is almost automatic; all we need to do is follow the types.

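import scala.util.{Try, Success, Failure}

trait Future[T] { self =>
  def onComplete(callback: Try[T] => Unit): Unit

  def flatMap[S](f: T => Future[S]): Future[S] = new Future[S] {
    // Wait for the outer future; on success run f and forward its result, on failure forward the error
    def onComplete(callback: Try[S] => Unit): Unit = self onComplete {
      case Success(x) => f(x).onComplete(callback)
      case Failure(e) => callback(Failure(e))
    }
  }
}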

The actual implementation is somewhat more involved since it also has to handle thread scheduling.
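To see the simplified flatMap in action without any threads, here is a self-contained sketch. The trait repeats the simplified definition from this page (not the real scala.concurrent.Future), and completed and outcome are hypothetical helpers added only so the behaviour can be observed: completed builds a future that already holds its result, and outcome captures whatever the future passes to its callback.

```scala
import scala.util.{Try, Success, Failure}

object SimpleFutureSketch {
  // Simplified Future from this page, with flatMap built on onComplete.
  trait Future[T] { self =>
    def onComplete(callback: Try[T] => Unit): Unit

    def flatMap[S](f: T => Future[S]): Future[S] = new Future[S] {
      def onComplete(callback: Try[S] => Unit): Unit = self onComplete {
        case Success(x) => f(x).onComplete(callback)
        case Failure(e) => callback(Failure(e))
      }
    }
  }

  // Hypothetical helper: a future that is already completed with result.
  def completed[T](result: Try[T]): Future[T] = new Future[T] {
    def onComplete(callback: Try[T] => Unit): Unit = callback(result)
  }

  // Hypothetical helper: record what the future delivers to its callback.
  def outcome[T](future: Future[T]): Option[Try[T]] = {
    var seen: Option[Try[T]] = None
    future.onComplete(r => { seen = Some(r) })
    seen
  }
}
```

With these helpers, chaining two successful futures yields the second result, and a failure in the first future skips f entirely and is passed straight to the final callback.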
