[Beginner] Trying out AWS Kinesis
Motivation
What I did
References
Trying Amazon Kinesis for the first time with the AWS CLI
[New feature] Time-based mobile phone support
Put to Kinesis (API) from Lambda (Python), then Get (Event) in Lambda (Python)
Steps (writing and reading with the CLI)
# Create a stream with one shard
$ aws kinesis create-stream --stream-name mksamba-stream --shard-count 1
# Write the first record
$ aws kinesis put-record --stream-name mksamba-stream --partition-key 123 --data anpanman
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49593062427984882686636760320963710719404264693501526018"
}
# Read the first record
$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name mksamba-stream
{
    "ShardIterator": "AAAAAAAAAAE/WfX4ZpvcDznZT70nxmW4zUu8d3o7eqAYV9niw88UuaxVrTpm/MUC1jFrBXMyhI35oPQvKDRC+u3gY5wBYQh5jKZDF9eAdtMCHB2t136KTGcpDA7i54R5CTzOf5lo1aUqzEcmX1iBRfxAnzaBfi+XpPDRXKN52fDXJXCtPF3hg5MIcCj376QTwQvm5nwKaF+1ou1yrYWrdFnKUgTy0XoP"
}
$ aws kinesis get-records --shard-iterator AAAAAAAAAAE/WfX4ZpvcDznZT70nxmW4zUu8d3o7eqAYV9niw88UuaxVrTpm/MUC1jFrBXMyhI35oPQvKDRC+u3gY5wBYQh5jKZDF9eAdtMCHB2t136KTGcpDA7i54R5CTzOf5lo1aUqzEcmX1iBRfxAnzaBfi+XpPDRXKN52fDXJXCtPF3hg5MIcCj376QTwQvm5nwKaF+1ou1yrYWrdFnKUgTy0XoP
{
    "Records": [
        {
            "Data": "YW5wYW5tYW4=",
            "PartitionKey": "123",
            "ApproximateArrivalTimestamp": 1550411747.067,
            "SequenceNumber": "49593062427984882686636760320963710719404264693501526018"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAElfsIV9+Qhkuy59jiyJgfSmaA7tAhra8Pitt280V+rhWch3VaqycIduEXiSZKgIIoNcpyyx9tbpb5oaWDkttJFguomKcGVtCbsMlh3b5aO8hjySCU1BOstTkoCKo+KmB3fVY8S4e+xllLP8D0c5IfTB5wmSTg2jPhku3pgCNIbhgrdmxo5NbocHCn77DySzrBlcD4Cvm9LaaLYQuZpW4vk",
    "MillisBehindLatest": 0
}
$ echo YW5wYW5tYW4= | base64 --decode
anpanman
# Write the second record
$ aws kinesis put-record --stream-name mksamba-stream --partition-key 123 --data baikinman
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49593062427984882686636760321531905854623186722540748802"
}
# Read the second record (TRIM_HORIZON starts from the oldest record, so both records come back)
$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name mksamba-stream
{
    "ShardIterator": "AAAAAAAAAAFWgFqMKIxJNggikvbo7dus2kgZyyYGd8F/ZY0r453xruvlXHKlRydL05PpGDs36+wD/GuwY7A6KmU94eDwgi2bGgm3KEJ7wqnz9rN0qtmftCZ2t1O/lk1ztLWQWhzShbcQWxHuM4DLj0y56ahI0tgRdELS7ltSATdMlrqBMmdzJcDE6WUKvg4ZMi6kjtvQvGYBx90HR4EuR9je4ZSkkQdg"
}
$ aws kinesis get-records --shard-iterator AAAAAAAAAAFWgFqMKIxJNggikvbo7dus2kgZyyYGd8F/ZY0r453xruvlXHKlRydL05PpGDs36+wD/GuwY7A6KmU94eDwgi2bGgm3KEJ7wqnz9rN0qtmftCZ2t1O/lk1ztLWQWhzShbcQWxHuM4DLj0y56ahI0tgRdELS7ltSATdMlrqBMmdzJcDE6WUKvg4ZMi6kjtvQvGYBx90HR4EuR9je4ZSkkQdg
{
    "Records": [
        {
            "Data": "YW5wYW5tYW4=",
            "PartitionKey": "123",
            "ApproximateArrivalTimestamp": 1550411747.067,
            "SequenceNumber": "49593062427984882686636760320963710719404264693501526018"
        },
        {
            "Data": "YmFpa2lubWFu",
            "PartitionKey": "123",
            "ApproximateArrivalTimestamp": 1550412420.802,
            "SequenceNumber": "49593062427984882686636760321531905854623186722540748802"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAGxAQrYwzNV3D84if3DuHkWj3qoZuKH7EJX0x9Chx6IHDaOe0jjrXSRGLMS614CPMuAB01Yqdat0XSqtuD3zAiD11McKb7+NrrnPJURFZMhHo9KABowzt9y6xwKId/eRqImrdlPgqDUoasCrFO9snOXYdqSeEBXp2ruEV/DYo9+FLRmF+RPSF7vS0NgV+f2KDxsmKv6MBNmzAX46E9WYQUw",
    "MillisBehindLatest": 0
}
$ echo YmFpa2lubWFu | base64 --decode
baikinman
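The CLI read steps above (get a shard iterator, then call get-records) can also be written with boto3. What follows is only a minimal sketch, not code from the original article: `read_from_start` and `main` are hypothetical names, and the sketch assumes credentials and a region are already configured. One difference from the CLI is worth noting: boto3 returns `Data` as raw bytes, so the manual `base64 --decode` step is not needed.

```python
def read_from_start(client, stream_name, shard_id, max_calls=5):
    """Collect the payloads in one shard, oldest first (hypothetical helper).

    `client` is expected to behave like boto3.client("kinesis").
    """
    # TRIM_HORIZON starts at the oldest record still retained in the shard,
    # which is why the second CLI read returned both records.
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]

    payloads = []
    for _ in range(max_calls):
        if iterator is None:  # the shard has been closed and fully read
            break
        response = client.get_records(ShardIterator=iterator, Limit=100)
        # boto3 hands back Data as bytes, already base64-decoded.
        payloads.extend(r["Data"].decode("utf-8") for r in response["Records"])
        if response.get("MillisBehindLatest", 0) == 0:
            break  # caught up with the tip of the shard
        # Continue from where this call stopped instead of re-reading.
        iterator = response.get("NextShardIterator")
    return payloads


def main():
    import boto3  # imported here so the helper above has no hard dependency

    client = boto3.client("kinesis")
    print(read_from_start(client, "mksamba-stream", "shardId-000000000000"))
```

Calling `main()` on a machine with access to the stream should list the records written so far. To read only new records, a consumer would keep following `NextShardIterator` rather than requesting a fresh TRIM_HORIZON iterator each time.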
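Steps (writing to the stream with a Python script)
# kinesis-put.py
import boto3

# Uses boto3's default credential and region resolution.
client = boto3.client('kinesis')

# Write one record; the partition key determines the target shard.
response = client.put_record(
    Data="dokin-chan",
    PartitionKey='123',
    StreamName='mksamba-stream'
)
print(response)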
Run the put against the stream from an EC2 instance (Amazon Linux 2).
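Every put in this walkthrough uses the partition key 123 and lands in shardId-000000000000. Kinesis maps a partition key to a 128-bit integer with MD5 and sends the record to the shard whose hash key range contains that value. The sketch below illustrates that mapping locally. `hash_key` and `pick_shard` are hypothetical helpers, the two-shard split is an invented example, and reading the digest as a big-endian integer is an assumption about how the hash is interpreted.

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1  # upper end of the 128-bit hash key space


def hash_key(partition_key: str) -> int:
    """Return the 128-bit integer that MD5 maps a partition key to."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def pick_shard(partition_key: str, shard_ranges):
    """Find the shard whose [start, end] hash key range contains the key.

    shard_ranges is a list of (shard_id, start, end) tuples; for a real
    stream these would come from each shard's HashKeyRange.
    """
    value = hash_key(partition_key)
    for shard_id, start, end in shard_ranges:
        if start <= value <= end:
            return shard_id
    raise ValueError("no shard covers this hash key")


# A one-shard stream such as mksamba-stream covers the whole key space,
# so every partition key maps to the same shard.
one_shard = [("shardId-000000000000", 0, MAX_HASH_KEY)]
print(pick_shard("123", one_shard))

# Hypothetical two-shard split at the midpoint, for comparison.
half = 2**127
two_shards = [
    ("shardId-000000000000", 0, half - 1),
    ("shardId-000000000001", half, MAX_HASH_KEY),
]
print(pick_shard("123", two_shards))
```

With a single shard the partition key has no visible effect. It only starts to matter once the stream has several shards, because records with the same key always go to the same shard and keep their relative order there.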
Steps (reading from the stream with Lambda)
# kinesis-lambda.py
import logging
import base64

def lambda_handler(event, context):
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    for record in event['Records']:
        # Kinesis data is base64 encoded so decode here
        payload = base64.b64decode(record["kinesis"]["data"])
        # payload is bytes, so str() renders it as b'...'
        print("Decoded payload: " + str(payload))
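- Create the Lambda function from AWS's sample code.
- Set Kinesis as the data source (trigger) for the function.
- When a record is written to the stream, the Lambda function is invoked and reads it. (In this example it only reads the record and writes it to CloudWatch Logs.)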
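Before wiring up the trigger, the handler logic can be checked locally by passing it a hand-built event. The sketch below is an illustration under assumptions, not part of the original article: `make_event` is a hypothetical helper that fills in only the fields the handler reads, and the handler is a copy of the sample that additionally returns the decoded payloads so the result can be inspected.

```python
import base64


def lambda_handler(event, context):
    # Same decoding logic as kinesis-lambda.py, but the payloads are also
    # returned so the result can be checked locally.
    payloads = []
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"])
        print("Decoded payload: " + str(payload))
        payloads.append(payload)
    return payloads


def make_event(*messages):
    """Build a minimal Kinesis-style event (hypothetical helper).

    Only the fields the handler reads are filled in; a real event carries
    more metadata for each record.
    """
    return {
        "Records": [
            {
                "kinesis": {
                    "partitionKey": "123",
                    # Lambda receives the record data base64-encoded.
                    "data": base64.b64encode(m.encode("utf-8")).decode("ascii"),
                }
            }
            for m in messages
        ]
    }


event = make_event("dokin-chan")
result = lambda_handler(event, None)
```

Because the decoded payload is a bytes object, `str(payload)` prints it in the `b'...'` form. Calling `payload.decode("utf-8")` instead would log the plain text.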
Impressions
Reference
Original article ([Beginner] Trying out AWS Kinesis): https://qiita.com/mksamba/items/2854ee0b83e2043b5313