1??????什么是RabbitMQ?
RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。消息中間件主要用于組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然:
?
單向解耦
?
雙向解耦(如:RPC)
????例如一個日志系統,很容易使用RabbitMQ簡化工作量,一個Consumer可以進行消息的正常處理,另一個Consumer負責對消息進行日志記錄,只要在程序中指定兩個Consumer所監聽的queue以相同的方式綁定到同一個exchange即可,剩下的消息分發工作由RabbitMQ完成。
?
?
使用RabbitMQ server需要:
1. ErLang語言包;
2. RabbitMQ安裝包;
RabbitMQ同時提供了java的客戶端(一個jar包)。
?
2??????概念和特性
2.1??????交換機(exchange):
1.?接收消息,轉發消息到綁定的隊列。四種類型:direct, topic, headers and fanout
direct:轉發消息到routigKey指定的隊列
topic:按規則轉發消息(最靈活)
headers:(這個還沒有接觸到)
fanout:轉發消息到所有綁定隊列
2.?如果沒有隊列綁定在交換機上,則發送到該交換機上的消息會丟失。
3.?一個交換機可以綁定多個隊列,一個隊列可以被多個交換機綁定。
4. topic類型交換器通過模式匹配分析消息的routing-key屬性。它將routing-key和binding-key的字符串切分成單詞。這些單詞之間用點隔開。它同樣也會識別兩個通配符:#匹配0個或者多個單詞,*匹配一個單詞。例如,binding key:*.stock.#匹配routing key:usd.stcok和eur.stock.db,但是不匹配stock.nana。
還有一些其他的交換器類型,如header、failover、system等,現在在當前的RabbitMQ版本中均未實現。
5.?因為交換器是命名實體,聲明一個已經存在的交換器,但是試圖賦予不同類型是會導致錯誤??蛻舳诵枰獎h除這個已經存在的交換器,然后重新聲明并且賦予新的類型。
6.?交換器的屬性:
-?持久性:如果啟用,交換器將會在server重啟前都有效。
-?自動刪除:如果啟用,那么交換器將會在其綁定的隊列都被刪除掉之后自動刪除掉自身。
-?惰性:如果沒有聲明交換器,那么在執行到使用的時候會導致異常,并不會主動聲明。
?
2.2??????隊列(queue):
1.?隊列是RabbitMQ內部對象,存儲消息。相同屬性的queue可以重復定義。
2.?臨時隊列。channel.queueDeclare(),有時不需要指定隊列的名字,并希望斷開連接時刪除隊列。
3.?隊列的屬性:
-?持久性:如果啟用,隊列將會在server重啟前都有效。
-?自動刪除:如果啟用,那么隊列將會在所有的消費者停止使用之后自動刪除掉自身。
-?惰性:如果沒有聲明隊列,那么在執行到使用的時候會導致異常,并不會主動聲明。
-?排他性:如果啟用,隊列只能被聲明它的消費者使用。
這些性質可以用來創建例如排他和自刪除的transient或者私有隊列。這種隊列將會在所有鏈接到它的客戶端斷開連接之后被自動刪除掉。它們只是短暫地連接到server,但是可以用于實現例如RPC或者在AMQ上的對等通信。4. RPC的使用是這樣的:RPC客戶端聲明一個回復隊列,唯一命名(例如用UUID),并且是自刪除和排他的。然后它發送請求給一些交換器,在消息的reply-to字段中包含了之前聲明的回復隊列的名字。RPC服務器將會回答這些請求,使用消息的reply-to作為routing key(默認綁定器會綁定所有的隊列到默認交換器,名稱為“amp.交換器類型名”)發送到默認交換器。注意這僅僅是慣例而已,可以根據和RPC服務器的約定,它可以解釋消息的任何屬性(甚至數據體)來決定回復給誰。
2.3??????消息傳遞:
1.?消息在隊列中保存,以輪詢的方式將消息發送給監聽消息隊列的消費者,可以動態的增加消費者以提高消息的處理能力。
2.?為了實現負載均衡,可以在消費者端通知RabbitMQ,一個消息處理完之后才會接受下一個消息。
channel.basic_qos(prefetch_count=1)
注意:要防止如果所有的消費者都在處理中,則隊列中的消息會累積的情況。
3.?消息有14個屬性,最常用的幾種:
deliveryMode:持久化屬性
contentType:編碼
replyTo:指定一個回調隊列
correlationId:消息id
實例代碼:
4.?消息生產者可以選擇是否在消息被發送到交換器并且還未投遞到隊列(沒有綁定器存在)和/或沒有消費者能夠立即處理的時候得到通知。通過設置消息的mandatory和/或immediate屬性為真,這些投遞保障機制的能力得到了強化。
5.?此外,一個生產者可以設置消息的persistent屬性為真。這樣一來,server將會嘗試將這些消息存儲在一個穩定的位置,直到server崩潰。當然,這些消息肯定不會被投遞到非持久的隊列中。
?
2.4??????高可用性(HA):
1.?消息ACK,通知RabbitMQ消息已被處理,可以從內存刪除。如果消費者因宕機或鏈接失敗等原因沒有發送ACK(不同于ActiveMQ,在RabbitMQ里,消息沒有過期的概念),則RabbitMQ會將消息重新發送給其他監聽在隊列的下一個消費者。
channel.basicConsume(queuename, noAck=false, consumer);
2.?消息和隊列的持久化。定義隊列時可以指定隊列的持久化屬性(問:持久化隊列如何刪除?)
channel.queueDeclare(queuename, durable=true, false, false, null);
發送消息時可以指定消息持久化屬性:
channel.basicPublish(exchangeName, routingKey,
????????????MessageProperties.PERSISTENT_TEXT_PLAIN,
????????????message.getBytes());
這樣,即使RabbitMQ服務器重啟,也不會丟失隊列和消息。
3. publisher confirms
4. master/slave機制,配合Mirrored Queue,這種情況下,publisher會正常發送消息和接收消息的confirm,但對于subscriber來說,需要接收Consumer Cancellation Notifications來得到主節點失敗的通知,然后re-consume from the queue,此時要求client有處理重復消息的能力。注意:如果queue在一個新加入的節點上增加了一個slave,此時slave上沒有此前queue的信息(目前還沒有同步機制)。
(通過命令行或管理插件可以查看哪個slave是同步的:
rabbitmqctl list_queues name slave_pids synchronised_slave_pids)
????當一個slave重新加入mirrored-queue時,如果queue是durable的,則會被清空。
?
2.5??????集群(cluster):
1.?不支持跨網段(如需支持,需要shovel或federation插件)
2.?可以隨意的動態增加或減少、啟動或停止節點,允許節點故障
3.?集群分為RAM節點和DISK節點,一個集群最好至少有一個DISK節點保存集群的狀態。
4.?集群的配置可以通過命令行,也可以通過配置文件,命令行優先。
?
3??????使用
3.1??????簡易使用流程
?
3.2??????RabbitMQ在OpenStack中的使用
?
?
????在Openstack中,組件之間對RabbitMQ使用基本都是“Remote Procedure Calls”的方式。每一個Nova服務(比如計算服務、存儲服務等)初始化時會創建兩個隊列,一個名為“NODE-TYPE.NODE-ID”,另一個名為“NODE-TYPE”,NODE-TYPE是指服務的類型,NODE-ID指節點名稱。
????從抽象層面上講,RabbitMQ的組件的使用類似于下圖所示:
每個服務會綁定兩個隊列到同一個topic類型的exchange,從不同的隊列中接收不同類型的消息。消息的發送者如果關心消息的返回值,則會監聽另一個隊列,該隊列綁定在一個direct類型的exchange。接受者收到消息并處理后,會將消息的返回發送到此exchange。
在Openstack中,如果不關心消息返回,消息的流程圖如下:
?
????如果關心消息返回值,流程圖如下:
?
?
3.3??????為什么要使用RabbitMQ?
曾經有過一個人做過一個測試( http://www.cnblogs.com/amityat/archive/2011/08/31/2160293.html ),發送1百萬個并發消息,對性能有很高的需求,于是作者對比了RabbitMQ、MSMQ、ActiveMQ、ZeroMQueue,整個過程共產生1百萬條1K的消息。測試的執行是在一個Windows Vista上進行的,測試結果如下:
????雖然ZeroMQ性能較高,但這個產品不提供消息持久化,需要自己實現審計和數據恢復,因此在易用性和HA上不是令人滿意,通過測試結果可以看到,RabbitMQ的性能確實不錯。
????我在本機也做了一些測試,但我的測試是基于組件的原生配置,沒有做任何的配置優化,因此總覺的不靠譜。我只測試了RabbitMQ和ActiveMQ兩款產品,雖然網上都說ActiveMQ性能不如前者,但平心而論,ActiveMQ提供了很多配置,存在很大的調優空間,也許修改一個配置參數就會使組件的性能有一個質的飛躍。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
