백화 RabbitMQ (6): RPC

널리 보급 하 다
RabbitMQ 특 강
https://segmentfault.com/l/15...
CoolMQ 오픈 소스 프로젝트
우 리 는 메시지 큐 를 이용 하여 분포 식 업무 의 최종 일치 성 해결 방안 을 실현 하 였 으 니, 여러분 들 이 지 켜 보 세 요.원본 코드 참조 가능:https://github.com/vvsuperman... 프로젝트 지원 사이트:http://rabbitmq.org.cn최신 글 이나 실현 은 위 에 업 데 이 트 됩 니 다.
성명 RPC 인터페이스
RPC 를 설명 하기 위해 서 우 리 는 먼저 클 라 이언 트 인 터 페 이 스 를 만 듭 니 다. 방법 이 있 습 니 다. RPC 요청 을 할 것 이 고 결과 가 돌아 올 때 까지 계속 막 을 것 입 니 다.
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

RPC 는 RPC 가 흔 하지만 조심해 서 사용 해 야 합 니 다. rpc 가 매우 느 린 프로그램 이 라 고 가정 하면 결 과 를 예측 할 수 없고 디 버 깅 하기 어렵 습 니 다.
RPC 를 사용 할 때 아래 의 규범 을 참고 할 수 있 습 니 다.
  • 시스템 디자인 에 있어 상세 한 문서 설명 이 있어 야 구성 요소 간 의 의존 도 를 명확 하 게 하고 조사 할 수 있 는 근 거 를 가 져 야 한다
  • .
  • 잘못된 이상 처 리 를 했 습 니 다. 특히 RPC 서비스 가 끊 기거 나 오랫동안 응답 하지 않 았 을 때
  • 차단 식 RPC 대신 비동기 파 이 프 를 사용 하여 시스템 간 결합 을 낮 춥 니 다
  • 리 셋 대기 열 (Callback queue)
    RabbitMQ 로 RPC 를 실현 하 는 것 이 비교적 간단 합 니 다. 클 라 이언 트 가 요청 을 하면 서버 에서 이 요청 에 대한 응답 을 되 돌려 줍 니 다.이 기능 을 실현 하기 위해 서 우 리 는 '리 셋' 할 수 있 는 대기 열 이 필요 합 니 다. 우 리 는 직접 기본 대기 열 을 사용 하면 됩 니 다.
    callbackQueueName = channel.queueDeclare().getQueue();
    
    BasicProperties props = new BasicProperties
                                .Builder()
                                .replyTo(callbackQueueName)
                                .build();
    
    channel.basicPublish("", "rpc_queue", props, message.getBytes());
    
    // ... then code to read a response message from the     callback_queue ...
    

    메시지 속성 (메시지 속성)
    AMQP 0 - 9 - 1 프로 토 콜 은 모든 메시지 에 14 개의 속성 을 정의 합 니 다. 많은 속성 이 사용 되 지 않 지만 다음 과 같은 몇 가 지 를 주의해 야 합 니 다.
  • 배포 모드 (delivery Mode): 메시지 가 지속 되 어야 하 는 지 (persistent) 또는 업무 (transient) 가 필요 한 지 등 을 표시 합 니 다. 제2 장 에 설명 되 어 있 습 니 다
  • 메시지 유형 (contentType): 메시지 에서 구체 적 인 내용 을 전달 하 는 인 코딩 방식 을 설명 합 니 다. 예 를 들 어 우리 가 자주 사용 하 는 JSON 은 application / json
  • 으로 설정 할 수 있 습 니 다.
  • 메시지 응답 (reply To): 대기 열 을 되 돌 리 는 데 사용
  • 관계 Id (correlationId): RPC 의 반환 값 을 해당 하 는 요청 에 연결 하 는 데 사 용 됩 니 다.

  • 우 리 는 상응하는 가방 을 도입 해 야 한다.
    import com.rabbitmq.client.AMQP.BasicProperties;
    

    관계 Id (상관 Id)
    앞의 방법 에서 우 리 는 모든 RPC 요청 에 하나의 대기 열 을 만 들 었 습 니 다. 이것 은 전혀 필요 하지 않 습 니 다. 우 리 는 모든 클 라 이언 트 에 하나의 대기 열 을 만 들 면 됩 니 다.
    모든 RPC 가 하나의 대기 열 을 사용 하기 때문에 메시지 가 돌아 오 면 돌아 오 는 메시지 가 어떤 요청 에 대응 하 는 지 어떻게 압 니까?그래서 저 희 는 Correlation Id 를 사 용 했 습 니 다. 모든 요청 에 유일한 표지 로 서 반환 값 을 받 은 후에 이 Id 를 검사 하고 해당 하 는 응답 과 일치 합 니 다.Id 에 대응 하 는 요청 을 찾 지 못 하면 버 립 니 다.
    여기 서 당신 은 왜 알 수 없 는 소식 을 버 려 야 하 는 지 의문 이 있 을 수 있 습 니 다.이상 한 거 던 지 는 게 아니 라이것 은 우리 서버 의 경쟁 조건 (possibility of a race condition) 과 관계 가 있 을 것 이다.예 를 들 어 저희 RabbitMQ 서비스 가 끊 겼 다 고 가정 하면 저희 에 게 답장 을 하 자마자 응답 을 기다 리 지 않 고 서버 가 끊 겼 습 니 다. 그러면 RabbitMQ 서비스 가 재 개 될 때 메 시 지 를 다시 보 냅 니 다. 클 라 이언 트 는 중복 되 는 메 시 지 를 받 을 것 입 니 다. 등 을 고려 하기 위해 저 희 는 돌아 온 후의 처리 방식 을 자세히 처리 해 야 합 니 다.
    작은 매듭
    RPC 작업 과정 은 다음 과 같 습 니 다.
    클 라 이언 트 가 시 작 될 때 독립 된 익명 리 셋 대기 열 을 만 들 고 RPC 요청 을 보 냅 니 다. 이 RPC 요청 은 두 가지 속성 을 가 져 옵 니 다. reply To - RPC 호출 이 성공 하면 되 돌아 가 야 할 대기 열 이름 입 니 다.correlationId - 요청 마다 유일한 표식 입 니 다.RPC 서비스 제공 자 는 대기 열 에서 기다 리 고 있 습 니 다. 요청 이 도착 하면 즉시 응답 하여 자신의 일 을 끝내 고 결 과 를 되 돌려 줍 니 다. reply To 에 따라 해당 하 는 대기 열 로 돌아 갑 니 다.클 라 이언 트 도 대기 열 에 있 는 정보 가 돌아 오 기 를 기다 리 고 있 습 니 다. 메시지 가 나타 나 면 correlationId 를 검사 하고 결 과 를 응답 요청 발기인 에 게 되 돌려 줍 니 다.
    통합
    피 보 나치 급수
    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }

    우 리 는 fibonacci 급 수 를 정의 합 니 다. 정수 만 받 아들 일 수 있 고 효율 이 그다지 높 지 않 은 것 입 니 다.rpc. java 는 다음 과 같다
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RPCServer {
    
        private static final String RPC_QUEUE_NAME = "rpc_queue";
    
        public static void main(String[] argv) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            Connection connection = null;
            try {
                connection      = factory.newConnection();
                final Channel channel = connection.createChannel();
    
                channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    
                channel.basicQos(1);
    
                System.out.println(" [x] Awaiting RPC requests");
    
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                                .Builder()
                                .correlationId(properties.getCorrelationId())
                                .build();
    
                        String response = "";
    
                        try {
                            String message = new String(body,"UTF-8");
                            int n = Integer.parseInt(message);
    
                            System.out.println(" [.] fib(" + message + ")");
                            response += fib(n);
                        }
                        catch (RuntimeException e){
                            System.out.println(" [.] " + e.toString());
                        }
                        finally {
                            channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
    
                            channel.basicAck(envelope.getDeliveryTag(), false);
    
                // RabbitMq consumer worker thread notifies the RPC server owner thread 
                        synchronized(this) {
                            this.notify();
                        }
                        }
                    }
                };
    
                channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
    
                // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized(consumer) {
            try {
                  consumer.wait();
                } catch (InterruptedException e) {
                  e.printStackTrace();          
                }
                }
             }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            } finally {
                if (connection != null)
            try {
                    connection.close();
                 } catch (IOException _ignore) {}
             }
        }
    }
    

    서버 의 코드 가 비교적 직접적 입 니 다. 먼저 연결 을 만 들 고 channel 과 성명 대기 열 을 만 듭 니 다.우 리 는 이후 에 여러 소비 자 를 만 들 수 있 습 니 다. 더 좋 은 부하 균형 을 위해 channel. basicQos 에 prefetchCount 를 설정 한 다음 에 basicConsume 감청 대기 열 을 설정 하여 응답 함 수 를 제공 하여 요청 과 반환 값 을 처리 해 야 합 니 다.
    RPCClient.java
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeoutException;
    
    public class RPCClient {
    
        private Connection connection;
        private Channel channel;
        private String requestQueueName = "rpc_queue";
        private String replyQueueName;
    
        public RPCClient() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            connection = factory.newConnection();
            channel = connection.createChannel();
    
            replyQueueName = channel.queueDeclare().getQueue();
        }
    
        public String call(String message) throws IOException, InterruptedException {
            String corrId = UUID.randomUUID().toString();
    
            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
    
            channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
    
            final BlockingQueue response = new ArrayBlockingQueue(1);
    
            channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (properties.getCorrelationId().equals(corrId)) {
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            });
    
            return response.take();
        }
    
        public void close() throws IOException {
            connection.close();
        }
    
        //...
    }
    

    클 라 이언 트 코드 는 다음 과 같 습 니 다. 우 리 는 연결 을 만 들 고 'callback' 대기 열 을 설명 합 니 다. 우 리 는 'callback' 대기 열 에 메 시 지 를 제출 하고 RPC 의 반환 값 을 받 을 것 입 니 다. 구체 적 인 절 차 는 다음 과 같 습 니 다.
    저 희 는 먼저 유일한 correlation Id 를 만 들 고 저장 합 니 다. 저 희 는 이 를 통 해 받 은 정 보 를 구분 할 것 입 니 다.그리고 이 메 시 지 를 보 내 면 메 시 지 는 reply To 와 collelation Id 두 가지 속성 을 포함 합 니 다.소비 메 시 지 는 다른 프로 세 스 이기 때문에 결과 가 돌아 올 때 까지 프로 세 스 를 막 아야 합 니 다. 차단 대기 열 BlockingQueue 를 사용 하 는 것 은 매우 좋 은 방법 입 니 다. 여기 서 우 리 는 길이 가 1 인 Array BlockQueue 를 사 용 했 습 니 다. handle Delivery 의 기능 은 메시지 의 correlationId 가 우리 가 이전에 보 낸 것 인지 확인 하 는 것 입 니 다. 만약 에...반환 값 을 BlockingQueue 로 되 돌려 줍 니 다.이 때 메 인 라인 은 되 돌아 오 기 를 기다 리 고 Array BlockQueue 에서 되 돌아 오 는 값 을 가 져 옵 니 다.
    클 라 이언 트 로부터 요청
    RPCClient fibonacciRpc = new RPCClient();
    
    System.out.println(" [x] Requesting fib(30)");
    String response = fibonacciRpc.call("30");
    System.out.println(" [.] Got '" + response + "'");
    
    fibonacciRpc.close();
    

    소스 코드 는 RPCClient. java 와 RPCServer. java 컴 파일 참조
    javac -cp $CP RPCClient.java RPCServer.java

    우리 rpc 서버 가 완료 되 었 습 니 다. 서 비 스 를 시작 합 니 다.
    java -cp $CP RPCServer
    # => [x] Awaiting RPC requests

    fibonacci 급 수 를 얻 기 위해 서 는 클 라 이언 트 만 실행 해 야 합 니 다:
    java -cp $CP RPCClient
    # => [x] Requesting fib(30)
    

    이상 의 실현 방식 은 RPC 요청 을 하 는 유일한 방식 이 아니 지만 여러 가지 장점 이 있 습 니 다. 만약 에 RPC 서비스 가 너무 느 리 면 당신 은 매우 편리 한 수준 으로 확장 할 수 있 습 니 다. 소비자 의 개 수 를 늘 리 기만 하면 됩 니 다. 우리 의 코드 는 비교적 간단 하고 책임 있 는 문제 가 해결 되 지 않 았 습 니 다. 예 를 들 어
  • 서비스 가 모두 끊 기 면 클 라 이언 트 는 어떻게 처리 해 야 합 니까
  • 서비스 시간 이 초과 되면 어떻게 처리 해 야 합 니까
  • 불법 정 보 는 어떻게 처리 해 야 합 니까
  • 기본 장절 의 내용 은 여기까지 입 니 다. 여기까지 입 니 다. 메시지 큐 의 기본 적 인 용법 을 기본적으로 알 수 있 습 니 다. 그 다음 에 우 리 는 중급 내용 의 학습 에 들 어 갈 수 있 습 니 다.

    좋은 웹페이지 즐겨찾기