hadop join 의 reduce side join

이 실례 를 소개 하기 전에 여러분 이 참고 하 시기 바 랍 니 다.http://bjyjtdj.iteye.com/blog/1453410.
reduce side join 은 가장 간단 한 join 방식 으로 그 주요 사상 은 다음 과 같다. map 단계 에서 map 함 수 는 두 개의 파일 File1 과 File2 를 동시에 읽 습 니 다. 두 가지 소스 의 key/value 데 이 터 를 구분 하기 위해 모든 데이터 에 탭 (tag) 을 합 니 다. 예 를 들 어 tag = 0 은 파일 File1, tag = 2 는 파일 File2 에서 온 것 을 표시 합 니 다.즉, map 단계 의 주요 임 무 는 서로 다른 파일 의 데이터 에 라벨 을 붙 이 는 것 입 니 다.reduce 단계 에서 reduce 함 수 는 key 와 같은 File1 과 File2 파일 의 value list 를 가 져 온 다음 같은 key 에 대해 File1 과 File2 의 데 이 터 를 join (피리 칼 곱 하기) 합 니 다.즉, reduce 단계 에서 실제 연결 작업 을 하 는 것 이다.이 예 에서 우 리 는 두 개의 데이터 파일 이 다음 과 같다 고 가정 합 니 다.
user. csv 파일:
"ID","NAME","SEX""1","user1","0""2","user2","0""3","user3","0""4","user4","1""5","user5","0""6","user6","0""7","user7","1""8","user8","0""9","user9","0"
order. csv 파일:
"USER_ID","NAME""1","order1""2","order2""3","order3""4","order4""7","order7""8","order8""9","order9"
현재 인터넷 의 예 는 대부분이 0.20 이전 버 전의 API 를 바탕 으로 쓴 것 이기 때문에 우 리 는 새로운 API 를 사용 하여 쓴다. 구체 적 인 코드 는 다음 과 같다.
public class MyJoin
{
    public static class MapClass extends 
        Mapper<LongWritable, Text, Text, Text>
    {

        //   map       ,   map          
        private Text key = new Text();
        private Text value = new Text();
        private String[] keyValue = null;
        
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
        {
            //          TextInputFormat,
            //                    ,
            //key       (   ,LongWritable  ),
            //value       ,Text  ,      key value     
            keyValue = value.toString().split(",", 2);
            this.key.set(keyValue[0]);
            this.value.set(keyValue[1]);
            context.write(this.key, this.value);
        }
        
    }
    
    public static class Reduce extends Reducer<Text, Text, Text, Text>
    {

        //   reduce       ,   reduce          
        private Text value = new Text();
        
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException
        {
            StringBuilder valueStr = new StringBuilder();
            
            //values                   key  
            //  map          key value   
            for(Text val : values)
            {
                valueStr.append(val);
                valueStr.append(",");
            }
            
            this.value.set(valueStr.deleteCharAt(valueStr.length()-1).toString());
            context.write(key, this.value);
        }
        
    }
    
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "MyJoin");
        
        job.setJarByClass(MyJoin.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        //job.setCombinerClass(Reduce.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        //    TextInputFormat TextOutputFormat            
        //     ,   Hadoop       
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

좋은 웹페이지 즐겨찾기