[초보자] AWS kiness를 사용해 보도록 하겠습니다.

6022 단어 AWSKinesis

모티프

  • 일 때문에 킨스의 화제가 됐어요.알 듯 모를 듯 하기 위해 사전에 실기로 기본 동작을 확인한다.
  • 해본 일

  • stream의 제작
  • stream에 데이터 쓰기/읽기(AWS CLI)
  • ptyhon 스크립트에서stream을 쓰고 기록이stream에 쓰이면 lambda 처리
  • 를 통해

    참고 자료


    AWS CLI 이후 처음으로 Amazon Kiness 사용 시도
    [새로운 기능] 시간 기반 모바일 전화 지원
    Lambda(Python)에서 Kinesis Put(API)까지 Lambda(Python)에서 Get(Event)

    단계(CLI의 쓰기/읽기)

    # shard 1本のstreamを作成
    $ aws kinesis create-stream --stream-name mksamba-stream --shard-count 1
    
  • 가장 간단한shard로stream을 만든다.
  • # 1個目のデータの書き込み
    $ aws kinesis put-record --stream-name mksamba-stream --partition-key 123 --data anpanman
    {
        "ShardId": "shardId-000000000000",
        "SequenceNumber": "49593062427984882686636760320963710719404264693501526018"
    }
    
  • shard이기 때문에partition-key의 내용이 어떻든지 간에 이shard에 들어가야 합니다.
  • # 1個目のデータの読み出し
    $ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name mksamba-stream
    {
        "ShardIterator": "AAAAAAAAAAE/WfX4ZpvcDznZT70nxmW4zUu8d3o7eqAYV9niw88UuaxVrTpm/MUC1jFrBXMyhI35oPQvKDRC+u3gY5wBYQh5jKZDF9eAdtMCHB2t136KTGcpDA7i54R5CTzOf5lo1aUqzEcmX1iBRfxAnzaBfi+XpPDRXKN52fDXJXCtPF3hg5MIcCj376QTwQvm5nwKaF+1ou1yrYWrdFnKUgTy0XoP"
    }
    
    $ aws kinesis get-records --shard-iterator AAAAAAAAAAE/WfX4ZpvcDznZT70nxmW4zUu8d3o7eqAYV9niw88UuaxVrTpm/MUC1jFrBXMyhI35oPQvKDRC+u3gY5wBYQh5jKZDF9eAdtMCHB2t136KTGcpDA7i54R5CTzOf5lo1aUqzEcmX1iBRfxAnzaBfi+XpPDRXKN52fDXJXCtPF3hg5MIcCj376QTwQvm5nwKaF+1ou1yrYWrdFnKUgTy0XoP
    {
        "Records": [
            {
                "Data": "YW5wYW5tYW4=",
                "PartitionKey": "123",
                "ApproximateArrivalTimestamp": 1550411747.067,
                "SequenceNumber": "49593062427984882686636760320963710719404264693501526018"
            }
        ],
        "NextShardIterator": "AAAAAAAAAAElfsIV9+Qhkuy59jiyJgfSmaA7tAhra8Pitt280V+rhWch3VaqycIduEXiSZKgIIoNcpyyx9tbpb5oaWDkttJFguomKcGVtCbsMlh3b5aO8hjySCU1BOstTkoCKo+KmB3fVY8S4e+xllLP8D0c5IfTB5wmSTg2jPhku3pgCNIbhgrdmxo5NbocHCn77DySzrBlcD4Cvm9LaaLYQuZpW4vk",
        "MillisBehindLatest": 0
    }
    $ echo YW5wYW5tYW4= | base64 --decode
    anpanman
    
  • 우선 균형기를 가져오고 균형기를 지정해서 기록을 가져옵니다.음반은 베이스64 encode이기 때문에 decode를 진행합니다.
  • #2個目のデータの書き込み
    $ aws kinesis put-record --stream-name mksamba-stream --partition-key 123 --data baikinman
    {
        "ShardId": "shardId-000000000000",
        "SequenceNumber": "49593062427984882686636760321531905854623186722540748802"
    }
    
  • 같은 주식에 두 번째 데이터를 쓴다.
  • #2個目のデータの読み出し
    $ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name mksamba-stream
    {
        "ShardIterator": "AAAAAAAAAAFWgFqMKIxJNggikvbo7dus2kgZyyYGd8F/ZY0r453xruvlXHKlRydL05PpGDs36+wD/GuwY7A6KmU94eDwgi2bGgm3KEJ7wqnz9rN0qtmftCZ2t1O/lk1ztLWQWhzShbcQWxHuM4DLj0y56ahI0tgRdELS7ltSATdMlrqBMmdzJcDE6WUKvg4ZMi6kjtvQvGYBx90HR4EuR9je4ZSkkQdg"
    }
    
    $ aws kinesis get-records --shard-iterator AAAAAAAAAAFWgFqMKIxJNggikvbo7dus2kgZyyYGd8F/ZY0r453xruvlXHKlRydL05PpGDs36+wD/GuwY7A6KmU94eDwgi2bGgm3KEJ7wqnz9rN0qtmftCZ2t1O/lk1ztLWQWhzShbcQWxHuM4DLj0y56ahI0tgRdELS7ltSATdMlrqBMmdzJcDE6WUKvg4ZMi6kjtvQvGYBx90HR4EuR9je4ZSkkQdg
    {
        "Records": [
            {
                "Data": "YW5wYW5tYW4=",
                "PartitionKey": "123",
                "ApproximateArrivalTimestamp": 1550411747.067,
                "SequenceNumber": "49593062427984882686636760320963710719404264693501526018"
            },
            {
                "Data": "YmFpa2lubWFu",
                "PartitionKey": "123",
                "ApproximateArrivalTimestamp": 1550412420.802,
                "SequenceNumber": "49593062427984882686636760321531905854623186722540748802"
            }
        ],
        "NextShardIterator": "AAAAAAAAAAGxAQrYwzNV3D84if3DuHkWj3qoZuKH7EJX0x9Chx6IHDaOe0jjrXSRGLMS614CPMuAB01Yqdat0XSqtuD3zAiD11McKb7+NrrnPJURFZMhHo9KABowzt9y6xwKId/eRqImrdlPgqDUoasCrFO9snOXYdqSeEBXp2ruEV/DYo9+FLRmF+RPSF7vS0NgV+f2KDxsmKv6MBNmzAX46E9WYQUw",
        "MillisBehindLatest": 0
    }
    
    $ echo YmFpa2lubWFu | base64 --decode
    baikinman
    
    
  • shard-iterator-type TRIM_HORIZON으로 설정됨에 따라 주식 내에 저장된 기록이 다시 표시됩니다.
  • 단계(Python Script로 stream에 쓰기)

    # kinesis-put.py
    import boto3
    
    client = boto3.client('kinesis')
    response = client.put_record(
        Data="dokin-chan",
        PartitionKey='123',
        StreamName='mksamba-stream'
    )
    print(response)
    
    EC2 인스턴스(amazon linux2)에서stream의put를 실행합니다.
  • python3,boto3,credential,region 등이 미리 설정된 전제 조건이다.
  • 단계 (lambda로stream에서 읽기)

    # kinesis-lambda.py
    import logging
    import base64
    
    def lambda_handler(event, context):
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
    
        for record in event['Records']:
           #Kinesis data is base64 encoded so decode here
           payload=base64.b64decode(record["kinesis"]["data"])
           print("Decoded payload: " + str(payload))
    

    - AWS의 샘플 코드로 lambda를 만듭니다.
    키시스를 데이터 원본으로 설정합니다.
    - stream에 쓰기가 있으면 lambda를 시작해서 읽습니다.(이 예에서는 closudwatch logs에서만 읽고 저장)

    감상

  • 섀시 같은 초기본 개념을 이해했지만, 이동전화 지정 방법(TRIM HORIZON) 등 잘 모르는 부분을 사용했기 때문에 계속 접촉하고 싶었다.
  • 좋은 웹페이지 즐겨찾기