Flink 상태 관리 - State Backends
2440 단어 flink
MemoryStateBackend
MemoryStateBackend 내부에서state를taskManager의 메모리에 대상으로 저장하고 checkpoint 메커니즘을 통해MemoryStateBackend는state를 스냅샷하여 Jobmanager의 메모리에 저장합니다.
Memory State Backend는 비동기 스냅샷(asynchronous snapshots)을 사용하도록 설정할 수 있으며, 비동기 스냅샷을 사용하면 파이프가 막히는 것을 피할 수 있으며, 현재는 기본적으로 켜져 있습니다.
Memory StateBackend의 제한 사항:
Memory StateBackend 적용 장면:
FsStateBackend는 파일 시스템 경로를 설정하여 동적 데이터를taskmanger의 메모리에 저장하고 checkpoint 메커니즘을 통해 설정된 파일 시스템이나 디렉터리에 상태 스냅샷을 기록합니다.
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//fs , file:///, taskmanager
val checkPointPath = new Path("hdfs:///flink/checkpoints")
val fsStateBackend: StateBackend = new FsStateBackend(checkPointPath)
env.setStateBackend(fsStateBackend)
FsStateBackend 적용 시나리오:
RocksDBStateBackend는 작업 상태를 RocksDB 데이터베이스에 저장합니다.checkpoint를 통해 로크스DB 데이터베이스 전체가 설정된 파일 시스템이나 디렉터리로 복사됩니다
private val checkpointDataUri = "hdfs:///flink/checkpoints"
private val tmpDir = "file:///tmp/rocksdb/data/"
val env = StreamExecutionEnvironment.getExecutionEnvironment
val fsStateBackend: StateBackend = new FsStateBackend(checkpointDataUri)
val rocksDBBackend: RocksDBStateBackend = new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE)
val config = new Configuration()
//TIMER HEAP( , ) RocksDB( )
config.setString(RocksDBOptions.TIMER_SERVICE_FACTORY,RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString)
rocksDBBackend.configure(config)
rocksDBBackend.setDbStoragePath(tmpDir)
env.setStateBackend(rocksDBBackend.asInstanceOf[StateBackend])
RocksDBStateBackend 적용 장면:
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
flink-dump-fullgc log 인쇄 분석dump fullgc log jps: 프로세스 ID, 프로세스 시작 경로 등 모든 jvm 프로세스를 보십시오. jstack: jvm에서 현재 모든 라인의 운행 상황과 라인의 현재 상태를 관찰합니다. jstat: JV...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.