Analysis of the Flink on YARN startup process
The YARN-related steps are in the runProgram method.
// get the active customCommandLine
final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
try {
runProgram(customCommandLine, commandLine, runOptions, program);
} finally {
program.deleteExtractedLibraries();
}
private <T> void runProgram(
CustomCommandLine<T> customCommandLine,
CommandLine commandLine,
RunOptions runOptions,
PackagedProgram program) throws ProgramInvocationException, FlinkException {
// create the ClusterDescriptor from the customCommandLine
final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
...
// deploy the job cluster and obtain a client for it
client = clusterDescriptor.deployJobCluster(
clusterSpecification,
jobGraph,
runOptions.getDetachedMode());
// execute the program through that client
executeProgram(program, client, userParallelism);
...
}
The cluster is deployed through the ClusterDescriptor returned by customCommandLine.createClusterDescriptor, so let's first look at how the customCommandLine is obtained.
public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) {
for (CustomCommandLine<?> cli : customCommandLines) {
if (cli.isActive(commandLine)) {
return cli;
}
}
throw new IllegalStateException("No command-line ran.");
}
It iterates over all custom command lines and picks the first one that is active. Which custom command lines exist, and how they are initialized, is decided in the CliFrontend.loadCustomCommandLines method.
public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);
// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
// active CustomCommandLine in order and DefaultCLI isActive always return true.
final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
try {
customCommandLines.add(
loadCustomCommandLine(flinkYarnSessionCLI,
configuration,
configurationDirectory,
"y",
"yarn"));
} catch (NoClassDefFoundError | Exception e) {
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
customCommandLines.add(new DefaultCLI(configuration));
return customCommandLines;
}
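The ordering rule in the comment above (specific CLI first, catch-all last) can be illustrated with a minimal sketch. Cli, SimpleCli, and CliSelection are hypothetical stand-ins written for this example, not Flink classes, and the option set is a simplification of the real CommandLine object.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for CustomCommandLine; not Flink's real interface.
interface Cli {
    String name();
    boolean isActive(Set<String> options);
}

// Active when requiredOption is present; always active when requiredOption is null.
final class SimpleCli implements Cli {
    private final String name;
    private final String requiredOption;

    SimpleCli(String name, String requiredOption) {
        this.name = name;
        this.requiredOption = requiredOption;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public boolean isActive(Set<String> options) {
        return requiredOption == null || options.contains(requiredOption);
    }
}

final class CliSelection {
    // Specific CLI first, catch-all last, mirroring the order in loadCustomCommandLines.
    static List<Cli> load() {
        return Arrays.<Cli>asList(
                new SimpleCli("yarn", "yarn-cluster"),
                new SimpleCli("default", null));
    }

    // Same loop shape as getActiveCustomCommandLine: the first active entry wins.
    static Cli firstActive(List<Cli> clis, Set<String> options) {
        for (Cli cli : clis) {
            if (cli.isActive(options)) {
                return cli;
            }
        }
        throw new IllegalStateException("No command-line ran.");
    }
}
```

If the catch-all entry were placed first, it would always win and the YARN entry could never be selected, which is why the original comment insists that DefaultCLI is added last.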
So there are actually two CustomCommandLine implementations: DefaultCLI and FlinkYarnSessionCli. Let's go straight to the isActive method of FlinkYarnSessionCli.
@Override
public boolean isActive(CommandLine commandLine) {
String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
boolean yarnJobManager = ID.equals(jobManagerOption);
boolean yarnAppId = commandLine.hasOption(applicationId.getOpt());
return yarnJobManager || yarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
}
One way to satisfy this check is to pass the -m option with the value of ID:
private static final String ID = "yarn-cluster";
Next, look at the createClusterDescriptor method of FlinkYarnSessionCli, which ends up returning a YarnClusterDescriptor object. So we go straight to its deployJobCluster method, which eventually calls AbstractYarnClusterDescriptor.startAppMaster.
public ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification) throws Exception {
...
// build the launch context for the ApplicationMaster container
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
yarnClusterEntrypoint,
hasLogback,
hasLog4j,
hasKrb5,
clusterSpecification.getMasterMemoryMB());
...
// set classpath from YARN configuration
Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(clusterSpecification.getMasterMemoryMB());
capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
final String customApplicationName = customName != null ? customName : applicationName;
appContext.setApplicationName(customApplicationName);
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
// attach the ApplicationMaster container spec
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
setApplicationNodeLabel(appContext);
setApplicationTags(appContext);
// add a hook to clean up in case deployment fails
Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, yarnFilesDir);
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master " + appId);
// submit the application to YARN
yarnClient.submitApplication(appContext);
...
}
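The deployment failure hook registered above follows a common pattern: register a cleanup thread before the risky step, and unregister it once the step has succeeded. The sketch below uses only the JDK; DeploymentHookSketch and its parameters are hypothetical, and the removal after success is an assumption about the elided part of startAppMaster rather than something shown in the excerpt.

```java
final class DeploymentHookSketch {
    // Registers a cleanup hook, runs the submission, then removes the hook again.
    // Returns true if the hook was still registered and has been removed.
    static boolean deployWithFailureHook(Runnable cleanup, Runnable submit) {
        Thread failureHook = new Thread(cleanup, "deployment-failure-hook");
        Runtime.getRuntime().addShutdownHook(failureHook);

        // If this throws, the hook stays registered and runs when the JVM exits.
        submit.run();

        // Submission succeeded, so the cleanup must not run at JVM exit.
        return Runtime.getRuntime().removeShutdownHook(failureHook);
    }
}
```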
startAppMaster is also a long method, so we only look at the main part: it builds a ContainerLaunchContext and then submits the YARN application with that ContainerLaunchContext as one of the submission parameters. Let's look at the method that builds the ContainerLaunchContext.
protected ContainerLaunchContext setupApplicationMasterContainer(
String yarnClusterEntrypoint,
boolean hasLogback,
boolean hasLog4j,
boolean hasKrb5,
int jobManagerMemoryMb) {
// ------------------ Prepare Application Master Container ------------------------------
// respect custom JVM options in the YAML file
String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
}
//applicable only for YarnMiniCluster secure test run
//krb5.conf file will be available as local resource in JM/TM container
if (hasKrb5) {
javaOpts += " -Djava.security.krb5.conf=krb5.conf";
}
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
final Map<String, String> startCommandValues = new HashMap<>();
startCommandValues.put("java", "$JAVA_HOME/bin/java");
int heapSize = Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration);
String jvmHeapMem = String.format("-Xms%sm -Xmx%sm", heapSize, heapSize);
startCommandValues.put("jvmmem", jvmHeapMem);
startCommandValues.put("jvmopts", javaOpts);
String logging = "";
if (hasLogback || hasLog4j) {
logging = "-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
if (hasLogback) {
logging += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
}
if (hasLog4j) {
logging += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME;
}
}
startCommandValues.put("logging", logging);
// yarnClusterEntrypoint is the main class started inside the container
startCommandValues.put("class", yarnClusterEntrypoint);
startCommandValues.put("redirects",
"1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");
startCommandValues.put("args", "");
final String commandTemplate = flinkConfiguration
.getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
// build the start command from the template
final String amCommand =
BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
// set the start command on the container
amContainer.setCommands(Collections.singletonList(amCommand));
LOG.debug("Application Master start command: " + amCommand);
return amContainer;
}
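The method above fills a map of named values and hands it, together with a template, to BootstrapTools.getStartCommand. A minimal sketch of that kind of placeholder substitution is shown here. StartCommandSketch is hypothetical, the %key% placeholder syntax and the template string in the usage note are assumptions for illustration, and the whitespace handling is this sketch's own choice rather than a copy of Flink's.

```java
import java.util.Map;

final class StartCommandSketch {
    // Replaces every %key% placeholder with its value, then collapses the
    // extra whitespace left behind by empty values.
    static String render(String template, Map<String, String> values) {
        String command = template;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            command = command.replace("%" + entry.getKey() + "%", entry.getValue());
        }
        return command.replaceAll("\\s+", " ").trim();
    }
}
```

With a template such as "%java% %jvmmem% %jvmopts% %class%" and an empty jvmopts value, the result is a single command line in which the entrypoint class directly follows the heap settings.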
Finally, let's look at where yarnClusterEntrypoint comes from. It is one of two classes: YarnSessionClusterEntrypoint (session mode) or YarnJobClusterEntrypoint (job mode). So the AppMaster submitted to YARN in effect runs the main method of YarnJobClusterEntrypoint or YarnSessionClusterEntrypoint. Let's look at the main method of YarnJobClusterEntrypoint.
public static void main(String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
Map<String, String> env = System.getenv();
final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
Preconditions.checkArgument(
workingDirectory != null,
"Working directory variable (%s) not set",
ApplicationConstants.Environment.PWD.key());
try {
YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
} catch (IOException e) {
LOG.warn("Could not log YARN environment information.", e);
}
Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);
YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(
configuration,
workingDirectory);
// start the cluster entrypoint
ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
}
This eventually calls the ClusterEntrypoint.runCluster method.
private void runCluster(Configuration configuration) throws Exception {
synchronized (lock) {
initializeServices(configuration);
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
// create the component factory
final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
// use the factory to create the cluster components
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);
clusterComponent.getShutDownFuture().whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown path. If a separate more specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
applicationStatus,
null,
true);
}
});
}
}
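The whenComplete callback at the end of runCluster maps the outcome of the shutdown future to a shutdown status: an exception becomes UNKNOWN, otherwise the reported status is used. A minimal JDK-only sketch of that wiring follows; ShutdownWiringSketch and its Status enum are hypothetical simplifications of ApplicationStatus and shutDownAsync.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class ShutdownWiringSketch {
    // Simplified stand-in for ApplicationStatus.
    enum Status { SUCCEEDED, FAILED, UNKNOWN }

    // Returns a holder that receives the status the entrypoint would shut down with.
    static AtomicReference<Status> wire(CompletableFuture<Status> shutDownFuture) {
        AtomicReference<Status> shutdownStatus = new AtomicReference<>();
        shutDownFuture.whenComplete((status, throwable) -> {
            if (throwable != null) {
                // The component failed, so the final state is not known.
                shutdownStatus.set(Status.UNKNOWN);
            } else {
                // Regular shutdown path: pass the reported status through.
                shutdownStatus.set(status);
            }
        });
        return shutdownStatus;
    }
}
```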
DispatcherResourceManagerComponentFactory has two concrete subclasses, JobDispatcherResourceManagerComponentFactory and SessionDispatcherResourceManagerComponentFactory, which represent job mode and session mode respectively. The createDispatcherResourceManagerComponentFactory method is implemented in each ClusterEntrypoint subclass, and the real difference between them is the ResourceManagerFactory they construct, that is, the way resources are requested. Next, let's look at how the individual components are built.
public DispatcherResourceManagerComponent<T> create(
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler) throws Exception {
// leader retrieval services, used for HA
LeaderRetrievalService dispatcherLeaderRetrievalService = null;
LeaderRetrievalService resourceManagerRetrievalService = null;
// REST endpoint that serves the web UI
WebMonitorEndpoint<U> webMonitorEndpoint = null;
// resource manager
ResourceManager<?> resourceManager = null;
// metrics
JobManagerMetricGroup jobManagerMetricGroup = null;
// dispatcher
T dispatcher = null;
try {
dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
DispatcherGateway.class,
DispatcherId::fromUuid,
10,
Time.milliseconds(50L));
final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
ResourceManagerGateway.class,
ResourceManagerId::fromUuid,
10,
Time.milliseconds(50L));
final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");
final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher = updateInterval == 0
? VoidMetricFetcher.INSTANCE
: MetricFetcherImpl.fromConfiguration(
configuration,
metricQueryServiceRetriever,
dispatcherGatewayRetriever,
executor);
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getWebMonitorLeaderElectionService(),
fatalErrorHandler);
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
final String hostname = getHostname(rpcService);
jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
metricRegistry,
hostname,
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
// create the resourceManager
resourceManager = resourceManagerFactory.createResourceManager(
configuration,
ResourceID.generate(),
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
jobManagerMetricGroup);
final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
// create the dispatcher
dispatcher = dispatcherFactory.createDispatcher(
configuration,
rpcService,
highAvailabilityServices,
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
archivedExecutionGraphStore,
fatalErrorHandler,
historyServerArchivist);
log.debug("Starting ResourceManager.");
resourceManager.start();
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
log.debug("Starting Dispatcher.");
dispatcher.start();
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
return createDispatcherResourceManagerComponent(
dispatcher,
resourceManager,
dispatcherLeaderRetrievalService,
resourceManagerRetrievalService,
webMonitorEndpoint,
jobManagerMetricGroup);
} catch (Exception exception) {
// clean up all started components
if (dispatcherLeaderRetrievalService != null) {
try {
dispatcherLeaderRetrievalService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
}
if (resourceManagerRetrievalService != null) {
try {
resourceManagerRetrievalService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
}
final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
if (webMonitorEndpoint != null) {
terminationFutures.add(webMonitorEndpoint.closeAsync());
}
if (resourceManager != null) {
terminationFutures.add(resourceManager.closeAsync());
}
if (dispatcher != null) {
terminationFutures.add(dispatcher.closeAsync());
}
final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);
try {
terminationFuture.get();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
if (jobManagerMetricGroup != null) {
jobManagerMetricGroup.close();
}
throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
}
}
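The catch block of create() closes whatever was already brought up and keeps the first exception as the primary one, with later cleanup failures attached as suppressed. The sketch below shows the same idea with plain JDK types. Component, NamedComponent, and StartupCleanupSketch are hypothetical, and the sketch closes components synchronously instead of collecting termination futures as the original does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a startable cluster component.
interface Component {
    void start() throws Exception;
    void close() throws Exception;
}

// Records its lifecycle calls in a shared log; optionally fails on start.
final class NamedComponent implements Component {
    private final String name;
    private final boolean failOnStart;
    private final List<String> log;

    NamedComponent(String name, boolean failOnStart, List<String> log) {
        this.name = name;
        this.failOnStart = failOnStart;
        this.log = log;
    }

    @Override
    public void start() throws Exception {
        if (failOnStart) {
            throw new Exception(name + " failed");
        }
        log.add("start " + name);
    }

    @Override
    public void close() {
        log.add("close " + name);
    }
}

final class StartupCleanupSketch {
    // Starts components in order. If one fails, closes those already started,
    // attaches cleanup failures as suppressed, and rethrows a wrapped exception.
    static void startAll(List<Component> components) throws Exception {
        List<Component> started = new ArrayList<>();
        try {
            for (Component component : components) {
                component.start();
                started.add(component);
            }
        } catch (Exception startFailure) {
            for (Component component : started) {
                try {
                    component.close();
                } catch (Exception closeFailure) {
                    startFailure.addSuppressed(closeFailure);
                }
            }
            throw new Exception("Could not start all components.", startFailure);
        }
    }
}
```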
At this point the cluster has been started. Now let's see how the deployJobCluster method builds the cluster client.
protected ClusterClient<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached) throws Exception {
...
// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(
this,
validClusterSpecification.getNumberTaskManagers(),
validClusterSpecification.getSlotsPerTaskManager(),
report,
flinkConfiguration,
true);
}
@Override
protected ClusterClient<ApplicationId> createYarnClusterClient(
AbstractYarnClusterDescriptor descriptor,
int numberTaskManagers,
int slotsPerTaskManager,
ApplicationReport report,
Configuration flinkConfiguration,
boolean perJobCluster) throws Exception {
return new RestClusterClient<>(
flinkConfiguration,
report.getApplicationId());
}
RestClusterClient(
Configuration configuration,
@Nullable RestClient restClient,
T clusterId,
WaitStrategy waitStrategy,
@Nullable LeaderRetrievalService webMonitorRetrievalService) throws Exception {
super(configuration);
this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);
if (restClient != null) {
this.restClient = restClient;
} else {
// no RestClient was supplied, so create one
this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
}
this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
this.clusterId = Preconditions.checkNotNull(clusterId);
if (webMonitorRetrievalService == null) {
this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever();
} else {
this.webMonitorRetrievalService = webMonitorRetrievalService;
}
this.dispatcherRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
startLeaderRetrievers();
}
RestClient is essentially a Netty-based HTTP client. At this point we have a cluster and a client that can send requests to it. Next comes job submission, which ends up calling the RestClusterClient.submitJob method.
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached());
final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);
if (isDetached()) {
try {
return jobSubmissionFuture.get();
} catch (Exception e) {
throw new ProgramInvocationException("Could not submit job",
jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
}
} else {
final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
ignored -> requestJobResult(jobGraph.getJobID()));
final JobResult jobResult;
try {
jobResult = jobResultFuture.get();
} catch (Exception e) {
throw new ProgramInvocationException("Could not retrieve the execution result.",
jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
}
try {
this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
return lastJobExecutionResult;
} catch (JobExecutionException e) {
throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
} catch (IOException | ClassNotFoundException e) {
throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
}
}
}
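The two branches of submitJob differ only in what the caller waits for: in detached mode it returns once the submission is acknowledged, otherwise it chains a result request onto the submission and waits for that. A minimal sketch with plain strings in place of Flink's result types follows; SubmitModesSketch and its parameters are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class SubmitModesSketch {
    // submission completes with a job id once the job is accepted;
    // requestResult asks for the final result of that job id.
    static String submit(
            boolean detached,
            CompletableFuture<String> submission,
            Function<String, CompletableFuture<String>> requestResult) {
        if (detached) {
            // Detached: only wait for the submission acknowledgement.
            return submission.join();
        }
        // Attached: wait for the job result requested after the submission.
        return submission.thenCompose(requestResult).join();
    }
}
```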
The URL that is finally requested is handled by JobSubmitHandler, which in turn ends up calling the DispatcherGateway.submitJob method. The logic from there on is similar to what we have seen above.
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
final Collection<File> uploadedFiles = request.getUploadedFiles();
final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
File::getName,
Path::fromLocalFile
));
if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
nameToFile.size(),
uploadedFiles.size()),
HttpResponseStatus.BAD_REQUEST
);
}
final JobSubmitRequestBody requestBody = request.getRequestBody();
if (requestBody.jobGraphFileName == null) {
throw new RestHandlerException(
String.format("The %s field must not be omitted or be null.",
JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
HttpResponseStatus.BAD_REQUEST);
}
// load the JobGraph named in the request body
CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
// upload the job's files to the BlobServer
CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
// submit the job through the dispatcher gateway
CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
return jobSubmissionFuture.thenCombine(jobGraphFuture,
(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}
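The tail of handleRequest chains three asynchronous steps: upload, submit, and building the response from the acknowledgement combined with the original job graph. The sketch below reproduces that shape with strings standing in for the JobGraph and a Boolean standing in for Acknowledge; SubmitChainSketch and its parameters are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class SubmitChainSketch {
    static CompletableFuture<String> handle(
            CompletableFuture<String> jobIdFuture,                  // stands in for the loaded JobGraph
            Function<String, CompletableFuture<String>> upload,     // stands in for uploadJobGraphFiles
            Function<String, CompletableFuture<Boolean>> submit) {  // stands in for gateway.submitJob
        // Upload only starts once the job graph is available.
        CompletableFuture<String> finalized = jobIdFuture.thenCompose(upload);
        // Submission only starts once the upload has finished.
        CompletableFuture<Boolean> ack = finalized.thenCompose(submit);
        // The response needs both the acknowledgement and the job id.
        return ack.thenCombine(jobIdFuture, (ignoredAck, jobId) -> "/jobs/" + jobId);
    }
}
```

A failure in any step propagates to the returned future, so the caller sees an exceptionally completed result instead of a response body.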
About running on k8s
Having walked through the YARN flow, it is easy to carry it over to a k8s flow.
On k8s, what needs to be done is roughly to implement operators corresponding to the JobManager and the TaskManager.