kafka - producer 의 생산 속도 와 kafka - consumer 의 소비 속도 비교

업무 수요
상류 데 이 터 는 kafka 에 저장 되 어 있 으 며, flume 을 사용 하여 데 이 터 를 수집 하여 hdfs 등 다양한 flume Sink 으로 전송 합 니 다.이 과정 에서 kafka 의 데이터 생산 속도 가 flume 의 소비 속도 보다 높 으 면 데이터 축적 이 발생 할 수 있다.이 과정 을 감시 하기 위해 서 는 카 프 카 의 생산 과 소비 상 태 를 정시 에 감시 해 야 한다.
2. 지식 축적
지식 참고:https://blog.csdn.net/yxgxy270187133/article/details/53666760
  • kafka topic 의 offset 범위 조회
  • topic = pz 보기test_topic, broker = ${ip port} 의 offset 최소 값
  • bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${ip_port} -topic pz_test_topic --time -2

    출력: pztest_topic:0:1000
  • offset 의 최대 치 보기
  • bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${ip_port} -topic pz_test_topic --time -1

    출력: pztest_topic:0:1982
    보 이 는 바, topic: pztest_topic 는 파 티 션: 0 이 있 고 offset 범 위 는: [1000, 1982] 입 니 다.
  • consumer group 의 offset 설정
  • zookeeper client 시작
  • /zookeeper/bin/zkCli.sh
  • consumer group 설정: testgroup topic: pztest_topic partition: 0 의 offset 는 1000:
  • set /consumers/testgroup/offsets/pz_test_topic/0 1000
  • 만약 에 kafka 가 zookeeper root 를 설정 했다 면/kafka 로 명령 을 바 꿔 야 합 니 다.
  • set /kafka/consumers/testgroup/offsets/pz_test_topic/0 1000

    관련 프로그램 을 다시 시작 하면 설 정 된 offset 에서 데 이 터 를 읽 을 수 있 습 니 다.
  • 수 동 으로 kafka 를 업데이트 하려 면 zookeeper 에 존재 하 는 오프셋 이 수 동 으로 특정한 topic 의 오프셋 을 특정한 값 으로 설정 해 야 할 때 가 있 습 니 다. Zookeeper 의 데 이 터 를 업데이트 해 야 합 니 다.Kafka 내장 은 우리 에 게 오프셋 을 수정 하 는 종 류 를 제공 합 니 다. kafka. tools. UpdateOffsetsInZK 는 Zookeeper 의 특정한 주제 의 오프셋 을 수정 할 수 있 습 니 다. 구체 적 인 조작 은 다음 과 같 습 니 다.
  • bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
    USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

    인 자 를 입력 하지 않 은 상태 에서 kafka. tools. UpdateOffsetsInZK 류 가 입력 해 야 할 인 자 를 알 수 있 습 니 다.저희 consumer. properties 파일 설정 내용 은 다음 과 같 습 니 다.
    # timeout in ms for connecting to zookeeper
    #consumer group id

    이 도 구 는 Zookeeper 의 오프셋 을 earliest 또는 latest 로 설정 할 수 있 습 니 다. 다음 과 같 습 니 다.
    bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK  \
         earliest config/consumer.properties iteblog
    updating partition 0 with new offset: 276022922
    updating partition 1 with new offset: 234360148
    updating partition 2 with new offset: 157237157
    updating partition 3 with new offset: 106968019
    updating partition 4 with new offset: 80696130
    updating partition 5 with new offset: 317144986
    updating partition 6 with new offset: 299182459
    updating partition 7 with new offset: 197012246
    updating partition 8 with new offset: 230433681
    updating partition 9 with new offset: 120971431
    updating partition 10 with new offset: 51200673
    updated the offset for 11 partitions

    3. 구체 적 실천
    kafka 생산자 와 소비자 간 에 kafka 의 명령 행 을 이용 하여 생산 과 소비 사이 에 데이터 축적 이 있 는 지 확인 할 수 있 습 니 다.이 방법 은 단지 재능 이 부족 하고 학식 이 얕 은 나의 생각 일 뿐 입 니 다. 만약 합 리 적 이지 않다 면, 여러분 께 서 비판 하고 지적 해 주 십시오. 더욱 효과 적 인 방법 이 있 으 면 알려 주 십시오.
    sys_time=`date +'%Y-%m-%d %H:%M:%S'`
    topic_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${ip_port} -topic ${kafka_topic} --time -1|awk -F ":"  '{print $3}' | awk '{sum+=$1}END{print sum}')
    #kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${ip_port} -topic ${kafka_topic} --time -1|sort -f -t":" -k 3,3 -r |awk 'NR==1' 以":"进行分隔,用第3列的数值进行排序,并打印出第一行
    #topic_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${ip_port} -topic ${kafka_topic} --time -1|sort -f|awk 'NR==1'|cut -d ":" -f 3)  排序,并打印出第1行,再使用":"分隔,切出第3列的值
    #获取flume消费到的offset,由于flume的一个consumer.group中可能消费多个kafka topic,可以根据需要使用grep筛选出需要的topic的offset信息,并打印出第3行,并将该topic的所有分区offset求和
    consumer_group_offset=$(kafka-consumer-groups.sh --bootstrap-server ${ip_port} --describe --group ${consumer_group_id}|grep -i ${kafka_topic}|awk '{print $3}'| awk '{sum+=$1}END{print sum}')
    #consumer_group_offset=$(kafka-consumer-groups.sh --bootstrap-server ${ip_port} --describe --group ${consumer_group_id}|grep -i ${kafka_topic}|awk 'NR==1'|awk '{print $3}')
    #如果kafka topic的offset与flume consumer的offset值之间相差500以内,则认为是正常,否则则认为是存在s数据积压。
    if [ ${difference} -le 500 ]; then
    source ${conf_path}/mysql_conn_conf.sh
    mysql -A -u ${m_user} -p${m_password} -h ${m_host} -P ${m_port} -D ${m_db} --local-infile=1 -e"
    insert into kafka_consumers_offset_monitor (sys_time ,kafka_topic ,consumer_group_id ,topic_offset,consumer_group_offset,difference,topic_state) 
    values ('${sys_time}','${kafka_topic}','${consumer_group_id}','${topic_offset}','${consumer_group_offset}','${difference}','${topic_state}');"

