Flink 가속기 사용 정보
Accumulators collect distributed statistics or aggregates in a from user functions and operators. Each parallel instance creates and updates its own accumulator object, and the different parallel instances of the accumulator are later merged. merged by the system at the end of the job. The result can be obtained from the result of a job execution, or from the web runtime monitor. The accumulators are inspired by the Hadoop/MapReduce counters. The type added to the accumulator might differ from the type returned. This is the case e.g. for a set-accumulator: We add single objects, but the result is a set of objects.
이 단락의 영문 정의는 이해하기 쉬우므로 번역하지 않습니다!
직접 코드:
package com.daxin
import java.io.File
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
/**
* Created by Daxin on 2017/4/18.
*/
object Acc {
def main(args: Array[String]) {
val numLines = new IntCounter()
val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("1", "2", "5")
//TODO RichMapFunction RuntimeContext
val wm = data.map(new RichMapFunction[String, String]() {
override def open(parameters: Configuration): Unit = {// ,
getRuntimeContext.addAccumulator("daxinCounter", numLines)
}
override def map(value: String): String = {
numLines.add(2)
"111"
}
})
val file = new File("C:\\logs\\flink")
if (file.exists()) {
file.delete()
}
wm.writeAsText("C:\\logs\\flink") //
// wm.printToErr() // :No new data sinks have been defined since the last execution.
//println(env.getExecutionPlan())
val rs = env.execute()
val counter: Int = rs.getAccumulatorResult("daxinCounter")
//println("counter : " + counter)
}
}
다음은 Accelerator가 사용하는 간단한 데모입니다. 핵심은 RichMapFunction, RichMapFunction의 설명은 다음과 같습니다.
public abstract class RichMapFunction extends AbstractRichFunction implements MapFunction
다음은 RichFunction 설명을 살펴보겠습니다.
/**
* An base interface for all rich user-defined functions. This class defines methods for
* the life cycle of the functions, as well as methods to access the context in which the functions
* are executed.
*/
@Public
public interface RichFunction extends Function
RichFunction 함수는 사용자가 정의한 함수의 인터페이스입니다. 이 함수는 다음과 같은 주기적인 방법이 있고 이 인터페이스의 방법은 함수가 실행하는 상하문에 접근할 수 있습니다!
void open(Configuration parameters) throws Exception; // 。
void close() throws Exception;
컨텍스트 가져오기 방법:
RuntimeContext getRuntimeContext();
IterationRuntimeContext getIterationRuntimeContext();
함수에 가속기를 사용해야 한다면 함수 상하문에 가속기를 등록하고 함수에 사용해야 한다.따라서 RichMapFunction의 getRuntimeContext 방법을 사용하여 컨텍스트를 가져와야 합니다!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.