Python과 Databricks Connect를 결합하여 사용하는 팁 및 팁

Databricks Connect 너무 좋아요.이것은 아파치 스파크의 개발을 즐거움으로 가득 채웠다.만약 그것이 없었다면, 나는 영원히 아무런 효율도 없었을 뿐만 아니라, 아주 오래 전에 스파크를 포기하지도 않았을 것이다.

왜?


Databricks Connect는 Spark의 신기한 로컬 인스턴스입니다.기계는 스파크의 로컬 설치를 사용하고 있다고 생각하지만, 실제로는 원격 Databricks 실례를 사용할 것입니다.너는 왜 그것을 원하니?일반적으로 기계에 적합하지 않은 대형 데이터 집합에 접근해야 하고, 데이터를 처리하기 위해 더 큰 계산 집단이 필요하기 때문이다.
전통적으로 로컬에 작은 데이터 서브집합을 가지고 이를 처리한 다음에 원격 배치를 통해 서로 다른 데이터 집합에서 실행할 수 있지만 이것은 항상 가능한 것이 아니라 사용자가 사용하는 데이터에 달려 있다.

설치 중


나는 네가 이미 Miniconda(또는 Anaconda, 만약 네 컴퓨터에 관심이 없다면)을 설치했다고 가정한다면, 너는 공식 Databricks Connect docs의 기본 절차에 따라 조작할 수 있다.
Jupyter를 설치했다고 가정해 보겠습니다.없으면 conda-conda install -c conda-forge notebook에서 설치하십시오.기본 환경에 설치하는 것이 좋습니다.
아직 설치되지 않은 경우 JDK 8을 다운로드하여 설치하십시오.Spark는 Java 8에서 실행되므로 더 높은 버전을 사용하지 마십시오.또한 x64 버전의 SDK를 설치하고 있는지 확인합니다.binPATH에 추가하고 JAVA_HOME 환경 변수를 생성합니다.윈도우즈에서 이것은 일반적으로 다음과 같이 보인다.
PATH=C:\Program Files (x86)\Java\jdk1.8.0_261\bin
JAVA_HOME=C:\Program Files (x86)\Java\jdk1.8.0_261
python 버전 3.7을 사용하여 원본과 같은 3.5이 아닌 Conda 환경을 만듭니다.
conda create --name dbconnect python=3.7
환경 활성화
conda activate dbconnect
도구 v6.6 설치:
pip install -U databricks-connect==6.6.*
클러스터는 블록 연결을 위해 두 개의 변수를 구성해야 합니다.
  • spark.databricks.service.server.enabledtrue으로 설정해야 합니다.
  • spark.databricks.service.port은 포트로 설정해야 합니다(나중에 필요).

  • 이제 원하는 IDE에서 Databricks Connect를 사용할 수 있어야 합니다.

    Jupyter 터무니없어.


    나는 내가 만든 모든 환경에서 복제하는 것이 아니라 기본 conda 환경에 Jupyter를 설치하고 싶다.이 경우 Jupyter 서버를 실행할 때 새로 만든 Databricks Connect 환경이 표시되지 않습니다. 자동으로 새 환경을 선택하지 않기 때문입니다.
    이 문제를 해결하려면 Databricks Connect 환경에 ipykernel(Jupyter 코어 통합)을 설치하십시오.
    conda install ipykernel
    
    현재 Jupyter 환경을 커널로 추가해야 함을 나타냅니다.
    python -m ipykernel install --user --name dbconnect --display-name "Databricks Connect (dbconnect)"
    
    Jupyter가 설치된 기본 환경으로 돌아가서 다시 시작합니다.
    conda activate base
    jupyter kernel
    
    커널이 목록에 표시됩니다.

    Jupyter 힌트

    DataFrame을 설정하면 자동 spark.conf.set("spark.sql.repl.eagerEval.enabled", True) 디스플레이 기능을 사용할 수 있습니다.
    extensions available을 설치하여 Jupyter 기능을 확장할 수 있습니다.
    Jupyter 모양과 느낌을 사용자 정의하려면 this repository을 사용하십시오.

    여러 Databricks 클러스터 사용


    모든 매개 변수 (그룹 id 포함) 를 지정해서 명령줄에서 데이터bricksconnect를 설정하면 다시 설정하지 않으면 이 그룹에 연결됩니다.다른 집단을 사용하려면 새로운conda 환경을 만들고 다시 설정할 수 있습니다.단, 모든 집단이 같은 데이터bricks 작업 구역에 있다면, 다음과 같은 기교를 사용하여 집단 사이를 전환할 수 있습니다.
    먼저 코드의 어딘가에 그룹 맵을 만듭니다.
    clusters = {
        "dev": {
            "id": "cluster id",
            "port": "port"
        },
        "prod": {
            "id": "cluster id",
            "port": "port"
        }
    }
    
    노트북에서 호출할 수 있는 함수를 작성합니다.
    def use_cluster(cluster_name: str):
        """
        When running via Databricks Connect, specify to which cluster to connect instead of the default cluster.
        This call is ignored when running in Databricks environment.
        :param cluster_name: Name of the cluster as defined in the beginning of this file.
        """
        real_cluster_name = spark.conf.get("spark.databricks.clusterUsageTags.clusterName", None)
    
        # do not configure if we are already running in Databricks
        if not real_cluster_name:
            cluster_config = clusters.get(cluster_name)
            log.info(f"attaching to cluster '{cluster_name}' (id: {cluster_config['id']}, port: {cluster_config['port']})")
    
            spark.conf.set("spark.driver.host", "127.0.0.1")
            spark.conf.set("spark.databricks.service.clusterId", cluster_config["id"])
            spark.conf.set("spark.databricks.service.port", cluster_config["port"])
    
    
    그룹 이름을 맵에서 use_cluster으로 전달합니다. 이것은 코드를 실행하기 전에 적당한 그룹을 선택할 것입니다.호출을 Databricks 노트북에 저장할 수 있다는 장점이 있습니다. 호출이 환경에서 실행될 때 무시되기 때문입니다.

    로컬 및 원격


    노트북이 로컬에서 실행되는지 데이터베이스에서 실행되는지 확인


    이 팁은 displayHTML과 같은 Databricks의 특정 함수 중 하나가 IPython 사용자 이름 공간에 있는지 확인하는 것입니다.
    def _check_is_databricks() -> bool:
        user_ns = ip.get_ipython().user_ns
        return "displayHTML" in user_ns
    

    Spark 세션 가져오기


    Databricks 노트북에서 spark 변수를 자동으로 초기화하므로 이 변수를 반환할지 아니면 새 로컬 세션을 생성할지 결정할 수 있습니다.
    def _get_spark() -> SparkSession:
        user_ns = ip.get_ipython().user_ns
        if "spark" in user_ns:
            return user_ns["spark"]
        else:
            spark = SparkSession.builder.getOrCreate()
            user_ns["spark"] = spark
            return spark
    

    거짓 표시 기능


    Databricks는 데이터 프레임을 나타내는 데 좋은 display() 함수를 가지고 있습니다.우리는 현지에 없지만, 우리는 가장할 수 있다.
    def _get_display() -> Callable[[DataFrame], None]:
        fn = ip.get_ipython().user_ns.get("display")
        return fn or _display_with_json
    

    DBUtils 회사


    실행 위치에 따라 dbutils의 로컬 인스턴스를 만들거나 Databricks에서 실행할 때 미리 초기화할 수 있습니다.
    def _get_dbutils(spark: SparkSession):
        try:
            from pyspark.dbutils import DBUtils
            dbutils = DBUtils(spark)
        except ImportError:
            import IPython
            dbutils = IPython.get_ipython().user_ns.get("dbutils")
            if not dbutils:
                log.warning("could not initialise dbutils!")
        return dbutils
    

    그것들을 한데 모으다


    위의 모든 내용을 하나의 python 파일에 넣을 수 있습니다. 모든 노트북에서 인용할 수 있습니다. 어디서든 실행할 수 있습니다.dbconnect.py :
    from typing import Any, Tuple, Callable
    
    from pyspark.sql import SparkSession, DataFrame
    import logging
    import IPython as ip
    from pyspark.sql.types import StructType, ArrayType
    import pyspark.sql.functions as f
    
    clusters = {
        "dev": {
            "id": "cluster id",
            "port": "port"
        },
        "prod": {
            "id": "cluster id",
            "port": "port"
        }
    }
    
    # Logging
    
    class SilenceFilter(logging.Filter):
        def filter(self, record: logging.LogRecord) -> int:
            return False
    
    
    logging.basicConfig(format="%(asctime)s|%(levelname)s|%(name)s|%(message)s", level=logging.INFO)
    logging.getLogger("py4j.java_gateway").addFilter(SilenceFilter())
    log = logging.getLogger("dbconnect")
    
    def _check_is_databricks() -> bool:
        user_ns = ip.get_ipython().user_ns
        return "displayHTML" in user_ns
    
    
    def _get_spark() -> SparkSession:
        user_ns = ip.get_ipython().user_ns
        if "spark" in user_ns:
            return user_ns["spark"]
        else:
            spark = SparkSession.builder.getOrCreate()
            user_ns["spark"] = spark
            return spark
    
    
    def _display(df: DataFrame) -> None:
        df.show(truncate=False)
    
    
    def _display_with_json(df: DataFrame) -> None:
        for column in df.schema:
            t = type(column.dataType)
            if t == StructType or t == ArrayType:
                df = df.withColumn(column.name, f.to_json(column.name))
        df.show(truncate=False)
    
    
    def _get_display() -> Callable[[DataFrame], None]:
        fn = ip.get_ipython().user_ns.get("display")
        return fn or _display_with_json
    
    
    def _get_dbutils(spark: SparkSession):
        try:
            from pyspark.dbutils import DBUtils
            dbutils = DBUtils(spark)
        except ImportError:
            import IPython
            dbutils = IPython.get_ipython().user_ns.get("dbutils")
            if not dbutils:
                log.warning("could not initialise dbutils!")
        return dbutils
    
    
    # initialise Spark variables
    is_databricks: bool = _check_is_databricks()
    spark: SparkSession = _get_spark()
    display = _get_display()
    dbutils = _get_dbutils(spark)
    
    
    def use_cluster(cluster_name: str):
        """
        When running via Databricks Connect, specify to which cluster to connect instead of the default cluster.
        This call is ignored when running in Databricks environment.
        :param cluster_name: Name of the cluster as defined in the beginning of this file.
        """
        real_cluster_name = spark.conf.get("spark.databricks.clusterUsageTags.clusterName", None)
    
        # do not configure if we are already running in Databricks
        if not real_cluster_name:
            cluster_config = clusters.get(cluster_name)
            log.info(f"attaching to cluster '{cluster_name}' (id: {cluster_config['id']}, port: {cluster_config['port']})")
    
            spark.conf.set("spark.driver.host", "127.0.0.1")
            spark.conf.set("spark.databricks.service.clusterId", cluster_config["id"])
            spark.conf.set("spark.databricks.service.port", cluster_config["port"])
    
    
    노트북에서:
    from dbconnect import spark, dbutils, use_cluster, display
    
    use_cluster("dev")
    
    # ...
    
    df = spark.table("....")    # use spark variable
    
    display(df) # display DataFrames
    
    # etc...
    

    메모리 부족 문제 해결


    Databricks Connect를 사용할 때 일반적으로 Java Heap Space etc. etc. etc.과 같은 오류가 발생할 수 있습니다.이것은 로컬 spark 노드 (드라이버) 의 메모리가 부족하다는 것을 의미할 뿐, 기본적으로 메모리는 2Gb입니다.만약 당신이 더 많은 메모리를 필요로 한다면, 쉽게 증가할 것이다.
    먼저 PySpark의 홈 디렉토리가 어디에 있는지 확인합니다.
    ❯ databricks-connect get-spark-home
    c:\users\ivang\miniconda3\envs\hospark\lib\site-packages\pyspark
    
    이것은 하위 폴더 conf이 있어야 합니다. 존재하지 않으면 생성합니다.및 파일 spark-defaults.conf(없는 경우 다시 생성).전체 파일 경로는 c:\users\ivang\miniconda3\envs\hospark\lib\site-packages\pyspark\conf\spark-defaults.conf입니다.행 추가
    spark.driver.memory 8g
    
    그러면 드라이버 메모리가 8GB로 증가합니다.

    작업 모니터링


    불행히도 나는 DBC 환경에서 작업을 감시할 좋은 방법을 찾지 못했다.IntelliJ에는 Big Data Tools 플러그인이 있습니다. 이론적으로 스파크 작업 모니터링을 지원합니다. DBC가 실행하는 가상 로컬 집단을 고려하면 작동할 수 있다고 생각합니다.그러나 어떤 설정에서도 운이 없다.
    작업을 감시하는 가장 좋은 방법은 연결된 Databrick에 집단의 Spark UI를 사용하는 것이다.
    본고는 최초로 on my blog에 발표되었다.

    좋은 웹페이지 즐겨찾기