빅 데이터 - Hadop 학습 노트 09
7624 단어 빅 데이터 학습
mapreduce :map reduce 。 k-v , 。
map NCDC 。 , 。
1. MR
【 mapper】
public class MyMaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private static final int MISSING = 9999;
/**
* mapper
*/
@Override
protected void map(LongWritable key, Text value, Mapper.Context context)
throws IOException, InterruptedException {
//value String
String line = value.toString();
//
String year = line.substring(15, 19);
//
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
//
String quality = line.substring(92, 93);
//
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
【 Reducer 생 성 】
public class MyMaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable values,
Reducer.Context context) throws IOException, InterruptedException {
//
int maxValue = Integer.MIN_VALUE;
//
for(IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
//
context.write(key, new IntWritable(maxValue));
}
}
[앱 실행 작업 생 성]
public class MyMaxTempApp {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("Usage: MaxTemperature );
System.exit(1);
}
Job job = Job.getInstance();
job.setJarByClass(MyMaxTempApp.class);
//
job.setJobName("Max temp");
//
FileInputFormat.addInputPath(job, new Path(args[0]));
//
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// mapper
job.setMapperClass(MyMaxTempMapper.class);
// reducer
job.setReducerClass(MyMaxTempReducer.class);
// key
job.setOutputKeyClass(Text.class);
// value
job.setOutputValueClass(IntWritable.class);
// job
System.out.println(job.waitForCompletion(true) ? 0 : 1);
}
}
31. Job 제출 과정 분석
【 】
map( ) + reduce( )
【 】
1.job.waitForCompletion()
2.submit() cluster,
a)ensureState(JobState.DEFINE)
b)setUseNewAPI() API
c)connect()
d) JobSubmitter
3.submitter.submitJobInternal(Job.this, cluster)
a)checkSpecs(job) ,
b)JobSubmissionFiles.getStagingDir() hdfs
c)InetAddress.getLocalHost() ip
d)submitClient.getNewJobID() id
e)copyAndConfigureFiles() conf
f)writeSplits(job, submitJobDir)
g)conf.setInt(MRJobConfig.NUM_MAPS, maps) map
h)writeConf(conf, submitJobFile) job.xml
i)submitClient.submitJob()
4.submitClient.submitJob()
a)Job job = new Job() LocalJobRunner.Job
5.Job job = new Job()
a) job.xml JobConf
b)this.start() , run()
6.this.start()
a)TaskSplitMetaInfo[] task
b)getMapTaskRunnables() mapper runnable
c)runTasks(mapRunnables, mapService, "map")
d)getReduceTaskRunnables() reduce
e)runTasks(reduceRunnables, reduceService, "reduce")
7.runTasks()
for (Runnable r : runnables) {
service.submit(r);
}
8.LocalJobRunner$Job$MapTaskRunnable
a) MapAttempId
b) MapTask
c) MaoOutFile
d)map.setXXX()
e)map.run()
f)
9.org.apache.hadoop.mapred.MapTask$run()
a)runNewMapper()
10.runNewMapper
a) taskContext
b)taskContext.getMapperClass() Mapper
c) InputFormat
d) split
e) NewOutputCollector context
11.mapper.run(mapperContext)
12.MyMaxTempMapper$run()
setup(context);
try {
while (context.nextKeyValue()) {
map(...)
}
} finally {
cleanup()
}
【 】
,
hadoop jar jarFile classname arg1 arg2 ..
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Cloudera 가상 컴퓨터 입문Apache Hbase: 확장 가능 하고 분포 식 이 며 열 을 향 한 데이터 저장 소 입 니 다.Apache HBase 는 HDFS 에서 위탁 관리 하 는 대형 데이터 세트 에 대한 실시 간 읽 기 / 쓰기 무 작...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.