SORACOM Funnel을 사용하여 Kinesis Firehose에 JSON을 보내 S3에 PUT합니다 (이어서 DynamoDB에 씁니다)
소개
지금 무렵 Kinesis Firehose라든지 지금 갱감이 있습니다만 제대로 사용하고 싶었다고 하고, Funnel로부터 Firehose에 JSON을 던지고 싶다고 하는 요망이 빠져 왔기 때문에, 그다지 요구는 없다고 생각합니다만 기사로 해 보았다
처리 흐름
※점선 부분은 덤
Create S3
Firehose가 PUT하는 S3를 만들어 둡니다. (만드는 방법은 생략합니다)
Create Kinesis Firehose
콘솔에서 Firehose 화면으로 이동하여 'Create delivery stream'에서 Firehose를 만듭니다.
아직 도쿄 지역에서 사용할 수 없기 때문에 오리건 지역에서 만들었습니다.
이제 Firehose에 PUTRecord 된 데이터는 S3에 PUTObject됩니다.
※AWS-CLI로 테스트 실시
aws firehose put-record --delivery-stream-name deliveryStreamName --record Data="test"
SORACOM Funnel 설정
새로운 SIM 그룹을 만들고 Funnel을 설정합니다.
이제 "http://funnel.soracom.io'에 요청을 보내면 Firehose로 데이터가 전송됩니다.
자, 테스트 시간입니다.
방금 설정한 SIM 그룹의 SIM이 박힌 장치에서 POST 방법을 보냅니다.
curl -X POST --data @test.json -H "Content-Type: application/json" http://funnel.soracom.io
전송 후 S3에 데이터가 저장되면 성공합니다.
덤
S3에 PUT이 발생하면 Lambda를 킥으로 DynamoDB에 데이터 저장
대략적인 흐름
1. Funnel을 통해 Firehose에 JSON 보내기
2. Firehose가 S3로 PutRecord
3. S3가 람다를 킥
4. Lambda가 S3에 Put 된 JSON을 얻고 DynamoDB에 Put
람다
Lambda의 역할에는 DynamoDB에 대한 권한이 있습니다.
index.js
const AWS = require("aws-sdk");
const co = require("co");
const dynamodb = new AWS.DynamoDB.DocumentClient({
region: "us-west-2"
});
const s3 = new AWS.S3();
const dynamoPutData = require("./lib/dynamo_put_data");
const s3GetObject = require("./lib/s3_get_object");
exports.handler = (event, context, callback) => {
console.log(JSON.stringify(event));
co(function *() {
// S3のBucketNameとPUTされたJSONのKey情報を取得
const bucketName = event["Records"][0]["s3"]["bucket"]["name"];
const objectKey = event["Records"][0]["s3"]["object"]["key"];
// S3からJSONファイルを取得
const s3GetData = yield s3GetObject.getObject(s3, bucketName, objectKey);
const item = {
id: "test",
payloads: s3GetData
};
return yield dynamoPutData.putDynamoDB(dynamodb, item);
}).then(() => {
console.log("success");
}).catch((err) => {
console.log(err);
});
};
s3_get_object.js
class s3GetObject {
/**
* S3からObjectを取得する
* @param s3
* @param bucket
* @param key
* @returns {Promise}
*/
static getObject(s3, bucket, key) {
return new Promise((resolve, reject) => {
const params = {
Bucket: bucket,
Key: key
};
s3.getObject(params, (err, data) => {
if(err) {
return reject(err);
} else {
const getData = JSON.parse(data.Body.toString());
return resolve(getData["payloads"]);
}
});
});
}
}
module.exports = s3GetObject;
dynamo_put_data.js
class dynamoPutData {
/**
* DynamoDBにデータを保存する(Put)
* @param {DocumentClient} dynamoDB
* @param item
* @returns {Promise}
*/
static putDynamoDB(dynamoDB, item) {
return new Promise((resolve, reject) => {
const params = {
TableName: "tableName",
Item: item
};
return dynamoDB.put(params, (err, data) => {
if (err) {
console.log(err);
return reject(err);
} else {
console.log(data);
return resolve(data);
}
});
});
}
}
module.exports = dynamoPutData;
S3
이것으로 작업 완료입니다.
Funnel을 통해 JSON을 보내 S3 또는 DynamoDB에 데이터가 저장되어 있는지 확인하십시오.
요약
우선 Firehose가 아무래도 1분 데이터를 유지해 버리므로 Stream에 비하면 처리 시간이 걸려 버리는 것 같습니다
그렇다면 실시간 성을 원하면 Stream을 사용하고 싶습니다.
얼마 지나지 않아 도쿄에 도착하지 않을 것이라고 생각합니다.
그럼 또!
Reference
이 문제에 관하여(SORACOM Funnel을 사용하여 Kinesis Firehose에 JSON을 보내 S3에 PUT합니다 (이어서 DynamoDB에 씁니다)), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/is_ryo/items/9e4b1fc8ce8f9af268ff텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)