RabbitMQ 메시지 확인 및 공정 스케줄링 소비자

1. 메시지 확인
메시지가 반드시 소비자에게 처리되는 것을 확보하기 위해rabbitMQ는 메시지 확인 기능을 제공했다. 소비자가 임무를 처리한 후에 서버에 피드백을 하면 서버는 이 메시지를 삭제하고 소비자가 시간을 초과하여 피드백을 하지 않으면 서버는 이 정보를 다른 소비자에게 다시 발송한다.
기본값은 켜져 있습니다. 소비자 측에서 아래의 방식으로 메시지 확인을 켜면 먼저 autoAck을 자동으로 닫습니다. 우리의 임무 수행이 끝난 후에 수동으로 확인하십시오. JDBC의 autocommit와 같습니다.
QueueingConsumer consumer = new QueueingConsumer(channel);boolean autoAck = false;channel.basicConsume("hello", autoAck, consumer);

앞의 예에서 사용한 것은 채널이다.basicConsume(channelName, true, consumer) ; 메시지를 받은 후 자동으로 서버에 메시지를 피드백합니다.
다음은 메시지 확인 기능을 테스트하는 예입니다.
Sender03.java
  • package com.zf.rabbitmq03;  
      
    import java.io.IOException;  
      
    import com.rabbitmq.client.Channel;  
    import com.rabbitmq.client.Connection;  
    import com.rabbitmq.client.ConnectionFactory;  
      
    /** 
     *   
     * @author zhoufeng 
     * 
     */  
    public class Sender03 {  
          
        public static void main(String[] args) throws IOException {  
              
              
            ConnectionFactory connFac = new ConnectionFactory() ;  
              
            //RabbitMQ-Server , 127.0.0.1  
            connFac.setHost("127.0.0.1");  
              
            //   
            Connection conn = connFac.newConnection() ;  
              
            //   
            Channel channel = conn.createChannel() ;  
              
            // Queue   
            String queueName = "queue01" ;  
              
            // channel queue ,queueName Queue   
            channel.queueDeclare( queueName , false, false, false, null) ;  
              
            String msg = "Hello World!";  
              
            //   
            channel.basicPublish("", queueName , null , msg.getBytes());  
              
            System.out.println("send message[" + msg + "] to "+ queueName +" success!");  
              
            channel.close();   
            conn.close();   
              
        }  
      
    }

  • 및 Sender01.자바와 마찬가지로 별 차이가 없다.
    Recv03.java
    package com.zf.rabbitmq03;  
      
    import java.io.IOException;  
      
    import com.rabbitmq.client.Channel;  
    import com.rabbitmq.client.Connection;  
    import com.rabbitmq.client.ConnectionFactory;  
    import com.rabbitmq.client.ConsumerCancelledException;  
    import com.rabbitmq.client.QueueingConsumer;  
    import com.rabbitmq.client.QueueingConsumer.Delivery;  
    import com.rabbitmq.client.ShutdownSignalException;  
      
    /** 
     *   
     * @author zhoufeng 
     * 
     */  
    public class Recv03 {  
      
        public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {  
              
            ConnectionFactory connFac = new ConnectionFactory() ;  
              
            connFac.setHost("127.0.0.1");  
              
            Connection conn = connFac.newConnection() ;  
              
            Channel channel = conn.createChannel() ;  
              
            String channelName = "channel01";  
              
            channel.queueDeclare(channelName, false, false, false, null) ;  
              
              
            //   
            QueueingConsumer consumer = new QueueingConsumer(channel) ;  
              
      
            //  autoAck  
            boolean autoAck = false ;  
              
            channel.basicConsume(channelName, autoAck, consumer) ;  
              
            //   
            while(true){  
                  
                // , ,   
                Delivery delivery = consumer.nextDelivery() ;  
                  
                String msg = new String(delivery.getBody()) ;    
                  
                // ,   
                channel.basicAck(delivery.getEnvelope().getDeliveryTag()  
                        , false);  
                  
                System.out.println("received message[" + msg + "] from " + channelName);  
            }  
              
        }  
          
    }

    주의: autoAck를 닫으면 메시지를 처리한 후 서버에 메시지를 확인하는 것을 잊지 마십시오.그렇지 않으면 서버에서 이 메시지를 계속 전달할 것이다
    위의 채널을.basicAck(delivery.getEnvelope().getDeliveryTag(), false);메모 삭제, Sender03.java는 한 번만 실행할 수 있습니다. Recv03.자바는 실행할 때마다 HelloWorld 메시지를 받습니다.
    참고:
    하지만 이 정도로는 부족합니다. 만약rabbitMQ-Server가 갑자기 끊기면 아직 읽히지 않은 메시지를 잃어버리기 때문에 우리는 메시지를 지속시킬 수 있습니다.Queue를 정의할 때 지속성 메시지를 설정하면 됩니다. 방법은 다음과 같습니다.
    boolean durable = true;channel.queueDeclare(channelName, durable, false, false, null);

    이렇게 설정하면 서버가 메시지를 받은 후 바로 하드디스크에 메시지를 기록하여 갑작스런 서버 종료로 인한 데이터 분실을 방지할 수 있다.그러나 서버가 메시지를 받자마자 하드디스크에 기록하지 못하고 끊어버리면 메시지의 분실을 피할 수 없다.
    2. 공평한 스케줄링
    이전 예는 메시지를 보내고 받는 것을 실현할 수 있다
    이전 Recv01에서 알 수 있듯이 메시지를 다 처리해야 다음 메시지를 받을 수 있습니다.만약 생산자가 매우 많다면, 소비자는 틀림없이 바쁠 것이다.이때 여러 소비자로 같은 채널의 소식을 처리할 수 있고 여러 소비자에게 공평하게 임무를 분배해야 한다.부분적으로 바쁘면 안 돼요. 부분적으로 항상 한가해요.
    공평한 스케줄링을 실현하는 방식은 모든 소비자가 같은 시간에 임무를 분배하도록 하는 것이다.채널을 통해.basicQos(1);설정 가능

    좋은 웹페이지 즐겨찾기