Akka의 Persistence(Event Sourcing) 용도 정보

14868 단어 ScalaAkkatech
이 글은 Scala Advent Calendar 2021일 보도 16일째다.
https://qiita.com/advent-calendar/2021/scala

개시하다


Akka Cluster를 사용하는 분포식 시스템을 실현하기 위해서는 약간의 Akka의 모듈을 이해해야 한다.
그중 하나는 Persistence (Event Sourcing)이다.액터는 지속해서 사용하기 위한 것이라고 자주 들었지만, 사용처가 잘 알려지지 않아 악카의 문서와 참고문헌을 읽으면서 필기를 했다.

Persistence(Event Sourcing)는


상태에서의 반응기를 지속화할 수 있는 모듈.
다음 두 가지 방법으로 영구화할 수 있다
  • Journal
    - 업데이트 이벤트를 순서대로 저장
  • Snapshot
    - 현재 동작 상태를 그대로 저장
  • 상태 리액터가 있는 스냅샷도 사용할 수 있지만 (복구 시 스냅샷을 사용하면 복구 시간을 단축할 수 있다)중요한 개념은 실제 반응기의 스냅샷을 저장하는 것이 아니라 이벤트만 저장하는 것이다.

    지속성은 명령이 아니라 이벤트 처리로 적용 여부를 검증한 후의 이벤트를 저장하는 데 사용됩니다.그리고 복구할 때 적용되는 이벤트를 실행해야 하기 때문에 이벤트가 실패하지 않습니다.

    참고 자료

  • Persistence Plugins
  • 이벤트 처리는 무엇입니까?


    이벤트 처리는 프로그램의 현재 상태를 결정하는 역사를 저장함으로써 프로그램의 상태를 지속시키는 방법입니다.
    인용자 MSDN 이벤트 처리 개요

    참고 자료

  • 구체적인 구현 코드에서 Event Sourcing 이해
  • CRUD의 응용 프로그램에 비해 실크 코드를 써서 참고가 되었다.

    왜 필요합니까?


    사용Cluster Sharding한 경우 상태를 복원할 수 있도록 활용한다.
    Cluster Sharding은 여러 노드에서 동작 액터를 이동하여 시간 경과 동작의 노드가 변경되더라도 논리적 식별자를 사용하여 상호 작용할 수 있도록 하는 모듈입니다.(실체반응기로 불리는 동작을 처리한다)
    실체반응기는 상태를 가진 동작으로 한 노드가 관리하기 때문에 고장날 때 복원해야 한다.따라서 상태를 지속하려면 Persistence(Event Sourcing)를 사용합니다.

    참고 자료

  • Building Stateful Systems with Akka Cluster Sharding
  • 어떻게 실현합니까?


    EventSourcedBehavior를 사용하여 이벤트 처리용 리액터 만들기
    EventSourcedBehavior.apply의 서명
    object EventSourcedBehavior {
    
    // 永続的なアクター用のBehaviorを作成するapplyメソッド
      def apply[Command, Event, State](
          persistenceId: PersistenceId,
          emptyState: State,
          commandHandler: (State, Command) => Effect[Event, State],
          eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = ???
    
    EventSourcedBehavior.apply를 사용할 때 다음과 같은 정보를 전달한다.
  • persistenceId
  • 이벤트를 원본으로 하는 행위의 유일한 식별자
  • emptyState
  • 사건 처리 전 실체의 초기 상태
  • commandHandler
  • 효과에 명령 매핑
  • eventHandler
  • 이벤트가 지속될 때 현재 상태에서 새 상태 계산
  • 샘플 코드
    
    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.{ActorRef, Behavior}
    import akka.persistence.typed.PersistenceId
    import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
    import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
    
    object MyPersistentBehavior {
    
      sealed trait Command
      final case class Add(data: String) extends Command
      case object Clear extends Command
    
      // Eventは過去形で定義している
      sealed trait Event
      final case class Added(data: String) extends Event
      case object Cleared extends Event
    
      // 最新の5つの状態をもつリスト
      final case class State(history: List[String] = Nil)
    
      //コマンドハンドラは、Effect永続化する1つまたは複数のイベント(存在する場合)を定義する指示を返します。
      val commandHandler: (State, Command) => Effect[Event, State] = {
        (state, command) =>
          command match {
            // エフェクトはファクトリを使用して作成
            case Add(data) => Effect.persist(Added(data))
            case Clear     => Effect.persist(Cleared)
          }
      }
    
      //イベントが正常に永続化されると、eventHandlerを使用してイベントを現在の状態に適用することにより、新しい状態が作成されます
      val eventHandler: (State, Event) => State = { (state, event) =>
        event match {
          case Added(data) => state.copy((data :: state.history).take(5))
          case Cleared     => State(Nil)
        }
      }
      
    
      def apply(id: String): Behavior[Command] =
        EventSourcedBehavior[Command, Event, State](
          // persistenceId 永続アクターの一意の識別子。ofUniqueIdで独自の識別子を作っている
          persistenceId = PersistenceId.ofUniqueId(id),
          // emptyState エンティティが最初に作成されるタイミングを定義
          emptyState = State(Nil),
          // 効果を生成することによってコマンドを処理する方法
          commandHandler = commandHandler,
          // イベントが永続化されたときの現在の状態を指定して、新しい状態を返す
          eventHandler = eventHandler
        )
     }
    
    코드 인용자Example and core API

    끝맺다


    이번에는 간단한 샘플만 봤는데 퍼시스턴스(Event Sourcing)를 사용한 것 자체가 간단했다.
    이번 문헌과 참고문헌을 보니 이제야 이해가 되네요. 앞으로 Cluster Sharding을 사용하려고 해요.

    참고 문헌

  • 메모리 데이터 스토리지를 사용하여 Akka Cluster + Akka Persistence 를 자동으로 테스트하는 애플리케이션
  • Akka Persistence Testkit을 이동하기 전의 단계를 기억하십시오.
  • AWS에서 CQRS/Event Sourcing 수행 방법
  • 구체적인 구현 코드에서 Event Sourcing 이해

  • Akka Cluster로 초능력 획득
  • Akka Cluster로 초레길리스 획득(의 2)
  • Akka 클러스터에서 리액터를 사용하는 대규모 분산 시스템 구축
  • akka-perrsistence 플러그인 만들기
  • Events As First-Class Citizens
  • 좋은 웹페이지 즐겨찾기