AWS Kinesis Streams + Spark Streaming에서 스트림 처리를 시도했습니다.
소개
앞으로는 스트림 처리의 시대가 올 것이다. 어쩌면. 그런 생각이 듭니다.
그래서 적당한 데이터 발생을 발생시켜 Kinesis Streams+Spark streaming에서 스트림 처리를 체험해 보았습니다.
Spark 2.0.0 (EMR 사용)
Kinesis Streams란?
옛날에는 단순히 Kinesis라고 불렸지만 나중에 Kinesis Firehose와 Kinesis Analytics가 추가되었기 때문에 Kinesis 3 형제 중 한 명으로 불렸습니다.
대규모로 스케일 가능하고, 메세지가 일정 시간 보존되는 PubSub 형 큐를 가리킵니다.
즉, 데이터를 발생시키는 Producer 상당과 후단의 처리인 Consumer 상당을 작성할 필요가 있습니다.
Spark Streaming이란?
대규모 데이터 분산 처리 프레임워크인 Apache spark 라이브러리 중 하나로 약간의 코드 변경으로 스트리밍 처리를 작성할 수 있습니다.
이번 Spark 환경은 Amazon EMR(매니지드 Hadoop, Hadoop 에코시스템도 사용할 수 있다)로 준비합니다.
htps : // 아 ws. 아마존. 이 m/jp/에 mr/
준비
Kinesis streams 준비
AWS 콘솔에서 Kinesis streams 페이지로 이동합니다.
결정하기만 하면 됩니다. 처리량 요구 사항 등에 따라 결정하십시오. 여기에서는 적당히 2로 합니다.
EMR 준비
AWS 콘솔에서 EMR 페이지로 이동합니다.
서브넷을 지정하려면 클러스터 생성에서 고급 옵션으로 이동합니다.
Spark에 반드시 체크를 하십시오.
나머지는 기본적으로 문제가 없지만 마스터 보안 그룹은 기본적으로 SSH 포트가 비어 있지 않으므로 다른 보안 그룹을 추가해야합니다.
EC2
데이터 작성용으로 EC2도 1대 준비해 둡니다. boto를 사용할 수 있다면 뭐든지 좋습니다.
데이터 생성(producer)
데이터를 발생시키는 스크립트입니다. 다양한 언어로 작성할 수 있지만 여기에서는 Python을 사용합니다.
put.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
import boto.kinesis,datetime,time,random
connection = boto.kinesis.connect_to_region('ap-northeast-1')
stream_name = 'stream-test'
partition_key = str(time.time()) + str(random.random())
source_str = 'abcdefghijklmnopqrstuvwxyz'
for loop in range(10):
for i in range(10):
sampledata=str(i)+","+random.choice(source_str)
put_data = connection.put_record(stream_name,sampledata,partition_key)
time.sleep(10)
하고 있는 일로는
입니다.
partition_key
로 설정한 값을 해시화한 것을 가지고 샤드의 분배가 행해지는 것 같습니다.그래서 어느 정도 무작위가 되도록 하고 있습니다.
데이터 처리(Consumer)
Kinesis streams로부터 데이터를 취득해, 어떠한 처리를 하는 스크립트입니다.
get.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
if __name__ == "__main__":
sc = SparkContext()
ssc = StreamingContext(sc, 5)
appName="stream-app"
streamName="stream-test"
endpointUrl="https://kinesis.ap-northeast-1.amazonaws.com"
regionName="ap-northeast-1"
lines = KinesisUtils.createStream(
ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 1)
lines.pprint()
ssc.start()
ssc.awaitTermination()
Spark 안에 Kinesis strams를 이용하기 위한 모듈이 준비되어 있으므로 그것을 사용합니다.
우선 Spark를 이용하기 위한 입구라고도 할 수 있는 SparkContext와 Streaming으로 이용하기 위한 StreamingContext를 준비합니다.
StreamingContext는 Windows 단위로 처리를 하므로, 여기에서는 5초 간격으로 하고 있습니다.
sc = SparkContext()
ssc = StreamingContext(sc, 5)
스트림을 만듭니다. appname은 뭐든지 좋기 때문에 stream-app로 하고 있습니다.
appName="stream-app"
streamName="stream-test"
endpointUrl="https://kinesis.ap-northeast-1.amazonaws.com"
regionName="ap-northeast-1"
lines = KinesisUtils.createStream(
ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 1)
표시합니다.
lines.pprint()
스트림 처리를 시작합니다.
ssc.start()
ssc.awaitTermination()
실행
EMR에서 Spark 작업을 실행합니다. 이 때 JAR 파일을 지정합니다.
spark-submit --jars /usr/lib/spark/external/lib/spark-streaming-kinesis-asl-assembly_*.jar ./get.py
EC2에서 데이터를 생성합니다.
python put.py
조금 기다리면 Consumer 측에서 표시됩니다.
-------------------------------------------
Time: 2016-11-03 01:38:00
-------------------------------------------
0,c
1,e
2,x
3,w
4,g
5,h
6,t
7,q
8,c
9,o
행 수를 계산하려면 다음과 같이 하십시오.
counts=lines.count()
counts.pprint()
이것으로 스트림 처리를 구현할 때의 이미지가 잡았을까라고 생각합니다.
성능 등에 대해서는 아무것도 생각하고 있지 않으므로, 요 검토입니다.
Reference
이 문제에 관하여(AWS Kinesis Streams + Spark Streaming에서 스트림 처리를 시도했습니다.), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/take60/items/a35925e9569e2b7fa63e텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)