And the Oscar for best deb scalero goes to…

Scalera

Before going on, we would like to thank everyone who participated in this challenge. It’s truly inspiring to see how people from many different places around the world share the same motivation and spend time solving a small challenge.

It was hard to choose a winner (you all did really well). After discarding the solutions that depended in some way on the distance between rovers (we modified the value of the constant DistanceBetweenRovers to check whether the rovers would still find each other), and some others, we named the following people finalists (in order of solution arrival):

Pablo Francisco Pérez
Daniela Sfregola
Daniel Dehouck

Sebastián Ortega
Florian Mériaux

The first 3 provided very nice solutions, but in the end we decided that the winner is…

Daniela Sfregola with a 50 tick/7 lines solution.

Congratulations! Yours is the Scalera t-shirt and our acknowledgement.
Thanks again to the participants. We will shortly be back with new challenges 😉

Background Processing with Akka Cluster: ClusterSingletonManager

REST services are commonly used in scalable architectures because they are stateless. In practice, however, a service is rarely just a CRUD service: for example, it could also have some background processing (e.g. downloading/parsing files, scheduled processes, triggers, etc.).
In this tutorial we demonstrate how to use an Akka ClusterSingletonManager in a scalable architecture to perform some background operations from only one node: the oldest node of our Akka Cluster (which in this example is also the Leader).

All the code produced in this article can be found on GitHub.

Our Application

Let’s imagine we have a simple Akka Http REST API with one simple endpoint that given a ping request returns a pong response (for more information on how to create an api with Akka Http see this article):

// curl http://localhost:5000/ping >> pong
// RestInterface.scala
...

val routes: Route =
  path("ping") {
    get {
      complete("pong")
    }
  }
...

An Akka Actor, called TickCounter, is also attached to our system to count ticks starting from zero. Its code is as follows:

// TickCounter.scala
package com.danielasfregola.akka.tutorials.actors

import akka.actor.Actor

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

class TickCounter extends Actor {

  case class Tick(n: Int)

  override def preStart = self ! Tick(0)

  def receive = {
    case Tick(t) =>
      println(s"${self.path} - Tick $t")
      context.system.scheduler.scheduleOnce(1 second, self, Tick(t+1))
  }

}

Our build file has only one dependency:

// build.sbt
name := "akka-tutorials"

version := "0.1"

organization := "com.danielasfregola"

scalaVersion := "2.11.5"

resolvers ++= Seq("Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/")

libraryDependencies ++= {
  val AkkaVersion       = "2.4.2"
  Seq(
    "com.typesafe.akka" %% "akka-http-experimental" % AkkaVersion
  )
}

Also, our Main class looks as follows:

// Main.scala
package com.danielasfregola.akka.tutorials

import akka.actor.{Props, ActorSystem}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.danielasfregola.akka.tutorials.actors.TickCounter

import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

object Main extends App with RestInterface {
  val config = ConfigFactory.load()
  val host = config.getString("http.host")
  val port = config.getInt("http.port")

  implicit val system = ActorSystem("akka-tutorials")
  implicit val materializer = ActorMaterializer()

  val tickCounter = system.actorOf(Props[TickCounter], "tick-counter")

  implicit val executionContext = system.dispatcher
  implicit val timeout = Timeout(10 seconds)

  val api = routes

  Http().bindAndHandle(handler = api, interface = host, port = port) map { binding =>
    println(s"REST interface bound to $host:$port")
  } recover { case ex =>
    // println takes a single argument, so the error message is interpolated into the string
    println(s"REST interface could not bind to $host:$port: ${ex.getMessage}")
  }

}

Our Goal

Our goal is to change our application so that when multiple instances of the same service are running, only one TickCounter is active.

We could set a flag to disable/enable the Actor. However, this is not ideal in a scalable ecosystem where every instance can potentially be removed. What happens if we remove the only node with the flag on? What if we deploy more than one service with the flag as active?

We could move the background process into a new service. However, when the processing is not big enough to justify a new service, this could be quite expensive in terms of infrastructure and maintenance.

…or, with some minor code changes, we could set up an Akka ClusterSingletonManager, get it over with and go to the pub early.

Akka ClusterSingletonManager

In order to let our application use a ClusterSingletonManager we just need to perform three simple operations:
– import the right dependency
– let our Cluster Manager know about our Actor
– set up the behaviour of the Akka Cluster

Obviously, this is only one way of setting up our Cluster. For more information on the other available configurations, please have a look at the official Akka documentation on Cluster Singleton.

Step 1: Update your dependencies

The ClusterSingletonManager is part of the akka-cluster-tools package, so we need to add its dependency to our build.sbt file:

"com.typesafe.akka" %% "akka-cluster-tools" % "2.4.2"

Step 2: Define the actors in your Cluster

In our Main we need to specify that our TickCounter Actor is part of our Cluster:

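// Main.scala
import akka.actor.{PoisonPill, Props}
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}

val tickCounter = {
  // Props of the manager that wraps our TickCounter as a Cluster Singleton
  val singletonProps = ClusterSingletonManager.props(
    singletonProps = Props[TickCounter],
    terminationMessage = PoisonPill,
    settings = ClusterSingletonManagerSettings(system)
  )
  system.actorOf(singletonProps, "tick-counter-singleton")
}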

The terminationMessage is sent to the singleton Actor when the ClusterSingletonManager needs to terminate it, for example during a hand-over to another node. Here we decided to keep it simple, so we just stop the actor with a PoisonPill. Note that this can be customised with an application-specific message, so that the actor can run some clean-up logic before it stops.
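As a hedged illustration of a custom termination message, the sketch below uses a hypothetical Shutdown message and a hypothetical GracefulTickCounter actor; neither is part of the original project. It assumes, as the Akka documentation on Cluster Singleton describes, that an actor receiving an application-specific termination message is expected to stop itself once its clean-up is done, so that the hand-over can complete.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}

import scala.concurrent.duration._

// Hypothetical application-specific termination message (not in the original project).
case object Shutdown

// Hypothetical variant of TickCounter that cleans up before stopping itself.
class GracefulTickCounter extends Actor {
  import context.dispatcher // execution context required by the scheduler

  case class Tick(n: Int)

  override def preStart(): Unit = self ! Tick(0)

  def receive = {
    case Tick(t) =>
      println(s"${self.path} - Tick $t")
      context.system.scheduler.scheduleOnce(1.second, self, Tick(t + 1))
    case Shutdown =>
      // Any clean-up would go here; afterwards the actor stops itself.
      println(s"${self.path} - cleaning up before stopping")
      context.stop(self)
  }
}

object GracefulSingleton {
  // Same wiring as in Main, but with Shutdown instead of PoisonPill.
  def start(system: ActorSystem): ActorRef =
    system.actorOf(
      ClusterSingletonManager.props(
        singletonProps = Props[GracefulTickCounter],
        terminationMessage = Shutdown,
        settings = ClusterSingletonManagerSettings(system)),
      "tick-counter-singleton")
}
```

With this variant the manager sends Shutdown instead of PoisonPill, and the actor decides when it is safe to stop.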

Step 3: Configure the behaviour of your Cluster

Add the following configurations to your application.conf file to tell the Cluster how to behave:

// application.conf
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = ${REMOTE_PORT}
    }
  }
}

akka.cluster {
  seed-nodes = [
    "akka.tcp://akka-tutorials@127.0.0.1:2551",
    "akka.tcp://akka-tutorials@127.0.0.1:2552",
    "akka.tcp://akka-tutorials@127.0.0.1:2553"
  ]
  min-nr-of-members = 1
  auto-down-unreachable-after = 30s
}

The akka.actor and akka.remote settings provide information on how to create and bind the node of the Cluster. In particular, akka.remote.netty.tcp defines how to reach the node by providing a hostname and port.

The akka.cluster configuration contains the following information:
– seed-nodes: the seed nodes of the Cluster, i.e. the initial contact points that a starting node tries to join.
– min-nr-of-members: the minimum number of members needed before the Leader moves joining nodes to Up.
– auto-down-unreachable-after: the time after which an unreachable node gets marked as Down and removed from the Cluster.

Note that auto-down-unreachable-after is a really sensitive and dangerous setting that needs to be set up properly, in particular in production. From the Akka Documentation:

Be very careful when using Cluster Singleton together with Automatic Downing, since it allows the Cluster to split up into two separate clusters, which in turn will result in multiple Singletons being started, one in each separate Cluster!
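One possible way to reduce this risk, sketched here under assumptions, is to leave automatic downing off (its default value in Akka 2.4) and to down unreachable nodes explicitly, for instance from an operator-triggered action. The ManualDowning object and its method names are hypothetical helpers introduced for illustration; Cluster(system).down(address) is the Akka Cluster call they wrap.

```scala
import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster

// Hypothetical helper, not part of the original project.
object ManualDowning {

  // Builds a node address in the same form used in the seed-nodes list,
  // e.g. akka.tcp://akka-tutorials@127.0.0.1:2552
  def nodeAddress(systemName: String, host: String, port: Int): Address =
    Address("akka.tcp", systemName, host, port)

  // Explicitly marks the given node as Down, so that the Leader can remove it.
  def downNode(system: ActorSystem, host: String, port: Int): Unit =
    Cluster(system).down(nodeAddress(system.name, host, port))
}
```

The trade-off is that someone (or something) has to decide when a node is really gone, which is exactly the decision that automatic downing makes blindly after a timeout.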

Usage

Now that our Akka Cluster is set up, we are ready to use it.
Note that in our application, in order to start a service, we have decided to force two configurations at runtime: the port of the API and the port of the Akka node.
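Because REMOTE_PORT is referenced as a required substitution, the configuration cannot be resolved when the variable is not set. As an optional variation (this is not what the original project does), the HOCON form ${?VAR} lets a default apply when the variable is absent. The PortConfig object and the default of 2551 are assumptions made for this sketch.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper, not part of the original project.
object PortConfig {

  // The second assignment only takes effect when REMOTE_PORT is defined;
  // otherwise the assumed default of 2551 is kept.
  val hocon: String =
    """
      |akka.remote.netty.tcp.port = 2551
      |akka.remote.netty.tcp.port = ${?REMOTE_PORT}
      |""".stripMargin

  // resolve() looks up substitutions in the config itself and, by default,
  // falls back to environment variables.
  def load(): Config = ConfigFactory.parseString(hocon).resolve()
}
```

Calling PortConfig.load().getInt("akka.remote.netty.tcp.port") then returns the value of REMOTE_PORT when it is exported, and 2551 otherwise.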

Let’s run the first instance of our service on port 5000 and the Akka node on port 2551:

> export PORT=5000
> export REMOTE_PORT=2551
> sbt run

In the logs we see that the Cluster Manager declares the node as the Oldest of the Cluster and starts the TickCounter Actor:

...
[INFO] [02/21/2016 16:14:24.312] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Node [akka.tcp://akka-tutorials@127.0.0.1:2551] is JOINING, roles []
[INFO] [02/21/2016 16:14:24.320] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is moving node [akka.tcp://akka-tutorials@127.0.0.1:2551] to [Up]
[INFO] [02/21/2016 16:14:24.337] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] Singleton manager starting singleton actor [akka://akka-tutorials/user/tick-counter-singleton/singleton]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 0
[INFO] [02/21/2016 16:14:24.340] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Oldest]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 1
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 2
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 3
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 4
...

Let’s run a second instance of our service on port 5001 and the node on port 2552:

> export PORT=5001
> export REMOTE_PORT=2552
> sbt run

In the logs of the second instance we see that the Cluster Manager declares the second node as Younger and it does not start its TickCounter Actor:

...
[INFO] [02/21/2016 16:18:31.343] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Welcome from [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:18:31.540] [akka-tutorials-akka.actor.default-dispatcher-20] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Younger]
REST interface bound to 0.0.0.0:5001
...

If we stop the non-leader node (i.e. the second one we started), the leader node detects that the second node is unreachable; after some time, it marks it as Down and removes it from the Cluster:

[WARN] [02/21/2016 16:22:32.154] [akka-tutorials-akka.remote.default-remote-dispatcher-21] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-2] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
...
[WARN] [02/21/2016 16:22:36.287] [akka-tutorials-akka.actor.default-dispatcher-15] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/cluster/core/daemon] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://akka-tutorials@127.0.0.1:2552, status = Up)]
...
[WARN] [02/21/2016 16:22:37.349] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-3] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
...
[INFO] [02/21/2016 16:22:55.339] [akka-tutorials-akka.actor.default-dispatcher-18] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader can currently not perform its duties, reachability status: [akka.tcp://akka-tutorials@127.0.0.1:2551 -> akka.tcp://akka-tutorials@127.0.0.1:2552: Unreachable [Unreachable] (1)], member status: [akka.tcp://akka-tutorials@127.0.0.1:2551 Up seen=true, akka.tcp://akka-tutorials@127.0.0.1:2552 Up seen=false]
[WARN] [02/21/2016 16:22:55.346] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-6] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
[WARN] [02/21/2016 16:23:01.350] [akka-tutorials-akka.remote.default-remote-dispatcher-47] [akka.tcp://akka-tutorials@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2552-7] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
...
[INFO] [02/21/2016 16:23:06.305] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is auto-downing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552]
[INFO] [02/21/2016 16:23:06.306] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Marking unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552] as [Down]
...
[INFO] [02/21/2016 16:23:07.285] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader can perform its duties again
[INFO] [02/21/2016 16:23:07.287] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is removing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2552]
[INFO] [02/21/2016 16:23:07.288] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.tcp://akka-tutorials@127.0.0.1:2551/user/tick-counter-singleton] Member removed [akka.tcp://akka-tutorials@127.0.0.1:2552]
[WARN] [02/21/2016 16:23:07.292] [akka-tutorials-akka.remote.default-remote-dispatcher-21] [akka.remote.Remoting] Association to [akka.tcp://akka-tutorials@127.0.0.1:2552] having UID [-2037835353] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
...

When we start the second node again, the node is added back to the Cluster:

...
[INFO] [02/21/2016 16:28:49.560] [akka-tutorials-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Node [akka.tcp://akka-tutorials@127.0.0.1:2552] is JOINING, roles []
[INFO] [02/21/2016 16:28:50.293] [akka-tutorials-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2551] - Leader is moving node [akka.tcp://akka-tutorials@127.0.0.1:2552] to [Up]
...

Finally, we kill the Leader node and we see that, after some time, the second node takes over: it becomes the Leader, is declared the Oldest, and starts its own TickCounter Actor:

REST interface bound to 0.0.0.0:5001
[INFO] [02/21/2016 16:28:51.192] [akka-tutorials-akka.actor.default-dispatcher-15] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Start -> Younger]
[WARN] [02/21/2016 16:32:01.276] [akka-tutorials-akka.remote.default-remote-dispatcher-6] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-0] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
[WARN] [02/21/2016 16:32:06.185] [akka-tutorials-akka.actor.default-dispatcher-17] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/cluster/core/daemon] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Marking node(s) as UNREACHABLE [Member(address = akka.tcp://akka-tutorials@127.0.0.1:2551, status = Up)]
[WARN] [02/21/2016 16:32:06.674] [akka-tutorials-akka.remote.default-remote-dispatcher-5] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-2] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
[WARN] [02/21/2016 16:32:12.677] [akka-tutorials-akka.remote.default-remote-dispatcher-6] [akka.tcp://akka-tutorials@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fakka-tutorials%40127.0.0.1%3A2551-3] Association with remote system [akka.tcp://akka-tutorials@127.0.0.1:2551] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://akka-tutorials@127.0.0.1:2551]] Caused by: [Connection refused: /127.0.0.1:2551]
...
[INFO] [02/21/2016 16:32:26.188] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader can currently not perform its duties, reachability status: [akka.tcp://akka-tutorials@127.0.0.1:2552 -> akka.tcp://akka-tutorials@127.0.0.1:2551: Unreachable [Unreachable] (1)], member status: [akka.tcp://akka-tutorials@127.0.0.1:2551 Up seen=false, akka.tcp://akka-tutorials@127.0.0.1:2552 Up seen=true]
...
[INFO] [02/21/2016 16:32:36.203] [akka-tutorials-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader is auto-downing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:36.204] [akka-tutorials-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Marking unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551] as [Down]
...
[INFO] [02/21/2016 16:32:37.182] [akka-tutorials-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader can perform its duties again
[INFO] [02/21/2016 16:32:37.189] [akka-tutorials-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://akka-tutorials)] Cluster Node [akka.tcp://akka-tutorials@127.0.0.1:2552] - Leader is removing unreachable node [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:37.192] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Previous oldest removed [akka.tcp://akka-tutorials@127.0.0.1:2551]
[INFO] [02/21/2016 16:32:37.193] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Younger observed OldestChanged: [None -> myself]
[WARN] [02/21/2016 16:32:37.194] [akka-tutorials-akka.remote.default-remote-dispatcher-5] [akka.remote.Remoting] Association to [akka.tcp://akka-tutorials@127.0.0.1:2551] having UID [676396303] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
[INFO] [02/21/2016 16:32:37.195] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] Singleton manager starting singleton actor [akka://akka-tutorials/user/tick-counter-singleton/singleton]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 0
[INFO] [02/21/2016 16:32:37.197] [akka-tutorials-akka.actor.default-dispatcher-19] [akka.tcp://akka-tutorials@127.0.0.1:2552/user/tick-counter-singleton] ClusterSingletonManager state change [Younger -> Oldest]
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 1
akka://akka-tutorials/user/tick-counter-singleton/singleton - Tick 2
...

Summary

In this article we have described how to use an Akka ClusterSingletonManager to synchronise several instances of the same service. In particular, our goal was to configure an Akka Cluster of services so that some operations run from only one service at a time (i.e. the oldest node of the Cluster, which in our example is also the Leader).

All the code produced in this tutorial is on GitHub.

How to build a REST API with Akka Http

In previous articles we have discussed how to use Spray to build REST APIs. Since Akka 2.4, Spray is no longer supported and it is replaced by Akka Http.
This article will introduce Akka Http, the new shiny toy from the Akka Team, and provide a tutorial on how it can be used to create a simple REST API.

All the code produced in this tutorial can be found on GitHub.

Why Akka Http?

Spray has been the Akka way of building APIs for quite some time.
Although not directly built by the Akka Team, it relied heavily on the Akka ecosystem: Spray is implemented with Akka Actors.
The project went so well that, after some time, the Akka Team decided to adopt it. After the Akka Team released Akka Streams, they realised that Spray’s performance could be improved by using Akka Streams together with Akka Actors.

Since Akka 2.4, Akka Http is the official Akka toolkit to create REST APIs, both client and server side.
Note that in Akka 2.4, Spray is no longer supported…so you are forced to migrate from Spray to Akka Http if you want to use any of the other latest tools of Akka, like Akka Persistence.

At the time of this writing, Akka Http is still an experimental module, but it has been declared stable for production. Also, its performance is worse than Spray’s: the Akka Team has been focusing on its interface, but they have promised to massively improve its performance by Q1 2016.

Our CRUD Application

Previously, we described how to create a simple CRUD application with Spray.
In this article we will rewrite exactly the same application using Akka Http instead of Spray.

In particular, our application will create, retrieve, update and delete a Question entity.
A question has 3 fields (id, title, text) and its case class looks as follows:

case class Question(id: String, title: String, text: String)

Also, to keep things simple, we are going to keep all the data in memory, rather than properly storing it in a database.
The class QuestionService simulates a persistent storage by keeping all the entities in a Vector.
The following code is the skeleton of the QuestionService class (more details on its implementation can be found here):

package com.danielasfregola.quiz.management.services

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import scala.concurrent.{ExecutionContext, Future}

class QuestionService(implicit val executionContext: ExecutionContext) {

  var questions = Vector.empty[Question]

  def createQuestion(question: Question): Future[Option[String]] = ...

  def getQuestion(id: String): Future[Option[Question]] = ...

  def updateQuestion(id: String, update: QuestionUpdate): Future[Option[Question]] = ...

  def deleteQuestion(id: String): Future[Unit] = ...

}
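To make the skeleton more concrete, here is a minimal sketch of how such an in-memory service could be implemented. It is not the author's implementation (which is linked above): the class name InMemoryQuestionService, the shape of QuestionUpdate (two optional fields) and the use of synchronized to guard the Vector are all assumptions made for illustration.

```scala
import scala.concurrent.{ExecutionContext, Future}

case class Question(id: String, title: String, text: String)

// Assumed shape: the original QuestionUpdate is not shown in this article.
case class QuestionUpdate(title: Option[String], text: Option[String])

// Hypothetical in-memory implementation of the skeleton above.
class InMemoryQuestionService(implicit val executionContext: ExecutionContext) {

  private var questions = Vector.empty[Question]

  // Returns the id of the new question, or None if the id is already taken.
  def createQuestion(question: Question): Future[Option[String]] = Future {
    synchronized {
      if (questions.exists(_.id == question.id)) None
      else {
        questions = questions :+ question
        Some(question.id)
      }
    }
  }

  def getQuestion(id: String): Future[Option[Question]] = Future {
    synchronized(questions.find(_.id == id))
  }

  // Applies only the fields that are defined in the update.
  def updateQuestion(id: String, update: QuestionUpdate): Future[Option[Question]] = Future {
    synchronized {
      questions.find(_.id == id).map { old =>
        val updated = old.copy(
          title = update.title.getOrElse(old.title),
          text = update.text.getOrElse(old.text))
        questions = questions.map(q => if (q.id == id) updated else q)
        updated
      }
    }
  }

  def deleteQuestion(id: String): Future[Unit] = Future {
    synchronized { questions = questions.filterNot(_.id == id) }
  }
}
```

In this sketch createQuestion returns None when the id already exists, which is the case the REST layer later maps to a 409 (Conflict) response.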

Last but not least, we will use json4s to (de)serialise JSON to and from case classes (more information on how to use json4s can be found here).

Setup

The first step is to add the right dependencies to our project:

// build.sbt
...
resolvers ++= Seq("Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
                  Resolver.bintrayRepo("hseeberger", "maven"))

libraryDependencies ++= {
  val AkkaVersion       = "2.3.9"
  val AkkaHttpVersion   = "2.0.1"
  val Json4sVersion     = "3.2.11"
  Seq(
    "com.typesafe.akka" %% "akka-slf4j"      % AkkaVersion,
    "com.typesafe.akka" %% "akka-http-experimental" % AkkaHttpVersion,
    "ch.qos.logback"    %  "logback-classic" % "1.1.2",
    "org.json4s"        %% "json4s-native"   % Json4sVersion,
    "org.json4s"        %% "json4s-ext"      % Json4sVersion,
    "de.heikoseeberger" %% "akka-http-json4s" % "1.4.2"
  )
}
...

Then, we need to bind our API to a host and port:

// Main.scala
package com.danielasfregola.quiz.management

import scala.concurrent.duration._
import akka.actor._
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.util.Timeout

import com.typesafe.config.ConfigFactory

object Main extends App with RestInterface {
  val config = ConfigFactory.load()
  val host = config.getString("http.host")
  val port = config.getInt("http.port")

  implicit val system = ActorSystem("quiz-management-service")
  implicit val materializer = ActorMaterializer()


  implicit val executionContext = system.dispatcher
  implicit val timeout = Timeout(10 seconds)

  val api = routes

  Http().bindAndHandle(handler = api, interface = host, port = port) map { binding =>
    println(s"REST interface bound to ${binding.localAddress}")
  } recover { case ex =>
    // println takes a single argument, so the error message is interpolated into the string
    println(s"REST interface could not bind to $host:$port: ${ex.getMessage}")
  }
}

Note that RestInterface is just a collection of routes and the services needed:

package com.danielasfregola.quiz.management

import scala.concurrent.ExecutionContext

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.resources.QuestionResource
import com.danielasfregola.quiz.management.services.QuestionService

trait RestInterface extends Resources {

  implicit def executionContext: ExecutionContext

  lazy val questionService = new QuestionService

  val routes: Route = questionRoutes

}

trait Resources extends QuestionResource

Question Resource

QuestionResource is a generic Resource:
– it has a service that performs some operations on the entity
– it has some routes (see later paragraphs of this article)
– it extends a generic Resource, called MyResource

Its skeleton is as follows:

// QuestionResource.scala
package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = ???

}

MyResource is a trait where we add code that is common/useful for all the resources (the code can be found here).
In particular, it includes the json4s support to (de)serialise case classes and some helper methods that will make our akka-http routing easier.

Now that we have set up the skeleton of our application, we can focus on the implementation of our endpoints.

POST – Create a Question

Usage

The first task of our application is to define an endpoint to create a question entity.
According to REST conventions, an entity is created through a POST request that should reply with a 201 (Created) HTTP status code. Also, a Location Header with the URI that identifies the location of the new entity should be returned.
Note that a POST request is non-idempotent: if the entity already exists or cannot be created, we should return an HTTP error status code.

For our questions application, this can be translated into the following curl command:

curl -v -H "Content-Type: application/json" \
   -X POST http://localhost:5000/questions \
   -d '{"id": "test", "title": "MyTitle", "text":"The text of my question"}'

The first time we make the request, we should get a reply similar to the following:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> POST /questions HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 68
>
* upload completely sent off: 68 out of 68 bytes
< HTTP/1.1 201 Created
< Location: http://localhost:5000/questions/test
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:16:50 GMT
< Content-Type: application/json
< Content-Length: 0
<
* Connection #0 to host localhost left intact

If we repeat the request, we will get an HTTP response with a 409 (Conflict) status code as the entity already exists:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> POST /questions HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 68
>
* upload completely sent off: 68 out of 68 bytes
< HTTP/1.1 409 Conflict
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:17:07 GMT
< Content-Type: application/json
< Content-Length: 0
<
* Connection #0 to host localhost left intact

Implementation

As described in the previous paragraph, when creating an entity we would like to provide the URI of the created entity in a Location header.
Our implementation is tailored to the behaviour of our system: when QuestionService creates a question, it returns a Future[Option[T]] and, if the returned option is not defined, we want to return a different HTTP status code.
Unfortunately, Akka Http doesn’t have a default implementation to achieve this, so we will have to create our own by adding the following to our MyResource trait:

// MyResource.scala
package com.danielasfregola.quiz.management.routing

import akka.http.scaladsl.marshalling.{ToResponseMarshaller, ToResponseMarshallable}

import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.{Directives, Route}

import com.danielasfregola.quiz.management.serializers.JsonSupport

trait MyResource extends Directives with JsonSupport {

  implicit def executionContext: ExecutionContext

  def completeWithLocationHeader[T](resourceId: Future[Option[T]], ifDefinedStatus: Int, ifEmptyStatus: Int): Route =
    onSuccess(resourceId) {
      case Some(t) => completeWithLocationHeader(ifDefinedStatus, t)
      case None => complete(ifEmptyStatus, None)
    }

  def completeWithLocationHeader[T](status: Int, resourceId: T): Route =
    extractRequestContext { requestContext =>
      val request = requestContext.request
      val location = request.uri.copy(path = request.uri.path / resourceId.toString)
      respondWithHeader(Location(location)) {
        complete(status, None)
      }
    }

  def complete[T: ToResponseMarshaller](resource: Future[Option[T]]): Route =
    onSuccess(resource) {
      case Some(t) => complete(ToResponseMarshallable(t))
      case None => complete(404, None)
    }

  def complete(resource: Future[Unit]): Route = onSuccess(resource) { complete(204, None) }

}
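To see in isolation how the Location header value is derived, the following sketch repeats the URI manipulation used in completeWithLocationHeader outside of any route. The LocationExample object and the locationFor name are hypothetical; Uri, its copy method and the / operator on paths come from Akka Http.

```scala
import akka.http.scaladsl.model.Uri

// Hypothetical helper that mirrors the URI logic of completeWithLocationHeader.
object LocationExample {

  // Appends the resource id as a new path segment of the request URI.
  def locationFor(requestUri: Uri, resourceId: String): Uri =
    requestUri.copy(path = requestUri.path / resourceId)
}
```

For a POST to http://localhost:5000/questions with id "test", locationFor yields http://localhost:5000/questions/test, which matches the Location header shown in the curl output above.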

We can now put everything together and define the endpoint to create a question entity:

// QuestionResource.scala
package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    pathEnd {
      post {
        entity(as[Question]) { question =>
          completeWithLocationHeader(
            resourceId = questionService.createQuestion(question),
            ifDefinedStatus = 201, ifEmptyStatus = 409)
        }
      }
    } ~ ...

  }
}

GET – Retrieve a Question

Usage

Now that we have created a question, we can retrieve it by performing a GET request to the URI that identifies the entity (i.e.: the one returned in the Location Header).
The request should respond with either a 200 (OK) HTTP status code with a body containing the question entity or a 404 (NotFound) HTTP status code with empty body.

For example, we can get an existing question with the following curl command…

curl -v http://localhost:5000/questions/test

…and it should return something similar to the following:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> GET /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:17:31 GMT
< Content-Type: application/json
< Content-Length: 64
<
* Connection #0 to host localhost left intact
{"id":"test","title":"MyTitle","text":"The text of my question"}

Moreover, if we request an entity that doesn’t exist…

curl -v http://localhost:5000/questions/non-existing-question

…we should get a 404 error code:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> GET /questions/non-existing-question HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
>
< HTTP/1.1 404 Not Found
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:18:40 GMT
< Content-Type: application/json
< Content-Length: 0
<
* Connection #0 to host localhost left intact

Implementation

QuestionService returns a Future[Option[Question]] when retrieving a question.
Unlike Spray, Akka Http doesn’t seem to complete optional values correctly: complete(Future(None)) returns an http response with code 200 and an empty body rather than a 404 http response, which in my opinion doesn’t make much sense, considering that 200 should have a non-empty body.

UPDATE:
The Akka team has discussed this issue on GitHub (see here for more information) and this does not seem to be a bug: apparently there are some scenarios where complete(Future(None)) needs to be completed with something other than a 404.
Thank you to @ktoso for looking into this!

Not a problem, we can add some *black magic* to our MyResource trait to make the code look exactly the same as before:

// MyResource.scala
package com.danielasfregola.quiz.management.routing

import akka.http.scaladsl.marshalling.{ToResponseMarshaller, ToResponseMarshallable}

import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.{Directives, Route}

import com.danielasfregola.quiz.management.serializers.JsonSupport

trait MyResource extends Directives with JsonSupport {

  implicit def executionContext: ExecutionContext

  ...

  def complete[T: ToResponseMarshaller](resource: Future[Option[T]]): Route =
    onSuccess(resource) {
      case Some(t) => complete(ToResponseMarshallable(t))
      case None => complete(404, None)
    }

  ...
}

Thanks to our trick, our route now looks exactly the same as with Spray:

package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      get {
        complete(questionService.getQuestion(id))
      } ~
      ...
    }
  }

}

PUT – Update a Question

Usage

When updating an entity, we should use a PUT request. Also, we should send only the fields that we want to update, not the whole object. Not only will this make our API easier to use, but it will also reduce potential concurrency issues.
If the update goes through, we should get an HTTP response with a 200 (OK) status code and the updated entity in the body. On the other hand, if the update was not possible, for example because the entity no longer exists, we should get an HTTP response with status 404 (NotFound) and an empty body.
Note that a PUT request is idempotent: performing the same update multiple times should always return the same result.

In our application we can update the question entity with the following curl command…

curl -v -H "Content-Type: application/json" \
   -X PUT http://localhost:5000/questions/test \
   -d '{"text":"Another text"}'

…and get the following reply:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> PUT /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 23
>
* upload completely sent off: 23 out of 23 bytes
< HTTP/1.1 200 OK
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:19:31 GMT
< Content-Type: application/json
< Content-Length: 53
<
* Connection #0 to host localhost left intact
{"id":"test","title":"MyTitle","text":"Another text"}

If we try to update a resource that doesn’t exist, we should get a 404 response:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> PUT /questions/non-existing-question HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 23
>
* upload completely sent off: 23 out of 23 bytes
< HTTP/1.1 404 Not Found
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:20:07 GMT
< Content-Type: application/json
< Content-Length: 0
<
* Connection #0 to host localhost left intact

Implementation

As explained in the previous section, we want the client of our API to send just the fields to update, not the whole entity. In order to achieve this, we will deserialise the body of our PUT request to the following case class:

case class QuestionUpdate(title: Option[String], text: Option[String])

Note that we decided not to allow our clients to update the field id, as it is used to locate the entity.
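To make the idea of a partial update more concrete, here is a minimal sketch of how a QuestionUpdate could be merged into an existing Question. The helper applyUpdate and the objects QuestionUpdates and QuestionUpdatesDemo are hypothetical names introduced for illustration only; they are not necessarily how QuestionService does it.

```scala
final case class Question(id: String, title: String, text: String)
final case class QuestionUpdate(title: Option[String], text: Option[String])

object QuestionUpdates {
  // Hypothetical helper: every field missing from the update keeps its current value.
  // The id is never touched, as it is used to locate the entity.
  def applyUpdate(question: Question, update: QuestionUpdate): Question =
    question.copy(
      title = update.title.getOrElse(question.title),
      text = update.text.getOrElse(question.text)
    )
}

object QuestionUpdatesDemo {
  def main(args: Array[String]): Unit = {
    val original = Question("test", "MyTitle", "The text of my question")
    val update = QuestionUpdate(title = None, text = Some("Another text"))

    val once = QuestionUpdates.applyUpdate(original, update)
    val twice = QuestionUpdates.applyUpdate(once, update)

    println(once)          // Question(test,MyTitle,Another text)
    println(once == twice) // true: repeating the same update changes nothing
  }
}
```

Because missing fields fall back to the stored values, applying the same update twice gives the same entity, which is the idempotency we expect from PUT.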

Keeping in mind that QuestionService returns Future[Option[Question]] when updating a question, we can reuse the “black magic” trick from our GET route to make our code look nice:

package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route
import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      ... ~
      put {
        entity(as[QuestionUpdate]) { update =>
          complete(questionService.updateQuestion(id, update))
        }
      } ~ ...
    }
  }

}

DELETE – Delete a Question

Usage

Finally, we want to have an endpoint to delete a question entity. This can be achieved by sending a DELETE request to the URI that identifies the entity; the request should reply with a 204 (NoContent) status code once the operation has been completed.
Note that DELETE is idempotent, so deleting a resource that has already been deleted should still return an HTTP response with a 204 (NoContent) status code and an empty body.
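As a rough illustration of why an idempotent delete is easy to achieve with an in-memory collection, consider the sketch below. The object QuestionStore and its delete helper are hypothetical names used only for this example, assuming the questions are kept in a Vector.

```scala
final case class Question(id: String, title: String, text: String)

object QuestionStore {
  // Hypothetical helper: removing an id that is not present returns the collection unchanged,
  // so the caller does not need to distinguish "deleted now" from "already gone".
  def delete(questions: Vector[Question], id: String): Vector[Question] =
    questions.filterNot(_.id == id)
}

object QuestionStoreDemo {
  def main(args: Array[String]): Unit = {
    val stored = Vector(Question("test", "MyTitle", "The text of my question"))

    val afterFirstDelete = QuestionStore.delete(stored, "test")
    val afterSecondDelete = QuestionStore.delete(afterFirstDelete, "test")

    println(afterFirstDelete.isEmpty)              // true
    println(afterFirstDelete == afterSecondDelete) // true: the second delete is a no-op
  }
}
```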

For example, we can delete the question test with the following…

curl -v -X DELETE http://localhost:5000/questions/test

…and get the following result back:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> DELETE /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
>
< HTTP/1.1 204 No Content
< Server: akka-http/2.3.12
< Date: Sun, 07 Feb 2016 11:20:30 GMT
< Content-Type: application/json
<
* Connection #0 to host localhost left intact

Implementation

When deleting a question, QuestionService returns Future[Unit].
Unfortunately, Akka Http resolves complete(Future(())) with an http response with code 200 and empty body — same as complete(Future(None))! Please, put a comment below if you know the rationale behind this design choice.

UPDATE:
Thank you again to @ktoso from the Akka Team for looking into this! Apparently this is an issue related to Json4s.
Here is the crystal clear explanation that @hseeberger has provided in the comments below:

What causes the issue is that (a) Json4s cannot marshal `AnyVal`s and (b) Json4s happily marshals `Future`. This leads to bypassing the `Future` marshaller from Akka HTTP.

Not a problem, we just need to add the following code to MyResource to make it resolve complete with a 204 http response with empty body:

// MyResource.scala
package com.danielasfregola.quiz.management.routing

import akka.http.scaladsl.marshalling.{ToResponseMarshaller, ToResponseMarshallable}

import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.{Directives, Route}

import com.danielasfregola.quiz.management.serializers.JsonSupport

trait MyResource extends Directives with JsonSupport {

  implicit def executionContext: ExecutionContext

  ...

  def complete(resource: Future[Unit]): Route = onSuccess(resource) { complete(204, None) }

}

Our DELETE endpoint can now be implemented as follows:

package com.danielasfregola.quiz.management.resources

import akka.http.scaladsl.server.Route
import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import com.danielasfregola.quiz.management.routing.MyResource
import com.danielasfregola.quiz.management.services.QuestionService

trait QuestionResource extends MyResource {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      ... ~
      delete {
        complete(questionService.deleteQuestion(id))
      }
    }
  }

}

Summary

In this article we have introduced Akka Http and we have provided a simple tutorial on how to create a simple CRUD application using Akka Http.

The completed code of this tutorial can be found on GitHub.

Thanks to @ktoso and @hseeberger for clarifying some issues raised in this article!

twitter4s: A Scala client for the Twitter API

A few months ago, I started looking into the Twitter API and I have developed twitter4s, an asynchronous non-blocking Twitter client in Scala.

In this article, we will introduce twitter4s providing examples of how to download tweets from a user timeline and how to perform some simple data analysis.

The code shown in this tutorial is available on Github.

Getting Started

The Twitter API can be accessed by means of a registered app. So, first of all, we need to register our app. Log in with your Twitter account and have a look at the Twitter API terms and conditions.
If you are happy with them, register your app at http://apps.twitter.com.

In order to do so, you will need to provide an app name and a brief description of what it does. After the registration, a consumer key and consumer secret will be provided: save them as we will need them when setting up twitter4s.
Also, generate an access key and access secret and make sure you have the correct permissions: for this tutorial “Read Only” is enough.

Finally, please note that the Twitter API has rate limits — have a look at the Twitter’s development website for more information.
Also, have a look at the rate limits chart where the rate limit for each endpoint is summarized.

Setup

If not already there, add Maven Central as resolver in your SBT configuration:

resolvers += "Maven central" at "http://repo1.maven.org/maven2/"

Also, you need to include the library as your dependency:

libraryDependencies ++= Seq(
  "com.danielasfregola" %% "twitter4s" % "0.2.1"
)

Usage

Add your consumer and access token to your configuration file and initialize your Twitter Client:

import com.danielasfregola.twitter4s.TwitterClient

val client = new TwitterClient()

Alternatively, you can also specify your tokens directly when creating the client:

import com.danielasfregola.twitter4s.TwitterClient
import com.danielasfregola.twitter4s.entities.{AccessToken, ConsumerToken}

val consumerToken = ConsumerToken(key = "my-consumer-key", secret = "my-consumer-secret")
val accessToken = AccessToken(key = "my-access-key", secret = "my-access-secret")
val client = new TwitterClient(consumerToken, accessToken)

Now that our Twitter Client has been initialized, we are ready to use it! 😀
Have a look at its documentation for a complete list of the supported functionalities.

Top Hashtags in Timeline

As an example, let’s collect the tweets in a user timeline and display the top 10 hashtags used. In this tutorial, we will download and analyze tweets by Martin Odersky (the creator of Scala).

First, we need to get the tweets from the user timeline:

client.getUserTimelineForUser(screen_name = "odersky", count = 200)

The return type of the method getUserTimelineForUser (see scaladoc) is Future[Seq[Tweet]].
Note that Tweet is quite a rich case class that contains a lot of information (see its scaladoc): it has more than 22 fields!
The need for such large case classes is the reason why this library doesn’t support Scala versions older than 2.11: earlier versions allow at most 22 fields in a case class.

In order to retrieve the hashtags used in a Tweet, we don’t have to parse the text of the tweet, as the Twitter API has already done all the hard work for us: we just need to access the Entities field and count how many times each hashtag is used:

  def getTopHashtags(tweets: Seq[Tweet], n: Int = 10): Seq[(String, Int)] = {
    val hashtags: Seq[Seq[HashTag]] = tweets.map { tweet =>
      tweet.entities.map(_.hashtags).getOrElse(Seq.empty)
    }
    val hashtagTexts: Seq[String] = hashtags.flatten.map(_.text.toLowerCase)
    val hashtagFrequencies: Map[String, Int] = hashtagTexts.groupBy(identity).mapValues(_.size)
    hashtagFrequencies.toSeq.sortBy { case (entity, frequency) => -frequency }.take(n)
  }
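If you want to try the counting logic without calling the Twitter API, the following sketch reproduces it on simplified stand-in case classes. Note that HashTag, Entities and Tweet below are cut-down assumptions with only the fields we need (the real twitter4s classes are much richer), and TopHashtags is a hypothetical name. Unlike the function above, this version also breaks ties alphabetically so that the result is deterministic.

```scala
// Simplified stand-ins for the twitter4s entities: only the fields used here are modelled.
final case class HashTag(text: String)
final case class Entities(hashtags: Seq[HashTag])
final case class Tweet(text: String, entities: Option[Entities])

object TopHashtags {
  def top(tweets: Seq[Tweet], n: Int = 10): Seq[(String, Int)] = {
    // Collect every hashtag of every tweet, ignoring tweets without entities.
    val texts: Seq[String] =
      tweets.flatMap(tweet => tweet.entities.toSeq.flatMap(e => e.hashtags)).map(_.text.toLowerCase)
    // Count how many times each (lowercased) hashtag appears.
    val frequencies: Map[String, Int] =
      texts.groupBy(identity).map { case (tag, occurrences) => tag -> occurrences.size }
    // Most frequent first; ties are ordered alphabetically.
    frequencies.toSeq.sortBy { case (tag, frequency) => (-frequency, tag) }.take(n)
  }
}

object TopHashtagsDemo {
  def main(args: Array[String]): Unit = {
    val tweets = Seq(
      Tweet("first", Some(Entities(Seq(HashTag("Scala"), HashTag("scalajs"))))),
      Tweet("second", Some(Entities(Seq(HashTag("scala"))))),
      Tweet("third", None)
    )
    val summary = TopHashtags.top(tweets, n = 2).map { case (tag, frequency) => s"$tag=$frequency" }
    println(summary.mkString(", ")) // scala=2, scalajs=1
  }
}
```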

Let’s put everything together and add some code to print the results with a nice layout:

import com.danielasfregola.twitter4s.TwitterClient
import com.danielasfregola.twitter4s.entities.{HashTag, Tweet}

import scala.concurrent.ExecutionContext.Implicits.global

object UserTopHashtags extends App {

  def getTopHashtags(tweets: Seq[Tweet], n: Int = 10): Seq[(String, Int)] = {
    val hashtags: Seq[Seq[HashTag]] = tweets.map { tweet =>
      tweet.entities.map(_.hashtags).getOrElse(Seq.empty)
    }
    val hashtagTexts: Seq[String] = hashtags.flatten.map(_.text.toLowerCase)
    val hashtagFrequencies: Map[String, Int] = hashtagTexts.groupBy(identity).mapValues(_.size)
    hashtagFrequencies.toSeq.sortBy { case (entity, frequency) => -frequency }.take(n)
  }

  val client = new TwitterClient()

  val user = "odersky"

  client.getUserTimelineForUser(screen_name = user, count = 200).map { tweets =>
    val topHashtags: Seq[((String, Int), Int)] = getTopHashtags(tweets).zipWithIndex
    val rankings = topHashtags.map { case ((entity, frequency), idx) => s"[${idx + 1}] $entity (found $frequency times)"}
    println(s"${user.toUpperCase}'S TOP HASHTAGS:")
    println(rankings.mkString("\n"))
  }

}

At the time of this writing, running the code above generates the following output:

ODERSKY'S TOP HASHTAGS:
[1] scala (found 25 times)
[2] scaladays (found 5 times)
[3] scalajs (found 4 times)
[4] progfun (found 3 times)
[5] coursera (found 2 times)
[6] scalax (found 1 times)
[7] community (found 1 times)
[8] aws (found 1 times)
[9] iexpectmoreofapple (found 1 times)
[10] scalamatsuri (found 1 times)

Summary

In this article we have introduced a new asynchronous non-blocking Scala Client for Twitter, called twitter4s.
We have described how to register our app and set up the Twitter Client, and we have provided sample code to download tweets from a user timeline and analyze their hashtags.

The code shown in this tutorial can be found here.

How to build a Scala REST CRUD application with Spray

In previous articles we have described how to build a REST Api with Spray and how to (de)serialize case classes with json4s. However, in order to keep things simple, we didn’t always do things as suggested by spray.io.

In this article we will redeem ourselves and we will describe how to build a REST CRUD application in Spray, taking full advantage of the tools offered by the Spray toolkit.

All the code shown in this tutorial can be found on GitHub.

Our CRUD Application

A REST CRUD application is an application that manipulates entities using 4 key operations: create, retrieve, update, delete.
In this tutorial we will describe how to create a simple REST CRUD application to manage question entities.
A question has the following fields: id, title, text. We are going to use json4s to translate it to the following case class (for more information on how to use json4s, have a look here):

case class Question(id: String, title: String, text: String)

Also, to keep things simple, we are not going to store the entities in a database but we will simply keep them in memory. In this tutorial the class QuestionService simulates a persistent storage by storing all the questions in a Vector (see its complete code here):

package com.danielasfregola.quiz.management.services

import com.danielasfregola.quiz.management.entities.{Question, QuestionUpdate}
import scala.concurrent.{ExecutionContext, Future}

class QuestionService(implicit val executionContext: ExecutionContext) {

  var questions = Vector.empty[Question]

  def createQuestion(question: Question): Future[Option[String]] = ...

  def getQuestion(id: String): Future[Option[Question]] = ...

  def updateQuestion(id: String, update: QuestionUpdate): Future[Option[Question]] = ...
    
  def deleteQuestion(id: String): Future[Unit] = ...

}
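The method bodies above are elided on purpose. To give an idea of what the creation and retrieval logic could look like, here is a minimal sketch. It is only an assumption for illustration, not the actual QuestionService code: the class InMemoryQuestionService is a hypothetical name, and the synchronized blocks are one simple way to protect the mutable Vector.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

final case class Question(id: String, title: String, text: String)

// Hypothetical stand-in for QuestionService: only creation and lookup are sketched.
class InMemoryQuestionService(implicit ec: ExecutionContext) {

  private var questions = Vector.empty[Question]

  // Some(id) when the question has been stored, None when the id is already taken.
  def createQuestion(question: Question): Future[Option[String]] = Future {
    synchronized {
      if (questions.exists(_.id == question.id)) None
      else {
        questions = questions :+ question
        Some(question.id)
      }
    }
  }

  def getQuestion(id: String): Future[Option[Question]] = Future {
    synchronized(questions.find(_.id == id))
  }
}

object InMemoryQuestionServiceDemo {
  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global

    val service = new InMemoryQuestionService
    val question = Question("test", "MyTitle", "The text of my question")

    println(Await.result(service.createQuestion(question), 1.second)) // Some(test)
    println(Await.result(service.createQuestion(question), 1.second)) // None: the id is taken
  }
}
```

Returning an Option from createQuestion lets the routing layer decide which HTTP status code corresponds to each outcome.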

POST – Create a Question

Usage

The first task of our application is to define an endpoint to create a question entity.
According to the REST protocol, an entity is created through a POST request that should reply with a 201 (Created) HTTP status code. Also, a Location Header with the URI that identifies the location of the new entity should be returned.
Note that a POST request is non-idempotent: if the entity already exists or cannot be created, we should return an HTTP error status code.

For our questions application, this can be translated into the following curl command:

curl -v -H "Content-Type: application/json" \
   -X POST http://localhost:5000/questions \
   -d '{"id": "test", "title": "MyTitle", "text":"The text of my question"}'

The first time we make the request, we should get a reply similar to the following:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> POST /questions HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 68
> 
* upload completely sent off: 68 out of 68 bytes
< HTTP/1.1 201 Created
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 11:37:11 GMT
< Location: http://localhost:5000/questions/test
< Content-Length: 0
<
* Connection #0 to host localhost left intact

If we repeat the request again, we will get an HTTP response with a 409 (Conflict) status code as the entity already exists:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> POST /questions HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 68
> 
* upload completely sent off: 68 out of 68 bytes
< HTTP/1.1 409 Conflict
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 11:53:34 GMT
< Content-Length: 0
< 

Implementation

Spray has several methods to complete a generic result and convert it into a Route (see the Spray Documentation for more information). However, there isn’t a standard function to transform a result into a Location Header… so we are going to write one! 😀
Note that our implementation is tailored to the behaviour of our system: when QuestionService creates a question, it returns a Future[Option[T]] and, if the returned option is not defined, we want to return a different HTTP status code.

package com.danielasfregola.quiz.management.routing

import com.danielasfregola.quiz.management.serializers.JsonSupport
import spray.http.HttpHeaders
import spray.routing._
import scala.concurrent.{ExecutionContext, Future}

trait MyHttpService extends HttpService with JsonSupport {

  implicit val executionContext: ExecutionContext

  def completeWithLocationHeader[T](resourceId: Future[Option[T]], ifDefinedStatus: Int, ifEmptyStatus: Int): Route =
    onSuccess(resourceId) { maybeT =>
      maybeT match {
        case Some(t) => completeWithLocationHeader(ifDefinedStatus, t)
        case None => complete(ifEmptyStatus, None)
      }
    }

  def completeWithLocationHeader[T](status: Int, resourceId: T): Route =
    requestInstance { request =>
      val location = request.uri.copy(path = request.uri.path / resourceId.toString)
      respondWithHeader(HttpHeaders.Location(location)) {
        complete(status, None)
      }
    }
}
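The interesting part of the second method is how the location is built: the id of the new resource is appended to the path of the request URI. The same idea can be sketched outside Spray with java.net.URI from the standard library; LocationUri and forResource are hypothetical names used only for this illustration, and the sketch deliberately drops any query string or fragment.

```scala
import java.net.URI

object LocationUri {
  // Hypothetical helper: appends the resource id as a new path segment of the request URI.
  // Query string and fragment are intentionally not carried over in this sketch.
  def forResource(requestUri: URI, resourceId: String): URI = {
    val basePath = Option(requestUri.getPath).getOrElse("").stripSuffix("/")
    new URI(requestUri.getScheme, requestUri.getAuthority, s"$basePath/$resourceId", null, null)
  }
}

object LocationUriDemo {
  def main(args: Array[String]): Unit = {
    val requestUri = new URI("http://localhost:5000/questions")
    println(LocationUri.forResource(requestUri, "test")) // http://localhost:5000/questions/test
  }
}
```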

We can now put everything together and define the endpoint to create a question entity:

package com.danielasfregola.quiz.management.resources

import com.danielasfregola.quiz.management.entities.{QuestionUpdate, Question}
import com.danielasfregola.quiz.management.routing.MyHttpService
import com.danielasfregola.quiz.management.services.QuestionService
import spray.routing._

trait QuestionResource extends MyHttpService {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    pathEnd {
      post {
        entity(as[Question]) { question =>
          completeWithLocationHeader(
            resourceId = questionService.createQuestion(question),
            ifDefinedStatus = 201, ifEmptyStatus = 409)
          }
        }
    } ~
    ...
  }

}

GET – Retrieve a Question

Usage

Now that we have created a question, we can retrieve it by performing a GET request to the URI that identifies the entity (i.e.: the one returned in the Location Header). The request should respond with either a 200 (OK) HTTP status code with a body containing the question entity or a 404 (NotFound) HTTP status code with empty body.

For example, we can get an existing question with the following curl command…

curl -v http://localhost:5000/questions/test

…and it should return something similar to the following:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> GET /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 12:23:34 GMT
< Content-Type: application/json; charset=UTF-8
< Content-Length: 64
< 
* Connection #0 to host localhost left intact
{"id":"test","title":"MyTitle","text":"The text of my question"}

Moreover, if we request an entity that doesn’t exist…

curl -v http://localhost:5000/questions/non-existing-question

…we should get a 404 error code:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> GET /questions/non-existing-question HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 404 Not Found
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 12:25:43 GMT
< Content-Length: 0
< 
* Connection #0 to host localhost left intact

Implementation

This behaviour can easily be achieved with Spray, as it automatically completes optional values:
– if the option is defined, complete will transform it into an HTTP response with a 200 (OK) status code and a body containing the JSON representation of the entity.
– if the option is empty, it will just return an HTTP response with a 404 (NotFound) status code.

package com.danielasfregola.quiz.management.resources

import com.danielasfregola.quiz.management.entities.{QuestionUpdate, Question}
import com.danielasfregola.quiz.management.routing.MyHttpService
import com.danielasfregola.quiz.management.services.QuestionService
import spray.routing._

trait QuestionResource extends MyHttpService {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      get {
        complete(questionService.getQuestion(id))
      } ~
      ...
    }
  }
}
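The option-to-response mapping described in this section can be modelled in a few lines of plain Scala. The sketch below is not Spray code: Response is a simplified model of an HTTP response, and OptionCompletion and its complete function are hypothetical names that only mimic the behaviour described above.

```scala
object OptionCompletion {
  // Simplified model of an HTTP response: a status code and an optional body.
  final case class Response(status: Int, body: Option[String])

  // Defined option => 200 with a rendered body; empty option => 404 without a body.
  def complete[T](maybeEntity: Option[T])(render: T => String): Response =
    maybeEntity match {
      case Some(entity) => Response(200, Some(render(entity)))
      case None         => Response(404, None)
    }
}

object OptionCompletionDemo {
  def main(args: Array[String]): Unit = {
    val found = OptionCompletion.complete(Some("MyTitle"))(title => s"""{"title":"$title"}""")
    val missing = OptionCompletion.complete(Option.empty[String])(identity)

    println(found.status)   // 200
    println(missing.status) // 404
  }
}
```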

PUT – Update a Question

Usage

When updating an entity, we should use a PUT request. Also, we should send only the fields that we want to update, not the whole object. Not only will this make our API easier to use, but it will also reduce potential concurrency issues.
If the update goes through, we should get an HTTP response with a 200 (OK) status code and the updated entity in the body. On the other hand, if the update was not possible, for example because the entity no longer exists, we should get an HTTP response with status 404 (NotFound) and an empty body.
Note that a PUT request is idempotent: performing the same update multiple times should always return the same result.

In our application we can update the question entity with the following curl command…

curl -v -H "Content-Type: application/json" \
   -X PUT http://localhost:5000/questions/test \
   -d '{"text":"Another text"}'

…and get the following reply:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> PUT /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 23
> 
* upload completely sent off: 23 out of 23 bytes
< HTTP/1.1 200 OK
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 12:44:03 GMT
< Content-Type: application/json; charset=UTF-8
< Content-Length: 53
< 
* Connection #0 to host localhost left intact
{"id":"test","title":"MyTitle","text":"Another text"}

Similarly, if we try to update a resource that doesn’t exist…

curl -v -H "Content-Type: application/json" \
   -X PUT http://localhost:5000/questions/non-existing-question \
   -d '{"text":"Another text"}'

…we should get a 404 (NotFound) error code:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> PUT /questions/non-existing-question HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 23
> 
* upload completely sent off: 23 out of 23 bytes
< HTTP/1.1 404 Not Found
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 12:46:15 GMT
< Content-Length: 0
< 
* Connection #0 to host localhost left intact

Implementation

As explained in the previous section, we want the client of our API to send just the fields to update, not the whole entity. In order to achieve this, we will deserialize the body of our PUT request to the following case class:

case class QuestionUpdate(title: Option[String], text: Option[String])

Note that we decided not to allow our clients to update the field id, as it is used to locate the entity.

Similarly to what we did for the GET request, Spray does all the work for us:

package com.danielasfregola.quiz.management.resources

import com.danielasfregola.quiz.management.entities.{QuestionUpdate, Question}
import com.danielasfregola.quiz.management.routing.MyHttpService
import com.danielasfregola.quiz.management.services.QuestionService
import spray.routing._

trait QuestionResource extends MyHttpService {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      ... ~
      put {
        entity(as[QuestionUpdate]) { update =>
          complete(questionService.updateQuestion(id, update))
        }
      } ~
      ...
    }
  }
}

DELETE – Delete a Question

Usage

Finally, we want to have an endpoint to delete a question entity. This can be achieved by sending a DELETE request to the URI that identifies the entity; the request should reply with a 204 (NoContent) status code once the operation has been completed. Note that DELETE is idempotent, so deleting a resource that has already been deleted should still return an HTTP response with a 204 (NoContent) status code and an empty body.

For example, we can delete the question test with the following…

curl -v -X DELETE http://localhost:5000/questions/test

…and get the following result back:

*   Trying ::1...
* Connected to localhost (::1) port 5000 (#0)
> DELETE /questions/test HTTP/1.1
> Host: localhost:5000
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 204 No Content
< Server: Quiz Management Service REST API
< Date: Sat, 21 Nov 2015 12:58:30 GMT
< Content-Type: application/json; charset=UTF-8
< Content-Length: 2
< 
* Excess found in a non pipelined read: excess = 2 url = /questions/non-existing-question (zero-length body)
* Connection #0 to host localhost left intact

Implementation

Once again, Spray makes our life really easy: all we have to do in order to define an endpoint to delete a question is to reuse functions already defined in the Spray toolkit:

package com.danielasfregola.quiz.management.resources

import com.danielasfregola.quiz.management.entities.{QuestionUpdate, Question}
import com.danielasfregola.quiz.management.routing.MyHttpService
import com.danielasfregola.quiz.management.services.QuestionService
import spray.routing._

trait QuestionResource extends MyHttpService {

  val questionService: QuestionService

  def questionRoutes: Route = pathPrefix("questions") {
    ... ~
    path(Segment) { id =>
      ... ~
      delete {
        complete(204, questionService.deleteQuestion(id))
      }
    }
  }
}

Summary

In this article we have described what a REST CRUD application is. Also, we have provided a simple tutorial on how to create a simple CRUD application using Spray. The code of the application analysed can be found on GitHub.

Spray: how to (de)serialize objects with json4s

A while ago I wrote an article on how to build a REST api with Spray, where I used the Spray facilities to serialize and deserialize objects to and from JSON.

This article will analyze how we can make the (de)serialization of our case classes easier, by using a library called json4s.

All the code produced in this tutorial can be found here.

Why Json4s?

Although the Spray way of (de)serializing objects works out-of-the-box, it is not always super convenient to use.

In fact, for every entity we need to specify the corresponding JSON format. See for example our QuizProtocol object from the previous tutorial:

// file QuizProtocol.scala
 ...  
  object Quiz extends DefaultJsonProtocol {
    implicit val format = jsonFormat3(Quiz.apply)
  }

  object Question extends DefaultJsonProtocol {
    implicit val format = jsonFormat2(Question.apply)
  }

  object Answer extends DefaultJsonProtocol {
    implicit val format = jsonFormat1(Answer.apply)
  }
...
/* We need to do it for each object
 * and we need to specify the number of fields to serialize
 */

Json4s is an easy-to-use library that automatically (de)serializes case classes into JSON objects: by using this library we will avoid the need of specifying a (de)serialization format for every case class.

Set up

First of all, we need to import the library by adding the dependencies to our build.sbt file:

// build.sbt
libraryDependencies ++= {
  ...
  val Json4sVersion = "3.2.11"

  Seq(
    ...
    "org.json4s" %% "json4s-native" % Json4sVersion,
    "org.json4s" %% "json4s-ext" % Json4sVersion,
    ...
  )
}

We now need to tell json4s about any customization we would like in our serializers.

For example, we can define a different date format to use:

// file JsonSupport.scala
package com.danielasfregola.quiz.management.serializers

import java.text.SimpleDateFormat

import org.json4s.ext.JodaTimeSerializers
import org.json4s.{DefaultFormats, Formats}
import spray.httpx.Json4sSupport

trait JsonSupport extends Json4sSupport {

  implicit def json4sFormats: Formats = customDateFormat ++ JodaTimeSerializers.all

  val customDateFormat = new DefaultFormats {
    override def dateFormatter = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss")
  }
  
}

Also, note that the package json4s-ext offers serializers for types that, although not standard, are quite common. For example, if you use joda.time you will need to add a specific group of serializers, called JodaTimeSerializers.
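To see what the pattern dd/MM/yyyy HH:mm:ss produces, we can try it directly with SimpleDateFormat, without involving json4s at all. In the sketch below the object DateFormatDemo is a hypothetical name, and the time zone is fixed to UTC only to make the output reproducible; the formatter in JsonSupport does not set one, so it uses the default time zone of the JVM.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object DateFormatDemo {
  // Same pattern as in JsonSupport; the UTC time zone is an assumption made for this example.
  def formatter: SimpleDateFormat = {
    val format = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss")
    format.setTimeZone(TimeZone.getTimeZone("UTC"))
    format
  }

  def main(args: Array[String]): Unit = {
    val epoch = new Date(0L)
    val text = formatter.format(epoch)

    println(text)                            // 01/01/1970 00:00:00
    println(formatter.parse(text) == epoch)  // true: the text parses back to the same instant
  }
}
```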

Json4s Custom Serializers

What happens if you need to serialize a type that is not supported by json4s? Do not despair! In fact, json4s allows you to specify custom serializers quite easily.

For example, let’s assume that your API uses Timestamp dates and that, when (de)serializing a date in JSON, it should be converted to seconds since the Unix epoch (i.e.: 1970-01-01 00:00:00 UTC); this is why the code below divides and multiplies by 1000. All you have to do is create a custom serializer as follows:

// file CustomSerializers.scala
package com.danielasfregola.quiz.management.serializers

import java.sql.Timestamp

import org.json4s.CustomSerializer
import org.json4s.JsonAST.{JInt, JNull}

object CustomSerializers {
  val all = List(CustomTimestampSerializer)
}

case object CustomTimestampSerializer extends CustomSerializer[Timestamp](format =>
  ({
    case JInt(x) => new Timestamp(x.longValue * 1000)
    case JNull => null
  },
    {
      case date: Timestamp => JInt(date.getTime / 1000)
    }))
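As a quick check of how such a serializer behaves, the following sketch registers an equivalent serializer on top of DefaultFormats and round-trips a value through JSON. The names EpochSecondsSerializer, Event and TimestampRoundTrip are hypothetical, and the sketch calls json4s directly rather than going through Spray:

```scala
import java.sql.Timestamp

import org.json4s.{CustomSerializer, DefaultFormats, Formats}
import org.json4s.JsonAST.{JInt, JNull}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.{read, write}

object TimestampRoundTrip {
  // Same logic as CustomTimestampSerializer above, repeated here so that
  // the sketch is self-contained: JSON numbers are seconds since the epoch.
  case object EpochSecondsSerializer extends CustomSerializer[Timestamp](_ =>
    ({
      case JInt(x) => new Timestamp(x.longValue * 1000)
      case JNull   => null
    },
    {
      case date: Timestamp => JInt(date.getTime / 1000)
    }))

  // Hypothetical case class with a Timestamp field.
  case class Event(name: String, at: Timestamp)

  // `+` adds a single custom serializer to an existing Formats.
  implicit val formats: Formats = DefaultFormats + EpochSecondsSerializer

  val original = Event("launch", new Timestamp(5000L)) // 5 seconds after the epoch
  val json: String = write(original)

  // The "at" field is written as a JSON number of seconds.
  val atField = parse(json) \ "at"

  val back: Event = read[Event](json)
}
```

Note that, because the value is stored in whole seconds, any millisecond component of the original Timestamp is lost in the round trip.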

Don’t forget to add your custom serializers as part of your (de)serialization configuration:

// file JsonSupport.scala
...

trait JsonSupport extends Json4sSupport {

  implicit def json4sFormats: Formats = ... ++ CustomSerializers.all

  ...
}

Usage

Now that the setup is complete, we just need to extend the JsonSupport trait every time a JSON format for (de)serialization is required.

For example, our QuizProtocol object now looks a lot simpler than before:

// file QuizProtocol.scala
package com.danielasfregola.quiz.management

import com.danielasfregola.quiz.management.serializers.JsonSupport

object QuizProtocol extends JsonSupport {

  case class Quiz(id: String, question: String, correctAnswer: String)
  
  case object QuizCreated
  
  case object QuizAlreadyExists
  
  case object QuizDeleted
  
  case class Question(id: String, question: String)
  
  case object QuestionNotFound
  
  case class Answer(answer: String)
  
  case object CorrectAnswer
  
  case object WrongAnswer

  implicit def toQuestion(quiz: Quiz): Question = Question(id = quiz.id, question = quiz.question)

  implicit def toAnswer(quiz: Quiz): Answer = Answer(answer = quiz.correctAnswer)
}

Summary

Previously, we have described how to create a REST API using Spray. This article has simplified how to perform (de)serialization in Spray, by using the json4s library.

All the code produced in this tutorial can be found here.

How to Create a Spray Custom Authenticator

Spray has a few standard authenticators already built in. However, what if the authentication we need is not supported? In this article we will describe how to create a custom authenticator in Spray. In particular, we will implement a custom client id/secret token-based authentication.

Spray Authentication: how it works

Spray already has some built-in authentication systems. For example, it supports HTTP Basic Authentication: have a look at their documentation here. What if the authentication logic that we need is not supported? Luckily, Spray allows the implementation of custom authenticators.

The authenticate directive has two signatures:

  def authenticate[T](auth: ⇒ Future[Authentication[T]])(implicit executor: ExecutionContext): Directive1[T]
  def authenticate[T](auth: ContextAuthenticator[T])(implicit executor: ExecutionContext): Directive1[T]

where ContextAuthenticator and Authentication are aliases for the following types:

// from spray.routing.authentication package object
...
  type Authentication[T] = Either[Rejection, T]
  type ContextAuthenticator[T] = RequestContext ⇒ Future[Authentication[T]]
...

In other words, a ContextAuthenticator is a function that takes a RequestContext (i.e.: a wrapper that contains all the information about the received request) and uses it to either authenticate or reject the request.

Client Id/Secret Custom Authenticator

Our goal is to create a custom authenticator that looks at the query parameters of a request, extracts the client_id and client_secret values and authorizes the request if they are correct.

For the purposes of this tutorial we have decided to focus on the second signature of the authenticate directive, so we will define a ContextAuthenticator to use with the directive.

First, let’s create a data container for our credentials and an auxiliary method to extract the id/secret from the request context:

  case class Credentials(id: String, secret: String)

  private def extractCredentials(ctx: RequestContext): Option[Credentials] = {
    val queryParams = ctx.request.uri.query.toMap
    for {
      id <- queryParams.get("client_id")
      secret <- queryParams.get("client_secret")
    } yield Credentials(id, secret)
  }
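The for-comprehension over Option only yields a value when both parameters are present. A minimal sketch of that behaviour, using a plain Map as a stand-in for ctx.request.uri.query.toMap (the fromParams helper and the ExtractCredentialsDemo object are hypothetical names, introduced only to make the example runnable without Spray):

```scala
object ExtractCredentialsDemo {
  case class Credentials(id: String, secret: String)

  // Same logic as extractCredentials, but taking the query parameters
  // directly instead of a RequestContext.
  def fromParams(queryParams: Map[String, String]): Option[Credentials] =
    for {
      id     <- queryParams.get("client_id")
      secret <- queryParams.get("client_secret")
    } yield Credentials(id, secret)

  // Both parameters present: the credentials are extracted.
  val complete: Option[Credentials] =
    fromParams(Map("client_id" -> "abc", "client_secret" -> "s3cret"))

  // One parameter missing: the whole extraction yields None.
  val missingSecret: Option[Credentials] =
    fromParams(Map("client_id" -> "abc"))
}
```

This is what lets the authenticator distinguish between missing credentials (None) and credentials that are present but wrong.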

We can now define our ContextAuthenticator as follows, keeping in mind that Authentication[T] is just a type alias for Either[Rejection, T]:

  // Future { ... } needs an implicit ExecutionContext in scope
  val authenticator: ContextAuthenticator[Unit] = { ctx =>
    Future {
      val maybeCredentials = extractCredentials(ctx)
      maybeCredentials.fold[authentication.Authentication[Unit]](
        // client_id and/or client_secret missing from the query string
        Left(AuthenticationFailedRejection(CredentialsMissing, List()))
      ) {
        // credentials present: accept only the expected id/secret pair
        case Credentials("my-client-id", "my-client-super-secret") => Right(())
        case _ => Left(AuthenticationFailedRejection(CredentialsRejected, List()))
      }
    }
  }

If any of the authentication query parameters are missing, the following response is returned:

Status: 401 Unauthorized
Body: The resource requires authentication, which was not supplied with the request

If the credentials are provided but they are not correct, the client will see the following message:

Status: 401 Unauthorized
Body: The supplied authentication is invalid

Finally, if the credentials are correct, the request will be authorized and satisfied if possible.

Note that in a more realistic case scenario, you would probably return a User entity instead of Unit. Moreover, rather than having the correct credentials hard-coded in the code, they should be either configurable or stored in a proper data storage.
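As a rough sketch of that more realistic variant, the following code returns a User and looks the credentials up in a store instead of hard-coding them. To keep it runnable without Spray, Rejection and Authentication are simplified stand-ins for the Spray types, and User, Registered, knownClients and the authenticate helper are all hypothetical:

```scala
import scala.concurrent.{ExecutionContext, Future}

object UserAuthenticatorSketch {
  // Simplified stand-ins for the Spray rejection types (assumption).
  sealed trait Rejection
  case object CredentialsMissing extends Rejection
  case object CredentialsRejected extends Rejection
  type Authentication[T] = Either[Rejection, T]

  case class Credentials(id: String, secret: String)
  case class User(name: String)
  case class Registered(secret: String, user: User)

  // Hypothetical in-memory store; in practice this would come from
  // configuration or a database.
  val knownClients: Map[String, Registered] =
    Map("my-client-id" -> Registered("my-client-super-secret", User("demo")))

  // Takes the already extracted credentials and resolves them to a User.
  def authenticate(maybeCredentials: Option[Credentials])(
      implicit ec: ExecutionContext): Future[Authentication[User]] =
    Future {
      maybeCredentials match {
        case None => Left(CredentialsMissing)
        case Some(Credentials(id, secret)) =>
          knownClients.get(id) match {
            case Some(Registered(expected, user)) if expected == secret => Right(user)
            case _ => Left(CredentialsRejected)
          }
      }
    }
}
```

With a ContextAuthenticator[User], the value extracted by the authenticate directive would then be the authenticated user rather than Unit.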

We are now ready to use our custom authenticator!

For example, to make all our endpoints use our custom authenticator we could do something similar to the following:

  def routes: Route =
    sealRoute {
      authenticate(authenticator) { authenticated =>
        firstResourceRoutes ~
          secondResourceRoutes
      }
    }

Summary

Spray allows the implementation of custom authenticators. This article provides an example of how this feature can be used to implement a client id/secret token-based authentication.