Cassandra Day SV 2014: Spark, Shark, and Apache Cassandra


Published on April 21, 2014

Author: planetcassandra



This session covers our experience using the Spark and Shark frameworks to run real-time queries on top of Cassandra data. We will start by surveying the current Cassandra analytics landscape, including Hadoop and Hive, and touch on the use of custom input formats to extract data from Cassandra. We will then dive into Spark and Shark, two memory-based cluster computing frameworks, and how they enable often dramatic improvements in query speed and productivity over today's standard solutions.

Interactive Analytics With Spark And Cassandra
Evan Chan, Ooyala, Inc.
April 7th, 2014

Who is this guy?
• Staff Engineer, Compute and Data Services, Ooyala
• Building multiple web-scale real-time systems on top of C*, Kafka, Storm, etc.
• Scala/Akka guy
• Very excited by open source, big data projects
• @evanfchan

Agenda
• Cassandra at Ooyala
• What problem are we trying to solve?
• Spark and Shark
• Integrating Cassandra and Spark
• Our Spark/Cassandra Architecture


OOYALA Powering personalized video experiences across all screens.

COMPANY OVERVIEW
• Founded in 2007
• Commercially launched in 2009
• 230+ employees in Silicon Valley, LA, NYC, London, Paris, Tokyo, Sydney & Guadalajara
• Global footprint: 200M unique users, 110+ countries, and more than 6,000 websites
• Over 1 billion videos played per month and 2 billion analytic events per day
• 25% of U.S. online viewers watch video powered by Ooyala


We are a large Cassandra user
• 12 clusters ranging in size from 3 to 107 nodes
• Total of 28TB of data managed over ~220 nodes
• Powers all of our analytics infrastructure
• Traditional analytics aggregations
• Recommendations and trends
• DSE/C* 1.0.x, 1.1.x, 1.2.6

Becoming a big Spark user...
• Started investing in Spark at the beginning of 2013
• 2 teams of developers doing stuff with Spark
• Actively contributing to the Spark developer community
• Deploying Spark to a large (>100 node) production cluster
• Spark community very active, huge amount of interest


From mountains of raw data...

To nuggets of truth...
• Quickly
• Painlessly
• At scale?

Today: Precomputed Aggregates
• Video metrics computed along several high cardinality dimensions
• Very fast lookups, but inflexible, and hard to change
• Most computed aggregates are never read
• What if we need more dynamic queries?
  – Top content for mobile users in France
  – Engagement curves for users who watched recommendations
  – Data mining, trends, machine learning

The Static - Dynamic Continuum
100% Precomputation
• Super fast lookups
• Inflexible, wasteful
• Best for 80% most common queries
100% Dynamic
• Always compute results from raw data
• Flexible but slow

Where We Want To Be
Partly dynamic
• Pre-aggregate most common queries
• Flexible, fast dynamic queries
• Easily generate many materialized views
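The two ends of the continuum above can be illustrated with a small local model. This is only a sketch using plain Scala collections, with no Spark or Cassandra involved; `PlayEvent`, its fields, and the `Continuum` object are hypothetical names invented for the illustration, not Ooyala's schema.

```scala
// Hypothetical raw event; field names are illustrative only.
final case class PlayEvent(videoId: String, country: String, device: String)

object Continuum {
  // 100% precomputation: aggregate once along one fixed dimension (country).
  // Lookups are fast, but only questions about that dimension can be answered.
  def precomputePlaysByCountry(events: Seq[PlayEvent]): Map[String, Int] =
    events.groupBy(_.country).map { case (country, es) => country -> es.size }

  // 100% dynamic: scan the raw events for every query.
  // Any filter is possible, at the cost of a full pass per query.
  def topContent(events: Seq[PlayEvent], n: Int)(keep: PlayEvent => Boolean): Seq[(String, Int)] =
    events
      .filter(keep)
      .groupBy(_.videoId)
      .map { case (video, es) => video -> es.size }
      .toSeq
      .sortBy { case (video, plays) => (-plays, video) } // most plays first, ties by id
      .take(n)
}
```

In this model, "top content for mobile users in France" is `Continuum.topContent(events, 10)(e => e.country == "FR" && e.device == "mobile")`, a question the per-country precomputed map cannot answer.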


Introduction To Spark
• In-memory distributed computing framework
• Created by UC Berkeley AMP Lab in 2010
• Targeted problems that MR is bad at:
  – Iterative algorithms (machine learning)
  – Interactive data mining
• More general purpose than Hadoop MR
• Top level Apache project
• Active contributions from Intel, Yahoo, lots of

Spark Vs Hadoop
[Diagram comparing Hadoop (HDFS, Map, Reduce stages) with Spark (data sources, map(), join(), cache(), transform)]

Throughput: Memory Is King
• 6-node C*/DSE 1.1.9 cluster, Spark 0.7.0
• Spark cached RDD 10-50x faster than raw Cassandra

Developers Love It
• “I wrote my first aggregation job in 30 minutes”
• High level “distributed collections” API
• No Hadoop cruft
• Full power of Scala, Java, Python
• Interactive REPL shell

Spark Vs Hadoop Word Count

Spark:

    val file = spark.textFile("hdfs://...")
    file.flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

Hadoop MapReduce:

    package org.myorg;

    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class WordCount {

        // Emits (word, 1) for every token in each input line.
        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    context.write(word, one);
                }
            }
        }

        // Sums the counts emitted for each word.
        public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                context.write(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            Job job = new Job(conf, "wordcount");

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
        }

    }
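To see the shape of the Spark word count without a cluster, here is a minimal local sketch over an in-memory `Seq`, using only the Scala standard library. `LocalWordCount` is a hypothetical name, and `groupBy` followed by a per-key sum stands in for Spark's `reduceByKey`; it is not how Spark executes the job.

```scala
object LocalWordCount {
  // Same flatMap -> map -> reduce-by-key shape as the Spark snippet,
  // applied to local lines instead of a distributed dataset.
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" ").toSeq)   // split each line into words
      .filter(_.nonEmpty)                       // drop empty tokens from repeated spaces
      .map(word => (word, 1))                   // pair each word with a count of 1
      .groupBy { case (word, _) => word }       // local stand-in for reduceByKey's grouping
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }
}
```

For example, `LocalWordCount.count(Seq("to be or", "not to be"))` counts "to" and "be" twice each and "or" and "not" once each.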

One Platform To Rule Them All
[Diagram labels: Hive on Spark; Spark Streaming - discretized stream processing]
• SQL, Graph, ML, Streaming all in one framework
• Much higher code sharing/reuse
• Easy integration between components
• Fewer platforms == lower TCO
• Integration with Mesos, YARN helps share resources

Shark - Hive On Spark
• 100% HiveQL compatible
• 10-100x faster than Hive, answers in seconds
• Reuse UDFs, SerDes, StorageHandlers
• Can use DSE / CassandraFS for Metastore


Our Spark/Shark/Cassandra Stack
[Diagram: Node1, Node2, and Node3 each run Cassandra, InputFormat, SerDe, Spark Worker, and Shark; alongside them are the Spark Master and Job Server]

OPTIONS FOR READING FROM C*
• Hadoop InputFormat
  – ColumnFamilyInputFormat - reads all rows from 1 CF
  – CqlPagingInputFormat, etc. - CQL3, secondary indexes
  – Roll your own (join multiple CFs, etc.)
• Spark native RDD
  – sc.parallelize(rowkeys).flatMap(readColumns(_))
  – JdbcRdd + Cassandra JDBC driver

ColumnFamilyInputFormat

    id       Video  Type
    Record1  10     1
    Record2  11     5

• Must read from all rows
• One CF only, not very flexible

Spark RDD
• RDD = Resilient Distributed Dataset
• Multiple partitions living on different nodes
• Each partition has records
[Diagram: Partitions 1 to 4 spread across Node 1 and Node 2]
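The partition idea can be modeled locally. The sketch below is not Spark code: `PartitionModel` is a hypothetical object that splits a collection into partitions and reduces each one independently before combining the partial results, which is roughly the pattern a distributed reduce follows. It assumes the operator is associative.

```scala
object PartitionModel {
  // Split records into at most `numPartitions` partitions of roughly equal size, in order.
  def partition[A](records: Seq[A], numPartitions: Int): Seq[Seq[A]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    val size = math.max(1, math.ceil(records.size.toDouble / numPartitions).toInt)
    records.grouped(size).toSeq
  }

  // Reduce each partition on its own (the work a single node would do locally),
  // then combine the per-partition partial results. `op` must be associative.
  def reduce[A](partitions: Seq[Seq[A]])(op: (A, A) => A): Option[A] =
    partitions.filter(_.nonEmpty).map(_.reduce(op)).reduceOption(op)
}
```

With twelve records and four partitions, each partition holds three records, mirroring the slide's layout of records across partitions.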

InputFormat Vs RDD
InputFormat
  – Supports Hadoop, Hive, Spark, Shark
  – Have to implement multiple classes: InputFormat, RecordReader, Writable, etc. Clunky API.
  – Two APIs, and often need to implement both (Hive needs older...)
RDD
  – Spark / Shark only
  – One class, simple API
  – Just one API
• You can easily use InputFormats in Spark using newAPIHadoopRDD().
• Writing a custom RDD could have saved us lots of time.

Skipping The InputFormat
[Diagram: Rowkey1 to Rowkey4, spread across Node 1 and Node 2, each mapped to its row data (Row 1 data to Row 4 data)]
sc.parallelize(rowkeys).flatMap(readColumns(_))
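A local sketch of the row-key approach follows. The slide does not show `readColumns`, so the version here is a hypothetical stand-in that looks rows up in an in-memory map playing the role of one column family; a real implementation would issue a Cassandra client read for each key. `RowKeyScan`, `Column`, and `ColumnFamily` are likewise invented names.

```scala
object RowKeyScan {
  // Hypothetical flattened result: one entry per (row key, column name, value).
  final case class Column(rowKey: String, name: String, value: String)

  // In-memory stand-in for a column family: row key -> (column name -> value).
  type ColumnFamily = Map[String, Map[String, String]]

  // Read every column of one row, sorted by column name; unknown keys yield nothing.
  def readColumns(cf: ColumnFamily)(rowKey: String): Seq[Column] =
    cf.getOrElse(rowKey, Map.empty[String, String])
      .toSeq
      .sortBy { case (name, _) => name }
      .map { case (name, value) => Column(rowKey, name, value) }

  // Local counterpart of sc.parallelize(rowkeys).flatMap(readColumns(_)):
  // start from the keys and fan out to the columns of each row.
  def scan(cf: ColumnFamily, rowKeys: Seq[String]): Seq[Column] =
    rowKeys.flatMap(key => readColumns(cf)(key))
}
```

The point of the pattern is that only the listed row keys are read, in contrast to ColumnFamilyInputFormat, which must read all rows.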


From Raw Logs To Fast Queries
[Diagram: raw logs are processed by Spark into OLAP Tables 1 to 3 in C* columnar storage; Spark serves predefined queries and Shark serves ad-hoc HiveQL]

Why Cassandra Alone Isn’t Enough
• Over 30 million multi-dimensional fact table rows per day
• Materializing every possible answer isn’t close to possible
• Multi-dimensional filtering and grouping alone leads to many billions of possible answers
• Querying fact tables in Cassandra is too slow
• Reading millions or billions of random rows
• CQL doesn’t support grouping

Our Approach
• Use Cassandra to store the raw fact tables
• Optimize the schema for OLAP workloads
• Fast full table reads
• Easily read fewer columns
• Use Spark for fast random row access and fast distributed computation

An OLAP Schema for Cassandra
[Diagram: Metadata CF, Index CF, and Columns CF. Metadata row 2013-04-05T00:00-part-0 holds {columns: ["country", "city", "plays"]}. Other labels: uuid-part-0, uuid-part-1, 2013-04-05T00:00Z#id1, Section 0 to Section 2, and per-column (country, city) row ranges 0-9999, 10000-19999, ...]
• Optimized for: selective column loading, maximum throughput and compression
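The exact layout of the three column families is not recoverable from the slide, so the following is only a loose local model of the idea it names: values stored per column in fixed-size row chunks, so that a query can load just the columns it needs. `ColumnarChunks` and all of its members are hypothetical; the chunk size of 10000 is taken from the row ranges on the slide.

```scala
object ColumnarChunks {
  // Matches the "rows 0-9999", "rows 10000-19999" ranges shown on the slide.
  val RowsPerChunk = 10000

  // Which chunk holds a given row, and the row's offset inside that chunk.
  def locate(row: Int): (Int, Int) = (row / RowsPerChunk, row % RowsPerChunk)

  // Turn row-oriented records into per-column chunks: column name -> chunks of values.
  // Missing values are stored as empty strings in this model.
  def toColumns(columns: Seq[String], rows: Seq[Map[String, String]]): Map[String, Vector[Vector[String]]] =
    columns.map { col =>
      col -> rows.map(r => r.getOrElse(col, "")).grouped(RowsPerChunk).map(_.toVector).toVector
    }.toMap

  // Selective column loading: only the requested columns are read and reassembled.
  def load(store: Map[String, Vector[Vector[String]]], wanted: Seq[String]): Map[String, Vector[String]] =
    wanted.flatMap(col => store.get(col).map(chunks => col -> chunks.flatten)).toMap
}
```

A query that only groups by country would call `load(store, Seq("country"))` and never touch the city or plays chunks.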

OLAP Workflow
[Diagram: a REST Job Server submits the dataset aggregation job and query jobs to Spark executors, which read from Cassandra and return the aggregate and query results]

Querying Data In Spark
[Diagram: Partitions 1 to 4 across Node 1 and Node 2, holding record1 to record12, three records per partition]
• Convert to Spark SQL / Shark Table
• Shark / Spark SQL / .map / .sort / .join etc.
• rdd.reduce / .collect / .top

Fault Tolerance
• Cached dataset lives in Java Heap only - what if process dies?
• Spark lineage - automatic recomputation from source, but this is expensive!
• Can also replicate cached dataset to survive single node failures
• Persist materialized views back to C*, then load into cache -- now recovery path is much faster
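The two recovery paths above can be contrasted with a small local model. This is a sketch only, not Spark's implementation: `CacheRecovery.View` is a hypothetical class in which `recompute` stands in for lineage-based recomputation from source, `persist()` stands in for writing the materialized view back to C*, and `loseCache()` stands in for the process dying.

```scala
object CacheRecovery {
  final class View[A](recompute: () => Vector[A]) {
    private var cached: Option[Vector[A]] = None     // in-heap cache, lost on failure
    private var persisted: Option[Vector[A]] = None  // durable copy, survives failure
    var recomputations: Int = 0                      // how often the expensive path ran

    // Return the cached data, restoring it first if the cache was lost.
    // The persisted copy is preferred; recomputation is the fallback.
    def get(): Vector[A] = cached.getOrElse {
      val data = persisted.getOrElse {
        recomputations += 1
        recompute()
      }
      cached = Some(data)
      data
    }

    def persist(): Unit = { persisted = Some(get()) }
    def loseCache(): Unit = { cached = None }
  }
}
```

Without a persisted copy, every cache loss triggers another recomputation; once `persist()` has been called, a cache loss is repaired from the stored view and the recomputation counter stops growing.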

DEMO

Creating a Shark Table

Creating a Cached Table

Querying a Cached Table

THANK YOU And YES, We’re HIRING!! @evanfchan

Industry Trends
• Fast execution frameworks
  – Impala
• In-memory databases
  – VoltDB, Druid
• Streaming and real-time
• Higher-level, productive data

PERFORMANCE #’S
Measured on a 6-node C*/DSE 1.1.9 cluster, Spark 0.7.0
• Spark: C* -> OLAP aggregates, cold cache, 1.4 million events: 130 seconds
• C* -> OLAP aggregates, warmed cache: 20-30 seconds
• OLAP aggregate query via Spark (56k records): 60 ms

EXAMPLE: OLAP PROCESSING
[Diagram: C* events at t0 (keys such as 2013-04-05T00:00Z#i..., values such as {video: 10, ...}) are processed by Spark into OLAP aggregates, which are unioned into cached materialized views serving Query 1: Plays by Provider and Query 2: Top content for mobile]
