사용자 코드 배포가 있는 Dagster(gRPC)

하면, 만약, 만약...


Dagster는 기계 학습, 분석, ETL에 사용되는 소스 데이터 컴파일러로 파이프를 더욱 믿음직하고 튼튼하게 합니다.프로젝트 사이트: https://dagster.io/.
이 글은 주로 Dagster를 사용한 사람들, 특히 사용자 코드 배치의 구체적인 예시를 원하는 사람들을 대상으로 한다.

동기


저는 Genthttps://www.tengu.io/의 초창기 회사에서 데이터 엔지니어를 맡고 있습니다. 그곳에서 우리는 끊임없이 새로운 방법을 찾아 우리의 플랫폼과 데이터 파이프라인을 개선합니다.
우리는 Dagster가 아주 어렸을 때 우연히 그것을 발견했다. Github에는 약 40개의 별이 있다.우리는 그것으로 몇 가지 실험을 했지만, 그것은 우리의 수요를 만족시키지 못했다. 주로 우리가 서로 다른 배치/서비스에서 코드를 분리할 수 없기 때문이다.이와 동시에 Dagster는 이미 성숙했다(Github에는 2.2k개의 별이 있다!)그에 따른 것은 사용자 코드 배치다.
사용자 코드 배포를 사용하면 파이프 코드를 Dagit 이미지와 분리할 수 있습니다.이것은 전체 Dagster 시스템을 재배치할 필요가 없이 사용자 코드를 업데이트할 수 있다는 것을 의미합니다!
모든 배치에는 별도의 코드 저장소가 있을 수 있다.이것은 조직 내의 독립 단체가 자신의 이미지를 관리할 수 있도록 한다.
이 글은 Dagster's official documentation 옆에 있는 추가 정보로 사용할 수 있다.연습은 네 가지 부분을 포함한다.
  • Dagster 파이프가 있는 저장소를 포함하는 docker 이미지를 만듭니다.
  • Helm 값을 구성합니다.yaml 파일.
  • 투구가 달린 Dagster의 값을 설정합니다.두 번째 yaml.
  • Dagit 및/또는 GraphQL을 사용하여 파이프를 실행합니다.
  • 가자!

    선결 조건

  • 키가 있는 쿠베르니테스 성단을 설치한다.
  • Kubernetes 클러스터에서 액세스할 수 있는 Docker 레지스트리입니다.
  • (Dagster에 대한 경험이 있습니다.)
  • 1. 사용자 코드가 있는 Docker 이미지


    우선, 사용자 코드, 즉 Dagster 파이프가 있는 Dagster 저장소를 포함하는 docker 이미지를 만들 것입니다.이것은 Dagit 인스턴스에 사용할 이미지가 아닙니다.
    이것은 사용자 코드를 Dagit/Dagster 시스템 코드에서 분리하는 데 두 가지 장점을 가진다.
  • 안정성이 증가한다.
  • 조직 내의 독립된 팀이 자신의 이미지를 관리할 수 있도록 한다.
  • 상호 의존 관계를 줄이다.
  • 저장소 구조
    새 프로젝트를 시작하여python 파일 두 개(celery_pipeline.py,repos.py), yaml 파일 (workspace.yaml), Dockerfile 하나를 만듭니다.모든 파일은 같은 디렉토리에 있어야 합니다.
    project
    │   
    └───image
        │   celery_pipeline.py
        |   repos.py
        │   workspace.yaml
        |   Dockerfile
    
    이것은 우리의 파이프 파일입니다. 같은solid를 다섯 번 실행할 것입니다.본고의 뒤에 Dagster를 설치할 것입니다. 기본값(사용자 코드 배치 제외)을 사용합니다. 이것은 우리가 파이프를 실행할 미나리가 있다는 것을 의미합니다.다음 코드를 celery_pipeline.py 파일에 붙여넣습니다.
    미나리 파이프.회사 명
    """
    A basic pipeline that can be executed with Celery.
    """
    from dagster_celery_k8s import celery_k8s_job_executor
    from dagster import ModeDefinition, default_executors, pipeline, solid
    
    celery_mode_defs = [ModeDefinition(executor_defs=default_executors + [celery_k8s_job_executor])]
    
    @solid
    def not_much(_):
        return
    
    @pipeline(mode_defs=celery_mode_defs)
    def parallel_pipeline():
        for i in range(5):
            not_much.alias("not_much_" + str(i))()
    
    이제 파이프를 포함하는 저장소를 만들 것입니다.이 코드를 repos.py에 추가합니다.
    환매 협의회사 명
    """
    Simple repository that contains our parallel pipeline.
    """
    import sys
    
    from dagster import repository
    from dagster.utils import script_relative_path
    
    sys.path.append(script_relative_path("."))
    
    from celery_pipeline import parallel_pipeline
    
    @repository
    def example_repository():
        return [parallel_pipeline]
    
    작업공간에서는 Dagster에 있는 저장소와 로드할 위치를 알려 줍니다.다음 코드를 복사하여 workspace.yaml 파일에 붙여넣습니다.
    작업공간아마르
    # Yaml for loading our single repository from our repos.py file:
    load_from:
      - python_file: repos.py
    
    마지막으로, 우리는 Dockerfile을 가지고 있으며, 사용자 코드 이미지를 구축하는 데 사용할 것입니다.Dockerfiles가 어떻게 작동하는지 알고 싶어요.이것은 두 가지 일을 할 수 있다.
  • 필요한 Dagster 라이브러리를 설치합니다.
  • 우리의 사용자 코드를 루트 디렉터리로 복사합니다(COPY . /.
  • Dockerfile에 다음 문장을 추가합니다.
    Dockerfile 파일
    FROM python:3.7.8-slim
    
    # This tutorial was written using Dagster 0.9.12
    ARG DAGSTER_VERSION=0.9.12
    
    RUN apt-get update -yqq && \
        apt-get install -yqq cron
    
    RUN pip install \
        dagster==${DAGSTER_VERSION} \
        dagster-graphql==${DAGSTER_VERSION} \
        dagster-postgres==${DAGSTER_VERSION} \
        dagster-cron==${DAGSTER_VERSION} \
        dagster-celery[flower,redis,kubernetes]==${DAGSTER_VERSION} \
        dagster-aws==${DAGSTER_VERSION} \
        dagster-k8s==${DAGSTER_VERSION} \
        dagster-celery-k8s==${DAGSTER_VERSION} \
        dagit==${DAGSTER_VERSION}
    
    COPY . /
    

    이미지 구축 및 푸시


    이미지 디렉터리로 이동하여docker 이미지를 구축합니다.Kubernetes 클러스터에서 액세스할 수 있는 docker 레지스트리를 사용합니다.
    # F.e. docker build -t us.gcr.io/company-12345/user_code:0.1 .
    docker build -t YOUR_REGISTRY/user_code:0.1 .
    docker push YOUR_REGISTRY/user_code:0.1
    

    2. 가치관.아마르


    다음은 Dagster를 배포할 때 Helm에 제공할 파일values.yaml을 구성합니다.
    다운로드default values.yaml file 및 사용자 코드 배포를 지원하기 위해 다음과 같은 변경 사항을 수행합니다.
    userDeployments:
      # Whether launching user deployments is enabled.
      enabled: true
      # List of unique deployments using images that contain your
      # user code.
      deployments:
        - name: "k8s-example-user-code-1"
          image:
            # Use the image that you created in the previous step, without the tag
            # F.e. us.gcr.io/company-12345/user_code
            repository: "YOUR_REGISTRY/user_code"
            tag: 0.1
          # Make sure these arguments are the same as your repository file name
          # and repository name.
          # Arguments to `dagster api grpc`.
          dagsterApiGrpcArgs:
            - "--python-file"
            - "repos.py"
            - "--attribute"
            - "example_repository"
    

    3. Dagster 설치


    새 네임스페이스 만들기 시작:
    kubectl create namespace dagster-walkthrough
    
    이제 Helm을 사용하여 Dagster와 Dagit을 Kubernetes 클러스터에 배치할 준비가 되었습니다.GKE 중부에 배치했습니다.)
    helm repo add dagster https://dagster-io.github.io/helm
    helm install dagster dagster/dagster -n dagster-walkthrough -f /path/to/values.yaml
    
    시간을 주면 이런 것을 볼 수 있다.

    문제가 발생한 경우 다시 시작하려면 전체 버전을 삭제하고 다시 설치하는 것이 좋습니다.
    helm delete dagster -n dagster-walkthrough
    helm install dagster dagster/dagster -n dagster-walkthrough -f /path/to/values.yaml
    

    4. 파이프라인 경로설정


    현재 우리는 이미 모든 설치를 마쳤으니, 우리는 파이프를 운행하기 시작할 수 있다.UI(Dagit) 또는 GraphQL을 사용하여 파이핑을 실행하는 방법을 보여 드리고 싶습니다.

    다지트


    Dagit UI로 이동하여 샘플 저장소에서 병렬 파이핑을 선택합니다.놀이공원 옵션 카드를 클릭하여 다음 달리기 설정을 추가합니다.
    실행 구성
    execution:
      celery-k8s:
        config:
          job_namespace: dagster-walkthrough
          env_config_maps:
            - "dagster-pipeline-env"
          image_pull_policy: "Always"
    storage:
      filesystem:
    
    오른쪽 아래에 있는'시작 실행'을 누르십시오.Dagster는 파이프 운행 계획을 훑어보고 미나리 대기열에 절차를 제출하는 실행 조율기 Kubernetes 작업을 시작합니다.
    단계 실행은 미나리 일꾼이 맡고 단계마다 Kubernetes 작업이 생성됩니다.다음과 같은 Kubernetes 리소스가 있어야 합니다.

    실행 결과
    Dagit에서 파이프의 진행 상황을 감시할 수 있습니다. 다섯 줄을 표시해야 합니다.각 행은 다음 작업을 나타냅니다.

    그래픽 QL


    GraphQL 실행 파이프를 사용하려면 GraphQL 끝점http://localhost/graphql에 요청을 보내거나 운동장에서 실행할 수 있습니다.
    mutation ExecutePipeline(
      $repositoryLocationName: String!
      $repositoryName: String!
      $pipelineName: String!
      $runConfigData: RunConfigData!
      $mode: String!
    ) {
      launchPipelineExecution(
        executionParams: {
          selector: {
            repositoryLocationName: $repositoryLocationName
            repositoryName: $repositoryName
            pipelineName: $pipelineName
          }
          runConfigData: $runConfigData
          mode: $mode
        }
      ) {
        __typename
        ... on LaunchPipelineRunSuccess {
          run {
            runId
          }
        }
        ... on PipelineConfigValidationInvalid {
          errors {
            message
            reason
          }
        }
        ... on PythonError {
          message
        }
      }
    }
    
    쿼리 변수
    {
      "repositoryName": "example_repository",
      "repositoryLocationName": "k8s-example-user-code-1",
      "pipelineName": "parallel_pipeline",
      "mode": "default",
      "runConfigData": {
        "execution": {
          "celery-k8s": {
            "config": {
              "job_namespace": "dagster-walkthrough",
              "env_config_maps": [
                "dagster-pipeline-env"
              ],
              "image_pull_policy": "Always"
            }
          }
        },
        "storage": {
          "filesystem": null
        }
      }
    }
    

    결론


    이 글을 통해 저는 Dagster와 사용자 코드 배치를 설정하는 데 시간을 절약할 수 있기를 바랍니다.만약 당신에게 무슨 문제가 있으면 마음대로 평론해 주십시오. 나는 최선을 다해 당신을 도울 것입니다.
    이것은 저의 첫 번째 기술 블로그 글입니다. 당신의 건설적인 피드백에 매우 감사합니다.읽어주셔서 감사합니다!

    리소스


    https://docs.dagster.io/deploying/celery
    https://docs.dagster.io/deploying/k8s_part2
    https://docs.dagster.io/overview/graphql-api#launch-a-pipeline-run
    https://docs.dagster.io/_apidocs/libraries/dagster_celery_k8s#dagster_celery_k8s.celery_k8s_job_executor
    https://github.com/dagster-io/dagster/blob/master/helm/dagster/values.yaml

    좋은 웹페이지 즐겨찾기