孫靖 2009.5.5
Email : jing_sun999@126.com QQ : 272964426
前段時間實現了一個小型的 C/S 架構的多人在線即時通信工具, JIGQQ 。其中對使用 TCP 通信有點心得。 記得在我大學時代,就用 VB 做過 TCP 的通信。當然那時候是很初級的,發送的數據量也很小的應用。當時就覺得,有時候發送的數據接收端不能接收到,有時候呢覺得一次性沒有接受完畢。
前段時間實現了一個小型的 C/S 架構的多人在線即時通信工具, JIGQQ 。其中對使用 TCP 通信有點心得。
記得在我大學時代,就用 VB 做過 TCP 的通信。當然那時候是很初級的,發送的數據量也很小的應用。當時就覺得,有時候發送的數據接收端不能接收到,有時候呢覺得一次性沒有接受完畢,總會丟失一些內容。這和我從書本學到的 TCP/IP 可靠通信協議的知識完全不匹配,這讓我疑惑了很久。直到后來接觸的東西越來越多,眼界也逐漸開拓才慢慢意識到問題出在哪。
應用平臺: WindowsXP
開發工具: C++Builder6.0
問題描述
我在使用
Socket
接收消息時,將會觸發一個接收函數。(
BCB
中的是
ClientRead
函數)所以我在此函數處接收信息,并做相應處理。那問題來了:由于傳輸的數據包都是我自定義的,我明確的知道長度為多多少。可實際效果卻是,有時候接不夠我期待的長度,而甚至有時候一次接收的數據包長度竟然比我預期的要長
10
個字節。當時只有設定條件將不滿足我預期長度的數據包丟棄。
問題分析
看了上面描述,想必大家也明白我的錯誤在哪了吧?實際是我對 Socket 的接收機制理解有誤。 TCP/IP 只保證發送包按順序到達目的地,但可能由于網絡狀況他會自動分包發送,這樣就導致接收端的接受函數每次提交時只有若干數據,不一定是我預期的一個完整的包。可以這樣理解,發過來的實際是一個 ‘ 流 ’ 。
看來要很好的解決這個問題,那就只有先將接收的數據保存起來,再來做處理。
處理模型
為了要保存接收數據,我們首先就要建立一個緩沖區。那第一個問題來了:由于我們要接收的信息是不可預知的,那難道這個緩沖區要無限的擴容?
可我們的實際 PC 內存肯定是有限的,所以我們必須建立一套內存緩沖區可以被反復利用的機制 —— 環形隊列。
我們用圖來說明環形隊列的工作原理:
圖 1 藍色為寫入的數據,綠色為已經讀取處理的數據
看上圖 1 ,在正常狀態下: Write 指針在寫入數據,而 Read 指針在 Write 指針之前,說明緩沖區后端還有空余空間。
在指針回滾狀態下: Write 指針在 Read 指針之前,說明緩沖區的前端已經有空閑的空間。
除了這兩種狀態外,我們不得不再考慮一種即將錯誤狀態:
圖 2 藍色為寫入的數據,棕紅色為未處理的數據
看圖 2 ,無足夠空間:當 Write 指針回滾,發現無足夠空間,將和 Read 指針發生交集(虛點部分)這顯然是不合理的。一部分未處理數據將被覆蓋破環。所以我們必須重新調整整個緩沖區。
重新分配調整:當遇到空間不足,不能實現 Write 指針回滾的情況,我們只有重新開辟一個更大一點的緩沖區,并把未處理數據(棕紅色)和寫入數據(藍色)按順序復制到新的緩沖區內,并調整好 Read 和 Write 指針的位置。最后釋放掉原來的緩沖區。
我們可以看到,經過這樣一個過程,我們的緩沖區,將在 Read 指針處理速度較慢并在處理信息量增大時,逐漸擴容。但是,當擴容到一定程度,將達到一個平衡。因為信息量不可能無限增大,當需處理信息量達到最大值再結合 Read 指針的不斷處理,緩沖區的大小也將穩定下來。
我們一開頭就給此緩沖區命名為 ‘ 環形隊列 ’ 。從以上的圖和文字,我們可以形象的理解:由于緩沖區大小最終將穩定, Write 和 Read 指針將無障礙的在緩沖區中不斷循環回滾,其運行軌跡,將是一個環形。
其他知識
為了要實現上述模型,我們必須要具備一些知識。
( Write 和 Read 是相對緩沖區而言的!)
首先, Wirte 指針部分 ,應該在 Socket 接收函數中去實現。他什么都不干,只管將接收到的數據往環形隊列中存放就行了。
再一個,就是我們的 Read 指針部分 。他需要盡可能快的在環形隊列中讀取已經儲存好的數據,并解析數據后進行相關的操作。最重要一點是,這個過程必須是獨立的,在主線程之外運行。
所以,我們的 Write 指針部分應該是在主線程,而 Read 指針部分就必須建立一個額外的線程來進行工作。
下面我們就看看 C++ Builder 6.0 下如何實現多線程和需注意的相關事宜。
1. 深淺相關全局或私有變量
HANDLE DealInfHanld ; // 處理各類信息線程句柄
DWORD DealInfID ; // 處理各類信息線程 ID
CRITICAL_SECTION m_csLock ; // 用于臨界變量互鎖
2. 編寫好一個線程處理函數實體
// 處理環形隊列中各類信息
DWORD __stdcall DealInf( LPVOID )
{
// ……
}
3. 利用 CreateThread 函數創建線程
在窗體開被創建時就創建好線程處理函數:
void __fastcall TForm_Main ::FormCreate( TObject * Sender )
{
// ……
:: InitializeCriticalSection (& m_csLock );
// 開辟各類信息處理線程
DealInfHanld = CreateThread ( NULL , 0, DealInf, NULL , 0, & DealInfID );
}
進過上面 3 個步驟,函數 DealInf(LPVOID) 將被創建為一個線程函數,他將獨立在主線程之外獨立運行。
我們還需要注意一個很重要的問題:由于線程函數中操作的有關內存,很可能也是主線程中要操作的。比如我們的環形隊列就是這樣。其中 Read 指針在線程中讀取數據進行處理,而 Write 指針卻在主線程中儲存數據。所以此兩指針應該要是‘不可見’的,即他們不應該同時去操作同一數據區,總不可能要 CPU 同時對同一個內存地址又讀又寫吧?所以我們必須利用 互鎖機制 ,讓 Write 和 Read 操作互斥。
這點很好實現,比如 DealInf(LPVOID) 函數,我們只需在函數主體的開始和結束處加鎖定和解鎖代碼即可:
// 處理環形隊列中各類信息
DWORD __stdcall DealInf( LPVOID )
{
:: EnterCriticalSection (& m_csLock ); // 鎖定
// ……
:: LeaveCriticalSection (& m_csLock ); // 解鎖
}
當然我們應該在主線程 Sockte 接收函數處也這樣鎖定和解鎖。
說明:
下文代碼中, DealInfMem 為緩沖區, MemcpySize 為緩沖區大小, ReadIndex 為讀游標, WriteIndex 為寫游標。 SocketUS 為數據包開頭的 TCustomWinSocket 信息, BufferSize 為數據包長度。
具體 Write 部分實現
首先我們還需要來討論下,如何實現自定義數據包能在我們實際編碼的時帶來便利,并最好做到自定義數據包能無限擴展。
我個人認為,在自定義數據包時,必須將此數據包的實際長度信息包含進去,這樣可以有利于信息在接收時能有判斷的依據。
舉例說明:
// 加為好友命令
typedef struct QQMAKEFRIEND
{
UINT32 BufferLeng ; // 本數據包長度
UINT8 Order ; // 命令 ID
UINT32 QQNumMy ; // 自己的 QQ 號碼
UINT32 QQNumFD ; // 好友的 QQ 號碼
} QQMakeFriend ;
如上面的一個加好友命令,我們應該在數據包的最前端 4 字節用來標記數據包長度。這樣我們在信息接收函數時,就可以有效判斷我們是否已經將此數據包接收完畢。代碼如下:
// 接收各客戶端信息
// 獲取此次接收到的全部數據
do
{
t ++;
LengBuffer = Socket -> ReceiveLength ();
Socket -> ReceiveBuf ( ReadBuffer + LengOff + sizeof( TCustomWinSocket *), LengBuffer );
LengOff += LengBuffer ; // 累計接收到的數據長度
// 獲取此次數據包長度,并把 Soket 連接記錄在數據包中去
if ( t == 1)
{
memcpy ( ReadBuffer , & Socket , sizeof( TCustomWinSocket *));
memcpy (& ReLeng , ReadBuffer + sizeof( TCustomWinSocket *), 4);
}
}while ( LengOff != ReLeng ); // 若此數據包接收完畢,跳出
LengBuffer = LengOff + sizeof( TCustomWinSocket *);
經過上面的代碼,我們就可以將每次發送來的數據包完整的記錄在字節數組 ReadBuffer 中,以便后面使用。
而且這樣將數據包長度信息加在數據包中,還便于變長數據包的傳輸。比如當發送聊天信息的時候,由于我們的信息是不定長的,如果知道數據包長度便于我們完整接收數據包。并且這些信息在 Read 指針讀取的時候也發揮巨大作用,后面我們將看到。
接下來,我們就是將接收到的數據合理的儲存到環形隊列中。
通過圖 1 和圖 2 我們清楚的知道在將數據包放入環形隊列的時候,有 3 種情形:
1. 環形隊列后面還有足夠空余空間( R 追 W )
2. 環形隊列后面沒有足夠空余空間但前端有,這樣實現回滾( W 超 R 不足一圈)
3. 環形隊列前后都沒有足夠的空余空間,這樣重新開辟更大的緩沖區。復制好數據調整讀寫指針,并釋放原來緩沖區。( W 超 R 一圈)
我們就根據上面三中 2 情形實現代碼如下:
// 向環形隊列填塞信息
:: EnterCriticalSection (& m_csLock ); // 鎖定
if ( WriteIndex >= ReadIndex ) // Write 指針在 Read 指針之后
{
if (( MemcpySize - WriteIndex ) >= LengBuffer ) // 說明環行隊列后面有空閑位置
{
memcpy ( DealInfMem + WriteIndex , ReadBuffer , LengBuffer );
WriteIndex += LengBuffer ;
}
else // 后面沒有空閑位置,往前面找
{
if (( MemcpySize - WriteIndex + ReadIndex ) > LengBuffer ) // 如果前面有空閑位置
{
memcpy ( DealInfMem + WriteIndex , ReadBuffer , MemcpySize - WriteIndex );
memcpy ( DealInfMem , ReadBuffer + ( MemcpySize - WriteIndex ), LengBuffer - ( MemcpySize - WriteIndex ));
WriteIndex = LengBuffer - ( MemcpySize - WriteIndex );
}
else // 說明沒有空余空間,重新開辟緩沖區
{
while (( DealInfMem2 = ( UINT8 *) malloc ( MemcpySize + MEMSIZE + LengBuffer )) == NULL );
memcpy ( DealInfMem2 , DealInfMem , MemcpySize );
MemcpySize += ( MEMSIZE + LengBuffer );
free ( DealInfMem );
DealInfMem = DealInfMem2 ;
memcpy ( DealInfMem + WriteIndex , ReadBuffer , LengBuffer );
WriteIndex += LengBuffer ;
}
}
}
else // Write 指針在 Read 指針之前
{
if (( ReadIndex - WriteIndex ) > LengBuffer ) // 有空余空間
{
memcpy ( DealInfMem + WriteIndex , ReadBuffer , LengBuffer );
WriteIndex += LengBuffer ;
}
else // 無空余空間,重新開辟緩沖區
{
while (( DealInfMem2 = ( UINT8 *) malloc ( MemcpySize + MEMSIZE + LengBuffer )) == NULL );
memcpy ( DealInfMem2 , DealInfMem + ReadIndex , MemcpySize - ReadIndex );
memcpy ( DealInfMem2 + MemcpySize - ReadIndex , DealInfMem , WriteIndex );
memcpy ( DealInfMem2 + MemcpySize - ReadIndex + WriteIndex , ReadBuffer , LengBuffer );
WriteIndex = MemcpySize - ReadIndex + WriteIndex + LengBuffer ;
MemcpySize += ( MEMSIZE + LengBuffer );
ReadIndex = 0;
free ( DealInfMem );
DealInfMem = DealInfMem2 ;
}
}
:: LeaveCriticalSection (& m_csLock ); // 解鎖
這樣,我們就實現了 Write 指針儲存數據的部分。
具體 Read 部分實現
首先我們要看看 Read 指針在讀取數據時,有不有特別指出。
1. 按順序讀取數據;
2. 如果數據有一部分在環形隊列尾部,而其余回滾到前端,那我們必須獲取這兩部分合并成一個數據包再來使用;
3. 如何確定當前數據包應該有多長?我們本來就記錄當前數據包長度,我們只要根據此信息在環形隊列中讀取對應長度的數據就可以。
我們來看看具體代碼實現:
while (1)
{
:: EnterCriticalSection (& m_csLock ); // 鎖定
TimeSendHand ++;
// 此處為定時掛起此線程,以便主線程響應消息,不然 CPU 占用比將飆升
if ( TimeSendHand >= 100000)
{
TimeSendHand = 0;
Sleep (1);
}
// 若發現讀寫指針重疊,說明當前沒有可處理的數據,放回等待
if ( WriteIndex == ReadIndex )
{
:: LeaveCriticalSection (& m_csLock ); // 解鎖
Sleep (1);
continue;
}
if ( WriteIndex > ReadIndex ) // 說明讀取指針在寫入指針之前( W > R )
{
// 若目前接收的數據不夠,返回等待
if (( WriteIndex - ReadIndex ) <= (sizeof( TCustomWinSocket *) + 4))
{
:: LeaveCriticalSection (& m_csLock ); // 解鎖
Sleep (1);
continue;
}
// 摘取 Socket 連接記錄,用于對應發送
memcpy (& SocketUS , DealInfMem + ReadIndex , sizeof( TCustomWinSocket *));
// 摘取實際數據包長度記錄(后續 4 個字節為長度信息)
memcpy (& BufferSize , DealInfMem + ReadIndex +sizeof( TCustomWinSocket *), 4);
// 實際數據包內容不完整,返回等待
if (( WriteIndex - ReadIndex - sizeof( TCustomWinSocket *)) < BufferSize )
{
:: LeaveCriticalSection (& m_csLock );
Sleep (1);
continue;
}
memcpy ( XCBuffer , DealInfMem + ReadIndex + sizeof( TCustomWinSocket *), BufferSize ); // 摘取實際數據包內容(包括長度信息)
ReadIndex = ReadIndex + sizeof( TCustomWinSocket *) + BufferSize ; // 讀指針后移
}
else // 說明讀取指針在寫入指針之后 (W <= R 則反超,若 W == R 則重疊,若 W < R 則 W 反追 R)
{
// 若目前接收的數據不夠,返回等待
// MemcpySize - ReadIndex + WriteIndex 為后面未讀的加前面未讀的
if (( MemcpySize - ReadIndex + WriteIndex ) <= (sizeof( TCustomWinSocket *) + 4))
{
:: LeaveCriticalSection (& m_csLock );
Sleep (1);
continue;
}
// 將 sizeof(TCustomWinSocket *)+4 的信息先放入 XCBuffer ,以便利用
if (( MemcpySize - ReadIndex ) < (sizeof( TCustomWinSocket *) + 4)) // 后不夠
{
memcpy ( XCBuffer , DealInfMem + ReadIndex , MemcpySize - ReadIndex ); // 后
memcpy ( XCBuffer + MemcpySize - ReadIndex , DealInfMem , sizeof( TCustomWinSocket *) + 4 - ( MemcpySize - ReadIndex )); // 前
}
else // 后夠
{
memcpy ( XCBuffer , DealInfMem + ReadIndex , sizeof( TCustomWinSocket *) + 4);
}
// 摘取 Socket
memcpy (& SocketUS , XCBuffer , sizeof( TCustomWinSocket *));
// 摘取實際數據包長度記錄
memcpy (& BufferSize , XCBuffer + sizeof( TCustomWinSocket *), 4);
if (( MemcpySize - ReadIndex + WriteIndex - sizeof( TCustomWinSocket *)) < BufferSize ) // 實際數據包內容不完整,返回等待
{
:: LeaveCriticalSection (& m_csLock );
Sleep (1);
continue;
}
// 摘取實際數據包內容
// 若數據就完整的在一起
if (( MemcpySize - ReadIndex ) <= (sizeof( TCustomWinSocket *)))
{
memcpy ( XCBuffer , DealInfMem + sizeof( TCustomWinSocket *) - ( MemcpySize - ReadIndex ), BufferSize );
ReadIndex = sizeof( TCustomWinSocket *) - ( MemcpySize - ReadIndex ) + BufferSize ;
}
else // 數據一部分在緩沖區后端,其他部分在緩沖區前端
{
memcpy ( XCBuffer , DealInfMem + ReadIndex + sizeof( TCustomWinSocket *),
MemcpySize - ReadIndex - sizeof( TCustomWinSocket *));
memcpy ( XCBuffer + MemcpySize - ReadIndex - sizeof( TCustomWinSocket *), DealInfMem , BufferSize - ( MemcpySize - ReadIndex - sizeof( TCustomWinSocket *)));
ReadIndex = BufferSize - ( MemcpySize - ReadIndex - sizeof( TCustomWinSocket *));
}
}
XCBufferOff = 0;
// ……
// 根據信息獲取命令號處理
memcpy (& Order , XCBuffer + 4 + XCBufferOff , 1);
switch ( Order )
{
case QQMSGHEAD_ORDER : // 接收消息命令
// ……
break; // Break;
// ……
}
:: LeaveCriticalSection (& m_csLock ); // 解鎖
}
經過上面的代碼,一個個實際數據包將被放入字節數組 XCBuffer 中。并獲取了此條信息對應的 Socket 記錄 SocketUS 。
好了,經過以上的講解,我們這個 TCP 通信處理模型就建好了。這個模型可以固定不變的應用于 C/S 架構的通信。上面代碼紅色部分就是各命令信息處理的部分,并且我們的命令數據包可以任意自定義,只要確保數據包的前 4 字節記錄好當前數據包長度。
下面給出具體的實際代碼:( C++ Builder 6.0 實現)
全局變量
HANDLE DealInfHanld ; // 處理各類信息線程句柄
DWORD DealInfID ; // 處理各類信息線程 ID
// 環行隊列指針
UINT8 * DealInfMem ; // 環形隊列指針
UINT32 MemcpySize = MEMSIZE ; // 環行隊列尺寸
UINT32 WriteIndex = 0, ReadIndex = 0; // 讀寫游標
CRITICAL_SECTION m_csLock ; // 用于臨界變量互鎖
UINT8 ReadBuffer [1024*1024]; // 用于接收消息的緩存( 1M )
UINT8 XCBuffer [1024*1024]; // 用于線程中的數據包摘取( 1M )
Write 指針部分
void __fastcall TForm_Main ::ServerSocket1ClientRead( TObject * Sender , TCustomWinSocket * Socket )
{
// 接收各客戶端信息
UINT32 LengBuffer , LengOff = 0;
UINT32 ReLeng ;
UINT8 t = 0;
UINT8 * DealInfMem2 ;
// 獲取此次接收到的全部數據
do
{
t ++;
LengBuffer = Socket -> ReceiveLength ();
Socket -> ReceiveBuf ( ReadBuffer + LengOff + sizeof( TCustomWinSocket *), LengBuffer );
LengOff += LengBuffer ;
if ( t == 1)
{
memcpy ( ReadBuffer , & Socket , sizeof( TCustomWinSocket *));
memcpy (& ReLeng , ReadBuffer + sizeof( TCustomWinSocket *), 4);
}
}while ( LengOff != ReLeng );
LengBuffer = LengOff + sizeof( TCustomWinSocket *);
// 向環形隊列填塞信息
// 以下同具體 Write 部分實現中的代碼
}
Read 指針部分
// 處理環形隊列中各類信息
DWORD __stdcall DealInf( LPVOID )
{
AnsiString AddSQL ;
static UINT32 TimeSendHand = 0;
UINT32 BufferSize , XCBufferOff = 0;
UINT8 Order ;
TCustomWinSocket * SocketUS ;
// 以下同具體 Read 部分實現中的代碼
}
感興趣的朋友,可以直接把以上代碼復制到 C++ Builder 6.0 中去實驗。幾乎不用該任何代碼,就能為你實現一個使用 TCP/IP 協議接收處理的信息的通信系統。
通信模塊的使用說明
( 1 )應用平臺及開發工具:
WindowsXP , C++builder6.0
( 2 )代碼組成:
頭文件: MyThread.h
Cpp : MyThread.cpp
( 3 )功能描述:
以環形隊列為儲存基礎和類 MyThread 為多線程基礎的通信模塊。由于有環形隊列實現,以保證完全無遺漏的接收數據并等待處理。 MyThread 多線程類由 BCB 提供的類派生,保證了可操作性和穩定性。
( 4 )使用要點
數據寫入:用戶在自己的數據接收函數處,調用 MyThread 類成員函數 MyThread_WriteData(UINT32SocketUSAdr,UINT8*Buffer,UINT32LengBuffer); 即可將數據寫入到環形隊列中。
SocketUSAdr :用于接收的 Socket 控件的地址值
Buffer :接收到的數據
LengBuffer :接收到的數據長度
數據摘取使用: MyThread 類中的函數指針 MyThread_ReadData 將被處理子線程自動調用。所以,用戶需要在自己的程序中,聲明一個函數實現,并將函數地址賦給 MyThread_ReadData 。如:
void __fastcall MyReadData ( UINT32 SocketUSAdr , UINT8 * XCBuffer ); // 聲明的函數
mythread -> MyThread_ReadData = MyReadData ; // 將函數地址賦給函數指針
SocketUSAdr :記錄了發送此信息的 Socket 地址
XCBuffer :實際信息數據包
( 5 )備注
1>. 使用此通信模塊,發送的數據包必須滿足以下格式:
typedef struct QQMSGHEAD
{
UINT32 BufferLeng ; // 本數據包長度
// ……
// ……
} QQMsgHead ;
即首 4 字節必須為此次數據包的長度,以實現報文定界。數據摘取模塊就是通過此信息準確提取出每個完整的數據包,并提交給 mythread->MyThread_ReadData 所指向的函數。
2>. 環形隊列中數據以以下形式存放:
Socket 地址: 4 字節,是發送此消息的 Socket 地址。
實際數據: N 個字節,建議不超過 64K
數據包字節數: 4 字節
有用信息: N-4 字節
由于記錄了發送消息的 Socket 地址,所以此通信模塊可以很輕易應用于多人即時在線聊天系統。(本人的 JIGQQ 已成功實現)
注意:環形隊列中的數據是不用用戶自己操作的,這里給以說明只是為了用戶更好的理解本通信模塊
3>. 數據摘取函數 MyThread_ReadData 的實際實現不建議申請臨時變量,最好不要。如果要用到變量,請將其申請為全局,或申請為所在類模塊的內部成員。
環形隊列多線程模塊
對堆的申請釋放操作需要用互斥量加鎖,軟件中所有的線程的 malloc,free,new,delete 都要加鎖。
原文:
《 TCP 通信 處理 淺談 》
參考:
《 應用層通 信協議開發淺析 》
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
