storm-kafka-0.8-plus 소스 코드 분석
55014 단어 storm
http://blog.csdn.net/xeseo/article/details/18615761
준비
GlobalPartitionInformation (storm.kafka.trident)
paritionid 와 broker 의 관 계 를 기록 합 니 다.
GlobalPartitionInformation info = new GlobalPartitionInformation();
info.addPartition(0, new Broker("10.1.110.24",9092));
info.addPartition(0, new Broker("10.1.110.21",9092));
GlobalPartition Information 를 정적 으로 생 성 할 수 있 습 니 다.위 코드 와 마찬가지 로 zk 에서 도 동적 으로 가 져 올 수 있 습 니 다.이 방식 을 추천 합 니 다.zk 에서 가 져 오 면 DynamicBrokersReader 를 사용 합 니 다.
DynamicBrokersReader
핵심 은 zk 에서 파 티 션 과 broker 의 대응 관 계 를 읽 는 것 입 니 다.zk 는 모두 curator 프레임 워 크 를 사용 합 니 다.
핵심 함수,
/**
* Get all partitions with their current leaders
*/
public GlobalPartitionInformation getBrokerInfo() {
GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
try {
int numPartitionsForTopic = getNumPartitions(); // zk partition
String brokerInfoPath = brokerPath();
for (int partition = 0; partition < numPartitionsForTopic; partition++) {
int leader = getLeaderFor(partition); // zk partition leader broker
String path = brokerInfoPath + "/" + leader;
try {
byte[] brokerData = _curator.getData().forPath(path);
Broker hp = getBrokerHost(brokerData); // zk broker host:port
globalPartitionInformation.addPartition(partition, hp);// GlobalPartitionInformation
} catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
LOG.error("Node {} does not exist ", path);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
return globalPartitionInformation;
}
DynamicPartitionConnections
모든 broker 의 connection 을 유지 하고 모든 broker 에 대응 하 는 paritions 를 기록 합 니 다.
핵심 데이터 구 조 는 모든 broker 에 하나의 Connection Info 를 유지 합 니 다.
Map<Broker, ConnectionInfo> _connections = new HashMap();
ConnectionInfo 의 정 의 는 이 브로커 를 연결 하 는 Simple Consumer 와 파 티 션 을 기록 하 는 set 를 포함 합 니 다.
static class ConnectionInfo {
SimpleConsumer consumer;
Set<Integer> partitions = new HashSet();
public ConnectionInfo(SimpleConsumer consumer) {
this.consumer = consumer;
}
}
핵심 함수
public SimpleConsumer register(Broker host, int partition) {
if (!_connections.containsKey(host)) {
_connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
}
ConnectionInfo info = _connections.get(host);
info.partitions.add(partition);
return info.consumer;
}
PartitionManager
핵심 논 리 는 partiton 의 읽 기 상 태 를 관리 하 는 데 사용 되 며 다음 변 수 를 먼저 이해 합 니 다.
Long _emittedToOffset;
Long _committedTo;
SortedSet<Long> _pending = new TreeSet<Long>();
LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
kafka 는 하나의 partition 에 대해 반드시 offset 에서 어 렸 을 때 부터 순서대로 읽 었 을 것 입 니 다.또한 데 이 터 를 읽 지 않 고 잃 어 버 리 지 않도록 현재 상태 인 offset 을 zk 에 정기 적 으로 기록 합 니 다.
몇 개의 중간 상태,
kafka 에서 읽 은 offset,emitted ToOffset kafka 에서 읽 은 messages 는waiting ToEmit,이 list 를 넣 으 면 emit 가 될 거 라 고 생각 하기 때문에 emitted ToOffset 은 kafka 에서 읽 은 offset 이 라 고 볼 수 있 습 니 다.
성공 적 으로 처 리 된 offset,lastComplete offset message 는 storm 에서 처리 해 야 하기 때문에 fail 이 가능 하기 때문에 처리 하고 있 는 offset 은 캐 시 입 니 다pending 중의 만약pending 이 비어 있 으 면 lastComplete Offset=emittedToOffset 만약pending 이 비어 있 지 않 습 니 다.lastComplete Offset 는 pending list 의 첫 번 째 offset 입 니 다.뒤에 ack 를 기다 리 고 있 기 때 문 입 니 다.
public long lastCompletedOffset() {
if (_pending.isEmpty()) {
return _emittedToOffset;
} else {
return _pending.first();
}
}
zk 에 기 록 된 offset,committed To 우 리 는 정기 적 으로 lastComplete Offset 를 zk 에 기록 해 야 합 니 다.그렇지 않 으 면 crash 후에 우 리 는 지난번 에 어디 까지 읽 었 는 지 모 릅 니 다.그래서committedTo <= lastCompletedOffset
전체 과정,
1.초기 화,
관건 은 파 티 션 을 등록 하고 offset 을 초기 화 하 는 것 입 니 다.어디서부터 읽 어야 할 지 알 수 있 습 니 다.
public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
_partition = id;
_connections = connections;
_spoutConfig = spoutConfig;
_topologyInstanceId = topologyInstanceId;
_consumer = connections.register(id.host, id.partition); // partition connections, simpleconsumer
_state = state;
_stormConf = stormConf;
String jsonTopologyId = null;
Long jsonOffset = null;
String path = committedPath();
try {
Map<Object, Object> json = _state.readJSON(path);
LOG.info("Read partition information from: " + path + " --> " + json );
if (json != null) {
jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
jsonOffset = (Long) json.get("offset"); // zk commited offset
}
} catch (Throwable e) {
LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
}
if (jsonTopologyId == null || jsonOffset == null) { // zk , spoutConfig.startOffsetTime offset,Earliest Latest
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
LOG.info("No partition information found, using configuration to determine offset");
} else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
} else {
_committedTo = jsonOffset;
}
_emittedToOffset = _committedTo; // ,
}
2.kafka 에서 messages 를 읽 고waitingToEmit
kafka 에서 데 이 터 를 읽 을 수 있 습 니 다.ByteBufferMessageSet,emit 가 필요 한 msg,Message AndRealOffset 을waiting ToEmit,완성 되 지 않 은 offset 을 pending 업데이트 emitted ToOffset 에 놓 습 니 다.
private void fill() {
ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
for (MessageAndOffset msg : msgs) {
_pending.add(_emittedToOffset);
_waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
_emittedToOffset = msg.nextOffset();
}
}
그 중에서 fetch message 의 논 리 는 다음 과 같다.
public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
ByteBufferMessageSet msgs = null;
String topic = config.topic;
int partitionId = partition.partition;
for (int errors = 0; errors < 2 && msgs == null; errors++) { //
FetchRequestBuilder builder = new FetchRequestBuilder();
FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
clientId(config.clientId).build();
FetchResponse fetchResponse;
try {
fetchResponse = consumer.fetch(fetchRequest);
} catch (Exception e) {
if (e instanceof ConnectException) {
throw new FailedFetchException(e);
} else {
throw new RuntimeException(e);
}
}
if (fetchResponse.hasError()) { // offset outofrange case, getOffset earliest latest
KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
"retrying with default start offset time from configuration. " +
"configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
offset = startOffset;
} else {
String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
LOG.error(message);
throw new FailedFetchException(message);
}
} else {
msgs = fetchResponse.messageSet(topic, partitionId);
}
}
return msgs;
}
3. emit msg
부터waiting ToEmit 에서 msg 를 가 져 와 tuple 로 변환 한 후 collector.emit 를 통 해 보 냅 니 다.
public EmitState next(SpoutOutputCollector collector) {
if (_waitingToEmit.isEmpty()) {
fill();
}
while (true) {
MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
if (toEmit == null) {
return EmitState.NO_EMITTED;
}
Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
if (tups != null) {
for (List<Object> tup : tups) {
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
}
break;
} else {
ack(toEmit.offset);
}
}
if (!_waitingToEmit.isEmpty()) {
return EmitState.EMITTED_MORE_LEFT;
} else {
return EmitState.EMITTED_END;
}
}
tuple 을 바 꾸 는 과정 을 볼 수 있 습 니 다.kafkaConfig.scheme.deserialize 를 통 해 전환 하 는 것 을 볼 수 있 습 니 다.
public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
Iterable<List<Object>> tups;
ByteBuffer payload = msg.payload();
ByteBuffer key = msg.key();
if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
} else {
tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
}
return tups;
}
그래서 사용 할 때 scheme 논 리 를 정의 해 야 합 니 다.
spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());
public class TestMessageScheme implements Scheme {
private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);
@Override
public List<Object> deserialize(byte[] bytes) {
try {
String msg = new String(bytes, "UTF-8");
return new Values(msg);
} catch (InvalidProtocolBufferException e) {
LOGGER.error("Cannot parse the provided message!");
}
return null;
}
@Override
public Fields getOutputFields() {
return new Fields("msg");
}
}
4.정기 적 인 commt offset
public void commit() {
long lastCompletedOffset = lastCompletedOffset();
if (lastCompletedOffset != lastCommittedOffset()) {
Map<Object, Object> data = ImmutableMap.builder()
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
"name", _stormConf.get(Config.TOPOLOGY_NAME)))
.put("offset", lastCompletedOffset)
.put("partition", _partition.partition)
.put("broker", ImmutableMap.of("host", _partition.host.host,
"port", _partition.host.port))
.put("topic", _spoutConfig.topic).build();
_state.writeJSON(committedPath(), data);
_committedTo = lastCompletedOffset;
} else {
LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
}
}
5.마지막 으로 페 일 처리 에 주목
먼저 저 자 는 cache message 가 없고 cache offset 이기 때문에 fail 일 때 그 는 직접 replay 할 수 없습니다.그의 주석 에 쓰 여 있 습 니 다.이렇게 하지 않 는 이 유 는 메모리 가 터 질 까 봐 두 렵 기 때 문 입 니 다.
그래서 그의 방법 은 offset fail 일 때 바로emitted ToOffset 에서 현재 fail 로 굴 러 가 는 이 offset 다음 에 Kafka fetch 에서emitted ToOffset 을 읽 기 시작 하면 kafka 에 의존 하여 replay 를 하 는 것 이 좋 습 니 다.문 제 는 중복 문제 가 있 을 수 있 기 때문에 사용 할 때 중복 문 제 를 받 아들 일 수 있 는 지 를 고려 해 야 합 니 다.
public void fail(Long offset) {
//TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
// things might get crazy with lots of timeouts
if (_emittedToOffset > offset) {
_emittedToOffset = offset;
_pending.tailSet(offset).clear();
}
}
KafkaSpout
마지막 으로 KafkaSpout 을 보 겠 습 니 다.
1.초기 화의 관건 은 DynamicPartition Connections 와 를 초기 화 하 는 것 입 니 다.coordinator
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;
Map stateConf = new HashMap(conf);
List<String> zkServers = _spoutConfig.zkServers;
if (zkServers == null) {
zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
}
Integer zkPort = _spoutConfig.zkPort;
if (zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = new ZkState(stateConf);
_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
} else {
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
}
}
봐 봐코 디 네 이 터 는 뭐 하 는 사람 이에 요?이것 은 매우 관건 적 입 니 다.왜냐하면 우 리 는 보통 여러 개의 동시 다발 적 인 kafkaspout 을 열 기 때 문 입 니 다.high-level 중의 consumer group 과 유사 합 니 다.이러한 동시 다발 적 인 스 레 드 가 충돌 하지 않도록 어떻게 보장 합 니까?하 이 레벨 과 같은 사고방식 을 사용 하면 하나의 partition 은 하나의 spout 소비 만 있 을 수 있 습 니 다.이렇게 하면 번 거 로 운 방문 상호 배척 문 제 를 처리 하지 않 습 니 다.(kafka 는 방문 상호 배척 이 번 거 롭 습 니 다.생각해 보 세 요.)현재 spout 의 task 수 와 partition 수 에 따라 분배 되 고 task 와 partitioin 의 대응 관 계 를 가 지 며 각 partition 에 Partition Manager 를 구축 합 니 다.
여기 서 먼저 totalTasks 를 볼 수 있 습 니 다.바로 현재 이 spout component 의 task size StaticCoordinator 와 ZkCoordinator 의 차 이 는 StaticHost 에서 Zk 에서 partition 까지 의 정 보 를 읽 는 것 입 니 다.간단하게 StaticCoordinator 가 실현 되 는 것 을 보 세 요.
public class StaticCoordinator implements PartitionCoordinator {
Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
List<PartitionManager> _allManagers = new ArrayList();
public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
StaticHosts hosts = (StaticHosts) config.hosts;
List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(hosts.getPartitionInformation(), totalTasks, taskIndex);
for (Partition myPartition : myPartitions) {// PartitionManager
_managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
}
_allManagers = new ArrayList(_managers.values());
}
@Override
public List<PartitionManager> getMyManagedPartitions() {
return _allManagers;
}
public PartitionManager getManager(Partition partition) {
return _managers.get(partition);
}
}
그 중에서 분 배 된 논 리 는 calculate Partitions ForTask 에 있 습 니 다.
public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
List<Partition> partitions = partitionInformation.getOrderedPartitions();
int numPartitions = partitions.size();
List<Partition> taskPartitions = new ArrayList<Partition>();
for (int i = taskIndex; i < numPartitions; i += totalTasks) {// ,
Partition taskPartition = partitions.get(i);
taskPartitions.add(taskPartition);
}
logPartitionMapping(totalTasks, taskIndex, taskPartitions);
return taskPartitions;
}
2. nextTuple
논리 적 으로 tricky 라 고 쓰 여 있 습 니 다.사실 한 파 티 션 에서 한 번 만 읽 으 면 되 기 때문에 for 는 EmitState.NO 입 니 다.EMITED 는 읽 기 에 성공 할 수 있 도록 뒤의 partition 을 옮 겨 다 녀 야 합 니 다.
@Override
public void nextTuple() {
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
// in case the number of managers decreased
_currPartitionIndex = _currPartitionIndex % managers.size(); //_currPartitionIndex 0, partition
EmitState state = managers.get(_currPartitionIndex).next(_collector); // PartitonManager.next emit
if (state != EmitState.EMITTED_MORE_LEFT) { // EMITTED_MORE_LEFT , , , +1
_currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
}
if (state != EmitState.NO_EMITTED) { // EmitState.NO_EMITTED , partition , , break
break;
}
}
long now = System.currentTimeMillis();
if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
commit(); // commit
}
}
정기 적 으로 commt 의 논 리 를 사용 하여 모든 Partition Manager 를 옮 겨 다 닙 니 다.
private void commit() {
_lastUpdateMs = System.currentTimeMillis();
for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
manager.commit();
}
}
3.Ack 와 Fail
파 티 션 관리자 직접 호출
@Override
public void ack(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.ack(id.offset);
}
}
@Override
public void fail(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.fail(id.offset);
}
}
4.declare OutputFields 때문에 scheme 에서 정의 해 야 합 니 다.deserialize 와 getOutputFields
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(_spoutConfig.scheme.getOutputFields());
}
Metrics
Metrics 를 살 펴 보 겠 습 니 다.storm 에 metrics 를 넣 어 spout.open 에서 다음 두 metrics 를 초기 화 하 는 방법 을 배 우 는 것 이 관건 입 니 다.
kafka Offset 은 각 파 티 션 의 earliestTimeOffset,latestTimeOffset,latestEmitted Offset 를 반영 합 니 다.그 중에서 latestTimeOffset-latestEmitted Offset 은 spout lag 입 니 다.각 파 티 션 을 반영 하 는 것 외 에 모든 파 티 션 의 총 데 이 터 를 계산 합 니 다.
context.registerMetric("kafkaOffset", new IMetric() {
KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
@Override
public Object getValueAndReset() {
List<PartitionManager> pms = _coordinator.getMyManagedPartitions(); // coordinator pms
Set<Partition> latestPartitions = new HashSet();
for (PartitionManager pm : pms) {
latestPartitions.add(pm.getPartition());
}
_kafkaOffsetMetric.refreshPartitions(latestPartitions); // partition metric partition
for (PartitionManager pm : pms) {
_kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset()); // metric partition offset
}
return _kafkaOffsetMetric.getValueAndReset();
}
}, _spoutConfig.metricsTimeBucketSizeInSecs);
_kafkaOffsetMetric.getValueAndrReset,사실은 get 일 뿐,reset 필요 없습니다.
@Override
public Object getValueAndReset() {
try {
long totalSpoutLag = 0;
long totalEarliestTimeOffset = 0;
long totalLatestTimeOffset = 0;
long totalLatestEmittedOffset = 0;
HashMap ret = new HashMap();
if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
Partition partition = e.getKey();
SimpleConsumer consumer = _connections.getConnection(partition);
long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
long latestEmittedOffset = e.getValue();
long spoutLag = latestTimeOffset - latestEmittedOffset;
ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
totalSpoutLag += spoutLag;
totalEarliestTimeOffset += earliestTimeOffset;
totalLatestTimeOffset += latestTimeOffset;
totalLatestEmittedOffset += latestEmittedOffset;
}
ret.put("totalSpoutLag", totalSpoutLag);
ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
return ret;
} else {
LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
}
} catch (Throwable t) {
LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
}
return null;
}
kafkaPartition 은 Kafka fetch 데이터 의 상황 을 반영 합 니 다.fetchAPILatency Max,fetchAPILatency Mean,fetchAPICallCount 와 fetchAPIMessageCount.
context.registerMetric("kafkaPartition", new IMetric() {
@Override
public Object getValueAndReset() {
List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
Map concatMetricsDataMaps = new HashMap();
for (PartitionManager pm : pms) {
concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
}
return concatMetricsDataMaps;
}
}, _spoutConfig.metricsTimeBucketSizeInSecs);
pm.getMetricsDataMap(),
public Map getMetricsDataMap() {
Map ret = new HashMap();
ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
return ret;
}
업 데 이 트 된 논 리 는 다음 과 같 습 니 다.
private void fill() {
long start = System.nanoTime();
ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
long end = System.nanoTime();
long millis = (end - start) / 1000000;
_fetchAPILatencyMax.update(millis);
_fetchAPILatencyMean.update(millis);
_fetchAPICallCount.incr();
int numMessages = countMessages(msgs);
_fetchAPIMessageCount.incrBy(numMessages);
}
우리 가 카 프 카 를 읽 을 때,
우선,모든 파 티 션 의 읽 기 상황 에 관심 을 가 집 니 다.이것 은 Kafka Offset Metrics 를 얻 으 면 알 수 있 습 니 다.
또한,우 리 는 replay 데이터 가 필요 합 니 다.high-level 인 터 페 이 스 를 사용 할 때 시스템 을 통 해 제공 할 수 있 는 도 구 를 사용 할 수 있 습 니 다.여 기 는 어떻게 합 니까?
아래 코드 를 보 세 요.첫 번 째 if 는 설정 파일 에서 설정 을 읽 지 못 한 경우 입 니 다.두 번 째 else if 는 topology InstanceId 가 바 뀌 었 을 때 forceFromStart 가 true 일 때 startOffsetTime 이 지정 한 offset(Latest 또는 Earliest)이라는 topology InstanceId 를 가 져 옵 니 다.KafkaSpout 대상 이 생 성 될 때마다 무 작위 로 생 성 됩 니 다.Stringuuid = UUID.randomUUID().toString(); Spout 대상 은 토폴로지 제출 시 클 라 이언 트 에서 한 번 생 성 되 므 로 토폴로지 가 멈 추고 다시 시작 하면 이 아 이 디 는 반드시 변 경 됩 니 다.
그래서 forceFromStart 를 true 로 설정 하고 토폴로지 를 다시 시작 하면 replay 가 가능 할 것 같 습 니 다.
if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
LOG.info("No partition information found, using configuration to determine offset");
} else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
} else {
_committedTo = jsonOffset;
LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
}
코드 예
storm-kafka 의 문서 가 매우 나 빠 서 마지막 으로 사용 하 는 예 를 첨부 합 니 다.
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.BrokerHosts;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;
import storm.kafka.KeyValueScheme;
public static class SimplekVScheme implements KeyValueScheme { // scheme
@Override
public List<Object> deserializeKeyAndValue(byte[] key, byte[] value){
ArrayList tuple = new ArrayList();
tuple.add(key);
tuple.add(value);
return tuple;
}
@Override
public List<Object> deserialize(byte[] bytes) {
ArrayList tuple = new ArrayList();
tuple.add(bytes);
return tuple;
}
@Override
public Fields getOutputFields() {
return new Fields("key","value");
}
}
String topic = “test”; //
String zkRoot = “/kafkastorm”; //
String spoutId = “id”; // status ,/kafkastorm/id , id consumer group
BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181");
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new SimplekVScheme());
/*spoutConfig.zkServers = new ArrayList<String>(){{ // local ,
add("10.118.136.107");
}};
spoutConfig.zkPort = 2181;*/
spoutConfig.forceFromStart = false;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
spoutConfig.metricsTimeBucketSizeInSecs = 6;
builder.setSpout(SqlCollectorTopologyDef.KAFKA_SPOUT_NAME, new KafkaSpout(spoutConfig), 1);
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
storm Async loop died! & reconnect자세히 보기 storm이 슈퍼바이저가 리셋되었을 때 topology가 오류를 보고하여 모든 spout이 소비되지 않습니다. 로그 위에 대량의reconnection IP에 로그인하여 6703 포트에 두 개의 워커가 있...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.