Storm-소스 코드 분석-토폴로지 제출-클 라 이언 트
16403 단어 storm
처음에 storm 명령 을 사용 하여 토폴로지 를 시작 합 니 다.다음 과 같 습 니 다.
storm jar storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.WordCountTopology
이 storm 명령 은 python 으로 이 루어 졌 습 니 다.그 중의 jar 함 수 를 보 세 요.간단 합 니 다.exec 를 호출 합 니 다.storm_class,그 중 jvmtype="-client" 반면 execstorm_class 는 사실 자바 실행 명령 을 맞 추고 os.system(command)으로 실행 하 는 것 입 니 다.왜 Python 으로 간단 합 니까?storm 명령 을 직접 사용 할 수 있 습 니까? 여기 있 는 klass 는 바로 topology 류 이기 때문에 자바 명령 은 Topology 류 의 main 함수 만 호출 합 니 다.
def jar(jarfile, klass, *args):
"""Syntax: [storm jar topology-jar-path class ...]
Runs the main method of class with the specified arguments.
The storm jars and configs in ~/.storm are put on the classpath.
The process is configured so that StormSubmitter
(http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
will upload the jar at topology-jar-path when the topology is submitted.
"""
exec_storm_class(
klass,
jvmtype="-client",
extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
args=args,
childopts="-Dstorm.jar=" + jarfile)
def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[]):
nativepath = confvalue("java.library.path", extrajars)
args_str = "".join(map(lambda s: "\"" + s + "\"", args))
command = "java" + jvmtype + " -Dstorm.home=" + STORM_DIR + "" + get_config_opts() + " -Djava.library.path=" + nativepath + "" + childopts + " -cp" + get_classpath(extrajars) + "" + klass + "" + args_str
print "Running:" + command
os.system(command)
WordCountTopology 예 를 직접 보 는 main 함 수 는 무엇 을 실행 합 니까?
토폴로지 를 정의 하 는 것 외 에 최종 적 으로 StormSubmitter.submitTopology(args[0],conf,builder.createTopology()를 호출 하여 토폴로지 를 제출 합 니 다.
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
if(args!=null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
StormSubmitter
submitTopology 직접 보기,1.매개 변수 설정
명령 행 파 라미 터 를 stormConf 에 두 고 conf/storm.yaml 에서 설정 파 라미 터 를 conf 로 읽 고 stormConf 도 conf 로 넣 으 면 명령 행 파라미터 의 우선 순위 가 더 높다 는 것 을 알 수 있다
stormConf 를 JSon 으로 바 꿉 니 다.이 설정 은 서버 에 보 내야 하기 때 문 입 니 다.
2. Submit Jar Storm Submitter 의 본질은 Thrift Client 이 고 Nimbus 는 Thrift Server 이기 때문에 모든 조작 은 Thrift RPC 를 통 해 이 루어 집 니 다.Thrift 참조Thrift Storm-소스 분석-Thrift 사용
먼저 topology NameExists 를 판단 하고 Thrift client 를 통 해 현재 실행 중인 topology 의 상황 을 얻 고 check
그리고 Submit Jar,다음 세 단 계 를 통과 합 니 다. client.getClient().beginFileUpload();
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
client.getClient().finishFileUpload(uploadLocation);
RPC 를 통 해 데 이 터 를 보 내 고 구체 적 으로 어떻게 저장 하 는 지 는 Nimbus 자신의 논리 적 인 일 입 니 다.
3. Submit Topology 간단 해 요.그냥 간단하게 RPC 를 호출 하 는 거 예요.
client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
/** * Submits a topology to run on the cluster. A topology runs forever or until * explicitly killed. * * * @param name the name of the storm. * @param stormConf the topology-specific configuration. See {@link Config}. * @param topology the processing to execute. * @param options to manipulate the starting of the topology * @throws AlreadyAliveException if a topology with this name is already running * @throws InvalidTopologyException if an invalid topology was submitted */
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
if(!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
}
stormConf = new HashMap(stormConf);
stormConf.putAll(Utils.readCommandLineOpts());
Map conf = Utils.readStormConfig();
conf.putAll(stormConf);
try {
String serConf = JSONValue.toJSONString(stormConf);
if(localNimbus!=null) {
LOG.info("Submitting topology " + name + " in local mode");
localNimbus.submitTopology(name, null, serConf, topology);
} else {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
if(topologyNameExists(conf, name)) {
throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
}
submitJar(conf);
try {
LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
if(opts!=null) {
client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
} else {
// this is for backwards compatibility
client.getClient().submitTopology(name, submittedJar, serConf, topology);
}
} catch(InvalidTopologyException e) {
LOG.warn("Topology submission exception", e);
throw e;
} catch(AlreadyAliveException e) {
LOG.warn("Topology already alive exception", e);
throw e;
} finally {
client.close();
}
}
LOG.info("Finished submitting topology: " + name);
} catch(TException e) {
throw new RuntimeException(e);
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
storm Async loop died! & reconnect자세히 보기 storm이 슈퍼바이저가 리셋되었을 때 topology가 오류를 보고하여 모든 spout이 소비되지 않습니다. 로그 위에 대량의reconnection IP에 로그인하여 6703 포트에 두 개의 워커가 있...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.