How to build a REST API with Akka Http

In previous articles we have discussed how to use Spray to build REST APIs. Since Akka 2.4, Spray is no longer supported and it is replaced by Akka Http.
This article will introduce Akka Http, the new shiny toy from the Akka Team, and provide a tutorial on how it can be used to create a simple REST API.

All the code produced in this tutorial can be found on GitHub.

Why Akka Http?

Spray has been the Akka way of building APIs for quite some time.
Although not directly built by the Akka Team, it was heavily using the Akka ecosystem: Spray is implemented with Akka Actors.
The project went so well that, after some time, the Akka Team decided to adopt it. After the Akka Team released Akka Streams, they realised that Spray’s performance could be improved by using Akka Streams together with Akka Actors.

Since Akka 2.4, Akka Http is the official Akka toolkit to create REST API, both client and server side.
Note that in Akka 2.4, Spray is no longer supported…so you are forced to migrate from Spray to Akka Http if you want to use any of the other latest tools of Akka, like Akka Persistance.

At the time of this writing, Akka Http is still an experimental module — but it has been declared stable for production. Also its performance is worse than Spray: the Akka Team has been focusing on its interface, but they have promised to massively improve its performance by Q1 2016.

Our CRUD Application

Previously, we described how to create a simple CRUD application with Spray.
In this article we will rewrite exactly the same application using Akka Http instead of Spray.

In particular, our application will create, retrieve, update, delete a Question entity.
A question has 3 fields (id, title, text) and its case class looks as following:

case class Question(id: String, title: String, text: String)

Also, to keep things simple, we are going to keep all the data in memory, rather than properly storing it in a database.
The class QuestionService simulates a persistent storage by keeping all the entities in a Vector.
The following code is the skeleton of the QuestionService class (more details on its implementation can be found here):

package com.danielasfregola.quiz.management.services

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import scala.concurrent.{ExecutionContext, Future}

class QuestionService(implicit val executionContext: ExecutionContext) {

  var questions = Vector.empty[Question]

  def createQuestion(question: Question): Future[Option[String]] = ...

  def getQuestion(id: String): Future[Option[Question]] = ...

  def updateQuestion(id: String, update: QuestionUpdate): Future[Option[Question]] = ...

  def deleteQuestion(id: String): Future[Unit] = ...

}

Last but not least, we will use json4s to (de)serialise a json into a case class (more information on how to use json4s can be found here).

Setup

The first step is to add the right dependencies to our project:

// build.sbt
...
resolvers ++= Seq("Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
                  Resolver.bintrayRepo("hseeberger", "maven"))

libraryDependencies ++= {
  val AkkaVersion       = "2.3.9"
  val AkkaHttpVersion   = "2.0.1"
  val Json4sVersion     = "3.2.11"
  Seq(
    "com.typesafe.akka" %% "akka-slf4j"      % AkkaVersion,
    "com.typesafe.akka" %% "akka-http-experimental" % AkkaHttpVersion,
    "ch.qos.logback"    %  "logback-classic" % "1.1.2",
    "org.json4s"        %% "json4s-native"   % Json4sVersion,
    "org.json4s"        %% "json4s-ext"      % Json4sVersion,
    "de.heikoseeberger" %% "akka-http-json4s" % "1.4.2"
  )
}
...

Then, we need to bind our api to an host and port:

// Main.scala
package com.danielasfregola.quiz.management

import scala.concurrent.duration._
import akka.actor._
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.util.Timeout

import com.typesafe.config.ConfigFactory

object Main extends App with RestInterface {
  val config = ConfigFactory.load()
  val host = config.getString("http.host")
  val port = config.getInt("http.port")

  implicit val system = ActorSystem("quiz-management-service")
  implicit val materializer = ActorMaterializer()


  implicit val executionContext = system.dispatcher
  implicit val timeout = Timeout(10 seconds)

  val api = routes

  Http().bindAndHandle(handler = api, interface = host, port = port) map { binding =>
    println(s"REST interface bound to ${binding.localAddress}") } recover { case ex =>
    println(s"REST interface could not bind to $host:$port", ex.getMessage)
  }
}

Note that RestInterface is just a collection of routes and the services needed:

package com.danielasfregola.quiz.management

import scala.concurrent.ExecutionContext

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.resources.QuestionResource
import com.danielasfregola.quiz.management.services.QuestionService

trait RestInterface extends Resources {

  implicit def executionContext: ExecutionContext

  lazy val questionService = new QuestionService

  val routes: Route = questionRoutes

}

trait Resources extends QuestionResource

Question Resource

QuestionResource is a generic Resource:
– it has a service that performs some operations on the entity
– it has some routes (see later paragraphs of this article)
– it extends a generic Resource, called MyResource

Its skeleton is following:

// QuestionResource.scala
package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = ???

  }
}

MyResource is a trait where we add code that is common/useful for all the resources (the code can be found here).
In particular, it includes the json4s support to (de)serialise case classes and some helper methods that will make our akka-http routing easier.

Now that we have setup the skeleton of our application, we can focus on the implementation of our endpoints.

POST – Create a Question

Usage

The first task of our application is to define an endpoint to create a question entity.
According to the REST protocol, an entity is created through a POST request that should reply with a 201 (Created) HTTP status code. Also, a Location Header with the URI that identifies the location of the new entity should be returned.
Note that a POST request is non-idempotent: if the entity already exists or cannot be created, we should return an HTTP error status code.

For our questions application, this can be translated in the following curl command:

curl -v -H "Content-Type: application/json" \
   -X POST http://localhost:5000/questions \
   -d '{"id": "test", "title": "MyTitle", "text":"The text of my question"}'

The first time we make the request, we should get a reply similar to the following:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> POST /questions HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 68
>
* upload completely sent off: 68 out of 68 bytes
 HTTP/1.1 201 Created
 Location: http://localhost:5000/questions/test
 Server: akka-http/2.3.12
 Date: Sun, 07 Feb 2016 11:16:50 GMT
 Content-Type: application/json
 Content-Length: 0

* Connection #0 to host localhost left intact

If we repeat the request again, we will get an HTTP response with a 409 (Conflict) status code as the entity already exists:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> POST /questions HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 68
>
* upload completely sent off: 68 out of 68 bytes
 HTTP/1.1 409 Conflict
 Server: akka-http/2.3.12
 Date: Sun, 07 Feb 2016 11:17:07 GMT
 Content-Type: application/json
 Content-Length: 0

* Connection #0 to host localhost left intact

Implementation

As described in the previous paragraph, when creating an entity we would like to provide the URI of the created entity in a Location header.
Our implementation is tailored to the behaviour of our system: when QuestionService creates a question, it returns a Future[Option[T]] and, if the returned option is not defined, we want to return a different HTTP status code.
Unfortunately, Akka Http doesn’t have a default implementation to achieve this, so we will have to create our own my adding to our MyResource trait:

// MyResource.scala
ppackage com.danielasfregola.quiz.management.routing

import akka.http.scaladsl.marshalling.{ToResponseMarshaller, ToResponseMarshallable}

import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.{Directives, Route}

import com.danielasfregola.quiz.management.serializers.JsonSupport

trait MyResource extends Directives with JsonSupport {

  implicit def executionContext: ExecutionContext

  def completeWithLocationHeader[T](resourceId: Future[Option[T]], ifDefinedStatus: Int, ifEmptyStatus: Int): Route =
    onSuccess(resourceId) {
      case Some(t) => completeWithLocationHeader(ifDefinedStatus, t)
      case None => complete(ifEmptyStatus, None)
    }

  def completeWithLocationHeader[T](status: Int, resourceId: T): Route =
    extractRequestContext { requestContext =>
      val request = requestContext.request
      val location = request.uri.copy(path = request.uri.path / resourceId.toString)
      respondWithHeader(Location(location)) {
        complete(status, None)
      }
    }

  def complete[T: ToResponseMarshaller](resource: Future[Option[T]]): Route =
    onSuccess(resource) {
      case Some(t) => complete(ToResponseMarshallable(t))
      case None => complete(404, None)
    }

  def complete(resource: Future[Unit]): Route = onSuccess(resource) { complete(204, None) }

}

We can now put everything together and define the endpoint to create a question entity:

// QuestionResource.scala
package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    pathEnd {
      post {
        entity(as[Question]) { question =>
          completeWithLocationHeader(
            resourceId = questionService.createQuestion(question),
            ifDefinedStatus = 201, ifEmptyStatus = 409)
          }
        }
    } ~ ...

  }
}

GET – Retrieve a Question

Usage

Now that we have created a question, we can retrieve it by performing a GET request to the URI that identifies the entity (i.e.: the one returned in the Location Header).
The request should respond with either a 200 (OK) HTTP status code with a body containing the question entity or a 404 (NotFound) HTTP status code with empty body.

For example, we can get an existing question with the following curl command…

curl -v http://localhost:5000/questions/test

…and it should return something similar to the following:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> GET /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
>
 HTTP/1.1 200 OK
 Server: akka-http/2.3.12
 Date: Sun, 07 Feb 2016 11:17:31 GMT
 Content-Type: application/json
 Content-Length: 64

* Connection #0 to host localhost left intact
{"id":"test","title":"MyTitle","text":"The text of my question"}

Moreover, if we request an entity that doesn’t exists…

curl -v http://localhost:5000/questions/non-existing-question

….we should get a 404 error code:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> GET /questions/non-existing-question HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
>
 HTTP/1.1 404 Not Found
 Server: akka-http/2.3.12
 Date: Sun, 07 Feb 2016 11:18:40 GMT
 Content-Type: application/json
 Content-Length: 0

* Connection #0 to host localhost left intact

Implementation

QuestionService returns a Future[Option[Question]] when retrieving a question.
Differently from Spray, Akka Http doesn’t seem to complete optional values correctly: complete(Future(None)) returns an http response with code 200 and an empty body rather than a 404 http response — which in my opinion doesn’t make much sense, considering that 200 should have a non-empty body.

UPDATE:
The Akka team has discussed this issue on GitHub (see here for more information) and this does not seem to be a bug: apparently there are some case scenarios where complete(Future(None)) needs to be completed with something else rather than 404.
Thank you to @ktoso for looking into this!

Not a problem, we can add some *black magic* to our MyResource trait to make look the code exactly the same as before:

// MyResource.scala
package com.danielasfregola.quiz.management.routing

import akka.http.scaladsl.marshalling.{ToResponseMarshaller, ToResponseMarshallable}

import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.{Directives, Route}

import com.danielasfregola.quiz.management.serializers.JsonSupport

trait MyResource extends Directives with JsonSupport {

  implicit def executionContext: ExecutionContext

  ...

  def complete[T: ToResponseMarshaller](resource: Future[Option[T]]): Route =
    onSuccess(resource) {
      case Some(t) => complete(ToResponseMarshallable(t))
      case None => complete(404, None)
    }

  ...
}

Thanks to our trick, our route now looks exactly the same as with Spray:

package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      get {
        complete(questionService.getQuestion(id))
      } ~
      ...
    }
  }

}

PUT – Update a Question

Usage

When updating an entity, we should use a PUT request. Also, we should send only the fields that we want to update, not the whole object. Not only this will make the usage of our API easier, but it will also reduce potential concurrency issues.
If the update goes through, we should get a HTTP response with a 200 (OK) status code with the updated entity in the body. On the other side, if the update was not possible, for example because the entity no longer exists, we should get a HTTP response with status 404 (NotFound) and an empty body.
Note that a PUT request is idempotent: performing the update multiple times should already return the same result.

In our application we can update the question entity with the following curl command…

curl -v -H "Content-Type: application/json" \
   -X PUT http://localhost:5000/questions/test \
   -d '{"text":"Another text"}'

….and get the following reply:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> PUT /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 23
>
* upload completely sent off: 23 out of 23 bytes
 HTTP/1.1 200 OK
 Server: akka-http/2.3.12
 Date: Sun, 07 Feb 2016 11:19:31 GMT
 Content-Type: application/json
 Content-Length: 53

* Connection #0 to host localhost left intact
{"id":"test","title":"MyTitle","text":"Another text"}

If we try to update a resource that doesn’t exist, we should get a 404 response:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> PUT /questions/non-existing-question HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 23
>
* upload completely sent off: 23 out of 23 bytes
 HTTP/1.1 404 Not Found
 Server: akka-http/2.3.12
 Date: Sun, 07 Feb 2016 11:20:07 GMT
 Content-Type: application/json
 Content-Length: 0

* Connection #0 to host localhost left intact

Implementation

As explained in the previous section, we want the client of our API to send just the fields to update, not the whole entity. In order to achieve this, we will deserialise the body of our PUT request to the following case class:

case class QuestionUpdate(title: Option[String], text: Option[String])

Note that we decided not to allow our clients to update the field id, as it is used to locate the entity.

Keeping in mind that QuestionService returns Future[Option[Question]] when updating a question, we can reuse our “black magic” trick used in our GET route too make our code look nice:

package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route
import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      ... ~
      put {
        entity(as[QuestionUpdate]) { update =>
          complete(questionService.updateQuestion(id, update))
        }
      } ~ ...
    }
  }

}

DELETE – Delete a Question

Usage

Finally, we want to have an endpoint to delete a question entity. This can be achieved by sending a DELETE request to the URI that identifies the entity that should reply with a 204 (NoContent) status code once the operation has been completed.
Note that DELETE is idempotent, so deleting a resource that has been already deleted should still return an HTTP response with a 204 (NoContent) status code and an empty body.

For example, we can delete the question test with the following…

curl -v -X DELETE http://localhost:5000/questions/test

…and get the following result back:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> DELETE /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
>
 HTTP/1.1 204 No Content
 Server: akka-http/2.3.12
 Date: Sun, 07 Feb 2016 11:20:30 GMT
 Content-Type: application/json

* Connection #0 to host localhost left intact

Implementation

When deleting a question, QuestionService returns Future[Unit].
Unfortunately, Akka Http resolves complete(Future(())) with an http response with code 200 and empty body — same as complete(Future(None))! Please, put a comment below if you know the rationale behind this design choice.

UPDATE:
Thank you again to @ktoso from the Akka Team for looking into this! Apparently this is an issue related to Json4s.
Here is the crystal clear explanation that @hseeberger has provided in the comments below:

What causes the issue is that (a) Json4s cannot marshal `AnyVal`s and (b) Json4s happily marshals `Future`. This leads to bypassing the `Future` marshaller from Akka HTTP.

Not a problem, we just need to add the following code to MyResource to make it resolve complete with a 204 http response with empty body:

// MyResource.scala
package com.danielasfregola.quiz.management.routing

import akka.http.scaladsl.marshalling.{ToResponseMarshaller, ToResponseMarshallable}

import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.{Directives, Route}

import com.danielasfregola.quiz.management.serializers.JsonSupport

trait MyResource extends Directives with JsonSupport {

  implicit def executionContext: ExecutionContext

  ...

  def complete(resource: Future[Unit]): Route = onSuccess(resource) { complete(204, None) }

}

Our DELETE endpoint can now be implemented as following:

package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route
import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    .. ~
    path(Segment) { id =>
      ... ~
      delete {
        complete(questionService.deleteQuestion(id))
      }
    }
  }

}

Summary

In this article we have introduced Akka Http and we have provided a simple tutorial on how to create a simple CRUD application using Akka Http.

The completed code of this tutorial can be found on GitHub.

Thank to @ktoso and @hseeberger for clarifying some issues raised in this article!

How to build a Scala REST CRUD application with Spray

In previous articles we have described how to build a REST Api with Spray and how to (de)serialize case classes with json4s. However, in order to keep things simple, we didn’t always do things as suggested by spray.io.

In this article we will redeem ourselves and we will describe how to build a REST CRUD application in Spray, taking full advantage of the tools offered by the Spray’s tool kit.

All the code shown in this tutorial can be found on GitHub.

Our CRUD Application

A REST CRUD application is an application that manipulated entities using 4 key operations: create, retrieve, update, delete.
In this tutorial we will describe how to create a simple REST CRUD application to manage question entities.
A question has the following fields: id, title, text. We are going to use json4s to translate it to the following case class (for more information on how to use json4s, have a look here):

case class Question(id: String, title: String, text: String)

Also, to keep things simple, we are not going to store the entities in a database but we will simply keep them in memory. In this tutorial the class QuestionService simulates a persistent storage by storing all the questions in a Vector (see its complete code here):

package com.danielasfregola.quiz.management.services

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import scala.concurrent.{ExecutionContext, Future}

class QuestionService(implicit val executionContext: ExecutionContext) {

  var questions = Vector.empty[Question]

  def createQuestion(question: Question): Future[Option[String]] = ...

  def getQuestion(id: String): Future[Option[Question]] = ...

  def updateQuestion(id: String, update: QuestionUpdate): Future[Option[Question]] = ...
    
  def deleteQuestion(id: String): Future[Unit] = ...

}

POST – Create a Question

Usage

The first task of our application is to define an endpoint to create a question entity.
According to the REST protocol, an entity is created through a POST request that should reply with a 201 (Created) HTTP status code. Also, a Location Header with the URI that identifies the location of the new entity should be returned.
Note that a POST request is non-idempotent: if the entity already exists or cannot be created, we should return an HTTP error status code.

For our questions application, this can be translated in the following curl command:

curl -v -H "Content-Type: application/json" \
   -X POST http://localhost:5000/questions \
   -d '{"id": "test", "title": "MyTitle", "text":"The text of my question"}'

The first time we make the request, we should get a reply similar to the following:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> POST /questions HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 68
> 
* upload completely sent off: 68 out of 68 bytes
< HTTP/1.1 201 Created
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 11:37:11 GMT
< Location: http://localhost:5000/questions/test
< Content-Length: 0
<
* Connection #0 to host localhost left intact

If we repeat the request again, we will get an HTTP response with a 409 (Conflict) status code as the entity already exists:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> POST /questions HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 68
> 
* upload completely sent off: 68 out of 68 bytes
< HTTP/1.1 409 Conflict
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 11:53:34 GMT
< Content-Length: 0
< 

Implementation

Spray has several methods to complete a generic result and convert it into a Route (see the Spray Documentation for more information). However, there isn’t a standard function to transform a result into a Location Header….so we are going to write one! 😀
Note that our implementation is tailored to the behaviour of our system: when QuestionService creates a question, it returns a Future[Option[T]] and, if the returned option is not defined, we want to return a different HTTP status code.

package com.danielasfregola.quiz.management.routing

import com.danielasfregola.quiz.management.serializers.JsonSupport
import spray.http.HttpHeaders
import spray.routing._
import scala.concurrent.{ExecutionContext, Future}

trait MyHttpService extends HttpService with JsonSupport {

  implicit val executionContext: ExecutionContext

  def completeWithLocationHeader[T](resourceId: Future[Option[T]], ifDefinedStatus: Int, ifEmptyStatus: Int): Route =
    onSuccess(resourceId) { maybeT =>
      maybeT match {
        case Some(t) => completeWithLocationHeader(ifDefinedStatus, t)
        case None => complete(ifEmptyStatus, None)
      }
    }

  def completeWithLocationHeader[T](status: Int, resourceId: T): Route =
    requestInstance { request =>
      val location = request.uri.copy(path = request.uri.path / resourceId.toString)
      respondWithHeader(HttpHeaders.Location(location)) {
        complete(status, None)
      }
    }
}

We can now put everything together and define the endpoint to create a question entity:

package com.danielasfregola.quiz.management.resources

import com.danielasfregola.quiz.management.entities.{QuestionUpdate, Question}
import com.danielasfregola.quiz.management.routing.MyHttpService
import com.danielasfregola.quiz.management.services.QuestionService
import spray.routing._

trait QuestionResource extends MyHttpService {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    pathEnd {
      post {
        entity(as[Question]) { question =>
          completeWithLocationHeader(
            resourceId = questionService.createQuestion(question),
            ifDefinedStatus = 201, ifEmptyStatus = 409)
          }
        }
    } ~
    ...
  }

}

GET – Retrieve a Question

Usage

Now that we have created a question, we can retrieve it by performing a GET request to the URI that identifies the entity (i.e.: the one returned in the Location Header). The request should respond with either a 200 (OK) HTTP status code with a body containing the question entity or a 404 (NotFound) HTTP status code with empty body.

For example, we can get an existing question with the following curl command…

curl -v http://localhost:5000/questions/test

…and it should return something similar to the following:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> GET /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 12:23:34 GMT
< Content-Type: application/json; charset=UTF-8
< Content-Length: 64
< 
* Connection #0 to host localhost left intact
{"id":"test","title":"MyTitle","text":"The text of my question"}

Moreover, if we request an entity that doesn’t exists…

curl -v http://localhost:5000/questions/non-existing-question

….we should get a 404 error code:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> GET /questions/non-existing-question HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 404 Not Found
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 12:25:43 GMT
< Content-Length: 0
< 
* Connection #0 to host localhost left intact

Implementation

This behaviour can be easily be achieved with Spray as it will automatically complete optional values:
– if the option is defined, complete will transform it in a HTTP response with status 200 (OK) status code and a body containing the entity json representation.
– if the option is empty, it will just return a HTTP response with status 404 (NotFound) status code.

package com.danielasfregola.quiz.management.resources

import com.danielasfregola.quiz.management.entities.{QuestionUpdate, Question}
import com.danielasfregola.quiz.management.routing.MyHttpService
import com.danielasfregola.quiz.management.services.QuestionService
import spray.routing._

trait QuestionResource extends MyHttpService {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      get {
        complete(questionService.getQuestion(id))
      } ~
      ...
    }
  }
}

PUT – Update a Question

Usage

When updating an entity, we should use a PUT request. Also, we should send only the fields that we want to update, not the whole object. Not only this will make the usage of our API easier, but it will also reduce potential concurrency issues.
If the update goes through, we should get a HTTP response with a 200 (OK) status code with the updated entity in the body. On the other side, if the update was not possible, for example because the entity no longer exists, we should get a HTTP response with status 404 (NotFound) and an empty body.
Note that a PUT request is idempotent: performing the update multiple times should already return the same result.

In our application we can update the question entity with the following curl command…

curl -v -H "Content-Type: application/json" \
   -X PUT http://localhost:5000/questions/test \
   -d '{"text":"Another text"}'

….and get the following reply:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> PUT /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 23
> 
* upload completely sent off: 23 out of 23 bytes
< HTTP/1.1 200 OK
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 12:44:03 GMT
< Content-Type: application/json; charset=UTF-8
< Content-Length: 53
< 
* Connection #0 to host localhost left intact
{"id":"test","title":"MyTitle","text":"Another text"}

Similarly, if we try to update a resource that doesn’t exist…

curl -v -H "Content-Type: application/json" \
   -X PUT http://localhost:5000/questions/non-existing-question \
   -d '{"text":"Another text"}'

…we should get a 404 (NotFound) error code:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> PUT /questions/non-existing-question HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 23
> 
* upload completely sent off: 23 out of 23 bytes
< HTTP/1.1 404 Not Found
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 12:46:15 GMT
< Content-Length: 0
< 
* Connection #0 to host localhost left intact

Implementation

As explained in the previous section, we want the client of our API to send just the fields to update, not the whole entity. In order to achieve this, we will deserialize the body of our PUT request to the following case class:

case class QuestionUpdate(title: Option[String], text: Option[String])

Note that we decided not to allow our clients to update the field id, as it is used to locate the entity.

Similarly to what we did for the GET request, Spray does all the work for us:

package com.danielasfregola.quiz.management.resources

import com.danielasfregola.quiz.management.entities.{QuestionUpdate, Question}
import com.danielasfregola.quiz.management.routing.MyHttpService
import com.danielasfregola.quiz.management.services.QuestionService
import spray.routing._

trait QuestionResource extends MyHttpService {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { 
      ... ~
      put {
        entity(as[QuestionUpdate]) { update =>
          complete(questionService.updateQuestion(id, update))
        }
      } ~
      ...
    }
  }
}

DELETE – Delete a Question

Usage

Finally, we want to have an endpoint to delete a question entity. This can be achieved by sending a DELETE request to the URI that identifies the entity that should reply with a 204 (NoContent) status code once the operation has been completed. Note that DELETE is idempotent, so deleting a resource that has been already deleted should still return an HTTP response with a 204 (NoContent) status code and an empty body.

For example, we can delete the question test with the following…

curl -v -X DELETE http://localhost:5000/questions/test

…and get the following result back:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> DELETE /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 204 No Content
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 12:58:30 GMT
< Content-Type: application/json; charset=UTF-8
< Content-Length: 2
< 
* Excess found in a non pipelined read: excess = 2 url = /questions/non-existing-question (zero-length body)
* Connection #0 to host localhost left intact

Implementation

Once again, Spray makes our life really easy as all we have to do in order to define an endpoint to delete a question is just to reuse already defined functions in the Spray’s tool kit:

package com.danielasfregola.quiz.management.resources

import com.danielasfregola.quiz.management.entities.{QuestionUpdate, Question}
import com.danielasfregola.quiz.management.routing.MyHttpService
import com.danielasfregola.quiz.management.services.QuestionService
import spray.routing._

trait QuestionResource extends MyHttpService {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      ... ~
      delete {
        complete(204, questionService.deleteQuestion(id))
      }
    }
  }
}

Summary

In this article we have described what a REST CRUD application is. Also, we have provided a simple tutorial on how to create a simple CRUD application using Spray. The code of the application analysed can be found on GitHub.

How to Create a Spray Custom Authenticator

Spray has a few standard authenticators already built in. However, what if the authentication we need is not supported? In this article we will describe how to create a custom authenticator in Spray. In particular, we will implement a custom client id/secret token-based authentication.

Spray Authentication: how it works

Spray has already some built in authentication systems. For example, it supports HTTP Basic Authentication: have a look at their documentation here. What if the authentication logic that we need is not supported? Luckily, Spray allows the implementation of custom authenticators.

The authenticate directive has two signatures:

  def authenticate[T](auth: ⇒ Future[Authentication[T]])(implicit executor: ExecutionContext): Directive1[T]
  def authenticate[T](auth: ContextAuthenticator[T])(implicit executor: ExecutionContext): Directive1[T]

where ContextAuthenticator and Authentication are aliases for the following types:

// from spray.routing.authentication package object
...
  type Authentication[T] = Either[Rejection, T]
  type ContextAuthenticator[T] = RequestContext ⇒ Future[Authentication[T]]
...

In other words, a ContextAuthenticator is a function that takes a RequestContext (i.e.: a wrapper that contains all the information about the received request) and it uses it to either authenticate or reject the request.

Cient Id/Secret Custom Authenticator

Our goal is to create a custom authenticator that looks at the query parameters of a request, it extracts some client_id and client_secret and it authorizes the request if they are correct.

For the purposes of this tutorial we have decided to focusing on the second signature of the authenticate directive, so we will define a ContextAuthenticator to use with the directive.

First, let’s create a data container for our credentials and an auxiliary method to extract the id/secret from the request context:

  case class Credentials(id: String, secret: String)

  private def extractCredentials(ctx: RequestContext): Option[Credentials] = {
    val queryParams = ctx.request.uri.query.toMap
    for {
      id <- queryParams.get("client_id")
      secret <- queryParams.get("client_secret")
    } yield Credentials(id, secret)
  }

We can now define our ContextAuthenticator as following, keeping in mind that Authentication[T] is just a type alias for Either[Rejection, T]:

 val authenticator: ContextAuthenticator[Unit] = { ctx =>
      Future {
        val maybeCredentials = extractCredentials(ctx)
          maybeCredentials.fold[authentication.Authentication[Unit]](
            Left(AuthenticationFailedRejection(CredentialsMissing, List()))
          )( credentials =>
              credentials match {
                case AppCredentials("my-client-id", "my-client-super-secret") => Right()
                case _ => Left(AuthenticationFailedRejection(CredentialsRejected, List()))
              }
          )
      }
  }

If any of the authentication query parameters are missing, the following response is returned:

Status: 401 Unauthorized
Body: The resource requires authentication, which was not supplied with the request

If the credentials are provided but they are not correct, the client will see the following message:

Status: 401 Unauthorized
Body: The supplied authentication is invalid

Finally, if the credentials are correct, the request will be authorized and satisfied if possible.

Note that in a more realistic case scenario, you would probably return a User entity instead of Unit. Moreover, rather than having the correct credentials hard-coded in the code, they should be either configurable or stored in a proper data storage.

We are now ready to use our custom authenticator!

For example, to make all our endpoints use our custom authenticator we could do something similar to the following:

 def routes: Route = 
    sealRoute {
        authenticate(authenticator) { authenticated =>
            firstResourceRoutes ~
              secondResourceRoutes
          }
      }
    }

Summary

Spray allows the implementation of custom authenticators. This article provides an example of how this feature can be used to implement a client id/secret token based authentication.

Akka Dead Letters Channel

Akka doesn’t guarantee the delivery of a message. What happens when a message cannot be delivered? In this article we will describe how the Dead Letters Channel works and how it can be used to spot issues in our system.

How it works

In a previous article we have described the use of Event Streams in Akka. The Dead Letter Channel is nothing more that a special Event Stream that the system uses internally every time a message cannot be delivered: either because the message cannot be processed or delivered.

When Akka is redirecting the failed message to the Dead Letter actor, it wraps the message in a case class called Dead Letter to provide the message, the original sender and recipient:

case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef)

Unless specified differently, dead letters are logged in the INFO level: more information on how to tweak your logging settings can be found here.

How to use it

Because the Dead Letter Channel is an Event Stream, we can subscribe to it and listen to all the messages it publishes.

The code used for this tutorial is available here.

First of all, let’s create a dummy actor, called EchoActor, that prints all the messages it receives:

 
class EchoActor extends Actor {
  
  def receive = {
    case msg => println(s"New msg received: $msg")
  }
  
}

The second step is to create our actor system: we will have two instance of EchoActor, one called deadLettersSubscriber that will listen for DeadLetters and the other, called echoActor, that will simply wait and receive messages.

  implicit val system = ActorSystem("dead-letters-usage-example")

  val deadLettersSubscriber = system.actorOf(Props[EchoActor], name = "dead-letters-subscriber")
  val echoActor = system.actorOf(Props[EchoActor], name = "generic-echo-actor")

  system.eventStream.subscribe(deadLettersSubscriber, classOf[DeadLetter])

When successfully sending a message, no dead letter is generated.

  echoActor ! "First Message"
  // generic-echo-actor - New msg received: First Message

However, when we try to send a message to an actor that has been killed, the message is successfully transformed into a DeadLetter.

  echoActor ! PoisonPill
  echoActor ! "Second Message"
  // dead-letters-subscriber - New msg received: DeadLetter(Second Message,Actor[akka://dead-letters-usage-example/deadLetters],Actor[akka://dead-letters-usage-example/user/generic-echo-actor#317003256])
  // INFO  [RepointableActorRef]: Message [java.lang.String] from Actor[akka://dead-letters-usage-example/deadLetters] to Actor[akka://dead-letters-usage-example/user/generic-echo-actor#317003256] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Finally, we can also send messages directly to the Dead Letter Actor: this is usually not advised as the Dead Letter Channel should be reserved for the system to redirect failed messages.

  system.deadLetters ! "Dead Message"
  // dead-letters-subscriber - New msg received: DeadLetter(Dead Message,Actor[akka://dead-letters-usage-example/deadLetters],Actor[akka://dead-letters-usage-example/deadLetters])
  // INFO  [DeadLetterActorRef]: Message [java.lang.String] from Actor[akka://dead-letters-usage-example/deadLetters] to Actor[akka://dead-letters-usage-example/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Summary

Akka redirects all the messages that couldn’t be delivered or process to the Dead Letter Channel. In this article we have discussed how Akka uses it and how we can exploit it when testing our system and investigating issues with our system.

Peer-to-Many Communication in Akka

The most common communication channel in Akka is Peer-to-Peer, where an individual actor sends a message directly to another individual actor actor. However, sometimes this is not enough as we may need a Peer-to-Many type of communication, where an individual actor sends a message to a group of actors. This is particularly useful when you need to model your system using a Publisher-Subscriber Pattern. This article will provide a quick tutorial on how to use and customise an Event Bus, the Akka way of implementing a Peer-to-Many communication channel.

Event Stream

Event Stream is the simplest and most common implementation of an Event Bus. It follows the classic Publisher-Subscriber Pattern: one system actor will publish a message and all the actors that subscribed to that specific message type will receive it.

Let’s see with a simple tutorial how easily this can be achieved: (gist available here).

In this tutorial, we want to model the following scenario: every time someone publishes a book, all the subscribers need to receive it.

First of all, we need to define what a book is:

case class Book(title: String, authors: List[String])

Then, we need to specify an Actor that acts as book publisher: every time it receives a book, it publishes it on the System Event Stream.

class BookPublisher extends Actor {
  
  def receive = {
    case book: Book => {
      println(s"Yeah! Publishing a new book: $book")
      context.system.eventStream.publish(book)
    }
  }
  
}

Finally, let’s create an Actor that will subscribe to the System Event Stream for all messages of type Book. Note that the preStart function will be executed by Akka right after the creation of the Actor.

class BookSubscriber extends Actor {
  
  override def preStart = context.system.eventStream.subscribe(self, classOf[Book])
  
  def receive = {
    case book: Book => println(s"My name is ${self.path.name} and I have received a new book: $book")
  }
}

Done! See, it wasn’t that bad… 🙂

Now we just need to play with our system to make sure it works as expected:

object Main extends App {
  
  implicit val system = ActorSystem("publisher-subscribers-example")
 
  val author = "Author"
  
  val bookPublisher = system.actorOf(Props[BookPublisher], name = "book-publisher")
  
  val subscriber1 = system.actorOf(Props[BookSubscriber], name = "subscriber-1") 
  val subscriber2 = system.actorOf(Props[BookSubscriber], name = "subscriber-2")
  
  bookPublisher ! Book(title = "A book title", authors = List(author, "Another author"))
  // Yeah! Publishing a new book: Book(A book title,List(Author, Another author))
  // My name is subscriber-1 and I have received a new book: Book(A book title,List(Author, Another author))
  // My name is subscriber-2 and I have received a new book: Book(A book title,List(Author, Another author))
  
  system.eventStream.unsubscribe(subscriber2, classOf[Book])
 
  bookPublisher ! Book(title = "Another book title", authors = List("Another author"))
  // Yeah! Publishing a new book: Book(Another book title,List(Another author))
  // My name is subscriber-1 and I have received a new book: Book(Another book title,List(Another author))
} 

Note that instead of unsubscribing subscriber2 to all the messages of type Book, we could have also unsubscribed it to any type of messages by using system.eventStream.unsubscribe(subscriber2).

Custom Event Bus

Event Streams are really easy to use. However, they may not be that useful if we want to perform some filtering on the published message. One solution to our problem could be to do some filtering before publishing the message, but what if this cannot be done or simply we want to do it in a more elegant way?

An Event Stream is a specific implementation of a Event Bus trait: Akka gives us to opportunity to reuse this trait to create Custom Event Buses.

Assume that now a subscriber wants to receive books for a specific author — gist of the following code can be found here.

First of all, the BookSubscriber actor doesn’t need to automatically subscribe to the System Event Stream. Moreover, the BookPublisher actor now needs to publish on a given Event Bus, rather than the System Event Stream.

class BookPublisher(bus: AuthorBookBus) extends Actor {
  
  def receive = {
    case book: Book => {
      println(s"Yeah! Publishing a new book: $book")
      bus.publish(book)
    }
  }
  
}
 
class BookSubscriber extends Actor {
  
  def receive = {
    case book: Book => println(s"My name is ${self.path.name} and I have received a new book: $book")
  }
}

Finally, let’s define our AuthorBookBus that will filter books according to their authors.

class AuthorBookBus(author: String) extends EventBus
  with LookupClassification
  with ActorEventBus {
  
  type Event = Book
  type Classifier = Boolean
  
  override def mapSize = 2
  
  override def classify(book: Book) = book.authors.contains(author)
 
  override protected def publish(book: Book, subscriber: Subscriber)= subscriber ! book
} 

Our Event Bus accepts events of type Book and it filters books with two possible values: true if author is one of the authors of the book, false otherwise — and this is why override def mapSize = 2! The function classify is used to categories a book according to the Classifier type. Akka provides different type of classifiers (see here), in our case the LookupClassification was enough as we wanted a classification based on the event itself.

The following app shows an example of how our actor system could be used:

object Main extends App {
  
  implicit val system = ActorSystem("publisher-subscribers-example")
  
  val author = "Author"
 
  val authorBookBus = new AuthorBookBus(author)
  val bookPublisher = system.actorOf(Props(new BookPublisher(authorBookBus)), name = "book-publisher")
 
  val subscriber1 = system.actorOf(Props[BookSubscriber], name = "subscriber-1")
  val subscriber2 = system.actorOf(Props[BookSubscriber], name = "subscriber-2")
 
  authorBookBus.subscribe(subscriber1, true)
  // i.e.: subscriber1 will receive all the books
  // where one of the authors is "Author"

  authorBookBus.subscribe(subscriber2, false)
  // i.e.: subscriber2 will receive all the books 
  // where "Author" is not an author

  bookPublisher ! Book(title = "A book title", authors = List(author, "Another Author"))
  // Yeah! Publishing a new book: Book(A book title,List(Author, Another Author))
  // My name is subscriber-1 and I have received a new book: Book(A book title,List(Author, Another Author))
 
  bookPublisher ! Book(title = "Another book title", authors = List("Another Author"))
  // Yeah! Publishing a new book: Book(Another book title,List(Another Author))
  // My name is subscriber-2 and I have received a new book: Book(Another book title,List(Another Author))
}

Summary

Not only Akka allows Peer-to-Peer communication, but also Peer-to-Many, which is useful to implement a publisher-subscriber pattern. This article has described how this can be achieved using Event Streams for simplest case scenarios and Custom Event Buses when some classification on the event is needed.

How to Compose Futures

Futures are a powerful tool that has been developed by the Akka team and then adopted as a standard Scala library from version 2.10.
A Future is a placeholder for a value that will be available in the future: thanks to it, it is possible to run operations in parallel and to worry about what to do with it only once the value is available making our applications more scalable and performant. A lot can be achieved with it, have a look at the official Scala documentation for Future and Promises. Each future can be seen as an isolated parallel operation, so combining them can be challenging: in this article we will describe how Futures can be composed together.

How to Select the Fastest Future

Let’s assume that in our application we have more services to perform the same operation and that these services have a different response time according to their traffic load. Because our application doesn’t have any information on the load of each service, or simply we don’t want to rely on it, we want to call all the services and get the first reply we get back: let’s see how this can be achieved using futures.

First of all, let’s simplify our life a bit: for the purposes of this tutorial, we will simulate the behaviour of our services with a method that will wait a period of time before returning a String wrapped in a Future:

def reply(timeout: Duration, msg: String): Future[String] = Future {
  Thread.sleep(timeout.toMillis)
  msg
}

Future.firstCompletedOf is the function that we are looking for: it will get a sequence of futures and return the first one that completes:

val futureSlowReply = reply(1 second, "Hello from a slow fella")
val futureFastReply = reply(100 milliseconds, "I am a super fast fella!")

val futureReplies = Seq(futureSlowReply, futureFastReply)
val futureFastestReply = Future.firstCompletedOf(futureReplies)

Await.result(futureFastestReply, 100 milliseconds)
// res0: String = I am a super fast fella!

Note that waiting 100 milliseconds to complete the future is enough: all the futures are run in parallel and we know that the fastest will complete by then.

How to Combine Futures in Parallel

What if we have different services that process that same information differently? For example, given a customer id we have a service to retrieve the account information, another to retrieve the payment details, another to retrieve product suggestions based on previous selections. We could do it the old Java style way and retrieve sequentially all the information…or we could retrieve all the information in parallel and be really efficient! 😀

Let’s see how this can be achieved using the zip method of the Future class:

val futureSlowReply = reply(1 second, "Hello from a slow fella")
val futureFastReply = reply(100 milliseconds, "I am a super fast fella!")

val futureAllParallelReplies< = futureSlowReply.zip(futureFastReply)
Await.result(futureAllParallelReplies, 1 second)
// res1: (String, String) = (Hello from a slow fella,I am a super fast fella!)

Note that waiting the combined future value, called futureAllParallelReplies, for less than 1 second would generate a java.util.concurrent.TimeoutException: the zip function needs all the futures to be completed before returning a composition of all the futures!

How to Concatenate Futures

In order to combine futures in parallel they need to be independent from each other. What if this is not possible and we need to run them sequentially?

All we need to do is using the for-comprehension loop to force the futures to run sequentially:

def futureAllSequentialReplies(msg: String) = for {
  firstReply <- reply(100 milliseconds, msg)
  nextMsg = if (msg.length < 3) msg.reverse else msg.toUpperCase 
  secondReply <- reply(200 milliseconds, nextMsg)
} yield (firstReply, secondReply)

Await.result(futureAllSequentialReplies("Hi"), 400 milliseconds)
// res2: (String, String) = (Hi,iH)
Await.result(futureAllSequentialReplies("Hello"), 400 milliseconds)
// res3: (String, String) = (Hello,HELLO)

Note that waiting for 300 milliseconds is not enough: not only the futures are run sequentially moreover, but also we spend some time computing the nextMsg String.

Summary

Future is a powerful tool to perform operations in parallel. However, combining several parallel operation can be challenging. This article has described who easily we can compose Scala Futures: how to filter them, how to combine them in parallel and, when needed, how to force them to run sequentially.

How to Integrate ReactiveMongo in your Akka Spray Application

Scalability can be challenging when database access is needed: the common approach is to block the thread until a response is received. ReactiveMongo is a MongoDB Scala Driver that provides fully non-blocking asynchronous I/O operation that increases the scalability of your system.
In a previous post we have seen how to build a REST Api with Spray: in this article we will describe how to expand our application to integrate ReactiveMongo.

All the code produced in this tutorial can be found here.

Our Goal

Our goal is to create an application to manage quizzes. In particular, we want to:
– create a quiz
– delete a quiz
– get a random question
– get a question by id
– answer a question by id

Details on how we have chosen to implement the Rest Interface can be found here. In the following sessions we will analyse how ReactiveMongo can be used to store the quiz entity in our MongoDB database without any blocking operation.

Set Up

First, we need an instance of MongoDB: we can set it up one in our local machine (see MongoDB official website for instructions). Also, we need to include the ReactiveMongo library as part of our SBT dependencies and provide information on our MongoDB instance:

// file build.sbt
libraryDependencies ++= {
  ...
  Seq(
  	...
    "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",
    ...
  )
}
// file application.conf
...
mongodb {
  database = "quiz-management"
  servers = ["localhost:27017"]
}

Minor refactoring is needed to make our application a little bit more structured: the original QuizProtocol class has been split in two (QuizProtocol and QuestionProtocol) and moved to a new package called model.api.
Finally, we now let MongoDB generate the quiz id rather than asking the user to select one.

ReactiveMongo Integration

First step is to define our persistence model. Also, we need to provide instructions on how to serialise/deserialise our QuizEntity in MongoDB.

// file QuizEntity.scala
// note the package model.persistence to separate it from our model.api representations
package com.danielasfregola.quiz.management.model.persistence

import com.danielasfregola.quiz.management.model.api.QuizProtocol.Quiz
import reactivemongo.bson.{BSONDocumentWriter, BSONDocument, BSONDocumentReader, BSONObjectID}

case class QuizEntity(id: BSONObjectID = BSONObjectID.generate,
                      question: String, 
                      correctAnswer: String)

object QuizEntity {

  implicit def toQuizEntity(quiz: Quiz) = QuizEntity(question = quiz.question, correctAnswer = quiz.correctAnswer)

  implicit object QuizEntityBSONReader extends BSONDocumentReader[QuizEntity] {
    
    def read(doc: BSONDocument): QuizEntity = 
      QuizEntity(
        id = doc.getAs[BSONObjectID]("_id").get,
        question = doc.getAs[String]("question").get,
        correctAnswer = doc.getAs[String]("answer").get
      )
  }
  
  implicit object QuizEntityBSONWriter extends BSONDocumentWriter[QuizEntity] {
    def write(quizEntity: QuizEntity): BSONDocument =
      BSONDocument(
        "_id" -> quizEntity.id,
        "question" -> quizEntity.question,
        "answer" -> quizEntity.correctAnswer
      )
  }
}

Let’s create a trait, called MongoDao, that defines how we use our configuration to connect to our MongoDB instance:

// file MongoDao.scala
package com.danielasfregola.quiz.management.dao

import com.typesafe.config.ConfigFactory
import reactivemongo.api.MongoDriver

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global

trait MongoDao {

  val config = ConfigFactory.load()
  val database = config.getString("mongodb.database")
  val servers = config.getStringList("mongodb.servers").asScala

  val driver = new MongoDriver
  val connection = driver.connection(servers)

  val db = connection(database)
}

We now define our collection and the I/O operations that we can execute on it:

package com.danielasfregola.quiz.management.dao

import com.danielasfregola.quiz.management.model.persistance.QuizEntity
import reactivemongo.api.QueryOpts
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.{BSONDocument, BSONObjectID}
import reactivemongo.core.commands.Count

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

trait QuizDao extends MongoDao {
  
  import com.danielasfregola.quiz.management.model.persistance.QuizEntity._
  import com.danielasfregola.quiz.management.model.api.QuizProtocol._
  
  val collection = db[BSONCollection]("quizzes")

  // it creates a new quiz entity
  def save(quizEntity: QuizEntity) = collection.save(quizEntity)
    .map(_ => QuizCreated(quizEntity.id.stringify))
  
  // it finds a question by id
  def findById(id: String) =
    collection.find(queryById(id)).one[QuizEntity]
  
  // it finds a random question
  def findOne = {
    val futureCount = db.command(Count(collection.name))
    futureCount.flatMap { count =>
      val skip = Random.nextInt(count)
      collection.find(emptyQuery).options(QueryOpts(skipN = skip)).one[QuizEntity]
    }
  }
  
  // it deletes a quiz entity by id
  def deleteById(id: String) = collection.remove(queryById(id)).map(_ => QuizDeleted)

  private def queryById(id: String) = BSONDocument("_id" -> BSONObjectID(id))

  private def emptyQuery = BSONDocument()
}

Almost done! We now just need to use our QuizDao trait as part of our QuizManager and QuestionManager classes:

// file QuizManager.scala
package com.danielasfregola.quiz.management

import com.danielasfregola.quiz.management.dao.QuizDao
import com.danielasfregola.quiz.management.model.persistance.QuizEntity

class QuizManager extends QuizDao {

  def createQuiz(quizEntity: QuizEntity) = save(quizEntity)

  def deleteQuizEntity(id: String) = deleteById(id)
  
}

// file QuestionManager.scala
package com.danielasfregola.quiz.management

import com.danielasfregola.quiz.management.dao.QuizDao
import com.danielasfregola.quiz.management.model.api.QuestionProtocol._
import com.danielasfregola.quiz.management.model.persistance.QuizEntity

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class QuestionManager extends QuizDao {

  def getQuestion(maybeId: Option[String] = None) = {

    def extractQuestion(maybeQuiz: Option[QuizEntity]) = maybeQuiz match {
      case Some(quizEntity) => toQuestion(quizEntity)
      case _ => QuestionNotFound
    }
    tryGetQuiz(maybeId).map(extractQuestion)
  }

  def answerQuestion(id: String, proposedAnswer: Answer) = {
    
    def isAnswerCorrect(maybeQuiz: Option[QuizEntity]) = maybeQuiz match {
      case Some(q) if (q.correctAnswer == proposedAnswer.answer) => CorrectAnswer
      case _ => WrongAnswer
    }
    
    tryGetQuiz(Some(id)).map(isAnswerCorrect)
  }

  private def tryGetQuiz(maybeId: Option[String]): Future[Option[QuizEntity]] = maybeId match {
    case Some(id) => findById(id)
    case _ => findOne
  }
  
}

Because the ReactiveMongo library is based on Futures, all the methods of our QuizManager and QuestionManager wrap their values in a Future: let’s adopt the Akka Pipe Pattern to send messages to our Responder Actor. An example on how this approach works is following:

// file RestInterface.scala
...
 pathPrefix("quizzes") {
      pathEnd {
        post {
          entity(as[Quiz]) { quiz => requestContext =>
            val responder = createResponder(requestContext)
            quizManager.createQuiz(quiz).pipeTo(responder)
          }
        }
      } 
...

quizManager.createQuiz(quiz) returns a Future[QuizCreated]: once the future is completed, the QuizCreated message is sent to the Responder Actor.

Summary

ReactiveMongo is a non-blocking asynchronous Scala Driver for MongoDB that is particularly suitable for highly scalable application. This article has described how ReactiveMongo can be easily integrated in an existing Akka Spray application.

All the code produced in this tutorial can be found here.