python RabbitMQ 메시지 큐 구현 예제 코드
base.py:
import pika
# , 、 。
credentials = pika.PlainCredentials("admin", "admin")
# BlockingConnection():
# ConnectionParameters():
connection = pika.BlockingConnection(pika.ConnectionParameters(
"192.168.0.102", 5672, "/", credentials))
# channel( )
channel = connection.channel()
fanout 모드:지정 한 exchange 에 연 결 된 quue 에 메 시 지 를 보 냅 니 다.소비 자 는 quue 에서 데 이 터 를 꺼 내 방송 모드,구독 모드 와 유사 합 니 다.귀속 방식:수신 단 채널.queuebind(exchange="logs", queue=queue_name)
코드:
publisher.py:
from base import channel, connection
# exchange, queue
channel.exchange_declare(exchange="logs", exchange_type="fanout") #
message = "hello fanout"
channel.basic_publish(
exchange="logs",
routing_key="",
body=message
)
connection.close()
consumer.py:
from base import channel, connection
# exchange
channel.exchange_declare(exchange="logs", exchange_type="fanout")
# queue , rabbitmq , queue
result = channel.queue_declare(exclusive=True)
# queue
queue_name = result.method.queue
# exchange queue
channel.queue_bind(exchange="logs", queue=queue_name)
def callback(ch, method, properties, body):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name
)
channel.start_consuming()
direct 모드:송신 단 에 routing 을 연결 합 니 다.key 1,queue 에 몇 개의 routing 바 인 딩key 2,key 1 이 key 2 와 같 거나 key 1 이 key 2 에 있 으 면 메 시 지 는 이 quue 에 보 내 고 해당 소비자 가 quue 에서 데 이 터 를 찾 습 니 다.publisher.py:
from base import channel, connection
channel.exchange_declare(exchange="direct_test", exchange_type="direct")
message = "hello"
channel.basic_publish(
exchange="direct_test",
routing_key="info", # key
body=message
)
connection.close()
consumer01.py:
from base import channel, connection
channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(
exchange="direct_test",
queue=queue_name,
# key, publisher
routing_key="info"
)
def callback(ch, method, properties, body):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name
)
channel.start_consuming()
consumer02.py:
from base import channel, connection
channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(
exchange="direct_test",
queue=queue_name,
# key
routing_key="error"
)
def callback(ch, method, properties, bosy):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name
)
channel.start_consuming()
consumer03.py:
from base import channel, connection
channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
key_list = ["info", "warning"]
for key in key_list:
channel.queue_bind(
exchange="direct_test",
queue=queue_name,
# queue key, key
routing_key=key
)
def callback(ch, method, properties, body):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name
)
channel.start_consuming()
실행:
python producer.py
python consumer01.py
python consumer02.py
python consumer03.py
결과:consumer01.py: body:b'hello'
consumer 02.py 결과 못 받았어요.
consumer03.py: body:b'hello'
topic 모드 는 이해 하기 가 쉽 지 않 습 니 다.제 이 해 는 다음 과 같 습 니 다.
송신 단 에 연 결 된 routingkey 1,queue 는 몇 개의 routing 을 연결 합 니 다.key2;약 routingkey 1 임의의 routing 만족key 2,이 메 시 지 는 exchange 를 통 해 이 quue 에 보 내 고 수신 단 에서 quue 에서 꺼 내 는 것 이 direct 모드 의 확장 입 니 다.
귀속 방식:
발송 단 귀속:
channel.basic_publish(
exchange="topic_logs",
routing_key=routing_key,
body=message
)
수신 단 귀속:
channel.queue_bind(
exchange="topic_logs",
queue=queue_name,
routing_key=binding_key
)
publisher.py:
import sys
from base import channel, connection
# exchange
channel.exchange_declare(exchange="topic_test", exchange_type="topic")
#
message = " ".join(sys.argv[1:]) or "hello topic"
#
channel.basic_publish(
exchange="topic_test",
routing_key="mysql.error", # routing_key
body=message
)
connection.close()
consumer01.py:
from base import channel, connection
channel.exchange_declare(exchange="topic_test", exchange_type="topic")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(
exchange="topic_test",
queue=queue_name,
routing_key="*.error" # routing_key
)
def callback(ch, method, properties, body):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name,
no_ack=True
)
channel.start_consuming()
consumer02.py:
from base import channel, connection
channel.exchange_declare(exchange="topic_test", exchange_type="topic")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(
exchange="topic_test",
queue=queue_name,
routing_key="mysql.*" # routing_key
)
def callback(ch, method, properties, body):
print("body:%s" % body)
channel.basic_consume(
callback,
queue=queue_name,
no_ack=True
)
channel.start_consuming()
실행:
python publisher02.py "this is a topic test"
python consumer01.py
python consumer02.py
결과:consumer 01.py 결과:body:b'this is a topic test'
consumer 02.py 결과:body:b'this is a topic test'
해당 routing 바 인 딩 을 통 해키,두 소비자 모두 소식 을 들 었 습 니 다.
publisher.py 의 routingkey"mysql.info"로 변경
다음 실행:
python publisher02.py "this is a topic test"
python consumer01.py
python consumer02.py
결과:consumer 01.py 결과 못 받았어요.
consumer 02.py 결과:body:b'this is a topic test'
이 예 를 통 해 우 리 는 topic 의 운행 방식 을 알 수 있다.
참고:https://www.jb51.net/article/150386.htm
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
로마 숫자를 정수로 또는 그 반대로 변환그 중 하나는 로마 숫자를 정수로 변환하는 함수를 만드는 것이었고 두 번째는 그 반대를 수행하는 함수를 만드는 것이었습니다. 문자만 포함합니다'I', 'V', 'X', 'L', 'C', 'D', 'M' ; 문자열이 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.