rabbitmq 에 대한 구덩이 및 의문

3689 단어 rabbitmqACK
springboot 에서 rabbitmq 를 사용 할 때 일부 상황 을 만 났 는데 개념 이 명확 하지 않 아서 생 긴 것 이 많 습 니 다. 이 기록 입 니 다.(아래 내용 은 순 전 히 개인 적 인 견해 입 니 다. 지적 을 환영 합 니 다.)
1. 확인 메커니즘
  publisherConfirm
  개인 이 이해 하 는 확인 체 제 는 사실 두 부분 으로 구성 되 는데 그것 이 바로 rabbitmq - server 가 생산자 에 대한 확인, 즉 Publisher Confirm 이다.이 확인 정 보 는 server 에서 생산자 가 보 낸 메 시 지 를 받 은 후 즉시 server 로 돌아 갑 니 다. 메시지 가 소비자 에 게 처리 되 었 는 지 와 는 상 관 없 이 server 가 단순히 생산자 에 게 당신 의 소식 을 들 었 습 니 다.
  ack
  또 하나의 메 시 지 는 ack 입 니 다. ack 은 소비자 가 server 에서 메 시 지 를 꺼 내 서 server 에 게 메 시 지 를 꺼 냈 다 고 알려 주 는 것 을 말 합 니 다.엄 밀 히 말 하면 사실 소식 의 처리 성공 여부 와 도 관계 가 없다.ack 는 세 가지 가 있 는데 각각 ack, 자동 ack, 수 동 ack 이다.ack 없 이 설명 할 필요 가 없습니다. 수 동 ack 은 소비자 가 메 시 지 를 받 은 후에 자동 으로 server 에 ack 을 되 돌려 주 는 것 입 니 다. 소비자 가 메시지 에 대한 처리 에 이상 이 있 든 없 든 ack 으로 돌아 갑 니 다. 개인 적 으로 업무 코드 가 실 행 된 후에 server 가 ack 을 받 은 후에 정상 적 인 상황 에서 메 시 지 를 server 에서 삭제 할 것 이 라 고 추측 합 니 다.하지만 이상 이 생기 면 이 메 시 지 를 무한 재 시도 해 삭제 되 지 않 는 다.여기 서 저 는 의문 이 하나 있 습 니 다. 신 이 풀 어 주 셨 으 면 좋 겠 습 니 다. 문 제 는 이 렇 습 니 다. 저 는 자동 ack 을 설정 하고 코드 에 빈 포인터 이상 을 써 서 이상 을 테스트 했 습 니 다. 이때 소식 을 받 은 후에 이상 이 발생 했 습 니 다. 그리고 소식 은 무한 재 시도 되 었 습 니 다. 재 시도 과정 에서 소식 의 상 태 는 unacked 이 었 습 니 다. 그리고 저 는 수 동 으로 소비 자 를 멈 추 었 습 니 다.이때 메시지 의 상태 가 ready 로 바 뀌 었 는데, 이 ready 는 어떻게 바 뀌 었 을 까?소비자 들 이 서버 에 메 시 지 를 받 았 는데 처리 에 이상 이 생 겼 다 고 다른 로 고 를 보 낸 건 가? ack 을 보 냈 다 면 바로 대기 열 에서 메 시 지 를 삭제 해 야 하지 않 겠 습 니까?준비 가 되 어 있 는 건 아니 겠 지?
수 동 ack 에 대해 이해 하기 어 려 운 것 은 없습니다. 바로 자신 이 수 동 으로 ack 를 보 낼 시 기 를 제어 하 는 것 입 니 다. 수 동 ack 가 이상 하면 무한 재 시도 하지 않 습 니 다.
이상 의 관점 은 제 가 이해 한 것 입 니 다. 처음에 이해 가 부당 해서 ack 으로 분포 식 업무 관 리 를 하고 싶 었 습 니 다. 정리 한 후에 밝 아 졌 습 니 다.
2. rpc 메시지 형식, 분포 식 사무 관리
    분포 식 사 무 는 이전에 그다지 고려 하지 않 았 는데, 최근 에 mq 를 통합 할 때 비로소 mq 로 해결 할 생각 을 했 고, 인터넷 을 통 해 확인 하 는 것 도 하나의 방안 이다.
사실 간단하게 이해 하면 보 낸 mq 메시지 가 왔 다 갔다 하 는 것 입 니 다. 과거 에 회의 에 참석 하지 않 은 메 시 지 는 하나의 대열 이 아 닙 니 다.우리 가 메 시 지 를 받 았 을 때 되 돌아 오 는 대기 열 을 지정 하면 됩 니 다. 생산자 에서 이 대기 열 을 감청 하면 됩 니 다.이전 코드 로 넘 어 갈 게 요.
@Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("26Test");
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                message.getMessageProperties().getCorrelationIdString();
                CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                        .correlationId(UUID.randomUUID().toString())
                        .replyTo("return")
                        .build();
                System.out.println(" getReplyRequest----"+props.getCorrelationId()+"==============" +new Date() );
                channel.basicPublish("", "return", props, body);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }

이 코드 가 실 현 된 기능 은 26Test 라 는 대기 열 을 감청 하 는 것 입 니 다. 이 대기 열의 메 시 지 를 처리 한 후에 return 대기 열 에서 메 시 지 를 보 내 려 고 합 니 다. 메 시 지 를 실현 하 는 것 은 반복 적 입 니 다.그러나 이런 사무의 처리 방식 도 폐단 이 있다. 뚜렷 한 비동기 처리 3. 무한 재 시 는 어떻게 처리 합 니까?  자동 ack 시 무한 재 시도 설정 가능 한 가요?

좋은 웹페이지 즐겨찾기