자바 가 작성 한 hadop wordcount, 단일 MR 작업 은 주파수 순 으로 출력 결 과 를 수행 합 니 다.
10493 단어 hadoop
첫 번 째 워드 카운터 임 무 는 번 거 로 움 을 만 났 다. 단순 한 주파수 통 계 를 하 는 것 은 매우 간단 하지만 마지막 결 과 를 주파수 순 으로 정렬 하여 역순 으로 출력 하려 면 비교적 번거롭다.자 료 를 찾 아 보 니 솔 루 션 은 MR 작업 을 하나 더 쓰 고 hadop 이 자체 적 으로 가지 고 있 는 key - value 호 환 Map (Invert) 을 이용 하여 키 값 을 호 환 한 다음 hadop 의 map 단계 에 따라 키 값 에 따라 정렬 하 는 메커니즘 에 따라 정렬 하 는 것 이 었 습 니 다. 물론 비교 기 를 역순 으로 정렬 하 는 것 을 수정 해 야 합 니 다.이러한 방법 은 가능 하지만 2 개의 MR 작업 이 필요 합 니 다. 중간 결 과 는 Sequsen 방식 으로 저장 하고 읽 을 수 있 지만 약간 복잡 합 니 다. 따라서 데 이 터 를 모두 하나의 reducer 에 읽 고 처리 할 때 hashtable 을 이용 하여 저장 하고 reducer 청소 단계 에서 정렬 하여 FileSystem 을 받 아 결 과 를 HDFS 에 기록 하 는 것 을 고려 합 니 다.
전체적으로 볼 때 이 프로그램 은 매우 간결 하지만 단점 도 있다. 데이터 양 이 매우 많 을 때 하나의 reducer 가 정렬 하면 메모리 와 cpu 에 큰 부담 을 줄 수 있다 는 것 이다.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.*;
/**
* Created by yuanye8 on 2016/10/10.
*/
public class WordCountExample {
public static class WordCountMapper implements Mapper<LongWritable, Text, Text, IntWritable> {
private Text key = new Text();
private IntWritable value = new IntWritable();
public void configure(JobConf jobConf) {
}
public void map(LongWritable longWritable, Text text, OutputCollector outputCollector, Reporter reporter) throws IOException {
String[] words = text.toString().trim().split(" ");
key = new Text();
for (String word : words) {
key.set(word);
value.set(1);
outputCollector.collect(key, value);
}
}
public void close() throws IOException {
}
}
public static class WordCountReducer implements Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable value = new IntWritable();
private Hashtable hashtable = new Hashtable();
private String output_path = null;
public void configure(JobConf jobConf) {
this.output_path = jobConf.get("output_path");
}
public void reduce(Text text, Iterator iterator, OutputCollector outputCollector, Reporter reporter) throws IOException {
int sum = 0;
while (iterator.hasNext()) {
int tmp = iterator.next().get();
sum += tmp;
}
value.set(sum);
hashtable.put(text.toString(), sum);
outputCollector.collect(text, value);
}
public void close() throws IOException {
outputSortResult();
}
public void outputSortResult() {
FileSystem hdfs = null;
BufferedWriter bw = null;
try {
hdfs = FileSystem.get(new JobConf());
bw = new BufferedWriter(
new OutputStreamWriter(
hdfs.create(new Path(output_path + "_sort"), true)));
Set> set = this.hashtable.entrySet();
Map.Entry[] entries = set.toArray(new Map.Entry[set.size()]);
Arrays.sort(entries, new Comparator() {
public int compare(Map.Entry o1, Map.Entry o2) {
int v1 = (Integer) o1.getValue();
int v2 = (Integer) o2.getValue();
return v2 - v1;
}
});
for (Map.Entry entry : entries) {
String key = entry.getKey();
int value = entry.getValue();
bw.write(key + "\t" + value + "
");
}
bw.flush();
} catch (IOException e) {
e.printStackTrace();
} finally {
FileTool.close(bw);
}
}
}
public static void main(String[] args) throws IOException {
if (args.length < 2) {
System.out.println("Usage : WordCountExample );
System.exit(-1);
}
Path input_path = new Path(args[0]);
Path output_path = new Path(args[1]);
JobConf conf = new JobConf();
conf.set("output_path", args[1]);
conf.setJobName("WordCount");
conf.setJarByClass(WordCountExample.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(WordCountReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
TextInputFormat.addInputPath(conf, input_path);
FileOutputFormat.setOutputPath(conf, output_path);
conf.setCombinerClass(WordCountReducer.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(IntWritable.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setNumReduceTasks(1);
RunningJob runningJob = JobClient.runJob(conf);
runningJob.waitForCompletion();
}
}
마지막 으로 해결 되 지 않 은 질문 을 기록 합 니 다. 파일 읽 기 흐름 을 닫 을 때 FileTool. close (bw);FileTool. close (bw, hdfs) 였 습 니 다.그러나 오 류 를 보고 할 수 있 습 니 다. 오 류 는 map 단계 의 FileSystem Closed 라 는 오류 입 니 다.그러나 내 가 닫 은 것 은 reduce 의 청소 단계, 즉 close () 에서 이 루어 진 것 이 므 로 이 오 류 를 보고 해 서 는 안 된다.
제 문 제 를 잘 이해 해 주신 다 면 댓 글 을 남 겨 주세요. 감사합니다!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Azure HDInsight + Microsoft R Server에서 연산 처리 분산Microsoft Azure HDInsight는 Microsoft가 제공하는 Hadoop의 PaaS 서비스로 인프라 주변의 구축 노하우를 몰라도 훌륭한 Hadoop 클러스터를 구축할 수 있는 훌륭한 서비스입니다. 이...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.