FileSystem 인스턴스화 프로세스

17090 단어 System
HDFS 예제 코드
Configuration configuration = new Configuration();

FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop000:8020"), configuration);

    

InputStream in = fileSystem.open(new Path(HDFS_PATH+"/hdfsapi/test/log4j.properties"));

OutputStream out = new FileOutputStream(new File("log4j_download.properties"));

IOUtils.copyBytes(in, out, 4096, true); //                  /  

 
FileSystem.java
// Shared cache of FileSystem instances; FileSystem.get(URI, Configuration) returns CACHE.get(uri, conf).
static final Cache CACHE = new Cache();



/**
 * Returns the FileSystem for {@code uri}, obtained from the shared {@code CACHE}.
 *
 * <p>Excerpt: the lines between reading the scheme/authority and the cache lookup are elided in
 * this article, so {@code scheme} and {@code authority} appear unused here. NOTE(review): in the
 * full Hadoop source they are presumably used before the cache lookup (e.g. default-URI
 * handling); confirm against the Hadoop version being analysed.
 *
 * @param uri  file system URI, e.g. hdfs://hadoop000:8020
 * @param conf configuration passed through to the cache
 * @return the FileSystem returned by CACHE.get(uri, conf)
 * @throws IOException propagated from the cache lookup / file system creation
 */
public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();   // "hdfs" for hdfs://hadoop000:8020
    String authority = uri.getAuthority();  // "hadoop000:8020" for hdfs://hadoop000:8020

    return CACHE.get(uri, conf);
}



/**
 * Cache entry point: wraps (uri, conf) in a cache {@code Key} and hands the lookup to getInternal.
 *
 * @param uri  file system URI; passed to both the Key and getInternal
 * @param conf configuration; passed to both the Key and getInternal
 * @return whatever getInternal returns for the derived key
 * @throws IOException propagated from getInternal
 */
FileSystem get(URI uri, Configuration conf) throws IOException {
    return getInternal(uri, conf, new Key(uri, conf));
}



/**
 * Returns the cached FileSystem for {@code key}, creating one on a cache miss.
 *
 * <p>The map is read inside {@code synchronized (this)}, but createFileSystem runs outside the
 * lock, so two threads can both miss and both create an instance. The second synchronized
 * section re-reads the map into {@code oldfs} to detect that race. Its handling is elided in this
 * excerpt ("..."). NOTE(review): presumably an already-registered instance is preferred and the
 * new one is discarded, otherwise {@code fs} is stored in the map; confirm against the Hadoop
 * source.
 *
 * @param uri  file system URI
 * @param conf configuration used to create the file system on a miss
 * @param key  cache key built from (uri, conf) by the caller
 * @return the cached or newly created FileSystem
 * @throws IOException if creating the file system fails
 */
private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
    FileSystem fs;
    synchronized (this) {
        fs = map.get(key);
    }

    // If a FileSystem already exists for this URI/key, return it directly;
    // otherwise fall through and create a new one with createFileSystem.
    if (fs != null) { 
        return fs;
    }

    fs = createFileSystem(uri, conf);
    synchronized (this) { 
        // Re-check under the lock: another thread may have registered an instance meanwhile.
        FileSystem oldfs = map.get(key);
        ... // (elided) put the new instance into the CACHE map
        return fs;
    }
}



/**
 * Creates and initializes a new FileSystem for {@code uri}. The caller is responsible for caching it.
 *
 * <p>The implementation class is resolved from the URI scheme by getFileSystemClass (for "hdfs":
 * org.apache.hadoop.hdfs.DistributedFileSystem). It is instantiated reflectively with
 * ReflectionUtils.newInstance and then initialized with the URI and configuration.
 *
 * <p>If initialize fails, the partially initialized instance is closed before the exception is
 * rethrown, so resources it had already acquired are released. A failure while closing is
 * attached to the original exception as a suppressed exception rather than replacing it.
 *
 * @param uri  file system URI; its scheme selects the implementation
 * @param conf configuration used for class lookup, construction and initialization
 * @return a new, initialized FileSystem
 * @throws IOException if no implementation is registered for the scheme or initialization fails
 */
private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
    // Typed as Class<? extends FileSystem> (getFileSystemClass's return type), so no cast is needed.
    Class<? extends FileSystem> clazz = getFileSystemClass(uri.getScheme(), conf);
    FileSystem fs = ReflectionUtils.newInstance(clazz, conf);
    try {
        fs.initialize(uri, conf); // for "hdfs" this is DistributedFileSystem.initialize
    } catch (IOException | RuntimeException e) {
        // Don't leak a half-initialized instance.
        try {
            fs.close();
        } catch (IOException | RuntimeException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
    return fs;
}



/**
 * Resolves the FileSystem implementation class for a URI scheme.
 *
 * <p>Lookup order:
 * <ol>
 *   <li>the {@code fs.<scheme>.impl} configuration key (e.g. {@code fs.hdfs.impl}), when
 *       {@code conf} is non-null and the key is set;</li>
 *   <li>the ServiceLoader-discovered map SERVICE_FILE_SYSTEMS, which is filled lazily by
 *       loadFileSystems on first use.</li>
 * </ol>
 *
 * @param scheme URI scheme, e.g. "hdfs"
 * @param conf   configuration to consult first; may be null
 * @return the implementation class (for "hdfs": org.apache.hadoop.hdfs.DistributedFileSystem)
 * @throws IOException if neither source knows the scheme
 */
public static Class<? extends FileSystem> getFileSystemClass(String scheme,Configuration conf) throws IOException {
    if (!FILE_SYSTEMS_LOADED) {
        loadFileSystems(); // first call only: populate SERVICE_FILE_SYSTEMS
    }

    // Explicit configuration wins; getClass yields null when fs.<scheme>.impl is not set.
    Class<? extends FileSystem> configured = (conf == null)
        ? null
        : (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
    if (configured != null) {
        return configured;
    }

    // Fall back to implementations registered through ServiceLoader.
    Class<? extends FileSystem> discovered = SERVICE_FILE_SYSTEMS.get(scheme);
    if (discovered == null) {
        throw new IOException("No FileSystem for scheme: " + scheme);
    }
    return discovered;
}





/**
 * Fills SERVICE_FILE_SYSTEMS (scheme to implementation class) from the FileSystem providers
 * registered through java.util.ServiceLoader, once.
 *
 * <p>Runs under the FileSystem class lock and re-checks FILE_SYSTEMS_LOADED inside it, so
 * concurrent first calls from getFileSystemClass perform the loading only once.
 *
 * <p>One broken provider must not keep the others from being registered. A provider is skipped,
 * and loading continues with the next one, when either of these happens:
 * <ul>
 *   <li>it cannot be instantiated (ServiceLoader's iterator reports this as a
 *       ServiceConfigurationError from next());</li>
 *   <li>it does not report a scheme (getScheme() throws UnsupportedOperationException; in Hadoop
 *       this is the base FileSystem behaviour when an implementation does not override it).</li>
 * </ul>
 * Previously either case aborted the whole loop.
 */
private static void loadFileSystems() {
    synchronized (FileSystem.class) {
        if (!FILE_SYSTEMS_LOADED) {
            ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
            java.util.Iterator<FileSystem> providers = serviceLoader.iterator();
            while (providers.hasNext()) {
                FileSystem fs;
                try {
                    fs = providers.next(); // instantiates the next registered provider
                } catch (java.util.ServiceConfigurationError skipped) {
                    // ServiceLoader makes a best effort to continue with the next provider.
                    continue;
                }
                try {
                    SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
                } catch (UnsupportedOperationException skipped) {
                    // No scheme reported: this provider cannot be looked up by scheme, so skip it.
                }
            }
            // Set after the map is filled; getFileSystemClass checks this flag before reading it.
            FILE_SYSTEMS_LOADED = true;
        }
    }
}

loadFileSystems()를 실행한 뒤 SERVICE_FILE_SYSTEMS에는 다음과 같은 값이 들어 있습니다.
file=class org.apache.hadoop.fs.LocalFileSystem, 

ftp=class org.apache.hadoop.fs.ftp.FTPFileSystem, 

hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem, 

hftp=class org.apache.hadoop.hdfs.web.HftpFileSystem, 

webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem, 

s3n=class org.apache.hadoop.fs.s3native.NativeS3FileSystem, 

viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, 

swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, 

har=class org.apache.hadoop.fs.HarFileSystem, 

s3=class org.apache.hadoop.fs.s3.S3FileSystem, 

hsftp=class org.apache.hadoop.hdfs.web.HsftpFileSystem

 
DistributedFileSystem.java
// DFSClient owned by this DistributedFileSystem; assigned in initialize(URI, Configuration)
// via new DFSClient(uri, conf, statistics).
DFSClient dfs;



/**
 * Initializes this DistributedFileSystem for {@code uri}.
 *
 * <p>Steps:
 * <ol>
 *   <li>Rejects URIs without a host, since such a URI does not name a NameNode.</li>
 *   <li>Creates the {@code DFSClient}, which holds the NameNode RPC proxy.</li>
 *   <li>Normalizes {@code this.uri} to {@code scheme://authority}.</li>
 *   <li>Sets the working directory to getHomeDirectory().</li>
 * </ol>
 *
 * @param uri  file system URI, e.g. hdfs://hadoop000:8020
 * @param conf client configuration
 * @throws IOException if the URI has no host or the DFSClient cannot be created
 */
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);

    String host = uri.getHost();  // "hadoop000" for hdfs://hadoop000:8020
    // Previously computed but never checked: fail fast with a clear message for URIs such as
    // hdfs:///path, whose host is null.
    if (host == null) {
        throw new IOException("Incomplete HDFS URI, no host: " + uri);
    }

    this.dfs = new DFSClient(uri, conf, statistics);
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir = getHomeDirectory();
}

 
DFSClient.java
// RPC proxy used to talk to the NameNode; obtained from NameNodeProxies.createProxy in the constructor.
final ClientProtocol namenode;



/**
 * Creates a DFSClient for the NameNode at {@code nameNodeUri}.
 *
 * <p>Excerpt: the body is heavily abridged. As shown, it:
 * <ul>
 *   <li>asks NameNodeProxies.createProxy for a ClientProtocol proxy;</li>
 *   <li>records the proxy's delegation-token service in {@code dtService};</li>
 *   <li>stores the proxy in {@code namenode}.</li>
 * </ul>
 * {@code rpcNamenode} and {@code stats} are not used in the visible lines.
 *
 * <p>NOTE(review): DistributedFileSystem.initialize calls a three-argument
 * {@code DFSClient(uri, conf, statistics)} constructor. Presumably that overload delegates here;
 * confirm against the Hadoop source.
 *
 * @param nameNodeUri NameNode URI, e.g. hdfs://hadoop000:8020
 * @param rpcNamenode pre-built NameNode proxy; unused in this excerpt
 * @param conf        client configuration
 * @param stats       file system statistics; unused in this excerpt
 * @throws IOException if the NameNode proxy cannot be created
 */
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats)throws IOException {

    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,ClientProtocol.class);
    this.dtService = proxyInfo.getDelegationTokenService();
    // Runtime type: org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB
    this.namenode = proxyInfo.getProxy();
}

 
NameNodeProxies.java
/**
 * Creates an RPC proxy implementing {@code xface} for the NameNode at {@code nameNodeUri}.
 *
 * <p>Excerpt: {@code failoverProxyProviderClass} is computed but not used in the visible lines,
 * which always take the non-HA path (createNonHAProxy with retries enabled). NOTE(review): the
 * full Hadoop source presumably uses it to choose between an HA (failover) proxy and the non-HA
 * proxy shown here; confirm.
 *
 * @param conf        client configuration
 * @param nameNodeUri NameNode URI; converted to a socket address by NameNode.getAddress
 * @param xface       protocol interface, e.g. ClientProtocol.class
 * @return the proxy paired with its delegation-token service
 * @throws IOException if the proxy cannot be created
 */
public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
    Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface);
    // Non-HA path, executed as the current user, with retries enabled.
    return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,UserGroupInformation.getCurrentUser(), true);
}



/**
 * Creates a non-HA proxy implementing {@code xface} for the NameNode at {@code nnAddr}.
 *
 * <p>Builds the delegation-token service name from the address, then creates the proxy for the
 * requested protocol. Only the ClientProtocol branch is shown; other protocols are elided ("...").
 *
 * @param conf        client configuration
 * @param nnAddr      NameNode RPC address
 * @param xface       protocol interface to proxy
 * @param ugi         user identity passed down to the RPC layer
 * @param withRetries whether createNNProxyWithClientProtocol wraps the proxy with retry handling
 * @return the proxy paired with its delegation-token service
 * @throws IOException if the proxy cannot be created
 */
public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
    UserGroupInformation ugi, boolean withRetries) throws IOException {
    Text dtService = SecurityUtil.buildTokenService(nnAddr);

    T proxy;
    if (xface == ClientProtocol.class) {
      // Unchecked cast: valid because xface == ClientProtocol.class, i.e. T is ClientProtocol here.
      proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries);
    } ...
    return new ProxyAndInfo<T>(proxy, dtService);
}



/**
 * Builds the ClientProtocol proxy that DFSClient uses to talk to the NameNode.
 *
 * <p>Steps:
 * <ol>
 *   <li>Obtain an RPC proxy for ClientNamenodeProtocolPB via RPC.getProtocolProxy.</li>
 *   <li>If {@code withRetries}, wrap it with RetryProxy.create.</li>
 *   <li>Adapt the result to ClientProtocol with ClientNamenodeProtocolTranslatorPB.</li>
 * </ol>
 *
 * <p>Excerpt: {@code defaultPolicy} and {@code methodNameToPolicyMap} are declared in lines
 * elided from this article.
 *
 * @param address     NameNode RPC address
 * @param conf        client configuration (socket factory, timeout)
 * @param ugi         user identity passed to RPC.getProtocolProxy
 * @param withRetries whether to add the retrying proxy layer
 * @return a ClientNamenodeProtocolTranslatorPB wrapping the (possibly retrying) RPC proxy
 * @throws IOException if the RPC proxy cannot be created
 */
private static ClientProtocol createNNProxyWithClientProtocol(
    InetSocketAddress address, Configuration conf, UserGroupInformation ugi,boolean withRetries) throws IOException {

    // Create the client-side RPC proxy to the NameNode.
    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
            .getProxy();

    if (withRetries) { 
        // Wrap the RPC proxy in a retrying proxy. NOTE(review): this is the 4-argument
        // RetryProxy.create overload (not shown); presumably a JDK dynamic proxy like the
        // 3-argument version that uses Proxy.newProxyInstance.
        proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
          ClientNamenodeProtocolPB.class,new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
              ClientNamenodeProtocolPB.class, proxy),methodNameToPolicyMap,defaultPolicy);
    }
    return new ClientNamenodeProtocolTranslatorPB(proxy);
}

 
RetryProxy.java
/**
 * Returns a JDK dynamic proxy implementing {@code iface}. Every method call on it is dispatched to
 * a RetryInvocationHandler built from {@code proxyProvider} and {@code retryPolicy}.
 *
 * @param iface         interface the returned proxy implements
 * @param proxyProvider provider handed to the invocation handler; the class loader of its
 *                      getInterface() is used to define the proxy class
 * @param retryPolicy   retry policy handed to the invocation handler
 * @return a java.lang.reflect.Proxy instance implementing {@code iface}
 */
public static <T> Object create(Class<T> iface, FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
    ClassLoader loader = proxyProvider.getInterface().getClassLoader();
    Class<?>[] interfaces = { iface };
    RetryInvocationHandler<T> handler = new RetryInvocationHandler<T>(proxyProvider, retryPolicy);
    return Proxy.newProxyInstance(loader, interfaces, handler);
}

 
FileSystem 인스턴스화 과정에 대한 소스 분석을 요약하면 다음과 같습니다.
1. FileSystem.get()은 리플렉션을 통해 DistributedFileSystem을 인스턴스화합니다.
2. DistributedFileSystem은 new DFSClient(...)로 DFSClient를 생성하여 자신의 멤버 변수(dfs)로 보관합니다.
3. DFSClient 생성자는 NameNodeProxies.createProxy()를 호출하여 RPC 메커니즘으로 NameNode의 프록시 객체를 얻고, 이를 통해 NameNode와 통신합니다.
4. 전체 흐름: FileSystem.get() --> DistributedFileSystem.initialize() --> new DFSClient() (RPC.getProtocolProxy()) --> NameNode 프록시.

좋은 웹페이지 즐겨찾기