spark 는 kafka-appender 지정 로 그 를 통 해 kafka 로 출력 하여 발생 하 는 잠 금 문제 입 니 다.
log4j
을 사용 한 kafka-appender 수집spark
작업 실행 로 그 를 사용 할 때yarn
에 제출 된 작업 이 시종ACCEPTED
상태 로RUNNING
상태 에 들 어 갈 수 없 으 며,두 번 다시 시도 한 후 시간 을 초과 합 니 다.초기 에는 Yrn 자원 부족 으로 인 한 것 으로 생각 했 으 나,Yrn 자원 이 충분 하 다 는 것 을 확인 할 때 문 제 는 여전 하 며,기본적으로 안정 적 으로 복 원 될 수 있 었 다.처음에는 spark 로 그 를 kafka 로 출력 하도록 설정 하 였 습 니 다:
log4j.rootCategory=INFO, console, kafka
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m%n
# Kafka appender
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Set Kafka topic and brokerList
log4j.appender.kafka.topic=yarn_spark_log
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=10
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m
여 기 는org.apache.kafka.log4jappender.KafkaLog4jAppender
기본 값 으로 모든 로 그 를 kafka 에 출력 합 니 다.이 appender 는 kafka 에 의 해 공식 적 으로 유지 되 었 고 안정성 이 보장 되 어야 합 니 다.문제 포 지 셔 닝
문 제 를 발견 하면 kafka 에 출력 하 는 규칙 을 제거 하고 문제 가 해 제 됩 니 다!그래서 문 제 를 로 그 를 kafka 로 출력 하 는 것 과 관련 이 있 습 니 다.다른 테스트 를 통 해 목표 카 프 카 가 사실 정상 이라는 것 을 증명 하 는 것 은 매우 이상 하 다.
Yrn 의 ResourceManager 로 그 를 보 니 다음 과 같은 시간 초과 가 있 습 니 다.
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.util.AbstractLivelinessMonitor: Expired:appattempt_1578970174552_3204_000002 Timed out after 600 secs
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: Updating application attempt appattempt_1578970174552_3204_000002 with final
state: FAILED, and exit status: -1000
2020-05-07 21:49:48,231 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1578970174552_3204_000002 State change from LAUNCHED to FINAL_SAV
ING on event = EXPIRE
Yrn 자체 가 임 무 를 받 았 음 을 나타 내 지만 작업 이 늦게 시작 되 지 않 았 음 을 발견 합 니 다.spark 장면 에서 드라이버 만 작 동 했 지만 executor 를 시작 하지 않 았 다 는 뜻 입 니 다.
driver 로 그 를 보 니 로그 출력 이 한 곳 에 걸 려 서 계속 하지 않 습 니 다.성공 적 으로 실행 되 고 끊 긴 상황 을 비교 해 보면 로그 가 이 항목 에 걸 려 있 습 니 다.
2020/05/07 19:37:10.324 INFO SecurityManager: Changing view acls to: yarn,root
2020/05/07 19:37:10.344 INFO Metadata: Cluster ID: 6iG6WHA2SoK7FfgGgWHt_A
걸 리 면
SecurityManager
줄 만 치고Metadata
줄 은 못 친다.추측
Metadata
이 줄 은kafka-client
자체 에서 나 온 것 입 니 다.전체 문맥 은 Yrn,spark,kafka-client 만 이 로 그 를 칠 수 있 기 때 문 입 니 다.kafka-client 2.2.0 버 전에 서 이 로 그 를 찾 으 면 출력 위치 입 니 다:
public synchronized void update(MetadataResponse metadataResponse, long now) {
...
String newClusterId = cache.cluster().clusterResource().clusterId();
if (!Objects.equals(previousClusterId, newClusterId)) {
log.info("Cluster ID: {}", newClusterId);
}
...
}
synchronized
을 보고 자물쇠 가 의 심 스 러 웠 다.그래서 고려 용jstack
분석:Yrn 에서 spark 작업 을 실행 할 때 driver 프로 세 스 는 applicationMaster 라 고 하고 executor 프로 세 스 는 CoarseGrained Executor Backend 라 고 합 니 다.여기 서 먼저 재현 과정 에서 drvier 가 최종 적 으로 어느 노드 에서 실행 되 는 지 찾 은 다음 jstack-F
jstack 역시 대망 을 저 버 리 지 않 고 자물쇠 보고!여기 제 가 결 과 를 다 붙 일 게 요.
[root@node1 ~]# jstack 20136
20136: Unable to open socket file: target process not responding or HotSpot VM not loaded
The -F option can be used when the target process is not responding
[root@node1 ~]# jstack -F 20136
Attaching to process ID 20136, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.231-b11
Deadlock Detection:
Found one Java-level deadlock:
=============================
"kafka-producer-network-thread | producer-1":
waiting to lock Monitor@0x00000000025fcc48 (Object@0x00000000ed680b60, a org/apache/kafka/log4jappender/KafkaLog4jAppender),
which is held by "main"
"main":
waiting to lock Monitor@0x00007fec9dbde038 (Object@0x00000000ee44de38, a org/apache/kafka/clients/Metadata),
which is held by "kafka-producer-network-thread | producer-1"
Found a total of 1 deadlock.
Thread 20157: (state = BLOCKED)
- org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=0, line=231 (Interpreted frame)
- org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
- org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
- org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
- org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
- org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String, java.lang.Object) @bci=34, line=324 (Interpreted frame)
- org.apache.kafka.clients.Metadata.update(org.apache.kafka.common.requests.MetadataResponse, long) @bci=317, line=365 (Interpreted frame)
- org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(org.apache.kafka.common.requests.RequestHeader, long, org.apache.kafka.common.requests.MetadataResponse) @bci=184, line=1031 (Interpreted frame)
- org.apache.kafka.clients.NetworkClient.handleCompletedReceives(java.util.List, long) @bci=215, line=822 (Interpreted frame)
- org.apache.kafka.clients.NetworkClient.poll(long, long) @bci=132, line=544 (Interpreted frame)
- org.apache.kafka.clients.producer.internals.Sender.run(long) @bci=227, line=311 (Interpreted frame)
- org.apache.kafka.clients.producer.internals.Sender.run() @bci=28, line=235 (Interpreted frame)
- java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)
Thread 20150: (state = BLOCKED)
Thread 20149: (state = BLOCKED)
- java.lang.Object.wait(long) @bci=0 (Interpreted frame)
- java.lang.ref.ReferenceQueue.remove(long) @bci=59, line=144 (Interpreted frame)
- java.lang.ref.ReferenceQueue.remove() @bci=2, line=165 (Interpreted frame)
- java.lang.ref.Finalizer$FinalizerThread.run() @bci=36, line=216 (Interpreted frame)
Thread 20148: (state = BLOCKED)
- java.lang.Object.wait(long) @bci=0 (Interpreted frame)
- java.lang.Object.wait() @bci=2, line=502 (Interpreted frame)
- java.lang.ref.Reference.tryHandlePending(boolean) @bci=54, line=191 (Interpreted frame)
- java.lang.ref.Reference$ReferenceHandler.run() @bci=1, line=153 (Interpreted frame)
Thread 20137: (state = BLOCKED)
- java.lang.Object.wait(long) @bci=0 (Interpreted frame)
- org.apache.kafka.clients.Metadata.awaitUpdate(int, long) @bci=63, line=261 (Interpreted frame)
- org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(java.lang.String, java.lang.Integer, long) @bci=160, line=983 (Interpreted frame)
- org.apache.kafka.clients.producer.KafkaProducer.doSend(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=19, line=860 (Interpreted frame)
- org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=12, line=840 (Interpreted frame)
- org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord) @bci=3, line=727 (Interpreted frame)
- org.apache.kafka.log4jappender.KafkaLog4jAppender.append(org.apache.log4j.spi.LoggingEvent) @bci=69, line=283 (Interpreted frame)
- org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=106, line=251 (Interpreted frame)
- org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
- org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
- org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
- org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
- org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String) @bci=12, line=305 (Interpreted frame)
- org.apache.spark.internal.Logging$class.logInfo(org.apache.spark.internal.Logging, scala.Function0) @bci=29, line=54 (Interpreted frame)
- org.apache.spark.SecurityManager.logInfo(scala.Function0) @bci=2, line=44 (Interpreted frame)
- org.apache.spark.SecurityManager.setViewAcls(scala.collection.immutable.Set, java.lang.String) @bci=36, line=139 (Interpreted frame)
- org.apache.spark.SecurityManager.<init>(org.apache.spark.SparkConf, scala.Option) @bci=158, line=81 (Interpreted frame)
- org.apache.spark.deploy.yarn.ApplicationMaster.<init>(org.apache.spark.deploy.yarn.ApplicationMasterArguments) @bci=85, line=70 (Interpreted frame)
- org.apache.spark.deploy.yarn.ApplicationMaster$.main(java.lang.String[]) @bci=25, line=802 (Interpreted frame)
- org.apache.spark.deploy.yarn.ApplicationMaster.main(java.lang.String[]) @bci=4 (Interpreted frame)
여기까지 잠 금 이 확 인 돼 driver 가 시작 하 자마자 운행 이 정체 되면 당연히 executor 를 제출 할 수 없습니다.구체 적 인 자 물 쇠 는 잠시 후에 분석 하고 어떻게 해결 할 지 먼저 고려 해 보 자.감성 적 인 인식 으로 볼 때 kafka-client 로그 도 kafka 로 출력 하지 못 하 게 하면 되 는 것 같 습 니 다.실험 후,역시 이와 같다.org.apache.spark 로그 만 출력 하면 정상적으로 실 행 될 수 있다.
원인 분석
stack 의 결 과 를 보면 자물쇠 가 잠 긴 것 은 다음 과 같은 두 개의 스 레 드 입 니 다.
synchronized
로 수식 했다.
public
synchronized
void doAppend(LoggingEvent event) {
if(closed) {
LogLog.error("Attempted to append to closed appender named ["+name+"].");
return;
}
if(!isAsSevereAsThreshold(event.level)) {
return;
}
Filter f = this.headFilter;
FILTER_LOOP:
while(f != null) {
switch(f.decide(event)) {
case Filter.DENY: return;
case Filter.ACCEPT: break FILTER_LOOP;
case Filter.NEUTRAL: f = f.next;
}
}
this.append(event);
}
그래서 일이 시작 되 었 다.kafka-appender
의 잠 금kafka-appender
내부 에서 kafka-client 로 그 를 kafka 로 보 내야 합 니 다.최종 적 으로Thread 20137
에 호출 되 어 보 여 줍 니 다.Metadata.awaitUpdate(synchronized 방법 이기 도 합 니 다)로 실행 되 며 내부 wait 는 metadata 의 자 물 쇠 를 가 져 오 려 고 시도 합 니 다.(상세 하 게 보다https://github.com/apache/kaf...) Cluster ID
로그 의 이 단계(update 방법 도 synchronized)에 들 어 갔다.즉,kafka-producer-network-thread 스 레 드 는 metadata 대상 의 잠 금kafka-appender
의 잠 금위의 그림 main-thread 는 log 대상 자 물 쇠 를 가지 고 있 으 며,metadata 대상 자 물 쇠 를 가 져 오 라 고 요구 합 니 다.kafka-producer-network-thread 는 metadata 대상 자 물 쇠 를 가지 고 있 으 며,log 대상 자 물 쇠 를 가 져 오 라 고 요구 하여 잠 금 이 되 었 습 니 다.
총결산
여기 서 spark 가 kafka-appender 지정 로 그 를 통 해 kafka 로 출력 되 는 잠 금 에 관 한 글 을 소개 합 니 다.더 많은 spark 지정 로그 출력 내용 은 우리 의 이전 글 을 검색 하거나 아래 의 관련 글 을 계속 찾 아 보 세 요.앞으로 많은 응원 부 탁 드 리 겠 습 니 다!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spring Cloud를 사용한 기능적 Kafka - 1부지금까지 찾을 수 없었던 Spring Cloud Kafka의 작업 데모를 만들기 위해 이 기사를 정리했습니다. Confluent 스키마 레지스트리 7.1.0 이 기사는 먼저 Spring Cloud Stream을 사용...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.