RabbitMQ 학습 의 3: 게시/구독 (방송 방식 fanout)
8201 단어 RabbitMQ 학습
지난 글 에서 우 리 는 RabbitMQ 작업 대기 열 에 대해 배 웠 고 메 시 지 를 전달 하 는 방식 과 메시지 피드백 체제 등 을 알 았 다. 또한 하나의 임 무 를 한 소비자 에 게 만 준다 고 가정 했다. 그러나 실제 업무 에서 '일대일' 모델 은 수 요 를 만족 시 키 지 못 했다. 이 글 에서우 리 는 앞으로 가능 한 한 실제 와 관련 도가 높 은 예 를 들 어 하나의 임 무 를 모든 소비자 에 게 보 낸 다음 에 소비자 들 이 이 소식 을 어떻게 처리 할 것 인 가 를 결정 할 것 이다.
지금부터 오늘 의 첫 번 째 개념: exchange (전송 기/공유 기) 먼저 홈 페이지 를 발췌 하면 The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn 't even know if a message will be delivered to any queue at all.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.(변환기/공유 기)사실 exchange 는 간단 한 작업 을 했 을 뿐 입 니 다. 1. producer 에서 보 낸 메 시 지 를 받 습 니 다. 2. 받 은 메 시 지 를 push 를 대기 열 에 보 내 는 것 은 이러한 작업 특징 이 있 습 니 다. exchange 는 받 은 메 시 지 를 어떻게 처리 해 야 하 는 지 잘 알 아야 합 니 다. 특정한 대기 열 에 메 시 지 를 보 내야 하 는 것 입 니까? 아니면 많은 대기 열 에 보 내야 하 는 것 입 니까? 아니면 무시 해 야 하 는 것 입 니까?어떻게 해 야 하 는 지 는 exchange 의 유형 을 어떻게 정의 하 느 냐 에 달 려 있 습 니 다.
exchage 형식:
1. direct
2. topic
3. headers
4. fanout
말 나 온 김 에 세심 한 친구 들 이 발 견 했 을 것 입 니 다. 예전 의 예 에서 exchange 를 밝 히 지 않 았 습 니 다. 하지만 우리 의 코드 는 잘 가지 않 았 습 니까? 이상 하 게 생각 하지 마 세 요. rabbitMQ 에서 기본 적 인 exchange 가 있 습 니 다. 그 이름 은 '"입 니 다. 여기 있 는 queneName 은 RoutingKey 입 니 다. 다음 에 다시 토론 하 겠 습 니 다.
channel.basicPublish("", queneName, null, msg.getBytes());
오늘 은 가장 간단 한 팬 아웃 방식, 즉 방송 방식, exchange 가 받 은 소식 을 모든 소비자 에 게 보 내 는 것 을 배 워 보 겠 습 니 다.
exchange 설명
// "exchange_fanout" exchange
channel.exchangeDeclare("exchange_fanout", "fanout");
// exchange
channel.basicPublish("exchange_fanout", "", null, msg.getBytes());
두 번 째 개념: 임시 대기 열 (Temporary queues)
이전의 예 에서, 우리 의 대기 열 은 모두 구체 적 인 이름 (예 를 들 어, "workquene 1") 이 있 었 다. 사실 우 리 는 producer 와 consumer 사이 에서 정 보 를 공유 해 야 할 때, 대기 열 에 이름 을 붙 이 는 것 은 필요 하 다. 그러나 이 글 은 잠시 이것 에 대해 토론 하지 않 는 다. 오늘 의 임 무 는 모든 소비자 에 게 정 보 를 보 내 는 것 이다. 마침 서버 는 우리 에 게 non - durable (지속 되 지 않 는) 을 제공 할 수 있다., exclusive (단독), autodelete (소비자 의 소멸 에 따라 자동 으로 삭 제 된), random (무 작위 이름) 의 대기 열 입 니 다.
String queueName = channel.queueDeclare().getQueue();
세 번 째 개념: 바 인 딩 (Bindings) 위 에 exchange 를 만 들 었 고 임시 대기 열 을 정 의 했 습 니 다. 지금 은 exchange 에서 모든 대기 열 로 메 시 지 를 보 내 려 고 합 니 다. exchange 와 대기 열 을 연결 해 야 합 니까? 그렇지 않 으 면 exchange 는 이 자 를 어디로 보 내 는 지 어떻게 압 니까?
// binding
channel.queueBind(" ", "exchange ", "");
자, 팬 아웃 타 입 은 말 이 많 지 않 습 니 다. 코드 를 올 려 도 됩 니 다.
생산자 생산자
public class Producer {
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// connection
Connection conn = factory.newConnection();
// channel
Channel channel = conn.createChannel();
// channel fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Date nowDate = new Date();
String msg = nowDate.getTime() + " have log sth...";
// exchange
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println(nowDate + " ...");
channel.close();
conn.close();
}
}
소비자 소비자
public class Consumer1 {
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
// pid,
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
// channel
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// RabbitMQ ,
String queueName = channel.queueDeclare().getQueue();
// binding
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(pid + " , ...");
QueueingConsumer consumer = new QueueingConsumer(channel);
//
channel.basicConsume(queueName, true, consumer);
while (true) {
// Delivery : ,
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String recieveMsg = new String(delivery.getBody());
System.out.println(pid + " : " + recieveMsg);
}
}
}
출력 결과:
생산 자 는 다음 두 가지 데 이 터 를 생산 했다.
1450345916316 have log sth… 1450345918852 have log sth…
소비자 2980
2980 수신 소식: 1450345916316 have log sth... 2980 수신 소식: 1450345918852 have log sth...
소비자
484 수신 소식: 1450345916316 have log sth... 484 수신 소식: 1450345918852 have log sth...