Feast Log #3

Creating a Feast Server

This is done from the Ubuntu CLI rather than from Jupyter.
It's a server, after all...

Until now, everything was run through Jupyter Notebook inside a Docker container.
This time, the Feast Server is created and run in a local environment.

1. Running the Feast Server locally

Local feature server

- Start the server

pip install feast
feast init feature_repo
cd feature_repo
feast apply

feast materialize-incremental $(date +%Y-%m-%d)
feast serve
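
The materialize step above can also be driven from Python instead of the CLI. Below is a minimal sketch, assuming `feast` is installed and the code runs from inside feature_repo; the helper name `materialize_up_to` is made up for illustration, and it only roughly mirrors what `feast materialize-incremental <date>` does.

```python
from datetime import datetime


def materialize_up_to(store, date_str: str) -> datetime:
    """Load features into the online store up to the given YYYY-MM-DD date.

    `store` is expected to be a feast.FeatureStore, or any object that
    exposes materialize_incremental(end_date=...).
    """
    # Parse the same YYYY-MM-DD format that $(date +%Y-%m-%d) produces.
    end_date = datetime.strptime(date_str, "%Y-%m-%d")
    store.materialize_incremental(end_date=end_date)
    return end_date


# Usage (assumption: run from inside feature_repo after `feast apply`):
#   from feast import FeatureStore
#   today = datetime.now().strftime("%Y-%m-%d")
#   materialize_up_to(FeatureStore(repo_path="."), today)
```

Passing the store in as an argument keeps the helper easy to try with a stand-in object before pointing it at a real repo.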

- Run a query with curl

Open a new terminal for this, since the server is still running in the original one.
# example curl
curl -X POST \
  "http://localhost:6566/get-online-features" \
  -d '{
    "features": [
      "driver_hourly_stats:conv_rate",
      "driver_hourly_stats:acc_rate",
      "driver_hourly_stats:avg_daily_trips"
    ],
    "entities": {
      "driver_id": [1001, 1002, 1003]
    }
  }'
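
The same query can be sent from Python. Here is a minimal sketch using only the standard library; the function names `build_request` and `get_online_features` are my own, and the URL, feature names, and entity ids are the ones from the curl example above.

```python
import json
import urllib.request


def build_request(url, features, entities):
    """Build the POST request that the curl example sends."""
    payload = {"features": features, "entities": entities}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def get_online_features(url, features, entities, timeout=10.0):
    """Send the request and decode the JSON response into a dict."""
    request = build_request(url, features, entities)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Usage (assumes `feast serve` is running on localhost:6566):
#   get_online_features(
#       "http://localhost:6566/get-online-features",
#       ["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"],
#       {"driver_id": [1001, 1002, 1003]},
#   )
```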

2. Running the Feast Server in a Docker container

MLOps workloads run in image/container environments built on Kubernetes and Docker.
Feast itself doesn't provide a container environment of its own...

So let's build one.

- Build the docker image

Create requirements.txt

Create requirements.txt in a directory of your choice. The name has to match the file that the Dockerfile installs from.

~/mlops/docker/requirements.txt
# requirements.txt
feast
scikit-learn
mlflow
pandas

Create the Dockerfile

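# syntax=docker/dockerfile:1
FROM jupyter/base-notebook
WORKDIR /home/jovyan
COPY . /home/jovyan

# Install feast and the other Python dependencies
RUN pip3 install -r requirements.txt

# Create and populate the example feature repo as the notebook user
USER jovyan
RUN feast init feature_repo && \
    cd feature_repo && \
    feast apply && \
    feast materialize-incremental $(date +%Y-%m-%d)

# Overwrite the installed feast server module with our copy
# (this path assumes the image's conda Python is 3.9)
COPY feature_server.py /opt/conda/lib/python3.9/site-packages/feast/feature_server.py
CMD [ "/bin/sh", "-c", "cd /home/jovyan/feature_repo && feast serve"]

WORKDIR /home/jovyan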

feature_server.py

Create the FastAPI server that serves the features.

import json
import traceback
import warnings

import click
import uvicorn
import pandas as pd

from fastapi import FastAPI, HTTPException, Request
from fastapi.logger import logger
from fastapi.params import Depends
from google.protobuf.json_format import MessageToDict, Parse
from pydantic import BaseModel

import feast
from feast import proto_json
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
from feast.type_map import feast_value_type_to_python_type


# TODO: deprecate this in favor of push features
class WriteToFeatureStoreRequest(BaseModel):
    feature_view_name: str
    df: dict
    allow_registry_cache: bool = True


class PushFeaturesRequest(BaseModel):
    push_source_name: str
    df: dict
    allow_registry_cache: bool = True


def get_app(store: "feast.FeatureStore"):
    proto_json.patch()

    app = FastAPI()

    async def get_body(request: Request):
        return await request.body()

    @app.post("/get-online-features")
    def get_online_features(body=Depends(get_body)):
        try:
            # Validate and parse the request data into GetOnlineFeaturesRequest Protobuf object
            request_proto = GetOnlineFeaturesRequest()
            Parse(body, request_proto)

            # Initialize parameters for FeatureStore.get_online_features(...) call
            if request_proto.HasField("feature_service"):
                features = store.get_feature_service(
                    request_proto.feature_service, allow_cache=True
                )
            else:
                features = list(request_proto.features.val)

            full_feature_names = request_proto.full_feature_names

            batch_sizes = [len(v.val) for v in request_proto.entities.values()]
            num_entities = batch_sizes[0]
            if any(batch_size != num_entities for batch_size in batch_sizes):
                raise HTTPException(status_code=500, detail="Uneven number of columns")

            entity_rows = [
                {
                    k: feast_value_type_to_python_type(v.val[idx])
                    for k, v in request_proto.entities.items()
                }
                for idx in range(num_entities)
            ]
            
            response_proto = store._get_online_features(
                features=features,
                entity_rows=entity_rows,
                entity_values=request_proto.entities,
                full_feature_names=full_feature_names,
                native_entity_values=False,
            ).proto

            # Convert the Protobuf object to JSON and return it
            return MessageToDict(  # type: ignore
                response_proto, preserving_proto_field_name=True, float_precision=18
            )
        except Exception as e:
            # Print the original exception on the server side
            logger.exception(traceback.format_exc())
            # Raise HTTPException to return the error message to the client
            raise HTTPException(status_code=500, detail=str(e))

    @app.post("/push")
    def push(body=Depends(get_body)):
        try:
            request = PushFeaturesRequest(**json.loads(body))
            df = pd.DataFrame(request.df)
            store.push(
                push_source_name=request.push_source_name,
                df=df,
                allow_registry_cache=request.allow_registry_cache,
            )
        except Exception as e:
            # Print the original exception on the server side
            logger.exception(traceback.format_exc())
            # Raise HTTPException to return the error message to the client
            raise HTTPException(status_code=500, detail=str(e))

    @app.post("/write-to-online-store")
    def write_to_online_store(body=Depends(get_body)):
        warnings.warn(
            "write_to_online_store is an experimental feature. "
            "This API is unstable and it could be changed in the future. "
            "We do not guarantee that future changes will maintain backward compatibility.",
            RuntimeWarning,
        )
        try:
            request = WriteToFeatureStoreRequest(**json.loads(body))
            df = pd.DataFrame(request.df)
            store.write_to_online_store(
                feature_view_name=request.feature_view_name,
                df=df,
                allow_registry_cache=request.allow_registry_cache,
            )
        except Exception as e:
            # Print the original exception on the server side
            logger.exception(traceback.format_exc())
            # Raise HTTPException to return the error message to the client
            raise HTTPException(status_code=500, detail=str(e))

    return app


def start_server(store: "feast.FeatureStore", host: str, port: int, no_access_log: bool):
    app = get_app(store)
    click.echo(
        "This is an "
        + click.style("experimental", fg="yellow", bold=True, underline=True)
        + " feature. It's intended for early testing and feedback, and could change without warnings in future releases."
    )
    uvicorn.run(app, host=host, port=port, access_log=(not no_access_log))
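
The part of get_online_features that turns the column-oriented "entities" field into one dict per entity row is easy to miss. Below is a simplified sketch of that reshaping with plain Python values instead of Protobuf messages; `columns_to_rows` is a made-up name, and unlike the server code it returns an empty list for empty input rather than failing on `batch_sizes[0]`.

```python
def columns_to_rows(entities):
    """Turn {"driver_id": [1001, 1002]} into one dict per entity row."""
    if not entities:
        return []
    # Every entity column must have the same number of values.
    lengths = {len(values) for values in entities.values()}
    if len(lengths) != 1:
        raise ValueError("Uneven number of columns")
    (num_rows,) = lengths
    return [
        {key: values[idx] for key, values in entities.items()}
        for idx in range(num_rows)
    ]


# Usage:
#   columns_to_rows({"driver_id": [1001, 1002, 1003]})
```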

docker build

# Build the image; the Dockerfile runs the server defined in feature_server.py
#            tag for the image | build from the files in the current directory
docker build --tag feast-docker .

Run the feast docker container

docker run -d --name feast-jupyter -p 8888:8888 -p 6566:6566 -p 5001:5001 -e JUPYTER_TOKEN='password' \
-v "$PWD":/home/jovyan/jupyter \
--user root \
-it feast-docker:latest

Try the same curl command used earlier.

Additionally start JupyterLab

docker exec -it feast-jupyter start.sh jupyter lab &



3. Feast Feature Store

Add a FeatureService

Add the following code to feature_repo/example.py

from feast import FeatureService
driver_fs = FeatureService(name="driver_ranking_fv_svc",
                           features=[driver_hourly_stats_view],
                           tags={"description": "Used for training an ElasticNet model"})

feast apply has to be run again afterwards.
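
Once the FeatureService is registered, the /get-online-features handler in feature_server.py checks for a "feature_service" field in the request, so a query can presumably name the service instead of listing each feature. A small sketch of such a request body follows; the helper name is hypothetical, and I have not confirmed the response against a running server.

```python
import json


def feature_service_payload(service_name, entities):
    """Build a JSON body that selects features by FeatureService name."""
    return json.dumps({"feature_service": service_name, "entities": entities})


# Usage:
#   body = feature_service_payload("driver_ranking_fv_svc", {"driver_id": [1001, 1002]})
```

The resulting string can be passed to curl with -d in place of the "features" list used earlier.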

- Key Feast commands
feast --help
feast feature-views list
feast feature-services list
feast feature-services describe <feature_service_name>
feast entities list

feast teardown  # careful: this deletes everything
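
The list commands have counterparts on the Python FeatureStore object. Below is a rough sketch that collects the registered names in one dict; `summarize_registry` is a made-up helper, and it assumes the object passed in behaves like feast.FeatureStore (list_feature_views, list_feature_services, list_entities, each returning objects with a `name`).

```python
def summarize_registry(store):
    """Collect the names of what is registered in the feature repo."""
    return {
        "feature_views": [fv.name for fv in store.list_feature_views()],
        "feature_services": [fs.name for fs in store.list_feature_services()],
        "entities": [entity.name for entity in store.list_entities()],
    }


# Usage (assumption: run from inside feature_repo after `feast apply`):
#   from feast import FeatureStore
#   print(summarize_registry(FeatureStore(repo_path=".")))
```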
