Storm - 소스 분석 - Thrift 사용
14707 단어 thrift
먼저 storm. thrift 입 니 다. IDL 에서 사용 하 는 데이터 구조 와 서 비 스 를 정의 한 다음 에 backtype. storm. generated 는 IDL 에서 Thrift 를 통 해 자동 으로 전 환 된 자바 코드 를 저장 합 니 다.
예 를 들 어 Nimbus service 가 IDL 에서 의 정의 는
service Nimbus {
void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
void killTopology(1: string name) throws (1: NotAliveException e);
void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
void activate(1: string name) throws (1: NotAliveException e);
void deactivate(1: string name) throws (1: NotAliveException e);
void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);
// need to add functions for asking about status of storms, what nodes they're running on, looking at task logs
string beginFileUpload();
void uploadChunk(1: string location, 2: binary chunk);
void finishFileUpload(1: string location);
string beginFileDownload(1: string file);
//can stop downloading chunks when receive 0-length byte array back
binary downloadChunk(1: string id);
// returns json
string getNimbusConf();
// stats functions
ClusterSummary getClusterInfo();
TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
//returns json
string getTopologyConf(1: string id) throws (1: NotAliveException e);
StormTopology getTopology(1: string id) throws (1: NotAliveException e);
StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}
Nimbus. java 에 대응 하 는 자바 코드 는 다음 과 같 습 니 다.
public class Nimbus {
public interface Iface {
public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;
public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;
public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;
public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;
public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;
public String beginFileUpload() throws org.apache.thrift7.TException;
public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;
public void finishFileUpload(String location) throws org.apache.thrift7.TException;
public String beginFileDownload(String file) throws org.apache.thrift7.TException;
public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;
public String getNimbusConf() throws org.apache.thrift7.TException;
public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;
public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;
public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;
public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
}
2 Client
1. 먼저 클 라 이언 트 를 얻 기,
NimbusClient client = NimbusClient.getConfiguredClient(conf);
backtype. storm. utils 아래 client. getConfiguredClient 의 논 리 를 보십시오. 설정 에서 nimbus 의 host: port 를 꺼 내 고 new NimbusClient 를 꺼 냅 니 다.
public static NimbusClient getConfiguredClient(Map conf) {
try {
String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
return new NimbusClient(conf, nimbusHost, nimbusPort);
} catch (TTransportException ex) {
throw new RuntimeException(ex);
}
}
NimbusClient 는 ThriftClient 에서 계승 되 었 습 니 다. Public class NimbusClient extends ThriftClient ThriftClient 는 또 무엇 을 했 습 니까?관건 은 데이터 서열 화 를 어떻게 하고 데 이 터 를 remote 에 전송 하 는 지 입 니 다. Thrift 가 Transport 와 Protocol 에 대한 패 키 징 은 Transport 에 대한 것 입 니 다. 사실은 Socket 에 대한 패 키 징 입 니 다. TSocket (host, port) 을 사용 한 다음 protocol 에 대해 서 는 기본적으로 TBinary Protocol 을 사용 합 니 다. 지정 하지 않 으 면
public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
try {
//locate login configuration
Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);
//construct a transport plugin
ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);
//create a socket with server
if(host==null) {
throw new IllegalArgumentException("host is not set");
}
if(port<=0) {
throw new IllegalArgumentException("invalid port: "+port);
}
TSocket socket = new TSocket(host, port);
if(timeout!=null) {
socket.setTimeout(timeout);
}
final TTransport underlyingTransport = socket;
//establish client-server transport via plugin
_transport = transportPlugin.connect(underlyingTransport, host);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
_protocol = null;
if (_transport != null)
_protocol = new TBinaryProtocol(_transport);
}
2. 임의의 RPC 를 호출 하면 submitTopology With Opts 를 보십시오.
client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
위의 Nimbus interface 에 이 방법의 정의 가 있 음 을 알 수 있 습 니 다. 또한 Thrift 는 자바 interface 를 자동 으로 생 성 할 뿐만 아니 라 전체 RPC client 엔 드 의 실현 도 제공 합 니 다.
public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException
{
send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
recv_submitTopologyWithOpts();
}
두 걸음 으로 나 누 어
우선 sendsubmitTopology WithOpts, sendBase 호출
이어서, recvsubmitTopology WithOpts, receiveBase 호출
protected void sendBase(String methodName, TBase args) throws TException {
oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
args.write(oprot_);
oprot_.writeMessageEnd();
oprot_.getTransport().flush();
}
protected void receiveBase(TBase result, String methodName) throws TException {
TMessage msg = iprot_.readMessageBegin();
if (msg.type == TMessageType.EXCEPTION) {
TApplicationException x = TApplicationException.read(iprot_);
iprot_.readMessageEnd();
throw x;
}
if (msg.seqid != seqid_) {
throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
}
result.read(iprot_);
iprot_.readMessageEnd();
}
Thrift 가 protocol 에 대한 패 키 징 을 알 수 있 습 니 다. 스스로 직렬 화 를 처리 하지 않 고 protocol 의 인 터 페 이 스 를 호출 하여 해결 할 필요 가 없습니다.
3 Server
Thrift 의 강력 한 점 은 전체 프로 토 콜 스 택 을 실현 하 는 것 입 니 다. IDL 의 전환 뿐만 아니 라 server 에 대해 서도 다양한 실현 을 제공 합 니 다. 다음은 Nimbus server 엔 드 를 살 펴 보 세 요. clojure 로 작 성 된 것 입 니 다. 그 중에서 Thrift 로 포 장 된 Nonblocking Server Socket, THsHaServer, TBinary Protocol, Proccessor 는 매우 간단 합 니 다. 그 중에서 processor 는 service - handle 로 recv 에서 얻 은 데 이 터 를 처리 합 니 다.따라서 사용자 로 서 service - handle 에서 Nimbus $Iface 를 실현 해 야 합 니 다. 다른 server 와 관련 된 것 은 Thrift 가 모두 봉 인 했 습 니 다. 여기 서 사용 하 는 IDL 도 backtype. storm. generated 에 있 습 니 다. clojure 는 JVM 을 기반 으로 하기 때문에 IDL 은 자바 로 전환 하면 됩 니 다.
(defn launch-server! [conf nimbus]
(validate-distributed-mode! conf)
(let [service-handler (service-handler conf nimbus)
options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
(THsHaServer$Args.)
(.workerThreads 64)
(.protocolFactory (TBinaryProtocol$Factory.))
(.processor (Nimbus$Processor. service-handler))
)
server (THsHaServer. options)]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
(log-message "Starting Nimbus server...")
(.serve server)))
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Thrift를 사용하여 Ruby에서 HBase에 액세스하려는 경우Thrift를 사용하여 Ruby에서 HBase에 액세스하려고 합니다.(제목 엄마) 원래는 HBase에 REST API를 통해 접근하는 루비 프로그램이라고 쓰여 있었으나 Thrift만 지원하는 작업을 하려고 Thrif...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.