스트림 시청

last story에서 분산 클라우드 컴퓨팅을 관찰하는 방법에 대해 논의했습니다. 여기서는 조금 비슷한 주제에 집중하겠습니다. 주로 시스템으로 흐르는 데이터 흐름을 관찰하는 방법, 특정 순간에 더 이상 데이터 흐름이 없을 것이라고 정확히 결정하는 방법, 후처리를 수행할 준비가 되었습니다.

이는 요소를 하나씩 처리할 것으로 예상할 수 있는 스트림을 처리하는 일반적인 접근 방식과 모순되는 것처럼 들릴 수 있습니다. 그러나 다음과 같은 상황을 고려해 봅시다.
  • 초기 이벤트는 생성자 집합을 호출하여 데이터를 스트림으로 푸시합니다.
  • 생산자가 동시에 데이터를 스트림으로 푸시합니다. 그런 다음 소비자가 S3와 같은 일부 영구 저장소에 레코드를 저장한다고 가정합니다.
  • 모든 예상 데이터가 영구 저장소에 있는 후에만 다음 변환을 실행할 수 있습니다. 일부 집계;

  • 매우 간단한 접근 방식은 생산자의 실행을 추적하는 것입니다(예: 내 previous story에서 보여준 방법 사용). 세 번째 단계는 모든 생산자가 계산을 완료한 경우에만 실행됩니다.

    이 접근 방식의 주요 단점은 스트림을 통해 레코드를 전달하고 영구 저장소에 저장하는 것과 관련된 지연을 고려하지 않는다는 것입니다. 이는 결국 부분적으로 완료된 데이터 세트에 대한 계산 집계로 이어질 수 있습니다.


    더 나은 아이디어는 관찰자를 사용하여 스토리지에 영구적으로 저장된 레코드 수를 추적하는 것입니다.
    AWS의 공통 구성 요소인 Kinesis Firehose, S3, Lambda, DynamoDB를 사용하는 다음 아키텍처를 고려해 보겠습니다.
    전체 다이어그램은 다음과 같습니다.



    일부 데이터를 생성하고 Kinesis Firehose로 푸시하는 많은 람다가 있습니다. 그런 다음 S3 버킷에 저장됩니다.
    Firehose가 S3 버킷에 새 객체를 생성할 때마다 모니터 람다가 트리거됩니다. 이러한 이벤트를 계산하고 DynamoDB 테이블의 카운터를 업데이트합니다. 레코드의 기본 키는 관찰된 버킷 이름입니다.

    이러한 이벤트에 대한 프로토타입 핸들러는 다음과 같이 정의할 수 있습니다.

    import boto3
    import os
    
    dynamodb = boto3.resource('dynamodb')
    
    observerTableName = os.environ.get('observerTableName')
    table = dynamodb.Table(observerTableName)
    
    
    def monitor(event, context):
        for record in event.get('Records', []):
            if record.get('eventName', '') == 'ObjectCreated:Put':
                bucketName = record['s3']['bucket']['name']
                table.update_item(
                    Key={'id': bucketName},
                    UpdateExpression='ADD num_records :val',
                    ExpressionAttributeValues={':val': 1}
                )
    
    

    다이어그램의 두 번째 부분은 관찰자입니다.
    이 람다는 SQS 대기열을 사용하여 반복적으로 실행하고 스토리지 카운터 DynamoDB 테이블의 콘텐츠를 읽습니다.
    관찰자 뒤에 있는 알고리즘은 매우 간단하며 다음 다이어그램으로 설명할 수 있습니다.


    다음과 같이 구현할 수 있습니다.

    def observer(event, context):
        for record in event['Records']:
            payload = record['body']
            message = json.loads(payload)
            if message.get('repeated', 0) >= MAX_NUM_REPEAT:
                # call external service, ready to handle the data in storage.
                print(f'{message=} finished - calling external service')
                continue
    
            res = table.get_item(Key={'id': message['bucket']})
            if 'Item' in res:
                item = res['Item']
                num_records = int(item['num_records'])
                if num_records == message['last_num_records']:
                    message['repeated'] += 1
                else:
                    message['repeated'] = 0  # Reset the repeat counter.
                message['last_num_records'] = num_records
                sqs.send_message(QueueUrl=os.environ['selfSQSURL'], MessageBody=json.dumps(message), DelaySeconds=30)
    

    다음 구조의 SQS 메시지 포함

    {
      "repeated": 0,
      "last_num_records": 0,
      "bucket": "bucket name"
    }
    

    두 가지 매개변수를 조정해야 합니다. 첫 번째는 관찰 창의 길이입니다(위 코드에서 MAX_NUM_REPEAT 로 선언됨). 두 번째 파라미터는 DynamoDB 테이블에서 읽기 사이의 지연이며 여기서는 30초로 설정됩니다.

    이 두 매개 변수에 대해 설명하겠습니다.
    생산자가 느린 프로세스이고 샘플링이 너무 빠르면(지연 시간이 짧음) 프로세스가 완료된 것으로 잘못 간주할 수 있습니다.
    반면에 생산자가 빠른 프로세스인 경우 관찰자가 데이터가 준비되었다는 알림을 보내기 전에 불필요하게 기다릴 수 있습니다MAX_NUM_REPEAT * delay.

    지연 시간에 대해 다양한 최적화 전략을 사용할 수 있습니다.

  • 초기 지연 시간을 repeated 카운터로 나눕니다.

    delay = 30 if repeated == 0 else int(30 / repeated)
    




  • 지수 함수 사용

    delay = numpy.ceil(30*numpy.exp(-repeated)).astype(int)
    



  • 어떤 매개변수가 적절한지는 생산 공정의 특성에 따라 다릅니다.


    여기서 제시한 방법은 스토리지(예: S3, Elasticsearch 등)에서 데이터 세트를 준비한 후 일부 후처리 작업을 실행해야 하는 데이터 처리에 유용할 수 있습니다.
    적용하려면 약간의 조정이 필요하며 오탐지 사례를 처리하는 방법도 고려해야 합니다.


    표지 이미지에서 벨기에 루벤 근처의 Dijle 강

    좋은 웹페이지 즐겨찾기