Kafka [2] Producer

24266 단어 kafkakafka

⚙️프로젝트 소스 가져오기

데브원영님 강의 코드에서 소스를 다운받아서 시작하면 편하다.

👊실습 시작

✅simple-kafka-producer

public class SimpleProducer {
    private static String TOPIC_NAME = "test";
    private static String BOOTSTRAP_SERVERS = "ip:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

프로젝트에 소스를 다음과 같이 수정하고 테스트해보자.

configs에 설정 내용을 key : value형태로 넣고 있는데 Producer는 내용을 입력하는 입장이기 때문에 StringSerializer를 통해 Consumer에서 데이터를 받았을 때 깨지지 않도록 설정 해놓는 것이다.

프로젝트를 실행시키면


다음과 같이 콘솔창에서도 실행되며 컨슈머에도 동일한 내용이 출력된다.

👊kafka-producer-key-value

public class ProducerWithKeyValue {
    private static String TOPIC_NAME = "test";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

ProducerRecord를 생성할 때 매개변수를 3개를 준다면 key:value형태로 생성되는 생성자를 확인할 수 있다.

./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --property print.key=true --property key.separator="-"

컨슈머의 실행 명령어도 다음과 같이 key value를 구분할 수 있게 다음과 같이 실행해주어야 한다.

실행 결과를 보면 컨슈머에는 idx후에 컨슈머에 설정한 옵션으로 인해 -가 추가되어 메세지가 전달된다.

여기서 컨슈머를 key value 옵션으로 적용했지만 key값을 만들지 않고 메세지를 보내게되면 key를 null이 default로 입력되어 전달된다.

👊kafka-producer-exact-partition

public class ProducerExactParition {
    private static String TOPIC_NAME = "test";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";
    private static int PARTITION_NUMBER = 1;

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, PARTITION_NUMBER, Integer.toString(index), data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

위의 소스와 동일하지만 파티션 넘버를 지정해준다면 특정 파티션에 데이터를 넣을 수 있다.

key를 굳이 안쓰겠다면 key를 null로 해주면 된다.

좋은 웹페이지 즐겨찾기