SpringBoot Integrated Hadoop Series 2 -- MapReduce Weibo Celebrity Statistics
Code:
package com.hadoop.reduce.model;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Weibo statistics model. The three counters are serialized by Hadoop
 * between the map and reduce phases.
 * @author linhaiy
 * @date 2019.05.18
 */
public class Weibo implements WritableComparable<Weibo> {
    // friend (following) count
    private int friends;
    // follower count
    private int followers;
    // post (weibo) count
    private int num;

    public Weibo() {
    }

    public Weibo(int friends, int followers, int num) {
        this.friends = friends;
        this.followers = followers;
        this.num = num;
    }

    public void set(int friends, int followers, int num) {
        this.friends = friends;
        this.followers = followers;
        this.num = num;
    }

    @Override
    public int compareTo(Weibo weibo) {
        // descending order of friends; Integer.compare avoids subtraction overflow
        return Integer.compare(weibo.getFriends(), this.friends);
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(friends);
        output.writeInt(followers);
        output.writeInt(num);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        friends = input.readInt();
        followers = input.readInt();
        num = input.readInt();
    }

    public int getFriends() {
        return friends;
    }

    public void setFriends(int friends) {
        this.friends = friends;
    }

    public int getFollowers() {
        return followers;
    }

    public void setFollowers(int followers) {
        this.followers = followers;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }
}
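The write/readFields pair above defines the binary wire format Hadoop uses to shuttle this model between tasks. The round trip can be exercised with plain java.io streams and no Hadoop dependency; the helper class below is only an illustration of the pattern, not part of the project:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class WeiboRoundTrip {

    // Mirrors Weibo.write(): three ints in a fixed order
    static byte[] serialize(int friends, int followers, int num) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(friends);
            out.writeInt(followers);
            out.writeInt(num);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirrors Weibo.readFields(): read the ints back in the same order
    static int[] deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return new int[] { in.readInt(), in.readInt(), in.readInt() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] fields = deserialize(serialize(24301532, 200, 2391));
        System.out.println(fields[0] + " " + fields[1] + " " + fields[2]); // prints 24301532 200 2391
    }
}
```

Because both sides read and write the fields in the same order, the reduce side reconstructs exactly the counters the map side emitted.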
package com.hadoop.reduce.mapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.hadoop.reduce.model.Weibo;
import java.io.IOException;
/**
 * Weibo statistics mapper: re-keys each record by metric name so the
 * reducer receives all values for one metric together.
 * @author linhaiy
 * @date 2019.05.18
 */
public class WeiboMapper extends Mapper<Text, Weibo, Text, Text> {
    /**
     * Reads records parsed from /java/weibo/weibo.txt, where a line such as
     * "name 24301532 200 2391" arrives here as a (name, Weibo) pair.
     * @param key the user name
     * @param value the parsed Weibo counters
     * @param context Hadoop task context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(Text key, Weibo value, Context context) throws IOException, InterruptedException {
        // Emit one (metric, "name<TAB>count") pair per counter; the incoming values
        // look like [{"friends":22898071,"followers":11,"num":268}...]
        context.write(new Text("friends"), new Text(key.toString() + "\t" + value.getFriends()));
        context.write(new Text("followers"), new Text(key.toString() + "\t" + value.getFollowers()));
        context.write(new Text("num"), new Text(key.toString() + "\t" + value.getNum()));
    }
}
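WeiboInputFormat, which the job configures later, is not listed in this article; judging by the comment above, each input line looks like name&lt;TAB&gt;24301532&lt;TAB&gt;200&lt;TAB&gt;2391, and its record reader turns that into a (name, Weibo) pair. The hypothetical parser below sketches just the line-splitting step in plain Java (the Row record is an illustration, not a project class):

```java
public class WeiboLineParser {

    // Plain stand-in for the (name, Weibo) pair a record reader would emit
    record Row(String name, int friends, int followers, int num) {}

    // Split one tab-separated input line into its four fields
    static Row parse(String line) {
        String[] parts = line.split("\t");
        return new Row(parts[0],
                Integer.parseInt(parts[1]),
                Integer.parseInt(parts[2]),
                Integer.parseInt(parts[3]));
    }
}
```

The field order (friends, followers, num) is assumed to match the Weibo model and the sample values in the mapper comment.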
package com.hadoop.reduce.reducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import com.hadoop.util.SortUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Weibo statistics reducer: for each metric, sorts users by count and
 * writes the ranking to the matching named output.
 * @author linhaiy
 * @date 2019.05.18
 */
public class WeiboReduce extends Reducer<Text, Text, Text, IntWritable> {
    private MultipleOutputs<Text, IntWritable> outputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        outputs = new MultipleOutputs<>(context);
    }

    /**
     * Receives the WeiboMapper output, e.g. key = "friends",
     * value = "name\t627" ...
     * @param key the metric name ("friends", "followers" or "num")
     * @param values "name<TAB>count" strings for that metric
     * @param context Hadoop task context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Collect name -> count for this metric
        Map<String, Integer> map = new HashMap<>();
        for (Text value : values) {
            String[] split = value.toString().split("\t");
            map.put(split[0], Integer.parseInt(split[1]));
        }
        // Sort the entries by count in descending order,
        // e.g. [{"name1": 73343207}, {"name2": 71382446} ...]
        Map.Entry<String, Integer>[] entries = SortUtil.sortHashMapByValue(map);
        for (Map.Entry<String, Integer> entry : entries) {
            // Write to the named output matching the metric, e.g. friends/name 73343207
            outputs.write(key.toString(), new Text(entry.getKey()), new IntWritable(entry.getValue()));
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        outputs.close();
    }
}
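SortUtil.sortHashMapByValue is called above but its source is not part of this article. A minimal version consistent with how the reducer uses it (entries returned as an array, ordered by value descending) might look like the sketch below; the real project class may differ:

```java
import java.util.Arrays;
import java.util.Map;

public class SortUtil {

    /**
     * Return the map's entries as an array sorted by value, largest first.
     */
    @SuppressWarnings("unchecked")
    public static Map.Entry<String, Integer>[] sortHashMapByValue(Map<String, Integer> map) {
        Map.Entry<String, Integer>[] entries = map.entrySet().toArray(new Map.Entry[0]);
        // Descending by value; Integer.compare avoids subtraction overflow
        Arrays.sort(entries, (a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return entries;
    }
}
```

Copying the entries into an array before sorting matters because a HashMap itself has no defined iteration order.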
package com.hadoop.reduce.service;
import java.io.IOException;
import javax.annotation.PostConstruct;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.hadoop.reduce.bean.StaffProvincePartitioner;
import com.hadoop.reduce.bean.WeiboInputFormat;
import com.hadoop.reduce.mapper.CounterMapper;
import com.hadoop.reduce.mapper.FriendsMapper;
import com.hadoop.reduce.mapper.JoinMapper;
import com.hadoop.reduce.mapper.StaffMap;
import com.hadoop.reduce.mapper.WeatherMap;
import com.hadoop.reduce.mapper.WeiboMapper;
import com.hadoop.reduce.mapper.WordCount;
import com.hadoop.reduce.mapper.WordCountMap;
import com.hadoop.reduce.model.GroupSortModel;
import com.hadoop.reduce.model.OrderInfo;
import com.hadoop.reduce.model.StaffModel;
import com.hadoop.reduce.model.Weibo;
import com.hadoop.reduce.reducer.FriendsReduce;
import com.hadoop.reduce.reducer.JoinReduce;
import com.hadoop.reduce.reducer.StaffReduce;
import com.hadoop.reduce.reducer.WeatherReduce;
import com.hadoop.reduce.reducer.WeiboReduce;
import com.hadoop.reduce.reducer.WordCountReduce;
import com.hadoop.util.GroupSort;
/**
 * Map/Reduce job configuration
 * @author linhaiy
 * @date 2019.05.18
 */
@Component
public class ReduceJobsUtils {

    @Value("${hdfs.path}")
    private String path;

    private static String hdfsPath;

    /**
     * Get the HDFS configuration
     * @return Configuration pointing at the configured HDFS instance
     */
    public static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        configuration.set("mapred.job.tracker", hdfsPath);
        // Uncomment to run on a YARN cluster instead of locally
        // configuration.set("mapreduce.framework.name", "yarn");
        // ResourceManager host when submitting the MR job from main() to a cluster
        // configuration.set("yarn.resourcemanager.hostname", "node1");
        return configuration;
    }

    /**
     * Weibo statistics job: ranks users by friends, followers and post count,
     * writing each ranking to its own named output.
     *
     * @param jobName name of the MapReduce job
     * @param inputPath input directory on HDFS
     * @param outputPath output directory on HDFS
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void weibo(String jobName, String inputPath, String outputPath)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConfiguration();
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(Weibo.class);
        // Mapper class
        job.setMapperClass(WeiboMapper.class);
        // Reducer class
        job.setReducerClass(WeiboReduce.class);
        // Mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Final (reducer) output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input path
        FileInputFormat.addInputPath(job, new Path(inputPath));
        // Output path
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        /**
         * Custom input format that parses each line into a (name, Weibo) pair,
         * plus one named output per metric, matched by key in WeiboReduce
         */
        job.setInputFormatClass(WeiboInputFormat.class);
        MultipleOutputs.addNamedOutput(job, "friends", org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class,
                Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "followers", org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class,
                Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "num", org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class,
                Text.class, IntWritable.class);
        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }

    @PostConstruct
    public void getPath() {
        hdfsPath = this.path;
    }

    public static String getHdfsPath() {
        return hdfsPath;
    }
}
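The @Value("${hdfs.path}") injection expects an hdfs.path entry in the Spring Boot configuration. A typical application.properties entry might be the following, where the host and port are assumptions for a local single-node setup:

```properties
# HDFS NameNode address, injected into ReduceJobsUtils as ${hdfs.path}
hdfs.path=hdfs://localhost:9000
```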
package com.hadoop.reduce.service;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.springframework.stereotype.Service;
import com.hadoop.hdfs.service.HdfsService;
/**
 * MapReduce service layer
 * @author linhaiy
 * @date 2019.05.18
 */
@Service
public class MapReduceService {

    // Base directory for reduce output
    private static final String OUTPUT_PATH = "/output";

    /**
     * Run the Weibo statistics job.
     * @param jobName name of the MapReduce job
     * @param inputPath input directory on HDFS
     * @throws Exception
     */
    public void weibo(String jobName, String inputPath) throws Exception {
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return;
        }
        // Output path = /output/<jobName>; remove any previous run's output first
        String outputPath = OUTPUT_PATH + "/" + jobName;
        if (HdfsService.existFile(outputPath)) {
            HdfsService.deleteFile(outputPath);
        }
        ReduceJobsUtils.weibo(jobName, inputPath, outputPath);
    }
}
package com.hadoop.reduce.controller;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import com.hadoop.reduce.service.MapReduceService;
import com.hadoop.util.Result;
/**
 * MapReduce REST controller
 * @author linhaiy
 * @date 2019.05.18
 */
@RestController
@RequestMapping("/hadoop/reduce")
public class MapReduceAction {

    @Autowired
    MapReduceService mapReduceService;

    /**
     * Trigger the Weibo statistics job.
     * @param jobName name of the MapReduce job
     * @param inputPath input directory on HDFS
     * @return submission result
     * @throws Exception
     */
    @RequestMapping(value = "weibo", method = RequestMethod.POST)
    @ResponseBody
    public Result weibo(@RequestParam("jobName") String jobName, @RequestParam("inputPath") String inputPath)
            throws Exception {
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return new Result(Result.FAILURE, "jobName and inputPath must not be empty");
        }
        mapReduceService.weibo(jobName, inputPath);
        return new Result(Result.SUCCESS, "weibo job completed");
    }
}
Licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.