Akka Notes - DeathWatch - 7
When we talked about Actor lifecycle, we saw that Actors could be stopped by various means (using ActorSystem.stop or ActorContext.stop or sending a PoisonPill - there's also the Kill and the gracefulStop).
Whatever reason an Actor dies, there are cases when a few other actors in the system would like to know about it. Let's take a trivial example of an Actor who talks to a database - let's call it a RepositoryActor. For obvious reasons, there would be few other actors in the system who would be sending message to this RepositoryActor. These "interested" Actors would like to keep an eye on or watch this Actor if it goes down. Now, that in Actor terms is called DeathWatch. And the methods to watch and unwatch over this is intuitively ActorContext.watch and ActorContext.unwatch. If watched, the watchers would receive a Terminated message from the stopped Actor which they could comfortably add in to their receive function.
Unlike Supervision, where there is a strict enforcement of parent-child hierarchy, any Actor could watch any other Actor in the ActorSystem.

Let's have a look at the code.
Code
QuoteRepositoryActor
- Our
QueryRepositoryActorholds a bunch ofquotesas a List and serves a random one upon receiving aQuoteRepositoryRequest. - It keeps track of the number of messages received and if it receives more than 3 messages, it kills itself with a
PoisonPill
Nothing fancy here.
package me.rerun.akkanotes.deathwatch
import akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala}
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._
import scala.util.Random
class QuoteRepositoryActor() extends Actor with ActorLogging {
val quotes = List(
"Moderation is for cowards",
"Anything worth doing is worth overdoing",
"The trouble is you think you have time",
"You never gonna know if you never even try")
var repoRequestCount:Int=1
def receive = {
case QuoteRepositoryRequest => {
if (repoRequestCount>3){
self!PoisonPill
}
else {
//Get a random Quote from the list and construct a response
val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size)))
log.info(s"QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse")
repoRequestCount=repoRequestCount+1
sender ! quoteResponse
}
}
}
}
TeacherActorWatcher
Again, nothing fancy with TeacherActorWatcher except that it creates the QuoteRepositoryActor and watches over it using a context.watch.
package me.rerun.akkanotes.deathwatch
import akka.actor.{Terminated, Props, Actor, ActorLogging}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequest
class TeacherActorWatcher extends Actor with ActorLogging {
val quoteRepositoryActor=context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor")
context.watch(quoteRepositoryActor)
def receive = {
case QuoteRequest => {
quoteRepositoryActor ! QuoteRepositoryRequest
}
case Terminated(terminatedActorRef)=>{
log.error(s"Child Actor {$terminatedActorRef} Terminated")
}
}
}
TestCases
This is the interesting bit. Frankly, I never thought that these could be tested. akka-testkit FTW. We will analyze three testcases here :
1. Assert receipt of Terminated message if watched
The QuoteRepositoryActor should send the testcase a Terminated message on receipt of the 4th message. The first three messages should go in fine.
"A QuoteRepositoryActor" must {
...
...
...
"send back a termination message to the watcher on 4th message" in {
val quoteRepository=TestActorRef[QuoteRepositoryActor]
val testProbe=TestProbe()
testProbe.watch(quoteRepository) //Let's watch the Actor
within (1000 millis) {
var receivedQuotes = List[String]()
(1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
receiveWhile() {
case QuoteRepositoryResponse(quoteString) => {
receivedQuotes = receivedQuotes :+ quoteString
}
}
receivedQuotes.size must be (3)
println(s"receiveCount ${receivedQuotes.size}")
//4th message
quoteRepository!QuoteRepositoryRequest
testProbe.expectTerminated(quoteRepository) //Expect a Terminated Message
}
}
2. Assert non-receipt of Terminated message if not watched/unwatched
Actually, we are over-doing things just to showcase the context.unwatch. The testcase would work just fine if we remove the testProbe.watch and testProbe.unwatch lines.
"not send back a termination message on 4th message if not watched" in {
val quoteRepository=TestActorRef[QuoteRepositoryActor]
val testProbe=TestProbe()
testProbe.watch(quoteRepository) //watching
within (1000 millis) {
var receivedQuotes = List[String]()
(1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
receiveWhile() {
case QuoteRepositoryResponse(quoteString) => {
receivedQuotes = receivedQuotes :+ quoteString
}
}
testProbe.unwatch(quoteRepository) //not watching anymore
receivedQuotes.size must be (3)
println(s"receiveCount ${receivedQuotes.size}")
//4th message
quoteRepository!QuoteRepositoryRequest
testProbe.expectNoMsg() //Not Watching. No Terminated Message
}
}
3. Assert receipt of Terminated message in the TeacherActorWatcher
We subscribe to the EventStream and check for a specific log message to assert termination.
"end back a termination message to the watcher on 4th message to the TeacherActor" in {
//This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase
val teacherActor=TestActorRef[TeacherActorWatcher]
within (1000 millis) {
(1 to 3).foreach (_=>teacherActor!QuoteRequest) //this sends a message to the QuoteRepositoryActor
EventFilter.error (pattern="""Child Actor .* Terminated""", occurrences = 1).intercept{
teacherActor!QuoteRequest //Send the dangerous 4th message
}
}
}
The pattern property of the EventFilter, not surprisingly, expects a regex pattern. The pattern="""Child Actor .* Terminated""" is expected to match a log message which is of the format Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated
Github
As always, the code is available at github. Watch for the deathwatch package.