Hadoop - mapreduce 사례 - map 엔 드 조인

주문 데이터 orders. txt
1001    pd001    300
1002    pd002    20
1003    pd003    40
1004    pd002    50

상품 데이터 pdts. txt
pd001    apple
pd002    xiaomi
pd003    cuizi

경사 문제: 전자상거래 플랫폼 에 서 는 샤 오미 휴대 전화 와 애플 휴대 전 화 를 사 는 주문 수량 이 많 고 망치 휴대 전 화 를 사 는 주문 수량 이 적 으 며 전통 적 인 Mapreduce 방법 에 따라 3 개의 reduce 데 이 터 는 불 균형 할 것 이다.예 를 들 어 샤 오미 의 reduce 를 받 으 면 받 는 데이터 가 많 고 망치 데 이 터 를 받 는 reduce 는 받 는 데이터 가 적다.
해결 방향: map 엔 드 연결 을 사용 하여 정렬 과정 을 map 에서 직접 실행 하고 상품 정 보 를 map 정보 에 불 러 와 maprediuce 의 입력 캐 시 체 제 를 도입 합 니 다.
기술 난점: 어떻게 상품 정 보 를 maptask 에 불 러 옵 니까?
해결 방법: distributedcache. addCacheFile () 을 사용 하여 join 이 필요 한 다른 파일 을 모든 Map 캐 시 에 추가 합 니 다. distributedcache 는 사용자 가 응용 프로그램 개발 을 편리 하 게 하기 위해 디자인 된 파일 배포 도구 입 니 다.이것 은 task 가 실 행 될 때 불 러 올 수 있 도록 읽 기 전용 외부 파일 을 각 노드 에 자동 으로 나 누 어 로 컬 캐 시 를 할 수 있 습 니 다.
코드 구현:
맵 엔 드 로 연결 할 때 reduce 를 적용 하지 않 을 수 있 습 니 다. 이 때 reducetask 의 수량 을 0 으로 설정 할 수 있 습 니 다.
package com.tianjie.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

    static class MapSideJoinMappe extends Mapper{
        
        
        //map        k v key     ,v     
        Map     pdInfoMap = new HashMap();
        Text ktext = new Text();
        
        
        /*setup          hadoop   
         * */
        protected void setup(Mapper.Context context)
                throws IOException, InterruptedException {
            
            //           ,       
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("C:/Users/admin/Desktop/join/cache/pdts.txt")));
            String line;
            while(StringUtils.isNotEmpty(line = br.readLine())){
                //        k     ,value     
                String[] split = line.split("\t");
                pdInfoMap.put(split[0], split[1]);
                
            }
            
        }
        /*
         * hadoop      */
        
        
        /*
         * map      key value ,       TextInputFormat,
         *     key          ,value       
         *     Text,NullWriable map      
         *     */
        protected void map(LongWritable key, Text value, Mapper.Context context)
                throws IOException, InterruptedException {
            
            //         
            String orderline  = value.toString();
            //            
            String[] fields = orderline.split("\t");
            //      ,      
            String pdName = pdInfoMap.get(fields[1]);
            //       ,             
            ktext.set(orderline+"\t"+pdName);
            //         
            context.write(ktext, NullWritable.get());
        }
        
    }
        
    
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        
        //  hadoop       
        Configuration conf = new Configuration();
        //    job  
        Job job = Job.getInstance(conf);
        //  job    
        job.setJarByClass(MapSideJoin.class);
        
        //  mapper  
        job.setMapperClass(MapSideJoinMappe.class);
        //  mapper      
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        
        //         
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        
        //         
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(args[1]);
        if(fs.isDirectory(path)){
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //             maptask        
        //job.addArchiveToClassPath("");   jar  task     classpath 
        //job.addFileToClassPath(file);        task     classpath 
        //job.addCacheArchive(uri);              task          
        
        //1:       task         
        job.addCacheFile(new URI("file:///C:/Users/admin/Desktop/join/cache/pdts.txt")); 
        
        //2:  map         reduce  ,  reducetask   0
        job.setNumReduceTasks(0);
        
        //  job  ,  job     
        boolean res =job.waitForCompletion(true);
        System.exit(res?1:0);
        
        }
}

원문:https://www.cnblogs.com/fengdashen/p/6610953.html

좋은 웹페이지 즐겨찾기