Many2One 버퍼

5879 단어 man
다중 스레드 병행 작업에서는 동기화 락의 비용을 줄이기 위해 일반적으로 동기화 작업을 최대한 줄입니다. 다음은 여러 스레드가 쓰고 하나의 스레드가 읽는 버퍼입니다. 쓰기 작업은 항상 동기화가 필요하지만 읽기 작업은 부분적인 동기화만 필요합니다. 읽기 작업의 동기화는 읽기 버퍼와 쓰기 버퍼가 교체될 때에만 발생합니다.
다음은 간단한 자바 구현입니다.

package org.jf;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

/**
 * A bounded, double-buffered hand-off queue for many producer threads and a
 * single consumer thread.
 *
 * <p>Producers append to the write buffer under {@code writeLock}. The consumer
 * drains a private read buffer without taking the lock and only synchronizes
 * when the read buffer is exhausted, at which point the two arrays are swapped.
 *
 * <p>Thread-safety: {@link #put(Object)}, {@link #put(Object, int)} and
 * {@link #isFull()} may be called from any thread. {@link #poll()} touches the
 * read-side state without locking and must only ever be called from one thread.
 *
 * <p>NOTE(review): the original header lost its non-ASCII labels in extraction;
 * it recorded "Util", "MyBuffer", "Administrator" and "Mar 6, 2012 9:48:44 AM".
 *
 * @param <T> type of the elements handed from producers to the consumer
 */
public class Many2OneBuffer <T>{

	private static final long NANOS_PER_SECOND = 1000000000L;
	private static final long NANOS_PER_MILLI = 1000000L;

	/** Monitor guarding the write side: writeBuffer, writeIndex and limit. */
	private final Object writeLock;

	/** Next free slot in writeBuffer; equals capacity when the buffer is full. */
	private int writeIndex=0;
	/** Next slot of readBuffer to hand out; consumer thread only. */
	private int readIndex=0;

	private Object [] writeBuffer;
	private Object [] readBuffer;

	/** Index of the last written slot in writeBuffer, or -1 when it is empty. */
	private int limit = -1;
	/** Index of the last readable slot in readBuffer, or -1 when it is drained. */
	private int readLimit = -1;
	private final int capacity ;


	/**
	 * Creates a buffer whose write side and read side each hold up to
	 * {@code capacity} elements.
	 *
	 * @param capacity number of slots per side; must be positive
	 * @throws IllegalArgumentException if {@code capacity} is not positive
	 */
	public Many2OneBuffer(int capacity)
	{
		// A zero-capacity buffer would make every blocking call wait forever:
		// producers see "full" and the consumer sees "empty" at the same time.
		if(capacity <= 0)
		{
			throw new IllegalArgumentException("capacity must be positive: "+capacity);
		}
		this.capacity=capacity;
		writeBuffer = new Object[capacity];
		readBuffer = new Object[capacity];
		writeLock = new Object();
	}

	/** Creates a buffer with a capacity of 500 elements per side. */
	public Many2OneBuffer()
	{
		this(500);
	}

	/**
	 * Appends an element, blocking while the write buffer is full.
	 *
	 * @param obj element to append; {@code null} is stored as-is
	 * @return always {@code true}
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	public boolean  put(T obj)throws InterruptedException
	{
		synchronized(writeLock)
		{
			while(writeIndex == capacity)
			{
				writeLock.wait();
			}
			append(obj);
			return true;
		}
	}

	/**
	 * Appends an element, waiting at most {@code timeout} seconds for free space.
	 *
	 * @param obj     element to append; {@code null} is stored as-is
	 * @param timeout maximum time to wait in seconds; zero or a negative value
	 *                means "do not wait at all"
	 * @return {@code true} if the element was stored, {@code false} if the
	 *         buffer was still full when the timeout elapsed
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	public boolean put(T obj,int timeout)throws InterruptedException
	{
		// Long arithmetic: an int "timeout*1000" overflows for large timeouts.
		long remainingNanos = timeout * NANOS_PER_SECOND;
		final long deadline = System.nanoTime() + remainingNanos;

		synchronized(writeLock)
		{
			// Loop so that spurious wakeups, or wakeups that lose the race for
			// the freed slot, keep waiting for the rest of the timeout.
			while(writeIndex == capacity)
			{
				if(remainingNanos <= 0)
				{
					return false;
				}
				// Round up so the argument is never 0, which would wait forever.
				long millis = (remainingNanos + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI;
				writeLock.wait(millis);
				remainingNanos = deadline - System.nanoTime();
			}
			append(obj);
			return true;
		}
	}

	/**
	 * Stores one element in the next write slot and wakes waiting threads.
	 * The caller must hold {@code writeLock} and have checked for free space.
	 */
	private void append(T obj)
	{
		writeBuffer[writeIndex++] = obj;
		limit++;
		writeLock.notifyAll();
	}

	/**
	 * Returns the next element, blocking while no element is available.
	 *
	 * <p>Must only be called from a single consumer thread: the read-side
	 * fields are accessed without holding the lock.
	 *
	 * @return the next element in the order it was stored (may be {@code null}
	 *         if a producer stored {@code null})
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	public T poll()throws InterruptedException
	{
		if(readLimit == -1)
		{
			// Read buffer is drained: wait for data, then swap the two arrays.
			synchronized(writeLock)
			{
				while(limit == -1)
				{
					writeLock.wait();
				}
				Object [] filled = writeBuffer;
				writeBuffer = readBuffer;
				readBuffer = filled;
				readLimit = limit;
				readIndex = 0;
				writeIndex = 0;
				limit = -1;
				// The whole write buffer just became free, so wake every producer.
				writeLock.notifyAll();
			}
		}

		@SuppressWarnings("unchecked") // only put() stores elements, and it accepts T only
		T item = (T)readBuffer[readIndex];
		// Drop the reference so a consumed element can be garbage collected
		// before the slot is overwritten by a later put.
		readBuffer[readIndex] = null;
		if(readIndex == readLimit)
		{
			// That was the last readable element; the next poll swaps buffers.
			readLimit = -1;
		}
		readIndex++;
		return item;
	}


	/**
	 * Reports whether the write buffer currently has no free slot.
	 *
	 * @return {@code true} if a call to {@link #put(Object)} would block right now
	 */
	public boolean isFull()
	{
		// writeIndex is written under writeLock, so read it under the same lock.
		synchronized(writeLock)
		{
			return this.writeIndex == this.capacity;
		}
	}

	/**
	 * Demo: four producer threads feed one writer thread for 60 seconds, then
	 * all threads are asked to stop.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the main thread is interrupted while sleeping
	 */
	public static void main(String args[]) throws InterruptedException
	{
		Many2OneBuffer<String> buffer = new Many2OneBuffer<String>(500);
		WriteThread writeThread = new WriteThread(buffer);
		writeThread.start();

		PutThread putThread1 = new PutThread(buffer,"PutThread1");
		PutThread putThread2 = new PutThread(buffer,"PutThread2");
		PutThread putThread3 = new PutThread(buffer,"PutThread3");
		PutThread putThread4 = new PutThread(buffer,"PutThread4");
		putThread1.start();
		putThread2.start();
		putThread3.start();
		putThread4.start();

		// Let the demo run for 60 seconds.
		Thread.sleep(6*10*1000);
		putThread1.exit();
		putThread2.exit();
		putThread3.exit();
		putThread4.exit();
		writeThread.exit();
		// Once the producers stop, the writer can block in poll() indefinitely
		// and keep the JVM alive; interrupting it releases that wait.
		writeThread.interrupt();
	}


}

/**
 * Demo producer: keeps putting the strings {@code rawStr_0}, {@code rawStr_1},
 * ... into the buffer until {@link #exit()} is called or the thread is
 * interrupted.
 */
class PutThread extends Thread
{
	Many2OneBuffer<? super String> buffer ;
	String rawStr;
	/** Stop request; volatile so the write from another thread is seen by run(). */
	volatile boolean exit = false;

	/**
	 * @param myBuffer buffer to put the generated strings into
	 * @param rawStr   prefix of every generated string
	 */
	PutThread(Many2OneBuffer<? super String> myBuffer,String rawStr)
	{
		buffer = myBuffer;
		this.rawStr = rawStr;
	}

	/** Produces numbered strings until asked to stop. */
	@Override
	public void run()
	{
		int i = 0;
		while(!exit)
		{
			try {
				buffer.put(rawStr+"_"+i);
			} catch (InterruptedException e) {
				// exit() interrupts on purpose; only report unexpected interrupts.
				if(!exit)
				{
					e.printStackTrace();
				}
				// Keep the interrupt status and stop producing.
				Thread.currentThread().interrupt();
				return;
			}
			i++;
		}
	}

	/** Asks the thread to stop and interrupts it in case it is blocked in put(). */
	public void exit()
	{
		exit = true;
		this.interrupt();
	}

}

class WriteThread extends Thread
{
	Many2OneBuffer buffer ;
	boolean exit = false;
	WriteThread(Many2OneBuffer myBuffer)
	{
		buffer = myBuffer;
	}
	
	public void exit()
	{
		exit = true;
	}
	
	public void run()
	{
		try {
			BufferedWriter bw = new BufferedWriter(new FileWriter("e:/write.txt",false));
			int count = 0;
			while(!exit)
			{
				Object obj = buffer.poll();
				if(obj!=null)
				{
					bw.write(obj.toString()+"
"); count++; bw.flush(); if(count==100) { this.sleep(100); count = 0; } } } System.out.println("write thread exit"); bw.flush(); bw.close(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }

좋은 웹페이지 즐겨찾기