Spring Cloud Stream 간단 한 사용법
Spring Cloud Stream 을 간단하게 사용 하여 RocketMQ 기반 생산자 와 소비 자 를 구축 합 니 다.
생산자
pom 파일 에 의존 추가
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
</dependencies>
프로필 에 Spring Cloud Stream binder 와 bings 에 대한 설정 추가
spring:
application:
name: zhao-cloud-stream-producer
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
producer:
group: test
sync: true
bindings:
output:
destination: stream-test-topic
content-type: text/plain # 。 JSON
그 중에서 destination 은 생산 된 데이터 가 보 낸 topic 를 대표 하고 channel 을 데이터 전송 에 사용 하도록 정의 합 니 다.
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface TestChannel {
@Output("output")
MessageChannel output();
}
마지막 구조 데이터 전송 인터페이스
@Controller
public class SendMessageController {
@Resource
private TestChannel testChannel;
@ResponseBody
@RequestMapping(value = "send", method = RequestMethod.GET)
public String sendMessage() {
String messageId = UUID.randomUUID().toString();
Message<String> message = MessageBuilder
.withPayload("this is a test:" + messageId)
.setHeader(MessageConst.PROPERTY_TAGS, "test")
.build();
try {
testChannel.output().send(message);
return messageId + " ";
} catch (Exception e) {
return messageId + " , :" + e.getMessage();
}
}
}
소비자소비자 의 pom 도입 은 생산자 와 같 습 니 다.더 이상 군말 하지 않 습 니 다.설정 할 때 stream 의 output 를 input 으로 수정 하고 대응 하 는 속성 을 수정 해 야 합 니 다.
spring:
application:
name: zhao-cloud-stream-consumer
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
input:
consumer:
tags: test
bindings:
input:
destination: stream-test-topic
content-type: text/plain # 。 JSON
group: test
그리고 채널 의 구조 도 똑 같이 수정 해 야 돼 요.
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface TestChannel {
@Input("input")
SubscribableChannel input();
}
마지막 으로 나 는 시동 류 에서 받 은 소식 을 감청 했다.
@StreamListener("input")
public void receiveInput(@Payload Message message) throws ValidationException {
System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo"));
}
테스트 결과다른 기능 스 트림
메시지 전송 실패 처리
메 시 지 를 보 내 는 데 실 패 했 습 니 다.기본 적 인"topic.errors"채널 에 보 낸 것 을 후회 합 니 다.(topic 은 설 정 된 destination 입 니 다.)메시지 전송 에 실패 한 처 리 를 설정 하려 면 오류 메시지 의 channel 을 열 어야 합 니 다.소비자 설정 은 다음 과 같 습 니 다.
spring:
application:
name: zhao-cloud-stream-producer
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
producer:
group: test
sync: true
bindings:
output:
destination: stream-test-topic
content-type: text/plain # 。 JSON
producer:
errorChannelEnabled: true
시작 클래스 에 잘못된 메 시 지 를 설정 하 는 채널 정보
@Bean("stream-test-topic.errors")
MessageChannel testoutPutErrorChannel(){
return new PublishSubscribeChannel();
}
새 예외 처리 서비스
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
public class ErrorProducerService {
@ServiceActivator(inputChannel = "stream-test-topic.errors")
public void receiveProducerError(Message message){
System.out.println("receive error msg :"+message);
}
}
이상 이 발생 했 을 때 테스트 클래스 에서 이상 포획 이 되 었 기 때문에 발송 이상 처 리 는 주로 여기에서 진행 된다.rocketMq 와 끊 어 진 장면 을 시 뮬 레이 션 합 니 다.볼 수 있다소비자 오류 처리
우선 설정 추가
spring:
application:
name: zhao-cloud-stream-producer
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
producer:
group: test
sync: true
bindings:
output:
destination: stream-test-topic
content-type: text/plain # 。 JSON
producer:
errorChannelEnabled: true
시 뮬 레이 션 이상 동작 추가
@StreamListener("input")
public void receiveInput(@Payload Message message) throws ValidationException {
//System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo"));
throw new RuntimeException("oops");
}
@ServiceActivator(inputChannel = "stream-test-topic.test.errors")
public void receiveConsumeError(Message message){
System.out.println("receive error msg"+message.getPayload());
}
코드 주소https://github.com/zhendiao/deme-code/tree/main/zp
스프링 클 라 우 드 스 트림 의 간단 한 사용법 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.스프링 클 라 우 드 스 트림 의 사용 내용 에 대해 서 는 예전 의 글 을 검색 하거나 아래 의 관련 글 을 계속 찾 아 보 세 요.앞으로 도 많은 응원 부 탁 드 리 겠 습 니 다!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
[MeU] Hashtag 기능 개발➡️ 기존 Tag 테이블에 존재하지 않는 해시태그라면 Tag , tagPostMapping 테이블에 모두 추가 ➡️ 기존에 존재하는 해시태그라면, tagPostMapping 테이블에만 추가 이후에 개발할 태그 기반 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.