Kafka의 Hello World 프로젝트

5618 단어 KafkaHelloWorld

Kafka 환경



Kafka 1.1/HDF 3.2
JDK 8

Files





pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Kafka "Hello World" producer example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>k.helloworld</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!--
          Pin the source encoding and the Java language level explicitly so the
          build does not depend on the platform charset or on the compiler
          plugin's default level. 1.8 matches the JDK 8 environment this
          example targets.
        -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Kafka client library; provides the producer API used by the example class. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <!-- SLF4J binding for log4j 1.2; logging is configured through log4j.properties. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>
    </dependencies>
</project>

log4j.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Console appender named "stdout".
# Pattern: [date] level message (logger-name:source-line) followed by a newline.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

# Logger levels.
# The root logger is switched OFF but still carries the stdout appender, so
# only loggers that set their own level below produce output.
log4j.rootLogger=OFF, stdout
# Kafka client loggers report errors only.
log4j.logger.org.apache.kafka=ERROR

package com.zzeng;

import java.util.Properties;
import java.util.Scanner;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Interactive console producer for the Kafka "Hello World" example.
 *
 * <p>Reads lines from standard input and publishes each one, with the current
 * time in milliseconds appended, to the topic named by {@code TOPIC_NAME}
 * using a blocking (synchronous) send.
 */
public class KafkaProducerExample {
    private final static String TOPIC_NAME = "topic_name";

    /**
     * Runs the prompt loop until the user types {@code exit} (case-insensitive)
     * or standard input reaches end-of-file.
     *
     * <p>Every run appends new records to the topic; nothing is de-duplicated,
     * so running the program several times leaves several messages in the
     * topic. The {@code exit} line itself is also published before the loop
     * ends.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Producer<String, String> producer = KafkaProducerExample.createProducer();

        Scanner sc = new Scanner(System.in);
        try {
            while (true) {
                System.out.print("> ");
                // Stop cleanly on end-of-input (Ctrl-D, exhausted pipe) instead of
                // letting nextLine() throw NoSuchElementException.
                if (!sc.hasNextLine()) {
                    break;
                }
                String text = sc.nextLine();

                // All records share the constant key "message".
                ProducerRecord<String, String> recordToSend = new ProducerRecord<String, String>(TOPIC_NAME, "message",
                        text + " , timeInMillis=" + System.currentTimeMillis());
                try {
                    // Synchronous send: get() blocks until the send completes and
                    // yields the metadata of the stored record.
                    RecordMetadata rmd = producer.send(recordToSend).get();
                    System.out.printf("Message Sent ==>> topic = %s, partition = %s, offset = %d\n", rmd.topic(),
                            rmd.partition(), rmd.offset());
                } catch (Exception ex) {
                    // A failed send is reported and the prompt continues, so one bad
                    // record does not end the session.
                    ex.printStackTrace();
                }

                if (text.equalsIgnoreCase("exit")) {
                    break;
                }
            }
        } finally {
            // Release both resources even if closing the scanner fails.
            try {
                sc.close();
            } finally {
                // The producer must be closed after use, otherwise its resources leak.
                producer.close();
            }
        }
    }

    /**
     * Builds a producer with {@code String} keys and values.
     *
     * <p>The bootstrap server address is hard-coded; change it to match the
     * target cluster.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    private static Producer<String, String> createProducer() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "zzeng-hdp-1.field.hortonworks.com:6667");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(kafkaProps);
    }

}


실행





좋은 웹페이지 즐겨찾기