Kiness Data Analytics의 Beam 자습서

19200 단어 AWSbeamflinktech
Kinessiss Data Analythics의 Apache Beam 강좌에서 필기에 빠졌기 때문이다.

배경 지식


Kiness Data Analytics(이하 KDA)


공식 페이지
AWS의 Managed Apache Flink입니다.

Apache Flink


공식 페이지
분산 데이터 처리 엔진.Apache Spark 및 GCPCloud Dataflow 파트너
마스코트 다람쥐는 매우 귀엽다.

Apache Beam


공식 페이지
Flink 및 Dataflow용 API SDK뱀으로 써.
  • 분포식 데이터 처리 엔진 사이를 쉽게 이동할 수 있음
  • 스트리밍 및 배치 작업은 동일하게 쓸 수 있음
  • 입력과 출력(Source, Sink) 등 라이브러리를 공동으로 사용할 수 있음
  • 등의 장점이 있다.
    마스코트인 반딧불이는 매우 귀엽다.
    핑크·KDA는 자체적으로API SDK도 쓸 수 있고 빔으로도 쓸 수 있다.
    (Dataflow Beam만 해당)
    이번 튜토리얼은 아파치 뱀에서 KDA 앱을 실행한 것이다.

    자습서 개요


    Kiness Data Streams에서 입력을 받아 CloudWatch Logs와 Kines Data Stream으로 출력하는 Beam 응용 프로그램을 구축하는 자습서.아마도 아래의 절차일 것이다.
  • 관련 리소스 준비
  • jar의 S3 배치
  • 입력과 출력을 위한 Kiness Data Stream
  • Kiness Data Stream에 위조 입력을 출력하는 파이썬 스크립트 만들기

  • 샘플 Java 코드 다운로드 및 업로드
  • KDA 적용 설정
  • 어플리케이션 시작
  • 튜토리얼이니까 쓴 대로 하면 움직일 텐데 몇 가지 곤혹스러운 점이 있어요.

    곤혹스러운 곳

  • 라이브러리 버전
  • Kiness Data Stream 확인 방법
  • 애플리케이션 종료 / 체크포인트
  • IAM 사용 주의
  • 라이브러리 버전


    2021/04/25 시점에서 샘플 코드 설명의 의존 관계를 유지합니다
  • 컴파일 성공
  • KDA에서 응용 프로그램을 실행할 때잘못 CloudWatch Logs
  • 로 출력
  • 비처리
  • 이런 문제가 있습니다.issue 이미 보고했으니 언젠가는 고칠 수 있을 거라고 생각합니다.

    Kiness Data Stream 데이터 확인


    이 튜토리얼에는 Kiness Data Streams에 데이터를 기록했지만, 이 확인 방법(※)은 기재되지 않았다.Kiness Data Streams 설명서CLI를 통한 데이터 확인 방법에 나와 있습니다.
    또한 위 문서에 기재된 (※ ※)는 빈 데이터(데이터 스트림에서도)로 되돌아오는 경우가 있으므로 주의해야 합니다.
    Kiness Data Firehose에서는 S3에 데이터를 놓고 확인하거나 아래와 같이 순환하는 것이 좋다.
    # (Streamの最初から見る場合)
    # first=$(aws kinesis --profile プロファイル名 get-records --shard-iterator  $(aws kinesis --profile プロファイル名 get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name ExampleOutputStream | jq  -r .ShardIterator))
    
    # このスクリプト起動後のデータから見る場合
    first=$(aws kinesis --profile プロファイル名 get-records --shard-iterator  $(aws kinesis --profile プロファイル名 get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type LATEST --stream-name ExampleOutputStream | jq  -r .ShardIterator))
    echo $first
    itr=$(echo $first | jq -r .NextShardIterator)
    
    while true
    do
        echo $itr
        m=$(aws kinesis --profile adminuser get-records --shard-iterator $itr)
        # メッセージの中身見る場合は、base64 -dなどにパイプしてデコード
        echo $m
        itr=$(echo $m | jq -r .NextShardIterator)
        sleep 1
    done
    
    ※ 튜토리얼은 "You can check the Kinesis Data Analythics metrics on the CloudWatch to verify the aplication is working"
    ※※ 「you may receive zero or more records even if there are records in your stream, and any records returned may not represent all the records currently in your stream」

    애플리케이션 종료 / 체크포인트


    KDA의 관리 화면 중 3가지 옵션은 애플리케이션을 중지하는 것입니다.
  • 중지
  • 스냅샷을 생성하지 않고 중지
  • 삭제

  • 각자의 자세한 설명을 찾지 못한 것이 아래의 행동인가?내 생각엔
  • 보존 적용에 대한 설정/상태(스냅샷) 적용 중지
  • StopApplication
  • 스냅샷을 생성하지 않고 중지 애플리케이션을 중지하고 설정을 중지하지만 상태를 남기지 않는 애플리케이션을 중지합니다.
  • StopApplication(force)
  • 설정 및 정보를 포함하는 애플리케이션을 중지, 삭제할 때 삭제
  • DeleteApplication
  • 그중 이번 강좌에서 선택한 것은 삭제였다.
    또'정지'를 선택하면 아무리 기다려도 프로그램을 끝낼 수 없다.
    빔이 아닌 플링크자습서 앱에서는'정지'가 완료됐기 때문에 KDA에서 빔 사용상의 제한이 있었을까?내 생각엔
    (원인이 불분명하니 아는 사람이 있으면 알려주세요...)

    IAM 사용 주의


    KDA 애플리케이션을 제작할 때 애플리케이션에서 사용하는 IAM을 제작하거나 기존 IAM을 선택합니다.
    새로 만들 때는 괜찮지만 다른 KDA 앱에서 사용하는 IAM을 사용할 때는
    할당에 주의하십시오CloudWatch Logs 권한.
    새로 제작된 IAM에서는 응용프로그램 이름에 해당하는 로그 그룹에 대해서만 Put Events 권한이 설정되어 있으며, 이를 직접 사용하면 클라우드워치 로그에 로그가 없는 현상이 발생합니다.
    (* 또는 해당되는 적용 로그 그룹을 설정하면 O.K.)

    (부록) 라이브러리 버전 불일치 오류


    길었어나는 처음 봤을 때부터 판본이라는 것을 알기가 매우 어렵다고 생각한다.
    "message": "org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:207)\n\tat java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)\n\tat java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: java.util.concurrent.CompletionException: java.lang.VerifyError: Bad type on operand stack\nException Details:\n Location:\n org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator.translateNode(Lorg/apache/beam/sdk/transforms/PTransform;Lorg/apache/beam/runners/flink/FlinkStreamingTranslationContext;)V @467: invokespecial\n Reason:\n Type 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' (current frame, stack[4]) is not assignable to 'org/apache/flink/streaming/api/transformations/StreamTransformation'\n Current Frame:\n bci: @467\n flags: { }\n locals: { 'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator', 'org/apache/beam/sdk/transforms/PTransform', 'org/apache/beam/runners/flink/FlinkStreamingTranslationContext', 'java/lang/String', 'org/apache/beam/sdk/values/PCollection', 'org/apache/beam/sdk/values/WindowingStrategy', 'org/apache/beam/sdk/coders/KvCoder', 'org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder', 'org/apache/flink/streaming/api/datastream/DataStream', 'org/apache/beam/sdk/util/WindowedValue$FullWindowedValueCoder', 'org/apache/beam/runners/flink/translation/types/CoderTypeInformation', 'org/apache/flink/streaming/api/datastream/DataStream', 'org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector', 'org/apache/flink/streaming/api/datastream/KeyedStream', 'org/apache/beam/sdk/transforms/CombineFnBase$GlobalCombineFn', 'org/apache/beam/runners/core/SystemReduceFn', 'org/apache/beam/sdk/coders/Coder', 'org/apache/flink/api/common/typeinfo/TypeInformation', 'java/util/List', 'org/apache/flink/api/java/tuple/Tuple2', 'org/apache/beam/sdk/values/TupleTag', 'org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator', 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' }\n stack: { uninitialized 455, uninitialized 455, 'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator', 'org/apache/flink/streaming/api/environment/StreamExecutionEnvironment', 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' }\n Bytecode:\n 0000000: 2cb8 000b 4e2c 2bb6 0003 c000 043a 0419\n 0000010: 04b6 0005 3a05 1904 b600 0cc0 000d 3a06\n 0000020: 1906 b600 0e19 06b6 000f 1904 b600 05b6\n 0000030: 0006 b600 10b8 0011 3a07 2c19 04b6 0012\n 0000040: 3a08 1907 1904 b600 05b6 0006 b600 10b8\n 0000050: 0013 3a09 bb00 1459 1909 b700 153a 0a19\n 0000060: 08bb 0016 592c b600 17b7 0018 b600 1919\n 0000070: 0ab6 001a 121b b600 1c3a 0bbb 001d 5919\n 0000080: 06b6 000e b700 1e3a 0c19 0b19 0cb6 001f\n 0000090: 3a0d 2bc0 0008 b600 203a 0e19 06b6 000e\n 00000a0: 190e 1904 b600 21b6 0022 1906 b800 23b8\n 00000b0: 0024 3a0f 2c2c 2bb6 0025 c000 04b6 0026\n 00000c0: 3a10 2c2c 2bb6 0025 c000 04b6 0027 3a11\n 00000d0: 2bc0 0008 b600 093a 1219 12b9 000a 0100\n 00000e0: 9900 61bb 0028 5912 29b7 002a 3a13 bb00\n 00000f0: 2b59 190f 2d19 0919 13b8 002c bb00 2d59\n 0000100: 1913 1910 b700 2e19 05bb 002f 59b7 0030\n 0000110: b800 2c2c b600 1719 06b6 000e 190c b700\n 0000120: 313a 1419 0d2d 1911 1914 b600 322d b600\n 0000130: 333a 152c 2c2b b600 2519 15b6 0034 a700\n 0000140: af19 122c b800 353a 13bb 0028 5912 29b7\n 0000150: 002a 3a14 bb00 2b59 190f 2d19 0919 14b8\n 0000160: 002c bb00 2d59 1914 1910 b700 2e19 0519\n 0000170: 13b4 0036 c000 3719 122c b600 1719 06b6\n 0000180: 000e 190c b700 313a 15bb 0038 5919 0db6\n 0000190: 0039 1913 b400 3ac0 003b b600 3cb6 003d\n 00001a0: 2bb6 003e 1915 1911 190d b600 3fb7 0040\n 00001b0: 3a16 1916 190d b600 41b6 0042 1916 190d\n 00001c0: b600 4301 b600 44bb 0045 592a 190d b600\n 00001d0: 4619 16b7 0047 3a17 190d b600 4619 16b6\n 00001e0: 0048 2c2c 2bb6 0025 1917 b600 34b1 \n Stackmap Table:\n full_frame(@321,{Object[#73],Object[#164],Object[#165],Object[#166],Object[#4],Object[#100],Object[#13],Object[#167],Object[#59],Object[#125],Object[#20],Object[#59],Object[#29],Object[#168],Object[#136],Object[#169],Object[#170],Object[#171],Object[#172]},{})\n same_frame_extended(@493)\n\n\tat java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)\n\tat java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)\n\t... 6 more\nCaused by: java.lang.VerifyError: Bad type on operand stack\nException Details:\n Location:\n org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator.translateNode(Lorg/apache/beam/sdk/transforms/PTransform;Lorg/apache/beam/runners/flink/FlinkStreamingTranslationContext;)V @467: invokespecial\n Reason:\n Type 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' (current frame, stack[4]) is not assignable to 'org/apache/flink/streaming/api/transformations/StreamTransformation'\n Current Frame:\n bci: @467\n flags: { }\n locals: { 'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator', 'org/apache/beam/sdk/transforms/PTransform', 'org/apache/beam/runners/flink/FlinkStreamingTranslationContext', 'java/lang/String', 'org/apache/beam/sdk/values/PCollection', 'org/apache/beam/sdk/values/WindowingStrategy', 'org/apache/beam/sdk/coders/KvCoder', 'org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder', 'org/apache/flink/streaming/api/datastream/DataStream', 'org/apache/beam/sdk/util/WindowedValue$FullWindowedValueCoder', 'org/apache/beam/runners/flink/translation/types/CoderTypeInformation', 'org/apache/flink/streaming/api/datastream/DataStream', 'org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector', 'org/apache/flink/streaming/api/datastream/KeyedStream', 'org/apache/beam/sdk/transforms/CombineFnBase$GlobalCombineFn', 'org/apache/beam/runners/core/SystemReduceFn', 'org/apache/beam/sdk/coders/Coder', 'org/apache/flink/api/common/typeinfo/TypeInformation', 'java/util/List', 'org/apache/flink/api/java/tuple/Tuple2', 'org/apache/beam/sdk/values/TupleTag', 'org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator', 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' }\n stack: { uninitialized 455, uninitialized 455, 'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator', 'org/apache/flink/streaming/api/environment/StreamExecutionEnvironment', 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' }\n Bytecode:\n 0000000: 2cb8 000b 4e2c 2bb6 0003 c000 043a 0419\n 0000010: 04b6 0005 3a05 1904 b600 0cc0 000d 3a06\n 0000020: 1906 b600 0e19 06b6 000f 1904 b600 05b6\n 0000030: 0006 b600 10b8 0011 3a07 2c19 04b6 0012\n 0000040: 3a08 1907 1904 b600 05b6 0006 b600 10b8\n 0000050: 0013 3a09 bb00 1459 1909 b700 153a 0a19\n 0000060: 08bb 0016 592c b600 17b7 0018 b600 1919\n 0000070: 0ab6 001a 121b b600 1c3a 0bbb 001d 5919\n 0000080: 06b6 000e b700 1e3a 0c19 0b19 0cb6 001f\n 0000090: 3a0d 2bc0 0008 b600 203a 0e19 06b6 000e\n 00000a0: 190e 1904 b600 21b6 0022 1906 b800 23b8\n 00000b0: 0024 3a0f 2c2c 2bb6 0025 c000 04b6 0026\n 00000c0: 3a10 2c2c 2bb6 0025 c000 04b6 0027 3a11\n 00000d0: 2bc0 0008 b600 093a 1219 12b9 000a 0100\n 00000e0: 9900 61bb 0028 5912 29b7 002a 3a13 bb00\n 00000f0: 2b59 190f 2d19 0919 13b8 002c bb00 2d59\n 0000100: 1913 1910 b700 2e19 05bb 002f 59b7 0030\n 0000110: b800 2c2c b600 1719 06b6 000e 190c b700\n 0000120: 313a 1419 0d2d 1911 1914 b600 322d b600\n 0000130: 333a 152c 2c2b b600 2519 15b6 0034 a700\n 0000140: af19 122c b800 353a 13bb 0028 5912 29b7\n 0000150: 002a 3a14 bb00 2b59 190f 2d19 0919 14b8\n 0000160: 002c bb00 2d59 1914 1910 b700 2e19 0519\n 0000170: 13b4 0036 c000 3719 122c b600 1719 06b6\n 0000180: 000e 190c b700 313a 15bb 0038 5919 0db6\n 0000190: 0039 1913 b400 3ac0 003b b600 3cb6 003d\n 00001a0: 2bb6 003e 1915 1911 190d b600 3fb7 0040\n 00001b0: 3a16 1916 190d b600 41b6 0042 1916 190d\n 00001c0: b600 4301 b600 44bb 0045 592a 190d b600\n 00001d0: 4619 16b7 0047 3a17 190d b600 4619 16b6\n 00001e0: 0048 2c2c 2bb6 0025 1917 b600 34b1 \n Stackmap Table:\n full_frame(@321,{Object[#73],Object[#164],Object[#165],Object[#166],Object[#4],Object[#100],Object[#13],Object[#167],Object[#59],Object[#125],Object[#20],Object[#59],Object[#29],Object[#168],Object[#136],Object[#169],Object[#170],Object[#171],Object[#172]},{})\n same_frame_extended(@493)\n\n\tat org.apache.beam.runners.flink.FlinkStreamingTransformTranslators.<clinit>(FlinkStreamingTransformTranslators.java:156)\n\tat org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.enterCompositeTransform(FlinkStreamingPipelineTranslator.java:103)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)\n\tat org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)\n\tat org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)\n\tat org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)\n\tat org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)\n\tat org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:88)\n\tat org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:117)\n\tat org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:82)\n\tat org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)\n\tat org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)\n\tat com.amazonaws.kinesisanalytics.beam.BasicBeamStreamingJob.main(BasicBeamStreamingJob.java:86)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:294)\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:201)\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:203)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)\n\t... 6 more\n",
    

    좋은 웹페이지 즐겨찾기