RabbitMQ的工作隊列和路由
工作隊列:Working Queue
?
工作隊列這個概念與簡單的發(fā)送/接收消息的區(qū)別就是:接收方接收到消息后,可能需要花費更長的時間來處理消息,這個過程就叫一個Work/Task。
?
幾個概念
分配:多個接收端接收同一個Queue時,如何分配?
消息確認:Server端如何確定接收方的Work已經(jīng)對消息進行了完整的處理?
消息持久化:發(fā)送方、服務(wù)端Queue如何對未處理的消息進行磁盤持久化?
?
Round-robin分配
多個接收端接收同一個Queue時,采用了Round-robin分配算法,即輪叫調(diào)度——依次分配給各個接收方。
?
消息確認
默認開啟了消息確認(接收方接收到消息后,立即向服務(wù)器發(fā)回確認)。消息接收方處理完消息后,向服務(wù)器發(fā)送消息確認,服務(wù)器再刪除該消息。
?
對于耗時的work,可以先關(guān)閉自動消息確認,在work完成后,再手動發(fā)回確認。
channel.basicConsume("hello",false/*關(guān)閉自動消息確認*/,consumer);
// ...work完成后
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
?
持久化
?
1. Server端的Queue持久化
注意的是,如果已經(jīng)聲明了同名非持久化的Queue,則再次聲明無效。
發(fā)送方和接收方都需要指定該參數(shù)。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);?
?
2. Message持久化
channel.basicPublish("", "task_queue",?MessageProperties.
PERSISTENT_TEXT_PLAIN
,message.getBytes());
?
負載分配
?
為了解決各個接收端工作量相差太大的問題(有的一直busy,有的空閑比較多),突破Round-robin。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
意思為,最多為當前接收方發(fā)送一條消息。如果接收方還未處理完畢消息,還沒有回發(fā)確認,就不要再給他分配消息了,應(yīng)該把當前消息分配給其它空閑接收方。
?
固定關(guān)鍵詞路由:Routing
?
使用類型為direct的exchange,發(fā)送特定關(guān)鍵詞(
RoutingKey
)的消息給訂閱該關(guān)鍵詞的Queue。
?
場景示例:消息發(fā)送方發(fā)送了類型為[error][info]的兩種消息,寫磁盤的消息接受者只接受error類型的消息,Console打印的接收兩者。
?

(上圖采用了不同顏色來作為routingKey)
?
發(fā)送方
?
ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct" /* exchange類型為direct */ ); channel.basicPublish(EXCHANGE_NAME, "info" /* 關(guān)鍵詞=info */ , null , message.getBytes()); channel.close(); connection.close();
?
接收方
?
ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct" /* exchange類型為direct */ ); // 創(chuàng)建匿名Queue String queueName = channel.queueDeclare().getQueue(); // 訂閱某個關(guān)鍵詞,綁定到匿名Queue中 channel.queueBind(quueName,EXCHANGE_NAME,"error" ); channel.queueBind(quueName,EXCHANGE_NAME, "info" ); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true , consumer); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // Blocking... String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); // 可獲取路由關(guān)鍵詞
?
關(guān)鍵詞模式路由:Topics
?
這種模式可以看做對Routing的擴展。Routing只能使用固定關(guān)鍵詞,而Topics模式可以訂閱
模糊關(guān)鍵詞
。
?
關(guān)鍵詞必須是一組word,由點號分割。例如"xxx.yyy.zzz",限定255bytes。
* 表示一個word;
# 表示0個或者多個word;
?

?
發(fā)送方
?
ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic" /* exchange類型 */ ); channel.basicPublish(EXCHANGE_NAME, "xxx.yyy" /* 關(guān)鍵詞routingKey */ , null , message.getBytes()); channel.close(); connection.close();
?
接收方
?
ConnectionFactory factory = new ConnectionFactory(); factory.setHost( "localhost" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic" /* exchange類型 */ ); // 創(chuàng)建匿名Queue String queueName = channel.queueDeclare().getQueue(); // 訂閱某個關(guān)鍵詞,綁定到匿名Queue中 channel.queueBind(quueName,EXCHANGE_NAME,"*.yyy" ); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true , consumer); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // Blocking... String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); // 可獲取路由關(guān)鍵詞
?
Refs
?
?
?
分類:?
10@Java
,?
50@Network&Distributed
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
