亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

rabbitmq-c初探

系統 2176 0

? RabbitMQ著實是個好東西,當然了也有對C語言client開發的支持。例子和文檔少的可憐,只能去項目里去查看example來理解,簡單整理了一些,以免走些彎路。主要是在版本對應上,這點就沒Maven好了,只能對好類庫和例子。接下來我們簡單看看需要的東東。

環境:Ubuntu 13.04

rabbitmq-server 默認的3.0.2-1

librabbitmq-dev 默認的0.0.1.hg216-1

項目構造用的qmake(這樣簡單不少)

1 consumer

1.1 consumer.pro的內容

SOURCES=utils.cpp amqp_consumer.cpp platform_utils.cpp

HEADERS=utils.h

VPATH+=/usr/include

CONFIG+=release

TARGET=consumer

LIBS += -lrabbitmq

1.2 amqp_consumer.cpp代碼

? 這里的代碼來自于rabbitmq-c-v0.3.0 具體查看? https://github.com/alanxz/rabbitmq-c/blob/rabbitmq-c-v0.3.0/examples/amqp_consumer.c 。(對于幾個特殊的宏引用作了調整)

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <stdint.h>

#include <amqp.h>

#include <amqp_framing.h>

#include <assert.h>

#include "utils.h"

#define SUMMARY_EVERY_US 1000000


static void run(amqp_connection_state_t conn)

{

? uint64_t start_time = now_microseconds();

? int received = 0;

? int previous_received = 0;

? uint64_t previous_report_time = start_time;

? uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

? amqp_frame_t frame;

? int result;

? size_t body_received;

? size_t body_target;

? uint64_t now;


? while (1) {

? ? now = now_microseconds();

? ? if (now > next_summary_time) {

? ? ? int countOverInterval = received - previous_received;

? ? ? double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);

? ? ? printf("%d ms: Received %d - %d since last report (%d Hz)\n",

? ? (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);

? ? ? previous_received = received;

? ? ? previous_report_time = now;

? ? ? next_summary_time += SUMMARY_EVERY_US;

? ? }

? ? amqp_maybe_release_buffers(conn);

? ? result = amqp_simple_wait_frame(conn, &frame);

? ? if (result < 0)

? ? ? return;

? ? if (frame.frame_type != AMQP_FRAME_METHOD)

? ? ? continue;

? ? if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)

? ? ? continue;


? ? result = amqp_simple_wait_frame(conn, &frame);

? ? if (result < 0)

? ? ? return;

? ? if (frame.frame_type != AMQP_FRAME_HEADER) {

? ? ? fprintf(stderr, "Expected header!");

? ? ? abort();

? ? }

? ? body_target = frame.payload.properties.body_size;

? ? body_received = 0;


? ? while (body_received < body_target) {

? ? ? result = amqp_simple_wait_frame(conn, &frame);

? ? ? if (result < 0)

? ? ? ? return;

? ? ?if (frame.frame_type != AMQP_FRAME_BODY) {

? ? ? ? fprintf(stderr, "Expected body!");

? ? ? ? abort();

? ? ? }

? ? ? body_received += frame.payload.body_fragment.len;

? ? ? assert(body_received <= body_target);

? ? ? amqp_dump(frame.payload.body_fragment.bytes,frame.payload.body_fragment.len);

? ? }

? ? received++;

? }

}


int main(int argc, char const * const *argv) {

? char const *hostname;

? int port;

? char const *exchange;

? char const *bindingkey;

? int sockfd;

? amqp_connection_state_t conn;

? amqp_bytes_t queuename;


? if (argc < 3) {

? ? fprintf(stderr, "Usage: amqp_consumer host port\n");

? ? return 1;

? }

? hostname = argv[1];

? port = atoi(argv[2]);

? exchange = "amq.direct"; /* argv[3]; */

? bindingkey = "test queue"; /* argv[4]; */

? conn = amqp_new_connection();

? die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");

? amqp_set_sockfd(conn, sockfd);

? die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),

? ?"Logging in");

? amqp_channel_open(conn, 1);

? die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

? {

? ? amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES/*amqp_empty_bytes*/, 0, 0, 0, 1,

? ?AMQP_EMPTY_TABLE/*amqp_empty_table*/);

? ? die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

? ? queuename = amqp_bytes_malloc_dup(r->queue);

? ? if (queuename.bytes == NULL) {

? ? ? fprintf(stderr, "Out of memory while copying queue name");

? ? ? return 1;

? ? }

? }

? amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),

?AMQP_EMPTY_TABLE/*amqp_empty_table*/);

? die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

? amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES/*amqp_empty_bytes*/, 0, 1, 0, AMQP_EMPTY_TABLE/*amqp_empty_table*/);

? die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

? run(conn);

? die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");

? die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");

? die_on_error(amqp_destroy_connection(conn), "Ending connection");

? return 0;

}

?

2 producer

2.1 producer.pro的內容

SOURCES=utils.cpp amqp_producer.cpp platform_utils.cpp
HEADERS=utils.h
VPATH+=/usr/include?
CONFIG+=release
TARGET=producer
LIBS += -lrabbitmq

2.2 amqp_producer.cpp代碼

? 這里的代碼來自于rabbitmq-c-v0.3.0 具體查看 https://github.com/alanxz/rabbitmq-c/blob/rabbitmq-c-v0.3.0/examples/amqp_producer.c 。(對于幾個特殊的宏引用作了調整)

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <stdint.h>

#include <amqp.h>

#include <amqp_framing.h>

#include "utils.h"

#define SUMMARY_EVERY_US 1000000


static void send_batch(amqp_connection_state_t conn,

? ? ? char const *queue_name,

? ? ? int rate_limit,

? ? ? int message_count)

{

? uint64_t start_time = now_microseconds();

? int i;

? int sent = 0;

? int previous_sent = 0;

? uint64_t previous_report_time = start_time;

? uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

? char message[256];

? amqp_bytes_t message_bytes;


? for (i = 0; i < (int)sizeof(message); i++) {

? ? message[i] = i & 0xff;

? }

? message_bytes.len = sizeof(message);

? message_bytes.bytes = message;

? for (i = 0; i < message_count; i++) {

? ? uint64_t now = now_microseconds();

? ? die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"), amqp_cstring_bytes(queue_name),

? ? ? 0, 0, NULL, message_bytes), "Publishing");

? ? sent++;

? ? if (now > next_summary_time) {

? ? ? int countOverInterval = sent - previous_sent;

? ? ? double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);

? ? ? printf("%d ms: Sent %d - %d since last report (%d Hz)\n", (int)(now - start_time) / 1000, sent,

? ? ? ? ?countOverInterval, (int) intervalRate);

? ? ? previous_sent = sent;

? ? ? previous_report_time = now;

? ? ? next_summary_time += SUMMARY_EVERY_US;

? ? }

? ? while (((i * 1000000.0) / (now - start_time)) > rate_limit) {

? ? ? microsleep(2000);

? ? ? now = now_microseconds();

? ? }

? }

? {

? ? uint64_t stop_time = now_microseconds();

? ? int total_delta = stop_time - start_time;

? ? printf("PRODUCER - Message count: %d\n", message_count);

? ? printf("Total time, milliseconds: %d\n", total_delta / 1000);

? ? printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));

? }

}


int main(int argc, char const * const *argv) {

? char const *hostname;

? int port;

? int rate_limit;

? int message_count;

?int sockfd;

? amqp_connection_state_t conn;

? if (argc < 5) {

? ? fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n");

? ? return 1;

? }

? hostname = argv[1];

? port = atoi(argv[2]);

? rate_limit = atoi(argv[3]);

? message_count = atoi(argv[4]);

? conn = amqp_new_connection();

? die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");

? amqp_set_sockfd(conn, sockfd);

? die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),

? ?"Logging in");

? amqp_channel_open(conn, 1);

? die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

? send_batch(conn, "test queue", rate_limit, message_count);

? die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");

? die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");

? die_on_error(amqp_destroy_connection(conn), "Ending connection");

? return 0;

}

?

?

rabbitmq-c初探


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 全部免费毛片 | 国产亚洲精品一区二区久久 | 国产理论视频在线观看 | 涩涩色视频在线播放 | 青娱乐伊人 | 亚洲涩福利高清在线 | 91视频第一页 | 一级毛片免费视频日本 | 999奇米 | 色在线视频观看 | se94se亚洲欧美在线 | 欧美激情一区二区三区视频 | 成人亚洲精品一区二区 | 99久久综合狠狠综合久久一区 | 欧美成人精品一区二区三区 | 国产一级黄色网 | 日日摸狠狠的摸夜夜摸 | 每日更新国产精品视频 | 四虎影片国产精品8848 | 久久6国产 | 在线播放91 | 一本一道波多野结衣一区二区 | 日本高清视频一区二区 | 精品久久久久久无码中文字幕 | 神马视频我不卡 | 狠狠色丁香婷婷综合久久片 | 夜色视频网站 | 性激烈欧美三级在线播放 | 国产精品伊人 | 99久久国产综合精麻豆 | 日本一级看片免费播放 | 蜜桃久久 | 九九99精品 | 国内精品视频一区二区八戒 | 国产精品福利资源在线 | 日本综合色| 91探花国产综合在线精品 | 欧美开嫩苞实拍视频在线观看 | 国产成人看片免费视频观看 | 狠狠色成人综合网图片区 | 伊人精品网|