Hadoop 프로 그래 밍 학습 5 - pageRank 알고리즘 구현

24813 단어 Hadoop
PageRank 웹 페이지 순위 알고리즘 은 구 글 이 부자 가 되 는 보물 이 었 습 니 다.특정 웹 페이지 가 검색엔진 색인 에 있 는 다른 웹 페이지 에 비해 중요 도 를 평가 하 는 데 사용 된다.
PageRank 의 디자인 방향 은 다음 과 같다.
  • 초기 화 과정: 원본 문서 의 모든 페이지 줄 끝 에 1.0 을 추가 하여 나타 내 는 PageRank 값 을 1
  • 로 초기 화 합 니 다.
  • 교체 계산 과정: Map 방법 으로 처리 한 후 각 줄 에 저 장 된 데이터 형식 은 pagename \t list_page_name (사용, 분리) \ t pagerank, Map 에서 각 페이지 의 체인 을 나 누 는 rank 값 을 반복 적 으로 계산 합 니 다. Reduce 에서 같은 page 가 Map 에서 얻 은 모든 rank 값 을 더 하면 최종 PageRank
  • 를 얻 을 수 있 습 니 다.
  • 최종 정렬 및 결과 획득 과정: 상기 두 과정 을 거 친 후에 얻 은 것 은 여전히 pagename \t list_page_name (사용, 분리) \ t pagerank 형식의 파일 입 니 다. 중간 list 가 필요 없습니다.page_name, 따라서 이 과정 에서 중간 부분 을 제거 하고 PageRank 값 으로 정렬 합 니 다.

  • PageRank 의 소스 코드 는 다음 과 같 습 니 다 (모두 네 가지 종류 가 있 습 니 다).
    1. PageRank_초기 화: 초기 화
    package org.apache.hadoop.examples;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class PageRank_Initialzation {
    
        public static class Map extends Mapper
        {
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
            {
                String pr="1.0";  //   PageRank 
                context.write(value, new Text(pr));
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            //        
            if (args.length != 2) 
            {
                System.err.println("    ");
                System.exit(2);
            }
    
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://10.102.0.197:9000");
    
            final String OUTPUT_PATH = args[1];  
            Path path = new Path(OUTPUT_PATH);  
    
            //      
            FileSystem fileSystem = path.getFileSystem(conf);
    
            //          
            if (fileSystem.exists(new Path(OUTPUT_PATH))) 
            {  
               fileSystem.delete(new Path(OUTPUT_PATH),true);  
            }  
    
            //     
            Job job = Job.getInstance(conf,"PageRank_Initialzation");
            job.setJarByClass(PageRank_Initialzation.class);
            job.setMapperClass(Map.class);  //       Map 
            job.setOutputKeyClass(Text.class);  //     key   ,Text   String 
            job.setOutputValueClass(Text.class);  //     Value   ,Text   String 
    
            FileInputFormat.addInputPath(job, new Path(args[0]));  //FileInputFormat       (   64M)      ,  split      Mapper  
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.waitForCompletion(true);
    
        }
    
    }
    

    2. PageRankIter: 반복 계산 하기 PageRank
    package org.apache.hadoop.examples;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class PageRankIter {
    
        private static double d = 0.85;  //     
    
        public static class Map extends Mapper<Object,Text,Text,Text>
        {
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
            {
                //value     page_name \t list_page_name( ,  ) \t pagerank
                String page[]=value.toString().split("\t"); 
    
                // page
                String page_name=page[0];
    
                Text prValue = new Text();
    
                //   page   
                if(page.length>2)
                {
                    //page_list       
                    String page_list[]=page[1].split(",");
    
                    double pr = Double.parseDouble(page[2]);
    
                    //  context     page_list  page_name \t pagerank
                    for(String list:page_list)
                    {
                        if (list.isEmpty()) {
                            continue;
                        }
    
                        //   page       pagerank   (   )
                        prValue.set( new Text(String.valueOf(pr / page_list.length)));
    
                        context.write(new Text(list),prValue);
                    }
    
                    //  page_name  | list_page_name( ,  )   context
                    context.write(new Text(page_name), new Text("|"+page[1]));
                }
            }
        }
    
        public static class Reduce extends Reducer<Text,Text,Text,Text>
        {
            public void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException 
            {
                 String list="";  //list_page_name
                 double pr=0;  //pagerank
    
                 //    key   value
                 for(Text val:values)
                 {
                     //   value     page \t page_list     list
                     if(val.toString().startsWith("|"))
                         list+=val.toString().substring(1);
    
                     //  , value     page   page  page       (page_name \t each_page_rank)
                     else
                     {
                         //      page    each_page_rank   
                         pr+=Double.parseDouble(val.toString());
                     }
                 }
    
                 //         pagerank  list       ,       page_name \t list_page_name( ,  ) \t pagerank
                 pr=pr*d+(1-d);
                 String v="";
    
                 v=String.valueOf(pr);
    
                 context.write(key, new Text(list+"\t"+v));
            }
        }
    
    
        //main  
        public static void main(String[] args) throws Exception {
    
            //        
            if (args.length != 2) 
            {
                System.err.println("    ");
                System.exit(2);
            }
    
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://10.102.0.197:9000");
    
            final String OUTPUT_PATH = args[1];  
            Path path = new Path(OUTPUT_PATH);  
    
            //      
            FileSystem fileSystem = path.getFileSystem(conf);
    
            //          
            if (fileSystem.exists(new Path(OUTPUT_PATH))) 
            {  
               fileSystem.delete(new Path(OUTPUT_PATH),true);  
            }  
    
            //     
            Job job = Job.getInstance(conf,"PageRank_Iter");
            job.setJarByClass(PageRankIter.class);
            job.setMapperClass(Map.class);  //       Map 
            job.setReducerClass(Reduce.class);  //       Reduce 
            job.setOutputKeyClass(Text.class);  //     key   ,Text   String 
            job.setOutputValueClass(Text.class);  //     Value   ,Text   String 
    
            FileInputFormat.addInputPath(job, new Path(args[0]));  //FileInputFormat       (   64M)      ,  split      Mapper  
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.waitForCompletion(true);
    
        }
    
    }
    

    3. PageRankViewer: 각 페이지 의 PageRank 값 을 거꾸로 정렬 합 니 다.
    package org.apache.hadoop.examples;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class PageRankViewer {
    
        public static class Map extends Mapper<Object,Text,DoubleWritable,Text>
        {
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
            {
                String line[] =value.toString().split("\t");
    
                DoubleWritable pr= new DoubleWritable();
                pr.set(Double.parseDouble(line[2]));
    
                // page_rank page_name  context
                context.write(pr, new Text(line[0]));
            }
        }
    
        //  Compare       
        public static class DescFloatComparator extends DoubleWritable.Comparator {
    
            public float compare(WritableComparator a, WritableComparable b) {
                return -super.compare(a, b);
            }
    
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                return -super.compare(b1, s1, l1, b2, s2, l2);
            }
        }
    
        public static class Reduce extends Reducer<DoubleWritable,Text,Text,Text>
        {
            public void reduce(DoubleWritable key, Iterable values, Context context)throws IOException, InterruptedException 
            {
                //key pagerank  ,value page_name
    
                //out_key  page_name  ,out_val pagerank
                String out_key="(";
                String out_val="";
    
                for(Text val:values)
                {
                    out_key+=val.toString();
                }
    
                out_val=String.format("%.10f", key.get())+")";
    
                context.write(new Text(out_key),new Text(out_val));
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            //        
            if (args.length != 2) 
            {
                System.err.println("    ");
                System.exit(2);
            }
    
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://10.102.0.197:9000");
            conf.set("mapred.textoutputformat.ignoreseparator", "true");  
            conf.set("mapred.textoutputformat.separator", ",");  
    
            final String OUTPUT_PATH = args[1];  
            Path path = new Path(OUTPUT_PATH);  
            //      
            FileSystem fileSystem = path.getFileSystem(conf);
    
            //          
            if (fileSystem.exists(new Path(OUTPUT_PATH))) 
            {  
               fileSystem.delete(new Path(OUTPUT_PATH),true);  
            }  
    
            //     
            Job job = Job.getInstance(conf,"PageRankViewer");
            job.setJarByClass(PageRankViewer.class);
            job.setMapperClass(Map.class);  //       Map 
            job.setReducerClass(Reduce.class);  //       Reduce 
            job.setSortComparatorClass(DescFloatComparator.class);
    
            job.setMapOutputKeyClass(DoubleWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);  //     key   ,Text   String 
            job.setOutputValueClass(Text.class);  //     Value   ,Text   String 
    
            FileInputFormat.addInputPath(job, new Path(args[0]));  //FileInputFormat       (   64M)      ,  split      Mapper  
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.waitForCompletion(true);
    
        }
    
    }
    

    4. PageRankDriver: 드라이브 클래스, 이 클래스 에서 PageRank 세 단 계 를 실행 하 는 main 방법
    package org.apache.hadoop.examples;
    
    public class PageRankDriver {
    
        //main  
        public static void main(String[] args) throws Exception 
        {
    
            //        
            String[] otherArgs = new String[]{"/Experiment_3","Experiment_3_Hadoop"};
            if (otherArgs.length != 2) 
            {
                System.err.println("    ");
                System.exit(2);
            }
    
            //PageRank_Initialzation
            String temp="temp";
            String[] PR_Ini = { otherArgs[0], temp+"0"};
            PageRank_Initialzation.main(PR_Ini);
    
            //PageRankIter
            String[] temp_PRIter_args = { "", "" };
            int times = 10;
    
            for (int i = 0; i < times; i++) 
            {
                temp_PRIter_args[0] = temp + i;
                temp_PRIter_args[1] = temp + (i + 1);
                PageRankIter.main(temp_PRIter_args);
            }       
    
            //PageRankViewer
            String[] final_PR = { "temp10", otherArgs[1] };
            PageRankViewer.main(final_PR);
    
    
    
        }
    
    }
    

    좋은 웹페이지 즐겨찾기