PHP, RabbitMQ, and You

Published on March 16, 2014

Author: jasonlotito

Source: slideshare.net

Description

"PHP, RabbitMQ, and You", a talk given at MidwestPHP 2014.

PHP, RabbitMQ, and You
#mwphp14 #rabbitmq @jasonlotito
MidwestPHP 2014 - RabbitMQ

What will we be covering?
1. What is RabbitMQ
2. Technology Overview
3. Publishers
4. Consumers
5. Exchanges
6. Queues
7. Bindings
8. Carrot to make things easy
9. Publish events from the web
10. Multiple consumers
11. Management UI Publishing
12. Consumers Publishing

Jason Lotito, Senior Architect @ MeetMe
@jasonlotito, jasonlotito.com, github.com/jasonlotito, jasonlotito@gmail.com
Senior Architect means people can blame me when things don’t work as expected. When things work, it’s because they worked around my code.

Who has worked with RabbitMQ in production? Raise your hands. The only audience participation part, I promise.

Part 1: Crash Course In RabbitMQ
1. What is RabbitMQ
2. Technology Overview
3. Publishers
4. Consumers
5. Exchanges
6. Queues
7. Bindings

“RabbitMQ is an open source message broker and queueing server that can be used to let disparate applications share data via a common protocol, or to simply queue jobs for processing by distributed workers.”
– RabbitMQ In Action*, Manning

Where RabbitMQ Sits (P) Producer/Publisher - (X) Exchange - (C) Consumer

Event Occurs in Application (P) Producer/Publisher - (X) Exchange - (C) Consumer

Message is Sent to Exchange (P) Producer/Publisher - (X) Exchange - (C) Consumer

Message is Sent to Queue (P) Producer/Publisher - (X) Exchange - (C) Consumer Exchanges connect to Queues through Bindings

Message is Sent to Consumer (P) Producer/Publisher - (X) Exchange - (C) Consumer
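
The flow on the last few slides can be sketched as a toy in-memory model. This is not a RabbitMQ client and talks to no broker; the class name ToyExchange and its methods are hypothetical, chosen only to mirror the diagram (publisher, exchange, binding, queue, consumer). It assumes PHP 7.1 or later and exact-match routing keys.

```php
<?php
// Toy in-memory model of the flow on the slides; not a RabbitMQ client.
// ToyExchange, bind(), publish() and consume() are hypothetical names.

final class ToyExchange
{
    /** @var array<string, string[]> routing key => names of bound queues */
    private $bindings = [];

    /** @var array<string, string[]> queue name => messages waiting in it */
    private $queues = [];

    // A binding connects this exchange to a queue for one routing key.
    public function bind(string $queue, string $routingKey): void
    {
        $this->bindings[$routingKey][] = $queue;
        if (!isset($this->queues[$queue])) {
            $this->queues[$queue] = [];
        }
    }

    // The publisher hands the message to the exchange, which copies it
    // into every queue bound with exactly this routing key.
    public function publish(string $routingKey, string $message): void
    {
        foreach ($this->bindings[$routingKey] ?? [] as $queue) {
            $this->queues[$queue][] = $message;
        }
    }

    // A consumer takes the oldest message from its queue, or null if empty.
    public function consume(string $queue): ?string
    {
        if (empty($this->queues[$queue])) {
            return null;
        }
        return array_shift($this->queues[$queue]);
    }
}

$exchange = new ToyExchange();
$exchange->bind('hello', 'greeting');
$exchange->bind('audit', 'greeting');

// Both bound queues receive their own copy of the message.
$exchange->publish('greeting', 'Hello world!');
// No binding uses this key, so the message reaches no queue.
$exchange->publish('unrouted', 'nobody is listening');

echo $exchange->consume('hello'), PHP_EOL;
echo $exchange->consume('audit'), PHP_EOL;
```

The publisher never names a queue in this model. It only knows the exchange and a routing key, which is what later lets new consumers be added without touching the publishing code.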

“Whereas a database handles your data, a message queue handles your events.”
– Me, Now

A database handles nouns. A message queue handles verbs.

But enough talk. Let’s see some code!

composer.json
"require": {
    "videlalvaro/php-amqplib": "2.2.*"
}

We are starting with the publisher

Publisher: send.php
<?php
// Setup. Usage: $ php send.php whatever you want to send
require_once 'vendor/autoload.php';
$config = require 'config.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Message Prep
$connection = new AMQPConnection(
    $config['mq']['host'],
    $config['mq']['port'],
    $config['mq']['user'],
    $config['mq']['pass']
);

$channel = $connection->channel();
// Everything after the script name becomes the message body
$message = join(' ', array_slice($argv, 1));
$message = empty($message) ? 'Hello world!' : $message;

// Publish Message: default exchange (''), routing key 'hello'
$channel->basic_publish(new AMQPMessage($message), '', 'hello');
echo " [x] Sent '$message'\n";
$channel->close();
$connection->close();
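
The scripts load their connection settings from config.php, which the slides do not show. A minimal sketch of what that file might look like, assuming a broker on the local machine with stock settings; every value here is an assumption to replace with your own:

```php
<?php
// config.php: hypothetical example, not taken from the talk.
// send.php and receive.php read these keys via require 'config.php'.
return [
    'mq' => [
        'host' => 'localhost',
        'port' => 5672,     // standard AMQP port
        'user' => 'guest',  // RabbitMQ's built-in default account
        'pass' => 'guest',  // recent RabbitMQ versions accept it from localhost only
    ],
];
```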


Now we create a consumer

Consumer: receive.php
<?php

require_once 'vendor/autoload.php';
$config = require 'config.php';
use PhpAmqpLib\Connection\AMQPConnection;

$connection = new AMQPConnection(
    $config['mq']['host'],
    $config['mq']['port'],
    $config['mq']['user'],
    $config['mq']['pass']
);
$channel = $connection->channel();
// Arguments: queue, passive, durable, exclusive, auto_delete
$channel->queue_declare('hello', false, false, false, true);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$handler = function ($message) {
    echo sprintf('Message: %s' . PHP_EOL, $message->body);
};

// Arguments: queue, consumer_tag, no_local, no_ack, exclusive, nowait, callback
$channel->basic_consume('hello', '', true, true, false, false, $handler);

// A single wait() returns after one incoming frame, so keep waiting
// for as long as the consumer callback is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}

Message is Sent to Exchange (P) Producer/Publisher - (X) Exchange - (C) Consumer

Exchange Types Direct, Fanout, and Topic

Queue bound to many exchanges

$msg = new AMQPMessage($message);
$channel->basic_publish($msg, '', 'messages.new');

*, #, and .
* matches exactly one word.
# matches zero or more words.
Words are delimited by a dot (.).

messages.new matches:
*.new
messages.*
NOT *.messages.*

spam.message.new matches:
spam.*.*
spam.#
NOT spam.*
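
The wildcard rules on these slides can be checked with a small matcher. RabbitMQ performs this matching inside the broker, so the code below is only an illustration of the rules; the function names topicMatches and matchWords are made up for this sketch, which assumes PHP 7 or later.

```php
<?php
// Illustrative matcher for topic binding patterns. The broker does this
// for you; topicMatches() and matchWords() are hypothetical helpers.

function topicMatches(string $pattern, string $routingKey): bool
{
    $patternWords = $pattern === '' ? [] : explode('.', $pattern);
    $keyWords = $routingKey === '' ? [] : explode('.', $routingKey);
    return matchWords($patternWords, $keyWords);
}

function matchWords(array $pattern, array $words): bool
{
    if ($pattern === []) {
        // The pattern is used up: match only if the key is used up too.
        return $words === [];
    }
    $head = array_shift($pattern);
    if ($head === '#') {
        // '#' may absorb zero or more words: try every possible split.
        for ($i = 0; $i <= count($words); $i++) {
            if (matchWords($pattern, array_slice($words, $i))) {
                return true;
            }
        }
        return false;
    }
    if ($words === []) {
        return false;
    }
    $word = array_shift($words);
    // '*' stands for exactly one word; anything else must match literally.
    return ($head === '*' || $head === $word) && matchWords($pattern, $words);
}

var_dump(topicMatches('*.new', 'messages.new'));         // bool(true)
var_dump(topicMatches('*.messages.*', 'messages.new'));  // bool(false)
var_dump(topicMatches('spam.*', 'spam.message.new'));    // bool(false)
var_dump(topicMatches('spam.#', 'spam.message.new'));    // bool(true)
```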

Now Let’s Create an Exchange and a Queue

Using the Management UI
Enable it with: rabbitmq-plugins enable rabbitmq_management
Then open: http://localhost:15672

We’ve Created Everything Publishers, Exchanges, Bindings, Queues, and Consumers

So what can we do with this?

Part 2: PHP & RabbitMQ Together
1. Carrot to make things easy
2. Publish events from the web
3. Multiple consumers
4. Management UI Publishing
5. Consumers Publishing

Carrot github.com/jasonlotito/Carrot

Carrot Consumer Code
<?php
require_once 'vendor/autoload.php';

use Carrot\Consumer;

$queue = 'new_messages';
$handler = function ($msg) {
    echo $msg, PHP_EOL;
    return true;
};
(new Consumer())->listenTo($queue, $handler)
    ->listenAndWait();

Carrot Publisher Code
<?php
require 'vendor/autoload.php';

use Carrot\Publisher;

$msg = implode(' ', array_slice($argv, 1));
(new Publisher('messages'))
    ->publish('message.new', $msg);

Make publishing easy

Make consuming easier

github.com/jasonlotito/midwest-rabbitmq/tree/carrot

Adding the Web

Publisher
 $publisher = new Publisher('messages');
 $sendCount = (int) (isset($_POST['simulatedMessageCount']) ? $_POST['simulatedMessageCount'] : 1);
 
 for($x = 0; $x<$sendCount; $x++){
 if(isset($_POST['simulateWork'])) {
 usleep(500000);
 $msg = ['comment' => $_POST['comment'] . " $x"];
 $publisher->eventuallyPublish('message.new', $msg);
 } else {
 $msg = ['comment' => $_POST['comment'] . " $x"];
 $publisher->publish('message.new', $msg);
 }
 }

Let’s use the written Carrot Consumer Code

batch_basic_publish
public function eventuallyPublish($routingKey, $message)
{
    $msg = $this->buildMessage($message);
    $channel = $this->getChannel();
    // Queues the message locally; nothing is sent until publish_batch()
    $channel->batch_basic_publish($msg, $this->exchange, $routingKey);
    $this->registerShutdownHandler();
}

public function finallyPublish()
{
    if ($this->doBatchPublish) {
        $this->doBatchPublish = false;
        $this->getChannel()->publish_batch();
    }
}

// registerShutdownHandler() registers finallyPublish; its body is not shown on the slide
private function registerShutdownHandler();
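
The idea behind eventuallyPublish (collect messages during the request, send them once the script is finishing) can be sketched without a broker. The class below is a hypothetical stand-in, not Carrot's implementation: DeferredPublisher and its $send callback are assumptions, with the callback standing where the real AMQP publish call would go. It assumes PHP 7.1 or later.

```php
<?php
// Sketch of "collect now, send at shutdown". DeferredPublisher and its
// $send callback are hypothetical; they are not part of Carrot or php-amqplib.

final class DeferredPublisher
{
    /** @var array[] buffered [routingKey, body] pairs */
    private $pending = [];

    /** @var bool whether flush() is already registered for shutdown */
    private $registered = false;

    /** @var callable invoked as $send($routingKey, $body) for each message */
    private $send;

    public function __construct(callable $send)
    {
        $this->send = $send;
    }

    // Buffer the message and make sure flush() runs when the script ends.
    public function eventuallyPublish(string $routingKey, string $body): void
    {
        $this->pending[] = [$routingKey, $body];
        if (!$this->registered) {
            $this->registered = true;
            register_shutdown_function([$this, 'flush']);
        }
    }

    // Send everything buffered so far; safe to call more than once.
    public function flush(): void
    {
        $batch = $this->pending;
        $this->pending = [];
        foreach ($batch as [$routingKey, $body]) {
            ($this->send)($routingKey, $body);
        }
    }
}

$publisher = new DeferredPublisher(function (string $key, string $body): void {
    echo "publish {$key}: {$body}", PHP_EOL;
});
$publisher->eventuallyPublish('message.new', 'first');
$publisher->eventuallyPublish('message.new', 'second');
echo 'request handled', PHP_EOL;
```

Run from the command line, "request handled" appears before either publish line, because the buffered messages only go out from the shutdown handler.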


Publish from the web. Let’s add another consumer without changing existing code.

Send text messages
$queueName = 'messages_for_nexmo';

// $key, $secret, and $phoneNumber are defined elsewhere (not shown on the slide)
(new Consumer())
    // Bind a new queue to the existing 'messages' exchange
    ->bind($queueName, 'messages', 'message.new')
    ->listenTo($queueName, function ($msg) use ($key, $secret, $phoneNumber) {
        $msg = json_decode($msg);
        $urlString = 'https://rest.nexmo.com/sms/json?api_key=%s&api_secret=%s' .
            '&from=17088568489&to=%s&text=%s';
        $preparedMessage = urlencode($msg->comment);
        $url = sprintf($urlString, $key, $secret, $phoneNumber, $preparedMessage);
        $res = file_get_contents($url);
        $result = json_decode($res);
        $messageResult = $result->messages[0];
        echo "Message Result: " .
            ($messageResult->status === '0' ? 'Message Sent' : 'Message not sent')
            . PHP_EOL;
        // The handler reports success only if the SMS was accepted
        return $messageResult->status === '0';
    })->listenAndWait();


Both queues get the message

Let’s add another layer

Yo dawg, let’s have a consumer publish

Send an email After the text message

Create a new Exchange

Update text message consumer
$publisher = new Publisher('sms');

(new Consumer())
    ->bind($queueName, 'messages', 'message.new')
    ->listenTo($queueName, function ($msg) use ($key, $secret, $phoneNumber, $publisher) {
        // existing code
        $successful = $messageResult->status === '0';

        if ($successful) {
            $publisher->publish('sms.sent', ['message' => $msg->comment]);
        }

        return $successful;
    })->listenAndWait();


Let’s write the email consumer. And I’ll also show you how to easily test them.

Email Consumer
 (new Consumer())
 ->bind('send_email', 'emails', '*.send')
 ->bind('send_email', 'sms', '*.sent')
 ->listenTo('send_email', function($message){
 $msg = json_decode($message);
 mail('jasonlotito@gmail.com', 'MidwestPHP RabbitMQ Talk', $msg->message);
 echo 'Message sent: ' . $msg->message . PHP_EOL;
 return true;
 })->listenAndWait();

Now, let’s see it from the beginning

rabbitmq.org

Thank you.
Review: joind.in/10558 #midwestphp #rabbitmq
Questions? Feel free to stop and ask me, email, or tweet: @jasonlotito, jasonlotito@gmail.com
