Background Processing with Akka Cluster: ClusterSingletonManager

REST services are quite common in scalable architectures because they are stateless. However, in the real world a service is rarely just a CRUD service: for example, we may also have some background processing (e.g.: downloading/parsing files, scheduled processes, triggers, etc.).
In this tutorial we demonstrate how to use an Akka ClusterSingletonManager in a scalable architecture to perform some background operations only from one node, the Leader of our Akka Cluster.

All the code produced in this article can be found on GitHub.

Our Application

Let’s imagine we have a simple Akka Http REST API with one endpoint that, given a ping request, returns a pong response (for more information on how to create an API with Akka Http, see this article):

// curl http://localhost:5000/ping >> pong
// RestInterface.scala
...

val routes: Route =
  path("ping") {
    get {
      complete("pong")
    }
  }
...

An Akka Actor, called TickCounter, is also attached to our system to count ticks starting from zero. Its code is as follows:

// TickCounter.scala
package com.danielasfregola.akka.tutorials.actors

import akka.actor.Actor

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

class TickCounter extends Actor {

  case class Tick(n: Int)

  override def preStart = self ! Tick(0)

  def receive = {
    case Tick(t) =>
      println(s"${self.path} - Tick $t")
      context.system.scheduler.scheduleOnce(1 second, self, Tick(t+1))
  }

}

Our build file has only one dependency:

// build.sbt
name := "akka-tutorials"

version := "0.1"

organization := "com.danielasfregola"

scalaVersion := "2.11.5"

resolvers ++= Seq("Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/")

libraryDependencies ++= {
  val AkkaVersion       = "2.4.2"
  Seq(
    "com.typesafe.akka" %% "akka-http-experimental" % AkkaVersion
  )
}

Also, our Main class looks as follows:

// Main.scala
package com.danielasfregola.akka.tutorials

import akka.actor.{Props, ActorSystem}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.danielasfregola.akka.tutorials.actors.TickCounter

import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

object Main extends App with RestInterface {
  val config = ConfigFactory.load()
  val host = config.getString("http.host")
  val port = config.getInt("http.port")

  implicit val system = ActorSystem("akka-tutorials")
  implicit val materializer = ActorMaterializer()

  val tickCounter = system.actorOf(Props[TickCounter], "tick-counter")

  implicit val executionContext = system.dispatcher
  implicit val timeout = Timeout(10 seconds)

  val api = routes

  Http().bindAndHandle(handler = api, interface = host, port = port) map { binding =>
    println(s"REST interface bound to $host:$port")
  } recover { case ex =>
    println(s"REST interface could not bind to $host:$port: ${ex.getMessage}")
  }

}

Our Goal

Our goal is to change our application so that when multiple instances of the same service are running, only one TickCounter is active.

We could set a flag to disable/enable the Actor. However, this is not ideal in a scalable ecosystem where every instance can potentially be removed: what happens if we remove the only node with the flag on? What if we deploy more than one service with the flag active?

We could move the background process into a new service. However, for cases where the processing is not big enough to justify a new service, this could be quite expensive in terms of infrastructure and maintenance.

…or, with some minor code changes, we could set up an Akka ClusterSingletonManager, get it over with and go to the pub early.

Akka ClusterSingletonManager

In order to let our application use a ClusterSingletonManager we just need to perform three simple operations:
– import the right dependency
– let our Cluster Manager know about our Actor
– set up the behaviour of the Akka Cluster

Obviously, this is only one way of setting up our Cluster. For more information on the other available configurations, please have a look at the official Akka documentation on Cluster Singleton.
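For instance, nodes that need to send messages to the singleton would normally do so through a ClusterSingletonProxy, which keeps track of where the singleton is currently running. A minimal sketch, not used in the rest of this tutorial, assuming the manager is registered under the name used in Step 2 below:

import akka.cluster.singleton.{ClusterSingletonProxy, ClusterSingletonProxySettings}

// The proxy resolves the ClusterSingletonManager at the given path and
// forwards messages to the actual singleton, wherever it is in the Cluster
val tickCounterProxy = system.actorOf(
  ClusterSingletonProxy.props(
    singletonManagerPath = "/user/tick-counter-singleton",
    settings = ClusterSingletonProxySettings(system)),
  name = "tick-counter-proxy")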

Step 1: Update your dependencies

The ClusterSingletonManager is part of the akka-cluster-tools package, so we need to add its dependency to our build.sbt file:

"com.typesafe.akka" %% "akka-cluster-tools" % "2.4.2"

Step 2: Define the actors in your Cluster

In our Main we need to specify that our TickCounter Actor is part of our Cluster:

import akka.actor.PoisonPill
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}

val tickCounter = {
  val singletonProps = ClusterSingletonManager.props(
    singletonProps = Props[TickCounter],
    terminationMessage = PoisonPill,
    settings = ClusterSingletonManagerSettings(system)
  )
  system.actorOf(singletonProps, "tick-counter-singleton")
}

The terminationMessage is sent to the Actor when the Cluster Manager needs to terminate it. Here we decided to keep it simple, so we just brutally kill the actor. Note that this message can be customised to trigger a behaviour change rather than the actor’s termination.
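For example, a minimal sketch of a singleton that reacts to a custom termination message instead of a PoisonPill; the HandOver message and the cleanup logic are purely illustrative:

import akka.actor.Actor

case object HandOver

class GracefulTickCounter extends Actor {

  def receive = {
    case HandOver =>
      // flush state, close resources, etc., then stop gracefully
      context.stop(self)
    // ... normal tick-counting behaviour omitted
  }
}

// In Main, we would then pass terminationMessage = HandOver
// to ClusterSingletonManager.props instead of PoisonPill.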

Step 3: Configure the behaviour of your Cluster

Add the following configurations to your application.conf file to tell the Cluster how to behave:

// application.conf
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = ${REMOTE_PORT}
    }
  }
}

akka.cluster {
  seed-nodes = [
    "akka.tcp://akka-tutorials@127.0.0.1:2551",
    "akka.tcp://akka-tutorials@127.0.0.1:2552",
    "akka.tcp://akka-tutorials@127.0.0.1:2553"
  ]
  min-nr-of-members = 1
  auto-down-unreachable-after = 30s
}

The akka.actor and akka.remote settings provide information on how to create and bind the node of the Cluster. In particular, akka.remote.netty.tcp defines how to reach the node by providing a hostname and port.

The akka.cluster configuration contains the following information:
– seed-nodes: the seed nodes of the Cluster.
– min-nr-of-members: the number of members needed before starting the Cluster.
– auto-down-unreachable-after: how long to wait before an unreachable node gets marked as Down and removed from the Cluster.

Note that auto-down-unreachable-after is a really sensitive and dangerous setting that needs to be configured properly, in particular in production. From the Akka Documentation:

Be very careful when using Cluster Singleton together with Automatic Downing, since it allows the Cluster to split up into two separate clusters, which in turn will result in multiple Singletons being started, one in each separate Cluster!

Usage

Now that our Akka Cluster is set up, we are ready to use it.
Note that in our application, in order to start a service, we have decided to force two configurations at runtime: the port of the API and the port of the Akka node.
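For reference, a sketch of how application.conf could wire those two values; the http section mirrors the http.host and http.port keys read by Main, while REMOTE_PORT feeds the netty.tcp port shown in Step 3:

// application.conf
http {
  host = "0.0.0.0"
  port = ${PORT}
}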

Let’s run the first instance of our service on port 5000 and the Akka node on port 2551:

> export PORT=5000
> export REMOTE_PORT=2551
> sbt run

In the logs we see that the Cluster Manager declares the node as the Oldest of the Cluster and starts the TickCounter Actor:

...
[INFO] [02/21/2016 16:14:24.312] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Node [akka.tcp://akka-tutorials@127.0.0.1:2551] is JOINING, roles []
[INFO] [02/21/2016 16:14:24.320] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is moving node [akka.tcp://akka-tutorials@127.0.0.1:2551] to [Up]
[INFO] [02/21/2016 16:14:24.337] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] Singleton manager starting singleton actor [akka://akka-tutorials/user/tick-counter-singleton/singleton]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 0
[INFO] [02/21/2016 16:14:24.340] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Oldest]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 1
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 2
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 3
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 4
...

Let’s run a second instance of our service on port 5001 and the node on port 2552:

> export PORT=5001
> export REMOTE_PORT=2552
> sbt run

In the logs of the second instance we see that the Cluster Manager declares the second node as Younger and does not start its TickCounter Actor:

...
[INFO] [02/21/2016 16:18:31.343] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Welcome from [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:18:31.540] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Younger]
REST interface bound to 0.0.0.0:5001
...

If we stop the non-leader node (i.e.: the second one we started), the leader node notices that the second node is unreachable: after some time, it marks it as Down and removes it from the Cluster:

[WARN] [02/21/2016 16:22:32.154] [akka-tutorials-akka.remote.default-remote-dispatcher-21] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-2] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
...
[WARN] [02/21/2016 16:22:36.287] [akka-tutorials-akka.actor.default-dispatcher-15] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/cluster/core/daemon] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://akka-tutorials@127.0.0.1:2552, status = Up)]
...
[WARN] [02/21/2016 16:22:37.349] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-3] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
...
[INFO] [02/21/2016 16:22:55.339] [akka-tutorials-akka.actor.default-dispatcher-18] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader can currently not perform its duties, reachability status: [akka.tcp://akka-tutorials@127.0.0.1:2551 -> akka.tcp://akka-tutorials@127.0.0.1:2552: Unreachable [Unreachable] (1)], member status: [akka.tcp://akka-tutorials@127.0.0.1:2551 Up seen=true, akka.tcp://akka-tutorials@127.0.0.1:2552 Up seen=false]
[WARN] [02/21/2016 16:22:55.346] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-6] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
[WARN] [02/21/2016 16:23:01.350] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-7] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
[INFO] [02/21/2016 16:23:06.305] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is auto-downing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552]
[INFO] [02/21/2016 16:23:06.306] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Marking unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552] as [Down]
...
[INFO] [02/21/2016 16:23:07.285] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader can perform its duties again
[INFO] [02/21/2016 16:23:07.287] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is removing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552]
[INFO] [02/21/2016 16:23:07.288] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] Member removed [akka.tcp://akka-tutorials@127.0.0.1:2552]
[WARN] [02/21/2016 16:23:07.292] [akka-tutorials-akka.remote.default-remote-dispatcher-21] [akka.remote.Remoting] Association to [akka.tcp://akka-tutorials@127.0.0.1:2552] having UID [-2037835353] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
...

When we start the second node again, the node is added back to the Cluster:

...
[INFO] [02/21/2016 16:28:49.560] [akka-tutorials-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Node [akka.tcp://akka-tutorials@127.0.0.1:2552] is JOINING, roles []
[INFO] [02/21/2016 16:28:50.293] [akka-tutorials-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is moving node [akka.tcp://akka-tutorials@127.0.0.1:2552] to [Up]
...

Finally, we kill the Leader node and see that, after some time, the second node is promoted to Leader:

REST interface bound to 0.0.0.0:5001
[INFO] [02/21/2016 16:28:51.192] [akka-tutorials-akka.actor.default-dispatcher-15] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Younger]
[WARN] [02/21/2016 16:32:01.276] [akka-tutorials-akka.remote.default-remote-dispatcher-6] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
[WARN] [02/21/2016 16:32:06.185] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/cluster/core/daemon] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://akka-tutorials@127.0.0.1:2551, status = Up)]
[WARN] [02/21/2016 16:32:06.674] [akka-tutorials-akka.remote.default-remote-dispatcher-5] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-2] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
[WARN] [02/21/2016 16:32:12.677] [akka-tutorials-akka.remote.default-remote-dispatcher-6] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-3] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
...
[INFO] [02/21/2016 16:32:26.188] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader can currently not perform its duties, reachability status: [akka.tcp://akka-tutorials@127.0.0.1:2552 -> akka.tcp://akka-tutorials@127.0.0.1:2551: Unreachable [Unreachable] (1)], member status: [akka.tcp://akka-tutorials@127.0.0.1:2551 Up seen=false, akka.tcp://akka-tutorials@127.0.0.1:2552 Up seen=true]
...
[INFO] [02/21/2016 16:32:36.203] [akka-tutorials-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader is auto-downing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:36.204] [akka-tutorials-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Marking unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551] as [Down]
...
[INFO] [02/21/2016 16:32:37.182] [akka-tutorials-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader can perform its duties again
[INFO] [02/21/2016 16:32:37.189] [akka-tutorials-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader is removing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:37.192] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Previous oldest removed [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:37.193] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Younger observed OldestChanged: [None -> myself]
[WARN] [02/21/2016 16:32:37.194] [akka-tutorials-akka.remote.default-remote-dispatcher-5] [akka.remote.Remoting] Association to [akka.tcp://akka-tutorials@127.0.0.1:2551] having UID [676396303] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
[INFO] [02/21/2016 16:32:37.195] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Singleton manager starting singleton actor [akka://akka-tutorials/user/tick-counter-singleton/singleton]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 0
[INFO] [02/21/2016 16:32:37.197] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Younger -> Oldest]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 1
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 2
....

Summary

In this article we have described how to use an Akka ClusterSingletonManager to coordinate several instances of the same service. In particular, our goal was to configure an Akka Cluster of services so that some operations run only from one service at a time (i.e.: the Leader of the Cluster).

All the code produced in this tutorial is on GitHub.

Akka Dead Letters Channel

Akka doesn’t guarantee the delivery of a message. What happens when a message cannot be delivered? In this article we will describe how the Dead Letters Channel works and how it can be used to spot issues in our system.

How it works

In a previous article we have described the use of Event Streams in Akka. The Dead Letter Channel is nothing more than a special Event Stream that the system uses internally every time a message cannot be processed or delivered.

When Akka redirects a failed message to the Dead Letter actor, it wraps the message in a case class called DeadLetter that provides the original message, sender and recipient:

case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef)

Unless specified differently, dead letters are logged at the INFO level: more information on how to tweak your logging settings can be found here.
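For instance, a minimal sketch of the relevant settings, the same keys mentioned in the log lines later in this article:

// application.conf
akka {
  // log at most the first 10 dead letters
  log-dead-letters = 10
  // do not log dead letters while the system is shutting down
  log-dead-letters-during-shutdown = off
}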

How to use it

Because the Dead Letter Channel is an Event Stream, we can subscribe to it and listen to all the messages it publishes.

The code used for this tutorial is available here.

First of all, let’s create a dummy actor, called EchoActor, that prints all the messages it receives:

 
import akka.actor.Actor

class EchoActor extends Actor {

  def receive = {
    case msg => println(s"New msg received: $msg")
  }

}

The second step is to create our actor system: we will have two instances of EchoActor, one called deadLettersSubscriber that will listen for DeadLetters, and the other, called echoActor, that will simply wait for and receive messages.

  import akka.actor.{ActorSystem, DeadLetter, PoisonPill, Props}

  implicit val system = ActorSystem("dead-letters-usage-example")

  val deadLettersSubscriber = system.actorOf(Props[EchoActor], name = "dead-letters-subscriber")
  val echoActor = system.actorOf(Props[EchoActor], name = "generic-echo-actor")

  system.eventStream.subscribe(deadLettersSubscriber, classOf[DeadLetter])

When successfully sending a message, no dead letter is generated.

  echoActor ! "First Message"
  // generic-echo-actor - New msg received: First Message

However, when we try to send a message to an actor that has been killed, the message is successfully transformed into a DeadLetter.

  echoActor ! PoisonPill
  echoActor ! "Second Message"
  // dead-letters-subscriber - New msg received: DeadLetter(Second Message,Actor[akka://dead-letters-usage-example/deadLetters],Actor[akka://dead-letters-usage-example/user/generic-echo-actor#317003256])
  // INFO  [RepointableActorRef]: Message [java.lang.String] from Actor[akka://dead-letters-usage-example/deadLetters] to Actor[akka://dead-letters-usage-example/user/generic-echo-actor#317003256] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Finally, we can also send messages directly to the Dead Letter Actor: this is usually not advised as the Dead Letter Channel should be reserved for the system to redirect failed messages.

  system.deadLetters ! "Dead Message"
  // dead-letters-subscriber - New msg received: DeadLetter(Dead Message,Actor[akka://dead-letters-usage-example/deadLetters],Actor[akka://dead-letters-usage-example/deadLetters])
  // INFO  [DeadLetterActorRef]: Message [java.lang.String] from Actor[akka://dead-letters-usage-example/deadLetters] to Actor[akka://dead-letters-usage-example/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Summary

Akka redirects all the messages that could not be delivered or processed to the Dead Letter Channel. In this article we have discussed how Akka uses it and how we can exploit it when testing our system and investigating delivery issues.

Peer-to-Many Communication in Akka

The most common communication channel in Akka is Peer-to-Peer, where an individual actor sends a message directly to another individual actor. However, sometimes this is not enough, as we may need a Peer-to-Many type of communication, where an individual actor sends a message to a group of actors. This is particularly useful when you need to model your system using a Publisher-Subscriber Pattern. This article will provide a quick tutorial on how to use and customise an Event Bus, the Akka way of implementing a Peer-to-Many communication channel.

Event Stream

The Event Stream is the simplest and most common implementation of an Event Bus. It follows the classic Publisher-Subscriber Pattern: an actor publishes a message and all the actors that have subscribed to that specific message type will receive it.

Let’s see with a simple tutorial how easily this can be achieved (gist available here).

In this tutorial, we want to model the following scenario: every time someone publishes a book, all the subscribers need to receive it.

First of all, we need to define what a book is:

case class Book(title: String, authors: List[String])

Then, we need to specify an Actor that acts as a book publisher: every time it receives a book, it publishes it on the System Event Stream.

import akka.actor.Actor

class BookPublisher extends Actor {
  
  def receive = {
    case book: Book => {
      println(s"Yeah! Publishing a new book: $book")
      context.system.eventStream.publish(book)
    }
  }
  
}

Finally, let’s create an Actor that will subscribe to the System Event Stream for all messages of type Book. Note that the preStart function will be executed by Akka right after the creation of the Actor.

class BookSubscriber extends Actor {
  
  override def preStart = context.system.eventStream.subscribe(self, classOf[Book])
  
  def receive = {
    case book: Book => println(s"My name is ${self.path.name} and I have received a new book: $book")
  }
}

Done! See, it wasn’t that bad… 🙂

Now we just need to play with our system to make sure it works as expected:

import akka.actor.{ActorSystem, Props}

object Main extends App {
  
  implicit val system = ActorSystem("publisher-subscribers-example")
 
  val author = "Author"
  
  val bookPublisher = system.actorOf(Props[BookPublisher], name = "book-publisher")
  
  val subscriber1 = system.actorOf(Props[BookSubscriber], name = "subscriber-1") 
  val subscriber2 = system.actorOf(Props[BookSubscriber], name = "subscriber-2")
  
  bookPublisher ! Book(title = "A book title", authors = List(author, "Another author"))
  // Yeah! Publishing a new book: Book(A book title,List(Author, Another author))
  // My name is subscriber-1 and I have received a new book: Book(A book title,List(Author, Another author))
  // My name is subscriber-2 and I have received a new book: Book(A book title,List(Author, Another author))
  
  system.eventStream.unsubscribe(subscriber2, classOf[Book])
 
  bookPublisher ! Book(title = "Another book title", authors = List("Another author"))
  // Yeah! Publishing a new book: Book(Another book title,List(Another author))
  // My name is subscriber-1 and I have received a new book: Book(Another book title,List(Another author))
} 

Note that instead of unsubscribing subscriber2 from all the messages of type Book, we could have also unsubscribed it from any type of message by using system.eventStream.unsubscribe(subscriber2).

Custom Event Bus

Event Streams are really easy to use. However, they may not be that useful when we want to perform some filtering on the published messages. One solution to our problem could be to do some filtering before publishing the message, but what if this cannot be done, or we simply want to do it in a more elegant way?

An Event Stream is a specific implementation of the Event Bus trait: Akka gives us the opportunity to reuse this trait to create Custom Event Buses.

Assume that now a subscriber wants to receive books for a specific author — gist of the following code can be found here.

First of all, the BookSubscriber actor doesn’t need to automatically subscribe to the System Event Stream. Moreover, the BookPublisher actor now needs to publish on a given Event Bus, rather than the System Event Stream.

class BookPublisher(bus: AuthorBookBus) extends Actor {
  
  def receive = {
    case book: Book => {
      println(s"Yeah! Publishing a new book: $book")
      bus.publish(book)
    }
  }
  
}
 
class BookSubscriber extends Actor {
  
  def receive = {
    case book: Book => println(s"My name is ${self.path.name} and I have received a new book: $book")
  }
}

Finally, let’s define our AuthorBookBus that will filter books according to their authors.

import akka.event.{ActorEventBus, EventBus, LookupClassification}

class AuthorBookBus(author: String) extends EventBus
  with LookupClassification
  with ActorEventBus {
  
  type Event = Book
  type Classifier = Boolean
  
  override def mapSize = 2
  
  override def classify(book: Book) = book.authors.contains(author)
 
  override protected def publish(book: Book, subscriber: Subscriber)= subscriber ! book
} 

Our Event Bus accepts events of type Book and it classifies books with two possible values: true if author is one of the authors of the book, false otherwise (and this is why override def mapSize = 2!). The function classify is used to categorise a book according to the Classifier type. Akka provides different types of classifiers (see here); in our case the LookupClassification was enough, as we wanted a classification based on the event itself.

The following app shows an example of how our actor system could be used:

object Main extends App {
  
  implicit val system = ActorSystem("publisher-subscribers-example")
  
  val author = "Author"
 
  val authorBookBus = new AuthorBookBus(author)
  val bookPublisher = system.actorOf(Props(new BookPublisher(authorBookBus)), name = "book-publisher")
 
  val subscriber1 = system.actorOf(Props[BookSubscriber], name = "subscriber-1")
  val subscriber2 = system.actorOf(Props[BookSubscriber], name = "subscriber-2")
 
  authorBookBus.subscribe(subscriber1, true)
  // i.e.: subscriber1 will receive all the books
  // where one of the authors is "Author"

  authorBookBus.subscribe(subscriber2, false)
  // i.e.: subscriber2 will receive all the books 
  // where "Author" is not an author

  bookPublisher ! Book(title = "A book title", authors = List(author, "Another Author"))
  // Yeah! Publishing a new book: Book(A book title,List(Author, Another Author))
  // My name is subscriber-1 and I have received a new book: Book(A book title,List(Author, Another Author))
 
  bookPublisher ! Book(title = "Another book title", authors = List("Another Author"))
  // Yeah! Publishing a new book: Book(Another book title,List(Another Author))
  // My name is subscriber-2 and I have received a new book: Book(Another book title,List(Another Author))
}

Summary

Akka allows not only Peer-to-Peer communication, but also Peer-to-Many, which is useful to implement a Publisher-Subscriber Pattern. This article has described how this can be achieved using Event Streams for the simplest scenarios and Custom Event Buses when some classification of the event is needed.

How to Integrate ReactiveMongo in your Akka Spray Application

Scalability can be challenging when database access is needed: the common approach is to block the thread until a response is received. ReactiveMongo is a MongoDB Scala driver that provides fully non-blocking asynchronous I/O operations, which increases the scalability of your system.
In a previous post we have seen how to build a REST API with Spray: in this article we will describe how to expand our application to integrate ReactiveMongo.

All the code produced in this tutorial can be found here.

Our Goal

Our goal is to create an application to manage quizzes. In particular, we want to:
– create a quiz
– delete a quiz
– get a random question
– get a question by id
– answer a question by id

Details on how we have chosen to implement the REST interface can be found here. In the following sections we will analyse how ReactiveMongo can be used to store the quiz entity in our MongoDB database without any blocking operations.

Set Up

First, we need an instance of MongoDB: we can set one up on our local machine (see the MongoDB official website for instructions). Also, we need to include the ReactiveMongo library as part of our SBT dependencies and provide information on our MongoDB instance:

// file build.sbt
libraryDependencies ++= {
  ...
  Seq(
  	...
    "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",
    ...
  )
}
// file application.conf
...
mongodb {
  database = "quiz-management"
  servers = ["localhost:27017"]
}

Minor refactoring is needed to make our application a little bit more structured: the original QuizProtocol class has been split into two (QuizProtocol and QuestionProtocol) and moved to a new package called model.api.
Finally, we now let MongoDB generate the quiz id rather than asking the user to select one.

ReactiveMongo Integration

The first step is to define our persistence model. Also, we need to provide instructions on how to serialise/deserialise our QuizEntity to and from MongoDB.

// file QuizEntity.scala
// note the package model.persistence to separate it from our model.api representations
package com.danielasfregola.quiz.management.model.persistence

import com.danielasfregola.quiz.management.model.api.QuizProtocol.Quiz
import reactivemongo.bson.{BSONDocumentWriter, BSONDocument, BSONDocumentReader, BSONObjectID}

case class QuizEntity(id: BSONObjectID = BSONObjectID.generate,
                      question: String, 
                      correctAnswer: String)

object QuizEntity {

  implicit def toQuizEntity(quiz: Quiz) = QuizEntity(question = quiz.question, correctAnswer = quiz.correctAnswer)

  implicit object QuizEntityBSONReader extends BSONDocumentReader[QuizEntity] {
    
    def read(doc: BSONDocument): QuizEntity = 
      QuizEntity(
        id = doc.getAs[BSONObjectID]("_id").get,
        question = doc.getAs[String]("question").get,
        correctAnswer = doc.getAs[String]("answer").get
      )
  }
  
  implicit object QuizEntityBSONWriter extends BSONDocumentWriter[QuizEntity] {
    def write(quizEntity: QuizEntity): BSONDocument =
      BSONDocument(
        "_id" -> quizEntity.id,
        "question" -> quizEntity.question,
        "answer" -> quizEntity.correctAnswer
      )
  }
}
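The implicit reader and writer above are picked up automatically by the driver, but we can also exercise them directly. A quick sketch, assuming the BSON helpers of the ReactiveMongo version used here:

import reactivemongo.bson.BSON

val entity = QuizEntity(question = "What is Akka?", correctAnswer = "A toolkit")
val doc = BSON.writeDocument(entity)          // uses QuizEntityBSONWriter
val back = BSON.readDocument[QuizEntity](doc) // uses QuizEntityBSONReader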

Let’s create a trait, called MongoDao, that defines how we use our configuration to connect to our MongoDB instance:

// file MongoDao.scala
package com.danielasfregola.quiz.management.dao

import com.typesafe.config.ConfigFactory
import reactivemongo.api.MongoDriver

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global

trait MongoDao {

  val config = ConfigFactory.load()
  val database = config.getString("mongodb.database")
  val servers = config.getStringList("mongodb.servers").asScala

  val driver = new MongoDriver
  val connection = driver.connection(servers)

  val db = connection(database)
}

We now define our collection and the I/O operations that we can execute on it:

package com.danielasfregola.quiz.management.dao

import com.danielasfregola.quiz.management.model.persistence.QuizEntity
import reactivemongo.api.QueryOpts
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.{BSONDocument, BSONObjectID}
import reactivemongo.core.commands.Count

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

trait QuizDao extends MongoDao {
  
  import com.danielasfregola.quiz.management.model.persistence.QuizEntity._
  import com.danielasfregola.quiz.management.model.api.QuizProtocol._
  
  val collection = db[BSONCollection]("quizzes")

  // it creates a new quiz entity
  def save(quizEntity: QuizEntity) = collection.save(quizEntity)
    .map(_ => QuizCreated(quizEntity.id.stringify))
  
  // it finds a question by id
  def findById(id: String) =
    collection.find(queryById(id)).one[QuizEntity]
  
  // it finds a random question
  def findOne = {
    val futureCount = db.command(Count(collection.name))
    futureCount.flatMap { count =>
      val skip = Random.nextInt(count)
      collection.find(emptyQuery).options(QueryOpts(skipN = skip)).one[QuizEntity]
    }
  }
  
  // it deletes a quiz entity by id
  def deleteById(id: String) = collection.remove(queryById(id)).map(_ => QuizDeleted)

  private def queryById(id: String) = BSONDocument("_id" -> BSONObjectID(id))

  private def emptyQuery = BSONDocument()
}

Almost done! We now just need to use our QuizDao trait as part of our QuizManager and QuestionManager classes:

// file QuizManager.scala
package com.danielasfregola.quiz.management

import com.danielasfregola.quiz.management.dao.QuizDao
import com.danielasfregola.quiz.management.model.persistence.QuizEntity

class QuizManager extends QuizDao {

  def createQuiz(quizEntity: QuizEntity) = save(quizEntity)

  def deleteQuizEntity(id: String) = deleteById(id)
  
}

// file QuestionManager.scala
package com.danielasfregola.quiz.management

import com.danielasfregola.quiz.management.dao.QuizDao
import com.danielasfregola.quiz.management.model.api.QuestionProtocol._
import com.danielasfregola.quiz.management.model.persistence.QuizEntity

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class QuestionManager extends QuizDao {

  def getQuestion(maybeId: Option[String] = None) = {

    def extractQuestion(maybeQuiz: Option[QuizEntity]) = maybeQuiz match {
      case Some(quizEntity) => toQuestion(quizEntity)
      case _ => QuestionNotFound
    }
    tryGetQuiz(maybeId).map(extractQuestion)
  }

  def answerQuestion(id: String, proposedAnswer: Answer) = {
    
    def isAnswerCorrect(maybeQuiz: Option[QuizEntity]) = maybeQuiz match {
      case Some(q) if (q.correctAnswer == proposedAnswer.answer) => CorrectAnswer
      case _ => WrongAnswer
    }
    
    tryGetQuiz(Some(id)).map(isAnswerCorrect)
  }

  private def tryGetQuiz(maybeId: Option[String]): Future[Option[QuizEntity]] = maybeId match {
    case Some(id) => findById(id)
    case _ => findOne
  }
  
}

Because the ReactiveMongo library is based on Futures, all the methods of our QuizManager and QuestionManager wrap their values in a Future: let’s adopt the Akka Pipe Pattern to send messages to our Responder Actor. An example of how this approach works follows:

// file RestInterface.scala
...
 pathPrefix("quizzes") {
      pathEnd {
        post {
          entity(as[Quiz]) { quiz => requestContext =>
            val responder = createResponder(requestContext)
            quizManager.createQuiz(quiz).pipeTo(responder)
          }
        }
      } 
...

quizManager.createQuiz(quiz) returns a Future[QuizCreated]: once the future is completed, the QuizCreated message is sent to the Responder Actor.
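Note that pipeTo comes from the Akka pipe pattern and needs an ExecutionContext in scope, so RestInterface.scala is assumed to have imports along these lines:

import akka.pattern.pipe
import scala.concurrent.ExecutionContext.Implicits.global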

Summary

ReactiveMongo is a non-blocking asynchronous Scala driver for MongoDB that is particularly suitable for highly scalable applications. This article has described how ReactiveMongo can be easily integrated in an existing Akka Spray application.

All the code produced in this tutorial can be found here.

How to Supervise Akka Actors

Supervision is one of the core operations that an Actor can fulfill. Handling errors is not always easy in a classic object-oriented programming context: exceptions are fully embedded in the normal execution flow, which can make them difficult to predict. In the Akka Actor Model, errors are handled in a well-structured, isolated execution flow: not only does this make exception handling more predictable, but it also forces developers to design a proper fault-recovery system. This article describes how to use Actor Supervisors to handle errors and recover from them.

Actor Supervision: Overview

Actors have a well-structured tree hierarchy built according to specific rules:
– Your Father (i.e.: the Actor that created you) is your Supervisor.
– Every Actor has a Supervisor, apart from the Guardian Actor (/user), which is the first one created by the system (same as a root node in a tree structure).
– Your Children (i.e.: the Actors you have created) follow your destiny: if you are restarted/stopped/resumed, they are restarted/stopped/resumed as well.
– If unable to handle an exception, escalate it to your Supervisor.
– If the Guardian Actor is unable to handle an exception, the system will shutdown.

Akka provides two categories of strategies (see the sketch below):
– OneForOneStrategy, where the strategy is applied only to the child actor that failed.
– AllForOneStrategy, where the strategy is applied to all the children actors when one fails.
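A minimal sketch of how the two flavours are declared; the decider and its exceptions here are purely illustrative:

import akka.actor.SupervisorStrategy._
import akka.actor.{AllForOneStrategy, OneForOneStrategy, SupervisorStrategy}

// A decider maps a failure to a directive (Resume, Restart, Stop, Escalate)
val decider: SupervisorStrategy.Decider = {
  case _: ArithmeticException => Resume
  case _: Exception           => Restart
}

val oneForOne = OneForOneStrategy()(decider) // applied only to the failing child
val allForOne = AllForOneStrategy()(decider) // applied to all the children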

Although Akka provides two predefined failure-recovery strategies, called defaultStrategy and stoppingStrategy, most of the time we need to define our own: this can be easily done as shown in the following tutorial.

Actor Supervision in Practice!

In this tutorial we want to trigger an actor supervision operation when a specific word is contained in the received message:
– if the message contains the word “restart”, the child actor is restarted
– if the message contains the word “resume”, the child actor is resumed after the failure
– if the message contains the word “stop”, the child actor is stopped…FOREVER! 😈
– if the message contains the word “secret”, we throw an unhandled exception that forces the Guardian Actor to shutdown the system

First of all, let’s define our protocol and exceptions:

// file protocol.scala
package com.danielasfregola

object PrinterProtocol {

  case class Message(msg: String)
  
}

class RestartMeException extends Exception("RESTART")
class ResumeMeException extends Exception("RESUME")
class StopMeException extends Exception("STOP")

Then we define the behaviour of our Actor and when we are going to throw the exceptions. Note that we have also added some utility methods to better observe the life cycle of our Actors.

// file PrinterActor.scala
package com.danielasfregola

import akka.actor.Actor

class PrinterActor extends Actor {
  
  import PrinterProtocol._
  
  override def preRestart(reason: Throwable, message: Option[Any]) = {
    println("Yo, I am restarting...")
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable) = {
    println("...restart completed!")
    super.postRestart(reason)
  }
  
  override def preStart() = println("Yo, I am alive!")
  override def postStop() = println("Goodbye world!")

  override def receive: Receive = {
    case Message(msg) if containsRestart(msg) =>
      println(msg); throw new RestartMeException
    case Message(msg) if containsResume(msg) =>
      println(msg); throw new ResumeMeException
    case Message(msg) if containsStop(msg) =>
      println(msg); throw new StopMeException
    case Message(msg) if containsSecret(msg) =>
      println(msg); throw new Throwable
    case Message(msg) => println(msg)
  }
  
  private def containsRestart = containsWordCaseInsensitive("restart")_
  private def containsResume = containsWordCaseInsensitive("resume")_
  private def containsStop = containsWordCaseInsensitive("stop")_
  private def containsSecret = containsWordCaseInsensitive("secret")_

  private def containsWordCaseInsensitive(word: String)(msg: String) =  msg matches s".*(?i)$word.*"
}

Finally, the Supervisor just needs to create the actor and define the failure-recovery logic:

// file PrinterActorSupervisor.scala
package com.danielasfregola

import akka.actor.SupervisorStrategy._
import akka.actor.{Actor, OneForOneStrategy, Props}

class PrinterActorSupervisor extends Actor {

  override def preStart() = println("The Supervisor is ready to supervise")
  override def postStop() = println("Bye Bye from the Supervisor")

  override def supervisorStrategy = OneForOneStrategy() {
    case _: RestartMeException => Restart
    case _: ResumeMeException => Resume
    case _: StopMeException => Stop
  } 
  
  val printer = context.actorOf(Props(new PrinterActor), "printer-actor")
  
  override def receive: Receive = {
    case msg => printer forward msg
  }
}

That’s it! Now we just need to have fun with our buddies 🙂

When initialising our Actor system, all the Actors are created and automatically started:

  import PrinterProtocol._
  
  implicit val system = ActorSystem("printer-service")
  val printerSupervisor = system.actorOf(Props(new PrinterActorSupervisor), "printer-supervisor")
  // "The Supervisor is ready to supervise"
  // "Yo, I am alive!"

If no special keyword is sent, nothing happens to our actors:

  printerSupervisor ! Message("...please, print me...")
  // ...please, print me...
  printerSupervisor ! Message("...another message to print, nothing should happen...")
  // ...another message to print, nothing should happen...

When restarting our actor, it is stopped and replaced by a brand new one. Also, the event is recorded in the logs.

  printerSupervisor ! Message("...why don't you restart?!")
  //  ...why don't you restart?!
  //  Yo, I am restarting...
  //  Goodbye world!
  //  ...restart completed!
  //  Yo, I am alive!

  // From the logs:
  // ERROR [OneForOneStrategy]: RESTART
  // com.danielasfregola.RestartMeException: RESTART
  //	at com.danielasfregola.PrinterActor$$anonfun$receive$1.applyOrElse(PrinterActor.scala:24) ~[classes/:na]
  // ...

When resuming, nothing happens but a nice warning is in the logs for us:

  printerSupervisor ! Message("...fell free to resume!")
  // ...fell free to resume!

  // From the logs:
  // WARN  [OneForOneStrategy]: RESUME

When stopping, the behaviour is similar to the restart case scenario:

  printerSupervisor ! Message("...you can STOP now!")
  // ...you can STOP now!
  // Goodbye world!

  // From the logs:
  // ERROR [OneForOneStrategy]: STOP
  // com.danielasfregola.StopMeException: STOP
  //	at com.danielasfregola.PrinterActor$$anonfun$receive$1.applyOrElse(PrinterActor.scala:28) ~[classes/:na]
  // ...

Finally, let’s see what happens with an exception that is not handled. Note that both PrinterActor and PrinterActorSupervisor are killed, as the whole system is shut down by the Guardian Actor.

	printerSupervisor ! Message("...this is going to be our little secret...")
	// ...this is going to be our little secret...
	// Goodbye world!
	// Bye Bye from the Supervisor

	// From the logs:
	// ERROR [LocalActorRefProvider(akka://printer-service)]: guardian failed, shutting down system
	// java.lang.Throwable: null
    //	  at com.danielasfregola.PrinterActor$$anonfun$receive$1.applyOrElse(PrinterActor.scala:30) ~[classes/:na]
    // ...

Summary

The Akka Actor Model allows the creation of failure-recovery systems thanks to its well-structured hierarchy of Actor Supervisors. This article has provided a tutorial on how supervision can be used to control the life cycle of Actors in order to handle and recover from errors.

How to test Actors with Akka TestKit and Specs2

Actors are a really powerful tool to handle concurrency thanks to their message-based model. However, they can be tricky to test: sending and processing messages is done asynchronously. Moreover, their status is hidden internally and cannot easily be accessed to make assertions on it.

The Akka Team has created a library, called akka-testkit, to simplify unit tests on actors. This article provides an overview of the main features of this library and how it can be used to test our lovely actors.

Single Threaded Tests

If our actor is particularly simple, a single threaded test may be enough. Thanks to TestActorRef, we are able to access the actor internal status and make assertions on it.

For example, we have built an actor that memorises all the received messages starting with ‘A’:

import akka.actor.Actor

object MessageFilteringActorProtocol {
  case class SimpleMessage(text: String)
}

class MessageFilteringActor extends Actor {
  import MessageFilteringActorProtocol._
  
  var messages = Vector[String]()
  
  // what the actor state is
  def state = messages
  
  // the actor behaviour when receiving an object
  def receive = {
    case SimpleMessage(text) if text startsWith "A" =>
      messages = messages :+ text
  }

}

Let’s build a test for our actor:

import akka.testkit.{TestActorRef, TestKit}
import akka.actor.ActorSystem
import org.specs2.mutable.SpecificationLike

class MessageFilteringActorSpec extends TestKit(ActorSystem())
  with SpecificationLike {
  
  import MessageFilteringActorProtocol._
  
  val actor = TestActorRef[MessageFilteringActor]
  "A Message Filtering Actor" should {
    
    "save only messages that starts with 'A'" in {
      actor ! SimpleMessage("A message to remember")
      actor ! SimpleMessage("This message should not be saved")
      actor ! SimpleMessage("Another message for you")
      actor.underlyingActor.state.length mustEqual 2
    }
    
  }
}

Multi Threaded Testing

Unfortunately, single threaded unit testing is not always sufficient for more complex scenarios. To perform multi threaded tests, we have access to the TestProbe class, which offers useful methods to wait for messages and to analyse the status of, and interaction with, our actor. Some of the most common methods are the following:
– expectMsg: asserts that a message equal to the provided one is received
– expectNoMsg: asserts that no message is received
– receiveWhile: receives messages until the condition is no longer satisfied or the timeout is reached
A complete list of all the methods offered by the TestProbe class can be found here.
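A minimal sketch of a TestProbe in action; the actor system name and the messages are purely illustrative:

import akka.actor.ActorSystem
import akka.testkit.TestProbe
import scala.concurrent.duration._

implicit val system = ActorSystem("test-probe-example")
val probe = TestProbe()

// probe.ref is a plain ActorRef that can be handed to the actor under test;
// the probe then makes assertions on the messages it receives
probe.ref ! "hello"
probe.expectMsg(1.second, "hello")
probe.expectNoMsg(100.millis)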

Although the TestProbe class is quite powerful, it may require some changes in the actor code itself to make it more testable: we need to make sure that the actor is sending messages/information to our TestProbe class so that it can perform assertions on them.

A quite common approach is to create ad hoc messages for test purposes. For example, let’s assume we would like to know the internal status of our actor in a multi-threaded testing context. Moreover, we can have an optional listener to help us test side effects.

An example of how to use these different approaches follows. Our BucketCounterActor prints the label of each bucket and accumulates all the quantities received so far:

import akka.actor.Actor

object BucketCounterActorProtocol {
  case class Bucket(label: String, quantity: Int)
}

class BucketCounterActor extends Actor {
  import BucketCounterActorProtocol._
  
  var counter = 0
  
  def receive = {
    case Bucket(label, quantity) =>
      counter += quantity
      print(label)
  }

}

Let’s add some ad hoc code to our actor for test purposes:

import akka.actor.{ActorRef, Actor}

object BucketCounterActorProtocol {
  case class Bucket(label: String, quantity: Int)
  
  // a new message to expose the internal status of the actor 
  case class GetCounter(receiver: ActorRef)
}

// adding an optional listener to the class
class BucketCounterActor(listener: Option[ActorRef] = None) extends Actor {
  import BucketCounterActorProtocol._
  
  var counter = 0
  
  def receive = {
    case Bucket(label, quantity) =>
      counter = counter + quantity
      print(label)
      // informing the listener of the side effect
      listener.map(_ ! label)
    
    // logic to expose internal status
    case GetCounter(receiver) => receiver ! counter
  }

}

Thanks to the code we just added, testing our actor is now going to be really easy:

import akka.actor.{ActorSystem, Props}
import akka.testkit.TestKit
import org.specs2.mutable.SpecificationLike

class BucketCounterActorSpec extends TestKit(ActorSystem()) with SpecificationLike {
  import BucketCounterActorProtocol._
  
  "A Bucket Counter Actor" should {
    
    val actorProps = Props(new BucketCounterActor(Some(testActor)))
    val actor = system.actorOf(actorProps, "actor-to-test")
    
    val firstBucket = Bucket("Yo, I am a bucket", 1)
    val secondBucket = Bucket("I am another bucket", 9)

    "print out the name of the received buckets" in {
      actor ! firstBucket
      expectMsg(firstBucket.label)
      actor ! secondBucket
      expectMsg(secondBucket.label)
      success
    }
    
    "accumulate the quantity of buckets received" in {
      actor ! GetCounter(testActor)
      expectMsg(10)
      success
    }
  }
}

Summary

Akka actors are a powerful tool to build concurrent systems. This article has provided different examples on how actors can be tested thanks to the akka-testkit library, using both single and multi threaded approaches.

How to build a REST api with Spray and Akka

Spray is a library written on top of Akka and Scala that allows you to quickly create REST interfaces. It is becoming more and more popular in the Scala community because it is easy to use and performant thanks to its asynchronous, actor-based model. This article describes how to efficiently exploit Spray to create a simple REST API.

All the code produced in this tutorial, together with a quick guide on how to run and use the application, can be found here.

Update: I wrote some follow ups on this article:
1) In this article, I used the default Spray facilities for (de)serializing objects from and to JSON. In this follow-up article, I’m showing how to make (de)serialization in Spray simpler by using the json4s library.
2) Also, this article doesn’t always respect the REST conventions or the Spray way of completing an endpoint. In the following article, I explain how to build a REST CRUD application in Spray, taking full advantage of Spray’s toolkit.

Our Goal

For this tutorial we want to build a REST interface to manage quizzes. In particular we would like our interface to do the following:
– Create a quiz
– Delete a quiz
– Get a random question
– Get a question by id
– Answer a question

In the following sections we will analyse how to achieve these requirements. In particular, we will focus on how to create and delete a quiz entity. The described pattern can be re-used to achieve all the other requirements: describing all the requirements one by one would not bring any extra value to the tutorial itself. Please, have a look at the GitHub repository for the complete code of this tutorial and examples on how to use the application.

Project Metadata

First we need to set up our project. Let’s add some metadata: its name and version, its organization, and which Scala and SBT versions we intend to use.

// file build.sbt
name := "quiz-management-service"

version := "0.1"

organization := "com.danielasfregola"

scalaVersion := "2.11.5"
// file project/build.properties
sbt.version=0.13.6

Plugins

Next step: make our life a lot easier! By importing the following plugins, we just need to run the sbt assembly command to assemble our code: it will compile the code, run all the tests and produce an executable Java jar.

// file project/plugins.sbt
resolvers += Classpaths.typesafeResolver

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.0-M4")

addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.3.9")
// file build.sbt
enablePlugins(JavaServerAppPackaging)
...

// Assembly settings
mainClass in Global := Some("com.danielasfregola.quiz.management.Main")

jarName in assembly := "quiz-management-server.jar"

To compile, test and assemble the code:

cd quiz-management-service/spray-akka
sbt assembly

The produced jar will be called “quiz-management-server.jar” and it will use the main class located in the package “com.danielasfregola.quiz.management”. To run our service (by default on http://localhost:5000), we just need to type the following command:

java -jar target/scala-2.11/quiz-management-server.jar

Spray Dependencies

Almost there with the boring part! We now have to include the spray libraries that we are going to use:

// file build.sbt
resolvers ++= Seq("Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"Spray Repository" at "http://repo.spray.io")

libraryDependencies ++= {
    val akkaVersion = "2.3.9"
    val sprayVersion = "1.3.2"
    Seq(
        "com.typesafe.akka" %% "akka-actor" % akkaVersion,
        "io.spray" %% "spray-can" % sprayVersion,
        "io.spray" %% "spray-routing" % sprayVersion,
        "io.spray" %% "spray-json" % "1.3.1",
        "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
        "ch.qos.logback" % "logback-classic" % "1.1.2",
        "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
        "io.spray" %% "spray-testkit" % sprayVersion % "test",
        "org.specs2" %% "specs2" % "2.3.13" % "test"
    )
}

Main Class

The first thing to do in our Main class is to load our system configurations. Then, we create our actor system called “quiz-management-service” that contains our http interface — remember that Spray is actor-based! Finally, our actor system is bound to an HTTP port.

package com.danielasfregola.quiz.management

import akka.actor._
import akka.io.IO
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import spray.can.Http

import scala.concurrent.duration._

object Main extends App {
    val config = ConfigFactory.load()
    val host = config.getString("http.host")
    val port = config.getInt("http.port")
    
    implicit val system = ActorSystem("quiz-management-service")
    
    val api = system.actorOf(Props(new RestInterface()), "httpInterface")
    
    implicit val executionContext = system.dispatcher
    implicit val timeout = Timeout(10 seconds)
    
    IO(Http).ask(Http.Bind(listener = api, interface = host, port = port))
    .mapTo[Http.Event]
    .map {
        case Http.Bound(address) =>
            println(s"REST interface bound to $address")
        case Http.CommandFailed(cmd) =>
            println("REST interface could not bind to " +
                s"$host:$port, ${cmd.failureMessage}")
            system.shutdown()
    }
}

Rest Interface Class

The REST interface class is an HttpServiceActor: every time it receives an HTTP request, it tries to match it against the routes defined in our RestApi trait. When a match is found, the defined action is executed and the result sent to a Responder actor. The Responder actor is responsible for sending back a meaningful HTTP response with an appropriate status code and body. Every time we receive a valid HTTP request, we create an instance of the Responder actor and then kill it once the request has been completed.

package com.danielasfregola.quiz.management

import akka.actor._
import akka.util.Timeout
import spray.http.StatusCodes
import spray.httpx.SprayJsonSupport._
import spray.routing._

import scala.concurrent.duration._
import scala.language.postfixOps

class RestInterface extends HttpServiceActor
with RestApi {

    def receive = runRoute(routes)
}

trait RestApi extends HttpService with ActorLogging { actor: Actor =>

    implicit val timeout = Timeout(10 seconds)
    
    var quizzes = Vector[Quiz]()
    
    def routes: Route = ???
}

class Responder(requestContext:RequestContext) extends Actor with ActorLogging {

    def receive = ???
    
    private def killYourself = self ! PoisonPill

}

For the purposes of this tutorial, we will store all the data in a vector: this is not an ideal solution for a production system, but it allows this article to focus on how Spray works. In a more realistic context, we should delegate actions to other classes — or even better actors! — and store the data in a persistent data storage (e.g.: database).

How to create and delete a quiz

Our interface needs to provide the functionalities of creating and deleting a quiz entity. This can be translated in the following REST calls:
– POST to URI /quizzes with JSON body: {"id": "my_quiz_id", "question": "my_question", "correctAnswer": "my_answer"}
– DELETE to URI /quizzes/my_quiz_id with no body
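Assuming the service is running locally on its default port, the corresponding curl calls would look like this:

curl -X POST -H "Content-Type: application/json" \
  -d '{"id": "my_quiz_id", "question": "my_question", "correctAnswer": "my_answer"}' \
  http://localhost:5000/quizzes

curl -X DELETE http://localhost:5000/quizzes/my_quiz_id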

First, we need to define what a Quiz is and the messages that we can send to the Responder actor. This can easily be achieved by using case classes and objects:

// file src/main/scala/com/danielasfregola/quiz/management/QuizProtocol.scala
...
// entity: quiz
case class Quiz(id: String, question: String, correctAnswer: String)

// message: quiz has been created
case object QuizCreated

// message: quiz cannot be created because it already exists
case object QuizAlreadyExists

// message: quiz has been deleted
case object QuizDeleted
...

Also, we need instructions on how to (un)marshal a Quiz case class.

// file src/main/scala/com/danielasfregola/quiz/management/QuizProtocol.scala
...
object Quiz extends DefaultJsonProtocol {
    implicit val format = jsonFormat3(Quiz.apply)
}
...
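With the implicit format in scope, spray-json can convert between the case class and its JSON representation in both directions. A quick sketch:

import spray.json._

val quiz = Quiz("my_quiz_id", "my_question", "my_answer")
val json: JsValue = quiz.toJson   // case class -> JSON AST
val parsed = json.convertTo[Quiz] // JSON AST -> case class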

The RestApi trait defines the routing and the operation to execute. For each matched HTTP request we create an instance of the Responder actor: once the result of the executed operation is available we send it to the Responder actor.

....

var quizzes = Vector[Quiz]()

def routes: Route =

pathPrefix("quizzes") {
    pathEnd {
        post {
            entity(as[Quiz]) { quiz => requestContext =>
                val responder = createResponder(requestContext)
                createQuiz(quiz) match {
                    case true => responder ! QuizCreated
                    case _ => responder ! QuizAlreadyExists
                }
            }
        }
    } ~
    path(Segment) { id =>
        delete { requestContext =>
            val responder = createResponder(requestContext)
            deleteQuiz(id)
            responder ! QuizDeleted
        }
    }
}
...

private def createQuiz(quiz: Quiz): Boolean = {
    val doesNotExist = !quizzes.exists(_.id == quiz.id)
    if (doesNotExist) quizzes = quizzes :+ quiz
    doesNotExist
}

private def deleteQuiz(id: String): Unit = {
    quizzes = quizzes.filterNot(_.id == id)
}

private def createResponder(requestContext:RequestContext) = {
    context.actorOf(Props(new Responder(requestContext)))
}

When the Responder actor receives a message it maps it to a meaningful HTTP response: it sends it back to the HTTP requester and it kills itself ( 😥 ).

// file src/main/scala/com/danielasfregola/quiz/management/RestInterface.scala
class Responder(requestContext: RequestContext) extends Actor with ActorLogging {

  import com.danielasfregola.quiz.management.QuizProtocol._

  def receive = {

    case QuizCreated =>
      requestContext.complete(StatusCodes.Created)
      killYourself

    case QuizDeleted =>
      requestContext.complete(StatusCodes.OK)
      killYourself

    case QuizAlreadyExists =>
      requestContext.complete(StatusCodes.Conflict)
      killYourself

    ....
  }

  private def killYourself = self ! PoisonPill

}

Summary

This article has described how to use Spray to create a REST interface. Spray is a library that uses an actor-based model to easily and efficiently implement a REST layer. Although the provided example is not applicable to a realistic system (because all the data is stored in a Vector variable!), it shows a simple pattern on how to structure a REST API using Spray: how to organise the communication between actors, the routing and the response handling.