Amazon DynamoDB의 새로운 기능인 PartiQL, S3로 내보내기, Kinesis 데이터 흐름과 통합
12679 단어 dynamodbdataengineeringawsdatabase
아마존 DynamoDB의 새로운 기능
다음 기능으로 넘어가겠습니다.
PartIQ - Amazon DynamoDB에서 SQL을 사용하여 항목을 선택, 삽입, 업데이트 및 삭제할 수 있습니다.현재 Amazon DynamoDB 콘솔, AWS 명령줄 인터페이스(AWS CLI) 및 DynamoDB API에서PartiQL for DynamoDB를 사용할 수 있습니다.이 블로그에 대해 나는 AWS 콘솔을 사용한다.
SQLs 선택
간편한 SQL 선택
SELECT * FROM Books where Author='William Shakespeare'
직함서식
작자
카테고리
햄릿
{"양장판": {"S": "GVJZQ7JK"}, "평장판": {"S": "A4TFUR98"}, "유성 도서": {"S": "XWMGHW96"}
윌리엄 셰익스피어
극적
다음 SQL 키 경로는 제목, 양장본 및 범주를 반환합니다-
SELECT Title, Formats['Hardcover'], category FROM Books where Author='John Grisham'
카테고리직함
양장본
걱정하다
회사명
Q7QWE3U2
걱정하다
비를 만드는 자
J4SUKVGU
스릴러 영화
청산하다
무효이었어
다음 SQL은 "contains"함수를 사용합니다. 속성 클래스에 "suspend"문자열이 있으면 TRUE-를 되돌려줍니다.
SELECT Title, Formats['Audiobook'], Category FROM Books where Author='John Grisham' and contains(Category, 'Suspense')
년타이틀
게시 날짜
등급
2011
셜록 포르무스: 그림자 게임
2011-12-10T00:00:00Z
570
SQL 삽입 -
단일 항목 삽입 -
INSERT INTO Books value {'Title' : 'A time to kill', 'Author' : 'John Grisham', 'Category' : 'Suspense' }
SELECT * FROM Books WHERE Title='A time to kill'
작자직함
카테고리
존 그레이슨
살육의 순간
걱정하다
"INSERT INTO SELECT"SQL 실패,Validation Exception: 지원되지 않는 작업: 한 문장에 여러 항목을 삽입하는 것은 지원되지 않습니다. "INSERT INTO table Name VALUE item"으로 변경하십시오.
SQL 업데이트 -
이전 insert sql에서 Formats 열은 null입니다.그래서 이 책의 격식란을 업데이트합시다.
UPDATE Books SET Formats={'Hardcover':'J4SUKVGU' ,'Paperback': 'D7YF4FCX'} WHERE Author='John Grisham' and Title='A time to kill'
직함서식
작자
카테고리
살육의 순간
{"양장판": {"S": "J4SUKVGU"}, "평장판": {"S": "D7YF4FCX"}
존 그레이슨
걱정하다
맵에서 키를 삭제하려면 sql 업데이트를 사용하십시오 -
UPDATE Books REMOVE Formats.Paperback WHERE Author='John Grisham' and Title='A time to kill'
직함서식
작자
카테고리
살육의 순간
{"양장": {"S": "J4SUKVGU"}
존 그레이슨
걱정하다
SQL 삭제 -
DELETE FROM Books WHERE Author='John Grisham' and Title='A time to kill'
추가 참고 자료 - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.html2. S3로 내보내기 - 완전한 Amazon DynamoDB 테이블을 Amazon S3 메모리통으로 내보냅니다.DynamoDB 테이블을 내보내면 시점 복구 창에서 언제든지 Amazon DynamoDB 테이블에서 데이터를 내보낼 수 있습니다.이렇게 하려면 테이블에서 PITR(Point Recovery)을 활성화해야 합니다.테이블에 PITR이 설정되어 있지 않으면 S3으로 내보낼 때 테이블에 대한 사용 요청 오류가 보고됩니다.
DynamoDB>S3으로 내보내기
내보내기가 완료되면 목록 요약을 생성합니다.상세한 정보를 내보낸 json 파일과 목록 파일을 정리합니다.json에는 S3 파일 위치에 대한 자세한 정보가 들어 있습니다.
S3 기능으로 내보내기 위해 데이터에 대한 분석과 복잡한 조회를 수행하기 위해 DynamoDB JSON 형식을 반서열화하고 AWS Athena에서 데이터를 조회할 수 있는 데이터 파이프라인을 만들 수 있습니다.
워크플로에는 다음 단계가 포함됩니다.
Lambda 편지 번호
import io import gzip import json import boto3 import uuid import pandas as pd import awswrangler as wr from datetime import datetime from urllib.parse import unquote_plus def update_glue_table(*, database, table_name, new_location, region_name): """ Update AWS Glue non-partitioned table location """ glue = boto3.client("glue", region_name=region_name) response = glue.get_table( DatabaseName=database, Name=table_name) table_input = response["Table"] current_location = table_input["StorageDescriptor"]["Location"] table_input.pop("UpdateTime", None) table_input.pop("CreatedBy", None) table_input.pop("CreateTime", None) table_input.pop("DatabaseName", None) table_input.pop("IsRegisteredWithLakeFormation", None) table_input.pop("CatalogId", None) table_input["StorageDescriptor"]["Location"] = new_location response = glue.update_table( DatabaseName=database, TableInput=table_input ) return response def lambda_handler(event, context): """ Uses class TypeDeserializer which deserializes DynamoDB types to Python types Example - raw data format : [{'ACTIVE': {'BOOL': True}, 'params': {'M': {'customer': {'S': 'TEST'}, 'index': {'N': '1'}}}}, ] deserialized data format: [{'ACTIVE': True, 'params': {'customer': 'TEST', 'index': Decimal('1')}}] """ s3client = boto3.client('s3') athena_db = "default" athena_table = "movies" for record in event['Records']: bucket = record['s3']['bucket']['name'] key = unquote_plus(record['s3']['object']['key']) response = s3client.get_object(Bucket=bucket, Key=key) content = response['Body'].read() with gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb') as fh: data = [json.loads(line) for line in fh] all_data = [] boto3.resource('dynamodb') deserializer = boto3.dynamodb.types.TypeDeserializer() for row in data: all_data.append({k: deserializer.deserialize(v) for k,v in row['Item'].items()}) data_df = pd.DataFrame(all_data) dt = datetime.utcnow().strftime("%Y-%m-%d-%H-%M") s3_path="s3://%s/dynamodb/%s/content/dt=%s/" % (bucket, athena_table, dt) wr.s3.to_parquet( df=data_df, path=s3_path, dataset = True, ) update_response = update_glue_table( database=athena_db, table_name=athena_table, new_location=s3_path, region_name="us-west-2") if update_response["ResponseMetadata"]["HTTPStatusCode"] == 200: return (f"Successfully updated glue table location - {athena_db}.{athena_table}") else: return (f"Failed updating glue table location - {athena_db}.{athena_table}")
Query data from Athena
SELECT title, info.actors, info.rating, info.release_date, year FROM movies where title='Christmas Vacation'
타이틀배우.
등급을 매기다
게시 날짜
년
1
크리스마스 연휴
[셰플란 채스, 베벌리 데안젤로, 줄리엣 루이스]
0.73
1989-11-30T00:00:00Z
1989
추가 참고 자료 - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataExport.html
3. DynamoDB와 운동 흐름의 통합.
DynamoDB Streams는 DynamoDB 표의 프로젝트 단계 수정 시간 순서를 포착합니다.일찌감치 DynamoDB 데이터를 S3에 실시간으로 발표하기 위해 그 중 하나는 DynamoDB 흐름을 사용하고 AWS Lambda 함수를 사용하여 데이터를 Kinesis Firehose로 전송하고 후자는 데이터를 S3에 발표하는 것이다.이를 위해aws실험실https://github.com/awslabs/lambda-streams-to-firehose에서 제공하는 간편한 소프트웨어 패키지를 사용할 수 있습니다.
이러한 몇 가지 용례가 있는데, AWS는 현재 AWS 다이나모DB를 Amazon Kinesis Stream과 직접 통합시켰다.이제 DynamoDB 테이블의 프로젝트 단계를 Kinesis 데이터 흐름으로 포착할 수 있습니다.이렇게 하면 아래 파이프와 같이 S3에 데이터를 게시할 수 있습니다.
파이프라인을 설정하려면 Amazon Kinesis 데이터 스트림을 생성합니다.
Amazon Kinesis 데이터 스트림을 설정한 후 Amazon Kinesis Firehose를 만듭니다.Kinesis Firehose의 소스는 Amazon Kinesis 데이터 흐름이고 목표는 S3이다.
계속하려면 움직임에 대한 흐름 전송을 계속 사용하십시오.
DynamoDB>표>Kinesis 데이터 흐름 상세정보>Kinesis에 대한 흐름 관리
흐름이 활성화되면 테이블의 프로젝트 수준 변경 사항은 Amazon S3 bucket에 캡처되어 기록됩니다.다음은 PartIQ를 사용하여 DynamoDB에서 업데이트된 레코드의 예입니다.DynamoDB streams에 기록된 대략적인 작성 날짜와 시간, 새 이미지와 이전 이미지를 기록합니다.이러한 레코드는 AWS Lambda 또는 AWS Glue를 사용하여 해석할 수 있으며 Data Lake에 저장되어 분석 용례에 사용됩니다.
{
“awsRegion”:“us-west-2”,
“dynamodb”:{
대략적인 생성 날짜 시간: 1606714671542,
열쇠:
작성자:
"S": "제임스 패터슨"
},
직함:
"S": "대통령이 실종되었습니다"
}
},
"새로운 이미지":
직함:
"S": "대통령이 실종되었습니다"
},
형식:
“M”:{
정품 버전:
“S”:“JSU4KGVU”
}
}
},
작성자:
"S": "제임스 패터슨"
},
범주:
S: 신비
}
},
“OldImage”:{
직함:
"S": "대통령이 실종되었습니다"
},
형식:
“M”:{
정품 버전:
“S”:“JSU4KGVU”
},
평장본:
“S”:“DY7F4CFX”
}
}
},
작성자:
"S": "제임스 패터슨"
},
범주:
S: 신비
}
},
크기 바이트: 254
},
이벤트 ID: "bcaaf073-7e0d-49c2-818e-fe3cf7e5f18a",
이벤트Name: 수정,
“userIdentity”:null,
“recordFormat”:“application/json”,
“tableName”:“Books”,
“eventSource”:“aws:dynamodb”
}
마지막으로 본고에서 저는 PartiQL을 소개했는데 이것은 Amazon DynamoDB에 SQL과 호환되는 조회를 제공합니다.우리는 또한 S3 기능으로 내보내고 AWS Athena를 사용하여 Amazon S3 bucket의 Amazon DynamoDB 데이터를 조회하는 방법을 연구했다.마지막으로, 우리는 실시간 분석 용례를 연구했는데, 이 용례에서, 당신은 흐름이Kinesis 데이터 흐름의 형식으로 Amazon DynamoDB 표의 프로젝트 단계 변경을 포착할 수 있습니다.
Reference
이 문제에 관하여(Amazon DynamoDB의 새로운 기능인 PartiQL, S3로 내보내기, Kinesis 데이터 흐름과 통합), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/anandp86/new-features-in-amazon-dynamodb-partiql-export-to-s3-integration-with-kinesis-data-streams-54eg텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)