Akka의 Persistence(Event Sourcing) 용도 정보
개시하다
Akka Cluster를 사용하는 분포식 시스템을 실현하기 위해서는 약간의 Akka의 모듈을 이해해야 한다.
그중 하나는 Persistence (Event Sourcing)이다.액터는 지속해서 사용하기 위한 것이라고 자주 들었지만, 사용처가 잘 알려지지 않아 악카의 문서와 참고문헌을 읽으면서 필기를 했다.
Persistence(Event Sourcing)는
상태에서의 반응기를 지속화할 수 있는 모듈.
다음 두 가지 방법으로 영구화할 수 있다
- 업데이트 이벤트를 순서대로 저장
- 현재 동작 상태를 그대로 저장
지속성은 명령이 아니라 이벤트 처리로 적용 여부를 검증한 후의 이벤트를 저장하는 데 사용됩니다.그리고 복구할 때 적용되는 이벤트를 실행해야 하기 때문에 이벤트가 실패하지 않습니다.
참고 자료
이벤트 처리는 무엇입니까?
이벤트 처리는 프로그램의 현재 상태를 결정하는 역사를 저장함으로써 프로그램의 상태를 지속시키는 방법입니다.
인용자 MSDN 이벤트 처리 개요
참고 자료
왜 필요합니까?
사용Cluster Sharding한 경우 상태를 복원할 수 있도록 활용한다.
Cluster Sharding은 여러 노드에서 동작 액터를 이동하여 시간 경과 동작의 노드가 변경되더라도 논리적 식별자를 사용하여 상호 작용할 수 있도록 하는 모듈입니다.(실체반응기로 불리는 동작을 처리한다)
실체반응기는 상태를 가진 동작으로 한 노드가 관리하기 때문에 고장날 때 복원해야 한다.따라서 상태를 지속하려면 Persistence(Event Sourcing)를 사용합니다.
참고 자료
어떻게 실현합니까?
EventSourcedBehavior를 사용하여 이벤트 처리용 리액터 만들기
EventSourcedBehavior.apply의 서명
object EventSourcedBehavior {
// 永続的なアクター用のBehaviorを作成するapplyメソッド
def apply[Command, Event, State](
persistenceId: PersistenceId,
emptyState: State,
commandHandler: (State, Command) => Effect[Event, State],
eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = ???
EventSourcedBehavior.apply
를 사용할 때 다음과 같은 정보를 전달한다.
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
object MyPersistentBehavior {
sealed trait Command
final case class Add(data: String) extends Command
case object Clear extends Command
// Eventは過去形で定義している
sealed trait Event
final case class Added(data: String) extends Event
case object Cleared extends Event
// 最新の5つの状態をもつリスト
final case class State(history: List[String] = Nil)
//コマンドハンドラは、Effect永続化する1つまたは複数のイベント(存在する場合)を定義する指示を返します。
val commandHandler: (State, Command) => Effect[Event, State] = {
(state, command) =>
command match {
// エフェクトはファクトリを使用して作成
case Add(data) => Effect.persist(Added(data))
case Clear => Effect.persist(Cleared)
}
}
//イベントが正常に永続化されると、eventHandlerを使用してイベントを現在の状態に適用することにより、新しい状態が作成されます
val eventHandler: (State, Event) => State = { (state, event) =>
event match {
case Added(data) => state.copy((data :: state.history).take(5))
case Cleared => State(Nil)
}
}
def apply(id: String): Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
// persistenceId 永続アクターの一意の識別子。ofUniqueIdで独自の識別子を作っている
persistenceId = PersistenceId.ofUniqueId(id),
// emptyState エンティティが最初に作成されるタイミングを定義
emptyState = State(Nil),
// 効果を生成することによってコマンドを処理する方法
commandHandler = commandHandler,
// イベントが永続化されたときの現在の状態を指定して、新しい状態を返す
eventHandler = eventHandler
)
}
코드 인용자Example and core API끝맺다
이번에는 간단한 샘플만 봤는데 퍼시스턴스(Event Sourcing)를 사용한 것 자체가 간단했다.
이번 문헌과 참고문헌을 보니 이제야 이해가 되네요. 앞으로 Cluster Sharding을 사용하려고 해요.
참고 문헌
Reference
이 문제에 관하여(Akka의 Persistence(Event Sourcing) 용도 정보), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://zenn.dev/taketora/articles/5b1fc78932656c텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)