Process Communication using AMQP with RabbitMQ
Setup RabbitMQ
To setup RabbitMQ, follow this tutorial: htps //w w. 락비 tmq. 코 m/트리어 ls/트리어 l-오네-그럼. HTML
(Currently I am using Scala for development so I needed to convert the example code written in Java into Scala.)
If your project is a maven project, you need to add the following lines into your pom.xml file.
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.2.0</version>
</dependency>
Then, start RabbitMQ service
service rabbitmq-server start
RabbitMQ Sender
Following is a simple code of RabbitMQ Sender
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}
object rabbitMQ_sender {
def main(args: Array[String]) {
val factory: ConnectionFactory = new ConnectionFactory()
factory.setHost("localhost")
val connection: Connection = factory.newConnection
val channel: Channel = connection.createChannel
val queue_to_send = "test"
channel.queueDeclare(queue_to_send, false, false, false, null)
val message = "Hello World!"
channel.basicPublish("", queue_to_send, null, message.getBytes)
System.out.println(" [x] Sent '" + message + "'")
channel.close
connection.close()
}
}
In the below figure, P represents RabbitMQ Sender (P means producer). When you run the above code, the sender send a message "Hello World"to Queue Stack (red blocks in the figure). Then, the Receiver (C: consumer ) receives the message.
RabbitMQ Receiver
Following is a simple code of RabbitMQ Receiver.
import com.rabbitmq.client._
object rabbitMQ_receiver {
def main(args: Array[String]) {
try {
val factory: ConnectionFactory = new ConnectionFactory()
factory.setHost("localhost")
val connection: Connection = factory.newConnection()
val channel: Channel = connection.createChannel()
val QUEUE_NAME = "test"
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
System.out.println(" [*] Waiting for messages. To exit press CTRL+C")
val consumer:Consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String, envelope: Envelope , properties: AMQP.BasicProperties , body: Array[Byte] ) = {
val message: String = new String (body, "UTF-8")
System.out.println (" [x] Received '" + message + "'")
}
}
channel.basicConsume (QUEUE_NAME, true, consumer)
}
}
}
While RabbitMQ Receiver is running, it returns the below result every time it gets message from a sender.
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
Test: Process handling using RabbitMQ
So now, how can I implement these RabbitMQ Sender/Receiver into my coding?
Here is what I am going to do.
In process A, call the following function where you want process A to send AMQP message, in my case it should be the end of main() function, so that process B can be notified the job completion.
def send_ampq_task_completion(rabbitmq_host: String, queue_name: String, stop_message: String) = {
val factory: ConnectionFactory = new ConnectionFactory()
factory.setHost(rabbitmq_host)
val connection: Connection = factory.newConnection
val channel: Channel = connection.createChannel
val queue_to_send = queue_name
channel.queueDeclare(queue_to_send, false, false, false, null)
channel.basicPublish("", queue_to_send, null, stop_message.getBytes)
System.out.println(" [x] Sent '" + stop_message + "'")
channel.close
connection.close()
}
In process B, "RabbitMQ receiver"code should be modified a little bit like below. By adding while loop in the receiver, it waits until get a specific message from queue (in this case, a message from process B. These 2 messages need to be identical). Define the below function and call it where you want to stop process B until process A finishes its job.
Note that, messages are stuck in a queue whose name is the same one which is defined in
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
. Also, you can specify the location of RabbitMQ server with factory.setHost()
. def get_ampq_task_completion(rabbitmq_host: String, queue_name: String, stop_message: String) = {
try {
val factory: ConnectionFactory = new ConnectionFactory()
factory.setHost("localhost")
val connection: Connection = factory.newConnection()
val channel: Channel = connection.createChannel()
val QUEUE_NAME = queue_name
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
System.out.println(" [*] Waiting for messages. To exit press CTRL+C")
var enough: Boolean = false
while (!enough) {
val consumer: Consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]) = {
val message: String = new String(body, "UTF-8")
System.out.println(" [x] Received '" + message + "'")
if (message == stop_message) {enough = true}
}
}
channel.basicConsume(QUEUE_NAME, true, consumer)
}
channel.close()
connection.close()
}
}
In addition, I defined the below function which remove all messages in a queue.
Once you send a message to RabbitMQ server and it is stacked in a queue, unless a consumer reads the message, it will never be removed (and this is one of the beneficials of AMQP service). Therefore, before starting the 2 processes, execute the following function and clear all old messages to avoid that process B mistakenly read an old message from process A and move ahead without waiting process A completion.
def clear_ampq_queues(rabbitmq_host: String, queue_name: String) = {
val factory: ConnectionFactory = new ConnectionFactory()
factory.setHost("localhost")
val connection: Connection = factory.newConnection()
val channel: Channel = connection.createChannel()
channel.queueDelete(queue_name)
channel.close()
connection.close()
}
The above codes are only applicable to sequential processes, but what I am trying to develop is like multiple processes which runs on containers in parallel and they send message after completing its job to a controller (which should be receiver in the Example a I am going to modify the code for parallel processing.
Reference
이 문제에 관하여(Process Communication using AMQP with RabbitMQ), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/bmj0114/items/ef6036b9ea6d06159767텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)