고성능 서비스 통신 프레임 워 크 Gaea 의 상세 한 실현--server 요청 처리 프로 세 스
Gaea 는 tcp/http/telnet 세 가지 통신 정 보 를 지원 하 는데 그 중에서 주요 한 통신 부분 은 netty 통신 프레임 워 크 로 이 루어 지고 netty 는 고성능 의 비 차단 통신 도 구 를 제공 합 니 다.
Gaea 각 서비스 시작
서 비 스 를 시작 하 는 설정 입 니 다.tcp 설정 에 대해 간단히 소개 하 겠 습 니 다.telnet,http 의 기본 은 같 습 니 다.
<property>
<name>gaea.servers</name>
<value>gaea.server.tcp,gaea.server.http,gaea.server.telnet</value>
</property>
<property>
<name>gaea.server.tcp.enable</name>
<value>true</value>
</property>
<property>
<name>gaea.server.tcp.implement</name>
<value>com.bj58.spat.gaea.server.core.communication.tcp.SocketServer</value>
</property>
// IP , , ,
이상 의 설정 을 설정 하면 TCP 서비스의 시작 을 표시 할 수 있 습 니 다.또한 어떤 서 비 스 를 스스로 실현 하고 IServer 인 터 페 이 스 를 계승 한 다음 설정 파일 에 상기 설정 을 추가 하면 서비스 가 시 작 될 때 서 비 스 를 시작 할 수 있 습 니 다.예 를 들 어 Gaea 모니터링 서비스.
IServer
public interface IServer {
public void start() throws Exception;
public void stop() throws Exception;
}
Socketserver 는 TCP 서비스의 IServer 구현 클래스 입 니 다.
//SocketServer
initSocketServer() { // TCPServer
SocketHandler handler = new SocketHandler(); //TCP Handler
bootstrap.setPipelineFactory(new SocketPipelineFactory(handler,
Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.frameMaxLength")));
bootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
bootstrap.setOption("child.receiveBufferSize",
Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.receiveBufferSize"));
bootstrap.setOption("child.sendBufferSize",
Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.sendBufferSize"));
try {
InetSocketAddress socketAddress = null;
socketAddress = new InetSocketAddress(Global.getSingleton().getServiceConfig().getString("gaea.server.tcp.listenIP"),
Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.listenPort"));
Channel channel = bootstrap.bind(socketAddress); //
allChannels.add(channel); // channel ChannelGroup ,
} catch (Exception e) {
logger.error("init socket server error", e);
System.exit(1);
}
}
netty 에 관 해 서 는 인터넷 에 자료 도 많 으 니 많이 볼 수 있 습 니 다.
Gaea 수신 데이터
netty 수신 데이터 등 이벤트 처 리 는 모두 Handler 에서 이 루어 집 니 다.따라서 여기 서 SocketHandler 에 대해 말씀 드 리 겠 습 니 다.
채널 이 연결 되 었 을 때 채널 오픈 이벤트 가 발생 합 니 다.
SocketServer.allChannels.add(e.getChannel());
/**
* , SecureContext
*/
if(Global.getSingleton().getGlobalSecureIsRights()){//
Global.getSingleton().addChannelMap(e.getChannel(), new SecureContext()); SecureContext。
}
클 라 이언 트 가 연 결 될 때 채널 커 넥 티 드 이 벤트 를 촉발 합 니 다.
for(IFilter filter : Global.getSingleton().getConnectionFilterList()) { //Global , 。
filter.filter(new GaeaContext(new GaeaChannel(e.getChannel())));// 。
}
연결 필 터 를 주로 실행 하여 연결 을 제어 합 니 다.
수신 데이터
try {
logger.debug("message receive");
ByteBuffer buffer = ((ChannelBuffer)e.getMessage()).toByteBuffer(); //Netty ChannelBuffer , NIO ByteBuffer
byte[] reciveByte = buffer.array();
logger.debug("reciveByte.length:" + reciveByte.length);
byte[] headDelimiter = new byte[0];
System.arraycopy(reciveByte, 0, headDelimiter, 0, 0);
byte[] requestBuffer = new byte[reciveByte.length];
System.arraycopy(reciveByte, 0, requestBuffer, 0, (reciveByte.length));
GaeaContext gaeaContext = new GaeaContext(requestBuffer, // Gaea GaeaContext; GaeaContext 。
new GaeaChannel(e.getChannel()),
ServerType.TCP,
this);
SocketServer.invokerHandle.invoke(gaeaContext);// GaeaContext
} catch(Throwable ex) {
byte[] response = ExceptionHelper.createErrorProtocol(); // , ,
e.getChannel().write(response);
logger.error("SocketHandler invoke error", ex);
}
invoke 에서 Gaea 는 동기 화 와 비동기 화 두 가지 처리 방식 을 제공 합 니 다.물론 동기 화 는 초기 결과 물이 어야 합 니 다.현재 처 리 는 대체적으로 비동기 처리 입 니 다.
동기 처리 프로 세 스
@Override
public void invoke(GaeaContext context) throws Exception {
try {
for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) {//
if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) {
f.filter(context); // filter
}
}
if(context.isDoInvoke()) {
doInvoke(context);//
}
logger.debug("begin response filter");
// response filter
for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) {//
if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) {
f.filter(context);// filter
}
}
context.getServerHandler().writeResponse(context);//
} catch(Exception ex) {
context.setError(ex);
context.getServerHandler().writeResponse(context); // ,
logger.error("in async messageReceived", ex);
}
}
상기 코드 에서 알 수 있 듯 이 Gaea 는 일련의 요청 필 터 를 거 친 후에 야 진정한 방법 을 실행 하고 최종 적 으로 필 터 를 되 돌려 주 며 마지막 으로 클 라 이언 트 에 게 다시 쓴다.
비동기 처리 프로 세 스
invoke 를 호출 하면 Gaea 의 이번 요청 은 AsyncInvoker 의 비동기 실행 기 에 넣 어 실 행 됩 니 다.다음 요청 을 받 고 빠르게 되 돌아 갑 니 다.
asyncInvoker.run(taskTimeOut, new IAsyncHandler(){...};
IAsynHandler 에 서 는 구체 적 으로 세 가지 방법 이 있 습 니 다.run 은 임 무 를 수행 하 는 것 이 고 message Received 는 실 행 된 후에 실행 결 과 를 되 돌려 줍 니 다.exception Caught 는 실행 과정 에서 의 모든 이상 을 처리 합 니 다.IAsyncHandler 정 의 는 다음 과 같 습 니 다.
public interface IAsyncHandler {
public Object run() throws Throwable;
public void messageReceived(Object obj);
public void exceptionCaught(Throwable e);
}
asyncInvoker.run 을 실행 할 때 비동기 실행 기 는 작업 을 64 개의 대기 열 에 던 졌 을 뿐 기본 값 은 64 이 며 설정 도 할 수 있 습 니 다.설정 항목 gaea.async.worker.count
AsyncTask task = new AsyncTask(timeOut, handler); //handler ,timeOut ,
if(rr > 10000) {
rr = 0;
}
int idx = rr % workers.length; //
workers[idx].addTask(task);
++rr;
그 중에서 비동기 작업 중 시간 초과 가 있 습 니 다.만약 에 대기 열 에 있 는 시간 이 이 값 보다 많 으 면 Gaea 는 이 임 무 를 버 리 고 전체 서비스의 정상 적 인 운행 을 보호 합 니 다.이것 이 바로 Gaea 의 서버 부하 보호 전략 입 니 다.서버 의 압력 이 너무 지연 되 는 것 을 방지 하고 일부 임 무 를 버 려 서 대부분의 작업 의 효과 적 인 집행 을 보호 합 니 다.
비동기 실행 기 를 초기 화 할 때 64 개의 작업 스 레 드 와 스 레 드 풀 을 시작 합 니 다.
private AsyncInvoker(int workerCount, boolean timeoutEffect) {
workers = new AsyncWorker[workerCount];
ExecutorService executor = Executors.newCachedThreadPool(new ThreadRenameFactory("async task thread"));
for(int i=0; i<workers.length; i++) {
workers[i] = new AsyncWorker(executor, timeoutEffect);
workers[i].setDaemon(true);
workers[i].setName("async task worker[" + i + "]");
workers[i].start();
}
}
여기에 64 개의 스 레 드 와 하나의 스 레 드 풀 을 제공 하 는 역할 은 Gaea 가 제공 하 는 두 가지 처리 작업 방식 이다.하 나 는 작업 분리 이 고 64 개의 대기 열 은 각자 의 작업 을 처리 하 며 하 나 는 스 레 드 풀 로 하나의 대기 열 을 처리 하고 작업 의 다 중 실행 시간 을 설정 했다.
private void execNoTimeLimitTask() {
AsyncTask task = null;
try {
task = taskQueue.take();
if(task != null){
if((System.currentTimeMillis() - task.getAddTime()) > task.getQtimeout()) { //
task.getHandler().exceptionCaught(new TimeoutException("async task timeout!"));
return;
} else {
Object obj = task.getHandler().run(); //
task.getHandler().messageReceived(obj); //
}
}else{
logger.error("execNoTimeLimitTask take task is null");
}
} catch(InterruptedException ie) {
} catch(Throwable ex) {
if(task != null) {
task.getHandler().exceptionCaught(ex); //
}
}
}
try {
final AsyncTask task = taskQueue.take();
if(task != null) {
if((System.currentTimeMillis() - task.getAddTime()) > task.getQtimeout()) {
task.getHandler().exceptionCaught(new TimeoutException("async task timeout!"));
return;
}else{
final CountDownLatch cdl = new CountDownLatch(1);
executor.execute(new Runnable(){
@Override
public void run() {
try {
Object obj = task.getHandler().run(); //
task.getHandler().messageReceived(obj); //
} catch(Throwable ex) {
task.getHandler().exceptionCaught(ex);
} finally {
cdl.countDown();
}
}
}
);
cdl.await(getTimeout(task.getTimeout(), taskQueue.size()), TimeUnit.MILLISECONDS); // cdl.countDown , , 。
if(cdl.getCount() > 0) {
task.getHandler().exceptionCaught(new TimeoutException("async task timeout!")); //
}
}
}else{
logger.error("execTimeoutTask take task is null");
}
} catch(InterruptedException ie) {
logger.error("");
} catch (Throwable e) {
logger.error("get task from poll error", e);
}
public Object run() throws Throwable {
logger.debug("begin request filter");
// request filter
for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) {
if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) {
f.filter(context);
}
}
if(context.isDoInvoke()) {
if(context.getServerType() == ServerType.HTTP){ // http
httpThreadLocal.set(context.getHttpContext());
}
doInvoke(context);
}
logger.debug("begin response filter");
// response filter
for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) {
if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) {
f.filter(context);
}
}
return context;
}
요청 필터
요청 필 터 는 말 그대로 작업 을 수행 하기 전에 Gaea 가 요청 에 대한 처리 입 니 다.Gaea 기본 프레임 워 크 의 요청 필 터 는 어떤 것들 이 있 나 요?
<property>
<name>gaea.filter.global.request</name>
<value>com.bj58.spat.gaea.server.filter.ProtocolParseFilter,com.bj58.spat.gaea.server.filter.HandclaspFilter,com.bj58.spat.gaea.server.filter.ExecuteMethodFilter</value>
</property>
Gaea 가 데 이 터 를 처음 받 았 을 때 Gaea 는 요청 한 바 이 너 리 흐름 을 컨 텍스트 루트 GaeaContext 에 넣 었 습 니 다.이 Gaea 는 바 이 너 리 흐름 을 분석 하고 복원 하여 작업 을 수행 하 는 데 사 용 했 습 니 다.
public void filter(GaeaContext context) throws Exception {
if(context.getServerType() == ServerType.TCP) {
byte[] desKeyByte = null;
String desKeyStr = null;
boolean bool = false;
Global global = Global.getSingleton();
if(global != null){
//
if(global.getGlobalSecureIsRights()){
SecureContext securecontext = global.getGlobalSecureContext(context.getChannel().getNettyChannel());
bool = securecontext.isRights();
if(bool){
desKeyStr = securecontext.getDesKey();
}
}
}
if(desKeyStr != null){
desKeyByte = desKeyStr.getBytes("utf-8");
}
Protocol protocol = Protocol.fromBytes(context.getGaeaRequest().getRequestBuffer(),global.getGlobalSecureIsRights(),desKeyByte); // deskey
context.getGaeaRequest().setProtocol(protocol);
/**
*
*/
if(Global.getSingleton().getServerState() == ServerStateType.Reboot && protocol.getPlatformType() == PlatformType.Java){
GaeaResponse response = new GaeaResponse();
ResetProtocol rp = new ResetProtocol();
rp.setMsg("This server is reboot!");
protocol.setSdpEntity(rp);
response.setResponseBuffer(protocol.toBytes(global.getGlobalSecureIsRights(),desKeyByte));
context.setGaeaResponse(response);
context.setExecFilter(ExecFilterType.None);
context.setDoInvoke(false);
}
}
}
전체 프로 세 스 는 상기 코드 와 같이 권한 인증 을 사용 할 지 여 부 를 판단 한 다음 에 권한 인증 에 따라 바 이 너 리 흐름 에 대해 프로 토 콜 분석 을 합 니 다.Gaea 프로 토 콜 에 대해 서 는 사용자 정의 바 이 너 리 프로 토 콜 로 구체 적 으로 다른 글 을 상세 하 게 설명 합 니 다.분석 후 서비스 재 부팅 작업 이 라면 추가 작업 을 위해 GaeaContext 를 기록 합 니 다.
/**
* filter
*/
@Override
public void filter(GaeaContext context) throws Exception {
Protocol protocol = context.getGaeaRequest().getProtocol();
if(protocol.getPlatformType() == PlatformType.Java && context.getServerType() == ServerType.TCP){//java
GaeaResponse response = new GaeaResponse();
Global global = Global.getSingleton();
//
if(Global.getSingleton().getGlobalSecureIsRights()){
SecureContext sc = global.getGlobalSecureContext(context.getChannel().getNettyChannel());
// channel
if(!sc.isRights()){
//
if(protocol != null && protocol.getSdpEntity() instanceof HandclaspProtocol){
SecureKey sk = new SecureKey();
HandclaspProtocol handclaspProtocol = (HandclaspProtocol)protocol.getSdpEntity();
/**
*
*/
if("1".equals(handclaspProtocol.getType())){
sk.initRSAkey();
//
String clientPublicKey = handclaspProtocol.getData();
if(null == clientPublicKey || "".equals(clientPublicKey)){
logger.warn("get client publicKey warn!");
}
//java
if(protocol.getPlatformType() == PlatformType.Java){
// / ,
sc.setServerPublicKey(sk.getStringPublicKey());
sc.setServerPrivateKey(sk.getStringPrivateKey());
sc.setClientPublicKey(clientPublicKey);
handclaspProtocol.setData(sk.getStringPublicKey());//
}
protocol.setSdpEntity(handclaspProtocol);
response.setResponseBuffer(protocol.toBytes());
context.setGaeaResponse(response);
this.setInvokeAndFilter(context);
logger.info("send server publieKey sucess!");
}
/**
*
*/
else if("2".equals(handclaspProtocol.getType())){
//
String clientSecureInfo = handclaspProtocol.getData();
if(null == clientSecureInfo || "".equals(clientSecureInfo)){
logger.warn("get client secureKey warn!");
}
// ( )
String sourceInfo = sk.decryptByPrivateKey(clientSecureInfo, sc.getServerPrivateKey());
//
// , DES ,
if(global.containsSecureMap(sourceInfo)){
logger.info("secureKey is ok!");
String desKey = StringUtils.getRandomNumAndStr(8);
// channel
sc.setDesKey(desKey);
sc.setRights(true);
handclaspProtocol.setData(sk.encryptByPublicKey(desKey, sc.getClientPublicKey()));
protocol.setSdpEntity(handclaspProtocol);
response.setResponseBuffer(protocol.toBytes());
context.setGaeaResponse(response);
}else{
logger.error("It's bad secureKey!");
this.ContextException(context, protocol, response, " !");
}
this.setInvokeAndFilter(context);
}else{
//
logger.error(" !");
this.ContextException(context, protocol, response, " !");
}
response = null;
sk = null;
handclaspProtocol = null;
}else{
//
logger.error(" !");
this.ContextException(context, protocol, response, " !");
}
}
}else{
if(protocol != null && protocol.getSdpEntity() instanceof HandclaspProtocol){
// --
logger.error(" !");
this.ContextException(context, protocol, response, " !");
}
}
}
}
임 무 를 집행 하 다
작업 을 수행 하 는 것 은 doInvoke(gaeaContext)입 니 다.
이 단 계 는 간단 합 니 다.프로 토 콜 에 따라 필터 가 분석 한 요청 데 이 터 를 분석 하고 요청 한 클래스 이름 을 찾 은 다음 클래스 이름 에 따라 IProxy Factory 에서 해당 하 는 프 록 시 클래스 를 추출 한 다음 에 프 록 시 를 통 해 진정한 방법 을 수행 합 니 다.IProxy Factory 클래스 에 대해 서 는 Gaea 의 시작 과정 에서 어떻게 생 성 되 는 지 보 세 요.
IProxyStub localProxy = Global.getSingleton().getProxyFactory().getProxy(request.getLookup()); //
GaeaResponse gaeaResponse = localProxy.invoke(context);//
이 과정 에서 여러 가지 이상 을 처 리 했 고 모든 처리 결 과 를 GaeaContext 에 넣 었 다.StopWatch 는 호출 정 보 를 주로 기록 하고 되 돌아 오 는 필터 에 실행 시간 을 기록 합 니 다.
필터 되 돌리 기
프레임 의 리 턴 필터
<!-- global response filter -->
<property>
<name>gaea.filter.global.response</name>
<value>com.bj58.spat.gaea.server.filter.ProtocolCreateFilter,com.bj58.spat.gaea.server.filter.ExecuteTimeFilter</value>
</property>
context.getGaeaResponse().setResponseBuffer(protocol.toBytes(Global.getSingleton().getGlobalSecureIsRights(),desKeyByte));
public void filter(GaeaContext context) throws Exception {
StopWatch sw = context.getStopWatch();
Collection<PerformanceCounter> pcList = sw.getMapCounter().values();
for(PerformanceCounter pc : pcList) {
if(pc.getExecuteTime() > minRecordTime) {
StringBuilder sbMsg = new StringBuilder();
sbMsg.append(serviceName);
sbMsg.append("--");
sbMsg.append(pc.getKey());
sbMsg.append("--time: ");
sbMsg.append(pc.getExecuteTime());
sbMsg.append(" [fromIP: ");
sbMsg.append(sw.getFromIP());
sbMsg.append(";localIP: ");
sbMsg.append(sw.getLocalIP()+"]");
udpClient.send(sbMsg.toString());
}
}
결과 반환
위의 필터,실행 등 과정 에서 얻 은 결 과 를 상하 문 GaeaContext 에 봉 하여 이 단계 에서 되 돌려 줍 니 다
public void messageReceived(Object obj) {
if(context.getServerType() == ServerType.HTTP){
httpThreadLocal.remove();
}
if(obj != null) {
GaeaContext ctx = (GaeaContext)obj;
ctx.getServerHandler().writeResponse(ctx);
} else {
logger.error("context is null!");
}
}
if(context != null && context.getGaeaResponse() != null){
context.getChannel().write(context.getGaeaResponse().getResponseBuffer()); //getResponseBuffer
} else {
context.getChannel().write(new byte[]{0});
logger.error("context is null or response is null in writeResponse");
}
예외 처리
Gaea 서 비 스 를 수행 하 는 과정 에서 모든 이상 이 던 져 지고 exception Caught(exception)에 의 해 받 아들 여지 고 Gaea 패 키 징 을 거 쳐 직렬 화 되 어 클 라 이언 트 에 게 돌아 가 호출 자 에 게 어떤 원인 으로 호출 이 실 패 했 는 지 알려 줍 니 다.
public void exceptionCaught(Throwable e) {
if(context.getServerType() == ServerType.HTTP){
httpThreadLocal.remove();
}
if(context.getGaeaResponse() == null){
GaeaResponse respone = new GaeaResponse();
context.setGaeaResponse(respone);
}
try {
byte[] desKeyByte = null;
String desKeyStr = null;
boolean bool = false;
Global global = Global.getSingleton();
if(global != null){
//
if(global.getGlobalSecureIsRights()){
SecureContext securecontext = global.getGlobalSecureContext(context.getChannel().getNettyChannel());
bool = securecontext.isRights();
if(bool){
desKeyStr = securecontext.getDesKey();
}
}
}
if(desKeyStr != null){
desKeyByte = desKeyStr.getBytes("utf-8");
}
Protocol protocol = context.getGaeaRequest().getProtocol();
if(protocol == null){
protocol = Protocol.fromBytes(context.getGaeaRequest().getRequestBuffer(),global.getGlobalSecureIsRights(),desKeyByte);
context.getGaeaRequest().setProtocol(protocol);
}
protocol.setSdpEntity(ExceptionHelper.createError(e));
context.getGaeaResponse().setResponseBuffer(protocol.toBytes(Global.getSingleton().getGlobalSecureIsRights(),desKeyByte));
} catch (Exception ex) {
context.getGaeaResponse().setResponseBuffer(new byte[]{0});
logger.error("AsyncInvokerHandle invoke-exceptionCaught error", ex);
}
context.getServerHandler().writeResponse(context);
logger.error("AsyncInvokerHandle invoke error", e);
}
});
총결산
이로써 한 클 라 이언 트 의 요청 처리 가 끝 났 습 니 다.Gaea 의 전체 디자인 에서 알 수 있 듯 이 많은 것들 이 인 터 페 이 스 를 남 겨 두 었 고 구조 자체 에서 자신의 업무 에 적합 한 처 리 를 할 수 있 습 니 다.전체 디자인 은 서비스 통신 구조의 성능 을 결정 합 니 다.
le284
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.