백화 RabbitMQ (6): RPC
RabbitMQ 특 강
https://segmentfault.com/l/15...
CoolMQ 오픈 소스 프로젝트
우 리 는 메시지 큐 를 이용 하여 분포 식 업무 의 최종 일치 성 해결 방안 을 실현 하 였 으 니, 여러분 들 이 지 켜 보 세 요.원본 코드 참조 가능:https://github.com/vvsuperman... 프로젝트 지원 사이트:http://rabbitmq.org.cn최신 글 이나 실현 은 위 에 업 데 이 트 됩 니 다.
성명 RPC 인터페이스
RPC 를 설명 하기 위해 서 우 리 는 먼저 클 라 이언 트 인 터 페 이 스 를 만 듭 니 다. 방법 이 있 습 니 다. RPC 요청 을 할 것 이 고 결과 가 돌아 올 때 까지 계속 막 을 것 입 니 다.
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
RPC 는 RPC 가 흔 하지만 조심해 서 사용 해 야 합 니 다. rpc 가 매우 느 린 프로그램 이 라 고 가정 하면 결 과 를 예측 할 수 없고 디 버 깅 하기 어렵 습 니 다.
RPC 를 사용 할 때 아래 의 규범 을 참고 할 수 있 습 니 다.
RabbitMQ 로 RPC 를 실현 하 는 것 이 비교적 간단 합 니 다. 클 라 이언 트 가 요청 을 하면 서버 에서 이 요청 에 대한 응답 을 되 돌려 줍 니 다.이 기능 을 실현 하기 위해 서 우 리 는 '리 셋' 할 수 있 는 대기 열 이 필요 합 니 다. 우 리 는 직접 기본 대기 열 을 사용 하면 됩 니 다.
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
메시지 속성 (메시지 속성)
AMQP 0 - 9 - 1 프로 토 콜 은 모든 메시지 에 14 개의 속성 을 정의 합 니 다. 많은 속성 이 사용 되 지 않 지만 다음 과 같은 몇 가 지 를 주의해 야 합 니 다.
우 리 는 상응하는 가방 을 도입 해 야 한다.
import com.rabbitmq.client.AMQP.BasicProperties;
관계 Id (상관 Id)
앞의 방법 에서 우 리 는 모든 RPC 요청 에 하나의 대기 열 을 만 들 었 습 니 다. 이것 은 전혀 필요 하지 않 습 니 다. 우 리 는 모든 클 라 이언 트 에 하나의 대기 열 을 만 들 면 됩 니 다.
모든 RPC 가 하나의 대기 열 을 사용 하기 때문에 메시지 가 돌아 오 면 돌아 오 는 메시지 가 어떤 요청 에 대응 하 는 지 어떻게 압 니까?그래서 저 희 는 Correlation Id 를 사 용 했 습 니 다. 모든 요청 에 유일한 표지 로 서 반환 값 을 받 은 후에 이 Id 를 검사 하고 해당 하 는 응답 과 일치 합 니 다.Id 에 대응 하 는 요청 을 찾 지 못 하면 버 립 니 다.
여기 서 당신 은 왜 알 수 없 는 소식 을 버 려 야 하 는 지 의문 이 있 을 수 있 습 니 다.이상 한 거 던 지 는 게 아니 라이것 은 우리 서버 의 경쟁 조건 (possibility of a race condition) 과 관계 가 있 을 것 이다.예 를 들 어 저희 RabbitMQ 서비스 가 끊 겼 다 고 가정 하면 저희 에 게 답장 을 하 자마자 응답 을 기다 리 지 않 고 서버 가 끊 겼 습 니 다. 그러면 RabbitMQ 서비스 가 재 개 될 때 메 시 지 를 다시 보 냅 니 다. 클 라 이언 트 는 중복 되 는 메 시 지 를 받 을 것 입 니 다. 등 을 고려 하기 위해 저 희 는 돌아 온 후의 처리 방식 을 자세히 처리 해 야 합 니 다.
작은 매듭
RPC 작업 과정 은 다음 과 같 습 니 다.
클 라 이언 트 가 시 작 될 때 독립 된 익명 리 셋 대기 열 을 만 들 고 RPC 요청 을 보 냅 니 다. 이 RPC 요청 은 두 가지 속성 을 가 져 옵 니 다. reply To - RPC 호출 이 성공 하면 되 돌아 가 야 할 대기 열 이름 입 니 다.correlationId - 요청 마다 유일한 표식 입 니 다.RPC 서비스 제공 자 는 대기 열 에서 기다 리 고 있 습 니 다. 요청 이 도착 하면 즉시 응답 하여 자신의 일 을 끝내 고 결 과 를 되 돌려 줍 니 다. reply To 에 따라 해당 하 는 대기 열 로 돌아 갑 니 다.클 라 이언 트 도 대기 열 에 있 는 정보 가 돌아 오 기 를 기다 리 고 있 습 니 다. 메시지 가 나타 나 면 correlationId 를 검사 하고 결 과 를 응답 요청 발기인 에 게 되 돌려 줍 니 다.
통합
피 보 나치 급수
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
우 리 는 fibonacci 급 수 를 정의 합 니 다. 정수 만 받 아들 일 수 있 고 효율 이 그다지 높 지 않 은 것 입 니 다.rpc. java 는 다음 과 같다
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] argv) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
try {
connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
String response = "";
try {
String message = new String(body,"UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
}
catch (RuntimeException e){
System.out.println(" [.] " + e.toString());
}
finally {
channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized(this) {
this.notify();
}
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized(consumer) {
try {
consumer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
if (connection != null)
try {
connection.close();
} catch (IOException _ignore) {}
}
}
}
서버 의 코드 가 비교적 직접적 입 니 다. 먼저 연결 을 만 들 고 channel 과 성명 대기 열 을 만 듭 니 다.우 리 는 이후 에 여러 소비 자 를 만 들 수 있 습 니 다. 더 좋 은 부하 균형 을 위해 channel. basicQos 에 prefetchCount 를 설정 한 다음 에 basicConsume 감청 대기 열 을 설정 하여 응답 함 수 를 제공 하여 요청 과 반환 값 을 처리 해 야 합 니 다.
RPCClient.java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
}
public String call(String message) throws IOException, InterruptedException {
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue response = new ArrayBlockingQueue(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
});
return response.take();
}
public void close() throws IOException {
connection.close();
}
//...
}
클 라 이언 트 코드 는 다음 과 같 습 니 다. 우 리 는 연결 을 만 들 고 'callback' 대기 열 을 설명 합 니 다. 우 리 는 'callback' 대기 열 에 메 시 지 를 제출 하고 RPC 의 반환 값 을 받 을 것 입 니 다. 구체 적 인 절 차 는 다음 과 같 습 니 다.
저 희 는 먼저 유일한 correlation Id 를 만 들 고 저장 합 니 다. 저 희 는 이 를 통 해 받 은 정 보 를 구분 할 것 입 니 다.그리고 이 메 시 지 를 보 내 면 메 시 지 는 reply To 와 collelation Id 두 가지 속성 을 포함 합 니 다.소비 메 시 지 는 다른 프로 세 스 이기 때문에 결과 가 돌아 올 때 까지 프로 세 스 를 막 아야 합 니 다. 차단 대기 열 BlockingQueue 를 사용 하 는 것 은 매우 좋 은 방법 입 니 다. 여기 서 우 리 는 길이 가 1 인 Array BlockQueue 를 사 용 했 습 니 다. handle Delivery 의 기능 은 메시지 의 correlationId 가 우리 가 이전에 보 낸 것 인지 확인 하 는 것 입 니 다. 만약 에...반환 값 을 BlockingQueue 로 되 돌려 줍 니 다.이 때 메 인 라인 은 되 돌아 오 기 를 기다 리 고 Array BlockQueue 에서 되 돌아 오 는 값 을 가 져 옵 니 다.
클 라 이언 트 로부터 요청
RPCClient fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
fibonacciRpc.close();
소스 코드 는 RPCClient. java 와 RPCServer. java 컴 파일 참조
javac -cp $CP RPCClient.java RPCServer.java
우리 rpc 서버 가 완료 되 었 습 니 다. 서 비 스 를 시작 합 니 다.
java -cp $CP RPCServer
# => [x] Awaiting RPC requests
fibonacci 급 수 를 얻 기 위해 서 는 클 라 이언 트 만 실행 해 야 합 니 다:
java -cp $CP RPCClient
# => [x] Requesting fib(30)
이상 의 실현 방식 은 RPC 요청 을 하 는 유일한 방식 이 아니 지만 여러 가지 장점 이 있 습 니 다. 만약 에 RPC 서비스 가 너무 느 리 면 당신 은 매우 편리 한 수준 으로 확장 할 수 있 습 니 다. 소비자 의 개 수 를 늘 리 기만 하면 됩 니 다. 우리 의 코드 는 비교적 간단 하고 책임 있 는 문제 가 해결 되 지 않 았 습 니 다. 예 를 들 어
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
kafka 의 0.8.2.1 버 전의 자바 코드 구현1. 설명 이 코드 구현 은 kafka2.10 의 0.8.2.1 버 전의 자바 코드 가 실현 되 고 소비 자 는 여러 개의 Topic 소비 에 대한 다 중 스 레 드 실현 이다. 2. 설치 참고: Kafka 간단 한...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.