스트림 시청
11368 단어 cloudserverlessdistributedsystemsaws
이는 요소를 하나씩 처리할 것으로 예상할 수 있는 스트림을 처리하는 일반적인 접근 방식과 모순되는 것처럼 들릴 수 있습니다. 그러나 다음과 같은 상황을 고려해 봅시다.
매우 간단한 접근 방식은 생산자의 실행을 추적하는 것입니다(예: 내 previous story에서 보여준 방법 사용). 세 번째 단계는 모든 생산자가 계산을 완료한 경우에만 실행됩니다.
이 접근 방식의 주요 단점은 스트림을 통해 레코드를 전달하고 영구 저장소에 저장하는 것과 관련된 지연을 고려하지 않는다는 것입니다. 이는 결국 부분적으로 완료된 데이터 세트에 대한 계산 집계로 이어질 수 있습니다.
더 나은 아이디어는 관찰자를 사용하여 스토리지에 영구적으로 저장된 레코드 수를 추적하는 것입니다.
AWS의 공통 구성 요소인 Kinesis Firehose, S3, Lambda, DynamoDB를 사용하는 다음 아키텍처를 고려해 보겠습니다.
전체 다이어그램은 다음과 같습니다.
일부 데이터를 생성하고 Kinesis Firehose로 푸시하는 많은 람다가 있습니다. 그런 다음 S3 버킷에 저장됩니다.
Firehose가 S3 버킷에 새 객체를 생성할 때마다 모니터 람다가 트리거됩니다. 이러한 이벤트를 계산하고 DynamoDB 테이블의 카운터를 업데이트합니다. 레코드의 기본 키는 관찰된 버킷 이름입니다.
이러한 이벤트에 대한 프로토타입 핸들러는 다음과 같이 정의할 수 있습니다.
import boto3
import os
dynamodb = boto3.resource('dynamodb')
observerTableName = os.environ.get('observerTableName')
table = dynamodb.Table(observerTableName)
def monitor(event, context):
for record in event.get('Records', []):
if record.get('eventName', '') == 'ObjectCreated:Put':
bucketName = record['s3']['bucket']['name']
table.update_item(
Key={'id': bucketName},
UpdateExpression='ADD num_records :val',
ExpressionAttributeValues={':val': 1}
)
다이어그램의 두 번째 부분은 관찰자입니다.
이 람다는 SQS 대기열을 사용하여 반복적으로 실행하고 스토리지 카운터 DynamoDB 테이블의 콘텐츠를 읽습니다.
관찰자 뒤에 있는 알고리즘은 매우 간단하며 다음 다이어그램으로 설명할 수 있습니다.
다음과 같이 구현할 수 있습니다.
def observer(event, context):
for record in event['Records']:
payload = record['body']
message = json.loads(payload)
if message.get('repeated', 0) >= MAX_NUM_REPEAT:
# call external service, ready to handle the data in storage.
print(f'{message=} finished - calling external service')
continue
res = table.get_item(Key={'id': message['bucket']})
if 'Item' in res:
item = res['Item']
num_records = int(item['num_records'])
if num_records == message['last_num_records']:
message['repeated'] += 1
else:
message['repeated'] = 0 # Reset the repeat counter.
message['last_num_records'] = num_records
sqs.send_message(QueueUrl=os.environ['selfSQSURL'], MessageBody=json.dumps(message), DelaySeconds=30)
다음 구조의 SQS 메시지 포함
{
"repeated": 0,
"last_num_records": 0,
"bucket": "bucket name"
}
두 가지 매개변수를 조정해야 합니다. 첫 번째는 관찰 창의 길이입니다(위 코드에서
MAX_NUM_REPEAT
로 선언됨). 두 번째 파라미터는 DynamoDB 테이블에서 읽기 사이의 지연이며 여기서는 30
초로 설정됩니다.이 두 매개 변수에 대해 설명하겠습니다.
생산자가 느린 프로세스이고 샘플링이 너무 빠르면(지연 시간이 짧음) 프로세스가 완료된 것으로 잘못 간주할 수 있습니다.
반면에 생산자가 빠른 프로세스인 경우 관찰자가 데이터가 준비되었다는 알림을 보내기 전에 불필요하게 기다릴 수 있습니다
MAX_NUM_REPEAT * delay
.지연 시간에 대해 다양한 최적화 전략을 사용할 수 있습니다.
초기 지연 시간을
repeated
카운터로 나눕니다.delay = 30 if repeated == 0 else int(30 / repeated)
지수 함수 사용
delay = numpy.ceil(30*numpy.exp(-repeated)).astype(int)
어떤 매개변수가 적절한지는 생산 공정의 특성에 따라 다릅니다.
여기서 제시한 방법은 스토리지(예: S3, Elasticsearch 등)에서 데이터 세트를 준비한 후 일부 후처리 작업을 실행해야 하는 데이터 처리에 유용할 수 있습니다.
적용하려면 약간의 조정이 필요하며 오탐지 사례를 처리하는 방법도 고려해야 합니다.
표지 이미지에서 벨기에 루벤 근처의 Dijle 강
Reference
이 문제에 관하여(스트림 시청), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/jkrajniak/watch-your-stream-1d7p텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)