Mongodb數據服務可以直接接受任何類型數據,并且它設計為可以承受大量數據的寫入。為了能保存任何類型的數據,并且在后臺可以查看任何類型的數據,因此我們必須在收到數據的時候對數據的元數據進行提取,隨同主體數據一并保存在數據庫中。對數據本身也需要重新組織結構,相當于進行一次序列化,然后保存到數據庫中。雖然Mongodb是支持Json格式的,但是由于我們在保存數據的時候還有很多邏輯,因此我們必須手動進行這個工作。其實對于提交數據來說,應該是一個非常快的動作,應該以異步方式進行,在一個盡量短的時間內讓方法的調用可以返回,之后可以在后臺慢慢進行數據的轉換和數據發送到遠端。因此,開發了一個內存隊列服務模塊來進行異步隊列處理工作,并且提交數據到遠端也使用了框架內部的Wcf分布式服務模塊。當然,在服務端道理也一樣,我們可以通過一個內存隊列來批量提交數據,并且讓服務的調用盡快返回。Mongodb數據服務提交數據的過程如下:
項目的結構如下:
1、Mongodb項目是客戶端部分的接口
2、Mongodb.Imp項目是客戶端部分的實現
3、Mongodb.Server是服務端部分的接口,或者說是服務契約
4、Mongodb.Server.Imp是服務端部分的實現
可以看到Mongodb數據本身依賴應用程序信息中心模塊、配置服務模塊、內存隊列服務模塊、Wcf分布式服務模塊,對于大部分客戶端應用程序來說都應該只依賴Mongodb數據服務的客戶端而不是服務端。我們把Mongodb數據服務分成兩部分,插入數據的服務和查詢服務,后者的使用者一般而言只有Mongodb數據服務的后臺。本文主要介紹前者:
public interface IMongodbInsertService : IDisposable { void Insert( object item); }
從接口本身來看非常簡單,只有一個方法。我們來看看它的實現步驟:
1、調用配置服務,查看這個數據類型對應的配置,說到這里,讓我們來看一下Mongodb數據服務客戶端的配置:
[ConfigEntity(FriendlyName = "Mongodb客戶端配置" )] public class MongodbServiceConfigurationEntity { [ConfigItem(FriendlyName = "插入服務配置項列表" )] public Dictionary< string , MongodbInsertServiceConfigurationItem> MongodbInsertServiceConfigurationItems { get; set; } }
每一個類型的配置項如下:
[ConfigEntity(FriendlyName = "Mongodb客戶端針對每個數據類型的配置" )] public class MongodbInsertServiceConfigurationItem { [ConfigItem(FriendlyName = "類型完整名" )] public string TypeFullName { get; set; } [ConfigItem(FriendlyName = "是否提交到服務端" )] public bool SubmitToServer { get; set; } [ConfigItem(FriendlyName = "隊列最大項數" )] public int MaxItemCount { get; set; } [ConfigItem(FriendlyName = "消費的線程總數" )] public int ConsumeThreadCount { get; set; } [ConfigItem(FriendlyName = "消費數據的時間間隔毫秒" )] public int ConsumeIntervalMilliseconds { get; set; } [ConfigItem(FriendlyName = "遇到錯誤時消費數據的時間間隔毫秒" )] public int ConsumeIntervalWhenErrorMilliseconds { get; set; } [ConfigItem(FriendlyName = "消費數據的批量項數" )] public int ConsumeItemCountInOneBatch { get; set; } [ConfigItem(FriendlyName = "達到最大項數后的策略" )] public MemoryQueueServiceReachMaxItemCountAction ReachMaxItemCountAction { get; set; } [ConfigItem(FriendlyName = "消費數據時不足批次數的策略" )] public MemoryQueueServiceNotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; } [ConfigItem(FriendlyName = "消費數據遇到錯誤的策略" )] public MemoryQueueServiceConsumeErrorAction ConsumeErrorAction { get; set; } public MongodbInsertServiceConfigurationItem() { TypeFullName = "" ; SubmitToServer = true ; ReachMaxItemCountAction = MemoryQueueServiceReachMaxItemCountAction.AbandonOldItems .Add(MemoryQueueServiceReachMaxItemCountAction.LogExceptionEveryOneSecond); ConsumeErrorAction = MemoryQueueServiceConsumeErrorAction.AbandonAndLogException; ConsumeThreadCount = 1; ConsumeIntervalMilliseconds = 10; ConsumeIntervalWhenErrorMilliseconds = 1000; ConsumeItemCountInOneBatch = 100; NotReachBatchCountConsumeAction = MemoryQueueServiceNotReachBatchCountConsumeAction.ConsumeAllItems; MaxItemCount = 10000; } }
這里可以看到,除了是否提交到服務端這個配置,大多數的配置其實是內存隊列服務的配置,在之后的文章中我們單獨會介紹內存隊列服務。之所以需要為Mongodb數據服務的客戶端設置這樣的配置,一方面是允許修改隊列服務的配置,另一方面是為了限制沒有經過配置隨便什么數據都往服務端發送,只有在后臺顯式配置的數據類型,才會發生到服務端。
2、如果沒獲取到配置的話返回,如果獲取到配置的話,則為這個類型初始化內存隊列服務,設置一系列隊列服務的參數,并且把隊列的處理委托掛載我們提交數據到服務端的處理方法。換句話說是每一個類型都會有自己的內存隊列服務,我們在MongodbInsertService的實現定義了一個靜態字典用于保存內存隊列服務的實現:
private static Dictionary< string , IMemoryQueueService> submitDataMemoryQueueServices = new Dictionary< string , IMemoryQueueService>();
if (!submitDataMemoryQueueServices.ContainsKey(typeFullName)) { lock (submitDataMemoryQueueServices) { if (!submitDataMemoryQueueServices.ContainsKey(typeFullName)) { var memoryQueueService = LocalServiceLocator.GetService<IMemoryQueueService>(); memoryQueueService.Init( new MemoryQueueServiceConfiguration( string .Format( "{0}_{1}" , ServiceName, typeFullName), InternalSubmitData) { ConsumeErrorAction = config.ConsumeErrorAction, ConsumeIntervalMilliseconds = config.ConsumeIntervalMilliseconds, ConsumeIntervalWhenErrorMilliseconds = config.ConsumeIntervalWhenErrorMilliseconds, ConsumeItemCountInOneBatch = config.ConsumeItemCountInOneBatch, ConsumeThreadCount = config.ConsumeThreadCount, MaxItemCount = config.MaxItemCount, NotReachBatchCountConsumeAction = config.NotReachBatchCountConsumeAction, ReachMaxItemCountAction = config.ReachMaxItemCountAction, }); submitDataMemoryQueueServices.Add(typeFullName, memoryQueueService); } } }
3、然后會判斷是否已經提取過這個類型元數據了,如果沒提取過則嘗試提取元數據并加入緩存:
if (!mongodbDatabaseDescriptionCache.ContainsKey(typeFullName)) { lock (mongodbDatabaseDescriptionCache) { if (!mongodbDatabaseDescriptionCache.ContainsKey(typeFullName)) { MongodbDatabaseDescription mongodbDatabaseDescription = GetMongodbDatabaseDescription(item); CheckMongodbDatabaseDescription(mongodbDatabaseDescription); mongodbDatabaseDescriptionCache.Add(typeFullName, mongodbDatabaseDescription); } } }
4、把數據加入隊列,等待隊列服務在合適的時候調用處理方法(也就是發送到服務端):
if
(config.SubmitToServer)
{
submitDataMemoryQueueServices[typeFullName].Enqueue(item);
}
?
其實到這里為止,方法已經返回了,之后就是隊列服務在后臺的異步調用了。現在我們來深入一下細節,首先看一下GetMongodbDatabaseDescription是如何提取元數據的,這個方法返回的是MongodbDatabaseDescription,它的定義如下:
[DataContract(Namespace = "Adhesive.Mongodb" )] public class MongodbDatabaseDescription { [DataMember] public bool SentToServer { get; set; } [DataMember] public string TypeFullName { get; set; } [DataMember] public string DatabasePrefix { get; set; } [DataMember] public string CategoryName { get; set; } [DataMember] public string Name { get; set; } [DataMember] public string DisplayName { get; set; } [DataMember] public int ExpireDays { get; set; } [DataMember] public List<MongodbColumnDescription> MongodbColumnDescriptionList { get; set; } [DataMember] public List<MongodbEnumColumnDescription> MongodbEnumColumnDescriptionList { get; set; } }
在這里可以看到,我們主要解析的是MongodbPersistenceEntityAttribute,對于下一級的MongodbColumnDescriptionList ,我們主要是解析每一個列的元數據,而MongodbEnumColumnDescriptionList則提取所有枚舉的信息。MongodbColumnDescription的定義如下:
[DataContract(Namespace = "Adhesive.Mongodb" )] public class MongodbColumnDescription { [DataMember] public string Name { get; set; } [DataMember] public string TypeName { get; set; } [DataMember] public bool IsArrayColumn { get; set; } [DataMember] public bool IsEntityColumn { get; set; } [DataMember] public string ColumnName { get; set; } [DataMember] public string DisplayName { get; set; } [DataMember] public string Description { get; set; } [DataMember] public bool ShowInTableView { get; set; } [DataMember] public bool IsTableColumn { get; set; } [DataMember] public bool IsTimeColumn { get; set; } [DataMember] public bool IsContextIdentityColumn { get; set; } [DataMember] public bool IsPrimaryKey { get; set; } [DataMember] public MongodbIndexOption MongodbIndexOption { get; set; } [DataMember] public MongodbFilterOption MongodbFilterOption { get; set; } [DataMember] public MongodbCascadeFilterOption MongodbCascadeFilterOption { get; set; } [DataMember] public MongodbSortOption MongodbSortOption { get; set; } }
這里很多數據都來自MongodbPersistenceItemAttribute和MongodbPresentationItemAttribute。再來看看MongodbEnumColumnDescription:
[DataContract(Namespace = "Adhesive.Mongodb" )] public class MongodbEnumColumnDescription { [DataMember] public string Name { get; set; } [DataMember] public Dictionary< string , string > EnumItems { get; set; } }
它就簡單了,只是保存枚舉的列名,和枚舉每一項的數據。其實這些元數據提取本身沒什么復雜的,可以想到是反射提取,并且其中還涉及到遞歸,需要深入每一個自定義類型,GetMongodbColumnDescription方法其中有一段這樣的代碼實現了遞歸:
if (!type.Assembly.GlobalAssemblyCache && type != pi.DeclaringType) { columnDescription.IsEntityColumn = true ; var properties = GetPropertyListFromCache(type); if (properties != null ) { foreach (var property in properties) { GetMongodbColumnDescription(typeFullName, fullName, columnDescriptionList, enumColumnDescriptionList, property); } } }
在提取元數據的時候,另一個重要的工作是緩存一些關鍵的PropertyInfo的配置,以便后期處理數據的時候使用:
internal class ProperyInfoConfig { public bool IsCascadeFilterLevelOne { get; set; } public bool IsCascadeFilterLevelTwo { get; set; } public bool IsCascadeFilterLevelThree { get; set; } public bool IsDateColumn { get; set; } public bool IsTableName { get; set; } public bool IsIgnore { get; set; } public string ColumnName { get; set; } }
因為我們在提交數據之前,需要針對級聯下拉的數據進行處理,把第二級的值設置為第一級的值加上第二級的值,第三級的值設置為一加二加三,這樣在篩選的時候就會很方便;此外還需要替換列名,計算表名等等,只有緩存了PropertyInfo才能無需重新讀取元數據:
private static Dictionary< string , Dictionary<PropertyInfo, ProperyInfoConfig>> propertyConfigCache = new Dictionary< string , Dictionary<PropertyInfo, ProperyInfoConfig>>();
之前說了元數據提取部分時的邏輯,然后來看一下格式化數據時的邏輯,之前為內存隊列服務的提交數據的委托掛載的方法主要實現如下:
var mongodbDataList = items.Select(_ => ConvertItemToMongodbData(_)).Where(_ => _ != null ).ToList(); var desc = mongodbDatabaseDescriptionCache[typeFullName]; WcfServiceLocator.GetSafeService<IMongodbServer>().SubmitData(mongodbDataList, desc.SentToServer ? null : desc);
先是獲取要提交的數據,然后再獲取元數據,如果有的話和主體數據一并提交到服務端。通過Wcf分布式數據服務獲取到IMongodbServer,并調用它的SubmitData方法,定義如下:
[OperationContract]
void
SubmitData(IList<MongodbData> dataList, MongodbDatabaseDescription databaseDescription);
MongodbData的定義如下:
[DataContract(Namespace = "Adhesive.Mongodb" )] public class MongodbData { [DataMember] public string TypeFullName { get; set; } [DataMember] public string DatabaseName { get; set; } [DataMember] public string TableName { get; set; } [DataMember] public string Data { get; set; } }
在這里可以發現Data是字符串類型,那是因為我們把要提交的數據主體轉換成了Json,否則我們是無法通過Wcf提交Dictionary<string, object>構成的一顆無限級樹的。在這里,我們略去介紹ConvertItemToMongodbData的實現,它其實并不復雜,也是通過遞歸和反射無限級獲取類的所有屬性的值,并轉換為Dictionary<string, object>,只不過在這里面需要處理列表類型、字典類型以及枚舉。
?
至此為止,客戶端的部分介紹完了,現在我們來看一下服務端部分。首先,服務端也有根據每一個類型的配置:
[ConfigEntity(FriendlyName = "Mongodb服務端針對每個數據類型的配置" )] public class MongodbServerConfigurationItem { [ConfigItem(FriendlyName = "類型完整名" )] public string TypeFullName { get; set; } [ConfigItem(FriendlyName = "服務器名" )] public string MongodbServerUrlName { get; set; } [ConfigItem(FriendlyName = "是否提交到數據庫" )] public bool SubmitToDatabase { get; set; } [ConfigItem(FriendlyName = "隊列最大項數" )] public int MaxItemCount { get; set; } [ConfigItem(FriendlyName = "消費的線程總數" )] public int ConsumeThreadCount { get; set; } [ConfigItem(FriendlyName = "消費數據的時間間隔毫秒" )] public int ConsumeIntervalMilliseconds { get; set; } [ConfigItem(FriendlyName = "遇到錯誤時消費數據的時間間隔毫秒" )] public int ConsumeIntervalWhenErrorMilliseconds { get; set; } [ConfigItem(FriendlyName = "消費數據的批量項數" )] public int ConsumeItemCountInOneBatch { get; set; } [ConfigItem(FriendlyName = "達到最大項數后的策略" )] public MemoryQueueServiceReachMaxItemCountAction ReachMaxItemCountAction { get; set; } [ConfigItem(FriendlyName = "消費數據時不足批次數的策略" )] public MemoryQueueServiceNotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; } [ConfigItem(FriendlyName = "消費數據遇到錯誤的策略" )] public MemoryQueueServiceConsumeErrorAction ConsumeErrorAction { get; set; } public MongodbServerConfigurationItem() { TypeFullName = "" ; SubmitToDatabase = true ; ReachMaxItemCountAction = MemoryQueueServiceReachMaxItemCountAction.AbandonOldItems .Add(MemoryQueueServiceReachMaxItemCountAction.LogExceptionEveryOneSecond); ConsumeErrorAction = MemoryQueueServiceConsumeErrorAction.AbandonAndLogException; ConsumeThreadCount = Environment.ProcessorCount; ConsumeIntervalMilliseconds = 10; ConsumeIntervalWhenErrorMilliseconds = 1000; ConsumeItemCountInOneBatch = 100; NotReachBatchCountConsumeAction = MemoryQueueServiceNotReachBatchCountConsumeAction.ConsumeAllItems; MaxItemCount = 100000; } }
這個配置和客戶端的配置差不多,只不過這里把是否提交到服務端改為了是否提交到數據庫。在獲取了配置之后,同樣把數據提交到內存隊列,然后由內存隊列提交到數據庫。核心代碼如下:
try { var sw = Stopwatch.StartNew(); var server = CreateMasterMongoServer(typeFullName); if (server != null ) { var database = server.GetDatabase(item.DatabaseName); var collection = database.GetCollection(item.TableName); var documentList = new List<BsonDocument>(); JavaScriptSerializer s = new JavaScriptSerializer(); mongodbDataList.ForEach(i => { var dic = s.DeserializeObject(i.Data) as IDictionary; var document = new BsonDocument().Add(dic); documentList.Add(document); }); collection.InsertBatch(documentList); LocalLoggingService.Debug( "Mongodb服務端成功服務提交 {0} 條數據到數據庫,類型是 '{1}',耗時 {2} 毫秒" , documentList.Count, typeFullName, sw.ElapsedMilliseconds); } } catch (Exception ex) { AppInfoCenterService.ExceptionService.Handle(ex, categoryName: ServiceName, subcategoryName: typeFullName, description: "寫入數據出現錯誤" , extraInfo: new ExtraInfo { DisplayItems = new Dictionary< string , string >() { { "DatabaseName" , item.DatabaseName}, { "TableName" , item.TableName} } }); } }
首先是Json反序列化獲取到數據,然后轉換為BsonDocument,最后批量提交到數據庫中。
本文介紹了Mongodb數據服務的插入數據部分在客戶端和服務端之間的邏輯,下一篇將介紹Mongodb數據服務查詢數據的部分。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
