Akka Notes - DeathWatch - 7
When we talked about Actor lifecycle, we saw that Actors could be stopped by various means: using ActorSystem.stop or ActorContext.stop, or by sending a PoisonPill (there's also the Kill and the gracefulStop).
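As a quick refresher, the stopping mechanisms mentioned above might look roughly like this in code. This is only a sketch: the Idle actor, the StopSketch object and the actor names are made up for illustration, and system.terminate() assumes Akka 2.4 or later.

```scala
import akka.actor.{Actor, ActorSystem, Kill, PoisonPill, Props}
import akka.pattern.gracefulStop
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical actor that ignores every message it receives.
class Idle extends Actor {
  def receive = {
    case _ =>
  }
}

object StopSketch {
  // Returns true once the gracefulStop below has completed.
  def run(): Boolean = {
    val system = ActorSystem("StopSketch")
    try {
      val a = system.actorOf(Props(new Idle), "a")
      val b = system.actorOf(Props(new Idle), "b")
      val c = system.actorOf(Props(new Idle), "c")
      val d = system.actorOf(Props(new Idle), "d")

      system.stop(a)  // stop requested from outside the actor
      b ! PoisonPill  // queued like any other message; the actor stops when it is processed
      c ! Kill        // actor throws ActorKilledException; the default supervision stops it
      // Inside an actor, context.stop(self) or context.stop(child) plays the same role.

      // gracefulStop returns a Future that completes once d has terminated
      Await.result(gracefulStop(d, 3.seconds), 4.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Expect an error log entry for the actor that receives Kill; that is the ActorKilledException being reported.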
Whatever the reason an Actor dies, there are cases when a few other actors in the system would like to know about it. Let's take a trivial example of an Actor that talks to a database; let's call it a RepositoryActor. For obvious reasons, there would be a few other actors in the system sending messages to this RepositoryActor. These "interested" Actors would like to keep an eye on, or watch, this Actor in case it goes down. That, in Actor terms, is called DeathWatch. The methods to watch and unwatch an Actor are, intuitively, ActorContext.watch and ActorContext.unwatch. Once an Actor is watched, its watchers receive a Terminated message when it stops, which they can comfortably handle in their receive function.
Unlike Supervision, where there is a strict enforcement of parent-child hierarchy, any Actor could watch any other Actor in the ActorSystem.
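To make the "any Actor could watch any other Actor" point concrete, here is a minimal sketch in which the watcher is a sibling of the watched actor rather than its parent. Worker, Monitor, the "stop-watching" message and SiblingWatchSketch are hypothetical names invented for this example; the Promise is only there so the result can be observed from outside, and system.terminate() assumes Akka 2.4 or later.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props, Terminated}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical actor to be watched; it simply echoes whatever it receives.
class Worker extends Actor {
  def receive = {
    case msg => sender() ! msg
  }
}

// Hypothetical watcher. It is a sibling of the Worker, not its parent.
class Monitor(target: ActorRef, stopped: Promise[ActorRef]) extends Actor {
  context.watch(target) // ask to be sent Terminated when target stops

  def receive = {
    case "stop-watching" => context.unwatch(target) // lose interest in target
    case Terminated(ref) => stopped.trySuccess(ref) // record which actor stopped
  }
}

object SiblingWatchSketch {
  // Returns true if the Monitor was told about the Worker's termination.
  def run(): Boolean = {
    val system = ActorSystem("SiblingWatchSketch")
    try {
      val stopped = Promise[ActorRef]()
      // Both actors are created directly under /user, so neither supervises the other
      val worker = system.actorOf(Props(new Worker), "worker")
      system.actorOf(Props(new Monitor(worker, stopped)), "monitor")

      worker ! PoisonPill // stop the watched actor
      Await.result(stopped.future, 3.seconds) == worker
    } finally {
      system.terminate()
    }
  }
}
```

Calling SiblingWatchSketch.run() should come back with true, since the Monitor gets a Terminated message carrying the Worker's ActorRef even though it never created the Worker.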
Let's have a look at the code.
Code
QuoteRepositoryActor
- Our QuoteRepositoryActor holds a bunch of quotes as a List and serves a random one upon receiving a QuoteRepositoryRequest.
- It keeps track of the number of messages received and, on receiving more than 3 messages, kills itself with a PoisonPill.
Nothing fancy here.
package me.rerun.akkanotes.deathwatch

import akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala}
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._
import scala.util.Random

class QuoteRepositoryActor() extends Actor with ActorLogging {

  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")

  var repoRequestCount: Int = 1

  def receive = {
    case QuoteRepositoryRequest => {
      if (repoRequestCount > 3) {
        self ! PoisonPill
      }
      else {
        //Get a random Quote from the list and construct a response
        val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size)))
        log.info(s"QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse")
        repoRequestCount = repoRequestCount + 1
        sender ! quoteResponse
      }
    }
  }
}
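The messages imported from QuoteRepositoryProtocol are not shown in this post. Going by how they are used above, the protocol object could look something like the sketch below. The exact shape is an assumption: in particular, the field name quoteString is a guess borrowed from the pattern variable used in the testcases.

```scala
// Sketch of the protocol, inferred from usage; not copied from the real source.
object QuoteRepositoryProtocol {
  // Matched as `case QuoteRepositoryRequest`, so assumed to be a case object
  case object QuoteRepositoryRequest

  // Built from a single quote String; the field name is an assumption
  case class QuoteRepositoryResponse(quoteString: String)
}
```

With this in place, QuoteRepositoryResponse("some quote") can be pattern matched as QuoteRepositoryResponse(quoteString), which is what the testcases rely on.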
TeacherActorWatcher
Again, nothing fancy with TeacherActorWatcher except that it creates the QuoteRepositoryActor and watches over it using context.watch.
package me.rerun.akkanotes.deathwatch

import akka.actor.{Terminated, Props, Actor, ActorLogging}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequest

class TeacherActorWatcher extends Actor with ActorLogging {

  val quoteRepositoryActor = context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor")
  context.watch(quoteRepositoryActor)

  def receive = {
    case QuoteRequest => {
      quoteRepositoryActor ! QuoteRepositoryRequest
    }
    case Terminated(terminatedActorRef) => {
      log.error(s"Child Actor {$terminatedActorRef} Terminated")
    }
  }
}
TestCases
This is the interesting bit. Frankly, I never thought that these could be tested. akka-testkit FTW. We will analyze three testcases here:
1. Assert receipt of Terminated message if watched
The QuoteRepositoryActor should send the testcase a Terminated message on receipt of the 4th message. The first three messages should go in fine.
"A QuoteRepositoryActor" must {
  ...
  ...
  ...
  "send back a termination message to the watcher on 4th message" in {
    val quoteRepository = TestActorRef[QuoteRepositoryActor]
    val testProbe = TestProbe()
    testProbe.watch(quoteRepository) //Let's watch the Actor

    within (1000 millis) {
      var receivedQuotes = List[String]()
      (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
      receiveWhile() {
        case QuoteRepositoryResponse(quoteString) => {
          receivedQuotes = receivedQuotes :+ quoteString
        }
      }
      receivedQuotes.size must be (3)
      println(s"receiveCount ${receivedQuotes.size}")

      //4th message
      quoteRepository ! QuoteRepositoryRequest
      testProbe.expectTerminated(quoteRepository) //Expect a Terminated Message
    }
  }
2. Assert non-receipt of Terminated message if not watched/unwatched
Actually, we are overdoing things just to showcase context.unwatch (through testProbe.unwatch). The testcase would work just fine if we removed the testProbe.watch and testProbe.unwatch lines.
  "not send back a termination message on 4th message if not watched" in {
    val quoteRepository = TestActorRef[QuoteRepositoryActor]
    val testProbe = TestProbe()
    testProbe.watch(quoteRepository) //watching

    within (1000 millis) {
      var receivedQuotes = List[String]()
      (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
      receiveWhile() {
        case QuoteRepositoryResponse(quoteString) => {
          receivedQuotes = receivedQuotes :+ quoteString
        }
      }
      testProbe.unwatch(quoteRepository) //not watching anymore
      receivedQuotes.size must be (3)
      println(s"receiveCount ${receivedQuotes.size}")

      //4th message
      quoteRepository ! QuoteRepositoryRequest
      testProbe.expectNoMsg() //Not Watching. No Terminated Message
    }
  }
3. Assert receipt of Terminated message in the TeacherActorWatcher
We subscribe to the EventStream and check for a specific log message to assert termination.
  "send back a termination message to the watcher on 4th message to the TeacherActor" in {
    //This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase
    val teacherActor = TestActorRef[TeacherActorWatcher]

    within (1000 millis) {
      (1 to 3).foreach(_ => teacherActor ! QuoteRequest) //this sends a message to the QuoteRepositoryActor
      EventFilter.error(pattern = """Child Actor .* Terminated""", occurrences = 1).intercept {
        teacherActor ! QuoteRequest //Send the dangerous 4th message
      }
    }
  }
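One thing the snippet does not show: an EventFilter only gets to see log events when the test ActorSystem uses akka-testkit's TestEventListener as its logger. A minimal sketch of that setup follows. The object and method names are hypothetical, and the system name is simply the one that shows up in the sample log message; how the real test suite wires this in is not shown in this post.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object EventFilterSetupSketch {
  // Route log events through the TestEventListener so EventFilter can intercept them
  val testConfig = ConfigFactory.parseString(
    """akka.loggers = ["akka.testkit.TestEventListener"]""")

  // Hypothetical helper: builds the ActorSystem that the TestKit would be given
  def newTestSystem(): ActorSystem =
    ActorSystem("TestUniversityMessageSystem", testConfig)
}
```

A test class could then extend TestKit(EventFilterSetupSketch.newTestSystem()). Putting the same akka.loggers line into the test application.conf should work just as well.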
The pattern property of the EventFilter, not surprisingly, expects a regex pattern. The pattern="""Child Actor .* Terminated""" is expected to match a log message of the format Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated
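The regex can be checked against that sample message with plain Scala, without any Akka involved. This is only a sketch: PatternCheckSketch and matches are made-up names, and it assumes that a match anywhere in the message is enough, which is my understanding of how EventFilter applies the pattern.

```scala
object PatternCheckSketch {
  // Same regex as the one passed to EventFilter.error in the testcase
  val pattern = """Child Actor .* Terminated""".r

  // Sample log message in the format quoted above
  val sample =
    "Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated"

  // True when the regex is found anywhere in the message
  def matches(message: String): Boolean =
    pattern.findFirstIn(message).isDefined
}
```

PatternCheckSketch.matches(PatternCheckSketch.sample) evaluates to true, while a message such as "Child Actor stopped" does not match.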
Github
As always, the code is available at github. Watch for the deathwatch package.