Python 操作 Rabbit MQ 路由 (六)
一、路由(Routing):
本章打算新增加一個功能,使它可以達到僅訂閱消息的一個子集。
舉個栗子,我們需要把驗證的錯誤日志信息寫入日志文件(存儲到磁盤),但同時仍然把所有的日志信息輸出到控制臺中。
二、綁定(Bindings):
綁定(Binding)是指交換機(Exchange)和隊列(Queue)的關系
;
綁定的時候可以帶上一個額外的routing_key參數。為了避免與basic_publish的參數混淆,我們把它叫做
綁定鍵(Binding Key)
。
channel
.
queue_bind
(
exchange
=
exchange_name
,
queue
=
queue_name
,
routing_key
=
'fe_cow'
)
注意 :
-
綁定鍵的意義取決于交換機的類型,上一篇使用的
扇形交換機會忽略這個值
。
三、直連交換機(Direct Exchange):
打算擴展上一篇的功能,使其基于日志的嚴重程度進行消息過濾。比如把一些比較嚴重的錯誤日志寫入磁盤,以免將警告或信息日志上浪費磁盤空間,僅記錄比較嚴重的錯誤。
上一篇我們使用的扇形交換機,沒有足夠的靈活性,僅能做廣播的需求;
直連交換機的路由算法很簡單易懂,
交換機將會對綁定鍵和路由鍵進行精準匹配,從而確定消息該分發到哪個隊列
;
圖解 :
詳解 :可以看到type=direct,指定交換機的類型為直連交換機,它和兩個隊列都進行了綁定。第一個隊列(Q1)使用orange作為綁定鍵,第二個隊列(Q2)有兩個綁定鍵,一個是black作為綁定鍵,另一個是green綁定鍵。
簡單理解 :當路由鍵為orange的消息發布到交換機,就會被路由到隊列Q1中,路由鍵為black或green的消息發布到交換機,就會被路由到隊列Q2中,其他的所有消息都將被丟棄。
四、多個綁定:
圖解 :
詳解 :多個隊列使用相同的綁定鍵是合法的。這樣一來,直連交換機就和扇形交換機的行為一樣,會將消息廣播到所有匹配的隊列(Q1和Q2)。
五、發送日志:
將發送消息到一個直連的交換機,把日志級別作為路由鍵,這樣接收日志的腳本可根據嚴重的級別來選擇它想要處理的日志,我們假設
severity的值是info、warning、error中的一個
。
創建直連交換機:
# 交換機名為
:
direct_logs 類型為
:
直連交換機
channel
.
exchange_declare
(
exchange
=
'direct_logs'
,
type
=
'direct'
)
發送消息:
# 交換機名稱為
:
direct_logs
channel
.
basic_publish
(
exchange
=
'direct_logs'
,
routing_key
=
severity
,
body
=
message
)
六、訂閱:
處理接收消息的方式跟之前不一樣,將會為每個嚴重級別分別創建一個新的綁定。
# 表示與消費者斷開連接
,
隊列立即刪除
result
=
channel
.
queue_declare
(
queue
=
''
,
exclusive
=
True
)
# 生成隊列的名字
queue_name
=
result
.
method
.
queue
for
severity
in
severities
:
channel
.
queue_bind
(
exchange
=
'direct_logs'
,
queue
=
queue_name
,
routing_key
=
severity
)
七、整理本節代碼:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
severity
=
sys
.
argv
[
1
]
if
len
(
sys
.
argv
)
>
1
else
'info'
message
=
' '
.
join
(
sys
.
argv
[
2
:
]
)
or
"Hello World!"
# 創建一個實例 本地訪問
IP
地址可以為 localhost
后面5672是端口地址
(
可
>
以不用指定
,
因為默認就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 聲明一個管道
,
在管道里發送消息
channel
=
connection
.
channel
(
)
# 指定交換機的類型為direct
:
執行交換機 交換機名稱
:
direct_logs
channel
.
exchange_declare
(
exchange
=
'direct_logs'
,
exchange_type
=
'direct'
)
# 投遞消息
channel
.
basic_publish
(
exchange
=
'direct_logs'
,
routing_key
=
severity
,
body
=
message
)
print
"[x] sent {}"
.
format
(
severity
,
message
)
# 隊列關閉
connection
.
close
(
)
2.以下是
receive.py
代碼:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
# 創建實例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 聲明管道
channel
=
connection
.
channel
(
)
# 指定交換機名為 direct_logs 交換機類型為
:
direct
channel
.
exchange_declare
(
exchange
=
'direct_logs'
,
exchange_type
=
'direct'
)
# 表示與消費者斷開連接
,
隊列立即刪除
result
=
channel
.
queue_declare
(
queue
=
''
,
exclusive
=
True
)
# 生成隊列的名字
queue_name
=
result
.
method
.
queue
severities
=
sys
.
argv
[
1
:
]
if
not severities
:
print
>>
sys
.
stderr
,
"Usage: %s [info] [warning] [error]"
%
\
(
sys
.
argv
[
0
]
,
)
sys
.
exit
(
1
)
for
severitie
in
severities
:
# 綁定交換機和隊列 這里注意的是綁定鍵
,
就是根據按照指定嚴重級別進行記錄日志
channel
.
queue_bind
(
exchange
=
'direct_logs'
,
queue
=
queue_name
,
routing_key
=
severitie
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
'[X] Received{}'
.
format
(
body
,
)
# 消費消息
channel
.
basic_consume
(
queue
=
queue_name
,
# 從指定的消息隊列中接收消息
on_message_callback
=
callback
,
# 如果收到消息
,
就調用callback函數來處理
)
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 開始消費消息
3.僅希望保存
warning和error級別
日志到磁盤中,需要打開控制臺并輸入:
python receive
.
py warning error
>
logs_from_rabbit
.
log
4.希望所有日志都輸出到屏幕中,打開一個新的終端,輸入:
python receive
.
py info warning error
5.要觸發一個
error
級別的日志,需要輸入:
python send
.
py error
'發送一個error級別的錯誤'
# 可以看到步驟
3
的控制臺
,
會出現
:
===
===
=
正在等待消息
===
===
==
[
X
]
Received發送一個error級別的錯誤
簡單理解:通過綁定鍵的名稱,來進行由哪個隊列進行處理
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
