빅 데이터 - Hadop 학습 노트 09

7624 단어 빅 데이터 학습
30.MapReduce
    mapreduce            :map   reduce  。      k-v        ,         。
    map      NCDC    。              ,              。

1.  MR  
【  mapper】
public class MyMaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private static final int MISSING = 9999;
    /**
     * mapper
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context)
            throws IOException, InterruptedException {
        //value String
        String line = value.toString();
        //    
        String year = line.substring(15, 19);
        //    
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        //  
        String quality = line.substring(92, 93);
        //        
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

【 Reducer 생 성 】
public class MyMaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    @Override
    protected void reduce(Text key, Iterable values,
            Reducer.Context context) throws IOException, InterruptedException {
        //       
        int maxValue = Integer.MIN_VALUE;
        //        
        for(IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        //    
        context.write(key, new IntWritable(maxValue));
    }
}

[앱 실행 작업 생 성]
public class MyMaxTempApp {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Usage: MaxTemperature  ");
            System.exit(1); 
        }
        Job job = Job.getInstance();
        job.setJarByClass(MyMaxTempApp.class);
        //      
        job.setJobName("Max temp");
        //    
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //    
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //  mapper  
        job.setMapperClass(MyMaxTempMapper.class);
        //  reducer  
        job.setReducerClass(MyMaxTempReducer.class);
        //      key   
        job.setOutputKeyClass(Text.class);
        //      value   
        job.setOutputValueClass(IntWritable.class);
        //    job
        System.out.println(job.waitForCompletion(true) ? 0 : 1);
    }
}

31. Job 제출 과정 분석
【    】
    map(  ) + reduce(  )
【    】
    1.job.waitForCompletion()
    2.submit()     cluster,     
      a)ensureState(JobState.DEFINE)    
      b)setUseNewAPI()    API
      c)connect()      
      d)  JobSubmitter
    3.submitter.submitJobInternal(Job.this, cluster)
      a)checkSpecs(job)      ,       
      b)JobSubmissionFiles.getStagingDir()  hdfs     
      c)InetAddress.getLocalHost()    ip
      d)submitClient.getNewJobID()    id
      e)copyAndConfigureFiles()         conf  
      f)writeSplits(job, submitJobDir)              
      g)conf.setInt(MRJobConfig.NUM_MAPS, maps)  map  
      h)writeConf(conf, submitJobFile)  job.xml     
      i)submitClient.submitJob()        
    4.submitClient.submitJob()         
      a)Job job = new Job()  LocalJobRunner.Job     
    5.Job job = new Job()
      a)       job.xml  JobConf
      b)this.start()    ,   run()  
    6.this.start()
      a)TaskSplitMetaInfo[]  task    
      b)getMapTaskRunnables()  mapper   runnable
      c)runTasks(mapRunnables, mapService, "map")
      d)getReduceTaskRunnables()  reduce    
      e)runTasks(reduceRunnables, reduceService, "reduce")
    7.runTasks()
      for (Runnable r : runnables) {
          service.submit(r);
      }
    8.LocalJobRunner$Job$MapTaskRunnable
      a)  MapAttempId
      b)  MapTask
      c)  MaoOutFile
      d)map.setXXX()
      e)map.run()
      f)
    9.org.apache.hadoop.mapred.MapTask$run()
      a)runNewMapper()
    10.runNewMapper
      a)  taskContext
      b)taskContext.getMapperClass()     Mapper  
      c)  InputFormat
      d)  split
      e)  NewOutputCollector context  
    11.mapper.run(mapperContext)
    12.MyMaxTempMapper$run()
      setup(context);
      try {
          while (context.nextKeyValue()) {
              map(...)
          }
      } finally {
          cleanup()
      }

【       】

         ,      
    hadoop jar jarFile classname arg1 arg2 ..
           

좋은 웹페이지 즐겨찾기