Peer-to-Many Communication in Akka

The most common communication channel in Akka is Peer-to-Peer, where an individual actor sends a message directly to another individual actor. However, sometimes this is not enough: we may need a Peer-to-Many type of communication, where an individual actor sends a message to a group of actors. This is particularly useful when you need to model your system using a Publisher-Subscriber Pattern. This article provides a quick tutorial on how to use and customise an Event Bus, the Akka way of implementing a Peer-to-Many communication channel.

Event Stream

Event Stream is the simplest and most common implementation of an Event Bus. It follows the classic Publisher-Subscriber Pattern: one system actor will publish a message and all the actors that subscribed to that specific message type will receive it.

Let’s see how easily this can be achieved with a simple tutorial (gist available here).

In this tutorial, we want to model the following scenario: every time someone publishes a book, all the subscribers need to receive it.

First of all, we need to define what a book is:

case class Book(title: String, authors: List[String])

Then, we need to specify an Actor that acts as a book publisher: every time it receives a book, it publishes it on the System Event Stream.

import akka.actor.{Actor, ActorSystem, Props}

class BookPublisher extends Actor {
  
  def receive = {
    case book: Book => {
      println(s"Yeah! Publishing a new book: $book")
      context.system.eventStream.publish(book)
    }
  }
  
}

Finally, let’s create an Actor that will subscribe to the System Event Stream for all messages of type Book. Note that the preStart function will be executed by Akka right after the creation of the Actor.

class BookSubscriber extends Actor {
  
  // subscribe returns a Boolean, so the Unit result type of preStart is stated explicitly
  override def preStart(): Unit = context.system.eventStream.subscribe(self, classOf[Book])
  
  def receive = {
    case book: Book => println(s"My name is ${self.path.name} and I have received a new book: $book")
  }
}

Done! See, it wasn’t that bad… 🙂

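Now we just need to play with our system to make sure it works as expected. Keep in mind that message delivery is asynchronous: the output shown in the comments assumes that each step has completed before the next one runs.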

object Main extends App {
  
  implicit val system = ActorSystem("publisher-subscribers-example")
 
  val author = "Author"
  
  val bookPublisher = system.actorOf(Props[BookPublisher], name = "book-publisher")
  
  val subscriber1 = system.actorOf(Props[BookSubscriber], name = "subscriber-1") 
  val subscriber2 = system.actorOf(Props[BookSubscriber], name = "subscriber-2")
  
  bookPublisher ! Book(title = "A book title", authors = List(author, "Another author"))
  // Yeah! Publishing a new book: Book(A book title,List(Author, Another author))
  // My name is subscriber-1 and I have received a new book: Book(A book title,List(Author, Another author))
  // My name is subscriber-2 and I have received a new book: Book(A book title,List(Author, Another author))
  
  system.eventStream.unsubscribe(subscriber2, classOf[Book])
 
  bookPublisher ! Book(title = "Another book title", authors = List("Another author"))
  // Yeah! Publishing a new book: Book(Another book title,List(Another author))
  // My name is subscriber-1 and I have received a new book: Book(Another book title,List(Another author))
} 

Note that instead of unsubscribing subscriber2 from all the messages of type Book, we could have also unsubscribed it from any type of message by using system.eventStream.unsubscribe(subscriber2).
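To make the subscribe and unsubscribe semantics a bit more concrete, here is a minimal, Akka-free sketch of a type-based event stream. Everything in it (TinyEventStream, Subscriber, the received buffer) is a hypothetical name invented for this illustration: the real system.eventStream works with ActorRefs and is considerably more sophisticated. The sketch only mirrors the behaviour used above: subscribing to a class, unsubscribing from one class, and unsubscribing from everything.

```scala
import scala.collection.mutable

// Simplified, Akka-free model of a type-based event stream.
// All names are hypothetical; this is not how Akka implements it.
object TinyEventStreamExample {

  final case class Book(title: String, authors: List[String])

  // Stand-in for an actor: it just remembers what it has received.
  final class Subscriber(val name: String) {
    val received = mutable.ListBuffer.empty[Any]
  }

  final class TinyEventStream {
    private val subscriptions = mutable.Map.empty[Class[_], Set[Subscriber]]

    def subscribe(subscriber: Subscriber, channel: Class[_]): Unit =
      subscriptions(channel) = subscriptions.getOrElse(channel, Set.empty[Subscriber]) + subscriber

    // Stop receiving messages of one specific type.
    def unsubscribe(subscriber: Subscriber, channel: Class[_]): Unit =
      subscriptions(channel) = subscriptions.getOrElse(channel, Set.empty[Subscriber]) - subscriber

    // Stop receiving messages of any type.
    def unsubscribe(subscriber: Subscriber): Unit =
      subscriptions.keys.toList.foreach(channel => unsubscribe(subscriber, channel))

    // Deliver the event once to everyone subscribed to its class or to one of its supertypes.
    def publish(event: AnyRef): Unit = {
      val targets: Set[Subscriber] = subscriptions.iterator
        .filter { case (channel, _) => channel.isAssignableFrom(event.getClass) }
        .flatMap { case (_, subscribers) => subscribers }
        .toSet
      targets.foreach(subscriber => subscriber.received += event)
    }
  }
}
```

As in the Akka example, a subscriber that has been removed with unsubscribe(subscriber, classOf[Book]) no longer receives books, while the other subscribers are unaffected.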

Custom Event Bus

Event Streams are really easy to use. However, they may not be that useful if we want to perform some filtering on the published messages. One solution could be to do the filtering before publishing the message, but what if this cannot be done, or we simply want to do it in a more elegant way?

An Event Stream is a specific implementation of the Event Bus trait: Akka gives us the opportunity to reuse this trait to create Custom Event Buses.

Assume that now a subscriber wants to receive books for a specific author — gist of the following code can be found here.

First of all, the BookSubscriber actor doesn’t need to automatically subscribe to the System Event Stream. Moreover, the BookPublisher actor now needs to publish on a given Event Bus, rather than the System Event Stream.

import akka.actor.{Actor, ActorSystem, Props}

class BookPublisher(bus: AuthorBookBus) extends Actor {
  
  def receive = {
    case book: Book => {
      println(s"Yeah! Publishing a new book: $book")
      bus.publish(book)
    }
  }
  
}
 
class BookSubscriber extends Actor {
  
  def receive = {
    case book: Book => println(s"My name is ${self.path.name} and I have received a new book: $book")
  }
}

Finally, let’s define our AuthorBookBus that will filter books according to their authors.

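import akka.event.{ActorEventBus, EventBus, LookupClassification}

class AuthorBookBus(author: String) extends EventBus
  with LookupClassification
  with ActorEventBus {
  
  type Event = Book
  type Classifier = Boolean
  
  // Size hint for the number of distinct classifiers: true and false
  override protected def mapSize(): Int = 2
  
  // A book is classified by whether `author` is one of its authors
  override protected def classify(book: Book): Boolean = book.authors.contains(author)
 
  // Deliver the book to a subscriber registered under the book's classifier
  override protected def publish(book: Book, subscriber: Subscriber): Unit = subscriber ! book
}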

Our Event Bus accepts events of type Book and classifies each book with one of two possible values: true if author is one of the authors of the book, false otherwise. This is why mapSize is 2: it is a hint for the number of distinct classifiers we expect. The function classify is used to categorise a book according to the Classifier type. Akka provides different types of classifiers (see here); in our case LookupClassification was enough, as we wanted a classification based on the event itself.
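The idea behind a lookup classification can be sketched without Akka: keep an index from classifier to subscribers, classify each published event, and deliver it only to the subscribers found under that classifier. The code below is a simplified model under that assumption, not Akka's actual implementation; TinyLookupBus, TinyAuthorBookBus, Inbox and deliver are hypothetical names introduced for this illustration.

```scala
import scala.collection.mutable

// Simplified, Akka-free model of lookup classification.
// All names are hypothetical; this is not how Akka implements it.
object TinyLookupBusExample {

  final case class Book(title: String, authors: List[String])

  // Stand-in for an actor: it just remembers the books it has received.
  final class Inbox(val name: String) {
    val received = mutable.ListBuffer.empty[Book]
  }

  // E = event type, C = classifier type, S = subscriber type.
  abstract class TinyLookupBus[E, C, S] {
    private val index = mutable.Map.empty[C, Set[S]]

    protected def classify(event: E): C
    protected def deliver(event: E, subscriber: S): Unit

    def subscribe(subscriber: S, to: C): Unit =
      index(to) = index.getOrElse(to, Set.empty[S]) + subscriber

    // Look up the subscribers registered under the event's classifier and deliver to each.
    def publish(event: E): Unit =
      index.getOrElse(classify(event), Set.empty[S])
        .foreach(subscriber => deliver(event, subscriber))
  }

  // Same classification rule as AuthorBookBus: is `author` among the book's authors?
  final class TinyAuthorBookBus(author: String) extends TinyLookupBus[Book, Boolean, Inbox] {
    protected def classify(book: Book): Boolean = book.authors.contains(author)
    protected def deliver(book: Book, inbox: Inbox): Unit = inbox.received += book
  }
}
```

With only two possible classifiers, the index never holds more than two entries, which is the intuition behind the mapSize value used in AuthorBookBus.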

The following app shows an example of how our actor system could be used:

object Main extends App {
  
  implicit val system = ActorSystem("publisher-subscribers-example")
  
  val author = "Author"
 
  val authorBookBus = new AuthorBookBus(author)
  val bookPublisher = system.actorOf(Props(new BookPublisher(authorBookBus)), name = "book-publisher")
 
  val subscriber1 = system.actorOf(Props[BookSubscriber], name = "subscriber-1")
  val subscriber2 = system.actorOf(Props[BookSubscriber], name = "subscriber-2")
 
  authorBookBus.subscribe(subscriber1, true)
  // i.e.: subscriber1 will receive all the books
  // where one of the authors is "Author"

  authorBookBus.subscribe(subscriber2, false)
  // i.e.: subscriber2 will receive all the books 
  // where "Author" is not an author

  bookPublisher ! Book(title = "A book title", authors = List(author, "Another Author"))
  // Yeah! Publishing a new book: Book(A book title,List(Author, Another Author))
  // My name is subscriber-1 and I have received a new book: Book(A book title,List(Author, Another Author))
 
  bookPublisher ! Book(title = "Another book title", authors = List("Another Author"))
  // Yeah! Publishing a new book: Book(Another book title,List(Another Author))
  // My name is subscriber-2 and I have received a new book: Book(Another book title,List(Another Author))
}

Summary

Akka allows not only Peer-to-Peer communication but also Peer-to-Many, which is useful to implement a publisher-subscriber pattern. This article has described how this can be achieved using Event Streams for the simplest scenarios and Custom Event Buses when some classification of the events is needed.

Pure Functions

One of the key principles of functional programming is writing pure functions. What is a pure function? Why do we care? Why is everyone talking about it? This article will try and answer these questions.

Purity and Referentially Transparent Expressions

A pure function is composed only of referentially transparent expressions. An expression is referentially transparent if it can always be substituted with the result of its evaluation without changing the behaviour of the program.

It’s a bit like simplifying mathematical expressions the way we did back at school:
2 * (1 + 3) becomes…
2 * 4 becomes…
8
The expressions 2 * (1 + 3) and 8 are equivalent: that is why 2 * (1 + 3) is a referentially transparent expression.

Let’s see how we can relate the referentially transparent expression concept to pure functions!

Consider the following expression instead: println("Yo!"). Its result is the Unit value (), but replacing println("Yo!") with () would lose the text printed on the console: the two expressions are not equivalent, so println is considered to be a non-pure function.
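The substitution argument can be checked with a small sketch. To make the side effect observable, the example records lines in a buffer instead of printing them; say, printed, original and substituted are hypothetical names used only for this illustration.

```scala
import scala.collection.mutable

object SubstitutionExample {

  // Stand-in for the console, so that the side effect can be observed.
  val printed = mutable.ListBuffer.empty[String]

  // Impure, like println: it returns () but also records a line as a side effect.
  def say(msg: String): Unit = { printed += msg; () }

  // The expression is evaluated twice: two lines are recorded.
  def original(): (Unit, Unit) = (say("Yo!"), say("Yo!"))

  // The expression is evaluated once and its result () is reused: one line is recorded.
  def substituted(): (Unit, Unit) = {
    val result = say("Yo!")
    (result, result)
  }
}
```

Both methods return the same value, yet they leave a different number of lines in printed. For a referentially transparent expression such as 2 * (1 + 3), the same rewrite would make no observable difference.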

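Let’s analyse the following Scala function, called myFunc. It returns n + 1 for positive integers and the Unit value () for all the others, which is why its inferred return type is AnyVal. Even though it does not produce a number for every integer, it is still considered to be a pure function: every call can be substituted with its result.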

scala> def myFunc(n: Int) = if (n > 0) n + 1
myFunc: (n: Int)AnyVal

scala> myFunc(1)
res0: AnyVal = 2

scala> myFunc(-1)
res1: AnyVal = ()
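The AnyVal in the transcript above comes from the if without an else, which evaluates to () for non-positive input. One common way to keep the function pure while making the "no result" case explicit is to return an Option. The variant below is only a sketch of that alternative; myFuncOpt is a hypothetical name and is not part of the original example.

```scala
object OptionExample {

  // Hypothetical variant of myFunc: the missing branch is made explicit with None,
  // so the return type becomes Option[Int] instead of AnyVal.
  def myFuncOpt(n: Int): Option[Int] =
    if (n > 0) Some(n + 1) else None
}
```

Callers can then handle both cases with the usual Option combinators, for example OptionExample.myFuncOpt(-1).getOrElse(0).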

Why Pure Functions?

One of the main principles of Functional Programming is to write our applications so that the core is made of pure functions, while side effects are in a thin external layer. Being functional is not always easy, so what are the benefits of doing it?

First of all, pure functions are usually smaller and easier to understand: the human brain already thinks in a functional way! Moreover, being functional forces the separation between different types of concerns: what a function does rather than how a function can be used.

Pure functions are also a lot easier to test. For example, let’s assume we have the following two functions:

scala> :paste
// Entering paste mode (ctrl-D to finish)

def pureF(name: String) = s"Yo $name"

def impureF(name: String) = println(s"Yo $name")


// Exiting paste mode, now interpreting.

pureF: (name: String)String
impureF: (name: String)Unit

In order to test our function pureF, one line of code is enough: assert(pureF("Fella") == "Yo Fella"). On the other hand, testing the function impureF is a lot more complicated, as we need to redirect the standard output and do assertions on it.
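To give an idea of the extra work involved, here is one possible way to test both functions. It relies on Console.withOut from the Scala standard library to capture what println writes; captureStdout is a hypothetical helper written for this sketch.

```scala
import java.io.ByteArrayOutputStream

object TestingExample {

  def pureF(name: String): String = s"Yo $name"

  def impureF(name: String): Unit = println(s"Yo $name")

  // Hypothetical helper: runs `body` and returns everything it printed through scala.Console.
  def captureStdout(body: => Unit): String = {
    val buffer = new ByteArrayOutputStream()
    Console.withOut(buffer)(body)
    buffer.toString
  }

  def main(args: Array[String]): Unit = {
    // Pure function: compare the returned value directly.
    assert(pureF("Fella") == "Yo Fella")

    // Impure function: capture the output first, then compare (trim drops the line break).
    assert(captureStdout(impureF("Fella")).trim == "Yo Fella")
  }
}
```

The impure version needs a helper, a buffer and some care about line separators, while the pure version is tested by comparing two values.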

Finally, and in my opinion this is its biggest advantage, Functional Programming makes developers more productive. Pure functions are small and can be easily composed together, so code duplication is minimised and the code is extremely reusable. For example, consider the following two functions:

scala> def pureSum(a: Int, b: Int) = a + b
pureSum: (a: Int, b: Int)Int

scala> def impureSum(a: Int, b: Int) = println(a + b)
impureSum: (a: Int, b: Int)Unit

Unlike the function impureSum, the function pureSum can be easily reused to sum an arbitrary sequence of numbers as follows:

scala> Seq.range(0,10).reduce(pureSum)
res0: Int = 45
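The same function can be reused in other ways too. The sketch below shows two of them, assuming nothing beyond the standard library: foldLeft with a start value, which also copes with an empty sequence (where reduce would throw an exception), and composition with andThen. The names sumAll, double and sumThenDouble are hypothetical and only serve this illustration.

```scala
object ReuseExample {

  def pureSum(a: Int, b: Int): Int = a + b

  // Hypothetical second pure function, used to show composition.
  def double(n: Int): Int = n * 2

  // foldLeft starts from 0, so an empty sequence simply sums to 0.
  def sumAll(numbers: Seq[Int]): Int = numbers.foldLeft(0)(pureSum)

  // Composition of pure functions: sum first, then double the total.
  val sumAllF: Seq[Int] => Int = numbers => sumAll(numbers)
  val sumThenDouble: Seq[Int] => Int = sumAllF.andThen(n => double(n))
}
```

None of this would be possible with impureSum, whose result is always the Unit value.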

Summary

Pure functions are one of the fundamentals of Functional Programming. This article has described what a pure function is and what its advantages are.

How to Compose Futures

Futures are a powerful tool that was developed by the Akka team and then adopted as part of the standard Scala library from version 2.10.
A Future is a placeholder for a value that will be available in the future: thanks to it, we can run operations in parallel and worry about what to do with the value only once it is available, making our applications more scalable and performant. A lot can be achieved with it: have a look at the official Scala documentation for Futures and Promises. Each future can be seen as an isolated parallel operation, so combining them can be challenging: in this article we will describe how Futures can be composed together.

How to Select the Fastest Future

Let’s assume that in our application we have several services that perform the same operation and that these services have a different response time according to their traffic load. Because our application doesn’t have any information on the load of each service, or we simply don’t want to rely on it, we want to call all the services and use the first reply we get back: let’s see how this can be achieved using futures.

First of all, let’s simplify our life a bit: for the purposes of this tutorial, we will simulate the behaviour of our services with a method that will wait a period of time before returning a String wrapped in a Future:

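import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps

def reply(timeout: Duration, msg: String): Future[String] = Future {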
  Thread.sleep(timeout.toMillis)
  msg
}

Future.firstCompletedOf is the function that we are looking for: it takes a sequence of futures and returns a future that completes with the result of the first one to complete:

val futureSlowReply = reply(1 second, "Hello from a slow fella")
val futureFastReply = reply(100 milliseconds, "I am a super fast fella!")

val futureReplies = Seq(futureSlowReply, futureFastReply)
val futureFastestReply = Future.firstCompletedOf(futureReplies)

Await.result(futureFastestReply, 500 milliseconds)
// res0: String = I am a super fast fella!

Note that we do not need to wait for the slow reply: all the futures are run in parallel, so the result is available as soon as the fastest one completes, after roughly 100 milliseconds. The timeout is set a little higher than that to leave some room for scheduling overhead.
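One caveat worth keeping in mind: Future.firstCompletedOf picks the first future to complete, whether it succeeds or fails. The sketch below illustrates this with a hypothetical failingReply service that fails quickly; the delays are arbitrary values chosen for the example, and the sleep is wrapped in blocking so that the global execution context can compensate for the blocked thread.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FirstCompletedExample {

  def reply(delay: FiniteDuration, msg: String): Future[String] = Future {
    blocking { Thread.sleep(delay.toMillis) }
    msg
  }

  // Hypothetical service that fails quickly instead of replying.
  def failingReply(delay: FiniteDuration): Future[String] = Future {
    blocking { Thread.sleep(delay.toMillis) }
    throw new IllegalStateException("service unavailable")
  }

  // The quick failure wins the race, so the combined future fails as well.
  def fastestOutcome(): Try[String] = {
    val failing = failingReply(50.millis)
    val slow = reply(500.millis, "Hello from a slow fella")
    val fastest = Future.firstCompletedOf(Seq(failing, slow))
    // Await.ready waits for completion without rethrowing the failure.
    Await.ready(fastest, 2.seconds).value.get
  }

  def main(args: Array[String]): Unit =
    println(fastestOutcome())
}
```

If the first successful reply is what matters, failures have to be handled explicitly, for example by recovering each future before racing them.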

How to Combine Futures in Parallel

What if we have different services that process the same information differently? For example, given a customer id we have a service to retrieve the account information, another to retrieve the payment details, and another to retrieve product suggestions based on previous selections. We could do it the old Java style way and retrieve all the information sequentially…or we could retrieve all the information in parallel and be really efficient! 😀

Let’s see how this can be achieved using the zip method of the Future class:

val futureSlowReply = reply(1 second, "Hello from a slow fella")
val futureFastReply = reply(100 milliseconds, "I am a super fast fella!")

val futureAllParallelReplies = futureSlowReply.zip(futureFastReply)
// wait a bit longer than the slowest future to leave room for scheduling overhead
Await.result(futureAllParallelReplies, 2 seconds)
// res1: (String, String) = (Hello from a slow fella,I am a super fast fella!)

Note that waiting for the combined future, called futureAllParallelReplies, for less than 1 second would generate a java.util.concurrent.TimeoutException: the zip function needs both futures to complete before it can return the pair of their results!
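The zip method combines exactly two futures. For the customer scenario above, with three or more independent services, Future.sequence from the standard library is one possible alternative: it turns a sequence of futures into a future of a sequence, keeping the original order. The service replies and delays below are made up for the example.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelExample {

  def reply(delay: FiniteDuration, msg: String): Future[String] = Future {
    blocking { Thread.sleep(delay.toMillis) }
    msg
  }

  def allReplies(): Seq[String] = {
    // All three futures start as soon as they are created, so they run in parallel.
    val replies = Seq(
      reply(300.millis, "account information"),
      reply(200.millis, "payment details"),
      reply(100.millis, "product suggestions")
    )
    // The combined future completes once every reply is available.
    Await.result(Future.sequence(replies), 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(allReplies())
}
```

The results come back in the order in which the futures were listed, not in the order in which they completed.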

How to Concatenate Futures

In order to combine futures in parallel they need to be independent from each other. What if this is not possible and we need to run them sequentially?

All we need to do is use a for-comprehension: each future is created only once the previous one has completed, which forces them to run sequentially:

def futureAllSequentialReplies(msg: String) = for {
  firstReply <- reply(100 milliseconds, msg)
  nextMsg = if (msg.length < 3) msg.reverse else msg.toUpperCase 
  secondReply <- reply(200 milliseconds, nextMsg)
} yield (firstReply, secondReply)

Await.result(futureAllSequentialReplies("Hi"), 400 milliseconds)
// res2: (String, String) = (Hi,iH)
Await.result(futureAllSequentialReplies("Hello"), 400 milliseconds)
// res3: (String, String) = (Hello,HELLO)

Note that waiting for exactly 300 milliseconds is not enough: not only are the futures run sequentially (100 + 200 milliseconds), but we also spend some time computing the nextMsg String.
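It may help to see roughly what the for-comprehension expands to. In the sketch below, sequentialReplies is a hand-written approximation of that expansion using flatMap and map, while parallelReplies shows the opposite situation: futures created before the for-comprehension are already running, so they are combined in parallel. Both method names are hypothetical.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequentialExample {

  def reply(delay: FiniteDuration, msg: String): Future[String] = Future {
    blocking { Thread.sleep(delay.toMillis) }
    msg
  }

  // The second call to reply happens inside flatMap,
  // so it cannot start before firstReply is available.
  def sequentialReplies(msg: String): Future[(String, String)] =
    reply(100.millis, msg).flatMap { firstReply =>
      val nextMsg = if (msg.length < 3) msg.reverse else msg.toUpperCase
      reply(200.millis, nextMsg).map(secondReply => (firstReply, secondReply))
    }

  // Both futures are created (and therefore started) up front:
  // the for-comprehension only combines their results.
  def parallelReplies(msg: String): Future[(String, String)] = {
    val first = reply(100.millis, msg)
    val second = reply(200.millis, msg.toUpperCase)
    for {
      a <- first
      b <- second
    } yield (a, b)
  }

  def main(args: Array[String]): Unit = {
    println(Await.result(sequentialReplies("Hi"), 5.seconds))
    println(Await.result(parallelReplies("Hi"), 5.seconds))
  }
}
```

In other words, it is the place where a future is created, not the for-comprehension itself, that decides whether two futures run one after the other.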

Summary

Future is a powerful tool to perform operations in parallel. However, combining several parallel operations can be challenging. This article has described how easily we can compose Scala Futures: how to select the fastest one, how to combine them in parallel and, when needed, how to force them to run sequentially.