Spring Cloud Stream Kafka Streams 바인더 + 프로세서 API
54854 단어 springcloudstreamkafkastreamsshowdev
그것은 우리가 사용하는 구현(바인더)과 동일하게 작동하는 추상화(바인딩)를 제공합니다.
이전 게시물에서 Kafka Streams 바인더를 사용하여 간단한 예제를 작업했습니다.
여기서 목표는 Kafka Streams 바인더와 Kafka Streams Processor API을 사용하여 다음 시나리오를 구현하는 것입니다.
준비가 된? 코딩하자! 🤓
로저비나스 / spring-cloud-stream-kafka-streams-processor
🍀 Spring Cloud Streams & Kafka Streams 바인더 + 프로세서 API
run this demo 이 information about caching in the state stores를 읽는 것을 잊지 마십시오.
UserStateStream implementation
Integration Test
kafka-streams-test-utils를 사용한 테스트 우선
kafka-streams-test-utils이 @BeforeEach에 제대로 설정되면 다음 테스트를 구현할 수 있습니다.
data class UserTokenEvent(val userId: String, val token: Int)
enum class UserStateEventType { COMPLETED, EXPIRED }
data class UserStateEvent(val userId: String, val state: UserStateEventType)
@Test
fun `should publish completed event for one user`() {
topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 1))
topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 2))
topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 3))
topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 4))
topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 5))
topologyTestDriver.advanceWallClockTime(EXPIRATION.minusMillis(10))
assertThat(topicOut.readKeyValuesToList()).singleElement().satisfies(Consumer { topicOutMessage ->
assertThat(topicOutMessage.key).isEqualTo(USERNAME_1)
assertThat(topicOutMessage.value).isEqualTo(UserStateEvent(USERNAME_1, COMPLETED))
})
}
@Test
fun `should publish expired event for one user`() {
topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 1))
topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 2))
topologyTestDriver.advanceWallClockTime(EXPIRATION.plus(SCHEDULE).plus(SCHEDULE))
assertThat(topicOut.readKeyValuesToList()).singleElement().satisfies(Consumer { topicOutMessage ->
assertThat(topicOutMessage.key).isEqualTo(USERNAME_1)
assertThat(topicOutMessage.value).isEqualTo(UserStateEvent(USERNAME_1, EXPIRED))
})
}
UserStateStream 구현
먼저 UserStateStream 구현을 함수로 시작합니다.
class UserStateStream(
private val schedule: Duration,
private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
TODO()
}
}
이제 단계별로 ...
1. userId별 집계
private const val USER_STATE_STORE = "user-state"
data class UserState(val userId: String = "", val tokens: List<Int> = emptyList()) {
operator fun plus(event: UserTokenEvent) = UserState(event.userId, tokens + event.token)
}
class UserStateStream(
private val schedule: Duration,
private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
return input
.selectKey { _, event -> event.userId } // just in case but the key should be userId already
.groupByKey()
.aggregate(
{ UserState() },
{ userId, event, state ->
logger.info("Aggregate $userId ${state.tokens} + ${event.token}")
state + event // we use the UserState's plus operator
},
Materialized.`as`<String, UserState, KeyValueStore<Bytes, ByteArray>>(USER_STATE_STORE)
.withKeySerde(Serdes.StringSerde())
.withValueSerde(JsonSerde(UserState::class.java))
)
.toStream()
// From here down it is just to avoid compilation errors
.mapValues { userId, _ ->
UserStateEvent(userId, COMPLETED)
}
}
}
2. 완료된 UserStateEvents
마지막 UserTokenEvent를 받으면 바로 완료된 UserStateEvents를 생성할 수 있습니다.
data class UserState(val userId: String = "", val tokens: List<Int> = emptyList()) {
// ...
fun isCompleted() = tokens.containsAll(listOf(1, 2, 3, 4, 5))
}
class UserStateStream(
private val schedule: Duration,
private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
return input
// ...
.toStream()
.mapValues { state ->
logger.info("State $state")
when {
state.isCompleted() -> UserStateEvent(state.userId, COMPLETED)
else -> null
}
}
.filter { _, event -> event != null }
.mapValues { event ->
logger.info("Publish $event")
event!!
}
}
}
3. UserStateProcessor 구현
UserStateProcessor는 주기적으로 "사용자 상태"저장소를 스캔하고 모든 UserState에 만료 논리를 적용합니다.
class UserStateProcessor(
private val schedule: Duration,
private val expiration: Duration
) : Processor<String, UserState, Void, Void> {
override fun init(context: ProcessorContext<Void, Void>) {
context.schedule(schedule, PunctuationType.WALL_CLOCK_TIME) { time ->
val stateStore = context.getStateStore<KeyValueStore<String, ValueAndTimestamp<UserState>>>(USER_STATE_STORE)
stateStore.all().forEachRemaining { it : KeyValue<String, ValueAndTimestamp<UserState>> ->
logger.info("Do something with $it!!") // TODO
}
}
}
override fun process(record: Record<String, UserState>?) {
// we do not need to do anything here
}
}
만료 논리를 다음과 같이 적용하면 됩니다.
data class UserState(val userId: String = "", val tokens: List<Int> = emptyList(), private val expired: Boolean = false) {
// ...
fun isExpired() = expired
fun expire() = UserState(userId, tokens, true)
}
class UserStateProcessor(
private val schedule: Duration,
private val expiration: Duration
) : Processor<String, UserState, Void, Void> {
override fun init(context: ProcessorContext<Void, Void>) {
context.schedule(schedule, PunctuationType.WALL_CLOCK_TIME) { time ->
val stateStore = context.getStateStore<KeyValueStore<String, ValueAndTimestamp<UserState>>>(USER_STATE_STORE)
stateStore.all().forEachRemaining {
val age = Duration.ofMillis(time - it.value.timestamp())
if (age > expiration) {
if (it.value.value().isExpired()) {
// if it is already expired from a previous execution, we delete it
logger.info("Delete ${it.key}")
stateStore.delete(it.key)
} else {
// if it has expired right now, we mark it as expired and we update it
logger.info("Expire ${it.key}")
stateStore.put(it.key, ValueAndTimestamp.make(it.value.value().expire(), it.value.timestamp()))
}
}
}
}
}
}
4. UserStateStream 및 UserStateProcessor 통합
class UserStateStream(
private val schedule: Duration,
private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
return input
// ...
.toStream()
// we add the UserStateProcessor
.apply { process(ProcessorSupplier { UserStateProcessor(schedule, expiration) }, USER_STATE_STORE) }
// downstream we will both receive upstream realtime values as the ones "generated" by the UserStateProcessor
.mapValues { state ->
logger.info("State $state")
when {
// null states are sent downstream by UserStateProcessor when deleting entries from the store
state == null -> null // "null" value generated by UserStateProcessor deleting values from the store
// completed states are sent downstream from upstream
state.isCompleted() -> UserStateEvent(state.userId, COMPLETED)
// expired states are sent downstream by UserStateProcessor when updating entries from the store
state.isExpired() -> UserStateEvent(state.userId, EXPIRED)
else -> null
}
}
.filter { _, event -> event != null }
.mapValues { event ->
logger.info("Publish $event")
event!!
}
}
}
그리고 바로 이 시점에서 우리UserStreamTest가 통과해야 합니다 🟩 👌
Kafka Streams 바인더 구성
쉬운!
spring:
application:
name: "spring-cloud-stream-kafka-streams-processor"
cloud:
stream:
function:
definition: userStateStream
bindings:
userStateStream-in-0: "pub.user.token"
userStateStream-out-0: "pub.user.state"
kafka:
streams:
binder:
applicationId: "${spring.application.name}"
brokers: "localhost:9094"
configuration:
default:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
이 구성:
Spring Cloud Stream은 localhost:9094에 연결된 Kafka Streams 바인더를 생성합니다.
Kafka Streams Properties 에 설명된 사용 가능한 모든 구성 속성을 찾을 수 있습니다.
UserStateStream 빈
구성에서 요구하는 대로
userStateStream
라는 이름의 @Bean을 생성해야 합니다.@Configuration
class ApplicationConfiguration {
@Bean
fun userStateStream(
@Value("\${user.schedule}") schedule: Duration,
@Value("\${user.expiration}") expiration: Duration
): Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> = UserStateStream(schedule, expiration)
}
통합 테스트
우리는 이미 kafka-streams-test-utils를 사용하여 UserStateStream을 "단위 테스트"했지만 Kafka 컨테이너를 사용하여 통합 테스트도 필요합니다...Testcontainers 구조를 위해!
1. 카프카 도우미
먼저 Kafka에 생성하고 kafka-clients 라이브러리를 사용하여 Kafka에서 소비하는 유틸리티 클래스가 필요합니다.
class KafkaConsumerHelper(bootstrapServers: String, topic: String) {
fun consumeAll(): List<ConsumerRecord<String, String>> {
// ...
}
fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
// ...
}
}
class KafkaProducerHelper(bootstrapServers: String) {
fun send(topic: String?, key: String, body: String) {
// ...
}
}
2. DockerCompose 테스트 컨테이너
Testcontainers + Junit5에 설명된 대로
@Testcontainers
주석을 사용할 수 있습니다.@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
class ApplicationIntegrationTest {
companion object {
@Container
val container = DockerComposeContainerHelper().createContainer()
}
// ...
}
3. 테스트
그리고 마지막으로 비동기식 항목을 테스트할 때 Awaitility을 사용하여 테스트합니다.
class ApplicationIntegrationTest {
// ...
@Test
fun `should publish completed event`() {
val username = UUID.randomUUID().toString()
kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 1}""")
kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 2}""")
kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 3}""")
kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 4}""")
kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 5}""")
await().atMost(ONE_MINUTE).untilAsserted {
val record = kafkaConsumerHelper.consumeAtLeast(1, ONE_SECOND)
assertThat(record).singleElement().satisfies(Consumer {
assertThat(it.key()).isEqualTo(username)
JSONAssert.assertEquals("""{"userId": "$username", "state": "COMPLETED"}""", it.value(), true)
})
}
}
@Test
fun `should publish expired event`() {
val username = UUID.randomUUID().toString()
kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 1}""")
kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 2}""")
kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 3}""")
kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 4}""")
await().atMost(ONE_MINUTE).untilAsserted {
val record = kafkaConsumerHelper.consumeAtLeast(1, ONE_SECOND)
assertThat(record).singleElement().satisfies(Consumer {
assertThat(it.key()).isEqualTo(username)
JSONAssert.assertEquals("""{"userId": "$username", "state": "EXPIRED"}""", it.value(), true)
})
}
}
}
그리고 바로 이 시점에서 모든 테스트를 통과해야 합니다 🟩 👏
그게 다야, 행복한 코딩! 💙
Reference
이 문제에 관하여(Spring Cloud Stream Kafka Streams 바인더 + 프로세서 API), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/rogervinas/spring-cloud-stream-kafka-streams-binder-processor-api-2hko텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)