Storm-소스 분석 -Streaming Grouping(backtype.storm.daemon.executor)

10720 단어 executor
excutor는 outbounding 메시지를 보낼 때nextcomponent에 보낼 tasks를 결정해야 합니다. 여기에streaming grouping을 사용해야 합니다.
 

1. mk-grouper


direct grouping을 제외하고 되돌아오는 것은grouper function입니다. 이 grouper function을 실행하면 target tasks list direct grouping이 되돌아옵니다.
(defn- mk-grouper
  "Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
  [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks]
  (let [num-tasks (count target-tasks)
        random (Random.)
        target-tasks (vec (sort target-tasks))]
    (condp = (thrift/grouping-type thrift-grouping)
      :fields                                         ;;1.1 fields-grouping,  field grouping
        (if (thrift/global-grouping? thrift-grouping) ;;1.2 fields , global-grouping, tuple task
          (fn [task-id tuple]
            ;; It's possible for target to have multiple tasks if it reads multiple sources
            (first target-tasks))                     ;; global-grouping, task, taskid task
          (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))] ;; group-fields  
            (mk-fields-grouper out-fields group-fields target-tasks)
            ))
      :all
        (fn [task-id tuple] target-tasks) ;;1.3 all-grouping,  ,  task,  target-tasks
      :shuffle
        (mk-shuffle-grouper target-tasks) ;;1.4 shuffle-grouping
      :local-or-shuffle                   ;;1.5 local ,  tasks local shuffle local tasks
        (let [same-tasks (set/intersection
                           (set target-tasks)
                           (set (.getThisWorkerTasks context)))]
          (if-not (empty? same-tasks)
            (mk-shuffle-grouper (vec same-tasks))
            (mk-shuffle-grouper target-tasks)))
      :none                              ;;1.6  random, target-tasks    
        (fn [task-id tuple]
          (let [i (mod (.nextInt random) num-tasks)]
            (.get target-tasks i)
            ))
      :custom-object
        (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
          (mk-custom-grouper grouping context component-id stream-id target-tasks))
      :custom-serialized
        (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping))]
          (mk-custom-grouper grouping context component-id stream-id target-tasks))
      :direct
        :direct
      )))

 

1.1 fields-groups


사용select는 그룹-fields가 tuple에 대응하는valueslist를 추출합니다. 여러 개의 필드를 사용하여 그룹을 만들 수 있습니다. 그룹은tuple/list-hash-code를 사용하고valueslist에hashcode를num-tasks에mod를 추출하고task-getter를 사용하여 대응하는target-tasks를 추출합니다.
(defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields ^List target-tasks]
  (let [num-tasks (count target-tasks)
        task-getter (fn [i] (.get target-tasks i))]
    (fn [task-id ^List values]
      (-> (.select out-fields group-fields values)
          tuple/list-hash-code
          (mod num-tasks)
          task-getter))))

Fields 클래스는fields의list를 저장하는 것 외에 빠른 필드 읽기에 사용되는 index index의 생성도 있습니다. 간단합니다. 필드를 기록하고 자연 정렬을 사용할 때 select를 호출하여 어떤 필드의value가 필요한지, 그리고 tuple가 index에서fields의 index 값을 읽어내서 tuple에서 대응하는 value를 직접 읽는 것입니다(물론 tuple를 생성할 때는fields의 순서를 설치하여 생성해야 합니다)
public class Fields implements Iterable<String>, Serializable {
    private List<String> _fields;
    private Map<String, Integer> _index = new HashMap<String, Integer>();

    private void index() {
        for(int i=0; i<_fields.size(); i++) {
            _index.put(_fields.get(i), i);
        }
    }

    public List<Object> select(Fields selector, List<Object> tuple) {
        List<Object> ret = new ArrayList<Object>(selector.size());
        for(String s: selector) {
            ret.add(tuple.get(_index.get(s)));
        }
        return ret;
    }
}

1.2 globle-groups


fields grouping, 그러나field가 비어 있는 것은 globle grouping을 대표합니다. 모든tuple는task로 발송됩니다.
기본 첫 번째 task 선택

1.3 all-groups


모든 tasks로 보내기

1.4 shuffle-grouper


비교적 간단한 랜덤으로 직접 값을 얻는 방식(none-grouping과 구별)이 없습니다.loadbalance를 고려하여 다음과 같은 위조 랜덤 실현 방식을 사용합니다
target-tasks에 대해 우선 랜덤 shuffle, 순서를 어지럽히는 acquire-random-range-id, 모든 task를 순서대로 읽습니다. 이렇게 하면 순서는 랜덤이지만 모든 task는curr가 경계를 넘을 때curr를 비우고 새 shuffle target-tasks에서 선택됩니다.
(defn- mk-shuffle-grouper [^List target-tasks]
  (let [choices (rotating-random-range target-tasks)]
    (fn [task-id tuple]
      (acquire-random-range-id choices))))
(defn rotating-random-range [choices]
  (let [rand (Random.)
        choices (ArrayList. choices)]
    (Collections/shuffle choices rand)
    [(MutableInt. -1) choices rand]))

(defn acquire-random-range-id [[^MutableInt curr ^List state ^Random rand]]
  (when (>= (.increment curr) (.size state))
    (.set curr 0)
    (Collections/shuffle state rand))
  (.get state (.get curr)))

1.5 local-or-shuffle

localtasks 우선 선택 및 shuffle 방식

1.6 none-grouping

nocare grouping 방식, 현재의 실현은 간단한 random

1.7 customing-grouping


CustomStreamGrouping을 사용자 정의할 수 있습니다. 관건은 chooseTasks 논리를 정의하여 자신의tasks choose 정책을 실현하는 것입니다.
(defn- mk-custom-grouper [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks]
  (.prepare grouping context (GlobalStreamId. component-id stream-id) target-tasks)
  (fn [task-id ^List values]
    (.chooseTasks grouping task-id values)
    ))
public interface CustomStreamGrouping extends Serializable {   
   /**
     * Tells the stream grouping at runtime the tasks in the target bolt.
     * This information should be used in chooseTasks to determine the target tasks.
     * 
     * It also tells the grouping the metadata on the stream this grouping will be used on.
     */
   void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
    
   /**
     * This function implements a custom stream grouping. It takes in as input
     * the number of tasks in the target bolt in prepare and returns the
     * tasks to send the tuples to.
     * 
     * @param values the values to group on
     */
   List<Integer> chooseTasks(int taskId, List<Object> values); 
}

:custom-object와:custom-serialized의 차이는thrift-grouping이 서열화되었는지 없는지 object를 직접 읽을 수 있습니다. 그렇지 않으면object로 반서열해야 합니다.

1.8 direct-grouping


producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams.
여기로 바로 돌아갑니다:direct,direct-grouping, 어느 tasks로 보낼지 프로덕터가 tuple을 생성할 때 결정했기 때문에 여기는 어떤 그룹핑과 관련된 작업도 할 필요가 없습니다

2 stream->component->grouper


outbound-components 하나의 executor는 하나의 component에만 대응하기 때문에 현재 executor의component-id getTargets를 제시하면 모든 outbound components, [streamid, [target-componentid, grouping]를 얻을 수 있습니다.]
outbound-groupings를 호출하여 최종적으로 [streamid [component grouper]의hashmap을 되돌려주고 executor-data의stream->component->grouper에 값을 부여합니다
task는 최종적으로 메시지를 보낼 때stream->component->grouper를 통해 진정한 target taskslist를 생성합니다
(defn outbound-components
  "Returns map of stream id to component id to grouper"
  [^WorkerTopologyContext worker-context component-id]
  (->> (.getTargets worker-context component-id) ;;[streamid, [target-componentid, grouping]]
        clojurify-structure
        (map (fn [[stream-id component->grouping]]
               [stream-id
                (outbound-groupings
                  worker-context
                  component-id
                  stream-id
                  (.getComponentOutputFields worker-context component-id stream-id)
                  component->grouping)]))
         (into {})
         (HashMap.)))

 
outbound-groupings는 모든 task가 비어 있지 않은 target component에 mk-grouper mk-grouper를 호출하여 그룹 fn을 되돌려줍니다. 그래서 최종 되돌려줍니다. [component, 그룹]
(defn- outbound-groupings [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping]
  (->> component->grouping
       (filter-key #(-> worker-context  ;;component tasks 0
                        (.getComponentTasks %)
                        count
                        pos?))
       (map (fn [[component tgrouping]]
               [component
                (mk-grouper worker-context
                            this-component-id
                            stream-id
                            out-fields
                            tgrouping
                            (.getComponentTasks worker-context component)
                            )]))
       (into {})
       (HashMap.)))

좋은 웹페이지 즐겨찾기