Java 병행 처리 경험 (4) 병행 모드와 알고리즘 6: NIO 네트워크 프로그래밍

머리말
우선 NIO의 기본 개념을 알아야 합니다.
Channel: NIO의 통로입니다. 흔히 말하는 스트림과 비슷하며, 파이프에 비유할 수 있습니다.
Buffer: byte 배열로 이해하면 됩니다. 채널과 데이터를 주고받으며, 파이프를 흐르는 물줄기에 비유할 수 있습니다.
Selector: SelectableChannel 구현체를 관리하는 선택기로, 스레드 관리에 쓰입니다.
NIO 프로그래밍
2.1 NIO 서버
package pattern.nio;


import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by ycy on 16/1/19.
 */
public class NioServer {
    /** Multiplexer that reports which registered channels are ready for I/O. */
    private Selector selector;
    /** Worker pool that processes received messages off the selector thread. */
    private ExecutorService tp = Executors.newCachedThreadPool();
    /**
     * Time (epoch millis) at which the request currently in flight was read, per client socket.
     * Inside this class the map is only touched from the thread running {@link #startServer()}.
     */
    public static Map<Socket, Long> time_stat = new HashMap<Socket, Long>(10240);

    /**
     * Binds a non-blocking server socket to port 65500 and runs the selector loop on the
     * calling thread. Every message received from a client is echoed back to it.
     *
     * <p>The method only returns when the selector is closed from outside; the listening
     * channel and the selector are released on every exit path.
     *
     * @throws Exception if the selector or server socket cannot be opened or bound, or if
     *                   selecting fails
     */
    public void startServer() throws Exception {
        selector = SelectorProvider.provider().openSelector();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        try {
            // Non-blocking mode is required before a channel can be registered with a selector.
            ssc.configureBlocking(false);
            ssc.socket().bind(new InetSocketAddress(65500));
            ssc.register(selector, SelectionKey.OP_ACCEPT);

            for (; ; ) {
                selector.select(); // blocks until at least one channel is ready or wakeup() is called
                Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey sk = ready.next();
                    // The selected-key set is never cleared by the selector; remove what we handle.
                    ready.remove();
                    if (!sk.isValid()) {
                        // The readiness tests below throw CancelledKeyException on a cancelled key.
                        continue;
                    }
                    if (sk.isAcceptable()) {
                        doAccept(sk);
                    } else if (sk.isReadable()) {
                        doRead(sk);
                    } else if (sk.isWritable()) {
                        doWrite(sk);
                    }
                }
            }
        } catch (ClosedSelectorException e) {
            System.out.println("       ");
        } finally {
            try {
                ssc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Accepts a pending connection and registers the new client channel for reading, with a
     * fresh {@link EchoClient} as its attachment.
     *
     * @param sk the key of the listening channel that reported OP_ACCEPT
     */
    private void doAccept(SelectionKey sk) {
        ServerSocketChannel server = (ServerSocketChannel) sk.channel();
        SocketChannel clientChannel = null;
        try {
            clientChannel = server.accept();
            if (clientChannel == null) {
                // A non-blocking accept() returns null when no connection is actually pending.
                return;
            }
            clientChannel.configureBlocking(false);
            // Register and attach in one call so the key is never visible without its EchoClient.
            clientChannel.register(selector, SelectionKey.OP_READ, new EchoClient());

            InetAddress clientAddress = clientChannel.socket().getInetAddress();
            System.out.println("Acceprted connection form " + clientAddress.getHostAddress() + ".");
        } catch (IOException e) {
            e.printStackTrace();
            if (clientChannel != null) {
                // Do not leak a half-initialised connection.
                try {
                    clientChannel.close();
                } catch (IOException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

    /**
     * Reads whatever the client sent (up to 8192 bytes) and hands it to the worker pool.
     * The client is disconnected on end-of-stream or on a read error.
     *
     * @param sk the key of the client channel that reported OP_READ
     */
    private void doRead(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        ByteBuffer bb = ByteBuffer.allocate(8192);
        int len;
        try {
            len = channel.read(bb);
        } catch (IOException e) {
            System.out.println("faild to read from client");
            e.printStackTrace();
            disconnect(sk);
            return;
        }
        if (len < 0) {
            // End of stream: the peer closed the connection.
            disconnect(sk);
            return;
        }
        if (len == 0) {
            return; // nothing was available; no message to echo
        }

        // Start the timer only for the first chunk of an exchange; later chunks keep the
        // original start time until the output queue has been fully flushed.
        Socket socket = channel.socket();
        if (!time_stat.containsKey(socket)) {
            time_stat.put(socket, System.currentTimeMillis());
        }

        bb.flip();
        tp.execute(new HanldeMsg(sk, bb));
    }

    /**
     * Drops a single client: forgets its timing entry, cancels its key and closes its channel.
     * The selector itself stays open so the remaining clients keep being served.
     *
     * @param sk the key of the client channel to drop
     */
    private void disconnect(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        time_stat.remove(channel.socket());
        sk.cancel();
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes the oldest queued buffer to the client. Once the queue is empty the key goes back
     * to read-only interest and the elapsed time of the exchange is printed.
     *
     * @param sk the key of the client channel that reported OP_WRITE
     */
    private void doWrite(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        EchoClient echoClient = (EchoClient) sk.attachment();
        LinkedList<ByteBuffer> outq = echoClient.getOutputQuquq();

        boolean drained;
        // The queue is filled by worker threads. Holding its monitor here keeps "queue is empty"
        // and "stop watching OP_WRITE" atomic with respect to HanldeMsg.run().
        synchronized (outq) {
            ByteBuffer bb = outq.peekLast();
            if (bb != null) {
                try {
                    channel.write(bb); // may write only part of the buffer
                } catch (IOException e) {
                    System.out.println("Faild to write to client");
                    e.printStackTrace();
                    disconnect(sk);
                    return; // the key is cancelled now; its interest set must not be touched
                }
                if (!bb.hasRemaining()) {
                    outq.removeLast();
                }
            }
            drained = outq.isEmpty();
            if (drained) {
                sk.interestOps(SelectionKey.OP_READ);
            }
        }

        if (drained) {
            // remove() returns null when no request was being timed; never unbox it blindly.
            Long startedAt = time_stat.remove(channel.socket());
            if (startedAt != null) {
                System.out.println("spend:" + (System.currentTimeMillis() - startedAt) + "ms");
            }
        }
    }

    /**
     * Per-connection state: the buffers waiting to be written back to the client.
     * New buffers enter at the head and the writer drains from the tail, i.e. FIFO order.
     * The queue object itself is used as the lock by everything in this file that touches it.
     */
    class EchoClient {
        private LinkedList<ByteBuffer> outq;

        EchoClient() {
            outq = new LinkedList<ByteBuffer>();
        }

        /**
         * @return the live output queue (not a copy); synchronize on it before using it
         */
        public LinkedList<ByteBuffer> getOutputQuquq() {
            return outq;
        }

        /**
         * Appends a buffer to be written after all buffers queued before it.
         *
         * @param bb the buffer to send, already flipped for reading
         */
        public void enqueue(ByteBuffer bb) {
            synchronized (outq) {
                outq.addFirst(bb);
            }
        }
    }

    /**
     * Worker task: queues a received buffer for echoing and asks the selector to start
     * watching the connection for writability.
     */
    class HanldeMsg implements Runnable {
        SelectionKey sk;
        ByteBuffer bb;

        /**
         * @param sk the key of the client connection the data came from
         * @param bb the received data, flipped for reading
         */
        public HanldeMsg(SelectionKey sk, ByteBuffer bb) {
            this.sk = sk;
            this.bb = bb;
        }

        public void run() {
            EchoClient echoClient = (EchoClient) sk.attachment();
            LinkedList<ByteBuffer> outq = echoClient.getOutputQuquq();
            try {
                // Same lock as doWrite(): otherwise the selector thread could see an empty queue,
                // then switch back to OP_READ right after we requested OP_WRITE, stranding bb.
                synchronized (outq) {
                    echoClient.enqueue(bb);
                    sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                }
            } catch (CancelledKeyException e) {
                // The client was disconnected while this task was waiting; nothing to echo to.
                return;
            }
            // Make a blocked select() return so the new interest set takes effect promptly.
            selector.wakeup();
        }
    }

}

2.2 NIO main 메서드
package pattern.nio;

/**
 * Created by ycy on 16/1/20.
 */
public class NioMain {
    public static void main(String[] args) throws Exception {
        NioServer nioServer=new NioServer();
        nioServer.startServer();
    }
}

2.3 NIO 클라이언트
package pattern.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;

/**
 * Created by ycy on 16/1/20.
 */
public class NioClient {
    /** Charset used to encode the greeting and to decode the server's reply. */
    private static final java.nio.charset.Charset UTF8 =
            java.nio.charset.Charset.forName("UTF-8");

    private Selector selector;

    /**
     * Opens a non-blocking channel, starts connecting to the server and registers the
     * channel with a new selector.
     *
     * @param ip   host name or address of the server
     * @param port TCP port of the server
     * @throws IOException if the channel or selector cannot be opened or the connection
     *                     attempt fails immediately
     */
    public void init(String ip, int port) throws IOException {
        SocketChannel channel = SocketChannel.open();
        Selector opened = null;
        boolean ready = false;
        try {
            channel.configureBlocking(false);
            opened = SelectorProvider.provider().openSelector();
            if (channel.connect(new InetSocketAddress(ip, port))) {
                // A non-blocking connect may complete at once (e.g. local connections). In that
                // case there is no pending connection to finish, so greet the server right away.
                sendGreeting(channel);
                channel.register(opened, SelectionKey.OP_READ);
            } else {
                channel.register(opened, SelectionKey.OP_CONNECT);
            }
            this.selector = opened;
            ready = true;
        } finally {
            if (!ready) {
                // Release what was opened so a failed init does not leak descriptors.
                channel.close();
                if (opened != null) {
                    opened.close();
                }
            }
        }
    }

    /**
     * Runs the selector loop until the selector is closed, which {@link #read(SelectionKey)}
     * does after the server's reply has been received.
     *
     * @throws IOException if selecting, connecting or reading fails; every registered channel
     *                     and the selector are closed before the exception propagates
     */
    public void working() throws IOException {
        try {
            while (selector.isOpen()) {
                selector.select();
                Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
                // Stop iterating as soon as a handler has closed the selector.
                while (selector.isOpen() && ite.hasNext()) {
                    SelectionKey key = ite.next();
                    ite.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isConnectable()) {
                        connect(key);
                    } else if (key.isReadable()) {
                        read(key);
                    }
                }
            }
        } finally {
            // On the normal path the selector is already closed and this is a no-op.
            if (selector.isOpen()) {
                for (SelectionKey key : selector.keys()) {
                    try {
                        key.channel().close();
                    } catch (IOException ignored) {
                        // Best-effort cleanup; the original failure is the one worth reporting.
                    }
                }
                try {
                    selector.close();
                } catch (IOException ignored) {
                    // Best-effort cleanup; the original failure is the one worth reporting.
                }
            }
        }
    }

    /**
     * Completes the pending connection, sends the greeting and switches the channel to
     * read interest.
     *
     * @param key the key that reported OP_CONNECT
     * @throws IOException if the connection attempt failed or the greeting cannot be written
     */
    public void connect(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if (channel.isConnectionPending() && !channel.finishConnect()) {
            // Not connected yet; OP_CONNECT stays registered and will be reported again.
            return;
        }
        channel.configureBlocking(false);
        sendGreeting(channel);
        channel.register(this.selector, SelectionKey.OP_READ);
    }

    /**
     * Reads the server's reply, prints it, then closes the channel and the selector, which
     * ends {@link #working()}. Nothing is printed when the server closed the connection
     * without sending data.
     *
     * @param key the key that reported OP_READ
     * @throws IOException if reading or closing fails
     */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer bb = ByteBuffer.allocate(1000);
        try {
            int len = channel.read(bb);
            if (len > 0) {
                // Decode only the bytes that were received, not the whole backing array.
                String msg = new String(bb.array(), 0, len, UTF8).trim();
                System.out.println("       :" + msg);
            }
        } finally {
            channel.close();
            key.selector().close();
        }
    }

    /** Writes the complete "HELLO" greeting; a non-blocking write may need several calls. */
    private void sendGreeting(SocketChannel channel) throws IOException {
        ByteBuffer greeting = ByteBuffer.wrap("HELLO".getBytes(UTF8));
        while (greeting.hasRemaining()) {
            channel.write(greeting);
        }
    }

    /**
     * Connects to the echo server on 127.0.0.1:65500, sends one greeting and prints the reply.
     *
     * @param args command-line arguments (not used)
     * @throws IOException if the exchange fails
     */
    public static void main(String[] args) throws IOException {
        NioClient nioClient = new NioClient();
        nioClient.init("127.0.0.1", 65500);
        nioClient.working();
    }
}

좋은 웹페이지 즐겨찾기