Kafka 0.8에서 0.10으로 업그레이드 시 변경 사항

1620 단어 kafka
Kafka 0.8 버전이 0.10 버전으로 업그레이드되면 다음과 같은 몇 가지 수정 사항이 필요합니다.
Kafka 버전 0.8:
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> kafka_ip,  //   kafka   IP
  "refresh.leader.backoff.ms" -> "30000")
val lines = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](  //  kafka   key value       , String
  ssc,
  kafkaParams,
  topics)
val infos = lines.reduceByKey((a: String, b: String) => YourFunc(a, b), 100)  //             key    value      
      

Kafka 0.10 버전:
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> kafka_ip,   //    0.8  “metadata.broker.list”  “bootstrap.servers”
  "key.deserializer" -> classOf[StringDeserializer],  //      kafka       
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my_test",  //0.10        topic    groupid  key,  group.id       
  "auto.offset.reset" -> "latest",
  "refresh.leader.backoff.ms" -> "30000")
var lines = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams));//createDirectStream    0.8   ,    
val infos = lines.map(record => (record.key(),record.value()))//    :            ConsumerRecord ,        reduceByKey
val infos1 = infos.reduceByKey((a: String, b: String) => YourFunc(a, b), 100)

PS: 설명이 부족하고 심지어 약간의 문제가 존재하지만 코드에 문제가 없고 소비 가능한 데이터를 직접 측정할 수 있다. 인터넷에서 많은 자료를 봤기 때문이다(수동으로 얼굴을 가린다).중요한 것은 필요한 사람에게 도움을 주고 자신도 필기를 하는 것이다:)

좋은 웹페이지 즐겨찾기