Hadoop에서 Bloom Filter 기반 연결
15295 단어 hadoopBloom filterhadoop join
처음으로 ITEye에 블로그를 썼는데 기분이 너무 설레요. 기술은 별로지만 그동안 Hadoop이 겪고 해결한 여러 가지 문제를 기록하기 위해 여러분과 작은 경험을 공유하고 나중에 필기를 하도록 하겠습니다.
2.윈도우즈에서 Hadoop 다운로드 및 설치
인터넷에는 Hadoop이 서로 다른 플랫폼 아래에서 설치하고 실행하고 개발하는 것에 관한 글이 많이 있습니다. 저는 여기서 군말하지 않겠습니다. 제 Hadoop 버전은 1.0.3이고 개발 환경은 윈도우즈XP입니다.6.1,hadoop이 eclipse 위에 있는 플러그인을 다운로드한다(다운로드한 hadoop의 원본의\src\contrib\eclipse-plugin 디렉터리 아래는 eclipse 플러그인의 원본이다.jar를 직접 컴파일하여 생성한 후 eclipse의pulgins 디렉터리에 놓으면 된다.내가 정상적으로 실행하는 eclipse의hadoop 플러그인을 첨부한다hadoop-eclipse-plugin-1.0.3.jar)
3.hadoop mapReduce datajoin
Bloom Filter는 비트 그룹을 이용하여 하나의 집합을 간결하게 표시하고 원소가 이 집합에 속하는지 판단하는 공간 효율이 높은 랜덤 데이터 구조이다.Bloom Filter의 주요 장점은 크기(비트 개수)가 상수이고 초기화할 때 설정된다는 것이다.Bloom Filter에 더 많은 요소를 추가하더라도 크기는 증가하지 않습니다.그것은 오보의 확률만 증가시킬 뿐이다.하지만 이 확률은 매우 낮다.Bloom Filter 원리 소개
hadoop은join을 조작할 때 주로 네 가지join방식이 있는데,
3.1, Reduce 측면 연결
mr 프로그램은 두 개의 파일을 처리하는데, 두 개의 파일 이름은 각각aa이다.txt와 bbb.txt, 맵 프로그램은 DataJoinMapper Base 클래스를 데이터 원본에 대응하는 tag(tag는 일반적으로 파일의 파일 이름으로 데이터 구조는 같지만 분할된 파일, 예를 들어aaa.txt는aaa 1.txt와aaa 2.txt로 분할하면 tag는aaa), 데이터 원본과 연결된 키를 설정하고 Reduce 프로그램에서 같은 연결키의 모든 기록을 함께 처리한다.reduce () 는 패키지 해제를 통해 원시 기록과 탭에 따라 기록된 데이터 원본을 얻을 수 있습니다.Reduce 측에서 연결 조작을 실행하는데 그 중에서 데이터join 패키지는 주요한 연결 조작을 실현하였다
3.2 DistributedCache 기반 복제 연결
여러 개의 데이터 원본을 연결할 때 그 중 작은 데이터 원본을 선택하여 메모리에 넣을 수 있다. 우리는 작은 데이터 원본을 통해 모든 마퍼에게 복제하고 마퍼 측면에서 연결을 실현하여 효율을 크게 향상시킬 수 있다.첫 번째, 작업을 설정할 때, 정적 방법인 Distributed Cache를 호출할 수 있습니다.addCachefile () 는 모든 노드에 전파될 파일을 설정합니다. 두 번째 부분은 각각의 단독 TaskTracker에 있는 마퍼가 정적 방법인DistributedCache를 호출합니다.getLocalcacheFiles () 는 그룹의 로컬 복사본이 있는 로컬 파일의 경로를 가져옵니다(나는 여러 번 테스트한 적이 있다. eclipse 플러그인이 당신의 Hadoop 환경에 연결될 때 기본적으로 설정 파라미터를 설정해 준다. 이 설정 파라미터는 모두 상대적인 경로이다. 윈도우즈 아래에서 개발하면 프로젝트가 있는 디스크의 루트 디렉터리에 기본적으로 생성된다. 가져온 URL은 직접 읽을 수 없고 비극적이다. 현재 프로젝트에 저장된 디스크만 수동으로 추가된다.부경로)
3.3, 반연결: 맵 사이드 필터 후 Reduce 사이드 연결
작은 데이터 원본이 메모리에 저장되지 않으면 작은 데이터 원본의 키를 모두 꺼내서 키 목록을 저장하는 파일을 만들 수 있습니다. 맵 단계에서Distributed Cache를 사용하여 키 값 파일을 각Task Tracker에 복사합니다. 필요하지 않거나 대응하는 데이터 원본의 키 값 목록을 제거하고 나머지는 Reduce측과 연결됩니다.
3.4, Reduce 측면 연결 + Bloom Filter
일부 경우, 옆으로 연결된 추출된 작은 시계의 키 집합이 메모리에 저장되지 않을 때, 공간을 절약하기 위해 BloomFiler를 사용할 수 있습니다.
뒤에 자세한 설명이 있을 거예요.
hadoop mapReduce join에 대한 더 자세한 설명은 여기 참조: hadoop mapReduce join
4. Reduce 사이드 연결 및 Bloom Filter 개발에 대한 상세 설명
4.1 Bloom Filter 구현
Bloom Filter의 실현은 개념적으로 매우 직관적이다. 그 내면은 m비트 비트의 수조로 표현된다. 우리는 k개의 독립된 산열 함수를 가지고 있다. 여기에 각 산열 함수의 입력은 하나의 대상이고 출력은 0과 m-1 사이에 있는 정수이다. 우리는 이 출력된 정수를 비트 그룹의 인덱스로 사용한다.
대상이 왔을 때, Bloom Filter에 추가되었는지 확인하려면, 대상을 추가할 때 같은 k개의 산열 함수를 사용하여 비트 데이터의 인덱스를 생성합니다.모든 비트레이트 그룹의 값이 모든 상황에서 0, 즉 존재하지 않는다고 가정하면 생성된 인덱스에 대응하는 값이 1이면 존재를 의미하고true, 0이면 존재하지 않음을 의미하며false를 의미한다.여기에 누락된 보고가 없습니다. 두 대상이 생성한 비트값이 같을 때만 오보가 있을 수 있지만, Reduce 측에서는 이러한 오보 데이터를 버릴 것입니다.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.BitSet;
import java.util.Random;
import org.apache.hadoop.io.Writable;
public class BloomFilter2<E> implements Writable{
private BitSet bf;
private int bitArraySize = 100000000;
private int numHashFunc = 6;
public BloomFilter2(){
bf = new BitSet(bitArraySize);
}
public void add(E obj){
int[] indexes = getHashIndexes(obj);
for(int index : indexes){
bf.set(index);
}
}
public boolean contains(E obj){
int[] indexes = getHashIndexes(obj);
for(int index : indexes){
if(!bf.get(index)){
return Boolean.FALSE;
}
}
return Boolean.TRUE;
}
protected int[] getHashIndexes(E obj){
int[] indexes = new int[numHashFunc];
long seed = 0;
byte[] digest;
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md.update(obj.toString().getBytes());
digest = md.digest();
for(int i = 0;i < numHashFunc;i++){
seed = seed ^ (((long)digest[i] & 0xFF)) << (8*i);
}
} catch (Exception e) {
e.printStackTrace();
}
Random rdm = new Random(seed);
for(int i=0;i<numHashFunc;i++){
indexes[i] = rdm.nextInt(bitArraySize);
}
return indexes;
}
public void union(BloomFilter2<E> other){
bf.or(other.bf);
}
@Override
public void readFields(DataInput in) throws IOException {
int byteArraySize = (int) (bitArraySize / 8);
byte[] byteArray = new byte[byteArraySize];
in.readFully(byteArray);
for(int i=0;i<byteArraySize;i++){
byte nextByte = byteArray[i];
for(int j=0;j<8;j++){
if(((int)nextByte & (1<<j)) != 0){
bf.set(8*i+j);
}
}
}
}
@Override
public void write(DataOutput out) throws IOException {
int byteArraySize = (int) (bitArraySize / 8);
byte[] byteArray = new byte[byteArraySize];
for(int i = 0;i<byteArraySize;i++){
byte nextElement = 0;
for(int j = 0;j<8;j++){
if(bf.get(8*i+j)){
nextElement |= 1 << j;
}
}
byteArray[i] = nextElement;
}
out.write(byteArray);
}
}
이것은 Hadoop in action 책에서 발췌한 Bloom Filter의 실현이다. 두 집합의 합병을 할 때 union() 방법으로 교묘하게 실현한다. 각 맵퍼는 자신의 데이터에 따라 Bloom Filter를 구성한다. 우리는 이 Bloom filter를 하나의 단일한 Reducer에 보내서 그것들을 귀속시키고 최종 출력을 기록한다.
Bloom Filter는 마퍼의 출력에 따라 혼란스러워지기 때문에, Bloom Filter 클래스는wirtable 인터페이스를 실현해야 합니다. 이것은 wirter () 와readFileds () 방법을 포함합니다. 이러한 방법은 내부에 있는 BitSet 표시와 한 바이트 그룹의 전환을 실현하여 이 데이터를 DataInput/Data Outputput으로 서열화할 수 있도록 합니다.
4.2 기본 및 Bloom Filter의 mapreduce 프로그램
위의 코드를 보시면 마프리듀스 프로그램의 실현을 잘 아실 겁니다. 맵 프로그램에서 BloomFilter 클래스 변수를 실례화하고 매번 맵 방법에서 키 값을 BloomFliter에 추가합니다.
bloomfilter.add(key.toString());
부류를 다시 쓰는 close 방법 (hadoop0.2 버전 이전에만 있음), 더 높은 버전이면 cleanup (context) 방법으로 대체해야 하며, 이 방법에서bloom Filter를 출력합니다.
Reduce 코드에서도 부류의configuer(0.2버전 이후setup())와close()(cleanup()) 방법, 그리고reduce 방법이 필요합니다.
전역적인jobconf(0.20 이후context 획득) 변수를 정의하고configuer 방법에서job 대상을 초기화하며,reduce 방법에서 가장 데이터 연결이bloomFliter입니다.union(value);close () 방법에서 연결된 기록을 출력합니다.
표현이 잘 안 될 수 있습니다. 왜냐하면 저는 블룸 필터를 사용자 정의하지 않았기 때문에 책의 예를 컴파일했을 뿐입니다. Hadoop은 0.2.0버전 이후에 자신이 실현한 블룸 필터류가 있고 주로 Hadoop이 스스로 실현한 블룸 필터류를 사용합니다.
코드가 안 붙어요. Hadoop in action이라는 책에 다 있어요. 아래에 Hadoop 자체로 Bloom Filter를 실현하는 Maperduce 프로그램과 다른 점이 있어요.
4.3:hadoop 자체에 의한 Bloom Filter의 mapreduce 구현
Bloom filter 소스:\src\core\org\apache\hadoop\util 디렉토리
Bloom filter api: http://hadoop.apache.org/common/docs/r0.20.0/api/org/apache/hadoop/util/bloom/BloomFilter.html#BloomFilter%28int,%20int,%20int%29
Hadoop의 Bloom Filter를 정의합니다. 두 개의 구조 함수가 있는데 하나는 무참이고 하나는 유참이며, 무참한 구조 함수는 Reduce 함수의 출력을 지정할 때 사용됩니다. 우리는mapreduce에서 유참한 구조 함수를 사용해야 합니다.
public BloomFilter
(int vectorSize,
int nbHash,
int hashType)
api를 보면 세 개의 매개 변수의 뜻을 쉽게 이해할 수 있다. 첫 번째 매개 변수는vector의 크기이다. 이 값을 최대한 크게 주면 해시 대상이 있을 때 색인이 중복되는 것을 피할 수 있다. 두 번째 매개 변수는 산열 함수의 개수이고 세 번째는hash의 유형이다. int형이지만 기본 두 개의 값만 있다.원본을 보십시오. Hadoop의hash클래스에서 두 개의 상수를 지정하는 것은 두 개의hash유형의 실현입니다. 기본적으로Hash클래스에서 그 중의 변수를 지정할 수 있습니다.
public static final int INVALID_HASH = -1;
/** Constant to denote {@link JenkinsHash}. */
public static final int JENKINS_HASH = 0;
/** Constant to denote {@link MurmurHash}. */
public static final int MURMUR_HASH = 1;
public static int parseHashType(String name) {
if ("jenkins".equalsIgnoreCase(name)) {
return JENKINS_HASH;
} else if ("murmur".equalsIgnoreCase(name)) {
return MURMUR_HASH;
} else {
return INVALID_HASH;
}
이렇게 지정하지 않으면dd() 방법에서 빈 바늘의 이상을 던집니다. 원본 코드를 보면hash류가 초기화되지 않았기 때문인 것을 알 수 있습니다. bloom Filter의 구조 함수의 매개 변수가 잘못되었기 때문입니다!
그 다음에 맵 클래스와 Reduce 클래스에서 실례화된 Bloomfilter의 매개 변수가 가장 좋고 같지 않으면 하나를 던집니다
throw new IllegalArgumentException("filters cannot be and-ed");
or 방법을 호출하면 던집니다
throw new IllegalArgumentException("filters cannot be or-ed");
if(filter == null
|| !(filter instanceof BloomFilter)
|| filter.vectorSize != this.vectorSize
|| filter.nbHash != this.nbHash) {
throw new IllegalArgumentException("filters cannot be and-ed");
}
이 이상을 보면 맵과 Reduce 클래스에서 초기화된 Bloom Filter가 다르기 때문이라는 것을 알 수 있습니다. 빨리 검사하면 ok입니다. Hadoop 자체가 구현한 Bloom Filter를 호출한 원본을 붙여 보세요.
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
public class HadoopSevenMapperDemo extends Configured implements Tool {
public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, BloomFilter>{
BloomFilter bloomFilter = new BloomFilter(1000000,6,Hash.JENKINS_HASH);
OutputCollector<Text, BloomFilter> ct = null;
@Override
public void map(Text key, Text arg1,
OutputCollector<Text, BloomFilter> output, Reporter arg3)
throws IOException {
if(ct == null){
ct = output;
}
System.out.println(key.toString()+"``````````````");
bloomFilter.add(new Key(key.toString().getBytes()));
}
@Override
public void close(){
try {
ct.collect(new Text("testKey"), bloomFilter);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static class ReduceClass extends MapReduceBase implements Reducer<Text, BloomFilter, Text, Text>{
JobConf job = null;
BloomFilter bf = new BloomFilter(1000000,6,Hash.JENKINS_HASH);
@Override
public void configure(JobConf job) {
this.job = job;
}
@Override
public void reduce(Text key, Iterator<BloomFilter> values,OutputCollector<Text, Text> output, Reporter arg3)
throws IOException {
while(values.hasNext()){
bf.or(values.next());
}
}
@Override
public void close() throws IOException {
Path file = new Path(job.get("mapred.output.dir")+"/bloomfilter");
FSDataOutputStream out = file.getFileSystem(job).create(file);
bf.write(out);
out.close();
}
}
@Override
public int run(String[] arg0) throws Exception {
Configuration conf = getConf();
JobConf job = new JobConf(conf,HadoopSevenMapperDemo.class);
Path in = new Path("/user/Administrator/input7");
Path out = new Path("/user/Administrator/output7");
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("HadoopSevenMapperDemo");
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(1);
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(NullOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BloomFilter.class);
job.set("key.value.separator.in.input.line", ",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) {
int res;
try {
res = ToolRunner.run(new Configuration(),new HadoopSevenMapperDemo(), args);
System.exit(res);
} catch (Exception e) {
e.printStackTrace();
}
}
}
원본 코드에서 인용된 가방은 모두 마레드 아래의 클래스입니다. 이것은 0.2.0 이전 버전의 클래스이기 때문에 1.0.3 버전에서 여전히 호환됩니다. 새로운 버전의 클래스를 사용하려면 마레드 아래의 클래스를 인용하면 됩니다. 많은 클래스의 이름이 바뀌지 않았기 때문에 다른 가방 아래에 있을 뿐입니다.
5: 참고 자료
(1) 서적'Hadoop In Action'제5장
(2) BloomFilter 소개: http://blog.csdn.net/jiaomeng/article/details/1495500
(3)hadoop mapreduce join : http://xubindehao.iteye.com/blog/1405860
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Azure HDInsight + Microsoft R Server에서 연산 처리 분산Microsoft Azure HDInsight는 Microsoft가 제공하는 Hadoop의 PaaS 서비스로 인프라 주변의 구축 노하우를 몰라도 훌륭한 Hadoop 클러스터를 구축할 수 있는 훌륭한 서비스입니다. 이...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.