Hadoop 파 티 셔 너 구성 요소

14634 단어 hadoop빅 데이터
1. Partitioner 구성 요 소 는 맵 이 Key 에 대해 구역 을 나 눌 수 있 고 키 에 따라 서로 다른 reduce 에 나 누 어 처리 할 수 있 습 니 다.2. 데이터 파일 이 서로 다른 성 을 포함 하 는 등 key 의 배포 규칙 을 사용자 정의 할 수 있 습 니 다. 출력 요 구 는 각 성 마다 하나의 파일 을 출력 하 는 것 입 니 다. 3. 기본 적 인 HashPartitioner 가 org. apache. hadop. mapreduce. lib. partition. HashPartitioner. 자바 에 있 습 니 다.
package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

4. 사용자 정의 Partitioner 1) 추상 류 Partitioner 를 계승 하여 사용자 정의 getPartition () 방법 2) job. setPartitionerClass () 를 통 해 사용자 정의 Partitioner 를 org. apache. hadop. mapreduce. Partitioner. java 에 설정 합 니 다.
package org.apache.hadoop.mapreduce;

/** 
 * Partitions the key space.
 * 
 * 

Partitioner controls the partitioning of the keys of the * intermediate map-outputs. The key (or a subset of the key) is used to derive * the partition, typically by a hash function. The total number of partitions * is the same as the number of reduce tasks for the job. Hence this controls * which of the m reduce tasks the intermediate key (and hence the * record) is sent for reduction.

* * @see Reducer */ public abstract class Partitioner { /** * Get the partition number for a given key (hence record) given the total * number of partitions i.e. number of reduce-tasks for the job. * *

Typically a hash function on a all or a subset of the key.

* * @param key the key to be partioned. * @param value the entry value. * @param numPartitions the total number of partitions. * @return the partition number for the key. */ public abstract int getPartition(KEY key, VALUE value, int numPartitions); }

Partitioner 예 Partitioner 응용 상황: 수요: 각 상품 의 주간 판매 상황 site 1 의 주간 판매 목록 을 각각 통계: shoes 20 hat 10 stockings 30 clothes 40
사이트 2 의 주간 판매 목록: shoes 15 hat 1 스타킹 90 clothes 80
집계 결과: shoes 35 hat 11 스타킹 120 옷 120
코드 는 다음 과 같 습 니 다: MyMapper. java
package com.partitioner;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        String[] s = value.toString().split("\\s+") ;
        context.write(new Text(s[0]), new IntWritable(Integer.parseInt(s[1]))) ;
    }

}

MyPartitioner.java
package com.partitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text,IntWritable>{

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if(key.toString().equals("shoes")){
            return 0 ;
        }

        if(key.toString().equals("hat")){
            return 1 ;
        }

        if(key.toString().equals("stockings")){
            return 2 ;
        }

        return 3 ;      
    }

}

MyReducer.java
package com.partitioner;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable value,Context context)
            throws IOException, InterruptedException {
        int sum = 0 ;
        for(IntWritable val : value ){
            sum += val.get() ;
        }
        context.write(key, new IntWritable(sum)) ;
    }

}

TestPartitioner.java
package com.partitioner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestPartitioner {
    public static void main(String args[])throws Exception{
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: wordcount  ");
          System.exit(2);
        }

        Job job = new Job(conf, "word count");
        job.setJarByClass(TestPartitioner.class);
        job.setMapperClass(MyMapper.class);
//      job.setCombinerClass(MyCombiner.class);
        job.setReducerClass(MyReducer.class);
        job.setPartitionerClass(MyPartitioner.class) ;
        job.setNumReduceTasks(4) ;

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);


        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

좋은 웹페이지 즐겨찾기