NATS 흐름의 영구 로그 용례

26399 단어 loggonatsstreamingnats

영구 로그란?


이 상하문에서 로그는 질서정연한 메시지 서열로 로그에 추가할 수 있지만 기존 메시지를 되돌려주고 변경할 수 없습니다.지속성 비트는 단지 그것들이 기억되고 서버가 다시 시작된 후에 (디스크에서) 지속될 수 있다는 것을 의미할 뿐이다.

NATS 스트리밍이란 무엇입니까?


NATS StreamingNATS에 구축된 경량급 스트리밍 플랫폼으로 지속성 로그에 API를 제공한다.
다음과 같은 특징이 있습니다.
  • 경량, Go
  • 으로 작성
  • 단일 바이너리, 제로 런타임 종속성
  • 주문, 로그 기반 지속성
  • 최소 1회 제공 모델
  • 자동 사용자 오프셋 추적
  • 스트리밍 메시지 재방송 지원
  • 이러한 속성은 Apache Kafka이 제공하는 질서정연하고 로그 기반의 지속적인 흐름과 유사하다.이 두 시스템 사이에는 당연히 차이가 있지만, 우리는 여기서 그것들을 토론하지 않는다.내가 보기에 NATS 흐름 (및 NATS) 의 가장 좋은 특징은 그것과 클라이언트 API를 조작하는 단순성이다.더 많은 것을 알고 싶으시면 댓글로 남겨주세요:)

    용례


    나는 publish-subscribe pattern이 NATS 스트리밍이 제공하는 핵심 API의 기본 지식이라고 가정할 것이다.그러나 설령 없다 하더라도 너는 따르는 어려움이 있어서는 안 된다.
    다음은 게시-구독 모드의 특정 변체를 사용하여 해결할 수 있는 용례 목록입니다.이러한 문제를 해결하기 위해 Subscription API NATS 스트림을 사용하는 방법을 보여주고 제공된 의미와 보증을 논의합니다.
    구독자의 관점에서 보면...
  • "스트리밍으로만 메시지 받고 싶어요"
  • "연결이 끊기면 끊긴 곳에서 계속하고 싶어요"
  • "저는 새로 왔습니다. 이 개울의 모든 역사를 읽고 싶습니다."
  • "한 번만 처리하면 됩니다"
  • "메시지 처리 업무를 공유하고 싶습니다"
  • NATS 스트림 설정


    코드를 작성하고 이 모드를 시도하려면 download a release of NATS Streaming from GitHub으로 전화하십시오.그리고 official Docker image called nats-streaming .
    단일 바이너리 파일을 다운로드한 경우 다음 옵션을 사용하여 압축을 풀고 실행할 수 있습니다.
    $ nats-streaming-server \
      --store file \
      --dir ./data \
      --max_msgs 0 \
      --max_bytes 0
    
    기본적으로 NATS 스트림은 메모리 스토리지를 사용합니다.--store 옵션은 파일을 기반으로 변경하는 데 사용되며 재부팅 후에도 계속 사용할 수 있습니다.--max_msgs--max_bytes은 모든 채널의 모든 메시지를 보존하기 위해 0으로 설정됩니다.그렇지 않으면 서버는 기본적으로 100만 개의 메시지 또는 ~100MB 크기로 설정되며 이 경우 채널의 메시지는 도달한 제한보다 낮게 삭제됩니다(기록 삭제).
    일단 그것이 셸에서 실행되면, 우리는 코드를 작성하기 시작할 수 있다.코드 예시에 대해 나는 Go client을 사용할 것이다.downloads page에는 몇 개의 공식 고객과 몇 개의 지역사회 고객이 있다.

    샘플 코드


    우선 우리는 연락을 맺어야 한다.
    package main
    
    import (
        "log"
        stan "github.com/nats-io/go-nats-streaming"
    )
    
    // Convenience function to log the error on a deferred close.
    func logCloser(c io.Closer) {
        if err := c.Close(); err != nil {
            log.Printf("close error: %s", err)
        }
    }
    
    
    func main() {
        // Specify the cluster (of one node) and some client id for this connection.
        conn, err := stan.Connect("test-cluster", "test-client")
        if err != nil {
            log.Print(err)
            return
        }
        defer logCloser(conn)
    
        // Now the patterns..
    }
    

    '저는 그냥 흐름을 통해서 메시지를 받고 싶어요'.


    기본 설정의 구독을 사용하여 이 용례를 해결합니다.
    handle := func(msg *stan.Msg) {
        // Print message data as a string to stdout.
        fmt.Printf("%s", string(msg.Data))
    }
    
    sub, err := conn.Subscribe(
      "stream-name",
      handle,
    )
    if err != nil {
        log.Print(err)
        return
    }
    defer logCloser(sub)
    
    이렇게 간단해.NATS 스트리밍은 메시지가 순차적으로 수신되고 처리되도록 보장합니다.다음 예제에서 해결할 경고는 메시지가 서버(예: 연결 끊기 또는 시간 초과)로 처리되었음을 확인(확인)할 때 문제가 발생하면 나중에 (처리되기 전의 메시지 이후) 메시지가 다시 전달된다는 것입니다.
    마찬가지로 프로세스 중에 오류가 발생하면 기본적으로 ACK를 보내지 않을 수 없습니다.구독 옵션을 추가하여 stan.SetManualAckMode() 을 해결할 수 있습니다.
    handle := func(msg *stan.Msg) {
        // If the msg is handled successfully, you can manually
        // send an ACK to the server. More importantly, if processing
        // fails, you can choose *not* send an ACK and you will receive
        // the message again later.
    
        // This will only fail if the connection with the server
        // has gone awry.
        if err := msg.Ack(); err != nil {
            log.Prinf("failed to ACK msg: %d", msg.Sequence)
        }
    }
    
    conn.Subscribe(
        "stream-name",
        handle,
        stan.SetManualAckMode(),
    )
    
    만약 메시지가 처음 실패했다면 왜 다시 보냈을까 생각할 수도 있다.이것은 어떤 실패에 달려 있지만, 일시적인 실패라면 두 번째나 세 번째는 효과가 있을 수 있다.만약 그것이 당신의 코드 중의 버그이고 영원히 성공하지 못한다면 어떻게 해야 합니까?

    "연결이 끊기지 않도록 끊긴 곳을 계속하고 싶습니다."


    기본 옵션을 사용하면 구독은 온라인에서만 추적됩니다.클라이언트가 잠시 후에 다시 구독을 하면 새로운 메시지만 받는다는 뜻이다.오프라인에서 게시된 메시지는 수신되지 않습니다.
    어떤 용례에 대해서는 작업 대기열, data replication streams, CQRS architectures이 필요할 수도 있다.
    구독자를 복구 가능하게 하는 것은 다른 구독 옵션을 추가하는 것처럼 간단합니다.
    handle := func(msg *stan.Msg) {
        // ...
    }
    
    conn.Subscribe(
        "stream-name",
        handle,
        stan.DurableName("i-will-remember"),
    )
    
    stan.DurableName 옵션은 특정 구독을 위한 이름을 사용합니다.이것은 흐름에 연결되어 있기 때문에, 서로 다른 흐름에 대해 같은 지속적인 이름을 다시 사용할 수 있으며, 모든 흐름의 편이도는 단독으로 추적됩니다.
    이전 절의 끝에서, 나는 처리 프로그램 코드에 오류가 발생하면 무슨 일이 일어날지 물었다.지속적인 구독이 있으면 구독 서버를 자유롭게 오프라인으로 전환하고 오류를 복구한 다음 온라인으로 되돌려 멈추는 곳을 계속할 수 있다.
    프로세서가 실패했는지 확인하려면 이 오류를 기록해야 하지만, 다시 시도할 수 없는 첫 번째 오류가 발생했을 때 바로 연결을 끊을 수도 있습니다.
    // Declare above so the handler can reference it.
    var sub stan.Subscription
    
    handle := func(msg *stan.Msg) {
        err := process(msg)
    
        // Close subscription on error.
        if err != nil {
            logCloser(sub)
        }
    }
    
    sub, _ = conn.Subscribe(
        "stream-name",
        handle,
        stan.DurableName("i-will-remember"),
    )
    
    메시지가 순서대로 처리되기 때문에 첫 번째 오류가 발생할 때 구독을 닫으면 후속 메시지 처리를 막을 수 있습니다.다시 연결할 때 실패한 메시지가 다시 전달되고 그 다음에 모든 새로운 메시지가 전달됩니다.
    이런 방법은 또 어떤 상황이 발생하든지 간에 처리가 완전히 질서정연하다는 것을 보장한다.고장 이외의 소식은 처리되지 않기 때문에 재교부는 새로운 소식과 교차할 수 없습니다.MaxInFlight 옵션과 수동 확인을 통해 이러한 주문이 가능합니다.
    handle := func(msg *stan.Msg) {
        err := process(msg)
        if err == nil {
            msg.Ack()
        }
    }
    
    conn.Subscribe(
        "stream-name",
        handle,
        stan.DurableName("i-will-remember"),
        stan.MaxInflight(1),
        stan.SetManualAckMode(),
    )
    
    구독을 닫지 않아도 메시지가 순서대로 처리되고 재시도될 수 있습니다. 왜냐하면 한 번에 하나의 메시지가 비행 중이기 때문입니다.과거의 예에는 이런 제한이 없었기 때문에 여러 개의 메시지가 줄을 서서 처리를 기다릴 것이다.

    "한 번만 처리하면 돼요."


    위의 두 가지 예 중 하나는 처리가 성공했지만 ACK가 실패하면 무슨 일이 일어날까요?NATS 스트리밍에는 "적어도 한 번"전송 모드가 있는데, 이는 서버가 확인되지 않으면 같은 메시지를 계속 다시 전달한다는 것을 의미한다.
    이러한 상황을 만족시키기 위해서, 클라이언트는 성공적으로 처리된 마지막 메시지를 보류하는 책임을 져야 한다.
    var lastProcessed uint64
    
    handle := func(msg *stan.Msg) {    
        // Only process messages greater than the last one processed.
        // If it has been seen, skip to acknowledge it to the server.
        if msg.Sequence > lastProcessed {
            if err := process(msg); err != nil {
                // Log error and/or close subscription.
                return
            }
    
            // Processing successful, set the `lastProcessed` value.
            atomic.SwapUnint64(&lastProcessed, msg.Sequence)
        }
    
        // ACK with the server.
        msg.Ack()
    }
    
    conn.Subscribe(
        "stream-name",
        handle,
        stan.DurableName("i-will-remember"),
        stan.MaxInflight(1),
        stan.SetManualAckMode(),
    )
    
    서버는 클라이언트가 확인한 마지막 메시지 ID를 유지하지만, 한 번만 처리할 수 있도록 클라이언트는 월드맵을 유지해야 합니다.재부팅 후 이를 실현하려면 클라이언트는 lastProcessed 값을 영구화하고 시작할 때 불러와야 한다.그러나 이것은 마지막으로 처리된 메시지의 ID를 포함하는 로컬 파일처럼 간단할 수 있습니다.

    "이 개울의 모든 역사를 읽고 싶어요."


    이 용례는 흐름을 바탕으로 내부 상태를 구축하고자 하는 소비자들에게 가장 적합하다.사실상 this approach is exactly how many databases work은 검색을 지원하기 위해 내부 색인을 유지하는 데 매우 어렵다.모든 변경 사항은 로그에 기록된 다음, 내부 프로세스에서 이 변경 사항을 메모리에 있는 인덱스에 적용해서 빠른 검색을 지원합니다.
    일회용 인덱스를 구축하지 않으면 지속적인 구독을 원하기 때문에 다시 시작할 때 작은 변경 사항만 처리할 수 있습니다.처음부터 다른 선택일 뿐이다.
    handle := func(msg *stan.Msg) {
        // ...
    }
    
    conn.Subscribe(
        "stream-name",
        handle,
        stan.DurableName("i-will-remember"),
        stan.DeliverAllAvailable(),
    )
    
    오래된 메시지를 처리할 내부 상태의 새로운 버전을 배치하려고 할 때 (버그를 발견하거나 더 많은 기능을 사용하고 있기 때문에) 이것은 좋은 모델입니다.이것은 오프라인으로 완성할 수 있으니 얼마나 걸리든지 상관없다.일단 완공되면 구 버전과 함께 배치할 수 있고 데이터는 새 버전으로 이동할 수 있다.

    "메일 처리 업무를 공유하고 싶어요."


    지금까지 모든 용례는 구독자 한 명만 있으면 일을 완성할 수 있었다. 왜냐하면 이런 상황에서 구독은 매우 중요하기 때문이다(첫 번째는 제외될 수 있다).단, 정렬이 중요하지 않거나, 메시지 처리가 병행될 수 있다면 (나중에 조율될 수 있음), '대기열 구독자' 를 사용할 수 있습니다.
    대기열 구독자는 여러 클라이언트가 같은 '대기열 이름' 을 사용하여 같은 흐름을 구독할 수 있도록 하고, 메시지는 대기열 그룹의 모든 구성원에게 나누어 줍니다.
    handle := func(msg *stan.Msg) {
        // ...
    }
    
    conn.QueueSubscribe(
        "stream-name",
        "queue-name",
        handle,
        // options: durable, manual ack, etc.
    )
    
    위의 모든 옵션은 내구성을 포함하여 여전히 적용됩니다.DurableName 옵션을 추가하면 지속적인 대기열 구독이 가능합니다.

    후기


    항상 SetManualAckMode() 사용


    이것은handle 함수에 몇 줄의 추가 코드를 추가하더라도 acking에 대한 제어를 제공합니다.다른 상황이 없으면 확인 실패인 not currently being done with implicit ACKs을 기록할 수 있다.

    정보로부터 시작


    모든 것이 오래 지속되어야 한다고 가정하기 전에, 중요한 것은 처리 중인 메시지의 유형을 고려하는 것이다.구체적으로 그것들은 시간에 대해 어떤 민감성을 가지고 있습니까?만약 그렇다면 구독 서버가 오래 지속될 필요가 없거나, 오래 지속된다면,handle 함수는 이 점을 알고 기한이 지난 메시지를 뛰어넘어야 한다.
    마찬가지로 전체 주문이 필요한지 고려해 보겠습니다.기본적으로, 흐르는 모든 메시지가 이전 메시지의 처리 결과에 의존한다면, 총 정렬을 해야 한다.이 때 MaxInflight(1)을 사용하거나 오류가 발생하면 자동으로 구독을 닫아야 합니다.

    QueueSubscribe와 "한 번에 한 번"


    '딱 한 번'의 예는 단일 구독자에게만 적용된다.대기열 구독의 경우 lastProcessed ID는 대기열 구독의 모든 구성원이 중앙 집중식(및 원자)으로 액세스해야 합니다.만약 이것이 가능하다면, 가장 간단한 방법은 공유 키 값을 사용하여 저장하는 것이다. 이것은 값을 설정하는 원자 조작을 지원한다.

    예제 시뮬레이션


    나는 some examples을 함께 놓고 위에서 토론한 몇 가지 장면을 두드러지게 했다.이 동작을 설명하기 위해 실행 가능한 예시와 출력을 자술한 파일에 제공합니다.

    좋은 웹페이지 즐겨찾기