Hadoop 기본
빅데이터에서 RDB가 아닌 분산 DB를 사용하는 이유
- 성능, 편리 면에서 좋지만 비용이 많이 사용됨
- 빅데이터 분석에 RDB는 비효율적
- 핵심은 Scale-out!!!
scale-out
- 값싼 서버를 여러개 이용
scale-up
- 비싼 서버를 여러개 이용
데이터 중심 어플리케이션 분야에선 값싼 서버를 여러개이용하는 scale-out을 선호
=> 가격이 두배라고 성능이 두배가 되지는 않기 때문
Hadoop 클러스터
- Name node를 제외하고 수평적 확장이 가능
- Hadoop HDFS에서 빅데이터에 맞게 Data Node 추가 가능
- HDFS에 YARN이 올라가고 그위에 필요한 다른 Hive, Spark, HBase등이 올라감
- NoSQL(Hive) / In_Memory(Spark) / SQL(Hive) - tez필요
맵리듀스 프레임워크(MapReduce)
- 많은 수의 컴퓨터를 묶어서 클러스터를 만들고 빅데이터 처리를 위한 Scalable한 병렬 소프트웨어의 구현을 도와주는 프로그래밍모델
- Low-Level의 시스템까지 알필요가 없기에 코딩하기 간단함
- 구글의 MapReduce, 오픈소스 Hadoop - MapReduce Framework
- Main함수가 Map함수와 Reduce함수를 호출해서 처리하는 방식
- 각각의 record, tuple은 key, value 쌍으로 표현
- Main함수(Driver에 해당) / Map함수(key, value) / Reduce함수(key, List[value])
- main함수를 한개의 master machine에서 수행
- master machine은 map함수를 수행하기전 전처리를 하거나 reduce함수의 결과를 후처리 하는데 사용가능
MapReduce Phase
- map과 reduce라는 유저가 정의한 함수 한 쌍으로 이루어짐
- 1번의 phase는 map함수를 호출한 뒤 reduce 함수를 호출하는데 map함수가 끝난 뒤에 combine함수를 중간에 수행할 수 있다.
- main함수(driver)에서 mapreduce phase를 수행시킴
- map phase -> shuffling phase -> reduce phase 순서로 수행
- map phase
- 제일 먼저 수행됨, 여러 파티션에 병렬 분산으로 호출되어 수행
- 각 머신마다 수행된 Mapper는 입력 데이터의 한 줄마다 map함수 호출
- map 함수는 Key,Value쌍 형태로 결과 출력, 여러 머신에 나누어 보내며 같은 Key를 가진 데이터는 같은 머신으로 보냄\
- shuffling phase
- 모든 머신에서 map phase가 끝나면 시작
- 각각의 머신으로 보내진 (key, value)쌍을 key를 이용해서 sorting => 같은 key를 가진 데이터를 모아서 value-list를 만들고 (key, valueList)형태로 key에 따라 여러 머신에 분산해서 보냄
- reduce phase
- 모든 머신에서 shuffling phase가 끝나면 시작
- shuffling phase마다 머신으로 보낸 각각의 (Key, ValueList)마다 reduce 함수가 호출, 하나의 reduce함수가 끝나면 다음 (key, valueList)쌍에 reduce 함수 호출
- 출력이 될때는 (Key, Value)쌍으로 출력
Hadoop
- Apache프로젝트의 MapReduce FrameWork의 오픈소스
- HDFS(하둡 분산 파일 시스템)
- 빅 데이터 파일을 여러 컴퓨터에 나눠 저장
- 각 파일은 여러개의 순차적인 블록으로 저장
- 하나의 파일의 각각의 블록은 여러개로 복사되어 여러머신에 저장(어느 하나가 문제가 생겨도 다른 머신에 있는 것으로 해결 가능)
- 데이터를 분산
- 빅 데이터를 수천대의 값싼 컴퓨터에 병렬 처리하기 위해 분산
- MapReduce
- 소프트웨어의 수행을 분산
- 하나의 Namenode(master)
- 파일 시스템 관리, 클라이언트가 파일에 접근할 수 있게 함
- 여러 개의 Datanode(slaves)
- 컴퓨터에 들어있는 데이터를 접근할 수 있게 함
- 자바 언어로 MapReduce알고리즘을 구현
MapReduce 함수
- Map함수
- org.apache.hadoop.mapreduce 패키지의 Mapper클래스를 상속받아 map method수정
- 입력 텍스트 파일에서 라인 단위 호출, 입력은 (key, valueList)현태
- key는 입력 텍스트 파일에서 맨 앞 문자를 기준으로 맵 함수가 호출된 라인의 첫번째 문자까지 오프셋
- value는 텍스트의 해당 라인 전체가 포함
- Reduce함수
- ..hadoop.mapreduce 패키지의 Reducer클래스를 상속받아 reduce method수정
- 셔플링 페이즈의 출력을 입력으로 받음, (Key,valueList)의 형태
- valueList는 맵 함수의 출력에서 key를 같은 key,value쌍의 value리스트
- Combine 함수
- reduce 함수와 유사, 각 머신의 map phase에서 map 함수의 출력 크기를 줄여서 shuffling phase와 reduce phase의 비용을 줄여줌
MapReduce
- Mapper and Reducer
- 각 머신에서 독립적으로 수행
- Mapper는 map함수, Reducer은 reuce 함수 각각 수행
- Combine functions
- map함수 끝난 후 Reduce함수가 하는 일을 부분적으로 수행
- 셔플링 비용과 네트워크 트래픽을 감소시킴
- Mapper와 Reducer은 필요시 setup(), cleanup()을 수행 가능
- setup()
- 첫 map, reduce함수 호출 전 맨 먼저 수행
- map함수들이 공유하는 자료구조를 초기화 시 사용
- map함수들에게 전달해야 할 파라미터 정보를 main함수에서 받아오는데 사용
- cleanup()
- 마지막 map, reduce함수 끝난 후 수행
- 모든 map함수들이 공유하는 자료구조의 결과를 출력하는데 사용
- setup()
- 한 개의 mapreduce job을 수행할 때 map phase만 수행하고 중단 가능
Linux설치 후 그 위에 Hadoop설치함
→ Hadoop에서 MapReduce코드를 실행시키면 여기에 만들어주고 그걸 실행시킬 수 있음
→ src : 실제 코드를 넣는 곳
→ template : 과제를 넣는곳
→ datagen : data를 generate할 수 있는 프로그램 들어있음
→ data : data 저장
→ build.xml : Map reduce를 위한 것들이 들어있음
-
MapReduce코드를 작성 -> Template폴더의 템플릿을 가져가서 코딩 -> src파일에 넣어서 컴파일, 수행
-
Hadoop, MapReduce코드를 돌릴려면 HDFS에 데이터를 옮겨야함
-
src 디렉토리에 코드를 만들 때마다 pgd.addClass를 넣어야함
-
ant를 수행하면 컴파일됨
-
=⇒wordcount코드가 수행되고 ssafy.jar이라는 패키지를 만들어짐
-
=⇒ wordcout코드의 argument를 2개 작성 : 입력 파일이 들어있는 dir, 출력 파일이 들어갈 dir
Driver.java
→ 여기까지하면 MapReduce코드를 돌릴 준비 완료
Hadoop 실행 방법
$ hadoop jar [jar file][program name] <intput arguments ...>
ex) $ hadoop jar ssafy.jar wordcount wordcount_test wordcount_test_out
결과확인
- Reducer개수에 따라 출력 파일 개수 결정(ex. reducer가 2개 ⇒ 출력 파일 2개)
- cat명령어 : unix에서 파일을 화면에 찍어보는 명령어
map함수
- 하둡의 text type은 input으로 못주기 때문에 String으로 바꿈
- context.write : map함수가 내보내는 출력
Partitioner Class
- Map함수에서 (Key, Value)쌍이 어떤 Reducer(머신)으로 보내질 것인지에 대한 결정을 정의하는 Class(Key값이 같으면 같은 Reducer로 보냄)
- Reducer로 보내는 방법, 수량 등을 바꾸고 싶으면 이 Partitioner Class를 수정하면 된다.
- Hadoop에서는 Hash함수가 Default로 제공되기 때문에 Key에 대한 해시 값에 따라 어떤 Reducer로 보내질지 결정
- Hadoop 기본타입
- Text
- IntWritable
- LongWritable
- FloatWritable
- DoubleWritable
- EX) Key 값을 30이하, 초과로 나누고 싶은 경우 Partitioner class 수정
- Partitioner Class를 상속받아 MyPartitioner라는 Class 생성
- getPartition 함수를 Override함(numPartitions : Reducer의 갯수를 몇 개 설정했냐에 관계된 것)
- Key의 값에 따라 0,1머신으로 보냄
- 메인 함수에 추가
- Partitioner class를 import
Inverted Index
- 여러 문서와 단어가 있을 때 각 단어마다 해당 단어가 나타나는 문서 아이디(ex. doc1)와 그 위치를 doc:position 형태로 리스트를 만들어줌
- 문서에서 검색하는 일을 할 때 유용
Inverted index 를 생성하는 MapReduce 알고리즘
-
문서마다 map함수가 호출되어 단어마다 value를 만들어 출력
-
셔플링 페이즈 : map 함수가 출력한 value를 같은 key끼리 모아 list를 만들어서 출력
-
셔플링 페이즈가 끝난 후 각각의 Key마다 Reduce함수가 호출됨 :
ValueList에 있는 Value들을 합쳐서 (Key, Value)형태로 출
⇒ 출력 : map함수(Key, Value)→셔플링페이즈(Key, ValueList)→Reduce함수(Key, Value)
→ 지금 map함수가 호출 된 dataFile 이름을 return
Reduce 함수 결과 출력 디렉토리 자동 삭제 구현
추가
- import
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.fs.FileSystem;
- map함수
public static class TokenizerMapper
extends Mapper<Object,Text,Text,Text> {
// variable declairations
private Text pos = new Text(); //value로 사용할 변수
private Text word = new Text(); //key로 사용할 변수
private String filename; //현재 읽는 파일의 이름을 넣을 변수
protected void setup(Context context) throws IOException, InterruptedException{
filename = ((FileSplit)context.getInputSplit()).getPath().getName(); //setup함수에서 filename에 현재사용하는 파일 이름을 뽑아서 저장
}
// map function (Context -> fixed parameter)
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException { //입력 파일을 한 줄 읽고
StringTokenizer itr = new StringTokenizer(value.toString(), " ", true); //단어 단위로 자르기
long p = ((LongWritable)key).get(); //현재 위치가 시작지점에서 몇 byte인지 표시
while ( itr.hasMoreTokens() ) { //다음 단어가 없을 때까지 반복
String token = itr.nextToken(); //다음 단어 가져오기
word.set(token.trim()); //word에 단어를 잘라서 넣음
if(! token.equals(" ")){ //단어가 존재하는 경우
pos.set(filename+":"+p); //value에 파일의 이름과 현재 위치를 저장
context.write(word,pos); //key와 value를 출력
}
p+=token.length(); //현재 위치에 단어 길이만큼 더해줌
}
}
}
- Reduce
public static class IntSumReducer
extends Reducer<Text,Text,Text,Text> {
private Text list = new Text();
// key : a disticnt word
// values : Iterable type (data list)
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String s = new String();
int comma = 0;
for (Text val : values ) { // list의 값들을 하나씩 수행
if(comma == 0){
comma = 1;
s += (":"+val.toString()); //처음에는 앞에 ,를 붙이지 않음
}
else
s += (", "+val.toString()); //두번째 부터 앞에 컴마를 넣어줌
}
list.set(s); //value로 사용할 list에 저장
context.write(key,list); //key와 list 저장
}
}
- Main
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if ( otherArgs.length != 2 ) { //'hadoop jar jarname.jar [프로그램명]'을 제외한 변수 저장
System.err.println("Usage: <in> <out>");
System.exit(2);
}
FileSystem hdfs = FileSystem.get(conf);
Path output = new Path(otherArgs[1]);
if(hdfs.exists(output)) hdfs.delete(output, true); //output디렉토리 자동 삭제
Job job = new Job(conf,"inverted count");
job.setJarByClass(InvertedIndex.class); //Class 명 설정
job.setMapperClass(TokenizerMapper.class); //Map Class 설정
job.setReducerClass(IntSumReducer.class); //Reduce Class 설정
job.setOutputKeyClass(Text.class); //output key type 설정
job.setOutputValueClass(Text.class); //output value type 설정
job.setNumReduceTasks(2); //동시에 수행되는 reduce개수 설정
FileInputFormat.addInputPath(job,new Path(otherArgs[0])); //입력파일 디렉토리 설정
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); //출력파일 디렉토리 설정
FileSystem.get(job.getConfiguration()).delete(new Path(otherArgs[1]),true);
System.exit(job.waitForCompletion(true) ? 0 : 1 ); //실행
}
Matrix Addition
순서
- MatrixAdd.java 파일 작성
- Project/src/Driver.java 파일 수정
- 실행
$ ant
//matrix data를 hdfs에 넣어야 하기때문에 matadd_test라는 input이 들어갈 dir생성
$ hdfs dfs -mkdir matadd_test
//.txt파일은 input data로써 matadd_test dir에 넣음
$ hdfs dfs -put data/matadd-data-2x2.txt matadd_test
//실행시키고 결과 확인
$ hadoop jar ssafy.jar matadd matadd_test matadd_test_out
$ hdfs dfs -cat matadd_test_out/part-r-00000 | more
$ hdfs dfs -cat matadd_test_out/part-r-00001 | more
MatrixAdd.java
public static class MAddMapper extends Mapper<Object, Text, Text , IntWritable>{
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String arr[] = value.toString().split("\t");
Text nextKey = new Text(arr[1]+"\t"+arr[2]);
IntWritable nextValue = new IntWritable(Integer.parseInt(arr[3]));
context.write(nextKey, nextValue);
}
}
public static class MAddReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for(IntWritable value : values){
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: <in> <out>");
System.exit(2);
}
FileSystem hdfs = FileSystem.get(conf);
Path output = new Path(otherArgs[1]);
if (hdfs.exists(output))
hdfs.delete(output, true);
Job job = new Job(conf, "matrix addition");
job.setJarByClass(MatrixAdd.class);
job.setMapperClass(MAddMapper.class);
job.setReducerClass(MAddReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(2);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1)
}
Matrix Multiplication
- 행렬 A와 B가 compatible하다 : 행렬A의 열의 개수와 행렬 B의 행의 개수가 일치 ⇒ 곱이 가능하다
- EX)
2-Phase Matrix Multiplication
- map함수에서 value의 좌표(p)를 key로 넣어준 것(앞의 Matrix의 경우 key(r,p,c))
- 뒤 Matirx의 경우 key(p,r,c)
Phase1(Reduce)
- A Matrix의 key(r,p,c) value(n)과 B Matrix의 key(p,r,c) value(m)을 비교해서 key(r,c) value(n*m)을 출력
Phase2(Map→Reduce)
- list로 reduce에 input
- 해당 값의 합으로 output
- value list에 하나씩 읽어서 더하고 버림 → 1Phase에 비해 main 메모리에서 부담이 줄어듦
장단점
- reduce phase는 오버헤드가 큼 → 값을 메인메모리에 넣을 수 있다면 1phase가 빠름
- 2Phase의 장점 : main memory에 대한 요구사항이 적음
코드
- setup
public static class MMMapper extends Mapper<Object, Text, Text, Text>{
private Text keypair = new Text(); //
private Text valpair= new Text(); //
private String Matrix1name;
private String Matrix2name;
private int n; // 1번째 matrix의 row갯수
private int l; // 1번째 matrix의 column갯수
private int m; // 2번째 matrix의 column갯수
protected void setup(Context context) throws IOException, InterruptedException { //setup함수에서 값을 넣어줌
// ------------------------------------------------------
Configuration config = context.getConfiguration();
Matrix1name = config.get("Matrix1name","A"); //Matrix1name으로 된 String을 받아오는데 없으면 A를 가져옴
Matrix2name = config.get("Matrix2name","B");
n = config.getInt("n",10); //n으로 된 int를 받아오는데 없으면 1을 가져옴
l = config.getInt("l",10);
m = config.getInt("m",10);
// ------------------------------------------------------
}
- Main
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
// TODO
// ------------------------------------------------------
StringTokenizer token = new StringTokenizer(value.toString());
String mat = token.nextToken();
int row = Integer.parseInt(token.nextToken());
int col = Integer.parseInt(token.nextToken());
int v = Integer.parseInt(token.nextToken());
if(mat.equals(Matrix1name)){ //matrix이름이 1인 경우
valpair.set(""+col+" "+v); //value값에 col의 위치와 v값을 입력
for(int j=0; j<m; j++){ //row는 고정, col값을 1칸씩 증가시키며 key, value를 넣어준다
String p = "" + row + "," + j;
keypair.set(p);
context.write(keypair, valpair);
}
}
else if(mat.equals(Matrix2name)){ //2인 경우 row와 v값을 입력
valpair.set(""+row+" "+v);
for(int i=0; i<n; i++){ //value값은 col은 고정, row를 1개 씩 증가하며 넣어준다
String p = "" + i + "," + col;
keypair.set(p);
context.write(keypari, valpair);
}
}
// ------------------------------------------------------
}
- Reduce
Author And Source
이 문제에 관하여(Hadoop 기본), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@kht1997/Hadoop-기본저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)