Dubbo 소스 학습 - ForkingCluster 클러스터링 오류 (8)
Forking Cluster
여러 서버를 병렬 호출하여 하나만 성공하면 되돌아옵니다.일반적으로 실시간 요구가 높은 읽기 작업에 사용되지만 더 많은 서비스 자원을 낭비해야 한다.통과할 수 있다
forks="2"
최대 병렬 수를 설정합니다.Forking Cluster에서는 Forking Cluster Invoker가 구현을 도와줍니다.
public class ForkingCluster implements Cluster {
public final static String NAME = "forking";
public Invoker join(Directory directory) throws RpcException {
return new ForkingClusterInvoker(directory);
}
}
ForkingCluster Invoker에서 여러 개의 서비스 제공자를 선택하고 여러 개의 스레드를 만들어서 스레드 탱크에 추가해서 서비스를 호출하면 서비스 호출 결과에 따라 판단하며, 호출에 성공한 서비스가 있으면 결과를 되돌려주고, 그렇지 않으면 이상 정보를 되돌려줍니다.
public class ForkingClusterInvoker extends AbstractClusterInvoker {
private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true));
public ForkingClusterInvoker(Directory directory) {
super(directory);
}
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
final List> selected;
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
//
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue