JAVA AIO 서버 와 클 라 이언 트 구현 예시

16523 단어 자바AIOJAVAEE
AIO 는 파일 처리 에 사용 하 는 것 이 비교적 즐 겁 지만 AIO 로 네트워크 메 시 지 를 써 서 서버 측 과 클 라 이언 트 를 처리 하 는 것 은 비교적 번 거 로 운 일이 다. 물론 이것 은 제 개인 적 인 의견 일 뿐 몇 가지 이유 가 있 습 니 다.
첫째, AIO 는 운영 체제 지원 이 필요 합 니 다. 다행히 Windows 와 Linux (시 뮬 레이 션) 가 모두 지원 합 니 다.
둘째, AIO 는 재 귀적 호출 과 비동기 호출 을 동시에 사용 하면 프로그래머 를 어 지 럽 히 기 쉽 고 코드 가 잘못 되 기 쉽다.
셋째, CompletionHandler 는 단독 스 레 드 를 사용 하여 다 중 스 레 드 문제 가 발생 하기 쉬 우 며 빈번 한 스 레 드 컨 텍스트 전환 은 자원 을 소모 합 니 다.
넷 째, 기록 할 데 이 터 를 캐 시 하기 위해 비동기 쓰기 입 니 다. 그렇지 않 으 면 Write Pending Exception 을 만 날 수 있 습 니 다.
상대 적 으로 NIO 는 직 설 적 이 고 통제 하기 쉽다.
또한 필 자 는 다 중 스 레 드 를 사용 하여 여러 클 라 이언 트 장면 을 모 의 하 는 데 실 패 했 습 니 다. 코드 는 run 방법 에서 Asynchronous SocketChannel. connect () 를 호출 하여 돌아 오지 않 았 습 니 다. 서버 에 연결 되 지 않 았 습 니 다. 왠 지 모 르 겠 습 니 다. 대 협 에 게 가르침 을 청 했 습 니 다. 마지막 으로 여러 프로 세 스 를 사용 하여 여러 클 라 이언 트 를 모 의 하고 다음 코드 와 유사 한 bat 파일 을 쓰 며 여러 개 를 동시에 실행 할
java -classpath .\ com.stevex.app.aio.Client 1

java -classpath .\ com.stevex.app.aio.Client 1

pause

서버 코드:
package com.stevex.app.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//import java.nio.channels.WritePendingException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executors;

public class XiaoNa {
	private final AsynchronousServerSocketChannel server;
	//   ,                 ,       WritePendingException
	//                  ,  AIO      
    private final Queue queue = new LinkedList();
    private boolean writing = false;
	
	public static void main(String[] args) throws IOException{
		XiaoNa xiaona = new XiaoNa();
		xiaona.listen();
	}

	public XiaoNa() throws IOException{
		//      CPU  
		AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
		server = AsynchronousServerSocketChannel.open(channelGroup);
		//    
		server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
		//               
		server.bind(new InetSocketAddress(8383), 80);		
	}

	public void listen() {
		System.out.println(Thread.currentThread().getName() + ": run in listen method" );
		//           
		server.accept(null, new CompletionHandler(){						
			@Override
			public void completed(AsynchronousSocketChannel channel,
					Object p_w_upload) {
				System.out.println(Thread.currentThread().getName() + ": run in accept completed method" );
				
				//            ,       ,         
				//    this    ,     
				server.accept(null, this);
				//      
				handle(channel);
			}

			private void handle(final AsynchronousSocketChannel channel) {
				System.out.println(Thread.currentThread().getName() + ": run in handle method" );
				//  AsynchronousSocketChannel,       
				final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
				readBuffer.clear();
				channel.read(readBuffer, null, new CompletionHandler(){

					@Override
					public void completed(Integer count, Object p_w_upload) {
						System.out.println(Thread.currentThread().getName() + ": run in read completed method" );	
						
						if(count > 0){
							try{
								readBuffer.flip();
								//CharBuffer charBuffer = CharsetHelper.decode(readBuffer); 
								CharBuffer charBuffer = Charset.forName("UTF-8").newDecoder().decode(readBuffer);
								String question = charBuffer.toString(); 
								String answer = Helper.getAnswer(question);
								/*//        ,       CompletionHandler            
								//channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));								
								try{
									channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer)));
								}
								//Unchecked exception thrown when an attempt is made to write to an asynchronous socket channel and a previous write has not completed.
								//          
								catch(WritePendingException wpe){
									//       ,        
									Helper.sleep(1);
									channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer)));
								}*/
								writeStringMessage(channel, answer);
								
								readBuffer.clear();
							}
							catch(IOException e){
								e.printStackTrace();
							}
						}
						else{
							try {
								//       socket,          ,    CPU
								channel.close();
							} catch (IOException e) {
								e.printStackTrace();
							}
						}
						
						//    OS        
						//    this    ,     
						channel.read(readBuffer, null, this);
					}

					/**
					 *         
					 * @param exc
					 * @param p_w_upload
					 */
					@Override
					public void failed(Throwable exc, Object p_w_upload) {
						System.out.println("server read failed: " + exc);			
						if(channel != null){
							try {
								channel.close();
							} catch (IOException e) {
								e.printStackTrace();
							}
						}
					}
					
				});								
			}

			/**
			 *            
			 * @param exc
			 * @param p_w_upload
			 */
			@Override
			public void failed(Throwable exc, Object p_w_upload) {
				System.out.println("server accept failed: " + exc);
			}
			
		});
	}
	
	/**
     * Enqueues a write of the buffer to the channel.
     * The call is asynchronous so the buffer is not safe to modify after
     * passing the buffer here.
     *
     * @param buffer the buffer to send to the channel
     */
    private void writeMessage(final AsynchronousSocketChannel channel, final ByteBuffer buffer) {
        boolean threadShouldWrite = false;

        synchronized(queue) {
            queue.add(buffer);
            // Currently no thread writing, make this thread dispatch a write
            if (!writing) {
                writing = true;
                threadShouldWrite = true;
            }
        }

        if (threadShouldWrite) {
            writeFromQueue(channel);
        }
    }

    private void writeFromQueue(final AsynchronousSocketChannel channel) {
        ByteBuffer buffer;

        synchronized (queue) {
            buffer = queue.poll();
            if (buffer == null) {
                writing = false;
            }
        }

        // No new data in buffer to write
        if (writing) {
            writeBuffer(channel, buffer);
        }
    }

    private void writeBuffer(final AsynchronousSocketChannel channel, ByteBuffer buffer) {
        channel.write(buffer, buffer, new CompletionHandler() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {
                    channel.write(buffer, buffer, this);
                } else {
                    // Go back and check if there is new data to write
                    writeFromQueue(channel);
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer p_w_upload) {
            	System.out.println("server write failed: " + exc);
            }
        });
    }

    /**
     * Sends a message
     * @param string the message
     * @throws CharacterCodingException 
     */
    private void writeStringMessage(final AsynchronousSocketChannel channel, String msg) throws CharacterCodingException {
    	writeMessage(channel, Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg)));
    }
}

클 라 이언 트 코드:
package com.stevex.app.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

import com.stevex.app.nio.CharsetHelper;

public class Client implements Runnable{
	private AsynchronousSocketChannel channel;
	private Helper helper;
	private CountDownLatch latch;
    private final Queue queue = new LinkedList();
    private boolean writing = false;
	
	public Client(AsynchronousChannelGroup channelGroup, CountDownLatch latch) throws IOException, InterruptedException{
		this.latch = latch;
		helper = new Helper();
		initChannel(channelGroup);
	}

	private void initChannel(AsynchronousChannelGroup channelGroup) throws IOException {
		//   channel group     socket channel
		channel = AsynchronousSocketChannel.open(channelGroup);
		//  Socket  
		channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
		channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
		channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
	}

	public static void main(String[] args) throws IOException, InterruptedException {
		int sleepTime = Integer.parseInt(args[0]);
		Helper.sleep(sleepTime);
		
		AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
		//       ,     connect   ,      
		final int THREAD_NUM = 1;
		CountDownLatch latch = new CountDownLatch(THREAD_NUM);
		
		//             ,    ,  
		//                       
		for(int i=0; i(){
			final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
			
			@Override
			public void completed(Void result, Void p_w_upload) {
				//     ,     OS         
				try {
					//channel.write(CharsetHelper.encode(CharBuffer.wrap(helper.getWord())));
					writeStringMessage(helper.getWord());
				} catch (CharacterCodingException e) {
					e.printStackTrace();
				}
				
				//helper.sleep();//         
				readBuffer.clear();
				//    OS          
				channel.read(readBuffer, null, new CompletionHandler(){

					@Override
					public void completed(Integer result, Object p_w_upload) {
						try{
							//         
							if(result > 0){
								readBuffer.flip();
								CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
								String answer = charBuffer.toString(); 
								System.out.println(Thread.currentThread().getName() + "---" + answer);
								readBuffer.clear();
								
								String word = helper.getWord();
								if(word != null){
									//   
									//channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
									writeStringMessage(word);
									//helper.sleep();//      
									channel.read(readBuffer, null, this);
								}
								else{
									//      ,    channel
									shutdown();
								}
							}
							else{
								//      channel,      ,     
								shutdown();
							}														
						}
						catch(Exception e){
							e.printStackTrace();
						}						
					}					

					/**
					 *       
					 * @param exc
					 * @param p_w_upload
					 */
					@Override
					public void failed(Throwable exc, Object p_w_upload) {
						System.out.println("client read failed: " + exc);
						try {
							shutdown();
						} catch (IOException e) {
							e.printStackTrace();
						}
					}
					
				});
			}

			/**
			 *       
			 * @param exc
			 * @param p_w_upload
			 */
			@Override
			public void failed(Throwable exc, Void p_w_upload) {
				System.out.println("client connect to server failed: " + exc);
				
				try {
					shutdown();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}			
		});		
	}
	
	private void shutdown() throws IOException {
		if(channel != null){
			channel.close();
		}
		
		latch.countDown();							
	}
	
	/**
     * Enqueues a write of the buffer to the channel.
     * The call is asynchronous so the buffer is not safe to modify after
     * passing the buffer here.
     *
     * @param buffer the buffer to send to the channel
     */
    private void writeMessage(final ByteBuffer buffer) {
        boolean threadShouldWrite = false;

        synchronized(queue) {
            queue.add(buffer);
            // Currently no thread writing, make this thread dispatch a write
            if (!writing) {
                writing = true;
                threadShouldWrite = true;
            }
        }

        if (threadShouldWrite) {
            writeFromQueue();
        }
    }

    private void writeFromQueue() {
        ByteBuffer buffer;

        synchronized (queue) {
            buffer = queue.poll();
            if (buffer == null) {
                writing = false;
            }
        }

        // No new data in buffer to write
        if (writing) {
            writeBuffer(buffer);
        }
    }

    private void writeBuffer(ByteBuffer buffer) {
        channel.write(buffer, buffer, new CompletionHandler() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {
                    channel.write(buffer, buffer, this);
                } else {
                    // Go back and check if there is new data to write
                    writeFromQueue();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer p_w_upload) {
            }
        });
    }

    /**
     * Sends a message
     * @param string the message
     * @throws CharacterCodingException 
     */
    public void writeStringMessage(String msg) throws CharacterCodingException {
        writeMessage(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg)));
    }
}

Helper 클래스:
package com.stevex.app.aio;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Helper {
	private static BlockingQueue words;
	private static Random random;
	
	public Helper() throws InterruptedException{
		words = new ArrayBlockingQueue(5);
		words.put("hi");
		words.put("who");
		words.put("what");
		words.put("where");
		words.put("bye");	
		
		random = new Random();
	}
	
	public String getWord(){
		return words.poll();
	}

	public void sleep() {
		try {
			TimeUnit.SECONDS.sleep(random.nextInt(3));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}	
	
	public static void sleep(long l) {
		try {
			TimeUnit.SECONDS.sleep(l);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public static String getAnswer(String question){
		String answer = null;
		
		switch(question){
		case "who":
			answer = "    
"; break; case "what": answer = "
"; break; case "where": answer = "
"; break; case "hi": answer = "hello
"; break; case "bye": answer = "88
"; break; default: answer = "  who,  what,  where"; } return answer; } }

CharsetHelper 클래스:
package com.stevex.app.nio;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;

public class CharsetHelper {
	private static final String UTF_8 = "UTF-8";
	private static CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder();
	private static CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder();
	
	public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException{
		return encoder.encode(in);
	}

	public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException{
		return decoder.decode(in);
	}
}

좋은 웹페이지 즐겨찾기