亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

C#實(shí)現(xiàn)異步消息隊(duì)列

系統(tǒng) 3902 0
原文: C#實(shí)現(xiàn)異步消息隊(duì)列

拿到新書《.net框架設(shè)計(jì)》,到手之后迅速讀了好多,雖然這本書不像很多教程一樣從頭到尾系統(tǒng)的講明一些知識(shí),但是從項(xiàng)目實(shí)戰(zhàn)角度告訴我們?nèi)绾问褂梦覀兊闹R(shí),從這本書中提煉了一篇,正好符合我前幾篇的“數(shù)據(jù)驅(qū)動(dòng)框架”設(shè)計(jì)的問題;

消息隊(duì)列

消息隊(duì)列 英語 Message queue)是一種 進(jìn)程間通信 或同一進(jìn)程的不同 線程 間的通信方式, 軟件 貯列 用來處理一系列的 輸入 ,通常是來自使用者。消息隊(duì)列提供了 異步 通信協(xié)議 ,每一個(gè)貯列中的紀(jì)錄包含詳細(xì)說明的資料,包含發(fā)生的時(shí)間,輸入裝置的種類,以及特定的輸入?yún)?shù),也就是說:消息的發(fā)送者和接收者不需要同時(shí)與消息隊(duì)列互交。消息會(huì)保存在 隊(duì)列 中,直到接收者取回它。

簡(jiǎn)單的說隊(duì)列就是貯存了我們需要處理的Command但是并不是及時(shí)的拿到其處理結(jié)果;

實(shí)現(xiàn)

實(shí)際上,消息隊(duì)列常常保存在 鏈表 結(jié)構(gòu)中。擁有權(quán)限的進(jìn)程可以向消息隊(duì)列中寫入或讀取消息。

目前,有很多消息隊(duì)列有很多開源的實(shí)現(xiàn),包括 JBoss Messaging 、 JORAM 、 Apache ActiveMQ 、 Sun Open Message Queue Apache Qpid 和HTTPSQS。

優(yōu)點(diǎn),缺點(diǎn)

消息隊(duì)列本身是 異步 的,它允許接收者在消息發(fā)送很長(zhǎng)時(shí)間后再取回消息,這和大多數(shù)通信協(xié)議是不同的。例如 WWW 中使用的 HTTP 協(xié)議是 同步 的,因?yàn)榭蛻舳嗽诎l(fā)出請(qǐng)求后必須等待服務(wù)器回應(yīng)。然而,很多情況下我們需要異步的通信協(xié)議。比如,一個(gè)進(jìn)程通知另一個(gè)進(jìn)程發(fā)生了一個(gè)事件,但不需要等待回應(yīng)。但消息隊(duì)列的異步特點(diǎn),也造成了一個(gè)缺點(diǎn),就是接收者必須 輪詢 消息隊(duì)列,才能收到最近的消息。

信號(hào) 相比,消息隊(duì)列能夠傳遞更多的信息。與 管道 相比,消息隊(duì)列提供了有格式的數(shù)據(jù),這可以減少開發(fā)人員的工作量。但消息隊(duì)列仍然有大小限制。

讀取隊(duì)列消息

主要有兩種(1)服務(wù)端的推;(2)客戶端的拉;

拉:主要是客戶端定時(shí)輪詢拿走消息處理;

推:通過事件訂閱方式主動(dòng)通知訂閱者進(jìn)行處理;

消息的貯存

簡(jiǎn)單的是通過內(nèi)存鏈表實(shí)現(xiàn)貯存;也可以借助DB,比如Redis;還可以持久到本地文件中;

如何保證異步處理的一致性

盡管隊(duì)列主要目的是實(shí)現(xiàn)消息貯存,同時(shí)將調(diào)用與實(shí)現(xiàn)異步化。但是如果想達(dá)到處理消息一致性,好的方式是區(qū)別業(yè)務(wù)處理順序,比如操作主從DB,主負(fù)責(zé)寫,從負(fù)責(zé)讀,我們沒有機(jī)會(huì)在寫之后立馬從讀數(shù)據(jù)庫(kù)拿到你想要的結(jié)果;同時(shí)我們需要借助中間狀態(tài),當(dāng)多個(gè)中間狀態(tài)同時(shí)符合調(diào)用結(jié)果才到到業(yè)務(wù)時(shí)間被處理,否則將“異常消息”持久化,待下次操作;

上代碼

C#實(shí)現(xiàn)異步消息隊(duì)列

建立消息對(duì)立核心隊(duì)列

      {

    public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message);



    public class MessageQueue:Queue<BaseMessage>

    {

        public static MessageQueue GlobalQueue = new MessageQueue();



        private Timer timer = new Timer();

        public MessageQueue() {

            this.timer.Interval = 5000;

            this.timer.Elapsed += Notify;

            this.timer.Enabled = true;

        }

        private void Notify(object sender, ElapsedEventArgs e) {

            lock (this) {

                if (this.Count > 0) {

                    //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue());

                    var message = this.Dequeue();

                    this.messageNotifyEvent(message);

                }

            }

        }



        private MessageQueueEventNotifyHandler messageNotifyEvent;

        public event MessageQueueEventNotifyHandler MessageNotifyEvent {

            add {

                this.messageNotifyEvent += value;

            }



            remove {

                if (this.messageNotifyEvent != null) {

                    this.messageNotifyEvent -= value;

                }

            }

        }

    }

}


    

事件處理

      public const string OrderCodePrefix = "P";

        public void Submit(Message.BaseMessage message)

        {

            Order order = message.Body as Order;



            if (order.OrderCode.StartsWith(OrderCodePrefix))

            {

                System.Console.WriteLine("這個(gè)是個(gè)正確的以({0})開頭的訂單:{1}", OrderCodePrefix,order.OrderCode);

            }

            else {

                System.Console.WriteLine("這個(gè)是個(gè)錯(cuò)誤的訂單,沒有以({0})開頭:{1}",OrderCodePrefix,order.OrderCode);

            }

        }


    

可依據(jù)具體業(yè)務(wù)進(jìn)行個(gè)性化處理;

通過Proxy向隊(duì)列追加消息

      public class OrderServiceProxy:IOrderService

    {

        public void Submit(Message.BaseMessage message)

        {

            MessageQueue.MessageQueue.GlobalQueue.Enqueue(message);

        }

    }


    

客戶端調(diào)用

      OrderService orderService = new OrderService();

            MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit;



            var orders = new List<Order>() { 

                new Order(){OrderCode="P001"},

                new Order(){OrderCode="P002"},

                new Order(){OrderCode="B003"}

            };



            OrderServiceProxy proxy = new OrderServiceProxy();

            orders.ForEach(order => proxy.Submit(new Message.BaseMessage() { Body=order}));



            Console.ReadLine();


    

這樣就滿足了事件的綁定與觸發(fā)個(gè)性化處理,同時(shí)達(dá)到了消息異步化的目的,希望更細(xì)致的拓展用到后期的項(xiàng)目中。

C#實(shí)現(xiàn)異步消息隊(duì)列


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦?。。?/p>

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 日本一级高清片免费 | 国产精品久久久久影院嫩草 | 久久精品国产屋 | 亚洲a成人7777777久久 | 久久久精品免费视频 | 中国jizz日本 | 久久精品国产6699国产精 | 梦想成为魔法少女在线观看 | 久久久久久一级毛片免费无遮挡 | 亚洲综合在线观看一区www | 国产自产视频在线观看香蕉 | 色综合久久久久综合体桃花网 | 成人网在线免费观看 | 啪啪色视频 | 99视屏 | 毛片高清| 亚洲精品日韩中文字幕久久久 | 国产精品99久久99久久久看片 | 香蕉国产人午夜视频在线 | 日本一区二区网站 | cao美女视频网站在线观看 | 在线精品中文字幕福利视频 | 欧美天天在线 | 99热久久国产精品一区 | 欧美综合视频在线观看 | 一级做a爱片久久毛片 | 久久香蕉国产 | 夜色私人影院永久入口 | 97天天做天天爱夜夜爽 | 色久综合 | 日韩欧美手机在线 | 性视频一区二区三区免费 | 日本免费一区视频 | 国产日韩欧美在线 | 成人在线91| 美日韩黄色大片 | 女人隐私秘视频黄www免费 | 欧美伊人 | 久久精品国产亚洲妲己影院 | 九九精品免视频国产成人 | 亚洲九色 |