Event Bus in PHP Using the AMQP Library and RabbitMQ

Piyuri Sahu
7 min read · Jul 20, 2018

RabbitMQ is message-queueing software, also known as a message broker or queue manager. Queues are defined in it, and applications connect to those queues to place messages on them or to read messages from them.

RabbitMQ is more than mere messaging: it is a common platform for connecting applications. Through RabbitMQ, a Java application can talk to a Linux server, a .NET app, a Ruby on Rails app, or almost anything else found in corporate web development.

Consider an example. We have a full-stack web application written in Node.js.

On the other hand, we have a Linux program that runs on the server and is written in C. The Node app needs data from the C program. How do you solve that problem? First, we wrote a Linux daemon that looks after the C program: it checks whether the program is running, whether it has finished working, and so on.

That daemon has 2 components. One relies on Jansson (a C library for JSON), and the other relies on AMQP (RabbitMQ’s protocol). With this approach, the Node app serializes its messages as JSON and sends them to RabbitMQ.

RabbitMQ forwards those messages to the daemon. The daemon deserializes the JSON, takes the relevant data, and pushes it to the C program. Once the work is finished, the same route is used in reverse to notify the Node.js app.

That’s where RabbitMQ and other message queues play a big role.

AMQP: RabbitMQ allows clients to connect over a range of open, standardized protocols such as AMQP, HTTP, STOMP, MQTT and WebSockets/Web-Stomp.

To understand AMQP, you should know a few terms:

  • Broker (server): the message-queueing software itself, where queues are defined and to which applications connect to transfer messages (RabbitMQ is the broker here).
  • Message: the data being transferred or routed, consisting of the payload and message attributes.
  • Consumer: an application that receives messages, put there by a producer, from queues.
  • Producer: an application that puts messages onto a queue via an exchange.
  • Exchange: the part of the broker (i.e. server) that receives messages and routes them to queues. Exchanges are often compared to post offices or mailboxes. An exchange distributes message copies to queues using rules called bindings; the broker then either delivers the messages to consumers subscribed to those queues, or consumers fetch (pull) messages from the queues on demand.
  • Queue (message queue): the buffer that stores messages inside the broker.
  • Bindings: the rules for distributing messages from exchanges to queues.
<?php
/**
* Created by PhpStorm.
* User: Piyuri Sahu
* Date: 7/20/18
* Time: 12:01 PM
*/
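// Assumes php-amqplib was installed with Composer; adjust the autoloader path to your project
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;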

/**
* Create a connection to RabbitMQ
*/

$connection = new AMQPStreamConnection(
    'localhost', #host - host name where the RabbitMQ server is
    5672, #port - port number of the service, 5672 is the default
    'guest', #user - username to connect to server
    'guest', #password
    'v_host' #vhost - virtual host to connect to
);

/** @var $channel AMQPChannel */
$channel = $connection->channel();


$channel->queue_declare(
    'status_update', #queue name - Queue names may be up to 255 bytes of UTF-8 characters
    false, #passive - can use this to check whether a queue exists without modifying the server state
    false, #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
    false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
    false #autodelete - queue is deleted when last consumer unsubscribes
);
$message = '{
    "entityId": "abc",
    "data": {
        "packageType": "Sales_order",
        "platformName": "INDIA",
        "status": "in_pickup",
        "statusTimestamp": 1512192085
    }
}';


$msg = new AMQPMessage($message);

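$channel->basic_publish(
    $msg, #message
    '', #exchange - empty string selects the default exchange
    'status_update' #routing key - must match the queue name declared above
);

// Release the channel and the connection once the message has been sent
$channel->close();
$connection->close();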

First, we create a connection object.

You can sign in at https://www.cloudamqp.com/plans.html and get credentials for testing purposes.

Since RabbitMQ listens and serves on a single port, we need to create a channel (think of it as a virtual connection multiplexed over the one TCP connection):

$channel = $connection->channel();

Once we have our channel ready, let’s declare a queue to send the request to. The good thing about RabbitMQ is that we can create queues directly from the client, but we have to be careful how we create them. Let’s briefly explain the parameters used to create a queue with $channel->queue_declare():

Queue Name: an arbitrary name that will be used to identify the queue.

Passive: if set to true, the server only checks whether the queue already exists and raises an error if it does not; if false, the server creates the queue when it is missing.

Durable: by default, queues are lost when the server stops or crashes. A queue declared durable survives a server restart. Durability applies to the queue itself; the messages in it survive a restart only if they are also published as persistent.

Exclusive: if true, the queue can only be used by the connection that created it, and it is deleted when that connection closes.

Autodelete: if true, the queue is deleted once its last subscriber disconnects.

In our example the queue will not persist if the server is restarted, can be used by other connections and will not be deleted if there are no more subscribers to it.
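For comparison, here is a minimal sketch of the durable case. The helper names declareDurableQueue() and persistentProperties() and the queue name status_update_durable are assumptions made for illustration; they are not part of the example above. A separate queue name is used because the broker refuses to re-declare an existing queue with different flags. The $channel argument is expected to be a php-amqplib AMQPChannel.

```php
<?php
/**
 * Declare a queue whose definition survives a broker restart.
 *
 * $channel is expected to be a \PhpAmqpLib\Channel\AMQPChannel; it is left
 * untyped so that this sketch loads even without the library installed.
 */
function declareDurableQueue($channel, string $queue): void
{
    $channel->queue_declare(
        $queue,
        false, // passive
        true,  // durable: the queue is re-created after a restart
        false, // exclusive
        false  // auto-delete
    );
}

/**
 * Message properties that ask the broker to persist the message.
 * delivery_mode 2 is the AMQP value for "persistent"
 * (AMQPMessage::DELIVERY_MODE_PERSISTENT in php-amqplib).
 */
function persistentProperties(): array
{
    return [
        'delivery_mode' => 2,
        'content_type'  => 'application/json',
    ];
}
```

With a real channel, this would be used as declareDurableQueue($channel, 'status_update_durable'), followed by new AMQPMessage($message, persistentProperties()) when building the message.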

Next, we created a message object with

$msg = new AMQPMessage($message);

Now we have to publish the message to the queue. However, messages are never published directly to a queue; they always go through an exchange. We never declared an exchange, so how is this possible? It turns out that every queue is automatically bound to a default exchange, with the queue name as its routing key. We can therefore publish the message to the queue through the default exchange with $channel->basic_publish(). The parameters it uses are:

Message: the message we want to send

Exchange: notice that we are using an empty string, because we will use the default exchange

Routing key: the queue name we want the message to be delivered to.
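For contrast with the default exchange, the following is a rough sketch of declaring an exchange explicitly and binding a queue to it. The helper name bindStatusQueue(), the exchange name package_events and the routing key package.status are hypothetical and chosen only for illustration. The $channel argument is expected to be a php-amqplib AMQPChannel.

```php
<?php
/**
 * Declare a direct exchange and a queue, then bind the two together.
 * Returns the routing key that publishers should use.
 *
 * $channel is expected to be a \PhpAmqpLib\Channel\AMQPChannel; it is left
 * untyped so that this sketch loads even without the library installed.
 */
function bindStatusQueue($channel, string $exchange, string $queue, string $routingKey): string
{
    // exchange_declare(name, type, passive, durable, auto_delete)
    $channel->exchange_declare($exchange, 'direct', false, false, false);

    // Same flags as the queue declared in the producer
    $channel->queue_declare($queue, false, false, false, false);

    // The binding tells the exchange which queue receives this routing key
    $channel->queue_bind($queue, $exchange, $routingKey);

    return $routingKey;
}
```

A producer could then call $key = bindStatusQueue($channel, 'package_events', 'status_update', 'package.status') and publish with $channel->basic_publish($msg, 'package_events', $key) instead of using the empty exchange name.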

So the status update message is in the queue. How do we retrieve it? First of all, we have to be aware that a consumer has to keep a connection to the queue server open (a.k.a. subscribe) in order to receive messages from it.

// consume() is a method of a consumer class that is assumed to provide
// $this->channel, $this->packageService and the CONSUMER_TAG constant.
function consume($queue)
{
    $updateStatus = function (AMQPMessage $message) {
        $decodedMessage = json_decode($message->body, true);

        if (!$decodedMessage) {
            // Log the raw body, since decoding failed
            Log::error('Message Payload is not in correct format: ' . $message->body);

            // Reject the message
            $this->channel->basic_nack($message->delivery_info['delivery_tag']);

            return;
        }

        try {
            // Update package status
            $this->packageService->updateStatus($decodedMessage);

            // Acknowledge the message
            $this->channel->basic_ack($message->delivery_info['delivery_tag']);
        } catch (\Exception $e) {
            // Reject the message if the update failed
            $this->channel->basic_nack($message->delivery_info['delivery_tag']);
        }
    };

    $connection = new AMQPStreamConnection(
        'localhost', #host - host name where the RabbitMQ server is
        5672, #port - port number of the service, 5672 is the default
        'guest', #user - username to connect to server
        'guest', #password
        'v_host' #vhost - virtual host to connect to
    );

    $this->channel = $connection->channel();

    $this->channel->queue_declare(
        $queue, #queue name, the same as the sender
        false, #passive - can use this to check whether a queue exists without modifying the server state
        false, #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
        false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
        false #autodelete - queue is deleted when last consumer unsubscribes
    );

    $this->channel->basic_consume(
        $queue, #queue name, the same as the sender
        self::CONSUMER_TAG . '.' . $queue, #consumer tag - Identifier for the consumer, valid within the current channel. just string
        false, #no local - TRUE: the server will not send messages to the connection that published them
        false, #no ack - send a proper acknowledgment from the worker, once we're done with a task
        false, #exclusive - queues may only be accessed by the current connection
        false, #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
        $updateStatus #callback - method that will receive the message
    );

    // Block and dispatch incoming messages for as long as a consumer is registered
    while ($this->channel->callbacks) {
        $this->channel->wait();
    }
}

Just as we connected, created a channel and declared a queue in the producer, we have to do exactly the same inside the consumer. In the consumer, however, we also have to subscribe to the queue with $channel->basic_consume(). The parameters used are defined as follows:

  • Queue: has to be the same queue name we defined in the producer.
  • Consumer tag: an arbitrary name given to the consumer. If this field is empty, the server will generate a unique tag.
  • No local: an obscure parameter; if set, the server will not deliver messages to the connection that published them.
  • No Ack(nowledgement): if true, the server treats every message as acknowledged as soon as it is delivered, so we do not have to acknowledge manually. Our example passes false and acknowledges each message explicitly.
  • Exclusive: if true, the queue may only be accessed by this consumer.
  • No wait: if set, the server does not send a reply to the method, and the client should not wait for one.
  • Callback: can be a function name, an array containing the object and the method name, or a closure that will receive the queued message. This callback has to accept one parameter, which contains the message.
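The three callback forms listed above can be sketched in plain PHP. The names handleStatusUpdate, StatusUpdateHandler and onMessage are made up for this illustration, and a simple object with a body property stands in for a real AMQPMessage so the snippet runs without a broker.

```php
<?php
// 1. A plain function, referenced by its name
function handleStatusUpdate($message)
{
    return 'function:' . $message->body;
}

// 2. An object method, referenced as [object, method name]
class StatusUpdateHandler
{
    public function onMessage($message)
    {
        return 'method:' . $message->body;
    }
}

// 3. A closure
$closure = function ($message) {
    return 'closure:' . $message->body;
};

$callbacks = [
    'handleStatusUpdate',
    [new StatusUpdateHandler(), 'onMessage'],
    $closure,
];

// Stand-in for an AMQPMessage; only the body property is used here
$fakeMessage = (object) ['body' => '{"status":"in_pickup"}'];

// Any of the three forms is a valid PHP callable and is invoked the same way
$results = [];
foreach ($callbacks as $callback) {
    $results[] = call_user_func($callback, $fakeMessage);
}
```

Any of the three values in $callbacks could be passed as the last argument of basic_consume() shown earlier.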

Message Acknowledgement

When RabbitMQ delivers a message to a consumer, it needs to know when to consider the message successfully delivered, so that it can remove it from the queue. An ack acknowledges one or more messages, telling RabbitMQ that they have been handled. The broker can treat a message as delivered at one of two points:

  • After the broker sends the message to an application (using either the basic.deliver or the basic.get-ok AMQP method).
  • After the application sends back an acknowledgement (using the basic.ack AMQP method).

Rejecting Message

When a consumer application receives a message, processing of that message may or may not succeed. An application can indicate to the broker that message processing has failed (or cannot be accomplished at the time) by rejecting a message. When rejecting a message, an application can ask the broker to discard or requeue it. When there is only one consumer on a queue, make sure you do not create infinite message delivery loops by rejecting and requeueing a message from the same consumer over and over again.
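One possible way to avoid such a loop is to requeue a failed message only once, using the redelivered flag that the broker sets on messages it has already delivered before. This is just a sketch of that idea, not the approach used in the consumer above; the helper name rejectWithSingleRetry() is hypothetical, and $channel is expected to be a php-amqplib AMQPChannel.

```php
<?php
/**
 * Reject a message, requeueing it only if this was its first delivery.
 * Returns true when the message was sent back to the queue.
 *
 * $channel is expected to be a \PhpAmqpLib\Channel\AMQPChannel; it is left
 * untyped so that this sketch loads even without the library installed.
 */
function rejectWithSingleRetry($channel, int $deliveryTag, bool $redelivered): bool
{
    // First delivery: give the message one more chance.
    // Redelivery: discard it instead of looping forever.
    $requeue = !$redelivered;

    // basic_reject(delivery_tag, requeue)
    $channel->basic_reject($deliveryTag, $requeue);

    return $requeue;
}
```

Inside a consumer callback, the two values would come from the message itself, for example $message->delivery_info['delivery_tag'] and $message->delivery_info['redelivered'].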

Negative Acknowledgements

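Messages are rejected with the basic.reject AMQP method. basic.reject has one limitation: it cannot reject multiple messages at once, as acknowledgements can.

A nack is a negative acknowledgement (the basic.nack method) that tells RabbitMQ the message was not handled as expected. Unlike basic.reject, it can cover multiple messages at once. Whether a nacked message is sent back into the queue depends on the requeue flag; php-amqplib’s basic_nack() defaults it to false, so pass true explicitly if the message should be requeued.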
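As a small sketch of the difference, the helper below rejects a whole batch of messages with a single nack. The name nackUpTo() is an assumption made for illustration, and $channel is expected to be a php-amqplib AMQPChannel.

```php
<?php
/**
 * Negatively acknowledge every unacknowledged message on the channel
 * up to and including $lastDeliveryTag.
 *
 * $channel is expected to be a \PhpAmqpLib\Channel\AMQPChannel; it is left
 * untyped so that this sketch loads even without the library installed.
 */
function nackUpTo($channel, int $lastDeliveryTag, bool $requeue): void
{
    // basic_nack(delivery_tag, multiple, requeue)
    // multiple = true is what basic_reject cannot express
    $channel->basic_nack($lastDeliveryTag, true, $requeue);
}
```

Calling nackUpTo($channel, $message->delivery_info['delivery_tag'], true) would put the whole batch back on the queue, while passing false would discard it.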

Piyuri Sahu

Application-developer at Technogise Software Solution