Amazon DynamoDB의 새로운 기능인 PartiQL, S3로 내보내기, Kinesis 데이터 흐름과 통합

AWS re:InventAround가 있을 때마다 AWS는 한 달 안에 많은 새로운 기능을 발표합니다.이 블로그에서는 아마존 다이나모DB가 내놓은 3가지 새로운 기능을 소개할 예정이다.DynamoDB는 모든 규모에서 1밀리초 단위의 성능을 제공하는 비관계형 호스팅 데이터베이스입니다.
아마존 DynamoDB의 새로운 기능
  • PartiQL - 아마존 다이나모DB의 SQL 호환 검색 언어.
  • S3으로 내보내기 - Amazon DynamoDB 테이블을 S3로 내보냅니다.이 블로그에 DynamoDB 항목을 역서열화하는 용례를 추가했습니다. S3에 쓰고 Athena를 사용하여 검색합니다.
  • DynamoDB와 Kinesis 흐름의 직접적인 통합인 아마존 DynamoDB는 Kinesis 데이터 흐름의 흐름 프로젝트급 이미지이다.
  • 먼저 새로운 Amazon DynamoDB 콘솔을 살펴보겠습니다.나는 이 블로그에서 놀 수 있는 DDB 시계가 두 개 있다.북스 테이블은 저자별로 나뉘고 제목별로 정렬됩니다.영화표는 연도에 따라 구분하고 제목에 따라 정렬한다.

    다음 기능으로 넘어가겠습니다.

  • PartIQ - Amazon DynamoDB에서 SQL을 사용하여 항목을 선택, 삽입, 업데이트 및 삭제할 수 있습니다.현재 Amazon DynamoDB 콘솔, AWS 명령줄 인터페이스(AWS CLI) 및 DynamoDB API에서PartiQL for DynamoDB를 사용할 수 있습니다.이 블로그에 대해 나는 AWS 콘솔을 사용한다.
  • DynamoDB>PartiQL 편집기
    SQLs 선택
    간편한 SQL 선택
    SELECT * FROM Books where Author='William Shakespeare'
    직함
    서식
    작자
    카테고리
    햄릿
    {"양장판": {"S": "GVJZQ7JK"}, "평장판": {"S": "A4TFUR98"}, "유성 도서": {"S": "XWMGHW96"}
    윌리엄 셰익스피어
    극적
    다음 SQL 키 경로는 제목, 양장본 및 범주를 반환합니다-
    SELECT Title, Formats['Hardcover'], category FROM Books where Author='John Grisham'
    카테고리
    직함
    양장본
    걱정하다
    회사명
    Q7QWE3U2
    걱정하다
    비를 만드는 자
    J4SUKVGU
    스릴러 영화
    청산하다
    무효이었어
    다음 SQL은 "contains"함수를 사용합니다. 속성 클래스에 "suspend"문자열이 있으면 TRUE-를 되돌려줍니다.
    SELECT Title, Formats['Audiobook'], Category FROM Books where Author='John Grisham' and contains(Category, 'Suspense')

    타이틀
    게시 날짜
    등급
    2011
    셜록 포르무스: 그림자 게임
    2011-12-10T00:00:00Z
    570
    SQL 삽입 -
    단일 항목 삽입 -
    INSERT INTO Books value {'Title' : 'A time to kill', 'Author' : 'John Grisham', 'Category' : 'Suspense' }
    
    SELECT * FROM Books WHERE Title='A time to kill'
    작자
    직함
    카테고리
    존 그레이슨
    살육의 순간
    걱정하다
    "INSERT INTO SELECT"SQL 실패,Validation Exception: 지원되지 않는 작업: 한 문장에 여러 항목을 삽입하는 것은 지원되지 않습니다. "INSERT INTO table Name VALUE item"으로 변경하십시오.
    SQL 업데이트 -
    이전 insert sql에서 Formats 열은 null입니다.그래서 이 책의 격식란을 업데이트합시다.
    UPDATE Books SET Formats={'Hardcover':'J4SUKVGU' ,'Paperback': 'D7YF4FCX'} WHERE Author='John Grisham' and Title='A time to kill'
    직함
    서식
    작자
    카테고리
    살육의 순간
    {"양장판": {"S": "J4SUKVGU"}, "평장판": {"S": "D7YF4FCX"}
    존 그레이슨
    걱정하다
    맵에서 키를 삭제하려면 sql 업데이트를 사용하십시오 -
    UPDATE Books REMOVE Formats.Paperback WHERE Author='John Grisham' and Title='A time to kill'
    직함
    서식
    작자
    카테고리
    살육의 순간
    {"양장": {"S": "J4SUKVGU"}
    존 그레이슨
    걱정하다
    SQL 삭제 -
    DELETE FROM Books WHERE Author='John Grisham' and Title='A time to kill'
    추가 참고 자료 - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.html
    2. S3로 내보내기 - 완전한 Amazon DynamoDB 테이블을 Amazon S3 메모리통으로 내보냅니다.DynamoDB 테이블을 내보내면 시점 복구 창에서 언제든지 Amazon DynamoDB 테이블에서 데이터를 내보낼 수 있습니다.이렇게 하려면 테이블에서 PITR(Point Recovery)을 활성화해야 합니다.테이블에 PITR이 설정되어 있지 않으면 S3으로 내보낼 때 테이블에 대한 사용 요청 오류가 보고됩니다.
    DynamoDB>S3으로 내보내기


    내보내기가 완료되면 목록 요약을 생성합니다.상세한 정보를 내보낸 json 파일과 목록 파일을 정리합니다.json에는 S3 파일 위치에 대한 자세한 정보가 들어 있습니다.
    S3 기능으로 내보내기 위해 데이터에 대한 분석과 복잡한 조회를 수행하기 위해 DynamoDB JSON 형식을 반서열화하고 AWS Athena에서 데이터를 조회할 수 있는 데이터 파이프라인을 만들 수 있습니다.

    워크플로에는 다음 단계가 포함됩니다.
  • 시간 기반 CloudWatch 이벤트를 트리거합니다.
  • 이 이벤트는 AWS Lambda 함수를 트리거합니다.
  • DynamoDB의 S3 내보내기가 시작되었습니다.
  • DynamoDB JSON 형식의 데이터를 S3 원시 저장통에 기록합니다.S3 객체는 압축된 json 파일입니다.
  • 종류마다.json.S3 원본 저장소 통의 gz 파일 이벤트 알림은 AWS Lambda를 트리거하도록 설정됩니다.다음은 AWS Lambda 섹션을 트리거하는 S3 이벤트입니다.
  • AWS Lambda는 S3 객체를 읽고 DynamoDB Type Deserializer 클래스를 사용하여 DynamoDB JSON 형식의 데이터를 반서열화합니다.이러한 유형은 DynamoDB 유형을 Python 유형으로 반서열화합니다.역서열화된 데이터는 S3 컨텐트 통에 맞춤법 형식으로 기록됩니다.코드는 Lambda 함수 코드 섹션에 있습니다.
  • AWS Lambda 함수는 AWS 풀 디렉토리의 테이블 위치를 업데이트합니다.
  • AWS Athena를 사용하여 데이터를 조회합니다.
  • S3 이벤트가 AWS Lambda를 트리거합니다.

    Lambda 편지 번호
    import io
    import gzip
    import json
    import boto3
    import uuid
    import pandas as pd
    import awswrangler as wr
    from datetime import datetime
    from urllib.parse import unquote_plus
    
    
    def update_glue_table(*, database, table_name, new_location, region_name):
        """ Update AWS Glue non-partitioned table location
        """
    
        glue = boto3.client("glue", region_name=region_name)
    
        response = glue.get_table(
            DatabaseName=database, Name=table_name)
    
        table_input = response["Table"]
        current_location = table_input["StorageDescriptor"]["Location"]
    
        table_input.pop("UpdateTime", None)
        table_input.pop("CreatedBy", None)
        table_input.pop("CreateTime", None)
        table_input.pop("DatabaseName", None)
        table_input.pop("IsRegisteredWithLakeFormation", None)
        table_input.pop("CatalogId", None)
    
        table_input["StorageDescriptor"]["Location"] = new_location
    
        response = glue.update_table(
            DatabaseName=database,
            TableInput=table_input
        )
    
        return response
        
    
    def lambda_handler(event, context): 
        
        """
        Uses class TypeDeserializer which deserializes DynamoDB types to Python types 
        
        Example - 
    
        raw data format :
            [{'ACTIVE': {'BOOL': True}, 'params': {'M': {'customer': {'S': 'TEST'}, 'index': {'N': '1'}}}}, ]
        deserialized data format:
            [{'ACTIVE': True, 'params': {'customer': 'TEST', 'index': Decimal('1')}}]
    
        """
            
        
        s3client = boto3.client('s3')
        athena_db = "default"
        athena_table = "movies"
        
        for record in event['Records']:
            bucket = record['s3']['bucket']['name']
            key = unquote_plus(record['s3']['object']['key'])
     
            response = s3client.get_object(Bucket=bucket, Key=key)
            content = response['Body'].read()
            with gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb') as fh:
                data = [json.loads(line) for line in fh]
                        
        all_data = []
        
        boto3.resource('dynamodb')
        deserializer = boto3.dynamodb.types.TypeDeserializer()
        for row in data:
            all_data.append({k: deserializer.deserialize(v) for k,v in row['Item'].items()})
    
        
        data_df = pd.DataFrame(all_data)
        
        dt = datetime.utcnow().strftime("%Y-%m-%d-%H-%M")
        s3_path="s3://%s/dynamodb/%s/content/dt=%s/" % (bucket, athena_table, dt)
        
        wr.s3.to_parquet(
            df=data_df,
            path=s3_path,
            dataset = True,
        )
        
    
        update_response = update_glue_table(
            database=athena_db, 
            table_name=athena_table, 
            new_location=s3_path,
            region_name="us-west-2")
    
        if update_response["ResponseMetadata"]["HTTPStatusCode"] == 200:
            return (f"Successfully updated glue table location - {athena_db}.{athena_table}")
        else:
            return (f"Failed updating glue table location - {athena_db}.{athena_table}")
    
    

    Query data from Athena

    SELECT title, info.actors, info.rating, info.release_date, year FROM movies where title='Christmas Vacation'
    타이틀
    배우.
    등급을 매기다
    게시 날짜

    1
    크리스마스 연휴
    [셰플란 채스, 베벌리 데안젤로, 줄리엣 루이스]
    0.73
    1989-11-30T00:00:00Z
    1989
    추가 참고 자료 - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataExport.html
    3. DynamoDB와 운동 흐름의 통합.
    DynamoDB Streams는 DynamoDB 표의 프로젝트 단계 수정 시간 순서를 포착합니다.일찌감치 DynamoDB 데이터를 S3에 실시간으로 발표하기 위해 그 중 하나는 DynamoDB 흐름을 사용하고 AWS Lambda 함수를 사용하여 데이터를 Kinesis Firehose로 전송하고 후자는 데이터를 S3에 발표하는 것이다.이를 위해aws실험실https://github.com/awslabs/lambda-streams-to-firehose에서 제공하는 간편한 소프트웨어 패키지를 사용할 수 있습니다.

    이러한 몇 가지 용례가 있는데, AWS는 현재 AWS 다이나모DB를 Amazon Kinesis Stream과 직접 통합시켰다.이제 DynamoDB 테이블의 프로젝트 단계를 Kinesis 데이터 흐름으로 포착할 수 있습니다.이렇게 하면 아래 파이프와 같이 S3에 데이터를 게시할 수 있습니다.

    파이프라인을 설정하려면 Amazon Kinesis 데이터 스트림을 생성합니다.

    Amazon Kinesis 데이터 스트림을 설정한 후 Amazon Kinesis Firehose를 만듭니다.Kinesis Firehose의 소스는 Amazon Kinesis 데이터 흐름이고 목표는 S3이다.

    계속하려면 움직임에 대한 흐름 전송을 계속 사용하십시오.
    DynamoDB>표>Kinesis 데이터 흐름 상세정보>Kinesis에 대한 흐름 관리


    흐름이 활성화되면 테이블의 프로젝트 수준 변경 사항은 Amazon S3 bucket에 캡처되어 기록됩니다.다음은 PartIQ를 사용하여 DynamoDB에서 업데이트된 레코드의 예입니다.DynamoDB streams에 기록된 대략적인 작성 날짜와 시간, 새 이미지와 이전 이미지를 기록합니다.이러한 레코드는 AWS Lambda 또는 AWS Glue를 사용하여 해석할 수 있으며 Data Lake에 저장되어 분석 용례에 사용됩니다.
    {
    “awsRegion”:“us-west-2”,
    “dynamodb”:{
    대략적인 생성 날짜 시간: 1606714671542,
    열쇠:
    작성자:
    "S": "제임스 패터슨"
    },
    직함:
    "S": "대통령이 실종되었습니다"
    }
    },
    "새로운 이미지":
    직함:
    "S": "대통령이 실종되었습니다"
    },
    형식:
    “M”:{
    정품 버전:
    “S”:“JSU4KGVU”
    }
    }
    },
    작성자:
    "S": "제임스 패터슨"
    },
    범주:
    S: 신비
    }
    },
    “OldImage”:{
    직함:
    "S": "대통령이 실종되었습니다"
    },
    형식:
    “M”:{
    정품 버전:
    “S”:“JSU4KGVU”
    },
    평장본:
    “S”:“DY7F4CFX”
    }
    }
    },
    작성자:
    "S": "제임스 패터슨"
    },
    범주:
    S: 신비
    }
    },
    크기 바이트: 254
    },
    이벤트 ID: "bcaaf073-7e0d-49c2-818e-fe3cf7e5f18a",
    이벤트Name: 수정,
    “userIdentity”:null,
    “recordFormat”:“application/json”,
    “tableName”:“Books”,
    “eventSource”:“aws:dynamodb”
    }
    마지막으로 본고에서 저는 PartiQL을 소개했는데 이것은 Amazon DynamoDB에 SQL과 호환되는 조회를 제공합니다.우리는 또한 S3 기능으로 내보내고 AWS Athena를 사용하여 Amazon S3 bucket의 Amazon DynamoDB 데이터를 조회하는 방법을 연구했다.마지막으로, 우리는 실시간 분석 용례를 연구했는데, 이 용례에서, 당신은 흐름이Kinesis 데이터 흐름의 형식으로 Amazon DynamoDB 표의 프로젝트 단계 변경을 포착할 수 있습니다.

    좋은 웹페이지 즐겨찾기