socket 네트워크 프로그래밍 - 단체 채팅의 실현

16611 단어
저희가 채팅방을 이루고 싶은데 어떻게 하나요?먼저 우리는 채팅은 클라이언트가 클라이언트에게 하는 것이고 그들 사이에는 하나의 서버가 중개가 되어야 한다는 것을 알고 있다. 지난 글에서 우리는 점대점 전송 socket 네트워크 프로그래밍-UDP 보조 TCP를 실현하여 점대점 전송을 실현하고 서버에서 우리가 직접 데이터를 인쇄하는
  @Override
        public void run() {
            super.run();
            try {
                //  , 
                BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));

                do {
                    //  
                    String str = socketInput.readLine();
                    if (str == null) {
                        System.out.println(" !");
                        //  
                        ClientHandler.this.exitBySelf();
                        break;
                    }
                    //  
                    System.out.println(str);
                } while (!done);
            } catch (Exception e) {
                if (!done) {
                    System.out.println(" ");
                    ClientHandler.this.exitBySelf();
                }
            } finally {
                //  
                CloseUtils.close(inputStream);
            }
}

사고방식: 이 코드를 수정하여 데이터를 이전의 TCPServer로 되돌려보냅니다.

그룹 채팅의 실현

  • ClientHandler의 수정으로 새 인터페이스가 TCPServer로 되돌아왔습니다. 인터페이스 이름은 onNewMessageArrived입니다
  • public class ClientHandler {
        private final Socket socket;
        private final ClientReadHandler readHandler;
        private final ClientWriteHandler writeHandler;
        private final ClientHandlerCallback clientHandlerCallback;
        private final String clientInfo;
    
        public ClientHandler(Socket socket, ClientHandlerCallback closeNotify) throws IOException {
            this.socket = socket;
            this.readHandler = new ClientReadHandler(socket.getInputStream());
            this.writeHandler = new ClientWriteHandler(socket.getOutputStream());
            this.clientHandlerCallback = closeNotify;
            clientInfo = "IP :" + socket.getInetAddress() +
                    " P:" + socket.getPort();
            System.out.println(" :" + clientInfo);
        }
    
        public String getClientInfo() {
            return clientInfo;
        }
    
        public void exit() {
            readHandler.exit();
            writeHandler.exit();
            CloseUtils.close(socket);
            System.out.println(" :" + socket.getInetAddress() +
                    " P:" + socket.getPort());
        }
    
        public void send(String str) {
            writeHandler.send(str);
        }
    
        public void readToPrint() {
            readHandler.start();
        }
    
        private void exitBySelf() {
            exit();
            clientHandlerCallback.onSelfClosed(this);
        }
    
        public interface ClientHandlerCallback {
            void onSelfClosed(ClientHandler handler);
    
            // 
            void onNewMessageArrived(ClientHandler handler, String msg);
        }
    
        class ClientReadHandler extends Thread {
            private boolean done = false;
            private final InputStream inputStream;
    
            ClientReadHandler(InputStream inputStream) {
                this.inputStream = inputStream;
            }
    
            @Override
            public void run() {
                super.run();
                try {
                    //  , 
                    BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
    
                    do {
                        //  
                        String str = socketInput.readLine();
                        if (str == null) {
                            System.out.println(" !");
                            //  
                            ClientHandler.this.exitBySelf();
                            break;
                        }
                        //  
                        //System.out.println(str);
                        clientHandlerCallback.onNewMessageArrived(ClientHandler.this, str);
                    } while (!done);
                } catch (Exception e) {
                    if (!done) {
                        System.out.println(" ");
                        ClientHandler.this.exitBySelf();
                    }
                } finally {
                    //  
                    CloseUtils.close(inputStream);
                }
            }
    
            void exit() {
                done = true;
                CloseUtils.close(inputStream);
            }
        }
    
        class ClientWriteHandler {
            private boolean done = false;
            private final PrintStream printStream;
            private final ExecutorService executorService;
    
            ClientWriteHandler(OutputStream outputStream) {
                this.printStream = new PrintStream(outputStream);
                this.executorService = Executors.newSingleThreadExecutor();
            }
    
            void exit() {
                done = true;
                CloseUtils.close(printStream);
                executorService.shutdownNow();
            }
    
            void send(String str) {
                if(done){
                    return;
                }
                executorService.execute(new WriteRunnable(str));
            }
    
            class WriteRunnable implements Runnable {
                private final String msg;
    
                WriteRunnable(String msg) {
                    this.msg = msg;
                }
    
                @Override
                public void run() {
                    if (ClientWriteHandler.this.done) {
                        return;
                    }
    
                    try {
                        ClientWriteHandler.this.printStream.println(msg);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    ClientHandler는 서버 내부의 구현이므로 서버 TCPServer로 돌아가야 합니다.
  • TCPServer 코드 수정: 다른 사람들이 보기에 편리하기 때문에 저는 직접 생각을 복사합니다. 다시 추가하지만 메시지를 받아들이는 것은 라인을 막을 수 없습니다. 그래서 우리는 라인을 열어야 합니다. 우리는 모든 클라이언트를 훑어보고 자신을 배출하고 다른 클라이언트에게 메시지를 보냅니다
  • public class TCPServer implements ClientHandler.ClientHandlerCallback {
        private final int port;
        private ClientListener mListener;
        private List clientHandlerList = new ArrayList<>();
        private final ExecutorService forwardingThreadExecutor;
    
        public TCPServer(int port) {
            this.port = port;
            this.forwardingThreadExecutor = Executors.newSingleThreadExecutor();
        }
    
        public boolean start() {
            try {
                ClientListener listener = new ClientListener(port);
                mListener = listener;
                listener.start();
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
            return true;
        }
    
        public void stop() {
            if (mListener != null) {
                mListener.exit();
            }
            synchronized (TCPServer.this) {
                for (ClientHandler clientHandler : clientHandlerList) {
                    clientHandler.exit();
                }
    
                clientHandlerList.clear();
            }
            forwardingThreadExecutor.shutdownNow();
    
        }
    
        public void broadcast(String str) {
            synchronized (TCPServer.this) {
                for (ClientHandler clientHandler : clientHandlerList) {
                    clientHandler.send(str);
                }
            }
        }
    
        @Override
        public synchronized void onSelfClosed(ClientHandler handler) {
            clientHandlerList.remove(handler);
        }
    
        @Override
        public void onNewMessageArrived(final ClientHandler handler, final String msg) {
            // 
            System.out.println("Received:" + handler.getClientInfo() + ":" + msg);
            // , 
            // 
            forwardingThreadExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    synchronized (TCPServer.this) {
                        for (ClientHandler clientHandler : clientHandlerList) {
                            // 
                            if (clientHandler.equals(handler)) {
                                continue;
                            }
                            // 
                            clientHandler.send(msg);
                        }
                    }
                }
            });
        }
    
        private class ClientListener extends Thread {
            private ServerSocket server;
            private boolean done = false;
    
            private ClientListener(int port) throws IOException {
                server = new ServerSocket(port);
                System.out.println(" :" + server.getInetAddress() + " P:" + server.getLocalPort());
            }
    
            @Override
            public void run() {
                super.run();
    
                System.out.println(" ~");
                //  
                do {
                    //  
                    Socket client;
                    try {
                        client = server.accept();
                    } catch (IOException e) {
                        continue;
                    }
                    try {
                        //  
                        ClientHandler clientHandler = new ClientHandler(client, TCPServer.this);
                        //  
                        clientHandler.readToPrint();
                        // 
                        synchronized (TCPServer.this) {
                            clientHandlerList.add(clientHandler);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        System.out.println(" :" + e.getMessage());
                    }
                } while (!done);
    
                System.out.println(" !");
            }
    
            void exit() {
                done = true;
                try {
                    server.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    서버 상태 및 테스트 용례 구축


    상태: 바쁘다
  • 클라이언트마다 서버가 두 번 대기해야 한다
  • 클라이언트 쌍통: 클라이언트가 서버의 수락 채널로 데이터를 보냅니다
  • 서버 쌍통: 서버 회송 메시지의 송신 채널
  • 모든 통로가 막혀서 비동기적으로만 이루어질 수 있다

  • 서버 스레드 수
  • 클라이언트 하나: 두 개의 라인이 필요합니다
  • n 클라이언트: 2n 라인
  • 서버 실제 수량: 2n+

  • 클라이언트를 수정하기 전의 클라이언트는 내부 TCPClient를 제어할 수 없습니다.
  • 클라이언트 코드
  • public class Client {
        public static void main(String[] args) {
            ServerInfo info = UDPSearcher.searchServer(10000);
            System.out.println("Server:" + info);
    
            if (info != null) {
                TCPClient tcpClient = null;
                try {
                    tcpClient = TCPClient.startWith(info);
                    if (tcpClient == null) {
                        return;
                    }
                    write(tcpClient);
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    if (tcpClient != null) {
                        tcpClient.exit();
                    }
                }
            }
        }
    
        private static void write(TCPClient client) throws IOException {
            //  
            InputStream in = System.in;
            BufferedReader input = new BufferedReader(new InputStreamReader(in));
    
    
            do {
                //  
                String str = input.readLine();
                //  
                client.send(str);
    
                if ("bye".equalsIgnoreCase(str)) {
                    break;
                }
            } while (true);
    
    
        }
    }
    
  • TCPClient 수정: 주로 제어 가능
  • public class TCPClient {
        private final Socket socket;
        private final ReadHandler readHandler;
        private final PrintStream printStream;
    
        public TCPClient(Socket socket, ReadHandler readHandler) throws IOException {
            this.socket = socket;
            this.readHandler = readHandler;
            this.printStream = new PrintStream(socket.getOutputStream());
        }
    
        public void exit() {
            readHandler.exit();
            CloseUtils.close(printStream);
            CloseUtils.close(socket);
        }
    
        public void send(String msg) {
            printStream.println(msg);
        }
    
        public static TCPClient startWith(ServerInfo info) throws IOException {
            Socket socket = new Socket();
            //  
            socket.setSoTimeout(3000);
    
            //  , 2000; 3000ms
            socket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()), info.getPort()), 3000);
    
            System.out.println(" , ~");
            System.out.println(" :" + socket.getLocalAddress() + "  :" + socket.getLocalPort());
            System.out.println(" :" + socket.getInetAddress() + "  :" + socket.getPort());
    
            try {
                ReadHandler readHandler = new ReadHandler(socket.getInputStream());
                readHandler.start();
    
                return new TCPClient(socket, readHandler);
    
            } catch (Exception e) {
                System.out.println(" ");
                CloseUtils.close(socket);
                return null;
            }
    
    
        }
    
    
    
        static class ReadHandler extends Thread {
            private boolean done = false;
            private final InputStream inputStream;
    
            ReadHandler(InputStream inputStream) {
                this.inputStream = inputStream;
            }
    
            @Override
            public void run() {
                super.run();
                try {
                    //  , 
                    BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
    
                    do {
                        String str;
                        try {
                            //  
                            str = socketInput.readLine();
                        } catch (SocketTimeoutException e) {
                            continue;
                        }
                        if (str == null) {
                            System.out.println(" , !");
                            break;
                        }
                        //  
                        System.out.println(str);
                    } while (!done);
                } catch (Exception e) {
                    if (!done) {
                        System.out.println(" :" + e.getMessage());
                    }
                } finally {
                    //  
                    try {
                        if (inputStream != null) {
                            inputStream.close();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            void exit() {
                done = true;
                try {
                    if (inputStream != null) {
                        inputStream.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    테스트 용례
    public class ClientTest {
        private static boolean done=false;
    
        public static void main(String[] args) throws IOException {
            ServerInfo info = UDPSearcher.searchServer(10000);
            System.out.println("Server:" + info);
            if (info == null) return;
            // 
            int size = 0;
            final List tcpClientList = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                try {
                    TCPClient tcpClient = TCPClient.startWith(info);
                    if (tcpClient == null) {
                        System.out.println(" ");
                        continue;
                    }
                    tcpClientList.add(tcpClient);
                    System.out.println(" :" + (++size));
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println(" ");
                }
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.in.read();
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        while (!done) {
                            for (TCPClient tcpClient : tcpClientList) {
                              tcpClient.send("Hello Peakmain!!!!!");
                            }
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                };
                Thread thread = new Thread(runnable);
                thread.start();
                System.in.read();
                done=true;
                try {
                    // 
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (TCPClient tcpClient : tcpClientList) {
                    tcpClient.exit();
                }
            }
        }
    }
    

    CPU: 데이터의 빈도성, 데이터 전송의 복잡성 메모리: 클라이언트의 수, 고객이 보내는 데이터의 크기 라인: 연결된 클라이언트의 수에 따라 다름
    서버 최적화 방안 분석
  • 스레드 수량을 줄이다
  • 라인 실행이 바쁜 상태를 증가합니다
  • 클라이언트 버퍼 복용 메커니즘
  • 좋은 웹페이지 즐겨찾기