이 글에서는 librdkafka 에서 Kafka topic-partition 을 표현하고 조작하는 데 사용되는 관련 데이터 구조와 연산을 살펴봅니다.
rd_kafka_topic_partition_t
/**
 * One topic+partition element, as stored in the elems[] array of
 * rd_kafka_topic_partition_list_t.
 *
 * Besides the (topic, partition) identity it carries an offset, an optional
 * metadata blob with its size, an application opaque and an error code.
 */
typedef struct rd_kafka_topic_partition_s {
char *topic; /**< Topic name */
int32_t partition; /**< Partition */
int64_t offset; /**< Offset */
void *metadata; /**< Metadata */ // rd_kafka_toppar_offset_commit() stores a copy of the commit metadata string here
size_t metadata_size; /**< Metadata size */
void *opaque; /**< Application opaque */
rd_kafka_resp_err_t err; /**< Error code, depending on use. */
void *_private; /**< INTERNAL USE ONLY,
* INITIALIZE TO ZERO, DO NOT TOUCH */
} rd_kafka_topic_partition_t;
rd_kafka_topic_partition_list_t
rd_kafka_topic_partition_t
에 사용 되 는 동적 확장 가능 한 배열 typedef struct rd_kafka_topic_partition_list_s {
int cnt; /**< Current number of elements */ element
int size; /**< Current allocated size */ //
rd_kafka_topic_partition_t *elems; /**< Element array[] */
} rd_kafka_topic_partition_list_t;
rd_kafka_topic_partition_list_grow
: rd_kafka_topic_partition_list_grow (rd_kafka_topic_partition_list_t *rktparlist,
int add_size) {
if (add_size < rktparlist->size)
add_size = RD_MAX(rktparlist->size, 32);
rktparlist->size += add_size;
// realloc
rktparlist->elems = rd_realloc(rktparlist->elems,
sizeof(*rktparlist->elems) *
rktparlist->size);
}
rd_kafka_topic_partition_list_new
: rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) {
rd_kafka_topic_partition_list_t *rktparlist;
rktparlist = rd_calloc(1, sizeof(*rktparlist));
rktparlist->size = size;
rktparlist->cnt = 0;
if (size > 0)
rd_kafka_topic_partition_list_grow(rktparlist, size);
return rktparlist;
}
rd_kafka_topic_partition_list_find
: topic 과 partition 이 모두 같 아야 같은 편 입 니 다 rd_kafka_topic_partition_list_find (rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition) {
int i = rd_kafka_topic_partition_list_find0(rktparlist,
topic, partition);
if (i == -1)
return NULL;
else
return &rktparlist->elems[i];
}
rd_kafka_topic_partition_list_del_by_idx
/**
 * Remove the element at index \p idx from \p rktparlist.
 *
 * @param rktparlist List to modify.
 * @param idx        Zero-based index of the element to remove.
 *
 * @returns 1 if the element was removed, 0 if \p idx is out of range.
 */
int
rd_kafka_topic_partition_list_del_by_idx (rd_kafka_topic_partition_list_t *rktparlist,
                                          int idx) {
        if (unlikely(idx < 0 || idx >= rktparlist->cnt))
                return 0;

        /* One element less; after this, (cnt - idx) is exactly the number
         * of elements that follow the removed slot. */
        rktparlist->cnt--;

        /* Release the element's resources in place. */
        rd_kafka_topic_partition_destroy0(&rktparlist->elems[idx], 0);

        /* Close the gap by shifting the trailing elements down one slot. */
        memmove(&rktparlist->elems[idx], &rktparlist->elems[idx+1],
                (rktparlist->cnt - idx) * sizeof(rktparlist->elems[idx]));

        return 1;
}
rd_kafka_topic_partition_list_sort_by_topic
topic 이름 에 따라 topic 이름 으로 배열 하고 topic 이름 이 같 으 면 partition 으로 배열 void rd_kafka_topic_partition_list_sort_by_topic (
rd_kafka_topic_partition_list_t *rktparlist) {
rd_kafka_topic_partition_list_sort(rktparlist,
rd_kafka_topic_partition_cmp, NULL);
}
rd_kafka_toppar_s
/**
 * Per topic+partition state object (rd_kafka_toppar_t).
 *
 * Holds the list linkage to the owning handle, broker, topic and consumer
 * group, the current/next leader broker, fetch state, the various consumer
 * offsets, offset-file handles, timers and counters.
 *
 * NOTE(review): rd_kafka_toppar_new0() initialises members that are not
 * declared in this excerpt (rktp_msgq, rktp_xmit_msgq, rktp_msgq_wakeup_fd,
 * rktp_fetchq, rktp_ops), so this listing appears to be abridged.
 */
struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
        TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rklink;      /* rd_kafka_t link */
        TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rkblink;     /* rd_kafka_broker_t link*/
        CIRCLEQ_ENTRY(rd_kafka_toppar_s) rktp_fetchlink; /* rkb_fetch_toppars */
        TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rktlink;     /* rd_kafka_itopic_t link*/
        TAILQ_ENTRY(rd_kafka_toppar_s) rktp_cgrplink;    /* rd_kafka_cgrp_t link */
        rd_kafka_itopic_t *rktp_rkt;         /* Topic this partition belongs to */
        shptr_rd_kafka_itopic_t *rktp_s_rkt; /* shared pointer for rktp_rkt */
        int32_t rktp_partition;              /* Partition id */
        //LOCK: toppar_lock() + topic_wrlock()
        //LOCK: .. in partition_available()
        int32_t rktp_leader_id;              /**< Current leader broker id.
                                              *   This is updated directly
                                              *   from metadata. */
        rd_kafka_broker_t *rktp_leader;      /**< Current leader broker
                                              *   This updated asynchronously
                                              *   by issuing JOIN op to
                                              *   broker thread, so be careful
                                              *   in using this since it
                                              *   may lag. */
        rd_kafka_broker_t *rktp_next_leader; /**< Next leader broker after
                                              *   async migration op. */
        rd_refcnt_t rktp_refcnt;
        mtx_t rktp_lock;
        rd_atomic32_t rktp_version;          /* Latest op version.
                                              * Authoritative (app thread)*/
        int32_t rktp_op_version;             /* Op version of curr command
                                              * state from.
                                              * (broker thread) */
        int32_t rktp_fetch_version;          /* Op version of curr fetch.
                                                (broker thread) */
        enum {
                RD_KAFKA_TOPPAR_FETCH_NONE = 0,
                RD_KAFKA_TOPPAR_FETCH_STOPPING,
                RD_KAFKA_TOPPAR_FETCH_STOPPED,
                RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
                RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
                RD_KAFKA_TOPPAR_FETCH_ACTIVE,
        } rktp_fetch_state;
        int32_t rktp_fetch_msg_max_bytes;    /* Max number of bytes to
                                              * fetch.
                                              * Locality: broker thread
                                              */
        rd_ts_t rktp_ts_fetch_backoff;       /* Back off fetcher for
                                              * this partition until this
                                              * absolute timestamp
                                              * expires. */
        int64_t rktp_query_offset;           /* Offset to query broker for*/
        int64_t rktp_next_offset;            /* Next offset to start
                                              * fetching from.
                                              * Locality: toppar thread */
        int64_t rktp_last_next_offset;       /* Last next_offset handled
                                              * by fetch_decide().
                                              * Locality: broker thread */
        int64_t rktp_app_offset;             /* Last offset delivered to
                                              * application + 1 */
        int64_t rktp_stored_offset;          /* Last stored offset, but
                                              * maybe not committed yet. */
        int64_t rktp_committing_offset;      /* Offset currently being
                                              * committed */
        int64_t rktp_committed_offset;       /* Last committed offset */
        rd_ts_t rktp_ts_committed_offset;    /* Timestamp of last
                                              * commit */
        struct offset_stats rktp_offsets;    /* Current offsets.
                                              * Locality: broker thread*/
        struct offset_stats rktp_offsets_fin; /* Finalized offset for stats.
                                               * Updated periodically
                                               * by broker thread.
                                               * Locks: toppar_lock */
        int64_t rktp_hi_offset;              /* Current high offset.
                                              * Locks: toppar_lock */
        int64_t rktp_lo_offset;              /* Presumably the low-offset
                                              * counterpart of
                                              * rktp_hi_offset; both start as
                                              * RD_KAFKA_OFFSET_INVALID. */
        rd_ts_t rktp_ts_offset_lag;
        char *rktp_offset_path;              /* Path to offset file */
        FILE *rktp_offset_fp;                /* Offset file pointer */
        rd_kafka_cgrp_t *rktp_cgrp;          /* Belongs to this cgrp */
        int rktp_assigned;                   /* Partition in cgrp assignment */
        rd_kafka_replyq_t rktp_replyq;       /* Current replyq+version
                                              * for propagating
                                              * major operations, e.g.,
                                              * FETCH_STOP. */
        int rktp_flags;
        shptr_rd_kafka_toppar_t *rktp_s_for_desp; /* Shared pointer for
                                                   * rkt_desp list */
        shptr_rd_kafka_toppar_t *rktp_s_for_cgrp; /* Shared pointer for
                                                   * rkcg_toppars list */
        shptr_rd_kafka_toppar_t *rktp_s_for_rkb;  /* Shared pointer for
                                                   * rkb_toppars list */
        /*
         * Timers
         */
        rd_kafka_timer_t rktp_offset_query_tmr;  /* Offset query timer */
        rd_kafka_timer_t rktp_offset_commit_tmr; /* Offset commit timer */
        rd_kafka_timer_t rktp_offset_sync_tmr;   /* Offset file sync timer */
        rd_kafka_timer_t rktp_consumer_lag_tmr;  /* Consumer lag monitoring
                                                  * timer */
        int rktp_wait_consumer_lag_resp;         /* Waiting for consumer lag
                                                  * response. */
        struct {
                rd_atomic64_t tx_msgs;
                rd_atomic64_t tx_bytes;
                rd_atomic64_t msgs;
                rd_atomic64_t rx_ver_drops;
        } rktp_c;
};
rd_kafka_toppar_t
: shptr_rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_itopic_t *rkt,
int32_t partition,
const char *func, int line) {
rd_kafka_toppar_t *rktp;
//
rktp = rd_calloc(1, sizeof(*rktp));
//
rktp->rktp_partition = partition;
// topic
rktp->rktp_rkt = rkt;
rktp->rktp_leader_id = -1;
rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
rktp->rktp_fetch_msg_max_bytes
= rkt->rkt_rk->rk_conf.fetch_msg_max_bytes;
rktp->rktp_offset_fp = NULL;
rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
rd_kafka_offset_stats_reset(&rktp->rktp_offsets_fin);
rktp->rktp_hi_offset = RD_KAFKA_OFFSET_INVALID;
rktp->rktp_lo_offset = RD_KAFKA_OFFSET_INVALID;
rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID;
rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
rd_kafka_msgq_init(&rktp->rktp_msgq);
rktp->rktp_msgq_wakeup_fd = -1;
rd_kafka_msgq_init(&rktp->rktp_xmit_msgq);
mtx_init(&rktp->rktp_lock, mtx_plain);
rd_refcnt_init(&rktp->rktp_refcnt, 0);
rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
rktp->rktp_ops = rd_kafka_q_new(rkt->rkt_rk);
rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
rktp->rktp_ops->rkq_opaque = rktp;
rd_atomic32_init(&rktp->rktp_version, 1);
rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);
// timer, lag , `rd_kafka_toppar_t` timer, , partiton timer
if (rktp->rktp_rkt->rkt_rk->rk_conf.stats_interval_ms > 0 &&
rkt->rkt_rk->rk_type == RD_KAFKA_CONSUMER &&
rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
int intvl = rkt->rkt_rk->rk_conf.stats_interval_ms;
if (intvl < 10 * 1000 /* 10s */)
intvl = 10 * 1000;
rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
&rktp->rktp_consumer_lag_tmr,
intvl * 1000ll,
rd_kafka_toppar_consumer_lag_tmr_cb,
rktp);
}
rktp->rktp_s_rkt = rd_kafka_topic_keep(rkt);
// fwd op queue rd_kakfa_t rd_ops, rd_kafka_toppar_t ops_queue rd_kafka_t
rd_kafka_q_fwd_set(rktp->rktp_ops, rkt->rkt_rk->rk_ops);
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARNEW", "NEW %s [%"PRId32"] %p (at %s:%d)",
rkt->rkt_topic->str, rktp->rktp_partition, rktp,
func, line);
return rd_kafka_toppar_keep_src(func, line, rktp);
}
rd_kafka_toppar_new0
대상 폐기 rd_kafka_toppar_t
/**
 * Final teardown of a toppar: detaches it, flushes its message queue with a
 * destroy error, destroys its queues, reply queue, topic reference, lock and
 * refcount, and finally frees the object itself.
 *
 * @param rktp Toppar to destroy; must not be used after this call since the
 *             last statement frees it.
 */
void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp) {
// Detach the toppar first. NOTE(review): the original annotation mentions
// its timers and ops queue here - confirm in rd_kafka_toppar_remove().
rd_kafka_toppar_remove(rktp);
// Hand whatever is still in rktp_msgq to rd_kafka_dr_msgq() with the
// __DESTROY error (presumably reported back to the application).
rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
RD_KAFKA_RESP_ERR__DESTROY);
rd_kafka_q_destroy_owner(rktp->rktp_fetchq);
rd_kafka_q_destroy_owner(rktp->rktp_ops);
rd_kafka_replyq_destroy(&rktp->rktp_replyq);
rd_kafka_topic_destroy0(rktp->rktp_s_rkt);
mtx_destroy(&rktp->rktp_lock);
rd_refcnt_destroy(&rktp->rktp_refcnt);
rd_free(rktp);
}
rd_kafka_toppar_destroy_final
(이것 은 우리 뒤에 전문 적 인 장 이 있 습 니 다. 여 기 는 topic 를 표시 하 는 것 만 알 면 됩 니 다. 그 안에 parition 목록 이 포함 되 어 있 습 니 다) 에서 지정 한 parition 을 가 져 옵 니 다. shptr_rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
const rd_kafka_itopic_t *rkt,
int32_t partition,
int ua_on_miss) {
shptr_rd_kafka_toppar_t *s_rktp;
// partition
if (partition >= 0 && partition < rkt->rkt_partition_cnt)
s_rktp = rkt->rkt_p[partition];
else if (partition == RD_KAFKA_PARTITION_UA || ua_on_miss)
s_rktp = rkt->rkt_ua;
else
return NULL;
if (s_rktp)
// 1
return rd_kafka_toppar_keep_src(func,line,
rd_kafka_toppar_s2i(s_rktp));
return NULL;
}
rd_kafka_toppar_t 대상을 가져옵니다.
topic 을 찾지 못했고 create_on_miss 가 설정되어 있으면 rd_kafka_itopic_t 대상을 새로 생성합니다.
대상 shptr_rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
const char *topic,
int32_t partition,
int ua_on_miss,
int create_on_miss) {
shptr_rd_kafka_itopic_t *s_rkt;
rd_kafka_itopic_t *rkt;
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_wrlock(rk);
/* Find or create topic */
// rd_kafka_itopic_t rd_kafka_t rkt_topic tailq ,
if (unlikely(!(s_rkt = rd_kafka_topic_find(rk, topic, 0/*no-lock*/)))) {
if (!create_on_miss) {
rd_kafka_wrunlock(rk);
return NULL;
}
// rd_kafka_itopic_t
s_rkt = rd_kafka_topic_new0(rk, topic, NULL,
NULL, 0/*no-lock*/);
if (!s_rkt) {
rd_kafka_wrunlock(rk);
rd_kafka_log(rk, LOG_ERR, "TOPIC",
"Failed to create local topic \"%s\": %s",
topic, rd_strerror(errno));
return NULL;
}
}
rd_kafka_wrunlock(rk);
rkt = rd_kafka_topic_s2i(s_rkt);
rd_kafka_topic_wrlock(rkt);
s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
rd_kafka_topic_wrunlock(rkt);
rd_kafka_topic_destroy0(s_rkt);
return s_rktp;
}
desired partition
: desired partition 상태의 partition 에 대한 소스 코드의 설명을 쉽게 말하면, 특정 partition 이 필요하지만 그 partition 의 구체적인 정보를 아직 broker 로부터 가져오지 못한 상태라는 것이다. 이런 partition 이 바로 desired partition 이다.
rd_kafka_itopic_t 에는 이런 partition 을 저장하는 데 사용되는 rkt_desp list 가 있으며, 이에 대해 다음과 같은 몇 가지 조작이 있는데 모두 간단하다.
rd_kafka_toppar_desired_get
rd_kafka_toppar_desired_link
rd_kafka_toppar_desired_unlink
rd_kafka_toppar_desired_add0
rd_kafka_toppar_desired_add
rd_kafka_toppar_desired_del
: static void rd_kafka_toppar_broker_migrate (rd_kafka_toppar_t *rktp,
rd_kafka_broker_t *old_rkb,
rd_kafka_broker_t *new_rkb) {
rd_kafka_op_t *rko;
rd_kafka_broker_t *dest_rkb;
int had_next_leader = rktp->rktp_next_leader ? 1 : 0;
/* Update next leader */
if (new_rkb)
rd_kafka_broker_keep(new_rkb);
if (rktp->rktp_next_leader)
rd_kafka_broker_destroy(rktp->rktp_next_leader);
rktp->rktp_next_leader = new_rkb;
// , ?
if (had_next_leader)
return;
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) {
rd_kafka_toppar_set_fetch_state(
rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_query_tmr,
500*1000,
rd_kafka_offset_query_tmr_cb,
rktp);
}
// broker LEAVE op
if (old_rkb) {
rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
dest_rkb = old_rkb;
} else {
/* No existing broker, send join op directly to new leader. */
rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_JOIN);
dest_rkb = new_rkb;
}
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
}
/**
 * Delegate toppar \p rktp to broker \p rkb.
 *
 * A toppar without a broker is handed to the internal broker for
 * bookkeeping, unless it is being removed or the client is terminating.
 * Nothing is migrated when the toppar already sits on the requested broker
 * and no migration is pending.
 *
 * @param rktp        Toppar to delegate.
 * @param rkb         Target broker, or NULL for "no leader".
 * @param for_removal Non-zero when the toppar is being removed, which
 *                    disables the internal-broker fallback.
 */
void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
                                      rd_kafka_broker_t *rkb,
                                      int for_removal) {
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
        int using_internal_broker = 0;

        /* Leaderless toppars fall back to the internal broker. */
        if (!rkb && !for_removal && !rd_kafka_terminating(rk)) {
                rkb = rd_kafka_broker_internal(rk);
                using_internal_broker = 1;
        }

        if (rktp->rktp_leader != rkb || rktp->rktp_next_leader) {
                /* Broker differs (or a migration is pending): migrate,
                 * provided there is a broker on at least one side. */
                if (rktp->rktp_leader || rkb)
                        rd_kafka_toppar_broker_migrate(rktp,
                                                       rktp->rktp_leader,
                                                       rkb);
        } else {
                rd_kafka_dbg(rk, TOPIC, "BRKDELGT",
                             "%.*s [%"PRId32"]: not updating broker: "
                             "already on correct broker %s",
                             RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                             rktp->rktp_partition,
                             rkb ? rd_kafka_broker_name(rkb) : "(none)");
        }

        /* Release the internal broker obtained for the fallback above. */
        if (using_internal_broker)
                rd_kafka_broker_destroy(rkb);
}
rd_kafka_toppar_broker_migrate
/**
 * Commit \p offset for toppar \p rktp.
 *
 * Builds a temporary single-entry topic+partition list for this toppar,
 * records the offset as the one currently being committed, issues the
 * commit in async mode and destroys the temporary list.
 *
 * @param rktp     Toppar to commit for.
 * @param offset   Offset to commit.
 * @param metadata Optional NUL-terminated commit metadata, or NULL; a copy
 *                 is stored in the list entry.
 */
void rd_kafka_toppar_offset_commit (rd_kafka_toppar_t *rktp, int64_t offset,
                                    const char *metadata) {
        rd_kafka_topic_partition_list_t *commit_list =
                rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_t *entry =
                rd_kafka_topic_partition_list_add(
                        commit_list,
                        rktp->rktp_rkt->rkt_topic->str,
                        rktp->rktp_partition);

        entry->offset = offset;

        if (metadata != NULL) {
                entry->metadata_size = strlen(metadata);
                entry->metadata = rd_strdup(metadata);
        }

        /* Remember which offset is in flight on the toppar itself. */
        rktp->rktp_committing_offset = offset;

        /* Asynchronous commit through the consumer commit API. */
        rd_kafka_commit(rktp->rktp_rkt->rkt_rk, commit_list, 1/*async*/);

        rd_kafka_topic_partition_list_destroy(commit_list);
}
rd_kafka_toppar_offset_commit
의 rd_kafka_toppar_t
/**
 * Handle the next offset to fetch from for toppar \p rktp.
 *
 * Logical offsets are passed on to rd_kafka_offset_reset(). Absolute offsets
 * are optionally adjusted by the requested TAIL count, stored in
 * rktp_next_offset, and the fetcher is switched to the ACTIVE state.
 *
 * @param rktp   Toppar to update.
 * @param Offset Offset obtained from offset storage or an offset query.
 */
void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
                                         int64_t Offset) {
        if (RD_KAFKA_OFFSET_IS_LOGICAL(Offset)) {
                /* Offset storage returned logical offset (e.g. "end"),
                 * look it up.
                 * NOTE(review): per the original annotation, the reset
                 * issues a broker offset request for BEGINNING/END and
                 * raises an error and stops fetching for
                 * RD_KAFKA_OFFSET_INVALID - confirm in
                 * rd_kafka_offset_reset(). */
                rd_kafka_offset_reset(rktp, Offset, RD_KAFKA_RESP_ERR_NO_ERROR,
                                      "update");
                return;
        }

        /* Adjust by TAIL count, if wanted: step back tail_cnt messages from
         * the resolved offset, clamping at 0. */
        if (rktp->rktp_query_offset <=
            RD_KAFKA_OFFSET_TAIL_BASE) {
                int64_t tail_cnt =
                        llabs(rktp->rktp_query_offset -
                              RD_KAFKA_OFFSET_TAIL_BASE);

                if (tail_cnt > Offset)
                        Offset = 0;
                else
                        Offset -= tail_cnt;
        }

        rktp->rktp_next_offset = Offset;

        rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);

        /* Wake-up broker thread which might be idling on IO */
        if (rktp->rktp_leader)
                rd_kafka_broker_wakeup(rktp->rktp_leader);
}
rktp_next_offset
: void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
rd_kafka_replyq_t replyq) {
rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
rd_kafka_topic_partition_list_t *part;
rd_kafka_op_t *rko;
part = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add0(part,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rd_kafka_toppar_keep(rktp));
// OffsetFetch operator
rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rko->rko_replyq = replyq;
rko->rko_u.offset_fetch.partitions = part;
rko->rko_u.offset_fetch.do_free = 1;
// OffsetFetch , cgrp op queue
rd_kafka_q_enq(rktp->rktp_cgrp->rkcg_ops, rko);
}
/**
 * Issue an offset query for toppar \p rktp.
 *
 * When a backoff is requested, or there is no leader / the leader is the
 * internal broker (which forces a 500ms backoff), the query is deferred via
 * the offset query timer. Otherwise the timer is stopped and the offset is
 * requested either from the consumer group (stored offsets with broker
 * based storage) or with an OffsetRequest to the leader (logical offsets).
 *
 * @param rktp         Toppar to query the offset for.
 * @param query_offset Offset to query: RD_KAFKA_OFFSET_STORED or a logical
 *                     offset; TAIL offsets are queried as
 *                     RD_KAFKA_OFFSET_END.
 * @param backoff_ms   Delay in milliseconds before querying, 0 for
 *                     immediately.
 */
void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
                                     int64_t query_offset, int backoff_ms) {
        rd_kafka_broker_t *leader = rktp->rktp_leader;

        /* No usable leader yet: retry shortly via the timer. */
        if (backoff_ms == 0 &&
            (leader == NULL || leader->rkb_source == RD_KAFKA_INTERNAL))
                backoff_ms = 500;

        if (backoff_ms != 0) {
                rd_kafka_toppar_set_fetch_state(
                        rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
                /* rd_kafka_offset_query_tmr_cb fires after the backoff. */
                rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
                                     &rktp->rktp_offset_query_tmr,
                                     backoff_ms*1000ll,
                                     rd_kafka_offset_query_tmr_cb, rktp);
                return;
        }

        /* Querying right away: a pending query timer is no longer needed. */
        rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
                            &rktp->rktp_offset_query_tmr, 1/*lock*/);

        if (query_offset != RD_KAFKA_OFFSET_STORED ||
            rktp->rktp_rkt->rkt_conf.offset_store_method !=
            RD_KAFKA_OFFSET_METHOD_BROKER) {
                /*
                 * Look up logical offset (end,beginning,tail,..)
                 */
                shptr_rd_kafka_toppar_t *kept_rktp =
                        rd_kafka_toppar_keep(rktp);
                rd_kafka_topic_partition_list_t *query_list;
                rd_kafka_topic_partition_t *query_entry;

                if (query_offset <= RD_KAFKA_OFFSET_TAIL_BASE)
                        query_offset = RD_KAFKA_OFFSET_END;

                query_list = rd_kafka_topic_partition_list_new(1);
                query_entry = rd_kafka_topic_partition_list_add(
                        query_list,
                        rktp->rktp_rkt->rkt_topic->str,
                        rktp->rktp_partition);
                query_entry->offset = query_offset;

                /* Ask the leader; the reply goes to
                 * rd_kafka_toppar_handle_Offset. */
                rd_kafka_OffsetRequest(leader, query_list, 0,
                                       RD_KAFKA_REPLYQ(rktp->rktp_ops,
                                                       rktp->rktp_op_version),
                                       rd_kafka_toppar_handle_Offset,
                                       kept_rktp);

                rd_kafka_topic_partition_list_destroy(query_list);
        } else {
                /*
                 * Get stored offset from broker based storage:
                 * ask cgrp manager for offsets
                 */
                rd_kafka_toppar_offset_fetch(
                        rktp,
                        RD_KAFKA_REPLYQ(rktp->rktp_ops,
                                        rktp->rktp_op_version));
        }

        rd_kafka_toppar_set_fetch_state(rktp,
                                        RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT);
}
Librdkafka 소스 코드 분석 - Content Table
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.