Kedro Hooks를 사용한 데이터 유효성 검사 자동화

Kedro 소개 30초



Kedro는 데이터 파이프라인을 실행하기 위한 상당히 독선적인 Python 프레임워크입니다. 높은 수준에서 Kedro는 카탈로그 항목으로 추상화된 데이터 세트로 연결된 노드로 추상화된 일련의 개별 단계로 구성된 DAG 솔버입니다. 노드는 파이프라인이라는 상위 구조로 그룹화되며 노드가 실행되는 순서는 각 노드의 입력-출력의 공통 데이터 종속성에 따라 결정됩니다.

후크



Kedrodocumentation에 따르면 후크를 사용하면 쉽고 일관된 방식으로 Kedro의 기본 실행 동작을 확장할 수 있습니다. 후크는 사양 및 구현에서 빌드됩니다. 아래는 kedro new 를 실행하여 생성된 프로젝트의 일반적인 구조를 보여줍니다.

후크는 hooks.py에서 구현되며 각 관련 후크에 대해 클래스로 그룹화된 관련 함수 세트로 구성되어야 합니다. 그런 다음 후크 클래스를 등록하여 src/<project_name>/settings.py에 후크를 지정합니다. 새로 만든 후크 클래스를 가져와 HOOKS 키에 추가하면 됩니다.

후크가 따라야 하는 이벤트 유형과 실행 시기에 따라 여러 유형의 후크가 있습니다. 이 게시물에서는 데이터가 로드된 후 유효성을 검사하기 위한 하나의 특정 후크에 초점을 맞춥니다.

후크를 사용하여 데이터 유효성 검사



하나의 Kedro 후크after_dataset_loaded를 사용하면 데이터 카탈로그의 항목이 로드될 때마다 사용자 정의 함수를 일관되게 실행할 수 있습니다. 이는 예를 들어 데이터 소스의 배포가 예상대로 이루어지도록 하는 데 유용합니다. 이는 데이터 드리프트 모니터링이 모델의 성능과 신뢰도를 유지하는 데 중요한 머신 러닝 파이프라인을 구축할 때 흔히 발생하는 문제일 수 있습니다. 이 게시물에서는 Population-Stability-Index을 사용하여 데이터 드리프트를 모니터링하는 후크를 작성할 것입니다.

후크 정의



우리는 after_dataset_loaded 후크를 사용하여 (잠재적인) 기계 학습 모델에 대한 데이터의 일관성을 보장할 것입니다. after_dataset_loaded Hook의 정의를 보면:

    @hook_spec
    def after_dataset_loaded(self, dataset_name: str, data: Any) -> None:
        """Hook to be invoked after a dataset is loaded from the catalog.
        Args:
            dataset_name: name of the dataset that was loaded from the catalog.
            data: the actual data that was loaded from the catalog.
        """


후크 정의에는 데이터 세트 이름과 카탈로그에서 로드된 데이터가 필요합니다. (걱정하지 마세요. Kedro에서 처리하므로 실제로 지정할 필요는 없습니다. hooks.py 파일에서 위에서 정의한 인터페이스를 사용하고 hook_impl 데코레이터를 올바르게 추가하면 됩니다. 명명된 함수).

예를 들어 hooks.py라는 새 클래스를 PSIHooks에 만들고 필요한 후크를 만듭니다.

from kedro.framework.hooks import hook_impl

class PSIHooks:

@hook_impl 
def after_dataset_loaded(
  self, 
  dataset_name: str,
  data: Any) -> None:



또한 데이터에 열이 포함되어 있고 배열로 저장된 일련의 값에 대해 각 열의 유효성을 검사하려고 한다고 가정해 보겠습니다. this implementation(내 것이 아님)을 사용하여 PSI에 대해 후크 본문에 다음을 추가할 수 있습니다.

# convert dataframe to numpy matrix 
actual_values = data.values

psi_values = calculate_psi(expected_values, actual_values)

logging.info('f Dataset Name: {dataset_name}')
logging.info('PSI Values')
logging.info(psi_values)


또한 각 개별 데이터 세트의 유효성을 검사하는 데 필요한 데이터를 결정하는 조건부 논리와 일부 데이터 작업의 결과인 데이터 세트에 대한 데이터를 모니터링하지 않는 옵션을 추가할 수 있습니다.

위의 내용은 일반적인 PSI 계산을 설정하지만 PSI가 무엇인지 또는 시간이 지남에 따라 PSI가 어떻게 변경되는지 추적할 방법이 없습니다. 이 경우 후크 본문에서 MLFlow, Neptune.ai 또는 wandb.ai와 같은 실험 추적 프레임워크를 사용하여 PSI가 시간 경과에 따라 어떻게 변경되는지 기록할 수 있습니다.

좋은 웹페이지 즐겨찾기