亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Adhesive框架系列文章--分布式組件客戶端模塊實

系統 2332 0

Adhesive框架中是分布式組件客戶端首先實現的是基于Json序列化+二進制協議的Memcached客戶端。在本文中會介紹其中的實現細節。

我們先來看一下項目結構:

image

從這個結構大致可以看出:

1)Memcached只是其中的一個具體實現,這個組件期望提供一個ClientSocket-ClientNode-ClientCluster的基礎實現,以后可以有各種客戶端基于這種結構來實現

2)對于Memcached的實現,其中把協議部分放在的Protocol文件夾中,并且根據協議為每一個請求和響應封裝類型,也就是使用面向對象的方式而不是拼數據包的方式來封裝協議

那么現在首先來介紹基礎結構。從最底部的層次開始,最底部應該是對Socket進行一個封裝,在這里我們實現了一個ClientSocket,主要完成下面功能:

1)封裝Read、Write、Connect、Reset(因為我們實現的是Socket池,所以在Socket使用之后,歸還池之前需要重置)操作

2)封裝Socket基本狀態,包括創建時間、忙碌時間、閑置時間、發生錯誤時的回調方法

?

在ClientSocket之上的一層是ClientNode,也就是一個節點的客戶端,很明顯,這里需要做的是Socket連接池,具體完成的工作有:

1)進行連接池的維護,包括移除空閑超時的Socket、強制結束忙碌時間過長的Socket、補充新的Socket到連接池的下限

2)初始化池、結束池、從池獲取Socket、把使用后的Socket返回池、創建非池Socket

在正常使用的時候,所有Socket都從池中獲取,如果整個Node不可用,那么我們定時創建非池Socket來測試Node是否恢復

?

在ClientNode之上的是ClientCluster,也就是集群,對于需要客戶端進行一致性哈希分發節點的分布式組件來說,這層就很必要了,完成的功能主要有:

1)初始化集群、使用一致性哈希從集群獲得節點、直接獲得ClientSocket

2)在節點出錯的時候進行重新節點分配、嘗試恢復出錯的節點

?

ClientCluster是使用ClientNodeLocator來分配節點的,其中的算法也就是一致性哈希算法。

image

之前說過節點有權重的概念,在這里也就是通過虛擬節點的數量來設置節點權重,權重越高分配到Key的數量也就會越多。

?

在ClientCluster之上還封裝了一層AbstractClient,也就是直接面向用戶的API入口。

    
      public
    
    
      abstract
    
    
      class
    
     AbstractClient<T> 
    
      where
    
     T : AbstractClient<T>, 
    
      new
    
    ()
  

完成的功能有:

1)保存所有的Cluster,初始化Cluster

2)獲取具體的XXXClient的實現,比如MemcachedClient

?

很明顯,我們的第一個實現MemcachedClient是繼承了AbstractClient:

    
      public
    
    
      partial
    
    
      class
    
     MemcachedClient : AbstractClient<MemcachedClient>
  

在這里使用了部分類,內部的實現都放在了MemcachedClient_Internal.cs中,而對外的API都放在了MemcachedClient.cs中。

?

對于Memcached的二進制協議,我們首先是實現一個頭的格式包:

        [StructLayout(LayoutKind.Sequential, Pack = 1)]

    
    
      internal
    
    
      struct
    
     Header

    {

        
    
      internal
    
    
      byte
    
     Magic;



        
    
      internal
    
    
      byte
    
     Opcode;



        
    
      internal
    
    
      ushort
    
     KeyLength;



        
    
      internal
    
    
      byte
    
     ExtraLength;



        
    
      internal
    
    
      byte
    
     DataType;



        
    
      internal
    
    
      ushort
    
     Reserved;



        
    
      internal
    
    
      uint
    
     TotalBodyLength;



        
    
      internal
    
    
      uint
    
     Opaque;



        
    
      internal
    
    
      ulong
    
     Version;

    }
  

由于我們會直接把結構打包為字節數組,所以這里聲明了結構的內存布局。在Protocol.cs中,我們有一些實用的方法,比如結構和字節數組雙向轉換的實現:

    
      internal
    
    
      static
    
     T BytesToStruct<T>(
    
      this
    
    
      byte
    
    [] rawData)

        {

            T result = 
    
      default
    
    (T);

            RespectEndianness(
    
      typeof
    
    (T), rawData);

            GCHandle handle = GCHandle.Alloc(rawData, GCHandleType.Pinned);

            
    
      try
    
    

            {

                IntPtr rawDataPtr = handle.AddrOfPinnedObject();

                result = (T)Marshal.PtrToStructure(rawDataPtr, 
    
      typeof
    
    (T));

            }

            
    
      finally
    
    

            {

                handle.Free();

            }

            
    
      return
    
     result;

        }



        
    
      internal
    
    
      static
    
    
      byte
    
    [] StructToBytes<T>(
    
      this
    
     T data)

        {

            
    
      byte
    
    [] rawData = 
    
      new
    
    
      byte
    
    [Marshal.SizeOf(data)];

            GCHandle handle = GCHandle.Alloc(rawData, GCHandleType.Pinned);

            
    
      try
    
    

            {

                IntPtr rawDataPtr = handle.AddrOfPinnedObject();

                Marshal.StructureToPtr(data, rawDataPtr, 
    
      false
    
    );

            }

            
    
      finally
    
    

            {

                handle.Free();

            }

            RespectEndianness(
    
      typeof
    
    (T), rawData);

            
    
      return
    
     rawData;

        }



        
    
      private
    
    
      static
    
    
      void
    
     RespectEndianness(Type type, 
    
      byte
    
    [] data)

        {

            var fields = type.GetFields(BindingFlags.NonPublic | BindingFlags.Instance).Select(field => 
    
      new
    
    

            {

                Field = field,

                Offset = Marshal.OffsetOf(type, field.Name).ToInt32()

            }).ToList();



            fields.ForEach(item => Array.Reverse(data, item.Offset, Marshal.SizeOf(item.Field.FieldType)));

        }
  

在定義了頭之后,我們就可以封裝一個抽象的請求包了:

image

只要實現這個包,然后調用其GetBytes方法就可以直接獲得需要發送的請求數據包,它會在內部處理Header和Body數據的打包。

比如,我們來看一個Set操作的包實現:

    
      internal
    
    
      class
    
     SetRequestPackage : AbstractRequestPackage

    {

        
    
      private
    
     TimeSpan expireSpan;

        
    
      private
    
    
      byte
    
    [] valueBytes;

        
    
      private
    
    
      ulong
    
     version;



        
    
      public
    
    
      override
    
     Opcode Opcode

        {

            get { 
    
      return
    
     Opcode.Set; }

        }



        
    
      internal
    
     SetRequestPackage(
    
      string
    
     key, 
    
      byte
    
    [] valueBytes, TimeSpan expireSpan, 
    
      ulong
    
     version)

            : 
    
      base
    
    (key)

        {

            
    
      if
    
     (expireSpan > TimeSpan.FromDays(30))

                
    
      throw
    
    
      new
    
     ArgumentOutOfRangeException(
    
      "過期時間不能超過30天!"
    
    );

            
    
      this
    
    .expireSpan = expireSpan;

            
    
      this
    
    .valueBytes = valueBytes;

            
    
      this
    
    .version = version;

        }



        
    
      internal
    
     SetRequestPackage(
    
      string
    
     key, 
    
      string
    
    
      value
    
    , TimeSpan expireSpan, 
    
      ulong
    
     version)

            : 
    
      this
    
    (key, Encoding.UTF8.GetBytes(
    
      value
    
    ), expireSpan, version)

        {

        }



        
    
      internal
    
     SetRequestPackage(
    
      string
    
     key, 
    
      string
    
    
      value
    
    , 
    
      ulong
    
     version)

            : 
    
      this
    
    (key, Encoding.UTF8.GetBytes(
    
      value
    
    ), TimeSpan.FromDays(30), version)

        {

        }



        
    
      internal
    
     SetRequestPackage(
    
      string
    
     key, 
    
      byte
    
    [] valueBytes, 
    
      ulong
    
     version)

            : 
    
      this
    
    (key, valueBytes, TimeSpan.FromDays(30), version)

        {

        }



        
    
      protected
    
    
      override
    
    
      ulong
    
     GetVersion()

        {

            
    
      return
    
     version;

        }



        
    
      protected
    
    
      override
    
    
      byte
    
    [] GetExtraBytes()

        {

            var extraBytes = 
    
      new
    
     List<
    
      byte
    
    >();

            
    
      uint
    
     flag = 0xdeadbeef;

            extraBytes.AddRange(flag.GetBigEndianBytes());

            
    
      uint
    
     expire = Convert.ToUInt32(expireSpan.TotalSeconds);

            extraBytes.AddRange(expire.GetBigEndianBytes());

            
    
      return
    
     extraBytes.ToArray();

        }



        
    
      protected
    
    
      override
    
    
      byte
    
    [] GetValueBytes()

        {

            
    
      return
    
     valueBytes;

        }

    }
  

在這里,我們只是實現了抽象方法來為基類提供沒有的數據,并不需要關心數據是如何打包的。那么,之后發送Set請求的操作就很簡單了:

    
      private
    
    
      bool
    
     InternalSet(
    
      string
    
     key, 
    
      string
    
    
      value
    
    , TimeSpan expire, 
    
      ulong
    
     version)

        {

            
    
      using
    
     (var socket = GetCluster().AcquireSocket(key))

            {

                
    
      if
    
     (socket != 
    
      null
    
    )

                {

                    AbstractRequestPackage requestPackage = expire == TimeSpan.MaxValue ? 
    
      new
    
     SetRequestPackage(key, 
    
      value
    
    , version)

                            : 
    
      new
    
     SetRequestPackage(key, 
    
      value
    
    , expire, version);

                    var requestData = requestPackage.GetBytes();

                    
    
      if
    
     (requestData != 
    
      null
    
    )

                    {

                        socket.Write(requestData);

                        var responsePackage = ResponsePackageCreator.GetPackage(socket);

                        
    
      if
    
     (responsePackage != 
    
      null
    
    )

                        {

                            
    
      if
    
     (responsePackage.ResponseStatus == ResponseStatus.NoError)

                            {

                                
    
      return
    
    
      true
    
    ;

                            }

                            
    
      else
    
    
      if
    
     (responsePackage.ResponseStatus != ResponseStatus.KeyExists

                                    && responsePackage.ResponseStatus != ResponseStatus.KeyNotFound)

                            {

                                LocalLoggingService.Warning(
    
      "在 {0} 上執行操作 {1} 得到了不正確的回復 Key : {2} -> {3}"
    
    ,

                                            socket.Endpoint.ToString(),

                                            requestPackage.Opcode,

                                            key,

                                            responsePackage.ResponseStatus);

                            }

                        }

                        
    
      else
    
    

                        {

                            LocalLoggingService.Error(
    
      "在 {0} 上執行操作 {1} 沒有得到回復 Key : {2}"
    
    ,

                                       socket.Endpoint.ToString(),

                                       requestPackage.Opcode,

                                       key);

                        }

                    }

                }

            }

            
    
      return
    
    
      false
    
    ;

        }
  

1)首先是獲取到Cluster,再獲取到池中的Socket

2)然后初始化一個SetRequestPackage,再通過GetBytes獲得數據

3)直接把數據寫入Socket

4)通過ResponsePackageCreator來獲得返回的數據包

?

很明顯,ResponsePackageCreator和AbstractRequestPackage的意圖差不多,用來把響應的數據包封裝成我們需要的數據,其中有一個:

    
      internal
    
    
      static
    
     GeneralResponsePackage GetPackage(ClientSocket socket)
  

獲得的是一個通用的響應數據包:

    
      internal
    
    
      class
    
     GeneralResponsePackage

    {

        
    
      internal
    
     Opcode Opcode { get; set; }



        
    
      internal
    
     ResponseStatus ResponseStatus { get; set; }



        
    
      internal
    
    
      string
    
     Key { get; set; }



        
    
      internal
    
    
      byte
    
    [] ValueBytes { get; set; }



        
    
      internal
    
    
      ulong
    
     Version { get; set; }



        
    
      internal
    
    
      string
    
     Value

        {

            get

            {

                
    
      if
    
     (ValueBytes != 
    
      null
    
    )

                {

                    
    
      return
    
     Encoding.UTF8.GetString(ValueBytes);

                }

                
    
      else
    
    

                {

                    
    
      return
    
    
      null
    
    ;

                }

            }

        }

    }
  

在這里基本的信息都有了,比如操作代碼、響應狀態、Key、Value、版本號。正因為Memcached的協議比較簡單,所有的響應包都是這么一個格式,所以我們并沒有實現特殊的響應包。如果要實現的話,只需要在類頭部標記OpCode并且繼承GeneralResponsePackage,ResponsePackageCreator會自動返回相應的子類:

        [AttributeUsage(AttributeTargets.Class)]

    
    
      internal
    
    
      class
    
     ResponsePackageAttribute : Attribute

    {

        
    
      internal
    
     Opcode Opcode { get; 
    
      private
    
     set; }



        
    
      internal
    
     ResponsePackageAttribute(Opcode opcode)

        {

            
    
      this
    
    .Opcode = opcode;

        }

    }
  

在獲得了響應之后,通過判斷ResponseStatus來知道響應是否正確,并且記錄相關日志即可。這么一來,數據一去一回以及協議如何實現的整個過程就介紹完了。下面,我們再介紹一下客戶端中幾個特色功能的實現。

?

1)獲取一組Key功能。由于一個集群會有多個節點,所以要獲取一組Key,我們首先需要把Key按照節點分類,然后對于不同的節點,采用并行的方式同時獲取,這樣速度會很快,代碼片段如下:

                var nodeCache = 
    
      new
    
     Dictionary<ClientNode, List<
    
      string
    
    >>();

            
    
      foreach
    
     (var key 
    
      in
    
     keys)

            {

                var node = GetCluster().AcquireNode(key);

                
    
      if
    
     (!nodeCache.ContainsKey(node))

                    nodeCache.Add(node, 
    
      new
    
     List<
    
      string
    
    > { key });

                
    
      else
    
    
      if
    
     (!nodeCache[node].Contains(key))

                    nodeCache[node].Add(key);

            }



            var data = 
    
      new
    
     Dictionary<
    
      string
    
    , 
    
      string
    
    >();

            Parallel.ForEach(nodeCache, node =>
  

2)List功能。Memcached只提供了Key、Value的存儲,有的時候我們的Value是一個列表,那么我們可以有兩種方式完成這個功能。第一種就是直接把列表序列化作為一個Value保存,優點是簡單,缺點是如果以后需要修改的話需要整個列表取出,修改后再把整個列表保存進去,并且由于Memcached Value大小的限制,這么做也不能保存大列表;第二種方式是一個Value保存列表中的一個項,再使用一個KeyValue來保存其中每一項的ID,這么優點是修改方便,獲取的數據可以是列表中的一部分,缺點是實現麻煩,要考慮并發問題、要維護另外一個KeyValue來保存所有的ID。在這里,我們封裝了后一種方式的實現。

3)Locker功能。使用Memcached完成鎖的功能其實很簡單,我們只需要在獲取鎖的時候判斷Add一個空值是否成功,如果不成功則表示占有,等待一段時間嘗試獲取,一直到超時,在返回鎖的時候刪除這個項即可。在這里,我們封裝了MemcachedLocker來完成這個功能。

Adhesive框架系列文章--分布式組件客戶端模塊實現


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!??!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 嘿咻视频在线观看 | 久久精品亚瑟全部免费观看 | 波多野结衣一区免费作品 | 婷婷的五月 | 亚洲视频 中文字幕 | 九九热国产精品视频 | 国产欧美综合精品一区二区 | 99精品国产成人a∨免费看 | 天天欧美| 高清国产美女一级a毛片录 高清国产美女一级毛片 | 情趣视频网站在线观看 | 在线观看免费情网站大全 | 偷偷鲁影院手机在线观看 | 四虎最新网| 草逼com| 久久五月天综合网 | 黄黄网址 | 欧美日韩精品高清一区二区 | 中文字幕 日韩在线 | 真实偷拍激情啪啪对白 | 色天天干 | 日韩欧美精品在线视频 | 四虎影永久地址www 四虎影永久在线高清免费 四虎影永久在线观看精品 四虎影永久在线观看网址 四虎影院.com | 综合一区| 精品夜夜春夜夜爽久久 | 国产高清在线a视频大全凹凸 | 国产成人精品曰本亚洲78 | 草逼网站| 91精品国产91久久久久久青草 | 成人99国产精品 | 九九影院理论片私人影院 | 国产一级久久久久久毛片 | 精品久久久久久国产 | 亚州精品视频 | 天天曰夜夜操 | 亚洲精品99久久久久中文字幕 | 欧美日本一本线在线观看 | 欧美一级毛片免费观看视频 | 日本高清一道本 | 久久精品青草社区 | 在线国产网站 |