AIO 모델
54945 단어 #자바 네트워크 프로 그래 밍자바
1. AIO: 비동기 비 차단 안내
AIO 는 운영 체제 의 지원 이 필요 합 니 다. Liux 커 널 2.6 버 전에 서 진정한 비동기 IO 에 대한 지원 을 추 가 했 습 니 다. 자바 는 jdk 1.7 부터 AIO 를 지원 합 니 다.
핵심 클래스 는 AsynchronousSocketChannel, AsynchronousServerSocketChannel, AsynchronousChannel Group 이다
AsynchronousChannel Group 은 비동기 채널 의 그룹 관리자 로 자원 공 유 를 실현 할 수 있 습 니 다.
AsynchronousChannel Group 을 만 들 때 ExecutorService, 즉 하나의 스 레 드 탱크 를 연결 해 야 합 니 다. 이 스 레 드 탱크 는 두 가지 임 무 를 수행 합 니 다. IO 이벤트 처리 와 CompletionHandler 리 셋 인 터 페 이 스 를 촉발 합 니 다.
2. AsynchronousServerSocketChannel: AIO 에서 네트워크 통신 서버 Socket
accept()
방법: AsynchronousServerSocketChannel 이 생 성 된 후에 ServerSocket 과 유사 하고 accept()
방법 으로 클 라 이언 트 의 연결 을 받 아들 일 수 있 습 니 다. 비동기 IO 의 실제 IO 작업 은 운영 체제 에 맡 겨 진 것 이기 때문에 사용자 프로 세 스 는 운영 체제 에 IO 를 진행 하고 운영 체제 IO 가 완 성 된 알림 만 받 습 니 다.따라서 비동기 적 인 ServerChannel 호출 accept()
방법 을 사용 하면 현재 스 레 드 가 막 히 지 않 습 니 다. 프로그램 도 accept () 방법 이 언제 클 라 이언 트 요청 을 받 을 수 있 는 지 모 르 고 운영 체제 가 네트워크 IO 를 완성 합 니 다. 이 문 제 를 해결 하기 위해 AIO 는 accept () 방법 에 두 가지 버 전 을 제공 합 니 다.Future accept()
: 클 라 이언 트 요청 을 받 기 시 작 했 습 니 다. 현재 스 레 드 가 네트워크 IO (즉 획득 AsynchronousSocketChannel
를 해 야 한다 면 이 방법 으로 되 돌아 오 는 Future 대상 get()
방법 을 사용 해 야 합 니 다. 그러나 get()
방법 은 이 스 레 드 를 막 을 수 있 기 때문에 이 방식 은 차단 식 비동기 IO 입 니 다. void accept (Aattachment,CompletionHandler handler)
: 클 라 이언 트 의 요청 을 받 아들 이기 시작 하면 연결 이 성공 하거나 실패 하면 CompletionHandler
대상 의 해당 방법 을 촉발 합 니 다.이 가운데 AsynchronousSocketChannel
프로세서 가 연결 을 처리 하 는 데 성공 한 result 는 CompletionHandler
의 인 스 턴 스 입 니 다.한편, AsynchronousSocketChannel
인터페이스 에서 두 가지 방법 을 정 의 했 습 니 다. CompletionHandler
IO 가 완 료 될 때 이 방법 을 촉발 합 니 다. 이 방법의 첫 번 째 매개 변 수 는 IO 작업 이 돌아 오 는 대상 을 대표 하고 두 번 째 매개 변 수 는 IO 작업 을 시작 할 때 들 어 오 는 추가 매개 변 수 를 대표 합 니 다.completed(V result , A attachment)
: IO 가 실 패 했 을 때 이 방법 을 촉발 합 니 다. 첫 번 째 매개 변 수 는 IO 작업 실패 로 인 한 이상 이나 오 류 를 대표 합 니 다.3. AIO 프로 그래 밍
서버
public class AioServer {
private static int DEFAULT_PORT = 12345;
private static ServerHandler serverHandle;
public volatile static long clientCount = 0;
public static void start(){
start(DEFAULT_PORT);
}
public static synchronized void start(int port){
if(serverHandle!=null)
return;
serverHandle = new ServerHandler(port);
new Thread(serverHandle,"Server").start();
}
public static void main(String[] args) {
AioServer.start();
}
}
public class ServerHandler implements Runnable{
private AsynchronousServerSocketChannel channel;
public ServerHandler(int port) {
try {
//
channel = AsynchronousServerSocketChannel.open();
//
channel.bind(new InetSocketAddress(port));
System.out.println(" , :"+port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
channel.accept(this, new AcceptHandler());
// Future accept = channel.accept();
//
// 1: while(true)+sleep
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// // 2 CountDownLatch : , , ,
//
// CountDownLatch count = new CountDownLatch(1);
// channel.accept(this, new AcceptHandler());
// try {
// count.await();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}
// CompletionHandler
// V-IO , ,AsynchronousSocketChannel
// A-IO , AsynchronousServerSocketChannel
class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, ServerHandler> {
@Override
public void completed(AsynchronousSocketChannel channel, ServerHandler serverHandler) {
// Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// Handler
// channel.read(buffer, buffer, new ReadHandler(channel));
//
serverHandler.channel.accept(null, this);
}
@Override
public void failed(Throwable exc, ServerHandler serverHandler) {
exc.printStackTrace();
}
}
class ReadHandler implements CompletionHandler<ByteBuffer, ByteBuffer> {
// channel
private AsynchronousSocketChannel channel;
public ReadHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(ByteBuffer result, ByteBuffer attachment) {
result.flip();
byte[] msg = new byte[result.remaining()];
result.get(msg);
try {
String expression = new String(msg, "UTF-8");
System.out.println(" : " + expression);
// String result1 = "
";
result.clear();
//
doWrite(expression);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//
private void doWrite(String msg) {
byte[] bytes = msg.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(bytes);
buffer.flip();
//
channel.write(buffer, buffer, new CompletionHandler <Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
// ,
if (attachment.hasRemaining()) {
channel.write(attachment, attachment, this);
} else {
// Buffer
ByteBuffer allocate = ByteBuffer.allocate(1024);
// Handler
// channel.read(allocate, attachment, new ReadHandler(channel));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
클 라 이언 트
public class AioClient {
private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 12345;
private static ClientHandler clientHandle;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
if(clientHandle!=null)
return;
clientHandle = new ClientHandler(ip,port);
new Thread(clientHandle,"Client").start();
}
//
public static boolean sendMsg(String msg) throws Exception{
if(msg.equals("exit")) return false;
clientHandle.sendMsg(msg);
return true;
}
public static void main(String[] args) throws Exception{
AioClient.start();
System.out.println(" :");
Scanner scanner = new Scanner(System.in);
while(AioClient.sendMsg(scanner.nextLine()));
}
}
public class ClientHandler implements Runnable{
private AsynchronousSocketChannel clientChannel;
private String host;
private int port;
private CountDownLatch latch;
public ClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
//
clientChannel = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
// CountDownLatch
// latch = new CountDownLatch(1);
// , , completed
clientChannel.connect(new InetSocketAddress(host, port), this, new AcceptHandler());
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// try {
// latch.await();
// } catch (InterruptedException e1) {
// e1.printStackTrace();
// }
// try {
// clientChannel.close();
// } catch (IOException e) {
// e.printStackTrace();
// }
}
//
public void sendMsg(String msg){
byte[] req = msg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
System.out.println(">>>>>>msg:"+msg);
writeBuffer.put(req);
writeBuffer.flip();
//
clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel));
}
/**
*
*/
class AcceptHandler implements CompletionHandler<Void, ClientHandler> {
public AcceptHandler() {
}
@Override
public void completed(Void result, ClientHandler attachment) {
System.out.println(" ");
}
@Override
public void failed(Throwable exc, ClientHandler attachment) {
exc.printStackTrace();
try {
attachment.clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public WriteHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
//
if (attachment.hasRemaining()) {
// ,
System.out.println("WriteHandler.hasRemaining>>>>>");
clientChannel.write(attachment, attachment, this);
} else {
//
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
public ReadHandler(AsynchronousSocketChannel clientChannel) {
this.clientChannel = clientChannel;
}
@Override
public void completed(Integer result,ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes,"UTF-8");
System.out.println(" :"+ body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {
System.err.println(" ...");
try {
clientChannel.close();
} catch (IOException e) {
}
}
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Rails Turbolinks를 페이지 단위로 비활성화하는 방법원래 Turobolinks란? Turbolinks는 링크를 생성하는 요소인 a 요소의 클릭을 후크로 하고, 이동한 페이지를 Ajax에서 가져옵니다. 그 후, 취득 페이지의 데이터가 천이 전의 페이지와 동일한 것이 있...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.