AWS Lambda에서 DynamoDB에서 얻은 값에 임의의 집계를 적용합니다(그룹 처리 추가)

10482 단어 람다DynamoDBAWS
이전 게시물의 업데이트 버전입니다. 이전의 구조에서는 하나의 데이터에 대해서의 집계를 걸 수 없었습니다만, 그러면 실운용상 너무 불편하다고 생각했으므로, 지정한 값으로 그룹핑할 수 있도록 해 보았습니다.

AWS Lambda에서 DynamoDB에서 얻은 값에 임의의 집계를 적용합니다.

입력 데이터 포맷 변경



거의 사용법은 이전의 것과 함께입니다만, 이하의 점만 바꾸었습니다.
  • ID는 배열 형식 (["sensor1", "sensor2"])으로 지정되도록 사양을 변경했습니다.
  • 테이블명을 환경 변수로부터 취득하도록(듯이) 했다.

  • ID는 배열 형식(["sensor1", "sensor2"])으로 지정하도록 사양 변경



    이런 느낌이 최신 형식입니다.
    {
      "label_id": "id",
      "label_range": "timestamp",
      "id": [
        "sensor1",
        "sensor2"
      ],
      "aggregator": "latest",
      "time_from": "2017-04-30T22:00:00.000",
      "time_to": "2017-04-30T22:06:00.000",
      "params": {
        "range": "timestamp"
      }
    }
    

    ID 부분을 배열로 만들었습니다. 이렇게 하면 지정한 ID의 최신 값을 얻을 수 있습니다.
    반환값은 이런 느낌이 됩니다.
    "[{\"timestamp\": \"2017-04-30T22:05:00.000\", \"score\": 0.0, \"id\": \"sensor1\"}, {\"timestamp\": \"2017-04-30T22:06:00.000\", \"score\": 1.0, \"id\": \"sensor2\"}]"
    

    덧붙여서 DynamoDB에는 이런 느낌의 값을 준비하고 있었습니다.



    테이블 이름을 환경 변수에서 가져옵니다.



    그 만마입니다. handler.py의 os.environ['TABLE'] 부분입니다. Lambda를 실행할 때 환경 변수를 이런 식으로 지정하십시오.



    handler.py
    import sys
    import boto3
    import json
    import decimal
    import os
    from boto3.dynamodb.conditions import Key
    
    from aggregator.lambda_aggregator import LambdaAggregator
    from aggregator.latest_aggregator import LatestAggregator
    from aggregator.max_aggregator import MaxAggregator
    from aggregator.min_aggregator import MinAggregator
    from aggregator.sum_aggregator import SumAggregator
    from aggregator.avg_aggregator import AvgAggregator
    from aggregator.count_aggregator import CountAggregator
    
    dynamodb = boto3.resource('dynamodb')
    table    = dynamodb.Table(os.environ['TABLE'])
    
    aggregator_map = {}
    aggregator_map['latest'] = LatestAggregator()
    aggregator_map['max'] = MaxAggregator()
    aggregator_map['min'] = MinAggregator()
    aggregator_map['sum'] = SumAggregator()
    aggregator_map['avg'] = AvgAggregator()
    aggregator_map['count'] = CountAggregator()
    
    def run(event, context):
        check_params(event)
        result = []
    
        for id in event['id']:
            res = table.query(
                    KeyConditionExpression=Key(event['label_id']).eq(id) & Key(event['label_range']).between(event['time_from'], event['time_to']),
                    ScanIndexForward=False
                )
    
            return_response = aggregator_map[event['aggregator']].aggregate(res['Items'], event['params'])
            result.append(return_response)
    
        return json.dumps(result, default=decimal_default)
    
    def decimal_default(obj):
        if isinstance(obj, decimal.Decimal):
            return float(obj)
        raise TypeError
    
    def check_params(params):
        if 'label_id' not in params or 'label_range' not in params or 'id' not in params or 'aggregator' not in params or 'time_from' not in params or 'time_to' not in params or 'params' not in params:
            sys.stderr.write("Parameters for label_id, label_range, id, aggregator, time_from, time_to and params are needed.")
            sys.exit()
    

    출처



    여기에 커밋하고 있습니다. (이전 항목 업데이트)

    좋은 웹페이지 즐겨찾기