가이드 - AWS Glue 및 PySpark
AWS Glue는 완벽한 관리형 추출, 변환 및 로드(ETL) 서비스로 다양한 소스로부터 대량의 데이터 세트를 처리하여 분석 및 데이터 처리를 수행합니다.
AWS 접착 작업을 만들 때 Spark, Spark Streaming, Python 셸 중에서 선택할 수 있습니다.이 작업은 AWS Glue에서 생성한 권장 스크립트, 사용자가 제공한 기존 스크립트 또는 사용자가 작성한 새 스크립트를 실행할 수 있습니다.이 외에도 다양한 모니터링 옵션, 작업 수행 능력, 시간 초과, 지연 알림 한도값, 덮어쓸 수 없고 덮어쓸 수 있는 파라미터를 선택할 수 있습니다.
풀 작업 유형 및 풀 버전
스크립트 파일 이름 및 기타 사용 가능한 옵션
AWS는 최근 Glue 2.0 버전을 발표했는데 스파크 ETL 작업 시작 시간이 10배 빠르고 비용 계산 지속 시간을 최소 10분에서 최소 1분으로 줄인 것이 특징이다.
https://aws.amazon.com/blogs/aws/aws-glue-version-2-0-featuring-10x-faster-job-start-times-and-1-minute-minimum-billing-duration
AWS Glue를 사용하면 개발 노드를 만들고 SageMaker 또는 Zeppelin 노트북을 구성하여 Glue ETL 스크립트를 개발하고 테스트할 수 있습니다.
나는 ETL 스크립트를 작성하고 테스트하기 위해 개발 노드에 연결된 SageMaker 노트북을 만들었다.익숙한 언어에 따라 노트북을 시작할 수 있습니다.
이제 AWS Glue와 PySpark의 특정한 특성과 기능에 대해 이야기해 봅시다.
1. Spark 데이터 프레임
Spark DataFrame은 이름 열로 구성된 분산 데이터 집합입니다.그것은 개념적으로 관계 데이터베이스에 있는 표에 해당한다.RDD, csv, json, parquet 등 파일 형식에서 데이터 프레임을 만들 수 있습니다.
SageMaker Sparkmagic(PySpark) 코어 노트북을 사용하면 Spark 세션이 자동으로 생성됩니다.
데이터 프레임 만들기 -
# from CSV files
S3_IN = "s3://mybucket/train/training.csv"
csv_df = (
spark.read.format("org.apache.spark.csv")
.option("header", True)
.option("quote", '"')
.option("escape", '"')
.option("inferSchema", True)
.option("ignoreLeadingWhiteSpace", True)
.option("ignoreTrailingWhiteSpace", True)
.csv(S3_IN, multiLine=False)
)
# from PARQUET files
S3_PARQUET="s3://mybucket/folder1/dt=2020-08-24-19-28/"
df = spark.read.parquet(S3_PARQUET)
# from JSON files
df = spark.read.json(S3_JSON)
# from multiline JSON file
df = spark.read.json(S3_JSON, multiLine=True)
2.GlueContextGlueContext는 AWS Glue에서 동적 프레임워크를 읽고 쓰는 입구점입니다.이것은 아파치 스파크 SQL SQLContext 대상을 포장하여 아파치 스파크 플랫폼과 상호작용하는 메커니즘을 제공한다.
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
glueContext = GlueContext(SparkContext.getOrCreate())
3. 동적 프레임워크
AWS Glue DynamicFrames는 SparkSQL 데이터 프레임과 유사합니다.이것은 분포식 데이터 집합을 표시하며 패턴을 지정할 필요가 없다.불일치 값과 유형을 포함하는 데이터를 읽고 변환하는 데도 사용할 수 있습니다.
다음 옵션을 사용하여 DynamicFrame을 생성할 수 있습니다.
rdd에서 동적 프레임 생성 - Apache Spark 탄성 분산 데이터 세트(rdd)에서 생성
디렉터리에서 동적 프레임워크 만들기 - 접착 디렉터리 데이터베이스와 테이블 이름으로 만들기
create_dynamic_frame_from_options - 지정된 연결 및 형식을 사용하여 생성합니다.예 - 연결 유형(예: Amazon S3, Amazon Redshift 및 JDBC
#create DynamicFame from S3 parquet files
datasource0 = glueContext.create_dynamic_frame_from_options(
connection_type="s3",
connection_options = {
"paths": [S3_location]
},
format="parquet",
transformation_ctx="datasource0")
#create DynamicFame from glue catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database = "demo",
table_name = "testtable",
transformation_ctx = "datasource0")
#convert to spark DataFrame
df1 = datasource0.toDF()
#convert to Glue DynamicFrame
df2 = DynamicFrame.fromDF(df1, glueContext , "df2")
추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog4. AWS 풀 작업 책갈피
AWS Glue Job bookmark은 예약된 간격으로 작업을 다시 실행할 때 증량 데이터를 처리하여 오래된 데이터를 다시 처리하지 않도록 합니다.
추가 읽기 - https://aprakash.wordpress.com/2020/05/07/implementing-glue-etl-job-with-job-bookmarks/
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
5. 데이터 쓰기
데이터 세트를 변환하는 DynamicFrame은 S3에 비분할 (기본값) 또는 분할 단위로 쓸 수 있습니다.connection_ 옵션에서 "partitionKeys"파라미터를 지정하여 데이터를 섹션으로 S3에 쓸 수 있습니다.AWS Glue는 이러한 데이터 세트를 벌집 파티션에 구성합니다.
다음 코드 예시에서 AWS Glue DynamicFrame은 년, 월, 일, 시간에 따라 구역을 나누고 벌집식 구역의 맞춤법 형식으로 S3에 기록합니다.
s3://bucket\u name/table\u name/year=2020/month=7/day=13/hour=14/part-000-671c.c000.snappy.꽃마루
S3_location = "s3://bucket_name/table_name"
datasink = glueContext.write_dynamic_frame_from_options(
frame= data,
connection_type="s3",
connection_options={
"path": S3_location,
"partitionKeys": ["year", "month", "day", "hour"]
},
format="parquet",
transformation_ctx ="datasink")
추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_options6. "glueparquet"형식 옵션
glueparquet은 성능이 최적화된 Apache parquet 컴파일러 형식으로 DynamicFrame을 컴파일하는 데 사용됩니다.동적 계산과 수정 모드입니다.
datasink = glueContext.write_dynamic_frame_from_options(
frame=dynamicframe,
connection_type="s3",
connection_options={
"path": S3_location,
"partitionKeys": ["year", "month", "day", "hour"]
},
format="glueparquet",
format_options = {"compression": "snappy"},
transformation_ctx ="datasink")
추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format.html7.S3 목록기 및 기타 메모리 관리 최적화 옵션
AWS Glue는 DynamicFrame에 데이터를 읽어들일 때 S3의 파일을 나열하는 최적화 메커니즘을 제공합니다. 추가_options 매개 변수인 "uses3ListImplementation"을 사용하여 DynamicFrame을 사용할 수 있습니다.
추가 읽기 - https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/
8, S3 경로 청소
purge\U s3\U path는 보존 기간이나 다른 필터에 따라 지정한 s3 경로에서 파일을 차례로 삭제할 수 있는 좋은 옵션입니다.예를 들어, AWS Glue 작업을 실행하고 있다면, 매일 테이블을 완전히 새로 고치고, 데이터를 S3에 기록합니다. 이름은 S3://bucketname/tablename/dt=입니다.접착 작업 자체가 정의한 보존 기간에 따라 dt=
#purge locations older than 3 days
print("Attempting to purge S3 path with retention set to 3 days.")
glueContext.purge_s3_path(
s3_path=output_loc,
options={"retentionPeriod": 72})
purge\u table,transition\u table,transition\u s3\u path 등 다른 옵션을 사용할 수 있습니다.transition_table 옵션은 지정한 디렉터리의 데이터베이스와 테이블 변환을 위해 Amazon S3에 저장된 파일의 저장 클래스입니다.추가 읽기 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_s3_path
9.Relationalize 클래스
Relationalize 클래스는 끼워 넣은 json의 가장 바깥쪽을 펴는 데 도움을 줄 수 있습니다.
추가 읽기 - https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
10. Unbox 클래스
Unbox 클래스는 DynamicFrame의 문자열 필드를 지정한 형식 형식으로 포장하지 않도록 도와줍니다(선택 사항).
추가 읽기 - https://aprakash.wordpress.com/2020/02/26/aws-glue-querying-nested-json-with-relationalize-transform/
11. Unnest 클래스
Unnest 클래스는 중첩된 객체를 DynamicFrame의 최상위 요소로 플랫합니다.
root |-- id: string |-- type: string |-- content: map | |-- keyType: string | |-- valueType: string
With content attribute/column being map Type, we can use unnest class to unnest each key elements.
unnested = UnnestFrame.apply(frame=data_dynamic_dframe)
unnested.printSchema()
root |-- id: string |-- type: string |-- content.dateLastUpdated: string |-- content.creator: string |-- content.dateCreated: string |-- content.title: string
12. printSchema()
To print the Spark or Glue DynamicFrame schema in tree format use printSchema().
datasource0.printSchema() root |-- ID: int |-- Name: string |-- Identity: string |-- Alignment: string |-- EyeColor: string |-- HairColor: string |-- Gender: string |-- Status: string |-- Appearances: int |-- FirstAppearance: choice | |-- int | |-- long | |-- string |-- Year: int |-- Universe: string
13. Fields Selection
select_fields can be used to select fields from Glue DynamicFrame.
# From DynamicFrame
datasource0.select_fields(["Status","HairColor"]).toDF().distinct().show()
Spark 데이터 상자에서 필드를 선택하려면 "select"-
# From Dataframe
datasource0_df.select(["Status","HairColor"]).distinct().show()
14. 타임 스탬프
응용 프로그램이 DynamoDB에 데이터를 쓰고 마지막으로 업데이트된 속성/열이 있다고 가정합니다.DynamoDB 이 컴퓨터에서는 날짜/시간 스탬프 데이터 유형을 지원하지 않습니다.따라서 문자열이나 숫자로 저장할 수 있습니다.숫자로 저장된 경우 일반적으로 1970년 1월 1일 UTC 00:00부터 시작되는 시간으로 기록됩니다.당신은 ISO 8601에서 "1598331963"과 유사한 내용, 즉 2020-08-25T05:06:03+00:00을 볼 수 있습니다.
https://www.unixtimestamp.com/index.php
어떻게 그것을 타임 스탬프로 변환합니까?
AWS Glue DynamicFrame을 사용하여 데이터를 읽고 모드를 볼 때 "긴"데이터 형식으로 표시됩니다.
root
|-- version: string
|-- item_id: string
|-- status: string
|-- event_type: string
|-- last_updated: long
마지막으로 업데이트된 긴 데이터 형식을 타임 스탬프 데이터 형식으로 변환하려면 아래의 -import pyspark.sql.functions as f
import pyspark.sql.types as t
new_df = (
df
.withColumn("last_updated", f.from_unixtime(f.col("last_updated")/1000).cast(t.TimestampType()))
)
15. Spark 데이터 프레임의 임시 뷰Spark 데이터 프레임을 테이블로 저장하고 Spark sql을 사용하여 조회하려면createOrReplaceTempView를 사용하여 데이터 프레임을 임시 보기로 변환할 수 있습니다. 이 임시 보기는 Spark 세션에만 적용됩니다.
df = spark.createDataFrame(
[
(1, ['a', 'b', 'c'], 90.00),
(2, ['x', 'y'], 99.99),
],
['id', 'event', 'score']
)
df.printSchema()
root
|-- id: long (nullable = true)
|-- event: array (nullable = true)
| |-- element: string (containsNull = true)
|-- score: double (nullable = true)
df.createOrReplaceTempView("example")
spark.sql("select * from example").show()
+---+---------+-----+
| id| event|score|
+---+---------+-----+
| 1|[a, b, c]| 90.0|
| 2| [x, y]|99.99|
+---+---------+-----+
16. ArrayType에서 요소 추출위의 예시에서 마지막 이벤트만 저장할 새 속성/열을 만들려고 합니다.어떻게 할 거예요?
함수에서 element_를 사용합니다.col이 그룹이라면, 추출할 때 주어진 색인에 있는 그룹 요소를 되돌려줍니다.col이 맵이라면, 추출된 키를 추출하는 데도 사용할 수 있습니다.
import pyspark.sql.functions as element_at
newdf = df.withColumn("last_event", element_at("event", -1))
newdf.printSchema()
root
|-- id: long (nullable = true)
|-- event: array (nullable = true)
| |-- element: string (containsNull = true)
|-- score: double (nullable = true)
|-- last_event: string (nullable = true)
newdf.show()
+---+---------+-----+----------+
| id| event|score|last_event|
+---+---------+-----+----------+
| 1|[a, b, c]| 90.0| c|
| 2| [x, y]|99.99| y|
+---+---------+-----+----------+
17, 폭발PySpark의 분해 함수는 배열이나 맵 행의 열을 분해하는 데 사용됩니다.예를 들어 상기 예시의'사건'열을 분해해 봅시다
from pyspark.sql.functions import explode
df1 = df.select(df.id,explode(df.event))
df1.printSchema()
root
|-- id: long (nullable = true)
|-- col: string (nullable = true)
df1.show()
+---+---+
| id|col|
+---+---+
| 1| a|
| 1| b|
| 1| c|
| 2| x|
| 2| y|
+---+---+
18.getField구조 유형에서 필드를 이름별로 가져오려면 "getField"를 사용할 수 있습니다.
import pyspark.sql.functions as f
from pyspark.sql import Row
from pyspark.sql import Row
df = spark.createDataFrame([Row(attributes=Row(Name='scott', Height=6.0, Hair='black')),
Row(attributes=Row(Name='kevin', Height=6.1, Hair='brown'))]
)
df.printSchema()
root
|-- attributes: struct (nullable = true)
| |-- Hair: string (nullable = true)
| |-- Height: double (nullable = true)
| |-- Name: string (nullable = true)
df.show()
+-------------------+
| attributes|
+-------------------+
|[black, 6.0, scott]|
|[brown, 6.1, kevin]|
+-------------------+
df1 = (df
.withColumn("name", f.col("attributes").getField("Name"))
.withColumn("height", f.col("attributes").getField("Height"))
.drop("attributes")
)
df1.show()
+-----+------+
| name|height|
+-----+------+
|scott| 6.0|
|kevin| 5.1|
+-----+------+
19.시작문자열에 따라 찾기 기록을 일치시키려면'startswith'를 사용할 수 있습니다.
다음 예제에서는 설명 열의 값이 [{]로 시작하는 모든 레코드를 검색하고 있습니다.
import pyspark.sql.functions as f
df.filter(f.col("description").startswith("[{")).show()
20. 추출 년, 월, 일, 시간흔히 볼 수 있는 예는 AWS Glue DynamicFrame 또는 Spark DataFrame을 단원 스타일의 섹션으로 S3에 쓰는 것이다.이를 위해 년, 월, 일, 시간을 추출하고 섹션 키로 사용하여 DynamicFrame/DataFrame을 S3에 쓸 수 있습니다.
import pyspark.sql.functions as f
df2 = (raw_df
.withColumn('year', f.year(f.col('last_updated')))
.withColumn('month', f.month(f.col('last_updated')))
.withColumn('day', f.dayofmonth(f.col('last_updated')))
.withColumn('hour', f.hour(f.col('last_updated')))
)
Reference
이 문제에 관하여(가이드 - AWS Glue 및 PySpark), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/anandp86/using-aws-glue-and-pyspark-56fi텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)