Kafka 클 러 스 터 설치 사용

17663 단어 kafka군집
Kafka 소개
Kafka 는 높 은 스루풋 의 분포 식 게시 구독 메시지 시스템 으로 다음 과 같은 특성 이 있 습 니 다.
  • O (1) 의 디스크 데이터 구 조 를 통 해 정 보 를 지속 적 으로 제공 하 는데 이런 구 조 는 TB 의 메시지 로 저장 되 더 라 도 장시간 안정 적 인 성능 을 유지 할 수 있다.
  • 높 은 스루풋: 아주 일반적인 하드웨어 카 프 카 라 도 초당 수 십 만 개의 소식 을 지원 할 수 있 습 니 다.
  • kafka 서버 와 소비 기 클 러 스 터 를 통 해 파 티 션 메 시 지 를 지원 합 니 다.
  • Hadoop 병렬 데이터 로드 를 지원 합 니 다.

  • Kafka 의 목적 은 게시 구독 솔 루 션 을 제공 하 는 것 으로 소비자 규모 의 사이트 의 모든 동작 흐름 데 이 터 를 처리 할 수 있다.이러한 동작 (홈 페이지 조회, 검색 및 기타 사용자 의 행동) 은 현대 인터넷 에서 의 많은 사회 적 기능 의 관건 적 인 요소 이다.이 데 이 터 는 보통 스루풋 의 요구 로 로그 와 로그 취 합 을 통 해 해결 된다.Hadoop 과 같은 로그 데이터 와 오프라인 분석 시스템 에 대해 서도 실시 간 처리 제한 을 요구 하 는 것 은 실행 가능 한 해결 방안 이다.
    kafka 는 Scala 로 작 성 된 것 입 니 다. scalac 컴 파일 러 로 원본 파일 을 자바 의 class 파일 (즉 JVM 에서 실행 되 는 바이트 코드) 로 컴 파일 합 니 다. 따라서 Scala 는 JVM 기반 언어 이기 때문에 Kafka 를 사용 하려 면 기계 상류 JVM 지원 이 필요 합 니 다. 본 고 는 jdk 버 전 을 jdk - 7u 75 로 사용 합 니 다. 3 대 를 예 로 들 면 Follower 다운 이 고 서버 2 대 에 접근 할 수 있 습 니 다.Zookeeper 의 데 이 터 는 여러 개의 복사 본 이 있 기 때문에 데 이 터 를 잃 어 버 리 지 않 습 니 다. 리더 가 다운 되면 Zookeeper 는 새로운 Leader 를 선출 합 니 다.왜 홀수 대인 지, 주 키 퍼 클 러 스 터 가 3 대 라면 1 대, 4 대 라면 역시 1 대 를 허용 하 는 지, 선거 알고리즘 이 '절반 이상' 을 요구 하기 때문에 한 대 를 더 내 는 것 은 의미 가 없다.
    배치 절차
    Kafka 는 zookeeper 를 가지 고 있 지만 보통 군집 에 zk 가 있 기 때문에 군집 에 있 는 zookeeper 를 사용 합 니 다.
  • 다운로드 kafka 2.11 - 0.10.0.0. tgz
  • 프로필 server. properties
  • ############################# Server Basics #############################
    #       broker.
    broker.id=1
    ############################# Socket Server Settings #############################
    #            ,   hostname -i      ,        127.0.0.1,producer        
    listeners=PLAINTEXT://172.23.8.144:9092
    #broker producers consumers        ,      ,  listeners   ,        
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    #           
    num.network.threads=3
    #     I/O    
    num.io.threads=8
    # socket server   buffer   (SO_SNDBUF) 
    socket.send.buffer.bytes=102400
    # socket server   buffer   (SO_RCVBUF)
    socket.receive.buffer.bytes=102400
    #       size,      oom
    socket.request.max.bytes=104857600
    ############################# Log Basics #############################
    #          ,           ,       /tmp
    log.dirs=/usr/local/services/kafka/kafka-logs
    #  topic  partitions   ,                  。
    num.partitions=2
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    #       ,      ,    
    log.retention.hours=168
    #          ,         
    log.segment.bytes=1073741824
    #                 ,    
    log.retention.check.interval.ms=300000
    ############################# Zookeeper #############################
    #Zookeeper     ,     ,    172.23.8.59:2181/kakfa       kafka   zk     
    zookeeper.connect=172.23.8.144:2181,172.23.8.179:2181,172.23.8.59:2181
    #   zk     
    zookeeper.connection.timeout.ms=6000

    주요 프로필 은 server. properties 입 니 다. producer 와 consumer 에 대해 서 는 각각 producer. properties 와 consumer. properties 가 있 지만, 일반적으로 따로 설정 하지 않 아 도 server. properties 에서 읽 을 수 있 습 니 다. 3. 각 노드 를 시작 하여 이 프로필 을 나 누 어 주 고 broker. id 와 listeners 주 소 를 수정 하여 해당 하 는 디 렉 터 리 를 만 듭 니 다.
    [root@slave1 kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties
    [root@slave2 kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties
    [root@slave3 kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties

    - daemon 을 배경 에 놓 고 실행 합 니 다. 4. 성공 여 부 를 검증 합 니 다. my - test 라 는 주제 명 을 만 듭 니 다.
    [root@slave1 kafka]# bin/kafka-topics.sh --create --zookeeper 172.23.8.144:2181 --replication-factor 3 --partitions 1 --topic my-test
    Created topic "my-test".

    4.2. 메시지 발송, ctrl + c 종료
    [root@slave1 kafka]# bin/kafka-console-producer.sh --broker-list 172.23.8.144:9092 --topic my-test
           
    hello

    4.3 다른 기계 에서 의 소비 소식
    [root@slave2 kafka]# bin/kafka-console-consumer.sh --zookeeper slave3:2181 --from-beginning --topic my-test
           
    hello

    계속 메 시 지 를 보 내 면 소비자 단말기 에서 새로운 메시지 가 계속 나타 납 니 다. 이로써 kafka 클 러 스 터 구축 에 성 공 했 습 니 다. 5. Kafka HelloWord 는 kafka 매 뉴 얼 에서 자바 버 전의 producer 와 cousumer 코드 예제 를 보 여 주 었 습 니 다. 주 소 를 수정 하고 쉼표 를 분리 합 니 다. 이 주 소 는 클 러 스 터 의 부분 집합 으로 클 러 스 터 를 탐지 합 니 다. 5.1. Producer 코드 예제
        import java.util.Properties;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;
        public class Producer {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers",
                        "172.23.8.144:9092,172.23.8.179:9092,172.23.8.59:9092");//         ,      。
                props.put("acks", "all");//       ,             
                props.put("retries", 3);//          
                props.put("batch.size", 16384);// batch   
                props.put("linger.ms", 1);//                ,        ,                    ,     ,producer       1ms,       kafka        
                props.put("buffer.memory", 33554432);//             
                props.put("key.serializer",
                        "org.apache.kafka.common.serialization.StringSerializer");//       ,
                                                                                    // ByteArraySerializer  StringSerializer
                props.put("value.serializer",
                        "org.apache.kafka.common.serialization.StringSerializer");
                KafkaProducer producer = new KafkaProducer<>(props);
                for (int i = 0; i < 10000; i++) {
                    //        topic, key,value,send()    ,          ,   。
                    producer.send(new ProducerRecord("my-topic",
                            Integer.toString(i), Integer.toString(i)));
                }
                producer.close();
            }
        }
    

    5.2. Consumer 코드 예제
        import java.util.Arrays;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        public class Consumer {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers",
                        "172.23.8.144:9092,172.23.8.179:9092,172.23.8.59:9092");//          ,      。
                props.put("group.id", "test");// cousumer   id
                props.put("enable.auto.commit", "true");//     offsets
                props.put("auto.commit.interval.ms", "1000");//   1s,    offsets
                props.put("session.timeout.ms", "30000");// Consumer          ,     Consumer    ,kafka             
                props.put("key.deserializer",
                        "org.apache.kafka.common.serialization.StringDeserializer");//      
                props.put("value.deserializer",
                        "org.apache.kafka.common.serialization.StringDeserializer");
                KafkaConsumer consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList("my-topic"));//    topic,    
                while (true) {
                    ConsumerRecords records = consumer.poll(100);
                    for (ConsumerRecord record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s",
                                record.offset(), record.key(), record.value());
                        System.out.println();
                    }
                }
            }
        }

    5.3. 각각 실행 하면 됩 니 다. comsumer 가 메시지 로 그 를 인쇄 하 는 것 을 보 았 습 니 다.

    좋은 웹페이지 즐겨찾기