빅 데이터 학습 의 길 (8) - MapReduce 실전 (hot weather top 2)

이전의 WordCount 는 비교적 간단 합 니 다. 코드 만 올 랐 습 니 다. 그 다음 에 조금 복잡 한 항목 은 전체 프로젝트 의 작성 과정 을 기록 할 것 입 니 다.
매년, 매월 가장 더 운 이틀 의 온도 통계
데이터:
1949-10-01 14:21:02 34c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c

기대:
1949-10-02:36   36
1949-10-01:34   34
1950-10-02:41   41
1950-10-01:37   37
1951-07-03:47   47
1951-07-02:46   46

생각:
『 8195 』 1. 우리 가 해 야 할 일 은 매년 매달 최고 온 도 를 통계 하 는 것 이기 때문에 소스 데이터 에서 우리 가 필요 로 하 는 것 은 , , 이다.
그 다음 에 우 리 는 정렬 방식 sort 를 수정 하고 년, 월 의 오름차 순 (내림차 순 으로 배열 해도 된다) 에 따라 온도 의 내림차 순 으로 배열 해 야 한다. 그러면 우 리 는 최종 결과 에서 앞의 두 데 이 터 를 얻 으 면 ok 이다.
    3. 우 리 는 group 을 다시 쓰 고 MApReduce 의 조합 방식 을 수정 하여 년, 월 의 같은 데 이 터 를 같은 reducer 에 넣 어 계산 해 야 한다.
    4. 우리 의 데 이 터 는 더 이상 WordCount 처럼 간단 한 Text 가 아 닙 니 다. 우 리 는 자바 bean 을 써 서 Writable Comparable 류 를 실현 하여 우리 의 데 이 터 를 저장 해 야 합 니 다.
코드:
  • 먼저 자바 비 안 을 쓰 고 이름 을 짓 기 weather
  • 
    public class Weather implements WritableComparable {
        //  
        private Integer year;
        //  
        private Integer month;
        //  
        private Integer day;
        //   
        private Integer temperature;
    
        public Integer getYear() {
            return year;
        }
    
        public void setYear(int year) {
            this.year = year;
        }
    
        public Integer getMonth() {
            return month;
        }
    
        public void setMonth(int month) {
            this.month = month;
        }
    
        public Integer getDay() {
            return day;
        }
    
        public void setDay(int day) {
            this.day = day;
        }
    
        public Integer getTemperature() {
            return temperature;
        }
    
        public void setTemperature(int temperature) {
            this.temperature = temperature;
        }
    
        @Override
        public int compareTo(Object o) {
            Weather w = (Weather) o;
            int res1 = Integer.compare(year, w.getYear());
            if (res1 == 0) {
                int res2 = Integer.compare(month, w.getMonth());
                if (res2 == 0) {
                    return Integer.compare(w.getTemperature(), temperature);
                }
                return res2;
            }
            return res1;
        }
    
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(year);
            dataOutput.writeInt(month);
            dataOutput.writeInt(day);
            dataOutput.writeInt(temperature);
        }
    
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            year = dataInput.readInt();
            month = dataInput.readInt();
            day = dataInput.readInt();
            temperature = dataInput.readInt();
        }
    }
  • 이어서 우리 의 Mapper 류
  • public class MyMapper extends Mapper<LongWritable, Text, Weather, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
            Calendar c = Calendar.getInstance();
            /*
                    : 1949-10-01 14:21:02   34c
                         tab   ,     \t   
            */
            String line = value.toString();
            String[] list = StringUtils.split(line, '\t');
            if (list.length == 2) {
                // 1949-10-01 14:21:02
                String arg = list[0];
                // 34c
                String temp = list[1];
                Weather w = new Weather();
                try {
                    Date date =dateFormat.parse(arg);
                    c.setTime(date);
                    // 1949
                    w.setYear(c.get(Calendar.YEAR));
                    // 10
                    w.setMonth(c.get(Calendar.MONTH) + 1);
                    // 01 -> 1
                    w.setDay(c.get(Calendar.DATE));
                    // 34c -> 34,          ,          2,    c      
                    int t = Integer.parseInt(temp.substring(0, temp.toString().lastIndexOf("c")));
                    w.setTemperature(t);
                    context.write(w, new IntWritable(t));
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
        }
    }
  • map 가 끝 난 후에 우 리 는 분할 을 합 니 다. 기본 값 은 hash 분할 (사실은 쓰 지 않 아 도 됩 니 다. 데이터 양 이 평균 적 으로 분 배 될 수 있 도록 보장 하기 만 하면 됩 니 다)
  • public class MyPartitioner extends HashPartitioner<Weather, IntWritable>{
        @Override
        public int getPartition(Weather key, IntWritable value, int numReduceTasks) {
            //           ,         task,numReduceTasks task   
            return (key.getYear()-1949) % numReduceTasks;
        }
    }
  • 다음은 sort 방법 다시 쓰기
  • public class MySort extends WritableComparator {
        public MySort() {
            super(Weather.class,true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            //    Weather
            Weather w1 = (Weather) a;
            Weather w2 = (Weather) b;
            int res1 = w1.getYear().compareTo(w2.getYear());
            if (res1 == 0)
            {
                int res2 = w1.getMonth().compareTo(w2.getMonth());
                if (res2 == 0) {
                    // -w1.getTemperature().compareTo(w2.getTemperature());   
                    //            
                    return w2.getTemperature().compareTo(w1.getTemperature());
                }
                return res2;
            }
            return res1;
        }
    }
  • 그리고 group 방법
  • //            ,group          ok 
    public class MyGroup extends WritableComparator {
        public MyGroup() {
            super(Weather.class,true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            Weather w1 = (Weather) a;
            Weather w2 = (Weather) b;
            int res1 = w1.getYear().compareTo(w2.getYear());
            if (res1 == 0)
            {
                return w1.getMonth().compareTo(w2.getMonth());
            }
            return res1;
        }
    }
  • 다음은 저희 reducer
  • 입 니 다.
    public class MyReducer extends Reducer<Weather, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Weather key, Iterable values, Context context) throws IOException, InterruptedException {
    
            int i = 0;
            for (IntWritable t : values) {
                if (i++ == 2) {
                    //       ok
                    break;
                }
                // 
                String val = key.getYear()+"-"+key.getMonth()+"-"+key.getDay();
                context.write(new Text(val), t);
            }
        }
    }
  • 마지막 으로 우리 의 설정 을 써 서 실행 합 시다
  • 
    public class RunJob {
    
        static Configuration conf;
    
        public static void main (String[] args) {
            //       
            try {
                conf = new Configuration();
                //        
                conf.set("fs.defaultFS", "hdfs://localhost:9000");
                conf.set("yarn.resourcemanager.hostname", "localhost");
                //    hdfs
                FileSystem fs = FileSystem.get(conf);
                //   job  
                Job job = Job.getInstance(conf);
    
                job.setJarByClass(RunJob.class);
                job.setMapperClass(MyMapper.class);
                job.setPartitionerClass(MyPartitioner.class);
                job.setSortComparatorClass(MySort.class);
                job.setGroupingComparatorClass(MyGroup.class);
                job.setReducerClass(MyReducer.class);
                job.setMapOutputKeyClass(Weather.class);
                job.setMapOutputValueClass(IntWritable.class);
                job.setNumReduceTasks(3);
                //     ,        
                // hdfs dfs -put data /weather/input/data
                Path input = new Path("/weather/input/data");
                if (!fs.exists(input)) {
                    System.out.println("       !");
                    System.exit(1);
                }
                FileInputFormat.addInputPath(job, input);
    
                Path output = new Path("/weather/output");
                //          
                if (fs.exists(output)) {
                    fs.delete(output, true);
                }
                //         
                FileOutputFormat.setOutputPath(job, output);
                boolean res = job.waitForCompletion(true);
                if(res){
                    System.out.println("job     ");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    테스트, main 실행, 도착 [http://localhost:8088/cluster/apps/RUNNING ] 퀘 스 트 보기
    출력 파일 보기
    hdfs dfs -ls /weather/output 
    

    출력 데이터 보기
    hdfs dfs -cat /weather/output/part-r-00000
    hdfs dfs -cat /weather/output/part-r-00001
    hdfs dfs -cat /weather/output/part-r-00002
    

    예상 데이터 대비, ok 완료
    코드 다운로드 주소 [https://github.com/qn9301/bigdata-learn ] 마음 에 드 시 면 스타 를 환영 합 니 다. 제 가 하 겠 습 니 다. 제 가 공부 하 는 과정 을 모두 기록 하고 공부 하고 싶 은 친구 들 은 같이 참고 할 수 있 습 니 다.

    좋은 웹페이지 즐겨찾기