RABBITMQ

What is RabbitMQ?

RabbitMQ is open source message broker software. It accepts messages from producers and delivers them to consumers, acting as a middleman that reduces the load and response times of web application servers.

Features of RabbitMQ:

  • Robust messaging for building applications in a distributed manner.
  • Easy to use.
  • Runs on all major operating systems.
  • Supports a huge number of developer platforms.
  • Supports multiple messaging protocols, message queuing, delivery acknowledgement, flexible routing to queues, and multiple exchange types.
  • Open source and commercially supported.

How Does RabbitMQ Work?

The producer sends messages to an exchange. An exchange is responsible for routing messages to the different queues: it accepts messages from the producer application and routes them to message queues with the help of bindings and routing keys. A binding is a link between a queue and an exchange that tells the exchange which queue should receive messages for a given routing key. Consumers then receive messages from the queue.
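
As an illustration, the exchange/binding setup can be expressed with the Pika client used in the examples below (a minimal sketch; the exchange name 'logs', queue name 'error_queue', and routing key 'error' are hypothetical):

    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declare a direct exchange and a queue (names are illustrative).
    channel.exchange_declare(exchange='logs', exchange_type='direct')
    channel.queue_declare(queue='error_queue')

    # The binding links the queue to the exchange under the routing key
    # 'error'; messages published to 'logs' with that key land in the queue.
    channel.queue_bind(exchange='logs', queue='error_queue', routing_key='error')

    channel.basic_publish(exchange='logs', routing_key='error', body='disk full')
    connection.close()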

Prerequisites:
  • RabbitMQ
  • Python

How to Send and Receive a Message Using RabbitMQ?

Send a Message using RabbitMQ:

The following program, send.py, will send a single message to the queue.

Step 1: Establish a connection with the RabbitMQ server.

 
       
    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
       
 

Step 2: Create a hello queue to which the message will be delivered:


    channel.queue_declare(queue='hello')
     
   

Step 3: Publish the message, passing the exchange and the queue name in the exchange and routing_key parameters so RabbitMQ knows which queue the message should go to. The empty exchange string selects the default exchange, which routes the message to the queue whose name matches the routing key.


    channel.basic_publish(exchange='', routing_key='hello', body='Hello RabbitMQ!')
    print(" [x] Sent 'Hello RabbitMQ!'")
        

Step 4: Close the connection to make sure the network buffers are flushed and the message is actually delivered to RabbitMQ.


    connection.close()
       
 
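For reference, here is the complete send.py assembled from the four steps above:

    import pika

    # Step 1: establish a connection with the RabbitMQ server.
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Step 2: create the hello queue.
    channel.queue_declare(queue='hello')

    # Step 3: publish the message via the default exchange.
    channel.basic_publish(exchange='', routing_key='hello', body='Hello RabbitMQ!')
    print(" [x] Sent 'Hello RabbitMQ!'")

    # Step 4: close the connection to flush network buffers.
    connection.close()
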
Receive a message using RabbitMQ:

The following program, receive.py, will receive messages from the queue. It assumes the same connection setup and queue declaration as send.py.

Step 1: Receiving works by subscribing a callback function to a queue. Whenever a message is received, the Pika library calls this callback function. The following function will print the contents of the message on the screen.


    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
       
 

Step 2: Next, we need to tell RabbitMQ that this particular callback function should receive messages from our hello queue:


    channel.basic_consume(
        queue='hello', on_message_callback=callback, auto_ack=True)

        

Step 3: Finally, enter a never-ending loop that waits for data and runs the callback whenever necessary.


    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
     
   
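Putting the steps together, the complete receive.py looks like this (the connection setup and queue declaration mirror send.py):

    import pika

    # Establish a connection and declare the queue, as in send.py.
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')

    # Step 1: callback invoked by Pika for each incoming message.
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    # Step 2: subscribe the callback to the hello queue.
    channel.basic_consume(
        queue='hello', on_message_callback=callback, auto_ack=True)

    # Step 3: block and wait for messages.
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()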

Step 4: Open a terminal and run send.py. The producer program exits after each run:

    python send.py
    [x] Sent 'Hello RabbitMQ!'

We can then open http://localhost:15672/ in a web browser and see the count of messages sent in the RabbitMQ management dashboard.

Step 5: Open another terminal and run the receive.py program:

    python receive.py
    [*] Waiting for messages. To exit press CTRL+C
    [x] Received 'Hello RabbitMQ!'

If the Ready and Total message counts in the dashboard drop to zero, it confirms that the messages were received by the consumer.

Note: You can keep sending messages through RabbitMQ by running send.py again. As you will notice, the receive.py program doesn't exit; it stays ready to receive further messages until interrupted with Ctrl+C.

Messaging system

A messaging system is responsible for transferring data from one application to another, so the applications can focus on the data without worrying about how to share it. This is especially important in Big Data, where enormous volumes of data must move between applications.

Two types of messaging patterns are available:

Point-to-Point Messaging System

In a point-to-point system, messages are persisted in a queue. One or more consumers can consume the messages in the queue, but a particular message can be consumed by at most one consumer.

Publish-Subscribe Messaging System

In the publish-subscribe system, messages are persisted in a topic. Unlike the point-to-point system, consumers can subscribe to one or more topics and consume all the messages in those topics. In the publish-subscribe system, message producers are called publishers and message consumers are called subscribers.

Kafka

Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle a high volume of data and enables us to pass messages from one endpoint to another.

The main terminology is described below.

Topics

A stream of messages belonging to a category is called a topic. Data is stored in topics. Kafka topics are analogous to radio / TV channels: multiple consumers can subscribe to the same topic and consume the messages.

Topics are split into partitions. For each topic, Kafka keeps a minimum of one partition. Each such partition contains messages in an immutable ordered sequence.

Partition offset

Each message within a partition has a unique sequence id called an offset. For each topic, the Kafka cluster maintains a partitioned log.

Each partition is an ordered, immutable sequence of records that is continually appended to, like a structured commit log. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

The Kafka cluster durably persists all published records, whether or not they have been consumed, for a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published, it is available for consumption, after which it will be discarded to free up space.
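
As an illustration, each record fetched with the kafka-python client (installed later in this article) exposes the partition it came from and its offset within that partition. A minimal sketch, assuming the test topic created below:

    from kafka import KafkaConsumer

    # Read the topic from the beginning and show where each record sits.
    consumer = KafkaConsumer('test',
                             bootstrap_servers=['localhost:9092'],
                             auto_offset_reset='earliest')
    for msg in consumer:
        print("partition=%d offset=%d value=%r"
              % (msg.partition, msg.offset, msg.value))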

Replicas of a partition

Replicas are nothing but backups of a partition. Replicas never serve reads or writes for clients; they are used only to prevent data loss.

Brokers

Brokers are simple systems responsible for maintaining the published data. Each broker may host zero or more partitions of each topic.

Kafka Cluster

A Kafka deployment with more than one broker is called a Kafka cluster.

Kafka Cluster Architecture

Zookeeper

ZooKeeper is used for managing and coordinating Kafka brokers. The ZooKeeper service mainly notifies producers and consumers when a new broker joins the Kafka system or when a broker fails.

Consumer Group

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group.
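
For example, two copies of the sketch below started in two terminals share one consumer group, so Kafka balances the topic's partitions between them and each record is delivered to only one of the two (this uses the kafka-python client installed later in this article; the group name 'my-group' is illustrative):

    from kafka import KafkaConsumer

    # Both running instances join the same group, so each record from
    # 'test' is delivered to exactly one of them.
    consumer = KafkaConsumer('test',
                             bootstrap_servers=['localhost:9092'],
                             group_id='my-group',
                             auto_offset_reset='earliest')
    for msg in consumer:
        print(msg.value)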

Kafka features
  1. High throughput: supports hundreds of thousands of messages per second with modest hardware.
  2. Scalability: a highly scalable distributed system that can be scaled with no downtime.
  3. No data loss: Kafka ensures no data loss once configured properly.
  4. Stream processing: Kafka can be used along with real-time streaming applications like Spark and Storm.
  5. Durability: provides support for persisting messages on disk.
  6. Replication: messages can be replicated across clusters, which supports multiple subscribers.

Installing and Getting started

Prerequisite: Install Java

  1. Download kafka .tgz file from https://kafka.apache.org/downloads
  2. Untar the file and go into the kafka directory
              
             > tar -xzf kafka_2.11-2.1.0.tgz
             > cd kafka_2.11-2.1.0 
             
  3. Start the zookeeper server using the properties in zookeeper.properties
              
            > bin/zookeeper-server-start.sh config/zookeeper.properties
              
            
  4. Start the Kafka broker using the properties in server.properties
              
            > bin/kafka-server-start.sh config/server.properties 
             
            
Create a topic

Let's create a topic named "test" with a single partition and only one replica.
In the command below, 2181 is the port number we specified in zookeeper.properties:

            
        > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
            
        

To list the current topics, we can query ZooKeeper using the command below:

                
        > bin/kafka-topics.sh --list --zookeeper localhost:2181
        test
                
        
Command line producer

Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate message. Run the producer, then type a few messages into the console to send to the server.

(In the command below, 9092 is the port number we configured for the broker in server.properties.)

                
        > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
        This is a message
        This is another message
                
        
Command line consumer

Kafka also has a command line consumer that will dump out messages to standard output.

                
        > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
        This is a message
        This is another message
                
        
Kafka producer and consumer using Python

The Kafka producer and consumer can be written in many languages, such as Java and Python. In this section, we will see how to send messages to and receive messages from a Kafka topic using Python.

  1. First, install the kafka-python package using the Python package manager:

        pip install kafka-python
                            
  2. The Python program below reads records from an input file and sends them as messages to the test topic which we created in the previous section.

    Producer.py

                    
        from kafka import KafkaProducer

        producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

        with open('/example/data/inputfile.txt') as f:
            lines = f.readlines()

        for line in lines:
            # KafkaProducer expects bytes, so encode each line before sending.
            producer.send('test', line.encode('utf-8'))

        # send() is asynchronous; flush to ensure all messages are delivered
        # before the program exits.
        producer.flush()
                    
            

The Python program below consumes the messages from the Kafka topic and prints them on the screen.

Consumer.py

                
        from kafka import KafkaConsumer

        consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                                 auto_offset_reset='earliest')
        consumer.subscribe(['test'])

        for msg in consumer:
            print(msg)
                
        

In the above program, we have

        auto_offset_reset='earliest'

This causes the consumer program to read all the messages from the beginning every time it is run, since no consumer group is specified and therefore no offsets are committed.

If instead we set

        auto_offset_reset='latest'

the consumer program will read only the messages published after it starts, skipping everything already in the topic.
