sparkstreamingkafka에 대한 데이터 압축 문제

13127 단어 kafkaspark
kafka 데이터 축적 문제
1. 문제 설명 생산 환경은sparkstreaming이kafka에 연결되고 데이터 처리를 하는 프로그램을 개발했다.처음에 프로그램이 잘 실행되었는데kafka 집단이 누군가에 의해 움직인 후에spark 프로그램을 다시 시작할 때 다음과 같은 경보 메시지가 나타납니다.
18/06/20 15:29:21 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
18/06/20 15:29:21 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
18/06/20 15:29:21 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-ymtopic
18/06/20 15:29:21 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
18/06/20 15:34:00 WARN internals.ConsumerCoordinator: Auto-commit of offsets {iptv_js-10=OffsetAndMetadata{offset=915889, metadata=''}, iptv_js-9=OffsetAndMetadata{offset=1018618, metadata=''}, iptv_js-11=OffsetAndMetadata{offset=1018619, metadata=''}, iptv_js-0=OffsetAndMetadata{offset=915887, metadata=''}, iptv_js-2=OffsetAndMetadata{offset=915888, metadata=''}, iptv_js-1=OffsetAndMetadata{offset=1018616, metadata=''}, iptv_js-4=OffsetAndMetadata{offset=915883, metadata=''}, iptv_js-3=OffsetAndMetadata{offset=1018619, metadata=''}, iptv_js-6=OffsetAndMetadata{offset=915887, metadata=''}, iptv_js-5=OffsetAndMetadata{offset=1018618, metadata=''}, iptv_js-8=OffsetAndMetadata{offset=915887, metadata=''}, iptv_js-7=OffsetAndMetadata{offset=1018621, metadata=''}} failed for group ymtopic: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

[warn information] Auto-commit of offsets {…} failed for group xxxx: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. [Google translate] xxx 그룹에 속하는offsets 자동 제출이 실패했습니다.commit가 완성되지 못한 이유는 그룹이 리balanced를 하고 파티션을 다른 구성원에게 재분배했기 때문입니다.이것은 다음에 poll () 방법을 연속적으로 호출하는 시간 간격이 설정된 max.poll보다 크다는 것을 의미한다.interval.ms의 값, 이것은 보통 폴 () 방법으로 데이터를 처리하는 시간이 너무 길기 때문입니다.세션 시간 (max.poll.interval.ms) 을 늘리거나poll () 방법으로 처리된 최대 기록 항목 (max.poll.records) 을 줄여서 이 문제를 복구할 수 있습니다.
오류 알림에 따라 max.poll을 증대할 수 있습니다.interval.ms 또는 max.poll 감소.records로 이 문제를 해결합니다.논리적으로 말하자면 이것은 물론 옳지만, 이것은 결코 문제를 해결하는 근본적인 방법이 아니다.이 문제를 일으킨 직접적인 원인은poll() 방법으로 데이터를 처리하는 시간이 너무 길기 때문이며, 근본적인 원인은kafka 데이터가 쌓였기 때문이다.한편,kafka 데이터가 쌓인 근본적인 원인은 우리 프로그램이 지정한 kafka의offset이 덮어씌워졌기 때문이다. 경보 정보는 위의 4개의overriding을 보이고 원본 코드는 다음과 같다.
/**
   * Tweak kafka params to prevent issues on executors
   */
  private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
    logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)

    logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

    // driver and executor should be in different consumer groups
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
    if (null == originalGroupId) {
      logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
    }
    val groupId = "spark-executor-" + originalGroupId
    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)

    // possible workaround for KAFKA-3135
    val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
    if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
      logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
      kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
    }
  }

overriding auto.offset.reset to none for executor. none:topic 각 구역에 제출한offset이 존재할 때offset 이후부터 소비하기;제출한 오프셋이 존재하지 않는 구역이 있으면 이상을 던집니다.
경보 정보에 따르면 이전 프로그램이 자동으로 저장한offset은 대략 915883~1018619사이에 있다.이것은 카프카 집단이 수동적으로 움직인 후에 또 많은 데이터를 생산했기 때문에sparkstreaming은 지난번에 저장한offset부터 소비할 수 없게 되었다는 것을 설명한다.
spark 원본을 보십시오. 다음 그림과 같습니다. 원본에서 알 수 있듯이, 우리가Subscribe에offsets 변수를 설정하면 auto를 건너갈 수 있습니다.offset.reset 파라미터가 덮어쓰는 데 미치는 영향입니다.
2. 해결 방법
프로그램이 시작되지 않는 근본적인 원인은 카프카의 데이터 축적 문제이다.그러면 저희가 수동으로 오프셋 변수를 유지하면 데이터 축적 문제를 뛰어넘을 수 있습니다. [이것은 데이터 분실을 허용하는 업무에만 적용됩니다]
운영 코드는 다음과 같습니다.
    /*
        sparkstreaming  kafka  
     */
    //     ,    Executors   kafka      “PreferBrokers”,    PreferConsistent
    val strategies = LocationStrategies.PreferConsistent
    //   ,    kafka       topic
    val topics = Array(KAFKA_Input_TOPIC)
    val kafkaParams = collection.Map[String, Object](
      "bootstrap.servers" -> KAFKA_IPS,
      "key.serializer" -> classOf[org.apache.kafka.common.serialization.StringSerializer],
      "value.serializer" -> classOf[org.apache.kafka.common.serialization.StringSerializer],
      "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
      "value.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
      "group.id" -> KAFKA_Group_ID,
      "auto.offset.reset" -> "latest",
      "max.poll.interval.ms" -> KAFKA_MAX_POLL_INTERVAL_MS,
      "max.poll.records" -> KAFKA_MAX_POLL_RECORDS,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    //   kafka    
    val offsets = collection.Map[TopicPartition, Long] {
      new TopicPartition(KAFKA_Input_TOPIC, KAFKA_NUM_PARTITION.toInt) -> KAFKA_NOW_OFFSET
    }

    val subscribe = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)

    //      
    val stream = KafkaUtils.createDirectStream(ssc, strategies, subscribe)

좋은 웹페이지 즐겨찾기