May 23, 2016 · akka scala fsm

Akka Notes - Finite State Machines - 2

In the first part of notes on Akka FSM, we saw the basics of Akka FSM and the outline of the Coffee vending machine that we planned to build - the structure of the Actor and a list of messages we pass to the Actor. In this second and final part, we will go ahead and implement each of these States.

Recap

As a quick recap, let's look at the structure of the FSM and the messages that can be sent to it.

States and Data

The three States of the FSM and the Data being sent across the States are :


object CoffeeMachine {

  sealed trait MachineState
  case object Open extends MachineState
  case object ReadyToBuy extends MachineState
  case object PoweredOff extends MachineState

  case class MachineData(currentTxTotal: Int, costOfCoffee: Int, coffeesLeft: Int)

}

Messages

The Vendor and User Interaction messages that we send to the FSM are :


object CoffeeProtocol {

  trait UserInteraction
  trait VendorInteraction

  case class   Deposit(value: Int) extends UserInteraction
  case class   Balance(value: Int) extends UserInteraction
  case object  Cancel extends UserInteraction
  case object  BrewCoffee extends UserInteraction
  case object  GetCostOfCoffee extends UserInteraction

  case object  ShutDownMachine extends VendorInteraction
  case object  StartUpMachine extends VendorInteraction
  case class   SetNumberOfCoffee(quantity: Int) extends VendorInteraction
  case class   SetCostOfCoffee(price: Int) extends VendorInteraction
  case object  GetNumberOfCoffee extends VendorInteraction

  case class   MachineError(errorMsg:String)

}

Structure of FSM Actor

Here's the overall structure that we saw in Part 1


class CoffeeMachine extends FSM[MachineState, MachineData] {

  //What State and Data must this FSM start with (duh!)
  startWith(Open, MachineData(..))

  //Handlers of State
  when(Open) {
  ...
  ...

  when(ReadyToBuy) {
  ...
  ...

  when(PoweredOff) {
  ...
  ...

  //fallback handler when an Event is unhandled by none of the States.
  whenUnhandled {
  ...
  ...

  //Do we need to do something when there is a State change?
  onTransition {
    case Open -> ReadyToBuy => ...
  ...
  ...
}

Initial State

As with any State Machine, an FSM needs an initial state to start with. This can be declared inside Akka FSM in a very intuitive method named startWith. The startWith accepts two arguments - the initial state and the initial data.


class CoffeeMachine extends FSM[MachineState, MachineData] {

  startWith(Open, MachineData(currentTxTotal = 0, costOfCoffee =  5, coffeesLeft = 10))

...
...

The above code just says that the FSM's initial state is Open and the initial data while the Coffee machine is opened is MachineData(currentTxTotal = 0, costOfCoffee = 5, coffeesLeft = 10).

Since the machine has just started, the vending machine starts with a clean slate. It had no interaction with any user yet and therefore the current showing balance for this transaction is 0. The price of the coffee is set at $5 and the total number of coffees that the machine could vend in total is 10. Once the coffees has vended 10 coffees and is left with 0, the machine shuts down.

Implementing the States

Ah, Finally !!

I felt that the easiest way to look at the interactions with the vending machine at different states is by grouping the interactions, write testcases around it and accompany that with the implementation in the FSM.

If you are referring to the github code, all tests are in the CoffeeSpec and the FSM is the CoffeeMachine

All the following tests are wrapped inside the CoffeeSpec test class whose declaration goes like :

class CoffeeSpec extends TestKit(ActorSystem("coffee-system")) with MustMatchers with FunSpecLike with ImplicitSender 

Setting and Getting Price of coffee

As we saw above, the MachineData is initiated at $5 dollars per coffee and with a capacity of 10 coffees. This is just an initial state. The Vendor must have the capacity to set the price of the coffee and the capacity of the machine at any point of time.

Setting of price is achieved by sending the SetCostOfCoffee message to the Actor. We should also have the ability to get the price of coffee. This is done by using the GetCostOfCoffee message to which the Machine responds with the currently set price.

Testcase
describe("The Coffee Machine") {

   it("should allow setting and getting of price of coffee") {
      val coffeeMachine = TestActorRef(Props(new CoffeeMachine()))
      coffeeMachine ! SetCostOfCoffee(7)
      coffeeMachine ! GetCostOfCoffee
      expectMsg(7)
    }
...
...
...
Implementation

Like we discussed in Part 1, every message that is being sent to the FSM is received and wrapped in an Event class which also wraps around the MachineData:

 when(Open) {
     case Event(SetCostOfCoffee(price), _) => stay using stateData.copy(costOfCoffee = price)
    case Event(GetCostOfCoffee, _) => sender ! (stateData.costOfCoffee); stay()
   ...
   ...
  }
}

There are a few words which are new in the above code - stay, using and stateData. Let's look at them in detail.

stay and goto

The idea is that each of the case blocks in a state must return a State. This could either be done using stay which just means that at the end of processing this message (SetCostOfCoffee or GetCostOfCoffee), the CoffeeMachine remains in the same State, which is Open in our case.

The goto, on the other hand transitions to a different State. We'll see how it is done while discussing the Deposit message.

Not surprisingly, check out the implementation of the stay function

  final def stay(): State = goto(currentState.stateName)
using

As you might already guessed, the using function allows us to pass modified data to the next state. In case of the SetCostOfCoffee message, we set the costOfCoffee field of the MachineData to the incoming price wrapped inside the SetCostOfCoffee. Since State is a case class (immutability is strongly advised unless you have fun debugging at odd hours), we do a copy.

stateData

The stateData is just a function that gives us a handle to the data of the FSM, which is the MachineData itself. So, the following blocks of code are equivalent

case Event(GetCostOfCoffee, _) => sender ! (stateData.costOfCoffee); stay()
case Event(GetCostOfCoffee, machineData) => sender ! (machineData.costOfCoffee); stay()

The implementation of the maximum number of coffees to be dispensed using GetNumberOfCoffee and SetNumberOfCoffee is almost the same as the setting and getting the price itself. Let's skip that and go for the interesting part - Buying coffee.

Buying coffee

So, the coffee enthusiast deposits money for the coffee but we can't allow the machine to dispense coffee until he has entered the cost of a coffee. Also, if he has given extra cash, we'll have to give him the balance. So, the various cases goes like this :

  1. Until the User deposits money to cover a coffee, we'll keep track of the cumulative amount he has deposited and stay in the Open state.
  2. Once the cumulative cash exceeds the price of a coffee, we'll transition to the ReadyToBuy state and allow him to buy coffee.
  3. At this ReadyToBuy state, he could change his mind and Cancel the transaction during which all his cumulative Balance is returned.
  4. If he wishes to drink the coffee, he sends the machine a BrewCoffee message instead during which we dispense the coffee and give him back the Balance money as well. (Actually, in our code, we don't dispense the coffee. We just subtract the price of the coffee from his deposit and give him the balance. Such a rip off !!)

Let's take each of the above cases

Case 1 - User deposits cash but falls short of the price of a coffee
Testcase

The testcase starts by setting the cost of the coffee to the $5 and the total number of coffees in the machine to be 10. We then Deposit $2 which is less than the price of the coffee and check if the Machine is in the Open state and the total number of coffees in the machine remains at 10.

 it("should stay at Transacting when the Deposit is less then the price of the coffee") {
      val coffeeMachine = TestActorRef(Props(new CoffeeMachine()))
      coffeeMachine ! SetCostOfCoffee(5)
      coffeeMachine ! SetNumberOfCoffee(10)
      coffeeMachine ! SubscribeTransitionCallBack(testActor)

      expectMsg(CurrentState(coffeeMachine, Open))

      coffeeMachine ! Deposit(2)

      coffeeMachine ! GetNumberOfCoffee

      expectMsg(10)
    }

So, how exactly did we ensure that the machine is in the Open state?

Each FSM can handle a special message called FSM.SubscribeTransitionCallBack(callerActorRef) which enables the caller to be notified of any State transitions. The first notification that gets sent on subscription is the CurrentState, which tells us what State the FSM is currently in. This is followed by several Transition messages when that happens.

Implementation

So, we add up deposit to the cumulative transaction total and stay in Open state waiting for more Deposit

when(Open) {
...
...
  case Event(Deposit(value), MachineData(currentTxTotal, costOfCoffee, coffeesLeft)) if (value + currentTxTotal) < stateData.costOfCoffee => {
        val cumulativeValue = currentTxTotal + value
        stay using stateData.copy(currentTxTotal = cumulativeValue)
  }
Case 2 and 4 - User deposits amount that covers the price of coffee
Testcase 1 - Deposit equal to the price of coffee

Our testcase bootstraps the machine, confirms if the current state is Open and then deposits $5 which is exactly the price of the coffee. We then assert that the machine has transitioned from Open to ReadyToBuy by way of expecting a Transition message which gives us information about the from and the to states of the coffee machine. In the first case, it is the transition from Open to ReadyToBuy.

We then go further and ask the machine to BrewCoffee during which we expect a transition again from ReadyToBuy to Open upon dispensing the coffee. Finally an assertion is made against the remaining number of coffees in the machine (which is 9 now).


it("should transition to ReadyToBuy and then Open when the Deposit is equal to the price of the coffee") {
      val coffeeMachine = TestActorRef(Props(new CoffeeMachine()))
      coffeeMachine ! SetCostOfCoffee(5)
      coffeeMachine ! SetNumberOfCoffee(10)
      coffeeMachine ! SubscribeTransitionCallBack(testActor)

      expectMsg(CurrentState(coffeeMachine, Open))

      coffeeMachine ! Deposit(5)

      expectMsg(Transition(coffeeMachine, Open, ReadyToBuy))

      coffeeMachine ! BrewCoffee
      expectMsg(Transition(coffeeMachine, ReadyToBuy, Open))

      coffeeMachine ! GetNumberOfCoffee

      expectMsg(9)
    }
Testcase 2 - Deposit greater than the price of the coffee

The second test case is 90% similar to the first testcase except that we deposit cash in increments that is greater than the price of the coffee ($6). Since we set the price of coffee to be $5, we now expect a Balance message with value $1

it("should transition to ReadyToBuy and then Open when the Deposit is greater than the price of the coffee") {
      val coffeeMachine = TestActorRef(Props(new CoffeeMachine()))
      coffeeMachine ! SetCostOfCoffee(5)
      coffeeMachine ! SetNumberOfCoffee(10)
      coffeeMachine ! SubscribeTransitionCallBack(testActor)

      expectMsg(CurrentState(coffeeMachine, Open))

      coffeeMachine ! Deposit(2)
      coffeeMachine ! Deposit(2)
      coffeeMachine ! Deposit(2)

      expectMsg(Transition(coffeeMachine, Open, ReadyToBuy))

      coffeeMachine ! BrewCoffee

      expectMsgPF(){
        case Balance(value)=>value==1
      }

      expectMsg(Transition(coffeeMachine, ReadyToBuy, Open))

      coffeeMachine ! GetNumberOfCoffee

      expectMsg(9)
    }
Implementation

The implementation is much simpler than the test cases itself. If the deposit amount is greater than or equal to the cost of coffee, then we goto ReadyToBuy state using the accumulated amount.

when(Open){
...
...
 case Event(Deposit(value), MachineData(currentTxTotal, costOfCoffee, coffeesLeft)) if (value + currentTxTotal) >= stateData.costOfCoffee => {
      goto(ReadyToBuy) using stateData.copy(currentTxTotal = currentTxTotal + value)
    }

Once transitioned to the ReadyToBuy state, when the user sends a BrewCoffee, we check if there is a balance to be dispensed. If not, we just transition to Open state after subtracting one coffee from the total number of coffees. Else, we disburse the balance and transition to Open state after subtracting the number of coffees. (Like I said earlier, we don't actually dispense coffee in this example)

  when(ReadyToBuy) {
    case Event(BrewCoffee, MachineData(currentTxTotal, costOfCoffee, coffeesLeft)) => {
      val balanceToBeDispensed = currentTxTotal - costOfCoffee
      logger.debug(s"Balance is $balanceToBeDispensed")
      if (balanceToBeDispensed > 0) {
        sender ! Balance(value = balanceToBeDispensed)
        goto(Open) using stateData.copy(currentTxTotal = 0, coffeesLeft = coffeesLeft - 1)
      }
      else goto(Open) using stateData.copy(currentTxTotal = 0, coffeesLeft = coffeesLeft - 1)
    }
  }

That's it !! We've covered the juice of the program.

Case 3 - User wishes to Cancel the transaction

Actually, the User should be in a position to Cancel the transaction at any point, whatever state he is in. Like we discussed in Part 1, the perfect place to hold these kind of generic messages is in the whenUnhandled block. We should also make sure that if the user has deposited some cash before canceling, we should give it back to them.

Implementation
  whenUnhandled {
  ...
  ...
    case Event(Cancel, MachineData(currentTxTotal, _, _)) => {
      sender ! Balance(value = currentTxTotal)
      goto(Open) using stateData.copy(currentTxTotal = 0)
    }
  }

Testcase

The testcase is simply the same as the one we saw above except that the balance issued upon cancellation is the cumulative deposit.

 it("should transition to Open after flushing out all the deposit when the coffee is canceled") {
      val coffeeMachine = TestActorRef(Props(new CoffeeMachine()))
      coffeeMachine ! SetCostOfCoffee(5)
      coffeeMachine ! SetNumberOfCoffee(10)
      coffeeMachine ! SubscribeTransitionCallBack(testActor)

      expectMsg(CurrentState(coffeeMachine, Open))

      coffeeMachine ! Deposit(2)
      coffeeMachine ! Deposit(2)
      coffeeMachine ! Deposit(2)

      expectMsg(Transition(coffeeMachine, Open, ReadyToBuy))

      coffeeMachine ! Cancel

      expectMsgPF(){
        case Balance(value)=>value==6
      }

      expectMsg(Transition(coffeeMachine, ReadyToBuy, Open))

      coffeeMachine ! GetNumberOfCoffee

      expectMsg(10)
    }

Code

I didn't want to bore you to death and took the liberty of skipping the explanation for ShutDownMachine message and the PoweredOff state but if you are looking forward to an explanation for them, please leave a comment.

As always, the code is available on github.