Kafka producer 엔 드 개발 코드 인 스 턴 스
producer 는 사용자 가 producer 를 시작 하 는 스 레 드 를 사용 하여 보 낼 메 시 지 를 Producer Record 류 인 스 턴 스 에 패키지 한 다음 직렬 화 한 후 paritioner 에 보 내 고 후자 가 대상 파 티 션 을 확인 한 후 producer 프로그램 에 있 는 메모리 버퍼 에 함께 보 냅 니 다.한편,producer 의 다른 스 레 드(Sender 스 레 드)는 이 버퍼 에서 준비 되 어 있 는 메 시 지 를 실시 간 으로 추출 하여 일괄(batch)로 보 내 고 해당 하 는 broker 에 게 통일 적 으로 보 냅 니 다.구체 적 인 절 차 는 다음 그림 과 같 습 니 다.
2.producer 예제 프로그램 개발
먼저 kafka 관련 의존 도 를 도입 하고 pom.xml 파일 에 다음 과 같은 의존 도 를 추가 합 니 다.
<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.2.0</version>
</dependency>
resources 아래 에 kafka-producer.properties 설정 파일 을 만 들 고 kafka 인 자 를 설정 하 는 데 사용 합 니 다.내용 은 다음 과 같 습 니 다.
bootstrap.servers=192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=-1
retries=3
batch.size=323840
linger.ms=10
buffer.memory=33554432
max.block.ms=3000
그 중에서 앞의 세 개의 매개 변 수 는 명확 하 게 지정 해 야 합 니 다.이 세 개의 매개 변 수 는 기본 값 이 없 기 때 문 입 니 다(주:kafka 의 producer 매개 변수 설정 은 참고 할 수 있 습 니 다http://kafka.apache.org/documentation/.그리고 producer 가 메 시 지 를 보 내 는 코드 를 작성 해 야 합 니 다.
/**
* Kafka
* @throws IOException
*/
public void sendMsg() throws IOException {
//1. properties
Properties properties = new Properties();
FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties");
properties.load(fileInputStream);
fileInputStream.close();
//2. kafkaProducer
KafkaProducer producer = new KafkaProducer(properties);
for (int i = 0; i < 100; i++) {
//3. producerRecord , topic, key value
ProducerRecord testTopic = new ProducerRecord("testTopic", Integer.toString(i), Integer.toString(i));
//4. kafkaProducer send
producer.send(testTopic);
}
//5. kafkaProducer
producer.close();
}
그리고 kafka 가 있 는 서버 에 로그 인하 여 다음 명령 을 실행 하여 정 보 를 감청 합 니 다.cd /usr/local/kafka/bin
./kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --topic testTopic --from-beginning
sendmsg 방법 을 실행 하고 소비 단 을 주의 깊 게 관찰 합 니 다.
0-99 사이 의 숫자 가 순서대로 소비 되 는 것 을 볼 수 있 는데 이것 은 메시지 가 성공 적 으로 발송 되 었 다 는 것 을 의미한다.
3.비동기 와 동기 화 메시지 보 내기
위 에서 메 시 지 를 보 낸 예제 프로그램 에 서 는 발송 결 과 를 처리 하지 않 았 으 며,메시지 발송 에 실패 하면 알 수 없 으 며,이러한 방법 은 실제 응용 에 서 는 추천 하지 않 습 니 다.실제 사용 장면 에서 보통 비동기 와 동기 화 두 가지 흔 한 전송 방식 을 사용한다.자바 버 전 producer 의 send 방법 은 Future 대상 을 되 돌려 줍 니 다.Future.get()방법 을 호출 하면 결 과 를 되 돌려 주 기 를 무한 기다 리 고 동기 화 효 과 를 실현 합 니 다.그렇지 않 으 면 비동기 전송 입 니 다.
1.비동기 메시지 보 내기
자바 버 전 producer 의 send()방법 은 비동기 전송 과 전송 결과 에 대한 응답 을 실현 하기 위해 리 셋 파 라 메 터 를 제공 합 니 다.구체 적 인 코드 는 다음 과 같 습 니 다.
/**
*
*
* @throws IOException
*/
public void sendMsg() throws IOException {
//1. properties
Properties properties = new Properties();
FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties");
properties.load(fileInputStream);
fileInputStream.close();
//2. kafkaProducer
KafkaProducer producer = new KafkaProducer(properties);
for (int i = 0; i < 100; i++) {
//3. producerRecord , topic, key value
ProducerRecord testTopic = new ProducerRecord("testTopic", Integer.toString(i), Integer.toString(i));
//4. kafkaProducer send , Callback
producer.send(testTopic, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (null == exception) {
//
System.out.println(" ");
} else {
//
System.out.println(" ");
}
}
});
}
//5. kafkaProducer
producer.close();
}
상기 코드 에서 send 방법 두 번 째 매개 변 수 는 익명 내부 클래스 대상 에 전달 되 고 org.apache.kafka.clients.producer.Callback 인 터 페 이 스 를 실현 하 는 클래스 대상 에 전 달 될 수 있 습 니 다.동시에 onComplete 방법의 두 개의 입 참 recordMetadata 와 exception 은 동시에 비어 있 지 않 습 니 다.메시지 발송 에 성공 하면 exception 은 null 이 고 메시지 발송 에 실패 한 후 recordMetadata 는 null 입 니 다.따라서 두 개의 입 참 에 따라 성공 과 실패 논리 적 처 리 를 할 수 있다.그 다음으로 Kafka 가 메 시 지 를 보 내 는 데 실패 한 유형 은 두 가지 가 있 습 니 다.이상 을 다시 시도 할 수 있 고 이상 을 다시 시도 할 수 없습니다.모든 재 시도 가능 이상 은 org.apache.kafka.comon.errors.RetriableException 추상 류 에 계승 되 어 있 습 니 다.이론 적 으로 RetriableException 류 를 계승 하 는 다른 이상 은 재 시도 할 수 없 는 이상 에 속 합 니 다.이 를 감안 하여 메시지 발송 에 실패 한 후에 다시 시도 할 수 있 는 지 여부 에 따라 서로 다른 처리 논리 처 리 를 할 수 있 습 니 다.
//4. kafkaProducer send
producer.send(testTopic, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (null == exception) {
//
System.out.println(" ");
} else {
if(exception instanceof RetriableException){
//
System.out.println(" ");
}else{
//
System.out.println(" ");
}
}
}
});
2.메시지 동기 화동기 화 전송 과 비동기 전송 은 자바 의 Futrue 를 통 해 구 분 됩 니 다.Future.get()을 호출 하여 결과 가 돌아 오 기 를 무한 대기 합 니 다.즉,동기 화 전송 결 과 를 실현 합 니 다.구체 적 인 코드 는 다음 과 같 습 니 다.
//
Future future = producer.send(testTopic);
try {
// get ,
future.get();
} catch (Exception e) {
System.out.println(" ");
}
4.기타 고급 특성1.메시지 분할 메커니즘
kafka producer 는 지정 한 topic 의 어느 파 티 션 에 메 시 지 를 보 낼 지 확인 하기 위해 파 티 션 정책 과 파 티 션 기(paritioner)를 제공 합 니 다.기본 파 티 션 기 는 murmur 2 알고리즘 에 따라 메시지 key 의 해시 값 을 계산 한 다음 에 총 파 티 션 모델 에 대해 메 시 지 를 보 낼 대상 의 지역 번 호 를 확인 합 니 다(이 점 은 redis 클 러 스 터 에서 key 값 의 배분 방법 을 생각 나 게 합 니 다).그러면 같은 key 의 메 시 지 를 같은 파 티 션 으로 보 낼 수 있 도록 합 니 다.메시지 에 key 값 이 없 으 면,topic 의 모든 파 티 션 에 메시지 가 고 르 게 분 배 될 수 있 도록 폴 링 방식 을 사용 합 니 다.
kafka 의 기본 파 티 션 체 제 를 사용 하 는 것 외 에 도 org.apache.kafka.clients.producer.Partitioner 인 터 페 이 스 를 통 해 파 티 션 기 를 사용자 정의 할 수 있 습 니 다.이 때 는 Kafka Producer 를 구성 하 는 properties 에 paritioner.class 를 추가 하여 파 티 션 기 실현 류 를 알려 야 합 니 다.예 를 들 어 paritioner.class=com.demo.service.customer Partitioner.
2.메시지 직렬 화
이 편 에서 시 작 된 producer 예제 프로그램 에 서 는 Kafka Producer 대상 을 구성 할 때 두 개의 설정 항목 이 있 습 니 다.
ByteArray Serializer:본질 적 으로 아무것도 하지 않 습 니 다.네트워크 에서 전송 하 는 것 은 바이트 로 전송 되 기 때 문 입 니 다.
ByteBufferSerializer:ByteBuffer 메시지 정렬;
BytesSerializer:kafka 사용자 정의 Bytes 형식 을 직렬 화 합 니 다.
IntegerSerializer:직렬 화 Integer 형식;
DoubleSerializer:직렬 화 Double 형식;
LongSerializer:직렬 화 긴 유형;
직렬 화 기 를 사용자 정의 하려 면 org.apache.kafka.comon.serialization.Serializer 인 터 페 이 스 를 실현 하고 key.serializer 와 value.serializer 를 사용자 정의 직렬 화 기로 설정 해 야 합 니 다.
3.메시지 압축
메시지 압축 은 디스크 의 점용 과 대역 폭 의 점용 을 현저히 낮 추어 I/O 밀집 형 응용 성능 을 효과적으로 향상 시 킬 수 있 으 나 압축 을 도입 하 는 동시에 추가 적 인 CPU 를 소모 할 수 있 기 때문에 압축 은 I/O 성능 과 CPU 자원 의 균형 이다.kafka 는 현재 3 가지 압축 알고리즘 을 지원 합 니 다:CZIP,Snappy,LZ4.성능 테스트 결과 세 가지 압축 알고리즘 의 성능 은 다음 과 같 습 니 다:LZ4>>Snappy>GZIP.현재 LZ4 를 사용 하여 메시지 압축 을 하 는 producer 의 스루풋 이 가장 높 습 니 다.
기본적으로 Kafka 는 메 시 지 를 압축 하지 않 지만 KafkaProducer 대상 을 만 들 때 producer 엔 드 파라미터 copression.type 을 설정 하여 메시지 압축 을 열 수 있 습 니 다.예 를 들 어 copression.type=LZ4 를 설정 합 니 다.그럼 압축 은 언제 켜 나 요?먼저 압축 을 사용 할 지 여 부 를 판단 하 는 근 거 는 I/O 자원 소모 와 CPU 자원 소모 의 대비 이다.만약 에 환경 적 으로 I/O 자원 이 매우 부족 하 다 면 producer 프로그램 이 대량의 네트워크 대역 폭 이나 broker 엔 드 의 디스크 점용 률 이 높 고 producer 엔 드 의 CPU 자원 이 매우 부유 하 다 면 producer 에 압축 을 여 는 것 을 고려 할 수 있다.
4.메시지 없 음 설정 분실
Kafka Producer.send()방법 으로 메 시 지 를 보 낼 때 사실은 메 시 지 를 버퍼 에 넣 고 하나의 전속 I/O 스 레 드 가 버퍼 에서 메 시 지 를 추출 하여 batch 에 밀봉 한 다음 에 보 냅 니 다.I/O 스 레 드 가 메 시 지 를 보 내기 전에 producer 가 무 너 지면 모든 메 시 지 를 잃 어 버 립 니 다.또한,다 중 메시지 전송 시 네트워크 디 더 링 으로 인해 메시지 가 혼 란 스 러 워 지 는 문제 가 존재 합 니 다.이 두 가지 문 제 를 해결 하기 위해 서 는 producer 단 과 broker 단 에서 설정 하여 피 할 수 있 습 니 다.
4.1 producer 엔 드 설정
max.block.ms=3000:block 을 설정 하 는 시간 이 길 고 버퍼 가 채 워 지 거나 metadata 가 잃 어 버 렸 을 때 block 이 발생 하여 새로운 메 시 지 를 받 지 않 습 니 다.
acks=all:모든 follower 가 메 시 지 를 보 내 는 데 성공 했다 고 응답 하 기 를 기다 리 고 있 습 니 다.
retries=Integer.MAX_VALUE:재 시도 횟수 를 설정 하고 비교적 큰 값 을 설정 하면 메 시 지 를 잃 어 버 리 지 않도록 할 수 있 습 니 다.
max.in.flight.requests.per.connection=1:producer 가 단일 broker 연결 에서 보 낼 수 있 는 응답 하지 않 은 요청 의 수량 을 제한 하여 같은 topic 와 통 일 된 파 티 션 에서 메시지 난 서 문 제 를 방지 합 니 다.
상기 매개 변 수 를 설정 하 는 것 외 에 메 시 지 를 보 낼 때 는 가능 한 한 리 셋 매개 변 수 를 가 진 send 방법 으로 발송 결 과 를 처리 해 야 하 며,데이터 전송 에 실패 하면 Kafka Producer.close(0)방법 으로 즉시 producer 를 닫 아 메시지 의 난 서 를 방지 하 는 것 을 표시 합 니 다.
4.2 브로커 엔 드 설정
unclean.leader.election.enable=false:unclean leader 선 거 를 닫 습 니 다.즉,ISR 이 아 닌 사본 이 leader 로 선출 되 는 것 을 허락 하지 않 습 니 다.
replication.factor>=3:최소 3 개의 복사 본 으로 데 이 터 를 저장 합 니 다.
min.issync.replicas>1:어떤 메시지 가 적어도 ISR 에 몇 개의 복사 본 에 기록 되 어야 성공 할 수 있 는 지 제어 하고 producer 엔 드 acks 파라미터 가 all 또는-1 로 설정 되 어야 만 이 매개 변 수 는 유효 합 니 다.
마지막 으로 replication.factor>min.issync.replicas 를 확보 합 니 다.둘 이 같 으 면 복사 본 하나만 끊 으 면 파 티 션 이 작 동 하지 않 습 니 다.replication.factor=min.issync.replicas+1 설정 을 추천 합 니 다.
producer 엔 드 의 개발 에 대해 서 는 여기까지 소개 하고 다음 편 은 consumer 엔 드 의 개발 을 소개 합 니 다.
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
마이크로서비스 프레임워크적인 것을 목표로(Apach Kafka편)최종적으로는 SpirngBootLayer + BusinessLogicLayer + RepositoryLayer 라고 하는 구성의 서비스를 만들어, BusinessLogicLayer 이외는 application.yml...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.