TFX: 시작하기 튜토리얼 - 시작 파이프라인
TFX 공식 홈페이지
펭귄 데이터 세트를 사용한 간단한 TFX 파이프라인 튜토리얼
개요
- 간단한 TFX 파이프라인을 실행하는 짧은 자습서
- 데이터 가져오기, 모델 학습 및 학습된 모델 내보내기와 같은 가장 최소한의 ML 워크플로
- 세 가지 필수 TFX 구성요소: ExampleGen, Trainer 및 Pusher
TFX 파이프라인 이해하기
노트북 코드
TFX 설치하기
pip install -U tfx
- tfx 설치
- 설치 후에 런타임을 다시 시작해야함. Colab이 패키지를 로드하는 방식 때문!
Tenosorflow 및 TFX 버전 확인
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
>>>>>>>>>>>>>>>>
TensorFlow version: 2.6.2
TFX version: 1.4.0
변수 설정
import os
PIPELINE_NAME = "penguin-simple"
# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
from absl import logging
logging.set_verbosity(logging.INFO) # Set default logging level.
- 파이프라인을 정의하는데 사용되는 몇 가지 변수가 있음
- 이러한 변수를 원하는 대로 사용 지정 가능
- 기본적으로 파이프라인의 모든 출력은 현재 디렉터리 아래에 생성
os.path.join
- os.path.join이 좋은 이유
os.path.join 함수는 운영체제에 맞게 폴더 구분자를 다뤄서 경로를 생성
# Windows
import os
os.path.join('a', 'b', 'c')
>>>
a\b\c
pip install -U tfx
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
>>>>>>>>>>>>>>>>
TensorFlow version: 2.6.2
TFX version: 1.4.0
import os
PIPELINE_NAME = "penguin-simple"
# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
from absl import logging
logging.set_verbosity(logging.INFO) # Set default logging level.
os.path.join
os.path.join 함수는 운영체제에 맞게 폴더 구분자를 다뤄서 경로를 생성
# Windows
import os
os.path.join('a', 'b', 'c')
>>>
a\b\c
→ 사실 그동안 슬래쉬 그으면 되는걸 왜 쓰나 했는데 운영체제에 따라 달라지는 것을 고려한 것!
- 그런데 어차피 주피터노트북에서 사용하면 os가 같은것이 보장된게 아닌가?
- 아직도 꼭 왜 써야 하는지 모르겠다!
absl
Google의 내부 코드베이스의 가장 근반이되는 부분으로부터 만들어진 C++ 라이브러리의 오픈소스 모음!
예시 데이터 준비
import urllib.request
import tempfile
DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data') # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)
[urllib](https://docs.python.org/3/library/urllib.html#module-urllib)
URLs와 함께 작동하기 위해 몇가지 모듈들을 모은 패키지
requests
라이브러리와 큰 차이는 없음
[tempfile](https://docs.python.org/ko/3/library/tempfile.html)
임시 파일과 디렉토리 생성
- 임시 파일이면 만들어졌다가 없어지는건가?
파이프라인 생성
_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
Tensorflow Transform
TensorFlow로 데이터를 전처리하기 위한 라이브러리
- 다음과 같이 전체 패스가 필요한 데이터에 유용
- 평균 및 표준 편차로 입력 값을 정규화
- 모든 입력 값에 대해 어휘를 생성하여 문자열을 정수로 변환
- 관찰된 데이터 분포를 기반으로 수레를 버킷에 할당하여 부동 소수점을 정수로 변환
- Apache Beam과 Apache Arrow가 종속성으로 필요한 것이 특징
tfx_bsl.public.tfxio
TFXIO는 모든 TFX 라이브러리 및 구성 요소가 공유하는 공통 메모리 내 데이터 표현과 이러한 표현을 생성하기 위한 I/O 추상화 계층을 정의
schema_pb2
Main aliases: tfmd.proto.v0.schema_pb2
_FEATURE_KEYS = [
'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10
# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
**{
feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
for feature in _FEATURE_KEYS
},
_LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
- 우리가 스키마를 생성하지 않기 때문에 대신에 feature spec을 생성함.
- 스키마를 생성한다면 tfx에서 schemaGEN을 사용해주는건가?
tf.io.FixedLenFeature
고정된 길이의 input feature를 파싱하기 위한 구성요소
def _input_fn(file_pattern: List[str],
data_accessor: tfx.components.DataAccessor,
schema: schema_pb2.Schema,
batch_size: int = 200) -> tf.data.Dataset:
"""Generates features and label for training.
Args:
file_pattern: List of paths or patterns of input tfrecord files.
data_accessor: DataAccessor for converting input to RecordBatch.
schema: schema of the input data.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch
Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size, label_key=_LABEL_KEY),
schema=schema).repeat()
학습을 위한 특징들과 라벨을 생성함
file_pattern
: 인풋 tfrecord 파일들의 경로들 혹은 패턴의 리스트data_accessor
: input을 RecordBatch로 전환하기 위한 DataAccessorshcema
: 인풋 데이터의 스키마batch_size
: 단일 배치에서 결합하기위해 리턴된 데이터셋의 연이은 요소의 수를 대표 → 우리가 일반적으로 아는 배치사이즈. 말로 푸니까 되게 어렵네return
: 데이터셋 튜플(features, indices)
- features → Tensors 딕셔너리
- indices → 라벨 인덱스들의 단일 텐서
def _build_keras_model() -> tf.keras.Model:
"""Creates a DNN Keras model for classifying penguin data.
Returns:
A Keras Model.
"""
# The model below is built with Functional API, please refer to
# https://www.tensorflow.org/guide/keras/overview for all API options.
inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
d = keras.layers.concatenate(inputs)
for _ in range(2):
d = keras.layers.Dense(8, activation='relu')(d)
outputs = keras.layers.Dense(3)(d)
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer=keras.optimizers.Adam(1e-2),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.summary(print_fn=logging.info)
return model
- 펭귄 데이터를 분류하기 위한 DNN Keras Model 생성
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
# This schema is usually either an output of SchemaGen or a manually-curated
# version provided by pipeline author. A schema can also derived from TFT
# graph if a Transform component is used. In the case when either is missing,
# `schema_from_feature_spec` could be used to generate schema from very simple
# feature_spec, but the schema returned would be very primitive.
schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
train_dataset = _input_fn(
fn_args.train_files,
fn_args.data_accessor,
schema,
batch_size=_TRAIN_BATCH_SIZE)
eval_dataset = _input_fn(
fn_args.eval_files,
fn_args.data_accessor,
schema,
batch_size=_EVAL_BATCH_SIZE)
model = _build_keras_model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
# The result of the training should be saved in `fn_args.serving_model_dir`
# directory.
model.save(fn_args.serving_model_dir, save_format='tf')
- 이 스키마는 주로 SchemaGen 혹은 pipeline의 작성자에 의해 제공된 manually-curated version임
- 스키마는 만약에 Transform component가 사용되면 TFT graph 에서 파생됨
- 둘중 하나가 누락된 경우,
schema_from_feature_spec
은 매우 단순한 feature_spce에서부터 스키마를 제공하는데 사용될 수 있음. 하지만 리턴된 스키마는 매우 primitive(원시적)할 것임 - 학습 결과는 fn_args.serving_model_dir 폴더에 저장됨
파이프라인 정의 작성
TFX 파이프라인을 생성하는 함수를 정의.
Pipeline
객체는 하나의 TFX pipeline을 대표함. 이 파이프라인은 TFX가 지원하는 pipeline orchestration system 중 하나를 이용하여 작동시킬 수 있음
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
module_file: str, serving_model_dir: str,
metadata_path: str) -> tfx.dsl.Pipeline:
"""Creates a three component penguin pipeline with TFX."""
# Brings data into the pipeline.
example_gen = tfx.components.CsvExampleGen(input_base=data_root)
# Uses user-provided Python function that trains a model.
trainer = tfx.components.Trainer(
module_file=module_file,
examples=example_gen.outputs['examples'],
train_args=tfx.proto.TrainArgs(num_steps=100),
eval_args=tfx.proto.EvalArgs(num_steps=5))
# Pushes the model to a filesystem destination.
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=serving_model_dir)))
# Following three components will be included in the pipeline.
components = [
example_gen,
trainer,
pusher,
]
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
metadata_connection_config=tfx.orchestration.metadata
.sqlite_metadata_connection_config(metadata_path),
components=components)
파이프라인 실행
- TFX는 파이프라인을 실행하기 위한 여러 오케스트레이터 지원.
- 이 튜토리얼에서는
LocalDagRunner
를 사용 - DAG
- 방향성 순환 그래프.
- TFX 파이프라인을 나타내기도 함
LocalDagRunner
: 개발 및 디버깅을 위한 빠른 반복을 제공- Kubeflow Pipelines 및 Apache Airflow를 비롯한 다른 오케스트레이터도 지원
- 다른 오케스트레이션 시스템에 대해서 배우기
LocalDagRunner
를 생성하고 우리가 이미 정의한 함수에 의해 생성된Pipeline
객체를 전달함. 파이프라인은 곧장 실행되고 ML model training을 포함하여 pipeline의 진행 로그를 볼 수 있음
tfx.orchestration.LocalDagRunner().run(
_create_pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
data_root=DATA_ROOT,
module_file=_trainer_module_file,
serving_model_dir=SERVING_MODEL_DIR,
metadata_path=METADATA_PATH))
- 만약 성공적으로 작동한다면
INFO:absl:Component Pusher is finished.
를 마지막 로그에서 볼 수 있음. 왜냐하면Pusher
컴포넌트는 파이프라인의 마지막 구성요소임 - pusher component는 학습된 모델을 SERVING_MODEL_DIR (
serving_model/penguin-simple
) 에 푸시함
Author And Source
이 문제에 관하여(TFX: 시작하기 튜토리얼 - 시작 파이프라인), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@taekkim/TFX-시작하기-튜토리얼-시작-파이프라인저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)