사용자 코드 배포가 있는 Dagster(gRPC)
21466 단어 dagsterkubernetesdataengineeringetl
하면, 만약, 만약...
Dagster는 기계 학습, 분석, ETL에 사용되는 소스 데이터 컴파일러로 파이프를 더욱 믿음직하고 튼튼하게 합니다.프로젝트 사이트: https://dagster.io/.
이 글은 주로 Dagster를 사용한 사람들, 특히 사용자 코드 배치의 구체적인 예시를 원하는 사람들을 대상으로 한다.
동기
저는 Genthttps://www.tengu.io/의 초창기 회사에서 데이터 엔지니어를 맡고 있습니다. 그곳에서 우리는 끊임없이 새로운 방법을 찾아 우리의 플랫폼과 데이터 파이프라인을 개선합니다.
우리는 Dagster가 아주 어렸을 때 우연히 그것을 발견했다. Github에는 약 40개의 별이 있다.우리는 그것으로 몇 가지 실험을 했지만, 그것은 우리의 수요를 만족시키지 못했다. 주로 우리가 서로 다른 배치/서비스에서 코드를 분리할 수 없기 때문이다.이와 동시에 Dagster는 이미 성숙했다(Github에는 2.2k개의 별이 있다!)그에 따른 것은 사용자 코드 배치다.
사용자 코드 배포를 사용하면 파이프 코드를 Dagit 이미지와 분리할 수 있습니다.이것은 전체 Dagster 시스템을 재배치할 필요가 없이 사용자 코드를 업데이트할 수 있다는 것을 의미합니다!
모든 배치에는 별도의 코드 저장소가 있을 수 있다.이것은 조직 내의 독립 단체가 자신의 이미지를 관리할 수 있도록 한다.
이 글은 Dagster's official documentation 옆에 있는 추가 정보로 사용할 수 있다.연습은 네 가지 부분을 포함한다.
저는 Genthttps://www.tengu.io/의 초창기 회사에서 데이터 엔지니어를 맡고 있습니다. 그곳에서 우리는 끊임없이 새로운 방법을 찾아 우리의 플랫폼과 데이터 파이프라인을 개선합니다.
우리는 Dagster가 아주 어렸을 때 우연히 그것을 발견했다. Github에는 약 40개의 별이 있다.우리는 그것으로 몇 가지 실험을 했지만, 그것은 우리의 수요를 만족시키지 못했다. 주로 우리가 서로 다른 배치/서비스에서 코드를 분리할 수 없기 때문이다.이와 동시에 Dagster는 이미 성숙했다(Github에는 2.2k개의 별이 있다!)그에 따른 것은 사용자 코드 배치다.
사용자 코드 배포를 사용하면 파이프 코드를 Dagit 이미지와 분리할 수 있습니다.이것은 전체 Dagster 시스템을 재배치할 필요가 없이 사용자 코드를 업데이트할 수 있다는 것을 의미합니다!
모든 배치에는 별도의 코드 저장소가 있을 수 있다.이것은 조직 내의 독립 단체가 자신의 이미지를 관리할 수 있도록 한다.
이 글은 Dagster's official documentation 옆에 있는 추가 정보로 사용할 수 있다.연습은 네 가지 부분을 포함한다.
선결 조건
1. 사용자 코드가 있는 Docker 이미지
우선, 사용자 코드, 즉 Dagster 파이프가 있는 Dagster 저장소를 포함하는 docker 이미지를 만들 것입니다.이것은 Dagit 인스턴스에 사용할 이미지가 아닙니다.
이것은 사용자 코드를 Dagit/Dagster 시스템 코드에서 분리하는 데 두 가지 장점을 가진다.
새 프로젝트를 시작하여python 파일 두 개(celery_pipeline.py,repos.py), yaml 파일 (workspace.yaml), Dockerfile 하나를 만듭니다.모든 파일은 같은 디렉토리에 있어야 합니다.
project
│
└───image
│ celery_pipeline.py
| repos.py
│ workspace.yaml
| Dockerfile
이것은 우리의 파이프 파일입니다. 같은solid를 다섯 번 실행할 것입니다.본고의 뒤에 Dagster를 설치할 것입니다. 기본값(사용자 코드 배치 제외)을 사용합니다. 이것은 우리가 파이프를 실행할 미나리가 있다는 것을 의미합니다.다음 코드를 celery_pipeline.py
파일에 붙여넣습니다.미나리 파이프.회사 명
"""
A basic pipeline that can be executed with Celery.
"""
from dagster_celery_k8s import celery_k8s_job_executor
from dagster import ModeDefinition, default_executors, pipeline, solid
celery_mode_defs = [ModeDefinition(executor_defs=default_executors + [celery_k8s_job_executor])]
@solid
def not_much(_):
return
@pipeline(mode_defs=celery_mode_defs)
def parallel_pipeline():
for i in range(5):
not_much.alias("not_much_" + str(i))()
이제 파이프를 포함하는 저장소를 만들 것입니다.이 코드를 repos.py
에 추가합니다.환매 협의회사 명
"""
Simple repository that contains our parallel pipeline.
"""
import sys
from dagster import repository
from dagster.utils import script_relative_path
sys.path.append(script_relative_path("."))
from celery_pipeline import parallel_pipeline
@repository
def example_repository():
return [parallel_pipeline]
작업공간에서는 Dagster에 있는 저장소와 로드할 위치를 알려 줍니다.다음 코드를 복사하여 workspace.yaml
파일에 붙여넣습니다.작업공간아마르
# Yaml for loading our single repository from our repos.py file:
load_from:
- python_file: repos.py
마지막으로, 우리는 Dockerfile을 가지고 있으며, 사용자 코드 이미지를 구축하는 데 사용할 것입니다.Dockerfiles가 어떻게 작동하는지 알고 싶어요.이것은 두 가지 일을 할 수 있다.COPY . /
.Dockerfile 파일
FROM python:3.7.8-slim
# This tutorial was written using Dagster 0.9.12
ARG DAGSTER_VERSION=0.9.12
RUN apt-get update -yqq && \
apt-get install -yqq cron
RUN pip install \
dagster==${DAGSTER_VERSION} \
dagster-graphql==${DAGSTER_VERSION} \
dagster-postgres==${DAGSTER_VERSION} \
dagster-cron==${DAGSTER_VERSION} \
dagster-celery[flower,redis,kubernetes]==${DAGSTER_VERSION} \
dagster-aws==${DAGSTER_VERSION} \
dagster-k8s==${DAGSTER_VERSION} \
dagster-celery-k8s==${DAGSTER_VERSION} \
dagit==${DAGSTER_VERSION}
COPY . /
이미지 구축 및 푸시
이미지 디렉터리로 이동하여docker 이미지를 구축합니다.Kubernetes 클러스터에서 액세스할 수 있는 docker 레지스트리를 사용합니다.
# F.e. docker build -t us.gcr.io/company-12345/user_code:0.1 .
docker build -t YOUR_REGISTRY/user_code:0.1 .
docker push YOUR_REGISTRY/user_code:0.1
2. 가치관.아마르
다음은 Dagster를 배포할 때 Helm에 제공할 파일values.yaml
을 구성합니다.
다운로드default values.yaml file 및 사용자 코드 배포를 지원하기 위해 다음과 같은 변경 사항을 수행합니다.
userDeployments:
# Whether launching user deployments is enabled.
enabled: true
# List of unique deployments using images that contain your
# user code.
deployments:
- name: "k8s-example-user-code-1"
image:
# Use the image that you created in the previous step, without the tag
# F.e. us.gcr.io/company-12345/user_code
repository: "YOUR_REGISTRY/user_code"
tag: 0.1
# Make sure these arguments are the same as your repository file name
# and repository name.
# Arguments to `dagster api grpc`.
dagsterApiGrpcArgs:
- "--python-file"
- "repos.py"
- "--attribute"
- "example_repository"
3. Dagster 설치
새 네임스페이스 만들기 시작:
kubectl create namespace dagster-walkthrough
이제 Helm을 사용하여 Dagster와 Dagit을 Kubernetes 클러스터에 배치할 준비가 되었습니다.GKE 중부에 배치했습니다.)
helm repo add dagster https://dagster-io.github.io/helm
helm install dagster dagster/dagster -n dagster-walkthrough -f /path/to/values.yaml
시간을 주면 이런 것을 볼 수 있다.
문제가 발생한 경우 다시 시작하려면 전체 버전을 삭제하고 다시 설치하는 것이 좋습니다.
helm delete dagster -n dagster-walkthrough
helm install dagster dagster/dagster -n dagster-walkthrough -f /path/to/values.yaml
4. 파이프라인 경로설정
현재 우리는 이미 모든 설치를 마쳤으니, 우리는 파이프를 운행하기 시작할 수 있다.UI(Dagit) 또는 GraphQL을 사용하여 파이핑을 실행하는 방법을 보여 드리고 싶습니다.
다지트
Dagit UI로 이동하여 샘플 저장소에서 병렬 파이핑을 선택합니다.놀이공원 옵션 카드를 클릭하여 다음 달리기 설정을 추가합니다.
실행 구성
execution:
celery-k8s:
config:
job_namespace: dagster-walkthrough
env_config_maps:
- "dagster-pipeline-env"
image_pull_policy: "Always"
storage:
filesystem:
오른쪽 아래에 있는'시작 실행'을 누르십시오.Dagster는 파이프 운행 계획을 훑어보고 미나리 대기열에 절차를 제출하는 실행 조율기 Kubernetes 작업을 시작합니다.
단계 실행은 미나리 일꾼이 맡고 단계마다 Kubernetes 작업이 생성됩니다.다음과 같은 Kubernetes 리소스가 있어야 합니다.
실행 결과
Dagit에서 파이프의 진행 상황을 감시할 수 있습니다. 다섯 줄을 표시해야 합니다.각 행은 다음 작업을 나타냅니다.
그래픽 QL
GraphQL 실행 파이프를 사용하려면 GraphQL 끝점http://localhost/graphql에 요청을 보내거나 운동장에서 실행할 수 있습니다.
mutation ExecutePipeline(
$repositoryLocationName: String!
$repositoryName: String!
$pipelineName: String!
$runConfigData: RunConfigData!
$mode: String!
) {
launchPipelineExecution(
executionParams: {
selector: {
repositoryLocationName: $repositoryLocationName
repositoryName: $repositoryName
pipelineName: $pipelineName
}
runConfigData: $runConfigData
mode: $mode
}
) {
__typename
... on LaunchPipelineRunSuccess {
run {
runId
}
}
... on PipelineConfigValidationInvalid {
errors {
message
reason
}
}
... on PythonError {
message
}
}
}
쿼리 변수
{
"repositoryName": "example_repository",
"repositoryLocationName": "k8s-example-user-code-1",
"pipelineName": "parallel_pipeline",
"mode": "default",
"runConfigData": {
"execution": {
"celery-k8s": {
"config": {
"job_namespace": "dagster-walkthrough",
"env_config_maps": [
"dagster-pipeline-env"
],
"image_pull_policy": "Always"
}
}
},
"storage": {
"filesystem": null
}
}
}
결론
이 글을 통해 저는 Dagster와 사용자 코드 배치를 설정하는 데 시간을 절약할 수 있기를 바랍니다.만약 당신에게 무슨 문제가 있으면 마음대로 평론해 주십시오. 나는 최선을 다해 당신을 도울 것입니다.
이것은 저의 첫 번째 기술 블로그 글입니다. 당신의 건설적인 피드백에 매우 감사합니다.읽어주셔서 감사합니다!
리소스
https://docs.dagster.io/deploying/celery
https://docs.dagster.io/deploying/k8s_part2
https://docs.dagster.io/overview/graphql-api#launch-a-pipeline-run
https://docs.dagster.io/_apidocs/libraries/dagster_celery_k8s#dagster_celery_k8s.celery_k8s_job_executor
https://github.com/dagster-io/dagster/blob/master/helm/dagster/values.yaml
Reference
이 문제에 관하여(사용자 코드 배포가 있는 Dagster(gRPC)), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/michielhub/dagster-with-user-code-deployments-grpc-2c16
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
userDeployments:
# Whether launching user deployments is enabled.
enabled: true
# List of unique deployments using images that contain your
# user code.
deployments:
- name: "k8s-example-user-code-1"
image:
# Use the image that you created in the previous step, without the tag
# F.e. us.gcr.io/company-12345/user_code
repository: "YOUR_REGISTRY/user_code"
tag: 0.1
# Make sure these arguments are the same as your repository file name
# and repository name.
# Arguments to `dagster api grpc`.
dagsterApiGrpcArgs:
- "--python-file"
- "repos.py"
- "--attribute"
- "example_repository"
새 네임스페이스 만들기 시작:
kubectl create namespace dagster-walkthrough
이제 Helm을 사용하여 Dagster와 Dagit을 Kubernetes 클러스터에 배치할 준비가 되었습니다.GKE 중부에 배치했습니다.)helm repo add dagster https://dagster-io.github.io/helm
helm install dagster dagster/dagster -n dagster-walkthrough -f /path/to/values.yaml
시간을 주면 이런 것을 볼 수 있다.문제가 발생한 경우 다시 시작하려면 전체 버전을 삭제하고 다시 설치하는 것이 좋습니다.
helm delete dagster -n dagster-walkthrough
helm install dagster dagster/dagster -n dagster-walkthrough -f /path/to/values.yaml
4. 파이프라인 경로설정
현재 우리는 이미 모든 설치를 마쳤으니, 우리는 파이프를 운행하기 시작할 수 있다.UI(Dagit) 또는 GraphQL을 사용하여 파이핑을 실행하는 방법을 보여 드리고 싶습니다.
다지트
Dagit UI로 이동하여 샘플 저장소에서 병렬 파이핑을 선택합니다.놀이공원 옵션 카드를 클릭하여 다음 달리기 설정을 추가합니다.
실행 구성
execution:
celery-k8s:
config:
job_namespace: dagster-walkthrough
env_config_maps:
- "dagster-pipeline-env"
image_pull_policy: "Always"
storage:
filesystem:
오른쪽 아래에 있는'시작 실행'을 누르십시오.Dagster는 파이프 운행 계획을 훑어보고 미나리 대기열에 절차를 제출하는 실행 조율기 Kubernetes 작업을 시작합니다.
단계 실행은 미나리 일꾼이 맡고 단계마다 Kubernetes 작업이 생성됩니다.다음과 같은 Kubernetes 리소스가 있어야 합니다.
실행 결과
Dagit에서 파이프의 진행 상황을 감시할 수 있습니다. 다섯 줄을 표시해야 합니다.각 행은 다음 작업을 나타냅니다.
그래픽 QL
GraphQL 실행 파이프를 사용하려면 GraphQL 끝점http://localhost/graphql에 요청을 보내거나 운동장에서 실행할 수 있습니다.
mutation ExecutePipeline(
$repositoryLocationName: String!
$repositoryName: String!
$pipelineName: String!
$runConfigData: RunConfigData!
$mode: String!
) {
launchPipelineExecution(
executionParams: {
selector: {
repositoryLocationName: $repositoryLocationName
repositoryName: $repositoryName
pipelineName: $pipelineName
}
runConfigData: $runConfigData
mode: $mode
}
) {
__typename
... on LaunchPipelineRunSuccess {
run {
runId
}
}
... on PipelineConfigValidationInvalid {
errors {
message
reason
}
}
... on PythonError {
message
}
}
}
쿼리 변수
{
"repositoryName": "example_repository",
"repositoryLocationName": "k8s-example-user-code-1",
"pipelineName": "parallel_pipeline",
"mode": "default",
"runConfigData": {
"execution": {
"celery-k8s": {
"config": {
"job_namespace": "dagster-walkthrough",
"env_config_maps": [
"dagster-pipeline-env"
],
"image_pull_policy": "Always"
}
}
},
"storage": {
"filesystem": null
}
}
}
결론
이 글을 통해 저는 Dagster와 사용자 코드 배치를 설정하는 데 시간을 절약할 수 있기를 바랍니다.만약 당신에게 무슨 문제가 있으면 마음대로 평론해 주십시오. 나는 최선을 다해 당신을 도울 것입니다.
이것은 저의 첫 번째 기술 블로그 글입니다. 당신의 건설적인 피드백에 매우 감사합니다.읽어주셔서 감사합니다!
리소스
https://docs.dagster.io/deploying/celery
https://docs.dagster.io/deploying/k8s_part2
https://docs.dagster.io/overview/graphql-api#launch-a-pipeline-run
https://docs.dagster.io/_apidocs/libraries/dagster_celery_k8s#dagster_celery_k8s.celery_k8s_job_executor
https://github.com/dagster-io/dagster/blob/master/helm/dagster/values.yaml
Reference
이 문제에 관하여(사용자 코드 배포가 있는 Dagster(gRPC)), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/michielhub/dagster-with-user-code-deployments-grpc-2c16
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
execution:
celery-k8s:
config:
job_namespace: dagster-walkthrough
env_config_maps:
- "dagster-pipeline-env"
image_pull_policy: "Always"
storage:
filesystem:
mutation ExecutePipeline(
$repositoryLocationName: String!
$repositoryName: String!
$pipelineName: String!
$runConfigData: RunConfigData!
$mode: String!
) {
launchPipelineExecution(
executionParams: {
selector: {
repositoryLocationName: $repositoryLocationName
repositoryName: $repositoryName
pipelineName: $pipelineName
}
runConfigData: $runConfigData
mode: $mode
}
) {
__typename
... on LaunchPipelineRunSuccess {
run {
runId
}
}
... on PipelineConfigValidationInvalid {
errors {
message
reason
}
}
... on PythonError {
message
}
}
}
{
"repositoryName": "example_repository",
"repositoryLocationName": "k8s-example-user-code-1",
"pipelineName": "parallel_pipeline",
"mode": "default",
"runConfigData": {
"execution": {
"celery-k8s": {
"config": {
"job_namespace": "dagster-walkthrough",
"env_config_maps": [
"dagster-pipeline-env"
],
"image_pull_policy": "Always"
}
}
},
"storage": {
"filesystem": null
}
}
}
이 글을 통해 저는 Dagster와 사용자 코드 배치를 설정하는 데 시간을 절약할 수 있기를 바랍니다.만약 당신에게 무슨 문제가 있으면 마음대로 평론해 주십시오. 나는 최선을 다해 당신을 도울 것입니다.
이것은 저의 첫 번째 기술 블로그 글입니다. 당신의 건설적인 피드백에 매우 감사합니다.읽어주셔서 감사합니다!
리소스
https://docs.dagster.io/deploying/celery
https://docs.dagster.io/deploying/k8s_part2
https://docs.dagster.io/overview/graphql-api#launch-a-pipeline-run
https://docs.dagster.io/_apidocs/libraries/dagster_celery_k8s#dagster_celery_k8s.celery_k8s_job_executor
https://github.com/dagster-io/dagster/blob/master/helm/dagster/values.yaml
Reference
이 문제에 관하여(사용자 코드 배포가 있는 Dagster(gRPC)), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/michielhub/dagster-with-user-code-deployments-grpc-2c16
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
Reference
이 문제에 관하여(사용자 코드 배포가 있는 Dagster(gRPC)), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/michielhub/dagster-with-user-code-deployments-grpc-2c16텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)