Netty 심장 박동 메커니즘 분석
What
말 그대로 심장박동이란 TCP 장거리 연결에서 클라이언트와 서버 간에 정기적으로 발송되는 특수한 데이터 패키지로 상대방에게 자신이 아직 온라인에 있다는 것을 알려 TCP 연결의 유효성을 확보하는 것이다.
Why
네트워크의 신뢰성이 떨어지기 때문에 TCP가 긴 연결을 유지하는 과정에서 일부 돌발적인 상황, 예를 들어 네트워크가 뽑히고 갑자기 전기가 떨어지는 등 서버와 클라이언트의 연결이 중단될 수 있다.이러한 돌발적인 상황에서 만약에 서버와 클라이언트 간에 상호작용이 없다면 그들은 짧은 시간 안에 상대방이 이미 오프라인 상태인 것을 발견할 수 없다.이 문제를 해결하기 위해서 우리는 심장 박동 메커니즘을 도입해야 한다.심장 박동 메커니즘의 작업 원리는 서버와 클라이언트 간에 일정 시간 동안 데이터 상호작용이 없을 때, 즉 idle 상태에 있을 때 클라이언트나 서버는 특수한 데이터 패키지를 상대방에게 보내고 수신자가 이 데이터 메시지를 받은 후에 바로 특수한 데이터 메시지를 보내서 발송자에게 응답한다. 이것이 바로 PING-PONG 상호작용이다.자연스럽게 어느 한 쪽에서 심장 박동 메시지를 받은 후에 상대방이 여전히 온라인이라는 것을 알게 되면 TCP 연결의 유효성을 확보한다
How
우리는 두 가지 방식으로 심장 박동 메커니즘을 실현할 수 있다.
TCP 프로토콜 차원에서 keepalive 보존 메커니즘을 제공했지만 이를 사용하는 데는 몇 가지 단점이 있다.
TCP 차원의keepalive 메커니즘을 사용하는 것은 사용자 정의 응용층 심장 박동 메커니즘보다 유량을 절약하지만 위의 몇 가지 단점을 바탕으로 일반적인 실천에서 사람들은 대부분 응용층에서 사용자 정의 심장 박동을 실현하는 것을 선택한다.그렇다면 넷티에서 어떻게 심장이 뛰는지 대충 살펴보자.Netty에서 심장 박동 메커니즘을 실현하는 관건은 IdleStateHandler입니다. 채널의 읽기/쓰기에 타이머를 설정할 수 있습니다. 채널이 일정한 이벤트 간격 내에 데이터 상호작용이 없을 때 (즉 idle 상태) 지정된 이벤트를 촉발합니다.
netty로 심장 박동 실현
위에서 언급한 바와 같이 넷티에서 심장 박동 메커니즘을 실현하는 관건은 IdleStateHandler입니다. 그러면 이 Handler는 어떻게 사용합니까?그 구조기를 살펴보자.
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
IdleStateHandler를 실례화하려면 다음 세 가지 매개변수를 제공해야 합니다.
구체적인 IdleStateHandler가 실현하는 심장 박동 메커니즘을 보여주기 위해 다음은 구체적인 EchoServer의 예를 구성합니다. 이 예는 다음과 같습니다.
+--------+-----+---------------+
| Length |Type | Content |
| 17 | 1 |"HELLO, WORLD" |
+--------+-----+---------------+
공통 섹션
위에서 정의한 행위에 따라 우리는 다음에 심장 박동을 실현하는 일반적인 부분인 Custom HeartbeatHandler:
public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler {
public static final byte PING_MSG = 1;
public static final byte PONG_MSG = 2;
public static final byte CUSTOM_MSG = 3;
protected String name;
private int heartbeatCount = 0;
public CustomHeartbeatHandler(String name) {
this.name = name;
}
@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
if (byteBuf.getByte(4) == PING_MSG) {
sendPongMsg(context);
} else if (byteBuf.getByte(4) == PONG_MSG){
System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
} else {
handleData(context, byteBuf);
}
}
protected void sendPingMsg(ChannelHandlerContext context) {
ByteBuf buf = context.alloc().buffer(5);
buf.writeInt(5);
buf.writeByte(PING_MSG);
context.writeAndFlush(buf);
heartbeatCount++;
System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
}
private void sendPongMsg(ChannelHandlerContext context) {
ByteBuf buf = context.alloc().buffer(5);
buf.writeInt(5);
buf.writeByte(PONG_MSG);
context.channel().writeAndFlush(buf);
heartbeatCount++;
System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
}
protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// IdleStateHandler IdleStateEvent .
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case READER_IDLE:
handleReaderIdle(ctx);
break;
case WRITER_IDLE:
handleWriterIdle(ctx);
break;
case ALL_IDLE:
handleAllIdle(ctx);
break;
default:
break;
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
}
protected void handleReaderIdle(ChannelHandlerContext ctx) {
System.err.println("---READER_IDLE---");
}
protected void handleWriterIdle(ChannelHandlerContext ctx) {
System.err.println("---WRITER_IDLE---");
}
protected void handleAllIdle(ChannelHandlerContext ctx) {
System.err.println("---ALL_IDLE---");
}
}
클래스 Custom Heartbeat Handler는 심장 박동의 송신과 수신을 책임지고 그 작용을 상세하게 분석합니다.앞에서 언급한 바와 같이 IdleStateHandler는 심장 박동을 실현하는 관건이다. IO idle 유형에 따라 서로 다른 IdleState Event 사건이 발생한다. 이 사건의 포획은 사실
userEventTriggered
방법에서 실현된.
Custom Heartbeat Handler를 살펴보겠습니다.userEventTriggered의 구체적인 구현:
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case READER_IDLE:
handleReaderIdle(ctx);
break;
case WRITER_IDLE:
handleWriterIdle(ctx);
break;
case ALL_IDLE:
handleAllIdle(ctx);
break;
default:
break;
}
}
}
userEventTriggered에서 IdleStateEvent의state()에 따라 처리합니다.예를 들어 읽기 데이터 idle의 경우 e.state() = = READER_IDLE, 따라서
handleReaderIdle을 호출하여 처리합니다.CustomHeartbeatHandler는 세 가지 idle 처리 방법을 제공합니다:handleReaderIdle,handleWriterIdle,handleAllIdle,
이 세 가지 방법은 현재 기본적인 실현만 있을 뿐, 하위 클래스에서 다시 작성해야 한다. 현재 우리는 잠시 그것들을 생략하고, 구체적인 클라이언트와 서버의 실현 부분을 볼 때 그것들을 다시 볼 것이다.
이 점을 알게 된 후 데이터 처리 부분을 살펴보겠습니다.
@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
if (byteBuf.getByte(4) == PING_MSG) {
sendPongMsg(context);
} else if (byteBuf.getByte(4) == PONG_MSG){
System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
} else {
handleData(context, byteBuf);
}
}
Custom Heartbeat Handler에서.channelRead0에서 메시지 프로토콜에 따라 다음을 수행합니다.
+--------+-----+---------------+
| Length |Type | Content |
| 17 | 1 |"HELLO, WORLD" |
+--------+-----+---------------+
현재 메시지 유형을 판단하기 위해 PING_MSG는 서버가 클라이언트로부터 PING 메시지를 받았음을 나타냅니다. 이때 서버는 PONG 메시지를 답장해야 합니다. 메시지 유형은 PONG_MSG. 메시지 던지기 유형은 퐁_MSG는 클라이언트가 서버에서 보낸 PONG 메시지를 받았음을 나타냅니다. 이때 로그를 인쇄하면 됩니다.
클라이언트 섹션
클라이언트 초기화:
public class Client {
public static void main(String[] args) {
NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
Random random = new Random(System.currentTimeMillis());
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(0, 0, 5));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ClientHandler());
}
});
Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();
for (int i = 0; i < 10; i++) {
String content = "client msg " + i;
ByteBuf buf = ch.alloc().buffer();
buf.writeInt(5 + content.getBytes().length);
buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
buf.writeBytes(content.getBytes());
ch.writeAndFlush(buf);
Thread.sleep(random.nextInt(20000));
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
workGroup.shutdownGracefully();
}
}
}
위의 코드는 Netty의 클라이언트 측의 초기화 코드입니다. Netty를 사용한 친구는 이 코드가 낯설지 않을 것입니다.다른 부분은 우리가 더 이상 군말 하지 않겠다. 우리 한번 보자
ChannelInitializer.initChannel 섹션은 다음과 같습니다.
.handler(new ChannelInitializer() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(0, 0, 5));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ClientHandler());
}
});
우리는 pipeline에 세 개의 Handler를 추가했습니다.
IdleStateHandler
이handler는 심장 박동 메커니즘의 핵심입니다. 클라이언트 측에 읽기와 쓰기 시간을 초과했습니다. 시간 간격은 5s입니다. 즉, 만약에 손님이
사용자 측에서 5s 간격으로 서버의 메시지를 받거나 서버에 메시지를 보내지 않으면 ALL_IDLE 이벤트.
다음은 LengthFieldBasedFrameDecoder를 추가했습니다. 이것은 우리의 TCP 메시지를 해석하는 것을 책임지는 것입니다. 본고의 목적과 무관하기 때문에 여기는 상세하게 전개되지 않습니다.마지막 Handler는 ClientHandler입니다. 이것은 Custom HeartbeatHandler에 계승되어 저희가 업무 논리를 처리하는 부분입니다.
클라이언트
public class ClientHandler extends CustomHeartbeatHandler {
public ClientHandler() {
super("client");
}
@Override
protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
byte[] data = new byte[byteBuf.readableBytes() - 5];
byteBuf.skipBytes(5);
byteBuf.readBytes(data);
String content = new String(data);
System.out.println(name + " get content: " + content);
}
@Override
protected void handleAllIdle(ChannelHandlerContext ctx) {
super.handleAllIdle(ctx);
sendPingMsg(ctx);
}
}
ClientHandler는 Custom HeartbeatHandler의 계승으로 두 가지 방법을 다시 썼습니다. 하나는 Handle Data입니다. 이 안에서 받은 메시지만 출력할 수 있습니다.두 번째 다시 쓰는 방법은 handle All Idle입니다.앞에서 언급한 바와 같이 클라이언트는 심장이 뛰는 PING 메시지를 전송하고 클라이언트가 ALL_을 생성할 때IDLE 이벤트 후 상위 클래스의
CustomHeartbeatHandler.userEventTriggered 호출, userEventTriggered 에서는 e.state () 에 따라 다른 방법을 호출합니다. 따라서 마지막으로 호출한 것은
ClientHandler.handleAllIdle, 이 방법에서 클라이언트는sendPingMsg를 호출하여 서버에 PING 메시지를 보냅니다.
서버 섹션
서버 초기화
public class Server {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(10, 0, 0));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ServerHandler());
}
});
Channel ch = bootstrap.bind(12345).sync().channel();
ch.closeFuture().sync();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
서버의 초기화 부분도 말할 것이 없습니다. 클라이언트의 초기화와 마찬가지로 pipeline에 세 개의 Handler를 추가했습니다.
서버 핸들러
public class ServerHandler extends CustomHeartbeatHandler {
public ServerHandler() {
super("server");
}
@Override
protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
byte[] data = new byte[buf.readableBytes() - 5];
ByteBuf responseBuf = Unpooled.copiedBuffer(buf);
buf.skipBytes(5);
buf.readBytes(data);
String content = new String(data);
System.out.println(name + " get content: " + content);
channelHandlerContext.write(responseBuf);
}
@Override
protected void handleReaderIdle(ChannelHandlerContext ctx) {
super.handleReaderIdle(ctx);
System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");
ctx.close();
}
}
ServerHandler는 CustomHeartbeatHandler의 계승으로 두 가지 방법을 다시 썼다. 하나는 HandleData이고 이 안에서 Echo Server의 기능을 실현한다. 즉, 클라이언트의 메시지를 받은 후 바로 클라이언트에게 메시지를 고스란히 답장하는 것이다.두 번째 다시 쓰는 방법은handleReaderIdle입니다. 서버는 클라이언트의 읽기idle에만 관심이 있기 때문에 이 방법만 다시 썼습니다.서버가 지정된 시간 이후에 클라이언트로부터 메시지를 받지 못하면 READER_IDLE 메시지는 HandleReaderIdle을 호출합니다.앞에서 언급한 바와 같이 클라이언트는 심장이 뛰는 PING 메시지를 보내고 서버의 READER_IDLE의 시간 초과 시간은 클라이언트가 PING 메시지를 보내는 간격의 두 배이므로 서버 READER_IDLE가 터치하면 클라이언트가 오프라인 상태임을 확인할 수 있으므로 서버가 클라이언트 연결을 직접 닫으면 됩니다.
총결산
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.