Kafka [2] Producer
⚙️프로젝트 소스 가져오기
데브원영님 강의 코드에서 소스를 다운받아서 시작하면 편하다.
👊실습 시작
✅simple-kafka-producer
public class SimpleProducer {
private static String TOPIC_NAME = "test";
private static String BOOTSTRAP_SERVERS = "ip:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
try {
producer.send(record);
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
public class SimpleProducer {
private static String TOPIC_NAME = "test";
private static String BOOTSTRAP_SERVERS = "ip:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
try {
producer.send(record);
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
프로젝트에 소스를 다음과 같이 수정하고 테스트해보자.
configs
에 설정 내용을 key : value
형태로 넣고 있는데 Producer는 내용을 입력하는 입장이기 때문에 StringSerializer를 통해 Consumer에서 데이터를 받았을 때 깨지지 않도록 설정 해놓는 것이다.
프로젝트를 실행시키면
다음과 같이 콘솔창에서도 실행되며 컨슈머에도 동일한 내용이 출력된다.
👊kafka-producer-key-value
public class ProducerWithKeyValue {
private static String TOPIC_NAME = "test";
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data);
try {
producer.send(record);
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
ProducerRecord를 생성할 때 매개변수를 3개를 준다면 key:value
형태로 생성되는 생성자를 확인할 수 있다.
./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --property print.key=true --property key.separator="-"
컨슈머의 실행 명령어도 다음과 같이 key value를 구분할 수 있게 다음과 같이 실행해주어야 한다.
실행 결과를 보면 컨슈머에는 idx후에 컨슈머에 설정한 옵션으로 인해 -
가 추가되어 메세지가 전달된다.
여기서 컨슈머를 key value 옵션으로 적용했지만 key값을 만들지 않고 메세지를 보내게되면 key를 null이 default로 입력되어 전달된다.
👊kafka-producer-exact-partition
public class ProducerExactParition {
private static String TOPIC_NAME = "test";
private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
private static int PARTITION_NUMBER = 1;
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, PARTITION_NUMBER, Integer.toString(index), data);
try {
producer.send(record);
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
위의 소스와 동일하지만 파티션 넘버를 지정해준다면 특정 파티션에 데이터를 넣을 수 있다.
key를 굳이 안쓰겠다면 key를 null로 해주면 된다.
Author And Source
이 문제에 관하여(Kafka [2] Producer), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@ililil9482/Kafka-2-Producer저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)