kafka 및storm-kafka 통합

9792 단어 hadoop#strom
kafka 노트
  • kafka는 분포식 메시지 캐시 시스템
  • 이다
  • kafka 집단의 서버를 브로커
  • 라고 부른다
  • kafka는 두 가지 클라이언트가 있는데 하나는 Producer(메시지 생산자), 하나는 consumer(메시지 소비자), 클라이언트와 브로커 서버 간에 tcp 프로토콜로 연결
  • kafka에서 서로 다른 업무 시스템의 메시지는 topic를 통해 구분할 수 있고 모든 메시지 topic는 메시지 읽기와 쓰기의 부하를 분담하기 위해 구분된다
  • 파티션당 여러 복제본으로 데이터 손실 방지
  • 어떤 구역의 데이터가 업데이트가 필요하면 이 구역의 모든 복사본의leader를 통해 업데이트해야 한다
  • 소비자는 그룹을 나눌 수 있다. 예를 들어 두 개의 소비자 그룹 A와 B가 있는데 하나의 topic:order 를 공동으로 소비한다.info, A와 B가 소비하는 정보는 중복되지 않습니다. 예를 들어orderinfo에는 100개의 메시지가 있는데 매 메시지마다 id가 있고 번호가 0-99입니다. 그러면 A조가 0-49호를 소비하면 B조는 50-99호
  • 를 소비합니다.
  • 소비자가 특정한 topic의 정보를 구체적으로 소비할 때 시작 편이량
  • 을 지정할 수 있다
    그룹 설치 1, 압축 해제 2, 서버 수정.properties
    broker.id=1
    zookeeper.connect=weekend05:2181,weekend06:2181,weekend07:2181

    3、zookeeper 그룹을 시작합니다 4、각 노드에서broker를 시작합니다
    bin/kafka-server-start.sh config/server.properties

    5,kafka 그룹에 topic 만들기
    bin/kafka-topics.sh --create --zookeeper weekend05:2181 --replication-factor 3 --partitions 1 --topic order

    6, 하나의 제품으로 어떤 topic에 메시지를 쓰기
    bin/kafka-console-producer.sh --broker-list weekend:9092 --topic order

    7、comsumer로 어떤 topic에서 정보를 읽는다
    bin/kafka-console-consumer.sh --zookeeper weekend05:2181 --from-beginning --topic order

    8、topic의 구역 및 던전 상태 정보 보기
    bin/kafka-topics.sh --describe --zookeeper weekend05:2181 --topic order

     
    kafka 소비자 그룹 구성 메커니즘
    kafka와storm 통합
    kafka 생산자
    package cn.itcast.kafka;
    
    
    import java.util.Properties;
    
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    
    public class ProducerDemo {
    public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("zk.connect", "weekend01:2181,weekend02:2181,weekend03:2181");
    props.put("metadata.broker.list","weekend01:9092,weekend02:9092,weekend03:9092");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    ProducerConfig config = new ProducerConfig(props);
    Producer producer = new Producer(config);
    
    
    //       
    //               socket  
    for (int i = 1; i <= 100; i++) {
    Thread.sleep(500);
    producer.send(new KeyedMessage("wordcount",
    "i said i love you baby for" + i + "times,will you have a nice day with me tomorrow"));
    }
    
    
    }
    }

    kafka 소비자
    package cn.itcast.kafka;
    
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    
    
    public class ConsumerDemo {
    private static final String topic = "mysons";
    private static final Integer threads = 1;
    
    
    public static void main(String[] args) {
    
    Properties props = new Properties();
    props.put("zookeeper.connect", "weekend01:2181,weekend02:2181,weekend03:2181");
    props.put("group.id", "1111");
    props.put("auto.offset.reset", "smallest");
    
    
    ConsumerConfig config = new ConsumerConfig(props);
    ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);
    Map topicCountMap = new HashMap();
    topicCountMap.put(topic, 1);
    topicCountMap.put("mygirls", 1);
    topicCountMap.put("myboys", 1);
    Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List> streams = consumerMap.get("mygirls");
    
    for(final KafkaStream kafkaStream : streams){
    new Thread(new Runnable() {
    @Override
    public void run() {
    for(MessageAndMetadata mm : kafkaStream){
    String msg = new String(mm.message());
    System.out.println(msg);
    }
    }
    
    }).start();
    
    }
    }
    }
    
    
    
    

    kafka-storm 통합
    spouts: 토폴로지 정보원
    package cn.itcast.storm.spout;
    
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    
    import backtype.storm.spout.Scheme;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    
    public class MessageScheme implements Scheme {
    
    private static final long serialVersionUID = 8423372426211017613L;
    
    
    @Override
    public List deserialize(byte[] bytes) {
    try {
    String msg = new String(bytes, "UTF-8");
    return new Values(msg); 
    } catch (UnsupportedEncodingException e) {
    e.printStackTrace();
    }
    return null;
    }
    
    
    @Override
    public Fields getOutputFields() {
    return new Fields("msg");
    }
    
    
    }

    bolts: 토폴로지의 처리 논리 단위
    package cn.itcast.storm.bolt;
    
    
    import org.apache.commons.lang.StringUtils;
    
    
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    
    public class WordSpliter extends BaseBasicBolt {
    
    
    private static final long serialVersionUID = -5653803832498574866L;
    
    
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
    String line = input.getString(0);
    String[] words = line.split(" ");
    for (String word : words) {
    word = word.trim();
    if (StringUtils.isNotBlank(word)) {
    word = word.toLowerCase();
    collector.emit(new Values(word));
    }
    }
    }
    
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
    
    
    }
    
    
    }
    package cn.itcast.storm.bolt;
    
    
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Map;
    import java.util.UUID;
    
    
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;
    /**
     *        
     * @author [email protected]
     *
     */
    public class WriterBolt extends BaseBasicBolt {
    
    
    private static final long serialVersionUID = -6586283337287975719L;
    
    private FileWriter writer = null;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    try {
    writer = new FileWriter("c:\\storm-kafka\\" + "wordcount"+UUID.randomUUID().toString());
    } catch (IOException e) {
    throw new RuntimeException(e);
    }
    }
    
    
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
    
    
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
    String s = input.getString(0);
    try {
    writer.write(s);
    writer.write("
    "); writer.flush(); } catch (IOException e) { throw new RuntimeException(e); } } }

    topology:
    package cn.itcast.storm.topology;
    
    
    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import cn.itcast.storm.bolt.WordSpliter;
    import cn.itcast.storm.bolt.WriterBolt;
    import cn.itcast.storm.spout.MessageScheme;
    
    
    public class KafkaTopo {
    
    
    public static void main(String[] args) throws Exception {
    
    String topic = "wordcount";
    String zkRoot = "/kafka-storm";
    String spoutId = "KafkaSpout";
    BrokerHosts brokerHosts = new ZkHosts("weekend01:2181,weekend02:2181,weekend03:2181"); 
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "wordcount", zkRoot, spoutId);
    spoutConfig.forceFromStart = true;
    spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
    TopologyBuilder builder = new TopologyBuilder();
    //    spout   kaflka                 bolt  ,    spout        ,  storm       KafkaSpout
    builder.setSpout("KafkaSpout", new KafkaSpout(spoutConfig));
    builder.setBolt("word-spilter", new WordSpliter()).shuffleGrouping(spoutId);
    builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("word-spilter", new Fields("word"));
    Config conf = new Config();
    conf.setNumWorkers(4);
    conf.setNumAckers(0);
    conf.setDebug(false);
    
    //LocalCluster   topology          ,      
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("WordCount", conf, builder.createTopology());
    
    //  topology storm     
    // StormSubmitter.submitTopology("sufei-topo", conf, builder.createTopology());
    }
    
    
    }
    
    

    utils:
    package cn.itcast.storm.utils;
    
    
    import java.io.InputStream;
    import java.util.Properties;
    
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    
    /**
     *         
     */
    public class PropertyUtil {
    
    
    private static final Log log = LogFactory.getLog(PropertyUtil.class);
    private static Properties pros = new Properties();
    
    
    //       
    static {
    try {
    InputStream in = PropertyUtil.class.getClassLoader().getResourceAsStream("config.properties");
    pros.load(in);
    } catch (Exception e) {
    log.error("load configuration error", e);
    }
    }
    
    
    /**
    *           
    * @param key
    * @return
    */
    public static String getProperty(String key) {
    return pros.getProperty(key);
    }
    
    
    }

    config.properties:
    zkConnect=master:2181
    zkSessionTimeoutMs=30000
    zkConnectionTimeoutMs=30000
    zkSyncTimeMs=5000
    
    
    scheme=date,id,content
    separator=,
    target=date

    좋은 웹페이지 즐겨찾기