MapReduce 가져오기 파일에서 Mapper 프로세스 간 프로세스
15896 단어 mapreduce
FileInputFormat.setInputPaths(job, new Path(input)); // MapReduce
job.waitForCompletion(true);
2. InputFormat 분석
public abstract class InputFormat<K, V> {
// , ,
public abstract List<InputSplit> getSplits(JobContext context);
// RecordReader, InputSplit
public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) ;
}
각기 다른 InputFormat 는 각기 다른 파일 읽기 방식 과 분할 방식 을 실현하고, 각각의 입력 분할 (InputSplit) 은 별도의 맵 task 로 데이터 원본 이 된다
3、InputSplit
Mapper의 입력은 하나의 입력 슬라이스(InputSplit)
public abstract class InputSplit {
public abstract long getLength();
public abstract String[] getLocations();
}
public class FileSplit extends InputSplit implements Writable{
private Path file; //
private long start; //
private long length; //
private String[] hosts; // hosts
public FileSplit(Path file, long start, long length, String[] hosts) {
this.file = file;
this.start = start;
this.length = length;
this.hosts = hosts;
}
}
File Split은 Mapper의 입력 파일에 대응합니다. 이 파일이 아무리 작아도 단독 Input Split으로 처리됩니다.입력 파일이 대량의 작은 파일로 구성된 장면에서 대량의 인풋 스플릿이 생겨 대량의 Mapper 처리가 필요하다.엄청난 수의 Mapper Task 생성 및 제거 오버헤드가 발생합니다.CombineFileSplit을 사용하여 여러 개의 작은 파일을 병합하여 Mapper Task에서 처리할 수 있습니다.
4、FileInputFormat
public List<InputSplit> getSplits(JobContext job) throws IOException {
/**
* getFormatMinSplitSize() = 1
* job.getConfiguration().getLong(SPLIT_MINSIZE, 1L)
* SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"
* mapred-default.xml 0
*/
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // : max(1,0) = 1
/**
* SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize"
* mapred-default.xml
*/
long maxSize = getMaxSplitSize(job); // :Long.MAX_VALUE
//
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
...
if (isSplitable(job, path)) { //
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);{
//max(1, min(Long.MAX_VALUE, 64M)) = 64M splitSize=blockSize
return Math.max(minSize, Math.min(maxSize, blockSize));
}
// , Split_Slop , , ,
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP = 1.1
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
//
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts()));
}
} else { // , ( )
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
}
} else {
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); //
LOG.debug("Total # of splits: " + splits.size());
return splits;
}
5、PathFilter
protected List<FileStatus> listStatus(JobContext job) throws IOException {
......
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
......
}
PathFilter 파일 필터 인터페이스는 어떤 파일을 입력으로 하고 어떤 파일을 입력으로 하지 않는지 제어할 수 있습니다.PathFilter는 수신한 Path가 포함되려면true를 되돌려주고 그렇지 않으면false를 되돌려주는 accept 방법이 있습니다.
public interface PathFilter {
boolean accept(Path path);
}
// _ .
private static final PathFilter hiddenFileFilter = new PathFilter(){
public boolean accept(Path p){
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
};
6、RecordReader
RecordReader는 InputSplit을 KEY-VALUE 쌍으로 분할합니다.
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
//InputSplit
public abstract void initialize(InputSplit split,TaskAttemptContext context) ;
// <key, value>
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
// KEY
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
// VALUE
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
//
public abstract float getProgress() throws IOException, InterruptedException;
// RecordReader
public abstract void close() throws IOException;
}
7、Mapper
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
// , map task
protected void setup(Context context) throws IOException, InterruptedException {
}
// InputSplit <key, value>
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
// ,
protected void cleanup(Context context) throws IOException, InterruptedException {
}
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
템플릿 모드의 응용:run 방법: 1)setup2) InputSplit에서 얻은 KV 호출맵 함수 처리 3)cleanup
이로써 MapReduce의 입력 파일이 어떻게 필터되고, 분할되고, 읽히고, 읽히는지, 그리고 Mapper 클래스에 맡겨 처리되는지 완성되었다
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
MongoDB mapreduce 인스턴스var action_count_map = function(){ var action_count_reduce = function(key, values){ db.log.mapReduce(action_count_map, a...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.