November 10, 2014 · supervision akka

Akka Notes - Actor Supervision - 8

Failures are more like a feature among distributed systems. And with Akka's let it crash fault tolerance model, you could achieve a clear separation between your business logic and your failure handling logic (supervision logic). All with very little effort. It's pretty amazing. This is the topic of our discussion now.

Actor Supervision

Imagine a method call stack and the top most method in your stack throws an Exception. What could be done by the methods down the stack?

  1. The exception could be caught and handled in order to recover
  2. The exception could be caught, may be logged and kept quiet.
  3. The methods down the stack could also choose to duck the exception completely (or may be caught and rethrown)

Imagine if all the methods until the main method doesn't handle the exception. In that case, the program exits after writing an essay for an exception to the console.

Exception Stack

You could also compare the same scenario with spawning Threads. If a child thread throws an exception and if the run or the call method doesn't handle it, then the exception is expected to be handled by the parent thread or the main thread, whatever be the case. If the main thread doesn't handle it, then the system exits.

Let's do it one more time - if the Child Actor which was created using the context.actorOf fails with an Exception, the parent actor (aka supervisor) could prefer to handle any failures of the child actor. If it does, it could prefer to handle it and recover (Restart/Resume). Else, duck the exception (Escalate) to its parent. Alternatively, it could just Stop the child actor - that's the end of story for that child. Why did I say parent (aka supervisor)? Simply because Akka's approach towards supervision is Parental supervision - which means that only the creators of the Actors could supervise over them.

That's it !! We have pretty much covered all the supervision Directives (what could be done about the failures).

Strategies

Ah, I forgot to mention this one : You already know that an Akka Actor could create children and that they could create as many children as they want.

Now, consider two scenarios :

1. OneForOneStrategy

Your Actor spawns multiple child actors and each one of these child actors connect to different datasources. Say you are running an app which translates an english word into multiple languages.

OneForOneStrategy

Suppose, one child actor fails and you are fine to skip that result in the final list, what would you want to do? Shut down the service? Nope, you might want to just restart/stop only that child actor. Isn't it? Now that's called OneForOneStrategy in Akka supervision strategy terms - If one actor goes down, just handle one alone.

Depending on your business exceptions, you would want to react differently (Stop, Restart, Escalate, Resume) to different exceptions. To configure your own strategy, you just override the supervisorStrategy in your Actor class.

An example declaration of OneForOneStrategy would be


import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Stop


class TeacherActorOneForOne extends Actor with ActorLogging {

	...
    ...
  override val supervisorStrategy=OneForOneStrategy() {
    
    case _: MinorRecoverableException 	=> Restart
    case _: Exception                   => Stop
    
  }
  ...
  ...  

2. AllForOneStrategy

Assume that you are doing an External Sort (One more example to prove that my creativity sucks!!), and each of your chunk is handled by a different Actor. Suddenly, one Actor fails throwing an exception. It doesn't make any sense to continue processing the rest of the chunks because the final result wouldn't be correct. So, it is logical to Stop ~ALL~ the actors.

AllForOneStrategy

Why did I say Stop instead of Restart in the previous line? Because Restarting would also not make any sense for this use-case considering the mailbox for each of these Actors would not be cleared on Restart. So, if we restart, the rest of the chunks would still be processed. That's not what we want. Recreating the Actors with shiny new mailboxes would be the right approach here.

Again, just like the OneForOneStrategy, you just override the supervisorStrategy with an implementation of AllForOneStrategy

And example would be

import akka.actor.{Actor, ActorLogging}
import akka.actor.AllForOneStrategy
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.SupervisorStrategy.Stop


class TeacherActorAllForOne extends Actor with ActorLogging {

  ...

  override val supervisorStrategy = AllForOneStrategy() {

    case _: MajorUnRecoverableException => Stop
    case _: Exception => Escalate

  }
  ...
  ...

Directives

The constructor of both AllForOneStrategy and the OneForOneStrategy accepts a PartialFunction[Throwable,Directive] called Decider which maps a Throwable to a Directive as you may see here :

case _: MajorUnRecoverableException => Stop

There are simply just four kinds of directives - Stop, Resume, Escalate and Restart

Stop

The child actor is stopped in case of exception and any messages to the stopped actor would obviously go to the deadLetters queue.

Resume

The child actor just ignores the message that threw the exception and proceeds with processing the rest of the messages in the queue.

Restart

The child actor is stopped and a brand new actor is initialized. Processing of the rest of the messages in the mailbox continue. The rest of the world is unaware that this happened since the same ActorRef is attached to the new Actor.

Escalate
The supervisor ducks the failure and lets its supervisor handle the exception.

Default Strategy

What if our Actor doesn't specify any Strategy but has created child Actors. How are they handled? There is a default strategy declared in the Actor trait which (if condensed) looks like below :

override val supervisorStrategy=OneForOneStrategy() {
    
    case _: ActorInitializationException=> Stop
    case _: ActorKilledException		=> Stop
    case _: DeathPactException 			=> Stop
    case _: Exception                   => Restart
    
}
  

So, in essence, the default strategy handles four cases :

1. ActorInitializationException => Stop

When the Actor could not be initialized, it would throw an ActorInitializationException. The Actor would be stopped then. Let's simulate it by throwing an exception in the preStart callback :

package me.rerun.akkanotes.supervision

import akka.actor.{ActorSystem, Props}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import akka.actor.Actor
import akka.actor.ActorLogging

object ActorInitializationExceptionApp extends App{

  val actorSystem=ActorSystem("ActorInitializationException")
  val actor=actorSystem.actorOf(Props[ActorInitializationExceptionActor], "initializationExceptionActor")
  actor!"someMessageThatWillGoToDeadLetter"
}

class ActorInitializationExceptionActor extends Actor with ActorLogging{
  override def preStart={
    throw new Exception("Some random exception")
  }
  def receive={
    case _=>
  }
}

Running the ActorInitializationExceptionApp would generate a ActorInitializationException (duh!!) and then move all the messages into the message queue of the deadLetters Actor:

Log

[ERROR] [11/10/2014 16:08:46.569] [ActorInitializationException-akka.actor.default-dispatcher-2] [akka://ActorInitializationException/user/initializationExceptionActor] Some random exception
akka.actor.ActorInitializationException: exception during creation
	at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
...
...
Caused by: java.lang.Exception: Some random exception
	at me.rerun.akkanotes.supervision.ActorInitializationExceptionActor.preStart(ActorInitializationExceptionApp.scala:17)
...
...

[INFO] [11/10/2014 16:08:46.581] [ActorInitializationException-akka.actor.default-dispatcher-4] [akka://ActorInitializationException/user/initializationExceptionActor] Message [java.lang.String] from Actor[akka://ActorInitializationException/deadLetters] to Actor[akka://ActorInitializationException/user/initializationExceptionActor#-1290470495] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    
2. ActorKilledException => Stop

When the Actor was killed using the Kill message, then it would throw an ActorKilledException. The default strategy would stop the child Actor if it throws the exception. At first, it seems that there's no point in stopping an already killed Actor. However, consider this :

  1. ActorKilledException would just be propagated to the supervisor. What about the lifecycle watchers or deathwatchers of this Actor that we saw during DeathWatch. The watchers won't know anything until the Actor is Stopped.

  2. Sending a Kill on an Actor would just affect that particular actor which the supervisor knows. However, handling that with Stop would suspend the mailbox of that Actor, suspends the mailboxes of child actors, stops the child actors, sends a Terminated to all the child actor watchers, send a Terminated to all the immediate failed Actor's watchers and finally stop the Actor itself. (Wow, that's pretty awesome !!)

package me.rerun.akkanotes.supervision

import akka.actor.{ActorSystem, Props}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Kill

object ActorKilledExceptionApp extends App{

  val actorSystem=ActorSystem("ActorKilledExceptionSystem")
  val actor=actorSystem.actorOf(Props[ActorKilledExceptionActor])
  actor!"something"
  actor!Kill
  actor!"something else that falls into dead letter queue"
}

class ActorKilledExceptionActor extends Actor with ActorLogging{
  def receive={
    case message:String=> log.info (message)
  }
}

Log

The logs just say that once the ActorKilledException comes in, the supervisor stops that actor and then the messages go into the queue of deadLetters

INFO  m.r.a.s.ActorKilledExceptionActor - something

ERROR akka.actor.OneForOneStrategy - Kill
akka.actor.ActorKilledException: Kill

INFO  akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ActorKilledExceptionSystem/deadLetters] to Actor[akka://ActorKilledExceptionSystem/user/$a#-1569063462] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.


3. DeathPactException => Stop

From DeathWatch, you know that when an Actor watches over a child Actor, it is expected to handle the Terminated message in its receive. What if it doesn't? You get the DeathPactException

DeathPactException maps to Stop

The code shows that the Supervisor watches the child actor after creation but doesn't handle the Terminated message from the child.

package me.rerun.akkanotes.supervision

import akka.actor.{ActorSystem, Props}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Kill
import akka.actor.PoisonPill
import akka.actor.Terminated

object DeathPactExceptionApp extends App{

  val actorSystem=ActorSystem("DeathPactExceptionSystem")
  val actor=actorSystem.actorOf(Props[DeathPactExceptionParentActor])
  actor!"create_child" //Throws DeathPactException
  Thread.sleep(2000) //Wait until Stopped
  actor!"someMessage" //Message goes to DeadLetters
  
}

class DeathPactExceptionParentActor extends Actor with ActorLogging{
  
  def receive={
    case "create_child"=> {
      log.info ("creating child")
      val child=context.actorOf(Props[DeathPactExceptionChildActor])
      context.watch(child) //Watches but doesnt handle terminated message. Throwing DeathPactException here.
      child!"stop"
    }
    case "someMessage" => log.info ("some message")
    //Doesnt handle terminated message
    //case Terminated(_) =>
  }
}

class DeathPactExceptionChildActor extends Actor with ActorLogging{
  def receive={
    case "stop"=> {
      log.info ("Actor going to stop and announce that it's terminated")
      self!PoisonPill
    }
  }
}

Log

The logs tell us that the DeathPactException comes in, the supervisor stops that actor and then the messages go into the queue of deadLetters

INFO  m.r.a.s.DeathPactExceptionParentActor - creating child

INFO  m.r.a.s.DeathPactExceptionChildActor - Actor going to stop and announce that it's terminated

ERROR akka.actor.OneForOneStrategy - Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated
akka.actor.DeathPactException: Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated

INFO  akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://DeathPactExceptionSystem/deadLetters] to Actor[akka://DeathPactExceptionSystem/user/$a#-1452955980] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.


4. Exception => Restart

For all other Exceptions, the default Directive is to Restart the Actor. Check the following app. Just to prove that the Actor is restarted, OtherExceptionParentActor makes the child throw an exception and immediately sends a message. The message reaches the mailbox and when the the child actor restarts, the message gets processed. Nice !!

OtherException maps to Stop

package me.rerun.akkanotes.supervision

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy.Stop

object OtherExceptionApp extends App{

  val actorSystem=ActorSystem("OtherExceptionSystem")
  val actor=actorSystem.actorOf(Props[OtherExceptionParentActor])
  actor!"create_child"
  
}

class OtherExceptionParentActor extends Actor with ActorLogging{
      
  def receive={
    case "create_child"=> {
      log.info ("creating child")
      val child=context.actorOf(Props[OtherExceptionChildActor])
      
      child!"throwSomeException"
      child!"someMessage"
    }
  }
}

class OtherExceptionChildActor extends akka.actor.Actor with ActorLogging{
  
  override def preStart={
    log.info ("Starting Child Actor")
  }
  
  def receive={
    case "throwSomeException"=> {
      throw new Exception ("I'm getting thrown for no reason") 
    }
    case "someMessage" => log.info ("Restarted and printing some Message")
  }
  
  override def postStop={
    log.info ("Stopping Child Actor")
  }
  
}

Log

The logs of this program is pretty neat.

  1. The exception gets thrown. We see the trace
  2. The child restarts - Stop and Start gets called (we'll see about the preRestart and postRestart callbacks soon)
  3. The message that was send to the Child actor before restart is processed.
INFO  m.r.a.s.OtherExceptionParentActor - creating child

INFO  m.r.a.s.OtherExceptionChildActor - Starting Child Actor

ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason

java.lang.Exception: I'm getting thrown for no reason
	at me.rerun.akkanotes.supervision.OtherExceptionChildActor$$anonfun$receive$2.applyOrElse(OtherExceptionApp.scala:39) ~[classes/:na]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.4.jar:na]
...
...	

INFO  m.r.a.s.OtherExceptionChildActor - Stopping Child Actor

INFO  m.r.a.s.OtherExceptionChildActor - Starting Child Actor

INFO  m.r.a.s.OtherExceptionChildActor - Restarted and printing some Message

Escalate and Resume

We saw examples of Stop and Restart via the defaultStrategy. Now, let's have a quick look at the Escalate.

Resume just ignores the exception and proceeds processing the next message in the mailbox. It's more like catching the exception and doing nothing about it. Awesome stuff but not a lot to talk about there.

Escalating generally means that the exception is something critical and the immediate supervisor would not be able to handle it. So, it asks help from its supervisor. Let's take an example.

Consider three Actors - EscalateExceptionTopLevelActor, EscalateExceptionParentActor and EscalateExceptionChildActor. If the child actor throws an exception and if the parent level actor could not handle it, it could Escalate it to the Top level Actor. The Top level actor could choose to react with any of the Directives. In our example, we just Stop. Stop would stop the immediate child (which is the EscalateExceptionParentActor). As you know, when a Stop is executed on an Actor, all its children would also be stopped before the Actor itself is stopped.

Escalate

package me.rerun.akkanotes.supervision

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.SupervisorStrategy.Stop
import akka.actor.actorRef2Scala

object EscalateExceptionApp extends App {

  val actorSystem = ActorSystem("EscalateExceptionSystem")
  val actor = actorSystem.actorOf(Props[EscalateExceptionTopLevelActor], "topLevelActor")
  actor ! "create_parent"
}

class EscalateExceptionTopLevelActor extends Actor with ActorLogging {

  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => {
      log.info("The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.")
      Stop //Stop will stop the Actor that threw this Exception and all its children
    }
  }

  def receive = {
    case "create_parent" => {
      log.info("creating parent")
      val parent = context.actorOf(Props[EscalateExceptionParentActor], "parentActor")
      parent ! "create_child" //Sending message to next level
    }
  }
}

class EscalateExceptionParentActor extends Actor with ActorLogging {

  override def preStart={
    log.info ("Parent Actor started")
  }
  
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => {
      log.info("The exception is ducked by the Parent Actor. Escalating to TopLevel Actor")
      Escalate
    }
  }

  def receive = {
    case "create_child" => {
      log.info("creating child")
      val child = context.actorOf(Props[EscalateExceptionChildActor], "childActor")
      child ! "throwSomeException"
    }
  }

  override def postStop = {
    log.info("Stopping parent Actor")
  }
}

class EscalateExceptionChildActor extends akka.actor.Actor with ActorLogging {
  
  override def preStart={
    log.info ("Child Actor started")
  }
  
  def receive = {
    case "throwSomeException" => {
      throw new Exception("I'm getting thrown for no reason.")
    }
  }
  override def postStop = {
    log.info("Stopping child Actor")
  }
}

Log

As you could see from the logs,

  1. The child actor throws exception.
  2. The immediate supervisor (EscalateExceptionParentActor) escalates that exception to its supervisor (EscalateExceptionTopLevelActor)
  3. The resultant directive from EscalateExceptionTopLevelActor is to Stop the Actor. As a sequence, the child actors gets stopped first.
  4. The parent actor gets stopped next (only after the watchers have been notified)
INFO  m.r.a.s.EscalateExceptionTopLevelActor - creating parent

INFO  m.r.a.s.EscalateExceptionParentActor - Parent Actor started

INFO  m.r.a.s.EscalateExceptionParentActor - creating child

INFO  m.r.a.s.EscalateExceptionChildActor - Child Actor started

INFO  m.r.a.s.EscalateExceptionParentActor - The exception is ducked by the Parent Actor. Escalating to TopLevel Actor

INFO  m.r.a.s.EscalateExceptionTopLevelActor - The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.

ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason.
java.lang.Exception: I'm getting thrown for no reason.
	at me.rerun.akkanotes.supervision.EscalateExceptionChildActor$$anonfun$receive$3.applyOrElse(EscalateExceptionApp.scala:71) ~[classes/:na]
	...
	...
    
INFO  m.r.a.s.EscalateExceptionChildActor - Stopping child Actor

INFO  m.r.a.s.EscalateExceptionParentActor - Stopping parent Actor

Please note that whatever directive that was issued would only apply to the immediate child that escalated. Say, if a Restart is issued at the TopLevel, only the Parent would be restarted and anything in its constructor/preStart would be executed. If the children of the Parent actor was created in the constructor, they would also be created. However, children that were created through messages to the Parent Actor would still be in the Terminated state.

TRIVIA

Actually, you could control whether the preStart gets called at all. We'll see about this in the next minor write-up. If you are curious, just have a look at the postRestart method of the Actor

def postRestart(reason: Throwable): Unit = {
  preStart()
}

Code

As always, code is on github

(my .gitignore wasn't setup right for this project. will fix it today. sorry)