SpringCloudStream 이벤트 구동 2 단계 - RabbitMQ

Spring Cloud Stream 이벤트 구동 2 단계
글 목록
  • Spring Cloud Stream 이벤트 구동 2 단계
  • 주해 / 인터페이스 이해
  • 1. 자바 실현 (그 어떠한 것 도 가능)
  • 1. 기본 실현
  • 2. 사용자 정의 실현
  • 2. 출력 흐름 / 입력 흐름 바 인 딩 설정
  • 주해 / 인터페이스 이해
    Spring Cloud Stream 에는 세 개의 Source, Sink, Processor 인터페이스 가 내장 되 어 있다.
  • Source 송신 메시지 인터페이스 에 @ Output ("output")
  • 포함
  • 싱 크 메시지 의 소비자 인터페이스 에는 @ Input ("input")
  • 이 들 어 있 습 니 다.
  • 프로세서 가 Source, Sink
  • 을 계승 했다.
    @ Output ("output") 은 출력 흐름 으로 표시 되 며 Spring IOC 관리 에 추 가 됩 니 다.
    @ Input ("input") 은 입력 흐름 으로 표시 되 며, Spring IOC 관리 에 추 가 됩 니 다.
    @ Enablebinding ({Source. class, Sink, class}) 입력 흐름 과 출력 흐름 의 프 록 시 바 인 딩 을 엽 니 다.
    @ StreamListener (Sink. INPUT) 입력 흐름 감청
    @ ServiceActivator (inputChannel = Sink. INPUT) 감청 입력 흐름 은 @ StreamListener 보다 우선 순위 가 낮 습 니 다.
    
    <dependency>
        <groupId>org.springframework.cloudgroupId>
        <artifactId>spring-cloud-starter-stream-rabbitartifactId>
    dependency>
    

    1. 자바 구현 (어떤 것 이 든 가능)
    1. 기본 구현
  • 소스. OUTPUT 에 발 표 된 출력 흐름 (이것 은 내 장 된 출력 인터페이스)
    package com.ksaas.cloud.service.system.infrastructure.event.default2;
    
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * @author kylin
     * @desc        demo
     */
    @Component
    public class SourcePublisher {
    
        @Resource
        @Qualifier(Source.OUTPUT)
        MessageChannel messageChannel;
    
        public void test(String message) {
            messageChannel.send(MessageBuilder.withPayload(message).build());
            System.out.println("test default source");
        }
    }
    
    
  • 정 보 를 받 아들 이 고 Sink. INPUT 의 입력 흐름 을 받 아들 입 니 다 (이것 은 내 장 된 인터페이스 입 니 다)
    package com.ksaas.cloud.service.system.infrastructure.event.default2;
    
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    /**
     * @author kylin
     *         demo
     */
    @Component
    public class SinkReceiver {
    
        //      ,     
        @StreamListener(Sink.INPUT)
        public void test(Message message) {
            System.out.println("test default sink: " + message);
        }
    
    //	 //      ,     @StreamListener
    //    @ServiceActivator(inputChannel = Sink.INPUT)
    //    public void test(Message message) {
    //        System.out.println("test default sink: " + message);
    //    }
    
    //	 //    ,     ,   @PostConstruct+SubscribableChannel  
    //    @Resource
    //    @Qualifier(Sink.INPUT)
    //    SubscribableChannel subscribableChannel;
    //    @PostConstruct
    //    public void test() {
    //        subscribableChannel.subscribe(message -> {
    //            System.out.println("test default sink: " + message);
    //        });
    //    }
    }
    
    
  • 프 록 시 바 인 딩 시작
    package com.ksaas.cloud.service.system.infrastructure.event.default2;
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.cloud.stream.messaging.Source;
    
    /**
     * @author kylin
     */
    @EnableBinding({Source.class, Sink.class})
    public class DefaultSpringCloudStreamConfig {
    }
    
    
  • 2. 사용자 정의 실현
  • CustomSource 인터페이스 만 들 기 (이름 사용자 정의): 출력 흐름
    package com.ksaas.cloud.service.system.infrastructure.event.custom;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    /**
     *            ,( Source  ,      )
     *
     * @author kylin
     */
    public interface CustomSource {
        String TEST_OUT_PUT = "customOutput";
    
        @Output(TEST_OUT_PUT)
        MessageChannel testOutput();
    }
    
    
  • CustomSink 인터페이스 만 들 기 (이름 사용자 정의): 입력 흐름
    package com.ksaas.cloud.service.system.infrastructure.event.custom;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.MessageChannel;
    
    /**
     *            ,( Sink  ,      )
     *
     * @author kylin
     */
    public interface CustomSink {
        String TEST_IN_PUT = "customInput";
    
        @Input(TEST_IN_PUT)
        MessageChannel testOutput();
    }
    
    
  • 발표 가 실현 되 었 습 니 다. CustomSource. TEST 에 발표 되 었 습 니 다.OUT_PUT 의 출력 흐름 (사실 기본 동작 의미 와 같 음)
    package com.ksaas.cloud.service.system.infrastructure.event.custom;
    
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     *          demo
     *
     * @author kylin
     */
    
    @Component
    public class CustomSourcePublisher {
    
        @Resource
        @Qualifier(CustomSource.TEST_OUT_PUT)
        MessageChannel messageChannel;
    
        public void test(String message) {
            messageChannel.send(MessageBuilder.withPayload(message).build());
            System.out.println("test custom source");
        }
    }
    
    
  • 소식 을 받 아들 여 실현, Custom Sink. TEST 받 아들 이기IN_PUT 의 입력 흐름 (사실 기본 동작 의미 와 같 음)
    package com.ksaas.cloud.service.system.infrastructure.event.custom;
    
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    /**
     *          demo
     *
     * @author kylin
     */
    @Component
    public class CustomSinkReceiver {
    
        @StreamListener(CustomSink.TEST_IN_PUT)
        public void test(Message message) {
            System.out.println();
            System.out.println("test custom sink: " + message);
        }
    }
    // .... 
    
  • 사용자 정의 프 록 시 바 인 딩 시작 (기본 동작 의미 와 같 음)
    package com.ksaas.cloud.service.system.infrastructure.event.custom;
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    
    /**
     * @author kylin
     */
    @EnableBinding({CustomSource.class, CustomSink.class})
    public class CustomSpringCloudStreamConfig {
    }
    
    
  • 2. 출력 흐름 / 입력 흐름 바 인 딩 설정
    송신 단 과 소비 단의 귀속 설정
    spring:
      cloud:
        stream:
          bindings:
          	#        destination         
          	#(                 ,    /        )
            output:
              destination: kylin_default
              content-type: application/json
            #       destination         
            input:
              destination: kylin_default
              content-type: application/json
              group: group-cus1
            
            
            #    :  customOutput  customInput      kylin_custom
            customOutput:
              destination: kylin_custom
              content-type: application/json
            customInput:
              destination: kylin_custom
              content-type: application/json
              group: group-cus1
    

    좋은 웹페이지 즐겨찾기