Postgres를 사용하여 비동기 API 만들기 - 체스 분석 응용 프로그램 구축(섹션 3)
28375 단어 webdevprogrammingpythonbeginners
이 문서에서 Postgres를 사용하여 동기화 API를 비동기 API로 변환합니다.
비동기 분석 요청
웹 서버에서 분석을 기다리게 할 수는 없습니다.이상적인 상황에서 우리는 대량의 분석의 요구에 적응하고 과중한 부담을 줄 수 있기를 바란다.이것은 대열을 사용하기에 좋은 시기인 것 같다.
줄 서기란 무엇입니까?
대기열 뒤에는 요청을 즉시 분석하는 것이 아니라 ID와 함께 나중에 분석할 수 있도록 한 위치에 저장하는 것이 기본입니다.
ID를 사용하여 분석이 준비되었는지 확인할 수 있습니다.
또 다른 일부 서비스가 실행 중이다.그것은 대기열에서 체스의 위치를 읽고 위치를 분석하며 결과를 어느 곳에 저장할 것이다.대기열에서 그것들을 읽고 처리하는 서비스를 보통worker라고 부른다.
이런 방법은 많은 장점이 있는데, 그 중 하나는 수요에 따라 직원 수를 늘리거나 줄일 수 있다는 것이다.
우리 줄 어떻게 서요?
대열을 실제로 실현할 수 있는 많은 방법이 있다.Kafka 또는 RabbitMQ 같은 제품이 있습니다.AWS SQS 또는 AWS Kinesis 등의 위탁 관리 서비스가 있다.Celery 같은 라이브러리는 Redis나 RabbitMQ와 함께 사용할 수 있습니다.스태프에게는 AWS Lambda 같은 무서버 기능을 사용할 수도 있고 한 그룹의 서버를 사용하여 작업을 기다릴 수도 있다.
우리의 예에서 우리는 Postgres를 대열로 사용할 것이다. 주로 두 가지 이유가 있다.
대기열로 Postgres 사용
간단한 대기열을 만들려면 다음과 같은 세 가지 함수만 필요합니다.
add_request_to_queue(request)
- 나중에 처리할 수 있도록 요청을 대기열에 추가claim_unprocessed_request()
- 처리되지 않은 요청을 반환합니다.우리가 완성하기 전에 다른 누구도 이 요구를 제기할 수 없다.finish_processing_request(request, result)
- 처리 요청의 결과를 저장합니다.만약 우리가
queue
라는 시계를 가지고 있다면.add_request_to_queue
Postgres의 경우 SQL 쿼리가 간단합니다.INSERT INTO queue (id, request, status, created_at)
VALUES (gen_random_uuid(), :request, 'unprocessed', now())
여기서:request
는 직원이 요청을 처리하는 데 필요한 모든 정보입니다.우리의 예에서 이것이 바로 우리가 분석하고자 하는 국제 장기의 입장과 그 어떠한 논점이다.첫 번째 통과
claim_unprocessed_request
는 간단한 select 문장일 수 있다.SELECT id, request FROM queue
WHERE status = 'unprocessed'
ORDER BY created_at
LIMIT 1
그러나 다른 누구도 같은 요구를 하는 것을 막을 수는 없다.다행히도 박사 후에는 두 가지 조항이 우리를 도울 수 있다.
SELECT id, request FROM queue
WHERE status = 'unprocessed'
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
FOR UPDATE
줄을 잠그십시오. 왜냐하면 우리는 그것을 업데이트할 계획이기 때문입니다.SKIP LOCKED
현재 잠긴 모든 행이 생략됩니다.이렇게 되면 두 명의 노동자가 같은 소대를 빼앗지 않을 것이다.finish_processing_request
분석 결과로 줄을 업데이트할 수 있다.우리는 select 문장과 같은 업무에서 이 조작을 실행해야 한다. (이것이 바로 우리가 사용한 이유 FOR UPDATE
이기 때문에 전체 절차는 다음과 같다.FOR UPDATE SKIP LOCKED
선택 요청연금술
이제 우리는 우리가 무엇을 해야 하는지 알게 되었다. 그것을python으로 번역하자.ORMSQLAlchemy을 사용하여 데이터베이스와 상호작용을 하고 ORMpsycopg2을 Postgres 어댑터로 사용할 것입니다.
# this is a library which adds some flask-specific features to sqlalchemy
(venv) $ pip install flask-sqlalchemy
(venv) $ pip install psycopg2-binary
Postgres를 설정하고 설치해야 합니다. Docker나 그들의 installers 를 사용해서 완성할 수 있습니다.이후에 우리는 데이터베이스 테이블을 설정할 수 있다.앞의 예시에서 우리는
status
열이 하나 있다.이 예에서, 우리는 빈 analysis_result
열 하나만 사용할 것이다.오류 상태가 발생하는 것을 허용하지 않기 때문에, 그것의 통용성은 약간 떨어지지만, 확장하기 쉬워야 한다.우리는 우리의 모형을 새 파일
models.py
에 넣을 것이다.from datetime import datetime
import sqlalchemy
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.dialects.postgresql import UUID, JSON
db = SQLAlchemy()
class Analysis(db.Model):
# Let the DB generate a UUID for us
id = db.Column(UUID(as_uuid=True), primary_key=True, server_default=sqlalchemy.text("gen_random_uuid()"), )
# The 4 parameters to our request
fen = db.Column(db.String(), nullable=False)
num_moves_to_return = db.Column(db.INT, nullable=False)
time_limit = db.Column(db.FLOAT, nullable=False)
depth_limit = db.Column(db.INT)
# The result of an analysis. Null means we haven't processed it yet
analysis_result = db.Column(JSON())
created_at = db.Column(db.TIMESTAMP, default=datetime.utcnow, nullable=False)
updated_at = db.Column(db.TIMESTAMP, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
# A conditional index on created_at, whenever analysis_result is null
__table_args__ = (
db.Index('ordered_by_created_at', created_at, postgresql_where=(analysis_result.is_(None))),
)
그리고python 컨트롤러에서 이 테이블을 만들 수 있습니다.(venv) $ python
# ...
>>> from models import db
>>> db.create_all()
API 완료
우리의 API는 곧 완성될 것이다.섹션
app.py
의 나머지 코드를 보여 줍니다.먼저 SQLALCHEMY 구성 값을 설정하고 호출해야 합니다
db.init_app(app)
.# ... same as before
from models import db, Analysis
app = Flask(__name__)
# This is the default URI for postgres
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://postgres:postgres@localhost:5432/postgres'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
다음에 우리는 analyze
함수를 업데이트하여 동기화 분석을 하지 않고 분석을 데이터베이스에 저장하여 나중에 처리할 것이다.데이터베이스는 자동으로 생성된 UUID를 반환하고 사용자에게 반환합니다.@app.route("/analyze", methods=["POST"])
def analyze():
# Validate the request, same as before
parsed_request = parse_request(request.get_json())
# Save an analysis object to the database
analysis = Analysis(
fen=parsed_request.get("fen"),
num_moves_to_return=parsed_request.get("num_moves_to_return"),
time_limit=parsed_request.get("time_limit"),
depth_limit=parsed_request.get("depth_limit"),
)
db.session.add(analysis)
db.session.commit()
# The id is automatically populated after commiting
return {"id": analysis.id}
마지막으로, 우리는 사용자들이 그들의 분석을 받을 수 있도록 몇 가지 방법이 필요하다.UUID를 적용할 새 엔드포인트를 만들고 해당 엔드포인트가 UUID로 전송되는지 확인한 다음 데이터베이스에서 UUID를 가져옵니다.데이터베이스 객체가 설정된 경우analysis_result
분석된 것으로 알고 있습니다.그렇지 않으면, 우리는 그것이 여전히 분석을 기다리고 있다고 말할 수 있다.# Can move this to our parsers file
# Make sure the UUID we are provided is valid
def parse_uuid(possible_uuid):
try:
return uuid.UUID(possible_uuid)
except ValueError:
return None
@app.route("/analysis", methods=["GET"])
def get_analysis():
# 404 if the ID is invalid or not in our database
id = parse_uuid(request.args.get("id"))
if id is None:
return abort(404)
analysis = Analysis.query.get_or_404(id)
if analysis.analysis_result is None:
return {"status": "pending"}
else:
return {"status": "done", "result": analysis.analysis_result}
API가 그렇습니다.우리는 분석할 체스의 위치를 제출한 후에 ID를 얻을 수 있습니다. 그리고 ID를 조회할 수 있습니다. 이 ID는 분석이 준비될 때까지 pending
되돌아갈 것입니다.서버를 실행하고 curl을 사용하여 다음 사항을 확인할 수 있습니다.$ curl -X POST
-H "Content-Type: application/json"
-d '{"fen": "8/8/6P1/4R3/8/6k1/2r5/6K1 b - - 0 1"}' localhost:5000/analyze
{"id":"c5b0a5d9-2427-438c-bd4a-6e9afe135763"}
$ curl "localhost:5000/analysis?id=c5b0a5d9-2427-438c-bd4a-6e9afe135763"
{"status":"pending"}
현재 유일한 문제는 체스의 위치를 진정으로 분석하는 사람이 없다는 것이다.우리는 영원히 돌아올 것이다pending
.이제 우리가 이 문제를 해결합시다.국제 장기 분석원을 창설하다
Flask는 웹 서버 이외에도 지원CLI합니다.우리는 데이터베이스에서 발견된 내용을 무한 순환으로 읽고 처리하도록 명령을 내릴 수 있다.생산 과정에서 우리는 시스템d와 같은 도구를 사용하여 노동자의 생명을 유지할 수 있다. 우리는 생성된 프로세스 수량을 제어하여 모든 기계가 병행 분석할 수 있는 위치 수량을 제어할 수 있다.
상단부터
worker
에서 app.py
명령을 내립시다.@app.cli.command("worker")
def worker():
with app.app_context():
run_worker()
그리고 우리는 새로운 문서run_worker
에서 실현할 것이다worker.py
.import time
from chessengine import analyze_position
from models import Analysis, db
# Runs forever
def run_worker():
while True:
try:
did_work = fetch_and_analyze()
# If there was nothing to analyze, sleep for a bit
# to not overload the DB
if not did_work:
time.sleep(5)
except Exception as err:
print(f"Unexpected {err=}, {type(err)=}")
def fetch_and_analyze():
# This is the SQL query we saw before
# We are looking for rows with no result yet
# and we order by created_at to take older requests first
to_be_analyzed = Analysis.query.filter(Analysis.analysis_result.is_(None)) \
.order_by(Analysis.created_at) \
.with_for_update(skip_locked=True) \
.limit(1) \
.first()
if to_be_analyzed is not None:
# Use the function we created in our first post in this series
to_be_analyzed.analysis_result = analyze_position(
fen=to_be_analyzed.fen,
num_moves_to_return=to_be_analyzed.num_moves_to_return,
depth_limit=to_be_analyzed.depth_limit,
time_limit=to_be_analyzed.time_limit
)
print("Analyzed {}".format(to_be_analyzed.id))
else:
print("Nothing to analyze")
# Do not forget to commit the transaction!
db.session.commit()
return to_be_analyzed is not None
이것이 바로 전 직원이다.대부분의 무거운 물건은 analyze_position
에 의해 완성된 것이니, 너는 이것을 사용할 수 있다모든 동기화 API를 비동기 API의 동일한 구조로 변환합니다.
새 터미널에서 Worker를 실행하여 다음을 테스트합니다.
$ source venv/bin/activate
(venv) $ flask worker
Analyzed c5b0a5d9-2427-438c-bd4a-6e9afe135763
Nothing to analyze
Nothing to analyze
그것은 우리가 전에 제기한 요구를 찾아서 분석했다.curl을 사용하여 웹 서버를 클릭하여 다음을 확인합니다.$ curl "localhost:5000/analysis?id=c5b0a5d9-2427-438c-bd4a-6e9afe135763"
{"result":[{"centipawn_score":null,"mate_score":-2,"pv":["c2c1","e5e1","c1e1"]}],"status":"done"}
우리가 본 상태는done
이고 우리의 결과는 우리가 예상한 분석이다.마무리
우리는 무거운 동기화 API를 사용하고 비동기적으로 Postgres를 작업 대기열로 사용할 수 있습니다.Google 사용자는 현재 대량의 체스 위치를 제출할 수 있으며, Google 서버도 과부하되지 않습니다.
다음 기사에서는 NextJS를 사용하여 응용 프로그램의 UI를 구성합니다.그때 봐요!
Reference
이 문제에 관하여(Postgres를 사용하여 비동기 API 만들기 - 체스 분석 응용 프로그램 구축(섹션 3)), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/propelauth/creating-an-async-api-using-postgres-building-a-chess-analysis-app-part-3-11ke텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)