亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

python Rabbitmq編程(一)

系統(tǒng) 2129 0

python Rabbitmq編程(一)

?

?

實(shí)現(xiàn)最簡單的隊(duì)列通信

python Rabbitmq編程(一)_第1張圖片

?

send端

            
              #
            
            
              !/usr/bin/env python
            
            
              import
            
            
               pika
credentials 
            
            = pika.PlainCredentials(
            
              "用戶名
            
            
              "
            
            ,
            
              "密碼
            
            
              "
            
            
              )
connection 
            
            =
            
               pika.BlockingConnection(pika.ConnectionParameters(
    
            
            
              '
            
            
              localhost
            
            
              '
            
            ,credentials=
            
              credentials))
channel 
            
            = connection.channel() 
            
              #
            
            
              建立了rabbit協(xié)議的通道
            
            
              #
            
            
               聲明queue
            
            
channel.queue_declare(queue=
            
              '
            
            
              hello
            
            
              '
            
            
              )


            
            
              #
            
            
               n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
            
            
channel.basic_publish(exchange=
            
              ''
            
            
              ,
                      routing_key
            
            =
            
              '
            
            
              hello
            
            
              '
            
            
              ,
                      body
            
            =
            
              '
            
            
              Hello World!
            
            
              '
            
            
              )

            
            
              print
            
            (
            
              "
            
            
               [x] Sent 'Hello World!'
            
            
              "
            
            
              )
connection.close()
            
          

?

receive端

            
              #
            
            
               _*_coding:utf-8_*_
            
            
              __author__
            
             = 
            
              '
            
            
              Alex Li
            
            
              '
            
            
              import
            
            
               pika
credentials 
            
            = pika.PlainCredentials(
            
              "用戶名
            
            
              "
            
            ,
            
              "密碼
            
            
              "
            
            
              )
connection 
            
            =
            
               pika.BlockingConnection(pika.ConnectionParameters(
    
            
            
              '
            
            
              localhost
            
            
              '
            
            ,credentials=
            
              credentials))
channel 
            
            = connection.channel() 
            
              #
            
            
              建立了rabbit協(xié)議的通道
            
            
              #
            
            
               You may ask why we declare the queue again ? we have already declared it in our previous code.
            
            
              
#
            
            
               We could avoid that if we were sure that the queue already exists. For example if send.py program
            
            
              
#
            
            
               was run before. But we're not yet sure which program to run first. In such cases it's a good
            
            
              
#
            
            
               practice to repeat declaring the queue in both programs.
            
            
channel.queue_declare(queue=
            
              '
            
            
              hello
            
            
              '
            
            
              )



            
            
              def
            
            
               callback(ch, method, properties, body):
    
            
            
              print
            
            (
            
              "
            
            
               [x] Received %r
            
            
              "
            
             %
            
               body)


            
            
              #
            
            
               callback函數(shù)當(dāng)拿到隊(duì)列里的值,則調(diào)用
            
            
              channel.basic_consume(callback,
                      queue
            
            =
            
              '
            
            
              hello
            
            
              '
            
            
              ,
                      no_ack
            
            =
            
              True)


            
            
              print
            
            (
            
              '
            
            
               [*] Waiting for messages. To exit press CTRL+C
            
            
              '
            
            
              )

channel.start_consuming()
            
          

?

          #注意:
          
            遠(yuǎn)程連接rabbitmq server的話,需要配置權(quán)限。
          
          
#1.設(shè)置用戶與密碼
            
              #
            
            
               > rabbitmqctl add_user name pass
            
            
              
#
            
            
               > rabbitmqctl set_user_tags name administrator
            
          
          #2.設(shè)置權(quán)限,允許從外面訪問
        
            
              #
            
            
               rabbitmqctl set_permissions -p /name ".*" ".*" ".*"
            
          
              set_permissions [-
              
                p vhost] {user} {conf} {write} {read}

vhost
The name of the virtual host to which to grant the user access, defaulting to 
              
              /
              
                .

user
The name of the user to grant access to the specified virtual host.

conf
A regular expression matching resource names 
              
              
                for
              
               which the user 
              
                is
              
              
                 granted configure permissions.

write
A regular expression matching resource names 
              
              
                for
              
               which the user 
              
                is
              
              
                 granted write permissions.

read
A regular expression matching resource names 
              
              
                for
              
               which the user 
              
                is
              
               granted read permissions.
            
set_permissions補(bǔ)充
          #3.生產(chǎn)者與消費(fèi)者添加認(rèn)證信息
        
            credentials = pika.PlainCredentials(
            
              "用戶名
            
            
              "
            
            ,
            
              "密碼
            
            
              "
            
            )
          

?

            #為什么要聲明兩次queue,這里hello為隊(duì)列名
# channel.queue_declare(queue='hello')
# 解決發(fā)起者先啟動,而接收者還沒有啟動,發(fā)送者先創(chuàng)建queue,
# 如果發(fā)起者已經(jīng)聲明了,接收者會檢測有沒有queue,如果有了,實(shí)際接收者是不會執(zhí)行聲明的,沒有就會聲明這個(gè)queue。

          

?

消息公平分發(fā)(循環(huán)調(diào)度)

            在這種模式下,RabbitMQ會默認(rèn)把p發(fā)的消息依次分發(fā)給各個(gè)消費(fèi)者(c)。
          
            輪巡公平的發(fā)送給接收者,比如第一次發(fā)送給第一個(gè)接收者,第二次發(fā)送給第二格接受者,如此。
          

python Rabbitmq編程(一)_第2張圖片

send端

            
              import
            
            
               pika

            
            
              import
            
            
               time

credentials 
            
            = pika.PlainCredentials(
            
              "
            
            
              用戶名
            
            
              "
            
            ,
            
              "
            
            
              密碼
            
            
              "
            
            
              )
connection 
            
            =
            
               pika.BlockingConnection(pika.ConnectionParameters(
    
            
            
              '
            
            
              localhost
            
            
              '
            
            ,credentials=
            
              credentials))
channel 
            
            =
            
               connection.channel()


            
            
              #
            
            
               聲明queue
            
            
channel.queue_declare(queue=
            
              '
            
            
              task_queue
            
            
              '
            
            
              )


            
            
              #
            
            
               n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
            
            
              import
            
            
               sys

message 
            
            = 
            
              '
            
            
              '
            
            .join(sys.argv[1:]) 
            
              or
            
            
              "
            
            
              Hello World! %s
            
            
              "
            
             %
            
               time.time()
channel.basic_publish(exchange
            
            =
            
              ''
            
            
              ,
                      routing_key
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            
              ,
                      body
            
            =
            
              message,
                      properties
            
            =
            
              pika.BasicProperties(
                          delivery_mode
            
            =2,  
            
              #
            
            
               make message persistent
            
            
                                    )
                      )

            
            
              print
            
            (
            
              "
            
            
               [x] Sent %r
            
            
              "
            
             %
            
               message)
connection.close()
            
          

?

receive端

            
              #
            
            
               _*_coding:utf-8_*_
            
            
              import
            
            
               pika, time
credentials 
            
            = pika.PlainCredentials(
            
              "
            
            
              用戶名
            
            
              "
            
            ,
            
              "
            
            
              密碼
            
            
              "
            
            
              )
connection 
            
            =
            
               pika.BlockingConnection(pika.ConnectionParameters(
    
            
            
              '
            
            
              localhost
            
            
              '
            
            ,credentials=
            
              credentials))
channel 
            
            =
            
               connection.channel()



            
            
              def
            
            
               callback(ch, method, properties, body):
    
            
            
              print
            
            (
            
              "
            
            
               [x] Received %r
            
            
              "
            
             %
            
               body)
    time.sleep(
            
            20
            
              )
    
            
            
              print
            
            (
            
              "
            
            
               [x] Done
            
            
              "
            
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              method.delivery_tag
            
            
              "
            
            
              , method.delivery_tag)
    ch.basic_ack(delivery_tag
            
            =
            
              method.delivery_tag)


channel.basic_consume(callback,
                      queue
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            
              ,
                      no_ack
            
            =
            
              True
                      )


            
            
              print
            
            (
            
              '
            
            
               [*] Waiting for messages. To exit press CTRL+C
            
            
              '
            
            
              )
channel.start_consuming()
            
          

?

消息確認(rèn)

執(zhí)行任務(wù)可能需要幾秒鐘。 你可能想知道如果其中一個(gè)消費(fèi)者開始一項(xiàng)長期任務(wù)并且只是部分完成而死亡會發(fā)生什么。 使用我們當(dāng)前的代碼,一旦RabbitMQ向消費(fèi)者傳遞消息,它立即將其標(biāo)記為刪除。 在這種情況下,如果你殺死一個(gè)工人,我們將丟失它剛剛處理的消息。 我們還將丟失分發(fā)給這個(gè)特定工作者但尚未處理的所有消息。

但我們不想失去任何任務(wù)。 如果工人死亡,我們希望將任務(wù)交付給另一名工人。

為了確保消息永不丟失,RabbitMQ支持? 消息 確認(rèn) 消費(fèi)者發(fā)回ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它。

如果消費(fèi)者死亡(其通道關(guān)閉,連接關(guān)閉或TCP連接丟失)而不發(fā)送確認(rèn),RabbitMQ將理解消息未完全處理并將重新排隊(duì)。 如果同時(shí)有其他在線消費(fèi)者,則會迅速將其重新發(fā)送給其他消費(fèi)者。 這樣你就可以確保沒有消息丟失,即使工人偶爾會死亡。

沒有任何消息超時(shí);? 當(dāng)消費(fèi)者死亡時(shí),RabbitMQ將重新發(fā)送消息。 即使處理消息需要非常長的時(shí)間,也沒關(guān)系。

默認(rèn)情況下, 手動消息確認(rèn) 已打開。 在前面的示例中,我們通過 auto_ack = True ?標(biāo)志 明確地將它們關(guān)閉 在我們完成任務(wù)后,是時(shí)候刪除此標(biāo)志并從工作人員發(fā)送適當(dāng)?shù)拇_認(rèn)。

            
              def
            
            
               callback(ch, method, properties, body):
    
            
            
              print
            
            
              "
            
            
               [x] Received %r
            
            
              "
            
             %
            
               (body,)
    time.sleep( body.count(
            
            
              '
            
            
              .
            
            
              '
            
            
              ) )
    
            
            
              print
            
            
              "
            
            
               [x] Done
            
            
              "
            
            
              
    ch.basic_ack(delivery_tag 
            
            =
            
               method.delivery_tag)
 
channel.basic_consume(callback,
                      queue
            
            =
            
              '
            
            
              hello
            
            
              '
            
            )
          

  Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered

?

?

消息持久化  

我們已經(jīng)學(xué)會了如何確保即使消費(fèi)者死亡,任務(wù)也不會丟失。 但是如果RabbitMQ服務(wù)器停止,我們的任務(wù)仍然會丟失。

當(dāng)RabbitMQ退出或崩潰時(shí),它將忘記隊(duì)列和消息,除非你告訴它不要。 確保消息不會丟失需要做兩件事:我們需要將隊(duì)列和消息都標(biāo)記為持久。

首先,我們需要確保RabbitMQ永遠(yuǎn)不會丟失我們的隊(duì)列。 為此,我們需要聲明它是 持久的

            channel.queue_declare(queue=
            
              '
            
            
              hello
            
            
              '
            
            , durable=True)
          

雖然此命令本身是正確的,但它在我們的設(shè)置中不起作用。 那是因?yàn)槲覀円呀?jīng)定義了一個(gè)名為 hello 的隊(duì)列 ?, 這個(gè)隊(duì)列 不耐用。 RabbitMQ不允許您使用不同的參數(shù)重新定義現(xiàn)有隊(duì)列,并將向嘗試執(zhí)行此操作的任何程序返回錯(cuò)誤。 但是有一個(gè)快速的解決方法 - 讓我們聲明一個(gè)具有不同名稱的隊(duì)列,例如 task_queue

            channel.queue_declare(queue=
            
              '
            
            
              task_queue
            
            
              '
            
            , durable=True)
          

queue_declare 更改需要應(yīng)用于生產(chǎn)者和消費(fèi)者代碼。

此時(shí)我們確信 即使RabbitMQ重新啟動 task_queue 隊(duì)列也不會丟失。 現(xiàn)在我們需要將消息標(biāo)記為持久性 - 通過提供 值為 2 delivery_mode 屬性

            channel.basic_publish(exchange=
            
              ''
            
            
              ,
                      routing_key
            
            =
            
              "
            
            
              task_queue
            
            
              "
            
            
              ,
                      body
            
            =
            
              message,
                      properties
            
            =
            
              pika.BasicProperties(
                         delivery_mode 
            
            = 2, 
            
              #
            
            
               make message persistent
            
            
                      ))
          

?

?

負(fù)載均衡

            如果Rabbit只管按順序把消息發(fā)到各個(gè)消費(fèi)者身上,不考慮消費(fèi)者負(fù)載的話,很可能出現(xiàn),一個(gè)機(jī)器配置不高的消費(fèi)者那里堆積了很多消息處理不完,同時(shí)配置高的消費(fèi)者卻一直很輕松。為解決此問題,可以在各個(gè)消費(fèi)者端,
            
配置perfetch_count=1,意思就是告訴RabbitMQ在我這個(gè)消費(fèi)者當(dāng)前消息還沒處理完的時(shí)候就不要再給我發(fā)新消息了。

python Rabbitmq編程(一)_第3張圖片

send端

            
              #
            
            
              !/usr/bin/env python
            
            
              import
            
            
               pika

            
            
              import
            
            
               sys

connection 
            
            =
            
               pika.BlockingConnection(
    pika.ConnectionParameters(host
            
            =
            
              '
            
            
              localhost
            
            
              '
            
            
              ))
channel 
            
            =
            
               connection.channel()

channel.queue_declare(queue
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            , durable=
            
              True)

message 
            
            = 
            
              '
            
            
              '
            
            .join(sys.argv[1:]) 
            
              or
            
            
              "
            
            
              Hello World!
            
            
              "
            
            
              
channel.basic_publish(
    exchange
            
            =
            
              ''
            
            
              ,
    routing_key
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            
              ,
    body
            
            =
            
              message,
    properties
            
            =
            
              pika.BasicProperties(
        delivery_mode
            
            =2,  
            
              #
            
            
               make message persistent
            
            
                  ))

            
            
              print
            
            (
            
              "
            
            
               [x] Sent %r
            
            
              "
            
             %
            
               message)
connection.close()
            
          

?

receive端

            
              #
            
            
              !/usr/bin/env python
            
            
              import
            
            
               pika

            
            
              import
            
            
               time

connection 
            
            =
            
               pika.BlockingConnection(
    pika.ConnectionParameters(host
            
            =
            
              '
            
            
              localhost
            
            
              '
            
            
              ))
channel 
            
            =
            
               connection.channel()

channel.queue_declare(queue
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            , durable=
            
              True)

            
            
              print
            
            (
            
              '
            
            
               [*] Waiting for messages. To exit press CTRL+C
            
            
              '
            
            
              )



            
            
              def
            
            
               callback(ch, method, properties, body):
    
            
            
              print
            
            (
            
              "
            
            
               [x] Received %r
            
            
              "
            
             %
            
               body)
    time.sleep(body.count(b
            
            
              '
            
            
              .
            
            
              '
            
            
              ))
    
            
            
              print
            
            (
            
              "
            
            
               [x] Done
            
            
              "
            
            
              )
    ch.basic_ack(delivery_tag
            
            =
            
              method.delivery_tag)


channel.basic_qos(prefetch_count
            
            =1
            
              )
channel.basic_consume(queue
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            , on_message_callback=
            
              callback)

channel.start_consuming()
            
          

?


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 一本一本久久a久久综合精品蜜桃 | 国产中文字幕在线免费观看 | 亚洲啪视频 | 欧美毛片一级的免费的 | 182午夜在线观看 | 老王午夜69精品影院 | 亚洲精品视频在线看 | 在线播放91 | 欧洲成人爽视频在线观看 | 欧美日韩亚毛片免费观看 | 久久乐国产综合亚洲精品 | 99热这里只有精品3 99热这里只有精品4 | 伊人色综合久久天天网蜜月 | 久久精品国产一区二区 | 中文字幕中文字幕中中文 | 国产a级一级久久毛片 | 久久亚洲国产成人亚 | 亚洲二区在线视频 | 久久99精品国产一区二区三区 | 黄色成人毛片 | 香蕉青草久久成人网 | aaa一级最新毛片 | 午夜视频福利在线 | 在线久综合色手机在线播放 | 欧美激情高清免费不卡 | 欧美中文字幕一区 | 欧美成人国产 | 伊人这里只有精品 | 青青国产成人久久91网站站 | 99 久久99久久精品免观看 | 国产未成女年一区二区 | 噜噜狠狠| 香蕉大黄香蕉在线观看 | 曰韩毛片| 国产一级做a爱免费视频 | 国产成人a毛片在线 | 在线国产中文字幕 | 亚洲精品久久久久久久网站 | 一区二区三区在线播放视频 | 亚洲精品在线播放视频 | 变态 调教 视频 国产九色 |