Hadoop 프로 그래 밍 학습 5 - pageRank 알고리즘 구현
24813 단어 Hadoop
PageRank 의 디자인 방향 은 다음 과 같다.
PageRank 의 소스 코드 는 다음 과 같 습 니 다 (모두 네 가지 종류 가 있 습 니 다).
1. PageRank_초기 화: 초기 화
package org.apache.hadoop.examples;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PageRank_Initialzation {
public static class Map extends Mapper
2. PageRankIter: 반복 계산 하기 PageRank
package org.apache.hadoop.examples;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PageRankIter {
private static double d = 0.85; //
public static class Map extends Mapper<Object,Text,Text,Text>
{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
//value page_name \t list_page_name( , ) \t pagerank
String page[]=value.toString().split("\t");
// page
String page_name=page[0];
Text prValue = new Text();
// page
if(page.length>2)
{
//page_list
String page_list[]=page[1].split(",");
double pr = Double.parseDouble(page[2]);
// context page_list page_name \t pagerank
for(String list:page_list)
{
if (list.isEmpty()) {
continue;
}
// page pagerank ( )
prValue.set( new Text(String.valueOf(pr / page_list.length)));
context.write(new Text(list),prValue);
}
// page_name | list_page_name( , ) context
context.write(new Text(page_name), new Text("|"+page[1]));
}
}
}
public static class Reduce extends Reducer<Text,Text,Text,Text>
{
public void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException
{
String list=""; //list_page_name
double pr=0; //pagerank
// key value
for(Text val:values)
{
// value page \t page_list list
if(val.toString().startsWith("|"))
list+=val.toString().substring(1);
// , value page page page (page_name \t each_page_rank)
else
{
// page each_page_rank
pr+=Double.parseDouble(val.toString());
}
}
// pagerank list , page_name \t list_page_name( , ) \t pagerank
pr=pr*d+(1-d);
String v="";
v=String.valueOf(pr);
context.write(key, new Text(list+"\t"+v));
}
}
//main
public static void main(String[] args) throws Exception {
//
if (args.length != 2)
{
System.err.println(" ");
System.exit(2);
}
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://10.102.0.197:9000");
final String OUTPUT_PATH = args[1];
Path path = new Path(OUTPUT_PATH);
//
FileSystem fileSystem = path.getFileSystem(conf);
//
if (fileSystem.exists(new Path(OUTPUT_PATH)))
{
fileSystem.delete(new Path(OUTPUT_PATH),true);
}
//
Job job = Job.getInstance(conf,"PageRank_Iter");
job.setJarByClass(PageRankIter.class);
job.setMapperClass(Map.class); // Map
job.setReducerClass(Reduce.class); // Reduce
job.setOutputKeyClass(Text.class); // key ,Text String
job.setOutputValueClass(Text.class); // Value ,Text String
FileInputFormat.addInputPath(job, new Path(args[0])); //FileInputFormat ( 64M) , split Mapper
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
3. PageRankViewer: 각 페이지 의 PageRank 값 을 거꾸로 정렬 합 니 다.
package org.apache.hadoop.examples;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PageRankViewer {
public static class Map extends Mapper<Object,Text,DoubleWritable,Text>
{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
String line[] =value.toString().split("\t");
DoubleWritable pr= new DoubleWritable();
pr.set(Double.parseDouble(line[2]));
// page_rank page_name context
context.write(pr, new Text(line[0]));
}
}
// Compare
public static class DescFloatComparator extends DoubleWritable.Comparator {
public float compare(WritableComparator a, WritableComparable b) {
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
public static class Reduce extends Reducer<DoubleWritable,Text,Text,Text>
{
public void reduce(DoubleWritable key, Iterable values, Context context)throws IOException, InterruptedException
{
//key pagerank ,value page_name
//out_key page_name ,out_val pagerank
String out_key="(";
String out_val="";
for(Text val:values)
{
out_key+=val.toString();
}
out_val=String.format("%.10f", key.get())+")";
context.write(new Text(out_key),new Text(out_val));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//
if (args.length != 2)
{
System.err.println(" ");
System.exit(2);
}
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://10.102.0.197:9000");
conf.set("mapred.textoutputformat.ignoreseparator", "true");
conf.set("mapred.textoutputformat.separator", ",");
final String OUTPUT_PATH = args[1];
Path path = new Path(OUTPUT_PATH);
//
FileSystem fileSystem = path.getFileSystem(conf);
//
if (fileSystem.exists(new Path(OUTPUT_PATH)))
{
fileSystem.delete(new Path(OUTPUT_PATH),true);
}
//
Job job = Job.getInstance(conf,"PageRankViewer");
job.setJarByClass(PageRankViewer.class);
job.setMapperClass(Map.class); // Map
job.setReducerClass(Reduce.class); // Reduce
job.setSortComparatorClass(DescFloatComparator.class);
job.setMapOutputKeyClass(DoubleWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class); // key ,Text String
job.setOutputValueClass(Text.class); // Value ,Text String
FileInputFormat.addInputPath(job, new Path(args[0])); //FileInputFormat ( 64M) , split Mapper
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
4. PageRankDriver: 드라이브 클래스, 이 클래스 에서 PageRank 세 단 계 를 실행 하 는 main 방법
package org.apache.hadoop.examples;
public class PageRankDriver {
//main
public static void main(String[] args) throws Exception
{
//
String[] otherArgs = new String[]{"/Experiment_3","Experiment_3_Hadoop"};
if (otherArgs.length != 2)
{
System.err.println(" ");
System.exit(2);
}
//PageRank_Initialzation
String temp="temp";
String[] PR_Ini = { otherArgs[0], temp+"0"};
PageRank_Initialzation.main(PR_Ini);
//PageRankIter
String[] temp_PRIter_args = { "", "" };
int times = 10;
for (int i = 0; i < times; i++)
{
temp_PRIter_args[0] = temp + i;
temp_PRIter_args[1] = temp + (i + 1);
PageRankIter.main(temp_PRIter_args);
}
//PageRankViewer
String[] final_PR = { "temp10", otherArgs[1] };
PageRankViewer.main(final_PR);
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Hadoop 클러스터의 JobHistory Server 상세 정보역사 서버를 통해 이미 실행된 Mapreduce 작업 기록을 볼 수 있습니다. 이렇게 하면 우리는 해당 기계의 19888 포트에서 역사 서버의 WEB UI 인터페이스를 열 수 있다.이미 실행된 작업 상황을 볼 수 있...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.