How to Integrate ReactiveMongo in your Akka Spray Application

Scalability can be challenging when database access is needed: the common approach is to block the thread until a response is received. ReactiveMongo is a MongoDB Scala Driver that provides fully non-blocking asynchronous I/O operation that increases the scalability of your system.
In a previous post we have seen how to build a REST Api with Spray: in this article we will describe how to expand our application to integrate ReactiveMongo.

All the code produced in this tutorial can be found here.

Our Goal

Our goal is to create an application to manage quizzes. In particular, we want to:
– create a quiz
– delete a quiz
– get a random question
– get a question by id
– answer a question by id

Details on how we have chosen to implement the Rest Interface can be found here. In the following sessions we will analyse how ReactiveMongo can be used to store the quiz entity in our MongoDB database without any blocking operation.

Set Up

First, we need an instance of MongoDB: we can set it up one in our local machine (see MongoDB official website for instructions). Also, we need to include the ReactiveMongo library as part of our SBT dependencies and provide information on our MongoDB instance:

// file build.sbt
libraryDependencies ++= {
  ...
  Seq(
  	...
    "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",
    ...
  )
}
// file application.conf
...
mongodb {
  database = "quiz-management"
  servers = ["localhost:27017"]
}

Minor refactoring is needed to make our application a little bit more structured: the original QuizProtocol class has been split in two (QuizProtocol and QuestionProtocol) and moved to a new package called model.api.
Finally, we now let MongoDB generate the quiz id rather than asking the user to select one.

ReactiveMongo Integration

First step is to define our persistence model. Also, we need to provide instructions on how to serialise/deserialise our QuizEntity in MongoDB.

// file QuizEntity.scala
// note the package model.persistence to separate it from our model.api representations
package com.danielasfregola.quiz.management.model.persistence

import com.danielasfregola.quiz.management.model.api.QuizProtocol.Quiz
import reactivemongo.bson.{BSONDocumentWriter, BSONDocument, BSONDocumentReader, BSONObjectID}

case class QuizEntity(id: BSONObjectID = BSONObjectID.generate,
                      question: String, 
                      correctAnswer: String)

object QuizEntity {

  implicit def toQuizEntity(quiz: Quiz) = QuizEntity(question = quiz.question, correctAnswer = quiz.correctAnswer)

  implicit object QuizEntityBSONReader extends BSONDocumentReader[QuizEntity] {
    
    def read(doc: BSONDocument): QuizEntity = 
      QuizEntity(
        id = doc.getAs[BSONObjectID]("_id").get,
        question = doc.getAs[String]("question").get,
        correctAnswer = doc.getAs[String]("answer").get
      )
  }
  
  implicit object QuizEntityBSONWriter extends BSONDocumentWriter[QuizEntity] {
    def write(quizEntity: QuizEntity): BSONDocument =
      BSONDocument(
        "_id" -> quizEntity.id,
        "question" -> quizEntity.question,
        "answer" -> quizEntity.correctAnswer
      )
  }
}

Let’s create a trait, called MongoDao, that defines how we use our configuration to connect to our MongoDB instance:

// file MongoDao.scala
package com.danielasfregola.quiz.management.dao

import com.typesafe.config.ConfigFactory
import reactivemongo.api.MongoDriver

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global

trait MongoDao {

  val config = ConfigFactory.load()
  val database = config.getString("mongodb.database")
  val servers = config.getStringList("mongodb.servers").asScala

  val driver = new MongoDriver
  val connection = driver.connection(servers)

  val db = connection(database)
}

We now define our collection and the I/O operations that we can execute on it:

package com.danielasfregola.quiz.management.dao

import com.danielasfregola.quiz.management.model.persistance.QuizEntity
import reactivemongo.api.QueryOpts
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.{BSONDocument, BSONObjectID}
import reactivemongo.core.commands.Count

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

trait QuizDao extends MongoDao {
  
  import com.danielasfregola.quiz.management.model.persistance.QuizEntity._
  import com.danielasfregola.quiz.management.model.api.QuizProtocol._
  
  val collection = db[BSONCollection]("quizzes")

  // it creates a new quiz entity
  def save(quizEntity: QuizEntity) = collection.save(quizEntity)
    .map(_ => QuizCreated(quizEntity.id.stringify))
  
  // it finds a question by id
  def findById(id: String) =
    collection.find(queryById(id)).one[QuizEntity]
  
  // it finds a random question
  def findOne = {
    val futureCount = db.command(Count(collection.name))
    futureCount.flatMap { count =>
      val skip = Random.nextInt(count)
      collection.find(emptyQuery).options(QueryOpts(skipN = skip)).one[QuizEntity]
    }
  }
  
  // it deletes a quiz entity by id
  def deleteById(id: String) = collection.remove(queryById(id)).map(_ => QuizDeleted)

  private def queryById(id: String) = BSONDocument("_id" -> BSONObjectID(id))

  private def emptyQuery = BSONDocument()
}

Almost done! We now just need to use our QuizDao trait as part of our QuizManager and QuestionManager classes:

// file QuizManager.scala
package com.danielasfregola.quiz.management

import com.danielasfregola.quiz.management.dao.QuizDao
import com.danielasfregola.quiz.management.model.persistance.QuizEntity

class QuizManager extends QuizDao {

  def createQuiz(quizEntity: QuizEntity) = save(quizEntity)

  def deleteQuizEntity(id: String) = deleteById(id)
  
}

// file QuestionManager.scala
package com.danielasfregola.quiz.management

import com.danielasfregola.quiz.management.dao.QuizDao
import com.danielasfregola.quiz.management.model.api.QuestionProtocol._
import com.danielasfregola.quiz.management.model.persistance.QuizEntity

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class QuestionManager extends QuizDao {

  def getQuestion(maybeId: Option[String] = None) = {

    def extractQuestion(maybeQuiz: Option[QuizEntity]) = maybeQuiz match {
      case Some(quizEntity) => toQuestion(quizEntity)
      case _ => QuestionNotFound
    }
    tryGetQuiz(maybeId).map(extractQuestion)
  }

  def answerQuestion(id: String, proposedAnswer: Answer) = {
    
    def isAnswerCorrect(maybeQuiz: Option[QuizEntity]) = maybeQuiz match {
      case Some(q) if (q.correctAnswer == proposedAnswer.answer) => CorrectAnswer
      case _ => WrongAnswer
    }
    
    tryGetQuiz(Some(id)).map(isAnswerCorrect)
  }

  private def tryGetQuiz(maybeId: Option[String]): Future[Option[QuizEntity]] = maybeId match {
    case Some(id) => findById(id)
    case _ => findOne
  }
  
}

Because the ReactiveMongo library is based on Futures, all the methods of our QuizManager and QuestionManager wrap their values in a Future: let’s adopt the Akka Pipe Pattern to send messages to our Responder Actor. An example on how this approach works is following:

// file RestInterface.scala
...
 pathPrefix("quizzes") {
      pathEnd {
        post {
          entity(as[Quiz]) { quiz => requestContext =>
            val responder = createResponder(requestContext)
            quizManager.createQuiz(quiz).pipeTo(responder)
          }
        }
      } 
...

quizManager.createQuiz(quiz) returns a Future[QuizCreated]: once the future is completed, the QuizCreated message is sent to the Responder Actor.

Summary

ReactiveMongo is a non-blocking asynchronous Scala Driver for MongoDB that is particularly suitable for highly scalable application. This article has described how ReactiveMongo can be easily integrated in an existing Akka Spray application.

All the code produced in this tutorial can be found here.

How to Supervise Akka Actors

Supervision is one of the core operations that an Actor can fulfill. Handling errors is not always easy in a classic object oriented programming context as exceptions can be difficult to predict as they are fully embedded in the normal execution flow. In the Akka Actor Model, errors are handled in a well-structured isolated execution flow: not only this makes the exception handling more predictable but it also forces developers to design a proper fault-recovery system. This article describes how to use Actor Supervisors to handle error and recover from them.

Actor Supervision: Overview

Actors have a well-structured tree hierarchy built according to specific rules:
– Your Father (i.e.: the Actor that created you) is your Supervisor.
– Every Actor has a Supervisor, a part from the Guardian Actor (/user) which is the first one created by the system (same as a root node in a tree structure).
– Your Children (i.e.: the Actors you have created) follow your destiny: if you are restarted/stopped/resumed, they are restarted/stopped/resumed as well.
– If unable to handle an exception, escalate it to your Supervisor.
– If the Guardian Actor is unable to handle an exception, the system will shutdown.

Akka provides two categories for our strategies:
OneForOneStrategy where the strategy is applied only to the child actor that failed.
AllForOneStrategy where the strategy is applied to all the children actors when one fails.

Although Akka provides two predefined failure-recovery strategies, called defaultStrategy and stoppingStrategy, most of the time we need to define our own: this can be easily done as shown in the following tutorial.

Actor Supervision in Practice!

In this tutorial we want to trigger an actor supervision operation when a specific word is contained in the received message:
– if the message contains the word “restart”, the child actor is restarted
– if the message contains the word “resume”, the child actor is resumed after the failure
– if the message contains the word “stop”, the child actor is stopped…FOREVER! 😈
– if the message contains the word “secret”, we throw an unhandled exception that forces the Guardian Actor to shutdown the system

First of all, let’s define our protocol and exceptions:

// file protocol.scala
package com.danielasfregola

object PrinterProtocol {

  case class Message(msg: String)
  
}

class RestartMeException extends Exception("RESTART")
class ResumeMeException extends Exception("RESUME")
class StopMeException extends Exception("STOP")

Then we define the behaviour of our Actor and when we are going to throw the exceptions. Note that we have also added some utility methods to better observe the life cycle of our Actors.

// file PrinterActor.scala
package com.danielasfregola

import akka.actor.Actor

class PrinterActor extends Actor {
  
  import PrinterProtocol._
  
  override def preRestart(reason: Throwable, message: Option[Any]) = {
    println("Yo, I am restarting...")
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable) = {
    println("...restart completed!")
    super.postRestart(reason)
  }
  
  override def preStart() = println("Yo, I am alive!")
  override def postStop() = println("Goodbye world!")

  override def receive: Receive = {
    case Message(msg) if containsRestart(msg) =>
      println(msg); throw new RestartMeException
    case Message(msg) if containsResume(msg) =>
      println(msg); throw new ResumeMeException
    case Message(msg) if containsStop(msg) =>
      println(msg); throw new StopMeException
    case Message(msg) if containsSecret(msg) =>
      println(msg); throw new Throwable
    case Message(msg) => println(msg)
  }
  
  private def containsRestart = containsWordCaseInsensitive("restart")_
  private def containsResume = containsWordCaseInsensitive("resume")_
  private def containsStop = containsWordCaseInsensitive("stop")_
  private def containsSecret = containsWordCaseInsensitive("secret")_

  private def containsWordCaseInsensitive(word: String)(msg: String) =  msg matches s".*(?i)$word.*"
}

Finally, the Supervisor just needs to create the actor and define the failure-recovery logic:

// file PrinterActorSupervisor.scala
package com.danielasfregola

import akka.actor.SupervisorStrategy._
import akka.actor.{Actor, OneForOneStrategy, Props}

class PrinterActorSupervisor extends Actor {

  override def preStart() = println("The Supervisor is ready to supervise")
  override def postStop() = println("Bye Bye from the Supervisor")

  override def supervisorStrategy = OneForOneStrategy() {
    case _: RestartMeException => Restart
    case _: ResumeMeException => Resume
    case _: StopMeException => Stop
  } 
  
  val printer = context.actorOf(Props(new PrinterActor), "printer-actor")
  
  override def receive: Receive = {
    case msg => printer forward msg
  }
}

That’s it! Now we just need to have fun with our buddies 🙂

When initialising our Actor system, all the Actors are created and automatically started:

  import PrinterProtocol._
  
  implicit val system = ActorSystem("printer-service")
  val printerSupervisor = system.actorOf(Props(new PrinterActorSupervisor), "printer-supervisor")
  // "The Supervisor is ready to supervise"
  // "Yo, I am alive!"

If no special keyword is send, nothing happens to our actors:

  printerSupervisor ! Message("...please, print me...")
  // ...please, print me...
  printerSupervisor ! Message("...another message to print, nothing should happen...")
  // ...another message to print, nothing should happen...

When restarting our actor, it is stopped and replaced by a brand new one. Also, the event is recorded in the logs.

  printerSupervisor ! Message("...why don't you restart?!")
  //  ...why don't you restart?!
  //  Yo, I am restarting...
  //  Goodbye world!
  //  ...restart completed!
  //  Yo, I am alive!

  // From the logs:
  // ERROR [OneForOneStrategy]: RESTART
  // com.danielasfregola.RestartMeException: RESTART
  //	at com.danielasfregola.PrinterActor$$anonfun$receive$1.applyOrElse(PrinterActor.scala:24) ~[classes/:na]
  // ...

When resuming, nothing happens but a nice warning is in the logs for us:

  printerSupervisor ! Message("...fell free to resume!")
  // ...fell free to resume!

  // From the logs:
  // WARN  [OneForOneStrategy]: RESUME

When stopping, the behaviour is similar to the restart case scenario:

  printerSupervisor ! Message("...you can STOP now!")
  // ...you can STOP now!
  // Goodbye world!

  // From the logs:
  // ERROR [OneForOneStrategy]: STOP
  // com.danielasfregola.StopMeException: STOP
  //	at com.danielasfregola.PrinterActor$$anonfun$receive$1.applyOrElse(PrinterActor.scala:28) ~[classes/:na]
  // ...

Finally, let’s see what happen with an exception that it is not handled. Note that both PrinterActor and PrinterActorSupervisor are killed as the whole system is shutdown by the Guardian Actor.

	printerSupervisor ! Message("...this is going to be our little secret...")
	// ...this is going to be our little secret...
	// Goodbye world!
	// Bye Bye from the Supervisor

	// From the logs:
	// ERROR [LocalActorRefProvider(akka://printer-service)]: guardian failed, shutting down system
	// java.lang.Throwable: null
    //	  at com.danielasfregola.PrinterActor$$anonfun$receive$1.applyOrElse(PrinterActor.scala:30) ~[classes/:na]
    // ...

Summary

The Akka Actor Model allows the creation of failure-recovery systems thanks to its well-structured hierarchy of Actor Supervisors. This article has provided a tutorial on how supervision can be used to control the life cycle of Actors in order to handle and recover from errors.

How to test Actors with Akka TestKit and Spec2

Actors are a really powerful tool to handle concurrency thanks to their message-based model. However, they can be tricky to test: sending and processing messages is done asynchronously. Moreover, their status is hidden internally and it cannot be easily accessed to make assertions on it.

The Akka Team has created a library, called akka-testkit, to simplify unit tests on actors. This article provides an overview of the main features of this library and how it can be used to test our lovely actors.

Single Threaded Tests

If our actor is particularly simple, a single threaded test may be enough. Thanks to TestActorRef, we are able to access the actor internal status and make assertions on it.

For example, we have built an actor that memorises all the received messages starting with ‘A’:

import akka.actor.Actor

object MessageFilteringActorProtocol {
  case class SimpleMessage(text: String)
}

class MessageFilteringActor extends Actor {
  import MessageFilteringActorProtocol._
  
  var messages = Vector[String]()
  
  // what the actor state is
  def state = messages
  
  // the actor behaviour when receiving an object
  def receive = {
    case SimpleMessage(text) if text startsWith "A" =>
      messages = messages :+ text
  }

}

Let’s build a test for our actor:

import akka.testkit.TestKit
import akka.actor.ActorSystem
import org.specs2.mutable.SpecificationLike

class MessageFilteringActorSpec extends TestKit(ActorSystem())
  with SpecificationLike {
  
  import MessageFilteringActorProtocol._
  
  val actor = TestActorRef[MessageFilteringActor]
  "A Message Filtering Actor" should {
    
    "save only messages that starts with 'A'" in {
      actor ! SimpleMessage("A message to remember")
      actor ! SimpleMessage("This message should not be saved")
      actor ! SimpleMessage("Another message for you")
      actor.underlyingActor.state.length mustEqual 2
    }
    
  }
}

Multi Threaded Testing

Unfortunately, single threaded unit testing is not always sufficient with more complex scenarios. To perform multi threaded tests, we have access to the TestProbe class that offers useful methods to wait and analyse the status and interaction with our actor. Some of the most common methods are the following:
expectMsg: it receives a message that is equal to the provided one
expectNoMsg: it receives no message
receiveWhile: it receives messages until the condition is respected or the time out is reached.
A complete list of all the methods offered by the TestProbe class can be found here.

Although the TestProbe class is quite powerful, it may require some changes in the actor code itself to make it more testable: we need to make sure that the actor is sending messages/information to our TestProbe class so that it can perform assertions on them.

A quite common approach is to create ad hoc messages for test purposes. For example, let’s assume we would like to know the internal status of our actor in a multi-threaded testing context. Moreover, we can have an optional listener to help us testing side effects.

An example on how to use these different approaches is as follows. Our BucketCounterActor prints the label on a bucket and it accumulates all the quantities received so far:

import akka.actor.Actor

object BucketCounterActorProtocol {
  case class Bucket(label: String, quantity: Int)
}

class BucketCounterActor extends Actor {
  import BucketCounterActorProtocol._
  
  var counter = 0
  
  def receive = {
    case Bucket(label, quantity) =>
      counter += quantity
      print(label)
  }

}

Let’s add some ad hoc code to our actor for test purposes:

import akka.actor.{ActorRef, Actor}

object BucketCounterActorProtocol {
  case class Bucket(label: String, quantity: Int)
  
  // a new message to expose the internal status of the actor 
  case class GetCounter(receiver: ActorRef)
}

// adding an optional listener to the class
class BucketCounterActor(listener: Option[ActorRef] = None) extends Actor {
  import BucketCounterActorProtocol._
  
  var counter = 0
  
  def receive = {
    case Bucket(label, quantity) =>
      counter = counter + quantity
      print(label)
      // informing the listener of the side effect
      listener.map(_ ! label)
    
    // logic to expose internal status
    case GetCounter(receiver) => receiver ! counter
  }

}

Thanks to the code we just added, testing our actor is now going to be really easy:

class BucketCounterActorSpec extends TestKit(ActorSystem()) with SpecificationLike {
  import BucketCounterActorProtocol._
  
  "A Bucket Counter Actor" should {
    
    val actorProps = Props(new BucketCounterActor(Some(testActor)))
    val actor = system.actorOf(actorProps, "actor-to-test")
    
    val firstBucket = Bucket("Yo, I am a bucket", 1)
    val secondBucket = Bucket("I am another bucket", 9)

    "print out the name of the received buckets" in {
      actor ! firstBucket
      expectMsg(firstBucket.label)
      actor ! secondBucket
      expectMsg(secondBucket.label)
      success
    }
    
    "accumulate the quantity of buckets received" in {
      actor ! GetCounter(testActor)
      expectMsg(10)
      success
    }
  }

Summary

Akka actors are a powerful tool to build concurrent systems. This article has provided different examples on how actors can be tested thanks to the akka-testkit library, using both single and multi threaded approaches.