MR(Mapreduce) 여러 Job의 작업 실행 설정

4379 단어 Mapreduce 학습

 
MapReduce 여러 job 작업의 본질적인 원리는 job2 작업이 job1 작업에 의존하여 되돌아오는 결과인 job1의 출력 경로가 job2의 입력 경로라는 것이다.
job2 작업의 시작은 job1 작업이 끝나고 돌아오는 상태에 의존합니다.
자신의 실제 경험에 따르면 여러 개의job의 직렬 또는 병렬 트리거는 몇 개의 프로그램을 나누어 쓰고 스크립트로 실행 순서를 제어하면 프로그램의 디버깅, 관리에 편리하다고 생각합니다.
물론 그런 상황을 선택하는 것도 프로젝트의 수요에 따라 결정된다.
여러 job 종속 코드는 다음과 같습니다.
package more_job;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class moreJob {
	private static final LongWritable num = new LongWritable(1);
	static int pv = 0;

	public static class MMap extends Mapper {
		@Override
		protected void map(LongWritable key, Text value, Mapper.Context context)
				throws IOException, InterruptedException {
			String line[] = value.toString().split("\t");
			String url = line[3];
			if(url.contains("baidu.com")){
				context.write(new Text(url), num);
			}
		}
	}
	
	public static class MRed extends Reducer{
		@Override
		protected void reduce(Text key, Iterable value,
				Reducer.Context context) throws IOException, InterruptedException {
			for(LongWritable i :value){
				pv++;
			}
			context.write(new Text(key), new Text(Integer.toString(pv)));
		}
	}
	
	public static class MMap2 extends Mapper {
		@Override
		protected void map(LongWritable key, Text value, Mapper.Context context)
				throws IOException, InterruptedException {
			String line[] = value.toString().split("\t");
			String url = line[0].split("baidu.com")[0];
			context.write(new Text(url), num);
		}
	}
	
	public static class MRed2 extends Reducer{
		@Override
		protected void reduce(Text key, Iterable value,
				Reducer.Context context) throws IOException, InterruptedException {
			for(LongWritable i :value){
				pv++;
			}
			context.write(new Text(key), new LongWritable(pv));
		}
	}
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
                //job1   
		Job job1 = Job.getInstance(conf, "Job1");
		job1.setJarByClass(moreJob.class);
		job1.setMapperClass(MMap.class);
		job1.setReducerClass(MRed.class);
		job1.setMapOutputKeyClass(Text.class);
		job1.setMapOutputValueClass(LongWritable.class);
		FileInputFormat.addInputPath(job1, new Path(args[0]));   
                FileOutputFormat.setOutputPath(job1, new Path(args[1])); 
                FileSystem fs =new Path(args[1]).getFileSystem(conf);
                if(fs.exists(new Path(args[1]))){  
                      fs.delete(new Path(args[1]), true); 
                }  
		job1.setMaxMapAttempts(4);
		job1.setNumReduceTasks(50);
        
		/*	
		 * job1      job2     
		 *   job1       ,       job2
		 * job2    job1     ,     job1           。
		*/
			
		if(job1.waitForCompletion(true)){
                        //job2   
	                Job job2 = Job.getInstance(conf, "Job2");
			job2.setJarByClass(moreJob.class);
			job2.setMapperClass(MMap2.class);
			job2.setReducerClass(MRed2.class);
			job2.setMapOutputKeyClass(Text.class);
			job2.setMapOutputValueClass(LongWritable.class);
			job2.setOutputValueClass(LongWritable.class);
			FileInputFormat.addInputPath(job2, new Path(args[1]));   
	                FileOutputFormat.setOutputPath(job2, new Path(args[2]));
	                if(fs.exists(new Path(args[2]))){  
	                   fs.delete(new Path(args[2]), true); 
	                }  
			job2.setMaxMapAttempts(4);
			job2.setNumReduceTasks(50);
			System.exit(job2.waitForCompletion(true) ? 0 : 1); 
		}
	}
}

좋은 웹페이지 즐겨찾기