봄철 카프카 계류 놀이터와 코틀린 - III


문맥
이 글은 Kotlin이 Spring boot과 Spring Kafka를 사용하여 간단한 Kafka Streams 프로그램을 만드는 시리즈의 일부분입니다.
강좌의 마지막 부분을 보고 공부를 시작하며 우리가 구축하고 있는 내용을 더 알아보세요.

If you want to start from here you can clone the source code for this project git clone [email protected]:mmaia/simple-spring-kafka-stream-kotlin.git and then checkout v6 git checkout v6 and follow from there continuing with this post.


본고에서 첫 번째 Spring Kafka 흐름, Global Ktable, Rocks DB 상태 저장소를 만들고, 두 번째 부분에서ReadOnly Key Value Store를 사용하여 만든 Rest 컨트롤러에 공개합니다.


Spring Kafka Streams용 어플리케이션 구성
이제 Kafka Streams를 사용하려면 Spring Boot이 이 점을 알아야 합니다. 그래서 @EnableKafkaStreams Spring Kafka 주석을 kafkaConfiguration 클래스 정의에 추가하기 시작했습니다. 기존의 @EnableKafka 클래스 정의 아래에 있습니다.
@Configuration
@EnableKafka
@EnableKafkaStreams
class KafkaConfiguration { 
...
우리가 이전에 소비자와 생산자에게 했던 것처럼 기본적인 기본 흐름 설정을 추가해야 하지만, 이번에는 Spring 가이드 설정으로 application.yaml 파일에 추가하는 것이 아니라 프로그래밍 방법을 사용할 것입니다. 이것도 도움이 될 것입니다. 그러나 이 두 가지 방법의 예시가 있어서 당신이 그것들을 이해하고 당신에게 가장 적합한 방법을 선택할 수 있어서 기쁩니다.

In case you have multiple consumers, producers or streams in the same Spring Boot application and want different configurations for each you will have to take the programmatic approach as it gives you more flexibility to configure each specific client while the configuration approach only enables you to configure the default spring bean of each type.

KafkaConfiguration 클래스에서 기본 Streams 구성을 만들었습니다.
@Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME])
    fun defaultKafkaStreamsConfig(): KafkaStreamsConfiguration {
        val props: MutableMap<String, Any> = HashMap()
        props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = KAFKA_HOSTS
        props[StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG] =
            LogAndContinueExceptionHandler::class.java
        props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = SCHEMA_REGISTRY_URL
        props[StreamsConfig.APPLICATION_ID_CONFIG] = "quote-stream"
        props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
        props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = SpecificAvroSerde::class.java
        props[ConsumerConfig.GROUP_ID_CONFIG] = "stock-quotes-stream-group"

        return KafkaStreamsConfiguration(props)
    }
우리는 또한 KafkaConfiguration 클래스에 다른 bean을 추가하여 흐름 상태를 출력할 수 있습니다. 이것은 학습과 이해에 도움이 됩니다. 실제 응용에서 이렇게 하지 않는 것을 선택할 수 있습니다. 만약 이렇게 한다면, 당신은 이 예에서처럼 stdout을 인쇄하지 않고 기록기를 사용할 것입니다.
@Bean
fun configurer(): StreamsBuilderFactoryBeanConfigurer? {
    return StreamsBuilderFactoryBeanConfigurer { fb: StreamsBuilderFactoryBean ->
        fb.setStateListener { newState: KafkaStreams.State, oldState: KafkaStreams.State ->
            println("State transition from $oldState to $newState")
        }
    }
}

레버용 GlobalKtable 생성
현재 카프카류가 정확하게 설정되어 있습니다. 우리는 첫 번째 카프카류 요소를 만들 수 있습니다.GlobalKtable과 레버리지 가격의 물적 시각을 만들기 시작하겠습니다.
같은 LeverageStream 패키지에 repository이라는 새로운 종류를 만들고 @Repository으로 주석을 달았다.
@Repository
class LeverageStream {
...
GlobalKtable을 만들고 이를 구체화하기 전에 Kafka Streams에서 처리하고 저장할 대상에 어떤 종류의 서열화와 반서열화가 필요한지 알려야 합니다.Kafka Streams에서는 Serde을 사용하여 이루어졌습니다. Avro를 사용하기 때문에 Avro Server SpecificAvroSerde을 사용하여 이전 단계에서 만들어진 LeveragePrice 유형을 전달하고 다음 내용을 Leverage Stream 클래스에 추가하여 Serde를 설명하고 설정하고 초기화합니다.
private val leveragePriceSerde = SpecificAvroSerde<LeveragePrice>()

val serdeConfig: MutableMap<String, String> = Collections.singletonMap(       AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)

@PostConstruct
fun init() {
    leveragePriceSerde.configure(serdeConfig, false)
}

Kafka Streams Serde과 Avro 모드의 설정을 어디서 가져오는지 알려 드리겠습니다. 다음 코드를 추가하여 물리화된 Rocks DB Key Value 저장소가 있는 Global Ktable를 만들 수 있습니다.
    @Bean
    fun leveragePriceBySymbolGKTable(streamsBuilder: StreamsBuilder): GlobalKTable<String, LeveragePrice> {      
        return streamsBuilder
            .globalTable(
                LEVERAGE_PRICES_TOPIC,
                Materialized.`as`<String, LeveragePrice, KeyValueStore<Bytes, ByteArray>>(LEVERAGE_BY_SYMBOL_TABLE)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(leveragePriceSerde)
            )
    }

The materialized view will by default create a RocksDB locally to your application for you, you don't have to manage or explicitly do anything it's... Magic!


이 Rocks DB가 어디서 만들어졌는지 알고 싶을 수도 있습니다. StreamsConfig.STATE_DIR_CONFIG (즉 -state.dir) 을 설정해서 제어하고 원하는 위치를 가리킬 수 있습니다.이 예에서 기본값을 보존했습니다. 저희는 속성을 지정하지 않았기 때문에, 이 강좌를 실행하는 운영체제에 따라 기본값을 선택할 것입니다.linux 기기에서 /tmp/kafka-streams/${stream-id}/${type}/rocksdb/${local_storage_name} 폴더에서 찾을 수 있습니다. 이 예의 전체 경로는 /tmp/kafka-streams/quote-stream/global/rocksdb/leverage-by-symbol-ktable/입니다. 그 중에서RocksDB 파일을 볼 수 있습니다.


물적 뷰 표시
좋습니다. 현재 우리는 로컬 저장소가 하나 생겼습니다. 우리는 그것을 조회하여 최신 기기의 이용률을 얻을 수 있기를 바랍니다.
Rocks DB를 조회할 수 있도록, 우리는 ReadOnlyKeyValueStore을 성명할 것입니다. 우리는 Kotlin의 lateinit 수식자를 사용할 것입니다. 왜냐하면 우리는 흐르는 생명주기를 초기화하기 전에 그것을 연결해야 하기 때문입니다.i, e-RocksDB 저장소가 필요합니다.LeverageStream에 신고:
private lateinit var leveragePriceView: ReadOnlyKeyValueStore<String, LeveragePrice>
다음에 @Bean 탐지기를 설명하고 흐름을 만든 후에 초기화합니다.
@Bean
fun afterStart(sbfb: StreamsBuilderFactoryBean): StreamsBuilderFactoryBean.Listener {
    val listener: StreamsBuilderFactoryBean.Listener = object : StreamsBuilderFactoryBean.Listener {
        override fun streamsAdded(id: String, streams: KafkaStreams) {
            leveragePriceView = streams.store<ReadOnlyKeyValueStore<String, LeveragePrice>>(
                    StoreQueryParameters.fromNameAndType(
                        LEVERAGE_BY_SYMBOL_TABLE,
                        QueryableStoreTypes.keyValueStore()
                    )
            )
        }
    }
    sbfb.addListener(listener)
    return listener
}
그런 다음 수신 키의 특정 값을 가져오는 함수를 만들 수 있습니다.
fun getLeveragePrice(key: String?): LeveragePrice? {
    return leveragePriceView[key]
}
마지막으로 우리는 새로운 REST 단점을 QuotesController 클래스에 추가하여 레버의 역할을 공개할 수 있다.
    @GetMapping("leveragePrice/{instrumentSymbol}")
    fun getLeveragePrice(@PathVariable instrumentSymbol: String): ResponseEntity<LeveragePriceDTO> {
        val leveragePrice: LeveragePrice = leverageStream.getLeveragePrice(instrumentSymbol)
            ?: return ResponseEntity.noContent().build()
        // if quote doesn't exist in our local store we return no content.
        val result = LeveragePriceDTO(leveragePrice.symbol.toString(), BigDecimal.valueOf(leveragePrice.leverage))
        return ResponseEntity.ok(result)
    }
이것으로 우리는 우리의 응용 프로그램을 구축하고 실행할 수 있다: mvn clean package -DskipTests && mvn spring-boot:run.이 시리즈와 의 지침에 따라 로컬 카프카드를 실행하고 일부 호출을 실행해야 합니다.
  • 에 지렛대를 추가하면 카프카 테마에 새로운 지렛대를 제공할 것입니다.
  • POST http://localhost:8080/api/leverage
    Content-Type: application/json
    
    < ./test-data/leveragePrice1.json
    
    RocksDB 로컬 저장소에서 조회하기
    GET http://localhost:8080/api/leveragePrice/APPL
    
    더 많은 견적을 추가하고, 그것을 조회할 때 작업을 진행하면, 저장소가 당신이 보낸 최신 견적으로 어떻게 자동으로 업데이트되는지 볼 수 있습니다.
    평소와 같이 코드를 검사할 수 있습니다. 지금까지 project repogit clone [email protected]:mmaia/simple-spring-kafka-stream-kotlin.git을 검사하고 v7과 git checkout v7을 검사할 수 있습니다.
    이것은 이 시리즈의 세 번째 부분입니다. 나는 지금까지 당신이 카프카류를 배우고 노는 데 약간의 즐거움을 가지고 있기를 바랍니다.본 시리즈의 다음 글에서 우리는 인용 흐름을 만들고 지렛대를 넣으며 일부 기준에 따라 서로 다른 주제로 나눌 것이니 주목해 주십시오.
    즐겁게 놀고, 자신과 타인을 잘 대해라!
    Ryland Dean, Unsplash에서 촬영

    좋은 웹페이지 즐겨찾기