MapReduce join 작업 실현

6333 단어
얼마 전에 MapReduce 가 join 작업 을 실현 하 는 알고리즘 을 구상 하 였 으 나 코드 차원 에서 착지 하지 않 았 다.오늘 드디어 시간 을 들 여 전체 절 차 를 한 번 걸 었 습 니 다. 그 동안 많은 번 거 로 움 을 겪 었 고 결국은 이 를 일일이 해결 할 수 있 었 습 니 다. 다시 한 번 깊이 깨 달 았 습 니 다. 계산 모델 부터 알고리즘 실현 까지 아직도 가 야 할 길이 많 습 니 다.
데이터 준비
우선 데 이 터 를 준비 하 세 요.이것 은 이미 숙련 된 과정 입 니 다. 예제 데 이 터 를 준비 하고 경로 와 필드 구분 자 를 기억 하 는 것 입 니 다.
다음 두 장의 시 계 를 준비 하 세 요.
(1)m_ys_lab_jointest_a (이하 표 A 로 약칭)
탭 문 구 는 다음 과 같 습 니 다.
create table if not exists m_ys_lab_jointest_a (
     id bigint,
     name string
)
row format delimited
fields terminated by '9'
lines terminated by '10'
stored as textfile;

데이터:
id     name
1     중국 북경
2     중국 천진
3     하북성
4     산 시 성 (산서성) 2
5     내몽고
6     랴오닝 성 2
7     길림성
8     흑룡강
(2)m_ys_lab_jointest_b (이하 표 B 로 약칭)
탭 문 구 는 다음 과 같 습 니 다.
create table if not exists m_ys_lab_jointest_b (
     id bigint,
     statyear bigint,
     num bigint
)
row format delimited
fields terminated by '9'
lines terminated by '10'
stored as textfile;

데이터:
id     statyear     num
1     2010     1962
1     2011     2019
2     2010     1299
2     2011     1355
4     2010     3574
4     2011     3593
9     2010     2303
9     2011     2347
우리 의 목적 은 id 를 key 로 join 작업 을 하고 다음 표를 얻 는 것 입 니 다.
m_ys_lab_jointest_ab
id     name    statyear     num
1       중국 북경    2011    2019
1       중국 북경    2010    1962
2       중국 천진    2011    1355
2       중국 천진    2010    1299
4       산 시 성 (산서성) 2    2011    3593
4       산 시 성 (산서성) 2    2010    3574
계산 모형
전체 계산 과정 은:
(1) map 단계 에서 모든 기록 을 < key, value > 형식 으로 표시 합 니 다. 그 중에서 key 는 id 이 고 value 는 출처 에 따라 서로 다른 형식 을 가 집 니 다. 표 A 의 기록 에서 기원 되 고 value 의 값 은 'a \ #' + name 입 니 다.표 B 의 기록 에서 유래 했 습 니 다. value 의 값 은 'b \ #' + score 입 니 다.
(2) reduce 단계 에서 각 key 아래 의 value 목록 을 각각 표 A 와 표 B 의 두 부분 으로 나 누 어 각각 두 개의 벡터 에 넣는다.그리고 두 개의 벡터 를 옮 겨 다 니 며 피리 칼 적 을 만들어 하나의 최종 결 과 를 이룬다.
다음 그림 에서 보 듯 이:
코드
코드 는 다음 과 같 습 니 다:
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * MapReduce  Join  
 */
public class MapRedJoin {
	public static final String DELIMITER = "\u0009"; //      
	
	// map  
	public static class MapClass extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, Text> {
						
		public void configure(JobConf job) {
			super.configure(job);
		}
		
		public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
				Reporter reporter) throws IOException, ClassCastException {
			//              
			String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
			//        
			String line = value.toString();
			//      
			if (line == null || line.equals("")) return; 
			
			//      A   
			if (filePath.contains("m_ys_lab_jointest_a")) {
				String[] values = line.split(DELIMITER); //          
				if (values.length < 2) return;
				
				String id = values[0]; // id
				String name = values[1]; // name
				
				output.collect(new Text(id), new Text("a#"+name));
			}
			//      B   
			else if (filePath.contains("m_ys_lab_jointest_b")) {
				String[] values = line.split(DELIMITER); //          
				if (values.length < 3) return;
				
				String id = values[0]; // id
				String statyear = values[1]; // statyear
				String num = values[2]; //num
				
				output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));
			}
		}
	}
	
	// reduce  
	public static class Reduce extends MapReduceBase
			implements Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterator<Text> values,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
					
			Vector<String> vecA = new Vector<String>(); //      A  
			Vector<String> vecB = new Vector<String>(); //      B  
			
			while (values.hasNext()) {
				String value = values.next().toString();
				if (value.startsWith("a#")) {
					vecA.add(value.substring(2));
				} else if (value.startsWith("b#")) {
					vecB.add(value.substring(2));
				}
			}
			
			int sizeA = vecA.size();
			int sizeB = vecB.size();
			
			//       
			int i, j;
			for (i = 0; i < sizeA; i ++) {
				for (j = 0; j < sizeB; j ++) {
					output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));
				}
			}	
		}
	}
	
	protected void configJob(JobConf conf) {
		conf.setMapOutputKeyClass(Text.class);
		conf.setMapOutputValueClass(Text.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(Text.class);
		conf.setOutputFormat(ReportOutFormat.class);
	}
}

기술 세부 사항
다음은 그 중의 몇 가지 기술 세부 사항 을 말씀 드 리 겠 습 니 다.
(1) 입력 데이터 가 두 장의 표 와 관련 되 기 때문에 우 리 는 현재 처리 한 기록 이 표 A 에서 왔 는 지 표 B 에서 왔 는 지 판단 해 야 한다.Reporter 클래스 getInputSplit () 방법 은 입력 데이터 의 경 로 를 얻 을 수 있 습 니 다. 구체 적 인 코드 는 다음 과 같 습 니 다.
String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
(2) map 의 출력 결과, 같은 id 의 모든 기록 (표 A 든 표 B 든) 은 같은 key 에서 같은 목록 에 저 장 됩 니 다. reduce 단계 에서 분리 하여 피리 칼 의 적 에 해당 하 는 m x n 개의 기록 으로 저장 해 야 합 니 다.m, n 이 얼마 인지 사전에 알 지 못 했 기 때문에 여 기 는 두 개의 벡터 (증가 가능 한 배열) 를 사용 하여 표 A 와 표 B 에서 온 기록 을 각각 저장 하고 두 겹 의 포 함 된 순환 으로 우리 가 필요 로 하 는 최종 결 과 를 구성 했다.
(3) MapReduce 에서 System. out. println () 방법 으로 출력 하여 디 버 깅 에 편리 하도록 할 수 있 습 니 다.그러나 System. out. println () 의 내용 은 터미널 에 표시 되 지 않 고 stdout 과 stderr 두 파일 에 출력 되 었 습 니 다. 이 두 파일 은 logs / userlogs / attempt 에 있 습 니 다.xxx 디 렉 터 리 아래.웹 엔 드 의 과거 기록 job 를 통 해 'Analyse This Job' 을 볼 수 있 습 니 다. stdout 과 stderr 의 내용 을 볼 수 있 습 니 다.

좋은 웹페이지 즐겨찾기