Rabbitmq
初識rabbitmq
RabbitMQ 是流行的開源消息隊列系統,用 erlang 語言開發。 RabbitMQ 是 AMQP (高級消息隊列協議)的標準實現。假設不熟悉 AMQP ,直接看 RabbitMQ 的文檔會比較困難。只是它也僅僅有幾個關鍵概念,這里簡介
幾個概念說明:
Broker
:簡單來說就是消息隊列server實體。
Exchange
:消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue
:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
Binding
:綁定,它的作用就是把
exchange
和
queue
依照路由規則綁定起來。
Routing Key
:路由keyword,
exchange
依據這個keyword進行消息投遞。
vhost
:虛擬主機,一個
broker
里能夠開設多個
vhost
,用作不同用戶的權限分離。
producer
:消息生產者,就是投遞消息的程序。
consumer
:消息消費者,就是接受消息的程序。
channel
:消息通道,在client的每一個連接里,可建立多個
channel
,每一個
channel
代表一個會話任務。
由Exchange,Queue,RoutingKey三個才干決定一個從Exchange到Queue的唯一的線路。
消息隊列的使用過程大概例如以下:
(
1
)client連接到消息隊列server,打開一個
channel
。
(
2
)client聲明一個
exchange
,并設置相關屬性。
(
3
)client聲明一個
queue
,并設置相關屬性。
(
4
)client使用
routing key
,在
exchange
和
queue
之間建立好綁定關系。
(
5
)client投遞消息到
exchange
。
exchange 接收到消息后,就依據消息的 key 和已經設置的 binding ,進行消息路由,將消息投遞到一個或多個隊列里。
exchange 也有幾個類型,全然依據 key 進行投遞的叫做 Direct 交換機,比如,綁定時設置了 routing key 為 ”abc” ,那么client提交的消息,僅僅有設置了 key 為 ”abc” 的才會投遞到隊列。對 key 進行模式匹配后進行投遞的叫做 Topic 交換機,符號 ”#” 匹配一個或多個詞,符號 ”*” 匹配正好一個詞。比如 ”abc.#” 匹配 ”abc.def.ghi” , ”abc.*” 僅僅匹配 ”abc.def” 。另一種不須要 key 的,叫做 Fanout 交換機,它採取廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。
RabbitMQ
支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包含
3
個部分:
(
1
)
exchange
持久化,在聲明時指定
durable => 1
(
2
)
queue
持久化,在聲明時指定
durable => 1
(
3
)消息持久化,在投遞時指定
delivery_mode=> 2
(
1
是非持久化)
假設 exchange 和 queue 都是持久化的,那么它們之間的 binding 也是持久化的。假設 exchange 和 queue 兩者之間有一個持久化,一個非持久化,就不同意建立綁定。
安裝開發環境和庫
1.將文件夾中的librabbitmq.so.1放到文件夾 /usr/local/lib/librabbitmq.so.1
2.安裝rabbitm須要的環境和庫
yum install -y ncurses-devel
yum install gcc
yum install g++
yum install cmake
yum install make
yum install php
yum install mysql
yum install php-process
yum install php-devel
yum install mysql-server
#安裝php的amq支持擴展
wget http://pecl.php.net/get/amqp-1.0.3.tgz
tar zxvf amqp-1.0.3.tgz
cd amqp-1.0.3
/usr/bin/phpize
./configure--with-php-config=/usr/bin/php-config --with-amqp
make && make install
#php.ini 加入
vi /etc/php.ini
extension="amqp.so"
#安裝erlang支持
wgethttp://www.erlang.org/download/otp_src_R15B01.tar.gz
tar -zxvf otp_src_R15B01.tar.gz
cd otp_src_R15B01
./configure --prefix=/home/erlang--without-javac
make && make install
ln -s /home/erlang/bin/erl/usr/local/bin/erl
3. 安裝rabbitma
?解壓rabbitmq-server-generic-unix-3.3.4.tar
?進入sbin文件夾:
??? 啟動rabbitmq服務,運行 nohup./rabbitmq-server start &
啟動rabbitmqserver以及命令
當第一次啟動服務,檢測數據庫是否未初始化或者被刪除,它會用以下的資源初始化一個新的數據庫:
一個命名為 / 的虛擬宿主一個名為guestpassword也為guest的用戶,他擁有/虛擬宿主的全部權限假設你的中間件是公開訪問的,最好改動guest用戶的password。管理概觀rabbitmqctl 是RabbitMQ中間件的一個命令行管理工具。它通過連接一個中間件節點運行全部的動作。本地節點默認被命名為”rabbit”。能夠通過這個命令前使用”-n”標志明白的指定節點名稱, 比如:# rabbitmqctl -n rabbit@shortstop add_user tonyg changeit
這個命令指示RabbitMQ中間件在rabbit@shortstop 節點創建一個tonyg/changeit的用戶。
在一個名為”server.example.com”的主機,RabbitMQ Erlang節點的名稱一般是rabbit@server(除非RABBITMQ_NODENAM在中間件啟動時候被設置)。hostnam -s 的輸出一般是”@”符號正確的后綴。rabbitmqctl 默認產生具體輸出。通過”-q”標示可選擇安靜模式。rabbitmqctl -q status應用和集群管理1.停止RabbitMQ應用,關閉節點
# rabbitmqctl stop
2.停止RabbitMQ應用
# rabbitmqctl stop_app
3.啟動RabbitMQ應用
# rabbitmqctl start_app
4.顯示RabbitMQ中間件各種信息
# rabbitmqctl status
5.重置RabbitMQ節點
# rabbitmqctl reset
# rabbitmqctl force_reset
從它屬于的不論什么集群中移除,從管理數據庫中移除全部數據,比如配置過的用戶和虛擬宿主, 刪除全部持久化的消息。
force_reset命令和reset的差別是無條件重置節點,無論當前管理數據庫狀態以及集群的配置。假設數據庫或者集群配置錯誤發生才使用這個最后的手段。
注意:僅僅有在停止RabbitMQ應用后,reset和force_reset才干成功。
6.循環日志文件
# rabbitmqctl rotate_logs[suffix]
7.集群管理
# rabbitmqctl cluster clusternode…
用戶管理
1.加入用戶
# rabbitmqctl add_user username password
2.刪除用戶
# rabbitmqctl delete_user username
3.改動password
# rabbitmqctl change_password usernamenewpassword
4.列出全部用戶
# rabbitmqctl list_users
權限控制1.創建虛擬主機
# rabbitmqctl add_vhost vhostpath
2.刪除虛擬主機
# rabbitmqctl delete_vhost vhostpath
3.列出全部虛擬主機
# rabbitmqctl list_vhosts
4.設置用戶權限
# rabbitmqctl set_permissions [-pvhostpath] username regexp regexp regexp
5.清除用戶權限
# rabbitmqctl clear_permissions [-pvhostpath] username
6.列出虛擬主機上的全部權限
# rabbitmqctl list_permissions [-pvhostpath]
7.列出用戶權限
# rabbitmqctl list_user_permissionsusername
?
樣例:
加入? rabbitmqctl add_vhost az
rabbitmqctl set_permissions -p az guest".*" ".*" ".*"
?
接口描寫敘述
amqp_connection_state_tamqp_new_connection(void);
? ? 接口說明:聲明一個新的 amqp connection
intamqp_open_socket(char const *hostname,?int portnumber);
? ? 接口說明:獲取 socket.
? ? 參數說明: hostname ? ? ?? RabbitMQ server 所在主機
? ? ?? ? ? ? ? ? portnumber ? ??RabbitMQ server 監聽端口 ??
?
voidamqp_set_sockfd(amqp_connection_state_t state,int sockfd);
? ? ? 接口說明:將 amqp connection 和 sockfd 進行綁定
amqp_rpc_reply_tamqp_login(amqp_connection_state_t state,?char const *vhost,intchannel_max,int frame_max,int heartbeat,amqp_sasl_method_enum sasl_method,...);
? ? 接口說明:用于登錄 RabbitMQ server ,主要目的為了進行權限管理;
? ? 參數說明: state ? ?amqpconnection
? ? ?? ? ? ? ? ? vhost ? rabbit-mq 的虛機主機,是 rabbit-mq 進行權限管理的最小單位
? ? ?? ? ? ? ? ? channel_max ? 最大鏈接數,此處設成 0 就可以
? ? ?? ? ? ? ? ? frame_max ? 和client通信時所同意的最大的 frame size. 默認值為 131072 ,增大這個值有助于提高吞吐,減少這個值有利于減少時延
? ? ?? ? ? ? ? ? heartbeat 含義未知,默認值填 0
? ? ?? ? ? ? ? ? sasl_method ? 用于 SSL 鑒權,默認值參考后文 demo
?
amqp_channel_open_ok_t*amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel);
? ? 接口說明:用于關聯 conn 和 channel
?
amqp_exchange_declare_ok_t*amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive,amqp_boolean_t durable, amqp_table_t arguments);?
? ? 接口說明:聲明 declare
? ? 參數說明: state
? ? ?? ? ? ? ? ? channel
? ? ?? ? ? ? ? ? exchange
? ? ?? ? ? ? ? ? type ? ? "fanout"?"direct" "topic" 三選一
? ? ?? ? ? ? ? ? passive
? ? ?? ? ? ? ? ? curable
? ? ?? ? ? ? ? ? arguments
?
amqp_queue_declare_ok_t*amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable,amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_targuments);?
? ? 接口說明:聲明 queue
? ? 參數說明: state ? amqp connection
? ? ?? ? ? ? ? ? channel?
? ? ?? ? ? ? ? ? queue ?queue name
? ? ?? ? ? ? ? ? passive?
? ? ?? ? ? ? ? ? durable ? 隊列是否持久化
? ? ?? ? ? ? ? ? exclusive ? 當前連接不在時,隊列是否自己主動刪除
? ? ?? ? ? ? ? ? aoto_delete 沒有 consumer 時,隊列是否自己主動刪除
? ? ?? ? ? ? ? ? arguments 用于拓展參數,比方 x-ha-policy 用于 mirrored queue
?
amqp_queue_bind_ok_t*amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_table_t arguments);
? ? 接口說明:聲明 binding ? ?
?
amqp_basic_qos_ok_t*amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_tprefetch_size, uint16_t prefetch_count, amqp_boolean_t global);
? ? ? 接口說明: qos 是 quality of service ,我們這里使用主要用于控制預取消息數,避免消息按條數均勻分配,須要和 no_ack 配合使用
? ? ? 參數說明: state
? ? ?? ? ? ? ? ? ?channel
? ? ?? ? ? ? ? ? ?prefetch_size 以 bytes 為單位, 0 為 unlimited
? ? ?? ? ? ? ? ? ?prefetch_count 預取的消息條數
? ? ?? ? ? ? ? ? ?global
?
amqp_basic_consume_ok_t*amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local,amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments);?
? ? ? 接口說明:開始一個 queue consumer
? ? ? 參數說明: state
? ? ?? ? ? ? ? ? ?channel
? ? ?? ? ? ? ? ? ?queue
? ? ?? ? ? ? ? ? ?consumer_tag
? ? ?? ? ? ? ? ? ?no_local
? ? ?? ? ? ? ? ? ?no_ack ? ? 是否須要確認消息后再從隊列中刪除消息
? ? ?? ? ? ? ? ? ?exclusive
? ? ?? ? ? ? ? ? ?arguments
?
int amqp_basic_ack(amqp_connection_state_tstate,amqp_channel_t channel,uint64_t delivery_tag,amqp_boolean_t multiple);
? ? ?? ? ? ? ? ? ? ?
intamqp_basic_publish(amqp_connection_state_t state,amqp_channel_tchannel,amqp_bytes_t exchange,amqp_bytes_t routing_key,amqp_boolean_tmandatory,amqp_boolean_t immediate,struct amqp_basic_properties_t_ const*properties,amqp_bytes_t body);
? ? 接口說明:公布消息
? ? 參數說明: state?
? ? ?? ? ? ? ? ? channel
? ? ?? ? ? ? ? ? exchange ?
? ? ?? ? ? ? ? ? routing_key ? 當 exchange 為默認 “” 時,此處填寫 queue_name ,當 exchange 為 direct ,此處為 binding_key
? ? ?? ? ? ? ? ? mandatory 參見參考文獻 2
? ? ?? ? ? ? ? ? immediate 同上
? ? ?? ? ? ? ? ? properties 很多其它屬性,怎樣設置消息持久化,參見文后 demo
? ? ?? ? ? ? ? ? body 消息體
?
amqp_rpc_reply_tamqp_channel_close(amqp_connection_state_t state,amqp_channel_t channel,intcode);
amqp_rpc_reply_tamqp_connection_close(amqp_connection_state_t state,int code);
intamqp_destroy_connection(amqp_connection_state_t state);
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
