Flink-Streaming-State & Fault Tolerance-Checkpointing

5675 단어
Flink의 모든 기능과operator는 상태가 있을 수 있습니다.상태가 있는function은 모든 데이터를 처리할 때 데이터를 저장하여 상태state를 더욱 세밀한 조작이 필요한 임의의 유형의operator에 필요한 구조 블록으로 만든다.Flink는 상태 장애 복구를 위해 상태 스냅샷 체크포인트를 수행해야 합니다.checkpoint를 사용하면 Flink가 job의 상태와 흐름의 위치를 복구하고 응용 프로그램에 일치하는 의미를 제공할 수 있습니다. 마치 지금까지 고장이 발생한 적이 없는 것처럼.문서documentation on streaming fault tolerance는 Flink 고장 용인 메커니즘 뒤의 기술 세부 사항을 설명합니다.

Prerequisites 사전 요구 사항


Flink의 checkpoint 메커니즘은 흐름과 state에 안정적인 저장소를 제공해야 합니다.일반적인 요구 사항:
  • 특정한 시각에서 데이터를 재생할 수 있는 지속적(안정적) 데이터 원본.지속적인 데이터 대기열(예: kafka, RabbitMQ, Amazon Kinesis, Google PubSub) 또는 파일 시스템(예: HDFS, S3, GFS, NFS, Ceph)
  • 일반적으로 분산 파일 시스템 (예: HDFS, S3, GFS, NFS, Ceph...) 의 스토리지 state의 지속적인 스토리지입니다

  • Checkpointing 켜기 및 구성


    기본적으로 checkpoint는 닫힙니다.checkpoint를 열기 위해 StreamExecution Environment에서 enableCheckpointing(n) 방법을 호출합니다. 여기서 n은 checkpoint의 주파수(밀리초)를 나타냅니다.checkpoint의 다른 매개 변수는 다음과 같습니다.
  • exactly-once vs.at-least-once: enableCheckpointing(n)을 호출할 때 선택할 수 있는 매개 변수 모드를 전송할 수 있습니다. 두 가지 모드만 Flink가 제공하는 두 가지 등급의 보증을 대표합니다.Exactly-once는 대부분의 응용 프로그램에 가장 좋은 선택입니다.at-least-once는 초저 지연 (겨우 몇 밀리초) 이 필요한 응용에 적합할 수 있습니다
  • checkpoint timeout: 설정된 시간 후에도 checkpoint를 완성하지 못하면 이번 checkpoint는 버려집니다
  • minimum time between checkpoints: 흐르는 응용 프로그램에서 checkpoint가 많은 프로세스를 차지하지 않도록 두 번의 checkpoint 사이의 최소 시간 간격을 설정할 수 있습니다.만약 이 값이 5000으로 설정된다면, 다음 checkpoint는 지난번 checkpoint 실행이 끝난 후 5초 후에 실행될 것입니다. checkpoint가 얼마나 걸리든지 설정된 checkpoint interval에 상관없습니다.주의, 이 설정을 사용하면 checkpoint interval이 값보다 작지 않다는 것을 의미합니다. 응용 프로그램에서 "time between checkpoints"를 설정하는 것이 checkpoint interval을 설정하는 것보다 훨씬 쉽습니다. 왜냐하면 일부 checkpoint가 평균 시간보다 오래 걸릴 때(예를 들어 목표 저장 시스템이 짧고 느릴 때), "time between checkpoints"가 영향을 받지 않기 때문입니다
  • number of current checkpoint: 기본적으로 시스템은 지난번 checkpoint가 완성되지 않은 상황에서 새로운 checkpoint를 터치하지 않습니다.이것은 topology 토폴로지 구조가checkpoint에 너무 많은 시간을 소비하지 않을 것을 보장한다.Flink는 여러 개의 checkpoint 프로세스를 동시에 진행할 수 있습니다. 재미있는 예는 pipeline 장면에서 특정한 처리 지연(예를 들어 어떤 function이 원격 서비스를 호출하면 응답을 기다리는 시간이 필요하다)이나 빈번한 checkpoint (예를 들어 1% 밀리초) 를 통해 고장이 났을 때 빠른 복구를 실현하는 것입니다."minimum time between checkpoints"를 정의했을 때, 이 옵션은 적용되지 않습니다
  • externalized checkpoints: 외부 시스템으로 checkpoint의 주기적인 지속화를 설정할 수 있습니다.externalized checkpoints는 메모리 시스템에 메타데이터를 기록합니다.job가 고장났을 때도 자동으로 제거되지 않습니다.deployment notes on externalized checkpoint에 대한 자세한 설명이 있습니다..
  • fail/continue task on checkpoint errors: 이 설정은task를 실행하는 checkpoint 프로세스가 이상이 발생하면 이task가fail될지 여부를 결정합니다.기본값은 fail입니다.다른 옵션을 선택하면task는 checkpoint coodinator의 checkpoint 요청을 거부하고 계속 실행합니다..
  • StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000);
    
    // advanced options:
    
    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    // make sure 500 ms of progress happen between checkpoints
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    
    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    
    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    
    // enable externalized checkpoints which are retained after job cancellation
    env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    

    관련 구성 옵션
    더 많은 매개 변수나 기본 매개 변수는conf/flink-conf.yaml 파일을 참조할 수 있습니다
    Key
    Default
    Description
    state.backend
    (none)
    스토리지 상태 스냅샷의 상태 백엔드
    state.backend.async
    true
    이 설정 항목은 비동기식 스냅샷 방법을 사용할지 여부를 결정합니다.일부 상태 백엔드에서 비동기식 스냅샷을 지원하지 않거나 비동기식 스냅샷만 지원하면 이 설정을 무시할 수 있습니다
    state.backend.fs.memory-threshold
    1024
    상태 데이터 파일의 최소 용량입니다.모든state 블록은 이 설정보다 작고 루트 체크포인트 메타데이터 파일에 저장됩니다.
    state.backend.incremental
    false
    이 옵션은 변동분 스냅샷을 사용할지 여부를 결정합니다.증량 스냅샷의 경우 모든 상태를 저장하지 않고 이전 체크포인트와 다른 데이터만 저장합니다.일부 상태 백엔드가 지원되지 않을 수도 있습니다. 이 설정은 무시됩니다.
    state.backend.local-recovery
    false
    이 옵션은 로컬 복구를 사용할지 여부를 설정합니다.기본값은 사용되지 않습니다.현재 로컬 복구는 Keyed의 상태 복구만 지원합니다.현재 MemoryStateBackend는 로컬 복구를 지원하지 않으며 이 설정을 무시합니다.
    state.checkpoints.dir
    (none)
    checkpoint 데이터 파일과 메타데이터 파일의 기본 디렉터리를 저장합니다.이 디렉터리는 모든 프로세스/노드에 접근할 수 있어야 합니다(예를 들어 모든 TaskManager와 JobManager).
    state.checkpoints.num-retained

    예약된 완료된 스냅샷의 최대 데이터
    state.savepoints.dir
    (none)
    savepoint의 기본 저장 디렉터리입니다.상태 백엔드에서 savepoint를 파일 시스템에 쓰기(MemorystateBackend, FsStateBackend, RocksDBStateBackend)
    taskmanager.state.local.root-dirs
    (none)
    로컬 복구에 사용할 상태 디렉토리를 정의합니다.로컬 복구는 현재 Keyed 상태만 지원됩니다.현재 MemoryStateBackend는 로컬 복구를 지원하지 않으며 이 설정을 무시합니다.

    상태 백엔드 선택


    Flink의 체크포인트 메커니즘은timer와 상태가 있는operater의state 스냅샷을 저장합니다. 상태가 있는operator는connector, 윈도우 및 모든 사용자 정의 상태를 포함합니다.스냅샷이 어디에 저장되느냐에 따라 구성된 상태 백엔드 State Backend가 달라집니다.기본적으로 상태는 TaskManager 메모리에 저장되고 스냅샷은 JobManager 메모리에 저장됩니다.대량의state를 저장하기 위해flink는 다양한 상태 백엔드를 지원하여 상태 저장 및 상태 스냅샷을 저장합니다.다음 방법으로 상태 백엔드를 구성할 수 있습니다StreamExecutionEnvironment.setStateBackend(...) state 백엔드를 보고 선택할 수 있는 상태 백엔드와job와cluster에 대한 설정을 알 수 있습니다.

    Iterative(교체된)job의 상태 스냅샷state checkpoint


    flink는 현재iteration을 사용하지 않는job에만chckpoint 처리 보증을 제공합니다.iterationjob에서 checkpoint를 사용하면 이상이 발생할 수 있습니다.iterative 프로그램에서 checkpoint를 강제로 사용하려면 사용자는 checkpoint를 열 때 특수한 로고를 설정해야 합니다env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE,force = true)

    정책 재시작


    Flink는 Restart Strategies를 참조하여 다양한 재가동 정책을 제공합니다.

    좋은 웹페이지 즐겨찾기