RabbitMQ의 세 가지 Exchange, 데이터 지속성, 비지속성 사례

9235 단어 Java-RabbitMQ
최근 프로젝트는 Rabbitmq의 데이터 지속화 기술을 사용해야 하기 때문에 여가 시간을 이용하여 각각 Rabbitmq의 세 가지 자주 사용하는 Exchange(direct,fanout,topic)에 대해 테스트 실례를 썼고 초보자만 참고하여 공부할 수 있도록 하며 각 분야의 정신을 굽히지 않기를 바랍니다.
개발 전에 인용 패키지가 필요합니다. 가장 좋은 것은 3.4.0 이하 버전입니다. 상기 버전을 시험해 보았기 때문에 시간이 초과되었습니다. 구체적인 원인은 아직 연구하지 않았습니다. 연구한 대신의 댓글로 알려주시기 바랍니다. 감사합니다.
		
			com.rabbitmq
			amqp-client
			3.4.0
		

1.direct(게시 및 구독)
1.1 생산자(Direct)
public class Direct {
	private static final String EXCHANGE_NAME = "temp_direct";  
    private static final String[] TYPE = { "info", "warning", "error" };  
    
    public static void main(String[] argv) throws java.io.IOException  
    {  
        //    
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("192.168.32.129");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        //    
        channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);  
  
        // 6   
        for (int i = 0; i 

1.2 소비자
public class ReceiveDirect {
	private static final String EXCHANGE_NAME = "temp_direct";  
	private final static String HOST = "192.168.32.129";
    private static final String[] TYPE = { "info", "warning", "error" };  
    
    public static void main(String[] argv) throws java.io.IOException,  
    java.lang.InterruptedException  
	{  
	//    
	ConnectionFactory factory = new ConnectionFactory();  
	factory.setHost(HOST);  
	Connection connection = factory.newConnection();  
	final Channel channel = connection.createChannel();  
	//  direct   
	channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);  
	for (int i = 0; i < TYPE.length; i++) {
		// 
		channel.queueDeclare(TYPE[i], true, false, false, null);
		// 
		channel.basicQos(1);
		// Exchange
		channel.queueBind(TYPE[i], EXCHANGE_NAME, TYPE[i]);
		 
		System.out.println(" " + TYPE[i] + " !");
	}
	
	for (int i = 0; i < TYPE.length; i++) {
	      final String queue = TYPE[i];
	      new Thread(){
	        public void run() {
	          try {
	            receive(channel, queue);
	          } catch (Exception e) {
	            e.printStackTrace();
	          }
	        }
	      }.start();
	    }
	}  
	
	private static void receive(Channel channel,String QUEUE_NAME) throws Exception {
	    //  
	    QueueingConsumer consumer = new QueueingConsumer(channel);
	    channel.basicConsume(QUEUE_NAME, false, consumer);
	    while (true) {
	      //  
	      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
	      String message = new String(delivery.getBody(), "UTF-8");
	      System.out.println(QUEUE_NAME + " Received '" + message + "'");
	      //  
	      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
	    }
	}
}

2. fanout (라디오)
2.1 생산자
public class Fanout {
	 private final static String HOST = "192.168.32.129";
	 private final static String EXCHANGE_NAME = "fanout";
	 private final static String QUEUE = "temp_fanout";
	 private final static String ROUTKEY = "mq.fanout";
	 private final static boolean DURABLE = true;
	 public static void main(String[] args) throws IOException, TimeoutException{
		 	//    
	        ConnectionFactory factory = new ConnectionFactory();  
	        factory.setHost(HOST);  
	        Connection connection = factory.newConnection();  
	        Channel channel = connection.createChannel();  
	        //    
	        channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );  
	        // 
	        channel.queueDeclare(QUEUE, DURABLE, false, false, null);
	        channel.basicQos(1);
	        channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);
	        String message = new Date().getTime()+" : fanout something";  
	        //    
	        channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  
	  
	        System.out.println(" [x] Sent '" + message + "'");  
	  
	        channel.close();  
	        connection.close();  
	 }
}
2.2 소비자
public class ReceiveFanout {
	 private final static String HOST = "192.168.32.129";
	 private final static String EXCHANGE_NAME = "fanout";
	 private final static String QUEUE = "temp_fanout";
	 private final static String ROUTKEY = "mq.fanout";
	 private final static boolean DURABLE = true;
	public static void main(String[] args) throws java.io.IOException,  
    java.lang.InterruptedException, TimeoutException{
		//    
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost(HOST);  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
  
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 
        // 
        channel.queueDeclare(QUEUE, DURABLE, false, false, null);
        channel.basicQos(1);
        //  、   
        //String queueName = channel.queueDeclare().getQueue();  
        //  , binding  
        channel.queueBind(QUEUE, EXCHANGE_NAME,ROUTKEY);  
  
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        //  , ,   
        channel.basicConsume(QUEUE, true, consumer);  
  
        while (true)  
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            System.out.println(" [x] Received '" + message + "'");  
  
        }  
	}
}

3.topic(주제)
3.1 생산자
public class Topic {
	private final static String HOST = "192.168.32.129";
	private static final String EXCHANGE_NAME = "topic_Exc";
	private static final String QUEUE = "temp_wwww";
	private static final String ROUTKEY="*_topic";
	private static final boolean durable = true;
	public static void main(String[] argv) throws Exception  
    {  
        //    
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost(HOST);  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 
        channel.exchangeDeclare(EXCHANGE_NAME, "topic",durable);  
        // 
        channel.queueDeclare(QUEUE, durable, false, false, null);
        channel.basicQos(1);
        // Exchange
        channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);
        
        String msg = UUID.randomUUID().toString();
        
        channel.basicPublish(EXCHANGE_NAME, ROUTKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg  
                .getBytes()); 
        System.out.println(msg);
  
        channel.close();  
        connection.close();  
    }  
}

3.2 소비자
public class ReceiveTopicFortopic{
	private final static String HOST = "192.168.32.129";
	private static final String EXCHANGE_NAME = "topic_Exc";
	private static final String QUEUE = "temp_topic";
	private static final String ROUTKEY="*_topic";
	 private static final boolean durable = true;
	 public static void main(String[] argv) throws Exception  
	    {  
	        //    
	        ConnectionFactory factory = new ConnectionFactory();  
	        factory.setHost(HOST);  
	        Connection connection = factory.newConnection();  
	        Channel channel = connection.createChannel();  
	        //    
	        channel.exchangeDeclare(EXCHANGE_NAME, "topic",durable); 
	        // 
	        channel.queueDeclare(QUEUE, durable, false, false, null);
	        channel.basicQos(1);
	        // Exchange
	        channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTKEY);

	        System.out  
	                .println(" [*] Waiting for critical messages. To exit press CTRL+C");  
	        // 
	        QueueingConsumer consumer = new QueueingConsumer(channel);  
	        channel.basicConsume(QUEUE, true, consumer);  
	  
	        while (true)  
	        {  
	            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
	            String message = new String(delivery.getBody());  
	            String routingKey = delivery.getEnvelope().getRoutingKey();  
	  
	            System.out.println(" [x] Received routingKey = " + routingKey  
	                    + ",msg = " + message + ".");  
	        }  
	    }  
}

다음은 Exchange 인스턴스 세 가지입니다.

좋은 웹페이지 즐겨찾기