MINA 프레임워크의 인코딩 및 패키지 분리 처리

15795 단어 android
우리는 MINA에서 책임체인을 사용하여 2진 바이트 흐름 데이터를 자바 대상으로 전환하거나 자바 대상을 2진 바이트 흐름 데이터로 전환하는 것을 실현하는 것을 알고 있다. 그러면 이 전환 과정은 도대체 어떻게 진행되는 것일까?이것은 MINA의 인코딩과 디코딩 문제와 관련된다.먼저 디코딩 프로세스를 살펴보겠습니다.
서버에서 클라이언트가 보낸 메시지를 읽을 때AbstractPolling Io Processor에 있는read 방법을 실행합니다. 왜냐하면 이전에 MINA에 대한 원본 분석에서 모든 세션의 요청에 대해 실제 집행자는 Io Processor 대상이고read 방법에서 현재 Io 세션에 대응하는 Io Filter 책임 체인을 얻을 수 있기 때문입니다.이어서 책임체인의 FireMessage Received 방법을 사용하고 FireMessage Received에서는callNext Message Received 방법을 사용하며callNext Message Received 방법에서IoFilter의 메시지 Received 방법을 실행한다. 이 방법의 진정한 실현은Protocol CodecFilter에 있다. 그의 구체적인 실현을 살펴보자.
        ProtocolCodecFilter$messageReceived()
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
        LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());

        if (!(message instanceof IoBuffer)) {
            nextFilter.messageReceived(session, message);
            return;
        }

        IoBuffer in = (IoBuffer) message;
        ProtocolDecoder decoder = factory.getDecoder(session);
        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);

        // Loop until we don't have anymore byte in the buffer,
        // or until the decoder throws an unrecoverable exception or
        // can't decoder a message, because there are not enough
        // data in the buffer
        while (in.hasRemaining()) {
            int oldPos = in.position();
            try {
                synchronized (session) {
                    // Call the decoder with the read bytes
                    decoder.decode(session, in, decoderOut);
                }
                // Finish decoding if no exception was thrown.
                decoderOut.flush(nextFilter, session);
            } catch (Exception e) {
                ProtocolDecoderException pde;
                if (e instanceof ProtocolDecoderException) {
                    pde = (ProtocolDecoderException) e;
                } else {
                    pde = new ProtocolDecoderException(e);
                }
                if (pde.getHexdump() == null) {
                    // Generate a message hex dump
                    int curPos = in.position();
                    in.position(oldPos);
                    pde.setHexdump(in.getHexDump());
                    in.position(curPos);
                }
                // Fire the exceptionCaught event.
                decoderOut.flush(nextFilter, session);
                nextFilter.exceptionCaught(session, pde);
                // Retry only if the type of the caught exception is
                // recoverable and the buffer position has changed.
                // We check buffer position additionally to prevent an
                // infinite loop.
                if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
                    break;
                }
            }
        }
    }
이 방법의 네 번째 줄은 현재 메시지가IoBuffer인지 아닌지를 먼저 판단합니다. 그렇지 않으면 nextFilter의 메시지 Received를 호출하여 다음 IoFilter에서 처리하고 현재 IoFilter 처리를 끝냅니다. 여기의nextFilter는 EntryImpl의 대상입니다. 그는DefaultIoFilterChain의 내부 클래스입니다.그의 메시지 Received 방법을 살펴보겠습니다.
        DefaultIoFilterChain#EntryImpl
public void messageReceived(IoSession session, Object message) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextMessageReceived(nextEntry, session, message);
                }
실제로 호출된 것은
callNextMessageReceived 메서드
callNextMessageReceived 메서드가 다시 호출됩니다.
IoFilter의 메시지 Received 방법은 현재의 메시지 대상이 IoBuffer인지 아닌지를 계속 판단한다. 지금까지는 간접적인 귀속 작업에 해당한다.
현재 메시지가IoBuffer라면 10줄에서 디코더 공장을 통해 디코더 Protocol Decoder를 얻을 수 있습니다. Protocol Decoder 인터페이스를 통해 디코더를 실현할 수 있습니다. 17줄에서 현재 IoBuffer에 디코더가 필요한 내용이 있는지 판단하고 존재하면 22줄의 디코더를 실행하는 방법입니다.만약 디코더가 우리 스스로 실현된다면 이 방법은 우리 스스로 실현될 것이다. 그리고 우리가 실현한 decode 방법으로 이 메시지를 디코딩한 후에Protocol Decoder Output의 write 방법을 호출하여 디코더된 정보를 대기열에 추가한다.
        AbstractProtocolDecoderOutput$write()
public void write(Object message) {
        if (message == null) {
            throw new IllegalArgumentException("message");
        }

        messageQueue.add(message);
    }
write 방법을 실행한 후
Protocol Codec Filter $message Received () 의 25 줄에서 flush 방법을 실행했습니다. 이 방법은 현재 해석된 정보를 전달하고 다음 IoFilter에서 처리합니다. flush 방법의 구체적인 실현은 Protocol Codec Filter의 정적 내부 클래스인 Protocol Decoder Output Impl 안에 있습니다.
        ProtocolDecoderOutputImpl$flush
public void flush(NextFilter nextFilter, IoSession session) {
            Queue messageQueue = getMessageQueue();

            while (!messageQueue.isEmpty()) {
                nextFilter.messageReceived(session, messageQueue.poll());
            }
        }
사실 실행하는 방법은 넥스트Filter의 메시지 Received 방법입니다. 이렇게 계속 실행하면 책임체인의 끝부분인 TailFilter에 도달할 때까지TailFilter를 실행합니다.
메시지 Received, 사실상 IoHandler를 실행합니다
메시지 Received 방법입니다. 이 방법에서 저희 업무 정보를 처리하면 됩니다.
이것이 바로 디코딩 과정입니다. 그러면 인코딩 과정은 어떤 모양입니까?실제로는 역방향 디코딩 과정이다.
클라이언트가 MINA를 통해 서버에 데이터를 전달하려면 먼저 IoSession 대상을 얻어 그의 write 방법을 실행해야 한다. 구체적인 실현은 AbstractIoSession에서 이루어진 것이다. write 방법에서 현재 IoSession에 대응하는 IoFilter 체인을 얻고 IoFilter 체인의 FireFilter Write 방법을 호출한다.실제로 실행된 것은DefaultIo Filter Chain 안의 Fire Filter Write 방법에서call Previous Filter Write 방법을 실행하고call Previous Filter Write에서Io Filter Write 방법을 실행한다. 이 방법은 우리가 말하고자 하는 중점이다. 그의 실현은Protocol Codec Filter 안에 있다.
        ProtocolCodecFilter$filterWrite
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
        Object message = writeRequest.getMessage();

        // Bypass the encoding if the message is contained in a IoBuffer,
        // as it has already been encoded before
        if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
            nextFilter.filterWrite(session, writeRequest);
            return;
        }

        // Get the encoder in the session
        ProtocolEncoder encoder = factory.getEncoder(session);

        ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);

        if (encoder == null) {
            throw new ProtocolEncoderException("The encoder is null for the session " + session);
        }

        try {
            // Now we can try to encode the response
            encoder.encode(session, message, encoderOut);

            // Send it directly
            Queue bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();

            // Write all the encoded messages now
            while (!bufferQueue.isEmpty()) {
                Object encodedMessage = bufferQueue.poll();

                if (encodedMessage == null) {
                    break;
                }

                // Flush only when the buffer has remaining.
                if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
                    SocketAddress destination = writeRequest.getDestination();
                    WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);

                    nextFilter.filterWrite(session, encodedWriteRequest);
                }
            }

            // Call the next filter
            nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
        } catch (Exception e) {
            ProtocolEncoderException pee;
public void filterWrite(IoSession session, WriteRequest writeRequest) {
                    Entry nextEntry = EntryImpl.this.prevEntry;
                    callPreviousFilterWrite(nextEntry, session, writeRequest);
                }
}
              ,            message    
  IoBuffer,      ,     nextFilter filterWrite  ,    
  filterWrite        
  DefaultIoFilterChain   EntryImpl  filterWrite 
   
  

       DefaultIoFilterChain$EntryImpl

public void filterWrite(IoSession session, WriteRequest writeRequest) {
                    Entry nextEntry = EntryImpl.this.prevEntry;
                    callPreviousFilterWrite(nextEntry, session, writeRequest);
                }

이 방법은 실제적으로callPreviousFilterWrite 방법을 실행하고callPreviousFilterWrite 방법에서 IoFilter의filterWrite 방법을 실행하는 것을 볼 수 있다. 이렇게 간접 귀속 작업이 형성되었고 현재 메시지가IoBuffer 대상일 때까지 12번째 줄에서ProtocolEncoder 인터페이스를 실현하는 인코더를 계속 아래로 실행한다.22 줄에서 인코더의 encode 방법을 호출하여 인코딩을 한다. 이 방법은 인코더의 encode 방법을 우리가 직접 실현할 수 있다. 우리는 자신이 실현한 encode 방법에서Protocol Encoder Output의 write 방법을 호출하여 인코딩 결과를 대기열에 쓰고 마지막으로nextFilter의 Filter Write 방법을 호출하여 현재 결과를 다음 필터에 전달한다.이것이 바로 전체 인코딩 과정이다.
코딩 과정을 설명하고 또 하나의 문제는 미나가 데이터를 전달하는 과정에서 나타나는 스티커와 끊기는 현상을 어떻게 해결하는가이다.
먼저 스티커의 개념을 말하자면 TCP 프로토콜에서 발송자가 보낸 몇 개의 패키지 데이터가 수신자가 수신할 때 한 패키지로 붙는 것을 말한다. 수신 버퍼를 보면 뒷 패키지 데이터의 머리가 앞의 패키지 데이터의 끝에 바짝 붙어 있다.발생할 수 있는 원인: 송신단은 버퍼가 가득 차서 발송해야 하기 때문에 패키지 수신자가 버퍼의 패키지를 제때에 수신하지 않아 여러 개의 패키지가 수신된다.
단봉의 개념: 즉 데이터가 완전하지 않다는 것이다. 예를 들어 가방이 너무 크면 가방을 여러 개의 작은 가방으로 분해하여 여러 번 발송하여 매번 데이터를 받을 때마다 완전하지 못하게 한다.
메시지 형식은 두 가지가 있는데 하나는 메시지 길이 + 메시지 헤더 + 메시지체이다. 즉, 전 N 바이트는 메시지의 길이를 저장하는 데 사용되며, 현재 메시지가 언제 끝날지 판단하는 데 사용된다.하나는 메시지 헤더 + 메시지 헤더, 즉 고정된 길이의 메시지로, 앞의 몇 바이트는 메시지 헤더이고, 뒤의 것은 메시지 헤더이다.MINA에서 사용하는 것은 전자가 실현한 것으로 구체적으로 말하면 4바이트를 통해 메시지의 길이를 저장하여 현재 메시지가 언제 끝날지 판단하는 데 사용한다.
MINA에서 스티커와 패키지 끊기 문제를 해결하려면 우리의 디코더는 Cumulative Protocol Decoder 추상 클래스를 실현해야 한다. 우리가 이전에 디코더를 실현했다면 Protocol Decoder 인터페이스를 실현한 것이다. 이 인터페이스를 단독으로 실현하면 스티커와 패키지 끊기 문제를 해결할 수 없다. Cumulative Protocol Decoder 추상 클래스는 Protocol Decoder Adapter 추상 클래스를 계승했다.이 클래스는 Protocol Decoder 인터페이스를 실현했고 Cumulative Protocol Decoder에서 Dodecode라는 방법을 추가했다. 이 방법은 우리가 Cumulative Protocol Decoder 추상적인 클래스를 실현할 때 스스로 실현한 것이다. 이 방법은 되돌아오는 값이다. 바로 이 되돌아오는 값이 존재하기 때문에 우리는 패키지 문제를 해결할 수 있고true로 되돌아가면이미 완전한 가방이 되었다는 것을 나타낸다. 우리는 디코딩을 할 수 있다.false로 돌아가면 패키지가 끊겼다는 것을 의미한다. 우리는 그것을 캐시하고 완전한 가방을 구성한 후에 디코딩을 해야 한다.그럼 Dodecode는 누가 호출한 건가요?당연히 decode죠. 구체적으로 decode에서 무슨 일을 했는지 봅시다.
 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        if (!session.getTransportMetadata().hasFragmentation()) {
            while (in.hasRemaining()) {
                if (!doDecode(session, in, out)) {
                    break;
                }
            }

            return;
        }

        boolean usingSessionBuffer = true;
        IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
        // If we have a session buffer, append data to that; otherwise
        // use the buffer read from the network directly.
        if (buf != null) {
            boolean appended = false;
            // Make sure that the buffer is auto-expanded.
            if (buf.isAutoExpand()) {
                try {
                    buf.put(in);
                    appended = true;
                } catch (IllegalStateException e) {
                    // A user called derivation method (e.g. slice()),
                    // which disables auto-expansion of the parent buffer.
                } catch (IndexOutOfBoundsException e) {
                    // A user disabled auto-expansion.
                }
            }

            if (appended) {
                buf.flip();
            } else {
                // Reallocate the buffer if append operation failed due to
                // derivation or disabled auto-expansion.
                buf.flip();
                IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
                newBuf.order(buf.order());
                newBuf.put(buf);
                newBuf.put(in);
                newBuf.flip();
                buf = newBuf;

                // Update the session attribute.
                session.setAttribute(BUFFER, buf);
            }
        } else {
            buf = in;
            usingSessionBuffer = false;
        }

        for (;;) {
            int oldPos = buf.position();
            boolean decoded = doDecode(session, buf, out);
            if (decoded) {
                if (buf.position() == oldPos) {
                    throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
                }

                if (!buf.hasRemaining()) {
                    break;
                }
            } else {
                break;
            }
        }

        // if there is any data left that cannot be decoded, we store
        // it in a buffer in the session and next time this decoder is
        // invoked the session buffer gets appended to
        if (buf.hasRemaining()) {
            if (usingSessionBuffer && buf.isAutoExpand()) {
                buf.compact();
            } else {
                storeRemainingInSession(buf, session);
            }
        } else {
            if (usingSessionBuffer) {
                removeSessionBuffer(session);
            }
        }
    }

Cumulative Protocol Decoder를 보면 AttributeKey 형식의 속성 대상 버퍼가 존재하는데 이 버퍼는 사실상 패키지 데이터를 저장하는 데 사용됩니다.
private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");

먼저 13행에서 지난번 패키지 데이터를 얻고 16행에서 패키지 데이터가 존재하는지 판단한다. 존재하면if문장 블록에 들어가고 19행에서 현재 IoBuffer가 확장을 허용하는지 판단한다. 허용하면if문장 블록에 들어가고 21행에서 패키지 수량과 현재 데이터를 연결하며 appended 로고를 수정한다.이어 31줄에서 appended의 값을 판단한다.true와 같으면 31줄을 실행하고 IoBuffer를 읽기 모드로 설정한다. 그렇지 않으면 현재 IoBuffer가 확장을 허용하지 않는다는 뜻이다. 37줄에 새로운 IoBuffer 대상을 만들어서 패키지 데이터와 현재 입력된 데이터를 연결해야 한다. 마지막으로 42줄에서 연결이 완성된 IoBuffer를 원래의 IoBuffer에 부여한다.그 다음에 52줄의 사순환에 들어갑니다. 54줄에서 추상적인 클래스인 Cumulative Protocol Decoder의do Decode 방법을 호출하여 디코딩을 하고 그의 되돌아오는 값을 가져옵니다. 만약에 되돌아오는 값이false라면 순환을 종료합니다. 이것은 패키지가 끊겼다는 뜻입니다. 이 패키지의 내용을 저장해야 합니다. 그 다음 71줄에서 81줄은 패키지 내용을 IoSession에 저장합니다.다음에 decode 방법을 호출할 때 우리는 이번에 IoSession에 저장된 패키지 끊기 내용을 얻을 수 있다. 이것이 바로 패키지 끊기 처리 과정이다.
그럼 끈적끈적한 가방은 어떻게 처리해요?위에서 우리는 52줄에서 66줄에 대한 설명이 좀 거칠다. 사실 이 부분은 스티커에 대한 처리이다. 왜 사순환을 사용했는지 생각해 보면 스티커의 뜻은 수신자에 약간의 데이터가 한데 붙었다는 것을 알 수 있다. 그러면 우리는 이 소식을 여러 번 처리해야 한다. 왜냐하면 우리가 실현한doDecode 방법은 디코딩 규칙이기 때문이다.패키지에 있는 완전한 데이터가 한 개가 아닐 수도 있습니다. 우리는 당연히 순환 처리를 해야 합니다. 나머지 메시지 데이터가 더 이상 완전한 메시지가 아니거나 IoBuffer에 데이터가 존재하지 않을 때까지, 즉 61줄과 64줄에서 우리가 본 두 개의break가 순환을 종료해야 합니다.
이상은 미나의 코딩 및 스티커, 끊기 처리였습니다!

좋은 웹페이지 즐겨찾기