kafka 는 어떻게 백만 급 의 높 은 병발 과 낮은 지연 을 합 니까?
1.1 Kafka Reactor 모델 구조
Kafka 클 라 이언 트 와 서버 통신 은 NIO 의 reactor 모드 로 이벤트 구동 모드 입 니 다.그렇다면 흔히 볼 수 있 는 단일 스 레 드 Reactor 모드 에서 NIO 스 레 드 의 직책 은 어떤 것 이 있 습 니까?저 희 는 다음 과 같은 몇 가 지 를 정 리 했 습 니 다. 1. NIO 서버 로 서 클 라 이언 트 를 받 는 TCP 연결 2. NIO 클 라 이언 트 로 서 서버 에 TCP 연결 3 을 시작 하고 통신 대 측의 요청 이나 응답 메 시 지 를 읽 습 니 다. 4. 통신 대 단 에 메 시 지 를 보 내 거나 응답 메시지 이상 네 가지 대응 하 는 Reactor 모드 의 구조 도 는 다음 과 같 습 니 다.
일부 소 용량 의 업무 장면 에 대해 이런 단일 라인 의 모델 은 기본적으로 충분 하 다.그러나 높 은 부하, 큰 병발 의 응용 장면 에 적합 하지 않다. 주요 원인 은 다음 과 같다. 성능 문제 1: 하나의 NIO 스 레 드 가 수 십 만 심지어 백만 급 의 링크 성능 을 동시에 처리 하 는 것 은 지탱 할 수 없 는 성능 문제 이다.전체 시스템 이 사용 할 수 없 기 때문에 단일 고장 을 초래 하기 때문에 높 은 병행 처리 서 비 스 는 상기 구 조 를 최적화 하고 개조 해 야 한다. 예 를 들 어 처 리 는 다 중 스 레 드 모델 을 취하 고 수신 스 레 드 를 최대한 간소화 하 는 것 은 수신 스 레 드 를 하나의 접속 층 으로 하 는 것 과 같다.그럼 테마 카 프 카 로 돌아 가 는 reactor 모드 구 조 는 어 떨 까요?
위의 이 kafka 구조 도 에서 알 수 있 듯 이 다음 과 같은 몇 가지 절 차 를 포함한다. 1. 클 라 이언 트 가 NIO 의 커 넥 터 Acceptor 를 요청 하 는 동시에 사건 의 퍼 가기 기능 도 갖 추고 있 으 며 Processor 처리 2, 서버 네트워크 이벤트 프로세서 Processor 3, 요청 대기 열 RequestChannel 로 전송 하여 처리 해 야 할 모든 요청 정 보 를 저장 했다. 4. 요청 처리 스 레 드 풀(RequestHandler Pool)데 몬 스 레 드 윤 훈 RequestChannel 의 요청 처리 정 보 를 API 층 에 대응 하 는 프로세서 처리 5, API 층 프로세서 로 전송 하여 요청 처 리 를 완료 한 후 Response Queue 에 넣 고 Processor 에서 Response Queue 에서 꺼 내 대응 하 는 Client 로 보 내 는 데 주의해 야 할 점 은 Broker 층 에 여러 개의 Acceptor 가 포함 되 어 있 지만 kafka 의 reactor 모드 에 서 는 아직단일 스 레 드 Acceptor 다 중 스 레 드 handler 모드 입 니 다. 여기 있 는 여러 Acceptor 는 한 서버 아래 다 중 네트워크 카드 장면 을 대상 으로 하 는 것 입 니 다. 모든 EndPoint 는 하나의 네트워크 카드 로 하나의 ip 와 port 의 조합 에 대응 하고 모든 Endpoint 는 하나의 Acceptor 만 있 습 니 다.
1.2 Kafka Reactor 모델 소스 코드 상세 설명
위의 구조 도 에서 논술 한 몇 가지 절차 에 따라 kafka 안의 사건 수신, 처리, 응답 등 몇 단계 에 대응 합 니 다. 우 리 는 다음 과 같은 몇 단계 의 소스 코드 차원 에서 분석 하 겠 습 니 다. 1.2.1, Socketserver Socketserver 는 표준 적 인 NIO 서비스 단 에서 이 루어 집 니 다. 주로 다음 과 같은 변 수 를 포함 합 니 다. 1. RequestChannel: Processor 와 KafkaRequestHandler 데이터 교환 대기 열 2, Processors: processor 의 용기, processor 의 id 와 processor 대상 의 맵 3, Acceptors: acceptor 의 용기, EndPoint 와 acceptor 의 맵 4, ConnectionQuotas 를 저장 합 니 다. 링크 제한 기 는 각 IP 의 링크 수 에 대해 SocketServer 의 시작 절 차 를 다음 과 같이 제한 합 니 다.
부분 소스 코드 는 다음 과 같 습 니 다. 시작 입구:
def startup(startupProcessors: Boolean = true) {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
if (startupProcessors) {
startProcessors()
}
}
Acceptor 및 Proccessor 생 성 논리:
private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
processors.foreach { processor =>
KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
processor).start()
}
}
1.2.2、Acceptor Acceptor 는 NIO 의 경량급 접속 서비스 입 니 다. 주로 다음 과 같은 변 수 를 포함 합 니 다. 1. nioSelector: Java 의 NIO 네트워크 선택 기 2, server Channel: ip 과 포트 를 socket 3, Processors: processor 의 용기 에 연결 하고 processor 대상 의 주요 처리 절 차 는 다음 과 같 습 니 다. 1. nioSelector 를 OP ACCEPT 2 로 등록 하고 윤 훈 은 nioSelector 에서 사건 을 읽 습 니 다. 3.RR 모드 에서 processor 4 를 선택 하고 새로운 링크 설정 을 받 습 니 다 (server SocketChannel 에서 socketChannel 을 가 져 오고 속성 을 설정 합 니 다) 5. processor 의 accept 처리 중요 논리 코드 는 다음 과 같 습 니 다.
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessor = 0
while (isRunning) {
try {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable) {
val processor = synchronized {
// RR Processor
currentProcessor = currentProcessor % processors.size
processors(currentProcessor)
}
accept(key, processor)
} else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
// round robin to the next processor thread, mod(numProcessors) will be done later
currentProcessor = currentProcessor + 1
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
socketChannel 의 링크 설정 논리:
def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
try {
connectionQuotas.inc(socketChannel.socket().getInetAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socketChannel.socket().setSendBufferSize(sendBufferSize)
processor.accept(socketChannel)
}
1.2.3、Processor Processor 의 주요 직책 은 클 라 이언 트 로부터 온 네트워크 링크 요청 을 RequestContext 로 밀봉 하여 RequestChannel 에 보 내 는 것 입 니 다. handler 가 처리 한 응답 영수증 을 클 라 이언 트 에 보 내야 합 니 다. 주로 1. new Connections: 스 레 드 가 안전 한 대기 열 입 니 다. acceptor 에서 받 은 네트워크 새 링크 2, inflaghtResponses: 보 낸 클 라 이언 트 의 소리 입 니 다.응, 클 라 이언 트 와 의 링크 id (로 컬 ip, port 및 원 격 ip, port 및 추가 시퀀스 값 으로 구성) 와 응답 대상 의 맵 3, responseQueue: 차단 대기 열 입 니 다. handler 의 응답 요청 을 저장 합 니 다. 앞에서 사용 한 kafka reactor 모델 구조 도 를 개조 하면 다음 과 같은 proccessor 의 핵심 논리 구 조 를 얻 을 수 있 습 니 다.
그 핵심 논 리 는 다음 과 같은 몇 가지 절차 입 니 다. 1. proccessor 스 레 드 는 new Connections 에서 문의 하여 socketChannel 을 가 져 오고 selector 감청 사건 을 OP READ 로 수정 합 니 다. 2. processNewResponses 는 새로운 응답 수 요 를 처리 합 니 다. 그 중에서 SendAction 의 유형 은 클 라 이언 트 에 응답 을 보 내 고 보 낸 응답 을 inflaghtResponses 에 기록 하 는 것 입 니 다. 그 핵심 논 리 는 sendResponse 와 같은 것 입 니 다.다음:
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
val connectionId = response.request.context.connectionId
if (channel(connectionId).isEmpty) {
response.request.updateRequestMetrics(0L, response)
}
if (openOrClosingChannel(connectionId).isDefined) {
selector.send(responseSend)
inflightResponses += (connectionId -> response)
}
}
3. Selector 는 폴 이 클 라 이언 트 로부터 받 은 요청 정 보 를 호출 하고 받 은 NetworkReceive 를 completedReceives 캐 시 에 추가 합 니 다. 4. processComplete Receives 는 completedReceives 의 수신 정 보 를 처리 하 며 마지막 으로 RequestChannel. Request 로 봉 인 된 다음 requestChannel 을 호출 하여 전송 대기 열 (즉 requestQueue) 에 요청 을 추가 합 니 다.그 중에서 소스 코드 논 리 는 다음 과 같다.
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
try {
openOrClosingChannel(receive.source) match {
case Some(channel) =>
val header = RequestHeader.parse(receive.payload)
val context = new RequestContext(header, receive.source, channel.socketAddress,
channel.principal, listenerName, securityProtocol)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
requestChannel.sendRequest(req)
selector.mute(receive.source)
case None =>
throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
}
} catch {
case e: Throwable =>
processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
}
}
}
1.2.4 RequestChannel requestChannel 은 kafka 요청 과 응답 하 는 모든 퍼 가기 를 실 었 습 니 다. 다음 과 같은 두 가지 변 수 를 포함 합 니 다. 1. requestQueue: 잠 금 차단 대기 열, RequestChannel 전송 요청 과 응답 정 보 를 포함 하 는 중요 한 구성 요소 입 니 다. 위 에서 말 한 RequestChannel. Request 는 이 대기 열 에 넣 는 것 입 니 다. 2. Processors: processorid 와 processor 의 영상 을 저장 합 니 다.방사 관 계 는 주로 response 가 보 낼 때 그 중에서 대응 하 는 processor 를 선택 하 는 것 입 니 다. 두 가지 핵심 기능 은 요청 을 추가 하고 응답 영수증 을 보 내 는 것 입 니 다. 소스 코드 논 리 는 다음 과 같 습 니 다.
def sendRequest(request: RequestChannel.Request) {
requestQueue.put(request)
}
응답 영수증 을 보 내 는 것 은 이전 processor 와 약간 다 릅 니 다. 여 기 는 response 를 response Queue 에 추가 한 다음 에 processor 교대 훈련 에서 영수증 을 꺼 내 클 라 이언 트 에 보 냅 니 다.
def sendResponse(response: RequestChannel.Response) {
// log trace
val processor = processors.get(response.processor)
if (processor != null) {
processor.enqueueResponse(response)
}
}
1.2.5、KafkaRequestHandler KafkaRequestHandler 하면... ,우선 어떻게 생 겼 는 지 다시 한 번 이야기 해 보 세 요. KafkaRequestHandlerPool 에 의 해 만 들 어 졌 습 니 다. pool 은 kafkaServer 가 시 작 될 때 만 들 어 졌 습 니 다. 원본 코드 는 다음 과 같 습 니 다.
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i
자, KafkaRequestHandler 설명 끝 났 습 니 다. 생 성 과정, 그 다음은 처리 논리 입 니 다. 그 논 리 는 매우 간단 합 니 다. 절 차 는 다음 과 같 습 니 다. 1. requestChannel 에서 요청 2 를 끌 어 내 고 요청 유형 을 판단 합 니 다. Request 유형 이 라면 KafkaApis 를 호출 하여 해당 하 는 요청 을 처리 합 니 다.
1.3. 개선 과 최적화
이로써 우 리 는 kafka 의 reactor 모델 을 분석 하고 마지막 으로 발산 적 인 문 제 를 제기 했다. kafka 가 실현 한 이 reactor 모델 과 소스 코드 에 대한 분석 을 실현 했다. 만약 에 디자인 을 하 라 고 하면 성능 병목 이 존재 할 수 있 는 곳 이 어디 라 고 생각 하 십 니까? 여러분 은 아래 에 댓 글 을 달 아 당신 의 견 해 를 발표 할 수 있 습 니 다. 다음 기 에 저 를 이 문제 에 대해라 는 생각 을 나 눴 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
재 미 있 는 Ngin x 선 에 오 류 를 적어 주세요.두 서버 에 nginx 한 대 를 부하 균형 이 높 은 데 사용 할 수 있 도록 설정 하 였 으 나, nginx 를 통 해 데 이 터 를 요청 할 때, 매번 첫 번 째 서버 에 전 화 를 걸 때마다 오 류 를 보고 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.