Netty 심장 박동 메커니즘 분석

15519 단어

What


말 그대로 심장박동이란 TCP 장거리 연결에서 클라이언트와 서버 간에 정기적으로 발송되는 특수한 데이터 패키지로 상대방에게 자신이 아직 온라인에 있다는 것을 알려 TCP 연결의 유효성을 확보하는 것이다.

Why


네트워크의 신뢰성이 떨어지기 때문에 TCP가 긴 연결을 유지하는 과정에서 일부 돌발적인 상황, 예를 들어 네트워크가 뽑히고 갑자기 전기가 떨어지는 등 서버와 클라이언트의 연결이 중단될 수 있다.이러한 돌발적인 상황에서 만약에 서버와 클라이언트 간에 상호작용이 없다면 그들은 짧은 시간 안에 상대방이 이미 오프라인 상태인 것을 발견할 수 없다.이 문제를 해결하기 위해서 우리는 심장 박동 메커니즘을 도입해야 한다.심장 박동 메커니즘의 작업 원리는 서버와 클라이언트 간에 일정 시간 동안 데이터 상호작용이 없을 때, 즉 idle 상태에 있을 때 클라이언트나 서버는 특수한 데이터 패키지를 상대방에게 보내고 수신자가 이 데이터 메시지를 받은 후에 바로 특수한 데이터 메시지를 보내서 발송자에게 응답한다. 이것이 바로 PING-PONG 상호작용이다.자연스럽게 어느 한 쪽에서 심장 박동 메시지를 받은 후에 상대방이 여전히 온라인이라는 것을 알게 되면 TCP 연결의 유효성을 확보한다

How


우리는 두 가지 방식으로 심장 박동 메커니즘을 실현할 수 있다.
  • TCP 프로토콜 차원의keepalive 메커니즘을 사용합니다
  • 응용층에서 사용자 정의 심장 박동 메커니즘을 실현한다

  • TCP 프로토콜 차원에서 keepalive 보존 메커니즘을 제공했지만 이를 사용하는 데는 몇 가지 단점이 있다.
  • TCP의 표준 프로토콜이 아니며 기본적으로 닫힙니다
  • TCPkeepalive 메커니즘은 운영체제의 실현에 의존하고 기본적인keepalive의 심장 박동 시간은 두 시간이며keepalive의 수정은 시스템 호출(또는 시스템 설정 수정)이 필요하기 때문에 유연성이 부족하다
  • TCP keepalive와 TCP 프로토콜이 귀속되어 있기 때문에 UDP 프로토콜로 교체해야 할 때keepalive 메커니즘이 효력을 상실합니다..

  • TCP 차원의keepalive 메커니즘을 사용하는 것은 사용자 정의 응용층 심장 박동 메커니즘보다 유량을 절약하지만 위의 몇 가지 단점을 바탕으로 일반적인 실천에서 사람들은 대부분 응용층에서 사용자 정의 심장 박동을 실현하는 것을 선택한다.그렇다면 넷티에서 어떻게 심장이 뛰는지 대충 살펴보자.Netty에서 심장 박동 메커니즘을 실현하는 관건은 IdleStateHandler입니다. 채널의 읽기/쓰기에 타이머를 설정할 수 있습니다. 채널이 일정한 이벤트 간격 내에 데이터 상호작용이 없을 때 (즉 idle 상태) 지정된 이벤트를 촉발합니다.

    netty로 심장 박동 실현


    위에서 언급한 바와 같이 넷티에서 심장 박동 메커니즘을 실현하는 관건은 IdleStateHandler입니다. 그러면 이 Handler는 어떻게 사용합니까?그 구조기를 살펴보자.
    public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
        this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
    }

    IdleStateHandler를 실례화하려면 다음 세 가지 매개변수를 제공해야 합니다.
  • reader Idle Time Seconds, 읽기 시간 초과.즉, 지정된 시간 간격으로 채널에서 데이터를 읽지 못하면 READER_IDLE의 IdleState Event 이벤트.
  • writerIdleTimeSeconds, 쓰기 시간 초과.즉, 지정된 시간 내에 채널에 기록된 데이터가 없을 때 WRITER_IDLE의 IdleState Event 이벤트.
  • allIdleTimeSeconds, 읽기/쓰기 시간 초과.즉, 지정된 간격 내에 읽기 또는 쓰기 작업이 없으면 ALL_ 이 트리거됩니다.IDLE의 IdleState Event 이벤트.

  • 구체적인 IdleStateHandler가 실현하는 심장 박동 메커니즘을 보여주기 위해 다음은 구체적인 EchoServer의 예를 구성합니다. 이 예는 다음과 같습니다.
  • 이 예에서 클라이언트와 서버는 TCP 장거리 연결을 통해 통신을 한다
  • TCP 통신의 메시지 형식은:
  • +--------+-----+---------------+ 
    | Length |Type |   Content     |
    |   17   |  1  |"HELLO, WORLD" |
    +--------+-----+---------------+
  • 클라이언트는 랜덤 시간마다 서버에 메시지를 보내고 서버가 메시지를 받으면 받은 메시지를 클라이언트에게 고스란히 답장합니다
  • 클라이언트가 지정된 시간 간격으로 읽기/쓰기 작업을 하지 않으면 클라이언트는 자동으로 서버에 PING 하트비트를 전송합니다. 서버가 PING 하트비트 메시지를 받았을 때 PONG 메시지에 답장을 해야 합니다..

  • 공통 섹션


    위에서 정의한 행위에 따라 우리는 다음에 심장 박동을 실현하는 일반적인 부분인 Custom HeartbeatHandler:
    public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler {
        public static final byte PING_MSG = 1;
        public static final byte PONG_MSG = 2;
        public static final byte CUSTOM_MSG = 3;
        protected String name;
        private int heartbeatCount = 0;
    
        public CustomHeartbeatHandler(String name) {
            this.name = name;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
            if (byteBuf.getByte(4) == PING_MSG) {
                sendPongMsg(context);
            } else if (byteBuf.getByte(4) == PONG_MSG){
                System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
            } else {
                handleData(context, byteBuf);
            }
        }
    
        protected void sendPingMsg(ChannelHandlerContext context) {
            ByteBuf buf = context.alloc().buffer(5);
            buf.writeInt(5);
            buf.writeByte(PING_MSG);
            context.writeAndFlush(buf);
            heartbeatCount++;
            System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
        }
    
        private void sendPongMsg(ChannelHandlerContext context) {
            ByteBuf buf = context.alloc().buffer(5);
            buf.writeInt(5);
            buf.writeByte(PONG_MSG);
            context.channel().writeAndFlush(buf);
            heartbeatCount++;
            System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
        }
    
        protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // IdleStateHandler   IdleStateEvent  .
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent e = (IdleStateEvent) evt;
                switch (e.state()) {
                    case READER_IDLE:
                        handleReaderIdle(ctx);
                        break;
                    case WRITER_IDLE:
                        handleWriterIdle(ctx);
                        break;
                    case ALL_IDLE:
                        handleAllIdle(ctx);
                        break;
                    default:
                        break;
                }
            }
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
        }
    
        protected void handleReaderIdle(ChannelHandlerContext ctx) {
            System.err.println("---READER_IDLE---");
        }
    
        protected void handleWriterIdle(ChannelHandlerContext ctx) {
            System.err.println("---WRITER_IDLE---");
        }
    
        protected void handleAllIdle(ChannelHandlerContext ctx) {
            System.err.println("---ALL_IDLE---");
        }
    }

    클래스 Custom Heartbeat Handler는 심장 박동의 송신과 수신을 책임지고 그 작용을 상세하게 분석합니다.앞에서 언급한 바와 같이 IdleStateHandler는 심장 박동을 실현하는 관건이다. IO idle 유형에 따라 서로 다른 IdleState Event 사건이 발생한다. 이 사건의 포획은 사실
    userEventTriggered
    방법에서 실현된.
    Custom Heartbeat Handler를 살펴보겠습니다.userEventTriggered의 구체적인 구현:
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    handleReaderIdle(ctx);
                    break;
                case WRITER_IDLE:
                    handleWriterIdle(ctx);
                    break;
                case ALL_IDLE:
                    handleAllIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }

    userEventTriggered에서 IdleStateEvent의state()에 따라 처리합니다.예를 들어 읽기 데이터 idle의 경우 e.state() = = READER_IDLE, 따라서
    handleReaderIdle을 호출하여 처리합니다.CustomHeartbeatHandler는 세 가지 idle 처리 방법을 제공합니다:handleReaderIdle,handleWriterIdle,handleAllIdle,
    이 세 가지 방법은 현재 기본적인 실현만 있을 뿐, 하위 클래스에서 다시 작성해야 한다. 현재 우리는 잠시 그것들을 생략하고, 구체적인 클라이언트와 서버의 실현 부분을 볼 때 그것들을 다시 볼 것이다.
    이 점을 알게 된 후 데이터 처리 부분을 살펴보겠습니다.
    @Override
    protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
        if (byteBuf.getByte(4) == PING_MSG) {
            sendPongMsg(context);
        } else if (byteBuf.getByte(4) == PONG_MSG){
            System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
        } else {
            handleData(context, byteBuf);
        }
    }

    Custom Heartbeat Handler에서.channelRead0에서 메시지 프로토콜에 따라 다음을 수행합니다.
    +--------+-----+---------------+ 
    | Length |Type |   Content     |
    |   17   |  1  |"HELLO, WORLD" |
    +--------+-----+---------------+

    현재 메시지 유형을 판단하기 위해 PING_MSG는 서버가 클라이언트로부터 PING 메시지를 받았음을 나타냅니다. 이때 서버는 PONG 메시지를 답장해야 합니다. 메시지 유형은 PONG_MSG. 메시지 던지기 유형은 퐁_MSG는 클라이언트가 서버에서 보낸 PONG 메시지를 받았음을 나타냅니다. 이때 로그를 인쇄하면 됩니다.

    클라이언트 섹션


    클라이언트 초기화:
    public class Client {
        public static void main(String[] args) {
            NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
            Random random = new Random(System.currentTimeMillis());
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap
                        .group(workGroup)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline p = socketChannel.pipeline();
                                p.addLast(new IdleStateHandler(0, 0, 5));
                                p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                                p.addLast(new ClientHandler());
                            }
                        });
    
                Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();
                for (int i = 0; i < 10; i++) {
                    String content = "client msg " + i;
                    ByteBuf buf = ch.alloc().buffer();
                    buf.writeInt(5 + content.getBytes().length);
                    buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
                    buf.writeBytes(content.getBytes());
                    ch.writeAndFlush(buf);
    
                    Thread.sleep(random.nextInt(20000));
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                workGroup.shutdownGracefully();
            }
        }
    }

    위의 코드는 Netty의 클라이언트 측의 초기화 코드입니다. Netty를 사용한 친구는 이 코드가 낯설지 않을 것입니다.다른 부분은 우리가 더 이상 군말 하지 않겠다. 우리 한번 보자
    ChannelInitializer.initChannel 섹션은 다음과 같습니다.
    .handler(new ChannelInitializer() {
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline p = socketChannel.pipeline();
            p.addLast(new IdleStateHandler(0, 0, 5));
            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
            p.addLast(new ClientHandler());
        }
    });

    우리는 pipeline에 세 개의 Handler를 추가했습니다.
    IdleStateHandler
    이handler는 심장 박동 메커니즘의 핵심입니다. 클라이언트 측에 읽기와 쓰기 시간을 초과했습니다. 시간 간격은 5s입니다. 즉, 만약에 손님이
    사용자 측에서 5s 간격으로 서버의 메시지를 받거나 서버에 메시지를 보내지 않으면 ALL_IDLE 이벤트.
    다음은 LengthFieldBasedFrameDecoder를 추가했습니다. 이것은 우리의 TCP 메시지를 해석하는 것을 책임지는 것입니다. 본고의 목적과 무관하기 때문에 여기는 상세하게 전개되지 않습니다.마지막 Handler는 ClientHandler입니다. 이것은 Custom HeartbeatHandler에 계승되어 저희가 업무 논리를 처리하는 부분입니다.


    클라이언트

    public class ClientHandler extends CustomHeartbeatHandler {
        public ClientHandler() {
            super("client");
        }
    
        @Override
        protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
            byte[] data = new byte[byteBuf.readableBytes() - 5];
            byteBuf.skipBytes(5);
            byteBuf.readBytes(data);
            String content = new String(data);
            System.out.println(name + " get content: " + content);
        }
    
        @Override
        protected void handleAllIdle(ChannelHandlerContext ctx) {
            super.handleAllIdle(ctx);
            sendPingMsg(ctx);
        }
    }

    ClientHandler는 Custom HeartbeatHandler의 계승으로 두 가지 방법을 다시 썼습니다. 하나는 Handle Data입니다. 이 안에서 받은 메시지만 출력할 수 있습니다.두 번째 다시 쓰는 방법은 handle All Idle입니다.앞에서 언급한 바와 같이 클라이언트는 심장이 뛰는 PING 메시지를 전송하고 클라이언트가 ALL_을 생성할 때IDLE 이벤트 후 상위 클래스의
    CustomHeartbeatHandler.userEventTriggered 호출, userEventTriggered 에서는 e.state () 에 따라 다른 방법을 호출합니다. 따라서 마지막으로 호출한 것은
    ClientHandler.handleAllIdle, 이 방법에서 클라이언트는sendPingMsg를 호출하여 서버에 PING 메시지를 보냅니다.

    서버 섹션


    서버 초기화
    public class Server {
        public static void main(String[] args) {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap
                        .group(bossGroup, workGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline p = socketChannel.pipeline();
                                p.addLast(new IdleStateHandler(10, 0, 0));
                                p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                                p.addLast(new ServerHandler());
                            }
                        });
    
                Channel ch = bootstrap.bind(12345).sync().channel();
                ch.closeFuture().sync();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }

    서버의 초기화 부분도 말할 것이 없습니다. 클라이언트의 초기화와 마찬가지로 pipeline에 세 개의 Handler를 추가했습니다.
    서버 핸들러
    public class ServerHandler extends CustomHeartbeatHandler {
        public ServerHandler() {
            super("server");
        }
    
        @Override
        protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
            byte[] data = new byte[buf.readableBytes() - 5];
            ByteBuf responseBuf = Unpooled.copiedBuffer(buf);
            buf.skipBytes(5);
            buf.readBytes(data);
            String content = new String(data);
            System.out.println(name + " get content: " + content);
            channelHandlerContext.write(responseBuf);
        }
    
        @Override
        protected void handleReaderIdle(ChannelHandlerContext ctx) {
            super.handleReaderIdle(ctx);
            System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");
            ctx.close();
        }
    }

    ServerHandler는 CustomHeartbeatHandler의 계승으로 두 가지 방법을 다시 썼다. 하나는 HandleData이고 이 안에서 Echo Server의 기능을 실현한다. 즉, 클라이언트의 메시지를 받은 후 바로 클라이언트에게 메시지를 고스란히 답장하는 것이다.두 번째 다시 쓰는 방법은handleReaderIdle입니다. 서버는 클라이언트의 읽기idle에만 관심이 있기 때문에 이 방법만 다시 썼습니다.서버가 지정된 시간 이후에 클라이언트로부터 메시지를 받지 못하면 READER_IDLE 메시지는 HandleReaderIdle을 호출합니다.앞에서 언급한 바와 같이 클라이언트는 심장이 뛰는 PING 메시지를 보내고 서버의 READER_IDLE의 시간 초과 시간은 클라이언트가 PING 메시지를 보내는 간격의 두 배이므로 서버 READER_IDLE가 터치하면 클라이언트가 오프라인 상태임을 확인할 수 있으므로 서버가 클라이언트 연결을 직접 닫으면 됩니다.

    총결산

  • Netty를 사용하여 심장 박동 메커니즘을 실현하는 관건은 IdleStateHandler를 이용하여 대응하는 idle 이벤트를 생성하는 것이다
  • 일반적으로 클라이언트는 심장이 뛰는 PING 메시지를 보내는 것을 책임지기 때문에 클라이언트는 ALL_에 주의를 기울인다IDLE 이벤트, 이 이벤트가 발생한 후 클라이언트는 서버에 PING 메시지를 보내서'나는 아직 살아 있다'고 알려야 합니다
  • 서버는 클라이언트의 PING 메시지를 수신하기 때문에 서버가 주목하는 것은 READER_IDLE 이벤트 및 서버의 READER_IDLE 간격은 클라이언트의 ALL_보다 필요합니다.IDLE 이벤트 간격이 큽니다(예: 클라이언트 ALL_IDLE은 5s에서 읽기 및 쓰기가 없을 때 트리거되므로 서버의 READER_IDLE은 10s로 설정할 수 있습니다)
  • 서버가 클라이언트의 PING 메시지를 받으면 PONG 메시지를 답장으로 보냅니다.PING - PONG 소식은 심장 박동이 상호작용하는 것이다
  • 좋은 웹페이지 즐겨찾기