亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

python使用pika操作rabbitmq總結(一)

系統 1875 0

python 連接操作rabbitMQ 主要是使用pika庫

安裝:

            
              pip install pika==1.0.1

            
          

注意: pika 1.x 與 pika 0.x 有一些不同,使用的時候需要看清版本使用,避免踩坑

Pika是用于Python的RabbitMQ(AMQP 0-9-1)客戶端庫

注: 官方對于pika有如下介紹:

            
              Since threads aren’t appropriate to every situation, it doesn’t require threads. 
Pika core takes care not to forbid them, either. 
The same goes for greenlets, callbacks, continuations, and generators.
 An instance of Pika’s built-in  connection adapters isn’t thread-safe, however.

            
          

線程并不適用于每種場景, 因此并不要求使用線程。 但是pika并不禁用線程, 對于

greenlets, callbacks也一樣。 一個pika建立的連接并不是線程安全的

因此在多線程中共享一個pika連接不是線程安全的, 當然也有一種使用:

            
              with one exception: you may call the connection method add_callback_threadsafe from
 another thread to schedule a callback within an active pika connection.

            
          

使用add_callback_threadsafe方法callback 一個pika連接從另外一個線程中

pika提供建立連接方式:

  1. pika.adapters.asyncio_connection.AsyncioConnection - 用于python 3 AsyncIO的I/O異步模式
  2. pika.BlockingConnection - 同步模式, 簡單易用
  3. pika.SelectConnection - 沒有第三方依賴包的異步模式
  4. pika.adapters.tornado_connection.TornadoConnection - 基于Tornado 的異步IO請求模式
  5. pika.adapters.twisted_connection.TwistedProtocolConnection - 基于Twisted’的異步IO請求模式

例子:

一. 最經典的’hello world’

  1. 生產者:
            
              #!/usr/bin/env python
import pika

auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()

channel.queue_declare(queue='TEST01')

channel.basic_publish(exchange='',
                      routing_key='TEST01',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

            
          
            
               [x] Sent 'Hello World!'

            
          
  1. 消費者:
            
              #!/usr/bin/env python
import pika

auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()

channel.queue_declare(queue='TEST01')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(on_message_callback=callback,
                      queue='TEST01',
                      auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

            
          
            
               [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

            
          

消費者其他例子, 消費10次退出:

            
              #!/usr/bin/env python
import pika

auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()

channel.queue_declare(queue='TEST01')

for method_frame, properties, body in channel.consume('TEST01'):
    #顯示消息部分并確認消息
    print method_frame, properties, body
    channel.basic_ack(method_frame.delivery_tag)
    #在10條消息后退出循環
    if method_frame.delivery_tag == 10:
        break
#取消消費者并返回任何待處理消息
requeued_messages = channel.cancel()
print 'Requeued %i messages' % requeued_messages
connection.close()

            
          
            
              
                
                   Hello World!
.....

                  
                    
                       Hello World!
Requeued 0 messages

                    
                  
                
              
            
          

delivery_tag 從1 到10, 等于10的時候退出并關閉通道

二. 集群連接方式

rabbitmq提供cluster集群功能,對于集群的連接與單節點連接稍微有點不一樣, 如果集群依然只是用單節點的連接方式,則pika只連接到一個節點,但節點宕機, 服務異常等服務不可用時無法實現故障轉移功能。因此需要配置多個節點,使得節點異常時,能夠在另外一個node上重連,并繼續提供服務

為了實現重試功能需設置 connection_attempts retry_delay , 重試發生在在所有給定的節點都連接失敗后

            
              import pika

parameters = (
    pika.ConnectionParameters(host='rabbitmq.node1.com'),
    pika.ConnectionParameters(host='rabbitmq.node2.com',
                              connection_attempts=5, retry_delay=1))
connection = pika.BlockingConnection(parameters)

            
          

對于非阻塞適配器(非BlockingConnection)(如pika.selectconnection和pika.adapter s.asyncio_connection.asynciioconnection),可以通過連接適配器的create_connection()類方法使用多個連接參數實例請求連接。

三. 另外一個線程處理及確認

一個單線程的pika處理程序,當遇到一個需要很長執行時間的請求時,可能造成阻塞,其他使用者將會超時,一種解決方式是,一個線程接收到消息后,將其委托給另外一個線程進行處理, 而該線程繼續處理其他請求, 及非阻塞

因為是多線程非阻塞,那么另外一個消息的處理結果可能無法通過主線程直接處理,解決辦法是可以使用回調來實現

定義一個函數:

            
              def ack_message(channel, delivery_tag):
    """Note that `channel` must be the same Pika channel instance via which
    the message being acknowledged was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't acknowledge this message;
        # log and/or do something that makes sense for your app in this case.
        pass

            
          

在另外一個線程中,則連接適配器adapter需要加入這個ack_message回調處理函數

pika.BlockingConnection 是使用pika.BlockingConnection.add_callback_threadsafe() 添加的

            
              connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))

            
          

完整的消費端程序為:

            
              #!/usr/bin/env python
# coding=utf-8
import pika
import functools
import threading

auth = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=8000))
channel = connection.channel()

channel.queue_declare(queue='TEST01')


def ack_message(channel, delivery_tag, body):
    """Note that `channel` must be the same Pika channel instance via which
    the message being acknowledged was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        print method_frame, body
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't acknowledge this message;
        # log and/or do something that makes sense for your app in this case.
        pass

for method_frame, properties, body in channel.consume('TEST01'):
    #顯示消息部分并確認消息
    connection.add_callback_threadsafe(functools.partial(ack_message, channel, method_frame.delivery_tag, body))
    
    if method_frame.delivery_tag == 10:
        break

requeued_messages = channel.cancel()
print 'Requeued %i messages' % requeued_messages
connection.close()

            
          

輸出:

            
              
                 Hello World!
...

                
                   Hello World!
Requeued 0 messages

                
              
            
          

pika.SelectConnection 使用add_callback_threadsafe()方法

pika.adapters.tornado_connection.TornadoConnection 使用add_callback()

pika.adapters.asyncio_connection.AsyncioConnection

使用call_soon_threadsafe()

四. 重連

對于Bunny, Java, .NET, Objective-C, Swift 的rabbitmq客戶端擁有自動重連機制, 但是對于python 客戶端 目前還沒有提供自動重連機制,需要自行實現

  1. while實現:
            
              import pika

while True:
    try:
        connection = pika.BlockingConnection()
        channel = connection.channel()
        channel.basic_consume('test', on_message_callback)
        channel.start_consuming()
    # Don't recover if connection was closed by broker
    except pika.exceptions.ConnectionClosedByBroker:
        break
    # Don't recover on channel errors
    except pika.exceptions.AMQPChannelError:
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError:
        continue

            
          

這種方式簡單,但不夠優雅, 因為異常后,會不停地進行重試。

  1. retry實現
            
              from retry import retry

@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
    connection = pika.BlockingConnection()
    channel = connection.channel()
    channel.basic_consume('test', on_message_callback)

    try:
        channel.start_consuming()
    # Don't recover connections closed by server
    except pika.exceptions.ConnectionClosedByBroker:
        pass

consume()

            
          

五. 擴展功能

擴展支持其他IO框架需要遵循下面方法:

  1. 通過 繼承pika.BaseConnection ,實現其抽象方法并將其構造函數傳遞給 pika.adapters.utils.nbio_interface.AbstractIOService

pika.BaseConnection 實現了 pika.connection.Connection 的抽象方法,包括內部啟動的連接邏輯。可以參考 pika.adapters.asyncio_connection.AsyncioConnection pika.adapters.tornado_connection.TornadoConnection 的實現

  1. 通過 繼承pika.connection.Connection 并實現其抽象方法

更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 色综合久久综合欧美综合图片 | 狠狠婷| 日韩欧美国产偷亚洲清高 | 我要看欧美精品一级毛片 | 特级按摩一级毛片 | 91精品国产高清久久久久久io | 一区二区三区精品视频 | 一区二区三区精品国产 | 国产精品美女免费视频大全 | 国产精品香蕉在线观看不卡 | 国产精品亚洲综合一区在线观看 | 欧美ⅹxxxx18性欧美 | 国产亚洲精品精品国产亚洲综合 | 91欧美亚洲| 久久国产亚洲欧美日韩精品 | 欧美成人高清免费大片观看 | 久久婷婷丁香七月色综合 | 久久这| 天天影视欧美综合在线观看 | 亚洲精品国产精品国自产观看 | 日韩免费一级毛片欧美一级日韩片 | 日韩欧美精品一区二区三区 | 国产精品一区二区四区 | 男人的天堂久久精品激情 | 久久不射影院 | 久久久久精彩视频 | 2020国产成人精品免费视频 | 欧美成人毛片 | 亚洲国产成人久久综合一 | 97av在线| 久久99精品久久 | 99一级毛片| 久久精品*5在热 | 国产黄mmd在线观看免费 | 国产亚洲欧美另类一区二区三区 | 色婷婷国产 | 免费国产视频 | 亚洲国产爱 | 成人性a激情免费视频 | 99亚洲精品高清一二区 | 亚洲久草 |