How to Integrate ReactiveMongo in your Akka Spray Application

Scalability can be challenging when database access is needed: the common approach is to block the thread until a response is received. ReactiveMongo is a MongoDB Scala Driver that provides fully non-blocking asynchronous I/O operation that increases the scalability of your system.
In a previous post we have seen how to build a REST Api with Spray: in this article we will describe how to expand our application to integrate ReactiveMongo.

All the code produced in this tutorial can be found here.

Our Goal

Our goal is to create an application to manage quizzes. In particular, we want to:
– create a quiz
– delete a quiz
– get a random question
– get a question by id
– answer a question by id

Details on how we have chosen to implement the Rest Interface can be found here. In the following sessions we will analyse how ReactiveMongo can be used to store the quiz entity in our MongoDB database without any blocking operation.

Set Up

First, we need an instance of MongoDB: we can set it up one in our local machine (see MongoDB official website for instructions). Also, we need to include the ReactiveMongo library as part of our SBT dependencies and provide information on our MongoDB instance:

// file build.sbt
libraryDependencies ++= {
  ...
  Seq(
  	...
    "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",
    ...
  )
}
// file application.conf
...
mongodb {
  database = "quiz-management"
  servers = ["localhost:27017"]
}

Minor refactoring is needed to make our application a little bit more structured: the original QuizProtocol class has been split in two (QuizProtocol and QuestionProtocol) and moved to a new package called model.api.
Finally, we now let MongoDB generate the quiz id rather than asking the user to select one.

ReactiveMongo Integration

First step is to define our persistence model. Also, we need to provide instructions on how to serialise/deserialise our QuizEntity in MongoDB.

// file QuizEntity.scala
// note the package model.persistence to separate it from our model.api representations
package com.danielasfregola.quiz.management.model.persistence

import com.danielasfregola.quiz.management.model.api.QuizProtocol.Quiz
import reactivemongo.bson.{BSONDocumentWriter, BSONDocument, BSONDocumentReader, BSONObjectID}

case class QuizEntity(id: BSONObjectID = BSONObjectID.generate,
                      question: String, 
                      correctAnswer: String)

object QuizEntity {

  implicit def toQuizEntity(quiz: Quiz) = QuizEntity(question = quiz.question, correctAnswer = quiz.correctAnswer)

  implicit object QuizEntityBSONReader extends BSONDocumentReader[QuizEntity] {
    
    def read(doc: BSONDocument): QuizEntity = 
      QuizEntity(
        id = doc.getAs[BSONObjectID]("_id").get,
        question = doc.getAs[String]("question").get,
        correctAnswer = doc.getAs[String]("answer").get
      )
  }
  
  implicit object QuizEntityBSONWriter extends BSONDocumentWriter[QuizEntity] {
    def write(quizEntity: QuizEntity): BSONDocument =
      BSONDocument(
        "_id" -> quizEntity.id,
        "question" -> quizEntity.question,
        "answer" -> quizEntity.correctAnswer
      )
  }
}

Let’s create a trait, called MongoDao, that defines how we use our configuration to connect to our MongoDB instance:

// file MongoDao.scala
package com.danielasfregola.quiz.management.dao

import com.typesafe.config.ConfigFactory
import reactivemongo.api.MongoDriver

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global

trait MongoDao {

  val config = ConfigFactory.load()
  val database = config.getString("mongodb.database")
  val servers = config.getStringList("mongodb.servers").asScala

  val driver = new MongoDriver
  val connection = driver.connection(servers)

  val db = connection(database)
}

We now define our collection and the I/O operations that we can execute on it:

package com.danielasfregola.quiz.management.dao

import com.danielasfregola.quiz.management.model.persistance.QuizEntity
import reactivemongo.api.QueryOpts
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.{BSONDocument, BSONObjectID}
import reactivemongo.core.commands.Count

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

trait QuizDao extends MongoDao {
  
  import com.danielasfregola.quiz.management.model.persistance.QuizEntity._
  import com.danielasfregola.quiz.management.model.api.QuizProtocol._
  
  val collection = db[BSONCollection]("quizzes")

  // it creates a new quiz entity
  def save(quizEntity: QuizEntity) = collection.save(quizEntity)
    .map(_ => QuizCreated(quizEntity.id.stringify))
  
  // it finds a question by id
  def findById(id: String) =
    collection.find(queryById(id)).one[QuizEntity]
  
  // it finds a random question
  def findOne = {
    val futureCount = db.command(Count(collection.name))
    futureCount.flatMap { count =>
      val skip = Random.nextInt(count)
      collection.find(emptyQuery).options(QueryOpts(skipN = skip)).one[QuizEntity]
    }
  }
  
  // it deletes a quiz entity by id
  def deleteById(id: String) = collection.remove(queryById(id)).map(_ => QuizDeleted)

  private def queryById(id: String) = BSONDocument("_id" -> BSONObjectID(id))

  private def emptyQuery = BSONDocument()
}

Almost done! We now just need to use our QuizDao trait as part of our QuizManager and QuestionManager classes:

// file QuizManager.scala
package com.danielasfregola.quiz.management

import com.danielasfregola.quiz.management.dao.QuizDao
import com.danielasfregola.quiz.management.model.persistance.QuizEntity

class QuizManager extends QuizDao {

  def createQuiz(quizEntity: QuizEntity) = save(quizEntity)

  def deleteQuizEntity(id: String) = deleteById(id)
  
}

// file QuestionManager.scala
package com.danielasfregola.quiz.management

import com.danielasfregola.quiz.management.dao.QuizDao
import com.danielasfregola.quiz.management.model.api.QuestionProtocol._
import com.danielasfregola.quiz.management.model.persistance.QuizEntity

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class QuestionManager extends QuizDao {

  def getQuestion(maybeId: Option[String] = None) = {

    def extractQuestion(maybeQuiz: Option[QuizEntity]) = maybeQuiz match {
      case Some(quizEntity) => toQuestion(quizEntity)
      case _ => QuestionNotFound
    }
    tryGetQuiz(maybeId).map(extractQuestion)
  }

  def answerQuestion(id: String, proposedAnswer: Answer) = {
    
    def isAnswerCorrect(maybeQuiz: Option[QuizEntity]) = maybeQuiz match {
      case Some(q) if (q.correctAnswer == proposedAnswer.answer) => CorrectAnswer
      case _ => WrongAnswer
    }
    
    tryGetQuiz(Some(id)).map(isAnswerCorrect)
  }

  private def tryGetQuiz(maybeId: Option[String]): Future[Option[QuizEntity]] = maybeId match {
    case Some(id) => findById(id)
    case _ => findOne
  }
  
}

Because the ReactiveMongo library is based on Futures, all the methods of our QuizManager and QuestionManager wrap their values in a Future: let’s adopt the Akka Pipe Pattern to send messages to our Responder Actor. An example on how this approach works is following:

// file RestInterface.scala
...
 pathPrefix("quizzes") {
      pathEnd {
        post {
          entity(as[Quiz]) { quiz => requestContext =>
            val responder = createResponder(requestContext)
            quizManager.createQuiz(quiz).pipeTo(responder)
          }
        }
      } 
...

quizManager.createQuiz(quiz) returns a Future[QuizCreated]: once the future is completed, the QuizCreated message is sent to the Responder Actor.

Summary

ReactiveMongo is a non-blocking asynchronous Scala Driver for MongoDB that is particularly suitable for highly scalable application. This article has described how ReactiveMongo can be easily integrated in an existing Akka Spray application.

All the code produced in this tutorial can be found here.

Published by

Daniela Sfregola

Tech Leader at Paytouch

3 thoughts on “How to Integrate ReactiveMongo in your Akka Spray Application”

  1. thx. it’s very helpful for me.
    but i can’t find that quizManager return Future[QuizCreated] anywhere.

    ex).
    quizManager.createQuiz(quiz) returns a Future[QuizCreated]:

    i wonder where i couldn’t find it or the soure code was really misspelling.

    i’m newbie scala, akka, spray..

    Like

  2. Daniela, I’m trying to modify your code to add a route that gets the entire quiz entity collection from mongo. As with the other poster, I’m new to Scala. Is there an easy way to do this? Thanks!

    Like

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s