JAVA AIO 서버와 클라이언트 구현 예시
첫째, AIO는 운영 체제의 지원이 필요합니다. 다행히 Windows와 Linux(시뮬레이션 방식)가 모두 지원합니다.
둘째, AIO는 재귀 호출과 비동기 호출을 동시에 사용하므로 프로그래머가 혼란에 빠지기 쉽고 코드에 오류가 생기기 쉽습니다.
셋째, CompletionHandler는 별도의 스레드에서 실행되므로 다중 스레드 문제가 발생하기 쉬우며, 빈번한 스레드 컨텍스트 전환은 자원을 소모합니다.
넷째, 비동기 쓰기에서는 기록할 데이터를 캐시해 둘 큐를 만들어야 합니다. 그렇지 않으면 WritePendingException을 만날 수 있습니다.
이에 비해 NIO는 직관적이고 제어하기 쉽습니다.
또한 필자는 다중 스레드를 사용하여 여러 클라이언트 상황을 시뮬레이션하는 데 실패했습니다. 코드가 run 메서드에서 AsynchronousSocketChannel.connect()를 호출한 뒤 반환되지 않았고 서버에도 연결되지 않았습니다. 이유를 알 수 없으니 고수분들의 가르침을 부탁드립니다. 결국 여러 프로세스를 사용하여 여러 클라이언트를 시뮬레이션했으며, 다음 코드와 유사한 bat 파일을 작성하여 여러 개를 동시에 실행할 수 있습니다.
java -classpath .\ com.stevex.app.aio.Client 1
java -classpath .\ com.stevex.app.aio.Client 1
pause
서버 코드:
package com.stevex.app.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//import java.nio.channels.WritePendingException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executors;
public class XiaoNa {
private final AsynchronousServerSocketChannel server;
// , , WritePendingException
// , AIO
private final Queue queue = new LinkedList();
private boolean writing = false;
public static void main(String[] args) throws IOException{
XiaoNa xiaona = new XiaoNa();
xiaona.listen();
}
public XiaoNa() throws IOException{
// CPU
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
server = AsynchronousServerSocketChannel.open(channelGroup);
//
server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
//
server.bind(new InetSocketAddress(8383), 80);
}
public void listen() {
System.out.println(Thread.currentThread().getName() + ": run in listen method" );
//
server.accept(null, new CompletionHandler(){
@Override
public void completed(AsynchronousSocketChannel channel,
Object p_w_upload) {
System.out.println(Thread.currentThread().getName() + ": run in accept completed method" );
// , ,
// this ,
server.accept(null, this);
//
handle(channel);
}
private void handle(final AsynchronousSocketChannel channel) {
System.out.println(Thread.currentThread().getName() + ": run in handle method" );
// AsynchronousSocketChannel,
final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
readBuffer.clear();
channel.read(readBuffer, null, new CompletionHandler(){
@Override
public void completed(Integer count, Object p_w_upload) {
System.out.println(Thread.currentThread().getName() + ": run in read completed method" );
if(count > 0){
try{
readBuffer.flip();
//CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
CharBuffer charBuffer = Charset.forName("UTF-8").newDecoder().decode(readBuffer);
String question = charBuffer.toString();
String answer = Helper.getAnswer(question);
/*// , CompletionHandler
//channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
try{
channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer)));
}
//Unchecked exception thrown when an attempt is made to write to an asynchronous socket channel and a previous write has not completed.
//
catch(WritePendingException wpe){
// ,
Helper.sleep(1);
channel.write(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(answer)));
}*/
writeStringMessage(channel, answer);
readBuffer.clear();
}
catch(IOException e){
e.printStackTrace();
}
}
else{
try {
// socket, , CPU
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// OS
// this ,
channel.read(readBuffer, null, this);
}
/**
*
* @param exc
* @param p_w_upload
*/
@Override
public void failed(Throwable exc, Object p_w_upload) {
System.out.println("server read failed: " + exc);
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
}
/**
*
* @param exc
* @param p_w_upload
*/
@Override
public void failed(Throwable exc, Object p_w_upload) {
System.out.println("server accept failed: " + exc);
}
});
}
/**
* Enqueues a write of the buffer to the channel.
* The call is asynchronous so the buffer is not safe to modify after
* passing the buffer here.
*
* @param buffer the buffer to send to the channel
*/
private void writeMessage(final AsynchronousSocketChannel channel, final ByteBuffer buffer) {
boolean threadShouldWrite = false;
synchronized(queue) {
queue.add(buffer);
// Currently no thread writing, make this thread dispatch a write
if (!writing) {
writing = true;
threadShouldWrite = true;
}
}
if (threadShouldWrite) {
writeFromQueue(channel);
}
}
private void writeFromQueue(final AsynchronousSocketChannel channel) {
ByteBuffer buffer;
synchronized (queue) {
buffer = queue.poll();
if (buffer == null) {
writing = false;
}
}
// No new data in buffer to write
if (writing) {
writeBuffer(channel, buffer);
}
}
private void writeBuffer(final AsynchronousSocketChannel channel, ByteBuffer buffer) {
channel.write(buffer, buffer, new CompletionHandler() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this);
} else {
// Go back and check if there is new data to write
writeFromQueue(channel);
}
}
@Override
public void failed(Throwable exc, ByteBuffer p_w_upload) {
System.out.println("server write failed: " + exc);
}
});
}
/**
* Sends a message
* @param string the message
* @throws CharacterCodingException
*/
private void writeStringMessage(final AsynchronousSocketChannel channel, String msg) throws CharacterCodingException {
writeMessage(channel, Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg)));
}
}
클라이언트 코드:
package com.stevex.app.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import com.stevex.app.nio.CharsetHelper;
public class Client implements Runnable{
private AsynchronousSocketChannel channel;
private Helper helper;
private CountDownLatch latch;
private final Queue queue = new LinkedList();
private boolean writing = false;
public Client(AsynchronousChannelGroup channelGroup, CountDownLatch latch) throws IOException, InterruptedException{
this.latch = latch;
helper = new Helper();
initChannel(channelGroup);
}
private void initChannel(AsynchronousChannelGroup channelGroup) throws IOException {
// channel group socket channel
channel = AsynchronousSocketChannel.open(channelGroup);
// Socket
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
}
public static void main(String[] args) throws IOException, InterruptedException {
int sleepTime = Integer.parseInt(args[0]);
Helper.sleep(sleepTime);
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
// , connect ,
final int THREAD_NUM = 1;
CountDownLatch latch = new CountDownLatch(THREAD_NUM);
// , ,
//
for(int i=0; i(){
final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
@Override
public void completed(Void result, Void p_w_upload) {
// , OS
try {
//channel.write(CharsetHelper.encode(CharBuffer.wrap(helper.getWord())));
writeStringMessage(helper.getWord());
} catch (CharacterCodingException e) {
e.printStackTrace();
}
//helper.sleep();//
readBuffer.clear();
// OS
channel.read(readBuffer, null, new CompletionHandler(){
@Override
public void completed(Integer result, Object p_w_upload) {
try{
//
if(result > 0){
readBuffer.flip();
CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
String answer = charBuffer.toString();
System.out.println(Thread.currentThread().getName() + "---" + answer);
readBuffer.clear();
String word = helper.getWord();
if(word != null){
//
//channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
writeStringMessage(word);
//helper.sleep();//
channel.read(readBuffer, null, this);
}
else{
// , channel
shutdown();
}
}
else{
// channel, ,
shutdown();
}
}
catch(Exception e){
e.printStackTrace();
}
}
/**
*
* @param exc
* @param p_w_upload
*/
@Override
public void failed(Throwable exc, Object p_w_upload) {
System.out.println("client read failed: " + exc);
try {
shutdown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
/**
*
* @param exc
* @param p_w_upload
*/
@Override
public void failed(Throwable exc, Void p_w_upload) {
System.out.println("client connect to server failed: " + exc);
try {
shutdown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
private void shutdown() throws IOException {
if(channel != null){
channel.close();
}
latch.countDown();
}
/**
* Enqueues a write of the buffer to the channel.
* The call is asynchronous so the buffer is not safe to modify after
* passing the buffer here.
*
* @param buffer the buffer to send to the channel
*/
private void writeMessage(final ByteBuffer buffer) {
boolean threadShouldWrite = false;
synchronized(queue) {
queue.add(buffer);
// Currently no thread writing, make this thread dispatch a write
if (!writing) {
writing = true;
threadShouldWrite = true;
}
}
if (threadShouldWrite) {
writeFromQueue();
}
}
private void writeFromQueue() {
ByteBuffer buffer;
synchronized (queue) {
buffer = queue.poll();
if (buffer == null) {
writing = false;
}
}
// No new data in buffer to write
if (writing) {
writeBuffer(buffer);
}
}
private void writeBuffer(ByteBuffer buffer) {
channel.write(buffer, buffer, new CompletionHandler() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this);
} else {
// Go back and check if there is new data to write
writeFromQueue();
}
}
@Override
public void failed(Throwable exc, ByteBuffer p_w_upload) {
}
});
}
/**
* Sends a message
* @param string the message
* @throws CharacterCodingException
*/
public void writeStringMessage(String msg) throws CharacterCodingException {
writeMessage(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg)));
}
}
Helper 클래스:
package com.stevex.app.aio;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Small support class for the AIO demo: supplies the client's words, the
 * server's answers and sleep helpers.
 */
public class Helper {
    /** Words this instance still has to hand out, in sending order. */
    private final BlockingQueue<String> words;
    private final Random random;

    /**
     * Creates a helper holding the words "hi", "who", "what", "where", "bye".
     * Each instance owns its own word queue, so several clients do not drain
     * or reset one another's words.
     *
     * @throws InterruptedException if interrupted while filling the queue
     */
    public Helper() throws InterruptedException {
        words = new ArrayBlockingQueue<String>(5);
        words.put("hi");
        words.put("who");
        words.put("what");
        words.put("where");
        words.put("bye");
        random = new Random();
    }

    /**
     * @return the next word, or {@code null} when all words have been handed out
     */
    public String getWord() {
        return words.poll();
    }

    /** Sleeps for a random 0, 1 or 2 seconds. */
    public void sleep() {
        sleep(random.nextInt(3));
    }

    /**
     * Sleeps for the given number of seconds. If the thread is interrupted the
     * stack trace is printed and the interrupt flag is restored for the caller.
     *
     * @param l number of seconds to sleep
     */
    public static void sleep(long l) {
        try {
            TimeUnit.SECONDS.sleep(l);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the server's answer to a question.
     *
     * <p>NOTE(review): in the published source the answer literals were broken
     * across lines and part of their text appears to have been lost; they are
     * restored here with exactly the characters that survived plus the line
     * break as a newline escape. Confirm against the original wording.
     *
     * @param question the text received from the client; {@code null} and
     *        unknown questions get the fallback answer
     * @return the answer text, never {@code null}
     */
    public static String getAnswer(String question) {
        final String fallback = " who, what, where";
        if (question == null) {
            return fallback;
        }
        String answer;
        switch (question) {
            case "who":
                answer = "\n";
                break;
            case "what":
                answer = "\n";
                break;
            case "where":
                answer = "\n";
                break;
            case "hi":
                answer = "hello\n";
                break;
            case "bye":
                answer = "88\n";
                break;
            default:
                answer = fallback;
        }
        return answer;
    }
}
CharsetHelper 클래스:
package com.stevex.app.nio;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
/**
 * UTF-8 encode/decode helpers.
 *
 * <p>A new encoder or decoder is created for every call: coder instances are
 * stateful and not safe for concurrent use, while these static methods can be
 * called from several completion-handler threads at once.
 */
public class CharsetHelper {
    private static final String UTF_8 = "UTF-8";
    /** The charset itself is immutable and can be shared between threads. */
    private static final Charset CHARSET = Charset.forName(UTF_8);

    /**
     * Encodes the remaining characters of the buffer as UTF-8.
     *
     * @param in characters to encode
     * @return a new buffer holding the encoded bytes
     * @throws CharacterCodingException if the input cannot be encoded
     */
    public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException {
        return CHARSET.newEncoder().encode(in);
    }

    /**
     * Decodes the remaining bytes of the buffer as UTF-8.
     *
     * @param in bytes to decode
     * @return a new buffer holding the decoded characters
     * @throws CharacterCodingException if the input is not valid UTF-8
     */
    public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException {
        return CHARSET.newDecoder().decode(in);
    }
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Is Eclipse IDE dying?In 2014 the Eclipse IDE is the leading development environment for Java with a market share of approximately 65%. but ac...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.