spark 는 kafka-appender 지정 로 그 를 통 해 kafka 로 출력 하여 발생 하 는 잠 금 문제 입 니 다.

log4j을 사용 한 kafka-appender 수집spark작업 실행 로 그 를 사용 할 때yarn에 제출 된 작업 이 시종ACCEPTED상태 로RUNNING상태 에 들 어 갈 수 없 으 며,두 번 다시 시도 한 후 시간 을 초과 합 니 다.초기 에는 Yrn 자원 부족 으로 인 한 것 으로 생각 했 으 나,Yrn 자원 이 충분 하 다 는 것 을 확인 할 때 문 제 는 여전 하 며,기본적으로 안정 적 으로 복 원 될 수 있 었 다.
처음에는 spark 로 그 를 kafka 로 출력 하도록 설정 하 였 습 니 다:

log4j.rootCategory=INFO, console, kafka
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m%n

# Kafka appender
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Set Kafka topic and brokerList
log4j.appender.kafka.topic=yarn_spark_log
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=10
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m
여 기 는org.apache.kafka.log4jappender.KafkaLog4jAppender기본 값 으로 모든 로 그 를 kafka 에 출력 합 니 다.이 appender 는 kafka 에 의 해 공식 적 으로 유지 되 었 고 안정성 이 보장 되 어야 합 니 다.
문제 포 지 셔 닝
문 제 를 발견 하면 kafka 에 출력 하 는 규칙 을 제거 하고 문제 가 해 제 됩 니 다!그래서 문 제 를 로 그 를 kafka 로 출력 하 는 것 과 관련 이 있 습 니 다.다른 테스트 를 통 해 목표 카 프 카 가 사실 정상 이라는 것 을 증명 하 는 것 은 매우 이상 하 다.
Yrn 의 ResourceManager 로 그 를 보 니 다음 과 같은 시간 초과 가 있 습 니 다.
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.util.AbstractLivelinessMonitor: Expired:appattempt_1578970174552_3204_000002 Timed out after 600 secs
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: Updating application attempt appattempt_1578970174552_3204_000002 with final
 state: FAILED, and exit status: -1000
2020-05-07 21:49:48,231 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1578970174552_3204_000002 State change from LAUNCHED to FINAL_SAV
ING on event = EXPIRE
Yrn 자체 가 임 무 를 받 았 음 을 나타 내 지만 작업 이 늦게 시작 되 지 않 았 음 을 발견 합 니 다.spark 장면 에서 드라이버 만 작 동 했 지만 executor 를 시작 하지 않 았 다 는 뜻 입 니 다.
driver 로 그 를 보 니 로그 출력 이 한 곳 에 걸 려 서 계속 하지 않 습 니 다.성공 적 으로 실행 되 고 끊 긴 상황 을 비교 해 보면 로그 가 이 항목 에 걸 려 있 습 니 다.
2020/05/07 19:37:10.324 INFO SecurityManager: Changing view acls to: yarn,root
2020/05/07 19:37:10.344 INFO Metadata: Cluster ID: 6iG6WHA2SoK7FfgGgWHt_A
걸 리 면SecurityManager줄 만 치고Metadata줄 은 못 친다.
추측Metadata이 줄 은kafka-client자체 에서 나 온 것 입 니 다.전체 문맥 은 Yrn,spark,kafka-client 만 이 로 그 를 칠 수 있 기 때 문 입 니 다.
kafka-client 2.2.0 버 전에 서 이 로 그 를 찾 으 면 출력 위치 입 니 다:

public synchronized void update(MetadataResponse metadataResponse, long now) {
  ...

  String newClusterId = cache.cluster().clusterResource().clusterId();
  if (!Objects.equals(previousClusterId, newClusterId)) {
    log.info("Cluster ID: {}", newClusterId);
  }
  ...
}
synchronized을 보고 자물쇠 가 의 심 스 러 웠 다.그래서 고려 용jstack분석:
Yrn 에서 spark 작업 을 실행 할 때 driver 프로 세 스 는 applicationMaster 라 고 하고 executor 프로 세 스 는 CoarseGrained Executor Backend 라 고 합 니 다.여기 서 먼저 재현 과정 에서 drvier 가 최종 적 으로 어느 노드 에서 실행 되 는 지 찾 은 다음 jstack-F인쇄 스 택 을 빠르게 사용 합 니 다.
jstack 역시 대망 을 저 버 리 지 않 고 자물쇠 보고!여기 제 가 결 과 를 다 붙 일 게 요.

[root@node1 ~]# jstack 20136
20136: Unable to open socket file: target process not responding or HotSpot VM not loaded
The -F option can be used when the target process is not responding
[root@node1 ~]# jstack -F 20136
Attaching to process ID 20136, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.231-b11
Deadlock Detection:

Found one Java-level deadlock:
=============================

"kafka-producer-network-thread | producer-1":
 waiting to lock Monitor@0x00000000025fcc48 (Object@0x00000000ed680b60, a org/apache/kafka/log4jappender/KafkaLog4jAppender),
 which is held by "main"
"main":
 waiting to lock Monitor@0x00007fec9dbde038 (Object@0x00000000ee44de38, a org/apache/kafka/clients/Metadata),
 which is held by "kafka-producer-network-thread | producer-1"

Found a total of 1 deadlock.

Thread 20157: (state = BLOCKED)
 - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=0, line=231 (Interpreted frame)
 - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
 - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String, java.lang.Object) @bci=34, line=324 (Interpreted frame)
 - org.apache.kafka.clients.Metadata.update(org.apache.kafka.common.requests.MetadataResponse, long) @bci=317, line=365 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(org.apache.kafka.common.requests.RequestHeader, long, org.apache.kafka.common.requests.MetadataResponse) @bci=184, line=1031 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient.handleCompletedReceives(java.util.List, long) @bci=215, line=822 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient.poll(long, long) @bci=132, line=544 (Interpreted frame)
 - org.apache.kafka.clients.producer.internals.Sender.run(long) @bci=227, line=311 (Interpreted frame)
 - org.apache.kafka.clients.producer.internals.Sender.run() @bci=28, line=235 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)


Thread 20150: (state = BLOCKED)


Thread 20149: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.ref.ReferenceQueue.remove(long) @bci=59, line=144 (Interpreted frame)
 - java.lang.ref.ReferenceQueue.remove() @bci=2, line=165 (Interpreted frame)
 - java.lang.ref.Finalizer$FinalizerThread.run() @bci=36, line=216 (Interpreted frame)


Thread 20148: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=502 (Interpreted frame)
 - java.lang.ref.Reference.tryHandlePending(boolean) @bci=54, line=191 (Interpreted frame)
 - java.lang.ref.Reference$ReferenceHandler.run() @bci=1, line=153 (Interpreted frame)


Thread 20137: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - org.apache.kafka.clients.Metadata.awaitUpdate(int, long) @bci=63, line=261 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(java.lang.String, java.lang.Integer, long) @bci=160, line=983 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.doSend(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=19, line=860 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=12, line=840 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord) @bci=3, line=727 (Interpreted frame)
 - org.apache.kafka.log4jappender.KafkaLog4jAppender.append(org.apache.log4j.spi.LoggingEvent) @bci=69, line=283 (Interpreted frame)
 - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=106, line=251 (Interpreted frame)
 - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
 - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String) @bci=12, line=305 (Interpreted frame)
 - org.apache.spark.internal.Logging$class.logInfo(org.apache.spark.internal.Logging, scala.Function0) @bci=29, line=54 (Interpreted frame)
 - org.apache.spark.SecurityManager.logInfo(scala.Function0) @bci=2, line=44 (Interpreted frame)
 - org.apache.spark.SecurityManager.setViewAcls(scala.collection.immutable.Set, java.lang.String) @bci=36, line=139 (Interpreted frame)
 - org.apache.spark.SecurityManager.<init>(org.apache.spark.SparkConf, scala.Option) @bci=158, line=81 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster.<init>(org.apache.spark.deploy.yarn.ApplicationMasterArguments) @bci=85, line=70 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster$.main(java.lang.String[]) @bci=25, line=802 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster.main(java.lang.String[]) @bci=4 (Interpreted frame)
여기까지 잠 금 이 확 인 돼 driver 가 시작 하 자마자 운행 이 정체 되면 당연히 executor 를 제출 할 수 없습니다.
구체 적 인 자 물 쇠 는 잠시 후에 분석 하고 어떻게 해결 할 지 먼저 고려 해 보 자.감성 적 인 인식 으로 볼 때 kafka-client 로그 도 kafka 로 출력 하지 못 하 게 하면 되 는 것 같 습 니 다.실험 후,역시 이와 같다.org.apache.spark 로그 만 출력 하면 정상적으로 실 행 될 수 있다.
원인 분석
stack 의 결 과 를 보면 자물쇠 가 잠 긴 것 은 다음 과 같은 두 개의 스 레 드 입 니 다.
  • kafka-client 내부 의 네트워크 스 레 드 spark
  • 메 인 입구 스 레 드
  • 두 스 레 드 는 모두 로그 에 걸 려 있 습 니 다.스 택 을 관찰 하면 두 스 레 드 가 같은 log 대상 을 동시에 가지 고 있 음 을 알 수 있 습 니 다.이 로그 대상 은 사실상 kafka-appender 입 니 다.한편,kafka-appender 는 본질 적 으로 kafka-client 와 그 내부 의 Metadata 대상 을 가지 고 있다.log4j 의 doAppend 는 스 레 드 안전 을 위해synchronized로 수식 했다.
    
    public
     synchronized 
     void doAppend(LoggingEvent event) {
      if(closed) {
       LogLog.error("Attempted to append to closed appender named ["+name+"].");
       return;
      }
      
      if(!isAsSevereAsThreshold(event.level)) {
       return;
      }
    
      Filter f = this.headFilter;
      
      FILTER_LOOP:
      while(f != null) {
       switch(f.decide(event)) {
       case Filter.DENY: return;
       case Filter.ACCEPT: break FILTER_LOOP;
       case Filter.NEUTRAL: f = f.next;
       }
      }
      
      this.append(event);  
     }
    그래서 일이 시작 되 었 다.
  • main 스 레 드 에서 로 그 를 시도 합 니 다.먼저 synchronized 의 doAppend 에 들 어 갔 습 니 다.즉,kafka-appender의 잠 금
  • 을 가 져 왔 습 니 다.
  • kafka-appender내부 에서 kafka-client 로 그 를 kafka 로 보 내야 합 니 다.최종 적 으로Thread 20137에 호출 되 어 보 여 줍 니 다.Metadata.awaitUpdate(synchronized 방법 이기 도 합 니 다)로 실행 되 며 내부 wait 는 metadata 의 자 물 쇠 를 가 져 오 려 고 시도 합 니 다.(상세 하 게 보다https://github.com/apache/kaf...)
  • 그러나 이때 kafka-producer-network-thread 스 레 드 는 위 에서 언급 한 타Cluster ID로그 의 이 단계(update 방법 도 synchronized)에 들 어 갔다.즉,kafka-producer-network-thread 스 레 드 는 metadata 대상 의 잠 금
  • 을 얻 었 다.
  • kafka-producer-network-thread 스 레 드 는 로 그 를 인쇄 하려 면 synchronized 의 doAppend 를 실행 합 니 다.즉,kafka-appender의 잠 금
  • 을 가 져 왔 습 니 다.

    위의 그림 main-thread 는 log 대상 자 물 쇠 를 가지 고 있 으 며,metadata 대상 자 물 쇠 를 가 져 오 라 고 요구 합 니 다.kafka-producer-network-thread 는 metadata 대상 자 물 쇠 를 가지 고 있 으 며,log 대상 자 물 쇠 를 가 져 오 라 고 요구 하여 잠 금 이 되 었 습 니 다.
    총결산
    여기 서 spark 가 kafka-appender 지정 로 그 를 통 해 kafka 로 출력 되 는 잠 금 에 관 한 글 을 소개 합 니 다.더 많은 spark 지정 로그 출력 내용 은 우리 의 이전 글 을 검색 하거나 아래 의 관련 글 을 계속 찾 아 보 세 요.앞으로 많은 응원 부 탁 드 리 겠 습 니 다!

    좋은 웹페이지 즐겨찾기