RabbitMQ 에서 Work queues 작업 대기 열 모드 구현

모델 설명

Work Queues 는 입문 프로그램의 간단 한 모델 에 비해 하나 또는 일부 소비 단 이 많 고 여러 소비 단 이 같은 대기 열 에 있 는 정 보 를 공동으로 소비 합 니 다.
응용 장면:작업 이 너무 무 겁 거나 작업 이 많은 경우 작업 대기 열 을 사용 하면 작업 처리 속 도 를 높 일 수 있 습 니 다.
코드
Work Queues 는 입문 프로그램의 간단 한 모델 코드 와 거의 같 습 니 다.완전히 복사 하고 여러 소비 자 를 복사 하여 여러 소비자 가 동시에 정 보 를 소비 하 는 테스트 를 할 수 있 습 니 다.
① 생산자

package com.itheima.rabbitmq.work; 
import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
public class Producer { 
	static final String QUEUE_NAME = "work_queue"; 
	public static void main(String[] args) throws Exception { 
		//     
		Connection connection = ConnectionUtil.getConnection(); 
		//      
		Channel channel = connection.createChannel(); 
		//   (  )   
		/**
		 *   1:     
		 *   2:          
		 *   3:         
		 *   4:                
		 *   5:       
		*/ 
		channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
		for (int i = 1; i <= 30; i++) { 
			//      
			String message = "  ;   !work  --" + i; 
			/**
			 *   1:     ,           Default Exchage 
			 *   2:  key,             
			 *   3:       
			 *   4:     
			*/ 
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
			System.out.println("     :" + message); 
		}
		//      
		channel.close(); connection.close(); 
	} 
}
② 소비자 1

package com.itheima.rabbitmq.work; 
import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.*;
import java.io.IOException; 
public class Consumer1 { 
	public static void main(String[] args) throws Exception { 
		Connection connection = ConnectionUtil.getConnection(); 
		//      
		Channel channel = connection.createChannel(); 
		//   (  )   
		/**
		 *   1:     
		 *   2:          
		 *   3:         
		 *   4:                
		 *   5:       
		*/ 
		channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); 
		//              
		channel.basicQos(1); 
		//     ;        
		DefaultConsumer consumer = new DefaultConsumer(channel){ 
			@Override 
			/**
			 * consumerTag      , channel.basicConsume       
			 * envelope       ,       id,  routingkey,   ,       (               ) 
			 * properties      
			 * body    
			*/ 
			public void handleDelivery(String consumerTag, Envelope envelope, 
					AMQP.BasicProperties properties, byte[] body) throws IOException { 
				try {
					//  key 
					System.out.println("  key :" + envelope.getRoutingKey()); 
					//    
					System.out.println("    :" + envelope.getExchange()); 
					//  id 
					System.out.println("  id :" + envelope.getDeliveryTag()); 
					//      
					System.out.println("   1-       :" + new String(body, "utf-8")); 
					Thread.sleep(1000); 
					//     
					channel.basicAck(envelope.getDeliveryTag(), false); 
				} 
				catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
			} 
		};
		//     
		/**
		 *   1:    
		 *   2:      ,   true           mq      ,mq          ,   false        
		 *   3:         
		*/ 
		channel.basicConsume(Producer.QUEUE_NAME, false, consumer); 
	} 
}
③ 소비자 2

package com.itheima.rabbitmq.work; 
import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.*; 
import java.io.IOException; 
public class Consumer2 { 
	public static void main(String[] args) throws Exception { 
		Connection connection = ConnectionUtil.getConnection(); 
		//      
		Channel channel = connection.createChannel(); 
		//   (  )   
		/**
		 *   1:     
		 *   2:          
		 *   3:         
		 *   4:                
		 *   5:       
		*/ 
		channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); 
		//              
		channel.basicQos(1); 
		//     ;        
		DefaultConsumer consumer = new DefaultConsumer(channel){ 
			@Override 
			/**
			 * consumerTag      , channel.basicConsume       
			 * envelope       ,       id,  routingkey,   ,       (               ) 
			 * properties      
			 * body    
			*/ 
			public void handleDelivery(String consumerTag, Envelope envelope, 
					AMQP.BasicProperties properties, byte[] body) throws IOException { 
				try {
					//  key 
					System.out.println("  key :" + envelope.getRoutingKey()); 
					//    
					System.out.println("    :" + envelope.getExchange()); 
					//  id 
					System.out.println("  id :" + envelope.getDeliveryTag());
					//      
					System.out.println("   2-       :" + new String(body, "utf-8")); 
					Thread.sleep(1000); 
					//     
					channel.basicAck(envelope.getDeliveryTag(), false); 
				} catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
			} 
		};
		//     
		/**
		 *   1:     
		 *   2:      ,   true           mq      ,mq          ,   false        
		 *   3:         
		*/ 
		channel.basicConsume(Producer.QUEUE_NAME, false, consumer); 
	} 
}
테스트
두 소비 자 를 시작 한 다음 에 생산자 가 메 시 지 를 보 내 는 것 을 시작 합 니 다.IDEA 의 두 소비자 가 대응 하 는 콘 솔 에 가서 경쟁 적 으로 메 시 지 를 받 았 는 지 확인 하 세 요.


총결산
한 대열 에 여러 명의 소비자 가 있다 면 소비자 간 에 같은 소식 에 대한 관 계 는 경쟁 관계 이다.
RabbitMQ 에서 Work queues 모드 를 어떻게 실현 하 는 지 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.도움 이 되 셨 으 면 좋 겠 습 니 다.더 많은 관련 RabbitMQ 내용 은 저희 의 이전 글 을 검색 하거나 아래 의 관련 글 을 계속 찾 아 보 세 요.앞으로 많은 응원 부 탁 드 리 겠 습 니 다!

좋은 웹페이지 즐겨찾기