Background Processing with Akka Cluster: ClusterSingletonManager

REST services are commonly used in scalable architectures because they are stateless. In the real world, however, a service is rarely just a CRUD service: for example, it may also need some background processing (e.g., downloading/parsing files, scheduled processes, triggers, etc.).
In this tutorial we demonstrate how to use an Akka ClusterSingletonManager in a scalable architecture to perform some background operations from only one node, the Leader of our Akka Cluster.

All the code produced in this article can be found on GitHub.

Our Application

Let’s imagine we have a simple Akka Http REST API with one endpoint that, given a ping request, returns a pong response (for more information on how to create an API with Akka Http, see this article):

// curl http://localhost:5000/ping >> pong
// RestInterface.scala
...

val routes: Route =
  path("ping") {
    get {
      complete("pong")
    }
  }
...

An Akka Actor, called TickCounter, is also attached to our system to count ticks starting from zero. Its code is as follows:

// TickCounter.scala
package com.danielasfregola.akka.tutorials.actors

import akka.actor.Actor

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

class TickCounter extends Actor {

  case class Tick(n: Int)

  override def preStart = self ! Tick(0)

  def receive = {
    case Tick(t) =>
      println(s"${self.path} - Tick $t")
      context.system.scheduler.scheduleOnce(1 second, self, Tick(t+1))
  }

}

Our build file has only one dependency:

// build.sbt
name := "akka-tutorials"

version := "0.1"

organization := "com.danielasfregola"

scalaVersion := "2.11.5"

resolvers ++= Seq("Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/")

libraryDependencies ++= {
  val AkkaVersion       = "2.4.2"
  Seq(
    "com.typesafe.akka" %% "akka-http-experimental" % AkkaVersion
  )
}

Also, our Main class looks as follows:

// Main.scala
package com.danielasfregola.akka.tutorials

import akka.actor.{Props, ActorSystem}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.danielasfregola.akka.tutorials.actors.TickCounter

import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

object Main extends App with RestInterface {
  val config = ConfigFactory.load()
  val host = config.getString("http.host")
  val port = config.getInt("http.port")

  implicit val system = ActorSystem("akka-tutorials")
  implicit val materializer = ActorMaterializer()

  val tickCounter = system.actorOf(Props[TickCounter], "tick-counter")

  implicit val executionContext = system.dispatcher
  implicit val timeout = Timeout(10 seconds)

  val api = routes

  Http().bindAndHandle(handler = api, interface = host, port = port) map { binding =>
    println(s"REST interface bound to $host:$port")
  } recover { case ex =>
    println(s"REST interface could not bind to $host:$port: ${ex.getMessage}")
  }

}

Our Goal

Our goal is to change our application so that when multiple instances of the same service are running, only one TickCounter is active.

We could set a flag to disable/enable the Actor. However, this is not ideal in a scalable ecosystem where every instance can potentially be removed. What happens if we remove the only node with the flag on? What if we deploy more than one service with the flag active?

We could move the background process into a new service. However, when the processing is not big enough to justify a dedicated service, this can be quite expensive in terms of infrastructure and maintenance.

…or, with some minor code changes, we could set up an Akka ClusterSingletonManager, be done with it and go to the pub early.

Akka ClusterSingletonManager

In order to let our application use a ClusterSingletonManager, we just need to perform three simple operations:
– import the right dependency
– let our Cluster Manager know about our Actor
– set up the behaviour of the Akka Cluster

Obviously, this is only one way of setting up our Cluster. For more information on the other available configurations, please have a look at the official Akka documentation on Cluster Singleton.

Step 1: Update your dependencies

The ClusterSingletonManager is part of the akka-cluster-tools package, so we need to add its dependency to our build.sbt file:

"com.typesafe.akka" %% "akka-cluster-tools" % "2.4.2"

Step 2: Define the actors in your Cluster

In our Main we need to specify that our TickCounter Actor is part of our Cluster:

import akka.actor.PoisonPill
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}

val tickCounter = {
  val singletonProps = ClusterSingletonManager.props(
    singletonProps = Props[TickCounter],
    terminationMessage = PoisonPill,
    settings = ClusterSingletonManagerSettings(system)
  )
  system.actorOf(singletonProps, "tick-counter-singleton")
}

The terminationMessage is sent to the Actor if the Cluster Manager needs to terminate it. Here we decided to keep it simple, so we just brutally kill the actor. Note that this can be customised to trigger a behaviour change rather than causing its termination.
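As an illustration, here is a minimal sketch of a counter that reacts to a custom termination message by switching to a passive behaviour instead of dying (StopTicking and ResilientTickCounter are hypothetical names, not part of the tutorial code). The singleton would then be created with terminationMessage = StopTicking instead of PoisonPill:

// ResilientTickCounter.scala (sketch)
import akka.actor.Actor

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// hypothetical custom termination message
case object StopTicking

class ResilientTickCounter extends Actor {

  case class Tick(n: Int)

  override def preStart = self ! Tick(0)

  def active: Receive = {
    case Tick(t) =>
      println(s"${self.path} - Tick $t")
      context.system.scheduler.scheduleOnce(1 second, self, Tick(t + 1))
    case StopTicking =>
      context.become(passive) // behaviour change instead of termination
  }

  def passive: Receive = {
    case _ => // ignore anything still in flight
  }

  def receive = active
}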

Step 3: Configure the behaviour of your Cluster

Add the following configurations to your application.conf file to tell the Cluster how to behave:

// application.conf
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = ${REMOTE_PORT}
    }
  }
}

akka.cluster {
  seed-nodes = [
    "akka.tcp://akka-tutorials@127.0.0.1:2551",
    "akka.tcp://akka-tutorials@127.0.0.1:2552",
    "akka.tcp://akka-tutorials@127.0.0.1:2553"
  ]
  min-nr-of-members = 1
  auto-down-unreachable-after = 30s
}

The akka.actor and akka.remote settings provide information on how to create and bind the node of the Cluster. In particular, akka.remote.netty.tcp defines how to reach the node by providing a hostname and a port.

The akka.cluster configuration contains the following information:
– seed-nodes: the seed nodes of the Cluster.
– min-nr-of-members: the number of members needed before the Cluster starts.
– auto-down-unreachable-after: the time after which an unreachable node gets marked as Down and removed from the Cluster.

Note that auto-down-unreachable-after is a really sensitive and dangerous setting that needs to be set up properly, in particular in production. From the Akka documentation:

Be very careful when using Cluster Singleton together with Automatic Downing, since it allows the Cluster to split up into two separate clusters, which in turn will result in multiple Singletons being started, one in each separate Cluster!

Usage

Now that our Akka Cluster is set up, we are ready to use it.
Note that in our application, in order to start a service, we have decided to force two configurations at runtime: the port of the API and the port of the Akka node.
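A minimal sketch of how those two ports could be wired in application.conf via environment variables follows (the http block is an assumption based on the keys read in Main; the remote port substitution is the one already shown above):

// application.conf (sketch)
http {
  host = "0.0.0.0"
  port = ${PORT}
}

akka.remote.netty.tcp.port = ${REMOTE_PORT}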

Let’s run the first instance of our service on port 5000 and the Akka node on port 2551:

> export PORT=5000
> export REMOTE_PORT=2551
> sbt run

In the logs we see that the Cluster Manager declares the node as the Oldest of the Cluster and starts the TickCounter Actor:

...
[INFO] [02/21/2016 16:14:24.312] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Node [akka.tcp://akka-tutorials@127.0.0.1:2551] is JOINING, roles []
[INFO] [02/21/2016 16:14:24.320] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is moving node [akka.tcp://akka-tutorials@127.0.0.1:2551] to [Up]
[INFO] [02/21/2016 16:14:24.337] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] Singleton manager starting singleton actor [akka://akka-tutorials/user/tick-counter-singleton/singleton]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 0
[INFO] [02/21/2016 16:14:24.340] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Oldest]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 1
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 2
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 3
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 4
...

Let’s run a second instance of our service on port 5001 and the node on port 2552:

> export PORT=5001
> export REMOTE_PORT=2552
> sbt run

In the logs of the second instance we see that the Cluster Manager declares the second node as Younger and it does not start its TickCounter Actor:

...
[INFO] [02/21/2016 16:18:31.343] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Welcome from [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:18:31.540] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Younger]
REST interface bound to 0.0.0.0:5001
...

If we stop the non-leader node (i.e.: the second one we started), the leader node acknowledges that the second node is unreachable: after some time, it marks it as Down and removes it from the Cluster:

[WARN] [02/21/2016 16:22:32.154] [akka-tutorials-akka.remote.default-remote-dispatcher-21] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-2] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
...
[WARN] [02/21/2016 16:22:36.287] [akka-tutorials-akka.actor.default-dispatcher-15] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/cluster/core/daemon] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://akka-tutorials@127.0.0.1:2552, status = Up)]
...
[WARN] [02/21/2016 16:22:37.349] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-3] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
...
[INFO] [02/21/2016 16:22:55.339] [akka-tutorials-akka.actor.default-dispatcher-18] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader can currently not perform its duties, reachability status: [akka.tcp://akka-tutorials@127.0.0.1:2551 -> akka.tcp://akka-tutorials@127.0.0.1:2552: Unreachable [Unreachable] (1)], member status: [akka.tcp://akka-tutorials@127.0.0.1:2551 Up seen=true, akka.tcp://akka-tutorials@127.0.0.1:2552 Up seen=false]
[WARN] [02/21/2016 16:22:55.346] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-6] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
[WARN] [02/21/2016 16:23:01.350] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-7] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
[INFO] [02/21/2016 16:23:06.305] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is auto-downing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552]
[INFO] [02/21/2016 16:23:06.306] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Marking unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552] as [Down]
...
[INFO] [02/21/2016 16:23:07.285] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader can perform its duties again
[INFO] [02/21/2016 16:23:07.287] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is removing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552]
[INFO] [02/21/2016 16:23:07.288] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] Member removed [akka.tcp://akka-tutorials@127.0.0.1:2552]
[WARN] [02/21/2016 16:23:07.292] [akka-tutorials-akka.remote.default-remote-dispatcher-21] [akka.remote.Remoting] Association to [akka.tcp://akka-tutorials@127.0.0.1:2552] having UID [-2037835353] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
...

When we start the second node again, the node is added back to the Cluster:

...
[INFO] [02/21/2016 16:28:49.560] [akka-tutorials-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Node [akka.tcp://akka-tutorials@127.0.0.1:2552] is JOINING, roles []
[INFO] [02/21/2016 16:28:50.293] [akka-tutorials-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is moving node [akka.tcp://akka-tutorials@127.0.0.1:2552] to [Up]
...

Finally, we kill the Leader node and see that, after some time, the second node is promoted to Leader:

REST interface bound to 0.0.0.0:5001
[INFO] [02/21/2016 16:28:51.192] [akka-tutorials-akka.actor.default-dispatcher-15] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Younger]
[WARN] [02/21/2016 16:32:01.276] [akka-tutorials-akka.remote.default-remote-dispatcher-6] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
[WARN] [02/21/2016 16:32:06.185] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/cluster/core/daemon] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://akka-tutorials@127.0.0.1:2551, status = Up)]
[WARN] [02/21/2016 16:32:06.674] [akka-tutorials-akka.remote.default-remote-dispatcher-5] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-2] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
[WARN] [02/21/2016 16:32:12.677] [akka-tutorials-akka.remote.default-remote-dispatcher-6] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-3] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
...
[INFO] [02/21/2016 16:32:26.188] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader can currently not perform its duties, reachability status: [akka.tcp://akka-tutorials@127.0.0.1:2552 -> akka.tcp://akka-tutorials@127.0.0.1:2551: Unreachable [Unreachable] (1)], member status: [akka.tcp://akka-tutorials@127.0.0.1:2551 Up seen=false, akka.tcp://akka-tutorials@127.0.0.1:2552 Up seen=true]
...
[INFO] [02/21/2016 16:32:36.203] [akka-tutorials-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader is auto-downing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:36.204] [akka-tutorials-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Marking unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551] as [Down]
...
[INFO] [02/21/2016 16:32:37.182] [akka-tutorials-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader can perform its duties again
[INFO] [02/21/2016 16:32:37.189] [akka-tutorials-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader is removing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:37.192] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Previous oldest removed [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:37.193] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Younger observed OldestChanged: [None -> myself]
[WARN] [02/21/2016 16:32:37.194] [akka-tutorials-akka.remote.default-remote-dispatcher-5] [akka.remote.Remoting] Association to [akka.tcp://akka-tutorials@127.0.0.1:2551] having UID [676396303] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
[INFO] [02/21/2016 16:32:37.195] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Singleton manager starting singleton actor [akka://akka-tutorials/user/tick-counter-singleton/singleton]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 0
[INFO] [02/21/2016 16:32:37.197] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Younger -> Oldest]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 1
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 2
....

Summary

In this article we have described how to use an Akka ClusterSingletonManager to coordinate several instances of the same service. In particular, our goal was to configure an Akka Cluster of services so that some operations run on only one service at a time (i.e.: the Leader of the Cluster).

All the code produced in this tutorial is on GitHub.

How to build a REST API with Akka Http

In previous articles we have discussed how to use Spray to build REST APIs. Since Akka 2.4, Spray is no longer supported and has been replaced by Akka Http.
This article will introduce Akka Http, the new shiny toy from the Akka Team, and provide a tutorial on how it can be used to create a simple REST API.

All the code produced in this tutorial can be found on GitHub.

Why Akka Http?

Spray has been the Akka way of building APIs for quite some time.
Although not directly built by the Akka Team, it relied heavily on the Akka ecosystem: Spray is implemented with Akka Actors.
The project went so well that, after some time, the Akka Team decided to adopt it. After the Akka Team released Akka Streams, they realised that Spray’s performance could be improved by using Akka Streams together with Akka Actors.

Since Akka 2.4, Akka Http is the official Akka toolkit to create REST APIs, both client and server side.
Note that in Akka 2.4, Spray is no longer supported…so you are forced to migrate from Spray to Akka Http if you want to use any of the other latest Akka tools, like Akka Persistence.

At the time of this writing, Akka Http is still an experimental module — but it has been declared stable for production. Also, its performance is currently worse than Spray’s: the Akka Team has been focusing on its interface, but they have promised to massively improve its performance by Q1 2016.

Our CRUD Application

Previously, we described how to create a simple CRUD application with Spray.
In this article we will rewrite exactly the same application using Akka Http instead of Spray.

In particular, our application will create, retrieve, update and delete a Question entity.
A question has 3 fields (id, title, text) and its case class looks as follows:

case class Question(id: String, title: String, text: String)

Also, to keep things simple, we are going to keep all the data in memory, rather than properly storing it in a database.
The class QuestionService simulates a persistent storage by keeping all the entities in a Vector.
The following code is the skeleton of the QuestionService class (more details on its implementation can be found here):

package com.danielasfregola.quiz.management.services

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import scala.concurrent.{ExecutionContext, Future}

class QuestionService(implicit val executionContext: ExecutionContext) {

  var questions = Vector.empty[Question]

  def createQuestion(question: Question): Future[Option[String]] = ...

  def getQuestion(id: String): Future[Option[Question]] = ...

  def updateQuestion(id: String, update: QuestionUpdate): Future[Option[Question]] = ...

  def deleteQuestion(id: String): Future[Unit] = ...

}
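As an illustration, createQuestion could be implemented along the following lines against the in-memory Vector (a sketch that assumes creation fails when the id is already taken; the actual code linked above may differ):

def createQuestion(question: Question): Future[Option[String]] = Future {
  questions.find(_.id == question.id) match {
    case Some(_) => None // the id is already taken: creation fails
    case None =>
      questions = questions :+ question
      Some(question.id)
  }
}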

Last but not least, we will use json4s to (de)serialise a json into a case class (more information on how to use json4s can be found here).

Setup

The first step is to add the right dependencies to our project:

// build.sbt
...
resolvers ++= Seq("Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
                  Resolver.bintrayRepo("hseeberger", "maven"))

libraryDependencies ++= {
  val AkkaVersion       = "2.3.9"
  val AkkaHttpVersion   = "2.0.1"
  val Json4sVersion     = "3.2.11"
  Seq(
    "com.typesafe.akka" %% "akka-slf4j"      % AkkaVersion,
    "com.typesafe.akka" %% "akka-http-experimental" % AkkaHttpVersion,
    "ch.qos.logback"    %  "logback-classic" % "1.1.2",
    "org.json4s"        %% "json4s-native"   % Json4sVersion,
    "org.json4s"        %% "json4s-ext"      % Json4sVersion,
    "de.heikoseeberger" %% "akka-http-json4s" % "1.4.2"
  )
}
...

Then, we need to bind our API to a host and a port:

// Main.scala
package com.danielasfregola.quiz.management

import scala.concurrent.duration._
import akka.actor._
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.util.Timeout

import com.typesafe.config.ConfigFactory

object Main extends App with RestInterface {
  val config = ConfigFactory.load()
  val host = config.getString("http.host")
  val port = config.getInt("http.port")

  implicit val system = ActorSystem("quiz-management-service")
  implicit val materializer = ActorMaterializer()


  implicit val executionContext = system.dispatcher
  implicit val timeout = Timeout(10 seconds)

  val api = routes

  Http().bindAndHandle(handler = api, interface = host, port = port) map { binding =>
    println(s"REST interface bound to ${binding.localAddress}")
  } recover { case ex =>
    println(s"REST interface could not bind to $host:$port: ${ex.getMessage}")
  }
}

Note that RestInterface is just a collection of routes and the services needed:

package com.danielasfregola.quiz.management

import scala.concurrent.ExecutionContext

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.resources.QuestionResource
import com.danielasfregola.quiz.management.services.QuestionService

trait RestInterface extends Resources {

  implicit def executionContext: ExecutionContext

  lazy val questionService = new QuestionService

  val routes: Route = questionRoutes

}

trait Resources extends QuestionResource

Question Resource

QuestionResource is a generic Resource:
– it has a service that performs some operations on the entity
– it has some routes (see later paragraphs of this article)
– it extends a generic Resource, called MyResource

Its skeleton is as follows:

// QuestionResource.scala
package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = ???

}

MyResource is a trait where we add code that is common/useful for all the resources (the code can be found here).
In particular, it includes the json4s support to (de)serialise case classes and some helper methods that will make our akka-http routing easier.
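For reference, the json4s wiring can be as small as the following trait (a sketch assuming the de.heikoseeberger akka-http-json4s bridge listed in build.sbt; the contents of the real trait may differ slightly):

// JsonSupport.scala (sketch)
package com.danielasfregola.quiz.management.serializers

import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.{DefaultFormats, Formats, Serialization, native}

trait JsonSupport extends Json4sSupport {
  implicit val serialization: Serialization = native.Serialization
  implicit val formats: Formats = DefaultFormats
}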

Now that we have set up the skeleton of our application, we can focus on the implementation of our endpoints.

POST – Create a Question

Usage

The first task of our application is to define an endpoint to create a question entity.
According to REST conventions, an entity is created through a POST request that should reply with a 201 (Created) HTTP status code. Also, a Location header with the URI that identifies the location of the new entity should be returned.
Note that a POST request is non-idempotent: if the entity already exists or cannot be created, we should return an HTTP error status code.

For our questions application, this can be translated in the following curl command:

curl -v -H "Content-Type: application/json" \
   -X POST http://localhost:5000/questions \
   -d '{"id": "test", "title": "MyTitle", "text":"The text of my question"}'

The first time we make the request, we should get a reply similar to the following:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> POST /questions HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 68
>
* upload completely sent off: 68 out of 68 bytes
< HTTP/1.1 201 Created
< Location: http://localhost:5000/questions/test
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:16:50 GMT
< Content-Type: application/json
< Content-Length: 0

* Connection #0 to host localhost left intact

If we repeat the request again, we will get an HTTP response with a 409 (Conflict) status code as the entity already exists:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> POST /questions HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 68
>
* upload completely sent off: 68 out of 68 bytes
< HTTP/1.1 409 Conflict
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:17:07 GMT
< Content-Type: application/json
< Content-Length: 0

* Connection #0 to host localhost left intact

Implementation

As described in the previous paragraph, when creating an entity we would like to provide the URI of the created entity in a Location header.
Our implementation is tailored to the behaviour of our system: when QuestionService creates a question, it returns a Future[Option[T]] and, if the returned option is not defined, we want to return a different HTTP status code.
Unfortunately, Akka Http doesn’t have a default implementation to achieve this, so we will have to create our own by adding the following to our MyResource trait:

// MyResource.scala
package com.danielasfregola.quiz.management.routing

import akka.http.scaladsl.marshalling.{ToResponseMarshaller, ToResponseMarshallable}

import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.{Directives, Route}

import com.danielasfregola.quiz.management.serializers.JsonSupport

trait MyResource extends Directives with JsonSupport {

  implicit def executionContext: ExecutionContext

  def completeWithLocationHeader[T](resourceId: Future[Option[T]], ifDefinedStatus: Int, ifEmptyStatus: Int): Route =
    onSuccess(resourceId) {
      case Some(t) => completeWithLocationHeader(ifDefinedStatus, t)
      case None => complete(ifEmptyStatus, None)
    }

  def completeWithLocationHeader[T](status: Int, resourceId: T): Route =
    extractRequestContext { requestContext =>
      val request = requestContext.request
      val location = request.uri.copy(path = request.uri.path / resourceId.toString)
      respondWithHeader(Location(location)) {
        complete(status, None)
      }
    }

  def complete[T: ToResponseMarshaller](resource: Future[Option[T]]): Route =
    onSuccess(resource) {
      case Some(t) => complete(ToResponseMarshallable(t))
      case None => complete(404, None)
    }

  def complete(resource: Future[Unit]): Route = onSuccess(resource) { complete(204, None) }

}

We can now put everything together and define the endpoint to create a question entity:

// QuestionResource.scala
package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    pathEnd {
      post {
        entity(as[Question]) { question =>
          completeWithLocationHeader(
            resourceId = questionService.createQuestion(question),
            ifDefinedStatus = 201, ifEmptyStatus = 409)
          }
        }
    } ~ ...

  }
}

GET – Retrieve a Question

Usage

Now that we have created a question, we can retrieve it by performing a GET request to the URI that identifies the entity (i.e.: the one returned in the Location Header).
The request should respond with either a 200 (OK) HTTP status code with a body containing the question entity or a 404 (NotFound) HTTP status code with empty body.

For example, we can get an existing question with the following curl command…

curl -v http://localhost:5000/questions/test

…and it should return something similar to the following:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> GET /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:17:31 GMT
< Content-Type: application/json
< Content-Length: 64

* Connection #0 to host localhost left intact
{"id":"test","title":"MyTitle","text":"The text of my question"}

Moreover, if we request an entity that doesn’t exist…

curl -v http://localhost:5000/questions/non-existing-question

…we should get a 404 error code:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> GET /questions/non-existing-question HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
>
< HTTP/1.1 404 Not Found
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:18:40 GMT
< Content-Type: application/json
< Content-Length: 0

* Connection #0 to host localhost left intact

Implementation

QuestionService returns a Future[Option[Question]] when retrieving a question.
Unlike Spray, Akka Http doesn’t seem to complete optional values correctly: complete(Future(None)) returns an HTTP response with code 200 and an empty body rather than a 404 response — which in my opinion doesn’t make much sense, considering that a 200 should have a non-empty body.

UPDATE:
The Akka team has discussed this issue on GitHub (see here for more information) and this does not seem to be a bug: apparently there are some case scenarios where complete(Future(None)) needs to be completed with something else rather than 404.
Thank you to @ktoso for looking into this!

Not a problem, we can add some *black magic* to our MyResource trait to make the code look exactly the same as before:

// MyResource.scala
package com.danielasfregola.quiz.management.routing

import akka.http.scaladsl.marshalling.{ToResponseMarshaller, ToResponseMarshallable}

import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.{Directives, Route}

import com.danielasfregola.quiz.management.serializers.JsonSupport

trait MyResource extends Directives with JsonSupport {

  implicit def executionContext: ExecutionContext

  ...

  def complete[T: ToResponseMarshaller](resource: Future[Option[T]]): Route =
    onSuccess(resource) {
      case Some(t) => complete(ToResponseMarshallable(t))
      case None => complete(404, None)
    }

  ...
}

Thanks to our trick, our route now looks exactly the same as with Spray:

package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      get {
        complete(questionService.getQuestion(id))
      } ~
      ...
    }
  }

}

PUT – Update a Question

Usage

When updating an entity, we should use a PUT request. Also, we should send only the fields that we want to update, not the whole object. Not only will this make our API easier to use, but it will also reduce potential concurrency issues.
If the update goes through, we should get an HTTP response with a 200 (OK) status code and the updated entity in the body. On the other hand, if the update is not possible, for example because the entity no longer exists, we should get an HTTP response with status 404 (NotFound) and an empty body.
Note that a PUT request is idempotent: performing the same update multiple times should return the same result.

In our application we can update the question entity with the following curl command…

curl -v -H "Content-Type: application/json" \
   -X PUT http://localhost:5000/questions/test \
   -d '{"text":"Another text"}'

…and get the following reply:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> PUT /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 23
>
* upload completely sent off: 23 out of 23 bytes
< HTTP/1.1 200 OK
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:19:31 GMT
< Content-Type: application/json
< Content-Length: 53

* Connection #0 to host localhost left intact
{"id":"test","title":"MyTitle","text":"Another text"}

If we try to update a resource that doesn’t exist, we should get a 404 response:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> PUT /questions/non-existing-question HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 23
>
* upload completely sent off: 23 out of 23 bytes
< HTTP/1.1 404 Not Found
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:20:07 GMT
< Content-Type: application/json
< Content-Length: 0

* Connection #0 to host localhost left intact

Implementation

As explained in the previous section, we want the client of our API to send just the fields to update, not the whole entity. In order to achieve this, we will deserialise the body of our PUT request to the following case class:

case class QuestionUpdate(title: Option[String], text: Option[String])

Note that we decided not to allow our clients to update the field id, as it is used to locate the entity.
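With that in place, a possible implementation of updateQuestion in QuestionService is to merge the defined fields into the stored entity (a sketch against the in-memory Vector shown earlier; the actual code may differ):

def updateQuestion(id: String, update: QuestionUpdate): Future[Option[Question]] = Future {
  questions.find(_.id == id).map { old =>
    val updated = old.copy(
      title = update.title.getOrElse(old.title),
      text = update.text.getOrElse(old.text))
    // replace the old entity with the merged one
    questions = questions.filterNot(_.id == id) :+ updated
    updated
  }
}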

Keeping in mind that QuestionService returns Future[Option[Question]] when updating a question, we can reuse the “black magic” trick from our GET route to make our code look nice:

package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route
import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      ... ~
      put {
        entity(as[QuestionUpdate]) { update =>
          complete(questionService.updateQuestion(id, update))
        }
      } ~ ...
    }
  }

}

DELETE – Delete a Question

Usage

Finally, we want to have an endpoint to delete a question entity. This can be achieved by sending a DELETE request to the URI that identifies the entity; the endpoint should reply with a 204 (NoContent) status code once the operation has completed.
Note that DELETE is idempotent, so deleting a resource that has already been deleted should still return an HTTP response with a 204 (NoContent) status code and an empty body.

For example, we can delete the question test with the following…

curl -v -X DELETE http://localhost:5000/questions/test

…and get the following result back:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> DELETE /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
>
< HTTP/1.1 204 No Content
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:20:30 GMT
< Content-Type: application/json

* Connection #0 to host localhost left intact

Implementation

When deleting a question, QuestionService returns Future[Unit].
Unfortunately, Akka Http resolves complete(Future(())) with an HTTP response with code 200 and an empty body — same as complete(Future(None))! Please, put a comment below if you know the rationale behind this design choice.

UPDATE:
Thank you again to @ktoso from the Akka Team for looking into this! Apparently this is an issue related to Json4s.
Here is the crystal clear explanation that @hseeberger has provided in the comments below:

What causes the issue is that (a) Json4s cannot marshal `AnyVal`s and (b) Json4s happily marshals `Future`. This leads to bypassing the `Future` marshaller from Akka HTTP.

Not a problem, we just need to add the following code to MyResource to make complete resolve to a 204 HTTP response with an empty body:

// MyResource.scala
package com.danielasfregola.quiz.management.routing

import akka.http.scaladsl.marshalling.{ToResponseMarshaller, ToResponseMarshallable}

import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.{Directives, Route}

import com.danielasfregola.quiz.management.serializers.JsonSupport

trait MyResource extends Directives with JsonSupport {

  implicit def executionContext: ExecutionContext

  ...

  def complete(resource: Future[Unit]): Route = onSuccess(resource) { complete(204, None) }

}

Our DELETE endpoint can now be implemented as follows:

package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route
import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      ... ~
      delete {
        complete(questionService.deleteQuestion(id))
      }
    }
  }

}

Summary

In this article we have introduced Akka Http and provided a tutorial on how to create a simple CRUD application with it.

The completed code of this tutorial can be found on GitHub.

Thanks to @ktoso and @hseeberger for clarifying some issues raised in this article!

Akka Dead Letters Channel

Akka doesn’t guarantee the delivery of a message. What happens when a message cannot be delivered? In this article we will describe how the Dead Letters Channel works and how it can be used to spot issues in our system.

How it works

In a previous article we have described the use of Event Streams in Akka. The Dead Letter Channel is nothing more than a special Event Stream that the system uses internally every time a message cannot be delivered to its recipient.

When Akka redirects a failed message to the Dead Letter actor, it wraps it in a case class called DeadLetter that provides the message, the original sender and the recipient:

case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef)

Unless specified otherwise, dead letters are logged at the INFO level: more information on how to tweak your logging settings can be found here.
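For example, both of the settings mentioned in the log output further below can be tweaked in application.conf (the values here are illustrative):

// application.conf
akka {
  // log at most 10 dead letters, then suppress further reporting
  log-dead-letters = 10
  // do not log dead letters while the system is shutting down
  log-dead-letters-during-shutdown = off
}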

How to use it

Because the Dead Letter Channel is an Event Stream, we can subscribe to it and listen to all the messages it publishes.

The code used for this tutorial is available here.

First of all, let’s create a dummy actor, called EchoActor, that prints all the messages it receives:

 
class EchoActor extends Actor {
  
  def receive = {
    case msg => println(s"New msg received: $msg")
  }
  
}

The second step is to create our actor system: we will have two instances of EchoActor, one called deadLettersSubscriber that will listen for DeadLetters, and the other, called echoActor, that will simply wait for and receive messages.

  implicit val system = ActorSystem("dead-letters-usage-example")

  val deadLettersSubscriber = system.actorOf(Props[EchoActor], name = "dead-letters-subscriber")
  val echoActor = system.actorOf(Props[EchoActor], name = "generic-echo-actor")

  system.eventStream.subscribe(deadLettersSubscriber, classOf[DeadLetter])

When successfully sending a message, no dead letter is generated.

  echoActor ! "First Message"
  // generic-echo-actor - New msg received: First Message

However, when we try to send a message to an actor that has been killed, the message is successfully transformed into a DeadLetter.

  echoActor ! PoisonPill
  echoActor ! "Second Message"
  // dead-letters-subscriber - New msg received: DeadLetter(Second Message,Actor[akka://dead-letters-usage-example/deadLetters],Actor[akka://dead-letters-usage-example/user/generic-echo-actor#317003256])
  // INFO  [RepointableActorRef]: Message [java.lang.String] from Actor[akka://dead-letters-usage-example/deadLetters] to Actor[akka://dead-letters-usage-example/user/generic-echo-actor#317003256] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Finally, we can also send messages directly to the Dead Letter Actor: this is usually not advised as the Dead Letter Channel should be reserved for the system to redirect failed messages.

  system.deadLetters ! "Dead Message"
  // dead-letters-subscriber - New msg received: DeadLetter(Dead Message,Actor[akka://dead-letters-usage-example/deadLetters],Actor[akka://dead-letters-usage-example/deadLetters])
  // INFO  [DeadLetterActorRef]: Message [java.lang.String] from Actor[akka://dead-letters-usage-example/deadLetters] to Actor[akka://dead-letters-usage-example/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Summary

Akka redirects all the messages that couldn’t be delivered or processed to the Dead Letter Channel. In this article we have discussed how Akka uses it and how we can exploit it when testing our system and investigating delivery issues.

Peer-to-Many Communication in Akka

The most common communication channel in Akka is Peer-to-Peer, where an individual actor sends a message directly to another individual actor. However, sometimes this is not enough, as we may need a Peer-to-Many type of communication, where an individual actor sends a message to a group of actors. This is particularly useful when you need to model your system using a Publisher-Subscriber Pattern. This article will provide a quick tutorial on how to use and customise an Event Bus, the Akka way of implementing a Peer-to-Many communication channel.

Event Stream

Event Stream is the simplest and most common implementation of an Event Bus. It follows the classic Publisher-Subscriber Pattern: an actor publishes a message on the stream and all the actors that have subscribed to that specific message type receive it.

Let’s see with a simple tutorial how easily this can be achieved (gist available here).

In this tutorial, we want to model the following scenario: every time someone publishes a book, all the subscribers need to receive it.

First of all, we need to define what a book is:

case class Book(title: String, authors: List[String])

Then, we need to specify an Actor that acts as a book publisher: every time it receives a book, it publishes it on the System Event Stream.

class BookPublisher extends Actor {
  
  def receive = {
    case book: Book => {
      println(s"Yeah! Publishing a new book: $book")
      context.system.eventStream.publish(book)
    }
  }
  
}

Finally, let’s create an Actor that will subscribe to the System Event Stream for all messages of type Book. Note that the preStart function will be executed by Akka right after the creation of the Actor.

class BookSubscriber extends Actor {
  
  override def preStart = context.system.eventStream.subscribe(self, classOf[Book])
  
  def receive = {
    case book: Book => println(s"My name is ${self.path.name} and I have received a new book: $book")
  }
}

Done! See, it wasn’t that bad… 🙂

Now we just need to play with our system to make sure it works as expected:

object Main extends App {
  
  implicit val system = ActorSystem("publisher-subscribers-example")
 
  val author = "Author"
  
  val bookPublisher = system.actorOf(Props[BookPublisher], name = "book-publisher")
  
  val subscriber1 = system.actorOf(Props[BookSubscriber], name = "subscriber-1") 
  val subscriber2 = system.actorOf(Props[BookSubscriber], name = "subscriber-2")
  
  bookPublisher ! Book(title = "A book title", authors = List(author, "Another author"))
  // Yeah! Publishing a new book: Book(A book title,List(Author, Another author))
  // My name is subscriber-1 and I have received a new book: Book(A book title,List(Author, Another author))
  // My name is subscriber-2 and I have received a new book: Book(A book title,List(Author, Another author))
  
  system.eventStream.unsubscribe(subscriber2, classOf[Book])
 
  bookPublisher ! Book(title = "Another book title", authors = List("Another author"))
  // Yeah! Publishing a new book: Book(Another book title,List(Another author))
  // My name is subscriber-1 and I have received a new book: Book(Another book title,List(Another author))
} 

Note that instead of unsubscribing subscriber2 from all the messages of type Book, we could also have unsubscribed it from all message types by using system.eventStream.unsubscribe(subscriber2).

Custom Event Bus

Event Streams are really easy to use. However, they may not be that useful if we want to perform some filtering on the published message. One solution could be to do some filtering before publishing the message, but what if this cannot be done, or we simply want to do it in a more elegant way?

An Event Stream is a specific implementation of the Event Bus trait: Akka gives us the opportunity to reuse this trait to create custom Event Buses.

Assume now that a subscriber wants to receive books by a specific author — the gist of the following code can be found here.

First of all, the BookSubscriber actor no longer needs to automatically subscribe to the System Event Stream. Moreover, the BookPublisher actor now needs to publish on a given Event Bus, rather than on the System Event Stream.

class BookPublisher(bus: AuthorBookBus) extends Actor {
  
  def receive = {
    case book: Book => {
      println(s"Yeah! Publishing a new book: $book")
      bus.publish(book)
    }
  }
  
}
 
class BookSubscriber extends Actor {
  
  def receive = {
    case book: Book => println(s"My name is ${self.path.name} and I have received a new book: $book")
  }
}

Finally, let’s define our AuthorBookBus that will filter books according to their authors.

class AuthorBookBus(author: String) extends EventBus
  with LookupClassification
  with ActorEventBus {
  
  type Event = Book
  type Classifier = Boolean
  
  override def mapSize = 2
  
  override def classify(book: Book) = book.authors.contains(author)
 
  override protected def publish(book: Book, subscriber: Subscriber) = subscriber ! book
} 

Our Event Bus accepts events of type Book and classifies them with two possible values: true if author is one of the authors of the book, false otherwise — and this is why we override def mapSize = 2! The function classify is used to categorise a book according to the Classifier type. Akka provides different types of classifiers (see here); in our case LookupClassification was enough, as we wanted a classification based on the event itself.

The following app shows an example of how our actor system could be used:

object Main extends App {
  
  implicit val system = ActorSystem("publisher-subscribers-example")
  
  val author = "Author"
 
  val authorBookBus = new AuthorBookBus(author)
  val bookPublisher = system.actorOf(Props(new BookPublisher(authorBookBus)), name = "book-publisher")
 
  val subscriber1 = system.actorOf(Props[BookSubscriber], name = "subscriber-1")
  val subscriber2 = system.actorOf(Props[BookSubscriber], name = "subscriber-2")
 
  authorBookBus.subscribe(subscriber1, true)
  // i.e.: subscriber1 will receive all the books
  // where one of the authors is "Author"

  authorBookBus.subscribe(subscriber2, false)
  // i.e.: subscriber2 will receive all the books 
  // where "Author" is not an author

  bookPublisher ! Book(title = "A book title", authors = List(author, "Another Author"))
  // Yeah! Publishing a new book: Book(A book title,List(Author, Another Author))
  // My name is subscriber-1 and I have received a new book: Book(A book title,List(Author, Another Author))
 
  bookPublisher ! Book(title = "Another book title", authors = List("Another Author"))
  // Yeah! Publishing a new book: Book(Another book title,List(Another Author))
  // My name is subscriber-2 and I have received a new book: Book(Another book title,List(Another Author))
}

Summary

Akka allows not only Peer-to-Peer communication, but also Peer-to-Many, which is useful to implement a Publisher-Subscriber Pattern. This article has described how this can be achieved using Event Streams for the simplest scenarios and custom Event Buses when some classification of the event is needed.

How to Integrate ReactiveMongo in your Akka Spray Application

Scalability can be challenging when database access is needed: the common approach is to block the thread until a response is received. ReactiveMongo is a MongoDB Scala Driver that provides fully non-blocking asynchronous I/O operations, increasing the scalability of your system.
In a previous post we have seen how to build a REST API with Spray: in this article we will describe how to expand that application to integrate ReactiveMongo.

All the code produced in this tutorial can be found here.

Our Goal

Our goal is to create an application to manage quizzes. In particular, we want to:
– create a quiz
– delete a quiz
– get a random question
– get a question by id
– answer a question by id

Details on how we have chosen to implement the Rest Interface can be found here. In the following sections we will analyse how ReactiveMongo can be used to store the quiz entity in our MongoDB database without any blocking operation.

Set Up

First, we need an instance of MongoDB: we can set one up on our local machine (see the MongoDB official website for instructions). Also, we need to include the ReactiveMongo library in our SBT dependencies and provide information on our MongoDB instance:

// file build.sbt
libraryDependencies ++= {
  ...
  Seq(
  	...
    "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",
    ...
  )
}
// file application.conf
...
mongodb {
  database = "quiz-management"
  servers = ["localhost:27017"]
}

Minor refactoring is needed to make our application a little bit more structured: the original QuizProtocol class has been split into two (QuizProtocol and QuestionProtocol) and moved to a new package called model.api.
Finally, we now let MongoDB generate the quiz id rather than asking the user to select one.

ReactiveMongo Integration

The first step is to define our persistence model. Also, we need to provide instructions on how to serialise/deserialise our QuizEntity to and from MongoDB.

// file QuizEntity.scala
// note the package model.persistence to separate it from our model.api representations
package com.danielasfregola.quiz.management.model.persistence

import com.danielasfregola.quiz.management.model.api.QuizProtocol.Quiz
import reactivemongo.bson.{BSONDocumentWriter, BSONDocument, BSONDocumentReader, BSONObjectID}

case class QuizEntity(id: BSONObjectID = BSONObjectID.generate,
                      question: String, 
                      correctAnswer: String)

object QuizEntity {

  implicit def toQuizEntity(quiz: Quiz) = QuizEntity(question = quiz.question, correctAnswer = quiz.correctAnswer)

  implicit object QuizEntityBSONReader extends BSONDocumentReader[QuizEntity] {
    
    def read(doc: BSONDocument): QuizEntity = 
      QuizEntity(
        id = doc.getAs[BSONObjectID]("_id").get,
        question = doc.getAs[String]("question").get,
        correctAnswer = doc.getAs[String]("answer").get
      )
  }
  
  implicit object QuizEntityBSONWriter extends BSONDocumentWriter[QuizEntity] {
    def write(quizEntity: QuizEntity): BSONDocument =
      BSONDocument(
        "_id" -> quizEntity.id,
        "question" -> quizEntity.question,
        "answer" -> quizEntity.correctAnswer
      )
  }
}

Let’s create a trait, called MongoDao, that defines how we use our configuration to connect to our MongoDB instance:

// file MongoDao.scala
package com.danielasfregola.quiz.management.dao

import com.typesafe.config.ConfigFactory
import reactivemongo.api.MongoDriver

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global

trait MongoDao {

  val config = ConfigFactory.load()
  val database = config.getString("mongodb.database")
  val servers = config.getStringList("mongodb.servers").asScala

  val driver = new MongoDriver
  val connection = driver.connection(servers)

  val db = connection(database)
}

We now define our collection and the I/O operations that we can execute on it:

package com.danielasfregola.quiz.management.dao

import com.danielasfregola.quiz.management.model.persistence.QuizEntity
import reactivemongo.api.QueryOpts
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.{BSONDocument, BSONObjectID}
import reactivemongo.core.commands.Count

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

trait QuizDao extends MongoDao {
  
  import com.danielasfregola.quiz.management.model.persistence.QuizEntity._
  import com.danielasfregola.quiz.management.model.api.QuizProtocol._
  
  val collection = db[BSONCollection]("quizzes")

  // it creates a new quiz entity
  def save(quizEntity: QuizEntity) = collection.save(quizEntity)
    .map(_ => QuizCreated(quizEntity.id.stringify))
  
  // it finds a question by id
  def findById(id: String) =
    collection.find(queryById(id)).one[QuizEntity]
  
  // it finds a random question
  def findOne = {
    val futureCount = db.command(Count(collection.name))
    futureCount.flatMap { count =>
      val skip = Random.nextInt(count)
      collection.find(emptyQuery).options(QueryOpts(skipN = skip)).one[QuizEntity]
    }
  }
  
  // deletes a quiz entity by id
  def deleteById(id: String) = collection.remove(queryById(id)).map(_ => QuizDeleted)

  private def queryById(id: String) = BSONDocument("_id" -> BSONObjectID(id))

  private def emptyQuery = BSONDocument()
}

Almost done! We now just need to use our QuizDao trait as part of our QuizManager and QuestionManager classes:

// file QuizManager.scala
package com.danielasfregola.quiz.management

import com.danielasfregola.quiz.management.dao.QuizDao
import com.danielasfregola.quiz.management.model.persistence.QuizEntity

class QuizManager extends QuizDao {

  def createQuiz(quizEntity: QuizEntity) = save(quizEntity)

  def deleteQuizEntity(id: String) = deleteById(id)
  
}

// file QuestionManager.scala
package com.danielasfregola.quiz.management

import com.danielasfregola.quiz.management.dao.QuizDao
import com.danielasfregola.quiz.management.model.api.QuestionProtocol._
import com.danielasfregola.quiz.management.model.persistence.QuizEntity

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class QuestionManager extends QuizDao {

  def getQuestion(maybeId: Option[String] = None) = {

    def extractQuestion(maybeQuiz: Option[QuizEntity]) = maybeQuiz match {
      case Some(quizEntity) => toQuestion(quizEntity)
      case _ => QuestionNotFound
    }
    tryGetQuiz(maybeId).map(extractQuestion)
  }

  def answerQuestion(id: String, proposedAnswer: Answer) = {
    
    def isAnswerCorrect(maybeQuiz: Option[QuizEntity]) = maybeQuiz match {
      case Some(q) if (q.correctAnswer == proposedAnswer.answer) => CorrectAnswer
      case _ => WrongAnswer
    }
    
    tryGetQuiz(Some(id)).map(isAnswerCorrect)
  }

  private def tryGetQuiz(maybeId: Option[String]): Future[Option[QuizEntity]] = maybeId match {
    case Some(id) => findById(id)
    case _ => findOne
  }
  
}
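
Note that the QuestionProtocol messages and the toQuestion conversion used above are not shown in this article. The following is a plausible sketch, inferred from how QuestionManager uses them (all names and fields here are assumptions):

// file QuestionProtocol.scala (a sketch inferred from usage, not the article's actual code)
package com.danielasfregola.quiz.management.model.api

import com.danielasfregola.quiz.management.model.persistence.QuizEntity

object QuestionProtocol {

  case class Question(id: String, question: String)
  case class Answer(answer: String)

  case object QuestionNotFound
  case object CorrectAnswer
  case object WrongAnswer

  // converts a persisted QuizEntity into its public API representation
  def toQuestion(quizEntity: QuizEntity): Question =
    Question(quizEntity.id.stringify, quizEntity.question)
}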

Because the ReactiveMongo library is based on Futures, all the methods of our QuizManager and QuestionManager wrap their results in a Future: let's adopt the Akka Pipe Pattern to send each result to our Responder Actor once it becomes available. An example of how this approach works follows:

// file RestInterface.scala
...
 pathPrefix("quizzes") {
      pathEnd {
        post {
          entity(as[Quiz]) { quiz => requestContext =>
            val responder = createResponder(requestContext)
            quizManager.createQuiz(quiz).pipeTo(responder)
          }
        }
      } 
...

quizManager.createQuiz(quiz) returns a Future[QuizCreated]: once the future is completed, the QuizCreated message is sent to the Responder Actor.
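
For reference, here is a minimal sketch of the pipe pattern in isolation (the names below are illustrative and not part of the application code):

// file PipeExample.scala (illustrative sketch, not part of the application)
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.pipe

import scala.concurrent.Future

object PipeExample extends App {

  class EchoResponder extends Actor {
    def receive = { case msg => println(s"received: $msg") }
  }

  implicit val system = ActorSystem("pipe-example")
  import system.dispatcher // ExecutionContext for the Future and for pipeTo

  val responder = system.actorOf(Props(new EchoResponder), "responder")

  // once the future completes, its value is delivered to the responder as a plain message;
  // a failed future is delivered as an akka.actor.Status.Failure instead
  Future("QuizCreated").pipeTo(responder)
}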

Summary

ReactiveMongo is a non-blocking, asynchronous Scala driver for MongoDB that is particularly suitable for highly scalable applications. This article has described how ReactiveMongo can be easily integrated into an existing Akka Spray application.

All the code produced in this tutorial can be found here.

How to Supervise Akka Actors

Supervision is one of the core operations that an Actor can fulfil. Handling errors is not always easy in a classic object-oriented programming context: exceptions can be difficult to predict because they are fully embedded in the normal execution flow. In the Akka Actor Model, errors are handled in a well-structured, isolated execution flow: not only does this make exception handling more predictable, but it also forces developers to design a proper fault-recovery system. This article describes how to use Actor Supervisors to handle errors and recover from them.

Actor Supervision: Overview

Actors have a well-structured tree hierarchy built according to specific rules:
– Your Father (i.e.: the Actor that created you) is your Supervisor.
– Every Actor has a Supervisor, apart from the Guardian Actor (/user), which is the first one created by the system (like the root node of a tree).
– Your Children (i.e.: the Actors you have created) follow your destiny: if you are restarted/stopped/resumed, they are restarted/stopped/resumed as well.
– If unable to handle an exception, escalate it to your Supervisor.
– If the Guardian Actor is unable to handle an exception, the system will shut down.

Akka provides two categories of supervision strategies:
– OneForOneStrategy, where the strategy is applied only to the child actor that failed.
– AllForOneStrategy, where the strategy is applied to all the children when one of them fails.

Although Akka provides two predefined failure-recovery strategies, called defaultStrategy and stoppingStrategy, most of the time we need to define our own: this can be easily done as shown in the following tutorial.
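
For instance, a custom strategy can limit how many times children are restarted within a time window. Here is a minimal sketch (illustrative, not part of this tutorial's code) using an AllForOneStrategy:

import akka.actor.{Actor, AllForOneStrategy}
import akka.actor.SupervisorStrategy._

import scala.concurrent.duration._

class AllForOneSupervisor extends Actor {

  // restart all children on IllegalStateException, at most 3 times per minute;
  // stop all of them on any other exception
  override val supervisorStrategy =
    AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: IllegalStateException => Restart
      case _: Exception             => Stop
    }

  def receive = Actor.emptyBehavior
}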

Actor Supervision in Practice!

In this tutorial we want to trigger an actor supervision operation when a specific word is contained in the received message:
– if the message contains the word “restart”, the child actor is restarted
– if the message contains the word “resume”, the child actor is resumed after the failure
– if the message contains the word “stop”, the child actor is stopped…FOREVER! 😈
– if the message contains the word “secret”, we throw an unhandled exception that forces the Guardian Actor to shutdown the system

First of all, let’s define our protocol and exceptions:

// file protocol.scala
package com.danielasfregola

object PrinterProtocol {

  case class Message(msg: String)
  
}

class RestartMeException extends Exception("RESTART")
class ResumeMeException extends Exception("RESUME")
class StopMeException extends Exception("STOP")

Then we define the behaviour of our Actor and when we are going to throw the exceptions. Note that we have also added some utility methods to better observe the life cycle of our Actors.

// file PrinterActor.scala
package com.danielasfregola

import akka.actor.Actor

class PrinterActor extends Actor {
  
  import PrinterProtocol._
  
  override def preRestart(reason: Throwable, message: Option[Any]) = {
    println("Yo, I am restarting...")
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable) = {
    println("...restart completed!")
    super.postRestart(reason)
  }
  
  override def preStart() = println("Yo, I am alive!")
  override def postStop() = println("Goodbye world!")

  override def receive: Receive = {
    case Message(msg) if containsRestart(msg) =>
      println(msg); throw new RestartMeException
    case Message(msg) if containsResume(msg) =>
      println(msg); throw new ResumeMeException
    case Message(msg) if containsStop(msg) =>
      println(msg); throw new StopMeException
    case Message(msg) if containsSecret(msg) =>
      println(msg); throw new Throwable
    case Message(msg) => println(msg)
  }
  
  private def containsRestart = containsWordCaseInsensitive("restart")_
  private def containsResume = containsWordCaseInsensitive("resume")_
  private def containsStop = containsWordCaseInsensitive("stop")_
  private def containsSecret = containsWordCaseInsensitive("secret")_

  private def containsWordCaseInsensitive(word: String)(msg: String) =  msg matches s".*(?i)$word.*"
}

Finally, the Supervisor just needs to create the actor and define the failure-recovery logic:

// file PrinterActorSupervisor.scala
package com.danielasfregola

import akka.actor.SupervisorStrategy._
import akka.actor.{Actor, OneForOneStrategy, Props}

class PrinterActorSupervisor extends Actor {

  override def preStart() = println("The Supervisor is ready to supervise")
  override def postStop() = println("Bye Bye from the Supervisor")

  override def supervisorStrategy = OneForOneStrategy() {
    case _: RestartMeException => Restart
    case _: ResumeMeException => Resume
    case _: StopMeException => Stop
  } 
  
  val printer = context.actorOf(Props(new PrinterActor), "printer-actor")
  
  override def receive: Receive = {
    case msg => printer forward msg
  }
}

That’s it! Now we just need to have fun with our buddies 🙂

When initialising our Actor system, all the Actors are created and automatically started:

  import akka.actor.{ActorSystem, Props}
  import PrinterProtocol._
  
  implicit val system = ActorSystem("printer-service")
  val printerSupervisor = system.actorOf(Props(new PrinterActorSupervisor), "printer-supervisor")
  // "The Supervisor is ready to supervise"
  // "Yo, I am alive!"

If no special keyword is sent, nothing happens to our actors:

  printerSupervisor ! Message("...please, print me...")
  // ...please, print me...
  printerSupervisor ! Message("...another message to print, nothing should happen...")
  // ...another message to print, nothing should happen...

When our actor is restarted, it is stopped and replaced by a brand new instance. The event is also recorded in the logs.

  printerSupervisor ! Message("...why don't you restart?!")
  //  ...why don't you restart?!
  //  Yo, I am restarting...
  //  Goodbye world!
  //  ...restart completed!
  //  Yo, I am alive!

  // From the logs:
  // ERROR [OneForOneStrategy]: RESTART
  // com.danielasfregola.RestartMeException: RESTART
  //	at com.danielasfregola.PrinterActor$$anonfun$receive$1.applyOrElse(PrinterActor.scala:24) ~[classes/:na]
  // ...

When resuming, the actor carries on as if nothing happened, but a nice warning is left in the logs for us:

  printerSupervisor ! Message("...feel free to resume!")
  // ...feel free to resume!

  // From the logs:
  // WARN  [OneForOneStrategy]: RESUME

When stopping, the behaviour is similar to the restart case scenario:

  printerSupervisor ! Message("...you can STOP now!")
  // ...you can STOP now!
  // Goodbye world!

  // From the logs:
  // ERROR [OneForOneStrategy]: STOP
  // com.danielasfregola.StopMeException: STOP
  //	at com.danielasfregola.PrinterActor$$anonfun$receive$1.applyOrElse(PrinterActor.scala:28) ~[classes/:na]
  // ...

Finally, let’s see what happens with an exception that is not handled. Since our strategy does not match a plain Throwable, the failure is escalated up the hierarchy until it reaches the Guardian Actor, which shuts down the whole system: note that both PrinterActor and PrinterActorSupervisor are killed.

  printerSupervisor ! Message("...this is going to be our little secret...")
  // ...this is going to be our little secret...
  // Goodbye world!
  // Bye Bye from the Supervisor

  // From the logs:
  // ERROR [LocalActorRefProvider(akka://printer-service)]: guardian failed, shutting down system
  // java.lang.Throwable: null
  //   at com.danielasfregola.PrinterActor$$anonfun$receive$1.applyOrElse(PrinterActor.scala:30) ~[classes/:na]
  // ...

Summary

The Akka Actor Model allows the creation of failure-recovery systems thanks to its well-structured hierarchy of Actor Supervisors. This article has provided a tutorial on how supervision can be used to control the life cycle of Actors in order to handle and recover from errors.

How to test Actors with Akka TestKit and Specs2

Actors are a really powerful tool to handle concurrency thanks to their message-based model. However, they can be tricky to test: messages are sent and processed asynchronously. Moreover, their state is hidden internally and cannot easily be accessed to make assertions on it.

The Akka Team has created a library, called akka-testkit, to simplify unit tests on actors. This article provides an overview of the main features of this library and how it can be used to test our lovely actors.

Single Threaded Tests

If our actor is particularly simple, a single-threaded test may be enough. Thanks to TestActorRef, messages are processed synchronously on the calling thread, so we can access the actor's internal state and make assertions on it.

For example, we have built an actor that memorises all the received messages starting with ‘A’:

import akka.actor.Actor

object MessageFilteringActorProtocol {
  case class SimpleMessage(text: String)
}

class MessageFilteringActor extends Actor {
  import MessageFilteringActorProtocol._
  
  var messages = Vector[String]()
  
  // exposes the actor's internal state so that tests can make assertions on it
  def state = messages
  
  // the actor behaviour: only messages starting with 'A' are saved
  def receive = {
    case SimpleMessage(text) if text startsWith "A" =>
      messages = messages :+ text
  }

}

Let’s build a test for our actor:

import akka.testkit.{TestActorRef, TestKit}
import akka.actor.ActorSystem
import org.specs2.mutable.SpecificationLike

class MessageFilteringActorSpec extends TestKit(ActorSystem())
  with SpecificationLike {
  
  import MessageFilteringActorProtocol._
  
  val actor = TestActorRef[MessageFilteringActor]
  "A Message Filtering Actor" should {
    
    "save only messages that starts with 'A'" in {
      actor ! SimpleMessage("A message to remember")
      actor ! SimpleMessage("This message should not be saved")
      actor ! SimpleMessage("Another message for you")
      actor.underlyingActor.state.length mustEqual 2
    }
    
  }
}

Multi Threaded Testing

Unfortunately, single-threaded unit testing is not always sufficient for more complex scenarios. To perform multi-threaded tests, we have access to the TestProbe class, which offers useful methods to wait for and analyse the messages sent to our actor (see the sketch after this list). Some of the most common methods are the following:
– expectMsg: asserts that a message equal to the provided one is received
– expectNoMsg: asserts that no message is received within the timeout
– receiveWhile: receives messages while the given condition holds or until the timeout is reached
A complete list of all the methods offered by the TestProbe class can be found here.
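
Here is a minimal, self-contained sketch of TestProbe in action (illustrative, not from the article's code):

import akka.actor.ActorSystem
import akka.testkit.TestProbe

import scala.concurrent.duration._

implicit val system = ActorSystem("probe-example")

val probe = TestProbe()

// any actor (or plain test code) can send messages to the probe's ActorRef
probe.ref ! "hello"

probe.expectMsg(100.millis, "hello") // passes: "hello" arrived within the timeout
probe.expectNoMsg(100.millis)        // passes: no further message arrived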

Although the TestProbe class is quite powerful, it may require some changes to the actor code itself to make it more testable: we need to make sure that the actor sends messages/information to our TestProbe so that we can perform assertions on them.

A quite common approach is to create ad hoc messages for test purposes. For example, let's assume we would like to know the internal status of our actor in a multi-threaded testing context. Moreover, we can add an optional listener to help us test side effects.

An example of how to use these approaches follows. Our BucketCounterActor prints the label of each bucket and accumulates all the quantities received so far:

import akka.actor.Actor

object BucketCounterActorProtocol {
  case class Bucket(label: String, quantity: Int)
}

class BucketCounterActor extends Actor {
  import BucketCounterActorProtocol._
  
  var counter = 0
  
  def receive = {
    case Bucket(label, quantity) =>
      counter += quantity
      print(label)
  }

}

Let’s add some ad hoc code to our actor for test purposes:

import akka.actor.{ActorRef, Actor}

object BucketCounterActorProtocol {
  case class Bucket(label: String, quantity: Int)
  
  // a new message to expose the internal status of the actor 
  case class GetCounter(receiver: ActorRef)
}

// adding an optional listener to the class
class BucketCounterActor(listener: Option[ActorRef] = None) extends Actor {
  import BucketCounterActorProtocol._
  
  var counter = 0
  
  def receive = {
    case Bucket(label, quantity) =>
      counter = counter + quantity
      print(label)
      // informing the listener of the side effect
      listener.foreach(_ ! label)
    
    // logic to expose internal status
    case GetCounter(receiver) => receiver ! counter
  }

}

Thanks to the code we just added, testing our actor is now going to be really easy:

import akka.actor.{ActorSystem, Props}
import akka.testkit.TestKit
import org.specs2.mutable.SpecificationLike

class BucketCounterActorSpec extends TestKit(ActorSystem()) with SpecificationLike {
  import BucketCounterActorProtocol._

  // specs2 runs examples concurrently by default, but the second example depends
  // on the state left by the first one, so they must run in order
  sequential
  
  "A Bucket Counter Actor" should {
    
    val actorProps = Props(new BucketCounterActor(Some(testActor)))
    val actor = system.actorOf(actorProps, "actor-to-test")
    
    val firstBucket = Bucket("Yo, I am a bucket", 1)
    val secondBucket = Bucket("I am another bucket", 9)

    "print out the name of the received buckets" in {
      actor ! firstBucket
      expectMsg(firstBucket.label)
      actor ! secondBucket
      expectMsg(secondBucket.label)
      success
    }
    
    "accumulate the quantity of buckets received" in {
      actor ! GetCounter(testActor)
      expectMsg(10)
      success
    }
  }
}

Summary

Akka actors are a powerful tool to build concurrent systems. This article has provided different examples on how actors can be tested thanks to the akka-testkit library, using both single and multi threaded approaches.