Delta Lake 도입기

DataBricks는 데이터 처리 프레임워크인 Apache Spark와 파일 포멧 Apache Parquet 등을 개발해오면서, 클라우드 환경에 적합한 data lake란 무엇인지 다음과 같이 고민해왔다고 합니다.

  • 어떻게 data를 partitioning할 것인지
  • 어떻게 이상적인 파일 사이즈를 설정할지
  • 어떻게 schema를 확장할 것인지
  • 어떻게 compaction routin을 짤 것인지
  • 어떻게 실패한 ETL job을 복구할 것인지
  • 어떻게 raw data를 data lake로 흘려보낼 것인지

이러한 문제들을 해결해나가다보니 Delta Lake라는 훌륭한 오픈소스 스토리지가 탄생하였으며, Delta Lake를 사용해야할 5가지 이유 또한 Spark Submit에서 발표된 적이 있습니다. [참고]

무엇보다도 Delta Lake는 데이터가 변화한 transaction log를 보존하고 있기 때문에, ACID에 의거한 read/write가 가능하다는 장점이 있습니다.

제가 delta lake를 도입한 환경은 다음과 같습니다.

  • AWS EMR + structured streaming + pyspark
  • AWS S3
  • AWS Athena

Delta lake를 도입하여 얻은 이점은 다음과 같습니다.

  • Structured streaming을 사용하게 되면 s3에 데이터 적재 시, 작은 사이즈의 파일이 너무 많이 생성되기 때문에 읽기 성능이 매우 떨어집니다. 다행히 delta lake의 compaction 기능을 이용하여 파일 사이즈를 적절하게 조절할 수 있었습니다.
  • Delta lake의 merge 기능을 이용하여 과거데이터 backfill을 빠르게 수행할 수 있었습니다.
  • Append only log의 경우, data duplication 문제를 아주 손쉽게 해결할 수 있습니다.

이제 본론으로 넘어가 delta lake를 도입하기 위해 코드레벨에서 필요한 작업들을 이야기해보겠습니다.

1. Download Delta Core

먼저 Maven에서 적절한 버전의 delta-core를 다운로드 받습니다.

curl "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.0.0/delta-core_2.12-1.0.0.jar" -O

2. Spark Submit with Jars

spark-submit시 다운로드 받은 jar파일도 함께 제출해줍니다.

spark-submit --master yarn --deploy-mode cluster \
	--jars "delta-core_2.12-1.0.0.jar" \
	...
  • Python에서 delta-spark 라이브러리를 사용하고 싶다면, —py-files 옵션으로도 jar파일을 넘겨줍니다.
  • 혹은 어플리케이션 내에서 spark.SparkContext.addPyFile()로 해당 jar파일 경로를 넣어주면 importing이 가능합니다.
builder = SparkSession.builder \
    .config(conf=SparkConf()) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport()

spark = builder.getOrCreate()

spark.sparkContext.addPyFile("s3://ridi-emr/jars/delta-core_2.12-1.0.0.jar")

3. Spark SQL Extension

Delta Table를 조작하기 위한 Spark SQL DDL Extension를 추가해줍니다.

from pyspark.sql import SparkSession

builder = SparkSession.builder \
	.config(conf=SparkConf()) \
	.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport()

spark = builder.getOrCreate()
sc = spark.sparkContext

4. Create Delta Table

  • Delta Table을 만들고 초기 옵션들을 지정합니다.
    • 테이블 이름을 alias하여 hive metastore에 등록하거나
    • delta.s3://your_location 와 같이 location 방식의 identifier를 이용할 수 있습니다.
  • delta.compatibility.symlinkFormatManifest.enabled=true 으로 설정해주면 데이터 insert가 발생할때마다 뒤에서 설명할 manifest 파일을 업데이트 할 수 있습니다.
CREATE TABLE IF NOT EXISTS delta.`s3://your_location` (
	`key` STRING,
	`value` STRING,
	`topic` STRING,
	`timestamp` TIMESTAMP,
    `date` STRING
)
USING DELTA
PARTITIONED BY (date)
LOCATION 's3://your_location/'
TBLPROPERTIES (
    'delta.compatibility.symlinkFormatManifest.enabled'='true'
)

5. Generate Manifest

  • Athena에서 external table을 읽기 위해서는 manifest 파일이 필요합니다.
  • 최초 1번은 실행해 주어야 합니다.
GENERATE symlink_format_manifest FOR TABLE **delta.`s3://your_location`**

6. Create Athena Table

  • Athena에서 실시간으로 연동되는 external table을 생성합니다.
    • Athena 뿐만아니라 Spark extended SQL에서도 DDL이 가능합니다.
    • 이렇게 생성한 테이블은 manifest 파일을 읽어오기 때문에 Spark SQL로 DQL할 수 없습니다.
  • 앞서 만든 manifest 파일의 경로를 지정해줍니다.
CREATE EXTERNAL TABLE IF NOT EXISTS **database.table** (
    key STRING,
	value STRING,
	topic STRING,
	timestamp TIMESTAMP
)
PARTITIONED BY (date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT
    'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your_location/_symlink_format_manifest/'

7. Repair Athena Table

  • Athena가 S3를 스캔하여 새로운 파티션이 생겼으면 이를 테이블 메타데이터에도 반영하도록 합니다.
  • 새로운 파티션에 데이터가 들어오고, manifest 파일 속 파티션이 갱신된 후에 실행되어야 합니다.
  • 일 배치 단위로 작동시킵니다.
  • 후술할 partition projection 방법을 사용하는게 더 좋습니다.
MSCK REPAIR TABLE database.table

8. Partition Projection

  • MSCK REPAIR 방식은 다음과 같은 단점이 있습니다.
    1. 파티션에 데이터가 한번 유입되어야 정상 작동합니다.
    2. 테이블을 풀스캔합니다.
    3. 일 배치 스케쥴링을 신경써야 합니다.
  • 따라서 MSCK REPAIR 방식 대신에 좀 더 효과적으로 파티션을 갱신할 수 있는 방법을 찾아보았습니다.
    1. ADD PARTITION 쿼리
      • MSCK REPAIR와 달리 데이터가 유입되지 않아도 파티션을 추가합니다.
      • 여전히 일 단위로 쿼리를 날려주어야 합니다.
    2. AWS GLUE의 Partition Projection
      • Hive Metastore에 테이블 property로 partition projection 옵션을 지정해주면, Athena가 metastore에서 메타데이터 정보를 검색하지 않고도 파티션을 생성할 수 있습니다.
  • Delta Lake + Partition Projection
    • AWS GLUE에서 UI를 통해 옵션을 설정하거나, 6.Create Athena Table 에서 테이블 property로 넘겨줄 수 있습니다.
    • Athena는 Delta Lake의 manifest를 참조하므로, location을 parquet가 쌓이는 경로가 아닌, manifest가 업데이트되는 경로로 지정해주어야 합니다.
CREATE EXTERNAL TABLE IF NOT EXISTS **database.table** (
    key STRING,
	value STRING,
	topic STRING,
	timestamp TIMESTAMP
)
PARTITIONED BY (date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT
    'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your_location/_symlink_format_manifest/'
TBLPROPERTIES (
    'projection.enabled'='true',
    'projection.date.type'='date',
    'projection.date.format'='yyyy-MM-dd',
    'projection.date.range'=NOW-2YEARS,NOW+1DAY',
    'projection.date.interval'='1',
    'projection.date.interval.unit'='DAYS'
)

9. Read and Write

Spark에서 테이블을 저장하거나 불러올 때 location 기반의 identifier를 사용해야 합니다.

  • Read
spark.read.format("delta") \
    .load("s3://your_location") \
    .where("date='2022-04-17'")
SELECT *
FROM delta.`s3://your_location`
WHERE date='2021-11-25'
  • Write
df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("date") \
    .save("s3://your_location")

10. Compact Files

  • 오픈소스 용 delta lake에서는 auto-optimization가 지원되지 않습니다.
  • 대신 읽기 성능을 높히기 위해 file 개수를 줄이는 compaction 작업을 수행할 수 있습니다.
partition_clause = "date='2022-04-17'"
num_partitions = 10

spark.read.format("delta") \
    .load("s3://your_location") \
    .where(partition_clause) \
    .repartition(num_partitions) \
    .write \
    .format("delta") \
    .option("dataChange", "false") \
    .option("replaceWhere", partition_clause) \
    .mode("overwrite") \
    .save("s3://your_location")

11. Vacuum

  • Delta table과 연결된 location에서 더 이상 참조하지 않는 파일들을 청소합니다.
    • n일이 넘은 데이터를 삭제하는데, 이때 삭제 기준은 파일의 마지막 수정 시간이 아닌 transaction log가 끊어진 시간으로부터 경과한 시간입니다.
    • 현재 매우 느린 성능이 이슈가되고 있습니다.
  • spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true") 설정을 추가하여 병렬 삭제가 가능합니다.
VACUUM delta.`s3://your_location` RETAIN 72 HOURS

좋은 웹페이지 즐겨찾기