--packages 플래그를 사용하여 Amazon EMR Serverless를 실행하는 방법

에서는 Amazon EMR Serverless에서 Delta Lake를 실행하는 방법을 보여 주었습니다. 그 이후로 --packages 플래그가 구현된 새 릴리스(6.7.0)가 나왔습니다. 이를 통해 Spark로 작업을 훨씬 쉽게 수행할 수 있습니다. 그러나 --packages 플래그에는 대부분의 데이터 과학자와 엔지니어가 익숙하지 않은 추가 네트워킹 설정이 필요합니다. 우리의 목표는 이를 수행하는 방법을 단계별로 보여주는 것입니다.

먼저 몇 가지 개념 설명



Java 종속성과 함께 Spark를 사용하는 경우 두 가지 옵션이 있습니다. (1) 클러스터에서 수동으로 .jar 파일을 빌드하고 삽입하거나 (2) 종속성을 --packages 플래그에 전달하여 Spark가 maven에서 자동으로 다운로드할 수 있도록 합니다. EMR 서버리스의 릴리스 6.7.0부터 이 플래그를 사용할 수 있습니다.

문제는 maven에서 패키지를 다운로드하려면 스파크 클러스터가 인터넷에 연결되어야 한다는 것입니다. Amazon EMR Serverless는 처음에는 VPC 외부에 있으므로 인터넷에 연결할 수 없습니다. 그렇게 하려면 VPC 내부에 EMR 애플리케이션을 생성해야 합니다. 그러나 EMR 애플리케이션은 (그런데...) 인터넷에 연결되지 않고 S3에 연결할 수 없는 프라이빗 서브넷에서만 생성할 수 있습니다. 😭... 이 문제를 어떻게 해결합니까?

1단계: 네트워킹



아래 다이어그램은 필요한 전체 네트워크 구조를 보여줍니다.



이는 VPC 인터페이스의 AWS에서 쉽게 생성됩니다. VPC 생성 버튼을 클릭하고 VPC 등을 선택합니다. AWS는 무거운 작업을 수행하고 2개의 퍼블릭 서브넷, 2개의 프라이빗 서브넷, 인터넷 게이트웨이, 필요한 라우팅 테이블 및 S3 엔드포인트(VPC 내부의 리소스가 S3에 도달할 수 있도록)가 있는 VPC에 대한 설계를 제공합니다.



원하는 경우 가용 영역 수를 1로 설정할 수 있지만 고가용성을 확보하려면 최소 2개의 AZ로 작업해야 합니다.

다음으로 프라이빗 서브넷이 인터넷에 연결되도록 하는 NAT 게이트웨이를 하나 이상 표시해야 합니다. 아래는 최종 설정 화면입니다.



VPC 만들기를 누르면 네트워킹이 완료됩니다.



마지막으로 인터넷에 대한 아웃바운드 트래픽을 허용하는 보안 그룹을 생성합니다. AWS의 VPC로 돌아가 왼쪽 패널에서 보안 그룹을 클릭합니다. 그런 다음 보안 그룹 생성을 클릭합니다. 보안 그룹의 이름을 지정하고 선택한 VPC를 선택 해제하고 방금 생성한 VPC를 선택합니다. 기본적으로 보안 그룹은 인바운드 트래픽을 허용하지 않고 모든 아웃바운드 트래픽을 허용합니다. 그대로 둘 수 있습니다. 보안 그룹 et voilà를 만드십시오!



2단계: IAM 역할 및 정책



Service Linked Role과 Access S3 및 Glue에 대한 권한을 부여하는 또 다른 역할의 두 가지 역할이 필요합니다. 설정 - 인증 섹션에서 이미 논의했습니다. 확인 해봐. 작업할 데이터 세트도 필요합니다. 유명한 Titanic 데이터 세트가 이를 수행해야 합니다. 다운로드할 수 있습니다here.

3단계: EMR Studio 및 EMR 서버리스 애플리케이션 생성



먼저 EMR Studio를 생성해야 합니다. 아직 생성된 스튜디오가 없다면 매우 간단합니다. EMR 서버리스 홈 페이지에서 시작하기를 클릭한 후 클릭하여 자동으로 스튜디오를 생성할 수 있습니다.

둘째, EMR 서버리스 애플리케이션을 생성해야 합니다. 이름을 설정하고 (기억하세요!) 릴리스 6.7.0을 선택합니다. 네트워킹을 설정하려면 사용자 지정 설정 선택을 선택하고 네트워크 연결까지 아래로 스크롤해야 합니다.



네트워크 연결에서 생성한 VPC, 두 개의 프라이빗 서브넷 및 보안 그룹을 선택합니다.



4단계: 스파크 코드



이제 데이터 세트의 일부 수정 사항을 시뮬레이션하기 위해 간단한 pyspark 코드를 준비하고 있습니다(두 명의 새로운 승객인 Ney와 Sarah를 포함할 예정이며 사망한 것으로 추정되었지만 살아 있는 것으로 발견된 두 명의 승객인 Mr. Owen Braund와 Mr.에 대한 정보를 업데이트할 것입니다). . 윌리엄 앨런). 아래는 그렇게 하기 위한 코드입니다.

from pyspark.sql import functions as f
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

from delta.tables import *

print("Reading CSV file from S3...")

schema = "PassengerId int, Survived int, Pclass int, Name string, Sex string, Age double, SibSp int, Parch int, Ticket string, Fare double, Cabin string, Embarked string"
df = spark.read.csv(
    "s3://<YOUR-BUCKET>/titanic", 
    header=True, schema=schema, sep=";"
)

print("Writing titanic dataset as a delta table...")
df.write.format("delta").save("s3://<YOUR-BUCKET>/silver/titanic_delta")

print("Updating and inserting new rows...")
new = df.where("PassengerId IN (1, 5)")
new = new.withColumn("Survived", f.lit(1))
newrows = [
    (892, 1, 1, "Sarah Crepalde", "female", 23.0, 1, 0, None, None, None, None),
    (893, 0, 1, "Ney Crepalde", "male", 35.0, 1, 0, None, None, None, None)
]
newrowsdf = spark.createDataFrame(newrows, schema=schema)
new = new.union(newrowsdf)

print("Create a delta table object...")
old = DeltaTable.forPath(spark, "s3://<YOUR-BUCKET>/silver/titanic_delta")


print("UPSERT...")
# UPSERT
(
    old.alias("old")
    .merge(new.alias("new"), 
    "old.PassengerId = new.PassengerId"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

print("Checking if everything is ok")
print("New data...")

(
    spark.read.format("delta")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)

print("Old data - with time travel")
(
    spark.read.format("delta")
    .option("versionAsOf", "0")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)


.py 파일을 S3에 업로드해야 합니다.

5단계: GO!



이제 실행할 작업을 제출합니다. AWS CLI로 할 수 있습니다.

aws emr-serverless start-job-run \
--name Delta-Upsert \
--application-id <YOUR-APPLICATION-ID> \
--execution-role-arn arn:aws:iam::<ACCOUNT-NUMBER>:role/EMRServerlessJobRole \
--job-driver '{
  "sparkSubmit": {
    "entryPoint": "s3://<YOUR-BUCKET>/pyspark/emrserverless_delta_titanic.py", 
    "sparkSubmitParameters": "--packages io.delta:delta-core_2.12:2.0.0"
  }
}' \
--configuration-overrides '{
"monitoringConfiguration": {
  "s3MonitoringConfiguration": {
    "logUri": "s3://<YOUR-BUCKET>/emr-serverless-logs/"} 
  } 
}'


그게 다야! 작업이 완료되면 로그 폴더로 이동하여 로그를 확인합니다(응용 프로그램 ID, 작업 ID 및 SPARK_DRIVER 로그를 찾습니다). 다음과 같은 내용이 표시되어야 합니다.

Reading CSV file from S3...
Writing titanic dataset as a delta table...
Updating and inserting new rows...
Create a delta table object...
UPSERT...
Checking if everything is ok
New data...
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       1|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       1|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
|        892|       1|     1|      Sarah Crepalde|female|23.0|    1|    0|            null|   null| null|    null|
|        893|       0|     1|        Ney Crepalde|  male|35.0|    1|    0|            null|   null| null|    null|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

Old data - with time travel
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+


행복한 코딩과 구축!

좋은 웹페이지 즐겨찾기