RabbitMQ 단순 대기 열 인 스 턴 스 및 원리 분석

이 글 은 RabbitMQ 단순 대기 열 실례 와 원리 해석 을 소개 하 였 으 며,예시 코드 를 통 해 매우 상세 하 게 소개 하 였 으 며,여러분 의 학습 이나 업무 에 대해 일정한 참고 학습 가 치 를 가지 고 있 으 므 로 필요 한 분 들 은 참고 하 시기 바 랍 니 다.
RabbitMQ 약술
RabbitMQ 는 메시지 에이전트 입 니 다.메 시 지 를 받 아들 이 고 전달 합 니 다.우체국 으로 볼 수 있 습 니 다.보 낸 메 일 을 메 일 로 보 낼 때 Postman 씨 가 최종 적 으로 받 는 사람 에 게 메 일 을 보 낼 것 이 라 고 확신 할 수 있 습 니 다.이 비유 에서 RabbitMQ 는 메 일 박스,우체국,우 체 부 로 바 이 너 리 데이터 블록 을 받 아들 이 고 저장 하 며 전달 하 는 데 사용 된다.
대열 은 마치 RabbitMQ 에서 메 일 박스 역할 을 하 는 것 과 같다.메 시 지 는 RabbitMQ 와 응용 프로그램 을 거 쳤 지만 대기 열 에 만 저장 할 수 있 습 니 다.대기 열 은 호스트 의 메모리 와 디스크 에 만 제한 되 어 있 으 며,본질 적 으로 큰 메시지 버퍼 입 니 다.많은 생산자 들 이 한 대열 에 메 시 지 를 보 낼 수 있 고,많은 소비자 들 이 한 대열 에서 데 이 터 를 받 으 려 고 시도 할 수 있다.
producer 는 생산자 로 메 시 지 를 대기 열 에 보 내 는 데 사용 합 니 다.consumer 는 소비자 이 므 로 대열 안의 소식 을 읽 어야 한다.producer,consumer 와 broker(rabbitMQ server)는 같은 호스트 에 머 물 필요 가 없습니다.확실히 대부분의 응용 프로그램 에서 그것들 은 이렇게 분포 되 어 있다.
단순 대기 열
간단 한 대열 은 가장 간단 한 모델 로 생산자,대열,소비자 로 구성 된다.생산 자 는 메 시 지 를 대기 열 에 보 내 고 소비 자 는 대기 열 에서 메 시 지 를 읽 어 소 비 를 완성 합 니 다.
아래 그림 에서'P'는 우리 의 생산자 이 고'C'는 우리 의 소비자 이다.가운데 상 자 는 대기 열-RabbitMQ 가 소비 자 를 대표 하 는 메시지 버퍼 입 니 다.

자바 방식
생산자

package com.anqi.mq.nat;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MyProducer {
  private static final String QUEUE_NAME = "ITEM_QUEUE";

  public static void main(String[] args) throws Exception {
    //1.      ConnectionFactory      
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");

    //2.            
    Connection connection = factory.newConnection();

    //3.    Connection     Channel
    Channel channel = connection.createChannel();

    //     ,    json     
    String msg = "hello";
    //4.       
    for (int i = 1; i <= 3 ; i++) {
      channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
      System.out.println("Send message" + i +" : " + msg);
    }

    //5.     
    channel.close();
    connection.close();
  }
}

  /**
   * Declare a queue
   * @param queue the name of the queue
   * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
   * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
   * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
   * @param arguments other properties (construction arguments) for the queue
   * @return a declaration-confirm method to indicate the queue was successfully declared
   * @throws java.io.IOException if an error is encountered
   */
  Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;

  /**
   * Publish a message
   * @see com.rabbitmq.client.AMQP.Basic.Publish
   * @param exchange the exchange to publish the message to
   * @param routingKey the routing key
   * @param props other properties for the message - routing headers etc
   * @param body the message body
   * @throws java.io.IOException if an error is encountered
   */
  void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;


  /**
   * Start a non-nolocal, non-exclusive consumer, with
   * a server-generated consumerTag.
   * @param queue the name of the queue
   * @param autoAck true if the server should consider messages
   * acknowledged once delivered; false if the server should expect
   * explicit acknowledgements
   * @param callback an interface to the consumer object
   * @return the consumerTag generated by the server
   * @throws java.io.IOException if an error is encountered
   * @see com.rabbitmq.client.AMQP.Basic.Consume
   * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
   * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
   */
  String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
소비자

package com.anqi.mq.nat;

import com.rabbitmq.client.*;
import java.io.IOException;

public class MyConsumer {

  private static final String QUEUE_NAME = "ITEM_QUEUE";

  public static void main(String[] args) throws Exception {
    //1.      ConnectionFactory      
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");

    //2.            
    Connection connection = factory.newConnection();

    //3.    Connection     Channel
    Channel channel = connection.createChannel();

    //4.       
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    /*
      true:      ,          ,                 ,            
      false:      ,        ,                ,        ,      
           ,               ,                ,         ,
              。
    */

    //5.           
    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };

    //6.    Channel        
    channel.basicConsume(QUEUE_NAME, true, consumer);

  }
}

Send message1 : hello
Send message2 : hello
Send message3 : hello

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'hello'
 [x] Received 'hello'
 [x] Received 'hello'
우리 가 생산 자 를 시작 한 후에 RabbitMQ 관리 배경 을 보면 소 비 를 기다 리 고 있다 는 소식 을 볼 수 있다.

우리 가 소비 자 를 시작 한 후에 다시 살 펴 보면 밀 린 소식 이 이미 소비 되 었 음 을 볼 수 있다.

총결산
  • 대기 열 성명 queueDeclare 의 매개 변수:첫 번 째 매개 변 수 는 대기 열 이름 을 표시 하고 두 번 째 매개 변 수 는 지속 여부(true 는 대기 열 이 서버 를 다시 시작 할 때 생존 할 것 임 을 표시 합 니 다),세 번 째 매개 변 수 는 독점 대기 열 인지 여부(작성 자가 사용 할 수 있 는 개인 대기 열 입 니 다.끊 으 면 자동 으로 삭 제 됩 니 다),네 번 째 매개 변 수 는 모든 소비자 클 라 이언 트 의 연결 이 끊 겼 을 때 대기 열 을 자동 으로 삭제 할 지,다섯 번 째 매개 변 수 는 대기 열의 다른 매개 변 수 를 자동 으로 삭제 할 지 여부 입 니 다
  • basic Consume 의 두 번 째 매개 변수 autoAck:응답 모드,true:자동 응답,즉 소비자 가 메 시 지 를 얻 으 면 이 메 시 지 는 대기 열 에서 삭 제 됩 니 다.false:수 동 응답,대기 열 에서 메 시 지 를 꺼 낸 후에 프로그래머 가 수 동 으로 호출 하 는 방법 으로 응답 해 야 합 니 다.만약 응답 이 없 으 면 이 메 시 지 는 대기 열 에 다시 넣 을 것 입 니 다.이 소식 이 계속 소비 되 지 않 는 현상 이 나타 날 것 이다
  • 이러한 간단 한 대기 열 모드 는 시스템 이 모든 대기 열 에 기본 교환 기 를 암시 적 으로 연결 합 니 다.교환기 이름 은"(AMQP default)"이 고 형식 은 직접 direct 입 니 다.수 동 으로 대기 열 을 만 들 때 시스템 은 자동 으로 이 대기 열 을 빈 Direct 형식의 교환기 에 연결 합 니 다.연 결 된 경로 키 routing key 는 대기 열 이름과 같 습 니 다.channel.queueBind 에 해당 합 니 다.NAME", exchange:"(AMQP default)“, routingKey:"QUEUE_NAME");인 스 턴 스 는 명시 적 으로 교환기 가 없 지만 경로 키 가 대기 열 이름과 같 을 때 이 기본 교환기 에 메 시 지 를 보 냅 니 다.이런 방식 은 비교적 간단 하지만 복잡 한 업무 수 요 를 만족 시 킬 수 없 기 때문에 생산 환경 에서 이런 방식 을 거의 사용 하지 않 는 다
  • The default exchange is implicitly bound to every queue,with a routing key equal to the queue name.It is not possible to explicitly bind to,or unbind from the default exchange.It also not be deleted.기본 교환기 가 모든 대기 열 에 암시 적 으로 연결 되 어 있 으 며,경로 키 는 대기 열 이름 과 같 습 니 다.명시 적 으로 연결 되 거나 결 성 교환 에서 연결 을 해제 할 수 없습니다.그것 도 삭제 할 수 없다. ――RabbitMQ 공식 문서 에서 인용 합 니 다
  • spring-amqp 방식
    Maven 의존 도입
    
        <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.6.0</version>
        </dependency>    
            <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit</artifactId>
          <version>2.1.5.RELEASE</version>
        </dependency>
    spring 프로필
    
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/rabbit
          https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
          http://www.springframework.org/schema/beans
          https://www.springframework.org/schema/beans/spring-beans.xsd">
    
      <rabbit:connection-factory id="connectionFactory" host="localhost" virtual-host="/"
      username="guest" password="guest"/>
      <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
      <rabbit:admin connection-factory="connectionFactory"/>
      <rabbit:queue name="MY-QUEUE"/>
    </beans>
    사용 테스트
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class Main {
      public static void main(String[] args) {
        ApplicationContext app = new ClassPathXmlApplicationContext("spring/rabbit-context.xml");
        AmqpTemplate amqpTemplate = app.getBean(AmqpTemplate.class);
        amqpTemplate.convertAndSend("MY-QUEUE", "Item");
        String msg = (String) amqpTemplate.receiveAndConvert("MY-QUEUE");
        System.out.println(msg);
      }
    }
    참고 방법
    
    /**
     * Convert a Java object to an Amqp {@link Message} and send it to a specific exchange
     * with a specific routing key.
     *
     * @param exchange the name of the exchange
     * @param routingKey the routing key
     * @param message a message to send
     * @throws AmqpException if there is a problem
     */
    void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
    /**
       * Receive a message if there is one from a specific queue and convert it to a Java
       * object. Returns immediately, possibly with a null value.
       *
       * @param queueName the name of the queue to poll
       * @return a message or null if there is none waiting
       * @throws AmqpException if there is a problem
       */
    @Nullable
    Object receiveAndConvert(String queueName) throws AmqpException;
    이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

    좋은 웹페이지 즐겨찾기