가이드 - AWS Glue 및 PySpark

16575 단어 bigdatacloudpysparkaws
이 글에서 저는 AWS Glue와 PySpark 기능을 썼습니다. 이 기능들은 AWS 파이프라인을 만들고 AWS Glue PySpark 스크립트를 작성하는 데 도움이 될 수 있습니다.
AWS Glue는 완벽한 관리형 추출, 변환 및 로드(ETL) 서비스로 다양한 소스로부터 대량의 데이터 세트를 처리하여 분석 및 데이터 처리를 수행합니다.
AWS 접착 작업을 만들 때 Spark, Spark Streaming, Python 셸 중에서 선택할 수 있습니다.이 작업은 AWS Glue에서 생성한 권장 스크립트, 사용자가 제공한 기존 스크립트 또는 사용자가 작성한 새 스크립트를 실행할 수 있습니다.이 외에도 다양한 모니터링 옵션, 작업 수행 능력, 시간 초과, 지연 알림 한도값, 덮어쓸 수 없고 덮어쓸 수 있는 파라미터를 선택할 수 있습니다.
풀 작업 유형 및 풀 버전
스크립트 파일 이름 및 기타 사용 가능한 옵션
AWS는 최근 Glue 2.0 버전을 발표했는데 스파크 ETL 작업 시작 시간이 10배 빠르고 비용 계산 지속 시간을 최소 10분에서 최소 1분으로 줄인 것이 특징이다.
https://aws.amazon.com/blogs/aws/aws-glue-version-2-0-featuring-10x-faster-job-start-times-and-1-minute-minimum-billing-duration
AWS Glue를 사용하면 개발 노드를 만들고 SageMaker 또는 Zeppelin 노트북을 구성하여 Glue ETL 스크립트를 개발하고 테스트할 수 있습니다.

나는 ETL 스크립트를 작성하고 테스트하기 위해 개발 노드에 연결된 SageMaker 노트북을 만들었다.익숙한 언어에 따라 노트북을 시작할 수 있습니다.

이제 AWS Glue와 PySpark의 특정한 특성과 기능에 대해 이야기해 봅시다.
1. Spark 데이터 프레임
Spark DataFrame은 이름 열로 구성된 분산 데이터 집합입니다.그것은 개념적으로 관계 데이터베이스에 있는 표에 해당한다.RDD, csv, json, parquet 등 파일 형식에서 데이터 프레임을 만들 수 있습니다.
SageMaker Sparkmagic(PySpark) 코어 노트북을 사용하면 Spark 세션이 자동으로 생성됩니다.

데이터 프레임 만들기 -
# from CSV files 
S3_IN = "s3://mybucket/train/training.csv"

csv_df = (
    spark.read.format("org.apache.spark.csv")
    .option("header", True)
    .option("quote", '"')
    .option("escape", '"')
    .option("inferSchema", True)
    .option("ignoreLeadingWhiteSpace", True)
    .option("ignoreTrailingWhiteSpace", True)
    .csv(S3_IN, multiLine=False)
)

# from PARQUET files 
S3_PARQUET="s3://mybucket/folder1/dt=2020-08-24-19-28/"

df = spark.read.parquet(S3_PARQUET)

# from JSON files
df = spark.read.json(S3_JSON)

# from multiline JSON file 
df = spark.read.json(S3_JSON, multiLine=True)
2.GlueContext
GlueContext는 AWS Glue에서 동적 프레임워크를 읽고 쓰는 입구점입니다.이것은 아파치 스파크 SQL SQLContext 대상을 포장하여 아파치 스파크 플랫폼과 상호작용하는 메커니즘을 제공한다.
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame

glueContext = GlueContext(SparkContext.getOrCreate())

3. 동적 프레임워크
AWS Glue DynamicFrames는 SparkSQL 데이터 프레임과 유사합니다.이것은 분포식 데이터 집합을 표시하며 패턴을 지정할 필요가 없다.불일치 값과 유형을 포함하는 데이터를 읽고 변환하는 데도 사용할 수 있습니다.
다음 옵션을 사용하여 DynamicFrame을 생성할 수 있습니다.

  • rdd에서 동적 프레임 생성 - Apache Spark 탄성 분산 데이터 세트(rdd)에서 생성

  • 디렉터리에서 동적 프레임워크 만들기 - 접착 디렉터리 데이터베이스와 테이블 이름으로 만들기

  • create_dynamic_frame_from_options - 지정된 연결 및 형식을 사용하여 생성합니다.예 - 연결 유형(예: Amazon S3, Amazon Redshift 및 JDBC
  • 를 사용하여 데이터 프레임 사이에서 동적 프레임을 변환할 수 있습니다.toDF() 및 fromDF().
    #create DynamicFame from S3 parquet files
    datasource0 = glueContext.create_dynamic_frame_from_options(
                connection_type="s3",
                connection_options = {
                    "paths": [S3_location]
                },
                format="parquet",
                transformation_ctx="datasource0")
    
    #create DynamicFame from glue catalog 
    datasource0 = glueContext.create_dynamic_frame.from_catalog(
               database = "demo",
               table_name = "testtable",
               transformation_ctx = "datasource0")
    
    #convert to spark DataFrame 
    df1 = datasource0.toDF()
    
    #convert to Glue DynamicFrame
    df2 = DynamicFrame.fromDF(df1, glueContext , "df2")
    
    추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog
    4. AWS 풀 작업 책갈피
    AWS Glue Job bookmark은 예약된 간격으로 작업을 다시 실행할 때 증량 데이터를 처리하여 오래된 데이터를 다시 처리하지 않도록 합니다.
    추가 읽기 - https://aprakash.wordpress.com/2020/05/07/implementing-glue-etl-job-with-job-bookmarks/
    https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
    5. 데이터 쓰기
    데이터 세트를 변환하는 DynamicFrame은 S3에 비분할 (기본값) 또는 분할 단위로 쓸 수 있습니다.connection_ 옵션에서 "partitionKeys"파라미터를 지정하여 데이터를 섹션으로 S3에 쓸 수 있습니다.AWS Glue는 이러한 데이터 세트를 벌집 파티션에 구성합니다.
    다음 코드 예시에서 AWS Glue DynamicFrame은 년, 월, 일, 시간에 따라 구역을 나누고 벌집식 구역의 맞춤법 형식으로 S3에 기록합니다.
    s3://bucket\u name/table\u name/year=2020/month=7/day=13/hour=14/part-000-671c.c000.snappy.꽃마루
    S3_location = "s3://bucket_name/table_name"
    
    datasink = glueContext.write_dynamic_frame_from_options(
        frame= data,
        connection_type="s3",
        connection_options={
            "path": S3_location,
            "partitionKeys": ["year", "month", "day", "hour"]
        },
        format="parquet",
        transformation_ctx ="datasink")
    추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_options
    6. "glueparquet"형식 옵션
    glueparquet은 성능이 최적화된 Apache parquet 컴파일러 형식으로 DynamicFrame을 컴파일하는 데 사용됩니다.동적 계산과 수정 모드입니다.
    datasink = glueContext.write_dynamic_frame_from_options(
                   frame=dynamicframe,
                   connection_type="s3",
                   connection_options={
                      "path": S3_location,
                      "partitionKeys": ["year", "month", "day", "hour"]
                   },
                   format="glueparquet",
                   format_options = {"compression": "snappy"},
                   transformation_ctx ="datasink")
    추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html
    7.S3 목록기 및 기타 메모리 관리 최적화 옵션
    AWS Glue는 DynamicFrame에 데이터를 읽어들일 때 S3의 파일을 나열하는 최적화 메커니즘을 제공합니다. 추가_options 매개 변수인 "uses3ListImplementation"을 사용하여 DynamicFrame을 사용할 수 있습니다.
    추가 읽기 - https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/
    8, S3 경로 청소
    purge\U s3\U path는 보존 기간이나 다른 필터에 따라 지정한 s3 경로에서 파일을 차례로 삭제할 수 있는 좋은 옵션입니다.예를 들어, AWS Glue 작업을 실행하고 있다면, 매일 테이블을 완전히 새로 고치고, 데이터를 S3에 기록합니다. 이름은 S3://bucketname/tablename/dt=입니다.접착 작업 자체가 정의한 보존 기간에 따라 dt=s3 폴더를 삭제할 수 있습니다.또 다른 옵션은 접두사를 사용하여 S3 스토리지 배럴 라이프 사이클 정책을 설정하는 것입니다.
    #purge locations older than 3 days
    print("Attempting to purge S3 path with retention set to 3 days.")
    glueContext.purge_s3_path(
        s3_path=output_loc, 
        options={"retentionPeriod": 72})
    purge\u table,transition\u table,transition\u s3\u path 등 다른 옵션을 사용할 수 있습니다.transition_table 옵션은 지정한 디렉터리의 데이터베이스와 테이블 변환을 위해 Amazon S3에 저장된 파일의 저장 클래스입니다.
    추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_s3_path
    9.Relationalize 클래스
    Relationalize 클래스는 끼워 넣은 json의 가장 바깥쪽을 펴는 데 도움을 줄 수 있습니다.
    추가 읽기 - https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
    10. Unbox 클래스
    Unbox 클래스는 DynamicFrame의 문자열 필드를 지정한 형식 형식으로 포장하지 않도록 도와줍니다(선택 사항).
    추가 읽기 - https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
    11. Unnest 클래스
    Unnest 클래스는 중첩된 객체를 DynamicFrame의 최상위 요소로 플랫합니다.
    root
    |-- id: string
    |-- type: string
    |-- content: map
    |    |-- keyType: string
    |    |-- valueType: string

    With content attribute/column being map Type, we can use unnest class to unnest each key elements.

    unnested = UnnestFrame.apply(frame=data_dynamic_dframe)
    unnested.printSchema()
    root
    |-- id: string
    |-- type: string
    |-- content.dateLastUpdated: string
    |-- content.creator: string
    |-- content.dateCreated: string
    |-- content.title: string

    12. printSchema()

    To print the Spark or Glue DynamicFrame schema in tree format use printSchema().

    datasource0.printSchema()
    
    root
    |-- ID: int
    |-- Name: string
    |-- Identity: string
    |-- Alignment: string
    |-- EyeColor: string
    |-- HairColor: string
    |-- Gender: string
    |-- Status: string
    |-- Appearances: int
    |-- FirstAppearance: choice
    |    |-- int
    |    |-- long
    |    |-- string
    |-- Year: int
    |-- Universe: string
    

    13. Fields Selection

    select_fields can be used to select fields from Glue DynamicFrame.

    # From DynamicFrame
    
    datasource0.select_fields(["Status","HairColor"]).toDF().distinct().show()

    Spark 데이터 상자에서 필드를 선택하려면 "select"-
    # From Dataframe
    
    datasource0_df.select(["Status","HairColor"]).distinct().show()

    14. 타임 스탬프
    응용 프로그램이 DynamoDB에 데이터를 쓰고 마지막으로 업데이트된 속성/열이 있다고 가정합니다.DynamoDB 이 컴퓨터에서는 날짜/시간 스탬프 데이터 유형을 지원하지 않습니다.따라서 문자열이나 숫자로 저장할 수 있습니다.숫자로 저장된 경우 일반적으로 1970년 1월 1일 UTC 00:00부터 시작되는 시간으로 기록됩니다.당신은 ISO 8601에서 "1598331963"과 유사한 내용, 즉 2020-08-25T05:06:03+00:00을 볼 수 있습니다.
    https://www.unixtimestamp.com/index.php
    어떻게 그것을 타임 스탬프로 변환합니까?
    AWS Glue DynamicFrame을 사용하여 데이터를 읽고 모드를 볼 때 "긴"데이터 형식으로 표시됩니다.
    root
    |-- version: string
    |-- item_id: string
    |-- status: string
    |-- event_type: string
    |-- last_updated: long
    마지막으로 업데이트된 긴 데이터 형식을 타임 스탬프 데이터 형식으로 변환하려면 아래의 -
    import pyspark.sql.functions as f
    import pyspark.sql.types as t
    
    new_df = (
        df
            .withColumn("last_updated", f.from_unixtime(f.col("last_updated")/1000).cast(t.TimestampType()))
    )   
    15. Spark 데이터 프레임의 임시 뷰
    Spark 데이터 프레임을 테이블로 저장하고 Spark sql을 사용하여 조회하려면createOrReplaceTempView를 사용하여 데이터 프레임을 임시 보기로 변환할 수 있습니다. 이 임시 보기는 Spark 세션에만 적용됩니다.
    df = spark.createDataFrame(
        [
            (1, ['a', 'b', 'c'], 90.00),
            (2, ['x', 'y'], 99.99),
        ],
        ['id', 'event', 'score'] 
    )
    
    df.printSchema()
    root
     |-- id: long (nullable = true)
     |-- event: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- score: double (nullable = true)
    
    df.createOrReplaceTempView("example")
    
    spark.sql("select * from example").show()
    
    +---+---------+-----+
    | id|    event|score|
    +---+---------+-----+
    |  1|[a, b, c]| 90.0|
    |  2|   [x, y]|99.99|
    +---+---------+-----+
    16. ArrayType에서 요소 추출
    위의 예시에서 마지막 이벤트만 저장할 새 속성/열을 만들려고 합니다.어떻게 할 거예요?
    함수에서 element_를 사용합니다.col이 그룹이라면, 추출할 때 주어진 색인에 있는 그룹 요소를 되돌려줍니다.col이 맵이라면, 추출된 키를 추출하는 데도 사용할 수 있습니다.
    import pyspark.sql.functions as element_at
    
    newdf = df.withColumn("last_event", element_at("event", -1))
    
    newdf.printSchema()
    root
     |-- id: long (nullable = true)
     |-- event: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- score: double (nullable = true)
     |-- last_event: string (nullable = true)
    
    newdf.show()
    +---+---------+-----+----------+
    | id|    event|score|last_event|
    +---+---------+-----+----------+
    |  1|[a, b, c]| 90.0|         c|
    |  2|   [x, y]|99.99|         y|
    +---+---------+-----+----------+
    17, 폭발
    PySpark의 분해 함수는 배열이나 맵 행의 열을 분해하는 데 사용됩니다.예를 들어 상기 예시의'사건'열을 분해해 봅시다
    from pyspark.sql.functions import explode
    
    df1 = df.select(df.id,explode(df.event))
    
    df1.printSchema()
    root
     |-- id: long (nullable = true)
     |-- col: string (nullable = true)
    
    df1.show()
    +---+---+
    | id|col|
    +---+---+
    |  1|  a|
    |  1|  b|
    |  1|  c|
    |  2|  x|
    |  2|  y|
    +---+---+
    18.getField
    구조 유형에서 필드를 이름별로 가져오려면 "getField"를 사용할 수 있습니다.
    import pyspark.sql.functions as f
    from pyspark.sql import Row
    
    from pyspark.sql import Row
    df = spark.createDataFrame([Row(attributes=Row(Name='scott', Height=6.0, Hair='black')),
                                Row(attributes=Row(Name='kevin', Height=6.1, Hair='brown'))]
    )
    
    df.printSchema()
    root
     |-- attributes: struct (nullable = true)
     |    |-- Hair: string (nullable = true)
     |    |-- Height: double (nullable = true)
     |    |-- Name: string (nullable = true)
    
    df.show()
    +-------------------+
    |         attributes|
    +-------------------+
    |[black, 6.0, scott]|
    |[brown, 6.1, kevin]|
    +-------------------+
    
    df1 = (df
          .withColumn("name", f.col("attributes").getField("Name"))
          .withColumn("height", f.col("attributes").getField("Height"))
          .drop("attributes")
          )
    
    df1.show()
    +-----+------+
    | name|height|
    +-----+------+
    |scott|   6.0|
    |kevin|   5.1|
    +-----+------+
    19.시작
    문자열에 따라 찾기 기록을 일치시키려면'startswith'를 사용할 수 있습니다.
    다음 예제에서는 설명 열의 값이 [{]로 시작하는 모든 레코드를 검색하고 있습니다.
    import pyspark.sql.functions as f
    
    df.filter(f.col("description").startswith("[{")).show()
    20. 추출 년, 월, 일, 시간
    흔히 볼 수 있는 예는 AWS Glue DynamicFrame 또는 Spark DataFrame을 단원 스타일의 섹션으로 S3에 쓰는 것이다.이를 위해 년, 월, 일, 시간을 추출하고 섹션 키로 사용하여 DynamicFrame/DataFrame을 S3에 쓸 수 있습니다.
    import pyspark.sql.functions as f
    
    df2 = (raw_df
            .withColumn('year', f.year(f.col('last_updated')))
            .withColumn('month', f.month(f.col('last_updated')))
            .withColumn('day', f.dayofmonth(f.col('last_updated')))
            .withColumn('hour', f.hour(f.col('last_updated')))            
            )

    좋은 웹페이지 즐겨찾기