hbase-0.92.1-cdh4.1.3 HTablePool 구현
1. PoolType: Reusable: (기본값) 하나의 실례 탱크, 다중 루틴 복용, 내부는ConcurrentLinkedQueue로 여러 실례 HTable를 설치;ThreadLocal: 모든 라인은 하나의 실례만 있고 라인과 라인 사이에는 서로 영향을 주지 않습니다.ThreadLocal;스레드가 증가함에 따라 Pool의 HTable가 증가하지만 서로 영향을 주지 않는 것이 특징이다.RoundRobin: Pool의 HTable는CopyOnWriteArrayList로 포장;
2. 초기화: HTablePool = new HTablePool(conf, 5);//기본 PoolType.Reusablepool = new HTablePool(conf, maxSize, tableFactory, PoolMap.PoolType.ThreadLocal);//PoolMap.PoolType.ThreadLocalpool = new HTablePool(conf, maxSize, tableFactory, PoolMap.PoolType.RoundRobin);//PoolMap.PoolType.RoundRobin 실례화 PoolMap 실례화 HTablePool, 이 때 HTable 실례가 없습니다.tables가 비어 있습니다.
3. HTableInterface 객체 가져오기: pool.getTable(TEST_TABLE_NAME);테이블이 테이블을 포함하고 있는지 확인하십시오. 그렇지 않으면 HTable 실례를 만들어서 HTable 실례를 PooledHTable 실례로 봉하여 PooledHTable 실례를 되돌려줍니다.close();PoolMap 뒤에 놓기;
4. HTablePool은 모든 Table의 HTableInterface 실례를 수용할 수 있습니다.HTable 실례는 같은 zookeeper 연결 HTable 실례를 공유합니다. 같은 Region Server에서 같은 연결 HBase Client $Connection HTable Pool에 최대 사이즈가 있지만 HTable 실례가 이 사이즈보다 크지 못하도록 제한하지 않습니다. 이 사이즈를 초과하면 실례화되지만 실례 풀로 돌아갈 때 풀이 가득 차면 버려집니다.HTable 인스턴스 스레드가 안전하지 않습니다.
주의점: 1.다중 스레드에서 HTablePool을 사용하여 같은 테이블의 HTable를 가져올 때, 스레드 개수가 maxsize보다 크면, 쓰기가 항상 autoflush입니다!
public HTableInterface getTable(String tableName) {
// call the old getTable implementation renamed to findOrCreateTable
HTableInterface table = findOrCreateTable(tableName);
// return a proxy table so when user closes the proxy, the actual table
// will be returned to the pool
return new PooledHTable(table);
}
public void close() throws IOException {
returnTable(table);
}
private void returnTable(HTableInterface table) throws IOException {
// this is the old putTable method renamed and made private
String tableName = Bytes.toString(table.getTableName());
if (tables.size(tableName) >= maxSize) {
// release table instance since we're not reusing it
this.tables.remove(tableName, table);
this.tableFactory.releaseHTableInterface(table);
return;
}
tables.put(tableName, table);
}
하면, 만약, 만약...maxsize가 maxsize보다 크면 저장된 HTable 대상을 제거하고releaseHTableInterface가 실제적으로 호출하는 것이 HTable의close 방법입니다.close 방법은flushHTable의buffer를 강제하기 때문에 autoflush를 사용하지 않고 쓰기 속도를 높이려면 효력을 상실합니다.
2. flushCommit(고정 주파수 + 메모리 사용량 >1M)
@Override
public void put(final List<Put> puts) throws IOException {
super.put(puts);
needFlush();
}
private void needFlush() throws IOException {
long currentTime = System.currentTimeMillis();
if ((currentTime - lastFlushTime.longValue()) > flushInterval) {
super.flushCommits();
lastFlushTime.set(currentTime);
}
}
사용 코드 예제 초기화
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
public class HTablePoolTest2 {
protected static String TEST_TABLE_NAME = "testtable";
protected static String ROW1_STR = "row1";
protected static String COLFAM1_STR = "colfam1";
protected static String QUAL1_STR = "qual1";
private final static byte[] ROW1 = Bytes.toBytes(ROW1_STR);
private final static byte[] COLFAM1 = Bytes.toBytes(COLFAM1_STR);
private final static byte[] QUAL1 = Bytes.toBytes(QUAL1_STR);
private static HTablePool pool;
@BeforeClass
public static void runBeforeClass() throws IOException {
Configuration conf = HBaseConfiguration.create();
// PoolType.Reusable
pool = new HTablePool(conf, 10);
// pool
HTableInterface[] tables = new HTableInterface[10];
for (int n = 0; n < 10; n++) {
tables[n] = pool.getTable(TEST_TABLE_NAME);
}
// close ,PooledTable pool
for (HTableInterface table : tables) {
table.close();
}
}
@Test
public void testHTablePool() throws IOException, InterruptedException,
ExecutionException {
Callable<Result> callable = new Callable<Result>() {
public Result call() throws Exception {
return get();
}
};
FutureTask<Result> task1 = new FutureTask<Result>(callable);
FutureTask<Result> task2 = new FutureTask<Result>(callable);
Thread thread1 = new Thread(task1, "THREAD-1");
thread1.start();
Thread thread2 = new Thread(task2, "THREAD-2");
thread2.start();
Result result1 = task1.get();
System.out.println(Bytes.toString(result1.getValue(COLFAM1, QUAL1)));
Result result2 = task2.get();
System.out.println(Bytes.toString(result2.getValue(COLFAM1, QUAL1)));
}
private Result get() {
HTableInterface table = pool.getTable(TEST_TABLE_NAME);
Get get = new Get(ROW1);
try {
Result result = table.get(get);
return result;
} catch (IOException e) {
e.printStackTrace();
return null;
} finally {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
http://blog.csdn.net/mrtitan/article/details/8892815 http://helpbs.iteye.com/blog/1492054