我們除了使用WF提供的SqlWorkflowPersistenceService外,還可以自定義持久化服務。因為有的時候你可能不想使用Sql Server數據庫,我們就可以通過自定義持久化服務來使用其他的數據庫,文件等來進行持久化存儲。
一:1.1 我們先看一個MSDN中的例子,當從內存中卸載工作流時,工作流運行時可使用該服務將工作流實例狀態保存到文件。該持久服務類代碼如下FilePersistence.cs:
???
public
class
FilePersistenceService : WorkflowPersistenceService
??? {
???????
public
readonly
static
TimeSpan MaxInterval =
new
TimeSpan(30, 0, 0, 0);
???????
private
bool
unloadOnIdle =
false
;
???????
private
Dictionary<Guid,Timer> instanceTimers;
???????
public
FilePersistenceService(
bool
unloadOnIdle)
??????? {
???????????
this
.unloadOnIdle = unloadOnIdle;
???????????
this
.instanceTimers =
new
Dictionary<Guid, Timer>();
??????? }
???????
protected
override
void
SaveWorkflowInstanceState(Activity rootActivity,
bool
unlock)
??????? {
???????????
// Save the workflow
??????????? Guid contextGuid = (Guid)rootActivity.GetValue(Activity.ActivityContextGuidProperty);
??????????? Console.WriteLine("
Saving instance: {0}\n
", contextGuid);
??????????? SerializeToFile( WorkflowPersistenceService.GetDefaultSerializedForm(rootActivity), contextGuid);
???????????
// See when the next timer (Delay activity) for this workflow will expire
??????????? TimerEventSubscriptionCollection timers = (TimerEventSubscriptionCollection)rootActivity.GetValue(TimerEventSubscriptionCollection.TimerCollectionProperty);
??????????? TimerEventSubscription subscription = timers.Peek();
???????????
if
(subscription !=
null
)
??????????? {
???????????????
// Set a system timer to automatically reload this workflow when its next timer expires
??????????????? TimerCallback callback =
new
TimerCallback(ReloadWorkflow);
??????????????? TimeSpan timeDifference = subscription.ExpiresAt - DateTime.UtcNow;
???????????????
// check to make sure timeDifference is in legal range
???????????????
if
(timeDifference > FilePersistenceService.MaxInterval)
??????????????? {
??????????????????? timeDifference = FilePersistenceService.MaxInterval;
??????????????? }
???????????????
else
if
(timeDifference < TimeSpan.Zero)
??????????????? {
??????????????????? timeDifference = TimeSpan.Zero;
??????????????? }
???????????????
this
.instanceTimers.Add(contextGuid,
new
System.Threading.Timer(
??????????????????? callback,
??????????????????? subscription.WorkflowInstanceId,
??????????????????? timeDifference,
???????????????????
new
TimeSpan(-1)));
??????????? }
??????? }
???????
private
void
ReloadWorkflow(
object
id)
??????? {
???????????
// Reload the workflow so that it will continue processing
??????????? Timer toDispose;
???????????
if
(
this
.instanceTimers.TryGetValue((Guid)id,
out
toDispose))
??????????? {
???????????????
this
.instanceTimers.Remove((Guid)id);
??????????????? toDispose.Dispose();
??????????? }
???????????
this
.Runtime.GetWorkflow((Guid)id);
??????? }
???????
// Load workflow instance state.
???????
protected
override
Activity LoadWorkflowInstanceState(Guid instanceId)
??????? {
??????????? Console.WriteLine("
Loading instance: {0}\n
", instanceId);
???????????
byte
[] workflowBytes = DeserializeFromFile(instanceId);
???????????
return
WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes,
null
);
??????? }
???????
// Unlock the workflow instance state.
???????
// Instance state locking is necessary when multiple runtimes share instance persistence store
???????
protected
override
void
UnlockWorkflowInstanceState(Activity state)
??????? {
???????????
//File locking is not supported in this sample
??????? }
???????
// Save the completed activity state.
???????
protected
override
void
SaveCompletedContextActivity(Activity activity)
??????? {
??????????? Guid contextGuid = (Guid)activity.GetValue(Activity.ActivityContextGuidProperty);
??????????? Console.WriteLine("
Saving completed activity context: {0}
", contextGuid);
??????????? SerializeToFile(
??????????????? WorkflowPersistenceService.GetDefaultSerializedForm(activity), contextGuid);
??????? }
???????
// Load the completed activity state.
???????
protected
override
Activity LoadCompletedContextActivity(Guid activityId, Activity outerActivity)
??????? {
??????????? Console.WriteLine("
Loading completed activity context: {0}
", activityId);
???????????
byte
[] workflowBytes = DeserializeFromFile(activityId);
??????????? Activity deserializedActivities = WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, outerActivity);
???????????
return
deserializedActivities;
??????? }
???????
protected
override
bool
UnloadOnIdle(Activity activity)
??????? {
???????????
return
unloadOnIdle;
??????? }
???????
// Serialize the activity instance state to file
???????
private
void
SerializeToFile(
byte
[] workflowBytes, Guid id)
??????? {
??????????? String filename = id.ToString();
??????????? FileStream fileStream =
null
;
???????????
try
??????????? {
???????????????
if
(File.Exists(filename))
??????????????????? File.Delete(filename);
??????????????? fileStream =
new
FileStream(filename, FileMode.CreateNew, FileAccess.Write, FileShare.None);
???????????????
// Get the serialized form
??????????????? fileStream.Write(workflowBytes, 0, workflowBytes.Length);
??????????? }
???????????
finally
??????????? {
???????????????
if
(fileStream !=
null
)
??????????????????? fileStream.Close();
??????????? }
??????? }
???????
// Deserialize the instance state from the file given the instance id
???????
private
byte
[] DeserializeFromFile(Guid id)
??????? {
??????????? String filename = id.ToString();
??????????? FileStream fileStream =
null
;
???????????
try
??????????? {
???????????????
// File opened for shared reads but no writes by anyone
??????????????? fileStream =
new
FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read);
??????????????? fileStream.Seek(0, SeekOrigin.Begin);
???????????????
byte
[] workflowBytes =
new
byte
[fileStream.Length];
???????????????
// Get the serialized form
??????????????? fileStream.Read(workflowBytes, 0, workflowBytes.Length);
???????????????
return
workflowBytes;
??????????? }
???????????
finally
??????????? {
??????????????? fileStream.Close();
??????????? }
??????? }
??? }
1.2 看看我們的工作流設計,只需要放入一個DelayActivity即可并設置他的Timeout時間,如下圖:
1.3 宿主程序加載了我們自定義的持久化服務后,執行結果如下:
二:上面的例子其實很簡單,我們只是做了一些基本的操作,還有很多工作沒有做。我們就來說說如何自定義持久化服務。自定義持久化服務大概有以下幾步:
1.定義自己的持久化類FilePersistenceService ,必須繼承自WorkflowPersistenceService.
2.實現WorkflowPersistenceService中所有的抽象方法,下面會具體介紹。
3.把自定義的持久化服務裝載到工作流引擎中(和裝載WF提供的標準服務的方式一樣)。
下面我們來介紹下WorkflowPersistenceService類中相關的抽象方法:
2.1 SaveWorkflowInstanceState: 將工作流實例狀態保存到數據存儲區。
protected internal abstract void SaveWorkflowInstanceState(Activity rootActivity, bool unlock)
rootActivity:工作流實例的根 Activity。
unlock:如果工作流實例不應鎖定,則為 true;如果工作流實例應該鎖定,則為 false。 關于鎖方面的我們暫時不提。
?
在這個方法中我們會把rootActivity 序列化到 Stream 中,可以看我們上面的例子中的代碼實現的該方法中有這樣一段
?
SerializeToFile( WorkflowPersistenceService.GetDefaultSerializedForm(rootActivity), contextGuid);
private void SerializeToFile( byte [] workflowBytes, Guid id)
?
SerializeToFile方法的第一個參數是需要一個byte[]的數組,我們是使用WorkflowPersistenceService.GetDefaultSerializedForm
(rootActivity)來實現的,那我們Reflector一下WorkflowPersistenceService這個類中的GetDefaultSerializedForm
方法,代碼如下:
protected static byte [] GetDefaultSerializedForm(Activity activity) { DateTime now = DateTime.Now; using (MemoryStream stream = new MemoryStream(0x2800)) { stream.Position = 0L; activity.Save(stream); using (MemoryStream stream2 = new MemoryStream(( int ) stream.Length)) { using (GZipStream stream3 = new GZipStream(stream2, CompressionMode.Compress,
true )) { stream3.Write(stream.GetBuffer(), 0, ( int ) stream.Length); } ActivityExecutionContextInfo info = (ActivityExecutionContextInfo)
activity.GetValue(Activity.ActivityExecutionContextInfoProperty); TimeSpan span = (TimeSpan) (DateTime.Now - now); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, " Serialized a
{0} with id {1} to length {2}. Took {3}. ", new object [] { info, info.ContextGuid,
stream2.Length, span }); byte [] array = stream2.GetBuffer(); Array.Resize< byte >( ref array, Convert.ToInt32(stream2.Length)); return array; } } }
可以看出主要是調用了activity.Save(stream); ,將rootActivity 序列化到 Stream 中,如果我們自定義這個方法我們要調用 Activity的save方法將Activity序列化到stream中去,我們在實現LoadWorkflowInstanceState方法時會調用Activity的Load方法來讀取,另外我們把工作流保存到持久化存儲里我們一般都使用WorkflowInstanceId來做為唯一性標識
當工作流實例完成或終止時,工作流運行時引擎最后一次調用 SaveWorkflowInstanceState。因此,如果 GetWorkflowStatus等于 Completed或 Terminated,則可以從數據存儲區中安全地刪除工作流實例及其所有關聯的已完成作用域。此外,可以訂閱 WorkflowCompleted或 WorkflowTerminated事件,確定何時可以安全地刪除與工作流實例關聯的記錄。是否確實從數據存儲區中刪除記錄取決于您的實現。
如果無法將工作流實例狀態保存到數據存儲區,則應引發帶有適當錯誤消息的 PersistenceException。
? 2.2 LoadWorkflowInstanceState : 將 SaveWorkflowInstanceState 中保存的工作流實例的指定狀態加載回內存
protected internal abstract Activity LoadWorkflowInstanceState(Guid instanceId)
必須還原活動的相同副本。為此,必須從數據存儲區中工作流實例的表示形式中還原有效的 Stream;然后,必須將此 Stream 傳遞到重載的 Load 方法之一,用于反序列化工作流實例。如果持久性服務無法從其數據存儲區加載工作流實例狀態,則它應引發帶有適當消息的 PersistenceException。
在我們上面例子中實現的該方法中,
protected override Activity LoadWorkflowInstanceState(Guid instanceId) { Console.WriteLine(" Loading instance: {0}\n ", instanceId); byte [] workflowBytes = DeserializeFromFile(instanceId); return WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, null ); }
我們調用了WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, null);方法
我們Reflector出WorkflowPersistenceService類的代碼后可以看到,如下代碼:
protected
static
Activity RestoreFromDefaultSerializedForm(
byte
[] activityBytes, Activity outerActivity)
??? {
??????? Activity activity;
??????? DateTime now = DateTime.Now;
??????? MemoryStream stream =
new
MemoryStream(activityBytes);
??????? stream.Position = 0L;
???????
using
(GZipStream stream2 =
new
GZipStream(stream, CompressionMode.Decompress,
true
))
??????? {
??????????? activity = Activity.Load(stream2, outerActivity);
??????? }
??????? TimeSpan span = (TimeSpan) (DateTime.Now - now);
??????? WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "
Deserialized a {0}
??????????????????????????????????????????????????? to length {1}. Took {2}.
",
new
object
[] { activity, stream.Length, span });
???????
return
activity;
??? }
這段代碼核心的調用了Activity.Load方法將從 Stream 加載 Activity的實例。
2.3 SaveCompletedContextActivity
protected internal abstract void SaveCompletedContextActivity(Activity activity)
activity:表示已完成范圍的 Activity。
?
將指定的已完成作用域保存到數據存儲區。保存完成活動的AEC環境以便實現補償,比如WhileActivity他每次循環的
都會創建新的AEC環境,這個時候完成的活動的AEC就會被保存,但是前提是這個活動要支持補償才可以,所有如果你的WhileActivity里包含SequenceActivity這樣該方法是不會被調用的,如果你換成CompensatableSequenceActivity就可以了
工作流運行時引擎保存已完成作用域活動的狀態,以便實現補償。必須調用重載的 Save 方法之一,將 activity 序列化到 Stream 中;然后可以選擇在將 Stream 寫入到數據存儲區之前,對其執行其他處理。但是,在工作流運行時引擎調用 LoadCompletedContextActivity時,必須還原活動的相同副本。
本例子的程序中不會涉及到這部分
也同樣使用了WorkflowPersistenceService的GetDefaultSerializedForm方法
2.4 LoadCompletedContextActivity?
和SaveCompletedContextActivity是對應的,加載SaveCompletedContextActivity中保存的已完成活動的AEC,就不多說了
2.5 UnlockWorkflowInstanceState: 解除對工作流實例狀態的鎖定。
此方法是抽象的,因此它不包含對鎖定和解鎖的默認實現。
實現自定義持久性服務時,如果要實現鎖定方案,則需要重寫此方法,并在 SaveWorkflowInstanceState方法中提供根據解鎖參數的值進行鎖定-解鎖的機制。
比如我們的工作流在取消的時候,這個方法被調用來解除工作流實例狀態的鎖定。
2.6 UnloadOnIdle
返回一個布爾值,確定在工作流空閑時是否將其卸載。
這些都只是自定義持久化中最基本的,就先說這些吧。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
