moto로 SQS 대기열 모의

6999 단어 pythonawsmotoboto3
다음은 aws sqs api를 모의하는 간단한 예제 코드입니다. 이것은 개발에 유용할 수 있으므로 준비가 될 때까지 실제 SQS 대기열/연결/IAM 권한을 실제로 관리할 필요가 없습니다.

app.py



기존 큐에 메시지를 쓰는 간단한 함수

QUEUE_URL = os.environ.get('QUEUE_URL', '<default value>')
def write_message(data):
    sqs = boto3.client('sqs', region_name = 'us-east-1')
    r = sqs.send_message(
        MessageBody = json.dumps(data),
        QueueUrl = QUEUE_URL
    )


conftest.py



aws sqs API를 조롱하는 Pytest 고정 장치. aws_credentials() 또한 pytest 함수가 실제로 aws 리소스에 쓰지 않도록 합니다.

REGION='us-east-'
@pytest.fixture(scope='function')
def aws_credentials():
    """Mocked AWS Credentials for moto."""
    os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
    os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
    os.environ['AWS_SECURITY_TOKEN'] = 'testing'
    os.environ['AWS_SESSION_TOKEN'] = 'testing'


@pytest.fixture(scope='function')
def sqs_client(aws_credentials):
    # setup
    with mock_sqs():
        yield boto3.client('sqs', region_name=REGION)
    # teardown


test_sqs.py



예제 테스트 기능. conftest.py에서 모의 ​​클라이언트를 사용하여 대기열을 만들고(noticesqs_client 매개변수는 conftest 함수 이름sqs_client과 일치), 파이썬 모듈 함수app.write_message()를 호출합니다. 반환된 메시지가 보낸 메시지와 일치하는지 확인

def test_write_message(sqs_client):
    queue = sqs_client.create_queue(QueueName='test-msg-sender')
    queue_url = queue['QueueUrl']
    # override function global URL variable
    app.QUEUE_URL = queue_url
    expected_msg = str({'msg':f'this is a test'})
    app.write_message(expected_msg)
    sqs_messages = sqs_client.receive_message(QueueUrl=queue_url)

    assert json.loads(sqs_messages['Messages'][0]['Body']) == expected_msg


추가의



내 파일 구조를 보고 싶은 경우:

├── README.md
├── app.py
├── requirements.txt
├── requirements_dev.txt
└── tests
    ├── __init__.py
    ├── conftest.py
    └── unit
        ├── __init__.py
        └── test_sqs.py


고맙습니다



저를 시작하게 한 Arunkumar Muralidharan에게 찬사를 보냅니다.

좋은 웹페이지 즐겨찾기