hadop 테스트 KMeans (1): 원본 인 스 턴 스 실행

새로 배 운 hadop, hello word 급 프로그램 WordCount 를 테스트 했 습 니 다. hadop 으로 집합 분석 을 할 계획 입 니 다. 이번 테스트 는 KMeans, 2 차원 데이터, 구체 적 인 코드 는http://download.csdn.net/detail/tinycui/4384750#comment댓 글 에서 여러분 들 이 문서 에 대해 비교적 적 기 때문에 제 테스트 과정 (위 분포 식) 을 상세 하 게 소개 하여 참고 하 시기 바 랍 니 다.동시에 tinycui 가 제공 한 소스 코드 다운로드 에 감 사 드 립 니 다.
Step1:  eclipse 와 hadop 을 설정 하고 구체 적 으로 인터넷 작업 을 참고 할 수 있 습 니 다.
Step2:  Project -- > Map / Reduce Project 프로젝트 를 새로 만 듭 니 다. KMans 라 고 명명 되 었 습 니 다. Map / Reduce Project 프로젝트 를 선택 하 십시오. 그렇지 않 으 면 코드 를 추가 하면 import 오류 가 발생 할 수 있 습 니 다.
Step 3: tinycui 가 제공 하 는 사이트 주소 로 KMeans 의 소스 코드 를 다운로드 하고 src 와 bin 파일 을 새로 만 든 프로젝트 에 덮어 쓰 고 eclipse 에서 KMeans 프로젝트 를 새로 고 칩 니 다.
Step 4: DFS 에 두 개의 폴 더 center, cluster 를 새로 만 들 고 center 폴 더 에 빈 파일 center 를 업로드 하여 매번 교체 되 는 center 값 을 저장 하고 cluster 폴 더 에 cluster 파일 을 업로드 합 니 다. 이 파일 은 입력 데이터 입 니 다. 데이터 형식 은 (20, 30) (50, 61) (20, 32) (50, 64) (59, 67) (24, 34) (19, 39) (20, 32) (50, 65) (50, 77) (20, 30) (20, 31) (20, 32) (20, 32) 입 니 다.(50,64) (50,67) 
Step 5: main 의 입력 매개 변 수 를 설정 합 니 다. Run -- > Run Configurations 의 Arguments 에 main 의 세 가지 매개 변 수 를 설정 합 니 다. 입력 경로, KMeans 의 중심 경 로 를 저장 하고 출력 경로, 중간 빈 칸 을 분리 합 니 다.
각각
hdfs://192.168.56.171:9000/cluster 
hdfs://192.168.56.171:9000/center 
hdfs://192.168.56.171:9000/ouput
이곳 의 IP 는 자신의 IP 주소 나 localhost 를 입력 할 수 있 습 니 다.
Step 6: 부분 설정 코드 를 수정 하고 구체 적 인 코드 는 다음 과 같이 참고 할 수 있 습 니 다.
주 프로그램 KMeans. java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class KMeans {
	
	public static void main(String[] args) throws Exception
	{
		CenterInitial centerInitial = new CenterInitial();
		centerInitial.run(args);
		int times=0;
		double s = 0,shold = 0.0001;
		do {
			Configuration conf = new Configuration();
			conf.set("fs.default.name", "hdfs://192.168.56.171:9000");
			Job job = new Job(conf,"KMeans");
			job.setJarByClass(KMeans.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			job.setMapperClass(KMapper.class);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);
			job.setReducerClass(KReducer.class);
			FileSystem fs = FileSystem.get(conf);
			fs.delete(new Path(args[2]),true);
			FileInputFormat.addInputPath(job, new Path(args[0]));
			FileOutputFormat.setOutputPath(job, new Path(args[2]));
			job.waitForCompletion(true);
			if(job.waitForCompletion(true))
			{
				NewCenter newCenter = new NewCenter();
				s = newCenter.run(args);
				times++;
			}
		} while(s > shold);
		System.out.println("Iterator: " + times);		
	}

}

센터 초기 화 CenterInitial. java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;


public class CenterInitial {
	
	
	public void run(String[] args) throws IOException
	{
		String[] clist;
		int k = 5;
		String string = "";
		String inpath = args[0]+"/cluster";  //cluster
		String outpath = args[1]+"/center";  //center
		Configuration conf1 = new Configuration(); //  hadoop       
		conf1.set("hadoop.job.ugi", "hadoop,hadoop"); 
		FileSystem fs = FileSystem.get(URI.create(inpath),conf1); //FileSystem     HDFS    ,   URI   HDFS     
		FSDataInputStream in = null; 
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try{ 
         
			in = fs.open( new Path(inpath) ); 
			IOUtils.copyBytes(in,out,50,false);  // Hadoop IOUtils                         
			clist = out.toString().split(" ");
			} finally { 
				IOUtils.closeStream(in);
			}
		
		FileSystem filesystem = FileSystem.get(URI.create(outpath), conf1); 
		
		for(int i=0;i<k;i++)
		{
			int j=(int) (Math.random()*100) % clist.length;
			if(string.contains(clist[j]))  // choose the same one
			{
				k++;
				continue;
			}
			string = string + clist[j].replace(" ", "") + " ";
		}
		OutputStream out2 = filesystem.create(new Path(outpath) ); 
		IOUtils.copyBytes(new ByteArrayInputStream(string.getBytes()), out2, 4096,true); //write string
		System.out.println(string);
	}

}

KMapper.java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class KMapper extends Mapper<LongWritable, Text, Text, Text> {
	
	private String[] center;
	
	protected void setup(Context context) throws IOException,InterruptedException  //read centerlist, and save to center[]
	{
		String centerlist = "hdfs://192.168.56.171:9000/center/center"; //center  
    	Configuration conf1 = new Configuration(); 
    	conf1.set("hadoop.job.ugi", "hadoop-user,hadoop-user"); 
       FileSystem fs = FileSystem.get(URI.create(centerlist),conf1); 
       FSDataInputStream in = null; 
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       try{ 
             
           in = fs.open( new Path(centerlist) ); 
           IOUtils.copyBytes(in,out,100,false);  
           center = out.toString().split(" ");
           }finally{ 
                IOUtils.closeStream(in);
            }
	}
	
	public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
	{
		StringTokenizer itr = new StringTokenizer(value.toString());
		while(itr.hasMoreTokens())
		{
			String outValue = new String(itr.nextToken());
			String[] list = outValue.replace("(", "").replace(")", "").split(",");
			String[] c = center[0].replace("(", "").replace(")", "").split(",");
			float min = 0;
			int pos = 0;
			for(int i=0;i<list.length;i++)
			{
				min += (float) Math.pow((Float.parseFloat(list[i]) - Float.parseFloat(c[i])),2);
			}
			for(int i=0;i<center.length;i++)
			{
				String[] centerStrings = center[i].replace("(", "").replace(")", "").split(",");
				float distance = 0;
				for(int j=0;j<list.length;j++)
					distance += (float) Math.pow((Float.parseFloat(list[j]) - Float.parseFloat(centerStrings[j])),2);
				if(min>distance)
				{
					min=distance;
					pos=i;
				}
			}
			context.write(new Text(center[pos]), new Text(outValue));
		}
	}

}

KReducer.java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class KReducer extends Reducer<Text, Text, Text, Text> {
	
	
	public void reduce(Text key,Iterable<Text> value,Context context) throws IOException,InterruptedException
	{
		String outVal = "";
		int count=0;
		String center="";
		int length = key.toString().replace("(", "").replace(")", "").replace(":", "").split(",").length;
		float[] ave = new float[Float.SIZE*length];
		for(int i=0;i<length;i++)
			ave[i]=0; 
		for(Text val:value)
		{
			outVal += val.toString()+" ";
			String[] tmp = val.toString().replace("(", "").replace(")", "").split(",");
			for(int i=0;i<tmp.length;i++)
				ave[i] += Float.parseFloat(tmp[i]);
			count ++;
		}
		for(int i=0;i<length;i++)
		{
			ave[i]=ave[i]/count;
			if(i==0)
				center += "("+ave[i]+",";
			else {
				if(i==length-1)
					center += ave[i]+")";
				else {
					center += ave[i]+",";
				}
			}
		}
		System.out.println(center);
		context.write(key, new Text(outVal+center));
	}

}

NewCenter.java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;


public class NewCenter {
	
	int k = 3;
	float shold=Integer.MIN_VALUE;
	String[] line;
	String newcenter = new String("");
	
	public float run(String[] args) throws IOException,InterruptedException
	{
		Configuration conf = new Configuration();
		conf.set("hadoop.job.ugi", "hadoop,hadoop"); 
		FileSystem fs = FileSystem.get(URI.create(args[2]+"/part-r-00000"),conf);
		FSDataInputStream in = null;
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try{ 
	         
			in = fs.open( new Path(args[2]+"/part-r-00000")); 
			IOUtils.copyBytes(in,out,50,false);
			line = out.toString().split("
"); } finally { IOUtils.closeStream(in); } System.out.println(out.toString()); for(int i=0;i<k;i++) { String[] l = line[i].replace("\t", " ").split(" "); String[] startCenter = l[0].replace("(", "").replace(")", "").split(","); String[] finalCenter = l[l.length-1].replace("(", "").replace(")", "").split(","); float tmp = 0; for(int j=0;j<startCenter.length;j++) tmp += Math.pow(Float.parseFloat(startCenter[j])-Float.parseFloat(finalCenter[j]), 2); newcenter = newcenter + l[l.length - 1].replace("\t", "") + " "; if(shold <= tmp) shold = tmp; } OutputStream out2 = fs.create(new Path(args[1]+"/center") ); IOUtils.copyBytes(new ByteArrayInputStream(newcenter.getBytes()), out2, 4096,true); System.out.println(newcenter); return shold; } }

출력:
13/05/24 11:20:29 INFO mapred.Task: Task:attempt_local_0004_r_000000_0 is done. And is in the process of commiting
13/05/24 11:20:29 INFO mapred.LocalJobRunner: 
13/05/24 11:20:29 INFO mapred.Task: Task attempt_local_0004_r_000000_0 is allowed to commit now
13/05/24 11:20:29 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0004_r_000000_0' to hdfs://192.168.56.171:9000/ouput
13/05/24 11:20:30 INFO mapred.JobClient:  map 100% reduce 0%
13/05/24 11:20:32 INFO mapred.LocalJobRunner: reduce > reduce
13/05/24 11:20:32 INFO mapred.Task: Task 'attempt_local_0004_r_000000_0' done.
13/05/24 11:20:33 INFO mapred.JobClient:  map 100% reduce 100%
13/05/24 11:20:33 INFO mapred.JobClient: Job complete: job_local_0004
13/05/24 11:20:33 INFO mapred.JobClient: Counters: 22
13/05/24 11:20:33 INFO mapred.JobClient:   File Output Format Counters 
13/05/24 11:20:33 INFO mapred.JobClient:     Bytes Written=230
13/05/24 11:20:33 INFO mapred.JobClient:   FileSystemCounters
13/05/24 11:20:33 INFO mapred.JobClient:     FILE_BYTES_READ=3843
13/05/24 11:20:33 INFO mapred.JobClient:     HDFS_BYTES_READ=2896
13/05/24 11:20:33 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=326968
13/05/24 11:20:33 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=1916
13/05/24 11:20:33 INFO mapred.JobClient:   File Input Format Counters 
13/05/24 11:20:33 INFO mapred.JobClient:     Bytes Read=121
13/05/24 11:20:33 INFO mapred.JobClient:   Map-Reduce Framework
13/05/24 11:20:33 INFO mapred.JobClient:     Map output materialized bytes=469
13/05/24 11:20:33 INFO mapred.JobClient:     Map input records=1
13/05/24 11:20:33 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/05/24 11:20:33 INFO mapred.JobClient:     Spilled Records=30
13/05/24 11:20:33 INFO mapred.JobClient:     Map output bytes=433
13/05/24 11:20:33 INFO mapred.JobClient:     Total committed heap usage (bytes)=352845824
13/05/24 11:20:33 INFO mapred.JobClient:     CPU time spent (ms)=0
13/05/24 11:20:33 INFO mapred.JobClient:     SPLIT_RAW_BYTES=107
13/05/24 11:20:33 INFO mapred.JobClient:     Combine input records=0
13/05/24 11:20:33 INFO mapred.JobClient:     Reduce input records=15
13/05/24 11:20:33 INFO mapred.JobClient:     Reduce input groups=3
13/05/24 11:20:33 INFO mapred.JobClient:     Combine output records=0
13/05/24 11:20:33 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/05/24 11:20:33 INFO mapred.JobClient:     Reduce output records=3
13/05/24 11:20:33 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/05/24 11:20:33 INFO mapred.JobClient:     Map output records=15
13/05/24 11:20:33 INFO mapred.JobClient: Running job: job_local_0004
13/05/24 11:20:33 INFO mapred.JobClient: Job complete: job_local_0004
13/05/24 11:20:33 INFO mapred.JobClient: Counters: 22
13/05/24 11:20:33 INFO mapred.JobClient:   File Output Format Counters 
13/05/24 11:20:33 INFO mapred.JobClient:     Bytes Written=230
13/05/24 11:20:33 INFO mapred.JobClient:   FileSystemCounters
13/05/24 11:20:33 INFO mapred.JobClient:     FILE_BYTES_READ=3843
13/05/24 11:20:33 INFO mapred.JobClient:     HDFS_BYTES_READ=2896
13/05/24 11:20:33 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=326968
13/05/24 11:20:33 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=1916
13/05/24 11:20:33 INFO mapred.JobClient:   File Input Format Counters 
13/05/24 11:20:33 INFO mapred.JobClient:     Bytes Read=121
13/05/24 11:20:33 INFO mapred.JobClient:   Map-Reduce Framework
13/05/24 11:20:33 INFO mapred.JobClient:     Map output materialized bytes=469
13/05/24 11:20:33 INFO mapred.JobClient:     Map input records=1
13/05/24 11:20:33 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/05/24 11:20:33 INFO mapred.JobClient:     Spilled Records=30
13/05/24 11:20:33 INFO mapred.JobClient:     Map output bytes=433
13/05/24 11:20:33 INFO mapred.JobClient:     Total committed heap usage (bytes)=352845824
13/05/24 11:20:33 INFO mapred.JobClient:     CPU time spent (ms)=0
13/05/24 11:20:33 INFO mapred.JobClient:     SPLIT_RAW_BYTES=107
13/05/24 11:20:33 INFO mapred.JobClient:     Combine input records=0
13/05/24 11:20:33 INFO mapred.JobClient:     Reduce input records=15
13/05/24 11:20:33 INFO mapred.JobClient:     Reduce input groups=3
13/05/24 11:20:33 INFO mapred.JobClient:     Combine output records=0
13/05/24 11:20:33 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/05/24 11:20:33 INFO mapred.JobClient:     Reduce output records=3
13/05/24 11:20:33 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/05/24 11:20:33 INFO mapred.JobClient:     Map output records=15
(19.0,39.0)	(19,39) (19.0,39.0)
(20.571428,31.571428)	(20,30) (20,32) (24,34) (20,32) (20,30) (20,31) (20,32) (20.571428,31.571428)
(51.285713,66.42857)	(50,65) (50,77) (50,64) (59,67) (50,67) (50,61) (50,64) (51.285713,66.42857)

(19.0,39.0) (20.571428,31.571428) (51.285713,66.42857) 
Iterator: 4

좋은 웹페이지 즐겨찾기