Delta Lake 도입기
DataBricks는 데이터 처리 프레임워크인 Apache Spark와 파일 포멧 Apache Parquet 등을 개발해오면서, 클라우드 환경에 적합한 data lake란 무엇인지 다음과 같이 고민해왔다고 합니다.
- 어떻게 data를 partitioning할 것인지
- 어떻게 이상적인 파일 사이즈를 설정할지
- 어떻게 schema를 확장할 것인지
- 어떻게 compaction routin을 짤 것인지
- 어떻게 실패한 ETL job을 복구할 것인지
- 어떻게 raw data를 data lake로 흘려보낼 것인지
이러한 문제들을 해결해나가다보니 Delta Lake라는 훌륭한 오픈소스 스토리지가 탄생하였으며, Delta Lake를 사용해야할 5가지 이유 또한 Spark Submit에서 발표된 적이 있습니다. [참고]
무엇보다도 Delta Lake는 데이터가 변화한 transaction log를 보존하고 있기 때문에, ACID에 의거한 read/write가 가능하다는 장점이 있습니다.
제가 delta lake를 도입한 환경은 다음과 같습니다.
- AWS EMR + structured streaming + pyspark
- AWS S3
- AWS Athena
Delta lake를 도입하여 얻은 이점은 다음과 같습니다.
- Structured streaming을 사용하게 되면 s3에 데이터 적재 시, 작은 사이즈의 파일이 너무 많이 생성되기 때문에 읽기 성능이 매우 떨어집니다. 다행히 delta lake의 compaction 기능을 이용하여 파일 사이즈를 적절하게 조절할 수 있었습니다.
- Delta lake의 merge 기능을 이용하여 과거데이터 backfill을 빠르게 수행할 수 있었습니다.
- Append only log의 경우, data duplication 문제를 아주 손쉽게 해결할 수 있습니다.
이제 본론으로 넘어가 delta lake를 도입하기 위해 코드레벨에서 필요한 작업들을 이야기해보겠습니다.
1. Download Delta Core
먼저 Maven에서 적절한 버전의 delta-core를 다운로드 받습니다.
curl "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.0.0/delta-core_2.12-1.0.0.jar" -O
2. Spark Submit with Jars
spark-submit시 다운로드 받은 jar파일도 함께 제출해줍니다.
spark-submit --master yarn --deploy-mode cluster \
--jars "delta-core_2.12-1.0.0.jar" \
...
- Python에서
delta-spark
라이브러리를 사용하고 싶다면, —py-files 옵션으로도 jar파일을 넘겨줍니다. - 혹은 어플리케이션 내에서
spark.SparkContext.addPyFile()
로 해당 jar파일 경로를 넣어주면 importing이 가능합니다.
builder = SparkSession.builder \
.config(conf=SparkConf()) \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.enableHiveSupport()
spark = builder.getOrCreate()
spark.sparkContext.addPyFile("s3://ridi-emr/jars/delta-core_2.12-1.0.0.jar")
3. Spark SQL Extension
Delta Table를 조작하기 위한 Spark SQL DDL Extension를 추가해줍니다.
from pyspark.sql import SparkSession
builder = SparkSession.builder \
.config(conf=SparkConf()) \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.enableHiveSupport()
spark = builder.getOrCreate()
sc = spark.sparkContext
4. Create Delta Table
- Delta Table을 만들고 초기 옵션들을 지정합니다.
- 테이블 이름을 alias하여 hive metastore에 등록하거나
- delta.
s3://your_location
와 같이 location 방식의 identifier를 이용할 수 있습니다.
delta.compatibility.symlinkFormatManifest.enabled=true
으로 설정해주면 데이터 insert가 발생할때마다 뒤에서 설명할 manifest 파일을 업데이트 할 수 있습니다.
CREATE TABLE IF NOT EXISTS delta.`s3://your_location` (
`key` STRING,
`value` STRING,
`topic` STRING,
`timestamp` TIMESTAMP,
`date` STRING
)
USING DELTA
PARTITIONED BY (date)
LOCATION 's3://your_location/'
TBLPROPERTIES (
'delta.compatibility.symlinkFormatManifest.enabled'='true'
)
5. Generate Manifest
- Athena에서 external table을 읽기 위해서는 manifest 파일이 필요합니다.
- 최초 1번은 실행해 주어야 합니다.
GENERATE symlink_format_manifest FOR TABLE **delta.`s3://your_location`**
6. Create Athena Table
- Athena에서 실시간으로 연동되는 external table을 생성합니다.
- Athena 뿐만아니라 Spark extended SQL에서도 DDL이 가능합니다.
- 이렇게 생성한 테이블은 manifest 파일을 읽어오기 때문에 Spark SQL로 DQL할 수 없습니다.
- 앞서 만든 manifest 파일의 경로를 지정해줍니다.
CREATE EXTERNAL TABLE IF NOT EXISTS **database.table** (
key STRING,
value STRING,
topic STRING,
timestamp TIMESTAMP
)
PARTITIONED BY (date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT
'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your_location/_symlink_format_manifest/'
7. Repair Athena Table
- Athena가 S3를 스캔하여 새로운 파티션이 생겼으면 이를 테이블 메타데이터에도 반영하도록 합니다.
- 새로운 파티션에 데이터가 들어오고, manifest 파일 속 파티션이 갱신된 후에 실행되어야 합니다.
- 일 배치 단위로 작동시킵니다.
- 후술할 partition projection 방법을 사용하는게 더 좋습니다.
MSCK REPAIR TABLE database.table
8. Partition Projection
- MSCK REPAIR 방식은 다음과 같은 단점이 있습니다.
- 파티션에 데이터가 한번 유입되어야 정상 작동합니다.
- 테이블을 풀스캔합니다.
- 일 배치 스케쥴링을 신경써야 합니다.
- 따라서 MSCK REPAIR 방식 대신에 좀 더 효과적으로 파티션을 갱신할 수 있는 방법을 찾아보았습니다.
- ADD PARTITION 쿼리
- MSCK REPAIR와 달리 데이터가 유입되지 않아도 파티션을 추가합니다.
- 여전히 일 단위로 쿼리를 날려주어야 합니다.
- AWS GLUE의 Partition Projection
- Hive Metastore에 테이블 property로 partition projection 옵션을 지정해주면, Athena가 metastore에서 메타데이터 정보를 검색하지 않고도 파티션을 생성할 수 있습니다.
- ADD PARTITION 쿼리
- Delta Lake + Partition Projection
- AWS GLUE에서 UI를 통해 옵션을 설정하거나,
6.Create Athena Table
에서 테이블 property로 넘겨줄 수 있습니다. - Athena는 Delta Lake의 manifest를 참조하므로, location을 parquet가 쌓이는 경로가 아닌, manifest가 업데이트되는 경로로 지정해주어야 합니다.
- AWS GLUE에서 UI를 통해 옵션을 설정하거나,
CREATE EXTERNAL TABLE IF NOT EXISTS **database.table** (
key STRING,
value STRING,
topic STRING,
timestamp TIMESTAMP
)
PARTITIONED BY (date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT
'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your_location/_symlink_format_manifest/'
TBLPROPERTIES (
'projection.enabled'='true',
'projection.date.type'='date',
'projection.date.format'='yyyy-MM-dd',
'projection.date.range'=NOW-2YEARS,NOW+1DAY',
'projection.date.interval'='1',
'projection.date.interval.unit'='DAYS'
)
9. Read and Write
Spark에서 테이블을 저장하거나 불러올 때 location 기반의 identifier를 사용해야 합니다.
- Read
spark.read.format("delta") \
.load("s3://your_location") \
.where("date='2022-04-17'")
SELECT *
FROM delta.`s3://your_location`
WHERE date='2021-11-25'
- Write
df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("date") \
.save("s3://your_location")
10. Compact Files
- 오픈소스 용 delta lake에서는 auto-optimization가 지원되지 않습니다.
- 대신 읽기 성능을 높히기 위해 file 개수를 줄이는 compaction 작업을 수행할 수 있습니다.
partition_clause = "date='2022-04-17'"
num_partitions = 10
spark.read.format("delta") \
.load("s3://your_location") \
.where(partition_clause) \
.repartition(num_partitions) \
.write \
.format("delta") \
.option("dataChange", "false") \
.option("replaceWhere", partition_clause) \
.mode("overwrite") \
.save("s3://your_location")
11. Vacuum
- Delta table과 연결된 location에서 더 이상 참조하지 않는 파일들을 청소합니다.
- n일이 넘은 데이터를 삭제하는데, 이때 삭제 기준은 파일의 마지막 수정 시간이 아닌 transaction log가 끊어진 시간으로부터 경과한 시간입니다.
- 현재 매우 느린 성능이 이슈가되고 있습니다.
spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
설정을 추가하여 병렬 삭제가 가능합니다.
VACUUM delta.`s3://your_location` RETAIN 72 HOURS
Author And Source
이 문제에 관하여(Delta Lake 도입기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@ivoryrabbit/Delta-Lake-도입기저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)