Akka tips


Published on July 13, 2016

Author: raymondroestenburg

Source: slideshare.net

1. hAkking TIPS 'N TRICKS, 12th of July 2016

2. HANDS UP

3. Akka Actor IS A POWER TOOL [1]
   [1] Power tools are fun, but fun can be dangerous!

4. WARMUP QUIZ

5. WHAT'S WRONG WITH THIS CODE?

6.
    object MyActor {
      case class Request(offset: Long, nrBytes: Int)
      case class Response(bytes: ByteString)
    }

    class MyActor(backend: Backend) extends Actor {
      def receive = {
        case Request(offset, nrBytes) => {
          val response = backend.get(offset, nrBytes)
          response
        }
      }
    }

    // in AnotherActor, which has a ref to MyActor
    myActorRef ! Request(1, 20)

7. A. Works (AnotherActor receives a Response) B. Works on my machine C. Compiler error D. "Nothing" happens

8. D. "Nothing" happens (the result is computed but never sent back, so AnotherActor receives no Response)

9.
    object MyActor {
      case class Request(offset: Long, nrBytes: Int)
      case class Response(bytes: ByteString)
    }

    class MyActor(backend: Backend) extends Actor {
      def receive = {
        case Request(offset, nrBytes) => {
          backend.get(offset, nrBytes).foreach { bytes =>
            sender() ! Response(bytes)
          }
        }
      }
    }

10. A. Works B. Works on my machine C. Compiler error

11.
    trait Backend {
      def get(offset: Long, nrBytes: Int): Future[ByteString]
    }

12. B. Works on my machine (as in: DOES NOT work)

13. close over sender

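14. This would be better:
    import akka.pattern.pipe

    class MyActor(backend: Backend) extends Actor {
      import context.dispatcher // implicit ExecutionContext needed by map and pipe

      def receive = {
        case Request(offset, nrBytes) => {
          // sender() is evaluated here, while the message is being handled
          pipe(backend.get(offset, nrBytes).map(Response)) to sender()
        }
      }
    }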
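
Why closing over sender fails can be shown without Akka. The sketch below is a plain-Scala analogy, not Akka code: `currentSender` is a hypothetical stand-in for the actor's per-message sender, and a `Promise` plays the slow backend. The callback that calls `sender()` late sees the sender of the next message, while the value read up front stays correct.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object CloseOverSenderDemo {
  // Hypothetical stand-in for Akka's per-message sender; not a real Akka API.
  @volatile private var currentSender: String = "nobody"
  private def sender(): String = currentSender

  /** Returns (sender seen inside the callback, sender captured up front). */
  def run(): (String, String) = {
    val backendReply = Promise[Int]() // plays the slow backend

    currentSender = "alice"           // message 1 is being handled
    val replyTo = sender()            // read while handling message 1
    val seen = backendReply.future.map(_ => (sender(), replyTo))

    currentSender = "bob"             // message 2 arrives before the backend answers
    backendReply.success(42)          // only now does the callback run

    Await.result(seen, 1.second)
  }
}
```

In an actor the same effect is avoided either by `pipe(...) to sender()`, which reads the sender immediately, or by copying `sender()` into a local `val` before the future callback is registered.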

15. ACTOR crashes. HOW FAST DOES IT RESTART?

16. A. Configurable in supervisor strategy B. As fast as possible

17. B. As fast as possible This is not always acceptable. (fast re-connect)

18. WHAT HAPPENS IF YOU WATCH AN ACTOR THAT'S ALREADY DEAD?

19.
    class Dodo extends Actor {
      self ! PoisonPill
      def receive = {
        case _ =>
      }
    }

20.
    class Watcher(watchee: ActorRef) extends Actor {
      context.watch(watchee)
      def receive = {
        case Terminated(`watchee`) => println(s"$watchee is dead!")
      }
    }

21.
    val watchee = system.actorOf(Props(new Dodo), "dodo")
    val watcher = system.actorOf(Props(new Watcher(watchee)), "watcher")

22. A. DeathPactException B. Prints to console C. DeadLetters D. Works on my machine E. "Nothing" happens

23. B. Prints to console. (Watcher receives Terminated) Which is great!

24. END WARMUP QUIZ

25. (Semi-random) TIPS 'N TRICKS

26. STRUCTURAL

27. DEFINE MESSAGES IN companion. DO NOT import Companion._

28. CREATE ACTOR FROM COMPANION [3]
   [3] Standard practice.

29. Standard practice:
    object MyActor {
      def props(host: String, port: Int) = Props(new MyActor(host, port))
    }

30. WHAT ABOUT THE ACTOR'S name?

31. ONE CHILD ACTOR: ONE NAME. MANY CHILD ACTORS: UNIQUE NAMES.

32.
    class MyActor(host: String, port: Int) extends Actor {
      def receive = {
        // .. more code
      }
    }

33.
    trait ActorContext extends ActorRefFactory { ... }

    abstract class ActorSystem extends ActorRefFactory { ... }

    trait Actor {
      // ...
      implicit val context: ActorContext
    }

34.
    object MyActor {
      def create(
        host: String,
        port: Int
      )(implicit factory: ActorRefFactory): ActorRef =
        factory.actorOf(
          Props(new MyActor(host, port)),
          s"my-actor-${UUID.randomUUID}" // java.util.UUID gives every child a unique name
        )
    }

35.
    object MyActor {
      def create(
        host: String,
        port: Int
      )(implicit factory: ActorRefFactory): ActorRef =
        factory.actorOf(Props(new MyActor(host, port)), name)

      val name = "single-my-actor"
    }
   So you can do:
    context.child(MyActor.name).getOrElse(MyActor.create(host, port))

36. A TERMINATOR PREVENTS YOUR APP FROM BECOMING A zombie

37. RULE 'EM ALL: Application Supervisor. KILL 'EM ALL: Terminator.

38. create has a second parameter list that takes a factory parameter, so what is passed in here is an ActorRefFactory => ActorRef:
    object MyApp {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("my-app")
        val _ = Terminator.supervise(
          system,
          MyApplicationSupervisor.create()
        )
      }
    }

39. TERMINATOR

40.
    object Terminator {
      case object GetActorRef

      def supervise(
        system: ActorSystem,
        newActor: ActorRefFactory => ActorRef
      ): ActorRef =
        system.actorOf(Props(new Terminator(newActor, stopOnThrowable)), terminatorName())

      def supervise(
        system: ActorSystem,
        strategy: SupervisorStrategy,
        newActor: ActorRefFactory => ActorRef
      ): ActorRef = {
        system.actorOf(Props(new Terminator(newActor, strategy)), terminatorName())
      }

      def terminatorName(): String = s"terminator-${UUID.randomUUID}"

      val stopOnThrowable = OneForOneStrategy() {
        case e: Throwable => Stop
      }
    }

41.
    class Terminator(
      newActor: ActorRefFactory => ActorRef,
      override val supervisorStrategy: SupervisorStrategy
    ) extends Actor with ActorLogging {
      val actor = newActor(context)
      context.watch(actor)

      def receive = {
        case GetActorRef => sender() ! actor
        case Terminated(`actor`) =>
          log.info(s"${actor.path} has terminated, shutting down.")
          context.stop(self)
          context.system.terminate()
        case msg => actor forward msg
      }
    }

42. Application Supervisor [2]
   [2] The top level application actor

43.
    class MyApplicationSupervisor extends Actor {
      import SupervisorStrategy._

      def decider: Decider = {
        case _: MyProcess.StreamingException => Restart
        case _: MyProcess.SomethingReallyCrapException => Escalate
      }

      override def supervisorStrategy =
        OneForOneStrategy()(decider orElse defaultDecider)

      // .. more code
    }
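
`decider orElse defaultDecider` is ordinary partial-function composition: exceptions the custom decider does not match fall through to the default one. A plain-Scala sketch of that fall-through follows. The `Directive` type, the exception classes and the "stop on anything else" default are hypothetical stand-ins for this illustration; Akka's real `defaultDecider` behaves differently.

```scala
object DeciderDemo {
  // Hypothetical stand-ins for Akka's directives and the slide's exceptions.
  sealed trait Directive
  case object Restart extends Directive
  case object Escalate extends Directive
  case object Stop extends Directive

  final class StreamingException extends RuntimeException("stream failed")
  final class UnrecoverableException extends RuntimeException("unrecoverable")

  type Decider = PartialFunction[Throwable, Directive]

  // Application-specific decisions, as on the slide.
  val decider: Decider = {
    case _: StreamingException     => Restart
    case _: UnrecoverableException => Escalate
  }

  // Assumed default for this sketch only: stop on anything else.
  val defaultDecider: Decider = { case _: Throwable => Stop }

  // The custom decider is tried first; unmatched failures reach the default.
  val combined: Decider = decider orElse defaultDecider
}
```

The order matters: `defaultDecider orElse decider` would never consult the custom cases for failures the default already handles.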

44. MESSAGING

45. DEBUGGING ASK: IF YOU NEED MORE CONTEXT THAN JUST THE MESSAGE TYPE

46.
    trait RequestSupport {
      def request[T: ClassTag](
        actor: ActorRef,
        req: Any,
        printRequest: Any => String = _.toString
      )(implicit ec: ExecutionContext, timeout: Timeout): Future[T] = {
        val err = s"Request ${printRequest(req)} to ${actor.path} timed out."
        actor.ask(req).mapTo[T].recoverWith {
          case cause: AskTimeoutException =>
            Future.failed(new RequestTimeoutException(req, err, cause))
        }
      }
    }

47.
    object RequestSupport extends RequestSupport

    class RequestTimeoutException(request: Any, msg: String, cause: Exception)
      extends Exception(msg, cause)

48. val f: Future[Response] = request[Response](actor, Request(something))
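
The core of RequestSupport is the `recoverWith` step that swaps a bare timeout for an exception naming the request and its target. The sketch below isolates that step in plain Scala, with no Akka involved: `withContext`, the string target and the use of `java.util.concurrent.TimeoutException` in place of `AskTimeoutException` are assumptions made for illustration.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object RequestContextDemo {
  final class RequestTimeoutException(val request: Any, msg: String, cause: Exception)
      extends Exception(msg, cause)

  // Wraps a bare timeout in an exception that names the request and its target.
  def withContext[T](target: String, req: Any)(reply: Future[T])(
      implicit ec: ExecutionContext): Future[T] =
    reply.recoverWith {
      case cause: TimeoutException =>
        val err = s"Request $req to $target timed out."
        Future.failed(new RequestTimeoutException(req, err, cause))
    }

  // Simulates a reply that has already timed out and captures the resulting failure.
  def demo(): Try[String] = {
    import ExecutionContext.Implicits.global
    val timedOut = Future.failed[String](new TimeoutException("ask timed out"))
    Try(Await.result(withContext("/user/backend", "Request(1,20)")(timedOut), 1.second))
  }
}
```

Other failures pass through untouched, because the partial function only matches the timeout case.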

49. SENDING TO AN Option[ActorRef]? DON'T blackhole

50.
    case msg =>
      child match {
        case Some(c) => c.forward(msg)
        case None    => context.system.deadLetters.forward(msg)
      }
   instead of
    case msg => child.map(_ forward msg)
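
The difference between the two variants is what happens on `None`. A plain-Scala sketch of that difference, where `Ref` and the `deadLetters` buffer are hypothetical stand-ins for an ActorRef and the system's dead letters:

```scala
import scala.collection.mutable.ListBuffer

object DontBlackholeDemo {
  type Ref = String => Unit                  // hypothetical stand-in for an ActorRef
  val deadLetters = ListBuffer.empty[String] // stand-in for system.deadLetters

  // Blackhole: on None, map does nothing and the message vanishes silently.
  def blackhole(child: Option[Ref], msg: String): Unit = {
    child.map(ref => ref(msg))
    ()
  }

  // Explicit match: undeliverable messages stay visible as dead letters.
  def forwardOrDeadLetter(child: Option[Ref], msg: String): Unit =
    child match {
      case Some(ref) => ref(msg)
      case None =>
        deadLetters += msg
        ()
    }
}
```

With the explicit match a missing child shows up wherever dead letters are logged or monitored, instead of going unnoticed.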

51. RESILIENCE

52. ASYNC RESOURCE

53. QUIZ

54. AGAIN, WHAT IS WRONG WITH THE code

55.
    class MyActor(uri: String) extends Actor {
      self ! Init

      def receive = init

      def init: Receive = {
        case Init => pipe(connect(uri)) to self
        case Connected(connection) => context.become(connected(connection))
      }

      def connected(connection: Conn): Receive = {
        case Request(offset, nrBytes) =>
          pipe(connection.get(offset, nrBytes)) to sender()
      }

      // implementation left out
      def connect(uri: String): Future[Connected] = ...
    }

56.
    ▸ It only connects once
    ▸ Before connected, messages are lost
    ▸ Connection failure not handled
    ▸ Connects when the actor is created

57.
    def init: Receive = {
      case Init => pipe(connect(uri)) to self
      case Connected(connection) => context.become(connected(connection))
      case Status.Failure(e) => self ! Init
      case m => self ! m
    }
    //...
    def connected(connection: Conn): Receive = {
      case Request(offset, nrBytes) =>
        pipe(connection.get(offset, nrBytes)) to sender()
    }

58.
    ▸ Sending to self re-orders messages
    ▸ Reconnect is too fast
    ▸ Which Failure is it?
    ▸ Only reconnecting when initially not connected
    ▸ Connected state is not really 'connected'
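
The first point, re-ordering, can be seen in a toy mailbox model. The sketch below is plain Scala and only models queue order: `resendToSelf` puts a not-yet-handleable request at the back of the mailbox, while `stashing` sets it aside and puts it back at the front once `Connected` arrives. All names are hypothetical, and the mailbox passed in is assumed to contain a `Connected` message (otherwise `resendToSelf` would spin forever, much like the real actor).

```scala
import scala.collection.immutable.Queue

object ReorderDemo {
  sealed trait Msg
  case object Connected extends Msg
  final case class Request(id: Int) extends Msg

  // "self ! m" while not connected: the request goes to the back of the mailbox.
  def resendToSelf(mailbox: Queue[Msg]): Vector[Int] = {
    @annotation.tailrec
    def loop(box: Queue[Msg], connected: Boolean, handled: Vector[Int]): Vector[Int] =
      box.dequeueOption match {
        case None                                   => handled
        case Some((Connected, rest))                => loop(rest, connected = true, handled)
        case Some((Request(id), rest)) if connected => loop(rest, connected, handled :+ id)
        case Some((req: Request, rest))             => loop(rest.enqueue(req), connected, handled)
      }
    loop(mailbox, connected = false, Vector.empty)
  }

  // stash() while not connected, unstashAll() on Connected: the stashed
  // requests are put back in front of the remaining mailbox.
  def stashing(mailbox: Queue[Msg]): Vector[Int] = {
    @annotation.tailrec
    def loop(box: Queue[Msg], stash: Vector[Msg], connected: Boolean, handled: Vector[Int]): Vector[Int] =
      box.dequeueOption match {
        case None => handled
        case Some((Connected, rest)) =>
          loop(Queue.empty[Msg] ++ stash ++ rest, Vector.empty, connected = true, handled)
        case Some((Request(id), rest)) if connected => loop(rest, stash, connected, handled :+ id)
        case Some((req: Request, rest))             => loop(rest, stash :+ req, connected, handled)
      }
    loop(mailbox, Vector.empty, connected = false, Vector.empty)
  }
}
```

For a mailbox holding `Request(1)`, `Connected`, `Request(2)`, the first model handles request 2 before request 1, while the stash model keeps the arrival order.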

59.
    class MyActor(backend: Backend, reconnectDelay: FiniteDuration)
      extends Actor with Stash {
      self ! Init

60.
    def receive = init

    def init: Receive = {
      case Init =>
        pipe(
          connect(uri)
            .recover { case e: Throwable => NotConnected(e) }
        ) to self
      case Connected(connection) =>
        context.become(connected(connection))
        unstashAll()
      case NotConnected(e) =>
        context.system.scheduler.scheduleOnce(reconnectDelay, self, Init)
      case _ => stash()
    }

61.
    def connected(connection: Conn): Receive = {
      case Request(offset, nrBytes) =>
        pipe(
          connection.get(offset, nrBytes).recover {
            case e: Throwable =>
              self ! NotConnected(e)
              ConnectionError(e)
          }
        ) to sender()
      case NotConnected(e) =>
        context.become(init)
        self ! Init
        connection.close
    }

62.
    ▸ Connects when the actor is created
    ▸ Scheduled reconnect, no back-off
    ▸ Fatal exceptions should not be caught (case e: Throwable)

63. BACK-OFF supervision

64. In parent / supervisor:
    val supervisorProps = BackoffSupervisor.propsWithSupervisorStrategy(
      MyActor.props(host, port),
      MyActor.name(),
      minBackoff,
      maxBackoff,
      ManualReset,
      randomDelayFactor,
      SupervisorStrategy.stoppingStrategy)
    context.actorOf(supervisorProps, supervisorName)

65. Reset when a connection is made (in child):
    def uninitialized: Receive = {
      case Init => connect(uri)
      case Connected(con) =>
        context.become(connected(con))
        context.parent ! BackoffSupervisor.Reset
      // .. more code
    }

66. CONNECT ON FIRST MESSAGE RECEIVED

67.
    def receive = waiting

    def waiting: Receive = {
      case _: Request =>
        // a Request triggers connecting
        stash()
        self ! Init
        context.become(init)
      case _ => stash()
    }

68.
    def init: Receive = {
      case Init =>
        val _ = pipe(
          connect(uri)
            .recover { case e: Throwable => NotConnected(e) }
        ) to self
      case Connected(con) =>
        context.become(connected(con))
        unstashAll()
      case NotConnected(e) =>
        log.error(e, s"Could not connect.")
        throw e // let supervisor deal with it. (also fatal errors)
      case _ => stash()
    }

69.
    def connected(con: Connection): Receive = {
      case Request(offset, bytes) =>
        val _ = pipe(
          con.get(offset, bytes).recover {
            case e: Throwable =>
              self ! NotConnected(e)
              ConnectionError(e)
          }
        ) to sender()
      case NotConnected(e) =>
        throw e // let supervisor deal with it
    }

70. SPAWNED ACTORS: ReceiveTimeout and PoisonPill

71. ACTOR FOR A 'SESSION': ENCOUNTERS ERROR, WAITS FOR A CLOSE, CLOSE MIGHT NOT ARRIVE

72.
    class CreateTopicRequest(receiver: ActorRef, topic: Topic) extends Actor {
      context.setReceiveTimeout(someTimeout)
      self ! CreateTopic(topic)

      def receive = {
        case CreateTopic(topic) => // start some complicated flow
        case TopicCreated       => // .. more code ..
        case CreateTopicFailed  => // .. more code ..
        case ReceiveTimeout =>
          receiver ! Status.Failure(new Exception("Timeout creating topic."))
          self ! PoisonPill
      }
    }

73. CAPTURE sender IN PROTOCOL MESSAGES

74.
    var streamBackoff = Backoff(minBackoff, maxBackoff, randomDelayFactor)

    def receive = {
      case Protocol.Start(earliestOffset, lastOffset) =>
        self ! Protocol.NextChunk(earliestOffset, lastOffset, sender(), streamId)
      case nextChunk: Protocol.NextChunk =>
        handleNextChunk(nextChunk)
      case Protocol.ReadResponseForChunk(readResponse, nextChunk) =>
        handleReadResponse(readResponse, nextChunk)
      case Protocol.RetryChunk(nextChunk) =>
        self ! Protocol.NextBackoff
        val _ = context.system.scheduler.scheduleOnce(streamBackoff.waitTime, self, nextChunk)
      case Protocol.NextBackoff =>
        streamBackoff = streamBackoff.nextBackoff
    }
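
The slides do not show the `Backoff` class itself, only its constructor arguments and the `waitTime` and `nextBackoff` members. One possible shape is sketched below as an assumption: the delay doubles per retry, is capped at `maxBackoff`, and gets up to `randomDelayFactor` of extra random jitter. The `restarts` field and the doubling rule are not from the slides.

```scala
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical implementation; only the constructor arguments, `waitTime`
// and `nextBackoff` appear on the slide.
final case class Backoff(
    minBackoff: FiniteDuration,
    maxBackoff: FiniteDuration,
    randomDelayFactor: Double,
    restarts: Int = 0
) {
  // Exponential delay (doubling per retry), capped at maxBackoff, before jitter.
  private def base: FiniteDuration = {
    val factor = math.pow(2, math.min(restarts, 30).toDouble)
    val millis = math.min(minBackoff.toMillis * factor, maxBackoff.toMillis.toDouble)
    millis.toLong.millis
  }

  // Delay to wait before the next retry, with random jitter added on top.
  def waitTime: FiniteDuration = {
    val jitter = 1.0 + Random.nextDouble() * randomDelayFactor
    (base.toMillis * jitter).toLong.millis
  }

  // Immutable step to the next, longer delay.
  def nextBackoff: Backoff = copy(restarts = restarts + 1)
}
```

Because the class is immutable, the actor keeps it in a `var` and replaces it on `Protocol.NextBackoff`, as the slide does; resetting after a successful read would mean assigning a fresh `Backoff` again.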

75.
    def transcoding(streamer: ActorRef): Receive = {
      // lots of code
      case _: NoData[K, V] =>
        val cancellable = context.system.scheduler.schedule(0 seconds, 1 second, responder, WaitMessage)
        context.become(waitingForData(streamer, cancellable))
      case ev: ConnectionClosed =>
        context.become(closing)
        context.setReceiveTimeout(duration)
    }

    def waitingForData(streamer: ActorRef, cancellable: Cancellable): Receive = {
      case _: NoData[K, V] => // nothing to do, scheduler is still sending waits.
      case m =>
        stash()
        cancellable.cancel()
        context.become((transcoding(streamer)))
        unstashAll()
    }

    def closing: Receive = {
      case StreamClosed | StreamRequestEnd => self ! PoisonPill
      case ReceiveTimeout => self ! PoisonPill
      case _ =>
    }

76. RECAP

77. Companion.create, Terminator, RequestSupport, DontBlackhole, PipeToSelf, CrashOnFutureFailure, LazyAsyncResource, CaptureSenderInProtocol, ReceiveTimeout

78. END. http://manning.com/roestenburg DISCOUNT CODE 39ROESTENBURG
