SORACOM Funnel을 사용하여 Kinesis Firehose에 JSON을 보내 S3에 PUT합니다 (이어서 DynamoDB에 씁니다)

소개



지금 무렵 Kinesis Firehose라든지 지금 갱감이 있습니다만 제대로 사용하고 싶었다고 하고, Funnel로부터 Firehose에 JSON을 던지고 싶다고 하는 요망이 빠져 왔기 때문에, 그다지 요구는 없다고 생각합니다만 기사로 해 보았다

처리 흐름



※점선 부분은 덤



Create S3



Firehose가 PUT하는 S3를 만들어 둡니다. (만드는 방법은 생략합니다)

Create Kinesis Firehose



콘솔에서 Firehose 화면으로 이동하여 'Create delivery stream'에서 Firehose를 만듭니다.
아직 도쿄 지역에서 사용할 수 없기 때문에 오리건 지역에서 만들었습니다.
  • "Delivery stream name"에 임의의 문자열을 입력하고 그대로 "next"를 클릭
  • Firehose에서 Lambda를 킥하지 않기 때문에 그대로 "next"를 클릭
  • Destination에서 S3을 선택하고 (아마도 기본적으로 선택됨) PUT 할 버킷 선택
  • 「S3 buffer conditions」의 「Buffer interval」을 최소의 「60」으로 설정. 이제 60초 동안 기다렸다가 S3로 PUT
  • 다른 데이터를 압축할지, 암호화할지 등을 임의로 설정하여 IAMrole을 만듭니다
  • 새 IAMrole을 만들면 이전 설정에서 필요한 권한이 있는 IAMrole이 만들어집니다
  • "Status"가 "ACTIVE"가되면 작성 완료

  • 이제 Firehose에 PUTRecord 된 데이터는 S3에 PUTObject됩니다.

    ※AWS-CLI로 테스트 실시
    aws firehose put-record --delivery-stream-name deliveryStreamName --record Data="test"
    

    SORACOM Funnel 설정



    새로운 SIM 그룹을 만들고 Funnel을 설정합니다.
  • 대상 서비스에서 Amazon Kinesis Firehose 선택
  • "대상 URL"에 "https://firehose.[region].amazonaws.com/[deliveryStreamName]"
  • 를 입력하십시오.
  • "자격 증명"에는 Kinesis Firehose의 실행 권한이있는 키 정보가있는 것을 만들고 적용합니다
  • 전송 데이터 형식으로 JSON을 선택합니다

  • 이제 "http://funnel.soracom.io'에 요청을 보내면 Firehose로 데이터가 전송됩니다.

    자, 테스트 시간입니다.



    방금 설정한 SIM 그룹의 SIM이 박힌 장치에서 POST 방법을 보냅니다.
     curl -X POST --data @test.json -H "Content-Type: application/json" http://funnel.soracom.io
    

    전송 후 S3에 데이터가 저장되면 성공합니다.



    S3에 PUT이 발생하면 Lambda를 킥으로 DynamoDB에 데이터 저장
    대략적인 흐름
    1. Funnel을 통해 Firehose에 JSON 보내기
    2. Firehose가 S3로 PutRecord
    3. S3가 람다를 킥
    4. Lambda가 S3에 Put 된 JSON을 얻고 DynamoDB에 Put

    람다



    Lambda의 역할에는 DynamoDB에 대한 권한이 있습니다.

    index.js
    const AWS = require("aws-sdk");
    const co = require("co");
    
    const dynamodb = new AWS.DynamoDB.DocumentClient({
      region: "us-west-2"
    });
    const s3 = new AWS.S3();
    
    const dynamoPutData = require("./lib/dynamo_put_data");
    const s3GetObject = require("./lib/s3_get_object");
    
    
    exports.handler = (event, context, callback) => {
      console.log(JSON.stringify(event));
      co(function *() {
        // S3のBucketNameとPUTされたJSONのKey情報を取得
        const bucketName = event["Records"][0]["s3"]["bucket"]["name"];
        const objectKey = event["Records"][0]["s3"]["object"]["key"];
        // S3からJSONファイルを取得
        const s3GetData = yield s3GetObject.getObject(s3, bucketName, objectKey);
        const item = {
          id: "test",
          payloads: s3GetData
        };
        return yield dynamoPutData.putDynamoDB(dynamodb, item);
      }).then(() => {
        console.log("success");
      }).catch((err) => {
        console.log(err);
      });
    
    };
    

    s3_get_object.js
    class s3GetObject {
    
      /**
       * S3からObjectを取得する
       * @param s3
       * @param bucket
       * @param key
       * @returns {Promise}
       */
      static getObject(s3, bucket, key) {
        return new Promise((resolve, reject) => {
          const params = {
            Bucket: bucket,
            Key: key
          };
          s3.getObject(params, (err, data) => {
            if(err) {
              return reject(err);
            } else {
              const getData = JSON.parse(data.Body.toString());
              return resolve(getData["payloads"]);
            }
          });
        });
      }
    }
    
    module.exports = s3GetObject;
    

    dynamo_put_data.js
    class dynamoPutData {
    
       /**
       * DynamoDBにデータを保存する(Put)
       * @param {DocumentClient} dynamoDB
       * @param item
       * @returns {Promise}
       */
      static putDynamoDB(dynamoDB, item) {
        return new Promise((resolve, reject) => {
          const params = {
            TableName: "tableName",
            Item: item
          };
          return dynamoDB.put(params, (err, data) => {
            if (err) {
              console.log(err);
              return reject(err);
            } else {
              console.log(data);
              return resolve(data);
            }
          });
        });
      }
    }
    
    module.exports = dynamoPutData;
    

    S3


  • S3 콘솔 화면에서 속성 ⇒ 이벤트 선택
  • "추가 알림"선택
  • "Name"에는 임의의 문자열, "Events"는 Put, "Send to"는 "Lambda Function"을 선택하고 방금 만든 Lambda를 선택합니다
  • .

    이것으로 작업 완료입니다.
    Funnel을 통해 JSON을 보내 S3 또는 DynamoDB에 데이터가 저장되어 있는지 확인하십시오.

    요약



    우선 Firehose가 아무래도 1분 데이터를 유지해 버리므로 Stream에 비하면 처리 시간이 걸려 버리는 것 같습니다
    그렇다면 실시간 성을 원하면 Stream을 사용하고 싶습니다.
    얼마 지나지 않아 도쿄에 도착하지 않을 것이라고 생각합니다.

    그럼 또!

    좋은 웹페이지 즐겨찾기