Hadoop 시작 사례 (3) 전체 정렬의 사용자 정의 구역 숫자 정렬

요구 사항 소개
대량의 텍스트 중에는 대량의 숫자가 있기 때문에, 숫자에 대해 전체 정렬을 하고, 승차순에 따라 정렬해야 한다
테스트된 텍스트
145 
95 167 84 6 120 164 195 81 35 63 1 11 89 170 55 
58 88 125 173 2 173 129 74 69 24 107 55 149 83 178 
159 147 178 53 137 53 132 134 154 174 164 122 108 130 184 
28 129 93 157 171 127 192 86 194 41 111 114 190 98 99 
99 5 161 146 120 122 80 1 66 171 47 54 121 130 170 
125 119 8 52 182 112 146 1 198 0 149 72 56 191 48 
172 165 49 73 107 134 179 0 59 16 143 83 92 113 152 
109 118 186 186 97 117 193 67 34 152 92 179 52 51 26 
163 121 115 72 17 61 107 125 115 163 18 76 2 172 39 
190 184 73 108 7 142 68 54 60 169 71 28 141 48 139 
182 140 158 102 99 36 158 55 190 176 45 63 126 179 130 
95 22 120 109 59 78 38 13 5 88 1 87 184 83 198 
47 73 82 94 141 190 184 161 56 141 99 177 107 21 158 
71 149 61 137 

원리 분석
맵 Reduce에서 맵에서 Reduce 끝까지의 shuffle를 이용하여 정렬을 한다.Map Reduce는 각 구역의 내부 질서만 보장할 수 있지만 전체적인 질서를 보장할 수 없다. 그래서 저는 구역을 사용자 정의했다. 맵 뒤, shuffle 전에 저는 50보다 작은 것을 0구역에 놓고 50-100은 1구역에 놓고 100에서 150은 2구역에 놓고 나머지는 3구역에 두었다. 그러면 먼저 구역과 구역 사이가 전체적인 질서를 확보한다.그리고 각 구역은 각각의 shuffle를 진행하여 구역 내부를 질서정연하게 한다
코드
package com.myhadoop.mapreduce.test;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class TotalSort extends Configured implements Tool{

    public static class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable>
    {
        public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
        {
            String[] split = value.toString().split("\\s+");
            for (int i = 0; i try{
                    IntWritable intWritable = new IntWritable(Integer.parseInt(split[i]));
                    context.write(intWritable, intWritable);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }


        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, Text>
    {
        public void reduce(IntWritable key,Iterable values,Context context) throws IOException,InterruptedException
        {
            for (IntWritable value:values) {
                context.write(value, new Text(""));
            }
        }
    }

    /*
    ·*   Partition   
     */
    public static class Partition extends Partitioner<IntWritable, IntWritable>{
        @Override
        public int getPartition(IntWritable key, IntWritable intWritable2, int i) {
            int i1=key.get();
            if(i1<50){
                return 0;
            }else if(i1<100){
                return 1;
            }else if(i1<150){
                return 2;
            }
            return 3;

        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(TotalSort.class);
        job.setJobName("TotalSort");

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        job.setPartitionerClass(Partition.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setNumReduceTasks(4);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);
        return success ? 0:1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new TotalSort(), args);
        System.exit(ret);
    }
}

실행 결과
4개의 파일이 생성되었는데, part-00000, part-r-00001, part-r-00002, part-r-00003, 이 네 개의 파일 내부는 모두 질서정연하다. part-r-00000 내의 요소는 모두 part-r00001의 요소보다 작고, 다른 것은 이것으로 추정된다.
총결산
사용자 정의 구역을 이용하여 전체적인 질서를 확보하고mapreduce 내부의 shuffle를 이용하여 키를 정렬하여 국부적인 질서를 확보하여 전체 정렬을 실현하였다.

좋은 웹페이지 즐겨찾기