Peer-to-Many Communication in Akka

The most common communication channel in Akka is Peer-to-Peer, where an individual actor sends a message directly to another individual actor. Sometimes this is not enough: we may need a Peer-to-Many type of communication, where an individual actor sends a message to a group of actors. This is particularly useful when you need to model your system using a Publisher-Subscriber Pattern. This article provides a quick tutorial on how to use and customise an Event Bus, the Akka way of implementing a Peer-to-Many communication channel.

Event Stream

Event Stream is the simplest and most common implementation of an Event Bus. It follows the classic Publisher-Subscriber Pattern: one system actor will publish a message and all the actors that subscribed to that specific message type will receive it.

Let’s see how easily this can be achieved with a simple tutorial (gist available here).

In this tutorial, we want to model the following scenario: every time someone publishes a book, all the subscribers need to receive it.

First of all, we need to define what a book is:

case class Book(title: String, authors: List[String])

Then, we need to specify an Actor that acts as a book publisher: every time it receives a book, it publishes it on the System Event Stream.

import akka.actor.Actor

class BookPublisher extends Actor {
  
  def receive = {
    case book: Book => {
      println(s"Yeah! Publishing a new book: $book")
      context.system.eventStream.publish(book)
    }
  }
  
}

Finally, let’s create an Actor that subscribes to the System Event Stream for all messages of type Book. Note that Akka calls the preStart hook when the Actor is started, before it processes any message.

class BookSubscriber extends Actor {
  
  // preStart must return Unit; the Boolean returned by subscribe is discarded
  override def preStart(): Unit = context.system.eventStream.subscribe(self, classOf[Book])
  
  def receive = {
    case book: Book => println(s"My name is ${self.path.name} and I have received a new book: $book")
  }
}

Done! See, it wasn’t that bad… 🙂

Now we just need to play with our system to make sure it works as expected:

import akka.actor.{ActorSystem, Props}

object Main extends App {
  
  implicit val system: ActorSystem = ActorSystem("publisher-subscribers-example")
 
  val author = "Author"
  
  val bookPublisher = system.actorOf(Props[BookPublisher], name = "book-publisher")
  
  val subscriber1 = system.actorOf(Props[BookSubscriber], name = "subscriber-1") 
  val subscriber2 = system.actorOf(Props[BookSubscriber], name = "subscriber-2")
  
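  // Actors start and process messages asynchronously: the output shown below
  // assumes both subscribers have completed preStart before the book is published.
  bookPublisher ! Book(title = "A book title", authors = List(author, "Another author"))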
  // Yeah! Publishing a new book: Book(A book title,List(Author, Another author))
  // My name is subscriber-1 and I have received a new book: Book(A book title,List(Author, Another author))
  // My name is subscriber-2 and I have received a new book: Book(A book title,List(Author, Another author))
  
  system.eventStream.unsubscribe(subscriber2, classOf[Book])
 
  bookPublisher ! Book(title = "Another book title", authors = List("Another author"))
  // Yeah! Publishing a new book: Book(Another book title,List(Another author))
  // My name is subscriber-1 and I have received a new book: Book(Another book title,List(Another author))
} 

Note that instead of unsubscribing subscriber2 only from messages of type Book, we could have unsubscribed it from every message type by using system.eventStream.unsubscribe(subscriber2).
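The two forms of unsubscribe can be compared in a small sketch. SilentSubscriber and UnsubscribeDemo are hypothetical names used only for illustration. The sketch relies on the Boolean that subscribe and the two-argument unsubscribe return, which the EventBus API documents as true only when the call actually changed the subscription.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Book(title: String, authors: List[String])

// Hypothetical subscriber that accepts books and does nothing with them.
class SilentSubscriber extends Actor {
  def receive: Receive = { case _: Book => () }
}

object UnsubscribeDemo {
  // Returns the results of: first subscribe, repeated subscribe,
  // and a per-type unsubscribe issued after a full unsubscribe.
  def run(): (Boolean, Boolean, Boolean) = {
    val system = ActorSystem("unsubscribe-demo")
    try {
      val subscriber = system.actorOf(Props[SilentSubscriber](), "subscriber")

      val first = system.eventStream.subscribe(subscriber, classOf[Book])
      val repeated = system.eventStream.subscribe(subscriber, classOf[Book])

      // Removes the subscriber from every message type at once (returns Unit).
      system.eventStream.unsubscribe(subscriber)

      // Nothing is left to remove for Book at this point.
      val removedAgain = system.eventStream.unsubscribe(subscriber, classOf[Book])

      (first, repeated, removedAgain)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Checking these return values can be handy in tests, for example to make sure a subscription is really in place before publishing.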

Custom Event Bus

Event Streams are really easy to use. However, they are less helpful when we want to filter the published messages. One solution could be to do the filtering before publishing the message, but what if that is not possible, or we simply want a more elegant approach?
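For reference, filtering before publishing could look roughly like the sketch below. It is only one possible approach and not part of the original gist: the wrapper types ByTrackedAuthor and ByOtherAuthors, the ClassifyingBookPublisher actor and the TrackedAuthorSubscriber actor are all hypothetical names. The publisher wraps each book in a type that encodes the decision, and subscribers pick a wrapper type when subscribing to the System Event Stream.

```scala
import akka.actor.Actor

final case class Book(title: String, authors: List[String])

// Hypothetical wrapper types, one per category a subscriber may care about.
sealed trait ClassifiedBook { def book: Book }
final case class ByTrackedAuthor(book: Book) extends ClassifiedBook
final case class ByOtherAuthors(book: Book) extends ClassifiedBook

object ClassifyingBookPublisher {
  // Pure helper, so the decision can be checked without starting any actor.
  def classify(trackedAuthor: String, book: Book): ClassifiedBook =
    if (book.authors.contains(trackedAuthor)) ByTrackedAuthor(book)
    else ByOtherAuthors(book)
}

class ClassifyingBookPublisher(trackedAuthor: String) extends Actor {
  def receive: Receive = {
    case book: Book =>
      // The class of the wrapper decides which subscribers are notified.
      context.system.eventStream.publish(
        ClassifyingBookPublisher.classify(trackedAuthor, book))
  }
}

class TrackedAuthorSubscriber extends Actor {
  // Only wrappers of type ByTrackedAuthor reach this actor.
  override def preStart(): Unit = {
    context.system.eventStream.subscribe(self, classOf[ByTrackedAuthor])
    ()
  }

  def receive: Receive = {
    case ByTrackedAuthor(book) =>
      println(s"${self.path.name} received a book by the tracked author: $book")
  }
}
```

The drawback is that the publisher has to know every category in advance and a new wrapper type is needed for each one, which is part of why a dedicated bus can be the cleaner option.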

An Event Stream is a specific implementation of the EventBus trait: Akka gives us the opportunity to reuse this trait to create Custom Event Buses.

Assume that a subscriber now wants to receive only the books by a specific author (a gist of the following code can be found here).

First of all, the BookSubscriber actor doesn’t need to automatically subscribe to the System Event Stream. Moreover, the BookPublisher actor now needs to publish on a given Event Bus, rather than the System Event Stream.

class BookPublisher(bus: AuthorBookBus) extends Actor {
  
  def receive = {
    case book: Book => {
      println(s"Yeah! Publishing a new book: $book")
      bus.publish(book)
    }
  }
  
}
 
class BookSubscriber extends Actor {
  
  def receive = {
    case book: Book => println(s"My name is ${self.path.name} and I have received a new book: $book")
  }
}

Finally, let’s define our AuthorBookBus that will filter books according to their authors.

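import akka.event.{ActorEventBus, EventBus, LookupClassification}

class AuthorBookBus(author: String) extends EventBus
  with LookupClassification
  with ActorEventBus {
  
  type Event = Book
  type Classifier = Boolean
  
  // size hint for the number of distinct classifiers: true and false
  override protected def mapSize(): Int = 2
  
  // true if the given author is among the book's authors
  override protected def classify(book: Book): Boolean = book.authors.contains(author)
 
  // called once for every subscriber registered for the book's classifier
  override protected def publish(book: Book, subscriber: Subscriber): Unit = subscriber ! book
}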

Our Event Bus accepts events of type Book and classifies each of them with one of two possible values: true if author is one of the authors of the book, false otherwise. This is why mapSize is 2: it is a size hint for the number of distinct classifiers we expect. The function classify categorises a book according to the Classifier type, and the bus then delivers the book to the subscribers registered for exactly that value. Akka provides different types of classification (see here); in our case LookupClassification was enough, as a simple lookup on a classifier computed from the event itself is all we need.
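To make the lookup step more concrete, here is a deliberately simplified, single-threaded model of it in plain Scala, without Akka. It is a sketch for building intuition only and is not how Akka implements LookupClassification internally; TinyLookupBus and TinyLookupBusDemo are hypothetical names, and subscribers are modelled as plain strings instead of actors.

```scala
final case class Book(title: String, authors: List[String])

// Simplified model: an index from classifier value to the subscribers
// registered for it. Not thread-safe, unlike a real event bus.
class TinyLookupBus[Event, Classifier, Subscriber](
    classify: Event => Classifier,
    deliver: (Event, Subscriber) => Unit) {

  private var index = Map.empty[Classifier, Set[Subscriber]]

  // Returns true only if the subscriber was not yet registered for `to`.
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean = {
    val current = index.getOrElse(to, Set.empty[Subscriber])
    if (current.contains(subscriber)) false
    else {
      index = index.updated(to, current + subscriber)
      true
    }
  }

  // Classify the event, look up the matching subscribers, deliver to each.
  def publish(event: Event): Unit =
    index
      .getOrElse(classify(event), Set.empty[Subscriber])
      .foreach(subscriber => deliver(event, subscriber))
}

object TinyLookupBusDemo {
  // Records which titles each named subscriber received.
  def run(): Map[String, List[String]] = {
    var inbox = Map.empty[String, List[String]]

    val bus = new TinyLookupBus[Book, Boolean, String](
      book => book.authors.contains("Author"),
      (book, name) => {
        inbox = inbox.updated(name, inbox.getOrElse(name, Nil) :+ book.title)
      })

    bus.subscribe("subscriber-1", true)
    bus.subscribe("subscriber-2", false)

    bus.publish(Book("A book title", List("Author", "Another Author")))
    bus.publish(Book("Another book title", List("Another Author")))

    inbox
  }
}
```

In this model the first book is classified as true and reaches only subscriber-1, while the second is classified as false and reaches only subscriber-2.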

The following app shows an example of how our actor system could be used:

object Main extends App {
  
  implicit val system = ActorSystem("publisher-subscribers-example")
  
  val author = "Author"
 
  val authorBookBus = new AuthorBookBus(author)
  val bookPublisher = system.actorOf(Props(new BookPublisher(authorBookBus)), name = "book-publisher")
 
  val subscriber1 = system.actorOf(Props[BookSubscriber], name = "subscriber-1")
  val subscriber2 = system.actorOf(Props[BookSubscriber], name = "subscriber-2")
 
  authorBookBus.subscribe(subscriber1, true)
  // i.e.: subscriber1 will receive all the books
  // where one of the authors is "Author"

  authorBookBus.subscribe(subscriber2, false)
  // i.e.: subscriber2 will receive all the books 
  // where "Author" is not an author

  bookPublisher ! Book(title = "A book title", authors = List(author, "Another Author"))
  // Yeah! Publishing a new book: Book(A book title,List(Author, Another Author))
  // My name is subscriber-1 and I have received a new book: Book(A book title,List(Author, Another Author))
 
  bookPublisher ! Book(title = "Another book title", authors = List("Another Author"))
  // Yeah! Publishing a new book: Book(Another book title,List(Another Author))
  // My name is subscriber-2 and I have received a new book: Book(Another book title,List(Another Author))
}
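The Boolean classifier ties one bus instance to a single author. If several authors need to be tracked, one possible variation is to use the author's name as the classifier. The sketch below is an assumption-laden illustration rather than part of the original gist: BookByAuthor, PerAuthorBookBus and publishBook are hypothetical names, and the value returned by mapSize is an arbitrary size hint.

```scala
import akka.actor.ActorRef
import akka.event.{ActorEventBus, EventBus, LookupClassification}

final case class Book(title: String, authors: List[String])

// Hypothetical envelope pairing a book with one of its authors.
final case class BookByAuthor(author: String, book: Book)

class PerAuthorBookBus extends EventBus
  with LookupClassification
  with ActorEventBus {

  type Event = BookByAuthor
  type Classifier = String // the author's name

  // Size hint for the expected number of distinct authors (arbitrary here).
  override protected def mapSize(): Int = 128

  override protected def classify(event: BookByAuthor): String = event.author

  // Subscribers receive the plain Book, not the envelope.
  override protected def publish(event: BookByAuthor, subscriber: ActorRef): Unit =
    subscriber ! event.book

  // Convenience method (not part of Akka): publish once per distinct author.
  def publishBook(book: Book): Unit =
    book.authors.distinct.foreach(author => publish(BookByAuthor(author, book)))
}
```

With this variation a subscriber would register with a call such as subscribe(subscriber1, "Author"), and the publisher actor would call publishBook instead of publish. One caveat: an actor subscribed to two co-authors of the same book receives that book twice.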

Summary

Akka allows not only Peer-to-Peer communication but also Peer-to-Many, which is useful for implementing a publisher-subscriber pattern. This article has described how this can be achieved using Event Streams for the simplest scenarios and Custom Event Buses when some classification of the events is needed.
