Spring Cloud Stream 은 어떻게 서비스 간 의 통신 을 실현 합 니까?

Spring Cloud Stream
Srping cloud Bus 의 바 텀 실현 은 바로 Spring Cloud Stream 이 고 Spring Cloud Stream 의 목적 은 메시지 구동(또는 이벤트 구동)을 바탕 으로 하 는 마이크로 서비스 구 조 를 구축 하 는 것 이다.Spring Cloud Stream 자 체 는 Spring Messaging,Spring Integration,Spring Boot Actuator,Spring Boot Externalized Configuration 등 모듈 을 패키지(통합)하고 확장 합 니 다.다음은 두 서비스 간 의 통신 을 실현 하여 Spring Cloud Stream 의 사용 방법 을 보 여 드 리 겠 습 니 다.
전체 개술

서 비 스 는 다른 서비스 와 통신 하려 면 채널 을 정의 해 야 합 니 다.보통 출력 채널 과 입력 채널 을 정의 합 니 다.출력 채널 은 메 시 지 를 보 내 는 데 사 용 됩 니 다.입력 채널 은 메 시 지 를 받 는 데 사 용 됩 니 다.모든 채널 에 이름 이 있 습 니 다(입력 과 출력 은 채널 유형 일 뿐 서로 다른 이름 으로 많은 채널 을 정의 할 수 있 습 니 다).서로 다른 채널 의 이름 이 같 을 수 없습니다.그렇지 않 으 면 오류 가 발생 합 니 다.(입력 채널 과 출력 채널 의 서로 다른 유형의 채널 이름 도 같 을 수 없습니다)바 인 딩 기 는 RabbitMQ 나 Kafka 를 조작 하 는 추상 적 인 층 입 니 다.이러한 메시지 미들웨어 의 복잡성 과 불일치 성 을 차단 하기 위해 바 인 딩 기 는 채널 의 이름 으로 메시지 미들웨어 에서 테 마 를 정의 합 니 다.한 주제 안의 정보 생산 자 는 여러 서비스 에서 왔 고 한 주제 안의 정 보 를 가 진 소비자 도 여러 서비스 이다.즉,메시지 의 발표 와 소 비 는 주 제 를 통 해 정의 되 고 조직 된 것 이다.채널 의 이름 은 바로 주제 의 이름 이 고 RabbitMQ 에서 주 제 는 Exchange 를 사용 하여 이 루어 지 며 Kafka 에서 주 제 는 Topic 을 사용 하여 이 루어 진다.
준비 환경
두 프로젝트 를 만 듭 니 다.spring-cloud-stream-a 와 spring-cloud-stream-b,spring-cloud-stream-a 는 Spring Cloud Stream 으로 통신 을 실현 합 니 다.spring-cloud-stream-b 는 Spring Cloud Stream 의 바 텀 모듈 Spring Integration 으로 통신 을 실현 합 니 다.
두 항목 의 POM 파일 의존 도 는 다음 과 같 습 니 다.

<dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-test-support</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
spring-cloud-stream-binder-rabbit 는 바 인 딩 기의 실현 을 위해 RabbitMQ 를 사용 하 는 것 을 말한다.
프로젝트 설정 내용 application.properties:

spring.application.name=spring-cloud-stream-a
server.port=9010

#       
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.application.name=spring-cloud-stream-b
server.port=9011

#       
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
rabbitmq 시작 하기:

docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
프로젝트 코드 작성
A 프로젝트 에서 입력 채널 의 출력 채널 을 정의 합 니 다.정의 채널 은 인터페이스 에서@Input 과@Output 주해 정 의 를 사용 합 니 다.프로그램 이 시 작 될 때 Spring Cloud Stream 은 인터페이스 정의 에 따라 클래스 자동 주입 을 실현 합 니 다(Spring Cloud Stream 은 이 인 터 페 이 스 를 자동 으로 구현 합 니 다.코드 를 쓸 필요 가 없습니다).
서비스 입력 채널,채널 이름 ChatExchange.A.Input,인터페이스 정의 입력 채널 은 SubscribableChannel 로 돌아 가 야 합 니 다.

public interface ChatInput {
  String INPUT = "ChatExchanges.A.Input";
  @Input(ChatInput.INPUT)
  SubscribableChannel input();
}
서비스 출력 채널,채널 이름 ChatExchange.A.출력,출력 채널 은 메시지 채널 로 돌아 가 야 합 니 다.

public interface ChatOutput {

  String OUTPUT = "ChatExchanges.A.Output";

  @Output(ChatOutput.OUTPUT)
  MessageChannel output();
}
메시지 실체 클래스 정의:

public class ChatMessage implements Serializable {

  private String name;
  private String message;
  private Date chatDate;

  //                
  private ChatMessage(){}

  public ChatMessage(String name,String message,Date chatDate){
    this.name = name;
    this.message = message;
    this.chatDate = chatDate;
  }

  public String getName(){
    return this.name;
  }

  public String getMessage(){
    return this.message;
  }

  public Date getChatDate() { return this.chatDate; }

  public String ShowMessage(){
    return String.format("    :%s   ,%s %s。",this.chatDate,this.name,this.message);
  }
}
업무 처리 클래스 에 서 는@Enablebinding 으로 바 인 딩 입력 채널 과 출력 채널 을 설명 합 니 다.이 바 인 딩 동작 은 입력 과 출력 채널 을 만 들 고 등록 하 는 실현 클래스 이기 때문에@Autowired 를 사용 하여 직접 주입 할 수 있 습 니 다.또한 메시지 의 직렬 화 는 기본적으로 application/json 형식(com.fastexml.jackson)을 사용 합 니 다.마지막 으로@StreamListener 주석 으로 지정 한 채널 메시지 의 감청 을 진행 합 니 다.

//ChatInput.class           ,         AClient    。
//Input Output           ,          。
@EnableBinding({ChatOutput.class,ChatInput.class})
public class AClient {

  private static Logger logger = LoggerFactory.getLogger(AClient.class);

  @Autowired
  private ChatOutput chatOutput;

  //StreamListener   Json      ,  B        B      。
  @StreamListener(ChatInput.INPUT)
  public void PrintInput(ChatMessage message) {

    logger.info(message.ShowMessage());

    ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());

    chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());
  }
}
이 A 프로젝트 코드 작성 완료.
B 프로젝트 코드 작성
B 프로젝트 는 Spring Integration 을 사용 하여 메시지 의 발표 와 소 비 를 실현 하고 채널 을 정의 할 때 우 리 는 입력 채널 과 출력 채널 의 이름 을 교환 해 야 합 니 다.

public interface ChatProcessor {

  String OUTPUT = "ChatExchanges.A.Input";
  String INPUT = "ChatExchanges.A.Output";

  @Input(ChatProcessor.INPUT)
  SubscribableChannel input();

  @Output(ChatProcessor.OUTPUT)
  MessageChannel output();
}
메시지 실체 클래스:

public class ChatMessage {
  private String name;
  private String message;
  private Date chatDate;

  //                
  private ChatMessage(){}

  public ChatMessage(String name,String message,Date chatDate){
    this.name = name;
    this.message = message;
    this.chatDate = chatDate;
  }

  public String getName(){
    return this.name;
  }

  public String getMessage(){
    return this.message;
  }

  public Date getChatDate() { return this.chatDate; }

  public String ShowMessage(){
    return String.format("    :%s   ,%s %s。",this.chatDate,this.name,this.message);
  }
}
업무 처리 클래스 는@StreamListener 대신@ServiceActivator 주석 을 사용 하고@InboundChannel Adapter 주석 으로 메 시 지 를 발표 합 니 다.

@EnableBinding(ChatProcessor.class)
public class BClient {

  private static Logger logger = LoggerFactory.getLogger(BClient.class);

  //@ServiceActivator  Json          @Transformer  
  @ServiceActivator(inputChannel=ChatProcessor.INPUT)
  public void PrintInput(ChatMessage message) {

    logger.info(message.ShowMessage());
  }

  @Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)
  public ChatMessage transform(String message) throws Exception{
    ObjectMapper objectMapper = new ObjectMapper();
    return objectMapper.readValue(message,ChatMessage.class);
  }

  //         A
  @Bean
  @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
  public GenericMessage<ChatMessage> SendChatMessage(){
    ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());
    GenericMessage<ChatMessage> gm = new GenericMessage<>(message);
    return gm;
  }
}
실행 프로그램
A 항목 과 B 항목 시작:


소스 코드
Github 창고:https://github.com/sunweisheng/spring-cloud-example
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

좋은 웹페이지 즐겨찾기