RPC Server for Erlang, In Java
There is often a real need to process massive amounts of data in other languages, such as C or Java. That's why I tried writing an RPC server for Erlang in Java.
OTP/Erlang ships with the jinterface library for communication between Erlang and Java, and its documentation explains how to get it working. But for an RPC server that is called from Erlang, there are still a few real-world tips:
1. When you send the result back to the caller, you need to wrap it in a tuple whose first element is the caller's tag (a Ref), and the destination should be the caller's Pid. It looks something like:
OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[] {call.tag, tResult});
sConnection.send(call.to, msg);
where call.tag is an OtpErlangRef, tResult can be any OtpErlangObject, and call.to is an OtpErlangPid.
2. If you need to send a massive amount of data back to the caller, the default buffer size of OtpErlangOutputStream is not sufficient; I set it to 1024 * 1024 * 10.
3. Since many concurrent callers may call your RPC server, you have to consider its concurrent performance; I chose to use a thread pool here.
The RPC server in Java has two classes, RpcNode.java and RpcMsg.java:
package net.lightpole.rpcnode;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangPid;
import com.ericsson.otp.erlang.OtpErlangRef;
import com.ericsson.otp.erlang.OtpErlangTuple;
/**
 * A decoded Erlang RPC request of the form
 * {@code {'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}}}.
 *
 * <p>Instances are normally obtained through {@link #tryToResolveRcpCall(OtpErlangObject)},
 * which returns {@code null} instead of throwing when the message has a different shape.
 *
 * @author Caoyuan Deng
 */
public class RpcMsg {

    /** First element of the request tuple; always the atom {@code call}. */
    public OtpErlangAtom call;
    /** Second element of the request tuple (Mod). */
    public OtpErlangAtom mod;
    /** Third element of the request tuple (Fun). */
    public OtpErlangAtom fun;
    /** Fourth element of the request tuple (Args). */
    public OtpErlangList args;
    /** Fifth element of the request tuple (User pid). */
    public OtpErlangPid user;
    /** First element of the {@code {To, Tag}} tuple: the pid the reply is sent to. */
    public OtpErlangPid to;
    /** Second element of the {@code {To, Tag}} tuple: the reference echoed back in the reply. */
    public OtpErlangRef tag;

    /**
     * Builds a message from the two inner tuples of a {@code '$gen_call'} envelope.
     *
     * @param from    the {@code {To, Tag}} tuple
     * @param request the {@code {call, Mod, Fun, Args, User}} tuple
     * @throws IllegalArgumentException if {@code request} does not have five elements, or if any
     *                                  element of either tuple has an unexpected type
     */
    public RpcMsg(OtpErlangTuple from, OtpErlangTuple request) throws IllegalArgumentException {
        if (request.arity() != 5) {
            throw new IllegalArgumentException("Not a rpc call");
        }

        /* {call, Mod, Fun, Args, userPid} -- the request is validated before "from" is touched. */
        if (!isCallRequest(request) || !isReplyAddress(from)) {
            throw new IllegalArgumentException("Not a rpc call.");
        }

        this.call = (OtpErlangAtom) request.elementAt(0);
        this.mod = (OtpErlangAtom) request.elementAt(1);
        this.fun = (OtpErlangAtom) request.elementAt(2);
        this.args = (OtpErlangList) request.elementAt(3);
        this.user = (OtpErlangPid) request.elementAt(4);
        this.to = (OtpErlangPid) from.elementAt(0);
        this.tag = (OtpErlangRef) from.elementAt(1);
    }

    /**
     * Attempts to interpret an incoming message as an RPC call.
     *
     * <p>Expected shape: {@code {'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}}}.
     *
     * @param msg any received term
     * @return the decoded call, or {@code null} when {@code msg} is not such a tuple (a stack
     *         trace is printed when the outer shape matches but the inner element types do not)
     */
    public static RpcMsg tryToResolveRcpCall(OtpErlangObject msg) {
        if (!(msg instanceof OtpErlangTuple)) {
            return null;
        }
        final OtpErlangTuple envelope = (OtpErlangTuple) msg;
        if (envelope.arity() != 3) {
            return null;
        }

        final OtpErlangObject[] parts = envelope.elements();
        final boolean shapeMatches = isAtomNamed(parts[0], "$gen_call")
                && isTupleOfArity(parts[1], 2)
                && isTupleOfArity(parts[2], 5);
        if (!shapeMatches) {
            return null;
        }

        try {
            return new RpcMsg((OtpErlangTuple) parts[1], (OtpErlangTuple) parts[2]);
        } catch (IllegalArgumentException ex) {
            ex.printStackTrace();
            return null;
        }
    }

    /** Checks the element types of a five-element {@code {call, Mod, Fun, Args, User}} tuple. */
    private static boolean isCallRequest(OtpErlangTuple request) {
        return isAtomNamed(request.elementAt(0), "call")
                && request.elementAt(1) instanceof OtpErlangAtom
                && request.elementAt(2) instanceof OtpErlangAtom
                && request.elementAt(3) instanceof OtpErlangList
                && request.elementAt(4) instanceof OtpErlangPid;
    }

    /** Checks that the tuple starts with a pid followed by a reference. */
    private static boolean isReplyAddress(OtpErlangTuple from) {
        return from.elementAt(0) instanceof OtpErlangPid
                && from.elementAt(1) instanceof OtpErlangRef;
    }

    /** Returns true when {@code term} is an atom whose value equals {@code name}. */
    private static boolean isAtomNamed(OtpErlangObject term, String name) {
        return term instanceof OtpErlangAtom && ((OtpErlangAtom) term).atomValue().equals(name);
    }

    /** Returns true when {@code term} is a tuple with exactly {@code arity} elements. */
    private static boolean isTupleOfArity(OtpErlangObject term, int arity) {
        return term instanceof OtpErlangTuple && ((OtpErlangTuple) term).arity() == arity;
    }
}
package net.lightpole.rpcnode;
import com.ericsson.otp.erlang.OtpAuthException;
import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangExit;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import com.ericsson.otp.erlang.OtpErlangTuple;
import com.ericsson.otp.erlang.OtpSelf;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Base class for a Java node that serves Erlang {@code rpc:call} requests over a single
 * accepted connection, dispatching each request to a fixed-size thread pool.
 *
 * <p>Construction publishes the node, blocks until a remote node connects, and then runs the
 * receive loop on the constructing thread; the constructor returns only when that loop ends.
 * Because {@link #processRpcCall(RpcMsg)} is invoked from pool threads while the constructor is
 * still running, subclass instance fields are not yet initialised when it is called.
 *
 * Usage:
 * $ erl -sname clientnode -setcookie mycookie
 * (clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).
 *
 * @author Caoyuan Deng
 */
public abstract class RpcNode {

    public static final OtpErlangAtom OK = new OtpErlangAtom("ok");
    public static final OtpErlangAtom ERROR = new OtpErlangAtom("error");
    /* The spelling is part of the public name and of the atom sent to callers; left as is. */
    public static final OtpErlangAtom STOPED = new OtpErlangAtom("stoped");

    private static final int THREAD_POOL_SIZE = 100;
    /** Value returned by {@link #processRpcCall(RpcMsg)} to ask the node to stop. */
    private static final int STOP_REQUESTED = -1;
    /** Third argument passed to the connection's send call for replies (10 MiB). */
    private static final int SEND_BUFFER_SIZE = 1024 * 1024 * 10;
    private static final Logger LOG = Logger.getLogger(RpcNode.class.getName());

    private OtpSelf xSelf;
    /** Stays {@code null} when publishing the port or accepting the connection failed. */
    private OtpConnection sConnection;
    private final ExecutorService execService;
    /** Written by a pool thread, read by the receive loop; hence volatile. */
    private volatile boolean stopRequested;

    /**
     * Starts a node with the default pool size and serves requests until stopped.
     *
     * @param xnodeName name under which this node is published
     * @param cookie    cookie shared with the calling Erlang node
     */
    public RpcNode(String xnodeName, String cookie) {
        this(xnodeName, cookie, THREAD_POOL_SIZE);
    }

    /**
     * Starts a node and serves requests until stopped. Blocks for the lifetime of the node.
     *
     * @param xnodeName      name under which this node is published
     * @param cookie         cookie shared with the calling Erlang node
     * @param threadPoolSize number of worker threads that process calls
     */
    public RpcNode(String xnodeName, String cookie, int threadPoolSize) {
        execService = Executors.newFixedThreadPool(threadPoolSize);
        startServerConnection(xnodeName, cookie);
        loop();
    }

    /** Publishes this node's port and blocks until one remote node has connected. */
    private void startServerConnection(String xnodeName, String cookie) {
        try {
            xSelf = new OtpSelf(xnodeName, cookie);
            boolean registered = xSelf.publishPort();
            if (registered) {
                System.out.println(xSelf.node() + " is ready.");
                /*
                 * Accept an incoming connection from a remote node. A call to this
                 * method will block until an incoming connection is at least
                 * attempted.
                 */
                sConnection = xSelf.accept();
            } else {
                System.out.println("There should be an epmd running, start an epmd by running 'erl'.");
            }
        } catch (IOException ex) {
            LOG.log(Level.SEVERE, null, ex);
        } catch (OtpAuthException ex) {
            LOG.log(Level.SEVERE, null, ex);
        }
    }

    /**
     * Receives messages and hands each one to the thread pool until a stop is requested or the
     * connection fails, then shuts the pool down so its threads do not keep the JVM alive.
     */
    private void loop() {
        if (sConnection == null) {
            // startServerConnection() has already reported why no connection exists.
            LOG.log(Level.SEVERE, "No connection was accepted; the RPC loop will not run.");
            execService.shutdown();
            return;
        }
        try {
            while (!stopRequested) {
                try {
                    // Kept as a plain term: a non-tuple message must not abort the loop with a
                    // ClassCastException; tryToResolveRcpCall() rejects it instead.
                    final OtpErlangObject msg = sConnection.receive();
                    execService.execute(new Runnable() {
                        public void run() {
                            handleMessage(msg);
                        }
                    });
                } catch (OtpErlangExit ex) {
                    logReceiveFailure(ex);
                } catch (OtpAuthException ex) {
                    logReceiveFailure(ex);
                } catch (IOException ex) {
                    logReceiveFailure(ex);
                    // Only one connection is ever accepted, so an I/O failure on it is final.
                    break;
                }
            }
            if (stopRequested) {
                System.out.println("Exited");
            }
        } finally {
            execService.shutdown();
        }
    }

    /** Runs on a pool thread: decodes one message, processes it and honours a stop request. */
    private void handleMessage(OtpErlangObject msg) {
        try {
            RpcMsg call = RpcMsg.tryToResolveRcpCall(msg);
            if (call == null) {
                replyUnknownRequest();
                return;
            }
            long t0 = System.currentTimeMillis();
            int status = processRpcCall(call);
            System.out.println("Rpc time: " + (System.currentTimeMillis() - t0) / 1000.0);
            if (status == STOP_REQUESTED) {
                requestStop();
            }
        } catch (RuntimeException ex) {
            // Task boundary: report the failure instead of losing it inside the pool thread.
            LOG.log(Level.SEVERE, "RPC task failed", ex);
        }
    }

    /** Tells the peer that a message was not a recognisable RPC call. */
    private void replyUnknownRequest() {
        try {
            // Addressed by name, using the peer's node name, exactly as the original code did.
            sConnection.send(sConnection.peer().node(), new OtpErlangString("unknown request"));
        } catch (IOException ex) {
            LOG.log(Level.SEVERE, null, ex);
        }
    }

    /**
     * Marks the node as stopping and closes the connection so the receive loop can end.
     * Replies still being produced by other pool threads may fail once the connection is closed.
     */
    private void requestStop() {
        stopRequested = true;
        // NOTE(review): relies on close() making the receive() the loop is blocked in fail;
        // confirm against the jinterface version in use.
        sConnection.close();
    }

    /** Logs a receive failure unless it is the expected consequence of a requested stop. */
    private void logReceiveFailure(Exception ex) {
        if (!stopRequested) {
            LOG.log(Level.SEVERE, null, ex);
        }
    }

    /**
     * Sends {@code {Tag, {head, result}}} to the process that issued {@code call}.
     *
     * @param call   the request being answered; supplies the reply tag and destination pid
     * @param head   first element of the result tuple, e.g. {@link #OK} or {@link #ERROR}
     * @param result second element of the result tuple
     * @throws IOException if the connection's send call fails
     */
    protected void sendRpcResult(RpcMsg call, OtpErlangAtom head, OtpErlangObject result) throws IOException {
        OtpErlangTuple tResult = new OtpErlangTuple(new OtpErlangObject[] {head, result});
        // The caller matches the reply on its tag, so the tag must lead the message.
        OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[] {call.tag, tResult});
        // NOTE(review): the three-argument send and its size argument are kept from the original;
        // confirm this overload exists in the jinterface build in use.
        sConnection.send(call.to, msg, SEND_BUFFER_SIZE);
    }

    /**
     * Handles one decoded RPC call; invoked concurrently from pool threads. Implementations are
     * responsible for replying, typically through
     * {@link #sendRpcResult(RpcMsg, OtpErlangAtom, OtpErlangObject)}.
     *
     * @param call the decoded request
     * @return {@code -1} to make the node stop serving; any other value to continue
     */
    public abstract int processRpcCall(RpcMsg call);

    // ------ helper

    /** Returns the local host name truncated at its first dot. */
    public static String getShortLocalHost() {
        return getLocalHost(false);
    }

    /** Returns the local host name exactly as reported by {@link InetAddress}. */
    public static String getLongLocalHost() {
        return getLocalHost(true);
    }

    /**
     * Looks up the local host name, falling back to {@code "localhost"} when it is unknown.
     *
     * @param longName when {@code false}, everything from the first {@code '.'} on is removed
     */
    private static String getLocalHost(boolean longName) {
        String localHost;
        try {
            localHost = InetAddress.getLocalHost().getHostName();
            if (!longName) {
                /* Make sure it's a short name, i.e. strip off everything after the first '.' */
                int dot = localHost.indexOf(".");
                if (dot != -1) {
                    localHost = localHost.substring(0, dot);
                }
            }
        } catch (UnknownHostException e) {
            localHost = "localhost";
        }
        return localHost;
    }
}
As you can see, RpcNode is an abstract class; by implementing int processRpcCall(RpcMsg call), you can add whatever features you want. For example:
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package net.lightpole.xmlnode;
import basexnode.Main;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangLong;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.lightpole.rpcnode.RpcMsg;
import net.lightpole.rpcnode.RpcNode;
/**
*
* @author dcaoyuan
*/
public class MyNode extends RpcNode {
public MyNode(String xnodeName, String cookie, int threadPoolSize) {
super(xnodeName, cookie, threadPoolSize);
}
@Override
public int processRpcCall(RpcMsg call) {
final String modStr = call.mod.atomValue();
final String funStr = call.fun.atomValue();
final OtpErlangList args = call.args;
try {
OtpErlangAtom head = ERROR;
OtpErlangObject result = null;
if (modStr.equals("xnode") && funStr.equals("stop")) {
head = OK;
sendRpcResult(call, head, STOPED);
return -1;
}
if (modStr.equals("System") && funStr.equals("currentTimeMillis")) {
head = OK;
long t = System.currentTimeMillis();
result = new OtpErlangLong(t);
} else {
result = new OtpErlangString("{undef,{" + modStr + "," + funStr + "}}");
}
if (result == null) {
result = new OtpErlangAtom("undefined");
}
sendRpcResult(call, head, result);
} catch (IOException ex) {
ex.printStackTrace();
} catch (Exception ex) {
}
return 0;
}
}
I tested MyNode by:
$ erl -sname clientnode -setcookie mycookie
...
(clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).
And you can test its concurrent performance with:
%% $ erl -sname clientnode -setcookie mycookie
%% > xnode_test:test(10000)
-module(xnode_test).
-export([test/1]).
%% @doc Spawns ProcN monitored workers that each perform one RPC call via
%% rpc_parse/1, then collects and returns their results in spawn order.
%% Exits with a worker's reason if that worker terminates abnormally.
test(ProcN) ->
    %% The generator value is not needed, only the number of iterations.
    Workers = [spawn_worker(self(), fun rpc_parse/1, {})
               || _ <- lists:seq(0, ProcN - 1)],
    [wait_result(Worker) || Worker <- Workers].
%% @doc Performs a single RPC call asking the Java node for its current time
%% in milliseconds. The empty tuple is only a placeholder argument.
rpc_parse({}) ->
    Node = xnodename@cmac,
    rpc:call(Node, 'System', currentTimeMillis, []).
%% @doc Starts a monitored process that applies F to A and sends
%% {WorkerPid, Result} to Parent. Returns {WorkerPid, MonitorRef}.
spawn_worker(Parent, F, A) ->
    Job = fun() ->
                  Parent ! {self(), F(A)}
          end,
    spawn_monitor(Job).
%% @doc Waits for the monitored worker {Pid, Ref} to terminate. After a normal
%% termination, returns the result the worker sent as {Pid, Result}; otherwise
%% exits the calling process with the worker's exit reason.
wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, Reason} ->
            case Reason of
                normal ->
                    receive
                        {Pid, Result} -> Result
                    end;
                _ ->
                    exit(Reason)
            end
    end.
I spawned 10000 calls to it, and it ran smoothly.
I'm also considering writing a more general-purpose RPC server in Java that can dynamically call any existing method of a Java class.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Is Eclipse IDE dying?In 2014 the Eclipse IDE is the leading development environment for Java with a market share of approximately 65%. but ac...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.