Postgres를 사용하여 비동기 API 만들기 - 체스 분석 응용 프로그램 구축(섹션 3)

우리last post에서 우리는 국제 장기의 위치를 검증하고 분석하는 동기화 API를 구축했다.웹 서버에서 CPU 집약형 작업을 동기화하는 것은 좋은 생각이 아닌 것 같습니다.
이 문서에서 Postgres를 사용하여 동기화 API를 비동기 API로 변환합니다.

비동기 분석 요청


웹 서버에서 분석을 기다리게 할 수는 없습니다.이상적인 상황에서 우리는 대량의 분석의 요구에 적응하고 과중한 부담을 줄 수 있기를 바란다.이것은 대열을 사용하기에 좋은 시기인 것 같다.

줄 서기란 무엇입니까?


대기열 뒤에는 요청을 즉시 분석하는 것이 아니라 ID와 함께 나중에 분석할 수 있도록 한 위치에 저장하는 것이 기본입니다.

ID를 사용하여 분석이 준비되었는지 확인할 수 있습니다.

또 다른 일부 서비스가 실행 중이다.그것은 대기열에서 체스의 위치를 읽고 위치를 분석하며 결과를 어느 곳에 저장할 것이다.대기열에서 그것들을 읽고 처리하는 서비스를 보통worker라고 부른다.

이런 방법은 많은 장점이 있는데, 그 중 하나는 수요에 따라 직원 수를 늘리거나 줄일 수 있다는 것이다.

우리 줄 어떻게 서요?


대열을 실제로 실현할 수 있는 많은 방법이 있다.Kafka 또는 RabbitMQ 같은 제품이 있습니다.AWS SQS 또는 AWS Kinesis 등의 위탁 관리 서비스가 있다.Celery 같은 라이브러리는 Redis나 RabbitMQ와 함께 사용할 수 있습니다.스태프에게는 AWS Lambda 같은 무서버 기능을 사용할 수도 있고 한 그룹의 서버를 사용하여 작업을 기다릴 수도 있다.
우리의 예에서 우리는 Postgres를 대열로 사용할 것이다. 주로 두 가지 이유가 있다.
  • 어쨌든 우리는 데이터베이스가 필요하기 때문에 Postgres를 대기열과 데이터베이스로 사용하는 것은 관리해야 할 것이 더 적다는 것을 의미한다.
  • 저는 개인적으로 SQL 디버그 대기열을 통과하는 것을 좋아합니다.우리는 지금 발생하고 있는 일에 대해 익숙한 언어로 많은 것을 이해할 수 있다.
  • 대기열로 Postgres 사용


    간단한 대기열을 만들려면 다음과 같은 세 가지 함수만 필요합니다.
  • add_request_to_queue(request) - 나중에 처리할 수 있도록 요청을 대기열에 추가
  • claim_unprocessed_request() - 처리되지 않은 요청을 반환합니다.우리가 완성하기 전에 다른 누구도 이 요구를 제기할 수 없다.
  • finish_processing_request(request, result) - 처리 요청의 결과를 저장합니다.
  • 중요한 것은 한 직원이 청구할 때 다른 직원이 없어도 청구할 수 있다는 것이다.그렇지 않으면, 우리는 단지 여러 명의 노동자가 같은 임무를 하도록 시간을 낭비할 뿐이다.
    만약 우리가 queue라는 시계를 가지고 있다면.add_request_to_queue Postgres의 경우 SQL 쿼리가 간단합니다.
    INSERT INTO queue (id, request, status, created_at) 
               VALUES (gen_random_uuid(), :request, 'unprocessed', now())
    
    여기서:request는 직원이 요청을 처리하는 데 필요한 모든 정보입니다.우리의 예에서 이것이 바로 우리가 분석하고자 하는 국제 장기의 입장과 그 어떠한 논점이다.
    첫 번째 통과claim_unprocessed_request는 간단한 select 문장일 수 있다.
    SELECT id, request FROM queue 
    WHERE status = 'unprocessed' 
    ORDER BY created_at
    LIMIT 1
    
    그러나 다른 누구도 같은 요구를 하는 것을 막을 수는 없다.
    다행히도 박사 후에는 두 가지 조항이 우리를 도울 수 있다.
    SELECT id, request FROM queue 
    WHERE status = 'unprocessed' 
    ORDER BY created_at
    FOR UPDATE SKIP LOCKED
    LIMIT 1
    
    FOR UPDATE 줄을 잠그십시오. 왜냐하면 우리는 그것을 업데이트할 계획이기 때문입니다.SKIP LOCKED 현재 잠긴 모든 행이 생략됩니다.이렇게 되면 두 명의 노동자가 같은 소대를 빼앗지 않을 것이다.finish_processing_request 분석 결과로 줄을 업데이트할 수 있다.우리는 select 문장과 같은 업무에서 이 조작을 실행해야 한다. (이것이 바로 우리가 사용한 이유 FOR UPDATE 이기 때문에 전체 절차는 다음과 같다.
  • 거래 개시
  • 사용FOR UPDATE SKIP LOCKED 선택 요청
  • 처리 요청
  • 결과를 저장하고 상태를 비처리
  • 트랜잭션 제출
  • 연금술


    이제 우리는 우리가 무엇을 해야 하는지 알게 되었다. 그것을python으로 번역하자.ORMSQLAlchemy을 사용하여 데이터베이스와 상호작용을 하고 ORMpsycopg2을 Postgres 어댑터로 사용할 것입니다.
    # this is a library which adds some flask-specific features to sqlalchemy
    (venv) $ pip install flask-sqlalchemy 
    (venv) $ pip install psycopg2-binary
    
    Postgres를 설정하고 설치해야 합니다. Docker나 그들의 installers 를 사용해서 완성할 수 있습니다.
    이후에 우리는 데이터베이스 테이블을 설정할 수 있다.앞의 예시에서 우리는 status열이 하나 있다.이 예에서, 우리는 빈 analysis_result 열 하나만 사용할 것이다.오류 상태가 발생하는 것을 허용하지 않기 때문에, 그것의 통용성은 약간 떨어지지만, 확장하기 쉬워야 한다.
    우리는 우리의 모형을 새 파일 models.py 에 넣을 것이다.
    from datetime import datetime
    
    import sqlalchemy
    from flask_sqlalchemy import SQLAlchemy
    from sqlalchemy.dialects.postgresql import UUID, JSON
    
    db = SQLAlchemy()
    
    
    class Analysis(db.Model):
        # Let the DB generate a UUID for us
        id = db.Column(UUID(as_uuid=True), primary_key=True, server_default=sqlalchemy.text("gen_random_uuid()"), )
    
        # The 4 parameters to our request
        fen = db.Column(db.String(), nullable=False)
        num_moves_to_return = db.Column(db.INT, nullable=False)
        time_limit = db.Column(db.FLOAT, nullable=False)
        depth_limit = db.Column(db.INT)
    
        # The result of an analysis. Null means we haven't processed it yet
        analysis_result = db.Column(JSON())
    
        created_at = db.Column(db.TIMESTAMP, default=datetime.utcnow, nullable=False)
        updated_at = db.Column(db.TIMESTAMP, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
    
        # A conditional index on created_at, whenever analysis_result is null
        __table_args__ = (
            db.Index('ordered_by_created_at', created_at, postgresql_where=(analysis_result.is_(None))),
        )
    
    그리고python 컨트롤러에서 이 테이블을 만들 수 있습니다.
    (venv) $ python
    # ...
    >>> from models import db
    >>> db.create_all()
    

    API 완료


    우리의 API는 곧 완성될 것이다.섹션 app.py 의 나머지 코드를 보여 줍니다.
    먼저 SQLALCHEMY 구성 값을 설정하고 호출해야 합니다db.init_app(app).
    # ... same as before
    
    from models import db, Analysis
    
    app = Flask(__name__)
    # This is the default URI for postgres
    app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://postgres:postgres@localhost:5432/postgres'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db.init_app(app)
    
    다음에 우리는 analyze 함수를 업데이트하여 동기화 분석을 하지 않고 분석을 데이터베이스에 저장하여 나중에 처리할 것이다.데이터베이스는 자동으로 생성된 UUID를 반환하고 사용자에게 반환합니다.
    @app.route("/analyze", methods=["POST"])
    def analyze():
        # Validate the request, same as before
        parsed_request = parse_request(request.get_json())
    
        # Save an analysis object to the database
        analysis = Analysis(
            fen=parsed_request.get("fen"),
            num_moves_to_return=parsed_request.get("num_moves_to_return"),
            time_limit=parsed_request.get("time_limit"),
            depth_limit=parsed_request.get("depth_limit"),
        )
        db.session.add(analysis)
        db.session.commit()
    
        # The id is automatically populated after commiting
        return {"id": analysis.id}
    
    마지막으로, 우리는 사용자들이 그들의 분석을 받을 수 있도록 몇 가지 방법이 필요하다.UUID를 적용할 새 엔드포인트를 만들고 해당 엔드포인트가 UUID로 전송되는지 확인한 다음 데이터베이스에서 UUID를 가져옵니다.데이터베이스 객체가 설정된 경우analysis_result 분석된 것으로 알고 있습니다.그렇지 않으면, 우리는 그것이 여전히 분석을 기다리고 있다고 말할 수 있다.
    # Can move this to our parsers file
    # Make sure the UUID we are provided is valid
    def parse_uuid(possible_uuid):
        try:
            return uuid.UUID(possible_uuid)
        except ValueError:
            return None
    
    
    @app.route("/analysis", methods=["GET"])
    def get_analysis():
        # 404 if the ID is invalid or not in our database
        id = parse_uuid(request.args.get("id"))
        if id is None:
            return abort(404)
        analysis = Analysis.query.get_or_404(id)
    
        if analysis.analysis_result is None:
            return {"status": "pending"}
        else:
            return {"status": "done", "result": analysis.analysis_result}
    
    API가 그렇습니다.우리는 분석할 체스의 위치를 제출한 후에 ID를 얻을 수 있습니다. 그리고 ID를 조회할 수 있습니다. 이 ID는 분석이 준비될 때까지 pending 되돌아갈 것입니다.서버를 실행하고 curl을 사용하여 다음 사항을 확인할 수 있습니다.
    $ curl -X POST 
           -H "Content-Type: application/json" 
           -d '{"fen": "8/8/6P1/4R3/8/6k1/2r5/6K1 b - - 0 1"}' localhost:5000/analyze
    {"id":"c5b0a5d9-2427-438c-bd4a-6e9afe135763"}
    
    $ curl "localhost:5000/analysis?id=c5b0a5d9-2427-438c-bd4a-6e9afe135763"
    {"status":"pending"}
    
    현재 유일한 문제는 체스의 위치를 진정으로 분석하는 사람이 없다는 것이다.우리는 영원히 돌아올 것이다pending.이제 우리가 이 문제를 해결합시다.

    국제 장기 분석원을 창설하다


    Flask는 웹 서버 이외에도 지원CLI합니다.우리는 데이터베이스에서 발견된 내용을 무한 순환으로 읽고 처리하도록 명령을 내릴 수 있다.생산 과정에서 우리는 시스템d와 같은 도구를 사용하여 노동자의 생명을 유지할 수 있다. 우리는 생성된 프로세스 수량을 제어하여 모든 기계가 병행 분석할 수 있는 위치 수량을 제어할 수 있다.
    상단부터 worker에서 app.py 명령을 내립시다.
    @app.cli.command("worker")
    def worker():
        with app.app_context():
            run_worker()
    
    그리고 우리는 새로운 문서run_worker에서 실현할 것이다worker.py.
    import time
    from chessengine import analyze_position
    from models import Analysis, db
    
    # Runs forever
    def run_worker():
        while True:
            try:
                did_work = fetch_and_analyze()
    
                # If there was nothing to analyze, sleep for a bit
                #   to not overload the DB
                if not did_work:
                    time.sleep(5)
            except Exception as err:
                print(f"Unexpected {err=}, {type(err)=}")
    
    def fetch_and_analyze():
        # This is the SQL query we saw before
        # We are looking for rows with no result yet
        #  and we order by created_at to take older requests first
        to_be_analyzed = Analysis.query.filter(Analysis.analysis_result.is_(None)) \
            .order_by(Analysis.created_at) \
            .with_for_update(skip_locked=True) \
            .limit(1) \
            .first()
    
        if to_be_analyzed is not None:
            # Use the function we created in our first post in this series
            to_be_analyzed.analysis_result = analyze_position(
                fen=to_be_analyzed.fen,
                num_moves_to_return=to_be_analyzed.num_moves_to_return,
                depth_limit=to_be_analyzed.depth_limit,
                time_limit=to_be_analyzed.time_limit
            )
            print("Analyzed {}".format(to_be_analyzed.id))
        else:
            print("Nothing to analyze")
    
        # Do not forget to commit the transaction!
        db.session.commit()
        return to_be_analyzed is not None
    
    이것이 바로 전 직원이다.대부분의 무거운 물건은 analyze_position에 의해 완성된 것이니, 너는 이것을 사용할 수 있다
    모든 동기화 API를 비동기 API의 동일한 구조로 변환합니다.
    새 터미널에서 Worker를 실행하여 다음을 테스트합니다.
    $ source venv/bin/activate
    (venv) $ flask worker
    Analyzed c5b0a5d9-2427-438c-bd4a-6e9afe135763
    Nothing to analyze
    Nothing to analyze
    
    그것은 우리가 전에 제기한 요구를 찾아서 분석했다.curl을 사용하여 웹 서버를 클릭하여 다음을 확인합니다.
    $ curl "localhost:5000/analysis?id=c5b0a5d9-2427-438c-bd4a-6e9afe135763"
    {"result":[{"centipawn_score":null,"mate_score":-2,"pv":["c2c1","e5e1","c1e1"]}],"status":"done"}
    
    우리가 본 상태는done이고 우리의 결과는 우리가 예상한 분석이다.

    마무리


    우리는 무거운 동기화 API를 사용하고 비동기적으로 Postgres를 작업 대기열로 사용할 수 있습니다.Google 사용자는 현재 대량의 체스 위치를 제출할 수 있으며, Google 서버도 과부하되지 않습니다.
    다음 기사에서는 NextJS를 사용하여 응용 프로그램의 UI를 구성합니다.그때 봐요!

    좋은 웹페이지 즐겨찾기