DRPC 서비스 자체 생성
2452 단어 stormDRPC 서비스 만들기
먼저 DRPC의 원리를 설명합니다: 클라이언트가 DRPC 서버에 실행할 방법의 이름과 이 방법의 매개 변수를 보냅니다.이 함수를 실현한 topology는 DRPCSpout를 사용하여 DRPC 서버에서 함수 호출 흐름을 수신합니다.각 함수 호출은 DRPC 서버에 고유한 id로 표시됩니다.이 topology는 결과를 계산합니다. topology의 마지막 Return Results라는 bolt는 DRPC 서버에 연결되고 이 호출된 결과를 DRPC 서버에 보냅니다.DRPC 서버는 기다리는 클라이언트와 일치하는 유일한 id로 이 클라이언트를 깨우고 결과를 보냅니다.이 원리를 알게 되면 우리는 스스로 DRPC 서비스를 만들 수 있다.
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class ManualDRPC {
public static class ExclamationBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("result", "return-info"));
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String arg = tuple.getString(0);
Object retInfo = tuple.getValue(1);
collector.emit(new Values(arg + "!!!", retInfo));
}
}
public static void main(String[] args) {
// DRPC
TopologyBuilder builder = new TopologyBuilder();
LocalDRPC drpc = new LocalDRPC();
DRPCSpout spout = new DRPCSpout("exclamation", drpc);
builder.setSpout("drpc", spout);
builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
LocalCluster cluster = new LocalCluster();
Config conf = new Config();
cluster.submitTopology("exclaim", conf, builder.createTopology());
System.out.println(drpc.execute("exclamation", "aaa"));
System.out.println(drpc.execute("exclamation", "bbb"));
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
storm Async loop died! & reconnect자세히 보기 storm이 슈퍼바이저가 리셋되었을 때 topology가 오류를 보고하여 모든 spout이 소비되지 않습니다. 로그 위에 대량의reconnection IP에 로그인하여 6703 포트에 두 개의 워커가 있...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.