CQRS 확장 가능한 집계

집계는 시스템의 핵심입니다. 그들은 도메인 논리를 보유하고 다음을 수행할 이벤트를 방출할 책임이 있습니다.
결국 여러 데이터 프로젝션 및 제한된 컨텍스트에서 데이터의 일관성을 유지합니다. 집계가 너무
중요합니다. 작고 이해하기 쉽게 유지하는 것이 중요하지만 도메인이 커지고
더 큰?

이 기사에서는 집계의 복잡성을 줄이고 확장성을 향상시키는 방법을 보여 드리겠습니다.

쓰기 측 도메인 개요



시작하기 전에 명령이 실행되고 이벤트로 전환되는 방법을 검토해 보겠습니다.

사용자가 보낸 명령을 수신한 후 명령 핸들러는 저장소를 호출하여 집계 상태를 로드합니다.
메모리. 그런 다음 도메인 이벤트 또는 오류를 내보내는 집계에서 명령이 실행됩니다. 마지막으로 그
이벤트 또는 실패는 저장소를 사용하여 지속됩니다.

예를 들어

[👤 Coder]
  -> [📨 Comment issue command]
  -> [📫 Comment issue handler]
  -> [🍇 Issue aggregate]
  -> [📅 Issue commented event]


첫번째 시도



먼저 실제 프로젝트에서 쓰기 측 도메인이 구현된 방식을 간략하게 보여 드리겠습니다.
에. 그런 다음 이 솔루션의 확장 문제와 집계를 깨끗하게 유지하기 위해 개선할 수 있는 방법에 대해 논의합니다.

다음과 같이 표시됩니다.

public class CommentIssue : ICommand
{
  public CommentIssue(Guid issueId, string message)
  {
    IssueId = issueId;
    Message = message;
  }

  public Guid IssueId { get; }
  public string Message { get; }
}

public class CommentIssueHandler : ICommandHandler<CommentIssue>
{
  private readonly IRepository<Issue> _repository;

  public CommentIssueHandler(IRepository<Issue> repository) => _repository = repository;

  public void Handle(CommentIssue command)
  {
    // 1. Loads the aggregate in memory.
    var issue = _repository.Find(command.IssueId);
    // 2. Invokes the aggregate method.
    issue.Comment(command.message);
    // 3. Saves the aggregate.
    _repository.Save(issue);
  }
}

public class Issue : AggregateRoot<Issue, Guid>
{
  private readonly ISet<IssueComment> _comments = new HashSet<IssueComment>();

  public Issue(Guid id) : base(id) { }

  public IEnumerable<IssueComment> Comments => _comments;

  public void Comment(string message)
  {
    if (string.IsNullOrWhiteSpace(message)) return;
    // `Emit()` internally invokes `Apply()` to avoid code duplication.
    Emit(new IssueCommented(new IssueComment(message)));
  }

  // All other command related methods go here e.g.
  // `public void Edit() { }`
  // `public void Close() { }`
  // `public void Unsubscribe() { }`

  protected override void Apply(IDomainEvent @event)
  {
    // This could be done using reflexion, but be aware of performance issues.
    switch (@event)
    {
      case IssueCommented e: Apply(e); break;
    }
  }

  private void Apply(IssueCommented @event) =>
    _comments.Add(new IssueComment(@event.CommentId, @event.Message));
}

public class IssueCommented : IDomainEvent
{
  public IssueCommented(Comment comment)
  {
    CommentId = comment.Id;
    Message = comment.Message;
  }

  public Guid CommentId { get; }
  public string Message { get; }
}


보시다시피 이 기술을 사용하면 추가하는 모든 명령에 대해 집계가 증가합니다. 이 때문에 쉽게
이 새로 생성된 집합체가 우리 도메인이 성장함에 따라 유지하기 어려운 엉망으로 변하는 속도를 상상해 보십시오.

또한 모든 명령 처리기는 정확히 동일한 작업을 수행합니다.
  • 메모리에 집계를 로드합니다.
  • 집계 메서드를 호출합니다.
  • 집계를 저장합니다.

  • 우리가 이것을 고칠 수 있는 방법을 보자!

    리팩토링된 솔루션



    비결은 명령과 이벤트가 스스로 실행/적용되도록 하는 것입니다!

    public class CommentIssue : ICommand<Issue>
    {
      public CommentIssue(Guid issueId, string message)
      {
        IssueId = issueId;
        Message = message;
      }
    
      public Guid IssueId { get; }
      public string Message { get; }
    
      // Commands now know how to execute themselves.
      public IEnumerable<IDomainEvent<Issue>> ExecuteOn(Issue aggregate)
      {
        if (string.IsNullOrWhiteSpace(Message)) yield break;
        yield return new IssueCommented(new IssueComment(IssueId, Message));
      }
    }
    
    public class IssueCommented : IDomainEvent<Issue>
    {
      public IssueCommented(IssueComment comment)
      {
        CommentId = comment.Id;
        Message = comment.Message;
      }
    
      public Guid CommentId { get; }
      public string Message { get; }
    
      // Events now know how to apply themselves.
      public void ApplyTo(Issue aggregate) =>
        aggregate.Comments.Add(new IssueComment(CommentId, Message));
    }
    


    이렇게 하면 집계 기본 클래스에서 새로운 Execute() 메서드를 구현할 수 있습니다.
    명령을 실행하고 반환된 이벤트를 적용합니다.

    또한 Apply() 메서드를 일반화하여 집계 기본 클래스로 이동할 수 있습니다.

    public abstract class AggregateRoot<TSelf, TId> : Entity<TSelf, TId>, IAggregateRoot<TId>
      where TSelf : AggregateRoot<TSelf, TId>
    {
      private readonly List<IDomainEvent<TSelf>> _uncommittedEvents = new List<IDomainEvent<TSelf>>();
    
      // ...
    
      public void Apply(IDomainEvent<TSelf> @event) => @event.ApplyTo((TSelf)this);
    
      public void Execute(ICommand<TSelf> command)
      {
        var events = command.ExecuteOn((TSelf)this);
        _uncommittedEvents.AddRange(events);
        foreach (var @event in events) Apply(@event);
      }
    }
    


    이제 Execute() 메서드를 추가했으므로 명령 처리기에서 바로 호출할 수 있습니다.

    또한 아래에서 볼 수 있듯이 명령 처리기의 이름은 이제 IssueCommandsHandler 대신 CommentIssueHandler 입니다. 왜
    그게? 이는 모든 명령 핸들러가 정확히 동일하여 동일한 명령을 사용할 수 있도록 충분히 단순화했기 때문입니다.
    모든 명령에 대한 명령 핸들.

    public class IssueCommandsHandler : ICommandHandler<CommentIssue, Issue>
    {
      private readonly IRepository<Issue, Guid> _repository;
    
      public IssueCommandsHandler(IRepository<Issue, Guid> repository) => _repository = repository;
    
      public void Handle(CommentIssue command)
      {
        var issue = _repository.Find(command.IssueId);
        // The command can now be executed directly on the aggregate.
        issue.Execute(command);
        _repository.Save(issue);
      }
    }
    


    우리의 목표는 집계를 정리하는 것이었고 아래에서 볼 수 있듯이 아무것도 남지 않았기 때문에 성공했다고 생각합니다!

    public class Issue : AggregateRoot<Issue, Guid>
    {
      public Issue(Guid id) : base(id) { }
    
      public ISet<IssueComment> Comments { get; } = new HashSet<IssueComment>();
    
      // Only common validations/business rules go here!
    }
    


    단순화를 위해 Comments.Add() 인터페이스를 통해 ISet를 노출했음에 유의하십시오. 실제 프로젝트에서
    사용자 정의 비즈니스 규칙을 보유하기 위해 IssueComments와 같은 사용자 정의 콜렉션 클래스를 추가하는 것을 고려하십시오.


    소스 코드를 찾을 수 있습니다
    here .

    관련된


  • Implementing an Event Sourced Aggregate 닉 체임벌린
  • Command Handlers 닉 체임벌린
  • 좋은 웹페이지 즐겨찾기