Reactive Programming

60 %
40 %
Information about Reactive Programming
Technology

Published on March 6, 2014

Author: maruyama097

Source: slideshare.net

Reactive Programming @maruyama097 丸山不二夫

In a recent post on the NetBeans developer site, one of the core maintainer observed that a single class had been patched over 14 times to fix threading related problems. --- Brian Goetz et al “Java Concurrency In Practice”

1/3 of the code in Adobe’s desktop applications is devoted to event handling logic 1/2 of the bugs reported during a product cycle exist in this code --- quoted in Martin Odersky’s “Deprecating the Observer Pattern”

Merriam-Webster defines reactive as “readily responsive to a stimulus”, i.e. its components are “active” and always ready to receive events. This definition captures the essence of reactive applications, focusing on systems that: event-driven, scalable, resilient, responsive --- “Reactive Manifesto”

はじめに  Reactive プログラミングの歴史は、インタラ クティブなUIの登場とともに古い。表計算ソ フトは、もっとも身近な最も成功した Reactive プログラミングの一つである。  現代のReactive プログラミングをドライブし ている力の一つは、ResponsiveなUIへの欲 求である。その主要な舞台は、Web UIであ る。Web Componentとともに、post HTML5の中心的なWeb技術の一つである、 Model Driven View(MDV)は、こうした Reactive プログラミング技術と見ることが出

はじめに  現代のReactive プログラミングをドライブし ているもう一つの力は、非同期でイベント・ ドリブンな処理への志向である。この方向は 、CPUのメニコア化と分散処理の巨大なクラ ウドへの拡大という、まったく異なる二つの 分野でともに強力に押し進められている。  2010年頃に公開されたマイクロソフト社の Rx(Reactive Extention)は、Observableの 役割に注目し、Reactive プログラミングの新 しい手法を切り開いた画期的なものであった 。

はじめに  Rxは、オープンソース化され、多くの言語に 移植され、また多くのReactive プログラミン グ言語に大きな影響を与えた。  Rxの応用で注目すべき事例は、Netflixによる RxのJavaへの移植と、それに基づくAPIの書 き換え・システムの再構築の成功である。  それはまた、小さな関数から大きな関数を合 成するという関数型のアプローチの、大規模 な実システムでの有効性を示すものでもあっ た。

Agenda Part I  WebとReactiveプログラミング       非同期とイベント・ドリブンへの志向 Web UIとReactiveプログラミング Meteor AngularJS Dart Polymer.dart

Agenda Part II  Reactive Extensionの基礎       LINQとEnumerable RxとObservable Observableを使ったEvent処理 Rx for JavaScript Observableを使った非同期処理 AndroidとRx

Agenda Part III  Observableを活用する  Observableのストリームを処理する  Observableを変形・加工する合成可能な 関数  ServerサイドでのRxの事例 Netflix  NetflixでのObservableの利用

Agenda Part IV  Reactive Manifesto  The Need to Go Reactive       Reactive Applications Event-driven Scalable Resilient Responsive Cocclusion

Part I WebとReactiveプログラミン グ

表計算とReactive Web UI

表計算ソフトはReactive 最も身近で、成功したアプリの一つ

表計算のアルゴリズムを考える Loop { セルCijへの入力/修正のイベント検出 Loop { セルCijへの入力/修正は終了したか? } セルCijに関連付けられている全てのマクロMnの検索 Loop { 新しいセルCijの値で全てのMnを再計算 } } Mnが定義されているセルCklの値を再表示

AngularJSの処理

AngularJSの処理 1. 2. 3. 4. 5. 6. 7. ‗X‘ keyが押されると、ブラウザーはinputコントロールに keydownイベントを発する。 input directiveは、inputの値の変化を捉えて、Angular 実行 コンテキスト内のアプリケーション・モデルを変更する為に、 $apply(―name = ‗X‘;‖) を呼び出す。 Angularは、 name = ‗X‘; をモデルに適用する。 $digest ループが始まる。 $watch リストは、name属性の変化を検出すると、 {{name}}補完に通知をして、それが、つづいてDOMを更新す る。 Angularは実行コンテキストを抜け出す。それで、keydownイ ベントは終わり、それとともにJavaScriptの実行コンテキスト も終わる。 ブラウザーは、更新されたテキストで、ビューを再描画する。

非同期とイベント・ドリブンへ の注目

2009年11月 JSConf Ryan Dahlの登場  ここでは、node.jsの爆発的な拡大の起点となっ た、2009年ベルリンで行われた、JSConfでの node.jsの創始者Ryan Dahlの講演の一部を見て みよう。 “Node.js, Evented I/O for V8 Javascript” by Ryan Dahl  http://jsconf.eu/2009/speaker/speakers_selecte d.html#entry-3356  http://s3.amazonaws.com/four.livejournal/2009 1117/jsconf.pdf

I/O は、違ったやり方で 行われる必要がある。  多くのWebアプリは、次のようなコードを使 う。 var result = db.query("select * from T"); // use result  データベースを検索している間、ソフトウェ アは何をしているのだろう? たいていの場 合、ただレスポンスを待っているだけである 。  それに加えて、IOによる遅延は、コンピュー タの能力と比較すると、次のように、巨大な http://s3.amazonaws.com/four.livejournal/20091117/jsconf.pdf ものである。

I/O による遅延      L1: 3 cycles L2: 14 cycles RAM: 250 cycles DISK: 41,000,000 cycles NETWORK: 240,000,000 cycles  別の例を挙げよう。サーバーのパフォーマン スも実装によって大きな違いが生まれている 。次のズは、AppacheとNGINXを比較した ものである。

―Latency as an Effect‖ ―Principles of Reactive Programming‖ Coursera lecture by Erik Meijer https://class.coursera.org/reactive001/lecture/51

2013 Nov 4th ―Principles of Reactive Programming‖ https://www.coursera.org/course/reactive

trait Socket { def readFromMemory(): Array[Byte] def sendToEurope(packet: Array[Byte]): Array[Byte] } val socket = Socket() val packet = socket.readFromMemory() val confirmation = socket.sendToEurope(packet)

PCでの処理時間

シーケンシャルな実行時間 val socket = Socket() val packet = socket.readFromMemory() // 50,000 ナノ秒ブロックする // 例外が発生しない時には、次の処理へ val confirmation = socket.sendToEurope(packet) // 150,000,000 ナノ秒ブロックする // 例外が発生しない時には、次の処理へ

1 ナノ秒を 1 秒だと思うと

1ナノ秒を1秒だとした場合の実行 時間 val socket = Socket() val packet = socket.readFromMemory() // 3日間ブロックする // 例外が発生しない時には、次の処理へ val confirmation = socket.sendToEurope(packet) // 5年間ブロックする // 例外が発生しない時には、次の処理へ

var result = db.query("select..");  このようなコードは、全体のプロセスをブロ ックするか、複数のプロセスの実行をスタッ クさせることになるかのいずれかである。  しかし、次のようなコードでは、プログラム は、すぐにevent loopに戻ることが可能であ る。どのようなメカニズムも必要としない。 db.query("select..", function (result) { // use result });  これが、I/Oが行われるべき方法である。  では、何故、誰もがevent loopやCallbackや Non-BlokingI/Oを使わないのだろうか?

文化的偏見  我々は、I/Oを次のように行うと教えられてき た puts("Enter your name: "); var name = gets(); puts("Name: " + name);  我々は、入力を要求されたら、それが終わる まで何もしないと教えられてきた。次のよう なコードは、複雑すぎるとして、退けられて きた。 puts("Enter your name: "); gets(function (name) {

インフラの不在  それでは、何故、誰もが event loopを使わな いのだろう?  シングル・スレッドのevent loopは、I/Oが non-blockingであることを要求するのだが、 ほとんどのライブラリーはそうなっていない 。  POSIX async file I/O は使えない.  Man pagesは、関数がディスクにアクセスするこ とがあることを触れていない。 (e.g getpwuid())  Cに、クロージャーや無名関数がないことが、コ ールバックを難しくしている。

JavaScript  Javascriptは、event loopと一緒に使われる ように、特別にデザインされた言語である。  無名関数、クロージャー  同時には、一つだけのコールバック  DOMのイベントコールバックを通じたI/O  JavaScriptのカルチャーは、既に、イベント 中心のプログラミングの準備ができている。

node.js project: 高度な並列プログラムを記述する、純 粋にイベント中心で、non-blocking なインフラを提供すること。

Libuv  libuv enforces an asynchronous, eventdriven style of programming.  Its core job is to provide an event loop and callback based notifications of I/O and other activities.  libuv offers core utilities like timers, non-blocking networking support, asynchronous file system access, child processes and more.

―Brief Hisory of Node.js‖ Ben Noordhuis http://vimeo.com/51637038

―An Introduction to libuv‖ Nikhil Marathe November 10, 2013 Release 1.0.0 http://nikhilm.github.io/uvbook/An% 20Introduction%20to%20libuv.pdf

#include #include #include #include <stdio.h> <fcntl.h> <unistd.h> <uv.h> libuvスタイルでの catの実装 void on_read(uv_fs_t *req); uv_fs_t open_req; uv_fs_t read_req; uv_fs_t write_req; char buffer[1024]; イベントの ようなもの open, red, writeの libuv版は、第一引数に loopを取る。最終引数は callback void on_write(uv_fs_t *req) { // callbackのsigniture uv_fs_req_cleanup(req); if (req->result < 0) { fprintf(stderr, "Write error: %sn", uv_strerror(uv_last_error(uv_default_loop()))); } else { uv_fs_read(uv_default_loop(), &read_req, open_req.result, buffer, sizeof(buffer), -1, on_read); // callback } } // on_write callbackの中で、on_read callbackが呼ばれる

void on_read(uv_fs_t *req) { // callbackのsigniture uv_fs_req_cleanup(req); if (req->result < 0) { fprintf(stderr, "Read error: %sn", uv_strerror(uv_last_error(uv_default_loop()))); } else if (req->result == 0) { uv_fs_t close_req; // synchronous uv_fs_close(uv_default_loop(), &close_req, open_req.result, NULL); } // callbackがNULL、同期型 else { uv_fs_write(uv_default_loop(), &write_req, 1, buffer, req->result, 1, on_write); } // callback } // on_read callbackの中で、on_write callbackが呼ばれる

void on_open(uv_fs_t *req) { // callbackのsigniture if (req->result != -1) { uv_fs_read(uv_default_loop(), &read_req, req->result, buffer, sizeof(buffer), -1, on_read); // callback } else { fprintf(stderr, "error opening file: %dn", req->errorno); } uv_fs_req_cleanup(req); } // on_open callbackの中で、on_read callbackが呼ばれる int main(int argc, char **argv) { uv_fs_open(uv_default_loop(), &open_req, argv[1], O_RDONLY, 0, on_open); // callback return 0; } // openは、on_open callbackを呼んで、すぐループに入る。

Callback Hell  “A guide to writing asynchronous javascript programs” http://callbackhell.com/ Asynchronous javascript, or javascript that uses callbacks, is hard to get right intuitively.  “Callback hell in nodejs?” http://stackoverflow.com/questions/18095107/callbackhell-in-nodejs In below code am I in callbackhell? How to overcome such scenario without using any async modules in pure javascript?

Web UIとReactiveプログラミ ング

Web UI での Reactive / Model Driven View(MDV)への志向  Meteor.js  Reactive プログラミング  Reactive-contextとReactive-data-source  publish/subscribe モデル  AngularJS  Imperativeからdeclarative  Data binding  Polymer.js  Model Driven View  .....

Meteor Reactive プログラミング http://docs.meteor.com/ https://github.com/meteor/meteor

Meteor Template <template name="hello"> <div class="greeting">Hello there, {{first}} {{last}}!</div> </template> // in the JavaScript console > Template.hello({first: "Alyssa", last: "Hacker"}); => "<div class="greeting">Hello there, Alyssa Hacker!</div>" Meteor.render(function () { return Template.hello({first: "Alyssa", last: "Hacker"}); }) => automatically updating DOM elements

Meteor LiveHTML var fragment = Meteor.render( function () { var name = Session.get("name") || "Anonymous"; return "<div>Hello, " + name + "</div>"; }); document.body.appendChild(fragment); Session.set(“name”, “Bob”); // ページは自動的に更新される

Meteor.render()  Meteor.render は、rendering function、ある HTMLを文字列として返すを関数を引数に取る。それ は、自動更新するDocumentFragmentを返す。  rendering functionで利用されたデータに変更があ ったとき、それは、再実行される。 DocumentFragment内のDOMノードは、ページの どの場所に挿入されていても、その場で自分自身を更 新する。それは全く自動的である。  Meteor.renderは、rendering functionによって、 どのデータが利用されたかを発見する為にreactive contextを使う。

Meteor: reactive context + reactive data source  reactive context + reactive data source という 単純なパターンは、広い応用可能性をもっている。  それ以上に、プログラマーは、 unsubscribe/resubscribe の呼び出しのコードを書 く手間が省け、それらは、正しい時に確実に呼び出さ れことになる  一般的に、Meteorは、そうでなければ、誤りを犯し やすいロジックで、アプリケーションの邪魔になる、 データ伝搬の全てのクラスを取り除くことが出来る、

AngularJS Data Binding http://angularjs.org/ https://github.com/angular/angular.j s

AngularJS Data Binding デモ <!doctype html> <html ng-app> <head> <script src= "http://code.angularjs.org/1.2.0rc1/angular.min.js"> </script> </head> <body> <input ng-model="name"> <p>Hello {{name}}!</p> </body> </html>

Controller  controllerは、viewの背後にあるコードで ある。その役割は、モデルを構築して、コー ルバック・メソッドにそって、それをビュー に公開することである。  viewは、scopeのtemplate(HTML)上へ の投射である。  scopeは、モデルをviewにつなげ、イベント をControllerに向けさせる「つなぎ」の役割 を果たす。

The separation of the controller and the view  controllerは、JavaScriptで書かれる。 JavaScript は命令型である。命令型は、アプリケーションの振る 舞いを指定するには、もっとも適している。 controllerは、(DOMへの参照、HTMLの断片のよう な)レンダー用の情報を含むべきではない。  view templateはHTMLで書かれる。HTMLは宣言型 である。宣言型は、UIの指定に最適である。viewは 、振る舞いを含むべきではない。  controllerはviewのことを知らないので、同一の contorollerに対して複数のviewがあり得る。このこ とは、スキンの変更・デバイス特有のview(モバイ ルとPC)・テスト可能性に取って、重要である。

Model  modelは、templateとマージされてviewを生み出す データである。modelがviewにレンダーされる為に は、modelは、scopeから参照されなければならない 。  他の多くのフレームワークとは異なって、Angularは 、modelについて何の制限も要請も設けてはいない。 modelにアクセス、あるいはmodelを変更する為に 、継承すべきクラスも、特別のアクセス・メソッドも ない。  modelは、プリミティブでも、オブジェクト・ハッシ ュでも、全くのオブジェクト型でも構わない。要する に、modelは、‖plain JavaScript object‖ なのであ る。

View  viewは、ユーザーが見るものである。viewの ライフサイクルは、templateとして始まる。 viewは、modelとマージされて、最終的には 、ブラウザーのDOMにレンダーされる。  Angularは、他の大部分のtemplateシステム と比較して、全く異なるアプローチを採用し ている。  Angularのtemplateシステムは、文字列の上 ではなくDOMオブジェクトの上で動く。 templateは確かにHTMLで書かれているが、 それはHTMLである。

MDV templates in Dart https://www.dartlang.org/articles/we b-ui/#one-way-binding

MDV templates in Dart  One-way data binding: Embed data into your UI  Two-way data binding: Keep data in sync with UI changes  Conditionals: Selectively show parts of the UI  Loops: Construct lists and tables by iterating over collections  Event listeners: Easily attach code that reacts to UI events

One-way data binding <html lang="en"> <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> <link rel="stylesheet‖ href="//cdnjs.cloudflare.com/ajax/libs /twitter-bootstrap/2.3.1/css/bootstrap.css"> </head> <body> <div class="well">Hello {{dataValue}}!</div> <script type="application/dart"> String dataValue; main() { var today = new DateTime.now(); dataValue = 'world ${today.year}-${today.month}-${today.day}'; } </script> </body> </html>

<html lang="en"> <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> <link rel="stylesheet ‖href="//cdnjs.cloudflare.com/ajax/libs /twitter-bootstrap/2.3.1/css/bootstrap.css"> </head> <body> <div class="well">Hello counter: {{count}}</div> <script type="application/dart"> import 'dart:async' as async; import 'dart:html' as html; import 'package:web_ui/watcher.dart' as watchers; int count; main() { count = 0; new async.Timer.periodic(const Duration(seconds: 1), (_) { count++; watchers.dispatch(); }); } </script> </body> </html>

Two-way data binding <html lang="en"> <head> ..... </head> <body> <div class="well"> Input: <input type="text" bind-value="str" placeholder="type something here"> <div> Value: {{str}}</div> <div> Length: {{str.length}}</div> </div> <script type="application/dart"> String str = ''; main() {} </script> </body> </html>

Conditionals <html lang="en"> <head> ..... </head> <body> <div class="well"> <div> Input1: <input type="text" bind-value="str1"></div> <div> Input2: <input type="text" bind-value="str2"></div> <template if='str1 == str2'> <div>They match!</div> </template> </div> <script type="application/dart"> String str1 = ''; String str2 = ''; main() {} </script> </body> </html>

<html lang="en"> <head> ..... </head> <body> <input type="checkbox" bind-checked="show">Show phones <br> <table class="table"> <thead> <tr> <td>Last</td><td>First</td> <td template if="show">Phone</td> </tr> </thead> <tbody> <tr> <td>Bracha</td> <td>Gilad</td> <td template if="show">555-555-5555</td> </tr>

<tr> <td>Bak</td> <td>Lars</td> <td template if="show">222-222-2222</td> </tr> <tr> <td>Ladd</td> <td>Seth</td> <td template if="show">111-222-3333</td> </tr> </tbody></table> <script type="application/dart"> bool show = true; main() {} </script> </body> </html>

Loops <html lang="en"> <head> ..... </head> <body> <table class="table"> <tbody template iterate='row in table'> <tr template iterate='cell in row'> <td>{{cell}}</td> </tr> </tbody> </table> <script type="application/dart"> var table = [['X', 'O', '_'], ['O', 'X', '_'], ['_', '_', 'X']]; main() {} </script> </body> </html>

Event listeners <html lang="en"> <head> ..... </head> <body> <div class="well"> <button on-click="increment()">Click me</button> <span>(click count: {{count}})</span> </div> <script type="application/dart"> int count = 0; void increment() { count++; } main() {} </script> </body> </html>

Polymer.dart Observables and Data Binding with Web UI https://www.dartlang.org/webui/observables/

Overview of data binding in Web UI  Web UI helps you efficiently bind application data to HTML, and vice versa, with observables and observers.  Observables are variables, fields, or collections that can be observed for changes.  Observers are functions that run when an observable changes.

Efficiently tracking changes  Instead of asking every possible observable ―Did you change?‖ on every event loop over and over, Web UI has an efficient mechanism to notify only the right observers at the right time.

Observing variables <body> <h1>Hello Web UI</h1> <p>Web UI is {{ superlative }}</p> <button id="change-it" on-click="changeIt()"> Change</button> <script type="text/javascript" src="dart.js‖> </script> <script type="application/dart" src="hello_world.dart"> </script> </body>

library hello_world; import 'package:web_ui/web_ui.dart'; @observable String superlative = 'awesome'; const List<String> alternatives = const <String>['wicked cool', 'sweet', 'fantastic', 'wonderful']; int _alternativeCount = 0; String get nextAlternative => alternatives[_alternativeCount++ % alternatives.length]; changeIt() { superlative = nextAlternative; } main() { }

Observing classes <p>Hello {{person.name}}!</p> <p><button on-click="newName()"> Change Name</button></p> @observable class Person { String name; Person(this.name); } final Person person = new Person('Bob'); const List<String> names = const <String> ['Sally', 'Alice', 'Steph']; newName() { person.name = nextName; }

Observing collections final List<DateTime> timestamps = toObservable([]); void addTimestamp() { timestamps.add(new DateTime.now()); } void clear() { timestamps.clear(); }

Observing nested objects @observable class Person { String name; Address address; } @observable class Address { String city; } @observable Person person; main() { person = new Person() ..name = 'Clark Kent' ..address = ( new Address() ..city = 'Metropolis' ); }

Expression Observers <p>The time is <span id="msg"></span></p> <p><button on-click="updateMsg()"> Update</button></p> @observable String msg; updateMsg() { msg = new DateTime.now().toString(); } main() { observe(() => msg, (_) { query('#msg').text = msg; }); }

Part II Reactive Extensionの基礎

Reactive Extension (Rx) by Microsoft http://msdn.microsoft.com/enus/data/gg577609.aspx

Reactive Extensionとは?  ―The Reactive Extensions (Rx) is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.‖  「Reactive Extensionは、非同期かつイベン ト・ベースのプログラムをオブザーバブル・ コレクションとLINQスタイルの検索操作を利 用して構成する為のライブラリーである。」

講演資料  2010年 11月 DevCamp Keynote ―Rx: Curing your asynchronous programming blues‖ http://channel9.msdn.com/Blogs/codefest/DC2010T010 0-Keynote-Rx-curing-your-asynchronous-programmingblues  2012年 6月 TechEd Europe ―Rx: Curing your asynchronous programming blues‖ http://channel9.msdn.com/Events/TechEd/Europe/2012/ DEV413

Rx オープンソース化と移植  2012年11月 MS Open Tech からオープンソ ースとして公開。 Rx.NET :https://github.com/ReactiveExtensions/Rx.NET  あわせて、JavaScript、C++、Python、Ruby への移植版もオープンソースとして公開。     RxJS : https://github.com/Reactive-Extensions/RxJS RxCpp :https://github.com/Reactive-Extensions/RxCpp RxPy : https://github.com/Reactive-Extensions/RxPy Rx.rb : https://github.com/Reactive-Extensions/Rx.rb

RxJava  Netflixは、Rx.NETをJavaに移植し、オープ ンソースとして公開した。 RxJava: https://github.com/Netflix/RxJava/  Netflixは、自社のシステムをRxJavaを用い て書き換え、大規模システムでのReactive プ ログラミングの有効性を示した。  2013年2月 ―Functional Reactive in the Netflix API with RxJava‖ http://techblog.netflix.com/2013/02/rxjavanetflix-api.html

RxJavaの移植によって、JVM上で走る、Clojure, Scala, Groovy, Jruby等の言語でもReactive Extension が走るようになった。

LINQとEnumerable データソースの抽象化として、.NET 3.0 から導入されたLINQで、中心的な役割を 果たすのは、Enumerableインターフェー スである。 http://code.msdn.microsoft.com/101-LINQSamples-3fb9811b

LINQの最も重要なインターフェース Enumerable 全てのCollectionクラスは、IEnumerableを実装している interface IEnumerable<out T> { IEnumerator<T> Enumerator(); } interface IEnumerator<out T> : IDisposable { bool Next(); T Current { get; } void Reset(); }

LINQで利用出来るオペレータ Projection: Select, SelectMany Restriction: Where Partitioning: Take, Skip, TakeWhile, ... Ordering: OrderBy, OrderByDecsending, ... Grouping: GroupBy Set: Distinct, Union, Intersect, .. Conversion: ToList, ToArray, .. Generation: Range, Repeat Aggregate: Count, Sum, Min, Max, Average, ....

LINQは、SQLライクな検索構文で、データソースから データソースを導出することが出来る public void Linq1() { int[] numbers = { 5, 4, 1, 3, 9, 8, 6, 7, 2, 0 }; var lowNums = from n in numbers where n < 5 select n; } Console.WriteLine("Numbers < 5:"); foreach (var x in lowNums) { Console.WriteLine(x); }

LINQでは、検索式の中にラムダ式を利用出来る public void Linq5() { string[] digits = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" }; var shortDigits = digits.Where( (digit, index) => digit.Length < index); } Console.WriteLine("Short digits:"); foreach (var d in shortDigits) { Console.WriteLine( "The word {0} is shorter than its value.", d); }

SelectMany (多段のSelect) public void Linq14() { int[] numbersA = { 0, 2, 4, 5, 6, 8, 9 }; int[] numbersB = { 1, 3, 5, 7, 8 }; var pairs = from a in numbersA from b in numbersB where a < b select new { a, b }; } Console.WriteLine("Pairs where a < b:"); foreach (var pair in pairs) { Console.WriteLine( "{0} is less than {1}", pair.a, pair.b); }

SelectMany (多段のSelect) public void Linq15() { List<Customer> customers = GetCustomerList(); var orders = from c in customers from o in c.Orders where o.Total < 500.00M select new { c.CustomerID, o.OrderID, o.Total }; } ObjectDumper.Write(orders);

SelectMany (多段のSelect) public void Linq18() { List<Customer> customers = GetCustomerList(); DateTime cutoffDate = new DateTime(1997, 1, 1); var orders = from c in customers where c.Region == "WA" from o in c.Orders where o.OrderDate >= cutoffDate select new { c.CustomerID, o.OrderID };

public void Linq41() { string[] words = { "blueberry", "chimpanzee", "abacus", "banana", "apple", "cheese" }; var wordGroups = from w in words group w by w[0] into g select new { FirstLetter = g.Key, Words = g }; } foreach (var g in wordGroups) { Console.WriteLine( "Words that start with the letter '{0}':", g.FirstLetter); foreach (var w in g.Words) Words that start with the letter 'b': blueberry { banana Console.WriteLine(w); Words that start with the letter 'c': chimpanzee } cheese } Words that start with the letter 'a': abacus apple

Reactive Extensionと Observable Rxで中心的な役割を果たすのは、イベン トのソースを抽象化したObservableであ る。それは、データソースの抽象化として 、LINQで中心的な役割を果たす Enumerableとよく似ている。

ObservableとObserver  Observableは、Enumerableと同様に、デー タの集まり。それは相互に変換可能。  Observableには時間の概念が入っている。 それは時間とともにデータを生み出す。イベ ントの抽象に用いられる時に便利である。  データ・ソースとしてのObservableが生成す るデータは、ObservableにObserverが Subscribeした時に、はじめて利用出来る。  データはObservableからSubscribeの時に指 定したObserver上の関数に自動的に送り出さ れる。(Push)

Observer上の三つの関数 OnNext, OnError, OnComplete  ObservableからObserverへデータがpushさ れる時に、次の三つの関数のみが利用される 。  OnNext(): 通常のデータの受け渡し。  OnError(): エラーが発生したこと伝える。  OnComplete(): 送るべきデータが終了したこ とを伝える。  これらの関数の定義は、Observableへの Subscribeの際に与えられねばならない。  ObservableへのSubscribeは、 Subscriptionを返す。この値は、Sunscribe

EnumarableとObservable Enumerable どちらもデータの集りである Observable 時間の流れ

EnumerableとObservableは、 相互に変換出来る Enumerable  Observable .ToObservable(); Observable  Enumerable .ToEnumerable()

Rxの最も重要なインターフェース Observable interface IObservable<out T> { IDisposable Subscribe(IObserver<T> observer); } interface IObserver<in T> { void OnNext(T value); void OnError(Exception ex); void OnCompleted(); } Observerが 実装する3つの タイプのメソッド

ObservableからObserverへの通知 OnNext (42) OnNext (43) OnNext (―Hello‖) OnCompleted OnError (error) OnNext* (OnError | OnCompleted)?

// Range()は、次のようなObservablを生成する // -1-2-3-4-5-6-7-8-9-10-| IObservable<int> source = Observable.Range(1, 10); // ここでは、Subscribe()は、三つの引数を取っている。 // 順番に、OnNext,OnError,OnCompleteの定義である IDisposable subscription = source.Subscribe( Obser x => Console.WriteLine("OnNext: {0}", x), verが ex => Console.WriteLine("OnError: {0}", 実装する ex.Message), 3つの () => Console.WriteLine("OnCompleted") タイプの ); メソッド Console.WriteLine("Press ENTER to unsubscribe..."); Console.ReadLine(); // Subscribeをやめる subscription.Dispose();

internal class Generate_Simple { private static void Main() { var observable = Observable.Generate(1, // 初期値 -1-2-3-4-5-| x => x < 6, // 条件 x => x + 1, // 増分 x => x, // 返り値 x=>TimeSpan.FromSeconds(1)).Timestamp(); // ここでは、SubscribeにはOnNextの定義だけが与えられている using (observable.Subscribe(x => Console.WriteLine( "{0}, {1}", x.Value, x.Timestamp))) { Console.WriteLine("Press any key to unsubscribe"); Console.ReadKey(); } } } Console.WriteLine("Press any key to exit"); Console.ReadKey();

class Select_Simple { static void Main() { // 一秒間隔で数列を生成する var oneNumberPerSecond = Observable. Interval(TimeSpan.FromSeconds(1)); var numbersTimesTwo = // 新しいObservableの生成 from n in oneNumberPerSecond select n * 2; // LINQが使える! } } Console.WriteLine("Numbers * 2:"); numbersTimesTwo.Subscribe(num => { Console.WriteLine(num); } // OnNextの定義が引数で与えられている ); Console.ReadKey();

MoveNex t IEnumerable<T> IEnumerator<T> Environment Have next! IObservable<T> IObserver<T> Reactive Got next? OnNext Interactive Application

Observableを使ったEvent処理

る マウスは、イベントのデータベー ス! var lbl = new Label(); var frm = new Form { Controls = { lbl } }; var moves = Observable. FromEvent<MouseEventArgs>(frm, "MouseMove"); using (moves.Subscribe (evt => { lbl.Text = evt.EventArgs.Location.ToString(); })) { Application.Run(frm); }

var txt = new TextBox(); var frm = new Form { Controls = { txt } }; var moves = Observable.FromEvent<MouseEventArgs> (frm, "MouseMove"); var input = Observable.FromEvent<EventArgs> (txt, "TextChanged"); var movesSubscription = moves.Subscribe( evt => { Console.WriteLine("Mouse at: " + evt.EventArgs.Location); }); var inputSubscription = input.Subscribe( evt => { Console.WriteLine("User wrote: " + ((TextBox)evt.Sender).Text); }); using (new CompositeDisposable( movesSubscription, inputSubscription)) { Application.Run(frm); }

var moves = from evt in Observable. FromEvent<MouseEventArgs>(frm, "MouseMove") select evt.EventArgs.Location; var input = from evt in Observable. FromEvent<EventArgs>(txt, "TextChanged") select ((TextBox)evt.Sender).Text; var movesSubscription = moves.Subscribe( pos => Console.WriteLine("Mouse at: " + pos)); var inputSubscription = input.Subscribe( inp => Console.WriteLine("User wrote: " + inp)); var overFirstBisector = from pos in moves where pos.X == pos.Y select pos; var movesSubscription = overFirstBisector.Subscribe( pos => Console.WriteLine("Mouse at: " + pos));

違った値が来た時のみデータを送る var input = (from evt in Observable.FromEvent<EventArgs>(txt, "TextChanged") select ((TextBox)evt.Sender).Text) .DistinctUntilChanged(); Doは、Observablを変化させずに、OnNextを実行する var input = (from evt in Observable.FromEvent<EventArgs>(txt, "TextChanged") select ((TextBox)evt.Sender).Text) .Do(inp => Console.WriteLine("Before DistinctUntilChanged: " + inp)) .DistinctUntilChanged(); Throttleは指定された時間の間、Observableのイベントを抑制する TimeSpan.FromSecond(1)は、一秒間の指定 var input = (from evt in Observable.FromEvent<EventArgs>(txt, "TextChanged") select ((TextBox)evt.Sender).Text) .Throttle(TimeSpan.FromSeconds(1)) .DistinctUntilChanged(); using (input.ObserveOn(lbl).Subscribe(inp => lbl.Text = inp)) Application.Run(frm);

Reactive Extension for JavaScript

var source = null; // Observableとしてsourceを生成するコード(省略) var subscription = source.Subscribe( // 三つの関数の定義が順番に引数に与えられている function (next) { $("<p/>").html("OnNext: " + next) .appendTo("#content"); }, function (exn) { $("<p/>").html("OnError: " + exn) .appendTo("#content"); }, function () { $("<p/>").html("OnCompleted") .appendTo("#content"); } );

var source = Rx.Observable.Empty(); OnCompleted var source = Rx.Observable.Throw("Oops!"); OnError: Oops var source = Rx.Observable.Return(42); OnNext: 42 OnCompleted var source = Rx.Observable.Range(5,3); OnNext: 5 OnNext: 6 OnNext: 7 OnCompleted

var source = Rx.Observable.Generate( 0, function (i) { return i < 4; }, function (i) { return i + 1; }, // Like a for loop function (i) { return i * i; } ); OnNext: 0 OnNext: 1 OnNext: 4 OnNext: 9 OnCompleted

RxJS: DOMイベントをObservableに

$(document).ready(function () { $(document).mousemove(function (event) { // A position tracking mechanism, // updating a paragraph called ―content‖ $("<p/>") .text("X: " + event.pageX + " Y: " + event.pageY) .appendTo("#content"); }); jQuery }); RxJS $(document).ready(function () { $(document).toObservable("mousemove") .Subscribe(function (event) { // A position tracking mechanism, // updating a paragraph called ―content‖ $("<p/>") .text("X: " + event.pageX + " Y: " + event.pageY) .appendTo("#content"); }); });

$(document).ready(function () { $(document).toObservable("mousemove") .Subscribe(function (event) { $("<p/>") .text("X: " + event.pageX + " Y: " + event.pageY) .appendTo("#content"); }); $("#textbox").toObservable("keyup") .Subscribe(function (event) { $("<p/>") .text("User wrote: " + $(event.target).val()) .appendTo("#content"); }); });

var moves = $(document).toObservable("mousemove") .Select(function(event) { return { pageX : event.pageX, pageY : event.pageY }; }); var inputs = $(―#textbox‖).toObservable(‖keyup‖) .Select(function(event) { return $(event.target).val(); });

var movesSubscription = moves.Subscribe( function (pos) { $("<p/>‖) .text("X: " + pos.pageX + " Y: " + pos.pageY) .appendTo("#content"); }); var inputSubscription = input.Subscribe( function (text) { $("<p/>") .text("User wrote: " +text) .appendTo("#content"); });

var overFirstBisector = moves .Where(function(pos) { return pos.pageX === pos.pageY; }); var movesSubscription = overFirstBisector .Subscribe(function (pos) { $("<p/>") .text("Mouse at: "+pos.pageX+","+pos.pageY) .appendTo("#content"); });

var inputs = $(―#textbox‖).toObservable(‖keyup‖) .Select(function(event) { return $(event.target).val(); } .DistinctUntilChanged(); ); var inputSubscription = input.Subscribe( function (text) { $("<p/>") .text("User wrote: " +text) .appendTo("#content"); });

var inputs = $(―#textbox‖).toObservable(‖keyup‖) .Select(function(event) { return $(event.target).val(); } .Throttle(1000) .DistinctUntilChanged(); ); var inputSubscription = input.Subscribe( function (text) { $("<p/>") .text("User wrote: " +text) .appendTo("#content"); });

Observableと非同期処理

.NETの非同期プログラミング FileStream fs = File.OpenRead("data.txt"); byte[] bs = new byte[1024]; fs.BeginRead(bs, 0, bs.Length, new AsyncCallback(iar => { int bytesRead = fs.EndRead(iar); // Do something with bs[0..bytesRead-1] }), null );

Rxでの非同期処理 FileStream fs = File.OpenRead("data.txt"); Func<byte[], int, int, IObservable<int>> read = Observable.FromAsyncPattern<byte[], int, int, int>( fs.BeginRead, fs.EndRead); byte[] bs = new byte[1024]; read(bs, 0, bs.Length).Subscribe(bytesRead => { // Do something with bs[0..bytesRead-1] });

非同期サンプル ネットワーク上のサービスの利用 Asynchronous request React TextChanged Reaction Reactive Reactor Data binding on UI thread Dictionary web service

// IObservable<string> from TextChanged events var changed = Observable.FromEvent<EventArgs>(txt,"TextChanged"); var input = (from text in changed select ((TextBox)text.Sender).Text); .DistinctUntilChanged() .Throttle(TimeSpan.FromSeconds(1)); // Bridge with the dictionary web service var svc = new DictServiceSoapClient(); var lookup = Observable.FromAsyncPattern<string,DictionaryWord[]> (svc.BeginLookup, svc.EndLookup); // Compose both sources using SelectMany var res = from term in input from words in lookup(term) select words;

function searchWikipedia(term) { return $.ajaxAsObservable( { url: "http://en.wikipedia.org/w/api.php", dataType: "jsonp", data: { action: "opensearch", search: term, format: "json" } }) .Select(function (d) { return d.data[1]; }); } var searchObservable = searchWikipedia("react"); var searchSubscription = searchObservable.Subscribe( function (results) { $("#results").empty(); $.each(results, function (_, result) { $("#results").append("<li>" + result + "</li>"); }); }, function (exn) { $("#error").text(error); } );

var searchObservable = terms.SelectMany( function (term) { return searchWikipedia(term); } ); var searchObservable = terms.SelectMany(searchWikipedia); var res = from term in terms from words in searchWikipedia(term) select words;

Functional Reactive Programming on Android With RxJava http://mttkay.github.io/blog/2013/08 /25/functional-reactiveprogramming-on-android-withrxjava/

現在のAsyncTaskを使った処理 class DownloadTask extends AsyncTask<String, Void, File> { protected File doInBackground(String... args) { final String url = args[0]; try { byte[] fileContent = downloadFile(url); File file = writeToFile(fileContent); return file; } catch (Exception e) { // ??? } } } protected void onPostExecute(File file) { Context context = getContext(); // ??? Toast.makeText(context, "Downloaded: " + file.getAbsolutePath(), Toast.LENGTH_SHORT) .show(); }

private Observable<File> downloadFileObservable() { return Observable.create(new OnSubscribeFunc<File>() { @Override public Subscription onSubscribe( Observer<? super File> fileObserver) { try { byte[] fileContent = downloadFile(); File file = writeToFile(fileContent); fileObserver.onNext(file); fileObserver.onCompleted(); } catch (Exception e) { fileObserver.onError(e); } } } }); return Subscriptions.empty();

class MyFragment extends Fragment implements Observer<File> { private Subscription subscription; @Override protected void onCreate( Bundle savedInstanceState) { subscription = AndroidObservables .fromFragment(this,downloadFileObservable()) .subscribeOn(Schedulers.newThread()) .subscribe(this); } private Observable<File> downloadFileObservable() { /* as above */ } @Override protected void onDestroy() { subscription.unsubscribe(); }

Observable<String> filePathObservable = downloadFileObservable().map(new Func1<File, String>() { @Override public String call(File file) { return file.getAbsolutePath(); } }); // now emits file paths, not `File`s subscription = filePathObservable.subscribe( /* Observer<String> */);

public void onNext(File file) { Toast.makeText(getActivity(), "Downloaded: " + file.getAbsolutePath(), Toast.LENGTH_SHORT) .show(); } public void onCompleted() {} } public void onError(Throwable error) { Toast.makeText(getActivity(), "Download failed: " + error.getMessage(), Toast.LENGTH_SHORT) .show(); }

RxJava on Android with Scala => AWESOME http://pommedeterresautee.blogspot. jp/2013/11/rxjava-on-android-withscala-awesome.html

def getAsyncUrl(urlString: String) :Observable[Option[String]] = Observable { (observer: Observer[Option[String]]) => { val url = new URL(urlString) val urlCon = url.openConnection() urlCon.setConnectTimeout(5000) urlCon.setReadTimeout(5000) val io = Source.fromInputStream( urlCon.getInputStream) val result = Option(io.getLines(). mkString.split("n")(0)) io.close() observer.onNext(result) observer.onCompleted() Subscription() } }

val mUnsubscribe = CompositeSubscription() mUnsubscribe += getAsyncUrl(lastScript) .execAsync.subscribe( _ match { case OnNext(Some(t)) => CInfo(t) case OnNext(None) => CAlert(getString( R.string.connection_error_message, "Server error")) case OnError(e) => CAlert(getString(R.string.connection_ error_message, e.getLocalizedMessage)) } )

Reactive extensions for .NET (Rx .NET) for Windows Phone http://msdn.microsoft.com/enus/library/windowsphone/develop/ff4 31792%28v=vs.105%29.aspx

Part III Observableを活用する

Observableのストリームを 処理する Rx サンプル

Stock Trade Analysis MSFT 27.01 INTC 21.75 MSFT 27.96 ticks from tick in ticks MSFT 31.21 INTC 22.54 INTC 20.98 MSFT 30.73

Stock Trade Analysis MSFT 27.01 INTC 21.75 MSFT 27.96 MSFT 31.21 INTC 22.54 INTC 20.98 MSFT 30.73 ticks 27. 01 27. 96 30. 73 31. 21 MSFT 21. 75 INTC from tick in ticks group tick by tick.Symbol 22. 54 20. 98

Stock Trade Analysis MSFT 27.01 INTC 21.75 MSFT 27.96 MSFT 31.21 INTC 22.54 INTC 20.98 MSFT 30.73 ticks [27.01, 27.96] [27.96, 3 1.21] [31.21, 3 0.73] MSFT [21.75, 22.54] INTC from tick in ticks group tick by tick.Symbol into company from openClose in company.Buffer(2, 1) [22.54, 20.98]

Stock Trade Analysis MSFT INTC MSFT INTC Stock 21.75 MSFT Analysis Trade 27.01 27.96 31.21 22.54 INTC 20.98 MSFT 30.73 ticks 0.0 34 0.0 15 0.1 04 MSFT 0.0 36 INTC 0.0 69 from tick in ticks group tick by tick.Symbol into company from openClose in company.Buffer(2, 1) let diff = (openClose[1] – openClose[0])/openClose[0]

Stock Trade Analysis MSFT 27.01 INTC 21.75 MSFT 27.96 MSFT 31.21 0.0 34 INTC 22.54 INTC 20.98 0.1 04 MSFT 30.73 ticks 0.0 15 MSFT 0.0 36 INTC 0.0 69 from tick in ticks group tick by tick.Symbol into company from openClose in company.Buffer(2, 1) let diff = (openClose[1] – openClose[0])/openClose[0] where diff > 0.1

Stock Trade Analysis MSFT 27.01 INTC 21.75 MSFT 27.96 MSFT 31.21 INTC 22.54 INTC 20.98 MSFT 30.73 ticks res Company = MSFT Increase = 0.104 from tick in ticks group tick by tick.Symbol into company from openClose in company.Buffer(2, 1) let diff = (openClose[1] – openClose[0]) / openClose[0] where diff > 0.1 select new { Company = company.Key, Increase = diff }

Observableを変形・加工する 合成可能な関数を利用する

Composable Functions  Transform: map, flatmap, reduce, Scan ...  Filter: take, skip, sample, takewhile, filter ...  Combine: concat, merge, zip, combinelatest, multicast, publish, cache, refcount ...  Concurrency: observeon, subscribeon  Error Handling: onErrorreturn, onErrorResume ...

map def map[B](f: A⇒B): Observable[B]

filter def filter(p: A⇒Boolean): Observable[A]

merge def merge(Observable[A],Obserbal[A]): Observable[A]

flatmap def flatMap(f: T=>Observable[S]): Observable[S] = { map(f).flatten }

flatmap val xs: Observable[Int] = Observable(3,2,1) val yss: Observable[Observable[Int]] = xs.map(x => Observable.Interval(x seconds) .map(_=>x).take(2)) val zs: Observable[Int] = yss.flatten() xs: Observable[Int] yss: Observable [Observable[Int]]] merge zs: Observable[Int]

concat

val xs: Observable[Int] = Observable(3,2,1) val yss: Observable[Observable[Int]] = xs.map(x => Observable.Interval(x seconds) .map(_=>x).take(2)) val zs: Observable[Int] = yss.concat() xs: Observable[Int] yss: Observable [Observable[Int]]] zs: Observable[Int]

zip

groupBy def groupBy[K](keySelector: T⇒K) : Observable[(K,Observable[T])]

startWith def startWith(ss: T*): Observable[T]

reduce def reduce(f:(T, T) ⇒T): Observable[T]

ServerサイドでのRxの利用 Netflix --- RxJava ―a library for composing asynchronous and event-based programs using observable sequences for the Java VM‖

RxJava Java 8へのRxの移植 Observable.toObservable("one‖,"two‖,"three") .take(2) .subscribe( (arg) -> { System.out.println(arg); } ); https://github.com/Netflix/RxJava

Netflix  Netflixは、北米のインターネットのダウンロ ード・トラフィックの33% を占める。  Netflixの顧客は、一月に10億時間以上、テレ ビを見ている。  APIのトラフィックは、2010年の2000万/一 日から、2013年には 20億/一日に成長した。

一日の Netflix APIへのリクエスト数

 クライアントのデバイスは、Netflixの二つの 主要なサービスに接続する。第一のサービス は、ビデオの検索やそのコンテンツの紹介に 関係した機能を提供するNetflix APIである。 第二のサービスは、ビデオ・ストリームの再 生サービスである。  ここでは、ビデオの検索・発見にかかわる Netflix APIについて見ていく。

Rxの発見は、システムの Re-Atchitectureから始まった

ネットワーク・トラフィックは、 APIの改良で、劇的に改善した 9つあったネットワークの呼び出しは 1つになり、WANでのネットワーク 遅延は一回だけになった クライアントのロジ ックは、サーバに移 され、20以上の呼び 出しが削除された

関連情報  Netflix API architecture http://techblog.netflix.com/search/label/api https://speakerdeck.com/benjchristensen/  Re-architecture http://techblog.netflix.com/2013/01/optimiz ing-netflix-api.html  Hystrix https://github.com/Netflix/Hystrix

NetflixでのObservableの利用 Netflixでは、ブロックするAPIを Observableを返す非同期のAPIに作り替え 、ついで、それらのObservableを返すAPI を組み合わせてサービスを構成した。

Observableを作る Observable.create({ observer try { observer.onNext(new ....)) observer.onCompleted(); } catch(Exception e) { observer.onError(e); } }) ‐> Observable<T> create(Func1<Observer<T>, Subscription> func)

def Observable<VideoRating> getRating(userId, videoId) { return Observable.create({ observer ‐> executor.execute(new Runnable() { def void run() { try { VideoRating rating = //.. do network call . observer.onNext(rating) observer.onCompleted(); } catch(Exception e) { observer.onError(e); } } }) Asynchronous Observable }) } with Single Values

def Observable<Video> getVideos() { return Observable.create({ observer ‐> executor.execute(new Runnable() { def void run() { try { for ( id in videoIds ) { Video v = // .. do network call ... observer.onNext(v) } observer.onCompleted(); } catch(Exception e) { observer.onError(e); } } Asynchronous Observable }) }) with Multiple Values }

Observableを組み合わせて、サービスを作る こうした情報を返すサービス

Observableを組み合わせて、サービスを作る Observable<Video>は、OnNextにn個のビデオを送り出す

def Observable<Map> getVideos(userId) { return VideoService.getVideos(userId) } Observable<Video>は、OnNextにn個のビデオを送り出す

def Observable<Map> getVideos(userId) { return VideoService.getVideos(userId) // リストの最初の10本だけが欲しい .take(10) } 最初の10個を取って、あとはunscribeする。 10個のビデオを送り出す Observable<Video>を返す。

Observableを組み合わせて、サービスを作る 最初の10個を取って、あとはunscribeする。 10個のビデオを送り出す Observable<Video>を返す。

Observableを組み合わせて、サービスを作る unscribe 最初の10個を取って、あとはunscribeする。 10個のビデオを送り出す Observable<Video>を返す。

def Observable<Map> getVideos(userId) { return VideoService.getVideos(userId) // リストの最初の10本だけが欲しい .take(10) .map( { Video video -> // Videoオブジェクトを変形する }) } map operatorは、入力値を他の出力値に変える

Observable<R> b = Observable<T>.map({ T t -> R r = ... transform t ... return r; })

def Observable<Map> getVideos(userId) { return VideoService.getVideos(userId) // リストの最初の10本だけが欲しい .take(10) .flatMap( { Video video -> // それぞれのビデオからメタデータを取り出す def m = video.getMetaData() .map( { Map<String, String> md -> return [ title: md.get(―title‖), length: md.get(―duration‖)] }) // ブックマークとレイティングも def b .... def r .... }) } ここでは、flatMap / mapMany を使う。 mapでは Observable<T>の 要素一つが、型 Rの要素一つに変わったが、flatMap / mapManyでは、 Observable<T>の要素一つ一つが、別のObservable<R>に変わる。

Observableを組み合わせて、サービスを作る

def Observable<Map> getVideos(userId) { return VideoService.getVideos(userId) // リストの最初の10本だけが欲しい .take(10) .mapMany( { Video video -> // それぞれのビデオからメタデータを取り出す def m = video.getMetaData() .map( { Map<String, String> md -> return [ title: md.get(―title‖), length: md.get(―duration‖)] }) // ブックマークとレイティングも def b .... mapManyは、三つの関数の定義に応じて def r .... Observable<Video>から、次の三つの }) Observableを生成する。 } Observable<VideoMetadata> Observable<VideoBookmark> Observable<VideoRating>

Observableを組み合わせて、サービスを作る 一つのVideo ‗v‘に対して getMetaData()は、 Observable<MetaData>を返す

def Observable<Map> getVideos(userId) { return VideoService.getVideos(userId) // リストの最初の10本だけが欲しい .take(10) .mapMany( { Video video -> // それぞれのビデオからメタデータを取り出す def m = video.getMetaData() .map( { Map<String, String> md -> return [ title: md.get(―title‖), length: md.get(―duration‖)] }) // ブックマークとレイティングも def b .... def r .... }) } それぞれのObservableは、mapを利用して データを変換する

Observableを組み合わせて、サービスを作る Observable<MetaData>は、map関数で変形される

Observableを組み合わせて、サービスを作る Observable<Bookmark>、Observable<rating>も 同様に処理される

def Observable<Map> getVideos(userId) { return VideoService.getVideos(userId) // リストの最初の10本だけが欲しい .take(10) .mapMany( { Video video -> // それぞれのビデオからメタデータを取り出す def m = video.getMetaData() .map( { Map<String, String> md -> return [ title: md.get(―title‖), length: md.get(―duration‖)] }) // ブックマークとレイティングも def b .... def r .... // これらを結合する }) }

def Observable<Map> getVideos(userId) { return VideoService.getVideos(userId) // リストの最初の10本だけが欲しい .take(10) .flatmap( { Video video -> // それぞれのビデオからメタデータを取り出す def m ... def b .... def r .... // これらを結合する }) }

def Observable<Map> getVideos(userId) { return VideoService.getVideos(userId) // リストの最初の10本だけが欲しい .take(10) .flatmap( { Video video -> // それぞれのビデオからメタデータを取り出す def m ... def b .... def r .... // これらを結合する return Observable.zip( m, b, r, { metadata, bookmark, rating -> // 完全なデータの辞書に変換する return [ id: video.videoId ] << metadata << bookmark << rating }) zipは、三つの非同期のObservableを }) 一つのObservableにまとめる }

Observableを組み合わせて、サービスを作る

Observableを組み合わせて、サービスを作る

public Data getData(); を考える -- NetFlix RxJava もしも、同期型から非同期型に変わったら 、実装はどう変化するだろうか? クライアントは、ブロックしないように実 行するには、どうすればいいだろうか?

getDataの実装の候補 public Data getData(); public void getData(Callback<T> c); public Future<T> getData(); public Future<List<Future<T>>> getData();  ...     ?

IterableとObservable Iterable Observable Pull Push T next() onNext[T] throw Exception onError[Exception] returns; onCompletion

IterableとObservable  同じ高階関数が、IterableにもObservableに も、同じように適用出来る。

getData()の分類  同期/非同期、返り値の単数/複数で表を作っ てみると、getData()の次のような分類が可 能である。

String s = getData(args); if (s.equals(x)) { // do something } else { // do something else } scalarを返り値に持つ、典型的な同期型 getData()のコード

Iterable<String> values = getData(args); for (String s : values) { if (s.equals(x)) { // do something } else { // do something else } } scalar値が返る場合とほとんど同じだが、処理はループする

Future<String> s = getData(args); if (s.get().equals(x)) { // do something } else { // do something else } 古いJavaのFutureコード。非同期だが、getでブロックする。

Future<String> s = getData(args); Futures.addCallback(s, new FutureCallback<String> { public void onSuccess(String s) { if (s.get().equals(x)) {...} else {...} } public void onFailure(Throwable t) { // handle error } }, executor); Futureとcallbackをつかった非同期のコード・サンプル

Observable<String> s = getData(args); s.map({ s -> if (s.get().equals(x)) { // do something } else { // do something else }); Observableを使ったgetData()のコード・サンプル

IterableとObservableの「双対 性」  Observable/Observer は、同期型の Iterable/Iterator の非同期の双対形である。  ―Subject/Observer is Dual to Iterator‖ Erik Meijer‖http://csl.stanford.edu/~christos/pldi2010.fit/ meijer.duality.pdf  ―Introduction to the Reactive Framework Part II ‖ http://codebetter.com/matthewpodwysocki/2009/11/03/ introduction-to-the-reactive-framework-part-ii/

Part IV Reactive Manifesto

Reactive Manifesto 2013年9月23日 version 1.1 http://www.reactivemanifesto.org/

Reactiveに移行する必要性  近年、アプリケーションの要請は劇的に変わってきた 。ほんの数年前には、大規模なアプリケーションは、 数十台のサーバ、数秒のレスポンスタイム、数時間の オフライン・メンテナンス、数ギガバイトのデータか ら構成されていた。今日、アプリケーションは、モバ イル・デバイスから数千のマルチコア・プロセッサー が走るクラウド・ベースのクラスターにいたるまであ らゆるところに配備されている。ユーザはミリセカン ドさらにはナノセコンドのレスポンスタイムと、 100%の稼働を期待している。データの要求は、ペタ バイトに拡大しようとしている。

 当初は、GoogleやTwitterといった革新的なインター ネット・ドリブンの企業の領域だけだったが、こうし たアプリケーションの特徴は、ほとんどの産業で表面 化した。金融や通信事業者が、最初に新しい要請を満 たす実践を受け入れ、その他の産業がそれに続いた。  新しい要請は、新しいテクノロジーを要求する。それ 以前のソリューションは、管理されたサーバとコンテ ナに力点を置いたものだった。規模拡大は、より大き なサーバを買うことと、マルチスレッドによる並行処 理を通じて達成された。追加されたサーバは、複雑で 非効率かつ高価なプロプライエトリなソリューション を通じて追加された。

 しかし今では新しいアーキテクチャーが進化し、開発 者が今日の要求を満たすアプリケーションを概念的に 把握し、アプリを構築するのを可能にしている。我々 は、これらをリアクティブ・アプリケーションと呼ん でいる。このアプリケーションは開発者が、イベント ・ドリブンで、拡張性を持ち、耐障害性の高い、応答 性に優れたシステムを構築することを可能とする。す なわち、このアプリケーションは、リアルタイム感に 富み、拡張可能で耐障害性の高いアプリケーション層 で支えられ、マルチコアでもクラウド・コンピュータ のアーキテクチャでもすぐに配備されうる、高い応答 性のユーザ体験を提供出来るアプリケーションである 。 このReactive Manifestoは、リアクティブに移行 するのに必要な本質的な諸特徴を記述するものである 。

Reactive アプリケーション  辞書のMerriam-Websterは、reactiveを「刺激に対 してすぐに反応できること」と定義している。すなわ ち、そのコンポーネントは「アクティブ」で、常にイ ベントを受け取る準備ができているということである 。この定義は、リアクティブなアプリケーションの本 質を捉えている。それはシステムの次のようなところ に焦点を合わせている。

 react to events イベントに反応する イベント・ドリブンの性質は、次のような特 質を可能にする  react to load 負荷に反応する 共有リソースの競合を避けることで、拡張可 能性にフォーカスする  react to failure 失敗に反応する 全てのレベルでエラー回復の能力を持たせる ことで、耐障害性の高いシステムを構築する  react to users ユーザに反応する 負荷のいかんにかかわらず、高い反応時間を 保証する

 これらの一つ一つが、リアクティブなアプリケーショ ンの本質的な特徴である。一方で、それらのあいだに は、相互に従属関係が存在する。ただ、これらの特徴 は、標準的な階層型のアプリケーションの意味での「 層」にあたるものではない。その代わりに、それらは 全ての技術をつらぬいた設計の諸特徴を記述したもの である。

Event-driven 重要なこと  非同期通信に基づいたアプリケーションは、疎結合デ ザインを実装する。それは、純粋に同期型のメソッド 呼び出しに基づいたアプリケーションよりもいいもの である。イベントの送り手と受け手は、如何にイベン トが伝播するのかの詳細を考えなくても実装されるこ とが出来る。インターフェースは通信の内容に集中出 来る。このことで、拡張も改良も保守も、より容易な 実装ができるようになり、開発の柔軟性を増やすと同 時に保守コストを低減出来る。

 非同期通信のメッセージの受け手は、イベントが起き るまで、あるいは、メッセージを受け取るまで、眠っ ていることが出来

Add a comment

Comments

versace ??? ??????? | 22/01/15
???? ?? ?? versace ??? ??????? http://www.birminghamcitydogshow.co.uk/year/Social_versace.html

Related presentations

Related pages

Reactive programming - Wikipedia

In computing, reactive programming is a programming paradigm oriented around data flows and the propagation of change. This means that it should be ...
Read more

The introduction to Reactive Programming you've been ...

So you're curious in learning this new thing called Reactive Programming, particularly its variant comprising of Rx, Bacon.js, RAC, and others.
Read more

Reaktive Programmierung – Wikipedia

Reactive Programming in .NET Microsofts Reactive-Extensions-Homepage. Deprecating the Observer Pattern Ingo Maier, Tiark Rompf und Martin Odersky; ...
Read more

What is Reactive Programming? - Medium

What is Reactive Programming? To understand Reactive — both the programming paradigm and the motivation behind it — it helps to understand the ...
Read more

ReactiveX

The Observer pattern done right ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming
Read more

Reactive Programming – Mehr als nur Streams und Lambdas ...

Reactive Programming kombiniert viele Bausteine aus Software-Architektur und modernen Techniken, wie asynchrone Streams und Funktionale Programmierung.
Read more

Reactive Programming – vom Hype zum Praxiseinsatz | heise ...

Das Thema "Reactive" ist en vogue. Doch was genau versteht man hinter diesem Programmierstil? Und wie entwickelt man etwas, sodass es als "reactive" gelten ...
Read more

Einfach und effizient: Reactive Programming mit JavaFX ...

Die Nutzer von heute stellen hohe Erwartungen an die Benutzeroberflächen ihrer Programme. Ein sofortiges Feedback auf Eingaben und stets aktuelle ...
Read more

Functional Programming in Scala | Coursera

Explore Functional Programming in Scala Certificate offered by École Polytechnique ... using infinite data structures or functional reactive programming.
Read more

The Reactive Manifesto

The Reactive Manifesto. Published on September 16 2014. (v2.0) Organisations working in disparate domains are independently discovering patterns ...
Read more