Background Processing with Akka Cluster: ClusterSingletonManager

REST services are quite commonly used in scalable architectures because they are stateless. However, in practice a service is rarely just a CRUD service: for example, it may also need some background processing (e.g. downloading/parsing files, scheduled processes, triggers, etc.).
In this tutorial we demonstrate how to use an Akka ClusterSingletonManager in a scalable architecture to perform some background operations from only one node: the oldest node of our Akka Cluster.

All the code produced in this article can be found on GitHub.

Our Application

Let’s imagine we have a simple Akka Http REST API with one endpoint that, given a ping request, returns a pong response (for more information on how to create an API with Akka Http see this article):

// curl http://localhost:5000/ping >> pong
// RestInterface.scala
...

val routes: Route =
  path("ping") {
    get {
      complete("pong")
    }
  }
...

An Akka Actor, called TickCounter, is also attached to our system to count ticks starting from zero. Its code is as follows:

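// TickCounter.scala
package com.danielasfregola.akka.tutorials.actors

import akka.actor.Actor

import scala.concurrent.duration._

object TickCounter {
  // Message the actor sends to itself once per second
  case class Tick(n: Int)
}

class TickCounter extends Actor {
  import TickCounter.Tick
  // Run scheduled tasks on the actor's own dispatcher rather than the global execution context
  import context.dispatcher

  // Start counting from zero as soon as the actor starts
  override def preStart(): Unit = self ! Tick(0)

  def receive = {
    case Tick(t) =>
      println(s"${self.path} - Tick $t")
      // Schedule the next tick in one second
      context.system.scheduler.scheduleOnce(1.second, self, Tick(t + 1))
  }

}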

Our build file declares only one dependency:

// build.sbt
name := "akka-tutorials"

version := "0.1"

organization := "com.danielasfregola"

scalaVersion := "2.11.5"

resolvers ++= Seq("Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/")

libraryDependencies ++= {
  val AkkaVersion       = "2.4.2"
  Seq(
    "com.typesafe.akka" %% "akka-http-experimental" % AkkaVersion
  )
}

Also, our Main class looks as follows:

// Main.scala
package com.danielasfregola.akka.tutorials

import akka.actor.{Props, ActorSystem}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.danielasfregola.akka.tutorials.actors.TickCounter

import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

object Main extends App with RestInterface {
  val config = ConfigFactory.load()
  val host = config.getString("http.host")
  val port = config.getInt("http.port")

  implicit val system = ActorSystem("akka-tutorials")
  implicit val materializer = ActorMaterializer()

  val tickCounter = system.actorOf(Props[TickCounter], "tick-counter")

  implicit val executionContext = system.dispatcher
  implicit val timeout = Timeout(10 seconds)

  val api = routes

  Http().bindAndHandle(handler = api, interface = host, port = port) map { binding =>
    println(s"REST interface bound to $host:$port")
  } recover { case ex =>
    // Build a single message: passing two arguments to println would print a tuple
    println(s"REST interface could not bind to $host:$port: ${ex.getMessage}")
  }

}

Our Goal

Our goal is to change our application so that when multiple instances of the same service are running, only one TickCounter is active.

We could set a flag to disable/enable the Actor. However, this is not ideal in a scalable ecosystem where every instance can potentially be removed. What happens if we remove the only node with the flag on? What if we deploy more than one service with the flag as active?

We could move the background process into a new service. However, for cases where the processing is not big enough to justify a new service, this could be quite expensive in terms of infrastructure and maintenance.

…or with some minor code changes we could set up an Akka ClusterSingletonManager, get it over with and go to the pub early.

Akka ClusterSingletonManager

In order to let our application use a ClusterSingletonManager we just need to perform three simple operations:
– import the right dependency
– let our Cluster Manager know about our Actor
– set up the behaviour of the Akka Cluster

Obviously, this is only one way of setting up our Cluster. For more information on the other available configurations, please have a look at the official Akka documentation on Cluster Singleton.

Step 1: Update your dependencies

The ClusterSingletonManager is part of the akka-cluster-tools package, so we need to add its dependency to our build.sbt file:

"com.typesafe.akka" %% "akka-cluster-tools" % "2.4.2"

Step 2: Define the actors in your Cluster

In our Main we need to specify that our TickCounter Actor is part of our Cluster:

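// Main.scala
import akka.actor.{PoisonPill, Props}
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}

// Wrap TickCounter in a ClusterSingletonManager instead of creating it directly
val tickCounter = {
  val singletonManagerProps = ClusterSingletonManager.props(
    singletonProps = Props[TickCounter],   // the actor that must run on one node only
    terminationMessage = PoisonPill,       // sent to the singleton when it has to stop
    settings = ClusterSingletonManagerSettings(system)
  )
  system.actorOf(singletonManagerProps, "tick-counter-singleton")
}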

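The terminationMessage is sent to the singleton Actor when the ClusterSingletonManager needs to stop it, for example when the singleton has to be handed over to another node. Here we decided to keep it simple, so we just brutally kill the actor with a PoisonPill. Note that this can be customised with a message of our own, so that the actor can finish its work and release its resources before stopping.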
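
As a sketch of such a customisation, the actor below reacts to a termination message of its own instead of a PoisonPill. The names GracefulTickCounter, Shutdown and GracefulSingleton are hypothetical and not part of the original code; the sketch assumes the Akka 2.4 classic API used in the rest of this article.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props}
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}

import scala.concurrent.duration._

object GracefulTickCounter {
  case class Tick(n: Int)
  // Hypothetical termination message, used in place of PoisonPill
  case object Shutdown
}

class GracefulTickCounter extends Actor {
  import GracefulTickCounter._
  import context.dispatcher

  // Handle on the next scheduled tick, so that it can be cancelled on shutdown
  private var nextTick: Option[Cancellable] = None

  override def preStart(): Unit = self ! Tick(0)

  // Make sure no tick is left scheduled once the actor has stopped
  override def postStop(): Unit = nextTick.foreach(_.cancel())

  def receive = {
    case Tick(t) =>
      println(s"${self.path} - Tick $t")
      nextTick = Some(context.system.scheduler.scheduleOnce(1.second, self, Tick(t + 1)))
    case Shutdown =>
      // Finish pending work and release resources here, then stop
      println(s"${self.path} - shutting down")
      context.stop(self)
  }
}

object GracefulSingleton {
  // Same setup as in Step 2, but with the custom termination message
  def start(system: ActorSystem): ActorRef =
    system.actorOf(
      ClusterSingletonManager.props(
        singletonProps = Props[GracefulTickCounter],
        terminationMessage = GracefulTickCounter.Shutdown,
        settings = ClusterSingletonManagerSettings(system)
      ),
      "tick-counter-singleton"
    )
}
```

The actor still stops itself in the end: the custom message only gives it a chance to clean up first.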

Step 3: Configure the behaviour of your Cluster

Add the following configurations to your application.conf file to tell the Cluster how to behave:

// application.conf
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = ${REMOTE_PORT}
    }
  }
}

akka.cluster {
  seed-nodes = [
    "akka.tcp://akka-tutorials@127.0.0.1:2551",
    "akka.tcp://akka-tutorials@127.0.0.1:2552",
    "akka.tcp://akka-tutorials@127.0.0.1:2553"
  ]
  min-nr-of-members = 1
  auto-down-unreachable-after = 30s
}

The akka.actor and akka.remote settings provide information on how to create and bind the node of the Cluster. In particular, akka.remote.netty.tcp defines how to reach the node by providing a hostname and port.

The akka.cluster configuration contains the following information:
– seed-nodes: the initial contact points that a node uses to join the Cluster.
– min-nr-of-members: the minimum number of members that must have joined before the nodes are moved to Up.
– auto-down-unreachable-after: the time after which an unreachable node gets marked as Down and removed from the Cluster.

Note that auto-down-unreachable-after is a really sensitive and dangerous setting that needs to be set up properly, in particular in production. From the Akka Documentation:

Be very careful when using Cluster Singleton together with Automatic Downing, since it allows the Cluster to split up into two separate clusters, which in turn will result in multiple Singletons being started, one in each separate Cluster!
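
To make the warning concrete, here is a deliberately simplified model of what happens when the network splits and each side automatically downs the other. It is not Akka's implementation: Member, upNumber, autoDown and singletonHosts are illustrative names, and the only assumption taken from Akka is that the singleton runs on the oldest member each side still knows about.

```scala
// Simplified model of automatic downing during a network partition.
// Member, upNumber and the helper names are illustrative, not Akka's API.
object SplitBrainSketch {

  // upNumber grows with join order: the lowest value marks the oldest member
  final case class Member(address: String, upNumber: Int)

  // Automatic downing: after the timeout, unreachable members are removed from the view
  def autoDown(view: Set[Member], unreachable: Set[Member]): Set[Member] =
    view -- unreachable

  // Each side of the partition runs the singleton on the oldest member it still knows
  def singletonHosts(all: Set[Member], sides: List[Set[Member]]): List[Member] =
    sides.map { side =>
      val view = autoDown(all, unreachable = all -- side)
      view.minBy(_.upNumber)
    }

  def main(args: Array[String]): Unit = {
    val a = Member("127.0.0.1:2551", upNumber = 1)
    val b = Member("127.0.0.1:2552", upNumber = 2)
    val c = Member("127.0.0.1:2553", upNumber = 3)

    // The network splits node a from nodes b and c
    val hosts = singletonHosts(Set(a, b, c), List(Set(a), Set(b, c)))

    // One singleton per side: two TickCounters are now running
    hosts.foreach(h => println(s"singleton running on ${h.address}"))
  }
}
```

In this model the partition leaves one singleton on port 2551 and another on port 2552, which is exactly the situation the documentation warns about.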

Usage

Now that our Akka Cluster is set up, we are ready to use it.
Note that in our application, in order to start a service, we have decided to force two configurations at runtime: the port of the API and the port of the Akka node.

Let’s run the first instance of our service on port 5000 and the Akka node on port 2551:

> export PORT=5000
> export REMOTE_PORT=2551
> sbt run

In the logs we see that the Cluster Manager declares the node as the Oldest of the Cluster and it starts the TickCounter Actor:

...
[INFO] [02/21/2016 16:14:24.312] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Node [akka.tcp://akka-tutorials@127.0.0.1:2551] is JOINING, roles []
[INFO] [02/21/2016 16:14:24.320] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is moving node [akka.tcp://akka-tutorials@127.0.0.1:2551] to [Up]
[INFO] [02/21/2016 16:14:24.337] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] Singleton manager starting singleton actor [akka://akka-tutorials/user/tick-counter-singleton/singleton]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 0
[INFO] [02/21/2016 16:14:24.340] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Oldest]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 1
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 2
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 3
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 4
...

Let’s run a second instance of our service on port 5001 and the node on port 2552:

> export PORT=5001
> export REMOTE_PORT=2552
> sbt run

In the logs of the second instance we see that the Cluster Manager declares the second node as Younger and it does not start its TickCounter Actor:

...
[INFO] [02/21/2016 16:18:31.343] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Welcome from [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:18:31.540] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Younger]
REST interface bound to 0.0.0.0:5001
...

If we stop the non-leader node (i.e. the second one we started), the leader node detects that the second node is unreachable: after some time, it marks it as Down and removes it from the Cluster:

[WARN] [02/21/2016 16:22:32.154] [akka-tutorials-akka.remote.default-remote-dispatcher-21] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-2] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
...
[WARN] [02/21/2016 16:22:36.287] [akka-tutorials-akka.actor.default-dispatcher-15] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/cluster/core/daemon] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://akka-tutorials@127.0.0.1:2552, status = Up)]
...
[WARN] [02/21/2016 16:22:37.349] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-3] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
...
[INFO] [02/21/2016 16:22:55.339] [akka-tutorials-akka.actor.default-dispatcher-18] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader can currently not perform its duties, reachability status: [akka.tcp://akka-tutorials@127.0.0.1:2551 -> akka.tcp://akka-tutorials@127.0.0.1:2552: Unreachable [Unreachable] (1)], member status: [akka.tcp://akka-tutorials@127.0.0.1:2551 Up seen=true, akka.tcp://akka-tutorials@127.0.0.1:2552 Up seen=false]
[WARN] [02/21/2016 16:22:55.346] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-6] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
[WARN] [02/21/2016 16:23:01.350] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-7] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
[INFO] [02/21/2016 16:23:06.305] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is auto-downing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552]
[INFO] [02/21/2016 16:23:06.306] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Marking unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552] as [Down]
...
[INFO] [02/21/2016 16:23:07.285] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader can perform its duties again
[INFO] [02/21/2016 16:23:07.287] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is removing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552]
[INFO] [02/21/2016 16:23:07.288] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] Member removed [akka.tcp://akka-tutorials@127.0.0.1:2552]
[WARN] [02/21/2016 16:23:07.292] [akka-tutorials-akka.remote.default-remote-dispatcher-21] [akka.remote.Remoting] Association to [akka.tcp://akka-tutorials@127.0.0.1:2552] having UID [-2037835353] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
...
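
The 30 seconds configured in auto-down-unreachable-after can be checked against the log timestamps above. The following sketch uses only java.time and a hypothetical helper named elapsed to compute the time between the UNREACHABLE warning and the auto-downing message:

```scala
import java.time.{Duration, LocalTime}

object AutoDownTiming {

  // Elapsed time between two log timestamps (HH:mm:ss.SSS) taken on the same day
  def elapsed(from: String, to: String): Duration =
    Duration.between(LocalTime.parse(from), LocalTime.parse(to))

  def main(args: Array[String]): Unit = {
    // "Marking node(s) as UNREACHABLE" -> "Leader is auto-downing unreachable node"
    val waited = elapsed("16:22:36.287", "16:23:06.305")
    println(s"${waited.toMillis} ms") // prints "30018 ms"
  }
}
```

The result is just over 30 seconds, in line with the configured value.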

When we start the second node again, the node is added back to the Cluster:

...
[INFO] [02/21/2016 16:28:49.560] [akka-tutorials-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Node [akka.tcp://akka-tutorials@127.0.0.1:2552] is JOINING, roles []
[INFO] [02/21/2016 16:28:50.293] [akka-tutorials-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is moving node [akka.tcp://akka-tutorials@127.0.0.1:2552] to [Up]
...

Finally, we kill the first node and we see that, after some time, the second node becomes the Oldest (and the Leader) of the Cluster and starts its own TickCounter:

REST interface bound to 0.0.0.0:5001
[INFO] [02/21/2016 16:28:51.192] [akka-tutorials-akka.actor.default-dispatcher-15] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Younger]
[WARN] [02/21/2016 16:32:01.276] [akka-tutorials-akka.remote.default-remote-dispatcher-6] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
[WARN] [02/21/2016 16:32:06.185] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/cluster/core/daemon] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://akka-tutorials@127.0.0.1:2551, status = Up)]
[WARN] [02/21/2016 16:32:06.674] [akka-tutorials-akka.remote.default-remote-dispatcher-5] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-2] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
[WARN] [02/21/2016 16:32:12.677] [akka-tutorials-akka.remote.default-remote-dispatcher-6] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-3] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
...
[INFO] [02/21/2016 16:32:26.188] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader can currently not perform its duties, reachability status: [akka.tcp://akka-tutorials@127.0.0.1:2552 -> akka.tcp://akka-tutorials@127.0.0.1:2551: Unreachable [Unreachable] (1)], member status: [akka.tcp://akka-tutorials@127.0.0.1:2551 Up seen=false, akka.tcp://akka-tutorials@127.0.0.1:2552 Up seen=true]
...
[INFO] [02/21/2016 16:32:36.203] [akka-tutorials-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader is auto-downing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:36.204] [akka-tutorials-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Marking unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551] as [Down]
...
[INFO] [02/21/2016 16:32:37.182] [akka-tutorials-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader can perform its duties again
[INFO] [02/21/2016 16:32:37.189] [akka-tutorials-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader is removing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:37.192] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Previous oldest removed [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:37.193] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Younger observed OldestChanged: [None -> myself]
[WARN] [02/21/2016 16:32:37.194] [akka-tutorials-akka.remote.default-remote-dispatcher-5] [akka.remote.Remoting] Association to [akka.tcp://akka-tutorials@127.0.0.1:2551] having UID [676396303] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
[INFO] [02/21/2016 16:32:37.195] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Singleton manager starting singleton actor [akka://akka-tutorials/user/tick-counter-singleton/singleton]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 0
[INFO] [02/21/2016 16:32:37.197] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Younger -> Oldest]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 1
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 2
...
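
The last log lines show the singleton moving because the second node became the Oldest. As far as the Akka documentation describes it, the singleton follows the oldest member, while the cluster leader is chosen by address order, so the two roles can end up on different nodes after a restart. The sketch below is a simplified model of that distinction and not Akka's implementation; Node, upNumber, leader and oldest are illustrative names.

```scala
// Simplified model of the two roles. Node, upNumber, leader and oldest
// are illustrative names, not Akka's API.
object LeaderVsOldestSketch {

  final case class Node(host: String, port: Int, upNumber: Int)

  // Leader: the first member in address order (host, then port)
  def leader(members: Seq[Node]): Node = members.minBy(n => (n.host, n.port))

  // Oldest: the member that joined first, i.e. the one with the lowest upNumber
  def oldest(members: Seq[Node]): Node = members.minBy(_.upNumber)

  def main(args: Array[String]): Unit = {
    // The first node was killed, so the second node was alone in the Cluster
    val second = Node("127.0.0.1", 2552, upNumber = 2)
    // The first node is then restarted and rejoins as the newest member
    val firstRestarted = Node("127.0.0.1", 2551, upNumber = 3)
    val members = Seq(second, firstRestarted)

    println(s"leader: ${leader(members).port}") // prints "leader: 2551"
    println(s"oldest: ${oldest(members).port}") // prints "oldest: 2552"
  }
}
```

In this model the leader role moves back to port 2551 as soon as that node rejoins, while the singleton stays on port 2552 because that node is still the oldest.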

Summary

In this article we have described how to use an Akka ClusterSingletonManager to coordinate several instances of the same service. In particular, our goal was to configure an Akka Cluster of services so that some operations run from only one service at a time (i.e. on the oldest node of the Cluster).

All the code produced in this tutorial is on GitHub.

Published by

Daniela Sfregola

Tech Leader at Paytouch

7 thoughts on “Background Processing with Akka Cluster: ClusterSingletonManager”

  1. Hi Daniela, great post!
    I have a problem now, my memory is not decreasing when my singleton actors are not used. What do you mean when you said “The terminationMessage is sent to the Actor if the Cluster Manager needs to terminate it”? I need to terminate mi singleton actors

    1. Hi Xavier,
      the termination message is sent to actors that would like to become singleton but can’t as a singleton actor is already been selected. By the definition of singleton, you can have only one singleton at the time. If for some reason you need to stop the actor that has been selected as singleton, you need to do it manually. Note that the termination message is just a message that is handled by the actor that receives it. When sending a `PoisonPill`, the actor knows that it needs to stop and die. Note that you can send any message and potentially change the behaviour of the actor rather than killing it.

      Hope it helps!

      Cheers,
      Daniela

  2. Dear Daniela,

    Your post is very nice and truly well explained ..

    I have a problem if i continue to work on the scenario…
    The second node is promoted as Leader , but if i restart the first node..then the first node is back immediately as leader….Can we keep who is leader must be leader until it die, i.e. even first node restarts, 2nd node stays leader?

    Thank a lot.

    1. Hi Kim,
      I think the key to your problem is the parameter `auto-down-unreachable-after`.

      Let’s assume you have 2 nodes, node A and node B. Node A is currently the leader, while Node B is the worker.

      When node A (the Leader) dies, the cluster manager from node B (the Worker) will try to reconnect to node A (the Leader) for the time indicated in `auto-down-unreachable-after`. If the node A (the Leader) comes back online before the time indicated in `auto-down-unreachable-after`, it will stay as Leader. Vice versa, if the node A doesn’t come back online within the indicated time, the node B will become the new leader, and it will stay as leader even if later on the node A rejoins.

      Please, be careful when setting a value for `auto-down-unreachable-after`! More info on the akka docs here: http://doc.akka.io/docs/akka/snapshot/scala/cluster-usage.html#Automatic_vs__Manual_Downing

      Cheers,
      Daniela

      1. Dear Daniela,
        Just had some testings recently, it seems that the parameter doesn’t have any effect on it.
        I also noticed similar issues without answers from https://groups.google.com/forum/#!topic/akka-user/_s9fd83at2A
        “So here the leader is still node 1, i.e. the node that was already alive when node 2 was added. But when I re-added node 1 to the cluster that consisted of just node 2, the position of leader was immediately switched over to node 1.”
