kafka 홈페이지 예제 설명 -- KafkaConsumer

22900 단어 hadoop

Kafka client는 kafka cluster의 기록을 소비합니다.


이것은 Kafka 집단의 서버 고장을 투명하게 처리하고 집단 내에서 이동하는 데이터 구역에 투명하게 적응할 것이다.이 클라이언트는 또한 서버와 상호작용을 하여 사용자 그룹이 소비자 그룹을 사용하여 부하 균형 소비를 할 수 있도록 한다.


소비자는 데이터를 얻기 위해 TCP를 필요한 에이전트에 연결합니다.사용 후 꺼지지 않으면 소비자들이 이 연결을 누설할 수 있다.소비자는 라인이 안전한 것이 아니다.더 많은 세부 사항은 다중 스레드 처리를 보십시오.

오프셋


Kafka는 파티션의 각 레코드에 대해 수치 오프셋을 유지합니다.이 편이량은 이 구역에 기록된 유일한 표지부호를 충당하고, 이 구역에 있는 소비자의 위치를 표시한다.즉, 5번 위치를 가진 소비자는 오프셋 값이 0에서 4인 기록을 사용하고 오프셋 5를 사용하여 다음 기록을 기록한다.실제로 사용자의 사용자와 관련된 위치는 두 가지 개념이 있다.
소비자의 위치는 다음 기록의 편이량을 제공할 것이다.그것은 소비자들이 이 구역에서 본 최고 편이량보다 하나가 클 것이다.그것은 소비자가 데이터를 수신할 때마다poll(long)을 호출하고 메시지를 수신할 때 자동으로 전진한다.
커밋된 위치는 안전하게 저장된 마지막 오프셋입니다.프로세스가 실패하고 다시 시작하면 프로세스가 복원될 오프셋으로 복원됩니다.소비자는 정기적으로 자동으로 보상을 제출할 수 있다.또는commitSync를 호출하여 이 제출 위치를 수동으로 제어할 수 있습니다. 제출 과정에서 보상이나 치명적인 오류가 성공적으로 제출되었거나 막히지 않는commitAsync를 제출할 때까지,OffsetCommitCallback을 터치하거나, 제출에 성공하거나 실패할 때까지 막힐 수 있습니다.

컨슈머 그룹 및 구독 주제


Kafka는 소비자 그룹(Consumer Groups)의 개념을 사용하여 하나의 프로세스 탱크로 소비와 기록을 구분하고 처리하는 작업을 허용했다.이러한 프로세스는 같은 기계에서 실행될 수 있거나, 많은 기계에 분포되어 처리에 추가적인 신축성과 용착을 제공할 수 있다.
모든 Kafka 소비자는 하나의 소비자 단체를 설정할 수 있습니다. 이것은 속하고 주제가 구독 목록 (목록, Consumer Rebalance Listener) 을 통해 구독하거나 구독을 통해 모든 주제가 특정한 패턴 (패턴, Consumer Rebalance Listener) 과 일치하도록 동적 설정할 수 있습니다.Kafka는 구독 테마의 모든 메시지를 사용자 그룹의 한 프로세스에서 전달합니다.이것은 각 그룹의 소비자 과정에서 주제의 구분을 균형 있게 함으로써 실현된 것이다.따라서 만약에 네 개의 구역이 있는 주제와 두 개의 프로세스가 있는 소비자 그룹이 있다면 각 프로세스는 두 개의 구역에서 소모될 것이다.이 그룹의 구성원은 동적 유지보수입니다. 프로세스가 실패하면 그 그룹에 분배된 구역은 같은 그룹의 다른 프로세스에 다시 분배되고, 새 프로세스가 이 그룹에 가입하면 구역은 기존 소비자에서 이 새 프로세스로 옮겨집니다.
따라서 만약에 두 프로세스가 한 주제에 가입하여 서로 다른 그룹을 지정한다면 이 주제에서 모든 기록을 얻을 것이다.만약 그들이 모두 같은 그룹을 지정한다면, 그들은 대략 절반의 기록을 얻을 것이다.개념적으로 말하자면, 당신은 소비자 그룹을 여러 프로세스로 구성된 단일 논리 구독자로 볼 수 있습니다.다중 사용자 시스템으로서 카프카는 중복 데이터가 없는 상황에서 주어진 주제에 임의의 수량을 제공하는 사용자 그룹을 자연히 지원한다.이것은 정보 전달 시스템에서 흔히 볼 수 있는 기능에 대한 경미한 개괄이다.
전통적인 메시지 전달 시스템의 대기열과 비슷한 의미를 얻기 위해 모든 절차는 단일 소비자 그룹의 일부이기 때문에 기록 교부는 그룹에서 대기열과 같다.전통적인 메시지 전달 시스템과 달리 당신은 여러 개의 이런 그룹을 가질 수 있습니다.전통적인 메시지 전달 시스템에서의pub-sub와 유사한 의미를 얻으려면 모든 프로세스는 자신의 소비자 그룹이 있기 때문에 모든 프로세스는 이 주제에 대한 모든 기록을 구독하여 발표할 것이다.
또한 자동 그룹 할당 시 소비자는 Consumer Rebalance Listener를 통해 알림할 수 있다. 이것은 청소 등 필요한 응용 프로그램 수준의 논리적 상태를 완성하고 수동으로 제출을 상쇄할 수 있다. (주의, 상쇄는 항상 주어진 소비자 단체에 대한 약속) 등등의 상세한 정보는 저장 상쇄 외부 카프카를 참고하여 소비자에게 수동으로 구역을 지정하여 분배할 수 있다. (목록) 이 동적 구역 분배를 사용하지 않도록 한다.

사용 예


1. Offset 자동 확인

Properties props = new Properties();
/*  kakfa  , broker  */
props.put("bootstrap.servers", "localhost:9092");
/*  consumer group */
props.put("group.id", "test");
/*  offset */
props.put("enable.auto.commit", "true");
/*  offset  */
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
/* key  */
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* value  */
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 /*  consumer */
KafkaConsumer consumer = new KafkaConsumer<>(props);
/*  topic,   */
consumer.subscribe(Arrays.asList("foo", "bar"));

 /*  , 100ms */
while (true) {
    ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

설명: 1.bootstrap.서버는 kafka의 연결 입구를 대표할 뿐, 집단 중의 어떤 브로커만 지정하면 된다.  2. 일단consumer와kakfa집단이 연결을 맺으면consumer는 심장박동으로 고속집단으로 자신이 살아있다면session.timeout.ms 내 심장 박동이 서버에 도착하지 않았습니다. 서버는 심장 박동이 분실되면 리밸런스를 할 것이라고 생각합니다.

2. Offset 수동 제어


만약consumer가 데이터를 얻은 후에 처리를 추가해야 한다면, 데이터가 끝난 후에야offset을 확인하고, 프로그램이 offset의 확인을 제어해야 합니다.밤 들기: consumer가 데이터를 얻은 후 데이터를 DB에 오래 보관해야 한다.offset을 자동으로 확인하는 상황에서 데이터가 kafka 집단에서 읽히면 확인하지만 지속화 과정에 실패하면 데이터가 손실됩니다.오프셋을 제어하는 확인이 필요합니다.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
/*   */
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    /*  , DB, offset */
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}

또한 구체적인 구역별 오프셋 데이터에 대한 확인을 세밀하게 제어할 수 있다.
try {
    while(running) {
        ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List> partitionRecords = records.records(partition);
            for (ConsumerRecord record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            /*  offset */
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
  consumer.close();
}

설명: 확인된 오프셋은 수용된 데이터의 최대 오프셋 +1입니다.

3. 분할 구독


특정 구역에 메시지를 구독할 수 있습니다.하지만 파티션의 부하 분담을 잃게 됩니다.몇 가지 장면이 이렇게 놀 수 있다.이 컴퓨터의 디스크의 섹션 데이터만 가져오기;  2. 프로그램 자신이나 외부 프로그램은 부하와 오류 처리를 스스로 실현할 수 있다.예를 들어 YARN/Mesos의 개입은consumer가 끊긴 후에consumer를 시작합니다.
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

설명: 1.이 경우 consumer Group을 사용해도 부하 균형이 맞지 않습니다.  2. topic의 구독과 섹션 구독은 같은consumer에서 혼용할 수 없습니다.

4. 외부 저장 오프셋


소비자들은 카프카의 오프셋 저장 위치를 사용자 정의할 수 있다.이 디자인의 주요 목적은 소비자들이 데이터와 오프셋을 원자적으로 저장하도록 하는 것이다.이렇게 하면 위에서 언급한 중복 소비 문제를 피할 수 있다.밤 들기 설명: 특정 구역을 구독합니다.얻은 기록을 저장할 때, 모든 기록의 오프셋을 함께 저장합니다.데이터와 오프셋의 저장이 원자적임을 보증합니다.비동기 저장소가 이상하게 끊겼을 때 이미 저장된 데이터는 상응하는 오프셋 기록이 있다.이런 방식은 데이터가 분실되지 않고 반복적으로 서버에서 읽히지 않을 것을 보장할 수 있다.어떻게 설정 실현: 1.오프셋 자동 확인: enable.auto.commit=false;  2. ConsumerRecord에서 offset을 가져와 저장하기;  3. Consumer가 리셋할 때 seek(TopicPartition,long)를 호출하여 서버에 있는 소비 기록을 리셋합니다.
만약 소비 구역도 사용자 정의라면, 이런 방식은 사용하기에 매우 시원할 것이다.만약 구역이 자동으로 분배된다면, 구역이reblance가 발생할 때, 잘 고려해야 한다.업그레이드 등으로 인해 오프셋을 업데이트하지 않는consumer로 구역이 이동하면 개에게 해가 된다.이 경우: 1.원consumer는 구역 취소 이벤트를 감청하고 취소할 때 오프셋을 확인해야 합니다.인터페이스: ConsumerRebalanceListener.onPartitionsRevoked(Collection);  2. 새로운consumer는 섹션 분배 이벤트를 감청하여 현재 섹션에서 소비하는 오프셋을 가져옵니다.인터페이스: ConsumerRebalanceListener.onPartitionsAssigned(Collection);  3. consumer가 Consumer Rebalance 이벤트를 감청하여 아직 처리하거나 영구화된 캐시 데이터flush를 삭제하지 않았습니다.

5. 소비 위치 제어


대부분의 경우 서비스 측의 Consumer의 소비 위치는 클라이언트가 간헐적으로 확인한다.Kafka가 소비자 스스로 소비 시작점을 설정할 수 있도록 허용하는 효과: 1.이미 소비한 데이터를 소비할 수 있다.  2. 비약적인 소비 데이터이렇게 하는 장면을 봅시다. 1.Consumer에 있어 데이터는 시효성을 갖추고 최근 일정 시간 동안의 데이터만 얻으면 비약적으로 데이터를 얻을 수 있다.  2. 위에 오프셋 장면을 저장하고 다시 시작하면 지정된 위치부터 소비해야 합니다.인터페이스에 언급된 바와 같이 seek(TopicPartition, long)을 사용합니다.마알아, 바늘을 말하면 되잖아, 이 소절은 쓸데없는 중얼거림이야.

6. 소비 흐름 조절 Consumption Flow Control


만약consumer가 여러 개의 구역을 동시에 소비한다면 기본적으로 이 여러 구역의 우선순위는 같고 동시에 소비한다.Kafka는 일부 구역의 소비를 멈추고 다른 구역의 내용을 먼저 얻을 수 있는 메커니즘을 제공합니다.장면 전율: 1.흐름식 계산,consumer는 두 개의 Topic을 동시에 소비한 다음에 두 개의 Topic 데이터에 대해 Join 조작을 한다.그러나 이 두 Topic 안의 데이터 발생 속도 차이가 비교적 크다.Consumer는 논리를 제어하여 느린 Topic을 얻고 느린 데이터를 읽은 후에 빠른 것을 읽어야 한다.  2. 같은 여러 개의 Topic을 동시에 소비하지만 Consumer 시작은 로컬에 이미 대량의 Topic 데이터가 저장되어 있다는 것이다.이때 다른 Topic을 우선적으로 소비할 수 있다.
조절 수단: 어떤 구역의 소비를 잠시 멈추고 때가 되면 다시 회복한 다음에poll을 이어줍니다.인터페이스:pause(TopicPartition...),resume(TopicPartition...)

7. 다중 스레드 처리 모델 멀티스레드 처리


Kafka의 Consumer 인터페이스는 비스레드로 안전합니다.다중 스레드 공용 IO, Consumer 스레드는 스스로 스레드 동기화를 잘 해야 합니다.consumer를 즉시 종료하려면 호출 인터페이스:wakeup () 을 사용하여 처리 라인에 WakeupException을 생성하는 것이 유일한 방법입니다.
public class KafkaConsumerRunner implements Runnable {
    /*  ,  */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(10000);
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

설명: 1.Kafka Consumer Runner는runnable입니다. 자각적으로 다선정 운행을 보충하십시오.  2. 외부 스레드가 Kafka ConsumerRunner 스레드의 정지를 제어합니다.  3. 주로 다선정 소비가 같은 topic이지 같은 구역을 소비하는 것이 아니다.
두 모델 비교:
Consumer 단일 스레드 모델
장점: 실현이 쉽다.라인 간의 협업이 없다.보통 아래의 것보다 더 빠르다.단일 구역 데이터의 순서 처리;단점: 여러 개의 TCP 연결이 있지만 관계가 크지 않기 때문에 kafka는 자신의 서버에 대해 자신만만하다.너무 많은 Request는 서버의 삼키기를 떨어뜨릴 수 있습니다.consumer 수량은 구역 수량의 제한을 받고 하나의consumer 구역;
Consumer 다중 스레드 모델
장점: 하나의consumer가 임의로 많은 라인으로 라인 수는 구역수 제한을 받지 않는다.단점: 만약에 질서정연한 수요가 있다면 스스로 논리를 제어해야 한다.이 모델에서 수동 offset을 사용하면 스스로 논리를 제어해야 한다.실행 가능한 해결 방법: 모든 구역에 독립된 저장소를 분배하고 얻은 데이터는 데이터가 있는 구역에 따라hash 저장소를 한다.이렇게 하면 순서 소비와 오프셋의 확인 문제를 해결할 수 있다.
홈페이지 주소:http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

좋은 웹페이지 즐겨찾기