RabbitMQ 학습 의 3: 게시/구독 (방송 방식 fanout)

8201 단어 RabbitMQ 학습
레 퍼 런 스http://blog.csdn.net/lmj623565791/article/details/37620057RabbitMQ 홈 페이지 와 함께 자신의 일부 수정 과 실험 을 합 니 다. 초보 자가 독학 을 했 기 때문에 잘못된 점 이 있 을 수 있 습 니 다. 여러분 의 지적 을 환영 합 니 다. 동생 은 먼저 감 사 했 습 니 다.
지난 글 에서 우 리 는 RabbitMQ 작업 대기 열 에 대해 배 웠 고 메 시 지 를 전달 하 는 방식 과 메시지 피드백 체제 등 을 알 았 다. 또한 하나의 임 무 를 한 소비자 에 게 만 준다 고 가정 했다. 그러나 실제 업무 에서 '일대일' 모델 은 수 요 를 만족 시 키 지 못 했다. 이 글 에서우 리 는 앞으로 가능 한 한 실제 와 관련 도가 높 은 예 를 들 어 하나의 임 무 를 모든 소비자 에 게 보 낸 다음 에 소비자 들 이 이 소식 을 어떻게 처리 할 것 인 가 를 결정 할 것 이다.
지금부터 오늘 의 첫 번 째 개념: exchange (전송 기/공유 기) 먼저 홈 페이지 를 발췌 하면 The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn 't even know if a message will be delivered to any queue at all.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.(변환기/공유 기)사실 exchange 는 간단 한 작업 을 했 을 뿐 입 니 다. 1. producer 에서 보 낸 메 시 지 를 받 습 니 다. 2. 받 은 메 시 지 를 push 를 대기 열 에 보 내 는 것 은 이러한 작업 특징 이 있 습 니 다. exchange 는 받 은 메 시 지 를 어떻게 처리 해 야 하 는 지 잘 알 아야 합 니 다. 특정한 대기 열 에 메 시 지 를 보 내야 하 는 것 입 니까? 아니면 많은 대기 열 에 보 내야 하 는 것 입 니까? 아니면 무시 해 야 하 는 것 입 니까?어떻게 해 야 하 는 지 는 exchange 의 유형 을 어떻게 정의 하 느 냐 에 달 려 있 습 니 다.
exchage 형식:
 1. direct
 2. topic
 3. headers 
 4. fanout

말 나 온 김 에 세심 한 친구 들 이 발 견 했 을 것 입 니 다. 예전 의 예 에서 exchange 를 밝 히 지 않 았 습 니 다. 하지만 우리 의 코드 는 잘 가지 않 았 습 니까? 이상 하 게 생각 하지 마 세 요. rabbitMQ 에서 기본 적 인 exchange 가 있 습 니 다. 그 이름 은 '"입 니 다. 여기 있 는 queneName 은 RoutingKey 입 니 다. 다음 에 다시 토론 하 겠 습 니 다.
channel.basicPublish("", queneName, null, msg.getBytes());

오늘 은 가장 간단 한 팬 아웃 방식, 즉 방송 방식, exchange 가 받 은 소식 을 모든 소비자 에 게 보 내 는 것 을 배 워 보 겠 습 니 다.
exchange 설명
//        "exchange_fanout" exchange
channel.exchangeDeclare("exchange_fanout", "fanout");
//       exchange
channel.basicPublish("exchange_fanout", "", null, msg.getBytes());

두 번 째 개념: 임시 대기 열 (Temporary queues)
이전의 예 에서, 우리 의 대기 열 은 모두 구체 적 인 이름 (예 를 들 어, "workquene 1") 이 있 었 다. 사실 우 리 는 producer 와 consumer 사이 에서 정 보 를 공유 해 야 할 때, 대기 열 에 이름 을 붙 이 는 것 은 필요 하 다. 그러나 이 글 은 잠시 이것 에 대해 토론 하지 않 는 다. 오늘 의 임 무 는 모든 소비자 에 게 정 보 를 보 내 는 것 이다. 마침 서버 는 우리 에 게 non - durable (지속 되 지 않 는) 을 제공 할 수 있다., exclusive (단독), autodelete (소비자 의 소멸 에 따라 자동 으로 삭 제 된), random (무 작위 이름) 의 대기 열 입 니 다.
String queueName = channel.queueDeclare().getQueue();

세 번 째 개념: 바 인 딩 (Bindings) 위 에 exchange 를 만 들 었 고 임시 대기 열 을 정 의 했 습 니 다. 지금 은 exchange 에서 모든 대기 열 로 메 시 지 를 보 내 려 고 합 니 다. exchange 와 대기 열 을 연결 해 야 합 니까? 그렇지 않 으 면 exchange 는 이 자 를 어디로 보 내 는 지 어떻게 압 니까?
// binding
channel.queueBind("   ", "exchange ", "");

자, 팬 아웃 타 입 은 말 이 많 지 않 습 니 다. 코드 를 올 려 도 됩 니 다.
생산자 생산자
public class Producer {

    private final static String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //   connection
        Connection conn = factory.newConnection();
        //   channel
        Channel channel = conn.createChannel();
        //    channel fanout  
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        Date nowDate = new Date();
        String msg = nowDate.getTime() + " have log sth...";
        //       exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        System.out.println(nowDate + "         ...");
        channel.close();
        conn.close();
    }
}

소비자 소비자
public class Consumer1 {

    private final static String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        //      pid,          
        String name = ManagementFactory.getRuntimeMXBean().getName();  
        String pid = name.split("@")[0];  
        //      channel
        ConnectionFactory factory = new ConnectionFactory();
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //  RabbitMQ         ,                  
        String queueName = channel.queueDeclare().getQueue();
        // binding
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(pid + "    ,      ...");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        //        
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            // Delivery :      ,     
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String recieveMsg = new String(delivery.getBody());
            System.out.println(pid + "      : " + recieveMsg);
        }
    }
}

출력 결과:
생산 자 는 다음 두 가지 데 이 터 를 생산 했다.
1450345916316 have log sth… 1450345918852 have log sth…
소비자 2980
2980 수신 소식: 1450345916316 have log sth... 2980 수신 소식: 1450345918852 have log sth...
소비자
484 수신 소식: 1450345916316 have log sth... 484 수신 소식: 1450345918852 have log sth...

좋은 웹페이지 즐겨찾기