파이썬과 CouchDB
CouchDB와 Python의 utilizar algunos 구성 요소를 수정하고 더 이상 사용하지 않는 proyectos의 algunos를 확인하고 CouchDB를 조작하는 구성 요소를 제거하기로 결정하고 Python utilizando aiohttp, buscando utilizar código asyncrono를 사용합니다.
CouchDB와 함께 CouchDB에 대한 튜토리얼을 시작하고, cree la base llamada albums y realicé un componente para agregar, modificar, eliminar un documento de la base de datos.
@dataclasses.dataclass
class Album:
uuid: uuid.UUID
title: str
artist: str
year: int = 1900
Después realice una clase DAO para acceder a los datos de la base.
class AlbumDAO(abc.ABC):
@abc.abstractmethod
def insert_album(album: Album) -> str:
pass
@abc.abstractmethod
def delete_album(album: Album) -> bool:
pass
@abc.abstractmethod
def find_album(album: Album) -> Album:
pass
@abc.abstractmethod
def get_all_albums() -> typing.List[Album]:
pass
La implementación para la clase anterior es el siguiente
import album_dao import AlbumDAO
import typing
from album_dao import Album
from utils import get_uuid, get_all
import dataclasses
class CouchdbAlbumDAO(AlbumDAO):
def insert_album(album: Album) -> str:
uuid = await get_uuid()
d = dataclasses.asdict(album)
return uuid
def delete_album(album: Album) -> bool:
pass
def find_album(album: Album) -> Album:
pass
def get_all_albums() -> typing.List[Album]:
return await get_all()
Despues realice una clase utils.py la cual nos permite
import aiohttp
import json
from album_dao import Album
import asyncio
HOST = "127.0.0.1"
PORT = "5984"
USER = "admin"
PWD = "password"
async def get_uuid():
async with aiohttp.ClientSession() as session:
async with session.get(f"http://{HOST}:{PORT}/_uuids") as resp:
j = json.loads(await resp.text())
return j.get("uuids", [])[0]
async def insert(uuid, json_data):
async with aiohttp.ClientSession() as session:
async with session.put(
f"http://{USER}:{PWD}@{HOST}:{PORT}/albums/{uuid}",
json=json_data
) as resp:
j = json.loads(await resp.rext())
return j
async def get_a_document(database, uuid):
async with aiohttp.ClientSession() as session:
async with session.get(
f"http://{USER}:{PWD}@{HOST}:{PORT}/{database}/{uuid}"
) as resp:
return await resp.text()
async def delete(database: str, uuid: str) -> bool:
correct = False
data = json.loads(await get_a_document(database, uuid))
print("delete -> ", data.keys())
rev = data.get("_rev")
print(rev)
async with aiohttp.ClientSession() as session:
async with session.delete(
f"http://{USER}:{PWD}@{HOST}:{PORT}/{database}/{uuid}?rev={rev}",
) as resp:
print(await resp.text())
correct = True
return correct
async def get_all():
async with aiohttp.ClientSession() as session:
l = []
result = []
async with session.get(f"http://{USER}:{PWD}@{HOST}:{PORT}/albums/_all_docs") as resp:
j = json.loads(await resp.text())
l = j.get("rows", [])
for row in l:
async with session.get(
f"http://{USER}:{PWD}@{HOST}:{PORT}/albums/{row.get('id')}"
) as resp:
j = json.loads(await resp.text())
print(j)
result.append(
Album(
uuid=j.get("_id"), title=j.get("title"),
artist=j.get("artist"), year=int(j.get("year", 1980)
)
)
)
return result
async def main():
results = await get_all()
print(results)
#await delete("albums", "df9edb8a6e29871ffc1f05fff9001571")
if __name__ == "__main__":
asyncio.run(main())
Este es el codigo inicial para para para crear un componente reutilizable para acceder a la base de datos CouchDB utilizando Python y con codigo asincrono lo cual es lo que buscaba.
Reference
이 문제에 관하여(파이썬과 CouchDB), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/olvein/python-y-couchdb-3cc8텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)