SpringBoot 통합 Hadoop 시리즈 2--MapReduce 스타 웨이보 통계

10947 단어 빅데이터 개발

코드:

package com.hadoop.reduce.model;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Weibo statistics record holding three counts per account: friends,
 * followers and number of posts. Serialized by Hadoop as three consecutive
 * ints; natural order is descending by {@code friends}.
 *
 * @author linhaiy
 * @date 2019.05.18
 */
public class Weibo implements WritableComparable<Weibo> {
	// count of friends (accounts followed) — meaning inferred from field name
	private int friends;
	// count of followers
	private int followers;
	// count of weibo posts
	private int num;

	public Weibo() {
	}

	public Weibo(int friends, int followers, int num) {
		this.friends = friends;
		this.followers = followers;
		this.num = num;
	}

	/**
	 * Resets all three counters at once (useful for object reuse in MapReduce).
	 */
	public void set(int friends, int followers, int num) {
		this.friends = friends;
		this.followers = followers;
		this.num = num;
	}

	/**
	 * Descending order by friends count. Uses Integer.compare instead of
	 * subtraction to avoid int overflow on extreme values.
	 */
	@Override
	public int compareTo(Weibo weibo) {
		return Integer.compare(weibo.friends, this.friends);
	}

	/**
	 * Writes the three counters in a fixed order; must mirror readFields.
	 */
	@Override
	public void write(DataOutput output) throws IOException {
		output.writeInt(friends);
		output.writeInt(followers);
		output.writeInt(num);
	}

	/**
	 * Reads the three counters in the same order write emits them.
	 */
	@Override
	public void readFields(DataInput input) throws IOException {
		friends = input.readInt();
		followers = input.readInt();
		num = input.readInt();
	}

	public int getFriends() {
		return friends;
	}

	public void setFriends(int friends) {
		this.friends = friends;
	}

	public int getFollowers() {
		return followers;
	}

	public void setFollowers(int followers) {
		this.followers = followers;
	}

	public int getNum() {
		return num;
	}

	public void setNum(int num) {
		this.num = num;
	}
}
package com.hadoop.reduce.mapper;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.hadoop.reduce.model.Weibo;

import java.io.IOException;

/**
 * Mapper that fans each weibo record out into three tagged records, one per
 * metric, so the reducer can sort each metric independently.
 *
 * @author linhaiy
 * @date 2019.05.18
 */
public class WeiboMapper extends Mapper<Text, Weibo, Text, Text> {
	/**
	 * Consumes records produced by the custom input format (key = account
	 * name, value = parsed Weibo counters) and emits one record per metric:
	 * ("friends" | "followers" | "num") -> "name\tcount".
	 *
	 * @param key     account name
	 * @param value   parsed counters for that account
	 * @param context map context
	 * @throws IOException
	 * @throws InterruptedException
	 */
	@Override
	protected void map(Text key, Weibo value, Context context) throws IOException, InterruptedException {
		// Hoist the name; it is reused for all three emitted records.
		String name = key.toString();
		context.write(new Text("friends"), new Text(name + "\t" + value.getFriends()));
		context.write(new Text("followers"), new Text(name + "\t" + value.getFollowers()));
		context.write(new Text("num"), new Text(name + "\t" + value.getNum()));
	}
}
package com.hadoop.reduce.reducer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import com.hadoop.util.SortUtil;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Reducer that groups the tagged mapper records by metric name ("friends",
 * "followers", "num"), sorts each group by count, and routes each group to a
 * same-named output file via MultipleOutputs.
 *
 * @author linhaiy
 * @date 2019.05.18
 */
public class WeiboReduce extends Reducer<Text, Text, Text, IntWritable> {
	private MultipleOutputs<Text, IntWritable> outputs;

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		outputs = new MultipleOutputs<>(context);
	}

	/**
	 * Receives records from WeiboMapper, e.g. key = "friends",
	 * value = "name\t627".
	 *
	 * @param key     metric name: "friends", "followers" or "num"
	 * @param values  "name\tcount" strings for that metric
	 * @param context reduce context
	 * @throws IOException
	 * @throws InterruptedException
	 */
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		// Collect name -> count pairs for this metric.
		Map<String, Integer> counts = new HashMap<>();
		for (Text value : values) {
			// value layout: "name\tcount"
			String[] parts = value.toString().split("\t");
			counts.put(parts[0], Integer.parseInt(parts[1]));
		}

		// Sort entries by count using the project helper.
		Map.Entry<String, Integer>[] entries = SortUtil.sortHashMapByValue(counts);
		for (Map.Entry<String, Integer> entry : entries) {
			// Route to the named output matching the metric. Wrap in
			// Text/IntWritable to match the types registered with
			// MultipleOutputs.addNamedOutput during job setup.
			outputs.write(key.toString(), new Text(entry.getKey()), new IntWritable(entry.getValue()));
		}
	}

	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		// MultipleOutputs must be closed or the named output files lose data.
		outputs.close();
	}
}
package com.hadoop.reduce.service;

import java.io.IOException;

import javax.annotation.PostConstruct;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.hadoop.reduce.bean.StaffProvincePartitioner;
import com.hadoop.reduce.bean.WeiboInputFormat;
import com.hadoop.reduce.mapper.CounterMapper;
import com.hadoop.reduce.mapper.FriendsMapper;
import com.hadoop.reduce.mapper.JoinMapper;
import com.hadoop.reduce.mapper.StaffMap;
import com.hadoop.reduce.mapper.WeatherMap;
import com.hadoop.reduce.mapper.WeiboMapper;
import com.hadoop.reduce.mapper.WordCount;
import com.hadoop.reduce.mapper.WordCountMap;
import com.hadoop.reduce.model.GroupSortModel;
import com.hadoop.reduce.model.OrderInfo;
import com.hadoop.reduce.model.StaffModel;
import com.hadoop.reduce.model.Weibo;
import com.hadoop.reduce.reducer.FriendsReduce;
import com.hadoop.reduce.reducer.JoinReduce;
import com.hadoop.reduce.reducer.StaffReduce;
import com.hadoop.reduce.reducer.WeatherReduce;
import com.hadoop.reduce.reducer.WeiboReduce;
import com.hadoop.reduce.reducer.WordCountReduce;
import com.hadoop.util.GroupSort;

/**
 * Map/Reduce job configuration utilities.
 *
 * @author linhaiy
 * @date 2019.05.18
 */
@Component
public class ReduceJobsUtils {

	// Injected HDFS base URI, e.g. hdfs://host:port
	@Value("${hdfs.path}")
	private String path;

	// Static copy so the static job helpers can read the injected value.
	private static String hdfsPath;

	/**
	 * Builds a Hadoop Configuration pointing at the configured HDFS.
	 *
	 * @return configuration with fs.defaultFS set to the injected HDFS URI
	 */
	public static Configuration getConfiguration() {
		Configuration configuration = new Configuration();
		configuration.set("fs.defaultFS", hdfsPath);
		// NOTE(review): "mapred.job.tracker" normally takes a JobTracker
		// address, not the HDFS URI — kept as-is, but worth confirming.
		configuration.set("mapred.job.tracker", hdfsPath);
		// To run on a yarn cluster instead of locally, also set:
		// configuration.set("mapreduce.framework.name", "yarn");
		// configuration.set("yarn.resourcemanmager.hostname", "node1");
		return configuration;
	}

	/**
	 * Configures and runs the weibo statistics job: custom input format,
	 * WeiboMapper/WeiboReduce, and three named outputs (one per metric).
	 *
	 * @param jobName    job name
	 * @param inputPath  HDFS input path
	 * @param outputPath HDFS output path (must not already exist)
	 * @throws IOException            on job setup failure or if the job
	 *                                completes unsuccessfully
	 * @throws ClassNotFoundException
	 * @throws InterruptedException
	 */
	public static void weibo(String jobName, String inputPath, String outputPath)
			throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = getConfiguration();
		Job job = Job.getInstance(conf, jobName);
		job.setJarByClass(Weibo.class);

		// Mapper and reducer classes.
		job.setMapperClass(WeiboMapper.class);
		job.setReducerClass(WeiboReduce.class);

		// Map output types.
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		// Final (reduce) output types.
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// Input and output paths.
		FileInputFormat.addInputPath(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		// Custom input format parses each line into (name, Weibo) records.
		job.setInputFormatClass(WeiboInputFormat.class);
		// One named output per metric. TextOutputFormat is fully qualified
		// because the mapred-package TextOutputFormat is imported above.
		MultipleOutputs.addNamedOutput(job, "friends", org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class,
				Text.class, IntWritable.class);
		MultipleOutputs.addNamedOutput(job, "followers", org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class,
				Text.class, IntWritable.class);
		MultipleOutputs.addNamedOutput(job, "num", org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class,
				Text.class, IntWritable.class);

		// Run synchronously and surface failure instead of ignoring it:
		// the original discarded this boolean, silently swallowing job errors.
		if (!job.waitForCompletion(true)) {
			throw new IOException("MapReduce job failed: " + jobName);
		}
	}

	// Copy the injected instance value into the static field after
	// construction so the static helpers above can use it.
	@PostConstruct
	public void getPath() {
		hdfsPath = this.path;
	}

	public static String getHdfsPath() {
		return hdfsPath;
	}
}
package com.hadoop.reduce.service;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.springframework.stereotype.Service;
import com.hadoop.hdfs.service.HdfsService;

/**
 * Service layer that prepares and launches MapReduce jobs.
 *
 * @author linhaiy
 * @date 2019.05.18
 */
@Service
public class MapReduceService {

	// Base directory under which every reduce job writes its output.
	private static final String OUTPUT_PATH = "/output";

	/**
	 * Runs the weibo statistics job, clearing any previous output first.
	 *
	 * @param jobName   job name; also used as the output sub-directory
	 * @param inputPath HDFS input path
	 * @throws Exception if the job setup or execution fails
	 */
	public void weibo(String jobName, String inputPath) throws Exception {
		boolean missingArgs = StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath);
		if (missingArgs) {
			return;
		}
		// Output directory layout: /output/<jobName>
		String jobOutput = OUTPUT_PATH + "/" + jobName;
		// Hadoop refuses to start if the output directory already exists,
		// so remove any leftovers from a previous run.
		if (HdfsService.existFile(jobOutput)) {
			HdfsService.deleteFile(jobOutput);
		}
		ReduceJobsUtils.weibo(jobName, inputPath, jobOutput);
	}
}
package com.hadoop.reduce.controller;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import com.hadoop.reduce.service.MapReduceService;
import com.hadoop.util.Result;

/**
 * REST controller exposing MapReduce job endpoints.
 *
 * @author linhaiy
 * @date 2019.05.18
 */
@RestController
@RequestMapping("/hadoop/reduce")
public class MapReduceAction {

	@Autowired
	MapReduceService mapReduceService;

	/**
	 * Triggers the weibo statistics job.
	 *
	 * @param jobName   job name
	 * @param inputPath HDFS input path
	 * @return success/failure result
	 * @throws Exception if the job fails to run
	 */
	@RequestMapping(value = "weibo", method = RequestMethod.POST)
	@ResponseBody
	public Result weibo(@RequestParam("jobName") String jobName, @RequestParam("inputPath") String inputPath)
			throws Exception {
		boolean missingArgs = StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath);
		if (missingArgs) {
			return new Result(Result.FAILURE, "      ");
		}
		mapReduceService.weibo(jobName, inputPath);
		return new Result(Result.SUCCESS, "        ");
	}

}

좋은 웹페이지 즐겨찾기