MR(Mapreduce) 여러 Job의 작업 실행 설정
4379 단어 Mapreduce 학습
MapReduce 여러 job 작업의 본질적인 원리는 job2 작업이 job1 작업에 의존하여 되돌아오는 결과인 job1의 출력 경로가 job2의 입력 경로라는 것이다.
job2 작업의 시작은 job1 작업이 끝나고 돌아오는 상태에 의존합니다.
자신의 실제 경험에 따르면 여러 개의job의 직렬 또는 병렬 트리거는 몇 개의 프로그램을 나누어 쓰고 스크립트로 실행 순서를 제어하면 프로그램의 디버깅, 관리에 편리하다고 생각합니다.
물론 그런 상황을 선택하는 것도 프로젝트의 수요에 따라 결정된다.
여러 job 종속 코드는 다음과 같습니다.
package more_job;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class moreJob {
private static final LongWritable num = new LongWritable(1);
static int pv = 0;
public static class MMap extends Mapper {
@Override
protected void map(LongWritable key, Text value, Mapper.Context context)
throws IOException, InterruptedException {
String line[] = value.toString().split("\t");
String url = line[3];
if(url.contains("baidu.com")){
context.write(new Text(url), num);
}
}
}
public static class MRed extends Reducer{
@Override
protected void reduce(Text key, Iterable value,
Reducer.Context context) throws IOException, InterruptedException {
for(LongWritable i :value){
pv++;
}
context.write(new Text(key), new Text(Integer.toString(pv)));
}
}
public static class MMap2 extends Mapper {
@Override
protected void map(LongWritable key, Text value, Mapper.Context context)
throws IOException, InterruptedException {
String line[] = value.toString().split("\t");
String url = line[0].split("baidu.com")[0];
context.write(new Text(url), num);
}
}
public static class MRed2 extends Reducer{
@Override
protected void reduce(Text key, Iterable value,
Reducer.Context context) throws IOException, InterruptedException {
for(LongWritable i :value){
pv++;
}
context.write(new Text(key), new LongWritable(pv));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
//job1
Job job1 = Job.getInstance(conf, "Job1");
job1.setJarByClass(moreJob.class);
job1.setMapperClass(MMap.class);
job1.setReducerClass(MRed.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path(args[1]));
FileSystem fs =new Path(args[1]).getFileSystem(conf);
if(fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]), true);
}
job1.setMaxMapAttempts(4);
job1.setNumReduceTasks(50);
/*
* job1 job2
* job1 , job2
* job2 job1 , job1 。
*/
if(job1.waitForCompletion(true)){
//job2
Job job2 = Job.getInstance(conf, "Job2");
job2.setJarByClass(moreJob.class);
job2.setMapperClass(MMap2.class);
job2.setReducerClass(MRed2.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(LongWritable.class);
job2.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job2, new Path(args[1]));
FileOutputFormat.setOutputPath(job2, new Path(args[2]));
if(fs.exists(new Path(args[2]))){
fs.delete(new Path(args[2]), true);
}
job2.setMaxMapAttempts(4);
job2.setNumReduceTasks(50);
System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
}
}