Process Communication using AMQP with RabbitMQ

Setup RabbitMQ



To set up RabbitMQ, follow this tutorial: https://www.rabbitmq.com/tutorials/tutorial-one-java.html
(I am currently developing in Scala, so I had to convert the tutorial's Java example code to Scala.)

If your project is a Maven project, add the following lines to your pom.xml file.
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.2.0</version>
</dependency>
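
Since the code in this post is Scala, some readers may build with sbt rather than Maven. The original post only covers Maven, so the line below is just a sketch of the equivalent sbt setting, assuming a standard build.sbt and the same client version as above.

```scala
// build.sbt (assumption: an sbt build; the post itself only shows the Maven setup)
// A single % is used because amqp-client is a plain Java artifact without a Scala version suffix.
libraryDependencies += "com.rabbitmq" % "amqp-client" % "4.2.0"
```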

Then start the RabbitMQ service:
service rabbitmq-server start
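
Before writing the sender and receiver, it can help to confirm that the broker is reachable from the JVM. The following is a minimal sketch, assuming the broker runs on localhost with the default port and credentials; the object name rabbitMQ_check is hypothetical and not part of the tutorial.

```scala
import com.rabbitmq.client.ConnectionFactory
import scala.util.{Failure, Success, Try}

// Hypothetical helper object: tries to open and close one connection.
object rabbitMQ_check {
  def main(args: Array[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setHost("localhost") // assumption: broker on this machine, default port
    Try(factory.newConnection()) match {
      case Success(connection) =>
        println(" [ok] Connected to RabbitMQ on localhost")
        connection.close()
      case Failure(e) =>
        // Typically a connection-refused error when the service is not running
        println(" [!!] Could not connect: " + e)
    }
  }
}
```

If the second message is printed, check that the service started correctly before continuing.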

RabbitMQ Sender



The following is a simple RabbitMQ sender:
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}
object rabbitMQ_sender {
  def main(args: Array[String]) {

    val factory: ConnectionFactory = new ConnectionFactory()
    factory.setHost("localhost")
    val connection: Connection = factory.newConnection
    val channel: Channel = connection.createChannel

    val queue_to_send = "test"
    channel.queueDeclare(queue_to_send, false, false, false, null)
    val message = "Hello World!"
    channel.basicPublish("", queue_to_send, null, message.getBytes)
    System.out.println(" [x] Sent '" + message + "'")

    channel.close
    connection.close()
  }
}

In the figure below, P represents the RabbitMQ sender (P stands for producer). When you run the above code, the sender sends the message "Hello World!" to the queue (the red blocks in the figure). Then the receiver (C, for consumer) receives the message.



RabbitMQ Receiver



The following is a simple RabbitMQ receiver:
import com.rabbitmq.client._
object rabbitMQ_receiver {
  def main(args: Array[String]) {
      try {
        val factory: ConnectionFactory  = new ConnectionFactory()
        factory.setHost("localhost")
        val connection: Connection  = factory.newConnection()
        val channel: Channel  = connection.createChannel()

        val QUEUE_NAME = "test"
        channel.queueDeclare(QUEUE_NAME, false, false, false, null)
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C")
        val consumer:Consumer  = new DefaultConsumer(channel) {
          override def handleDelivery(consumerTag: String, envelope: Envelope , properties: AMQP.BasicProperties , body: Array[Byte] ) = {
            val message: String  = new String (body, "UTF-8")
            System.out.println (" [x] Received '" + message + "'")
          }
        }
        channel.basicConsume (QUEUE_NAME, true, consumer)
      }
  }
}

While the RabbitMQ receiver is running, it prints the output below every time it gets a message from a sender.
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

Test: Process handling using RabbitMQ



So now, how can I use this RabbitMQ sender and receiver in my own code?

Here is what I am going to do.
  • Start two processes (process A and process B) at the same time.
  • Process B pauses at some point and waits until process A finishes its job.
  • Process A sends a message when it has completed its job.
  • Once process B gets the message from process A, it resumes its job.

  • In process A, call the following function at the point where process A should send the AMQP message. In my case that is the end of the main() function, so that process B is notified when the job is complete.
      def send_ampq_task_completion(rabbitmq_host: String, queue_name: String, stop_message: String) = {
        val factory: ConnectionFactory = new ConnectionFactory()
        factory.setHost(rabbitmq_host)
        val connection: Connection = factory.newConnection
        val channel: Channel = connection.createChannel
    
        val queue_to_send = queue_name
        channel.queueDeclare(queue_to_send, false, false, false, null)
        channel.basicPublish("", queue_to_send, null, stop_message.getBytes)
        System.out.println(" [x] Sent '" + stop_message + "'")
        channel.close
        connection.close()
      }
    

    In process B, the "RabbitMQ receiver" code needs a small modification, shown below. Instead of returning right after registering the consumer, the function blocks until a specific message arrives on the queue (in this case, the message sent by process A; the stop_message values on both sides must be identical). The consumer is registered once, and a CountDownLatch keeps the calling thread blocked until the consumer callback sees the expected message. Define the function below and call it at the point where process B should pause until process A finishes its job.
    Note that messages are stored in the queue whose name is passed to channel.queueDeclare(QUEUE_NAME, false, false, false, null) . You can also specify the location of the RabbitMQ server with factory.setHost() .
        def get_ampq_task_completion(rabbitmq_host: String, queue_name: String, stop_message: String) = {
          import java.util.concurrent.CountDownLatch

          val factory: ConnectionFactory  = new ConnectionFactory()
          factory.setHost(rabbitmq_host)  // use the host passed in by the caller
          val connection: Connection  = factory.newConnection()
          val channel: Channel  = connection.createChannel()
          try {
            val QUEUE_NAME = queue_name
            channel.queueDeclare(QUEUE_NAME, false, false, false, null)
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C")

            // Released by the consumer callback, which runs on a client library thread
            val done = new CountDownLatch(1)
            val consumer: Consumer = new DefaultConsumer(channel) {
              override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]) = {
                val message: String = new String(body, "UTF-8")
                System.out.println(" [x] Received '" + message + "'")
                if (message == stop_message) done.countDown()
              }
            }
            // Register the consumer once, then block until the stop message arrives
            channel.basicConsume(QUEUE_NAME, true, consumer)
            done.await()
          } finally {
            connection.close()  // closing the connection also closes its channels
          }
        }
    

    In addition, I defined the function below, which removes all messages in a queue. It does so by deleting the queue itself; the sender and receiver declare it again when they start.
    Once a message is sent to the RabbitMQ server and stored in a queue, it stays there until a consumer reads it (this is one of the benefits of an AMQP service). Therefore, before starting the two processes, execute the following function to clear all old messages, so that process B does not mistakenly read an old message from process A and move ahead without waiting for process A to complete.
        def clear_ampq_queues(rabbitmq_host: String, queue_name: String) = {
          val factory: ConnectionFactory  = new ConnectionFactory()
          factory.setHost(rabbitmq_host)
          val connection: Connection  = factory.newConnection()
          val channel: Channel  = connection.createChannel()
          channel.queueDelete(queue_name)
          channel.close()
          connection.close()
        }
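
    The function above calls queueDelete, which removes the queue as well as its messages. If you would rather keep the queue and only empty it, the Java client also provides queuePurge. The following is a minimal sketch of that alternative; the object and function names are hypothetical, and it assumes the same non-durable queue settings used elsewhere in this post.

```scala
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}

object rabbitMQ_purge {
  // Hypothetical alternative to clear_ampq_queues: empties the queue but keeps it declared.
  // Returns the number of messages that were removed.
  def purge_amqp_queue(rabbitmq_host: String, queue_name: String): Int = {
    val factory: ConnectionFactory = new ConnectionFactory()
    factory.setHost(rabbitmq_host)
    val connection: Connection = factory.newConnection()
    try {
      val channel: Channel = connection.createChannel()
      // Declare first: purging a queue that does not exist raises a channel error
      channel.queueDeclare(queue_name, false, false, false, null)
      channel.queuePurge(queue_name).getMessageCount
    } finally {
      connection.close() // closing the connection also closes its channels
    }
  }
}
```

    Calling rabbitMQ_purge.purge_amqp_queue("localhost", "test") before starting the two processes would play the same role as the delete-based version.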
    

    The code above only covers sequential processes. What I am actually trying to build is multiple processes that run in parallel in containers, each sending a message to a controller after completing its job (the controller would be the receiver in the example above). I am going to modify the code for parallel processing.
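
    One possible shape for such a controller is sketched below. This is not the finished design, only an illustration under stated assumptions: each worker publishes its own id as the message body to one shared queue, and the controller blocks until it has seen every expected id once. The names rabbitMQ_controller, wait_for_workers and expected_workers are hypothetical.

```scala
import com.rabbitmq.client._
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}

object rabbitMQ_controller {
  // Hypothetical helper: blocks until every id in expected_workers has been received once.
  def wait_for_workers(rabbitmq_host: String, queue_name: String, expected_workers: Set[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setHost(rabbitmq_host)
    val connection = factory.newConnection()
    try {
      val channel = connection.createChannel()
      channel.queueDeclare(queue_name, false, false, false, null)

      val seen = ConcurrentHashMap.newKeySet[String]()          // ids already counted
      val remaining = new CountDownLatch(expected_workers.size) // one count per worker

      val consumer = new DefaultConsumer(channel) {
        override def handleDelivery(consumerTag: String, envelope: Envelope,
                                    properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
          val worker_id = new String(body, "UTF-8")
          // Count each expected worker only once; ignore unknown or duplicate ids
          if (expected_workers.contains(worker_id) && seen.add(worker_id)) {
            println(" [x] Worker finished: " + worker_id)
            remaining.countDown()
          }
        }
      }
      channel.basicConsume(queue_name, true, consumer)
      remaining.await() // returns once all expected workers have reported
    } finally {
      connection.close()
    }
  }
}
```

    With this sketch, each worker could reuse the sender function from process A and pass its own id as the message, while the controller calls wait_for_workers with the full set of ids.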
