亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

rabbitmq技術的一些感悟(一)

系統 1799 0

Rabbitmq

初識rabbitmq

RabbitMQ 是流行的開源消息隊列系統,用 erlang 語言開發。 RabbitMQ AMQP (高級消息隊列協議)的標準實現。假設不熟悉 AMQP ,直接看 RabbitMQ 的文檔會比較困難。只是它也僅僅有幾個關鍵概念,這里簡介

幾個概念說明:

Broker :簡單來說就是消息隊列server實體。
Exchange :消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue :消息隊列載體,每一個消息都會被投入到一個或多個隊列。
Binding :綁定,它的作用就是把 exchange queue 依照路由規則綁定起來。
Routing Key :路由keyword, exchange 依據這個keyword進行消息投遞。
vhost :虛擬主機,一個 broker 里能夠開設多個 vhost ,用作不同用戶的權限分離。
producer :消息生產者,就是投遞消息的程序。
consumer :消息消費者,就是接受消息的程序。
channel :消息通道,在client的每一個連接里,可建立多個 channel ,每一個 channel 代表一個會話任務。

由Exchange,Queue,RoutingKey三個才干決定一個從Exchange到Queue的唯一的線路。

消息隊列的使用過程大概例如以下:

1 )client連接到消息隊列server,打開一個 channel
  ( 2 )client聲明一個 exchange ,并設置相關屬性。
  ( 3 )client聲明一個 queue ,并設置相關屬性。
  ( 4 )client使用 routing key ,在 exchange queue 之間建立好綁定關系。
  ( 5 )client投遞消息到 exchange

exchange 接收到消息后,就依據消息的 key 和已經設置的 binding ,進行消息路由,將消息投遞到一個或多個隊列里。

exchange 也有幾個類型,全然依據 key 進行投遞的叫做 Direct 交換機,比如,綁定時設置了 routing key ”abc” ,那么client提交的消息,僅僅有設置了 key ”abc” 的才會投遞到隊列。對 key 進行模式匹配后進行投遞的叫做 Topic 交換機,符號 ”#” 匹配一個或多個詞,符號 ”*” 匹配正好一個詞。比如 ”abc.#” 匹配 ”abc.def.ghi” ”abc.*” 僅僅匹配 ”abc.def” 。另一種不須要 key 的,叫做 Fanout 交換機,它採取廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。

RabbitMQ 支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包含 3 個部分:
  ( 1 exchange 持久化,在聲明時指定 durable => 1
  ( 2 queue 持久化,在聲明時指定 durable => 1
  ( 3 )消息持久化,在投遞時指定 delivery_mode=> 2 1 是非持久化)

假設 exchange queue 都是持久化的,那么它們之間的 binding 也是持久化的。假設 exchange queue 兩者之間有一個持久化,一個非持久化,就不同意建立綁定。

安裝開發環境和庫

1.將文件夾中的librabbitmq.so.1放到文件夾 /usr/local/lib/librabbitmq.so.1

2.安裝rabbitm須要的環境和庫

yum install -y ncurses-devel

yum install gcc

yum install g++

yum install cmake

yum install make

yum install php

yum install mysql

yum install php-process

yum install php-devel

yum install mysql-server

#安裝php的amq支持擴展

wget http://pecl.php.net/get/amqp-1.0.3.tgz

tar zxvf amqp-1.0.3.tgz

cd amqp-1.0.3

/usr/bin/phpize

./configure--with-php-config=/usr/bin/php-config --with-amqp

make && make install

#php.ini 加入

vi /etc/php.ini

extension="amqp.so"

#安裝erlang支持

wgethttp://www.erlang.org/download/otp_src_R15B01.tar.gz

tar -zxvf otp_src_R15B01.tar.gz

cd otp_src_R15B01

./configure --prefix=/home/erlang--without-javac

make && make install

ln -s /home/erlang/bin/erl/usr/local/bin/erl

3. 安裝rabbitma

?解壓rabbitmq-server-generic-unix-3.3.4.tar

?進入sbin文件夾:

??? 啟動rabbitmq服務,運行 nohup./rabbitmq-server start &

啟動rabbitmqserver以及命令

當第一次啟動服務,檢測數據庫是否未初始化或者被刪除,它會用以下的資源初始化一個新的數據庫:

一個命名為 / 的虛擬宿主一個名為guestpassword也為guest的用戶,他擁有/虛擬宿主的全部權限假設你的中間件是公開訪問的,最好改動guest用戶的password。管理概觀rabbitmqctl 是RabbitMQ中間件的一個命令行管理工具。它通過連接一個中間件節點運行全部的動作。本地節點默認被命名為”rabbit”。能夠通過這個命令前使用”-n”標志明白的指定節點名稱, 比如:# rabbitmqctl -n rabbit@shortstop add_user tonyg changeit

這個命令指示RabbitMQ中間件在rabbit@shortstop 節點創建一個tonyg/changeit的用戶。

在一個名為”server.example.com”的主機,RabbitMQ Erlang節點的名稱一般是rabbit@server(除非RABBITMQ_NODENAM在中間件啟動時候被設置)。hostnam -s 的輸出一般是”@”符號正確的后綴。rabbitmqctl 默認產生具體輸出。通過”-q”標示可選擇安靜模式。rabbitmqctl -q status應用和集群管理1.停止RabbitMQ應用,關閉節點

# rabbitmqctl stop

2.停止RabbitMQ應用

# rabbitmqctl stop_app

3.啟動RabbitMQ應用

# rabbitmqctl start_app

4.顯示RabbitMQ中間件各種信息

# rabbitmqctl status

5.重置RabbitMQ節點

# rabbitmqctl reset

# rabbitmqctl force_reset

從它屬于的不論什么集群中移除,從管理數據庫中移除全部數據,比如配置過的用戶和虛擬宿主, 刪除全部持久化的消息。

force_reset命令和reset的差別是無條件重置節點,無論當前管理數據庫狀態以及集群的配置。假設數據庫或者集群配置錯誤發生才使用這個最后的手段。

注意:僅僅有在停止RabbitMQ應用后,reset和force_reset才干成功。

6.循環日志文件

# rabbitmqctl rotate_logs[suffix]

7.集群管理

# rabbitmqctl cluster clusternode…

用戶管理

1.加入用戶

# rabbitmqctl add_user username password

2.刪除用戶

# rabbitmqctl delete_user username

3.改動password

# rabbitmqctl change_password usernamenewpassword

4.列出全部用戶

# rabbitmqctl list_users

權限控制1.創建虛擬主機

# rabbitmqctl add_vhost vhostpath

2.刪除虛擬主機

# rabbitmqctl delete_vhost vhostpath

3.列出全部虛擬主機

# rabbitmqctl list_vhosts

4.設置用戶權限

# rabbitmqctl set_permissions [-pvhostpath] username regexp regexp regexp

5.清除用戶權限

# rabbitmqctl clear_permissions [-pvhostpath] username

6.列出虛擬主機上的全部權限

# rabbitmqctl list_permissions [-pvhostpath]

7.列出用戶權限

# rabbitmqctl list_user_permissionsusername

?

樣例:

加入? rabbitmqctl add_vhost az

rabbitmqctl set_permissions -p az guest".*" ".*" ".*"

?

接口描寫敘述

amqp_connection_state_tamqp_new_connection(void);

? ? 接口說明:聲明一個新的 amqp connection

intamqp_open_socket(char const *hostname,?int portnumber);

? ? 接口說明:獲取 socket.

? ? 參數說明: hostname ? ? ?? RabbitMQ server 所在主機

? ? ?? ? ? ? ? ? portnumber ? ??RabbitMQ server 監聽端口 ??

?

voidamqp_set_sockfd(amqp_connection_state_t state,int sockfd);

? ? ? 接口說明:將 amqp connection sockfd 進行綁定

amqp_rpc_reply_tamqp_login(amqp_connection_state_t state,?char const *vhost,intchannel_max,int frame_max,int heartbeat,amqp_sasl_method_enum sasl_method,...);

? ? 接口說明:用于登錄 RabbitMQ server ,主要目的為了進行權限管理;

? ? 參數說明: state ? ?amqpconnection

? ? ?? ? ? ? ? ? vhost ? rabbit-mq 的虛機主機,是 rabbit-mq 進行權限管理的最小單位

? ? ?? ? ? ? ? ? channel_max ? 最大鏈接數,此處設成 0 就可以

? ? ?? ? ? ? ? ? frame_max ? 和client通信時所同意的最大的 frame size. 默認值為 131072 ,增大這個值有助于提高吞吐,減少這個值有利于減少時延

? ? ?? ? ? ? ? ? heartbeat 含義未知,默認值填 0

? ? ?? ? ? ? ? ? sasl_method ? 用于 SSL 鑒權,默認值參考后文 demo

?

amqp_channel_open_ok_t*amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel);

? ? 接口說明:用于關聯 conn channel

?

amqp_exchange_declare_ok_t*amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive,amqp_boolean_t durable, amqp_table_t arguments);?

? ? 接口說明:聲明 declare

? ? 參數說明: state

? ? ?? ? ? ? ? ? channel

? ? ?? ? ? ? ? ? exchange

? ? ?? ? ? ? ? ? type ? ? "fanout"?"direct" "topic" 三選一

? ? ?? ? ? ? ? ? passive

? ? ?? ? ? ? ? ? curable

? ? ?? ? ? ? ? ? arguments

?

amqp_queue_declare_ok_t*amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable,amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_targuments);?

? ? 接口說明:聲明 queue

? ? 參數說明: state ? amqp connection

? ? ?? ? ? ? ? ? channel?

? ? ?? ? ? ? ? ? queue ?queue name

? ? ?? ? ? ? ? ? passive?

? ? ?? ? ? ? ? ? durable ? 隊列是否持久化

? ? ?? ? ? ? ? ? exclusive ? 當前連接不在時,隊列是否自己主動刪除

? ? ?? ? ? ? ? ? aoto_delete 沒有 consumer 時,隊列是否自己主動刪除

? ? ?? ? ? ? ? ? arguments 用于拓展參數,比方 x-ha-policy 用于 mirrored queue

?

amqp_queue_bind_ok_t*amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_table_t arguments);

? ? 接口說明:聲明 binding ? ?

?

amqp_basic_qos_ok_t*amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_tprefetch_size, uint16_t prefetch_count, amqp_boolean_t global);

? ? ? 接口說明: qos quality of service ,我們這里使用主要用于控制預取消息數,避免消息按條數均勻分配,須要和 no_ack 配合使用

? ? ? 參數說明: state

? ? ?? ? ? ? ? ? ?channel

? ? ?? ? ? ? ? ? ?prefetch_size bytes 為單位, 0 unlimited

? ? ?? ? ? ? ? ? ?prefetch_count 預取的消息條數

? ? ?? ? ? ? ? ? ?global

?

amqp_basic_consume_ok_t*amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local,amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments);?

? ? ? 接口說明:開始一個 queue consumer

? ? ? 參數說明: state

? ? ?? ? ? ? ? ? ?channel

? ? ?? ? ? ? ? ? ?queue

? ? ?? ? ? ? ? ? ?consumer_tag

? ? ?? ? ? ? ? ? ?no_local

? ? ?? ? ? ? ? ? ?no_ack ? ? 是否須要確認消息后再從隊列中刪除消息

? ? ?? ? ? ? ? ? ?exclusive

? ? ?? ? ? ? ? ? ?arguments

?

int amqp_basic_ack(amqp_connection_state_tstate,amqp_channel_t channel,uint64_t delivery_tag,amqp_boolean_t multiple);

? ? ?? ? ? ? ? ? ? ?

intamqp_basic_publish(amqp_connection_state_t state,amqp_channel_tchannel,amqp_bytes_t exchange,amqp_bytes_t routing_key,amqp_boolean_tmandatory,amqp_boolean_t immediate,struct amqp_basic_properties_t_ const*properties,amqp_bytes_t body);

? ? 接口說明:公布消息

? ? 參數說明: state?

? ? ?? ? ? ? ? ? channel

? ? ?? ? ? ? ? ? exchange ?

? ? ?? ? ? ? ? ? routing_key ? exchange 為默認 “” 時,此處填寫 queue_name ,當 exchange direct ,此處為 binding_key

? ? ?? ? ? ? ? ? mandatory 參見參考文獻 2

? ? ?? ? ? ? ? ? immediate 同上

? ? ?? ? ? ? ? ? properties 很多其它屬性,怎樣設置消息持久化,參見文后 demo

? ? ?? ? ? ? ? ? body 消息體

?

amqp_rpc_reply_tamqp_channel_close(amqp_connection_state_t state,amqp_channel_t channel,intcode);

amqp_rpc_reply_tamqp_connection_close(amqp_connection_state_t state,int code);

intamqp_destroy_connection(amqp_connection_state_t state);

?

rabbitmq技術的一些感悟(一)


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 亚洲视频第二页 | 欧美午夜毛片a级在线 | 日本欧美强乱视频在线 | 日本护士一级毛片在线播放 | 中文字幕精品视频 | 在线看片日韩 | 不卡神马影院 | 99精品影视| 久久久久久a亚洲欧洲aⅴ | 久久伊人久久亚洲综合 | 99这里只有精品在线 | 精品视频在线免费播放 | 欧美一区日韩一区中文字幕页 | 久久在线中文字幕 | 99精品福利视频 | 国产精品亚洲精品日韩已满 | 亚洲欧美精品成人久久91 | 松永纱奈在线观看 | 伊人久久综合谁合综合久久 | 久久影视一区 | 九九视频只有精品 | 91精品国产9l久久久久 | 欧美亚洲综合一区 | 欧美区一区 | 一区二区三区精品国产欧美 | 精品一区二区久久久久久久网精 | 毛茸茸成熟女性老太的女bbww | 高清成人综合 | 狠狠色丁香婷婷综合久久片 | 日韩一区二区在线观看 | 九九99 | 国产在线精品一区免费香蕉 | 四虎影视久久 | 久色视频在线观看 | 中国在线播放精品区 | 国产精品自在线拍 | 亚洲不卡视频在线观看 | 涩涩免费播放观看在线视频 | 日本a一级毛片免费观看 | 天天色色网 | 久久99热国产这有精品 |