Lambda에서 지정한 S3에 입출력하는 Glue를 기동시키는 이야기

7971 단어 람다S3glueAWS

배경



작업 중에 AWS의 Glue를 사용할 기회가 있어, 그 때에 지정한 S3에서 데이터를 읽어들여, Glue로 데이터를 가공해 다른 지정한 S3 버킷에 데이터를 출력하는 것을 실시했으므로, 그 흐름 그리고 방법을 메모가 떠나두고 싶습니다.

구축 내용





S3(Input)->Glue->S3(Output)



인수 (매개 변수) 가져 오기


# パラメータを取得
args = getResolvedOptions(sys.argv, [
                          'JOB_NAME', 'source_path', 'out_path'])
# 各パラメータを変数におく
source_path = args['source_path']
out_path = args['out_path']

이번에는 Lambda에서 두 개의 인수 인 'source_path', 'out_path'를받습니다.

S3에서 데이터 수신


# 指定したS3のバケット配下のファイルをすべて読み込む
DataSource0 = glueContext.create_dynamic_frame_from_options(
    connection_type = "s3",
    connection_options = {'paths': source_path, 'recurse': True},
    format="json")

인수로받은 source_path를 넣고 지정된 버킷의 경로 아래에있는 모든 파일을 읽습니다.

다른 S3 버킷으로 출력


# 指定したS3のバケットにファイルを出力する
df.write.options(header=True, codec="gzip")\
    .mode('overwrite').csv(out_path)

지정된 버킷의 경로로 CSV 형식으로 gzip으로 압축 한 다음 출력

Lambda에서 인수를 사용하여 Glue를 시작합니다.


from logging import getLogger, INFO
from datetime import datetime, timedelta, timezone
from app.libs.sns.sns_client import SnsClient
import boto3
import os
import json


logger = getLogger(__name__)
logger.setLevel(INFO)

# 起動するglueのジョブ名
# JOB_NAMEは環境変数
JOB_NAME = os.environ['JOB_NAME']
# エラー通知のSNSトピック
ERROR_PUBLISH_SNS_ARN = os.environ['ERROR_PUBLISH_SNS_ARN']


class TriggerJob:
    def __init__(self):
        self.sns = SnsClient()

    def handler(self, event):
        start_arg = {}
        try:

            # Glueのジョブを実行する
            start_arg = {
                '--out_path': "<指定のアウトプットするs3のパス>"
                '--source_path': "<指定のインプットするs3のパス>"
            }

            glue = boto3.client('glue')
            # glueJob起動
            response = glue.start_job_run(
                JobName=JOB_NAME,
                AllocatedCapacity=2,
                Arguments=start_arg)
            logger.info("response:{}".format(response))
            return response
        except Exception as err:
            logger.error("Glueジョブの起動でエラーが発生しました。err: {}".format(err))

마지막으로



이번에는 lambda에서 Glue를 지정 S3에서 입력과 출력을하는 코드를 싣습니다.
실제로는 다른 처리라도 있어, 관계없는 부분은 삭제하고 있습니다만, 불필요한 것이 아직 있을지도 모릅니다만 양해 바랍니다.
도움이되면 다행입니다.

좋은 웹페이지 즐겨찾기