Hadoop 작은 파일 처리 SequcenceFile 은 작은 파일 을 직접 찾 을 수 있 습 니 다 info

4492 단어 빅 데이터
SequenceFile 개요
SequenceFile 은 Hadoop API 가 제공 하 는 바 이 너 리 파일 지원 으로 사용 이 편리 하고 분할 가능 하 며 압축 가능 한 특징 이 있 습 니 다.
SequenceFile 파일 에서 모든 key - value 는 하나의 기록 (Record) 으로 간주 되 기 때문에 Record 의 압축 정책 을 바탕 으로 SequenceFile 파일 ( SequenceFile.CompressionType ):
  • NONE: records 를 압축 하지 않 습 니 다.
  • RECORD: 모든 record 의 value 값 만 압축 합 니 다. 레코드 압축 률 이 낮 음,
  • BLOCK: 하나의 block 에 있 는 모든 records 를 압축 합 니 다.  일반적으로 BLOCK 압축 을 권장 합 니 다.

  • 사상: sequence file 은 일련의 바 이 너 리 key / value 로 구성 되 어 있 으 며, key 작은 파일 이름, value 가 파일 내용 이 라면 많은 작은 파일 을 큰 파일 로 합 칠 수 있 습 니 다. 이 작은 파일 들 의 위치 정보 에 대해 색인 을 구축 합 니 다.그러나 이런 해결 방안 은 Hadoop 의 또 다른 파일 형식 인 MapFile 파일 과 관련된다.SequenceFile 파일 은 키 - value 데 이 터 를 키 의 특정한 순서에 따라 저장 하 는 것 을 보장 하지 않 으 며 append 작업 도 지원 하지 않 습 니 다.
    코드 사고: 작은 파일 의 경 로 를 추가 하 는 방법 을 정의 합 니 다. 주어진 경 로 는 폴 더 이 고 폴 더 를 옮 겨 다 니 며 하위 폴 더 의 파일 을 모두 small FilePaths 에 넣 고 주어진 경 로 는 파일 이 며, 파일 의 경 로 를 small FilePaths 에 넣 고, small FilePaths 의 작은 문 서 를 옮 겨 다 니 며 읽 은 다음 합 친 sequencefile 용기 에 넣 고 작은 파일 을 옮 겨 다 니 며 읽 습 니 다.sequencefile 에 하나씩 기록 하고 파일 경 로 를 key 로 하 며 파일 내용 을 value 로 하여 sequencefile 에 넣 고 큰 파일 의 작은 파일 을 읽 습 니 다. 
    package com.lj.small;
     
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileReader;
    import java.io.IOException;
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.List;
     
    import org.apache.commons.codec.digest.DigestUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.Reader;
    import org.apache.hadoop.io.SequenceFile.Writer;
    import org.apache.hadoop.io.Text;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
     
    public class MergeSmallFilesToSequenceFile {
    	private static Logger logger = LoggerFactory.getLogger(MergeSmallFilesToSequenceFile.class);
    	private Configuration configuration = new Configuration();
    	private List smallFilePaths = new ArrayList();
    	
    	//              
    	public void addInputPath(String inputPath) throws Exception{
    		File file = new File(inputPath);
    		//        ,      ,            smallFilePaths
    		//       ,         smallFilePaths
    		if(file.isDirectory()){
    			File[] files = FileUtil.listFiles(file);
    			for(File sFile:files){
    				smallFilePaths.add(sFile.getPath());
    				logger.info("       :" + sFile.getPath());
    			}
    		}else{
    			smallFilePaths.add(file.getPath());
    			logger.info("       :" + file.getPath());
    		}
    	}
    	// smallFilePaths        ,       sequencefile   
    	public void mergeFile() throws Exception{
    		Writer.Option bigFile = Writer.file(new Path("/bigfile.seq"));
    		Writer.Option keyClass = Writer.keyClass(Text.class);
    		Writer.Option valueClass = Writer.valueClass(BytesWritable.class);
    		//  writer
    		Writer writer = SequenceFile.createWriter(configuration, bigFile, keyClass, valueClass);
    		//       ,    sequencefile
    		Text key = new Text();
    		for(String path:smallFilePaths){
    			File file = new File(path);
    			long fileSize = file.length();//          
    			byte[] fileContent = new byte[(int)fileSize];
    			FileInputStream inputStream = new FileInputStream(file);
    			inputStream.read(fileContent, 0, (int)fileSize);//           fileContent      
    			String md5Str = DigestUtils.md5Hex(fileContent);
    			logger.info("merge   :"+path+",md5:"+md5Str);
    			key.set(path);
    			//       key,      value,   sequencefile 
    			writer.append(key, new BytesWritable(fileContent));
    		}
    		writer.hflush();
    		writer.close();
    	}
    	//          
    	public void readMergedFile() throws Exception{
    		Reader.Option file = Reader.file(new Path("/bigfile.seq"));
    		Reader reader = new Reader(configuration, file);
    		Text key = new Text();
    		BytesWritable value = new BytesWritable();
    		while(reader.next(key, value)){
    			byte[] bytes = value.copyBytes();
    			String md5 = DigestUtils.md5Hex(bytes);
    			String content = new String(bytes, Charset.forName("GBK"));
    			logger.info("     :"+key+",md5:"+md5+",content:"+content);
    		}
    	}
    	
    	public static void main(String[] args) throws Exception {
    		MergeSmallFilesToSequenceFile msf = new MergeSmallFilesToSequenceFile();
    		//     
    //		msf.addInputPath("D:\\smallfiles");
    //		msf.mergeFile();
    		//     
    		msf.readMergedFile();
    	}
    }

    좋은 웹페이지 즐겨찾기