자바 가 작성 한 hadop wordcount, 단일 MR 작업 은 주파수 순 으로 출력 결 과 를 수행 합 니 다.

10493 단어 hadoop
이전에 MR 을 쓰 는 작업 은 모두 Streamming 방식 으로 python 언어 로 작성 되 었 기 때문에 전체 MR 의 과정 에 대한 세부 사항 에 대한 요구 가 높 지 않 고 이해 할 필요 도 없습니다.그러나 자 바 는 hadop 의 네 이 티 브 언어 로 서 성능 효율, 규범 성, 출력 도구 의 용이 성과 완전 성에 있어 서 모두 python 과 비교 할 수 없 기 때문에 자바 로 MR 작업 을 작성 하 는 방법 을 배 웁 니 다.
첫 번 째 워드 카운터 임 무 는 번 거 로 움 을 만 났 다. 단순 한 주파수 통 계 를 하 는 것 은 매우 간단 하지만 마지막 결 과 를 주파수 순 으로 정렬 하여 역순 으로 출력 하려 면 비교적 번거롭다.자 료 를 찾 아 보 니 솔 루 션 은 MR 작업 을 하나 더 쓰 고 hadop 이 자체 적 으로 가지 고 있 는 key - value 호 환 Map (Invert) 을 이용 하여 키 값 을 호 환 한 다음 hadop 의 map 단계 에 따라 키 값 에 따라 정렬 하 는 메커니즘 에 따라 정렬 하 는 것 이 었 습 니 다. 물론 비교 기 를 역순 으로 정렬 하 는 것 을 수정 해 야 합 니 다.이러한 방법 은 가능 하지만 2 개의 MR 작업 이 필요 합 니 다. 중간 결 과 는 Sequsen 방식 으로 저장 하고 읽 을 수 있 지만 약간 복잡 합 니 다. 따라서 데 이 터 를 모두 하나의 reducer 에 읽 고 처리 할 때 hashtable 을 이용 하여 저장 하고 reducer 청소 단계 에서 정렬 하여 FileSystem 을 받 아 결 과 를 HDFS 에 기록 하 는 것 을 고려 합 니 다.
전체적으로 볼 때 이 프로그램 은 매우 간결 하지만 단점 도 있다. 데이터 양 이 매우 많 을 때 하나의 reducer 가 정렬 하면 메모리 와 cpu 에 큰 부담 을 줄 수 있다 는 것 이다.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.*;

/**
 * Created by yuanye8 on 2016/10/10.
 */
public class WordCountExample {

    public static class WordCountMapper implements Mapper<LongWritable, Text, Text, IntWritable> {
        private Text key = new Text();
        private IntWritable value = new IntWritable();

        public void configure(JobConf jobConf) {

        }

        public void map(LongWritable longWritable, Text text, OutputCollector outputCollector, Reporter reporter) throws IOException {
            String[] words = text.toString().trim().split(" ");
            key = new Text();
            for (String word : words) {
                key.set(word);
                value.set(1);
                outputCollector.collect(key, value);
            }
        }

        public void close() throws IOException {

        }
    }

    public static class WordCountReducer implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable value = new IntWritable();
        private Hashtable hashtable = new Hashtable();
        private String output_path = null;

        public void configure(JobConf jobConf) {
            this.output_path = jobConf.get("output_path");
        }

        public void reduce(Text text, Iterator iterator, OutputCollector outputCollector, Reporter reporter) throws IOException {
            int sum = 0;
            while (iterator.hasNext()) {
                int tmp = iterator.next().get();
                sum += tmp;
            }
            value.set(sum);
            hashtable.put(text.toString(), sum);
            outputCollector.collect(text, value);

        }

        public void close() throws IOException {
            outputSortResult();
        }

        public void outputSortResult() {
            FileSystem hdfs = null;
            BufferedWriter bw = null;
            try {
                hdfs = FileSystem.get(new JobConf());
                bw = new BufferedWriter(
                        new OutputStreamWriter(
                                hdfs.create(new Path(output_path + "_sort"), true)));

                Set> set = this.hashtable.entrySet();
                Map.Entry[] entries = set.toArray(new Map.Entry[set.size()]);
                Arrays.sort(entries, new Comparator() {
                    public int compare(Map.Entry o1, Map.Entry o2) {
                        int v1 = (Integer) o1.getValue();
                        int v2 = (Integer) o2.getValue();
                        return v2 - v1;
                    }
                });

                for (Map.Entry entry : entries) {
                    String key = entry.getKey();
                    int value = entry.getValue();
                    bw.write(key + "\t" + value + "
"
); } bw.flush(); } catch (IOException e) { e.printStackTrace(); } finally { FileTool.close(bw); } } } public static void main(String[] args) throws IOException { if (args.length < 2) { System.out.println("Usage : WordCountExample "); System.exit(-1); } Path input_path = new Path(args[0]); Path output_path = new Path(args[1]); JobConf conf = new JobConf(); conf.set("output_path", args[1]); conf.setJobName("WordCount"); conf.setJarByClass(WordCountExample.class); conf.setMapperClass(WordCountMapper.class); conf.setReducerClass(WordCountReducer.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); TextInputFormat.addInputPath(conf, input_path); FileOutputFormat.setOutputPath(conf, output_path); conf.setCombinerClass(WordCountReducer.class); conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(IntWritable.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setNumReduceTasks(1); RunningJob runningJob = JobClient.runJob(conf); runningJob.waitForCompletion(); } }

마지막 으로 해결 되 지 않 은 질문 을 기록 합 니 다. 파일 읽 기 흐름 을 닫 을 때 FileTool. close (bw);FileTool. close (bw, hdfs) 였 습 니 다.그러나 오 류 를 보고 할 수 있 습 니 다. 오 류 는 map 단계 의 FileSystem Closed 라 는 오류 입 니 다.그러나 내 가 닫 은 것 은 reduce 의 청소 단계, 즉 close () 에서 이 루어 진 것 이 므 로 이 오 류 를 보고 해 서 는 안 된다.
  • 그러면 reduce 정리 단계 에서 파일 시스템 을 닫 았 을 때 전체 작업 이 아직 끝나 지 않 았 고 map 단계 에서 도 파일 시스템 을 다시 사용 하여 작업 을 해 야 할 것 같 습 니 다.
  • 다른 소스 코드 를 읽 고 주 프로그램 에서 열 린 파일 시스템 FileSystem 에 대해 마지막 에 닫 히 지 않 은 동작 인 데 MR 작업 이 완료 되면 자동 으로 파일 시스템 을 닫 습 니까?프로그래머 가 자동 으로 꺼 지지 않 아 도 됩 니까?

  • 제 문 제 를 잘 이해 해 주신 다 면 댓 글 을 남 겨 주세요. 감사합니다!

    좋은 웹페이지 즐겨찾기