Kafka 0.8에서 0.10으로 업그레이드 시 변경 사항
1620 단어 kafka
Kafka 버전 0.8:
val kafkaParams = Map[String, String](
"metadata.broker.list" -> kafka_ip, // kafka IP
"refresh.leader.backoff.ms" -> "30000")
val lines = KafkaUtils
.createDirectStream[String, String, StringDecoder, StringDecoder]( // kafka key value , String
ssc,
kafkaParams,
topics)
val infos = lines.reduceByKey((a: String, b: String) => YourFunc(a, b), 100) // key value
Kafka 0.10 버전:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> kafka_ip, // 0.8 “metadata.broker.list” “bootstrap.servers”
"key.deserializer" -> classOf[StringDeserializer], // kafka
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "my_test", //0.10 topic groupid key, group.id
"auto.offset.reset" -> "latest",
"refresh.leader.backoff.ms" -> "30000")
var lines = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams));//createDirectStream 0.8 ,
val infos = lines.map(record => (record.key(),record.value()))// : ConsumerRecord , reduceByKey
val infos1 = infos.reduceByKey((a: String, b: String) => YourFunc(a, b), 100)
PS: 설명이 부족하고 심지어 약간의 문제가 존재하지만 코드에 문제가 없고 소비 가능한 데이터를 직접 측정할 수 있다. 인터넷에서 많은 자료를 봤기 때문이다(수동으로 얼굴을 가린다).중요한 것은 필요한 사람에게 도움을 주고 자신도 필기를 하는 것이다:)
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spring Cloud를 사용한 기능적 Kafka - 1부지금까지 찾을 수 없었던 Spring Cloud Kafka의 작업 데모를 만들기 위해 이 기사를 정리했습니다. Confluent 스키마 레지스트리 7.1.0 이 기사는 먼저 Spring Cloud Stream을 사용...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.