hbase-0.92.1-cdh4.1.3 HTablePool 구현

hbase-0.92.1-cdh4.1.3 HTablePool 구현:
 
1. PoolType: Reusable: (기본값) 하나의 실례 탱크, 다중 루틴 복용, 내부는ConcurrentLinkedQueue로 여러 실례 HTable를 설치;ThreadLocal: 모든 라인은 하나의 실례만 있고 라인과 라인 사이에는 서로 영향을 주지 않습니다.ThreadLocal;스레드가 증가함에 따라 Pool의 HTable가 증가하지만 서로 영향을 주지 않는 것이 특징이다.RoundRobin: Pool의 HTable는CopyOnWriteArrayList로 포장;
2. 초기화: HTablePool = new HTablePool(conf, 5);//기본 PoolType.Reusablepool = new HTablePool(conf, maxSize, tableFactory, PoolMap.PoolType.ThreadLocal);//PoolMap.PoolType.ThreadLocalpool = new HTablePool(conf, maxSize, tableFactory, PoolMap.PoolType.RoundRobin);//PoolMap.PoolType.RoundRobin 실례화 PoolMap 실례화 HTablePool, 이 때 HTable 실례가 없습니다.tables가 비어 있습니다.
3. HTableInterface 객체 가져오기: pool.getTable(TEST_TABLE_NAME);테이블이 테이블을 포함하고 있는지 확인하십시오. 그렇지 않으면 HTable 실례를 만들어서 HTable 실례를 PooledHTable 실례로 봉하여 PooledHTable 실례를 되돌려줍니다.close();PoolMap 뒤에 놓기;
4. HTablePool은 모든 Table의 HTableInterface 실례를 수용할 수 있습니다.HTable 실례는 같은 zookeeper 연결 HTable 실례를 공유합니다. 같은 Region Server에서 같은 연결 HBase Client $Connection HTable Pool에 최대 사이즈가 있지만 HTable 실례가 이 사이즈보다 크지 못하도록 제한하지 않습니다. 이 사이즈를 초과하면 실례화되지만 실례 풀로 돌아갈 때 풀이 가득 차면 버려집니다.HTable 인스턴스 스레드가 안전하지 않습니다.
 
주의점: 1.다중 스레드에서 HTablePool을 사용하여 같은 테이블의 HTable를 가져올 때, 스레드 개수가 maxsize보다 크면, 쓰기가 항상 autoflush입니다!
public HTableInterface getTable(String tableName) {
   // call the old getTable implementation renamed to findOrCreateTable
   HTableInterface table = findOrCreateTable(tableName);
   // return a proxy table so when user closes the proxy, the actual table
   // will be returned to the pool
   return new PooledHTable(table);
}
public void close() throws IOException {
   returnTable(table);
}
private void returnTable(HTableInterface table) throws IOException {
   // this is the old putTable method renamed and made private
   String tableName = Bytes.toString(table.getTableName());
   if (tables.size(tableName) >= maxSize) {
     // release table instance since we're not reusing it
     this.tables.remove(tableName, table);
     this.tableFactory.releaseHTableInterface(table);
     return;
   }
   tables.put(tableName, table);
}

 
하면, 만약, 만약...maxsize가 maxsize보다 크면 저장된 HTable 대상을 제거하고releaseHTableInterface가 실제적으로 호출하는 것이 HTable의close 방법입니다.close 방법은flushHTable의buffer를 강제하기 때문에 autoflush를 사용하지 않고 쓰기 속도를 높이려면 효력을 상실합니다.
2. flushCommit(고정 주파수 + 메모리 사용량 >1M)
@Override
public void put(final List<Put> puts) throws IOException {
    super.put(puts);
    needFlush();
}
private void needFlush() throws IOException {
    long currentTime = System.currentTimeMillis();
    if ((currentTime - lastFlushTime.longValue()) > flushInterval) {
        super.flushCommits();
        lastFlushTime.set(currentTime);
    }
}

 
사용 코드 예제 초기화
 
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;

public class HTablePoolTest2 {

	protected static String TEST_TABLE_NAME = "testtable";

	protected static String ROW1_STR = "row1";
	protected static String COLFAM1_STR = "colfam1";
	protected static String QUAL1_STR = "qual1";

	private final static byte[] ROW1 = Bytes.toBytes(ROW1_STR);
	private final static byte[] COLFAM1 = Bytes.toBytes(COLFAM1_STR);
	private final static byte[] QUAL1 = Bytes.toBytes(QUAL1_STR);

	private static HTablePool pool;

	@BeforeClass
	public static void runBeforeClass() throws IOException {
		Configuration conf = HBaseConfiguration.create();
		//     PoolType.Reusable
		pool = new HTablePool(conf, 10);
		//      pool
		HTableInterface[] tables = new HTableInterface[10];
		for (int n = 0; n < 10; n++) {
			tables[n] = pool.getTable(TEST_TABLE_NAME);
		}
		// close ,PooledTable    pool
		for (HTableInterface table : tables) {
			table.close();
		}
	}

	@Test
	public void testHTablePool() throws IOException, InterruptedException,
			ExecutionException {
		Callable<Result> callable = new Callable<Result>() {
			public Result call() throws Exception {
				return get();
			}
		};
		FutureTask<Result> task1 = new FutureTask<Result>(callable);
		FutureTask<Result> task2 = new FutureTask<Result>(callable);
		Thread thread1 = new Thread(task1, "THREAD-1");
		thread1.start();
		Thread thread2 = new Thread(task2, "THREAD-2");
		thread2.start();
		Result result1 = task1.get();
		System.out.println(Bytes.toString(result1.getValue(COLFAM1, QUAL1)));
		Result result2 = task2.get();
		System.out.println(Bytes.toString(result2.getValue(COLFAM1, QUAL1)));
	}

	private Result get() {
		HTableInterface table = pool.getTable(TEST_TABLE_NAME);
		Get get = new Get(ROW1);
		try {
			Result result = table.get(get);
			return result;
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		} finally {
			try {
				table.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}

http://blog.csdn.net/mrtitan/article/details/8892815 http://helpbs.iteye.com/blog/1492054

좋은 웹페이지 즐겨찾기