rabbitMQ 학습노트(3) 메시지 확인 및 공정 스케줄링 소비자
1. 메시지 확인
메시지가 반드시 소비자에게 처리되는 것을 확보하기 위해rabbitMQ는 메시지 확인 기능을 제공했다. 소비자가 임무를 처리한 후에 서버에 피드백을 하면 서버는 이 메시지를 삭제하고 소비자가 시간을 초과하여 피드백을 하지 않으면 서버는 이 정보를 다른 소비자에게 다시 발송한다.
기본값은 켜져 있습니다. 소비자 측에서 아래의 방식으로 메시지 확인을 켜면 먼저 autoAck을 자동으로 닫습니다. 우리의 임무 수행이 끝난 후에 수동으로 확인하십시오. JDBC의 autocommit와 같습니다.
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
앞의 예에서 사용한 것은 채널이다.basicConsume(channelName, true, consumer) ; 메시지를 받은 후 자동으로 서버에 메시지를 피드백합니다.다음은 메시지 확인 기능을 테스트하는 예입니다.
Sender03.java
package com.zf.rabbitmq03;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
*
* @author zhoufeng
*
*/
public class Sender03 {
public static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server , 127.0.0.1
connFac.setHost("127.0.0.1");
//
Connection conn = connFac.newConnection() ;
//
Channel channel = conn.createChannel() ;
// Queue
String queueName = "queue01" ;
// channel queue ,queueName Queue
channel.queueDeclare( queueName , false, false, false, null) ;
String msg = "Hello World!";
//
channel.basicPublish("", queueName , null , msg.getBytes());
System.out.println("send message[" + msg + "] to "+ queueName +" success!");
channel.close();
conn.close();
}
}
및 Sender01.자바와 마찬가지로 별 차이가 없다.
Recv03.java
package com.zf.rabbitmq03;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
*
* @author zhoufeng
*
*/
public class Recv03 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("127.0.0.1");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String channelName = "channel01";
channel.queueDeclare(channelName, false, false, false, null) ;
//
QueueingConsumer consumer = new QueueingConsumer(channel) ;
// autoAck
boolean autoAck = false ;
channel.basicConsume(channelName, autoAck, consumer) ;
//
while(true){
// , ,
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
// ,
channel.basicAck(delivery.getEnvelope().getDeliveryTag()
, false);
System.out.println("received message[" + msg + "] from " + channelName);
}
}
}
주의: autoAck를 닫으면 메시지를 처리한 후 서버에 메시지를 확인하는 것을 잊지 마십시오.그렇지 않으면 서버에서 이 메시지를 계속 전달할 것이다
위의 채널을.basicAck(delivery.getEnvelope().getDeliveryTag(), false);메모 삭제, Sender03.java는 한 번만 실행할 수 있습니다. Recv03.자바는 실행할 때마다 HelloWorld 메시지를 받습니다.
참고:
하지만 이 정도로는 부족합니다. 만약rabbitMQ-Server가 갑자기 끊기면 아직 읽히지 않은 메시지를 잃어버리기 때문에 우리는 메시지를 지속시킬 수 있습니다.Queue를 정의할 때 지속성 메시지를 설정하면 됩니다. 방법은 다음과 같습니다.
boolean durable = true;
channel.queueDeclare(channelName, durable, false, false, null);
이렇게 설정하면 서버가 메시지를 받은 후 즉시 메시지를 하드디스크에 기록하여 갑작스런 서버 종료로 인한 데이터 분실을 방지할 수 있다.그러나 서버가 메시지를 받자마자 하드디스크에 기록하지 못하고 끊어버리면 메시지의 분실을 피할 수 없다.2. 공평한 스케줄링
이전 예는 메시지를 보내고 받는 것을 실현할 수 있다
이전 Recv01에서 알 수 있듯이 메시지를 다 처리해야 다음 메시지를 받을 수 있습니다.만약 생산자가 매우 많다면, 소비자는 틀림없이 바쁠 것이다.이때 여러 소비자로 같은 채널의 소식을 처리할 수 있고 여러 소비자에게 공평하게 임무를 분배해야 한다.부분적으로 바쁘면 안 돼요. 부분적으로 항상 한가해요.
공평한 스케줄링을 실현하는 방식은 모든 소비자가 같은 시간에 임무를 분배하도록 하는 것이다.채널을 통해.basicQos(1);설정 가능
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.