MapReduce join 작업 실현
데이터 준비
우선 데 이 터 를 준비 하 세 요.이것 은 이미 숙련 된 과정 입 니 다. 예제 데 이 터 를 준비 하고 경로 와 필드 구분 자 를 기억 하 는 것 입 니 다.
다음 두 장의 시 계 를 준비 하 세 요.
(1)m_ys_lab_jointest_a (이하 표 A 로 약칭)
탭 문 구 는 다음 과 같 습 니 다.
create table if not exists m_ys_lab_jointest_a (
id bigint,
name string
)
row format delimited
fields terminated by '9'
lines terminated by '10'
stored as textfile;
데이터:
id name
1 중국 북경
2 중국 천진
3 하북성
4 산 시 성 (산서성) 2
5 내몽고
6 랴오닝 성 2
7 길림성
8 흑룡강
(2)m_ys_lab_jointest_b (이하 표 B 로 약칭)
탭 문 구 는 다음 과 같 습 니 다.
create table if not exists m_ys_lab_jointest_b (
id bigint,
statyear bigint,
num bigint
)
row format delimited
fields terminated by '9'
lines terminated by '10'
stored as textfile;
데이터:
id statyear num
1 2010 1962
1 2011 2019
2 2010 1299
2 2011 1355
4 2010 3574
4 2011 3593
9 2010 2303
9 2011 2347
우리 의 목적 은 id 를 key 로 join 작업 을 하고 다음 표를 얻 는 것 입 니 다.
m_ys_lab_jointest_ab
id name statyear num
1 중국 북경 2011 2019
1 중국 북경 2010 1962
2 중국 천진 2011 1355
2 중국 천진 2010 1299
4 산 시 성 (산서성) 2 2011 3593
4 산 시 성 (산서성) 2 2010 3574
계산 모형
전체 계산 과정 은:
(1) map 단계 에서 모든 기록 을 < key, value > 형식 으로 표시 합 니 다. 그 중에서 key 는 id 이 고 value 는 출처 에 따라 서로 다른 형식 을 가 집 니 다. 표 A 의 기록 에서 기원 되 고 value 의 값 은 'a \ #' + name 입 니 다.표 B 의 기록 에서 유래 했 습 니 다. value 의 값 은 'b \ #' + score 입 니 다.
(2) reduce 단계 에서 각 key 아래 의 value 목록 을 각각 표 A 와 표 B 의 두 부분 으로 나 누 어 각각 두 개의 벡터 에 넣는다.그리고 두 개의 벡터 를 옮 겨 다 니 며 피리 칼 적 을 만들어 하나의 최종 결 과 를 이룬다.
다음 그림 에서 보 듯 이:
코드
코드 는 다음 과 같 습 니 다:
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
* MapReduce Join
*/
public class MapRedJoin {
public static final String DELIMITER = "\u0009"; //
// map
public static class MapClass extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
public void configure(JobConf job) {
super.configure(job);
}
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
Reporter reporter) throws IOException, ClassCastException {
//
String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
//
String line = value.toString();
//
if (line == null || line.equals("")) return;
// A
if (filePath.contains("m_ys_lab_jointest_a")) {
String[] values = line.split(DELIMITER); //
if (values.length < 2) return;
String id = values[0]; // id
String name = values[1]; // name
output.collect(new Text(id), new Text("a#"+name));
}
// B
else if (filePath.contains("m_ys_lab_jointest_b")) {
String[] values = line.split(DELIMITER); //
if (values.length < 3) return;
String id = values[0]; // id
String statyear = values[1]; // statyear
String num = values[2]; //num
output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));
}
}
}
// reduce
public static class Reduce extends MapReduceBase
implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
Vector<String> vecA = new Vector<String>(); // A
Vector<String> vecB = new Vector<String>(); // B
while (values.hasNext()) {
String value = values.next().toString();
if (value.startsWith("a#")) {
vecA.add(value.substring(2));
} else if (value.startsWith("b#")) {
vecB.add(value.substring(2));
}
}
int sizeA = vecA.size();
int sizeB = vecB.size();
//
int i, j;
for (i = 0; i < sizeA; i ++) {
for (j = 0; j < sizeB; j ++) {
output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));
}
}
}
}
protected void configJob(JobConf conf) {
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setOutputFormat(ReportOutFormat.class);
}
}
기술 세부 사항
다음은 그 중의 몇 가지 기술 세부 사항 을 말씀 드 리 겠 습 니 다.
(1) 입력 데이터 가 두 장의 표 와 관련 되 기 때문에 우 리 는 현재 처리 한 기록 이 표 A 에서 왔 는 지 표 B 에서 왔 는 지 판단 해 야 한다.Reporter 클래스 getInputSplit () 방법 은 입력 데이터 의 경 로 를 얻 을 수 있 습 니 다. 구체 적 인 코드 는 다음 과 같 습 니 다.
String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
(2) map 의 출력 결과, 같은 id 의 모든 기록 (표 A 든 표 B 든) 은 같은 key 에서 같은 목록 에 저 장 됩 니 다. reduce 단계 에서 분리 하여 피리 칼 의 적 에 해당 하 는 m x n 개의 기록 으로 저장 해 야 합 니 다.m, n 이 얼마 인지 사전에 알 지 못 했 기 때문에 여 기 는 두 개의 벡터 (증가 가능 한 배열) 를 사용 하여 표 A 와 표 B 에서 온 기록 을 각각 저장 하고 두 겹 의 포 함 된 순환 으로 우리 가 필요 로 하 는 최종 결 과 를 구성 했다.
(3) MapReduce 에서 System. out. println () 방법 으로 출력 하여 디 버 깅 에 편리 하도록 할 수 있 습 니 다.그러나 System. out. println () 의 내용 은 터미널 에 표시 되 지 않 고 stdout 과 stderr 두 파일 에 출력 되 었 습 니 다. 이 두 파일 은 logs / userlogs / attempt 에 있 습 니 다.xxx 디 렉 터 리 아래.웹 엔 드 의 과거 기록 job 를 통 해 'Analyse This Job' 을 볼 수 있 습 니 다. stdout 과 stderr 의 내용 을 볼 수 있 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.