Pyspark에서 Hive 테이블을 덮으려고 하다가 메모에 빠졌어요.

개시하다


이것은 Gulue의 Spark 작업에서 구역을 나누는 하이브 테이블을 덮어쓰려고 할 때의 비망록입니다.

하고 싶은 일.


table_path = "database_name" + "." + "table_name"
df = df.dropDulicates() 
df.write.mode("overwrite").partitionBy("xxx").insertInto(table_path, overWrite=True)
S3의 Hive 테이블을 중복 제거하기 위해
Spark DataFrame의 dropDulicates()에 따라 중복 제거된 DataFrame
하이브리드 테이블의 기록을 덮어쓰려고 할 때
다음 오류가 출력되었습니다.
java.io.FileNotFoundException: File does not exist: hdfs://folder_name/part-xxxx-xxx.snappy.parquet It is possible the underlying files have been updated. 
You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
아마 하이브의 원점이 업데이트되지 않았기 때문일 거예요.REFRESH TABLE tableName 구역 정보를 업데이트하는 설명을 했습니다.
오류 메시지 기준
실행
spark.sql("REFRESH TABLE tableName")
도 변경되지 않음
시험해 보다
spark.sql("MSCK REPAIR TABLE tableName")
이쪽이랑 다 달라진 게 없어요.

결론


Glue 작업 내에서 초기화SparkContext하기 전에SparkConf에서 다양한 설정을 통해
DataFrame의 overwrite를 사용하여 Hive 테이블을 덮어쓸 수 있습니다.

import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf#これを追加
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, [
    'JOB_NAME'
])

JOB_NAME = args['JOB_NAME']

# SparkConfの設定
conf = SparkConf()
conf.set("spark.sql.catalogImplementation", "hive")
conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
conf.set("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")

# SparkConfをSparkContextに渡して初期化
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

~~省略~~

# 重複削除したDataFrameでhiveテーブルを上書き
table_path = "database_name" + "." + "table_name"
df = df.dropDulicates() 
df.write.mode("overwrite").partitionBy("xxx").insertInto(table_path, overWrite=True)

참고 자료


sparksession에서 Hive에 접근할 수 없을 때
[분산 처리] PySpark~ 파티션 단위로 덮어쓰십시오~
How to enable or disable Hive support in spark-shell through Spark property (Spark 1.6)?
https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

좋은 웹페이지 즐겨찾기