다 중 스 레 드 비 차단 서버 디자인

이어서 위의 일 지 를 쓰다.일반적으로 서버 를 설계 할 때 차단 되 지 않 고 간단 하기 위해 보통 하나의 스 레 드 로 설계 하여 조작 합 니 다.
그러나 이러한 디자인 의 단점 도 뚜렷 하 다. 만약 에 서버 에 많은 연결 이 있다 면 순환 할 때마다 많은 소켓 을 처리 할 것 이다. CPU 사용률 이 높 지 않 은 것 을 제외 하고 특정한 소켓 의 데이터 전송 속도 가 느 리 면 그의 호출 도 느 릴 것 이다.따라서 반드시 다른 소켓 의 데이터 전송 에 영향 을 줄 것 이다.
그래서 비 차단 + 다 중 스 레 드 는 대형 서버 에 필수 적 인 해결 방안 입 니 다.
다 중 스 레 드 가 같은 소켓 을 조작 할 때 고려 해 야 할 문제 가 비교적 복잡 하 다.http://blog.csdn.net/shallwake/archive/2009/12/15/5014160.aspx토론 이 있 습 니 다. 원칙 은 다 중 스 레 드 와 동시에 recv () 를 하지 않 는 것 입 니 다.다 중 스 레 드 send () 는 절대 안 됩 니 다.두 개의 스 레 드, 하나의 recv (), 하나의 send () 를 사용 할 수 있 습 니 다.이 유 는 간단 하 다. 소켓 은 스 레 드 가 안전 하지만 다 중 스 레 드 는 파 우 치 오류 가 발생 할 수 있다 는 것 이다.
따라서 서버 에 대해 모든 스 레 드 가 작 동 하 는 소켓 간 에 교 집합 이 없 으 면 됩 니 다. 물론 이 교 집합 은 논리 적 으로 교 집합 이 없 을 뿐 입 니 다. 예 를 들 어 특정한 스 레 드 는 A 소켓 send () 를 조작 하고 다른 스 레 드 는 A 소켓 recv () 를 조작 하 는 것 을 책임 집 니 다. 그러면 그들 은 논리 적 으로 교 집합 이 없 는 이 유 는 위 에서 설명 되 었 습 니 다.
나머지 는 데이터 구조의 디자인 이 고 개인 총 결 은 두 가지 가 있다.
1. 동적 생 성 스 레 드:
하나의 스 레 드 처리 N 개의 소켓 을 규정 하 는 것 입 니 다. 전체 소켓 이 이 값 을 초과 하면 스 레 드 처 리 를 따로 만 듭 니 다.
2. 미리 할당 스 레 드:
N 개의 스 레 드 를 먼저 분배 한 다음 에 소켓 이 증가 함 에 따라 그들 을 모든 스 레 드 에 골 고루 분배 하 는 것 이다.
난점: 소켓 은 동태 적 으로 변화 하 는 것 으로 증가 할 때 비교적 쉽 지만 감소 하면 방법 1 과 방법 2 는 난이도 차이 가 있다.
방법 1: 특정한 스 레 드 가 처리 하 는 소켓 이 감소 하고 N 보다 작 을 때 다음 에 소켓 을 추가 할 때 스 레 드 를 열 수 없습니다. 계속 이용 해 야 합 니 다. 소켓 이 0 으로 줄 어 들 면 이 스 레 드 는 스스로 소각 합 니 다.그래서 전체 동적 과정의 유지 보 수 는 비교적 복잡 하 다. 자신 은 단지 이해 하고 싶 을 뿐 시험 해 본 적 이 없다.
방법 2: 제 가 아래 에 해결 방안 을 제공 하 겠 습 니 다.
 
방법 2 의 실현:
먼저 kasicass GG 의 지난 글 에서 의 조언 에 감 사 드 립 니 다 ~ ~, 오늘 오후 그의 생각 대로 자바 로 간단하게 시도 해 보 았 습 니 다.
생각 은 비교적 간단 합 니 다. 5 개의 스 레 드 를 열 면 클 라 이언 트 정 보 를 비동기 로 읽 습 니 다. 물론 소켓 처리 가 없 으 면 스 레 드 를 종료 할 수 없 기 때문에 모든 스 레 드 는 BlockingQueue 를 사 용 했 고 하나의 변수 sockCount 를 유지 하여 현재 처리 하고 있 는 소켓 의 수량 을 기록 합 니 다.연결 이 있 을 때 메 인 스 레 드 는 sockCount 가 가장 적은 스 레 드 를 선택 하여 이 연결 을 처리 하면 OK 입 니 다. 동적 으로 유지 하 는 방법 만큼 복잡 하지 않 습 니 다.
응, 게으름뱅이 가 큰 수 를 써 서 코드 를 붙 였 어...
서버 클래스
import java.nio.channels.*;
import java.util.*;
import java.net.*;
import java.io.*;

public class Server extends Thread{

    private ServerSocketChannel sSockChan;

    private Selector selector;

    private ArrayList
   
     readers;

    public Server(){
        readers = new ArrayList();
        
    for(
    int i = 0; i < 5; i ++){
            EventReader er = new EventReader(this);
            er.start();
            readers.add(er);
        }

        initServerSocket();
    }

    public 
    int getIndex(){
        
    int min = 999999;
        
    int pos = 0;
        
    for(
    int i = 0; i < 5; i ++){
            
    if(min >= readers.get(i).getSocketCount()){
                 min = readers.get(i).getSocketCount();
                 pos = i;
            }
        }
        
    return pos;
    }

    private 
    void initServerSocket() {
	try {
	    
    // open a non-blocking server socket channel
	    sSockChan = ServerSocketChannel.
    open();
	    sSockChan.configureBlocking(false);

	    
    // bind to localhost on designated port
	    InetAddress addr = InetAddress.getLocalHost();
	    System.out.println("
    binding to address: " + addr.getHostAddress());
	    sSockChan.socket().bind(new InetSocketAddress(addr, 8550));

	    
    // get a selector
	    selector = Selector.
    open();

	    
    // register the channel with the selector to handle accepts
	    SelectionKey acceptKey = sSockChan.
    register(selector, SelectionKey.OP_ACCEPT);
	}
	catch (Exception e) {
	    System.out.println("
    error initializing ServerSocket");
	    System.
    exit(1);
	}
    }

    @Override
    public 
    void run(){

        
    while(true){

	    try {
		
    // blocking select, will return when we get a new connection
		selector.select();

		
    // fetch the keys
		Set readyKeys = selector.selectedKeys();

		
    // run through the keys and process
		Iterator i = readyKeys.iterator();
		
    while (i.hasNext()) {
		    SelectionKey key = (SelectionKey) i.next();
		    i.
    remove();

		    ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
		    SocketChannel clientChannel = ssChannel.accept();

		    
    //      。
		    readers.get(getIndex()).addNewClient(clientChannel,getIndex());
		    System.out.println("
    got connection from: " + clientChannel.socket().getInetAddress());
		}
	    }
	    catch (IOException ioe) {
		System.out.println("
    error during serverSocket select(): " + ioe.getMessage());
	    }
	    catch (Exception e) {
		System.out.println("
    exception in run()");
	    }
        }
    }

    public 
    static 
    void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}
   
EventReader (      )
import java.nio.*;
import java.nio.channels.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class EventReader extends Thread{
    private Server sv;
    
    private Selector selector;
    
    private BlockingQueue newClients;

    private int scoketCount = 0;

    public int getSocketCount(){
        return scoketCount;
    }

    public EventReader(Server sv){
        this.sv = sv;
        newClients = new LinkedBlockingQueue();
    }

    public void addNewClient(SocketChannel clientChannel, int id) {
        try {
            scoketCount++;
            newClients.put(clientChannel);
            selector.wakeup();
            System.out.println("I'm in thread" + id);
        } catch (InterruptedException ex) {
            Logger.getLogger(EventReader.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void run(){
        try {
            selector = Selector.open();

            while(true){
                read();
                checkNewConnections();

                try { Thread.sleep(30); } catch (InterruptedException e) {}
            }
        } catch (IOException ex) {
            Logger.getLogger(EventReader.class.getName()).log(Level.SEVERE, null, ex);
        }

    }

    private void read(){
        try {
            selector.select();
            Set readyKeys = selector.selectedKeys();

            Iterator i = readyKeys.iterator();
            while (i.hasNext()) {
                SelectionKey key = (SelectionKey) i.next();
                i.remove();
                SocketChannel channel = (SocketChannel) key.channel();
                ByteBuffer buffer = (ByteBuffer)key.attachment();

                long nbytes = channel.read(buffer);

                if(nbytes == -1){
                    System.out.println("channel has closed");
                    scoketCount--;
                    channel.close();
                }
                System.out.println(buffer.array());

                //  ,        ,             ,           。
                buffer.clear();
            }
        } catch (IOException ex) {
            Logger.getLogger(EventReader.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    private void checkNewConnections(){
        try {
            SocketChannel clientChannel = (SocketChannel) newClients.take();
            clientChannel.configureBlocking( false);
            clientChannel.register( selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024));
        } catch (InterruptedException ex) {
            Logger.getLogger(EventReader.class.getName()).log(Level.SEVERE, null, ex);
        }
        catch(IOException ex){

        }
    }
}

Over。。。

좋은 웹페이지 즐겨찾기