SpringCloudStream 이벤트 구동 2 단계 - RabbitMQ
글 목록
Spring Cloud Stream 에는 세 개의 Source, Sink, Processor 인터페이스 가 내장 되 어 있다.
@ Output ("output") 은 출력 흐름 으로 표시 되 며 Spring IOC 관리 에 추 가 됩 니 다.
@ Input ("input") 은 입력 흐름 으로 표시 되 며, Spring IOC 관리 에 추 가 됩 니 다.
@ Enablebinding ({Source. class, Sink, class}) 입력 흐름 과 출력 흐름 의 프 록 시 바 인 딩 을 엽 니 다.
@ StreamListener (Sink. INPUT) 입력 흐름 감청
@ ServiceActivator (inputChannel = Sink. INPUT) 감청 입력 흐름 은 @ StreamListener 보다 우선 순위 가 낮 습 니 다.
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-starter-stream-rabbitartifactId>
dependency>
1. 자바 구현 (어떤 것 이 든 가능)
1. 기본 구현
package com.ksaas.cloud.service.system.infrastructure.event.default2;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author kylin
* @desc demo
*/
@Component
public class SourcePublisher {
@Resource
@Qualifier(Source.OUTPUT)
MessageChannel messageChannel;
public void test(String message) {
messageChannel.send(MessageBuilder.withPayload(message).build());
System.out.println("test default source");
}
}
package com.ksaas.cloud.service.system.infrastructure.event.default2;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* @author kylin
* demo
*/
@Component
public class SinkReceiver {
// ,
@StreamListener(Sink.INPUT)
public void test(Message message) {
System.out.println("test default sink: " + message);
}
// // , @StreamListener
// @ServiceActivator(inputChannel = Sink.INPUT)
// public void test(Message message) {
// System.out.println("test default sink: " + message);
// }
// // , , @PostConstruct+SubscribableChannel
// @Resource
// @Qualifier(Sink.INPUT)
// SubscribableChannel subscribableChannel;
// @PostConstruct
// public void test() {
// subscribableChannel.subscribe(message -> {
// System.out.println("test default sink: " + message);
// });
// }
}
package com.ksaas.cloud.service.system.infrastructure.event.default2;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
/**
* @author kylin
*/
@EnableBinding({Source.class, Sink.class})
public class DefaultSpringCloudStreamConfig {
}
package com.ksaas.cloud.service.system.infrastructure.event.custom;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* ,( Source , )
*
* @author kylin
*/
public interface CustomSource {
String TEST_OUT_PUT = "customOutput";
@Output(TEST_OUT_PUT)
MessageChannel testOutput();
}
package com.ksaas.cloud.service.system.infrastructure.event.custom;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;
/**
* ,( Sink , )
*
* @author kylin
*/
public interface CustomSink {
String TEST_IN_PUT = "customInput";
@Input(TEST_IN_PUT)
MessageChannel testOutput();
}
package com.ksaas.cloud.service.system.infrastructure.event.custom;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* demo
*
* @author kylin
*/
@Component
public class CustomSourcePublisher {
@Resource
@Qualifier(CustomSource.TEST_OUT_PUT)
MessageChannel messageChannel;
public void test(String message) {
messageChannel.send(MessageBuilder.withPayload(message).build());
System.out.println("test custom source");
}
}
package com.ksaas.cloud.service.system.infrastructure.event.custom;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* demo
*
* @author kylin
*/
@Component
public class CustomSinkReceiver {
@StreamListener(CustomSink.TEST_IN_PUT)
public void test(Message message) {
System.out.println();
System.out.println("test custom sink: " + message);
}
}
// ....
package com.ksaas.cloud.service.system.infrastructure.event.custom;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
* @author kylin
*/
@EnableBinding({CustomSource.class, CustomSink.class})
public class CustomSpringCloudStreamConfig {
}
송신 단 과 소비 단의 귀속 설정
spring:
cloud:
stream:
bindings:
# destination
#( , / )
output:
destination: kylin_default
content-type: application/json
# destination
input:
destination: kylin_default
content-type: application/json
group: group-cus1
# : customOutput customInput kylin_custom
customOutput:
destination: kylin_custom
content-type: application/json
customInput:
destination: kylin_custom
content-type: application/json
group: group-cus1
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
[MeU] Hashtag 기능 개발➡️ 기존 Tag 테이블에 존재하지 않는 해시태그라면 Tag , tagPostMapping 테이블에 모두 추가 ➡️ 기존에 존재하는 해시태그라면, tagPostMapping 테이블에만 추가 이후에 개발할 태그 기반 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.