Storm - 소스 분석 - Thrift 사용

14707 단어 thrift
1 IDL
먼저 storm. thrift 입 니 다. IDL 에서 사용 하 는 데이터 구조 와 서 비 스 를 정의 한 다음 에 backtype. storm. generated 는 IDL 에서 Thrift 를 통 해 자동 으로 전 환 된 자바 코드 를 저장 합 니 다.
예 를 들 어 Nimbus service 가 IDL 에서 의 정의 는
service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs
  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);
  
  string beginFileDownload(1: string file);
  //can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // returns json
  string getNimbusConf();
  // stats functions
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  //returns json
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

Nimbus. java 에 대응 하 는 자바 코드 는 다음 과 같 습 니 다.
public class Nimbus {

  public interface Iface {

    public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;

    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;

    public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;

    public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;

    public String beginFileUpload() throws org.apache.thrift7.TException;

    public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;

    public void finishFileUpload(String location) throws org.apache.thrift7.TException;

    public String beginFileDownload(String file) throws org.apache.thrift7.TException;

    public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;

    public String getNimbusConf() throws org.apache.thrift7.TException;

    public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;

    public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;

    public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;

    public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;

    public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;

  }

 
2 Client
1. 먼저 클 라 이언 트 를 얻 기,
NimbusClient client = NimbusClient.getConfiguredClient(conf);

backtype. storm. utils 아래 client. getConfiguredClient 의 논 리 를 보십시오. 설정 에서 nimbus 의 host: port 를 꺼 내 고 new NimbusClient 를 꺼 냅 니 다.
    public static NimbusClient getConfiguredClient(Map conf) {
        try {
            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
            int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
            return new NimbusClient(conf, nimbusHost, nimbusPort);
        } catch (TTransportException ex) {
            throw new RuntimeException(ex);
        }
    }

NimbusClient 는 ThriftClient 에서 계승 되 었 습 니 다. Public class NimbusClient extends ThriftClient ThriftClient 는 또 무엇 을 했 습 니까?관건 은 데이터 서열 화 를 어떻게 하고 데 이 터 를 remote 에 전송 하 는 지 입 니 다. Thrift 가 Transport 와 Protocol 에 대한 패 키 징 은 Transport 에 대한 것 입 니 다. 사실은 Socket 에 대한 패 키 징 입 니 다. TSocket (host, port) 을 사용 한 다음 protocol 에 대해 서 는 기본적으로 TBinary Protocol 을 사용 합 니 다. 지정 하지 않 으 면
    public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
        try {
            //locate login configuration 
            Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);

            //construct a transport plugin
            ITransportPlugin  transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);

            //create a socket with server
            if(host==null) {
                throw new IllegalArgumentException("host is not set");
            }
            if(port<=0) {
                throw new IllegalArgumentException("invalid port: "+port);
            }            
            TSocket socket = new TSocket(host, port);
            if(timeout!=null) {
                socket.setTimeout(timeout);
            }
            final TTransport underlyingTransport = socket;

            //establish client-server transport via plugin
            _transport =  transportPlugin.connect(underlyingTransport, host); 
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        _protocol = null;
        if (_transport != null)
            _protocol = new  TBinaryProtocol(_transport);
    }

 
2. 임의의 RPC 를 호출 하면 submitTopology With Opts 를 보십시오.
client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts); 

위의 Nimbus interface 에 이 방법의 정의 가 있 음 을 알 수 있 습 니 다. 또한 Thrift 는 자바 interface 를 자동 으로 생 성 할 뿐만 아니 라 전체 RPC client 엔 드 의 실현 도 제공 합 니 다.
    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException
    {
      send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
      recv_submitTopologyWithOpts();
    }

두 걸음 으로 나 누 어
우선 sendsubmitTopology WithOpts, sendBase 호출
이어서, recvsubmitTopology WithOpts, receiveBase 호출
  protected void sendBase(String methodName, TBase args) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
  }

  protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    if (msg.type == TMessageType.EXCEPTION) {
      TApplicationException x = TApplicationException.read(iprot_);
      iprot_.readMessageEnd();
      throw x;
    }
    if (msg.seqid != seqid_) {
      throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
    }
    result.read(iprot_);
    iprot_.readMessageEnd();
  }

Thrift 가 protocol 에 대한 패 키 징 을 알 수 있 습 니 다. 스스로 직렬 화 를 처리 하지 않 고 protocol 의 인 터 페 이 스 를 호출 하여 해결 할 필요 가 없습니다.
 
3 Server
Thrift 의 강력 한 점 은 전체 프로 토 콜 스 택 을 실현 하 는 것 입 니 다. IDL 의 전환 뿐만 아니 라 server 에 대해 서도 다양한 실현 을 제공 합 니 다. 다음은 Nimbus server 엔 드 를 살 펴 보 세 요. clojure 로 작 성 된 것 입 니 다. 그 중에서 Thrift 로 포 장 된 Nonblocking Server Socket, THsHaServer, TBinary Protocol, Proccessor 는 매우 간단 합 니 다. 그 중에서 processor 는 service - handle 로 recv 에서 얻 은 데 이 터 를 처리 합 니 다.따라서 사용자 로 서 service - handle 에서 Nimbus $Iface 를 실현 해 야 합 니 다. 다른 server 와 관련 된 것 은 Thrift 가 모두 봉 인 했 습 니 다. 여기 서 사용 하 는 IDL 도 backtype. storm. generated 에 있 습 니 다. clojure 는 JVM 을 기반 으로 하기 때문에 IDL 은 자바 로 전환 하면 됩 니 다.
(defn launch-server! [conf nimbus]
  (validate-distributed-mode! conf)
  (let [service-handler (service-handler conf nimbus)
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory.))
                    (.processor (Nimbus$Processor. service-handler))
                    )
       server (THsHaServer. options)]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server)))

좋은 웹페이지 즐겨찾기