까다로운 데이터 흐름.2: MongodB 보기에서 문서 가져오기

이것은 내가 까다로운 데이터 흐름 시리즈의 두 번째 회이다. 이 회에서 나는 구글 클라우드 데이터 흐름을 사용하여 파이프를 실현할 때 겪는 가장 까다로운 문제들과 내가 어떻게 이런 문제들을 극복했는지 소개했다.
이번에는 완전히 다른 데이터베이스 스타일인 몬godB에 대해 이야기해 봅시다.
MongoDB는 현재 DB 세계에서 상당히 보편적이며 시장에서 가장 유명한 Nosql 데이터베이스라고 할 수 있다.이에 따라 Dataflow SDK는 예상대로 하나MongoDB connector ready to ease the usage of MongoDB as a datasource가 있다.
그것은 MongoDB의 집합을 읽고 쓰는 능력을 제공하기 때문에 나는 (당시 MongoDB에 익숙하지 않았던 천진난만한 나) 이런 간단한 파이프라인을 실현하는 데 필요한 모든 것을 실현한다고 생각한다.

그러나 물론, 그렇지 않으면 블로그를 쓰는 것은 의미가 없다. 모든 것이 내가 예상했던 것처럼 순조롭지 않다.

그래서 당신은 보기를 조회하고 싶군요. 그렇습니까?


파이프의 첫 번째 버전에서 나는 몸을 풀기 위해 만든 것이다. 나는 MongoDbIO.read().withUri(...).withDatabase(...).withCollection(...)로 수집한 문서를 직접 읽었지만 진정한 문제에 부딪히지 않았다.그러나 한 가지 미묘한 것이 있다. 당시에 나는 이 점의 중요성을 의식하지 못했다.
소스 MongoDB 인스턴스가 AtlasMongoDbIO was not allowed to run the default splitVector() command에 호스팅되므로 it was mandatory to add withBucketAuto(true) clause to download the collection.
내가 순진하게 보기 이름을 사용해서 집합을 대체하려고 시도했을 때, 나는 이런 어려움이 발생할 것이라고 예상하지 못했다.

[WARNING] org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.mongodb.MongoCommandException: Command failed with error 166 (CommandNotSupportedOnView): 'Namespace [myview] is a view, not a collection' on server [***]


분명히 MongodB는 나의 보기를 알고 있습니다. 이 보기를 요청하고 싶지만, 그렇지 않습니다. 문서를 검색할 수 없습니다.사실 보기에서 문서를 얻는 것은 결코 간단한 방법이 아니다.이것은 당연히 좋은 해석이 하나 있지만, 나는 찾을 수 없다.너무 낙담했어...

그런 느낌 알잖아...(위키백과 사진/Nlan86)
실제로 몬고DB의 보기는 SQL 세계의 일반적인 보기처럼 간단하지 않다. 몬고DB의 보기는 집합 파이프 처리 집합 문서의 결과이다.AggregationQuery.withQueryFn()에 전달할 수 있기 때문에 MongodbIO는 읽기 집합에 대해 집합 조회를 실행할 수 있다.솔루션 출시:
  • 모음에서 읽기
  • 보기 옵션
  • 에서 집합 정의 검색
  • 집합 파이프를 withQueryFn에 전달
  • MongoDB는 제공된 파이프라인을 통해 문서를 처리하고 보기
  • 와 같은 문서를 생성합니다
    우리 계획대로 합시다!

    뷰의 집계 파이프 읽어들이기


    파이프를 얻기 위해서는 mongojava 클라이언트를 직접 사용하고 정보를 수집하는 데 사용해야 합니다.매우 지루함:
    static List<BsonDocument> retrieveViewPipeline(Options options) {
            if (Strings.isNullOrEmpty(options.getView())) {
                LOG.debug("No view in options");
                return new ArrayList<>();
            }
            com.mongodb.MongoClientOptions.Builder optionsBuilder = new com.mongodb.MongoClientOptions.Builder();
            optionsBuilder.maxConnectionIdleTime(60000);
            MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb+srv://" + options.getMongoDBUri(),
                    optionsBuilder));
    
            List<Document> viewPipeline = null;
            for (Document collecInfosDoc : mongoClient.getDatabase(options.getDatabase()).listCollections()) {
                if (collecInfosDoc.getString("name").equalsIgnoreCase(options.getView())) {
                    viewPipeline = collecInfosDoc.get("options", Document.class).getList("pipeline", Document.class);
                    break;
                }
            }
            checkArgument(viewPipeline != null, String.format("%s view not found", options.getView()));
    
            return viewPipeline.stream().map((doc) -> doc.toBsonDocument(BsonDocument.class,
                    MongoClient.getDefaultCodecRegistry())).collect(Collectors.toList());
        }
    

    MongodbIO로 파이프 보내기


    앞에서 말한 바와 같이 MongodbIO는 집합을 처리하는 방법이 하나 있다. withQueryFn.그러나 파이프에 여러 단계가 있을 때 이런 방법은 실제로has a little bug in the current version (2.27):

    71줄: 파이프 마지막 단계의 어려운 시간: (Github 캡처)
    물론 간단한 해결 방법이 하나 있다. 파이프 목록에 쓸모없는 항목을 추가하면 bucket() 단계에 의해 대체될 것이다.
    if (viewPipeline.size() > 1) {
        viewPipeline.add(new BsonDocument());
    }
    
    이제 다음과 같이 구성된 소스 커넥터를 사용하여 보기 문서를 읽어들일 수 있습니다.
    PCollectionTuple mongoDocs =
        pipeline.apply("Read from MongoDB",
            MongoDbIO.read()
            .withUri("mongodb+srv://" + options.getMongoDBUri())         
            .withDatabase(options.getDatabase())                        
            .withCollection(options.getCollection())
            .withBucketAuto(true) 
            .withQueryFn(
                AggregationQuery.create()
                    .withMongoDbPipeline(viewPipeline))
        )
    

    근데 잠깐만!그것은 대형 소장품에 효과가 있습니까?


    드디어!이제 테스트 데이터에서 문서를 집중적으로 검색할 수 있으며, 실제적이고 거대한 몬godB 보기에서 반짝이는 새 파이프를 테스트할 수 있습니다.그리고...

    com.mongodb.MongoCommandException:
    Command failed with error 16819 (Location16819): ‘Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.’


    ... 결국 너는 아직 다 하지 못했다.적어도 오류 정보는 매우 명확하다. 몬goDB 실례에서 집합 파이프를 처리할 때 메모리 (RAM) 제한이 초과되었다.유감스럽게도, 이 제한은 설정할 수 없는 것이다.유일한 해결 방법은 몬goDB가 교환 파일을 사용할 수 있도록 하는 것이다. 집합 파이프 옆에 파라미터allowDiskUse: true를 설정해서 교환 파일을 강제로 사용할 수 있다.AggregateIterable.allowDiskUse() 때문에 이 매개 변수는 mongojava 클라이언트를 통해 쉽게 접근할 수 있습니다.문제는 불행하게도 이런 방법은 아직 몬godbIO에 공개되지 않았다는 점이다.There is a feature request for it 하지만 아직 로드맵에는 없습니다.
    불행하게도 allowDiskUse()MongoDB 광속 연결기의 두 위치는 필수적이며 덮어쓸 수 없습니다.
  • MongoDbIO.buildAutoBuckets
  • AggregateIterable<Document> buckets = mongoCollection.aggregate(aggregates).allowDiskUse(true);
    
  • AggregationQuery.apply()
  • return collection.aggregate(mongoDbPipeline()).allowDiskUse(true).iterator();
    
    따라서 현재 이런 종류를 편집하는 유일한 방법은 갈라지거나 복제하는 것이다.이것은 완벽하지는 않지만 최소한 파이프 의존 항목을 정리할 수 있다.
        <!-- MongoDB connector -->
        <!-- Because of limitations, a fork of this lib is used -->
        <!--<dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-mongodb</artifactId>
          <version>${beam.version}</version>
        </dependency>-->
        <!-- The fork needs the Mongo-java driver -->
        <dependency>
          <groupId>org.mongodb</groupId>
          <artifactId>mongo-java-driver</artifactId>
          <version>3.12.7</version>
        </dependency>
    
    mongojava 드라이버만 있으면 됩니다
    이 긴 이야기는 즐거운 결말을 맺었다. allow DiskUse와 swap 파일 덕분에 당신의 맞춤형 몬godbIO 연결기는 이제 어떤 크기의 몬godb 보기를 조회할 수 있습니다!
    2회예요.계속해서 다음을 주목해 주십시오. 저는 GCP 작업 흐름을 소개할 것입니다. 이것은 데이터 흐름을 조율하는 편리한 방식입니다.

    좋은 웹페이지 즐겨찾기