kafka - producer 의 생산 속도 와 kafka - consumer 의 소비 속도 비교
상류 데 이 터 는 kafka 에 저장 되 어 있 으 며, flume 을 사용 하여 데 이 터 를 수집 하여 hdfs 등 다양한 flume Sink 으로 전송 합 니 다.이 과정 에서 kafka 의 데이터 생산 속도 가 flume 의 소비 속도 보다 높 으 면 데이터 축적 이 발생 할 수 있다.이 과정 을 감시 하기 위해 서 는 카 프 카 의 생산 과 소비 상 태 를 정시 에 감시 해 야 한다.
2. 지식 축적
지식 참고:https://blog.csdn.net/yxgxy270187133/article/details/53666760
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${ip_port} -topic pz_test_topic --time -2
출력: pztest_topic:0:1000
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${ip_port} -topic pz_test_topic --time -1
출력: pztest_topic:0:1982
보 이 는 바, topic: pztest_topic 는 파 티 션: 0 이 있 고 offset 범 위 는: [1000, 1982] 입 니 다.
set /consumers/testgroup/offsets/pz_test_topic/0 1000
set /kafka/consumers/testgroup/offsets/pz_test_topic/0 1000
관련 프로그램 을 다시 시작 하면 설 정 된 offset 에서 데 이 터 를 읽 을 수 있 습 니 다.
bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic
인 자 를 입력 하지 않 은 상태 에서 kafka. tools. UpdateOffsetsInZK 류 가 입력 해 야 할 인 자 를 알 수 있 습 니 다.저희 consumer. properties 파일 설정 내용 은 다음 과 같 습 니 다.
zookeeper.connect=${zookeeper_ip_port}
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#consumer group id
group.id=group
이 도 구 는 Zookeeper 의 오프셋 을 earliest 또는 latest 로 설정 할 수 있 습 니 다. 다음 과 같 습 니 다.
bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK \
earliest config/consumer.properties iteblog
updating partition 0 with new offset: 276022922
updating partition 1 with new offset: 234360148
updating partition 2 with new offset: 157237157
updating partition 3 with new offset: 106968019
updating partition 4 with new offset: 80696130
updating partition 5 with new offset: 317144986
updating partition 6 with new offset: 299182459
updating partition 7 with new offset: 197012246
updating partition 8 with new offset: 230433681
updating partition 9 with new offset: 120971431
updating partition 10 with new offset: 51200673
updated the offset for 11 partitions
3. 구체 적 실천
kafka 생산자 와 소비자 간 에 kafka 의 명령 행 을 이용 하여 생산 과 소비 사이 에 데이터 축적 이 있 는 지 확인 할 수 있 습 니 다.이 방법 은 단지 재능 이 부족 하고 학식 이 얕 은 나의 생각 일 뿐 입 니 다. 만약 합 리 적 이지 않다 면, 여러분 께 서 비판 하고 지적 해 주 십시오. 더욱 효과 적 인 방법 이 있 으 면 알려 주 십시오.
#监控flume消费kafka数据过程中是否有数据积压
#kafka的ip与端口
ip_port=$1
#kafka_topic名
kafka_topic=$2
#consumer消费者组名
consumer_group_id=$3
#脚本执行时间
sys_time=`date +'%Y-%m-%d %H:%M:%S'`
#查看kafkaTOPIC的最大offset,并使用":"分隔,打印出第3列,即offset数字列,再将所有分区的offset值求和
topic_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${ip_port} -topic ${kafka_topic} --time -1|awk -F ":" '{print $3}' | awk '{sum+=$1}END{print sum}')
#kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${ip_port} -topic ${kafka_topic} --time -1|sort -f -t":" -k 3,3 -r |awk 'NR==1' 以":"进行分隔,用第3列的数值进行排序,并打印出第一行
#topic_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${ip_port} -topic ${kafka_topic} --time -1|sort -f|awk 'NR==1'|cut -d ":" -f 3) 排序,并打印出第1行,再使用":"分隔,切出第3列的值
#获取flume消费到的offset,由于flume的一个consumer.group中可能消费多个kafka topic,可以根据需要使用grep筛选出需要的topic的offset信息,并打印出第3行,并将该topic的所有分区offset求和
consumer_group_offset=$(kafka-consumer-groups.sh --bootstrap-server ${ip_port} --describe --group ${consumer_group_id}|grep -i ${kafka_topic}|awk '{print $3}'| awk '{sum+=$1}END{print sum}')
#consumer_group_offset=$(kafka-consumer-groups.sh --bootstrap-server ${ip_port} --describe --group ${consumer_group_id}|grep -i ${kafka_topic}|awk 'NR==1'|awk '{print $3}')
#计算两个offset的差
difference=$((topic_offset-consumer_group_offset))
#如果kafka topic的offset与flume consumer的offset值之间相差500以内,则认为是正常,否则则认为是存在s数据积压。
if [ ${difference} -le 500 ]; then
topic_state=NOMAL
else
topic_state=OVERSTACK
fi
#将监控结果写入mysql数据库,方便日常监控
source ${conf_path}/mysql_conn_conf.sh
mysql -A -u ${m_user} -p${m_password} -h ${m_host} -P ${m_port} -D ${m_db} --local-infile=1 -e"
insert into kafka_consumers_offset_monitor (sys_time ,kafka_topic ,consumer_group_id ,topic_offset,consumer_group_offset,difference,topic_state)
values ('${sys_time}','${kafka_topic}','${consumer_group_id}','${topic_offset}','${consumer_group_offset}','${difference}','${topic_state}');"
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spring Cloud를 사용한 기능적 Kafka - 1부지금까지 찾을 수 없었던 Spring Cloud Kafka의 작업 데모를 만들기 위해 이 기사를 정리했습니다. Confluent 스키마 레지스트리 7.1.0 이 기사는 먼저 Spring Cloud Stream을 사용...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.