python Rabbitmq編程(一)
?
?
實(shí)現(xiàn)最簡單的隊(duì)列通信
?
send端
# !/usr/bin/env python import pika credentials = pika.PlainCredentials( "用戶名 " , "密碼 " ) connection = pika.BlockingConnection(pika.ConnectionParameters( ' localhost ' ,credentials= credentials)) channel = connection.channel() # 建立了rabbit協(xié)議的通道 # 聲明queue channel.queue_declare(queue= ' hello ' ) # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange= '' , routing_key = ' hello ' , body = ' Hello World! ' ) print ( " [x] Sent 'Hello World!' " ) connection.close()
?
receive端
# _*_coding:utf-8_*_ __author__ = ' Alex Li ' import pika credentials = pika.PlainCredentials( "用戶名 " , "密碼 " ) connection = pika.BlockingConnection(pika.ConnectionParameters( ' localhost ' ,credentials= credentials)) channel = connection.channel() # 建立了rabbit協(xié)議的通道 # You may ask why we declare the queue again ? we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program # was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue= ' hello ' ) def callback(ch, method, properties, body): print ( " [x] Received %r " % body) # callback函數(shù)當(dāng)拿到隊(duì)列里的值,則調(diào)用 channel.basic_consume(callback, queue = ' hello ' , no_ack = True) print ( ' [*] Waiting for messages. To exit press CTRL+C ' ) channel.start_consuming()
?
#注意: 遠(yuǎn)程連接rabbitmq server的話,需要配置權(quán)限。
#1.設(shè)置用戶與密碼
# > rabbitmqctl add_user name pass # > rabbitmqctl set_user_tags name administrator
#2.設(shè)置權(quán)限,允許從外面訪問
# rabbitmqctl set_permissions -p /name ".*" ".*" ".*"

set_permissions [- p vhost] {user} {conf} {write} {read} vhost The name of the virtual host to which to grant the user access, defaulting to / . user The name of the user to grant access to the specified virtual host. conf A regular expression matching resource names for which the user is granted configure permissions. write A regular expression matching resource names for which the user is granted write permissions. read A regular expression matching resource names for which the user is granted read permissions.
#3.生產(chǎn)者與消費(fèi)者添加認(rèn)證信息
credentials = pika.PlainCredentials( "用戶名 " , "密碼 " )
?
#為什么要聲明兩次queue,這里hello為隊(duì)列名 # channel.queue_declare(queue='hello') # 解決發(fā)起者先啟動,而接收者還沒有啟動,發(fā)送者先創(chuàng)建queue, # 如果發(fā)起者已經(jīng)聲明了,接收者會檢測有沒有queue,如果有了,實(shí)際接收者是不會執(zhí)行聲明的,沒有就會聲明這個(gè)queue。
?
消息公平分發(fā)(循環(huán)調(diào)度)
在這種模式下,RabbitMQ會默認(rèn)把p發(fā)的消息依次分發(fā)給各個(gè)消費(fèi)者(c)。
輪巡公平的發(fā)送給接收者,比如第一次發(fā)送給第一個(gè)接收者,第二次發(fā)送給第二格接受者,如此。
send端
import pika import time credentials = pika.PlainCredentials( " 用戶名 " , " 密碼 " ) connection = pika.BlockingConnection(pika.ConnectionParameters( ' localhost ' ,credentials= credentials)) channel = connection.channel() # 聲明queue channel.queue_declare(queue= ' task_queue ' ) # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. import sys message = ' ' .join(sys.argv[1:]) or " Hello World! %s " % time.time() channel.basic_publish(exchange = '' , routing_key = ' task_queue ' , body = message, properties = pika.BasicProperties( delivery_mode =2, # make message persistent ) ) print ( " [x] Sent %r " % message) connection.close()
?
receive端
# _*_coding:utf-8_*_ import pika, time credentials = pika.PlainCredentials( " 用戶名 " , " 密碼 " ) connection = pika.BlockingConnection(pika.ConnectionParameters( ' localhost ' ,credentials= credentials)) channel = connection.channel() def callback(ch, method, properties, body): print ( " [x] Received %r " % body) time.sleep( 20 ) print ( " [x] Done " ) print ( " method.delivery_tag " , method.delivery_tag) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue = ' task_queue ' , no_ack = True ) print ( ' [*] Waiting for messages. To exit press CTRL+C ' ) channel.start_consuming()
?
消息確認(rèn)
執(zhí)行任務(wù)可能需要幾秒鐘。 你可能想知道如果其中一個(gè)消費(fèi)者開始一項(xiàng)長期任務(wù)并且只是部分完成而死亡會發(fā)生什么。 使用我們當(dāng)前的代碼,一旦RabbitMQ向消費(fèi)者傳遞消息,它立即將其標(biāo)記為刪除。 在這種情況下,如果你殺死一個(gè)工人,我們將丟失它剛剛處理的消息。 我們還將丟失分發(fā)給這個(gè)特定工作者但尚未處理的所有消息。
但我們不想失去任何任務(wù)。 如果工人死亡,我們希望將任務(wù)交付給另一名工人。
為了確保消息永不丟失,RabbitMQ支持? 消息 確認(rèn) 。 消費(fèi)者發(fā)回ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它。
如果消費(fèi)者死亡(其通道關(guān)閉,連接關(guān)閉或TCP連接丟失)而不發(fā)送確認(rèn),RabbitMQ將理解消息未完全處理并將重新排隊(duì)。 如果同時(shí)有其他在線消費(fèi)者,則會迅速將其重新發(fā)送給其他消費(fèi)者。 這樣你就可以確保沒有消息丟失,即使工人偶爾會死亡。
沒有任何消息超時(shí);? 當(dāng)消費(fèi)者死亡時(shí),RabbitMQ將重新發(fā)送消息。 即使處理消息需要非常長的時(shí)間,也沒關(guān)系。
默認(rèn)情況下, 手動消息確認(rèn) 已打開。 在前面的示例中,我們通過 auto_ack = True ?標(biāo)志 明確地將它們關(guān)閉 。 在我們完成任務(wù)后,是時(shí)候刪除此標(biāo)志并從工作人員發(fā)送適當(dāng)?shù)拇_認(rèn)。
def callback(ch, method, properties, body): print " [x] Received %r " % (body,) time.sleep( body.count( ' . ' ) ) print " [x] Done " ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue = ' hello ' )
Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered
?
?
消息持久化
我們已經(jīng)學(xué)會了如何確保即使消費(fèi)者死亡,任務(wù)也不會丟失。 但是如果RabbitMQ服務(wù)器停止,我們的任務(wù)仍然會丟失。
當(dāng)RabbitMQ退出或崩潰時(shí),它將忘記隊(duì)列和消息,除非你告訴它不要。 確保消息不會丟失需要做兩件事:我們需要將隊(duì)列和消息都標(biāo)記為持久。
首先,我們需要確保RabbitMQ永遠(yuǎn)不會丟失我們的隊(duì)列。 為此,我們需要聲明它是 持久的 :
channel.queue_declare(queue= ' hello ' , durable=True)
雖然此命令本身是正確的,但它在我們的設(shè)置中不起作用。 那是因?yàn)槲覀円呀?jīng)定義了一個(gè)名為 hello 的隊(duì)列 ?, 這個(gè)隊(duì)列 不耐用。 RabbitMQ不允許您使用不同的參數(shù)重新定義現(xiàn)有隊(duì)列,并將向嘗試執(zhí)行此操作的任何程序返回錯(cuò)誤。 但是有一個(gè)快速的解決方法 - 讓我們聲明一個(gè)具有不同名稱的隊(duì)列,例如 task_queue :
channel.queue_declare(queue= ' task_queue ' , durable=True)
此 queue_declare 更改需要應(yīng)用于生產(chǎn)者和消費(fèi)者代碼。
此時(shí)我們確信 即使RabbitMQ重新啟動 , task_queue 隊(duì)列也不會丟失。 現(xiàn)在我們需要將消息標(biāo)記為持久性 - 通過提供 值為 2 的 delivery_mode 屬性 。
channel.basic_publish(exchange= '' , routing_key = " task_queue " , body = message, properties = pika.BasicProperties( delivery_mode = 2, # make message persistent ))
?
?
負(fù)載均衡
如果Rabbit只管按順序把消息發(fā)到各個(gè)消費(fèi)者身上,不考慮消費(fèi)者負(fù)載的話,很可能出現(xiàn),一個(gè)機(jī)器配置不高的消費(fèi)者那里堆積了很多消息處理不完,同時(shí)配置高的消費(fèi)者卻一直很輕松。為解決此問題,可以在各個(gè)消費(fèi)者端,
配置perfetch_count=1,意思就是告訴RabbitMQ在我這個(gè)消費(fèi)者當(dāng)前消息還沒處理完的時(shí)候就不要再給我發(fā)新消息了。
send端
# !/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host = ' localhost ' )) channel = connection.channel() channel.queue_declare(queue = ' task_queue ' , durable= True) message = ' ' .join(sys.argv[1:]) or " Hello World! " channel.basic_publish( exchange = '' , routing_key = ' task_queue ' , body = message, properties = pika.BasicProperties( delivery_mode =2, # make message persistent )) print ( " [x] Sent %r " % message) connection.close()
?
receive端
# !/usr/bin/env python import pika import time connection = pika.BlockingConnection( pika.ConnectionParameters(host = ' localhost ' )) channel = connection.channel() channel.queue_declare(queue = ' task_queue ' , durable= True) print ( ' [*] Waiting for messages. To exit press CTRL+C ' ) def callback(ch, method, properties, body): print ( " [x] Received %r " % body) time.sleep(body.count(b ' . ' )) print ( " [x] Done " ) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count =1 ) channel.basic_consume(queue = ' task_queue ' , on_message_callback= callback) channel.start_consuming()
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
