MapReduce 가져오기 파일에서 Mapper 프로세스 간 프로세스

15896 단어 mapreduce
1. MapReduce 코드 입구
FileInputFormat.setInputPaths(job, new Path(input)); //  MapReduce    

job.waitForCompletion(true);

2. InputFormat 분석
public abstract class InputFormat<K, V> {

    //         ,      ,       

    public abstract  List<InputSplit> getSplits(JobContext context);

    

    //  RecordReader, InputSplit     

    public abstract  RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) ;

}

각기 다른 InputFormat 는 각기 다른 파일 읽기 방식 과 분할 방식 을 실현하고, 각각의 입력 분할 (InputSplit) 은 별도의 맵 task 로 데이터 원본 이 된다
3、InputSplit
Mapper의 입력은 하나의 입력 슬라이스(InputSplit)
public abstract class InputSplit {

  public abstract long getLength();

  public abstract String[] getLocations();

}



public class FileSplit extends InputSplit implements Writable{

    private Path file; //    

    private long start; //      

    private long length;  //    

    private String[] hosts; //     hosts

    

    public FileSplit(Path file, long start, long length, String[] hosts) {

        this.file = file;

        this.start = start;

        this.length = length;

        this.hosts = hosts;

    }

}

File Split은 Mapper의 입력 파일에 대응합니다. 이 파일이 아무리 작아도 단독 Input Split으로 처리됩니다.입력 파일이 대량의 작은 파일로 구성된 장면에서 대량의 인풋 스플릿이 생겨 대량의 Mapper 처리가 필요하다.엄청난 수의 Mapper Task 생성 및 제거 오버헤드가 발생합니다.CombineFileSplit을 사용하여 여러 개의 작은 파일을 병합하여 Mapper Task에서 처리할 수 있습니다.
4、FileInputFormat
public List<InputSplit> getSplits(JobContext job) throws IOException {

    /**

     * getFormatMinSplitSize() = 1

     * job.getConfiguration().getLong(SPLIT_MINSIZE, 1L)

     * SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"

     * mapred-default.xml    0

     */

    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //        : max(1,0) = 1

    

    /**

     * SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize"

     * mapred-default.xml     

     */

    long maxSize = getMaxSplitSize(job); //        :Long.MAX_VALUE

   

    //           

    List<InputSplit> splits = new ArrayList<InputSplit>();

    List<FileStatus> files = listStatus(job);

    for (FileStatus file: files) {

        Path path = file.getPath();

        long length = file.getLen();

        if (length != 0) {

            ...

            if (isSplitable(job, path)) { //   

                long blockSize = file.getBlockSize();

                long splitSize = computeSplitSize(blockSize, minSize, maxSize);{

                    //max(1, min(Long.MAX_VALUE, 64M)) = 64M      splitSize=blockSize

                    return Math.max(minSize, Math.min(maxSize, blockSize)); 

                }



                //    ,              Split_Slop ,    ,     ,    

                long bytesRemaining = length;

                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP = 1.1

                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

                    splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));

                    bytesRemaining -= splitSize;

                }



                //       

                if (bytesRemaining != 0) {

                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

                    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts()));

                }

            } else { //     ,    (             )

                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));

            }

        } else { 

            splits.add(makeSplit(path, 0, length, new String[0]));

        }

    }

    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); //         

    LOG.debug("Total # of splits: " + splits.size());

    return splits;

}

5、PathFilter
protected List<FileStatus> listStatus(JobContext job) throws IOException {

    ......

    List<PathFilter> filters = new ArrayList<PathFilter>();

    filters.add(hiddenFileFilter);

    PathFilter jobFilter = getInputPathFilter(job);

    if (jobFilter != null) {

      filters.add(jobFilter);

    }

    PathFilter inputFilter = new MultiPathFilter(filters);

    ......

}

PathFilter 파일 필터 인터페이스는 어떤 파일을 입력으로 하고 어떤 파일을 입력으로 하지 않는지 제어할 수 있습니다.PathFilter는 수신한 Path가 포함되려면true를 되돌려주고 그렇지 않으면false를 되돌려주는 accept 방법이 있습니다.
public interface PathFilter {

    boolean accept(Path path);

}



//       _  .     

private static final PathFilter hiddenFileFilter = new PathFilter(){

    public boolean accept(Path p){

        String name = p.getName(); 

        return !name.startsWith("_") && !name.startsWith("."); 

    }

}; 

6、RecordReader
RecordReader는 InputSplit을 KEY-VALUE 쌍으로 분할합니다.
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {

    //InputSplit   

    public abstract void initialize(InputSplit split,TaskAttemptContext context) ;

    

    //       <key, value> 

    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    

    //        KEY

    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    

    //        VALUE

     public abstract  VALUEIN getCurrentValue() throws IOException, InterruptedException;

    

    //         

    public abstract float getProgress() throws IOException, InterruptedException;

    

    //  RecordReader

    public abstract void close() throws IOException;

}

7、Mapper
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

    public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

    }

  

    //   ,  map task       

    protected void setup(Context context) throws IOException, InterruptedException {

    }



    //  InputSplit     <key, value>      

    protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {

        context.write((KEYOUT) key, (VALUEOUT) value);

    }



    //

    protected void cleanup(Context context) throws IOException, InterruptedException {

    }

  

    public void run(Context context) throws IOException, InterruptedException {

        setup(context);

        try {

            while (context.nextKeyValue()) {

                map(context.getCurrentKey(), context.getCurrentValue(), context);

            }

        } finally {

            cleanup(context);

        }

    }

}

템플릿 모드의 응용:run 방법: 1)setup2) InputSplit에서 얻은 KV 호출맵 함수 처리 3)cleanup
 
이로써 MapReduce의 입력 파일이 어떻게 필터되고, 분할되고, 읽히고, 읽히는지, 그리고 Mapper 클래스에 맡겨 처리되는지 완성되었다
 
 

좋은 웹페이지 즐겨찾기