FileSystem 인스턴스화 프로세스
17090 단어 System
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop000:8020"), configuration);
InputStream in = fileSystem.open(new Path(HDFS_PATH+"/hdfsapi/test/log4j.properties"));
OutputStream out = new FileOutputStream(new File("log4j_download.properties"));
IOUtils.copyBytes(in, out, 4096, true); // /
FileSystem.java
static final Cache CACHE = new Cache();
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme(); //hdfs
String authority = uri.getAuthority(); //hadoop000:8020
return CACHE.get(uri, conf);
}
FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
return getInternal(uri, conf, key);
}
private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
FileSystem fs;
synchronized (this) {
fs = map.get(key);
}
// URI FileSystem , , , createFileSystem
if (fs != null) {
return fs;
}
fs = createFileSystem(uri, conf);
synchronized (this) {
FileSystem oldfs = map.get(key);
... // CACHE
return fs;
}
}
private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf); // :org.apache.hadoop.hdfs.DistributedFileSystem
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf); // DistributedFileSystem
return fs;
}
public static Class<? extends FileSystem> getFileSystemClass(String scheme,Configuration conf) throws IOException {
if (!FILE_SYSTEMS_LOADED) { // , false
loadFileSystems();
}
Class<? extends FileSystem> clazz = null;
if (conf != null) {
clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null); //fs.hdfs.impl , core-default.xml core-site.xml
}
if (clazz == null) {
clazz = SERVICE_FILE_SYSTEMS.get(scheme); //class org.apache.hadoop.hdfs.DistributedFileSystem
}
if (clazz == null) {
throw new IOException("No FileSystem for scheme: " + scheme);
}
return clazz;
}
private static void loadFileSystems() {
synchronized (FileSystem.class) {
if (!FILE_SYSTEMS_LOADED) {
ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
for (FileSystem fs : serviceLoader) {
SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
}
FILE_SYSTEMS_LOADED = true; //
}
}
}
loadFileSystems 이후 SERVICEFILE_SYSTEMS에는 다음과 같은 값이 있습니다.
file=class org.apache.hadoop.fs.LocalFileSystem,
ftp=class org.apache.hadoop.fs.ftp.FTPFileSystem,
hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem,
hftp=class org.apache.hadoop.hdfs.web.HftpFileSystem,
webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem,
s3n=class org.apache.hadoop.fs.s3native.NativeS3FileSystem,
viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem,
swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem,
har=class org.apache.hadoop.fs.HarFileSystem,
s3=class org.apache.hadoop.fs.s3.S3FileSystem,
hsftp=class org.apache.hadoop.hdfs.web.HsftpFileSystem
DistributedFileSystem.java
DFSClient dfs; // : DFSClient
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);
String host = uri.getHost(); //hadoop000
this.dfs = new DFSClient(uri, conf, statistics); this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
this.workingDir = getHomeDirectory();
}
DFSClient.java
final ClientProtocol namenode; // : NameNode PRC
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats)throws IOException {
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,ClientProtocol.class); this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy(); //org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB
}
NameNodeProxies.java
public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface);
return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,UserGroupInformation.getCurrentUser(), true);
}
public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries) throws IOException {
Text dtService = SecurityUtil.buildTokenService(nnAddr);
T proxy;
if (xface == ClientProtocol.class) {
proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries);
} ...
return new ProxyAndInfo<T>(proxy, dtService);
}
private static ClientProtocol createNNProxyWithClientProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,boolean withRetries) throws IOException {
//Client NameNode RPC
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
.getProxy();
if (withRetries) {
// jdk
proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
ClientNamenodeProtocolPB.class,new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
ClientNamenodeProtocolPB.class, proxy),methodNameToPolicyMap,defaultPolicy);
}
return new ClientNamenodeProtocolTranslatorPB(proxy);
}
RetryProxy.java
public static <T> Object create(Class<T> iface,FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
return Proxy.newProxyInstance(
proxyProvider.getInterface().getClassLoader(),
new Class<?>[] { iface },
new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
);
}
FileSystem 인스턴스 소스 분석 요약 정보를 보려면 다음과 같이 하십시오.
1、FileSystem.get은DistributedFileSystem을 반사하여 실례화합니다.
2. Distributed FileSystem에서 new DFSCilent()는 그를 자신의 구성원 변수로 한다.
3. DFSClient 구조 방법에서createProxy를 호출하여 RPC 메커니즘을 사용하여NameNode의 프록시 대상을 얻어NameNode와 통신할 수 있다.
4, 전체 프로세스: FileSystem.get()--> DistributedFileSystem.initialize () --> DFSClient(RPC.getProtocolProxy () --> NameNode의 에이전트.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Bring application to foreground with a keypressFrom Forum Nokia Wiki Inorder to capture the keys while you application under background you've to override CCoeAppUi::H...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.