Analyzing the Flink on YARN startup process

This article analyzes the job submission flow of Flink on YARN, using the per-job mode as an example.
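For context, this is roughly the kind of user program that gets packaged into a jar and handed to flink run (a minimal sketch; the class and job names are illustrative, not taken from the article):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExampleJob {
        public static void main(String[] args) throws Exception {
            // the user's main() is what executeProgram eventually invokes
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("flink", "on", "yarn")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .print();

            // builds the job graph and submits it through the ClusterClient
            env.execute("example-job");
        }
    }

Submitting such a jar through the CLI triggers the flow below.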
  • CliFrontend is the entry point for command-line submission.
  • The full call chain for running a job from the command line is:
  • run -> runProgram -> executeProgram -> ClusterClient.run


  • The YARN-related part of this flow lives in the runProgram method:
    // get the active customCommandLine
    final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
    try {
    	runProgram(customCommandLine, commandLine, runOptions, program);
    } finally {
    	program.deleteExtractedLibraries();
    }
    
    private <T> void runProgram(
    			CustomCommandLine<T> customCommandLine,
    			CommandLine commandLine,
    			RunOptions runOptions,
    			PackagedProgram program) throws ProgramInvocationException, FlinkException {
            // create the ClusterDescriptor from the customCommandLine
    		final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
            ...
            // deploy the cluster
    		client = clusterDescriptor.deployJobCluster(
    				clusterSpecification,
    				jobGraph,
    				runOptions.getDetachedMode());
    
            // execute the user program
    		executeProgram(program, client, userParallelism);
    		....
    	}
    

    This shows that cluster deployment is driven by the ClusterDescriptor obtained from customCommandLine.createClusterDescriptor, so let's first look at how the customCommandLine itself is obtained:
    public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) {
    		for (CustomCommandLine<?> cli : customCommandLines) {
    			if (cli.isActive(commandLine)) {
    				return cli;
    			}
    		}
    		throw new IllegalStateException("No command-line ran.");
    	}
    

    It simply iterates over all custom command lines and picks the first active one. Which custom command lines exist, and how they are initialized, is determined by the CliFrontend.loadCustomCommandLines method:
    public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
    		List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);
    
    		//	Command line interface of the YARN session, with a special initialization here
    		//	to prefix all options with y/yarn.
    		//	Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
    		//	      active CustomCommandLine in order and DefaultCLI isActive always return true.
    		final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
    		try {
    			customCommandLines.add(
    				loadCustomCommandLine(flinkYarnSessionCLI,
    					configuration,
    					configurationDirectory,
    					"y",
    					"yarn"));
    		} catch (NoClassDefFoundError | Exception e) {
    			LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
    		}
    
    		customCommandLines.add(new DefaultCLI(configuration));
    
    		return customCommandLines;
    	}
    

    There are in fact two custom command lines: DefaultCLI and FlinkYarnSessionCli. Let's look directly at FlinkYarnSessionCli's isActive method:
    @Override
    	public boolean isActive(CommandLine commandLine) {
    		String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
    		boolean yarnJobManager = ID.equals(jobManagerOption);
    		boolean yarnAppId = commandLine.hasOption(applicationId.getOpt());
    		return yarnJobManager || yarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
    	}
    

    Among these conditions, passing the -m option with the value of ID (private static final String ID = "yarn-cluster";), i.e. flink run -m yarn-cluster ..., is enough to activate it.
    Next, FlinkYarnSessionCli's createClusterDescriptor method ultimately returns a YarnClusterDescriptor object, so we look directly at its deployJobCluster method, which ultimately leads to AbstractYarnClusterDescriptor.startAppMaster.
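    The bridge between the two is simple; the following is a paraphrased sketch of deployJobCluster (simplified, version-dependent details omitted): it just delegates to deployInternal, passing the per-job entrypoint class.

    @Override
    public ClusterClient<ApplicationId> deployJobCluster(
    		ClusterSpecification clusterSpecification,
    		JobGraph jobGraph,
    		boolean detached) throws ClusterDeploymentException {
    	try {
    		// per-job mode: the ApplicationMaster entrypoint is YarnJobClusterEntrypoint
    		return deployInternal(
    			clusterSpecification,
    			"Flink per-job cluster",
    			getYarnJobClusterEntrypoint(),
    			jobGraph,
    			detached);
    	} catch (Exception e) {
    		throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    	}
    }

    deployInternal validates the cluster specification and then calls startAppMaster, shown next: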
    public ApplicationReport startAppMaster(
    			Configuration configuration,
    			String applicationName,
    			String yarnClusterEntrypoint,
    			JobGraph jobGraph,
    			YarnClient yarnClient,
    			YarnClientApplication yarnApplication,
    			ClusterSpecification clusterSpecification) throws Exception {
            ...
    
            // build the launch context for the ApplicationMaster container
    		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
    			yarnClusterEntrypoint,
    			hasLogback,
    			hasLog4j,
    			hasKrb5,
    			clusterSpecification.getMasterMemoryMB());
            
            ...
    
    		// set classpath from YARN configuration
    		Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
    
    		amContainer.setEnvironment(appMasterEnv);
    
    		// Set up resource type requirements for ApplicationMaster
    		Resource capability = Records.newRecord(Resource.class);
    		capability.setMemory(clusterSpecification.getMasterMemoryMB());
    		capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
    
    		final String customApplicationName = customName != null ? customName : applicationName;
    
    		appContext.setApplicationName(customApplicationName);
    		appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
            // set the ApplicationMaster container spec on the application context
    		appContext.setAMContainerSpec(amContainer);
    		appContext.setResource(capability);
    
    		if (yarnQueue != null) {
    			appContext.setQueue(yarnQueue);
    		}
    
    		setApplicationNodeLabel(appContext);
    
    		setApplicationTags(appContext);
    
    		// add a hook to clean up in case deployment fails
    		Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, yarnFilesDir);
    		Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
    		LOG.info("Submitting application master " + appId);
            // submit the application to YARN
    		yarnClient.submitApplication(appContext);
    
            ...
    	}
    

    startAppMaster contains a fair amount of logic, so we focus on the main part: it builds a ContainerLaunchContext and then submits the YARN application with that context as one of the submission parameters. Let's look at how the ContainerLaunchContext is built:
    protected ContainerLaunchContext setupApplicationMasterContainer(
    			String yarnClusterEntrypoint,
    			boolean hasLogback,
    			boolean hasLog4j,
    			boolean hasKrb5,
    			int jobManagerMemoryMb) {
    		// ------------------ Prepare Application Master Container  ------------------------------
    
    		// respect custom JVM options in the YAML file
    		String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
    		if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
    			javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
    		}
    		//applicable only for YarnMiniCluster secure test run
    		//krb5.conf file will be available as local resource in JM/TM container
    		if (hasKrb5) {
    			javaOpts += " -Djava.security.krb5.conf=krb5.conf";
    		}
    
    		// Set up the container launch context for the application master
    		ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
    
    		final  Map<String, String> startCommandValues = new HashMap<>();
    		startCommandValues.put("java", "$JAVA_HOME/bin/java");
    
    		int heapSize = Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration);
    		String jvmHeapMem = String.format("-Xms%sm -Xmx%sm", heapSize, heapSize);
    		startCommandValues.put("jvmmem", jvmHeapMem);
    
    		startCommandValues.put("jvmopts", javaOpts);
    		String logging = "";
    
    		if (hasLogback || hasLog4j) {
    			logging = "-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
    
    			if (hasLogback) {
    				logging += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
    			}
    
    			if (hasLog4j) {
    				logging += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME;
    			}
    		}
    
    		startCommandValues.put("logging", logging);
            // yarnClusterEntrypoint is the main class the ApplicationMaster container will run
    		startCommandValues.put("class", yarnClusterEntrypoint);
    		startCommandValues.put("redirects",
    			"1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
    			"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");
    		startCommandValues.put("args", "");
    
    		final String commandTemplate = flinkConfiguration
    			.getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
    				ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
            // fill in the command template to get the final start command
    		final String amCommand =
    			BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
            
            // set the start command on the AM container
    		amContainer.setCommands(Collections.singletonList(amCommand));
    
    		LOG.debug("Application Master start command: " + amCommand);
    
    		return amContainer;
    	}
    

    Finally, where does yarnClusterEntrypoint come from? There are in fact two classes: YarnSessionClusterEntrypoint (session mode) and YarnJobClusterEntrypoint (per-job mode). So the ApplicationMaster submitted to YARN essentially runs the main method of YarnJobClusterEntrypoint / YarnSessionClusterEntrypoint. Let's look directly at the main method of YarnJobClusterEntrypoint:
    public static void main(String[] args) {
    		// startup checks and logging
    		EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
    		SignalHandler.register(LOG);
    		JvmShutdownSafeguard.installAsShutdownHook(LOG);
    
    		Map<String, String> env = System.getenv();
    
    		final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
    		Preconditions.checkArgument(
    			workingDirectory != null,
    			"Working directory variable (%s) not set",
    			ApplicationConstants.Environment.PWD.key());
    
    		try {
    			YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
    		} catch (IOException e) {
    			LOG.warn("Could not log YARN environment information.", e);
    		}
    
    		Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);
    
    		YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(
    			configuration,
    			workingDirectory);
            // start the cluster entrypoint
    		ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
    	}
    

    From there, ClusterEntrypoint.runClusterEntrypoint eventually ends up in the runCluster method.
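    The hop in between is not shown above. Roughly speaking (a paraphrased and simplified sketch; exact signatures and error handling differ across Flink versions), runClusterEntrypoint calls startCluster, which installs the security context and runs runCluster inside it:

    public void startCluster() throws ClusterEntrypointException {
    	LOG.info("Starting {}.", getClass().getSimpleName());
    	try {
    		configureFileSystems(configuration);
    		// run the actual cluster bootstrap under the configured security context
    		SecurityContext securityContext = installSecurityContext(configuration);
    		securityContext.runSecured((Callable<Void>) () -> {
    			runCluster(configuration);
    			return null;
    		});
    	} catch (Throwable t) {
    		throw new ClusterEntrypointException("Failed to start the cluster entrypoint.", t);
    	}
    }

    runCluster itself is as follows: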
    private void runCluster(Configuration configuration) throws Exception {
    		synchronized (lock) {
    			initializeServices(configuration);
    
    			// write host information into configuration
    			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
    			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
    
                // create the factory for the dispatcher / resource manager component
    			final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
    
                // use the factory to create the cluster component
    			clusterComponent = dispatcherResourceManagerComponentFactory.create(
    				configuration,
    				commonRpcService,
    				haServices,
    				blobServer,
    				heartbeatServices,
    				metricRegistry,
    				archivedExecutionGraphStore,
    				new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
    				this);
    
    			clusterComponent.getShutDownFuture().whenComplete(
    				(ApplicationStatus applicationStatus, Throwable throwable) -> {
    					if (throwable != null) {
    						shutDownAsync(
    							ApplicationStatus.UNKNOWN,
    							ExceptionUtils.stringifyException(throwable),
    							false);
    					} else {
    						// This is the general shutdown path. If a separate more specific shutdown was
    						// already triggered, this will do nothing
    						shutDownAsync(
    							applicationStatus,
    							null,
    							true);
    					}
    				});
    		}
    	}
    
    DispatcherResourceManagerComponentFactory has two concrete subclasses, JobDispatcherResourceManagerComponentFactory (per-job mode) and SessionDispatcherResourceManagerComponentFactory (session mode). The createDispatcherResourceManagerComponentFactory method is implemented by each ClusterEntrypoint subclass, and the real difference between the implementations is the ResourceManagerFactory they build, i.e. the way resources are requested.
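    For example, the per-job YARN entrypoint wires a YARN-specific ResourceManagerFactory into the per-job component factory, roughly like this (a paraphrased sketch; the exact factory and helper names depend on the Flink version):

    // in YarnJobClusterEntrypoint (paraphrased sketch)
    @Override
    protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
    	// per-job component factory + YARN resource management:
    	// the JobGraph is recovered from a file shipped with the application,
    	// and TaskManager containers are requested from the YARN ResourceManager
    	return new JobDispatcherResourceManagerComponentFactory(
    		YarnResourceManagerFactory.getInstance(),
    		FileJobGraphRetriever.createFrom(configuration));
    }

    The session entrypoints do the same with the session component factory and their own ResourceManagerFactory.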
    Next, let's look at how each of these components is built:
    public DispatcherResourceManagerComponent<T> create(
    			Configuration configuration,
    			RpcService rpcService,
    			HighAvailabilityServices highAvailabilityServices,
    			BlobServer blobServer,
    			HeartbeatServices heartbeatServices,
    			MetricRegistry metricRegistry,
    			ArchivedExecutionGraphStore archivedExecutionGraphStore,
    			MetricQueryServiceRetriever metricQueryServiceRetriever,
    			FatalErrorHandler fatalErrorHandler) throws Exception {
    
            // leader retrieval services, used for HA
    		LeaderRetrievalService dispatcherLeaderRetrievalService = null;
    		LeaderRetrievalService resourceManagerRetrievalService = null;
            // the web UI / REST endpoint
    		WebMonitorEndpoint<U> webMonitorEndpoint = null;
            // the resource manager
    		ResourceManager<?> resourceManager = null;
            // the JobManager metric group
    		JobManagerMetricGroup jobManagerMetricGroup = null;
            // the dispatcher that receives submitted jobs
    		T dispatcher = null;
    
    		try {
    			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
    
    			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
    
    			final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
    				rpcService,
    				DispatcherGateway.class,
    				DispatcherId::fromUuid,
    				10,
    				Time.milliseconds(50L));
    
    			final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
    				rpcService,
    				ResourceManagerGateway.class,
    				ResourceManagerId::fromUuid,
    				10,
    				Time.milliseconds(50L));
    
    			final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
    				configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
    				configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
    				"DispatcherRestEndpoint");
    
    			final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
    			final MetricFetcher metricFetcher = updateInterval == 0
    				? VoidMetricFetcher.INSTANCE
    				: MetricFetcherImpl.fromConfiguration(
    					configuration,
    					metricQueryServiceRetriever,
    					dispatcherGatewayRetriever,
    					executor);
    
    			webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
    				configuration,
    				dispatcherGatewayRetriever,
    				resourceManagerGatewayRetriever,
    				blobServer,
    				executor,
    				metricFetcher,
    				highAvailabilityServices.getWebMonitorLeaderElectionService(),
    				fatalErrorHandler);
    
    			log.debug("Starting Dispatcher REST endpoint.");
    			webMonitorEndpoint.start();
    
    			final String hostname = getHostname(rpcService);
    
    			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
    				metricRegistry,
    				hostname,
    				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
                // create the resourceManager
    			resourceManager = resourceManagerFactory.createResourceManager(
    				configuration,
    				ResourceID.generate(),
    				rpcService,
    				highAvailabilityServices,
    				heartbeatServices,
    				metricRegistry,
    				fatalErrorHandler,
    				new ClusterInformation(hostname, blobServer.getPort()),
    				webMonitorEndpoint.getRestBaseUrl(),
    				jobManagerMetricGroup);
    
    			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
    
                // create the dispatcher
    			dispatcher = dispatcherFactory.createDispatcher(
    				configuration,
    				rpcService,
    				highAvailabilityServices,
    				resourceManagerGatewayRetriever,
    				blobServer,
    				heartbeatServices,
    				jobManagerMetricGroup,
    				metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
    				archivedExecutionGraphStore,
    				fatalErrorHandler,
    				historyServerArchivist);
    
    			log.debug("Starting ResourceManager.");
    			resourceManager.start();
    			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
    
    			log.debug("Starting Dispatcher.");
    			dispatcher.start();
    			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
    
    			return createDispatcherResourceManagerComponent(
    				dispatcher,
    				resourceManager,
    				dispatcherLeaderRetrievalService,
    				resourceManagerRetrievalService,
    				webMonitorEndpoint,
    				jobManagerMetricGroup);
    
    		} catch (Exception exception) {
    			// clean up all started components
    			if (dispatcherLeaderRetrievalService != null) {
    				try {
    					dispatcherLeaderRetrievalService.stop();
    				} catch (Exception e) {
    					exception = ExceptionUtils.firstOrSuppressed(e, exception);
    				}
    			}
    
    			if (resourceManagerRetrievalService != null) {
    				try {
    					resourceManagerRetrievalService.stop();
    				} catch (Exception e) {
    					exception = ExceptionUtils.firstOrSuppressed(e, exception);
    				}
    			}
    
    			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
    
    			if (webMonitorEndpoint != null) {
    				terminationFutures.add(webMonitorEndpoint.closeAsync());
    			}
    
    			if (resourceManager != null) {
    				terminationFutures.add(resourceManager.closeAsync());
    			}
    
    			if (dispatcher != null) {
    				terminationFutures.add(dispatcher.closeAsync());
    			}
    
    			final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);
    
    			try {
    				terminationFuture.get();
    			} catch (Exception e) {
    				exception = ExceptionUtils.firstOrSuppressed(e, exception);
    			}
    
    			if (jobManagerMetricGroup != null) {
    				jobManagerMetricGroup.close();
    			}
    
    			throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
    		}
    	}
    

    At this point the cluster has been started. Now let's look at how deployJobCluster builds the client for this cluster:
    protected ClusterClient<ApplicationId> deployInternal(
    			ClusterSpecification clusterSpecification,
    			String applicationName,
    			String yarnClusterEntrypoint,
    			@Nullable JobGraph jobGraph,
    			boolean detached) throws Exception {
    		...
    		// the Flink cluster is deployed in YARN. Represent cluster
    		return createYarnClusterClient(
    			this,
    			validClusterSpecification.getNumberTaskManagers(),
    			validClusterSpecification.getSlotsPerTaskManager(),
    			report,
    			flinkConfiguration,
    			true);
    	}
    
        @Override
    	protected ClusterClient<ApplicationId> createYarnClusterClient(
    			AbstractYarnClusterDescriptor descriptor,
    			int numberTaskManagers,
    			int slotsPerTaskManager,
    			ApplicationReport report,
    			Configuration flinkConfiguration,
    			boolean perJobCluster) throws Exception {
    		return new RestClusterClient<>(
    			flinkConfiguration,
    			report.getApplicationId());
    	}
    
        RestClusterClient(
    			Configuration configuration,
    			@Nullable RestClient restClient,
    			T clusterId,
    			WaitStrategy waitStrategy,
    			@Nullable LeaderRetrievalService webMonitorRetrievalService) throws Exception {
    		super(configuration);
    		this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);
    
    		if (restClient != null) {
    			this.restClient = restClient;
    		} else {
                // create the RestClient
    			this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
    		}
    
    		this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
    		this.clusterId = Preconditions.checkNotNull(clusterId);
    
    		if (webMonitorRetrievalService == null) {
    			this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever();
    		} else {
    			this.webMonitorRetrievalService = webMonitorRetrievalService;
    		}
    		this.dispatcherRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
    		this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
    		startLeaderRetrievers();
    	}
    

    RestClient is essentially an HTTP client built on top of Netty. By now we have both a cluster and a client that can send requests to it. The last step of submitting the job is the call to RestClusterClient.submitJob:
    @Override
    	public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    		log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached());
    
    		final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);
    
    		if (isDetached()) {
    			try {
    				return jobSubmissionFuture.get();
    			} catch (Exception e) {
    				throw new ProgramInvocationException("Could not submit job",
    					jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
    			}
    		} else {
    			final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
    				ignored -> requestJobResult(jobGraph.getJobID()));
    
    			final JobResult jobResult;
    			try {
    				jobResult = jobResultFuture.get();
    			} catch (Exception e) {
    				throw new ProgramInvocationException("Could not retrieve the execution result.",
    					jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
    			}
    
    			try {
    				this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
    				return lastJobExecutionResult;
    			} catch (JobExecutionException e) {
    				throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
    			} catch (IOException | ClassNotFoundException e) {
    				throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
    			}
    		}
    	}
    

    The URL that gets called ends up in the handling logic of JobSubmitHandler, which in turn calls the DispatcherGateway.submitJob method. The logic from there on is similar to what we have already seen:
    @Override
    	protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
    		final Collection<File> uploadedFiles = request.getUploadedFiles();
    		final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
    			File::getName,
    			Path::fromLocalFile
    		));
    
    		if (uploadedFiles.size() != nameToFile.size()) {
    			throw new RestHandlerException(
    				String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
    					uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
    					nameToFile.size(),
    					uploadedFiles.size()),
    				HttpResponseStatus.BAD_REQUEST
    			);
    		}
    
    		final JobSubmitRequestBody requestBody = request.getRequestBody();
    
    		if (requestBody.jobGraphFileName == null) {
    			throw new RestHandlerException(
    				String.format("The %s field must not be omitted or be null.",
    					JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
    				HttpResponseStatus.BAD_REQUEST);
    		}
            // restore the JobGraph from the request body
    		CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
    
    		Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
    
    		Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
    
            // upload the job's jar and artifact files to the blobServer
    		CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
    
            // invoke the dispatcher's submitJob method
    		CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
    
    		return jobSubmissionFuture.thenCombine(jobGraphFuture,
    			(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    	}
    

    A note on Flink on k8s
    From the YARN flow it is easy to map out the corresponding k8s flow:
  • Implement a CustomCommandLine for k8s whose createClusterDescriptor builds a k8s-specific ClusterDescriptor.
  • Implement a k8s ClusterDescriptor used to deploy the cluster.
  • Implement a ResourceManager based on k8s.
    Roughly speaking, the work on k8s amounts to implementing an operator that plays the role of the JobManager / TaskManager.
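    As a purely hypothetical illustration (this is not Flink's actual Kubernetes integration; the class name below is made up), the ClusterDescriptor piece could look something like this:

    import org.apache.flink.client.deployment.ClusterDeploymentException;
    import org.apache.flink.client.deployment.ClusterSpecification;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    // Hypothetical skeleton only. In a real integration this class would implement
    // ClusterDescriptor<String> (string cluster id, e.g. the k8s namespace/deployment name);
    // only the per-job deployment path is sketched here.
    public class KubernetesClusterDescriptor {

    	public ClusterClient<String> deployJobCluster(
    			ClusterSpecification clusterSpecification,
    			JobGraph jobGraph,
    			boolean detached) throws ClusterDeploymentException {
    		// 1. ship the job artifacts (e.g. baked into an image or mounted via a volume/ConfigMap)
    		// 2. create a JobManager Deployment/Pod whose entrypoint plays the role of
    		//    YarnJobClusterEntrypoint (dispatcher + a k8s-based ResourceManager)
    		// 3. expose the REST endpoint through a Service and return a client pointing at it,
    		//    just like the YARN flow returns a RestClusterClient
    		throw new UnsupportedOperationException("illustrative skeleton only");
    	}
    }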
