moto로 SQS 대기열 모의
app.py
기존 큐에 메시지를 쓰는 간단한 함수
QUEUE_URL = os.environ.get('QUEUE_URL', '<default value>')
def write_message(data):
sqs = boto3.client('sqs', region_name = 'us-east-1')
r = sqs.send_message(
MessageBody = json.dumps(data),
QueueUrl = QUEUE_URL
)
conftest.py
aws sqs API를 조롱하는 Pytest 고정 장치. aws_credentials()
또한 pytest 함수가 실제로 aws 리소스에 쓰지 않도록 합니다.
REGION='us-east-'
@pytest.fixture(scope='function')
def aws_credentials():
"""Mocked AWS Credentials for moto."""
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
os.environ['AWS_SECURITY_TOKEN'] = 'testing'
os.environ['AWS_SESSION_TOKEN'] = 'testing'
@pytest.fixture(scope='function')
def sqs_client(aws_credentials):
# setup
with mock_sqs():
yield boto3.client('sqs', region_name=REGION)
# teardown
test_sqs.py
예제 테스트 기능. conftest.py에서 모의 클라이언트를 사용하여 대기열을 만들고(noticesqs_client
매개변수는 conftest 함수 이름sqs_client
과 일치), 파이썬 모듈 함수app.write_message()
를 호출합니다. 반환된 메시지가 보낸 메시지와 일치하는지 확인
def test_write_message(sqs_client):
queue = sqs_client.create_queue(QueueName='test-msg-sender')
queue_url = queue['QueueUrl']
# override function global URL variable
app.QUEUE_URL = queue_url
expected_msg = str({'msg':f'this is a test'})
app.write_message(expected_msg)
sqs_messages = sqs_client.receive_message(QueueUrl=queue_url)
assert json.loads(sqs_messages['Messages'][0]['Body']) == expected_msg
추가의
내 파일 구조를 보고 싶은 경우:
├── README.md
├── app.py
├── requirements.txt
├── requirements_dev.txt
└── tests
├── __init__.py
├── conftest.py
└── unit
├── __init__.py
└── test_sqs.py
고맙습니다
저를 시작하게 한 Arunkumar Muralidharan에게 찬사를 보냅니다.
Reference
이 문제에 관하여(moto로 SQS 대기열 모의), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/drewmullen/mock-a-sqs-queue-with-moto-4ppd
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
QUEUE_URL = os.environ.get('QUEUE_URL', '<default value>')
def write_message(data):
sqs = boto3.client('sqs', region_name = 'us-east-1')
r = sqs.send_message(
MessageBody = json.dumps(data),
QueueUrl = QUEUE_URL
)
aws sqs API를 조롱하는 Pytest 고정 장치.
aws_credentials()
또한 pytest 함수가 실제로 aws 리소스에 쓰지 않도록 합니다.REGION='us-east-'
@pytest.fixture(scope='function')
def aws_credentials():
"""Mocked AWS Credentials for moto."""
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
os.environ['AWS_SECURITY_TOKEN'] = 'testing'
os.environ['AWS_SESSION_TOKEN'] = 'testing'
@pytest.fixture(scope='function')
def sqs_client(aws_credentials):
# setup
with mock_sqs():
yield boto3.client('sqs', region_name=REGION)
# teardown
test_sqs.py
예제 테스트 기능. conftest.py에서 모의 클라이언트를 사용하여 대기열을 만들고(noticesqs_client
매개변수는 conftest 함수 이름sqs_client
과 일치), 파이썬 모듈 함수app.write_message()
를 호출합니다. 반환된 메시지가 보낸 메시지와 일치하는지 확인
def test_write_message(sqs_client):
queue = sqs_client.create_queue(QueueName='test-msg-sender')
queue_url = queue['QueueUrl']
# override function global URL variable
app.QUEUE_URL = queue_url
expected_msg = str({'msg':f'this is a test'})
app.write_message(expected_msg)
sqs_messages = sqs_client.receive_message(QueueUrl=queue_url)
assert json.loads(sqs_messages['Messages'][0]['Body']) == expected_msg
추가의
내 파일 구조를 보고 싶은 경우:
├── README.md
├── app.py
├── requirements.txt
├── requirements_dev.txt
└── tests
├── __init__.py
├── conftest.py
└── unit
├── __init__.py
└── test_sqs.py
고맙습니다
저를 시작하게 한 Arunkumar Muralidharan에게 찬사를 보냅니다.
Reference
이 문제에 관하여(moto로 SQS 대기열 모의), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/drewmullen/mock-a-sqs-queue-with-moto-4ppd
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
def test_write_message(sqs_client):
queue = sqs_client.create_queue(QueueName='test-msg-sender')
queue_url = queue['QueueUrl']
# override function global URL variable
app.QUEUE_URL = queue_url
expected_msg = str({'msg':f'this is a test'})
app.write_message(expected_msg)
sqs_messages = sqs_client.receive_message(QueueUrl=queue_url)
assert json.loads(sqs_messages['Messages'][0]['Body']) == expected_msg
├── README.md
├── app.py
├── requirements.txt
├── requirements_dev.txt
└── tests
├── __init__.py
├── conftest.py
└── unit
├── __init__.py
└── test_sqs.py
Reference
이 문제에 관하여(moto로 SQS 대기열 모의), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/drewmullen/mock-a-sqs-queue-with-moto-4ppd텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)