AIO 모델

글 목록
  • 1. AIO: 비동기 비 차단 안내
  • 2. AsynchronousServerSocketChannel: AIO 에서 네트워크 통신 서버 Socket
  • 3. AIO 프로 그래 밍
  • 서버
  • 클 라 이언 트

  • 1. AIO: 비동기 비 차단 안내
    AIO 는 운영 체제 의 지원 이 필요 합 니 다. Liux 커 널 2.6 버 전에 서 진정한 비동기 IO 에 대한 지원 을 추 가 했 습 니 다. 자바 는 jdk 1.7 부터 AIO 를 지원 합 니 다.
    핵심 클래스 는 AsynchronousSocketChannel, AsynchronousServerSocketChannel, AsynchronousChannel Group 이다
    AsynchronousChannel Group 은 비동기 채널 의 그룹 관리자 로 자원 공 유 를 실현 할 수 있 습 니 다.
    AsynchronousChannel Group 을 만 들 때 ExecutorService, 즉 하나의 스 레 드 탱크 를 연결 해 야 합 니 다. 이 스 레 드 탱크 는 두 가지 임 무 를 수행 합 니 다. IO 이벤트 처리 와 CompletionHandler 리 셋 인 터 페 이 스 를 촉발 합 니 다.
    2. AsynchronousServerSocketChannel: AIO 에서 네트워크 통신 서버 Socketaccept() 방법: AsynchronousServerSocketChannel 이 생 성 된 후에 ServerSocket 과 유사 하고 accept() 방법 으로 클 라 이언 트 의 연결 을 받 아들 일 수 있 습 니 다. 비동기 IO 의 실제 IO 작업 은 운영 체제 에 맡 겨 진 것 이기 때문에 사용자 프로 세 스 는 운영 체제 에 IO 를 진행 하고 운영 체제 IO 가 완 성 된 알림 만 받 습 니 다.따라서 비동기 적 인 ServerChannel 호출 accept() 방법 을 사용 하면 현재 스 레 드 가 막 히 지 않 습 니 다. 프로그램 도 accept () 방법 이 언제 클 라 이언 트 요청 을 받 을 수 있 는 지 모 르 고 운영 체제 가 네트워크 IO 를 완성 합 니 다. 이 문 제 를 해결 하기 위해 AIO 는 accept () 방법 에 두 가지 버 전 을 제공 합 니 다.
  • Future accept(): 클 라 이언 트 요청 을 받 기 시 작 했 습 니 다. 현재 스 레 드 가 네트워크 IO (즉 획득 AsynchronousSocketChannel 를 해 야 한다 면 이 방법 으로 되 돌아 오 는 Future 대상 get() 방법 을 사용 해 야 합 니 다. 그러나 get() 방법 은 이 스 레 드 를 막 을 수 있 기 때문에 이 방식 은 차단 식 비동기 IO 입 니 다.
  • void accept (Aattachment,CompletionHandler handler): 클 라 이언 트 의 요청 을 받 아들 이기 시작 하면 연결 이 성공 하거나 실패 하면 CompletionHandler 대상 의 해당 방법 을 촉발 합 니 다.이 가운데 AsynchronousSocketChannel 프로세서 가 연결 을 처리 하 는 데 성공 한 result 는 CompletionHandler 의 인 스 턴 스 입 니 다.한편, AsynchronousSocketChannel 인터페이스 에서 두 가지 방법 을 정 의 했 습 니 다. CompletionHandler IO 가 완 료 될 때 이 방법 을 촉발 합 니 다. 이 방법의 첫 번 째 매개 변 수 는 IO 작업 이 돌아 오 는 대상 을 대표 하고 두 번 째 매개 변 수 는 IO 작업 을 시작 할 때 들 어 오 는 추가 매개 변 수 를 대표 합 니 다.completed(V result , A attachment): IO 가 실 패 했 을 때 이 방법 을 촉발 합 니 다. 첫 번 째 매개 변 수 는 IO 작업 실패 로 인 한 이상 이나 오 류 를 대표 합 니 다.

  • 3. AIO 프로 그래 밍
    서버
    public class AioServer {
         
    
        private static int DEFAULT_PORT = 12345;
        private static ServerHandler serverHandle;
        public volatile static long clientCount = 0;
        public static void start(){
         
            start(DEFAULT_PORT);
        }
        public static synchronized void start(int port){
         
            if(serverHandle!=null)
                return;
            serverHandle = new ServerHandler(port);
            new Thread(serverHandle,"Server").start();
        }
        public static void main(String[] args) {
         
            AioServer.start();
        }
    }
    
    public class ServerHandler implements Runnable{
         
        private AsynchronousServerSocketChannel channel;
    
        public ServerHandler(int port) {
         
            try {
         
                //       
                channel = AsynchronousServerSocketChannel.open();
                //    
                channel.bind(new InetSocketAddress(port));
                System.out.println("      ,   :"+port);
            } catch (IOException e) {
         
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
         
            channel.accept(this, new AcceptHandler());
    //        Future  accept = channel.accept();
            //                      
            //  1: while(true)+sleep
            while (true) {
         
                try {
         
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
         
                    e.printStackTrace();
                }
            }
    
    //        //  2 CountDownLatch   :              ,              ,       ,            
    //
    //        CountDownLatch count = new CountDownLatch(1);
    //        channel.accept(this, new AcceptHandler());
    //        try {
         
    //            count.await();
    //        } catch (InterruptedException e) {
         
    //            e.printStackTrace();
    //        }
        }
    
        // CompletionHandler
        // V-IO     ,          ,AsynchronousSocketChannel
       // A-IO    ,    AsynchronousServerSocketChannel             
    
        class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, ServerHandler> {
         
    
            @Override
            public void completed(AsynchronousSocketChannel channel, ServerHandler serverHandler) {
         
                //    Buffer
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //                    Handler
    //            channel.read(buffer, buffer, new ReadHandler(channel));
                //           
                serverHandler.channel.accept(null, this);
            }
    
            @Override
            public void failed(Throwable exc, ServerHandler serverHandler) {
         
                exc.printStackTrace();
            }
        }
    
        class ReadHandler implements CompletionHandler<ByteBuffer, ByteBuffer> {
         
            //           channel
            private AsynchronousSocketChannel channel;
    
            public ReadHandler(AsynchronousSocketChannel channel) {
         
                this.channel = channel;
            }
    
            @Override
            public void completed(ByteBuffer result, ByteBuffer attachment) {
         
                result.flip();
                byte[] msg = new byte[result.remaining()];
                result.get(msg);
    
                try {
         
                    String expression = new String(msg, "UTF-8");
                    System.out.println("       : " + expression);
    //                String result1 = "       
    ";
    result.clear(); // doWrite(expression); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } // private void doWrite(String msg) { byte[] bytes = msg.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put(bytes); buffer.flip(); // channel.write(buffer, buffer, new CompletionHandler <Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { // , if (attachment.hasRemaining()) { channel.write(attachment, attachment, this); } else { // Buffer ByteBuffer allocate = ByteBuffer.allocate(1024); // Handler // channel.read(allocate, attachment, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } }

    클 라 이언 트
    public class AioClient {
         
        private static String DEFAULT_HOST = "127.0.0.1";
        private static int DEFAULT_PORT = 12345;
        private static ClientHandler clientHandle;
        public static void start(){
         
            start(DEFAULT_HOST,DEFAULT_PORT);
        }
        public static synchronized void start(String ip,int port){
         
            if(clientHandle!=null)
                return;
            clientHandle = new ClientHandler(ip,port);
            new Thread(clientHandle,"Client").start();
        }
        //        
        public static boolean sendMsg(String msg) throws Exception{
         
            if(msg.equals("exit")) return false;
            clientHandle.sendMsg(msg);
            return true;
        }
        public static void main(String[] args) throws Exception{
         
            AioClient.start();
            System.out.println("       :");
            Scanner scanner = new Scanner(System.in);
            while(AioClient.sendMsg(scanner.nextLine()));
        }
    
    }
    
    public class ClientHandler implements Runnable{
         
        private AsynchronousSocketChannel clientChannel;
        private String host;
        private int port;
        private CountDownLatch latch;
        public ClientHandler(String host, int port) {
         
            this.host = host;
            this.port = port;
            try {
         
                //          
                clientChannel = AsynchronousSocketChannel.open();
            } catch (IOException e) {
         
                e.printStackTrace();
            }
        }
        @Override
        public void run() {
         
            //  CountDownLatch  
    //        latch = new CountDownLatch(1);
            //        ,           ,         completed  
            clientChannel.connect(new InetSocketAddress(host, port), this, new AcceptHandler());
            while (true) {
         
                try {
         
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
         
                    e.printStackTrace();
                }
            }
    //        try {
         
    //            latch.await();
    //        } catch (InterruptedException e1) {
         
    //            e1.printStackTrace();
    //        }
    //        try {
         
    //            clientChannel.close();
    //        } catch (IOException e) {
         
    //            e.printStackTrace();
    //        }
        }
    
        //        
        public void sendMsg(String msg){
         
            byte[] req = msg.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
            System.out.println(">>>>>>msg:"+msg);
            writeBuffer.put(req);
            writeBuffer.flip();
            //   
            clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel));
        }
    
    
        /**
         *    
         */
        class AcceptHandler implements CompletionHandler<Void, ClientHandler> {
         
    
            public AcceptHandler() {
         }
    
            @Override
            public void completed(Void result, ClientHandler attachment) {
         
                System.out.println("       ");
        }
    
            @Override
            public void failed(Throwable exc, ClientHandler attachment) {
         
                exc.printStackTrace();
                try {
         
                    attachment.clientChannel.close();
                } catch (IOException e) {
         
                    e.printStackTrace();
                }
            }
        }
    
        class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
         
            private AsynchronousSocketChannel channel;
    
            public WriteHandler(AsynchronousSocketChannel channel) {
         
                this.channel = channel;
            }
    
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
         
                //         
                if (attachment.hasRemaining()) {
         
                    //      ,   
                    System.out.println("WriteHandler.hasRemaining>>>>>");
                    clientChannel.write(attachment, attachment, this);
                } else {
         
                    //    
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel));
                }
            }
    
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
         
                exc.printStackTrace();
                try {
         
                    channel.close();
                } catch (IOException e) {
         
                    e.printStackTrace();
                }
            }
        }
    
        class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
         
            private AsynchronousSocketChannel clientChannel;
            public ReadHandler(AsynchronousSocketChannel clientChannel) {
         
                this.clientChannel = clientChannel;
            }
            @Override
            public void completed(Integer result,ByteBuffer buffer) {
         
                buffer.flip();
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                String body;
                try {
         
                    body = new String(bytes,"UTF-8");
                    System.out.println("       :"+ body);
                } catch (UnsupportedEncodingException e) {
         
                    e.printStackTrace();
                }
            }
            @Override
            public void failed(Throwable exc,ByteBuffer attachment) {
         
                System.err.println("      ...");
                try {
         
                    clientChannel.close();
                } catch (IOException e) {
         
                }
            }
        }
    }
    

    좋은 웹페이지 즐겨찾기