Spring Cloud Stream Kafka Streams 바인더 + 프로세서 API

Spring Cloud Stream은 공유 메시징 시스템에 연결된 애플리케이션을 구축하기 위해 Spring에서 제공하는 솔루션입니다.

그것은 우리가 사용하는 구현(바인더)과 동일하게 작동하는 추상화(바인딩)를 제공합니다.
  • 아파치 카프카
  • 토끼 MQ
  • 카프카 스트림
  • 아마존 키네시스
  • ...

  • 이전 게시물에서 Kafka Streams 바인더를 사용하여 간단한 예제를 작업했습니다.

    여기서 목표는 Kafka Streams 바인더와 Kafka Streams Processor API을 사용하여 다음 시나리오를 구현하는 것입니다.


  • pub.user.token 항목에서 key = userId 및 value = { userId: string, token: number }인 메시지를 수신합니다
  • .
  • 1분 이내에 토큰 1, 2, 3, 4, 5를 수신하는 모든 userId에 대해 완료된 이벤트를 주제 pub.user.state로 보냅니다
  • .
  • 최소 하나의 토큰을 받았지만 1분 이내에 완전한 1, 2, 3, 4 및 5 시퀀스를 받지 못한 모든 사용자 ID에 대해 만료된 이벤트를 주제 pub.user.state로 보냅니다
  • .

    준비가 된? 코딩하자! 🤓


    로저비나스 / spring-cloud-stream-kafka-streams-processor


    🍀 Spring Cloud Streams & Kafka Streams 바인더 + 프로세서 API




    run this demoinformation about caching in the state stores를 읽는 것을 잊지 마십시오.
  • Test-first using kafka-streams-test-utils

  • UserStateStream implementation
  • 1. Aggregation by userId
  • 2. Completed UserStateEvents
  • 3. UserStateProcessor implementation
  • 4. UserStateStream and UserStateProcessor integration

  • Kafka Streams binder configuration
  • UserStateStream bean

  • Integration Test
  • 1. Kafka helpers
  • 2. DockerCompose Testcontainer
  • 3. Tests


  • kafka-streams-test-utils를 사용한 테스트 우선



    kafka-streams-test-utils@BeforeEach에 제대로 설정되면 다음 테스트를 구현할 수 있습니다.

    data class UserTokenEvent(val userId: String, val token: Int)
    
    enum class UserStateEventType { COMPLETED, EXPIRED }
    data class UserStateEvent(val userId: String, val state: UserStateEventType)
    
    @Test
    fun `should publish completed event for one user`() {
      topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 1))
      topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 2))
      topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 3))
      topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 4))
      topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 5))
    
      topologyTestDriver.advanceWallClockTime(EXPIRATION.minusMillis(10))
    
      assertThat(topicOut.readKeyValuesToList()).singleElement().satisfies(Consumer { topicOutMessage ->
        assertThat(topicOutMessage.key).isEqualTo(USERNAME_1)
        assertThat(topicOutMessage.value).isEqualTo(UserStateEvent(USERNAME_1, COMPLETED))
      })
    }
    
    @Test
    fun `should publish expired event for one user`() {
      topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 1))
      topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 2))
    
      topologyTestDriver.advanceWallClockTime(EXPIRATION.plus(SCHEDULE).plus(SCHEDULE))
    
      assertThat(topicOut.readKeyValuesToList()).singleElement().satisfies(Consumer { topicOutMessage ->
        assertThat(topicOutMessage.key).isEqualTo(USERNAME_1)
        assertThat(topicOutMessage.value).isEqualTo(UserStateEvent(USERNAME_1, EXPIRED))
      })
    }
    


    UserStateStream 구현



    먼저 UserStateStream 구현을 함수로 시작합니다.
  • Kafka 메시지의 키로 문자열을 원하고 Kafka 메시지의 값으로 UserTokenEvent를 원하므로 어떤 입력이 KStream입니까
  • 어떤 출력이 KStream입니까, 여기서는 문자열이 키이고 UserStateEvent가 값입니다.

  • class UserStateStream(
      private val schedule: Duration,
      private val expiration: Duration
    ) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
    
      override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
        TODO()
      }
    }
    


    이제 단계별로 ...

    1. userId별 집계




    private const val USER_STATE_STORE = "user-state"
    
    data class UserState(val userId: String = "", val tokens: List<Int> = emptyList()) {
      operator fun plus(event: UserTokenEvent) = UserState(event.userId, tokens + event.token)
    }
    
    class UserStateStream(
      private val schedule: Duration,
      private val expiration: Duration
    ) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
      override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
        return input
          .selectKey { _, event -> event.userId } // just in case but the key should be userId already
          .groupByKey()
          .aggregate(
            { UserState() },
            { userId, event, state ->
              logger.info("Aggregate $userId ${state.tokens} + ${event.token}")
              state + event // we use the UserState's plus operator
            },
            Materialized.`as`<String, UserState, KeyValueStore<Bytes, ByteArray>>(USER_STATE_STORE)
              .withKeySerde(Serdes.StringSerde())
              .withValueSerde(JsonSerde(UserState::class.java))
          )
          .toStream()
          // From here down it is just to avoid compilation errors
          .mapValues { userId, _ ->
            UserStateEvent(userId, COMPLETED) 
          }
      }
    }
    


    2. 완료된 UserStateEvents



    마지막 UserTokenEvent를 받으면 바로 완료된 UserStateEvents를 생성할 수 있습니다.

    data class UserState(val userId: String = "", val tokens: List<Int> = emptyList()) {
      // ...
      fun isCompleted() = tokens.containsAll(listOf(1, 2, 3, 4, 5))
    }
    
    class UserStateStream(
      private val schedule: Duration,
      private val expiration: Duration
    ) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
      override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
        return input
          // ...
          .toStream()
          .mapValues { state ->
            logger.info("State $state")
            when {
              state.isCompleted() -> UserStateEvent(state.userId, COMPLETED)
              else -> null
            }
          }
          .filter { _, event -> event != null }
          .mapValues { event ->
            logger.info("Publish $event")
            event!!
          }
      }
    }
    


    3. UserStateProcessor 구현



    UserStateProcessor는 주기적으로 "사용자 상태"저장소를 스캔하고 모든 UserState에 만료 논리를 적용합니다.

    class UserStateProcessor(
      private val schedule: Duration,
      private val expiration: Duration
    ) : Processor<String, UserState, Void, Void> {
    
      override fun init(context: ProcessorContext<Void, Void>) {
        context.schedule(schedule, PunctuationType.WALL_CLOCK_TIME) { time ->
          val stateStore = context.getStateStore<KeyValueStore<String, ValueAndTimestamp<UserState>>>(USER_STATE_STORE)
          stateStore.all().forEachRemaining { it : KeyValue<String, ValueAndTimestamp<UserState>> ->
            logger.info("Do something with $it!!") // TODO
          }
        }
      }
    
      override fun process(record: Record<String, UserState>?) {
        // we do not need to do anything here
      }
    }
    


    만료 논리를 다음과 같이 적용하면 됩니다.

    data class UserState(val userId: String = "", val tokens: List<Int> = emptyList(), private val expired: Boolean = false) {
      // ...
      fun isExpired() = expired
      fun expire() = UserState(userId, tokens, true)
    }
    
    class UserStateProcessor(
      private val schedule: Duration,
      private val expiration: Duration
    ) : Processor<String, UserState, Void, Void> {
    
      override fun init(context: ProcessorContext<Void, Void>) {
        context.schedule(schedule, PunctuationType.WALL_CLOCK_TIME) { time ->
          val stateStore = context.getStateStore<KeyValueStore<String, ValueAndTimestamp<UserState>>>(USER_STATE_STORE)
          stateStore.all().forEachRemaining {
            val age = Duration.ofMillis(time - it.value.timestamp())
            if (age > expiration) {
              if (it.value.value().isExpired()) {
                // if it is already expired from a previous execution, we delete it
                logger.info("Delete ${it.key}")
                stateStore.delete(it.key)
              } else {
                // if it has expired right now, we mark it as expired and we update it
                logger.info("Expire ${it.key}")
                stateStore.put(it.key, ValueAndTimestamp.make(it.value.value().expire(), it.value.timestamp()))
              }
            }
          }
        }
      }
    }
    


    4. UserStateStream 및 UserStateProcessor 통합




    class UserStateStream(
      private val schedule: Duration,
      private val expiration: Duration
    ) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
      override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
        return input
          // ...
          .toStream()
          // we add the UserStateProcessor
          .apply { process(ProcessorSupplier { UserStateProcessor(schedule, expiration) }, USER_STATE_STORE) }
          // downstream we will both receive upstream realtime values as the ones "generated" by the UserStateProcessor
          .mapValues { state ->
            logger.info("State $state")
            when {
              // null states are sent downstream by UserStateProcessor when deleting entries from the store
              state == null -> null // "null" value generated by UserStateProcessor deleting values from the store
              // completed states are sent downstream from upstream
              state.isCompleted() -> UserStateEvent(state.userId, COMPLETED)
              // expired states are sent downstream by UserStateProcessor when updating entries from the store
              state.isExpired() -> UserStateEvent(state.userId, EXPIRED)
              else -> null
            }
          }
          .filter { _, event -> event != null }
          .mapValues { event ->
            logger.info("Publish $event")
            event!!
          }
      }
    }
    


    그리고 바로 이 시점에서 우리UserStreamTest가 통과해야 합니다 🟩 👌

    Kafka Streams 바인더 구성



    쉬운!

    spring:
      application:
        name: "spring-cloud-stream-kafka-streams-processor"
      cloud:
        stream:
          function:
            definition: userStateStream
            bindings:
              userStateStream-in-0: "pub.user.token"
              userStateStream-out-0: "pub.user.state"
          kafka:
            streams:
              binder:
                applicationId: "${spring.application.name}"
                brokers: "localhost:9094"
                configuration:
                  default:
                    key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                    value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    


    이 구성:

  • Spring Cloud Stream은 localhost:9094에 연결된 Kafka Streams 바인더를 생성합니다.
  • Function 인터페이스를 구현해야 하는 userStateStream이라는 @Bean을 생성해야 합니다.
  • 이 @Bean은 pub.user.token 주제를 구독하는 KStream을 pub.user.state 주제에 게시하는 다른 KStream에 연결합니다
  • .


    Kafka Streams Properties 에 설명된 사용 가능한 모든 구성 속성을 찾을 수 있습니다.

    UserStateStream 빈



    구성에서 요구하는 대로 userStateStream라는 이름의 @Bean을 생성해야 합니다.

    @Configuration
    class ApplicationConfiguration {
    
      @Bean
      fun userStateStream(
        @Value("\${user.schedule}") schedule: Duration,
        @Value("\${user.expiration}") expiration: Duration
      ): Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> = UserStateStream(schedule, expiration)
    }
    


    통합 테스트



    우리는 이미 kafka-streams-test-utils를 사용하여 UserStateStream을 "단위 테스트"했지만 Kafka 컨테이너를 사용하여 통합 테스트도 필요합니다...Testcontainers 구조를 위해!

    1. 카프카 도우미



    먼저 Kafka에 생성하고 kafka-clients 라이브러리를 사용하여 Kafka에서 소비하는 유틸리티 클래스가 필요합니다.

    
    class KafkaConsumerHelper(bootstrapServers: String, topic: String) {
    
      fun consumeAll(): List<ConsumerRecord<String, String>> {
        // ...
      }
    
      fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
        // ...
      }
    }
    
    class KafkaProducerHelper(bootstrapServers: String) {
    
      fun send(topic: String?, key: String, body: String) {
        // ...
      }
    }
    


    2. DockerCompose 테스트 컨테이너



    Testcontainers + Junit5에 설명된 대로 @Testcontainers 주석을 사용할 수 있습니다.

    @SpringBootTest
    @Testcontainers
    @ActiveProfiles("test")
    class ApplicationIntegrationTest {
    
      companion object {
    
        @Container
        val container = DockerComposeContainerHelper().createContainer()
      }
    
      // ...
    }
    


    3. 테스트



    그리고 마지막으로 비동기식 항목을 테스트할 때 Awaitility을 사용하여 테스트합니다.

    class ApplicationIntegrationTest {
    
      // ...
    
      @Test
      fun `should publish completed event`() {
        val username = UUID.randomUUID().toString()
    
        kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 1}""")
        kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 2}""")
        kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 3}""")
        kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 4}""")
        kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 5}""")
    
        await().atMost(ONE_MINUTE).untilAsserted {
          val record = kafkaConsumerHelper.consumeAtLeast(1, ONE_SECOND)
          assertThat(record).singleElement().satisfies(Consumer {
            assertThat(it.key()).isEqualTo(username)
            JSONAssert.assertEquals("""{"userId": "$username", "state": "COMPLETED"}""", it.value(), true)
          })
        }
      }
    
      @Test
      fun `should publish expired event`() {
        val username = UUID.randomUUID().toString()
    
        kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 1}""")
        kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 2}""")
        kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 3}""")
        kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": 4}""")
    
        await().atMost(ONE_MINUTE).untilAsserted {
          val record = kafkaConsumerHelper.consumeAtLeast(1, ONE_SECOND)
          assertThat(record).singleElement().satisfies(Consumer {
            assertThat(it.key()).isEqualTo(username)
            JSONAssert.assertEquals("""{"userId": "$username", "state": "EXPIRED"}""", it.value(), true)
          })
        }
      }
    }
    


    그리고 바로 이 시점에서 모든 테스트를 통과해야 합니다 🟩 👏

    그게 다야, 행복한 코딩! 💙

    좋은 웹페이지 즐겨찾기