Hadoop 기본

빅데이터에서 RDB가 아닌 분산 DB를 사용하는 이유

  • 성능, 편리 면에서 좋지만 비용이 많이 사용됨
  • 빅데이터 분석에 RDB는 비효율적
  • 핵심은 Scale-out!!!

scale-out

  • 값싼 서버를 여러개 이용

scale-up

  • 비싼 서버를 여러개 이용

데이터 중심 어플리케이션 분야에선 값싼 서버를 여러개이용하는 scale-out을 선호

=> 가격이 두배라고 성능이 두배가 되지는 않기 때문

Hadoop 클러스터

  • Name node를 제외하고 수평적 확장이 가능
  • Hadoop HDFS에서 빅데이터에 맞게 Data Node 추가 가능
  • HDFS에 YARN이 올라가고 그위에 필요한 다른 Hive, Spark, HBase등이 올라감
  • NoSQL(Hive) / In_Memory(Spark) / SQL(Hive) - tez필요

맵리듀스 프레임워크(MapReduce)

  • 많은 수의 컴퓨터를 묶어서 클러스터를 만들고 빅데이터 처리를 위한 Scalable한 병렬 소프트웨어의 구현을 도와주는 프로그래밍모델
  • Low-Level의 시스템까지 알필요가 없기에 코딩하기 간단함
  • 구글의 MapReduce, 오픈소스 Hadoop - MapReduce Framework
  • Main함수가 Map함수와 Reduce함수를 호출해서 처리하는 방식
  • 각각의 record, tuple은 key, value 쌍으로 표현
  • Main함수(Driver에 해당) / Map함수(key, value) / Reduce함수(key, List[value])
  • main함수를 한개의 master machine에서 수행
  • master machine은 map함수를 수행하기전 전처리를 하거나 reduce함수의 결과를 후처리 하는데 사용가능

MapReduce Phase

  • map과 reduce라는 유저가 정의한 함수 한 쌍으로 이루어짐
  • 1번의 phase는 map함수를 호출한 뒤 reduce 함수를 호출하는데 map함수가 끝난 뒤에 combine함수를 중간에 수행할 수 있다.
  • main함수(driver)에서 mapreduce phase를 수행시킴
  • map phase -> shuffling phase -> reduce phase 순서로 수행
  • map phase
    • 제일 먼저 수행됨, 여러 파티션에 병렬 분산으로 호출되어 수행
    • 각 머신마다 수행된 Mapper는 입력 데이터의 한 줄마다 map함수 호출
    • map 함수는 Key,Value쌍 형태로 결과 출력, 여러 머신에 나누어 보내며 같은 Key를 가진 데이터는 같은 머신으로 보냄\
  • shuffling phase
    • 모든 머신에서 map phase가 끝나면 시작
    • 각각의 머신으로 보내진 (key, value)쌍을 key를 이용해서 sorting => 같은 key를 가진 데이터를 모아서 value-list를 만들고 (key, valueList)형태로 key에 따라 여러 머신에 분산해서 보냄
  • reduce phase
    • 모든 머신에서 shuffling phase가 끝나면 시작
    • shuffling phase마다 머신으로 보낸 각각의 (Key, ValueList)마다 reduce 함수가 호출, 하나의 reduce함수가 끝나면 다음 (key, valueList)쌍에 reduce 함수 호출
    • 출력이 될때는 (Key, Value)쌍으로 출력

Hadoop

  • Apache프로젝트의 MapReduce FrameWork의 오픈소스
  • HDFS(하둡 분산 파일 시스템)
    • 빅 데이터 파일을 여러 컴퓨터에 나눠 저장
    • 각 파일은 여러개의 순차적인 블록으로 저장
    • 하나의 파일의 각각의 블록은 여러개로 복사되어 여러머신에 저장(어느 하나가 문제가 생겨도 다른 머신에 있는 것으로 해결 가능)
    • 데이터를 분산
  • 빅 데이터를 수천대의 값싼 컴퓨터에 병렬 처리하기 위해 분산
  • MapReduce
    • 소프트웨어의 수행을 분산
  • 하나의 Namenode(master)
    • 파일 시스템 관리, 클라이언트가 파일에 접근할 수 있게 함
  • 여러 개의 Datanode(slaves)
    • 컴퓨터에 들어있는 데이터를 접근할 수 있게 함
  • 자바 언어로 MapReduce알고리즘을 구현

MapReduce 함수

  • Map함수
    • org.apache.hadoop.mapreduce 패키지의 Mapper클래스를 상속받아 map method수정
    • 입력 텍스트 파일에서 라인 단위 호출, 입력은 (key, valueList)현태
    • key는 입력 텍스트 파일에서 맨 앞 문자를 기준으로 맵 함수가 호출된 라인의 첫번째 문자까지 오프셋
    • value는 텍스트의 해당 라인 전체가 포함
  • Reduce함수
    • ..hadoop.mapreduce 패키지의 Reducer클래스를 상속받아 reduce method수정
    • 셔플링 페이즈의 출력을 입력으로 받음, (Key,valueList)의 형태
    • valueList는 맵 함수의 출력에서 key를 같은 key,value쌍의 value리스트
  • Combine 함수
    • reduce 함수와 유사, 각 머신의 map phase에서 map 함수의 출력 크기를 줄여서 shuffling phase와 reduce phase의 비용을 줄여줌

MapReduce

  • Mapper and Reducer
    • 각 머신에서 독립적으로 수행
    • Mapper는 map함수, Reducer은 reuce 함수 각각 수행
  • Combine functions
    • map함수 끝난 후 Reduce함수가 하는 일을 부분적으로 수행
    • 셔플링 비용과 네트워크 트래픽을 감소시킴
  • Mapper와 Reducer은 필요시 setup(), cleanup()을 수행 가능
    • setup()
      • 첫 map, reduce함수 호출 전 맨 먼저 수행
      • map함수들이 공유하는 자료구조를 초기화 시 사용
      • map함수들에게 전달해야 할 파라미터 정보를 main함수에서 받아오는데 사용
    • cleanup()
      • 마지막 map, reduce함수 끝난 후 수행
      • 모든 map함수들이 공유하는 자료구조의 결과를 출력하는데 사용
  • 한 개의 mapreduce job을 수행할 때 map phase만 수행하고 중단 가능

Linux설치 후 그 위에 Hadoop설치함

→ Hadoop에서 MapReduce코드를 실행시키면 여기에 만들어주고 그걸 실행시킬 수 있음

→ src : 실제 코드를 넣는 곳

→ template : 과제를 넣는곳

→ datagen : data를 generate할 수 있는 프로그램 들어있음

→ data : data 저장

→ build.xml : Map reduce를 위한 것들이 들어있음

  • MapReduce코드를 작성 -> Template폴더의 템플릿을 가져가서 코딩 -> src파일에 넣어서 컴파일, 수행

  • Hadoop, MapReduce코드를 돌릴려면 HDFS에 데이터를 옮겨야함

  • src 디렉토리에 코드를 만들 때마다 pgd.addClass를 넣어야함

  • ant를 수행하면 컴파일됨

  • =⇒wordcount코드가 수행되고 ssafy.jar이라는 패키지를 만들어짐

  • =⇒ wordcout코드의 argument를 2개 작성 : 입력 파일이 들어있는 dir, 출력 파일이 들어갈 dir

Driver.java

→ 여기까지하면 MapReduce코드를 돌릴 준비 완료

Hadoop 실행 방법

$ hadoop jar [jar file][program name] <intput arguments ...>

ex) $ hadoop jar ssafy.jar wordcount wordcount_test wordcount_test_out

결과확인

  • Reducer개수에 따라 출력 파일 개수 결정(ex. reducer가 2개 ⇒ 출력 파일 2개)
  • cat명령어 : unix에서 파일을 화면에 찍어보는 명령어

map함수

  • 하둡의 text type은 input으로 못주기 때문에 String으로 바꿈
  • context.write : map함수가 내보내는 출력

Partitioner Class

  • Map함수에서 (Key, Value)쌍이 어떤 Reducer(머신)으로 보내질 것인지에 대한 결정을 정의하는 Class(Key값이 같으면 같은 Reducer로 보냄)
  • Reducer로 보내는 방법, 수량 등을 바꾸고 싶으면 이 Partitioner Class를 수정하면 된다.
  • Hadoop에서는 Hash함수가 Default로 제공되기 때문에 Key에 대한 해시 값에 따라 어떤 Reducer로 보내질지 결정
  • Hadoop 기본타입
    • Text
    • IntWritable
    • LongWritable
    • FloatWritable
    • DoubleWritable
  • EX) Key 값을 30이하, 초과로 나누고 싶은 경우 Partitioner class 수정
  1. Partitioner Class를 상속받아 MyPartitioner라는 Class 생성
  2. getPartition 함수를 Override함(numPartitions : Reducer의 갯수를 몇 개 설정했냐에 관계된 것)
  3. Key의 값에 따라 0,1머신으로 보냄
  4. 메인 함수에 추가
  5. Partitioner class를 import

Inverted Index

  • 여러 문서와 단어가 있을 때 각 단어마다 해당 단어가 나타나는 문서 아이디(ex. doc1)와 그 위치를 doc:position 형태로 리스트를 만들어줌
  • 문서에서 검색하는 일을 할 때 유용

Inverted index 를 생성하는 MapReduce 알고리즘

  1. 문서마다 map함수가 호출되어 단어마다 value를 만들어 출력

  2. 셔플링 페이즈 : map 함수가 출력한 value를 같은 key끼리 모아 list를 만들어서 출력

  3. 셔플링 페이즈가 끝난 후 각각의 Key마다 Reduce함수가 호출됨 :

    ValueList에 있는 Value들을 합쳐서 (Key, Value)형태로 출

    ⇒ 출력 : map함수(Key, Value)→셔플링페이즈(Key, ValueList)→Reduce함수(Key, Value)

→ 지금 map함수가 호출 된 dataFile 이름을 return

Reduce 함수 결과 출력 디렉토리 자동 삭제 구현

추가

  • import
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.fs.FileSystem;
  • map함수
public static class TokenizerMapper
                        extends Mapper<Object,Text,Text,Text> {

                // variable declairations
                private Text pos = new Text();		//value로 사용할 변수
                private Text word = new Text();		//key로 사용할 변수
                private String filename;			//현재 읽는 파일의 이름을 넣을 변수
                protected void setup(Context context) throws IOException, InterruptedException{
                        filename = ((FileSplit)context.getInputSplit()).getPath().getName(); 	//setup함수에서 filename에 현재사용하는 파일 이름을 뽑아서 저장
                }

                // map function (Context -> fixed parameter)
                public void map(Object key, Text value, Context context)
                                throws IOException, InterruptedException {		//입력 파일을 한 줄 읽고

                        StringTokenizer itr = new StringTokenizer(value.toString(), " ", true); 	//단어 단위로 자르기
                        long p = ((LongWritable)key).get();			//현재 위치가 시작지점에서 몇 byte인지 표시
                        while ( itr.hasMoreTokens() ) {			//다음 단어가 없을 때까지 반복
                                String token = itr.nextToken();			//다음 단어 가져오기
                                word.set(token.trim());				//word에 단어를 잘라서 넣음

                                if(! token.equals(" ")){				//단어가 존재하는 경우
                                        pos.set(filename+":"+p);			//value에 파일의 이름과 현재 위치를 저장
                                        context.write(word,pos);			//key와 value를 출력
                                }
														    
																p+=token.length();				//현재 위치에 단어 길이만큼 더해줌
                        }
                }
        }
  • Reduce
public static class IntSumReducer
                        extends Reducer<Text,Text,Text,Text> {

                        private Text list = new Text();

                // key : a disticnt word
                // values :  Iterable type (data list)
                public void reduce(Text key, Iterable<Text> values, Context context)       
                                throws IOException, InterruptedException {

                        String s = new String();
                        int comma = 0;
                        for (Text val : values ) {	// list의 값들을 하나씩 수행
                                if(comma == 0){
                                        comma = 1;
                                        s += (":"+val.toString());	//처음에는 앞에 ,를 붙이지 않음
                                }
                                else
                                        s += (",        "+val.toString());	//두번째 부터 앞에 컴마를 넣어줌
                        }
                        list.set(s);	//value로 사용할 list에 저장
                        context.write(key,list);	//key와 list 저장
                }
        }
  • Main
public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
                if ( otherArgs.length != 2 ) {		//'hadoop jar jarname.jar [프로그램명]'을 제외한 변수 저장
                        System.err.println("Usage: <in> <out>");
                        System.exit(2);
                }

                FileSystem hdfs = FileSystem.get(conf);	
                Path output = new Path(otherArgs[1]);
                if(hdfs.exists(output)) hdfs.delete(output, true);	//output디렉토리 자동 삭제

                Job job = new Job(conf,"inverted count");
                job.setJarByClass(InvertedIndex.class);		//Class 명 설정

                job.setMapperClass(TokenizerMapper.class);		//Map Class 설정
                job.setReducerClass(IntSumReducer.class);		//Reduce Class 설정

                job.setOutputKeyClass(Text.class);			//output key type 설정
                job.setOutputValueClass(Text.class);			//output value type 설정

                job.setNumReduceTasks(2);			//동시에 수행되는 reduce개수 설정

                FileInputFormat.addInputPath(job,new Path(otherArgs[0]));	//입력파일 디렉토리 설정
                FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));	//출력파일 디렉토리 설정
                FileSystem.get(job.getConfiguration()).delete(new Path(otherArgs[1]),true);
									System.exit(job.waitForCompletion(true) ? 0 : 1 ); //실행
        }

Matrix Addition

순서

  1. MatrixAdd.java 파일 작성
  2. Project/src/Driver.java 파일 수정
  3. 실행
$ ant
//matrix data를 hdfs에 넣어야 하기때문에 matadd_test라는 input이 들어갈 dir생성
$ hdfs dfs -mkdir matadd_test
//.txt파일은 input data로써 matadd_test dir에 넣음
$ hdfs dfs -put data/matadd-data-2x2.txt matadd_test

//실행시키고 결과 확인
$ hadoop jar ssafy.jar matadd matadd_test matadd_test_out
$ hdfs dfs -cat matadd_test_out/part-r-00000 | more
$ hdfs dfs -cat matadd_test_out/part-r-00001 | more

MatrixAdd.java

public static class MAddMapper  extends  Mapper<Object, Text, Text , IntWritable>{

     public void map(Object key, Text value, Context context)
	throws IOException, InterruptedException {
		String arr[] = value.toString().split("\t");  
 		Text nextKey = new Text(arr[1]+"\t"+arr[2]);
		IntWritable nextValue = new IntWritable(Integer.parseInt(arr[3]));
		context.write(nextKey, nextValue);
     }
  }
  public static class  MAddReducer  extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text  key, Iterable<IntWritable> values, Context  context) 
	throws IOException, InterruptedException {

		int sum = 0;
		for(IntWritable value : values){
			sum += value.get();
		}
		context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: <in> <out>");
      System.exit(2);
    }
    FileSystem hdfs = FileSystem.get(conf);
    Path output = new Path(otherArgs[1]);
    if (hdfs.exists(output))
            hdfs.delete(output, true);

    Job job = new Job(conf, "matrix addition");
    job.setJarByClass(MatrixAdd.class);
    job.setMapperClass(MAddMapper.class);	
    job.setReducerClass(MAddReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);	
    job.setNumReduceTasks(2);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1)
  }

Matrix Multiplication

  • 행렬 A와 B가 compatible하다 : 행렬A의 열의 개수와 행렬 B의 행의 개수가 일치 ⇒ 곱이 가능하다
  • EX)

2-Phase Matrix Multiplication

  • map함수에서 value의 좌표(p)를 key로 넣어준 것(앞의 Matrix의 경우 key(r,p,c))
  • 뒤 Matirx의 경우 key(p,r,c)

Phase1(Reduce)

  • A Matrix의 key(r,p,c) value(n)과 B Matrix의 key(p,r,c) value(m)을 비교해서 key(r,c) value(n*m)을 출력

Phase2(Map→Reduce)

  • list로 reduce에 input
  • 해당 값의 합으로 output
  • value list에 하나씩 읽어서 더하고 버림 → 1Phase에 비해 main 메모리에서 부담이 줄어듦

장단점

  • reduce phase는 오버헤드가 큼 → 값을 메인메모리에 넣을 수 있다면 1phase가 빠름
  • 2Phase의 장점 : main memory에 대한 요구사항이 적음

코드

  • setup
public static class MMMapper extends Mapper<Object, Text, Text, Text>{
                private Text keypair = new Text();      // 
                private Text valpair= new Text();       //
                private String Matrix1name;
                private String Matrix2name;

                private int n;  //      1번째 matrix의 row갯수
                private int l;	// 1번째 matrix의 column갯수
                private int m;  //    2번째 matrix의 column갯수
 
                protected void setup(Context context) throws IOException, InterruptedException {	//setup함수에서 값을 넣어줌
                        // ------------------------------------------------------
                        Configuration config = context.getConfiguration();
                        Matrix1name = config.get("Matrix1name","A");		//Matrix1name으로 된 String을 받아오는데 없으면 A를 가져옴
                        Matrix2name = config.get("Matrix2name","B");

                        n = config.getInt("n",10);				//n으로 된 int를 받아오는데 없으면 1을 가져옴
                        l = config.getInt("l",10);
                        m = config.getInt("m",10);
                        // ------------------------------------------------------
                }
  • Main
public void map(Object key, Text value, Context context
                                ) throws IOException, InterruptedException {

                        // TODO
                        // ------------------------------------------------------
                        StringTokenizer token = new StringTokenizer(value.toString());
                        String mat = token.nextToken();
                        int row = Integer.parseInt(token.nextToken());
                        int col = Integer.parseInt(token.nextToken());
                        int v = Integer.parseInt(token.nextToken());

                        if(mat.equals(Matrix1name)){		//matrix이름이 1인 경우
                                valpair.set(""+col+" "+v);	//value값에 col의 위치와 v값을 입력
                                for(int j=0; j<m; j++){		//row는 고정, col값을 1칸씩 증가시키며 key, value를 넣어준다
                                        String p = "" + row + "," + j;
                                        keypair.set(p);
                                        context.write(keypair, valpair);
                                }
                        }
                        else if(mat.equals(Matrix2name)){	//2인 경우 row와 v값을 입력
                                valpair.set(""+row+" "+v);
                                for(int i=0; i<n; i++){		//value값은 col은 고정, row를 1개 씩 증가하며 넣어준다
                                        String p = "" + i + "," + col;
                                        keypair.set(p);
                                        context.write(keypari, valpair);
                                }       
                        }
                        // ------------------------------------------------------
                }
  • Reduce

좋은 웹페이지 즐겨찾기