kafka 는 어떻게 백만 급 의 높 은 병발 과 낮은 지연 을 합 니까?

Kafka 는 높 은 삼투 저 지연 의 높 은 병발, 고성능 의 메시지 미들웨어 로 빅 데이터 분야 에서 매우 광범 위 하 게 활용 된다.좋 은 Kafka 군집 을 설정 하면 초당 몇 십 만, 수백 만 의 초고 속 동시 다발 로 기록 할 수 있 습 니 다.카 프 카 는 도대체 어떻게 이렇게 높 은 스루풋 과 성능 을 만 들 었 을 까?우 리 는 오늘 kafka 의 server 단 에 들 어가 서 그것 의 Reactor 고 병발 네트워크 모델 체 제 를 탐구 해 보 았 다.
1.1 Kafka Reactor 모델 구조
Kafka 클 라 이언 트 와 서버 통신 은 NIO 의 reactor 모드 로 이벤트 구동 모드 입 니 다.그렇다면 흔히 볼 수 있 는 단일 스 레 드 Reactor 모드 에서 NIO 스 레 드 의 직책 은 어떤 것 이 있 습 니까?저 희 는 다음 과 같은 몇 가 지 를 정 리 했 습 니 다. 1. NIO 서버 로 서 클 라 이언 트 를 받 는 TCP 연결 2. NIO 클 라 이언 트 로 서 서버 에 TCP 연결 3 을 시작 하고 통신 대 측의 요청 이나 응답 메 시 지 를 읽 습 니 다. 4. 통신 대 단 에 메 시 지 를 보 내 거나 응답 메시지 이상 네 가지 대응 하 는 Reactor 모드 의 구조 도 는 다음 과 같 습 니 다.
일부 소 용량 의 업무 장면 에 대해 이런 단일 라인 의 모델 은 기본적으로 충분 하 다.그러나 높 은 부하, 큰 병발 의 응용 장면 에 적합 하지 않다. 주요 원인 은 다음 과 같다. 성능 문제 1: 하나의 NIO 스 레 드 가 수 십 만 심지어 백만 급 의 링크 성능 을 동시에 처리 하 는 것 은 지탱 할 수 없 는 성능 문제 이다.전체 시스템 이 사용 할 수 없 기 때문에 단일 고장 을 초래 하기 때문에 높 은 병행 처리 서 비 스 는 상기 구 조 를 최적화 하고 개조 해 야 한다. 예 를 들 어 처 리 는 다 중 스 레 드 모델 을 취하 고 수신 스 레 드 를 최대한 간소화 하 는 것 은 수신 스 레 드 를 하나의 접속 층 으로 하 는 것 과 같다.그럼 테마 카 프 카 로 돌아 가 는 reactor 모드 구 조 는 어 떨 까요?
위의 이 kafka 구조 도 에서 알 수 있 듯 이 다음 과 같은 몇 가지 절 차 를 포함한다. 1. 클 라 이언 트 가 NIO 의 커 넥 터 Acceptor 를 요청 하 는 동시에 사건 의 퍼 가기 기능 도 갖 추고 있 으 며 Processor 처리 2, 서버 네트워크 이벤트 프로세서 Processor 3, 요청 대기 열 RequestChannel 로 전송 하여 처리 해 야 할 모든 요청 정 보 를 저장 했다. 4. 요청 처리 스 레 드 풀(RequestHandler Pool)데 몬 스 레 드 윤 훈 RequestChannel 의 요청 처리 정 보 를 API 층 에 대응 하 는 프로세서 처리 5, API 층 프로세서 로 전송 하여 요청 처 리 를 완료 한 후 Response Queue 에 넣 고 Processor 에서 Response Queue 에서 꺼 내 대응 하 는 Client 로 보 내 는 데 주의해 야 할 점 은 Broker 층 에 여러 개의 Acceptor 가 포함 되 어 있 지만 kafka 의 reactor 모드 에 서 는 아직단일 스 레 드 Acceptor 다 중 스 레 드 handler 모드 입 니 다. 여기 있 는 여러 Acceptor 는 한 서버 아래 다 중 네트워크 카드 장면 을 대상 으로 하 는 것 입 니 다. 모든 EndPoint 는 하나의 네트워크 카드 로 하나의 ip 와 port 의 조합 에 대응 하고 모든 Endpoint 는 하나의 Acceptor 만 있 습 니 다.
1.2 Kafka Reactor 모델 소스 코드 상세 설명
위의 구조 도 에서 논술 한 몇 가지 절차 에 따라 kafka 안의 사건 수신, 처리, 응답 등 몇 단계 에 대응 합 니 다. 우 리 는 다음 과 같은 몇 단계 의 소스 코드 차원 에서 분석 하 겠 습 니 다. 1.2.1, Socketserver Socketserver 는 표준 적 인 NIO 서비스 단 에서 이 루어 집 니 다. 주로 다음 과 같은 변 수 를 포함 합 니 다. 1. RequestChannel: Processor 와 KafkaRequestHandler 데이터 교환 대기 열 2, Processors: processor 의 용기, processor 의 id 와 processor 대상 의 맵 3, Acceptors: acceptor 의 용기, EndPoint 와 acceptor 의 맵 4, ConnectionQuotas 를 저장 합 니 다. 링크 제한 기 는 각 IP 의 링크 수 에 대해 SocketServer 의 시작 절 차 를 다음 과 같이 제한 합 니 다.
부분 소스 코드 는 다음 과 같 습 니 다. 시작 입구:
 def startup(startupProcessors: Boolean = true) {
    this.synchronized {
      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
      createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
      if (startupProcessors) {
        startProcessors()
      }
}

Acceptor 및 Proccessor 생 성 논리:

     private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
    processors.foreach { processor =>
      KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
        processor).start()
    }
  }

1.2.2、Acceptor Acceptor 는 NIO 의 경량급 접속 서비스 입 니 다. 주로 다음 과 같은 변 수 를 포함 합 니 다. 1. nioSelector: Java 의 NIO 네트워크 선택 기 2, server Channel: ip 과 포트 를 socket 3, Processors: processor 의 용기 에 연결 하고 processor 대상 의 주요 처리 절 차 는 다음 과 같 습 니 다. 1. nioSelector 를 OP ACCEPT 2 로 등록 하고 윤 훈 은 nioSelector 에서 사건 을 읽 습 니 다. 3.RR 모드 에서 processor 4 를 선택 하고 새로운 링크 설정 을 받 습 니 다 (server SocketChannel 에서 socketChannel 을 가 져 오고 속성 을 설정 합 니 다) 5. processor 의 accept 처리 중요 논리 코드 는 다음 과 같 습 니 다.
def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable) {
                  val processor = synchronized {
//  RR  Processor
                    currentProcessor = currentProcessor % processors.size
                    processors(currentProcessor)
                  }
                  accept(key, processor)
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread, mod(numProcessors) will be done later
                currentProcessor = currentProcessor + 1
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }

socketChannel 의 링크 설정 논리:
def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)

      processor.accept(socketChannel)
    } 

1.2.3、Processor Processor 의 주요 직책 은 클 라 이언 트 로부터 온 네트워크 링크 요청 을 RequestContext 로 밀봉 하여 RequestChannel 에 보 내 는 것 입 니 다. handler 가 처리 한 응답 영수증 을 클 라 이언 트 에 보 내야 합 니 다. 주로 1. new Connections: 스 레 드 가 안전 한 대기 열 입 니 다. acceptor 에서 받 은 네트워크 새 링크 2, inflaghtResponses: 보 낸 클 라 이언 트 의 소리 입 니 다.응, 클 라 이언 트 와 의 링크 id (로 컬 ip, port 및 원 격 ip, port 및 추가 시퀀스 값 으로 구성) 와 응답 대상 의 맵 3, responseQueue: 차단 대기 열 입 니 다. handler 의 응답 요청 을 저장 합 니 다. 앞에서 사용 한 kafka reactor 모델 구조 도 를 개조 하면 다음 과 같은 proccessor 의 핵심 논리 구 조 를 얻 을 수 있 습 니 다.
그 핵심 논 리 는 다음 과 같은 몇 가지 절차 입 니 다. 1. proccessor 스 레 드 는 new Connections 에서 문의 하여 socketChannel 을 가 져 오고 selector 감청 사건 을 OP READ 로 수정 합 니 다. 2. processNewResponses 는 새로운 응답 수 요 를 처리 합 니 다. 그 중에서 SendAction 의 유형 은 클 라 이언 트 에 응답 을 보 내 고 보 낸 응답 을 inflaghtResponses 에 기록 하 는 것 입 니 다. 그 핵심 논 리 는 sendResponse 와 같은 것 입 니 다.다음:
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
    val connectionId = response.request.context.connectionId
    if (channel(connectionId).isEmpty) {
      response.request.updateRequestMetrics(0L, response)
    }
    if (openOrClosingChannel(connectionId).isDefined) {
      selector.send(responseSend)
      inflightResponses += (connectionId -> response)
    }
  }

3. Selector 는 폴 이 클 라 이언 트 로부터 받 은 요청 정 보 를 호출 하고 받 은 NetworkReceive 를 completedReceives 캐 시 에 추가 합 니 다. 4. processComplete Receives 는 completedReceives 의 수신 정 보 를 처리 하 며 마지막 으로 RequestChannel. Request 로 봉 인 된 다음 requestChannel 을 호출 하여 전송 대기 열 (즉 requestQueue) 에 요청 을 추가 합 니 다.그 중에서 소스 코드 논 리 는 다음 과 같다.
private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        openOrClosingChannel(receive.source) match {
          case Some(channel) =>
            val header = RequestHeader.parse(receive.payload)
            val context = new RequestContext(header, receive.source, channel.socketAddress,
              channel.principal, listenerName, securityProtocol)
            val req = new RequestChannel.Request(processor = id, context = context,
              startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
            requestChannel.sendRequest(req)
            selector.mute(receive.source)
          case None =>
            throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
        }
      } catch {
        case e: Throwable =>
          processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
      }
    }
  }

1.2.4 RequestChannel requestChannel 은 kafka 요청 과 응답 하 는 모든 퍼 가기 를 실 었 습 니 다. 다음 과 같은 두 가지 변 수 를 포함 합 니 다. 1. requestQueue: 잠 금 차단 대기 열, RequestChannel 전송 요청 과 응답 정 보 를 포함 하 는 중요 한 구성 요소 입 니 다. 위 에서 말 한 RequestChannel. Request 는 이 대기 열 에 넣 는 것 입 니 다. 2. Processors: processorid 와 processor 의 영상 을 저장 합 니 다.방사 관 계 는 주로 response 가 보 낼 때 그 중에서 대응 하 는 processor 를 선택 하 는 것 입 니 다. 두 가지 핵심 기능 은 요청 을 추가 하고 응답 영수증 을 보 내 는 것 입 니 다. 소스 코드 논 리 는 다음 과 같 습 니 다.
def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

응답 영수증 을 보 내 는 것 은 이전 processor 와 약간 다 릅 니 다. 여 기 는 response 를 response Queue 에 추가 한 다음 에 processor 교대 훈련 에서 영수증 을 꺼 내 클 라 이언 트 에 보 냅 니 다.
def sendResponse(response: RequestChannel.Response) {
   //  log trace
    val processor = processors.get(response.processor)
    if (processor != null) {
      processor.enqueueResponse(response)
    }
  }

1.2.5、KafkaRequestHandler KafkaRequestHandler 하면... ,우선 어떻게 생 겼 는 지 다시 한 번 이야기 해 보 세 요. KafkaRequestHandlerPool 에 의 해 만 들 어 졌 습 니 다. pool 은 kafkaServer 가 시 작 될 때 만 들 어 졌 습 니 다. 원본 코드 는 다음 과 같 습 니 다.
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i 

자, KafkaRequestHandler 설명 끝 났 습 니 다. 생 성 과정, 그 다음은 처리 논리 입 니 다. 그 논 리 는 매우 간단 합 니 다. 절 차 는 다음 과 같 습 니 다. 1. requestChannel 에서 요청 2 를 끌 어 내 고 요청 유형 을 판단 합 니 다. Request 유형 이 라면 KafkaApis 를 호출 하여 해당 하 는 요청 을 처리 합 니 다.
1.3. 개선 과 최적화
이로써 우 리 는 kafka 의 reactor 모델 을 분석 하고 마지막 으로 발산 적 인 문 제 를 제기 했다. kafka 가 실현 한 이 reactor 모델 과 소스 코드 에 대한 분석 을 실현 했다. 만약 에 디자인 을 하 라 고 하면 성능 병목 이 존재 할 수 있 는 곳 이 어디 라 고 생각 하 십 니까? 여러분 은 아래 에 댓 글 을 달 아 당신 의 견 해 를 발표 할 수 있 습 니 다. 다음 기 에 저 를 이 문제 에 대해라 는 생각 을 나 눴 다.

좋은 웹페이지 즐겨찾기