Networked pub/sub with UDP, Go, Ruby and ZMQ

Published on March 11, 2014

Author: ismasan

Source: slideshare.net

Description

Building a network of publishers and subscribers with Golang, Ruby, ZMQ (with some Git and Redis in the mix). Presented at London Ruby User Group in March 2014.

Blog post: http://new-bamboo.co.uk/blog/2013/09/17/micro-network-daemons-in-go

Ismael Celis Networked pub/sub with Go, Ruby and ZMQ

bootic.net - Hosted e-commerce in South America

Store fronts, Theme editor, Checkout, Dashboard, Image resizer, API … and more in the pipeline.

Ancillary services

• analytics
• automated backups
• activity dashboard
• audit trail

Also: Play with cool toys

[Diagram: Publisher 1, Publisher 2, Publisher 3 -> Events hub (JSON over UDP) -> Events bus, pub/sub (msgpack over ZMQ) -> Stats, Backups, Logs, Websocket, ?]

pub / sub

Publisher 1 pub/sub

    require 'socket'
    require 'active_support/json' # provides ActiveSupport::JSON.encode

    socket = UDPSocket.new

    message = {
      time: Time.now.to_s,
      type: 'pageview',
      app: 'store_fronts',
      data: {
        account: 'acme',
        user: 'Joe Bloggs',
        domain: 'www.acme.com',
        path: '/about/us',
        ua: 'Mozilla/5.0 (Windows NT 6.2; Win64; x64)...'
      }
    }

    json = ActiveSupport::JSON.encode(message)

    # Fire and forget: send the JSON payload as a single UDP datagram
    socket.send(json, 0, 'events_hub_host', 5555)

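Events hub pub/sub github.com/bootic/bootic_data_collector

    func (daemon *Daemon) ReceiveDatagrams() {
        for {
            buffer := make([]byte, 1024)

            // c is the number of bytes read into buffer
            c, _, err := daemon.Conn.ReadFromUDP(buffer)
            if err != nil {
                log.Println("UDP read error:", err)
                continue
            }

            event, err := data.DecodeJSON(buffer[:c])
            if err != nil {
                log.Println("JSON decode error:", err)
                continue
            }
            daemon.Dispatch(event)
        }

        panic("should never have got myself into this.")
    }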
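
The publisher and the receive loop above can be tried together on one machine. The following is a minimal sketch, not the code from bootic_data_collector: the `Event` struct, `receiveOne` and `roundTrip` are hypothetical names, and decoding uses the standard library's `encoding/json` rather than the project's `data.DecodeJSON`. It sends one JSON event over loopback UDP and decodes it on the other side.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"time"
)

// Event is a hypothetical struct mirroring the JSON payload on the publisher slide.
type Event struct {
	Time string                 `json:"time"`
	Type string                 `json:"type"`
	App  string                 `json:"app"`
	Data map[string]interface{} `json:"data"`
}

// receiveOne reads a single datagram from conn and decodes it as an Event.
func receiveOne(conn *net.UDPConn) (Event, error) {
	var event Event
	buffer := make([]byte, 1024)
	// n is the number of bytes actually read; only that slice is valid JSON.
	n, _, err := conn.ReadFromUDP(buffer)
	if err != nil {
		return event, err
	}
	err = json.Unmarshal(buffer[:n], &event)
	return event, err
}

// roundTrip listens on an ephemeral loopback port, sends one event to it
// and returns the event as decoded by the receiver.
func roundTrip(event Event) (Event, error) {
	conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
	if err != nil {
		return Event{}, err
	}
	defer conn.Close()
	// UDP gives no delivery guarantee, so do not wait forever.
	conn.SetReadDeadline(time.Now().Add(2 * time.Second))

	sender, err := net.DialUDP("udp", nil, conn.LocalAddr().(*net.UDPAddr))
	if err != nil {
		return Event{}, err
	}
	defer sender.Close()

	payload, err := json.Marshal(event)
	if err != nil {
		return Event{}, err
	}
	if _, err := sender.Write(payload); err != nil {
		return Event{}, err
	}
	return receiveOne(conn)
}

func main() {
	sent := Event{
		Time: time.Now().String(),
		Type: "pageview",
		App:  "store_fronts",
		Data: map[string]interface{}{"account": "acme", "path": "/about/us"},
	}
	got, err := roundTrip(sent)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got.Type, got.App, got.Data["account"])
}
```

Note that the 1024-byte buffer caps the size of an event: a longer datagram is truncated on read and will then fail to decode.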

Events hub pub/sub github.com/bootic/bootic_data_collector

    // Start up UDP daemon
    daemon, err := udp.NewDaemon(udpHost)

    // Start up PUB ZMQ client
    zmqObserver := fanout.NewZmq(zmqAddress)

    // Push incoming UDP events down ZMQ pub/sub socket
    daemon.Subscribe(zmqObserver.Notifier)

Events hub pub/sub github.com/bootic/bootic_data_collector

    type Daemon struct {
        Conn      *net.UDPConn
        observers map[string][]data.EventsChannel
    }

    func NewDaemon(udpHostAndPort string) (daemon *Daemon, err error) {
        conn, err := createUDPListener(udpHostAndPort)

        if err != nil {
            return
        }

        daemon = &Daemon{
            Conn:      conn,
            observers: make(map[string][]data.EventsChannel),
        }

        // Read datagrams in the background
        go daemon.ReceiveDatagrams()

        return
    }

Events hub pub/sub github.com/bootic/bootic_data_collector

    // Start up UDP daemon
    daemon, err := udp.NewDaemon(udpHost)

    // Setup Websockets server
    wshub := ws.HandleWebsocketsHub("/ws")

    // Push incoming UDP messages to multiple listeners
    daemon.Subscribe(wshub.Notifier)

    // Start up PUB ZMQ client
    zmqObserver := fanout.NewZmq(zmqAddress)

    // Push incoming UDP events down ZMQ pub/sub socket
    daemon.Subscribe(zmqObserver.Notifier)

    log.Fatal("HTTP server error: ", http.ListenAndServe(wsHost, nil))
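
The slides show `Subscribe` being called but not how events reach each subscriber. One plausible reading of the `observers map[string][]data.EventsChannel` field is a map from event type to a list of channels. The sketch below illustrates that idea only; `Hub`, `Event`, `EventsChannel`, the `"all"` key and the drop-when-full policy are assumptions, not the project's implementation.

```go
package main

import "fmt"

// Event and EventsChannel are hypothetical stand-ins for the project's data types.
type Event map[string]string
type EventsChannel chan Event

// allTypes is an assumed key for observers that want every event.
const allTypes = "all"

// Hub keeps a list of subscriber channels per event type.
type Hub struct {
	observers map[string][]EventsChannel
}

func NewHub() *Hub {
	return &Hub{observers: make(map[string][]EventsChannel)}
}

// Subscribe registers a channel for every event type.
func (h *Hub) Subscribe(ch EventsChannel) {
	h.SubscribeToType(ch, allTypes)
}

// SubscribeToType registers a channel for a single event type.
func (h *Hub) SubscribeToType(ch EventsChannel, eventType string) {
	h.observers[eventType] = append(h.observers[eventType], ch)
}

// Dispatch sends the event to catch-all observers and to observers of its type.
func (h *Hub) Dispatch(event Event) {
	for _, ch := range h.observers[allTypes] {
		deliver(ch, event)
	}
	for _, ch := range h.observers[event["type"]] {
		deliver(ch, event)
	}
}

// deliver does a non-blocking send: a slow subscriber loses events
// instead of stalling the hub. It reports whether the event was queued.
func deliver(ch EventsChannel, event Event) bool {
	select {
	case ch <- event:
		return true
	default:
		return false
	}
}

func main() {
	hub := NewHub()
	everything := make(EventsChannel, 8)
	pageviews := make(EventsChannel, 8)

	hub.Subscribe(everything)
	hub.SubscribeToType(pageviews, "pageview")

	hub.Dispatch(Event{"type": "pageview", "account": "acme"})
	hub.Dispatch(Event{"type": "theme", "account": "acme"})

	fmt.Println("all:", len(everything), "pageviews:", len(pageviews))
}
```

Dropping on a full channel matches the fire-and-forget nature of the UDP input, but it is a design choice. The sketch is also not safe for subscribing while dispatching; a real hub would guard the map with a mutex or register all observers before starting the receive loop.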

pub / sub

[Diagram: pageviews tracker (1px tracking .gif) -> JSON / UDP -> Events hub] github.com/bootic/bootic_pageviews

[Diagram: Events hub, pub/sub “pageview” events, aggregator, Redis, HTTP API, internets]

Stats aggregates pub/sub github.com/bootic/bootic_stats_aggregates

    // Setup ZMQ subscriber
    daemon, _ := booticzmq.NewZMQSubscriber(zmqAddress)

    // Setup Redis tracker
    tracker, err := redis_stats.NewTracker(redisAddress)

    // Redis subscribes to these events
    daemon.SubscribeToType(tracker.Notifier, "pageview")

Stats aggregates pub/sub github.com/bootic/bootic_stats_aggregates

    year := now.Year()
    month := int(now.Month()) // numeric month, so keys read "2013/12"
    day := now.Day()
    hour := now.Hour()

    go func() {

        // increment current month in year: “track:acme/pageview/2013”
        yearKey := fmt.Sprintf("track:%s/%s/%d", account, evtType, year)
        self.Conn.HIncrBy(yearKey, month, 1)

        // increment current day in month: “track:acme/pageview/2013/12”
        monthKey := fmt.Sprintf("track:%s/%s/%d/%d", account, evtType, year, month)
        self.Conn.HIncrBy(monthKey, day, 1)

        // increment current hour in day: “track:acme/pageview/2013/12/16”
        dayKey := fmt.Sprintf("track:%s/%s/%d/%d/%d", account, evtType, year, month, day)
        self.Conn.HIncrBy(dayKey, hour, 1)

    }()

Stats aggregates pub/sub github.com/bootic/bootic_stats_aggregates

    GET /api/stats/track/acme/pageview/2013/12/16

    {
      "account": "acme",
      "event": "pageview",
      "year": "2013",
      "month": "12",
      "day": "16",
      "data": {
        "0": 2693,
        "1": 1215,
        "2": 341,
        "3": 176,
        "4": 80,
        "5": 89,
        "6": 333,
        "7": 779,
        "8": 1506,
        "9": 2553,
        "10": 3734
      }
    }


Stats aggregates pub/sub github.com/bootic/bootic_stats_aggregates

Git backups pub/sub github.com/bootic/bootic_themes_backup

[Diagram: Events hub, pub/sub “theme” events, Git backups, Themes API, Git]

Git

    git clone tufte:/home/git/git_themes/acme

Git backups pub/sub github.com/bootic/bootic_themes_backup

    doneChan := make(chan string)
    bufferChan := make(chan int, 20)

    stores := make(map[string]*ThemeStore)

    for {
        select {
        case event := <-writer.Notifier:
            account := event.Get("account")

            store := stores[account]
            // Register store and start delayed writing
            // if not already registered
            if store == nil {
                store = NewThemeStore(account)
                stores[account] = store
                go store.DelayedWrite(bufferChan, doneChan)
            }

        case account := <-doneChan:
            // A store is done writing.
            // Un-register it so it can be registered again.
            delete(stores, account)
        }
    }

Git backups pub/sub github.com/bootic/bootic_themes_backup

    doneChan := make(chan string)
    bufferChan := make(chan int, 20)

    …

    func (store *ThemeStore) DelayedWrite(bufferChan chan int, doneChan chan string) {

        // time.Sleep takes a time.Duration counted in nanoseconds, so the
        // bare 10 shown on the slide would pause for 10ns. Seconds are
        // assumed here.
        time.Sleep(10 * time.Second)

        // Start work. This will block if buffer is full.
        bufferChan <- 1

        store.Backup()

        // Done. Free space in the buffer
        <-bufferChan

        doneChan <- store.Account
    }

The future

• Searchable events history
• Per-account secure websocket
• More stats!
• Webhooks

Ismael Celis @ismasan

Blog post: bit.ly/1fYmUff

Githubs: bootic/bootic_data_collector, bootic/bootic_pageviews, bootic/bootic_stats_aggregates, bootic/bootic_themes_backup, bootic/bootic_stathat
