Librdkafka 에서 kafka topic-partition 과 관련된 데이터 구조와 연산

27967 단어
  • topic-partition 은 kafka 분산 구조의 핵심이자, kafka 에 대해 생산하거나 소비하는 최소 단위이다.
  • 이 편에서는 관련 데이터 구조를 소개한다.
  • 내용은 다음과 같다.
  • rd_kafka_topic_partition_t
  • rd_kafka_topic_partition_list_t
  • rd_kafka_toppar_s


  • rd_kafka_topic_partition_t
  • 소재 파일: src/rdkafka.h
  • partition 관련 데이터 구조를 정의한다. 정의가 단순하며 자리 표시자(placeholder) 역할을 한다.
  • 정의:
  • typedef struct rd_kafka_topic_partition_s {
            /* Generic per topic+partition record: names the topic and
             * partition and carries an offset, optional metadata, an
             * application opaque and an error code. */
            char        *topic;             /**< Topic name */
            int32_t      partition;         /**< Partition */
        int64_t      offset;            /**< Offset */
            /* NOTE(review): the original annotation here mentioned
             * "leader, replicas, isr"; the only use visible in this article
             * (rd_kafka_toppar_offset_commit) stores a copy of the
             * offset-commit metadata string in this field - confirm. */
            void        *metadata;          /**< Metadata */
            size_t       metadata_size;     /**< Metadata size */
            void        *opaque;            /**< Application opaque */
            rd_kafka_resp_err_t err;        /**< Error code, depending on use. */
            void       *_private;           /**< INTERNAL USE ONLY,
                                             *   INITIALIZE TO ZERO, DO NOT TOUCH */
    } rd_kafka_topic_partition_t;
    
    

    rd_kafka_topic_partition_list_t
  • 소재 파일: src/rdkafka.h
  • rd_kafka_topic_partition_t 를 저장하는 데 사용되는, 동적으로 확장 가능한 배열
  • 정의:
  • typedef struct rd_kafka_topic_partition_list_s {
            int cnt;               /**< Current number of elements */         element  
            int size;              /**< Current allocated size */ //        
            rd_kafka_topic_partition_t *elems; /**< Element array[] */       
    } rd_kafka_topic_partition_list_t;
    
  • 확장 작업 rd_kafka_topic_partition_list_grow:
  • void
    rd_kafka_topic_partition_list_grow (rd_kafka_topic_partition_list_t *rktparlist,
                                        int add_size) {
            /* Increase the capacity of the list's element array.
             *
             * rktparlist: list to grow.
             * add_size:   requested number of additional element slots.
             *
             * Does not return a value; the list is updated in place. */

            /* A request smaller than the current capacity is rounded up to
             * the current capacity, or 32 if that is larger. */
            if (add_size < rktparlist->size)
                    add_size = RD_MAX(rktparlist->size, 32);

            rktparlist->size += add_size;

            /* Resize the backing array to hold `size` elements. */
            rktparlist->elems = rd_realloc(rktparlist->elems,
                                           sizeof(*rktparlist->elems) *
                                           rktparlist->size);
    }
    
  • 생 성 작업 rd_kafka_topic_partition_list_new:
  • rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) {
            /* Allocate a new, empty partition list.
             *
             * size: initial number of element slots to pre-allocate;
             *       values <= 0 allocate no element array up front.
             *
             * Returns the newly allocated list. */
            rd_kafka_topic_partition_list_t *rktparlist;

            /* The allocation is expected to be zeroed, so cnt and size
             * start at 0 and elems at NULL (rd_calloc is assumed to behave
             * like calloc - confirm). */
            rktparlist = rd_calloc(1, sizeof(*rktparlist));

            /* Do NOT pre-set rktparlist->size here: grow() adds add_size on
             * top of the current size, so assigning size first made the list
             * allocate (and report) twice the requested capacity. */
            if (size > 0)
                    rd_kafka_topic_partition_list_grow(rktparlist, size);

            return rktparlist;
    }
    
  • 찾기 동작 rd_kafka_topic_partition_list_find: topic 과 partition 이 모두 같아야 일치하는 것으로 본다.
  • rd_kafka_topic_partition_list_find (rd_kafka_topic_partition_list_t *rktparlist,
                         const char *topic, int32_t partition) {
        int i = rd_kafka_topic_partition_list_find0(rktparlist,
                                topic, partition);
        if (i == -1)
            return NULL;
        else
            return &rktparlist->elems[i];
    }
    
  • 색인 별로 삭제 rd_kafka_topic_partition_list_del_by_idx
  • int
    rd_kafka_topic_partition_list_del_by_idx (rd_kafka_topic_partition_list_t *rktparlist,
                          int idx) {
            /* Remove the element at position idx from the list.
             *
             * Returns 1 if an element was removed, 0 if idx is out of
             * range. */
            if (unlikely(idx < 0 || idx >= rktparlist->cnt))
                    return 0;

            /* One element fewer; cnt is decremented first so that
             * (cnt - idx) below is the number of elements after idx. */
            rktparlist->cnt--;

            /* Destroy the removed element via
             * rd_kafka_topic_partition_destroy0(). */
            rd_kafka_topic_partition_destroy0(&rktparlist->elems[idx], 0);

            /* Close the gap by shifting the trailing elements down one slot. */
            memmove(&rktparlist->elems[idx], &rktparlist->elems[idx+1],
                    (rktparlist->cnt - idx) * sizeof(rktparlist->elems[idx]));

            return 1;
    }
    
  • 정렬 rd_kafka_topic_partition_list_sort_by_topic: topic 이름순으로 정렬하고, topic 이름이 같으면 partition 순으로 정렬한다.
  • void rd_kafka_topic_partition_list_sort_by_topic (
            rd_kafka_topic_partition_list_t *rktparlist) {
            /* Sort the list using rd_kafka_topic_partition_cmp as the
             * comparator, with NULL as the last argument of the generic
             * sort helper.
             * Per the accompanying text the comparator orders by topic name
             * and then by partition - comparator not shown here, confirm. */
            rd_kafka_topic_partition_list_sort(rktparlist,
                                               rd_kafka_topic_partition_cmp, NULL);
    }
    

    rd_kafka_toppar_s
  • 소재 파일: src/rdkafka_partition.h
  • 무거운(heavyweight) 데이터 구조로, topic, partition, leader, 생산, 소비, 각종 타이머가 모두 안에 들어 있다.
  • 정의: 이 구조체는 매우 크다.
  • struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
            /* Per topic+partition state object: list linkage, leader broker
             * tracking, fetch state, offsets, timers and counters.
             *
             * NOTE(review): this is an excerpt. rd_kafka_toppar_new0 in this
             * article also initializes rktp_msgq, rktp_xmit_msgq,
             * rktp_msgq_wakeup_fd, rktp_fetchq and rktp_ops, which are not
             * listed here. */
            TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rklink;  /* rd_kafka_t link */
            TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rkblink; /* rd_kafka_broker_t link*/
            CIRCLEQ_ENTRY(rd_kafka_toppar_s) rktp_fetchlink; /* rkb_fetch_toppars */
            TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rktlink; /* rd_kafka_itopic_t link*/
            TAILQ_ENTRY(rd_kafka_toppar_s) rktp_cgrplink;/* rd_kafka_cgrp_t link */
            rd_kafka_itopic_t       *rktp_rkt;    /* Owning topic */
            shptr_rd_kafka_itopic_t *rktp_s_rkt;  /* shared pointer for rktp_rkt */
            int32_t            rktp_partition;    /* Partition id */
            //LOCK: toppar_lock() + topic_wrlock()
            //LOCK: .. in partition_available()
            int32_t            rktp_leader_id;   /**< Current leader broker id.
                                                  *   This is updated directly
                                                  *   from metadata. */
            rd_kafka_broker_t *rktp_leader;      /**< Current leader broker
                                                  *   This updated asynchronously
                                                  *   by issuing JOIN op to
                                                  *   broker thread, so be careful
                                                  *   in using this since it
                                                  *   may lag. */
            rd_kafka_broker_t *rktp_next_leader; /**< Next leader broker after
                                                  *   async migration op. */
            rd_refcnt_t        rktp_refcnt;      /* Reference count */
            mtx_t              rktp_lock;        /* Mutex (see the Locks:
                                                  * toppar_lock notes below) */
            rd_atomic32_t      rktp_version;     /* Latest op version.
                                                  * Authoritative (app thread)*/
            int32_t            rktp_op_version;  /* Op version of curr command
                                                  * state from.
                                                  * (broker thread) */
            int32_t            rktp_fetch_version; /* Op version of curr fetch.
                                                    * (broker thread) */

            enum {
                    RD_KAFKA_TOPPAR_FETCH_NONE = 0,
                    RD_KAFKA_TOPPAR_FETCH_STOPPING,
                    RD_KAFKA_TOPPAR_FETCH_STOPPED,
                    RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
                    RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
                    RD_KAFKA_TOPPAR_FETCH_ACTIVE,
            } rktp_fetch_state;
            int32_t            rktp_fetch_msg_max_bytes; /* Max number of bytes to
                                                          * fetch.
                                                          * Locality: broker thread
                                                          */

            rd_ts_t            rktp_ts_fetch_backoff; /* Back off fetcher for
                                                       * this partition until this
                                                       * absolute timestamp
                                                       * expires. */

            int64_t            rktp_query_offset;    /* Offset to query broker for*/
            int64_t            rktp_next_offset;     /* Next offset to start
                                                      * fetching from.
                                                      * Locality: toppar thread */
            int64_t            rktp_last_next_offset; /* Last next_offset handled
                                                       * by fetch_decide().
                                                       * Locality: broker thread */
            int64_t            rktp_app_offset;      /* Last offset delivered to
                                                      * application + 1 */
            int64_t            rktp_stored_offset;   /* Last stored offset, but
                                                      * maybe not committed yet. */
            int64_t            rktp_committing_offset; /* Offset currently being
                                                        * committed */
            int64_t            rktp_committed_offset; /* Last committed offset */
            rd_ts_t            rktp_ts_committed_offset; /* Timestamp of last
                                                          * commit */

            struct offset_stats rktp_offsets; /* Current offsets.
                                               * Locality: broker thread*/
            struct offset_stats rktp_offsets_fin; /* Finalized offset for stats.
                                                   * Updated periodically
                                                   * by broker thread.
                                                   * Locks: toppar_lock */

            int64_t rktp_hi_offset;              /* Current high offset.
                                                  * Locks: toppar_lock */
            int64_t rktp_lo_offset;              /* NOTE(review): presumably the
                                                  * current low offset,
                                                  * mirroring rktp_hi_offset
                                                  * - confirm */
            rd_ts_t            rktp_ts_offset_lag; /* NOTE(review): undocumented
                                                    * in the excerpt; timestamp
                                                    * related to offset lag
                                                    * - confirm */

            char              *rktp_offset_path;     /* Path to offset file */
            FILE              *rktp_offset_fp;       /* Offset file pointer */
            rd_kafka_cgrp_t   *rktp_cgrp;            /* Belongs to this cgrp */

            int                rktp_assigned;   /* Partition in cgrp assignment */

            rd_kafka_replyq_t  rktp_replyq; /* Current replyq+version
                                             * for propagating
                                             * major operations, e.g.,
                                             * FETCH_STOP. */
            int                rktp_flags;  /* Flag bits (values not shown in
                                             * this excerpt) */

            shptr_rd_kafka_toppar_t *rktp_s_for_desp; /* Shared pointer for
                                                       * rkt_desp list */
            shptr_rd_kafka_toppar_t *rktp_s_for_cgrp; /* Shared pointer for
                                                       * rkcg_toppars list */
            shptr_rd_kafka_toppar_t *rktp_s_for_rkb;  /* Shared pointer for
                                                       * rkb_toppars list */

            /*
             * Timers
             */
            rd_kafka_timer_t rktp_offset_query_tmr;  /* Offset query timer */
            rd_kafka_timer_t rktp_offset_commit_tmr; /* Offset commit timer */
            rd_kafka_timer_t rktp_offset_sync_tmr;   /* Offset file sync timer */
            rd_kafka_timer_t rktp_consumer_lag_tmr;  /* Consumer lag monitoring
                                                      * timer */

            int rktp_wait_consumer_lag_resp;         /* Waiting for consumer lag
                                                      * response. */

            /* Atomic 64-bit counters. */
            struct {
                    rd_atomic64_t tx_msgs;
                    rd_atomic64_t tx_bytes;
                    rd_atomic64_t msgs;
                    rd_atomic64_t rx_ver_drops;
            } rktp_c;
    };
    
  • rd_kafka_toppar_t 객체 만들기 rd_kafka_toppar_new0:
  • shptr_rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_itopic_t *rkt,
                               int32_t partition,
                               const char *func, int line) {
        /* Create and initialize a toppar object for `partition` of topic
         * `rkt`.
         * func/line identify the caller; they are used in the debug log and
         * passed on to rd_kafka_toppar_keep_src().
         * Returns the result of rd_kafka_toppar_keep_src() for the new
         * object. */
        rd_kafka_toppar_t *rktp;
    
           // Allocate the toppar object (rd_calloc)
        rktp = rd_calloc(1, sizeof(*rktp));
    
            // Record the partition id
        rktp->rktp_partition = partition;
    
            // Point at the owning topic; the shared reference for it is
            // taken further down with rd_kafka_topic_keep()
        rktp->rktp_rkt = rkt;
    
            rktp->rktp_leader_id = -1;
        rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
            rktp->rktp_fetch_msg_max_bytes
                = rkt->rkt_rk->rk_conf.fetch_msg_max_bytes;
        rktp->rktp_offset_fp = NULL;
            rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
            rd_kafka_offset_stats_reset(&rktp->rktp_offsets_fin);
            rktp->rktp_hi_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_lo_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
            rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID;
            rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
        rd_kafka_msgq_init(&rktp->rktp_msgq);
            rktp->rktp_msgq_wakeup_fd = -1;
        rd_kafka_msgq_init(&rktp->rktp_xmit_msgq);
        mtx_init(&rktp->rktp_lock, mtx_plain);
    
            rd_refcnt_init(&rktp->rktp_refcnt, 0);
        rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
            rktp->rktp_ops    = rd_kafka_q_new(rkt->rkt_rk);
            rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
            rktp->rktp_ops->rkq_opaque = rktp;
            rd_atomic32_init(&rktp->rktp_version, 1);
        rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);
    
            // Start the consumer lag timer only when statistics are enabled
            // (stats_interval_ms > 0), the handle is a consumer and this is
            // not the RD_KAFKA_PARTITION_UA partition. The interval is the
            // stats interval, raised to at least 10 seconds, and is passed
            // to the timer multiplied by 1000.
            if (rktp->rktp_rkt->rkt_rk->rk_conf.stats_interval_ms > 0 &&
                rkt->rkt_rk->rk_type == RD_KAFKA_CONSUMER &&
                rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
                    int intvl = rkt->rkt_rk->rk_conf.stats_interval_ms;
                    if (intvl < 10 * 1000 /* 10s */)
                            intvl = 10 * 1000;
            rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
                         &rktp->rktp_consumer_lag_tmr,
                                         intvl * 1000ll,
                         rd_kafka_toppar_consumer_lag_tmr_cb,
                         rktp);
            }
    
            rktp->rktp_s_rkt = rd_kafka_topic_keep(rkt);
    
            // Forward this toppar's op queue (rktp_ops) to the rd_kafka_t
            // handle's rk_ops queue
        rd_kafka_q_fwd_set(rktp->rktp_ops, rkt->rkt_rk->rk_ops);
        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARNEW", "NEW %s [%"PRId32"] %p (at %s:%d)",
                 rkt->rkt_topic->str, rktp->rktp_partition, rktp,
                 func, line);
    
        return rd_kafka_toppar_keep_src(func, line, rktp);
    }
    
  • rd_kafka_toppar_t 객체 하나를 폐기 rd_kafka_toppar_destroy_final:
  • void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp) {
            /* Tear down a toppar object and free its memory: queues, reply
             * queue, topic reference, mutex and refcount are released before
             * the final rd_free(). */

            // rd_kafka_toppar_remove(): per the original annotation this
            // stops the toppar's timers and its ops queue - helper not
            // shown here, TODO confirm
            rd_kafka_toppar_remove(rktp);
    
            // Hand the messages still in rktp_msgq to rd_kafka_dr_msgq()
            // with error RD_KAFKA_RESP_ERR__DESTROY (presumably reporting
            // them back to the application as failed - confirm)
        rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
                 RD_KAFKA_RESP_ERR__DESTROY);
        rd_kafka_q_destroy_owner(rktp->rktp_fetchq);
            rd_kafka_q_destroy_owner(rktp->rktp_ops);
    
        rd_kafka_replyq_destroy(&rktp->rktp_replyq);
    
        /* Release the shared topic reference held in rktp_s_rkt. */
        rd_kafka_topic_destroy0(rktp->rktp_s_rkt);
    
        mtx_destroy(&rktp->rktp_lock);
    
            rd_refcnt_destroy(&rktp->rktp_refcnt);
    
        rd_free(rktp);
    }
    
  • 하나의 rd_kafka_itopic_t (이것은 뒤에 전문적으로 다루는 장이 있다. 여기서는 topic 을 나타내며 그 안에 partition 목록이 포함되어 있다는 것만 알면 된다) 에서 지정한 partition 을 가져온다 rd_kafka_toppar_get0:
  • shptr_rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
                                                   const rd_kafka_itopic_t *rkt,
                                                   int32_t partition,
                                                   int ua_on_miss) {
            /* Look up the toppar for `partition` in topic `rkt`.
             *
             * - partition within [0, rkt_partition_cnt): entry from rkt_p[].
             * - otherwise, if partition is RD_KAFKA_PARTITION_UA or
             *   ua_on_miss is set: the topic's rkt_ua entry.
             * - otherwise: NULL.
             *
             * A non-NULL result is returned through
             * rd_kafka_toppar_keep_src() (func/line identify the caller). */
            shptr_rd_kafka_toppar_t *found = NULL;
            const int known_partition =
                    partition >= 0 && partition < rkt->rkt_partition_cnt;

            if (known_partition)
                    found = rkt->rkt_p[partition];
            else if (partition == RD_KAFKA_PARTITION_UA || ua_on_miss)
                    found = rkt->rkt_ua;

            if (!found)
                    return NULL;

            return rd_kafka_toppar_keep_src(func,line,
                                            rd_kafka_toppar_s2i(found));
    }
    
  • topic 이름과 partition 에 따라 rd_kafka_toppar_t 객체를 가져온다. topic 을 찾지 못하면 rd_kafka_itopic_t 객체
  • 를 먼저 만든다 rd_kafka_toppar_get2:
    shptr_rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
                                                   const char *topic,
                                                   int32_t partition,
                                                   int ua_on_miss,
                                                   int create_on_miss) {
        /* Find the topic named `topic` on handle `rk` (creating it when
         * create_on_miss is set), then return the result of
         * rd_kafka_toppar_desired_add() for `partition`.
         * Returns NULL if the topic does not exist and create_on_miss is 0,
         * or if creating the topic fails.
         * NOTE(review): ua_on_miss is not referenced in this excerpt. */
        shptr_rd_kafka_itopic_t *s_rkt;
            rd_kafka_itopic_t *rkt;
            shptr_rd_kafka_toppar_t *s_rktp;
    
            rd_kafka_wrlock(rk);
    
            /* Find or create topic */
            // Look the topic up with rd_kafka_topic_find(); the no-lock
            // variant is used because rk is already write-locked here
        if (unlikely(!(s_rkt = rd_kafka_topic_find(rk, topic, 0/*no-lock*/)))) {
                    if (!create_on_miss) {
                            rd_kafka_wrunlock(rk);
                            return NULL;
                    }
                    // Not found: create a new local topic object
                    s_rkt = rd_kafka_topic_new0(rk, topic, NULL,
                            NULL, 0/*no-lock*/);
                    if (!s_rkt) {
                            rd_kafka_wrunlock(rk);
                            rd_kafka_log(rk, LOG_ERR, "TOPIC",
                                         "Failed to create local topic \"%s\": %s",
                                         topic, rd_strerror(errno));
                            return NULL;
                    }
            }
    
            rd_kafka_wrunlock(rk);
    
            rkt = rd_kafka_topic_s2i(s_rkt);
    
        /* Add the partition through rd_kafka_toppar_desired_add() while
         * holding the topic write lock. */
        rd_kafka_topic_wrlock(rkt);
        s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
        rd_kafka_topic_wrunlock(rkt);
    
            /* Drop the topic reference obtained by find/new above. */
            rd_kafka_topic_destroy0(s_rkt);
    
        return s_rktp;
    }
    
  • desired partition: desired 상태의 partition. 소스 코드의 설명은 다음과 같다.
  • The desired partition list is the list of partitions that are desired (e.g., by the consumer) but not yet seen on a broker. As soon as the partition is seen on a broker the toppar is moved from the desired list and onto the normal rkt_p array. When the partition on the broker goes away a desired partition is put back on the desired list
    쉽게 말하면 특정 partition 이 필요하지만, 그 partition 의 구체적인 정보를 아직 broker 에서 가져오지 못한 경우로, 이런 partition 이 바로 desired partition 이다. rd_kafka_itopic_t 에는 이런 partition 을 저장하는 rkt_desp list 가 있으며, 다음과 같은 몇 가지 연산이 있는데 모두 간단하다.
    rd_kafka_toppar_desired_get
    rd_kafka_toppar_desired_link
    rd_kafka_toppar_desired_unlink
    rd_kafka_toppar_desired_add0
    rd_kafka_toppar_desired_add
    rd_kafka_toppar_desired_del
    
  • partition 을 broker 간에 이전 rd_kafka_toppar_broker_migrate:
  • static void rd_kafka_toppar_broker_migrate (rd_kafka_toppar_t *rktp,
                                                rd_kafka_broker_t *old_rkb,
                                                rd_kafka_broker_t *new_rkb) {
            /* Record new_rkb as the toppar's next leader and enqueue an op
             * on a broker's op queue: PARTITION_LEAVE to old_rkb if there is
             * one, otherwise PARTITION_JOIN to new_rkb.
             * NOTE(review): at least one of old_rkb/new_rkb must be
             * non-NULL, since the chosen destination broker is
             * dereferenced; the visible caller checks this. */
            rd_kafka_op_t *rko;
            rd_kafka_broker_t *dest_rkb;
            int had_next_leader = rktp->rktp_next_leader ? 1 : 0;
    
            /* Update next leader */
            /* Take the reference on the new broker before dropping the one
             * on the previous next-leader. */
            if (new_rkb)
                    rd_kafka_broker_keep(new_rkb);
            if (rktp->rktp_next_leader)
                    rd_kafka_broker_destroy(rktp->rktp_next_leader);
            rktp->rktp_next_leader = new_rkb;
            
            // A next leader was already set before this call: only the
            // target is updated and no new op is sent (presumably because a
            // migration is already in progress - confirm)
            if (had_next_leader)
                    return;
    
        /* If the toppar was waiting for an offset reply, fall back to the
         * OFFSET_QUERY state and start the offset query timer (500*1000). */
        if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) {
            rd_kafka_toppar_set_fetch_state(
                rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
            rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
                         &rktp->rktp_offset_query_tmr,
                         500*1000,
                         rd_kafka_offset_query_tmr_cb,
                         rktp);
        }
    
            // Send a PARTITION_LEAVE op to the old broker if there is one
            if (old_rkb) {
                    rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
                    dest_rkb = old_rkb;
            } else {
                    /* No existing broker, send join op directly to new leader. */
                    rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_JOIN);
                    dest_rkb = new_rkb;
            }
    
            /* The op carries its own reference to the toppar. */
            rko->rko_rktp = rd_kafka_toppar_keep(rktp);
    
            rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
    }
    
  • broker 의 delegate 작업 rd_kafka_toppar_broker_delegate:
  • void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
                          rd_kafka_broker_t *rkb,
                          int for_removal) {
            /* Delegate the toppar to broker `rkb`.
             *
             * When rkb is NULL, the call is not for removal and the client
             * is not terminating, the internal broker is used instead.
             * If the toppar is already on the target broker with no
             * migration pending, only a debug line is logged; otherwise,
             * when there is a current leader or a target broker, the work
             * is passed to rd_kafka_toppar_broker_migrate(). */
            rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;

            /* Delegate toppars with no leader to the
             * internal broker for bookkeeping. */
            const int use_internal =
                    !rkb && !for_removal && !rd_kafka_terminating(rk);

            if (use_internal)
                    rkb = rd_kafka_broker_internal(rk);

            if (rktp->rktp_leader == rkb && !rktp->rktp_next_leader) {
                    rd_kafka_dbg(rk, TOPIC, "BRKDELGT",
                                 "%.*s [%"PRId32"]: not updating broker: "
                                 "already on correct broker %s",
                                 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                 rktp->rktp_partition,
                                 rkb ? rd_kafka_broker_name(rkb) : "(none)");
            } else if (rktp->rktp_leader || rkb) {
                    rd_kafka_toppar_broker_migrate(rktp, rktp->rktp_leader, rkb);
            }

            /* Release the internal broker obtained above on every path. */
            if (use_internal)
                    rd_kafka_broker_destroy(rkb);
    }
    
  • offset 을 broker 에 제출 rd_kafka_toppar_offset_commit:
    void rd_kafka_toppar_offset_commit (rd_kafka_toppar_t *rktp, int64_t offset,
                        const char *metadata) {
            /* Asynchronously commit `offset` for this toppar.
             *
             * A temporary one-element partition list is built for the
             * toppar's topic and partition, carrying the offset and, when
             * `metadata` is non-NULL, a copy of that string. The list is
             * passed to rd_kafka_commit() and destroyed afterwards. */
            rd_kafka_topic_partition_list_t *commit_list =
                    rd_kafka_topic_partition_list_new(1);
            rd_kafka_topic_partition_t *entry =
                    rd_kafka_topic_partition_list_add(
                            commit_list,
                            rktp->rktp_rkt->rkt_topic->str,
                            rktp->rktp_partition);

            entry->offset = offset;
            if (metadata != NULL) {
                    entry->metadata      = rd_strdup(metadata);
                    entry->metadata_size = strlen(metadata);
            }

            /* Remember on the toppar which offset is being committed. */
            rktp->rktp_committing_offset = offset;

            rd_kafka_commit(rktp->rktp_rkt->rkt_rk, commit_list, 1/*async*/);

            rd_kafka_topic_partition_list_destroy(commit_list);
    }
    
  • 다음에 데이터를 가져올(fetch) 때 시작할 offset 위치를 설정한다. 즉 rd_kafka_toppar_t 의 rktp_next_offset 을 설정한다 rd_kafka_toppar_next_offset_handle:
  • void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
                                             int64_t Offset) {
            /* Set the offset the next fetch starts from and activate
             * fetching.
             *
             * rktp:   toppar to update.
             * Offset: offset to start from; a logical offset is handed to
             *         rd_kafka_offset_reset() instead of being used
             *         directly. */
            if (RD_KAFKA_OFFSET_IS_LOGICAL(Offset)) {
                    /* Offset storage returned logical offset (e.g. "end"),
                     * look it up. */
                    rd_kafka_offset_reset(rktp, Offset, RD_KAFKA_RESP_ERR_NO_ERROR,
                                          "update");
                    return;
            }

            /* Adjust by TAIL count, if wanted: when the queried offset is
             * at or below RD_KAFKA_OFFSET_TAIL_BASE, step back by the
             * distance from that base, clamping the result at 0. */
            if (rktp->rktp_query_offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
                    int64_t tail_cnt =
                            llabs(rktp->rktp_query_offset -
                                  RD_KAFKA_OFFSET_TAIL_BASE);

                    if (tail_cnt > Offset)
                            Offset = 0;
                    else
                            Offset -= tail_cnt;
            }

            rktp->rktp_next_offset = Offset;

            rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);

            /* Wake-up broker thread which might be idling on IO */
            if (rktp->rktp_leader)
                    rd_kafka_broker_wakeup(rktp->rktp_leader);
    }
    
  • coordinator 에서 커밋된 offset 을 가져온다 (OffsetFetch request) rd_kafka_toppar_offset_fetch:
  • void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
                                       rd_kafka_replyq_t replyq) {
            /* Request the offset for this toppar by enqueuing an
             * OFFSET_FETCH op on the consumer group's op queue.
             *
             * rktp:   toppar whose offset is requested; rktp->rktp_cgrp is
             *         dereferenced and must be set.
             * replyq: reply queue stored on the op. */
            rd_kafka_topic_partition_list_t *part;
            rd_kafka_op_t *rko;

            /* One-element list naming this topic+partition; the element is
             * given its own toppar reference. */
            part = rd_kafka_topic_partition_list_new(1);
            rd_kafka_topic_partition_list_add0(part,
                                               rktp->rktp_rkt->rkt_topic->str,
                                               rktp->rktp_partition,
                                               rd_kafka_toppar_keep(rktp));

            /* Build the OffsetFetch op; it holds a separate toppar
             * reference. */
            rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
            rko->rko_rktp = rd_kafka_toppar_keep(rktp);
            rko->rko_replyq = replyq;

            /* do_free = 1: presumably the op's handler frees `part`
             * - confirm. */
            rko->rko_u.offset_fetch.partitions = part;
            rko->rko_u.offset_fetch.do_free = 1;

            rd_kafka_q_enq(rktp->rktp_cgrp->rkcg_ops, rko);
    }
    
  • 소비에 사용할 유효한 offset 가져오기 rd_kafka_toppar_offset_request:
    void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
                         int64_t query_offset, int backoff_ms) {
        /* Start resolving the offset to consume from.
         * With a non-zero backoff (given, or forced to 500 ms when there is
         * no usable leader) the query is deferred to a timer. Otherwise
         * either the stored offset is fetched via
         * rd_kafka_toppar_offset_fetch(), or an OffsetRequest is sent to
         * the leader broker, and the fetch state becomes OFFSET_WAIT. */
        rd_kafka_broker_t *rkb;
            rkb = rktp->rktp_leader;
    
             // No usable leader (none, or the internal broker) and no
             // explicit backoff: retry through the timer after 500 ms
            if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL))
                    backoff_ms = 500;
    
            if (backoff_ms) {
                    rd_kafka_toppar_set_fetch_state(
                            rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
                    // Start the offset query timer; its callback is
                    // rd_kafka_offset_query_tmr_cb (presumably it re-issues
                    // this request - callback not shown, confirm)
            rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
                         &rktp->rktp_offset_query_tmr,
                         backoff_ms*1000ll,
                         rd_kafka_offset_query_tmr_cb, rktp);
            return;
            }
    
            // Stop any pending offset query timer
            rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
                                &rktp->rktp_offset_query_tmr, 1/*lock*/);
    
            // Stored offset with broker-based offset storage: fetch it
            // through rd_kafka_toppar_offset_fetch()
        if (query_offset == RD_KAFKA_OFFSET_STORED &&
                rktp->rktp_rkt->rkt_conf.offset_store_method ==
                RD_KAFKA_OFFSET_METHOD_BROKER) {
                    /*
                     * Get stored offset from broker based storage:
                     * ask cgrp manager for offsets
                     */
                    rd_kafka_toppar_offset_fetch(
                rktp,
                RD_KAFKA_REPLYQ(rktp->rktp_ops,
                        rktp->rktp_op_version));
    
        } else {
                    shptr_rd_kafka_toppar_t *s_rktp;
                    rd_kafka_topic_partition_list_t *offsets;
    
                    /*
                     * Look up logical offset (end,beginning,tail,..)
                     */
                    s_rktp = rd_kafka_toppar_keep(rktp);
    
            /* Offsets at or below TAIL_BASE are queried as END. */
            if (query_offset <= RD_KAFKA_OFFSET_TAIL_BASE)
                query_offset = RD_KAFKA_OFFSET_END;
    
                    offsets = rd_kafka_topic_partition_list_new(1);
                    rd_kafka_topic_partition_list_add(
                            offsets,
                            rktp->rktp_rkt->rkt_topic->str,
                            rktp->rktp_partition)->offset = query_offset;
                    
                    // Send an OffsetRequest for this partition to the leader
                    // broker; the reply goes to rktp_ops and is handled by
                    // rd_kafka_toppar_handle_Offset with s_rktp as opaque
                    rd_kafka_OffsetRequest(rkb, offsets, 0,
                                           RD_KAFKA_REPLYQ(rktp->rktp_ops,
                                                           rktp->rktp_op_version),
                                           rd_kafka_toppar_handle_Offset,
                                           s_rktp);
    
                    rd_kafka_topic_partition_list_destroy(offsets);
            }
    
            rd_kafka_toppar_set_fetch_state(rktp,
                    RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT);
    }
    

    Librdkafka 소스 코드 분석 - Content Table

    좋은 웹페이지 즐겨찾기