Nodejs를 사용하여 Amazon QLDB 스트리밍에서 DynamoDB로 데이터를 실시간으로 전송하는 방법

출신 배경


2019년 9월에 AWS는 general availability of QLDB를 발표했는데 이것은 완전히 관리된 집중 분류 장부 데이터베이스이다.QLDB의 핵심은 로그만 첨부하는 것입니다.모든 요청은 로그를 통해 제출된 업무만 포함됩니다.이러한 데이터는 변경될 때마다 완벽한 감사 추적을 생성하여 신뢰할 수 있는 데이터가 필요한 디지털 세계에서 가장 중요하고 규정 준수를 충족해야 합니다.
분석과 다운스트림 이벤트 처리 같은 용례를 지원하거나 다른 전문적으로 구축된 데이터베이스에서 더 잘 지원하는 작업을 지원하는 동시에 QLDB를 사실의 원천으로 보존하는 것이 도전이다.최근 발표된 real-time streaming for Amazon QLDB 은 이미 이 문제에 대답했다.이것은 QLDB의 주요 새로운 특성입니다. 자세히 살펴보겠습니다.

QLDB 스트림


QLDB Streams는 로그에 대한 변경 사항을 거의 실시간으로 대상 Kinesis 데이터 흐름에 연속적으로 쓸 수 있도록 하는 기능입니다.소비자들은 흐름을 구독하고 적당한 행동을 취할 수 있다.이 방법에는 다음과 같은 여러 가지 이점이 있습니다.
  • QLDB Streams는 지정된 분류 장부에서 근거리 실시간 연속 데이터 흐름을 제공합니다
  • QLDB Streams에서 최소 1회 제공 보증
  • 시작/종료 날짜와 시간에 따라 여러 개의 흐름을 만들 수 있습니다.이것은 특정 시점에서 모든 문서 수정을 되돌려주고 재방송하는 능력을 제공한다.
  • 동각 데이터 흐름
  • 의 데이터를 소비하기 위해 최대 20개의 소비자(소프트 제한)를 구성할 수 있다
    QLDB 흐름을 이해하는 가장 좋은 방법은 직접 시도하는 것이다.Nodejs에서 GitHubQLDB Simple Demo에서 사용할 수 있는 데모 프로그램을 구축했습니다.
    프레젠테이션의 서버 아키텍처는 다음 그림과 같습니다.

    하나의 스택은 QLDB와 상호작용하는 AWS Lambda 함수를 호출하는 AWS API 게이트웨이를 통해 백엔드 공개 API를 구축합니다.별도의 스택은 Kinesis에서 트리거하는 AWS Lambda 함수를 포함하는 QLDB 흐름을 지원합니다.이 함수는 QLDB 데이터의 하위 집합을 사용하여 DynamoDB의 테이블을 업데이트하고 모든 개인 식별 정보(PII)를 삭제합니다.

    QLDB 스트림 로깅 유형


    QLDB는 세 가지 유형의 레코드를 작성합니다.QLDB 스트림 ARN, 레코드 유형 및 유효 로드로 구성된 일반 최상위 형식을 사용합니다.
    {
      qldbStreamArn: string,
      recordType: CONTROL | BLOCK | REVISION_DETAILS,
      payload: {
        // data
      }
    }
    
    제어 레코드
    제어 기록은 Kinesis에 기록된 첫 번째 기록이자 종료 날짜/시간을 지정할 때 쓴 마지막 기록입니다.유효 부하는 "만들기"의 첫 번째 이벤트인지 "완료"의 마지막 이벤트인지 설명할 뿐입니다.
    {
      controlRecordType:"CREATED/COMPLETED"
    }
    
    블록 레코드
    블록 요약 기록은 QLDB에 트랜잭션의 일부로 제출된 블록에 대한 세부 정보를 나타냅니다.QLDB와의 모든 상호 작용은 하나의 트랜잭션에서 발생합니다.프레젠테이션 응용 프로그램에서 새 자전거 면허증을 만들 때 3단계를 수행해야 한다.
  • 제공된 전자 메일 주소가 고유한지 확인하기 위해 테이블에서 찾기
  • 새 라이센스 레코드 작성
  • 2단계
  • 에서 QLDB가 생성하고 반환한 문서 ID를 포함하여 라이센스 기록을 업데이트합니다.
    여기서 발생하는 블록 레코드는 다음과 같습니다.
    {
      blockAddress: {...},
      ...
      transactionInfo: {
        statements: [
          {
            statement: "SELECT Email FROM BicycleLicence AS b WHERE b.Email = ?\",
            startTime: 2020-07-05T09:37:11.253Z,
            statementDigest: {{rXJNhQbB4tyQLAqYYCj6Ahcar2D45W3ySfxy1yTVTBY=}}
          },
          {
              statement: "INSERT INTO BicycleLicence ?\",
              startTime: 2020-07-05T09:37:11.290Z,
              statementDigest: {{DnDQJXtKop/ap9RNk9iIyrJ0zKSFYVciscrxiOZypqk=}}
          },
          {
              statement: "UPDATE BicycleLicence as b SET b.GUID = ?, b.LicenceId = ? WHERE b.Email = ?\",
              startTime: 2020-07-05T09:37:11.314Z,
              statementDigest: {{xxEkXzdXLX0/jmz+YFoBXZFFpUy1H803ph1OF2Lof0A=}}
          }
        ],
        documents: {...}
      },
      revisionSummaries: [{...}]
    }
    
    모든 실행된 PartiQL 문장은 블록 기록에 포함되며, SELECT 문장을 포함한다. 왜냐하면 그들은 같은 업무의 일부분을 구성하기 때문이다.여러 개의 테이블을 사용하면 같은 업무에서 실행되는 모든 테이블의 문장이 블록 기록에 나타납니다.
    개정 세부 정보 기록
    REVISION\u DETAILS 레코드는 분류 장부에 제출된 문서 수정을 나타냅니다.유효 부하는 최근에 제출한 보기와 관련된 테이블 이름과 Id를 포함합니다. 한 업무에서 세 개의 테이블을 업데이트하면 블록 기록과 세 개의 REVISION\u 상세 정보 기록을 만들 수 있습니다.이 중 하나의 레코드의 예는 다음과 같습니다.
    {
      tableInfo: {
        tableName: "Orders",
        tableId: "LY4HO2JU3bX99caTIXJonG"
      },
      revision: {
        blockAddress: {...},
        hash: {{hrhsCwsNPzLjCsOBHRtSkMCh2JGrB6q0eOGFswyQBPU=}},
        data: {
          OrderId: "12345",
          Item: "ABC12345",
          Quantity: 1
        },
        metadata: {
          id: "3Ax1in3Mt7L0YvVb6XhYyn",
          version: 0,
          txTime: 2020-07-05T18:22:14.019Z,
          txId: "84MQSpihZfxFzpQ4fGyXtX"
        }
      }
    }
    

    AWS Lambda에서 이벤트 처리


    기본적으로 QLDB 흐름은 Kinesis 데이터 흐름에서 레코드 집합을 지원하도록 구성됩니다.이렇게 하면 QLDB가 단일 Kinesis 데이터 흐름 기록에 여러 개의 흐름 기록을 게시할 수 있습니다.이것은 흡수량을 크게 향상시키고 원가 최적화를 높일 수 있다. 왜냐하면 put의 가격은 25KB의 유효 부하 "블록"이기 때문에 우리는 이 기능을 사용하고자 한다.
    데모 응용 프로그램 사용Nodejs Kinesis Aggregation and Disaggregation Modules.동적 기록 이벤트는 다음 구조의 동적 기록 세트로 구성됩니다.
    {
      Records: [
        {
          kinesis: {
              ...
              data: '...',
              approximateArrivalTimestamp: 1593728523.059
          },
          ...
        }
      ]
    };
    
    AWS Lambda 함수의 프로세서에서 함수map() 함수를 사용하여 그룹의 모든 요소가 전송된 기록을 한 번에 처리합니다.각 기록은 먼저 promiseDeaggregate 조정한 다음에 processRecords 조정한다.
    await Promise.all(
      event.Records.map(async (kinesisRecord) => {
        const records = await promiseDeaggregate(kinesisRecord.kinesis);
        await processRecords(records);
      })
    );
    
    promiseDeaggregate 함수는 처리 기록 집합의 deaggregateSync 인터페이스를 사용하고 모든 집합 해제 기록은 해석된 Promise으로 되돌아옵니다.
    const promiseDeaggregate = (record) =>
      new Promise((resolve, reject) => {
        deagg.deaggregateSync(record, computeChecksums, (err, responseObject) => {
          if (err) {
            //handle/report error
            return reject(err);
          }
          return resolve(responseObject);
        });
    });
    
    일단 돌아오면 기록은 처리된다.이것은 base64 인코딩 데이터에 대한 디코딩과 관련된다.유효 부하는 QLDB가 흐름에 게시하는 실제 이온 이진 기록입니다.ion-js를 사용하여 메모리에 로드한 다음 관련 작업을 수행할 수 있습니다.프레젠테이션 상황에서 처리된 유일한 기록 형식은 REVISION\u 디테일이고 다른 모든 기록 형식은 생략됩니다.
    async function processRecords(records) {
      await Promise.all(
        records.map(async (record) => {
          // Kinesis data is base64 encoded so decode here
          const payload = Buffer.from(record.data, "base64");
    
          // payload is the actual ion binary record published by QLDB to the stream
          const ionRecord = ion.load(payload);
    
          // Only process records where the record type is REVISION_DETAILS
          if (JSON.parse(ion.dumpText(ionRecord.recordType)) !== REVISION_DETAILS) {
            console.log(`Skipping record of type ${ion.dumpPrettyText(ionRecord.recordType)}`);
          } else {
            // process record
          }
        })
      );
    }
    

    상단 힌트


    데이터에 고유 문서 Id 추가
    QLDB에서 새 문서를 작성할 때 보증되는 유일한 식별자는 id 섹션의 metadata 필드입니다.프레젠테이션 응용 프로그램에서 이 값을 검색한 다음 응용 프로그램 data 부분에 채워 넣으십시오.이 점은 매우 중요합니다. 기록이 삭제되면 (변경할 수 없기 때문에 일기장에 보존됩니다) REVISION\u DETAILS 메시지에 빈 data 부분을 보냅니다.이 기록은 메타데이터 부분에서 사용할 수 있는 문서 id를 가지고 있으며 다음과 같이 검색할 수 있습니다.
    // retrieve the id from the metadata section of the message
    const id = ion
      .dumpText(ionRecord.payload.revision.metadata.id)
      .replace(/['"]+/g, "");
    
    DynamoDB에서 레코드를 식별하고 테이블에서 삭제할 수 있습니다.
    반복 및 무질서 기록 처리
    QLDB streams는 한 번 이상 배송됩니다.이것은 반복적이고 무질서한 기록을 Kinesis 데이터 흐름에 발표할 수 있다는 것을 의미한다.
    각 블록 레코드는 다음과 같습니다blockAddress.
    blockAddress: {
      strandId: "GJMmYanMuDRHevK9X6MX3h",
      sequenceNo: 3
    }
    
    이것은 분류 장부의 블록의 서열 번호를 상세하게 설명하였다.QLDB는 변경할 수 없으므로 각 블록은 로그 끝에 첨부됩니다.
    각 개정판의 세부 정보 기록에는 version 섹션에서 파일의 metadata 번호가 포함됩니다.각 문서는 버전 0 으로 작성된 레코드의 증가된 버전 번호를 사용합니다.
    만약 필요하다면, 그 중 하나 또는 두 개의 값을 사용하면 중복되거나 무질서한 기록을 처리하는 데 도움을 줄 수 있다.

    더 알고 싶어요.

  • awesome-QLDB
  • 에서 QLDB 가이드, 개발 도구 및 리소스에 대한 기획 목록을 찾을 수 있습니다.
  • 업데이트되는 QLDB 온라인 가이드는 QLDB Guide
  • 에서 확인할 수 있습니다.

    좋은 웹페이지 즐겨찾기