Golang에서 MongoDB 변경 스트림 구현

15867 단어 gomongodb


변경 스트림이란 무엇입니까?



변경 스트림은 데이터베이스의 항목, 테이블/컬렉션 또는 컬렉션의 테이블/문서 행에 대한 변경 사항에 대해 거의 실시간으로 정렬된 정보 흐름(스트림)입니다. 예를 들어, 특정 컬렉션/테이블에서 업데이트(삽입, 업데이트 또는 삭제)가 발생할 때마다 데이터베이스는 수정된 모든 데이터로 변경 이벤트를 트리거합니다.

MongoDB 변경 스트림



MongoDB 변경 스트림은 폴링(훨씬 더 높은 오버헤드가 발생함)을 사용하지 않고 MongoDB 데이터베이스, 컬렉션 또는 클러스터에 대한 변경 사항을 애플리케이션에 알릴 수 있는 고급 API를 제공합니다. MongoDB 변경 스트림의 특성은 다음과 같습니다.
  • 필터링 가능
  • 응용 프로그램은 필요한 변경 알림만 수신하도록 변경 사항을 필터링할 수 있습니다.

  • 재개 가능
  • 각 응답이 재개 토큰과 함께 제공되기 때문에 변경 스트림을 재개할 수 있습니다. 토큰을 사용하여 애플리케이션은 중단된 지점에서 스트림을 시작할 수 있습니다(연결이 끊긴 경우).

  • 주문
  • 변경 알림은 데이터베이스가 업데이트된 것과 동일한 순서로 발생합니다.

  • 내구성
  • 변경 스트림에는 대다수가 커밋된 변경 사항만 포함됩니다. 따라서 수신 응용 프로그램에서 볼 수 있는 모든 변경 사항은 새 기본 선택과 같은 오류 시나리오에서 내구성이 있습니다.

  • 보안
  • 모음을 읽을 수 있는 권한이 있는 사용자만 해당 모음에서 변경 스트림을 만들 수 있습니다.

  • 사용하기 쉬운
  • 변경 스트림 API의 구문은 기존 MongoDB 드라이버 및 쿼리 언어를 사용합니다.


  • Golang을 사용하여 MongoDB Change Stream 실험하기



    전제 조건


  • MongoDB Atlas Cluster, https://www.mongodb.com/cloud/atlas에서 무료로 받으세요
  • 코드베이스는 https://github.com/ksingh7/mongodb-change-events-go.git에서 사용할 수 있습니다.

  • MongoDB 스트림 시작하기: Golang 구현




    # export MongoDB URI
    
    export MONGODB_URI="mongodb+srv://admin:[email protected]/myFirstDatabase?retryWrites=true&w=majority"
    
    git clone https://github.com/ksingh7/mongodb-change-events-go.git
    cd mongodb-change-events-go
    go mod tidy
    go run main.go
    


    데모 비디오



    다음은 이 구현을 이해하는 데 도움이 될 수 있는 데모 비디오 녹화입니다.



    코드 연습



    main.go file already has required guidelines in the form of comments. However, in this section, I will explain sections that I think are crucial

    • Declaring struct returned by MongoDB Stream API


    type DbEvent struct {
        DocumentKey     documentKey     `bson:"documentKey"`
        OperationType    string                   `bson:"operationType"`
    }
    type documentKey struct {
        ID      primitive.ObjectID      `bson:"_id"`
    }
    


  • 컬렉션과 유사한 구조체 선언

  • type result struct {
        ID               primitive.ObjectID       `bson:"_id"`
        UserID        string                            `bson:"userID"`
        DeviceType string                            `bson:"deviceType"`
        GameState   string                            `bson:"gameState"`
    }
    


  • MongoDB에 연결

  •     client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("MONGODB_URI")))
        if err != nil {
            panic(err)
        }
    


  • DB 및 컬렉션 이름 설정

  •     database := client.Database("summit-demo")
        collection := database.Collection("bike-factory")
    


  • 변경 스트림 생성

  •     changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{})
        if err != nil {
            panic(err)
        }
    


  • 변경 스트림을 반복합니다.

  •     for changeStream.Next(context.TODO()) {
            change := changeStream.Current
            fmt.Printf("%+v\n", change)
        }
    


  • 변경 유형(삽입 또는 업데이트)을 감지하고 그에 따라 문서를 가져옵니다.

  •         // Print out the document that was inserted or updated
            if DbEvent.OperationType == "insert" ||  DbEvent.OperationType == "update" {
                // Find the mongodb document based on the objectID
                var result result
                err  := collection.FindOne(context.TODO(), DbEvent.DocumentKey).Decode(&result)
                if err != nil {
                    log.Fatal(err)
                }
                // Convert changd MongoDB document from BSON to JSON
                data, writeErr := bson.MarshalExtJSON(result, false, false)
                if writeErr != nil {
                    log.Fatal(writeErr)
                }
                // Print the changed document in JSON format
                fmt.Println(string(data))
                fmt.Println("")
            }
    


  • 변경 스트림 닫기

  •     if err := changeStream.Close(context.TODO()); err != nil {
            panic(err)
        }
    


    보너스 : MongoDB 컬렉션에 레코드를 삽입하는 기능




    func insertRecord(collection *mongo.Collection) {
            // pre-populated values for DeviceType and GameState    
            DeviceType := make([]string, 0)
            DeviceType = append(DeviceType, "mobile","laptop","karan-board","tablet","desktop","smart-watch")
            GameState := make([]string, 0)
            GameState = append(GameState, "playing","paused","stopped","finished","failed")
    
            // insert new records to MongoDB every 5 seconds
            for {
                item := result{
                    ID: primitive.NewObjectID(),
                    UserID: strconv.Itoa(rand.Intn(10000)),
                    DeviceType: DeviceType[rand.Intn(len(DeviceType))],
                    GameState: GameState[rand.Intn(len(GameState))],
                }
                _, err := collection.InsertOne(context.TODO(), item)
                if err != nil {
                    log.Fatal(err)
                }
    
                time.Sleep(5 * time.Second)
            }
        }
    


    요약



    이 게시물을 통해 MongoDB Change Streams와 이를 애플리케이션에서 사용하는 방법에 대해 더 잘 이해할 수 있기를 바랍니다.

    좋은 웹페이지 즐겨찾기