KAFKA 소비자 이상이 일으킨 사고

문제 설명:


온라인에 서버 한 대가 너무 느려서 서버에 있는kafkabroker를 닫았습니다.닫은 후에 일부 kafkaconsumer가 데이터를 정상적으로 소비할 수 없음을 발견했습니다. 로그 오류: o.a.kakfa.clients.consumer.internals.AbstractCordinator Marking the coordinator (39.0.2.100) as dead.

원인:


배열 결과 consumer group 정보: (kafka.coordinator.GroupMetadataMessageFormatter 유형): groupId: [groupId, Some(consumer), groupState, Map(memberId -> [memberId,clientId,clientHost,sessionTimeoutMs],...-> []],KAFKA 내부 topic: __consumer_offsets 에 저장된 키는groupId입니다.브로커 매개 변수 offsets를 동시에 발견합니다.topic.replication.factor가 1로 잘못 설정되었습니다.이 매개변수는 TOPIC: __Consumer_offsets 의 복제본 수를 나타냅니다.이렇게 하면 어떤 브로커가 닫히면, 닫힌 브로커가 __Consumer_offsets의 일부 파티션의 리더입니다.일부 소비자 그룹을 사용할 수 없게 됩니다.브로커가 시작되면 명령줄을 통해 복사본 수를 수동으로 확장해야 합니다.
reassignment.json:
{"version":1,
 "partitions": [{"topic": "xxx", "partition": 0, "replicas": {brokerId1, brokerId2}}]
}
kafka-reassign-partitions  --zookeeper localhost:2818 --reassignment-json-file  reassignment.json --execute

클라이언트가 Consumer Coordinator를 찾는 과정: 클라이언트 org.apache.kafka.clients.consumer.internals.AbstractCoordinator가 알 수 없는 경우(AbstractCoordinator.coordinatorUnknown(), lookupCoordinator를 요청하여 부하가 가장 낮은 노드에 FindCoordinatorRequest 보내기
서버 KafkaApis.handleFindCoordinatorRequest 수신 요청: 먼저 GroupMetaManager를 호출합니다.partitionFor(consumerGroupId)consunerGroupId의hashCode는 __consumer_offsets 전체 분할 모드에서 partition id를 가져와 __consumer_offset 이 Topic에서 partition에 대응하는 Partition Metadata를 찾고 해당하는 Partition leader를 가져와 클라이언트에게 되돌려줍니다.

생각을 끌어내다


KAFKA의 failover 메커니즘은 도대체 어떤 것입니까?만약__consumer_offset에 정확한 사본수를 설정했다면 재선거의 과정은 어떠한가.만약 브로커가 다운된 후에 일부 복사본을 사용할 수 없게 된다면, 복사본은 자동으로 다른 노드로 이동합니까?이 질문과 함께 KAFKA 관련 코드를 살짝 읽었습니다.
Broker가 꺼지면 두 가지 동작이 있습니다: Kafka Controller.onBrokerFailure ->KafkaController.onReplicasBecomeOffline은 주로 PartitionStateMachine을 통해 이루어집니다.handleStateChanges 메서드는 Partition 상태기에서 상태를 오프라인으로 설정한다고 알려줍니다.ReplicaStateMachine.handleStateChanges 메서드는 Replica 상태를 OfflineReplica로 수정하고 partition ISR도 수정합니다.브라커가 파티션 리더로 닫히면 파티션 리더 선거를 다시 촉발하고 마지막으로 Leader AndIsrRequest를 보내서 최신 Leader ISR 정보를 가져와야 합니다.KafkaController.unregisterBrokerModificationsHandler는 등록된 BrokerModificationsHandler를 취소하고 zookeeper에서 Broker 이벤트의 감청을 취소합니다.
ISR 요청이 전송되면 KafkaApis.handleLeaderAndIsrRequest()가 호출됩니다.여기에서 만약에 리더의 파티션을 변경해야 한다면 __consumer_offset라는 특수한 topic에 속하고 현재의 브로커 노드가 파티션 리더인지에 달려 있다.GroupCoordinator를 각각 호출합니다.handleGroupImmigration과 GroupCoordinator.handleGroupEmmigration. 파티션 리더라면 GroupCoordinator.handleGroupImmigration -> GroupMetadataManager.loadGroupsForPartition은 __consumer_offset 에서 그룹 데이터를 로컬 metadata cache로 다시 읽습니다. 파티션 팔로워라면 그룹Coordniator입니다.handleGroupImigration -> GroupMetadataManager.removeGroupsForPartition은 metadata cache에서 그룹 정보를 제거합니다.그리고 onGroupUnloaded 콜백 함수에서 그룹의 상태를 dead로 변경합니다.조인이나sync를 기다리는 모든 그룹 구성원에게 통지합니다.
KAFKA는 Broker가 종료될 때 자동으로 파티션 복사본을 마이그레이션하지 않습니다.이 때 닫힌 Broker의 복사본은under replicated 상태가 됩니다.이 상태는 Broker가 다시 당겨지고 새로운 데이터를 따라잡거나 사용자가 명령줄을 통해 다른 노드로 수동으로 복사할 때까지 지속됩니다.
정부는gracefulshutdown을 보장하기 위해 두 개의 파라미터를 설정할 것을 건의합니다.controlled.shutdown.enable=true auto.leader.rebalance.enable=true 전자는 꺼지기 전에 로그 데이터를 디스크에 동기화하고 재선거합니다.후자는 브로커가 다시 회복된 후에 다시 다운되기 전 리더 상태를 보장합니다.리더의 분배가 고르지 않아 읽기와 쓰기의 핫이슈를 초래하는 것을 피하다.

Reference


https://blog.csdn.net/zhanglh046/article/details/72833129https://blog.csdn.net/huochen1994/article/details/80511038https://www.jianshu.com/p/1aba6e226763

좋은 웹페이지 즐겨찾기