Akka Notes - Actor Supervision - 8
Failures are practically a feature of distributed systems. With Akka's "let it crash" fault-tolerance model, you can achieve a clear separation between your business logic and your failure-handling logic (supervision logic), all with very little effort. It's pretty amazing, and it is the topic of this write-up.
Actor Supervision
Imagine a method call stack where the topmost method throws an Exception. What could the methods further down the stack do?
- Catch the exception and handle it in order to recover.
- Catch the exception, maybe log it, and keep quiet.
- Duck the exception completely (or catch it and rethrow it).
Now imagine that none of the methods, all the way down to the main method, handles the exception. In that case, the program exits after writing an essay of a stack trace to the console.
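Threads are a slightly different story. If a child thread throws an exception that its run method doesn't handle, the exception does not travel to the parent thread. The child thread just dies, and the exception goes to that thread's UncaughtExceptionHandler (the default one prints the stack trace). If the work was a Callable's call method submitted to an ExecutorService, the exception is captured and only resurfaces, wrapped in an ExecutionException, when the parent calls get on the returned Future. Either way, the parent only finds out about the failure if it explicitly goes looking for it.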
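On the JVM, whether the starting thread ever sees a child's exception depends on how the work was started. The sketch below uses only the Scala/Java standard library (no Akka; ThreadFailureDemo and its methods are made-up names). It shows the two usual cases. With a raw Thread, the exception goes to its UncaughtExceptionHandler and join() returns normally. With a Callable submitted to an ExecutorService, the exception resurfaces wrapped in an ExecutionException when Future.get() is called.

```scala
import java.util.concurrent.{Callable, ExecutionException, Executors}
import java.util.concurrent.atomic.AtomicReference

// Plain JVM threads, no Akka: where does a child thread's exception end up?
object ThreadFailureDemo {

  // Raw Thread: join() returns normally. The exception only reaches the
  // thread's UncaughtExceptionHandler, never the thread that started it.
  def rawThreadFailure(): Option[Throwable] = {
    val seen = new AtomicReference[Throwable]()
    val child = new Thread(new Runnable {
      def run(): Unit = throw new IllegalStateException("child blew up")
    })
    child.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      def uncaughtException(t: Thread, e: Throwable): Unit = seen.set(e)
    })
    child.start()
    child.join() // waits for the child; does not rethrow its exception
    Option(seen.get())
  }

  // Callable on an executor: the exception is stored in the Future and
  // rethrown, wrapped in an ExecutionException, when get() is called.
  def callableFailure(): Option[Throwable] = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val future = pool.submit(new Callable[Int] {
        def call(): Int = throw new IllegalStateException("callable blew up")
      })
      try { future.get(); None }
      catch { case e: ExecutionException => Some(e.getCause) }
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    println(s"raw thread -> ${rawThreadFailure().map(_.getMessage)}")
    println(s"callable   -> ${callableFailure().map(_.getMessage)}")
  }
}
```

In both cases the failure has to be fetched by the parent. Akka's parental supervision hands the failure to the parent instead.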
Now apply the same thinking to Actors. If a child Actor that was created using context.actorOf fails with an Exception, the parent actor (aka supervisor) gets to decide what to do about it. It could handle the failure and recover (Restart or Resume). It could duck the exception and pass it on to its own parent (Escalate). Or it could just Stop the child actor, and that's the end of the story for that child. Why did I say parent (aka supervisor)? Because Akka's approach to supervision is parental supervision, which means that only the creator of an Actor can supervise it.
That's it!! We have pretty much covered all the supervision Directives (what can be done about a failure).
Strategies
Ah, I forgot to mention this one: you already know that an Akka Actor can create children, and that it can create as many children as it wants.
Now, consider two scenarios:
1. OneForOneStrategy
Your Actor spawns multiple child actors, and each of these child actors connects to a different data source. Say you are running an app that translates an English word into multiple languages.
Suppose one child actor fails and you are fine with skipping that result in the final list. What would you want to do? Shut down the service? Nope. You would probably want to restart or stop just that child actor, wouldn't you? That's called OneForOneStrategy in Akka supervision terms: if one actor goes down, handle that one alone.
Depending on your business exceptions, you would want to react differently (Stop, Restart, Escalate, Resume) to different exceptions. To configure your own strategy, you just override supervisorStrategy in your Actor class.
An example declaration of OneForOneStrategy would be:
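import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.{Restart, Stop}
// Application-specific exception referenced by the decider below
class MinorRecoverableException(message: String) extends Exception(message)
class TeacherActorOneForOne extends Actor with ActorLogging {
  ...
  ...
  // Only the child that failed is restarted or stopped; its siblings are untouched
  override val supervisorStrategy = OneForOneStrategy() {
    case _: MinorRecoverableException => Restart
    case _: Exception                 => Stop
  }
  ...
  ...
}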
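Here is a plain-Scala toy model that makes the one-for-one idea concrete without an actor system. No Akka is involved, Translator and translateAll are hypothetical names, and "stopping" a child is modelled simply as dropping its result. Each language child works independently, so one failing child only costs you that child's translation while its siblings carry on.

```scala
import scala.util.{Failure, Success, Try}

object OneForOneTranslationModel {
  // Hypothetical child: one per language, each backed by its own data source
  final case class Translator(language: String, lookup: String => String)

  // Every child works independently. A failing child is "stopped" (its result is
  // dropped) while its siblings keep going, which is the one-for-one idea.
  def translateAll(word: String, children: List[Translator]): (Map[String, String], List[String]) = {
    val attempts = children.map(child => child.language -> Try(child.lookup(word)))
    val results = attempts.collect { case (lang, Success(text)) => lang -> text }.toMap
    val stopped = attempts.collect { case (lang, Failure(_)) => lang }
    (results, stopped)
  }

  def main(args: Array[String]): Unit = {
    val children = List(
      Translator("fr", _ => "bonjour"),
      Translator("de", _ => throw new RuntimeException("data source down")),
      Translator("es", _ => "hola"))
    val (results, stopped) = translateAll("hello", children)
    println(s"results = $results, stopped = $stopped")
  }
}
```

In real Akka the children run concurrently and the supervisor reacts asynchronously. The toy only captures which children are affected by a failure.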
2. AllForOneStrategy
Assume that you are doing an External Sort (one more example to prove that my creativity sucks!!), and each of your chunks is handled by a different Actor. Suddenly, one Actor fails with an exception. It doesn't make any sense to continue processing the rest of the chunks, because the final result wouldn't be correct. So, it is logical to Stop ALL the actors.
Why did I say Stop instead of Restart in the previous line? Because Restarting wouldn't make sense for this use case either: an Actor's mailbox is not cleared on Restart. So, if we restart, the chunks still waiting in the mailboxes would be processed anyway. That's not what we want. Recreating the Actors with shiny new mailboxes is the right approach here.
Again, just like with OneForOneStrategy, you just override supervisorStrategy, this time with an AllForOneStrategy. An example would be:
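import akka.actor.{Actor, ActorLogging}
import akka.actor.AllForOneStrategy
import akka.actor.SupervisorStrategy.{Escalate, Stop}
// Application-specific exception referenced by the decider below
class MajorUnRecoverableException(message: String) extends Exception(message)
class TeacherActorAllForOne extends Actor with ActorLogging {
  ...
  // The chosen directive is applied to ALL children, not just the one that failed
  override val supervisorStrategy = AllForOneStrategy() {
    case _: MajorUnRecoverableException => Stop
    case _: Exception                   => Escalate
  }
  ...
  ...
}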
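The Stop-versus-Restart argument above can be checked with a small plain-Scala model. There is no Akka here, and the object, the Chunk alias and the idea of "pending chunks per child" are assumptions made for illustration. The model takes the chunks still queued in each child's mailbox when one child fails. For either strategy, it reports which chunks would still be processed and which would become dead letters.

```scala
object SupervisionScopeModel {
  sealed trait Strategy
  case object OneForOne extends Strategy
  case object AllForOne extends Strategy

  sealed trait Directive
  case object Restart extends Directive
  case object Stop extends Directive

  type Chunk = Vector[Int]

  /** Given the chunks still queued in each child's mailbox when `failed` throws,
    * returns (chunks that will still be processed, chunks that become dead letters). */
  def outcome(strategy: Strategy, directive: Directive, failed: String,
              pending: List[(String, List[Chunk])]): (List[Chunk], List[Chunk]) = {
    // OneForOne touches only the failed child; AllForOne touches every child
    def affected(child: String): Boolean = strategy == AllForOne || child == failed
    directive match {
      // Restart: new instances but the same mailboxes, so nothing queued is dropped
      case Restart => (pending.flatMap(_._2), Nil)
      // Stop: whatever is queued for an affected child goes to dead letters
      case Stop =>
        val (hit, untouched) = pending.partition { case (child, _) => affected(child) }
        (untouched.flatMap(_._2), hit.flatMap(_._2))
    }
  }
}
```

The model ignores the message that actually failed and only looks at what is still queued. That is the part that makes Restart the wrong choice for the external sort.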
Directives
The constructors of both AllForOneStrategy and OneForOneStrategy accept a PartialFunction[Throwable, Directive] called a Decider, which maps a Throwable to a Directive, as you can see here. The Decider goes in their second parameter list, hence the OneForOneStrategy() { ... } syntax above.
case _: MajorUnRecoverableException => Stop
There are just four kinds of directives: Stop, Resume, Escalate and Restart.
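Since a Decider is just a PartialFunction[Throwable, Directive], its behaviour is easy to poke at in plain Scala. The toy below uses no Akka: the Directive objects are local stand-ins for Akka's, and the two exception classes are hypothetical. It also models what happens when no case matches. Akka's SupervisorStrategy escalates any throwable the decider doesn't cover, so the sketch does the same via applyOrElse.

```scala
object DeciderModel {
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  // Same shape as Akka's Decider: a PartialFunction from Throwable to Directive
  type Decider = PartialFunction[Throwable, Directive]

  // Hypothetical application exceptions, for illustration only
  class MinorRecoverableException extends Exception
  class MajorUnRecoverableException extends Exception

  val myDecider: Decider = {
    case _: MinorRecoverableException   => Restart
    case _: MajorUnRecoverableException => Stop
  }

  // Anything the decider does not match falls through to Escalate
  def decide(decider: Decider, cause: Throwable): Directive =
    decider.applyOrElse(cause, (_: Throwable) => Escalate)
}
```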
Stop
The child actor is stopped, and any messages sent to the stopped actor go to deadLetters.
Resume
The child actor just ignores the message that threw the exception and proceeds with processing the rest of the messages in its mailbox, keeping its current internal state.
Restart
The failing actor instance is discarded and a brand new instance is created in its place, so any internal state is reset. Processing of the rest of the messages in the mailbox continues. The rest of the world is unaware that this happened, since the same ActorRef now points to the new instance.
Escalate
The supervisor ducks the failure and lets its own supervisor handle the exception (technically, the supervisor fails itself with the same exception).
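Here is a toy model of the three directives that act locally on the failing child. Escalate hands the decision upwards, so it is left out. The "actor" is just a counter working through a list of messages, where "boom" fails. This is plain Scala, not Akka, and the names are made up. It shows the differences described above. Resume keeps the state and Restart starts from fresh state, and both carry on with the mailbox. Stop turns the remaining messages into dead letters.

```scala
object DirectiveEffectModel {
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive

  // Final state of the toy actor plus any messages that ended up as dead letters
  final case class Result(state: Int, deadLetters: List[String])

  // A toy actor whose state is a counter: "boom" fails, any other message adds one.
  // `onFailure` is the directive the supervisor picks for every failure.
  def run(mailbox: List[String], onFailure: Directive): Result = {
    @annotation.tailrec
    def loop(queue: List[String], state: Int): Result = queue match {
      case Nil => Result(state, Nil)
      case "boom" :: rest =>
        onFailure match {
          case Resume  => loop(rest, state)   // skip the failing message, keep the state
          case Restart => loop(rest, 0)       // skip the failing message, fresh instance state
          case Stop    => Result(state, rest) // everything still queued becomes a dead letter
        }
      case _ :: rest => loop(rest, state + 1)
    }
    loop(mailbox, 0)
  }
}
```

For the mailbox List("inc", "inc", "boom", "inc"), Resume ends with a count of 3 and Restart with 1. Stop leaves the last "inc" as a dead letter.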
Default Strategy
What if our Actor doesn't specify any strategy but has created child Actors? How are they handled? There is a default strategy declared in the Actor trait which, condensed, looks like this:
override val supervisorStrategy = OneForOneStrategy() {
  case _: ActorInitializationException => Stop
  case _: ActorKilledException         => Stop
  case _: DeathPactException           => Stop
  case _: Exception                    => Restart
}
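Because the default decider is an ordinary partial function, a custom decider can handle just the cases it cares about and defer to the default for the rest using orElse. In Akka itself this is commonly written as myDecider orElse SupervisorStrategy.defaultDecider. The plain-Scala toy below uses local stand-in classes named after Akka's exceptions. It is an illustration, not Akka's code. The order of the cases matters: the three Stop cases must come before the catch-all case _: Exception, because those exceptions are all Exceptions too.

```scala
object DefaultDeciderModel {
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive

  type Decider = PartialFunction[Throwable, Directive]

  // Local stand-ins named after Akka's exceptions (in Akka they are Exceptions as well)
  class ActorInitializationException extends Exception
  class ActorKilledException extends Exception
  class DeathPactException extends Exception

  // Same shape as the condensed default strategy. The specific cases come first;
  // placed after `case _: Exception` they could never match.
  val defaultDecider: Decider = {
    case _: ActorInitializationException => Stop
    case _: ActorKilledException         => Stop
    case _: DeathPactException           => Stop
    case _: Exception                    => Restart
  }

  // A hypothetical custom rule: resume on bad input, defer everything else to the default
  val custom: Decider = { case _: IllegalArgumentException => Resume }
  val combined: Decider = custom orElse defaultDecider
}
```

A Throwable that is not an Exception (an Error such as AssertionError) is not covered by any of these cases. In Akka, such a throwable gets escalated.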
So, in essence, the default strategy handles four cases:
1. ActorInitializationException => Stop
When an Actor cannot be initialized (its constructor or preStart throws), the failure is reported as an ActorInitializationException, and the default strategy stops the Actor. Let's simulate it by throwing an exception in the preStart callback:
package me.rerun.akkanotes.supervision
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
object ActorInitializationExceptionApp extends App {
  val actorSystem = ActorSystem("ActorInitializationException")
  val actor = actorSystem.actorOf(Props[ActorInitializationExceptionActor], "initializationExceptionActor")
  actor ! "someMessageThatWillGoToDeadLetter"
}
class ActorInitializationExceptionActor extends Actor with ActorLogging {
  // Failing in preStart makes actor creation fail with an ActorInitializationException
  override def preStart(): Unit = {
    throw new Exception("Some random exception")
  }
  def receive = {
    case _ =>
  }
}
Running ActorInitializationExceptionApp generates an ActorInitializationException (duh!!), and the messages sent to the actor end up with the deadLetters Actor:
Log
[ERROR] [11/10/2014 16:08:46.569] [ActorInitializationException-akka.actor.default-dispatcher-2] [akka://ActorInitializationException/user/initializationExceptionActor] Some random exception
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
...
...
Caused by: java.lang.Exception: Some random exception
at me.rerun.akkanotes.supervision.ActorInitializationExceptionActor.preStart(ActorInitializationExceptionApp.scala:17)
...
...
[INFO] [11/10/2014 16:08:46.581] [ActorInitializationException-akka.actor.default-dispatcher-4] [akka://ActorInitializationException/user/initializationExceptionActor] Message [java.lang.String] from Actor[akka://ActorInitializationException/deadLetters] to Actor[akka://ActorInitializationException/user/initializationExceptionActor#-1290470495] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2. ActorKilledException => Stop
When an Actor receives the Kill message, it fails with an ActorKilledException. The default strategy stops the child Actor when it throws this exception. At first, it seems that there's no point in stopping an already killed Actor. However, consider this:
- The ActorKilledException would just be propagated to the supervisor. What about the lifecycle watchers (death watchers) of this Actor that we saw in DeathWatch? The watchers won't know anything until the Actor is Stopped.
- Sending a Kill to an Actor would just affect that particular actor, which the supervisor knows about. However, handling it with Stop does much more:
  - it suspends the mailbox of that Actor and the mailboxes of its child actors;
  - it stops the child actors, sending a Terminated to each child's watchers;
  - it finally stops the Actor itself, sending a Terminated to the failed Actor's own watchers.
  (Wow, that's pretty awesome!!)
package me.rerun.akkanotes.supervision
import akka.actor.{Actor, ActorLogging, ActorSystem, Kill, Props}
object ActorKilledExceptionApp extends App {
  val actorSystem = ActorSystem("ActorKilledExceptionSystem")
  val actor = actorSystem.actorOf(Props[ActorKilledExceptionActor])
  actor ! "something"
  actor ! Kill // the actor fails with an ActorKilledException; the default strategy stops it
  actor ! "something else that falls into dead letter queue"
}
class ActorKilledExceptionActor extends Actor with ActorLogging {
  def receive = {
    case message: String => log.info(message)
  }
}
Log
The logs show that once the ActorKilledException comes in, the supervisor stops the actor, and the messages sent after that go to deadLetters:
INFO m.r.a.s.ActorKilledExceptionActor - something
ERROR akka.actor.OneForOneStrategy - Kill
akka.actor.ActorKilledException: Kill
INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ActorKilledExceptionSystem/deadLetters] to Actor[akka://ActorKilledExceptionSystem/user/$a#-1569063462] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
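The stop cascade from the list above can be sketched as a small recursive function over a toy actor tree. This is plain Scala, not Akka, and Node and stop are hypothetical names. It records the order of events. Children are stopped first. Each actor's postStop runs before its watchers hear about it. The parent goes last.

```scala
import scala.collection.mutable.ListBuffer

object StopOrderModel {
  // Toy actor tree: a name, child actors, and the actors watching this one
  final case class Node(name: String, children: List[Node] = Nil, watchers: List[String] = Nil)

  // Stops `node`: first all its children (recursively), then the node's own
  // postStop, then a Terminated notification to each of its watchers.
  def stop(node: Node, events: ListBuffer[String] = ListBuffer.empty[String]): List[String] = {
    node.children.foreach(child => stop(child, events))
    events += s"postStop(${node.name})"
    node.watchers.foreach(watcher => events += s"Terminated(${node.name}) -> $watcher")
    events.toList
  }
}
```

In Akka, sibling actors stop concurrently, so the relative order between sibling subtrees isn't guaranteed. What holds is that children finish stopping before their parent does, and this ordering is all the toy is meant to show.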
3. DeathPactException => Stop
From DeathWatch, you know that when an Actor watches another Actor (here, its child), it is expected to handle the Terminated message in its receive. What if it doesn't? You get a DeathPactException.
In the code, the supervisor watches the child actor after creating it but doesn't handle the Terminated message from the child.
package me.rerun.akkanotes.supervision
import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
object DeathPactExceptionApp extends App {
  val actorSystem = ActorSystem("DeathPactExceptionSystem")
  val actor = actorSystem.actorOf(Props[DeathPactExceptionParentActor])
  actor ! "create_child" // Eventually causes a DeathPactException in the parent
  Thread.sleep(2000)     // Wait until the parent is stopped
  actor ! "someMessage"  // Message goes to deadLetters
}
class DeathPactExceptionParentActor extends Actor with ActorLogging {
  def receive = {
    case "create_child" => {
      log.info("creating child")
      val child = context.actorOf(Props[DeathPactExceptionChildActor])
      // Watches the child, but receive doesn't handle Terminated, so a
      // DeathPactException is thrown when the child's Terminated message arrives
      context.watch(child)
      child ! "stop"
    }
    case "someMessage" => log.info("some message")
    // Doesn't handle the Terminated message:
    // case Terminated(_) =>
  }
}
class DeathPactExceptionChildActor extends Actor with ActorLogging {
  def receive = {
    case "stop" => {
      log.info("Actor going to stop and announce that it's terminated")
      self ! PoisonPill
    }
  }
}
Log
The logs tell us that once the DeathPactException comes in, the parent's own supervisor (the user guardian, since the parent is a top-level actor) stops DeathPactExceptionParentActor, and later messages sent to it go to deadLetters:
INFO m.r.a.s.DeathPactExceptionParentActor - creating child
INFO m.r.a.s.DeathPactExceptionChildActor - Actor going to stop and announce that it's terminated
ERROR akka.actor.OneForOneStrategy - Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated
akka.actor.DeathPactException: Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated
INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://DeathPactExceptionSystem/deadLetters] to Actor[akka://DeathPactExceptionSystem/user/$a#-1452955980] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
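What triggers the DeathPactException is the default handling of unhandled messages. Suppose a Terminated message reaches an actor whose receive doesn't match it. Akka's default unhandled then throws a DeathPactException, while other unhandled messages are merely published as UnhandledMessage events. The toy below mimics that rule in plain Scala, including the fix of adding a Terminated case to receive. No Akka is involved: deliver and the local Terminated and DeathPactException classes are stand-ins.

```scala
object DeathPactModel {
  // Local stand-ins, not Akka's classes
  final case class Terminated(actorName: String)
  final class DeathPactException(dead: String) extends Exception(s"Monitored actor [$dead] terminated")

  // Delivers `message` to a receive function. Mimics the rule in Akka's default
  // `unhandled`: an unmatched Terminated throws DeathPactException, while any
  // other unmatched message is only reported (Akka publishes an UnhandledMessage).
  def deliver(receive: PartialFunction[Any, Unit], message: Any): String =
    if (receive.isDefinedAt(message)) { receive(message); "handled" }
    else message match {
      case Terminated(dead) => throw new DeathPactException(dead)
      case _                => "unhandled"
    }
}
```

In the real app, the equivalent fix is to uncomment the case Terminated(_) => line in DeathPactExceptionParentActor's receive (and import akka.actor.Terminated).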
4. Exception => Restart
For all other Exceptions, the default Directive is to Restart the Actor. Check the following app. To prove that the Actor is restarted, OtherExceptionParentActor makes the child throw an exception and immediately sends it another message. The message waits in the mailbox, and when the child actor restarts, the message gets processed. Nice!!
package me.rerun.akkanotes.supervision
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
object OtherExceptionApp extends App {
  val actorSystem = ActorSystem("OtherExceptionSystem")
  val actor = actorSystem.actorOf(Props[OtherExceptionParentActor])
  actor ! "create_child"
}
class OtherExceptionParentActor extends Actor with ActorLogging {
  def receive = {
    case "create_child" => {
      log.info("creating child")
      val child = context.actorOf(Props[OtherExceptionChildActor])
      child ! "throwSomeException" // the child fails; the default strategy restarts it
      child ! "someMessage"        // still in the mailbox, processed after the restart
    }
  }
}
class OtherExceptionChildActor extends Actor with ActorLogging {
  override def preStart(): Unit = {
    log.info("Starting Child Actor")
  }
  def receive = {
    case "throwSomeException" => {
      throw new Exception("I'm getting thrown for no reason")
    }
    case "someMessage" => log.info("Restarted and printing some Message")
  }
  override def postStop(): Unit = {
    log.info("Stopping Child Actor")
  }
}
Log
The logs of this program are pretty neat:
- The exception gets thrown. We see the trace.
- The child restarts: postStop and preStart get called (we'll look at the preRestart and postRestart callbacks soon).
- The message that was sent to the Child actor before the restart is processed.
INFO m.r.a.s.OtherExceptionParentActor - creating child
INFO m.r.a.s.OtherExceptionChildActor - Starting Child Actor
ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason
java.lang.Exception: I'm getting thrown for no reason
at me.rerun.akkanotes.supervision.OtherExceptionChildActor$$anonfun$receive$2.applyOrElse(OtherExceptionApp.scala:39) ~[classes/:na]
at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.4.jar:na]
...
...
INFO m.r.a.s.OtherExceptionChildActor - Stopping Child Actor
INFO m.r.a.s.OtherExceptionChildActor - Starting Child Actor
INFO m.r.a.s.OtherExceptionChildActor - Restarted and printing some Message
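The "Stopping Child Actor" / "Starting Child Actor" pair in the log comes from the default restart hooks. preRestart on the old instance calls postStop, and postRestart on the new instance calls preStart. Below is a plain-Scala toy of that wiring. It is not Akka: ToyActor, start and restart are hypothetical, and Akka's real preRestart additionally stops the actor's children.

```scala
import scala.collection.mutable.ListBuffer

object RestartHooksModel {
  // Toy stand-in for an actor's lifecycle hooks, wired like the defaults in Akka's Actor trait
  class ToyActor(log: ListBuffer[String]) {
    log += "constructor"
    def preStart(): Unit = log += "preStart"
    def postStop(): Unit = log += "postStop"
    // Akka's default preRestart also stops all children before calling postStop
    def preRestart(reason: Throwable, message: Option[Any]): Unit = postStop()
    def postRestart(reason: Throwable): Unit = preStart()
  }

  // First start: construct the instance, then call preStart
  def start(log: ListBuffer[String]): ToyActor = {
    val actor = new ToyActor(log)
    actor.preStart()
    actor
  }

  // Restart: a hook on the OLD instance, then a brand new instance and a hook on it
  def restart(old: ToyActor, reason: Throwable, failedMessage: Option[Any], log: ListBuffer[String]): ToyActor = {
    old.preRestart(reason, failedMessage)
    val fresh = new ToyActor(log)
    fresh.postRestart(reason)
    fresh
  }
}
```

A start followed by one restart logs constructor, preStart, postStop, constructor, preStart. This matches the Starting/Stopping/Starting lines above.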
Escalate and Resume
We saw examples of Stop and Restart via the default strategy. Now, let's have a quick look at Resume and Escalate.
Resume just ignores the exception and proceeds to process the next message in the mailbox. It's much like catching the exception and doing nothing about it. Awesome stuff, but not a lot to talk about there.
Escalating generally means that the exception is something critical that the immediate supervisor cannot handle. So, it asks its own supervisor for help. Let's take an example.
Consider three Actors: EscalateExceptionTopLevelActor, EscalateExceptionParentActor and EscalateExceptionChildActor. If the child actor throws an exception and the parent actor cannot handle it, the parent can Escalate it to the top-level Actor. The top-level actor could react with any of the Directives. In our example, we just Stop. Stop stops the top-level actor's immediate child (which is EscalateExceptionParentActor). As you know, when an Actor is stopped, all its children are stopped before the Actor itself.
package me.rerun.akkanotes.supervision
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.SupervisorStrategy.Stop
import akka.actor.actorRef2Scala
object EscalateExceptionApp extends App {
  val actorSystem = ActorSystem("EscalateExceptionSystem")
  val actor = actorSystem.actorOf(Props[EscalateExceptionTopLevelActor], "topLevelActor")
  actor ! "create_parent"
}
class EscalateExceptionTopLevelActor extends Actor with ActorLogging {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => {
      log.info("The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.")
      Stop // Stops the Actor that escalated (the parent) and, with it, all its children
    }
  }
  def receive = {
    case "create_parent" => {
      log.info("creating parent")
      val parent = context.actorOf(Props[EscalateExceptionParentActor], "parentActor")
      parent ! "create_child" // Sending message to next level
    }
  }
}
class EscalateExceptionParentActor extends Actor with ActorLogging {
  override def preStart(): Unit = {
    log.info("Parent Actor started")
  }
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => {
      log.info("The exception is ducked by the Parent Actor. Escalating to TopLevel Actor")
      Escalate
    }
  }
  def receive = {
    case "create_child" => {
      log.info("creating child")
      val child = context.actorOf(Props[EscalateExceptionChildActor], "childActor")
      child ! "throwSomeException"
    }
  }
  override def postStop(): Unit = {
    log.info("Stopping parent Actor")
  }
}
class EscalateExceptionChildActor extends Actor with ActorLogging {
  override def preStart(): Unit = {
    log.info("Child Actor started")
  }
  def receive = {
    case "throwSomeException" => {
      throw new Exception("I'm getting thrown for no reason.")
    }
  }
  override def postStop(): Unit = {
    log.info("Stopping child Actor")
  }
}
Log
As you can see from the logs:
- The child actor throws an exception.
- The immediate supervisor (EscalateExceptionParentActor) escalates that exception to its own supervisor (EscalateExceptionTopLevelActor).
- The resulting directive from EscalateExceptionTopLevelActor is to Stop the parent Actor. As a consequence, the child actor gets stopped first.
- The parent actor gets stopped next (only after its children have terminated).
INFO m.r.a.s.EscalateExceptionTopLevelActor - creating parent
INFO m.r.a.s.EscalateExceptionParentActor - Parent Actor started
INFO m.r.a.s.EscalateExceptionParentActor - creating child
INFO m.r.a.s.EscalateExceptionChildActor - Child Actor started
INFO m.r.a.s.EscalateExceptionParentActor - The exception is ducked by the Parent Actor. Escalating to TopLevel Actor
INFO m.r.a.s.EscalateExceptionTopLevelActor - The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.
ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason.
java.lang.Exception: I'm getting thrown for no reason.
at me.rerun.akkanotes.supervision.EscalateExceptionChildActor$$anonfun$receive$3.applyOrElse(EscalateExceptionApp.scala:71) ~[classes/:na]
...
...
INFO m.r.a.s.EscalateExceptionChildActor - Stopping child Actor
INFO m.r.a.s.EscalateExceptionParentActor - Stopping parent Actor
Please note that the directive only applies to the immediate child that escalated (with a OneForOneStrategy, as here). Say a Restart is issued at the top level: only the Parent is restarted, and anything in its constructor/preStart is executed again. If the Parent actor's children were created in its constructor, they are created again too. However, children that were created in response to messages sent to the Parent Actor are not recreated. The default preRestart stops all of an actor's children, so they stay terminated.
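The "escalate until someone decides" behaviour can be modelled as a walk up a chain of deciders. So can the fact that the decision lands on the escalating child rather than on the actor that originally threw. This is a plain-Scala sketch, not Akka. Supervisor and resolve are made-up names, and a throwable that a decider doesn't match is treated as Escalate.

```scala
object EscalationModel {
  sealed trait Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  // A supervisor in the toy model: a name plus its decider
  final case class Supervisor(name: String, decider: PartialFunction[Throwable, Directive])

  /** `chain` lists supervisors starting with the failed actor's parent and moving upwards.
    * Returns (deciding supervisor, child the directive is applied to, directive),
    * or None if every level escalated. */
  def resolve(failedActor: String, chain: List[Supervisor], cause: Throwable): Option[(String, String, Directive)] = {
    @annotation.tailrec
    def loop(child: String, remaining: List[Supervisor]): Option[(String, String, Directive)] =
      remaining match {
        case Nil => None // ran out of supervisors in this toy chain
        case sup :: above =>
          sup.decider.applyOrElse(cause, (_: Throwable) => Escalate) match {
            case Escalate  => loop(sup.name, above) // sup itself becomes the failed child one level up
            case directive => Some((sup.name, child, directive))
          }
      }
    loop(failedActor, chain)
  }
}
```

With the chain from the example (parentActor escalates, topLevelActor stops), the result is topLevelActor applying Stop to parentActor. childActor is then stopped only as a consequence.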
TRIVIA
Actually, you can control whether preStart gets called at all on a restart. We'll look at this in the next minor write-up. If you are curious, just have a look at the default postRestart method of the Actor trait:
def postRestart(reason: Throwable): Unit = {
  preStart()
}
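As a small preview, the sketch below is a plain-Scala toy of a pattern described in the Akka documentation on actor initialization. The idea is to put one-time setup, such as creating children, in preStart, and to override postRestart so it does not call preStart. No Akka is involved and the class names are invented. In real Akka you would usually also override preRestart to keep it from stopping the children; the toy leaves that out.

```scala
import scala.collection.mutable.ListBuffer

object SkipPreStartModel {
  // Toy hooks with the default wiring: postRestart delegates to preStart
  class ToyActor(val log: ListBuffer[String]) {
    def preStart(): Unit = log += "preStart: create children"
    def postRestart(reason: Throwable): Unit = preStart()
  }

  // Same actor, but postRestart no longer calls preStart, so one-time
  // initialisation placed in preStart runs only on the very first start
  class InitOnceActor(l: ListBuffer[String]) extends ToyActor(l) {
    override def postRestart(reason: Throwable): Unit = log += "postRestart: skipped preStart"
  }

  // Simulates a first start followed by `restarts` restarts, each on a fresh instance
  def lifecycle(make: ListBuffer[String] => ToyActor, restarts: Int): List[String] = {
    val log = ListBuffer.empty[String]
    make(log).preStart()
    (1 to restarts).foreach(_ => make(log).postRestart(new Exception("boom")))
    log.toList
  }
}
```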
Code
As always, the code is on github (my .gitignore wasn't set up right for this project; I will fix it today, sorry).