RabbitMQ 메시지 생산자 와 소비자
52037 단어 자바
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
//1. ,
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.
Connection connection = connectionFactory.newConnection();
//3.
Channel channel = connection.createChannel();
Map<String,Object> headers = new HashMap<>();
headers.put("my1", "111");
headers.put("my2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)//2 , , , 。1 : bug ,
.contentEncoding("UTF-8")
.expiration("10000")//
.headers(headers)//
.build();
//4. ,exchange,routingkey,props: ,body:
for (int i = 0; i < 5; i++) {
String msg = "lulu ";
channel.basicPublish("", "lulu", properties, msg.getBytes());
System.out.println(msg);
}
//5. :
channel.close();
connection.close();
}
}
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
public static void main(String[] args) throws Exception {
//1. ,
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.
Connection connection = connectionFactory.newConnection();
//3.
Channel channel = connection.createChannel();
//4. 1. ,2. ,3. : ,4. exchange 5.
String queueName = "lulu";
channel.queueDeclare(queueName, true, false, false, null);
//5.
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6. channel 1. ,2. 3.
channel.basicConsume(queueName, true, queueingConsumer);
//7. 。
while (true) {
System.out.println(" ");
Delivery delivery = queueingConsumer.nextDelivery();// 。
String msg = new String(delivery.getBody());// body
Map<String,Object> headers = delivery.getProperties().getHeaders();
System.out.println(headers.get("my1"));
System.out.println(headers.get("my2"));
//Envelope envelope = delivery.getEnvelope();//
}
2–
public void fireEngineSend2(String message) {
try {
//
factory = new ConnectionFactory();
//
factory.setHost(host);
factory.setUsername(username);
factory.setPassword(password);
factory.setPort(port);
factory.setVirtualHost("/");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(queueName, false, false, false, null);
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
//
if (channel != null) {
channel.close();
}
//
if (connection != null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
public void fireEngineReceive2() {
try {
//
factory = new ConnectionFactory();
//
factory.setHost(host);
factory.setUsername(username);
factory.setPassword(password);
factory.setPort(port);
factory.setVirtualHost("/");
//
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
3–
public void fireEngineSend(){
try {
//
factory = new ConnectionFactory();
//
factory.setHost(host);
factory.setUsername(username);
factory.setPassword(password);
factory.setPort(port);
factory.setVirtualHost("/");
Connection connection = null;
connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchange = "fire_engine";
String routingKey = "consumer.save";
String msg = "Hello RabbitMQ Consumer Message";
for(int i =0; i<5; i ++) {
try {
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public void fireEngineReceive() {
try {
//
factory = new ConnectionFactory();
//
factory.setHost(host);
factory.setUsername(username);
factory.setPassword(password);
factory.setPort(port);
Connection connection = null;
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicConsume(queueName, true, new MyConsumer(channel));
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
/**
* Consumer
*/
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
4–
/**
* @author MrBird
*/
@EnableAsync
@SpringBootApplication
@EnablePatrolCloudResourceServer
@EnableTransactionManagement
@EnableDistributedTransaction
@EnableBinding(Sink.class)
@MapperScan("com.data.mapper")
public class PatrolServerDataProcessApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(PatrolServerDataProcessApplication.class)
.web(WebApplicationType.SERVLET)
.run(args);
RabbitmqUtil rabbitmqUtil = new RabbitmqUtil();
rabbitmqUtil.fireEngineSend(); //
rabbitmqUtil.fireEngineReceive();
}
}
@StreamListener(Sink.INPUT)
public void handle(Person person) {
System.out.println("Received: " + person);
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return this.name;
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Is Eclipse IDE dying?In 2014 the Eclipse IDE is the leading development environment for Java with a market share of approximately 65%. but ac...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.