[상세 설명] opensearch-js의 helpers에서 search scroll과 bulk의 API를 처리합니다

안녕하세요!
작년부터 제 본업은 OpenSearch의 전체적인 운용과 유지를 진행했습니다.
최근 Opensearch-js(OpenSearch의 JS 클라이언트 라이브러리)를 처리하는 문서 액세스 처리에 여러 차례 개선되었다.
나는 그중에서 얻은 지식과 견문을 수출하고 싶다.

개막사


전제는 다음과 같은 집행 환경이다.
※ (후술) 일부 예외
큰 가방
version
AWS OpenSearch Service
1.0
opensearch-js
1.1.0

client.helpers를 이용해서 얻은 이점


ElasticSearch든 OpenSearch든
일시적으로 읽기/쓰기 요청에 집중하면 JVM 메모리 압력이 상승하여 회로 차단기가 작동합니다.
이렇게 되면 IO 성능이 떨어지고 429Too Many Request가 돌아오며read/write 자물쇠도 잠깁니다
부수적인 시스템에 영향을 줄 수 있다.
그리고client는 다음과 같은 번거로운 제어를 진행합니다.helpers가 갈 거예요.
  • 범용
  • 429 반환 시 자동retry(기본 최대 3회)
  • helpers를 이용하지 않고 아무것도 깨닫지 못하면 Response Error가 업그레이드
  • retry에서 즉시 다시 요청하는 것이 아니라wait(기본값 5000ms) 이후 요청
  • client.helpers.scrollSearch
  • 가져오기 전에 귀속 함수를 인코딩할 필요가 없습니다
  • 또한scroll의 끝에서clearScroll의 수속을 진행하는 등 번거로운 제어가 필요 없음
  • client.검색 응답이나 첨부 처리의 인코딩은 검색을 사용하는 것보다 쉽다
  • client.helpers.bulk
  • 분할 요청 사이즈와 동시 요청 수의 제한을 제어하기 쉽다
  • 쓰기 실패에 대한 쿼리
  • 도 얻을 수 있음

    검색 API 처리 방법


    helpers를 사용하지 않을 때


    스크롤을 자세히 제어할 때의 콜백 함수를
    나는 더욱 복잡한 통제가 있을 것이라고 생각한다.
    await new Promise((resolve, reject) => {
      const results = [];
      client.search({
        index: 'hoge',
        size: 10,
        scroll: '1m',
        body: {
          query: { ...foo },
        },
      }, async function scroll(error, response){
        try {
          if (!response || !response.body.hits) {
            if(response.body._scroll_id) await client.clearScroll({ scroll_id: response.body._scroll_id });
            reject(error);
          }
          if (response.body.hits.hits.length <= 0) {
            await client.clearScroll({ scroll_id: response.body._scroll_id });
            resolve(results)
          }
          results.push(response.body.hits.hits);
          client.scroll(
            { scroll_id: String(response.body._scroll_id), scroll: '1m' },
            scroll
          );
        } catch(e) {
          await client.clearScroll({ scroll_id: response.body._scroll_id });
          reject(error);
        }
      });
    });
    

    helpers 사용 시


    틀릴 때의 수속은 지능적으로 변한다.
    const scrollSearch = client.helpers.scrollSearch({
      index: 'hoge',
      size: 10,
      scroll: '1m',
      body: {
        query: { ...foo },
      },
    }, { maxRetries: 3, wait: 5000 });
    
    for await (const response of scrollSearch) {
      console.log(response)
    }
    
    helpers.scrollSearch 사용에 대한 고려 사항은 다음과 같습니다.

    두 번째 매개 변수에 대한 옵션


    키 이름
    과업
    maxRetries
    429 오류 반환 시 최대 재시도
    wait
    429 오류 발생 시 재시도되는 ms

    response를 꺼낼 때.

  • response.body.존재hits
  • && response.body.hits.hits.length > 0
  • 일은 이미 확정되었다.
    이하.
    https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L122-L122

    response 추출 방법


    client.helpers.scrollSearch 자체가 Generater 함수입니다.
    순차적으로 반환하다.
    https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L129-L129
    ※ 따라서 추출할 때마다 조금씩 뿌려집니다.

    스크롤에 대한clear 방법


    Generation 함수에서 터미널의response를 꺼낼 때,clearScroll이 자동으로 진행됩니다.
    https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L150-L150
    다음은clear scroll의 처리입니다.
    https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L114-L120
    또한 터미널의response를 꺼내기 전에clear scroll을 원하는 경우 다음과 같다.
    여기서
    for await (const response of scrollSearch) {
      if (cond) response.clear()
    }
    
    response.clear에clearScroll 처리된iterator가 있는지 확인할 수 있습니다.
    https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L126-L126
    clearScroll의 제어 처리
    https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L114-L120

    429 Too Many Request 제어


    429 오류 시,wait (두 번째 인자의 옵션에서 지정한wait) 에서 요청합니다.
    https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L141-L142
    retry (두 번째 인자의 옵션에서 지정한 retry) 횟수 전에 그것을 다시 요청합니다.
    https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L135-L135
    ※ Retry 횟수가 되기 전 429가 해제되지 않은 경우에는 Response Error가 레벨업됩니다.
    https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L144-L146

    Bulk API 처리 방법 정보


    helpers를 사용하지 않을 때


    그냥 정상적인 시스템 처리가 이렇게 되면
    const response = await client.bulk({
      index: "test",
      body: [
        { create: { _id: "sample-1" } },
        { user_id: 1, name: "foo" },
    
        { index : { _id : "sample-2" } },
        { user_id : 2, name: "hoge" },
    
        { delete : { _id : "sample-2" } },
    
        { update : { _id : "sample-1" } },
        { doc : { name : "Foo" } },
    
        { update : { _id : "sample-2" } },
        { script : { source: "ctx._source.name = params.name;", lang: 'painless', params : { name: "Foo" } }, scripted_upsert: true }
      ]
    });
    
    또한 429 오류를 반환하지 않기 위해
  • 크기 조정 요청
  • 동시 요청 수 제한
  • 등의 조화는 좀 번거롭다.
    또한 OpenSearch 측 리소스가 넉넉한 경우에도
    모든 요청의 문장수 상한선max_clause_count은 반드시 주의해야 한다.
    그 밖에
    어떤 조회가 실패했는지 실패한 조회만 꺼내려고 합니다
    이런 통제 처리는 쉽게 번잡해진다.

    helpers 사용 시


    개막사


    원래helpers.bulk에서 업데이트 작업하는script 문서가 지원되지 않기 때문에
    사용할 수 있도록 수정하고 홍보를 했습니다.
    https://github.com/opensearch-project/opensearch-js/pull/210
    ↑ 아직 뽑히지 않았다
    패치를 먼저 사용합니다.
    const datasource = [
      [
        { create: { index: "test", _id: "sample-1" } },
        { user_id: 1, name: "foo" },
      ],
      [
        { index : { index: "test", _id : "sample-2" } },
        { user_id : 2, name: "hoge" },
      ],
      [
        { delete : { index: "test", _id : "sample-2" } },
      ],
      [
        { update : { index: "test", _id : "sample-1" } },
        { doc : { name : "Foo" } },
      ],
      [
        { update : { index: "test", _id : "sample-2" } },
        { script : { source: "ctx._source.name = params.name;", lang: 'painless', params : { name: "Foo" } }, scripted_upsert: true }
      ]
    ];
    
    await client.helpers.bulk({
      datasource,
      onDocument: doc => {
        // deleteオペレーションの場合は、'{"delete":{"_id":"sample-2"}}'のみ返却する
        if(doc[0]?.delete){
          return doc[0];
        }
        return doc;
      },
      onDrop: ({operation, document, retried, error}) => {
        if (retried) return; // trueの場合は "retries > リトライ回数" の状態
        const query = [operation];
    
        // deleteオペレーション以外の場合は2つ目のobject(アクションクエリ)がある
        if (document) query.push(document);
    
        // [{"create":{"_id":"sample-1"}},{"user_id":1,"name":"foo"}]
        console.log(JSON.stringify(query))
      },
      concurrency: 2,
      flushBytes: 1024 * 1024 * 10,
      refreshOnCompletion: false,
      retries: 3,
    });
    

    onDocument


    bulk에서 요청한 데이터에 사용할 콜백 함수를 되돌려줍니다.
    콜백 함수 매개 변수doc에서
    다음 순서대로 데이터 원본 파라미터의 값을 꺼낼 수 있습니다.
  • datasource[0]
  • datasource[1]
  • datasource[2]
  • datasource[3]
  • 또한bulk에서 요청한 데이터는 다음과 같이 반환해야 합니다.
    update(doc)update(script) 만지지 않으면patch 기재된 반환치 동작에 따르지 않습니다.
    operation
    기대치
    create
    [ { create: { index: "test", _id: "sample-1"} }, { user_id: 1, name: "foo"} ]
    index
    [ { index: { index: "test", _id: "sample-2"} }, { user_id : 2, name: "hoge"} ]
    delete
    { delete : { index: "test", _id : "sample-2"} }
    update(doc)
    [ { update : { index: "test", _id : "sample-1"} }, { doc : { name : "Foo"} } ]
    update(script)
    [ { update : { index: "test", _id : "sample-2"} }, { script : { source: "ctx._source.name = params.name;", lang: 'painless', params : { name: "Foo"} }, scripted_upsert: true } ]
    따라서 delete 동작에서만 Object로 되돌아오는 것을 제어합니다.
    onDocument: doc => {
      if(doc[0]?.delete){
        return doc[0];
      }
      return doc;
    },
    

    onDrop


    요청이 실패했을 때 실행되는 콜백 함수입니다.
    매개 변수에 전달된 Object에는 다음과 같은 정보가 있습니다.
    콜백을 통해 전달되는 매개 변수
    컨텐트
    operation
    예:object 상태에서 데이터 원본 [0][0]과 데이터 원본[1][0]이 포함된 조작 데이터
    document
    예:object의 상태에서 데이터 원본[0][1]과 데이터 원본[1]을 포함하는 문서 데이터.delete 작업에서null을 저장합니다.
    retried
    지정한 횟수 (기본값은 3회, 또는retries 옵션으로 지정) 를 다시 시도하면 가짜가 추가됩니다.그 외의 상황은 사실이다.
    status
    마지막으로 실패한 HTTP 상태 코드입니다.
    error
    마지막으로 실패했을 때의 오류 내용입니다.

    추가 매개변수 정보


    매개 변수
    컨텐트
    concurrency
    최대 동시 (비동기 병렬) 요청 수
    flushBytes
    요청한 최대 사이즈입니다.초과 시 그때 요청
    refreshOnCompletion
    index, 업데이트, delete,bulkapi의 refresh 옵션와 동일
    wait
    429 오류 발생 시 재시도되는 ms
    retries
    429 오류 반환 시 최대 재시도

    429 Too Many Request 제어

    wait 밀리초 후 bulk 요청이 다시 시도됩니다.
    또한 retries회로 재시도됩니다.
    이때onDropcallback 함수retried의 매개 변수에는 다음과 같은 수치가 포함되어 있습니다.
    컨디션
    retried 값
    재시도 횟수
    true
    재시도 횟수>=retries
    false

    bulk 요청 실패에 대한 검색 방법


    operation과document을 통해 onDropp의 콜백 함수를 얻을 수 있습니다.
    onDrop: ({operation, document}) => {
      const query = [operation];
    
      // deleteオペレーション以外の場合は2つ目のobject(アクションクエリ)がある
      if (document) query.push(document);
    
      // [{"create":{"_id":"sample-1"}},{"user_id":1,"name":"foo"}]
      console.log(JSON.stringify(query))
    }
    
    delete 조작을 제외한 경우 다음과 같은 데이터 원본의 조회를 얻을 수 있습니다.
    [operation, document]
    
    ※ delete 작업 시document는 비어 있으니 주의

    총결산


    OpenSearch의 공식 문서가 아직 충실하지 않기 때문이다
    ElasticSearch의 공식 문서를 자주 참고하세요.
    나는 GW의 후반부에서 부하를 완화시키는 조화를 좀 더 배우고 싶다.

    좋은 웹페이지 즐겨찾기