Akka persistence webinar


Published on March 19, 2014

Author: patriknw

Source: slideshare.net

Akka Persistence Webinar by Patrik Nordwall
 @patriknw

• opt-in at-least-once delivery semantics between actors
• recover application state after a crash

Akka 2.3.0 Release

sbt:
 "com.typesafe.akka" %% "akka-persistence-experimental" % "2.3.0"

Maven:
 <dependency>
   <groupId>com.typesafe.akka</groupId>
   <artifactId>akka-persistence-experimental_2.10</artifactId>
   <version>2.3.0</version>
 </dependency>

Martin Krasser
 Twitter: mrt1nz
 Github: krasserm

 Eventsourced
 akka-persistence
 September 2013 - February 2014: contractor for Typesafe

 Awesome work, Martin!

Storing structure (diagram): Purchase Order, Line Item (*), Shipping Information

Storing state transitions (diagram): Cart Created, Added 2 Socks, Added 2 Shirts, Shipping Info Added
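The contrast between storing structure and storing state transitions can be sketched in plain Scala, without any Akka dependency. All names below (CartEvent, Cart, CartReplay) are hypothetical and only mirror the cart example on the slide. The point of the sketch is that the current state is never stored directly; it is a left fold over the recorded transitions.

```scala
// Hypothetical event and state types mirroring the cart example on the slide.
sealed trait CartEvent
case object CartCreated extends CartEvent
final case class ItemAdded(name: String, count: Int) extends CartEvent
final case class ShippingInfoAdded(address: String) extends CartEvent

final case class Cart(items: Map[String, Int] = Map.empty, shipping: Option[String] = None) {
  // Apply one state transition and return the new state.
  def updated(evt: CartEvent): Cart = evt match {
    case CartCreated             => Cart()
    case ItemAdded(name, count)  => copy(items = items.updated(name, items.getOrElse(name, 0) + count))
    case ShippingInfoAdded(addr) => copy(shipping = Some(addr))
  }
}

object CartReplay {
  // Current state is derived by replaying the stored transitions in order.
  def replay(events: Seq[CartEvent]): Cart =
    events.foldLeft(Cart())((cart, evt) => cart.updated(evt))

  // The history from the slide; the address value is a placeholder.
  val history: Seq[CartEvent] = Seq(
    CartCreated, ItemAdded("Socks", 2), ItemAdded("Shirts", 2), ShippingInfoAdded("an address"))
}
```

Replaying only a prefix of the history yields the state as it was at that point in time, which is what makes auditing and historical tracing possible.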

Domain Events
• things that have completed, facts
• immutable
• verbs in past tense
  • CustomerRelocated
  • CargoShipped
  • InvoiceSent

“State transitions are an important part of our problem space and should be modeled within our domain.” Greg Young, 2008

Benefits
• Bullet-proof auditing and historical tracing
• Support future ways of looking at data
• Performance and scalability
• Testability
• Reconstruct production scenarios
• No object-relational impedance mismatch
• Nice fit with actors

Command Sourcing
• write-ahead-log
• same behavior during recovery as normal operation
• external interaction can be problematic
• persisted before validation
• allows retroactive changes to the business logic
• naming: represent intent, imperative

Event Sourcing
• derive events from a command
• only state-changing behavior during recovery
• events cannot fail
• fixing the business logic will not affect persisted events
• naming: things that have completed, verbs in past tense

Consistency boundary
• An actor is a consistency boundary
• DDD Aggregate
• No distributed transactions
• eventually consistent
• compensating actions

Life beyond Distributed Transactions: an Apostate’s Opinion
Position Paper, Pat Helland

“In general, application developers simply do not implement large scalable applications assuming distributed transactions.” Pat Helland

http://www-db.cs.wisc.edu/cidr/cidr2007/papers/cidr07p15.pdf

Building Blocks
• Processor
• EventsourcedProcessor
• Channel
• PersistentChannel
• View
• Journal
• Cluster Sharding

Processor

  import akka.persistence.{ Persistent, Processor }

  class MyProcessor extends Processor {
    def receive = {
      case Persistent(payload, sequenceNr) => // message successfully written to journal
      case other                           => // message not written to journal
    }
  }

  val processor = context.actorOf(Props[MyProcessor], name = "myProcessor")

  processor ! Persistent("foo") // will be journaled
  processor ! "bar"             // will not be journaled


Processor

  class InvoiceService extends Processor {
    var invoices = Map.empty[String, Invoice]

    def receive: Receive = {
      case Persistent(CreateInvoice(id), _) =>
        invoices = invoices.updated(id, Invoice(id))

      case Persistent(AddInvoiceItem(id, item), _) =>
        invoices.get(id) match {
          case Some(inv) => invoices = invoices.updated(id, inv.addItem(item))
          case None      => // TODO
        }

      case GetInvoice(id) =>
        sender() ! invoices.getOrElse(id, "not found: " + id)

      case Persistent(SendInvoiceTo(id, address), _) =>
        // TODO send to the invoice printing service
    }
  }

Processor

  case class CreateInvoice(invoiceId: String)
  case class AddInvoiceItem(invoiceId: String, invoiceItem: InvoiceItem)
  case class SendInvoiceTo(invoiceId: String, to: InvoiceAddress)
  case class GetInvoice(invoiceId: String)

  case class Invoice(id: String, items: IndexedSeq[InvoiceItem] = Vector.empty) {
    def addItem(item: InvoiceItem): Invoice = copy(items = items :+ item)
  }

  case class InvoiceItem(description: String, count: Int, amount: BigDecimal)

  case class InvoiceAddress(name: String, street: String, city: String)

Processor Identifier

  override def processorId = "my-stable-processor-id"

• Default is actor path without address: /user/top/myProcessor
• Don’t use anonymous processors

Processor
• Automatic recovery on start and restart
• Stashing until recovery completed
• Failure handling with supervisor strategy
• Might want to delete erroneous messages

What about side effects during replay?

Processor with Channel

  val printingChannel = context.actorOf(Channel.props(), name = "printingChannel")
  val printingDestination = context.system / "printingService"

  def receive: Receive = {
    case p @ Persistent(SendInvoiceTo(id, address), _) =>
      // send to the invoice printing service
      invoices.get(id) match {
        case Some(inv) =>
          printingChannel ! Deliver(p.withPayload(PrintingOrder(inv, address)), printingDestination)
          invoices -= inv.id
        case None => // TODO
      }
  }

  class PrintingService extends Actor {
    def receive = {
      case p @ ConfirmablePersistent(payload, sequenceNr, redeliveries) =>
        // ...
        p.confirm()
    }
  }

EventsourcedProcessor
• Incoming messages (commands) are not persisted
• Steps:
  1. Validate command
  2. Create domain event and explicitly persist it
  3. Update internal state, by applying the event
  4. External side effects
• During recovery the internal state is updated by applying the events
  • no external side effects
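The four steps, and the rule that recovery applies events without side effects, can be illustrated with a small Akka-free model. Everything here is an assumption made for illustration: Deposit, Deposited, Balance, InMemoryJournal and Account are hypothetical names, and the in-memory buffer merely stands in for a real journal. It is a sketch of the pattern, not of how EventsourcedProcessor is implemented.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical command, event and state types; none of these come from Akka.
final case class Deposit(amount: Int)   // command: represents intent
final case class Deposited(amount: Int) // event: a fact, named in past tense
final case class Balance(total: Int) {
  def updated(evt: Deposited): Balance = copy(total = total + evt.amount)
}

// In-memory stand-in for a journal, shared between incarnations of the account.
final class InMemoryJournal {
  val events = ArrayBuffer.empty[Deposited]
}

final class Account(journal: InMemoryJournal) {
  var state = Balance(0)
  var sideEffects = 0 // counts external interactions, e.g. notifications sent

  // Recovery: apply the stored events only, with no validation and no side effects.
  journal.events.foreach(evt => state = state.updated(evt))

  // Returns true if the command was accepted.
  def handle(cmd: Deposit): Boolean =
    if (cmd.amount <= 0) false        // 1. validate the command (rejected commands leave no trace)
    else {
      val evt = Deposited(cmd.amount) // 2. create the domain event and persist it
      journal.events += evt
      state = state.updated(evt)      // 3. update internal state by applying the event
      sideEffects += 1                // 4. external side effect
      true
    }
}
```

Creating a second Account on the same journal models a restart: it ends up with the same balance as the first, while its side-effect counter stays at zero.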


EventsourcedProcessor

  object BlogPost {
    case class AddPost(author: String, title: String)

    sealed trait Event
    case class PostAdded(author: String, title: String) extends Event

    private case class State(author: String, title: String) {
      def updated(evt: Event): State = evt match {
        case PostAdded(author, title) => copy(author, title)
      }
    }
  }

  class BlogPost extends EventsourcedProcessor {
    import BlogPost._
    private var state = State("", "")

    override def receiveCommand: Receive = {
      case AddPost(author, title) =>
        persist(PostAdded(author, title)) { evt =>
          state = state.updated(evt)
        }
    }

    override def receiveRecover: Receive = ???
  }

EventsourcedProcessor

  object BlogPost {
    case class AddPost(author: String, title: String)
    case class ChangeBody(body: String)
    case object Publish

    sealed trait Event
    case class PostAdded(author: String, title: String) extends Event
    case class BodyChanged(body: String) extends Event
    case object PostPublished extends Event

    private case class State(author: String, title: String, body: String, published: Boolean) {

      def updated(evt: Event): State = evt match {
        case PostAdded(author, title) => copy(author, title)
        case BodyChanged(b)           => copy(body = b)
        case PostPublished            => copy(published = true)
      }
    }
  }

EventsourcedProcessor

  class BlogPost extends EventsourcedProcessor {
    import BlogPost._
    private var state = State("", "", "", false)

    override def receiveCommand: Receive = {
      case AddPost(author, title) =>
        if (state.body == "" && author != "" && title != "")
          persist(PostAdded(author, title)) { evt =>
            state = state.updated(evt)
          }
      case ChangeBody(body) =>
        if (!state.published)
          persist(BodyChanged(body)) { evt =>
            state = state.updated(evt)
          }
      case Publish =>
        if (!state.published)
          persist(PostPublished) { evt =>
            state = state.updated(evt)
            // call external web content service ...
          }
    }

    override def receiveRecover: Receive = {
      case evt: Event => state = state.updated(evt)
    }
  }


Snapshots

  class MyProcessor extends Processor {
    var state: Any = _

    def receive = {
      case "snap"                                => saveSnapshot(state)
      case SaveSnapshotSuccess(metadata)         => // ...
      case SaveSnapshotFailure(metadata, reason) => // ...

      case SnapshotOffer(metadata, offeredSnapshot) => state = offeredSnapshot
      case Persistent(payload, _)                   => // ...
    }
  }
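Why snapshots shorten recovery can be shown with a small Akka-free model. The names Snapshot and SnapshotRecovery are hypothetical, the state is a plain counter, and the journal is a sequence of (sequenceNr, increment) pairs. The sketch assumes that recovery starts from the offered snapshot, if any, and then applies only the entries with a higher sequence number.

```scala
// Hypothetical, Akka-free model of snapshot-based recovery for a counter.
final case class Snapshot(sequenceNr: Long, state: Int)

object SnapshotRecovery {
  // journal: (sequenceNr, increment) pairs in ascending sequence order.
  // Returns the recovered state and the number of entries that had to be replayed.
  def recover(snapshot: Option[Snapshot], journal: Seq[(Long, Int)]): (Int, Int) = {
    val startNr    = snapshot.map(_.sequenceNr).getOrElse(0L)
    val startState = snapshot.map(_.state).getOrElse(0)
    // Only entries written after the snapshot need to be applied.
    val remaining = journal.filter { case (nr, _) => nr > startNr }
    val state = remaining.foldLeft(startState) { case (acc, (_, inc)) => acc + inc }
    (state, remaining.size)
  }
}
```

With a journal of four entries and a snapshot taken at sequence number 3, the recovered state is the same as a full replay, but only one entry is replayed instead of four.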

View
• replays Persistent messages from a Processor’s journal
• query side of CQRS
• features
  • auto-update interval
  • Update message
  • limit
• may store its own snapshots

View

  import scala.concurrent.duration._
  import akka.actor.Actor
  import akka.persistence.{ Persistent, View }

  class InvoiceCounter extends View {
    import InvoiceCounter._
    override def processorId: String = "/user/invoiceService"
    override def autoUpdateInterval = 10.seconds

    var count = 0L

    def receive: Actor.Receive = {
      case Persistent(payload: SendInvoiceTo, _) => count += 1
      case _: Persistent                         => // ignore other journaled messages
      case GetInvoiceCount                       => sender ! InvoiceCount(count)
    }
  }

  object InvoiceCounter {
    case object GetInvoiceCount
    case class InvoiceCount(count: Long)
  }
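The incremental replay behind a view, including the effect of a replay limit, can be modeled without Akka. CountingView below is a hypothetical class, the journal is just a function returning the entries written so far, and the strings "created" and "sent" are placeholder payloads. The sketch assumes a view remembers how far it has replayed and only consumes newer entries on each update.

```scala
// Hypothetical, Akka-free model of a view that incrementally replays a journal.
final class CountingView(journal: () => Seq[String]) {
  private var lastSequenceNr = 0 // how far this view has replayed
  var count = 0L                 // read-side state: number of "sent" entries seen

  // Replays at most `limit` entries that this view has not seen yet.
  def update(limit: Int = Int.MaxValue): Unit = {
    val batch = journal().drop(lastSequenceNr).take(limit)
    batch.foreach(entry => if (entry == "sent") count += 1)
    lastSequenceNr += batch.size
  }
}
```

Calling update on a timer corresponds to the auto-update interval, while calling it on demand corresponds to sending an Update message. Between updates the view lags behind the write side, which is the eventual consistency expected on the query side of CQRS.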


At-least-once delivery (diagram): sender and destination; messages ($) from the sender, confirmations (ok) from the destination

Channels
• re-deliver messages until confirmed
• application level confirmation
• different semantics
  • duplicates may be received
  • message order not retained
  • after a crash and restart messages are still delivered
• listener for RedeliverFailure notifications
• recommendation: one destination per channel
  • exception: replies via channel
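Redelivery until confirmation, and the duplicates it can cause, can be sketched without Akka. Envelope, IdempotentDestination and RedeliveringSender are hypothetical names, and the ackLost function is an assumption used to simulate confirmations that get lost on the way back. The destination deduplicates on a delivery id, which is one common way to cope with at-least-once semantics; it is not something the channel does for you.

```scala
import scala.annotation.tailrec
import scala.collection.mutable

// Hypothetical, Akka-free model of redelivery until confirmation.
final case class Envelope(deliveryId: Long, payload: String)

// Destination that tolerates duplicates by remembering processed delivery ids.
final class IdempotentDestination {
  private val seen = mutable.Set.empty[Long]
  val processed = mutable.ArrayBuffer.empty[String]

  // Processes the payload the first time only and always returns the id
  // as an application-level confirmation.
  def receive(env: Envelope): Long = {
    if (seen.add(env.deliveryId)) processed += env.payload
    env.deliveryId
  }
}

final class RedeliveringSender(redeliverMax: Int) {
  // Delivers until a confirmation arrives or redeliverMax attempts have been made.
  // ackLost(attempt) simulates a confirmation that is lost for that attempt.
  // Returns the number of attempts needed, or None if delivery was never confirmed.
  def deliver(env: Envelope, dest: IdempotentDestination, ackLost: Int => Boolean): Option[Int] = {
    @tailrec
    def loop(attempt: Int): Option[Int] =
      if (attempt > redeliverMax) None
      else {
        val ack = dest.receive(env)
        if (ack == env.deliveryId && !ackLost(attempt)) Some(attempt)
        else loop(attempt + 1)
      }
    loop(1)
  }
}
```

If the first two confirmations are lost, the destination receives the same envelope three times but processes it once. A None result corresponds to the situation in which a RedeliverFailure listener would be notified.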

Channel vs. PersistentChannel
• Channel
  • use from Processor
  • in-memory
• PersistentChannel
  • standalone usage
  • conceptually: processor + channel
  • persist messages before delivering
  • reply ack when persisted
  • more advanced delivery flow control

Processor with Channel

  class MyProcessor extends Processor {
    val channel = context.actorOf(Channel.props(), name = "myChannel")

    def receive = {
      case p @ Persistent(payload, _) =>
        val destination = context.system / "myDestination"
        channel ! Deliver(p.withPayload("output msg"), destination)
    }
  }

  class MyDestination extends Actor {
    def receive = {
      case p @ ConfirmablePersistent(payload, sequenceNr, redeliveries) =>
        // ...
        p.confirm()
    }
  }

PersistentChannel

  import scala.concurrent.duration._
  import akka.pattern.{ ask, pipe }
  import akka.util.Timeout

  class Endpoint extends Actor {
    val channel = context.actorOf(PersistentChannel.props(
      PersistentChannelSettings(redeliverInterval = 3.seconds, redeliverMax = 10,
        replyPersistent = true)), name = "myChannel")
    val destination = context.system / "jobManager"

    import context.dispatcher
    implicit val timeout = Timeout(5.seconds)

    def receive = {
      case job: Job =>
        // reply "OK" once the channel has persisted the message, "FAILED" otherwise
        (channel ? Deliver(Persistent(job), destination)) map {
          case _: Persistent => "OK: " + job.id
        } recover {
          case e => "FAILED: " + job.id
        } pipeTo sender()
    }
  }

Serialization
• pluggable, Akka serialization
• application life-cycle, versioning
• don’t use default Java serialization

Journal and snapshot store
• pluggable
• LevelDB shipped with Akka – local files
• Community http://akka.io/community/
  • BDB
  • Cassandra
  • DynamoDB
  • HBase
  • MapDB
  • MongoDB

Cluster
• simple way of migrating/moving stateful actors in a cluster
• distributed journal
  • shared LevelDB journal for testing
• single writer per event stream
  • cluster singleton
  • cluster sharding

Cluster Singleton (diagram): nodes A, B, C, D with roles backend-1, backend-1, backend-2, backend-2

Cluster Sharding (diagram sequence): nodes A, B, C, D; a sender, a coordinator, and one region each on node-1, node-2 and node-3. Labels shown across the frames: id:17, GetShardHome:17, ShardHome:17 -> node2, the mapping 17 -> node2 (appearing up to three times in the later frames), and entry 17.

Cluster Sharding

  val idExtractor: ShardRegion.IdExtractor = {
    case cmd: Command => (cmd.postId, cmd)
  }

  val shardResolver: ShardRegion.ShardResolver = msg => msg match {
    case cmd: Command => (math.abs(cmd.postId.hashCode) % 100).toString
  }

  ClusterSharding(system).start(
    typeName = BlogPost.shardName,
    entryProps = Some(BlogPost.props()),
    idExtractor = BlogPost.idExtractor,
    shardResolver = BlogPost.shardResolver)

  val postRegion: ActorRef =
    ClusterSharding(context.system).shardRegion(BlogPost.shardName)

  val postId = UUID.randomUUID().toString
  postRegion ! BlogPost.AddPost(postId, author, title)
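The lookup flow suggested by the GetShardHome and ShardHome labels can be modeled in a few lines of plain Scala. ShardingSketch, Coordinator and Region are hypothetical names, the round-robin allocation is an assumption made for the sketch, and nothing here uses the Akka API. The sketch also uses Math.floorMod instead of math.abs, because math.abs(Int.MinValue) is still negative and would produce a negative shard id for an id with that hash code.

```scala
import scala.collection.mutable

// Hypothetical, Akka-free model of how a region resolves the home of a shard.
object ShardingSketch {
  val numberOfShards = 100

  // Same idea as the shardResolver on the slide: hash the entry id into a fixed
  // number of shards. floorMod keeps the result non-negative for every hash code.
  def shardOf(entryId: String): String =
    java.lang.Math.floorMod(entryId.hashCode, numberOfShards).toString

  // Coordinator: decides once where each shard lives (round-robin, an assumption).
  final class Coordinator(nodes: Vector[String]) {
    private val homes = mutable.Map.empty[String, String]
    var lookups = 0 // number of GetShardHome requests answered

    def getShardHome(shard: String): String = {
      lookups += 1
      homes.getOrElseUpdate(shard, nodes(homes.size % nodes.size))
    }
  }

  // Region: asks the coordinator once per shard and caches the answer locally.
  final class Region(coordinator: Coordinator) {
    private val cache = mutable.Map.empty[String, String]

    def route(entryId: String): String = {
      val shard = shardOf(entryId)
      cache.getOrElseUpdate(shard, coordinator.getShardHome(shard))
    }
  }
}
```

A second message for the same id is routed from the region's cache without another request to the coordinator, and a different region asking about the same shard gets the same home.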


Next step
• Documentation
  • http://doc.akka.io/docs/akka/2.3.0/scala/persistence.html
  • http://doc.akka.io/docs/akka/2.3.0/java/persistence.html
  • http://doc.akka.io/docs/akka/2.3.0/contrib/cluster-sharding.html
• Typesafe Activator
  • https://typesafe.com/activator/template/akka-sample-persistence-scala
  • https://typesafe.com/activator/template/akka-sample-persistence-java
  • http://typesafe.com/activator/template/akka-cluster-sharding-scala
• Mailing list
  • http://groups.google.com/group/akka-user
• Migration guide from Eventsourced
  • http://doc.akka.io/docs/akka/2.3.0/project/migration-guide-eventsourced-2.3.x.html

Akka in Action
• All registrants for this webinar qualify for a free e-book PDF subset of Akka in Action by Raymond Roestenburg, Rob Bakker and Rob Williams. Additionally, you will qualify for the Typesafe discount and can save 40% on the full book.

©Typesafe 2014 – All Rights Reserved
