Hadoop - mapreduce 사례 - map 엔 드 조인
1001 pd001 300
1002 pd002 20
1003 pd003 40
1004 pd002 50
상품 데이터 pdts. txt
pd001 apple
pd002 xiaomi
pd003 cuizi
경사 문제: 전자상거래 플랫폼 에 서 는 샤 오미 휴대 전화 와 애플 휴대 전 화 를 사 는 주문 수량 이 많 고 망치 휴대 전 화 를 사 는 주문 수량 이 적 으 며 전통 적 인 Mapreduce 방법 에 따라 3 개의 reduce 데 이 터 는 불 균형 할 것 이다.예 를 들 어 샤 오미 의 reduce 를 받 으 면 받 는 데이터 가 많 고 망치 데 이 터 를 받 는 reduce 는 받 는 데이터 가 적다.
해결 방향: map 엔 드 연결 을 사용 하여 정렬 과정 을 map 에서 직접 실행 하고 상품 정 보 를 map 정보 에 불 러 와 maprediuce 의 입력 캐 시 체 제 를 도입 합 니 다.
기술 난점: 어떻게 상품 정 보 를 maptask 에 불 러 옵 니까?
해결 방법: distributedcache. addCacheFile () 을 사용 하여 join 이 필요 한 다른 파일 을 모든 Map 캐 시 에 추가 합 니 다. distributedcache 는 사용자 가 응용 프로그램 개발 을 편리 하 게 하기 위해 디자인 된 파일 배포 도구 입 니 다.이것 은 task 가 실 행 될 때 불 러 올 수 있 도록 읽 기 전용 외부 파일 을 각 노드 에 자동 으로 나 누 어 로 컬 캐 시 를 할 수 있 습 니 다.
코드 구현:
맵 엔 드 로 연결 할 때 reduce 를 적용 하지 않 을 수 있 습 니 다. 이 때 reducetask 의 수량 을 0 으로 설정 할 수 있 습 니 다.
package com.tianjie.mapsidejoin;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MapSideJoin {
static class MapSideJoinMappe extends Mapper{
//map k v key ,v
Map pdInfoMap = new HashMap();
Text ktext = new Text();
/*setup hadoop
* */
protected void setup(Mapper.Context context)
throws IOException, InterruptedException {
// ,
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("C:/Users/admin/Desktop/join/cache/pdts.txt")));
String line;
while(StringUtils.isNotEmpty(line = br.readLine())){
// k ,value
String[] split = line.split("\t");
pdInfoMap.put(split[0], split[1]);
}
}
/*
* hadoop */
/*
* map key value , TextInputFormat,
* key ,value
* Text,NullWriable map
* */
protected void map(LongWritable key, Text value, Mapper.Context context)
throws IOException, InterruptedException {
//
String orderline = value.toString();
//
String[] fields = orderline.split("\t");
// ,
String pdName = pdInfoMap.get(fields[1]);
// ,
ktext.set(orderline+"\t"+pdName);
//
context.write(ktext, NullWritable.get());
}
}
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
// hadoop
Configuration conf = new Configuration();
// job
Job job = Job.getInstance(conf);
// job
job.setJarByClass(MapSideJoin.class);
// mapper
job.setMapperClass(MapSideJoinMappe.class);
// mapper
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//
FileInputFormat.setInputPaths(job, new Path(args[0]));
//
FileSystem fs = FileSystem.get(conf);
Path path = new Path(args[1]);
if(fs.isDirectory(path)){
fs.delete(path, true);
}
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// maptask
//job.addArchiveToClassPath(""); jar task classpath
//job.addFileToClassPath(file); task classpath
//job.addCacheArchive(uri); task
//1: task
job.addCacheFile(new URI("file:///C:/Users/admin/Desktop/join/cache/pdts.txt"));
//2: map reduce , reducetask 0
job.setNumReduceTasks(0);
// job , job
boolean res =job.waitForCompletion(true);
System.exit(res?1:0);
}
}
원문:https://www.cnblogs.com/fengdashen/p/6610953.html
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.