我們接下來就看和業務息息相關的解碼器,首先我們來看FrameDecoder,這個東西應該是所有的解碼器都會實現這個,所以我們來重點看一下。
? ? ? ??FrameDecoder產生的根源就是TCP/IP數據包的傳輸方式決定的,包在傳輸的過程中會分片和重組,
正如javadoc里面所說的:
? ? 客戶端在發送的時候的序列如下:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
服務器端在接受到后可能會變成下面的序列:
+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+
FrameDecoder幫助我們將接受到的數據包整理成有意義的數據幀,例如,可以幫助我們將數據包整理成
下面的數據格式:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
我們接下來就看看FrameDecoder的實現吧:
FrameDecoder是繼承SimpleChannelUpstreamHandler的,我們首先來看一下messageReceived的實現:
? ? ? ??
- @Override??
- ??? public? void?messageReceived(??
- ???????????ChannelHandlerContext?ctx,?MessageEvent?e)? throws?Exception?{??
- ??
- ???????Object?m?=?e.getMessage();??
- ??????? if?(!(m? instanceof?ChannelBuffer))?{??
- ???????????ctx.sendUpstream(e);??
- ??????????? return;??
- ???????}??
- ??
- ???????ChannelBuffer?input?=?(ChannelBuffer)?m;??
- ??????? if?(!input.readable())?{??
- ??????????? return;??
- ???????}??
- ??
- ???????ChannelBuffer?cumulation?=?cumulation(ctx);??
- ??????? if?(cumulation.readable())?{??
- ???????????cumulation.discardReadBytes();??
- ???????????cumulation.writeBytes(input);??
- ???????????callDecode(ctx,?e.getChannel(),?cumulation,?e.getRemoteAddress());??
- ???????}? else?{??
- ???????????callDecode(ctx,?e.getChannel(),?input,?e.getRemoteAddress());??
- ??????????? if?(input.readable())?{??
- ???????????????cumulation.writeBytes(input);??
- ???????????}??
- ???????}??
- ???}??
? ? ? ??這個里面首先會檢查input的可讀性,這個比較好理解,關鍵是cumulation,
我們首先來看一下cumulation的實現吧:
- private?ChannelBuffer?cumulation(ChannelHandlerContext?ctx)?{??
- ????????ChannelBuffer?c?=?cumulation;??
- ???????? if?(c?==? null)?{??
- ????????????c?=?ChannelBuffers.dynamicBuffer(??
- ????????????????????ctx.getChannel().getConfig().getBufferFactory());??
- ????????????cumulation?=?c;??
- ????????}??
- ???????? return?c;??
- ????}??
? ? ? ??這個函數很簡單,就是如果cumulation為空的時候初始化一下,如果不為空,就返回。我們得思考一下什么時候cumulation為空,什么時候不為空。我們再回過頭來看一下上面的實現吧。如果cumulation可讀,cumulation.discardReadBytes函數的作用是將0到readIndex之間的空間釋放掉,將readIndex和writeIndex都重新標記一下。然后將讀到的數據寫到buffer里面。如果cumulation不可讀,在調callDecode,如果發現從不可讀狀態到可讀狀態,則將讀到的數據寫到緩存區里面。
? ? ? ??我們再來看callDecode的實現:
- private? void?callDecode(??
- ????????????ChannelHandlerContext?context,?Channel?channel,??
- ????????????ChannelBuffer?cumulation,?SocketAddress?remoteAddress)? throws?Exception?{??
- ??
- ???????? while?(cumulation.readable())?{??
- ???????????? int?oldReaderIndex?=?cumulation.readerIndex();??
- ????????????Object?frame?=?decode(context,?channel,?cumulation);??
- ???????????? if?(frame?==? null)?{??
- ???????????????? if?(oldReaderIndex?==?cumulation.readerIndex())?{??
- ???????????????????? //?Seems?like?more?data?is?required.??
- ???????????????????? //?Let?us?wait?for?the?next?notification.??
- ???????????????????? break;??
- ????????????????}? else?{??
- ???????????????????? //?Previous?data?has?been?discarded.??
- ???????????????????? //?Probably?it?is?reading?on.??
- ???????????????????? continue;??
- ????????????????}??
- ????????????}? else? if?(oldReaderIndex?==?cumulation.readerIndex())?{??
- ???????????????? throw? new?IllegalStateException(??
- ???????????????????????? "decode()?method?must?read?at?least?one?byte?"?+??
- ???????????????????????? "if?it?returned?a?frame?(caused?by:?"?+?getClass()?+? ")");??
- ????????????}??
- ??
- ????????????unfoldAndFireMessageReceived(context,?remoteAddress,?frame);??
- ????????}??
- ??
- ???????? if?(!cumulation.readable())?{??
- ?????????? this.cumulation?=? null;??
- ????????}??
- ????}??
- ??????
?
?
? ? ? ?這個里面上面是一個循環,首先將讀指針備份一下,decode方法是交個子類實現的一個抽象方這個用來實現具體數據分幀的算法,從這個里面看到如果子類沒有讀到一幀數據,則返回null所以下面有一個判斷,是一點數據沒有讀呢,還是讀了一點,如果一點都沒有讀,就不需要再檢測了等下一次messageRecieved進行通知,如果發現讀了一點數據,就調用下一次分幀。如果讀了一幀數據就發送一個通知,unfold是針對讀到的循環數據要不要打開的意思。到最后如果發現不是可讀狀態,
cumulation將會被設置成null。
?
最后來看一下cleanup的實現
- private? void?cleanup(ChannelHandlerContext?ctx,?ChannelStateEvent?e)??
- ???????????? throws?Exception?{??
- ???????? try?{??
- ????????????ChannelBuffer?cumulation?=? this.cumulation;??
- ???????????? if?(cumulation?==? null)?{??
- ???????????????? return;??
- ????????????}? else?{??
- ???????????????? this.cumulation?=? null;??
- ????????????}??
- ??
- ???????????? if?(cumulation.readable())?{??
- ???????????????? //?Make?sure?all?frames?are?read?before?notifying?a?closed?channel.??
- ????????????????callDecode(ctx,?ctx.getChannel(),?cumulation,? null);??
- ????????????}??
- ??
- ???????????? //?Call?decodeLast()?finally.??Please?note?that?decodeLast()?is??
- ???????????? //?called?even?if?there's?nothing?more?to?read?from?the?buffer?to??
- ???????????? //?notify?a?user?that?the?connection?was?closed?explicitly.??
- ????????????Object?partialFrame?=?decodeLast(ctx,?ctx.getChannel(),?cumulation);??
- ???????????? if?(partialFrame?!=? null)?{??
- ????????????????unfoldAndFireMessageReceived(ctx,? null,?partialFrame);??
- ????????????}??
- ????????}? finally?{??
- ????????????ctx.sendUpstream(e);??
- ????????}??
- ????}??
? ??在這個里面一般來說是在socket斷開的時候調用,這個時候如果發現buffer還是可讀狀態,還會努力的確保所有的數據已經被分幀,然后調用decodeLast
?
===========================================================================================
轉自http://blog.csdn.net/xiaolang85/article/details/12621663
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
