Spring Cloud를 사용한 기능적 Kafka - 2부
8282 단어 kafkafunctionalspringcloud
중단한 부분부터 계속해서 https://start.spring.io 웹 사이트를 사용하여 다른 Spring 애플리케이션을 생성해야 합니다.
이번에는 Spring Web 종속성을 생략할 수 있습니다!
우리는 아직 멋진 것을 추가하지 않을 것입니다. 코드 베이스를 가져와서 새 애플리케이션을 시작할 준비를 하십시오!
스프링 클라우드 설정
먼저 Spring Cloud 종속성을 설정하겠습니다.
pom.xml
에서 다음과 같이 변경해 보겠습니다.<properties>
<java.version>18</java.version>
<spring-cloud-release.version>2021.0.1</spring-cloud-release.version>
</properties>
다음으로 Spring Cloud POM을 참조하려고 합니다.
project
태그 아래에 다음 XML 블록을 추가합니다.<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud-release.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
여기에는 Spring Cloud v2021.0.1에서 필요한 모든 종속성이 포함됩니다.
문자열 기반 소비자의 경우 추가된 2개의 핵심 종속성과 1개의 테스트 종속성이 필요합니다.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<classifier>test-binder</classifier>
<type>test-jar</type>
</dependency>
소비자 및 프로세서
이제 우리는 기본적으로 데이터 파이프라인이 될 코드를 추가할 수 있습니다. 주제에서 메시지를 가져온 다음 이를 다른 주제로 전달하기 전에 이를 사용하여 작업을 수행할 수 있습니다.
다음 빈으로 새 클래스
KafkaConfiguration.java
를 만듭니다.package com.example.demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import java.util.function.Consumer;
import java.util.function.Function;
@Configuration
public class KafkaConfiguration {
private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
@Bean
public Function<Flux<String>, Flux<String>> employeeProcessor() {
return employee -> employee.map(emp -> {
log.info("Employee is {}", emp);
return emp;
}).log();
}
@Bean
public Consumer<String> employeeConsumer() {
return (value) -> log.info("Consumed {}", value);
}
}
코드를 살펴봅시다!
발생하는 값을 출력하는 기본 로거가 있습니다.
private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
다음으로 주제에서 오는 메시지의 입력/출력 역할을 할 bean이 있습니다.
@Bean
public Function<Flux<String>, Flux<String>> employeeProcessor() {
return employee -> employee.map(emp -> {
log.info("Employee is {}", emp);
return emp;
}).log();
}
우리의 메서드 서명은 (데이터 입력/데이터 출력) 파이프라인을 나타냅니다.
Function<Flux<String>, Flux<String>>
여기서 우리는 기본적으로 우리가 읽는 메시지가 문자열(
<Flux<String>,
)이 될 것이고 문자열( , Flux<String>>
)인 메시지를 보낼 것이라고 말하고 있습니다.프로세서 본체는 메시지를 읽고 로그에 출력합니다(몇 번!).
return employee -> employee.map(emp -> {
log.info("Employee is {}", emp);
return emp;
}).log();
일반적으로 다른 출력을 생성하는 수신 데이터로 더 많은 작업을 수행하기 위해 다른 메서드를 호출할 수 있습니다. 지금은 메시지를 있는 그대로 보냅니다.
다음 빈은 소비자입니다. 지금은 문자열 메시지만 로그아웃합니다.
@Bean
public Consumer<String> employeeConsumer() {
return (value) -> log.info("Consumed {}", value);
}
이 응용 프로그램을 실행하기 전에 항목이 아직 정의되지 않은 항목이 누락되었음을 알 수 있습니다.
Spring Cloud Functions를 사용하여 이러한 메시징 앱을 만들 때 이러한 빈을 구성하고 메시지 버스의 기본 토픽에 바인딩하는 바인딩 기능을 따라야 합니다.
src/main/resources/application.yml
파일을 열고 지금 해봅시다!spring:
cloud:
function:
definition: employeeConsumer;employeeProcessor
stream:
bindings:
employeeProcessor-in-0:
destination: employee
employeeProcessor-out-0:
destination: employees-processed
employeeConsumer-in-0:
destination: employees-processed
kafka:
binder:
producer-properties:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
key.serializer: org.apache.kafka.common.serialization.StringSerializer
consumer-properties:
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
먼저 구성에서 함수 정의를 만들었습니다.
spring:
cloud:
function:
definition: employeeConsumer;employeeProcessor
보시다시피 이들은 방금 정의한 빈의 이름입니다. 나중에 구성에서 참조 지점으로 사용할 것입니다.
다음으로 실제 Bean-to-Topic 바인딩을 정의했습니다.
spring:
cloud:
stream:
bindings:
employeeProcessor-in-0:
destination: employee
employeeProcessor-out-0:
destination: employees-processed
employeeConsumer-in-0:
destination: employees-processed
정의(
employeeProcessor, employeeConsumer
)를 사용하여 그에 따라 주제를 매핑했습니다. 따라서 employeeProcessor에는 2개의 매핑이 있습니다.employee
employees-processed
한편
employeeConsumer
에는 바인딩이 1개 있습니다.employees-processed
이는
employee
주제에 해당하는 모든 항목이 employeeProcesor
에 의해 선택됨을 의미합니다. employeeProcessor
는 메시지를 인쇄한 다음 메시지를 employees-processed
항목으로 보냅니다.employees-processed
대기열로 들어오는 모든 메시지는 employeeConsumer
에 의해 선택되며 다시 메시지를 로그에 출력합니다.마지막으로 구성에서 일반적으로 Kafka에 대한 몇 가지 기본 속성을 정의합니다.
spring:
cloud:
stream:
kafka:
binder:
producer-properties:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
key.serializer: org.apache.kafka.common.serialization.StringSerializer
consumer-properties:
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
간단히 말해서, 우리는 메시지가 나오고 토픽으로 들어가는 것을 문자열로 취급합니다!
애플리케이션 시작
이제 다음을 사용하여 애플리케이션을 실행할 수 있습니다.
$ ./mvnw spring-boot:run
이전 기사의 원본 웹 애플리케이션이 여전히 실행 중인 경우 API에서 메시지를 보내고 소비자가 메시지를 선택하는 것을 볼 수 있어야 합니다.
Employee is Marshall, Andrew
Consumed Marshall, Andrew
다음 기사에서는 직렬 변환기로 Avro를 소개하고 바이너리 메시지와 스키마 레지스트리를 보내는 방법을 보여줍니다!
Reference
이 문제에 관하여(Spring Cloud를 사용한 기능적 Kafka - 2부), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/anthonyikeda/functional-kafka-with-spring-cloud-part-2-20g9텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)