hadop 2.x 의 IO:MapReduce 압축

18310 단어 hadoop
앞에서 우 리 는 hadop 의 압축 에 대해 말 했 습 니 다.Hadoop 에서 실행 하 는 데 이 터 는 보통 매우 크 고 입력 한 데이터 가 매우 크 며 출력 한 데이터 도 매우 큽 니 다.따라서 우 리 는 맵 과 Reduce 의 데 이 터 를 압축 하여 저장 할 필요 가 있다.
Reduce 를 압축 하려 면 두 가지 방법 이 있 습 니 다.하 나 는 Configuration 설정 을 사용 하 는 것 입 니 다.다른 하 나 는 FileOutputFormat 클래스 로 출력 을 설정 하 는 것 입 니까?
1.Reduce 압축(Configuration 사용)
Configuration 을 사용 하려 면mapred.output.compresstrue로 설정 해 야 합 니 다.우리 가 설정 하고 싶 은 codec 의 클래스 이름 을 설정 합 니 다.예 를 들 면:
Job 프로그램:mapred.output.compression.codec
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MaxTemperatureWithCompression {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature  ");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        //       
        conf.setBoolean("mapred.output.compress", true);
        conf.set("mapred.output.compression.codec", GzipCodec.class.getName());
        conf.set("mapred.jar", "MaxTemperature.jar");
        Job job = Job.getInstance(conf);

        job.setJarByClass(MaxTemperatureWithCompression.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

지도 프로그램:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
        Mapper {
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 23);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
                                        // signs
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

감소 프로그램
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends
        Reducer {
    @Override
    public void reduce(Text key, Iterable values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}

컴 파일 패키지 실행...
[grid@tiny01 myclass]$ hadoop fs -ls /
Found 5 items
-rw-r--r--   1 grid supergroup      49252 2017-07-29 00:07 /data.txt
-rw-r--r--   1 grid supergroup 4848295796 2017-07-01 00:40 /input
drwx------   - grid supergroup          0 2017-07-01 00:42 /tmp
drwxr-xr-x   - grid supergroup          0 2017-07-01 00:42 /user
[grid@tiny01 myclass]$ hadoop jar MaxTemperature.jar MaxTemperatureWithCompression /data.txt /out
[grid@tiny01 myclass]$ hadoop fs -cat /out/part-r-00000.gz |gunzip
20160622        380
20160623        310

Reduce 결과 압축 에 대한 속성:
속성 이름
유형
기본 값
묘사 하 다.
mapred.output.compress
boolean
false
압축 출력
mapred.output.compression.codec
String
org.apache.hadoop.io.compress.DefaultCodec
reduce 출력 에 사용 할 압축 codec
mapred.output.compression
String
RECORD
SqeuenceFile 의 출력 에 사용 할 압축 형식:NONE,RECORD,BLOCK
2.Reduce 를 압축(FileOutputFormat 사용)우 리 는 Job 클래스 만 수정 합 니 다.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperatureWithCompression2 {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature  ");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        conf.set("mapred.jar", "MaxTemperature2.jar");
        Job job = Job.getInstance(conf);

        job.setJarByClass(MaxTemperatureWithCompression2.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //      
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

실행:
 [grid@tiny01 myclass]$ hadoop jar MaxTemperature2.jar MaxTemperatureWithCompression2 /data.txt /out2
[grid@tiny01 myclass]$ hadoop fs -cat /out2/part-r-00000.gz |gunzip
20160622        380
20160623        310

같은 거 야.
3.맵 작업 압축
맵 과 reduce 는 서로 다른 노드 에 있 기 때문에 네트워크 전송 이 필요 합 니 다.맵 작업 의 출력 이 LZO,LZ4 등 빠 른 압축 이 가능 한 알고리즘 을 사용 하면 Hadoop 의 성능 을 향상 시 킬 수 있 습 니 다.map 작업 의 압축 속성:
속성 이름
유형
기본 값
묘사 하 다.
mapred.compress.map.output
boolean
false
맵 작업 출력 압축
mapred.map.output.compression.codec
String
org.apache.hadoop.io.compress.DefaultCodec
map 출력 압축 에 사용 되 는 codec
JobConf(Configuration 의 하위 클래스)대상 을 사용 하여 9 개의 설정 을 설정 할 수 있 습 니 다.
JobConf conf = new JobConf();
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class)
conf.set("mapred.jar", "classname.jar");
Job job = Job.getInstance(conf);

참고 자료[1]Hadoop:The Definitive Guide,Third Edition,by Tom White.Copyright 2013 Tom White,978-1-449-31152-0

좋은 웹페이지 즐겨찾기