Practical Introduction to Akka Streaming

Akka Streaming is a streaming IO engine used to build high performance, fault tolerant and scalable streaming data services. In this post I will describe how you can implement some of the features included in Akka Streaming using only simple streams of integers and strings, although the true power of Akka streams only becomes apparent when we are consuming data from real sources such as Websockets, databases and files. Akka is available in Java and Scala, but I will be focusing on the Scala API in this post.

Building a new SBT Project

Simple Build Tool is the most used build tool of Scala developers, despite the name it is incredibly powerful with many advanced features. In this post, we will be using SBT to manage the dependency on Akka. Firstly we must specify the following directory structure:

.
├── build.sbt
└── src
    └── main
        └── scala

3 directories, 1 file

The file build.sbt will contain the information required by SBT to download the Akka Stream dependencies. The .scala source code will live in the scala directory.

Dependencies in SBT

First, we need to to specify some library dependencies in the file build.sbt:

name := "Akka-Stream-Example"
version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.14"

Now, in the terminal, navigate to the root directory of the project and run sbt. The dependencies will be downloaded automatically, and available to use in any source files.

Source

Source represents the start of an Akka stream, there are many methods for constructing streams from Source. For now, we will define a ticking stream of integers and investigate how we can transform and output this stream using Flows and Sinks respectively. Here is a Source which outputs a steady stream of 1s every second

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

object Streaming {
  implicit val system = ActorSystem("Streaming")
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val in = Source.tick(1.second, 1.second, 1)
}

Flow

A Flow is a data processing stage. We will define a data flow which takes in an integer, then doubles it.

val doubleFlow = Flow[Int].map(a => a * 2)

This Flow is reusable and can be joined on to any stream which emits an Int. The map function is an example of a higher-order function; a higher-order function accepts a function as an argument. The map function here is used to access the value held inside of the Flow, in this case an Int. The function passed as the argument to map is an anonymous (or lambda) function, it says we take the Int and multiply it by two, a type annotation is not needed on the value a as the compiler infers the type to be Int.

Sink

A Sink is an endpoint to a stream, we can use it to print to the console, write to a database or another external service. However only when the stream is materialized is the side effect in the Sink performed. Let’s define a simple sink which prints each element out on a new line

val out = Sink.foreach(println)

Putting it all together

Now we want to get our stream printing to the console, we must define a main method for the Streaming object connecting the Source to the Flow and finally to the Sink.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Flow, Source}
import scala.concurrent.duration._

object Streaming {
  implicit val system = ActorSystem("Streaming")
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val in = Source.tick(1.second, 1.second, 1)
  val double_flow = Flow[Int].map(a => a * 2)
  val print_sink = Sink.foreach(println)

  def main(args: Array[String]) {
      in.
        via(double_flow).
        take(10).
        runWith(print_sink).
        onComplete(_ => system.terminate)
  }
}

You can now run this code block using by executing sbt run from the terminal, in the project directory root (where build.sbt lives). We should get a stream of twos emitting once every second. The function take will limit the amount of twos printed to the console and once the stream is exhausted the Akka system is shutdown using the function onComplete.

Graph DSL

Akka Streaming provides a domain specific language (DSL) to express stream processing pipelines using a graph. Here is another way to define the main method using a RunnableGraph:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

  in ~> double_flow ~> Flow[Int].take(10) ~> print_sink

  ClosedShape
})
  
graph.run()

The graph DSL requires a bit of work, namely defining a runnable graph and specifying that the graph is closed. This is because we can define partial graphs (a graph which isn’t connected to a Source, Sink or both) which compose with other graphs, handy if you want to reuse a block of processing.

In order to specify a partial graph with no connections, we use FlowShape(inlet, outlet). Let’s define a partial graph which takes in one stream a integers, splits them on a condition, performs some processing then sends them out:

val partial_graph = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    val broadcast = builder.add(Broadcast[Int](2))
    val zip = builder.add(Zip[Int, Int]())

    broadcast.out(0) ~> Flow[Int].filter(_ % 2 == 0) ~> Flow[Int].map(_ / 2) ~> zip.in0
    broadcast.out(1) ~> Flow[Int].filter(_ % 2 != 0) ~> Flow[Int].map(_ * 2) ~> zip.in1

    FlowShape(broadcast.in, zip.out)
  })

This FlowShape is expecting a Source containing integers, in the first line of processing it checks if the items are even, then divides them by two. The second line of processing doubles all the even numbers. The two streams are then recombined using a zip. In order to materialize data through this processing stage, a Source of integers and a Sink must be connected.

Merging a stream

If we have two streams containing the same datatype, then we can merge these two streams into one:

Let’s reuse our stream of ones and merge it into the stream of twos we already have:

val merge_graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  val merge = builder.add(Merge[Int](2))
  
  in ~> double_flow ~> merge ~> print_sink
                in ~> merge
  ClosedShape
})

We add a Merge to the graph builder, Merge requires we specify the type of the stream elements we are merging and the number of stream sources we are merging. We then alter the stream processing flow to include a stream of ones and perform the merge.

In this case, if the stream of ones was publishing at a higher frequency than the other stream, we would have a stream with more ones than twos. ie:

1 1 1 2 1 1 1 2 …

Zipping a Stream

We can zip a stream just as we can zip collections in Scala. This results in a tuple, which can have heterogeneous types. Zip requires that both streams have an element available, so if one stream is publishing at a quicker rate than the others there will be buffering of those elements.

To illustrate this, we will build a continuous stream of natural numbers using the unfold function:

val naturalNumbers = Source.unfold(1)(a => Some(a + 1, a))

unfold is the dual of fold and is an Anamorphism. unfold starts with a seed value and applies a function to produce the next value in the stream, the result of this function evaluation is sent to the next evaluation and so on. Using unfold is a simple way to define a stream which depends on the previous value.

Now if we zip together a continuous source of ones which publishes every ten seconds, Zip will wait for both streams to have an element before publishing the tuple of both streams, guaranteeing order.

val in = Source.tick(1.second, 10.seconds, 1)

val zipStream = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
    val zip = builder.add(Zip[Int, Int])
    naturalNumbers ~> zip.in0
    in ~> zip.in1
    zip.out ~> out

    ClosedShape
  })

  def main(args: Array[String]): Unit = {
    zipStream.run(materializer)
  }

The output from this stream is: (1,1) (2,1) (3,1) …

Summary

We have covered a few ways to express simple streams using Akka Streaming. The real power of Akka streaming is when it is combined with file or connection handling. Streaming libraries can be used to process extremely large, or unbounded, data files using a bounded amount of computational power. This is useful when dealing with infinite sources of data, such as streaming data from Twitter.