Samza LA HUG


Published on March 10, 2014

Author: sriramsub

Source: slideshare.net

Description

Apache Samza talk at LA HUG

Apache Samza*: Reliable Stream Processing atop Apache Kafka and YARN
Sriram Subramanian
Me on LinkedIn | Me on Twitter: @sriramsub1
* Incubating

Agenda
• Why Stream Processing?
• What is Samza's Design?
• How is Samza's Design Implemented?
• How can you use Samza?
• Example usage at LinkedIn

Why Stream Processing?

Response latency spectrum: synchronous (0 ms) → milliseconds to minutes (where stream processing fits) → later, possibly much later (batch).

Use cases: Newsfeed, Ad Relevance, Search Index, Metrics and Monitoring

What is Samza's Design?

A job consumes input streams and produces output streams; jobs chain together, with one job's output stream becoming another's input (JOB 1, JOB 2, JOB 3 connected through Streams A to G).

Streams: a stream is divided into partitions (Partition 0, Partition 1, Partition 2). Each partition is an ordered sequence of messages identified by offsets (1, 2, 3, …); a producer appends at the tail, and each consumer tracks the next offset to read.
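Keyed messages map deterministically to partitions, which is what gives per-key ordering. A minimal sketch of the usual hash-mod assignment (an illustrative helper, not Samza or Kafka API):

```java
// Sketch: how a keyed message picks a partition. Same key, same partition,
// so one task sees all messages for that key in order.
public class PartitionSketch {
    public static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit instead of Math.abs, which overflows
        // for Integer.MIN_VALUE.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```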

Jobs: a job's input streams are spread across its tasks (Task 1, Task 2, Task 3), which together produce the output stream. Concretely: AdViews and AdClicks feed the tasks of a job whose output is AdClickThroughRate.

Tasks: a task consumes exactly one partition of its input. Here a CounterTask reads AdViews - Partition 0 (offsets 1, 2, 3, 4) and emits to an output count stream.

Checkpointing: as the CounterTask works through AdViews - Partition 0, it periodically writes its current offset (e.g. offset 2) to a checkpoint stream, so after a failure it resumes from the last checkpointed offset rather than the beginning.

Dataflow: jobs and streams compose into a dataflow graph, with streams (A through E) connecting Job 1, Job 2, and Job 3.

Stateful Processing
• Windowed Aggregation: counting the number of page views for each user per hour
• Stream-Stream Join: join a stream of ad clicks to a stream of ad views to identify the view that led to the click
• Stream-Table Join: join user region info to a stream of page views to create an augmented stream
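As a concrete illustration of the first pattern, windowed aggregation can be sketched in plain Java. The class name and structure are hypothetical; a real Samza task would keep this state in a local store rather than a heap map:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: count page views per user per hour by bucketing each event's
// timestamp into an hour-sized window.
public class HourlyViewCounter {
    private static final long HOUR_MS = 3_600_000L;
    private final Map<String, Integer> counts = new HashMap<>();

    // Called once per page-view event.
    public void onPageView(String userId, long timestampMs) {
        String windowKey = userId + "@" + (timestampMs / HOUR_MS);
        counts.merge(windowKey, 1, Integer::sum);
    }

    // Current count for the window containing timestampMs.
    public int countFor(String userId, long timestampMs) {
        return counts.getOrDefault(userId + "@" + (timestampMs / HOUR_MS), 0);
    }
}
```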

How do people do this?
• In-memory state with checkpointing: periodically save out the task's in-memory data. As state grows this becomes very expensive; some implementations checkpoint diffs, but that adds complexity.

• Using an external store: push state to an external store. Performance suffers because of remote queries, there is a lack of isolation, and query capabilities are limited.

Stateful Tasks Stream A Task 1 Task 2 Task 3 Stream B

Stateful Tasks with a changelog: each task keeps its state locally and also writes every state change to a Changelog Stream; after a failure, replaying the changelog restores the task's state.

Key-Value Store
• put(table_name, key, value)
• get(table_name, key)
• delete(table_name, key)
• range(table_name, key1, key2)
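A toy in-memory version of this API (single table, backed by a sorted map; Samza's real store is a persistent local store with a changelog) shows why a sorted structure makes range() natural:

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch of the key-value API above, backed by a TreeMap so keys stay
// ordered and range queries are cheap. Illustrative only; the table_name
// parameter is dropped since this models a single table.
public class SortedKvSketch {
    private final TreeMap<String, String> map = new TreeMap<>();

    public void put(String key, String value) { map.put(key, value); }
    public String get(String key)             { return map.get(key); }
    public void delete(String key)            { map.remove(key); }

    // All entries with key1 <= key < key2, in key order (half-open range).
    public SortedMap<String, String> range(String key1, String key2) {
        return map.subMap(key1, key2);
    }
}
```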

How is Samza’s Design Implemented?

Apache Kafka • Persistent, reliable, distributed message queue

At LinkedIn 10+ billion writes per day 172k messages per second (average) 60+ billion messages per day to real-time consumers

Apache Kafka
• Models streams as topics
• Each topic is partitioned and each partition is replicated
• Producers send messages to a topic
• Messages are stored in brokers
• Consumers consume from a topic (pull from broker)
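The broker-side model above can be sketched as a toy append-only log (illustrative, not the Kafka API): producers append at the tail, and consumers pull by offset at their own pace.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition of a Kafka topic: an append-only log of
// messages addressed by offset.
public class ToyPartition {
    private final List<String> log = new ArrayList<>();

    // Producer side: append at the tail, return the new message's offset.
    public long append(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Consumer side: pull the message at an offset. The consumer, not the
    // broker, tracks which offset it should read next.
    public String read(long offset) {
        return log.get((int) offset);
    }
}
```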

YARN: Yet Another Resource Negotiator
• Framework to run your code on a grid of machines
• Distributes our tasks across multiple machines
• Notifies our framework when a task has died
• Isolates our tasks from each other

Jobs Stream A Task 1 Task 2 Task 3 Stream B

Containers: a job's tasks are grouped into containers (Samza Container 1, Samza Container 2), which consume Stream A and produce Stream B; the container, not the individual task, is the unit of deployment.

YARN: Samza Container 1 and Samza Container 2 run under NodeManagers on Host 1 and Host 2, coordinated by a Samza YARN ApplicationMaster (AM); a Kafka Broker runs alongside each container. This mirrors how MapReduce runs on YARN: MapReduce containers under NodeManagers, a MapReduce YARN AM, and HDFS on each host.

YARN Samza Container 1 NodeManager Kafka Broker Host 1 Stream C Stream A Samza Container 1 Samza Container 2


CGroups: the same deployment, with cgroups used to isolate each Samza Container's resources on its host.

How can you use Samza?

Tasks

class PageKeyViewsCounterTask implements StreamTask {
  // pageKeyViews (per-key counters) and countStream (the output SystemStream)
  // are fields initialized elsewhere in the full example.
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    GenericRecord record = (GenericRecord) envelope.getMessage();
    String pageKey = record.get("page-key").toString();
    int newCount = pageKeyViews.get(pageKey).incrementAndGet();
    collector.send(new OutgoingMessageEnvelope(countStream, pageKey, newCount));
  }
}

Stateful Stream Task

public class SimpleStatefulTask implements StreamTask, InitableTask {
  private KeyValueStore<String, String> store;

  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    this.store = (KeyValueStore<String, String>) context.getStore("mystore");
  }

  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    GenericRecord record = (GenericRecord) envelope.getMessage();
    String memberId = record.get("member_id").toString();
    String name = record.get("name").toString();
    System.out.println("old name: " + store.get(memberId));
    store.put(memberId, name);
  }
}
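Wiring such a task into a job is done through configuration. A sketch in the style of the 0.7-era hello-samza examples (key names may differ by Samza version; hosts, job name, and class names are placeholders):

```properties
# Run on YARN
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=simple-stateful-task

# The task class and its input stream(s)
task.class=samza.examples.SimpleStatefulTask
task.inputs=kafka.PageViewEvent

# Kafka as the stream system
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.metadata.broker.list=localhost:9092

# Local key-value store, backed by a changelog stream for recovery
stores.mystore.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
stores.mystore.changelog=kafka.mystore-changelog
stores.mystore.key.serde=string
stores.mystore.msg.serde=string
```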

Example usage at LinkedIn

Call graph assembly get_unread_msg_count() get_PYMK() get_Pulse_news() get_relevant_ads() get_news_updates()

Lots of calls == lots of machines, logs get_unread_msg_count() get_PYMK() get_Pulse_news() get_relevant_ads() get_news_updates() unread_msg_service_call get_PYMK_service_call pulse_news_service_call add_relevance_service_call news_update_service_call

TreeID: Unique identifier page_view_event (123456) unread_msg_service_call (123456) another_service_call (123456) silly_service_call (123456) get_PYMK_service_call (123456) counter_service_call (123456) unread_msg_service_call (123456) count_invites_service_call (123 count_msgs_service_call (1234

OK, now lots of streams with TreeIDs…
*_service_call streams → Samza job: Repartition-By-TreeID → all_service_calls (partitioned by TreeID) → Samza job: Assemble Call Graph → service_call_graphs
• Near real-time holistic view of how we're actually serving data
• Compare day-over-day: cost, changes, outages
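Once repartitioning guarantees that every event for one page view lands in the same partition, assembling the call graph reduces to grouping by TreeID. A plain-Java sketch (hypothetical class, not the LinkedIn implementation):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: group service-call events by the TreeID every event carries.
// Because the input is partitioned by TreeID, one task sees all events
// for a given page view.
public class CallGraphAssembler {
    private final Map<String, List<String>> callsByTreeId = new HashMap<>();

    // Called once per service-call event.
    public void onServiceCall(String treeId, String serviceCall) {
        callsByTreeId.computeIfAbsent(treeId, k -> new ArrayList<>())
                     .add(serviceCall);
    }

    // All calls observed so far for one page view's TreeID.
    public List<String> graphFor(String treeId) {
        return callsByTreeId.getOrDefault(treeId, Collections.emptyList());
    }
}
```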

Thank you
• Quick start: bit.ly/hello-samza
• Project homepage: samza.incubator.apache.org
• Newbie issues: bit.ly/samza_newbie_issues
• Detailed Samza and YARN talk: bit.ly/samza_and_yarn
• A must-read: http://bit.ly/jay_on_logs
• Twitter: @samzastream
• Me on Twitter: @sriramsub1
