가이드 - AWS Glue 및 PySpark
AWS Glue는 완벽한 관리형 추출, 변환 및 로드(ETL) 서비스로 다양한 소스로부터 대량의 데이터 세트를 처리하여 분석 및 데이터 처리를 수행합니다.
AWS 접착 작업을 만들 때 Spark, Spark Streaming, Python 셸 중에서 선택할 수 있습니다.이 작업은 AWS Glue에서 생성한 권장 스크립트, 사용자가 제공한 기존 스크립트 또는 사용자가 작성한 새 스크립트를 실행할 수 있습니다.이 외에도 다양한 모니터링 옵션, 작업 수행 능력, 시간 초과, 지연 알림 한도값, 덮어쓸 수 없고 덮어쓸 수 있는 파라미터를 선택할 수 있습니다.


AWS는 최근 Glue 2.0 버전을 발표했는데 스파크 ETL 작업 시작 시간이 10배 빠르고 비용 계산 지속 시간을 최소 10분에서 최소 1분으로 줄인 것이 특징이다.
AWS Glue를 사용하면 개발 노드를 만들고 SageMaker 또는 Zeppelin 노트북을 구성하여 Glue ETL 스크립트를 개발하고 테스트할 수 있습니다.

나는 ETL 스크립트를 작성하고 테스트하기 위해 개발 노드에 연결된 SageMaker 노트북을 만들었다.익숙한 언어에 따라 노트북을 시작할 수 있습니다.

이제 AWS Glue와 PySpark의 특정한 특성과 기능에 대해 이야기해 봅시다.
1. Spark 데이터 프레임
Spark DataFrame은 이름 열로 구성된 분산 데이터 집합입니다.그것은 개념적으로 관계 데이터베이스에 있는 표에 해당한다.RDD, csv, json, parquet 등 파일 형식에서 데이터 프레임을 만들 수 있습니다.
SageMaker Sparkmagic(PySpark) 코어 노트북을 사용하면 Spark 세션이 자동으로 생성됩니다.

데이터 프레임 만들기 -
# from CSV files
S3_IN = "s3://mybucket/train/training.csv"
csv_df = (
.option("header", True)
.option("quote", '"')
.option("escape", '"')
.option("inferSchema", True)
.option("ignoreLeadingWhiteSpace", True)
.option("ignoreTrailingWhiteSpace", True)
.csv(S3_IN, multiLine=False)
# from PARQUET files
df = spark.read.parquet(S3_PARQUET)
# from JSON files
df = spark.read.json(S3_JSON)
# from multiline JSON file
df = spark.read.json(S3_JSON, multiLine=True)
2.GlueContextGlueContext는 AWS Glue에서 동적 프레임워크를 읽고 쓰는 입구점입니다.이것은 아파치 스파크 SQL SQLContext 대상을 포장하여 아파치 스파크 플랫폼과 상호작용하는 메커니즘을 제공한다.
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
glueContext = GlueContext(SparkContext.getOrCreate())

3. 동적 프레임워크
AWS Glue DynamicFrames는 SparkSQL 데이터 프레임과 유사합니다.이것은 분포식 데이터 집합을 표시하며 패턴을 지정할 필요가 없다.불일치 값과 유형을 포함하는 데이터를 읽고 변환하는 데도 사용할 수 있습니다.
다음 옵션을 사용하여 DynamicFrame을 생성할 수 있습니다.
rdd에서 동적 프레임 생성 - Apache Spark 탄성 분산 데이터 세트(rdd)에서 생성
디렉터리에서 동적 프레임워크 만들기 - 접착 디렉터리 데이터베이스와 테이블 이름으로 만들기
create_dynamic_frame_from_options - 지정된 연결 및 형식을 사용하여 생성합니다.예 - 연결 유형(예: Amazon S3, Amazon Redshift 및 JDBC
#create DynamicFame from S3 parquet files
datasource0 = glueContext.create_dynamic_frame_from_options(
connection_options = {
"paths": [S3_location]
#create DynamicFame from glue catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database = "demo",
table_name = "testtable",
transformation_ctx = "datasource0")
#convert to spark DataFrame
df1 = datasource0.toDF()
#convert to Glue DynamicFrame
df2 = DynamicFrame.fromDF(df1, glueContext , "df2")
추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog4. AWS 풀 작업 책갈피
AWS Glue Job bookmark은 예약된 간격으로 작업을 다시 실행할 때 증량 데이터를 처리하여 오래된 데이터를 다시 처리하지 않도록 합니다.
추가 읽기 - https://aprakash.wordpress.com/2020/05/07/implementing-glue-etl-job-with-job-bookmarks/
5. 데이터 쓰기
데이터 세트를 변환하는 DynamicFrame은 S3에 비분할 (기본값) 또는 분할 단위로 쓸 수 있습니다.connection_ 옵션에서 "partitionKeys"파라미터를 지정하여 데이터를 섹션으로 S3에 쓸 수 있습니다.AWS Glue는 이러한 데이터 세트를 벌집 파티션에 구성합니다.
다음 코드 예시에서 AWS Glue DynamicFrame은 년, 월, 일, 시간에 따라 구역을 나누고 벌집식 구역의 맞춤법 형식으로 S3에 기록합니다.
s3://bucket\u name/table\u name/year=2020/month=7/day=13/hour=14/part-000-671c.c000.snappy.꽃마루
S3_location = "s3://bucket_name/table_name"
datasink = glueContext.write_dynamic_frame_from_options(
frame= data,
"path": S3_location,
"partitionKeys": ["year", "month", "day", "hour"]
transformation_ctx ="datasink")
추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_options6. "glueparquet"형식 옵션
glueparquet은 성능이 최적화된 Apache parquet 컴파일러 형식으로 DynamicFrame을 컴파일하는 데 사용됩니다.동적 계산과 수정 모드입니다.
datasink = glueContext.write_dynamic_frame_from_options(
"path": S3_location,
"partitionKeys": ["year", "month", "day", "hour"]
format_options = {"compression": "snappy"},
transformation_ctx ="datasink")
추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html7.S3 목록기 및 기타 메모리 관리 최적화 옵션
AWS Glue는 DynamicFrame에 데이터를 읽어들일 때 S3의 파일을 나열하는 최적화 메커니즘을 제공합니다. 추가_options 매개 변수인 "uses3ListImplementation"을 사용하여 DynamicFrame을 사용할 수 있습니다.
추가 읽기 - https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/
8, S3 경로 청소
purge\U s3\U path는 보존 기간이나 다른 필터에 따라 지정한 s3 경로에서 파일을 차례로 삭제할 수 있는 좋은 옵션입니다.예를 들어, AWS Glue 작업을 실행하고 있다면, 매일 테이블을 완전히 새로 고치고, 데이터를 S3에 기록합니다. 이름은 S3://bucketname/tablename/dt=입니다.접착 작업 자체가 정의한 보존 기간에 따라 dt=
#purge locations older than 3 days
print("Attempting to purge S3 path with retention set to 3 days.")
options={"retentionPeriod": 72})
purge\u table,transition\u table,transition\u s3\u path 등 다른 옵션을 사용할 수 있습니다.transition_table 옵션은 지정한 디렉터리의 데이터베이스와 테이블 변환을 위해 Amazon S3에 저장된 파일의 저장 클래스입니다.추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_s3_path
9.Relationalize 클래스
Relationalize 클래스는 끼워 넣은 json의 가장 바깥쪽을 펴는 데 도움을 줄 수 있습니다.
추가 읽기 - https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
10. Unbox 클래스
Unbox 클래스는 DynamicFrame의 문자열 필드를 지정한 형식 형식으로 포장하지 않도록 도와줍니다(선택 사항).
추가 읽기 - https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
11. Unnest 클래스
Unnest 클래스는 중첩된 객체를 DynamicFrame의 최상위 요소로 플랫합니다.
root |-- id: string |-- type: string |-- content: map | |-- keyType: string | |-- valueType: string
With content attribute/column being map Type, we can use unnest class to unnest each key elements.
unnested = UnnestFrame.apply(frame=data_dynamic_dframe)
root |-- id: string |-- type: string |-- content.dateLastUpdated: string |-- content.creator: string |-- content.dateCreated: string |-- content.title: string
12. printSchema()
To print the Spark or Glue DynamicFrame schema in tree format use printSchema().
datasource0.printSchema() root |-- ID: int |-- Name: string |-- Identity: string |-- Alignment: string |-- EyeColor: string |-- HairColor: string |-- Gender: string |-- Status: string |-- Appearances: int |-- FirstAppearance: choice | |-- int | |-- long | |-- string |-- Year: int |-- Universe: string
13. Fields Selection
select_fields can be used to select fields from Glue DynamicFrame.
# From DynamicFrame

Spark 데이터 상자에서 필드를 선택하려면 "select"-
# From Dataframe

14. 타임 스탬프
응용 프로그램이 DynamoDB에 데이터를 쓰고 마지막으로 업데이트된 속성/열이 있다고 가정합니다.DynamoDB 이 컴퓨터에서는 날짜/시간 스탬프 데이터 유형을 지원하지 않습니다.따라서 문자열이나 숫자로 저장할 수 있습니다.숫자로 저장된 경우 일반적으로 1970년 1월 1일 UTC 00:00부터 시작되는 시간으로 기록됩니다.당신은 ISO 8601에서 "1598331963"과 유사한 내용, 즉 2020-08-25T05:06:03+00:00을 볼 수 있습니다.
어떻게 그것을 타임 스탬프로 변환합니까?
AWS Glue DynamicFrame을 사용하여 데이터를 읽고 모드를 볼 때 "긴"데이터 형식으로 표시됩니다.
|-- version: string
|-- item_id: string
|-- status: string
|-- event_type: string
|-- last_updated: long
마지막으로 업데이트된 긴 데이터 형식을 타임 스탬프 데이터 형식으로 변환하려면 아래의 -import pyspark.sql.functions as f
import pyspark.sql.types as t
new_df = (
.withColumn("last_updated", f.from_unixtime(f.col("last_updated")/1000).cast(t.TimestampType()))
15. Spark 데이터 프레임의 임시 뷰Spark 데이터 프레임을 테이블로 저장하고 Spark sql을 사용하여 조회하려면createOrReplaceTempView를 사용하여 데이터 프레임을 임시 보기로 변환할 수 있습니다. 이 임시 보기는 Spark 세션에만 적용됩니다.
df = spark.createDataFrame(
(1, ['a', 'b', 'c'], 90.00),
(2, ['x', 'y'], 99.99),
['id', 'event', 'score']
|-- id: long (nullable = true)
|-- event: array (nullable = true)
| |-- element: string (containsNull = true)
|-- score: double (nullable = true)
spark.sql("select * from example").show()
| id| event|score|
| 1|[a, b, c]| 90.0|
| 2| [x, y]|99.99|
16. ArrayType에서 요소 추출위의 예시에서 마지막 이벤트만 저장할 새 속성/열을 만들려고 합니다.어떻게 할 거예요?
함수에서 element_를 사용합니다.col이 그룹이라면, 추출할 때 주어진 색인에 있는 그룹 요소를 되돌려줍니다.col이 맵이라면, 추출된 키를 추출하는 데도 사용할 수 있습니다.
import pyspark.sql.functions as element_at
newdf = df.withColumn("last_event", element_at("event", -1))
|-- id: long (nullable = true)
|-- event: array (nullable = true)
| |-- element: string (containsNull = true)
|-- score: double (nullable = true)
|-- last_event: string (nullable = true)
| id| event|score|last_event|
| 1|[a, b, c]| 90.0| c|
| 2| [x, y]|99.99| y|
17, 폭발PySpark의 분해 함수는 배열이나 맵 행의 열을 분해하는 데 사용됩니다.예를 들어 상기 예시의'사건'열을 분해해 봅시다
from pyspark.sql.functions import explode
df1 = df.select(df.id,explode(df.event))
|-- id: long (nullable = true)
|-- col: string (nullable = true)
| id|col|
| 1| a|
| 1| b|
| 1| c|
| 2| x|
| 2| y|
18.getField구조 유형에서 필드를 이름별로 가져오려면 "getField"를 사용할 수 있습니다.
import pyspark.sql.functions as f
from pyspark.sql import Row
from pyspark.sql import Row
df = spark.createDataFrame([Row(attributes=Row(Name='scott', Height=6.0, Hair='black')),
Row(attributes=Row(Name='kevin', Height=6.1, Hair='brown'))]
|-- attributes: struct (nullable = true)
| |-- Hair: string (nullable = true)
| |-- Height: double (nullable = true)
| |-- Name: string (nullable = true)
| attributes|
|[black, 6.0, scott]|
|[brown, 6.1, kevin]|
df1 = (df
.withColumn("name", f.col("attributes").getField("Name"))
.withColumn("height", f.col("attributes").getField("Height"))
| name|height|
|scott| 6.0|
|kevin| 5.1|
19.시작문자열에 따라 찾기 기록을 일치시키려면'startswith'를 사용할 수 있습니다.
다음 예제에서는 설명 열의 값이 [{]로 시작하는 모든 레코드를 검색하고 있습니다.
import pyspark.sql.functions as f
20. 추출 년, 월, 일, 시간흔히 볼 수 있는 예는 AWS Glue DynamicFrame 또는 Spark DataFrame을 단원 스타일의 섹션으로 S3에 쓰는 것이다.이를 위해 년, 월, 일, 시간을 추출하고 섹션 키로 사용하여 DynamicFrame/DataFrame을 S3에 쓸 수 있습니다.
import pyspark.sql.functions as f
df2 = (raw_df
.withColumn('year', f.year(f.col('last_updated')))
.withColumn('month', f.month(f.col('last_updated')))
.withColumn('day', f.dayofmonth(f.col('last_updated')))
.withColumn('hour', f.hour(f.col('last_updated')))
이 문제에 관하여(가이드 - AWS Glue 및 PySpark), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/anandp86/using-aws-glue-and-pyspark-56fi텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)