python 連接操作rabbitMQ 主要是使用pika庫
安裝:
pip install pika==1.0.1
注意: pika 1.x 與 pika 0.x 有一些不同,使用的時候需要看清版本使用,避免踩坑
Pika是用于Python的RabbitMQ(AMQP 0-9-1)客戶端庫
注: 官方對于pika有如下介紹:
Since threads aren’t appropriate to every situation, it doesn’t require threads.
Pika core takes care not to forbid them, either.
The same goes for greenlets, callbacks, continuations, and generators.
An instance of Pika’s built-in connection adapters isn’t thread-safe, however.
線程并不適用于每種場景, 因此并不要求使用線程。 但是pika并不禁用線程, 對于
greenlets, callbacks也一樣。 一個pika建立的連接并不是線程安全的
因此在多線程中共享一個pika連接不是線程安全的, 當然也有一種使用:
with one exception: you may call the connection method add_callback_threadsafe from
another thread to schedule a callback within an active pika connection.
使用add_callback_threadsafe方法callback 一個pika連接從另外一個線程中
pika提供建立連接方式:
- pika.adapters.asyncio_connection.AsyncioConnection - 用于python 3 AsyncIO的I/O異步模式
- pika.BlockingConnection - 同步模式, 簡單易用
- pika.SelectConnection - 沒有第三方依賴包的異步模式
- pika.adapters.tornado_connection.TornadoConnection - 基于Tornado 的異步IO請求模式
- pika.adapters.twisted_connection.TwistedProtocolConnection - 基于Twisted’的異步IO請求模式
例子:
一. 最經典的’hello world’
- 生產者:
#!/usr/bin/env python
import pika
auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()
channel.queue_declare(queue='TEST01')
channel.basic_publish(exchange='',
routing_key='TEST01',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
[x] Sent 'Hello World!'
- 消費者:
#!/usr/bin/env python
import pika
auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()
channel.queue_declare(queue='TEST01')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(on_message_callback=callback,
queue='TEST01',
auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
消費者其他例子, 消費10次退出:
#!/usr/bin/env python
import pika
auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()
channel.queue_declare(queue='TEST01')
for method_frame, properties, body in channel.consume('TEST01'):
#顯示消息部分并確認消息
print method_frame, properties, body
channel.basic_ack(method_frame.delivery_tag)
#在10條消息后退出循環
if method_frame.delivery_tag == 10:
break
#取消消費者并返回任何待處理消息
requeued_messages = channel.cancel()
print 'Requeued %i messages' % requeued_messages
connection.close()
Hello World!
.....
Hello World!
Requeued 0 messages
delivery_tag 從1 到10, 等于10的時候退出并關閉通道
二. 集群連接方式
rabbitmq提供cluster集群功能,對于集群的連接與單節點連接稍微有點不一樣, 如果集群依然只是用單節點的連接方式,則pika只連接到一個節點,但節點宕機, 服務異常等服務不可用時無法實現故障轉移功能。因此需要配置多個節點,使得節點異常時,能夠在另外一個node上重連,并繼續提供服務
為了實現重試功能需設置
connection_attempts
和
retry_delay
, 重試發生在在所有給定的節點都連接失敗后
import pika
parameters = (
pika.ConnectionParameters(host='rabbitmq.node1.com'),
pika.ConnectionParameters(host='rabbitmq.node2.com',
connection_attempts=5, retry_delay=1))
connection = pika.BlockingConnection(parameters)
對于非阻塞適配器(非BlockingConnection)(如pika.selectconnection和pika.adapter s.asyncio_connection.asynciioconnection),可以通過連接適配器的create_connection()類方法使用多個連接參數實例請求連接。
三. 另外一個線程處理及確認
一個單線程的pika處理程序,當遇到一個需要很長執行時間的請求時,可能造成阻塞,其他使用者將會超時,一種解決方式是,一個線程接收到消息后,將其委托給另外一個線程進行處理, 而該線程繼續處理其他請求, 及非阻塞
因為是多線程非阻塞,那么另外一個消息的處理結果可能無法通過主線程直接處理,解決辦法是可以使用回調來實現
定義一個函數:
def ack_message(channel, delivery_tag):
"""Note that `channel` must be the same Pika channel instance via which
the message being acknowledged was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't acknowledge this message;
# log and/or do something that makes sense for your app in this case.
pass
在另外一個線程中,則連接適配器adapter需要加入這個ack_message回調處理函數
pika.BlockingConnection 是使用pika.BlockingConnection.add_callback_threadsafe() 添加的
connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))
完整的消費端程序為:
#!/usr/bin/env python
# coding=utf-8
import pika
import functools
import threading
auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()
channel.queue_declare(queue='TEST01')
def ack_message(channel, delivery_tag, body):
"""Note that `channel` must be the same Pika channel instance via which
the message being acknowledged was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
print method_frame, body
channel.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't acknowledge this message;
# log and/or do something that makes sense for your app in this case.
pass
for method_frame, properties, body in channel.consume('TEST01'):
#顯示消息部分并確認消息
connection.add_callback_threadsafe(functools.partial(ack_message, channel, method_frame.delivery_tag, body))
if method_frame.delivery_tag == 10:
break
requeued_messages = channel.cancel()
print 'Requeued %i messages' % requeued_messages
connection.close()
輸出:
Hello World!
...
Hello World!
Requeued 0 messages
pika.SelectConnection 使用add_callback_threadsafe()方法
pika.adapters.tornado_connection.TornadoConnection 使用add_callback()
pika.adapters.asyncio_connection.AsyncioConnection
使用call_soon_threadsafe()
四. 重連
對于Bunny, Java, .NET, Objective-C, Swift 的rabbitmq客戶端擁有自動重連機制, 但是對于python 客戶端 目前還沒有提供自動重連機制,需要自行實現
- while實現:
import pika
while True:
try:
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message_callback)
channel.start_consuming()
# Don't recover if connection was closed by broker
except pika.exceptions.ConnectionClosedByBroker:
break
# Don't recover on channel errors
except pika.exceptions.AMQPChannelError:
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
continue
這種方式簡單,但不夠優雅, 因為異常后,會不停地進行重試。
- retry實現
from retry import retry
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message_callback)
try:
channel.start_consuming()
# Don't recover connections closed by server
except pika.exceptions.ConnectionClosedByBroker:
pass
consume()
五. 擴展功能
擴展支持其他IO框架需要遵循下面方法:
-
通過
繼承pika.BaseConnection
,實現其抽象方法并將其構造函數傳遞給pika.adapters.utils.nbio_interface.AbstractIOService
pika.BaseConnection
實現了
pika.connection.Connection
的抽象方法,包括內部啟動的連接邏輯。可以參考
pika.adapters.asyncio_connection.AsyncioConnection
和
pika.adapters.tornado_connection.TornadoConnection
的實現
-
通過
繼承pika.connection.Connection
并實現其抽象方法
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
