Spring Cloud Stream 간단 한 사용법

7631 단어 SpringCloudStream
Spring Cloud Stream 은 Spring Cloud 시스템 의 Mq 에 대해 좋 은 상층 추상 화 를 가 져 왔 습 니 다.우 리 는 구체 적 인 메시지 미들웨어 와 결합 하여 바 텀 구체 적 인 MQ 메시지 미들웨어 의 세부 적 인 차 이 를 차단 할 수 있 습 니 다.마치 Hibernate 가 구체 적 인 데이터 베 이 스 를 차단 한 것 과 같 습 니 다(Mysql/Oracle*12032).이렇게 하면 우 리 는 MQ 를 배우 고 개발 하 며 유지 하 는 것 이 쉬 워 질 것 이다.⽬전 Spring Cloud Stream 원생⽀는 RabbitMQ 와 Kafka 를 지 녔 고,아 리 는 이 를 바탕 으로 RocketMQ 의 지원 을 제공 했다.
Spring Cloud Stream 을 간단하게 사용 하여 RocketMQ 기반 생산자 와 소비 자 를 구축 합 니 다.
생산자
pom 파일 에 의존 추가

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>

    </dependencies>
프로필 에 Spring Cloud Stream binder 와 bings 에 대한 설정 추가

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain #     。     JSON
그 중에서 destination 은 생산 된 데이터 가 보 낸 topic 를 대표 하고 channel 을 데이터 전송 에 사용 하도록 정의 합 니 다.

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface TestChannel {
    @Output("output")
    MessageChannel output();
}
마지막 구조 데이터 전송 인터페이스

@Controller
public class SendMessageController {
    @Resource
    private TestChannel testChannel;

    @ResponseBody
    @RequestMapping(value = "send", method = RequestMethod.GET)
    public String sendMessage() {
        String messageId = UUID.randomUUID().toString();
        Message<String> message = MessageBuilder
                .withPayload("this is a test:" + messageId)
                .setHeader(MessageConst.PROPERTY_TAGS, "test")
                .build();
        try {
            testChannel.output().send(message);
            return messageId + "    ";
        } catch (Exception e) {
            return messageId + "    ,  :" + e.getMessage();
        }
    }
}
소비자
소비자 의 pom 도입 은 생산자 와 같 습 니 다.더 이상 군말 하지 않 습 니 다.설정 할 때 stream 의 output 를 input 으로 수정 하고 대응 하 는 속성 을 수정 해 야 합 니 다.

spring:
  application:
    name: zhao-cloud-stream-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          input:
            consumer:
              tags: test
      bindings:
        input:
          destination: stream-test-topic
          content-type: text/plain #     。     JSON
          group: test
그리고 채널 의 구조 도 똑 같이 수정 해 야 돼 요.

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface TestChannel {
    @Input("input")
    SubscribableChannel input();
}
마지막 으로 나 는 시동 류 에서 받 은 소식 을 감청 했다.

   @StreamListener("input")
    public void receiveInput(@Payload Message message) throws ValidationException {
        System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo"));
    }
테스트 결과
file
file
다른 기능 스 트림
메시지 전송 실패 처리
메 시 지 를 보 내 는 데 실 패 했 습 니 다.기본 적 인"topic.errors"채널 에 보 낸 것 을 후회 합 니 다.(topic 은 설 정 된 destination 입 니 다.)메시지 전송 에 실패 한 처 리 를 설정 하려 면 오류 메시지 의 channel 을 열 어야 합 니 다.소비자 설정 은 다음 과 같 습 니 다.

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain #     。     JSON
          producer:
            errorChannelEnabled: true
시작 클래스 에 잘못된 메 시 지 를 설정 하 는 채널 정보

 @Bean("stream-test-topic.errors")
    MessageChannel testoutPutErrorChannel(){
        return new PublishSubscribeChannel();
    }
새 예외 처리 서비스

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
public class ErrorProducerService {

    @ServiceActivator(inputChannel = "stream-test-topic.errors")
    public void receiveProducerError(Message message){
        System.out.println("receive error msg :"+message);
    }
}
이상 이 발생 했 을 때 테스트 클래스 에서 이상 포획 이 되 었 기 때문에 발송 이상 처 리 는 주로 여기에서 진행 된다.rocketMq 와 끊 어 진 장면 을 시 뮬 레이 션 합 니 다.볼 수 있다
file file
소비자 오류 처리
우선 설정 추가

spring:
  application:
    name: zhao-cloud-stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              group: test
              sync: true
      bindings:
        output:
          destination: stream-test-topic
          content-type: text/plain #     。     JSON
          producer:
            errorChannelEnabled: true
시 뮬 레이 션 이상 동작 추가

 @StreamListener("input")
    public void receiveInput(@Payload Message message) throws ValidationException {
        //System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo"));
        throw new RuntimeException("oops");
    }
    @ServiceActivator(inputChannel = "stream-test-topic.test.errors")
    public void receiveConsumeError(Message message){
        System.out.println("receive error msg"+message.getPayload());
    }
file
코드 주소https://github.com/zhendiao/deme-code/tree/main/zp
스프링 클 라 우 드 스 트림 의 간단 한 사용법 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.스프링 클 라 우 드 스 트림 의 사용 내용 에 대해 서 는 예전 의 글 을 검색 하거나 아래 의 관련 글 을 계속 찾 아 보 세 요.앞으로 도 많은 응원 부 탁 드 리 겠 습 니 다!

좋은 웹페이지 즐겨찾기