亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Python 操作 Rabbit MQ 路由 (六)

系統 1689 0

Python 操作 Rabbit MQ 路由 (六)

一、路由(Routing):

本章打算新增加一個功能,使它可以達到僅訂閱消息的一個子集。

舉個栗子,我們需要把驗證的錯誤日志信息寫入日志文件(存儲到磁盤),但同時仍然把所有的日志信息輸出到控制臺中。

二、綁定(Bindings):

綁定(Binding)是指交換機(Exchange)和隊列(Queue)的關系 ;

綁定的時候可以帶上一個額外的routing_key參數。為了避免與basic_publish的參數混淆,我們把它叫做 綁定鍵(Binding Key) 。

            
              channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              exchange_name
              
                ,
              
              
                   queue
              
                =
              
              queue_name
              
                ,
              
              
                   routing_key
              
                =
              
              
                'fe_cow'
              
              
                )
              
            
          

注意

  • 綁定鍵的意義取決于交換機的類型,上一篇使用的 扇形交換機會忽略這個值 。

三、直連交換機(Direct Exchange):

打算擴展上一篇的功能,使其基于日志的嚴重程度進行消息過濾。比如把一些比較嚴重的錯誤日志寫入磁盤,以免將警告或信息日志上浪費磁盤空間,僅記錄比較嚴重的錯誤。

上一篇我們使用的扇形交換機,沒有足夠的靈活性,僅能做廣播的需求;

直連交換機的路由算法很簡單易懂, 交換機將會對綁定鍵和路由鍵進行精準匹配,從而確定消息該分發到哪個隊列 ;

圖解

Python 操作 Rabbit MQ 路由 (六)_第1張圖片

詳解 :可以看到type=direct,指定交換機的類型為直連交換機,它和兩個隊列都進行了綁定。第一個隊列(Q1)使用orange作為綁定鍵,第二個隊列(Q2)有兩個綁定鍵,一個是black作為綁定鍵,另一個是green綁定鍵。

簡單理解 :當路由鍵為orange的消息發布到交換機,就會被路由到隊列Q1中,路由鍵為black或green的消息發布到交換機,就會被路由到隊列Q2中,其他的所有消息都將被丟棄。

四、多個綁定:

圖解

Python 操作 Rabbit MQ 路由 (六)_第2張圖片

詳解 :多個隊列使用相同的綁定鍵是合法的。這樣一來,直連交換機就和扇形交換機的行為一樣,會將消息廣播到所有匹配的隊列(Q1和Q2)。

五、發送日志:

將發送消息到一個直連的交換機,把日志級別作為路由鍵,這樣接收日志的腳本可根據嚴重的級別來選擇它想要處理的日志,我們假設 severity的值是info、warning、error中的一個

創建直連交換機:

            
              # 交換機名為
              
                :
              
              direct_logs  類型為
              
                :
              
              直連交換機
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               type
              
                =
              
              
                'direct'
              
              
                )
              
            
          

發送消息:

            
              # 交換機名稱為
              
                :
              
              direct_logs
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               routing_key
              
                =
              
              severity
              
                ,
              
               body
              
                =
              
              message
              
                )
              
            
          

六、訂閱:

處理接收消息的方式跟之前不一樣,將會為每個嚴重級別分別創建一個新的綁定。

            
              # 表示與消費者斷開連接
              
                ,
              
               隊列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              

# 生成隊列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue


              
                for
              
               severity 
              
                in
              
               severities
              
                :
              
              
    channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
              
                       queue
              
                =
              
              queue_name
              
                ,
              
              
                       routing_key
              
                =
              
              severity
              
                )
              
            
          

七、整理本節代碼:

圖解
Python 操作 Rabbit MQ 路由 (六)_第3張圖片
1.以下是 send.py 代碼:

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               sys

severity 
              
                =
              
               sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                ]
              
              
                if
              
              
                len
              
              
                (
              
              sys
              
                .
              
              argv
              
                )
              
              
                >
              
              
                1
              
              
                else
              
              
                'info'
              
              
message 
              
                =
              
              
                ' '
              
              
                .
              
              
                join
              
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                2
              
              
                :
              
              
                ]
              
              
                )
              
               or 
              
                "Hello World!"
              
              

# 創建一個實例  本地訪問
              
                IP
              
              地址可以為 localhost 
              
                后面5672是端口地址
              
              
                (
              
                >
              
              以不用指定
              
                ,
              
               因為默認就是
              
                5672
              
              
                )
              
              
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                ,
              
              
                5672
              
              
                )
              
              
                )
              
              

# 聲明一個管道
              
                ,
              
               在管道里發送消息
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機的類型為direct
              
                :
              
               執行交換機    交換機名稱
              
                :
              
               direct_logs
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'direct'
              
              
                )
              
              

# 投遞消息
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
              
                      routing_key
              
                =
              
              severity
              
                ,
              
              
                      body
              
                =
              
              message
                      
              
                )
              
              

print 
              
                "[x] sent {}"
              
              
                .
              
              
                format
              
              
                (
              
              severity
              
                ,
              
               message
              
                )
              
              
# 隊列關閉
connection
              
                .
              
              
                close
              
              
                (
              
              
                )
              
            
          

2.以下是 receive.py 代碼:

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               sys

# 創建實例
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                )
              
              
                )
              
              

# 聲明管道
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機名為 direct_logs 交換機類型為
              
                :
              
              direct
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'direct'
              
              
                )
              
              

# 表示與消費者斷開連接
              
                ,
              
               隊列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              

# 生成隊列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue

severities 
              
                =
              
               sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                :
              
              
                ]
              
              
                if
              
               not severities
              
                :
              
              
    print 
              
                >>
              
               sys
              
                .
              
              stderr
              
                ,
              
              
                "Usage: %s [info] [warning] [error]"
              
              
                %
              
               \
                         
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                0
              
              
                ]
              
              
                ,
              
              
                )
              
              
    sys
              
                .
              
              
                exit
              
              
                (
              
              
                1
              
              
                )
              
              
                for
              
               severitie 
              
                in
              
               severities
              
                :
              
              
    # 綁定交換機和隊列  這里注意的是綁定鍵
              
                ,
              
               就是根據按照指定嚴重級別進行記錄日志
    channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               queue
              
                =
              
              queue_name
              
                ,
              
               routing_key
              
                =
              
              severitie
              
                )
              
              


def 
              
                callback
              
              
                (
              
              ch
              
                ,
              
               method
              
                ,
              
               properties
              
                ,
              
               body
              
                )
              
              
                :
              
              
    print 
              
                '[X] Received{}'
              
              
                .
              
              
                format
              
              
                (
              
              body
              
                ,
              
              
                )
              
              


# 消費消息
channel
              
                .
              
              
                basic_consume
              
              
                (
              
              queue
              
                =
              
              queue_name
              
                ,
              
                # 從指定的消息隊列中接收消息
                      on_message_callback
              
                =
              
              callback
              
                ,
              
                # 如果收到消息
              
                ,
              
               就調用callback函數來處理
                      
              
                )
              
              
                print
              
              
                (
              
              
                '=======正在等待消息========'
              
              
                )
              
              
channel
              
                .
              
              
                start_consuming
              
              
                (
              
              
                )
              
                # 開始消費消息

            
          

3.僅希望保存 warning和error級別 日志到磁盤中,需要打開控制臺并輸入:

            
              python receive
              
                .
              
              py warning error 
              
                >
              
               logs_from_rabbit
              
                .
              
              log

            
          

4.希望所有日志都輸出到屏幕中,打開一個新的終端,輸入:

            
              python receive
              
                .
              
              py info warning error

            
          

5.要觸發一個 error 級別的日志,需要輸入:

            
              python send
              
                .
              
              py error 
              
                '發送一個error級別的錯誤'
              
            
          
            
              # 可以看到步驟
              
                3
              
              的控制臺
              
                ,
              
               會出現
              
                :
              
              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
              
                [
              
              
                X
              
              
                ]
              
               Received發送一個error級別的錯誤

            
          

簡單理解:通過綁定鍵的名稱,來進行由哪個隊列進行處理


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!?。?/p>

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 亚洲日本va中文字幕婷婷 | 欧美亚洲黄色 | 欧美大色网 | 国产成人免费午夜性视频 | 久久精品中文字幕一区 | 精品在线99 | 国产精品美女流白浆视频 | 久久精品国产一区二区三区不卡 | 国产日韩在线播放 | 狠狠狠地在啪线香蕉 | 欧美在线一区二区 | 亚洲色吧 | 久久九九热视频 | 国产成人一区二区三区免费观看 | 日日噜噜夜夜狠狠久久丁香 | 伊人久久伊人 | 日韩亚洲欧美一区二区三区 | 成年人免费在线视频 | 日韩成人免费一级毛片 | 国内精品久久久久久 | 欧美精品1区| 亚洲一区二区在线免费观看 | 农村女人十八毛片a级毛片 农村三级孕妇视频在线 | 国产91在线 | 欧美 | 在线99| 九九精品视频一区二区三区 | 国产精品综合一区二区三区 | 精品国产调教最大网站女王 | 色综合视频一区二区三区 | 亚洲欧美日本视频 | 欧美激情 亚洲 | 欧美亚洲日本在线 | 日韩美一区二区 | 色的综合 | 久久亚洲精品玖玖玖玖 | 久一视频在线观看 | 色综合在| 99热在线获取最新地址 | 国产精品婷婷久青青原 | 人人爱人人做 | 中文字幕一区在线播放 |