目錄
背景 CAS CAS 的標(biāo)準(zhǔn)模式 累加示例 寫著玩的 RingBuffer 備注
背景 返回目錄
大多數(shù)企業(yè)開發(fā)人員都理解數(shù)據(jù)庫樂觀并發(fā)控制,不過很少有人聽說過 CAS(我去年才聽說這個(gè)概念),CAS 是多線程樂觀并發(fā)控制策略的一種,一些無鎖的支持并發(fā)的數(shù)據(jù)結(jié)構(gòu)都會(huì)使用到 CAS,本文對比 CAS 和 數(shù)據(jù)庫樂觀并發(fā)控制,以此達(dá)到強(qiáng)化記憶的目的。
CAS 返回目錄
CAS = Compare And Swap
多線程環(huán)境下 this.i = this.i + 1 是沒有辦法保證線程安全的,因此就有了 CAS,CAS 可以保證上面代碼的線程安全性,但是 CAS 并不會(huì)保證 Swap 的成功,只有 Compare 成功了才會(huì) Swap,即:沒有并發(fā)發(fā)生,即:在我讀取和修改之間沒有別人修改。另外說一點(diǎn),如果 i 是局部變量,即:i = i + 1,那么這段代碼是線程安全的,因?yàn)榫植孔兞渴蔷€程獨(dú)享的。
不明白 CAS 沒關(guān)系,下面通過 CAS 的標(biāo)準(zhǔn)模式 和 一個(gè)簡單的示例來理解 CAS。
CAS 的標(biāo)準(zhǔn)模式 返回目錄
偽代碼
1 var localValue, currentValue; 2 do 3 { 4 localValue = this . 5 6 var newValue = 執(zhí)行一些計(jì)算; 7 8 currentValue = Interlocked.CompareExchange( ref this .value, newValue, localValue); 9 } while (localValue != currentValue);
說明
把 this.value 看成是數(shù)據(jù)庫數(shù)據(jù),localValue 是某個(gè)用戶讀取的數(shù)據(jù),newValue是用戶想修改的值,這里有必要解釋一下 CompareExchange 和 currentValue,它的內(nèi)部實(shí)現(xiàn)代碼是這樣的(想想下面代碼是線程安全的):
1 var currentValue = this .value 2 if (currentValue == localValue){ 3 this .value = newValue; 4 } 5 return currentValue;
CompareExchange ?用 sql 來類比就是:update xxx set value = newValue where value = localValue,只不過返回的值不同。通過?CompareExchange 的返回結(jié)果我們知道 CAS 是否成功了,即:是否出現(xiàn)并發(fā)了,即:是否在我讀取和修改之間別人已經(jīng)修改過了,如果是,可以選擇重試。
累加示例 返回目錄
CAS 代碼
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using System.Threading; 7 8 namespace InterlockStudy 9 { 10 class ConcurrentIncrease 11 { 12 public static void Test() 13 { 14 var sum = 0 ; 15 16 var tasks = new Task[ 10 ]; 17 for ( var i = 1 ; i <= 10 ; i++ ) 18 { 19 tasks[i - 1 ] = Task.Factory.StartNew((state) => 20 { 21 int localSum, currentSum; 22 do 23 { 24 localSum = sum; 25 26 Thread.Sleep( 10 ); 27 var value = ( int )state; 28 var newValue = localSum + value; 29 30 currentSum = Interlocked.CompareExchange( ref sum, newValue, localSum); 31 } while (localSum != currentSum); 32 }, i); 33 } 34 35 Task.WaitAll(tasks); 36 37 Console.WriteLine(sum); 38 } 39 } 40 }
數(shù)據(jù)庫并發(fā)代碼
1 public static void Test13() 2 { 3 var tasks = new Task[ 10 ]; 4 for ( var i = 1 ; i <= 10 ; i++ ) 5 { 6 tasks[i - 1 ] = Task.Factory.StartNew((state) => 7 { 8 int localSum, result; 9 do 10 { 11 using ( var con = new SqlConnection(CONNECTION_STRING)) 12 { 13 con.Open(); 14 var cmd = new SqlCommand( " select sum from Tests where Id = 1 " , con); 15 var reader = cmd.ExecuteReader(); 16 reader.Read(); 17 localSum = reader.GetInt32( 0 ); 18 19 System.Threading.Thread.Sleep( 10 ); 20 var value = ( int )state; 21 var newValue = localSum + value; 22 23 cmd = new SqlCommand( " update Tests set sum = " + newValue + " where sum = " + localSum + "" , con); 24 result = cmd.ExecuteNonQuery(); 25 } 26 } while (result == 0 ); 27 }, i); 28 } 29 30 Task.WaitAll(tasks); 31 } 32 }
說明
我們發(fā)現(xiàn) CAS 版本的代碼和數(shù)據(jù)庫版本的代碼出奇的相似,數(shù)據(jù)庫的CAS操作是通過 update + where 來完成的。
寫著玩的 RingBuffer 返回目錄
代碼
1 using System; 2 using System.Collections.Generic; 3 using System.Collections.Concurrent; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7 using System.Threading; 8 9 namespace InterlockStudy 10 { 11 internal class Node<T> 12 { 13 public T Data { get ; set ; } 14 15 public bool HasValue { get ; set ; } 16 } 17 18 class RingBuffer<T> 19 { 20 private readonly Node<T> [] _nodes; 21 private long _tailIndex = - 1 ; 22 private long _headIndex = - 1 ; 23 private AutoResetEvent _readEvent = new AutoResetEvent( false ); 24 private AutoResetEvent _writeEvent = new AutoResetEvent( false ); 25 26 public RingBuffer( int maxSize) 27 { 28 _nodes = new Node<T> [maxSize]; 29 30 for ( var i = 0 ; i < maxSize; i++ ) 31 { 32 _nodes[i] = new Node<T> (); 33 } 34 } 35 36 public void EnQueue(T data) 37 { 38 while ( true ) 39 { 40 if ( this .TryEnQueue(data)) 41 { 42 _readEvent.Set(); 43 return ; 44 } 45 46 _writeEvent.WaitOne(); 47 } 48 49 } 50 51 public T DeQueue() 52 { 53 while ( true ) 54 { 55 T data; 56 if ( this .TryDeQueue( out data)) 57 { 58 _writeEvent.Set(); 59 return data; 60 } 61 62 _readEvent.WaitOne(); 63 } 64 65 } 66 67 public bool TryEnQueue(T data) 68 { 69 long localTailIndex, newTailIndex, currentTailIndex; 70 do 71 { 72 localTailIndex = _tailIndex; 73 74 if (! this .CanWrite(localTailIndex)) 75 { 76 return false ; 77 } 78 79 newTailIndex = localTailIndex + 1 ; 80 81 if (_nodes[newTailIndex % _nodes.Length].HasValue) 82 { 83 return false ; 84 } 85 86 currentTailIndex = Interlocked.CompareExchange( ref _tailIndex, newTailIndex, localTailIndex); 87 } 88 while (localTailIndex != currentTailIndex); 89 90 _nodes[newTailIndex % _nodes.Length].Data = data; 91 _nodes[newTailIndex % _nodes.Length].HasValue = true ; 92 93 return true ; 94 } 95 96 public bool TryDeQueue( out T data) 97 { 98 long localHeadIndex, newHeadIndex, currentHeadIndex; 99 do 100 { 101 localHeadIndex = _headIndex; 102 103 if (! this .CanRead(localHeadIndex)) 104 { 105 data = default (T); 106 return false ; 107 } 108 109 newHeadIndex = localHeadIndex + 1 ; 110 if (_nodes[newHeadIndex % _nodes.Length].HasValue == false ) 111 { 112 data = default (T); 113 return false ; 114 } 115 116 currentHeadIndex = Interlocked.CompareExchange( ref _headIndex, newHeadIndex, localHeadIndex); 117 } 118 while (localHeadIndex != currentHeadIndex); 119 120 data = _nodes[newHeadIndex % _nodes.Length].Data; 121 _nodes[newHeadIndex % _nodes.Length].HasValue = false ; 122 123 return true ; 124 } 125 126 private bool CanWrite( long localTailIndex) 127 { 128 return localTailIndex - _headIndex < _nodes.Length; 129 } 130 131 private bool CanRead( long localHeadIndex) 132 { 133 return _tailIndex - localHeadIndex > 0 ; 134 } 135 } 136 }
備注 返回目錄
倉促成文,如果有必要可以再寫篇文章,希望大家多批評。
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
