storm-kafka-0.8-plus 소스 코드 분석

55014 단어 storm
https://github.com/wurstmeister/storm-kafka-0.8-plus
http://blog.csdn.net/xeseo/article/details/18615761
 
준비
GlobalPartitionInformation (storm.kafka.trident)
paritionid 와 broker 의 관 계 를 기록 합 니 다.
GlobalPartitionInformation info = new GlobalPartitionInformation();

info.addPartition(0, new Broker("10.1.110.24",9092));

info.addPartition(0, new Broker("10.1.110.21",9092));

GlobalPartition Information 를 정적 으로 생 성 할 수 있 습 니 다.위 코드 와 마찬가지 로 zk 에서 도 동적 으로 가 져 올 수 있 습 니 다.이 방식 을 추천 합 니 다.zk 에서 가 져 오 면 DynamicBrokersReader 를 사용 합 니 다.
 
DynamicBrokersReader
핵심 은 zk 에서 파 티 션 과 broker 의 대응 관 계 를 읽 는 것 입 니 다.zk 는 모두 curator 프레임 워 크 를 사용 합 니 다.
핵심 함수,
    /**
     * Get all partitions with their current leaders
     */
    public GlobalPartitionInformation getBrokerInfo() {
        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
        try {
            int numPartitionsForTopic = getNumPartitions(); // zk  partition   
            String brokerInfoPath = brokerPath();
            for (int partition = 0; partition < numPartitionsForTopic; partition++) {
                int leader = getLeaderFor(partition); // zk  partition leader broker
                String path = brokerInfoPath + "/" + leader;
                try {
                    byte[] brokerData = _curator.getData().forPath(path);
                    Broker hp = getBrokerHost(brokerData); // zk  broker host:port
                    globalPartitionInformation.addPartition(partition, hp);//  GlobalPartitionInformation 
                } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                    LOG.error("Node {} does not exist ", path);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
        return globalPartitionInformation;
    }

 
DynamicPartitionConnections
모든 broker 의 connection 을 유지 하고 모든 broker 에 대응 하 는 paritions 를 기록 합 니 다.
핵심 데이터 구 조 는 모든 broker 에 하나의 Connection Info 를 유지 합 니 다.
Map<Broker, ConnectionInfo> _connections = new HashMap();

ConnectionInfo 의 정 의 는 이 브로커 를 연결 하 는 Simple Consumer 와 파 티 션 을 기록 하 는 set 를 포함 합 니 다.
    static class ConnectionInfo {
        SimpleConsumer consumer;
        Set<Integer> partitions = new HashSet();

        public ConnectionInfo(SimpleConsumer consumer) {
            this.consumer = consumer;
        }
    }

핵심 함수
    public SimpleConsumer register(Broker host, int partition) {
        if (!_connections.containsKey(host)) {
            _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
        }
        ConnectionInfo info = _connections.get(host);
        info.partitions.add(partition);
        return info.consumer;
    }

 
 
PartitionManager
핵심 논 리 는 partiton 의 읽 기 상 태 를 관리 하 는 데 사용 되 며 다음 변 수 를 먼저 이해 합 니 다.
Long _emittedToOffset;
Long _committedTo;
SortedSet<Long> _pending = new TreeSet<Long>();
LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();

kafka 는 하나의 partition 에 대해 반드시 offset 에서 어 렸 을 때 부터 순서대로 읽 었 을 것 입 니 다.또한 데 이 터 를 읽 지 않 고 잃 어 버 리 지 않도록 현재 상태 인 offset 을 zk 에 정기 적 으로 기록 합 니 다.
몇 개의 중간 상태,
kafka 에서 읽 은 offset,emitted ToOffset kafka 에서 읽 은 messages 는waiting ToEmit,이 list 를 넣 으 면 emit 가 될 거 라 고 생각 하기 때문에 emitted ToOffset 은 kafka 에서 읽 은 offset 이 라 고 볼 수 있 습 니 다.
성공 적 으로 처 리 된 offset,lastComplete offset message 는 storm 에서 처리 해 야 하기 때문에 fail 이 가능 하기 때문에 처리 하고 있 는 offset 은 캐 시 입 니 다pending 중의 만약pending 이 비어 있 으 면 lastComplete Offset=emittedToOffset 만약pending 이 비어 있 지 않 습 니 다.lastComplete Offset 는 pending list 의 첫 번 째 offset 입 니 다.뒤에 ack 를 기다 리 고 있 기 때 문 입 니 다.
    public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }

 
zk 에 기 록 된 offset,committed To 우 리 는 정기 적 으로 lastComplete Offset 를 zk 에 기록 해 야 합 니 다.그렇지 않 으 면 crash 후에 우 리 는 지난번 에 어디 까지 읽 었 는 지 모 릅 니 다.그래서committedTo <= lastCompletedOffset
전체 과정,
1.초기 화,
관건 은 파 티 션 을 등록 하고 offset 을 초기 화 하 는 것 입 니 다.어디서부터 읽 어야 할 지 알 수 있 습 니 다.
    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.partition); //  partition connections,   simpleconsumer
        _state = state;
        _stormConf = stormConf;

        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path);
            LOG.info("Read partition information from: " + path +  " --> " + json );
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                jsonOffset = (Long) json.get("offset"); //  zk   commited offset
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }

        if (jsonTopologyId == null || jsonOffset == null) { // zk     ,    spoutConfig.startOffsetTime  offset,Earliest Latest
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
        }

        _emittedToOffset = _committedTo; //     ,         
    }

 
2.kafka 에서 messages 를 읽 고waitingToEmit
kafka 에서 데 이 터 를 읽 을 수 있 습 니 다.ByteBufferMessageSet,emit 가 필요 한 msg,Message AndRealOffset 을waiting ToEmit,완성 되 지 않 은 offset 을 pending 업데이트 emitted ToOffset 에 놓 습 니 다.
    private void fill() {
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
        for (MessageAndOffset msg : msgs) {
            _pending.add(_emittedToOffset);
            _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
            _emittedToOffset = msg.nextOffset();
        }
    }

그 중에서 fetch message 의 논 리 는 다음 과 같다.
    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
        ByteBufferMessageSet msgs = null;
        String topic = config.topic;
        int partitionId = partition.partition;
        for (int errors = 0; errors < 2 && msgs == null; errors++) { //       
            FetchRequestBuilder builder = new FetchRequestBuilder();
            FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
                    clientId(config.clientId).build();
            FetchResponse fetchResponse;
            try {
                fetchResponse = consumer.fetch(fetchRequest);
            } catch (Exception e) {
                if (e instanceof ConnectException) {
                    throw new FailedFetchException(e);
                } else {
                    throw new RuntimeException(e);
                }
            }
            if (fetchResponse.hasError()) { //     offset outofrange case,  getOffset earliest latest 
                KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
                if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
                    long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
                    LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                            "retrying with default start offset time from configuration. " +
                            "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
                    offset = startOffset;
                } else {
                    String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                    LOG.error(message);
                    throw new FailedFetchException(message);
                }
            } else {
                msgs = fetchResponse.messageSet(topic, partitionId);
            }
        }
        return msgs;
    }

 
3. emit msg
부터waiting ToEmit 에서 msg 를 가 져 와 tuple 로 변환 한 후 collector.emit 를 통 해 보 냅 니 다.
    public EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
            if (tups != null) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

tuple 을 바 꾸 는 과정 을 볼 수 있 습 니 다.kafkaConfig.scheme.deserialize 를 통 해 전환 하 는 것 을 볼 수 있 습 니 다.
    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
        Iterable<List<Object>> tups;
        ByteBuffer payload = msg.payload();
        ByteBuffer key = msg.key();
        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
        } else {
            tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
        }
        return tups;
    }

그래서 사용 할 때 scheme 논 리 를 정의 해 야 합 니 다.
spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());

public class TestMessageScheme implements Scheme {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);
    
    @Override
    public List<Object> deserialize(byte[] bytes) {
    try {
        String msg = new String(bytes, "UTF-8");
        return new Values(msg);
    } catch (InvalidProtocolBufferException e) {
         LOGGER.error("Cannot parse the provided message!");
    }
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}

 
4.정기 적 인 commt offset
    public void commit() {
        long lastCompletedOffset = lastCompletedOffset();
        if (lastCompletedOffset != lastCommittedOffset()) {
            Map<Object, Object> data = ImmutableMap.builder()
                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker", ImmutableMap.of("host", _partition.host.host,
                            "port", _partition.host.port))
                    .put("topic", _spoutConfig.topic).build();
            _state.writeJSON(committedPath(), data);
            _committedTo = lastCompletedOffset;
        } else {
            LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
        }
    }

 
5.마지막 으로 페 일 처리 에 주목
먼저 저 자 는 cache message 가 없고 cache offset 이기 때문에 fail 일 때 그 는 직접 replay 할 수 없습니다.그의 주석 에 쓰 여 있 습 니 다.이렇게 하지 않 는 이 유 는 메모리 가 터 질 까 봐 두 렵 기 때 문 입 니 다.
그래서 그의 방법 은 offset fail 일 때 바로emitted ToOffset 에서 현재 fail 로 굴 러 가 는 이 offset 다음 에 Kafka fetch 에서emitted ToOffset 을 읽 기 시작 하면 kafka 에 의존 하여 replay 를 하 는 것 이 좋 습 니 다.문 제 는 중복 문제 가 있 을 수 있 기 때문에 사용 할 때 중복 문 제 를 받 아들 일 수 있 는 지 를 고려 해 야 합 니 다.
    public void fail(Long offset) {
        //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
        // things might get crazy with lots of timeouts
        if (_emittedToOffset > offset) {
            _emittedToOffset = offset;
            _pending.tailSet(offset).clear();
        }
    }

 
KafkaSpout
마지막 으로 KafkaSpout 을 보 겠 습 니 다.
1.초기 화의 관건 은 DynamicPartition Connections 와 를 초기 화 하 는 것 입 니 다.coordinator
    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
        _collector = collector;

        Map stateConf = new HashMap(conf);
        List<String> zkServers = _spoutConfig.zkServers;
        if (zkServers == null) {
            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        }
        Integer zkPort = _spoutConfig.zkPort;
        if (zkPort == null) {
            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
        }
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
        _state = new ZkState(stateConf);

        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

        // using TransactionalState like this is a hack
        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
        if (_spoutConfig.hosts instanceof StaticHosts) {
            _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        } else {
            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        }
    }

봐 봐코 디 네 이 터 는 뭐 하 는 사람 이에 요?이것 은 매우 관건 적 입 니 다.왜냐하면 우 리 는 보통 여러 개의 동시 다발 적 인 kafkaspout 을 열 기 때 문 입 니 다.high-level 중의 consumer group 과 유사 합 니 다.이러한 동시 다발 적 인 스 레 드 가 충돌 하지 않도록 어떻게 보장 합 니까?하 이 레벨 과 같은 사고방식 을 사용 하면 하나의 partition 은 하나의 spout 소비 만 있 을 수 있 습 니 다.이렇게 하면 번 거 로 운 방문 상호 배척 문 제 를 처리 하지 않 습 니 다.(kafka 는 방문 상호 배척 이 번 거 롭 습 니 다.생각해 보 세 요.)현재 spout 의 task 수 와 partition 수 에 따라 분배 되 고 task 와 partitioin 의 대응 관 계 를 가 지 며 각 partition 에 Partition Manager 를 구축 합 니 다.
여기 서 먼저 totalTasks 를 볼 수 있 습 니 다.바로 현재 이 spout component 의 task size StaticCoordinator 와 ZkCoordinator 의 차 이 는 StaticHost 에서 Zk 에서 partition 까지 의 정 보 를 읽 는 것 입 니 다.간단하게 StaticCoordinator 가 실현 되 는 것 을 보 세 요.
public class StaticCoordinator implements PartitionCoordinator {
    Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
    List<PartitionManager> _allManagers = new ArrayList();

    public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
        StaticHosts hosts = (StaticHosts) config.hosts;
        List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(hosts.getPartitionInformation(), totalTasks, taskIndex);
        for (Partition myPartition : myPartitions) {//   PartitionManager
            _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
        }
        _allManagers = new ArrayList(_managers.values());
    }

    @Override
    public List<PartitionManager> getMyManagedPartitions() {
        return _allManagers;
    }

    public PartitionManager getManager(Partition partition) {
        return _managers.get(partition);
    }

}

그 중에서 분 배 된 논 리 는 calculate Partitions ForTask 에 있 습 니 다.
    public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
        List<Partition> partitions = partitionInformation.getOrderedPartitions();
        int numPartitions = partitions.size();
        List<Partition> taskPartitions = new ArrayList<Partition>();
        for (int i = taskIndex; i < numPartitions; i += totalTasks) {//     ,
            Partition taskPartition = partitions.get(i);
            taskPartitions.add(taskPartition);
        }
        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
        return taskPartitions;
    }

 
2. nextTuple
논리 적 으로 tricky 라 고 쓰 여 있 습 니 다.사실 한 파 티 션 에서 한 번 만 읽 으 면 되 기 때문에 for 는 EmitState.NO 입 니 다.EMITED 는 읽 기 에 성공 할 수 있 도록 뒤의 partition 을 옮 겨 다 녀 야 합 니 다.
    @Override
    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {

            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size(); //_currPartitionIndex   0,       partition
            EmitState state = managers.get(_currPartitionIndex).next(_collector); //  PartitonManager.next emit  
            if (state != EmitState.EMITTED_MORE_LEFT) { // EMITTED_MORE_LEFT ,    ,     ,   +1
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) { // EmitState.NO_EMITTED ,  partition       ,         ,    break
                break;
            }
        }

        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit(); //  commit
        }
    }

정기 적 으로 commt 의 논 리 를 사용 하여 모든 Partition Manager 를 옮 겨 다 닙 니 다.
    private void commit() {
        _lastUpdateMs = System.currentTimeMillis();
        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
            manager.commit();
        }
    }

 
3.Ack 와 Fail
파 티 션 관리자 직접 호출
    @Override
    public void ack(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.ack(id.offset);
        }
    }

    @Override
    public void fail(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.fail(id.offset);
        }
    }

 
4.declare OutputFields 때문에 scheme 에서 정의 해 야 합 니 다.deserialize 와 getOutputFields
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(_spoutConfig.scheme.getOutputFields());
    }

 
Metrics
Metrics 를 살 펴 보 겠 습 니 다.storm 에 metrics 를 넣 어 spout.open 에서 다음 두 metrics 를 초기 화 하 는 방법 을 배 우 는 것 이 관건 입 니 다.
kafka Offset 은 각 파 티 션 의 earliestTimeOffset,latestTimeOffset,latestEmitted Offset 를 반영 합 니 다.그 중에서 latestTimeOffset-latestEmitted Offset 은 spout lag 입 니 다.각 파 티 션 을 반영 하 는 것 외 에 모든 파 티 션 의 총 데 이 터 를 계산 합 니 다.
        context.registerMetric("kafkaOffset", new IMetric() {
            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);

            @Override
            public Object getValueAndReset() {
                List<PartitionManager> pms = _coordinator.getMyManagedPartitions(); // coordinator  pms   
                Set<Partition> latestPartitions = new HashSet();
                for (PartitionManager pm : pms) {
                    latestPartitions.add(pm.getPartition());
                }
                _kafkaOffsetMetric.refreshPartitions(latestPartitions); //     partition    metric       partition     
                for (PartitionManager pm : pms) {
                    _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset()); //  metric   partition      offset
                }
                return _kafkaOffsetMetric.getValueAndReset();
            }
        }, _spoutConfig.metricsTimeBucketSizeInSecs);

_kafkaOffsetMetric.getValueAndrReset,사실은 get 일 뿐,reset 필요 없습니다.
@Override
        public Object getValueAndReset() {
            try {
                long totalSpoutLag = 0;
                long totalEarliestTimeOffset = 0;
                long totalLatestTimeOffset = 0;
                long totalLatestEmittedOffset = 0;
                HashMap ret = new HashMap();
                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
                    for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
                        Partition partition = e.getKey();
                        SimpleConsumer consumer = _connections.getConnection(partition);
                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
                        long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
                        long latestEmittedOffset = e.getValue();
                        long spoutLag = latestTimeOffset - latestEmittedOffset;
                        ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
                        ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
                        ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
                        ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
                        totalSpoutLag += spoutLag;
                        totalEarliestTimeOffset += earliestTimeOffset;
                        totalLatestTimeOffset += latestTimeOffset;
                        totalLatestEmittedOffset += latestEmittedOffset;
                    }
                    ret.put("totalSpoutLag", totalSpoutLag);
                    ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
                    ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
                    ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
                    return ret;
                } else {
                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
                }
            } catch (Throwable t) {
                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
            }
            return null;
        }

 
kafkaPartition 은 Kafka fetch 데이터 의 상황 을 반영 합 니 다.fetchAPILatency Max,fetchAPILatency Mean,fetchAPICallCount 와 fetchAPIMessageCount.
        context.registerMetric("kafkaPartition", new IMetric() {
            @Override
            public Object getValueAndReset() {
                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
                Map concatMetricsDataMaps = new HashMap();
                for (PartitionManager pm : pms) {
                    concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
                }
                return concatMetricsDataMaps;
            }
        }, _spoutConfig.metricsTimeBucketSizeInSecs);

pm.getMetricsDataMap(),
public Map getMetricsDataMap() {
        Map ret = new HashMap();
        ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
        ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
        ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
        ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
        return ret;
    }

업 데 이 트 된 논 리 는 다음 과 같 습 니 다.
    private void fill() {
        long start = System.nanoTime();
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
        long end = System.nanoTime();
        long millis = (end - start) / 1000000;
        _fetchAPILatencyMax.update(millis);
        _fetchAPILatencyMean.update(millis);
        _fetchAPICallCount.incr();
        int numMessages = countMessages(msgs);
        _fetchAPIMessageCount.incrBy(numMessages);
}

 
우리 가 카 프 카 를 읽 을 때,
우선,모든 파 티 션 의 읽 기 상황 에 관심 을 가 집 니 다.이것 은 Kafka Offset Metrics 를 얻 으 면 알 수 있 습 니 다.
또한,우 리 는 replay 데이터 가 필요 합 니 다.high-level 인 터 페 이 스 를 사용 할 때 시스템 을 통 해 제공 할 수 있 는 도 구 를 사용 할 수 있 습 니 다.여 기 는 어떻게 합 니까?
아래 코드 를 보 세 요.첫 번 째 if 는 설정 파일 에서 설정 을 읽 지 못 한 경우 입 니 다.두 번 째 else if 는 topology InstanceId 가 바 뀌 었 을 때 forceFromStart 가 true 일 때 startOffsetTime 이 지정 한 offset(Latest 또는 Earliest)이라는 topology InstanceId 를 가 져 옵 니 다.KafkaSpout 대상 이 생 성 될 때마다 무 작위 로 생 성 됩 니 다.Stringuuid = UUID.randomUUID().toString(); Spout 대상 은 토폴로지 제출 시 클 라 이언 트 에서 한 번 생 성 되 므 로 토폴로지 가 멈 추고 다시 시작 하면 이 아 이 디 는 반드시 변 경 됩 니 다.
그래서 forceFromStart 를 true 로 설정 하고 토폴로지 를 다시 시작 하면 replay 가 가능 할 것 같 습 니 다.
        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
        }

 
코드 예
storm-kafka 의 문서 가 매우 나 빠 서 마지막 으로 사용 하 는 예 를 첨부 합 니 다.
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.BrokerHosts;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;
import storm.kafka.KeyValueScheme;

    public static class SimplekVScheme implements KeyValueScheme { //  scheme
        @Override
        public List<Object> deserializeKeyAndValue(byte[] key, byte[] value){
            ArrayList tuple = new ArrayList();
            tuple.add(key);
            tuple.add(value);
            return tuple;
        }
        
        @Override
        public List<Object> deserialize(byte[] bytes) {
            ArrayList tuple = new ArrayList();
            tuple.add(bytes);
            return tuple;
        }

        @Override
        public Fields getOutputFields() {
            return new Fields("key","value");
        }

    }   

        String topic = “test”;  //
        String zkRoot = “/kafkastorm”; //
        String spoutId = “id”; //   status    ,/kafkastorm/id  ,  id  consumer group
        
        BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181"); 

        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new SimplekVScheme());
        
        /*spoutConfig.zkServers = new ArrayList<String>(){{ //   local            ,     
            add("10.118.136.107");
        }};
        spoutConfig.zkPort = 2181;*/
        
        spoutConfig.forceFromStart = false; 
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();    
        spoutConfig.metricsTimeBucketSizeInSecs = 6;

        builder.setSpout(SqlCollectorTopologyDef.KAFKA_SPOUT_NAME, new KafkaSpout(spoutConfig), 1);

좋은 웹페이지 즐겨찾기