[Hadoop] MapReduce 연습: 다 중 job 관련 역 색인 실현

8708 단어
개술
역 배열 색인 (영어: Inverted index) 은 역방향 색인, 파일 삽입 또는 역방향 파일 이 라 고도 불 리 며 전체 텍스트 검색 에 저 장 된 한 단어 가 한 문서 나 한 문서 에 저 장 된 위 치 를 나타 내 는 색인 방법 입 니 다.그것 은 문서 검색 시스템 에서 가장 자주 사용 하 는 데이터 구조 이다.역 열 색인 을 통 해 이 단 어 를 포함 하 는 문서 목록 을 단어 에 따라 빠르게 가 져 올 수 있 습 니 다.역 배열 색인 은 주로 두 부분 으로 구성 된다. '단어 사전' 과 '역 배열 파일' 이다.
역 배열 색인 은 두 가지 서로 다른 역방향 색인 형식 이 있 습 니 다. 기 록 된 수평 역방향 색인 (또는 역방향 파일 색인) 은 모든 단 어 를 참조 하 는 문서 의 목록 을 포함 합 니 다.한 단어의 수평 역방향 색인 (또는 완전 역방향 색인) 은 각 단어 가 한 문서 에 있 는 위 치 를 포함한다.후자 의 형식 은 더 많은 호환성 (예 를 들 어 구문 검색) 을 제공 하지만 더 많은 시간 과 공간 이 필요 합 니 다.
현대 검색엔진 의 색인 은 모두 역 배열 색인 에 기초 하고 있다.'서명 파일', '접미사 트 리' 등 색인 구조 에 비해 '거꾸로 색인' 은 단어 에서 문서 맵 관 계 를 실현 하 는 가장 좋 은 실현 방식 과 가장 효과 적 인 색인 구조 이다.
다 중 Job 직렬 연결: 첫 번 째 job 에서 발생 한 출력 결 과 는 두 번 째 job 의 입력 입 니 다. 두 번 째 job 가 실행 되 는 전 제 는 첫 번 째 job 의 출력 결 과 를 얻 는 것 입 니 다. 두 번 째 job 는 첫 번 째 job 에 의존 하고 두 번 째 job 는 직렬 실행 관계 입 니 다.job1----->job2----->jobn
예시
수요: 대량의 텍스트 (문서, 웹 페이지) 가 있 습 니 다. 검색 색인 을 만들어 야 합 니 다.
예제: a. txt, b. txt, c. txt 세 개의 파일 이 있 고 각 파일 은 각각 키워드 에 대응 합 니 다.
a. txt 는 다음 과 같 습 니 다.
map
reduce
MapReduce
index Inverted index
Inverted index
    
   
hadoop MapReduce hdfs
Inverted index

b. txt 는 다음 과 같다.
hadoop MapReduce hdfs
Inverted index
    
   
map
reduce
MapReduce

c. txt 는 다음 과 같다.
Inverted index
    
   
hadoop MapReduce hdfs
Inverted index
hadoop MapReduce hdfs
Inverted index
map
reduce
MapReduce

위의 파일 을 분석 하여 역 렬 색인 을 실현 하고 다 중 job 관련 을 진행 하려 면 두 개의 job 가 분석 해 야 합 니 다.
첫 번 째 job 는 다음 과 같은 방식 을 실현 하 기 를 기대 합 니 다.
Inverted    a.txt-->3
Inverted    b.txt-->1
Inverted    c.txt-->3
MapReduce   a.txt-->2
MapReduce   b.txt-->2
MapReduce   c.txt-->3
hadoop  a.txt-->1
hadoop  b.txt-->1
hadoop  c.txt-->2
hdfs    a.txt-->1
hdfs    b.txt-->1
hdfs    c.txt-->2
index   a.txt-->4
index   b.txt-->1
index   c.txt-->3
map a.txt-->1
map b.txt-->1
map c.txt-->1
reduce  a.txt-->1
reduce  b.txt-->1
reduce  c.txt-->1
        a.txt-->1
        b.txt-->1
        c.txt-->1
    a.txt-->1
    b.txt-->1
    c.txt-->1

두 번 째 job 는 최종 효 과 를 기대 합 니 다.
Inverted    b.txt-->1   a.txt-->3   c.txt-->3   
MapReduce   a.txt-->2   b.txt-->2   c.txt-->3   
hadoop  a.txt-->1   b.txt-->1   c.txt-->2   
hdfs    a.txt-->1   b.txt-->1   c.txt-->2   
index   b.txt-->1   c.txt-->3   a.txt-->4   
map a.txt-->1   b.txt-->1   c.txt-->1   
reduce  a.txt-->1   b.txt-->1   c.txt-->1   
        a.txt-->1   b.txt-->1   c.txt-->1   
    a.txt-->1   b.txt-->1   c.txt-->1   

코드 상세 정보
IndexOne
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class IndexOne { }

class IndexMapperOne extends Mapper {
    String name;
    Text tKey = new Text();
    IntWritable tValue = new IntWritable();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //      
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        //     
        name = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //    
        String line = value.toString();
        //     
        String[] words = line.split(" ");
        for (String word : words) {
            //           ,     key
            tKey.set(word + "@" + name);
            tValue.set(1);
            context.write(tKey, tValue);
        }
    }
}

class IndexReduceOne extends Reducer {

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        String line = key.toString();
        String[] splits = line.split("@");
        StringBuilder builder = new StringBuilder();
        int count = 0;
        for (IntWritable number : values) {
            count += number.get();
        }
        builder.append(splits[1] + "-->" + count);
        context.write(new Text(splits[0]), new Text(builder.toString()));
    }
}

IndexTwo
package descIndex;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class IndexTwo {

}

class IndexMapperTwo extends Mapper {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] splits = line.split("\t");
        context.write(new Text(splits[0]), new Text(splits[1]));
    }
}

class IndexReduceTwo extends Reducer {

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        StringBuilder builder = new StringBuilder();
        for (Text txt : values) {
            builder.append(txt + "\t");
        }
        context.write(key, new Text(builder.toString()));
    }
}

JobUtils
package descIndex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

public class JobUtils {

    private static Configuration conf;

    static {
        conf = new Configuration();
    }

    /**
     *   job  
     * @param paths          
     * @param params       
     * @return  job  
     * @throws IOException
     */
    public static Job getJobInstance(String[] paths, Class... params) throws IOException {

        Job job = Job.getInstance(conf);
        job.setJarByClass(params[0]);
        job.setMapperClass(params[1]);
        job.setReducerClass(params[2]);

        job.setMapOutputKeyClass(params[3]);
        job.setMapOutputValueClass(params[4]);
        job.setOutputKeyClass(params[5]);
        job.setOutputValueClass(params[6]);

        FileInputFormat.setInputPaths(job, new Path(paths[0]));
        FileOutputFormat.setOutputPath(job, new Path(paths[1]));
        return job;
    }

}

IndexDriver
package descIndex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class IndexDriver {
    public static void main(String[] args) throws IOException, InterruptedException {

        String[] paths1 = {"F:/index", "F:/outindex1"};
        Job job1 = JobUtils.getJobInstance(paths1,IndexDriver.class,IndexMapperOne.class,IndexReduceOne.class
        ,Text.class,IntWritable.class,Text.class,Text.class);

        String[] paths2 = {"F:/outindex1", "F:/outindex2"};
        Job job2 = JobUtils.getJobInstance(paths2,IndexDriver.class,IndexMapperTwo.class,IndexReduceTwo.class
                ,Text.class,Text.class,Text.class,Text.class);

        ControlledJob cjJob1 = new ControlledJob(job1.getConfiguration());
        ControlledJob cjJob2 = new ControlledJob(job2.getConfiguration());
        //    job   control 
        JobControl ssrs = new JobControl("ssrs");
        //  job
        ssrs.addJob(cjJob1);
        ssrs.addJob(cjJob2);
        //        
        cjJob2.addDependingJob(cjJob1);
        //    
        Thread t1 = new Thread(ssrs);
        t1.start();

        while(!ssrs.allFinished()){
            Thread.sleep(1000);
        }

        System.exit(0);
    }
}

좋은 웹페이지 즐겨찾기