봄 클 라 우 드 스 트림 요약

콘 셉 트
1、group:
그룹 내 에 하나의 실례 만 소비 된다.group 을 설정 하지 않 으 면 stream 은 모든 인 스 턴 스 에 익명 과 독립 된 group 을 자동 으로 만 들 기 때문에 모든 인 스 턴 스 가 소 비 됩 니 다.
팀 내 1 회 에 1 개의 인 스 턴 스 만 소비 하고 부하 균형 에 대해 문의 할 것 이다.일반적으로 프로그램 을 주어진 목표 에 연결 할 때 consumer group 을 항상 지정 하 는 것 이 좋 습 니 다.
2、destination binder:
외부 정보 시스템 과 통신 하 는 구성 요 소 는 binding 을 구성 하 는 데 두 가지 방법 을 제공 하 는데 그것 이 바로 bindConsumer 와 bindProducer 로 각각 생산자 와 소비 자 를 구성 하 는 데 사용 된다.Binder 는 Spring Cloud Stream 응용 프로그램 을 미들웨어 에 유연 하 게 연결 할 수 있 도록 합 니 다. 현재 spring 은 kafka, rabbitmq 에 binder 를 제공 합 니 다.
3、destination binding:
Binding 은 응용 프로그램 과 메시지 중간 부품 을 연결 하 는 다리 로 메시지 의 소비 와 생산 에 사용 되 며 binder 에서 만 듭 니 다.
4、partition
한 개 이상 의 생산자 가 데 이 터 를 여러 소비자 에 게 보 내 고 공 통 된 특징 표지 가 있 는 데 이 터 를 같은 소비자 가 처리 하도록 확보한다.기본적으로 메시지 에 대해 hashCode 를 한 다음 에 구역 개수 에 따라 나머지 를 취하 기 때문에 같은 소식 에 대해 서 는 항상 같은 소비자 에 게 떨 어 집 니 다.
주의: 엄 밀 히 말 하면 partition 은 개념 에 속 하지 않 고 Stream 이 신축성, 스루풋 을 향상 시 키 는 방식 입 니 다.
주해
1. @ Input, 예제 사용:
public interface MySink {
    @Input("my-input")
    SubscribableChannel input();
}

역할:
  • 메시지 수신 에 사용
  • 모든 binding 에 channel 인 스 턴 스 생 성
  • input channel 의 이름 을 지정 합 니 다
  • spring 용기 에 my - input 라 는 이름 의 Subscribable Channel 의 bean
  • 을 생 성 합 니 다.
  • spring 용기 에 클래스 를 생 성하 여 MySink 인 터 페 이 스 를 실현 합 니 다.

  • 2. @ Output, 예제 사용:
    public interface MySource {
        @Output("my-output")
        MessageChannel output();
    }

    역할:
  • @Input 과 유사 하여 생산 소식
  • 에 불과 하 다.
    3. @ StreamListener, 예제 사용:
    @StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")
    public void receive(String messageBody) {
        log.info("Received: {}", messageBody);
    }

    역할:
  • 소비 소식
  • condition 의 역할: 메 시 지 를 걸 러 내 는 데 사용 되 며, 조건 식 에 맞 는 메시지 만 처리 합 니 다
  • condition 이 작용 하 는 두 가지 조건:
  • 주해 방법 은 반환 값 이 없습니다
  • 방법 은 Reactive API
  • 를 지원 하지 않 는 독립 적 인 방법 입 니 다.

    4. @ SendTo, 예제 사용:
    //   INPUT  channel   ,        OUTPUT  channel
    @StreamListener(Sink.INPUT)
    @SendTo(Source.OUTPUT)
    public String receive(String receiveMsg) {
       return "handle...";
    }

    역할:
  • 메시지 보 내기
  • 4. @ InboundChannel Adapter, 예제 사용:
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT,
            poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
    public MessageSource producer() {
        return () -> new GenericMessage<>("Hello Spring Cloud Stream");
    }

    역할:
  • 이 주 해 를 추가 하 는 방법 으로 생산 소식
  • fixedDelay: 몇 밀리초 에 1 회 발송
  • maxMessagesPerPoll: 매번 몇 개의 메 시 지 를 보 냅 니까
  • 5. @ ServiceActivator, 예제 사용:
    @ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
    public String transform(String payload) {
        return payload.toUpperCase();
    }

    역할:
  • 이 주 해 를 표시 하 는 방법 은 메시지 나 메시지 의 유효한 내용 을 처리 할 수 있 으 며, input 메 시 지 를 감청 하여 방법 체 의 코드 로 처리 한 후 output 에 출력
  • 6. @ Transformer, 예제 사용:
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object transform(String message) {
      return message.toUpperCase();
    }

    역할:
  • @ServiceActivator 과 유사 하 며, 이 주 해 를 표시 하 는 방법 은 메시지, 메시지 헤더 또는 메시지 의 유효 내용
  • 을 바 꿀 수 있다.
    PollableMessageSource
    폴 라 블 메시지 소스 는 소비자 들 이 소비 속 도 를 조절 할 수 있 도록 허용 한다.예 를 들 어 간단하게 보 여 드 리 겠 습 니 다. 먼저 인 터 페 이 스 를 정의 합 니 다.
    public interface PolledProcessor {
        @Input("pollable-input")
        PollableMessageSource input();
    }

    사용 예시:
    @Autowired
    private PolledProcessor polledProcessor;
    
    @Scheduled(fixedDelay = 5_000)
    public void poll() {
        polledProcessor.input().poll(message -> {
            byte[] bytes = (byte[]) message.getPayload();
            String payload = new String(bytes);
            System.out.println(payload);
        });
    }

    참고:
    https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers

    좋은 웹페이지 즐겨찾기