빠른 팁: Apache Iceberg Data Lake에 SingleStoreDB 추가

추상적인



이 짧은 기사에서는 Apache Spark를 사용하여 Apache Iceberg Data Lake를 생성하는 방법을 보여줍니다. 그런 다음 SingleStoreDB를 믹스에 추가합니다. 우리는 Deepnote를 개발 환경으로 사용할 것입니다.

소개



엔터프라이즈 환경에서는 서로 다른 시스템의 데이터를 함께 사용해야 합니다. 이 간단한 예에서는 Apache Spark 데이터 프레임을 Apache Iceberg와 SingleStoreDB를 연결하는 방법으로 사용하는 방법을 살펴보겠습니다.

딥노트 계정 만들기



Deepnote 웹사이트에 무료account를 생성해 드립니다. 로그인하면 새로운 Deepnote 프로젝트를 생성하여 새 노트북을 제공합니다. 또한 세 개의 폴더( jars , datawarehouse )를 만들어야 합니다.
jars 폴더에 다음 파일을 저장합니다.
  • SingleStore JDBC Client
  • SingleStore Spark Connector
  • Spray JSON
  • Iceberg Spark Runtime
  • data 폴더에 CSV을 포함하는 Iris flower data set 파일을 저장합니다.
    warehouse 폴더는 Apache Iceberg Data Lake를 저장하는 데 사용됩니다.

    SingleStoreDB 클라우드 계정 생성



    A는 무료 SingleStoreDB Cloud 계정을 만드는 데 필요한 단계를 보여주었습니다. Iris Demo Group을 작업 공간 그룹 이름으로 사용하고 iris-demo를 작업 공간 이름으로 사용합니다. 암호와 호스트 이름을 기록해 둡니다. 마지막으로 SQL 편집기를 사용하여 새 데이터베이스를 만듭니다.

    CREATE DATABASE iris_demo;
    


    딥노트 노트북



    이제 노트북을 작성해 보겠습니다.

    아파치 스파크 설치



    먼저 Apache Spark를 설치해야 합니다.

    ! sudo apt-get update
    ! sudo mkdir -p /usr/share/man/man1
    ! sudo apt-get install -y openjdk-11-jdk
    ! pip install pyspark==3.2.1
    


    설치가 완료되면 SparkSession을 준비합니다.

    from pyspark.sql import SparkSession
    spark = (SparkSession
                .builder
                .config("spark.jars",
                    "jars/singlestore-jdbc-client-1.0.1.jar, \
                    jars/singlestore-spark-connector_2.12-4.0.0-spark-3.2.0.jar, \
                    jars/spray-json_3-1.3.6.jar, \
                    jars/iceberg-spark-runtime-3.2_2.12-0.14.1.jar"
                )
                .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
                )
                .getOrCreate()
            )
    


    다음과 같이 Spark 버전을 확인할 수 있습니다.

    spark.version
    


    출력은 다음과 같아야 합니다.

    '3.2.1'
    


    Spark 데이터 프레임 만들기



    다음으로 CSV 데이터에서 Spark Dataframe을 생성합니다.

    iris_df = spark.read.csv(
                        "data/iris.csv",
                        header = True,
                        inferSchema = True
                    )
    


    데이터를 볼 수 있습니다.

    iris_df.show(5)
    


    출력은 다음과 유사해야 합니다.

    +------------+-----------+------------+-----------+-----------+
    |sepal_length|sepal_width|petal_length|petal_width|    species|
    +------------+-----------+------------+-----------+-----------+
    |         5.1|        3.5|         1.4|        0.2|Iris-setosa|
    |         4.9|        3.0|         1.4|        0.2|Iris-setosa|
    |         4.7|        3.2|         1.3|        0.2|Iris-setosa|
    |         4.6|        3.1|         1.5|        0.2|Iris-setosa|
    |         5.0|        3.6|         1.4|        0.2|Iris-setosa|
    +------------+-----------+------------+-----------+-----------+
    only showing top 5 rows
    


    Apache Iceberg 데이터 레이크 만들기



    먼저 몇 가지 설정을 구성합니다.

    spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    spark.conf.set("conf spark.sql.catalog.spark_catalog.type", "hive")
    spark.conf.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.local.type", "hadoop")
    spark.conf.set("spark.sql.catalog.local.warehouse", "warehouse")
    


    다음으로 Spark Dataframe에서 임시 테이블을 만듭니다.

    iris_df.createOrReplaceTempView("tempview")
    


    이미 종료된 경우 Data Lake에서 iris 테이블을 삭제합니다.

    spark.sql("""
        DROP TABLE IF EXISTS local.db.iris
    """)
    


    그런 다음 iris 테이블을 만듭니다.

    spark.sql("""
        CREATE TABLE local.db.iris
        USING iceberg
        PARTITIONED BY (species)
        AS ( SELECT *
             FROM tempview )
    """)
    


    우리는 꽃 종별로 데이터를 분할하고 있으며 그 중 세 가지가 있습니다.

    다음과 같이 테이블에 대한 자세한 정보를 얻을 수 있습니다.

    spark.sql("""
        SELECT file_path, file_format, partition, record_count
        FROM local.db.iris.files
    """).show()
    


    출력은 다음과 유사해야 합니다.

    +--------------------+-----------+-----------------+------------+
    |           file_path|file_format|        partition|record_count|
    +--------------------+-----------+-----------------+------------+
    |warehouse/db/iris...|    PARQUET|    {Iris-setosa}|          50|
    |warehouse/db/iris...|    PARQUET|{Iris-versicolor}|          50|
    |warehouse/db/iris...|    PARQUET| {Iris-virginica}|          50|
    +--------------------+-----------+-----------------+------------+
    


    이제 새 Dataframe에서 데이터의 하위 집합을 선택해 보겠습니다.

    df = spark.sql("""
            SELECT *
            FROM local.db.iris
            WHERE species = 'Iris-virginica'
    """)
    


    SingleStoreDB에 데이터 프레임 쓰기



    먼저 SingleStoreDB에 대한 연결 세부 정보를 제공합니다.

    host = "<TO DO>"
    password = "<TO DO>"
    
    port = "3306"
    cluster = host + ":" + port
    


    호스트 및 비밀번호의 <TO DO>를 SingleStoreDB 클라우드 계정의 값으로 대체합니다.

    이제 SingleStore Spark Connector에 대한 일부 매개변수를 설정합니다.

    spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
    spark.conf.set("spark.datasource.singlestore.user", "admin")
    spark.conf.set("spark.datasource.singlestore.password", password)
    spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
    


    준비가 되면 Dataframe 데이터를 SingleStoreDB에 저장할 수 있습니다.

    (df.write
       .format("singlestore")
       .option("loadDataCompression", "LZ4")
       .mode("overwrite")
       .save("iris_demo.iris")
    )
    


    SingleStoreDB Cloud에서 iris 테이블이 생성되었는지 확인하고 데이터를 쿼리할 수 있습니다.

    USE iris_demo;
    
    SELECT * FROM iris LIMIT 5;
    


    요약



    Spark Dataframes를 사용하여 Apache Iceberg의 Data Lake 데이터와 SingleStoreDB의 데이터베이스 데이터로 직접 작업할 수 있습니다. 이 예제에서는 데이터를 SingleStoreDB에 썼지만 SingleStoreDB에서 Spark Dataframe으로 기존 데이터를 검색하고 데이터를 사용하여 Apache Iceberg Data Lake에 이미 저장된 데이터로 쿼리를 수행할 수도 있습니다.

    좋은 웹페이지 즐겨찾기