Android에서 Kinesis로 직접 데이터 전송

11869 단어 amplifyKotlinKinesis

하고 싶은 일



Android에서 Kinesis에 직접 로그 데이터를 올립니다.
특히 대량의 시계열 데이터를 업로드해야 하는 경우 REST API 등을 경유하면 그 부분의 비용이나 유지보수를 무시할 수 없게 될 수 있습니다.
이번에는 Android 및 iOS 측 처리 리소스를 사용하여 Cognito에서 지불한 자격 증명을 바탕으로 Kinesis를 직접 호출하여 데이터를 업로드해 보세요.

주의점으로는 REST API 경유와 비교하면, 디바이스측에 Kinesis의 구현이 들어가기 때문에, 클라우드와의 의존관계가 증가합니다. 장치 수가 늘어날 때 최적화 방법을 고려할 때 하나의 옵션으로 생각하십시오.

또한 이번에는 us-east-1 지역을 사용합니다. 다른 지역의 경우 일부 변경이 필요합니다.

Amplify 개발 환경


npm install -g @aws-amplify/cli

Android Studio



빈 프로젝트를 기반으로 기능을 구현해 보겠습니다.





Android Studio 터미널에서 다음을 두드리십시오.
$ amplify init
? Enter a name for the project kinesisTest
? Enter a name for the environment dev
? Choose your default editor: IntelliJ IDEA
? Choose the type of app that you're building android
? Where is your Res directory:  app/src/main/res
Using default provider  awscloudformation
? Do you want to use an AWS profile? Yes
? Please choose the profile you want to use default

이어 인증을 추가.
$ amplify add auth
 Do you want to use the default authentication and security configuration? Default configuration
 Warning: you will not be able to edit these selections. 
 How do you want users to be able to sign in? Email
 Do you want to configure advanced settings? No, I am done.

$ amplify push

이제 나중에 자동으로 생성됩니다.
이번에는 사용하지 않지만 로그인 기능에 필요한 설정도 이루어집니다.

Cognito





Cognito 서비스를 선택한 후 federated identities -> identity pool을 확인하면 새 풀이 추가되었는지 확인할 수 있습니다.



여기에서 Unauthenticated role를 검토하여 IAM에서 편집합니다.
IAM 서비스로 이동하여 Permission을 편집합니다.


편의를 위해 AmazonKinesisFullAccess를 연결했습니다. 이제 익명 사용자도 Kinesis에 데이터를 올릴 수 있습니다. (어디까지나 테스트이므로 이 설정입니다.실제는 AuthRole에 대해서 어태치합니다)

Cognito로 돌아가서 다음을 확인하십시오.


이제 Cognito 설정이 끝났으므로 구현에 들어갑니다.

구현



Amplify 문서를 보면서 필요한 모듈을 넣습니다. 추가로 Kinesis와 비동기 처리를위한 코 루틴을 추가했습니다.

app/build.gradle
    //Base SDK
    implementation 'com.amazonaws:aws-android-sdk-core:2.15.+'
    //AppSync SDK
    implementation 'com.amazonaws:aws-android-sdk-appsync:2.8.+'
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

    implementation 'com.amazonaws:aws-android-sdk-kinesis:2.15.+'

    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3"

MainActivity를 다시 씁니다. 추가한 것은 몇 줄입니다.
어색하고 난폭하지만, 이것으로 대량의 스트림을 Kinesis에 보냅니다.

package com.example.kinesistest

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import com.amazonaws.auth.CognitoCachingCredentialsProvider
import com.amazonaws.mobile.config.AWSConfiguration
import com.amazonaws.mobileconnectors.kinesis.kinesisrecorder.KinesisRecorder
import com.amazonaws.regions.Regions
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.json.JSONObject

class MainActivity : AppCompatActivity() {

    private lateinit var kinesisRecorder: KinesisRecorder

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        val credentialsProvider = CognitoCachingCredentialsProvider(
            applicationContext,
            AWSConfiguration(applicationContext)
        )

        kinesisRecorder =
            KinesisRecorder(applicationContext.cacheDir, Regions.US_EAST_1, credentialsProvider)

        GlobalScope.launch {

            (0..1000).map { a ->
                (0..1000).map { b ->
                    val json = JSONObject()
                    json.accumulate("time", System.currentTimeMillis())
                    json.accumulate("count", a)
                    json.accumulate("index", b)
                    kinesisRecorder.saveRecord(json.toString(), "mystream")

                }
                kinesisRecorder.submitAllRecords()
                Log.d("kinesis", "count:$a")
            }
        }

    }
}


키네시스 스트림



Android에서 보낸 데이터 레코드를 수신하도록 설정합니다.
mystream이라는 이름으로 샤드 수는 1입니다.
실제로는 처리량을 보면서 샤드 조정이 필요합니다.



실행



Android Studio로 돌아가서 실행합니다.
Kinesis Monitoring 화면에서 레코드가 들어왔는지 확인할 수 있습니다.



지금까지 Android 앱에서 Kinesis로 데이터를 올릴 수있었습니다.
클라이언트 측의 구현도 거기까지 많은 것은 없기 때문에, 로그만을 계속 올리고 싶은 유스 케이스에서는 검토의 여지가 있을지도 모릅니다.
Kinesis에서 받은 데이터를 처리하는 곳은, 또 이번. .

kinesis Firehose



Kinesis Stream 대신 Firehose를 사용하는 방법도 적습니다.

Android측의 변경점은 3점만입니다.
    //private lateinit var kinesisRecorder: KinesisRecorder
    private lateinit var kinesisFirehoseRecorder: KinesisFirehoseRecorder

(中略)
        //kinesisRecorder =
        //    KinesisRecorder(applicationContext.cacheDir, Regions.US_EAST_1, credentialsProvider)
        kinesisFirehoseRecorder =
            KinesisFirehoseRecorder(cacheDir, Regions.US_EAST_1, credentialsProvider)

(中略)
                    //kinesisRecorder.saveRecord(json.toString(), "mystream")
                    kinesisFirehoseRecorder.saveRecord(json.toString(), "mystream")


Firehose의 Delivery Stream도, mystream 라는 이름으로 작성해, 실행하면 Firehose로부터 앞의 S3등에 파일을 보존 가능합니다. (자세한 것은 할애)

좋은 웹페이지 즐겨찾기