国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

22.Netty源碼之解碼器

這篇具有很好參考價(jià)值的文章主要介紹了22.Netty源碼之解碼器。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。


highlight: arduino-light

抽象解碼類

https://mp.weixin.qq.com/s/526p5f9fgtZu7yYq5j7LiQ

解碼器

Netty 常用解碼器類型:

  • ByteToMessageDecoder/ReplayingDecoder 將字節(jié)流解碼為消息對象;
  • MessageToMessageDecoder 將一種消息類型解碼為另外一種消息類型。

自定義一次解碼器ByteToMessageDecoder解碼器,如果讀到的字節(jié)大小為4,那么認(rèn)為讀取到了1個(gè)完整的數(shù)據(jù)包。

java class VersionDecoder extends ByteToMessageDecoder { ? ?@Override ? ?protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { ? ? ? ?//此處不需要while循環(huán) ? ? ? ?if( in.readableBytes()>=4 ){ ? ? ? ? ? ?out.add(in.readInt()); ? ? ? } ? } }

自定義二次解碼器,用于將String轉(zhuǎn)換為Integer

java class StringToIntegerDecoder extends MessageToMessageDecoder<String> { ? ?@Override ? ?public void decode(ChannelHandlerContext ctx, String message,List<Object> out) throws Exception { ? ? ? ?out.add(message.length()); ? } }

此時(shí)使用一次解碼器+二次解碼器完成了Byte到String、String到Integer的轉(zhuǎn)換。

為什么要粘包拆包

為什么要粘包

首先你得了解一下TCP/IP協(xié)議,在用戶數(shù)據(jù)量非常小的情況下,極端情況下,一個(gè)字節(jié),該TCP數(shù)據(jù)包的有效載荷非常低,傳遞100字節(jié)的數(shù)據(jù),需要100次TCP傳送,100次ACK,在應(yīng)用及時(shí)性要求不高的情況下,將這100個(gè)有效數(shù)據(jù)拼接成一個(gè)數(shù)據(jù)包,那會(huì)縮短到一個(gè)TCP數(shù)據(jù)包,以及一個(gè)ack,有效載荷提高了,帶寬也節(jié)省了。

非極端情況,有可能兩個(gè)數(shù)據(jù)包拼接成一個(gè)數(shù)據(jù)包,也有可能一個(gè)半的數(shù)據(jù)包拼接成一個(gè)數(shù)據(jù)包,也有可能兩個(gè)半的數(shù)據(jù)包拼接成一個(gè)數(shù)據(jù)包。

為什么要拆包

拆包和粘包是相對的,一端粘了包,另外一端就需要將粘過的包拆開。

舉個(gè)栗子,發(fā)送端將三個(gè)數(shù)據(jù)包粘成兩個(gè)TCP數(shù)據(jù)包發(fā)送到接收端,接收端就需要根據(jù)應(yīng)用協(xié)議將三個(gè)數(shù)據(jù)包拆分成兩個(gè)數(shù)據(jù)包

還有一種情況就是用戶數(shù)據(jù)包超過了mss(最大報(bào)文長度),那么這個(gè)數(shù)據(jù)包在發(fā)送的時(shí)候必須拆分成幾個(gè)數(shù)據(jù)包,接收端收到之后需要將這些數(shù)據(jù)包粘合起來之后,再拆開。

拆包的原理

在沒有netty的情況下,用戶如果自己需要拆包,基本原理就是不斷從TCP緩沖區(qū)中讀取數(shù)據(jù),每次讀取完都需要判斷是否是一個(gè)完整的數(shù)據(jù)包

1.如果當(dāng)前讀取的數(shù)據(jù)不足以拼接成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,那就保留該數(shù)據(jù),繼續(xù)從tcp緩沖區(qū)中讀取,直到得到一個(gè)完整的數(shù)據(jù)包

2.如果當(dāng)前讀到的數(shù)據(jù)加上已經(jīng)讀取的數(shù)據(jù)足夠拼接成一個(gè)數(shù)據(jù)包,那就將已經(jīng)讀取的數(shù)據(jù)拼接上本次讀取的數(shù)據(jù),夠成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包傳遞到業(yè)務(wù)邏輯,多余的數(shù)據(jù)仍然保留,以便和下次讀到的數(shù)據(jù)嘗試拼接。

netty 中的拆包也是如上這個(gè)原理,內(nèi)部會(huì)有一個(gè)累加器,每次讀取到數(shù)據(jù)都會(huì)不斷累加,然后嘗試對累加到的數(shù)據(jù)進(jìn)行拆包,拆成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,這個(gè)基類叫做 ByteToMessageDecoder,下面我們先詳細(xì)分析下這個(gè)類

同樣,我們先看下抽象解碼類的繼承關(guān)系圖。

解碼類是 ChanneInboundHandler 的抽象類實(shí)現(xiàn),操作的是 Inbound 入站數(shù)據(jù)。

解碼器實(shí)現(xiàn)的難度要遠(yuǎn)大于編碼器,因?yàn)榻獯a器需要考慮拆包/粘包問題。

由于接收方有可能沒有接收到完整的消息,所以解碼框架需要對入站的數(shù)據(jù)做緩沖操作,直至獲取到完整的消息。

22.Netty源碼之解碼器,.net,網(wǎng)絡(luò)

一次解碼器ByteToMessageDecoder

ByteToMessageDecoder 中定義了兩個(gè)累加器

2種累加器

Cumulator

每次將讀取到的數(shù)據(jù)累加。

方式1:默認(rèn)是內(nèi)存復(fù)制的方式累加.如果內(nèi)存不夠先擴(kuò)容。MERGE_CUMULATOR

方式2:組合的方式,避免內(nèi)存復(fù)制。

MERGE_CUMULATOR

默認(rèn)情況下,會(huì)使用 MERGE_CUMULATOR。

MERGE_CUMULATOR 的原理是每次都將讀取到的數(shù)據(jù)通過內(nèi)存拷貝的方式,拼接到一個(gè)大的字節(jié)容器中,這個(gè)字節(jié)容器在 ByteToMessageDecoder中叫做 cumulation。

下面我們看一下 MERGE_CUMULATOR 是如何將新讀取到的數(shù)據(jù)累加到字節(jié)容器里的

java public static final Cumulator MERGE_CUMULATOR = new Cumulator() { ? ? ? ?@Override ? ? ? ?public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?final ByteBuf buffer; ? ? ? ? ? ? ? ?if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() ? ? ? ? ? ? ? ? ? ?|| cumulation.refCnt() > 1 ? ? ? ? ? ? ? ? ? ?|| cumulation.isReadOnly()) { ? ? ? ? ? ? ? ? ? ?//按需擴(kuò)容 ? ? ? ? ? ? ? ? ? ?buffer = expandCumulation(alloc, cumulation, in.readableBytes()); ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ?buffer = cumulation; ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?buffer.writeBytes(in); ? ? ? ? ? ? ? ?return buffer; ? ? ? ? ? } finally { ? ? ? ? ? ? ? ?in.release(); ? ? ? ? ? } ? ? ? } ? };

netty 中ByteBuf的抽象,使得累加非常簡單。通過一個(gè)簡單的api調(diào)用 buffer.writeBytes(in);

便將新數(shù)據(jù)累加到字節(jié)容器中,為了防止字節(jié)容器大小不夠,在累加之前還進(jìn)行了擴(kuò)容處理

java static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { ? ? ? ?ByteBuf oldCumulation = cumulation; ? ? ? ?cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); ? ? ? ?cumulation.writeBytes(oldCumulation); ? ? ? ?oldCumulation.release(); ? ? ? ?return cumulation; ? }

擴(kuò)容也是一個(gè)內(nèi)存拷貝操作,新增的大小即是新讀取數(shù)據(jù)的大小。

ByteToMessageDecoder:拆包原理

利用NIO進(jìn)行網(wǎng)絡(luò)編程時(shí),往往需要將讀取到的字節(jié)數(shù)或者字節(jié)緩沖區(qū)解碼為業(yè)務(wù)可以使用的POJO對象。

Netty提供了ByteToMessageDecoder抽象工具解碼類。

用戶的解碼器繼承ByteToMessageDecoder,只需要實(shí)現(xiàn)decode()方法,即可完成ByteBuf到POJO對象的解碼。 不過ByteToMessageDecoder沒有考慮TCP粘包和組包等場景,讀半包需要用戶自己處理,因此我們可以繼承更高級(jí)的解碼器進(jìn)行半包處理。

首先,我們看下ByteToMessageDecoder的子類FixedLengthFrameDecoder定義的方法:

```java public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { ? ?/* ? ?channelRead方法是每次從TCP緩沖區(qū)讀到數(shù)據(jù)都會(huì)調(diào)用的方法 ? ?觸發(fā)點(diǎn)在AbstractNioByteChannel的read方法中 ? ?里面有個(gè)while循環(huán)不斷讀取,讀取到一次就觸發(fā)一次channelRead。

? ?1.累加數(shù)據(jù)到字節(jié)容器cumulation。 2.將累加到的數(shù)據(jù)的字節(jié)容器傳遞給業(yè)務(wù)進(jìn)行業(yè)務(wù)拆包 3.清理字節(jié)容器 4.傳遞業(yè)務(wù)數(shù)據(jù)包給業(yè)務(wù)解碼器處理 ? ?*/ ? ? ? ?@Override ? ?public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ? ? ? ?//if開始 判斷類型是否匹配 ? ? ? ?if (msg instanceof ByteBuf) { ? ? ? ? ? ?CodecOutputList out = CodecOutputList.newInstance(); ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?ByteBuf data = (ByteBuf) msg; ? ? ? ? ? ? ? ?//1.累加數(shù)據(jù) ? ? ? ? ? ? ? ?//if:當(dāng)前累加器沒有數(shù)據(jù),就直接跳過內(nèi)存拷貝,直接將字節(jié)容器的指針指向新讀取的數(shù)據(jù)。 ? ? ? ? ? ? ? ?//else:調(diào)用累加器累加數(shù)據(jù)至字節(jié)容器 ? ? ? ? ? ? ? ?first = cumulation == null; ? ? ? ? ? ? ? ?if (first) { ? ? ? ? ? ? ? ? ? ?//數(shù)據(jù)累加器 ? ? ? ? ? ? ? ? ? ?cumulation = data; ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ?cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); ? ? ? ? ? ? ? } //調(diào)用decode方法 ? ? ? ? ? ? ? ?//2.將累加到的數(shù)據(jù)傳遞給業(yè)務(wù)進(jìn)行拆包 ? ? ? ? ? ? ? ?//將嘗試將字節(jié)容器的數(shù)據(jù)拆分成業(yè)務(wù)數(shù)據(jù)包塞到業(yè)務(wù)數(shù)據(jù)容器out中 ? ? ? ? ? ? ? ?callDecode(ctx, cumulation, out); ? ? ? ? ? } catch (DecoderException e) { ? ? ? ? ? ? ? ?throw e; ? ? ? ? ? } } catch (Exception e) { ? ? ? ? ? ? ? ?throw new DecoderException(e); ? ? ? ? ? } finally { ? ? ? ? ?
? ? ? ? ? //何為可讀:writerIndex > readerIndex ? ? ? ? ? //何為不可讀:writerIndex <= readerIndex ? ? ? ? ? //不可讀說明已經(jīng)讀完了! ? ? ? ? ? //如果累加器不等于空 也不可讀 ? ? ? ? ? //那么執(zhí)行清理邏輯 ? ? ? ? ? ? if (cumulation != null && !cumulation.isReadable()) { //3.清理字節(jié)容器 //業(yè)務(wù)拆包完成之后,只是從字節(jié)容器中取走了數(shù)據(jù)。 ? ? ? ? ? ? ? ?//但是這部分空間對于字節(jié)容器來說依然保留著。 ? ? ? ? ? ? ? ?//而字節(jié)容器每次累加字節(jié)數(shù)據(jù)的時(shí)候都是將字節(jié)數(shù)據(jù)追加到尾部 ? ? ? ? ? ? ? ?//如果不對字節(jié)容器做清理,那么時(shí)間一長就會(huì)OOM。 //正常情況下,其實(shí)每次讀取完數(shù)據(jù),netty都會(huì)在下面這個(gè)discardSomeReadBytes方法中 ? ? ? ? ? ? ? ?//將字節(jié)容器清理 ? ? ? ? ? ? ? ?//只不過,當(dāng)發(fā)送端發(fā)送數(shù)據(jù)過快,channelReadComplete可能會(huì)很久才被調(diào)用一次 //如果一次數(shù)據(jù)讀取完畢之后,可能接收端一邊收,發(fā)送端一邊發(fā)。 ? ? ? ? ? ? ? ?//這里的讀取完畢指的是接收端在某個(gè)時(shí)間不再接受到數(shù)據(jù)為止。 ? ? ? ? ? ? ? ?//發(fā)現(xiàn)仍然沒有拆到一個(gè)完整的用戶數(shù)據(jù)包,即使該channel的設(shè)置為非自動(dòng)讀取 ? ? ? ? ? ? ? ?//也會(huì)觸發(fā)一次讀取操作 ctx.read(),該操作會(huì)重新向selector注冊op_read事件 ? ? ? ? ? ? ? ?//以便于下一次能讀到數(shù)據(jù)之后拼接成一個(gè)完整的數(shù)據(jù)包 //所以為了防止發(fā)送端發(fā)送數(shù)據(jù)過快,netty會(huì)在每次讀取到一次數(shù)據(jù) ? ? ? ? ? ? ? ?//業(yè)務(wù)拆包之后對字節(jié)字節(jié)容器做清理,清理部分的代碼如下 ? ? ? ? ? ? ? ? ? ?numReads = 0; ? ? ? ? ? ? ? ? ? ?cumulation.release(); ? ? ? ? ? ? ? ? ? ?cumulation = null; ? ? ? ? ? ? ? } else if (++ numReads >= discardAfterReads) { ? ? ? ? //如果字節(jié)容器當(dāng)前已無數(shù)據(jù)可讀取,直接銷毀字節(jié)容器 ? ? ? ? ? ? ? ?//并且標(biāo)注一下當(dāng)前字節(jié)容器一次數(shù)據(jù)也沒讀取 //如果連續(xù)16次,discardAfterReads的默認(rèn)值為16 ? ? ? ? ? ? ? ?//字節(jié)容器中仍然有未被業(yè)務(wù)拆包器讀取的數(shù)據(jù), ? ? ? ? //那就做一次壓縮,有效數(shù)據(jù)段整體移到容器首部 ? ? ? ? ? ? ? ? ? ?numReads = 0; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?discardSomeReadBytes(); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ?int size = out.size(); ? ? ? ? ? ? ? ?firedChannelRead |= out.insertSinceRecycled(); ? ? ? ? ? //4.傳遞業(yè)務(wù)數(shù)據(jù)包給業(yè)務(wù)解碼器處理 ? ? ? ? ? //觸發(fā)channelRead事件 將拆到的業(yè)務(wù)數(shù)據(jù)包都傳遞到后續(xù)的handler ? ? ? ? ? //這樣就可以把一個(gè)個(gè)完整的業(yè)務(wù)數(shù)據(jù)包傳遞到后續(xù)的業(yè)務(wù)解碼器進(jìn)行解碼,隨后處理業(yè)務(wù)邏輯 ? ? ? ? ? ? ? ?fireChannelRead(ctx, out, size); ? ? ? ? ? ? ? ?out.recycle(); ? ? ? ? ? } ? ? ? ?//if開始對應(yīng)的else判斷類型是否匹配 ? ? ? } else { ? ? ? ? ? ?ctx.fireChannelRead(msg); ? ? ? } ? ? } ? } ? ? //frameLength=4,如果先發(fā)送2字節(jié)再發(fā)送2字節(jié) //那么是否存在解碼出現(xiàn)異常的情況? //答案:不會(huì),因?yàn)橛幸粋€(gè)死循環(huán) //比如發(fā)送方先發(fā)送了2字節(jié)的數(shù)據(jù),然后發(fā)送方又發(fā)來了2字節(jié) //首先原子累加器累加2字節(jié)傳入callDecode方法的in,in是累加器cumulation //in.isReadable()判斷可讀,調(diào)用decode方法,decode方法會(huì)判斷如果不夠4字節(jié) 直接return跳出死循環(huán) //然后發(fā)送方又發(fā)來2字節(jié),然后繼續(xù)累加到原子累加器 //判斷可讀調(diào)用decode方法。

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) { ? ? ? ?try { ? ? ? ? ? ?while (in.isReadable()) { ? ? ? ? ? ? ? ?int outSize = out.size(); //判斷out有數(shù)據(jù)就觸發(fā)fireChannelRead ? ? ? ? ? ? ? ?//out什么時(shí)候有的數(shù)據(jù)? ? ? ? ? ? ? ? ?//在子類的decode方法中 ? ? ? ? ? ? ? ?if (outSize > 0) { ? ? ? ? ? ? ? ? ? ?fireChannelRead(ctx, out, outSize); ? ? ? ? ? ? ? ? ? ?out.clear(); ? ? ? ? ? ? ? ? ? ?if (ctx.isRemoved()) { ? ? ? ? ? ? ? ? ? ? ? ?break; ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ?outSize = 0; ? ? ? ? ? ? ? } //decode在這里被調(diào)用 ? ? ? ? ? ? ? ?//decode中時(shí),不能執(zhí)行完handler remove清理操作。 ? ? ? ? ? ? ? ?//那decode完之后需要清理數(shù)據(jù)。 ? ? ? ? ? ? ? ?int oldInputLength = in.readableBytes(); ? ? ? ? ? ? ? ?decodeRemovalReentryProtection(ctx, in, out); ? ? ? ? ? ? ? ?if (ctx.isRemoved()) { ? ? ? ? ? ? ? ? ? ?break; ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ?if (outSize == out.size()) { ? ? ? ? ? ? ? ? ? ?if (oldInputLength == in.readableBytes()) { ? ? ? ? ? ? ? ? ? ? ? ?break; ? ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? ? ?continue; ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ?if (oldInputLength == in.readableBytes()) { ? ? ? ? ? ? ? ? ? ?throw new DecoderException( ? ? ? ? ? ? ? ? ? ? ? ? ? ?StringUtil.simpleClassName(getClass()) + ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?".decode() did not read anything but decoded a message."); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ?if (isSingleDecode()) { ? ? ? ? ? ? ? ? ? ?break; ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? } catch (DecoderException e) { ? ? ? ? ? ?throw e; ? ? ? } catch (Exception cause) { ? ? ? ? ? ?throw new DecoderException(cause); ? ? ? } ? } ? ? ? ? ? ?final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List out) ? ? ? ? ? ?throws Exception { ? ? ? ?decodeState = STATECALLINGCHILDDECODE; ? ? ? ?try { ? ? ? ? ? ?//模板模式 ? ? ? ? ? ?decode(ctx, in, out); ? ? ? } finally { ? ? ? ? ? ?boolean removePending = decodeState == STATEHANDLERREMOVEDPENDING; ? ? ? ? ? ?decodeState = STATE_INIT; ? ? ? ? ? ?if (removePending) { ? ? ? ? ? ? ? ?handlerRemoved(ctx); ? ? ? ? ? } ? ? ? } ? } ? //模板模式 ? //netty中對各種用戶協(xié)議的支持就體現(xiàn)在這個(gè)抽象函數(shù)中 ? //傳進(jìn)去的in是累加器累加的數(shù)據(jù) //是當(dāng)前讀取到的未被消費(fèi)的所有的數(shù)據(jù),以及業(yè)務(wù)協(xié)議包容器,所有的拆包器最終都實(shí)現(xiàn)了該抽象方法 ? //業(yè)務(wù)拆包完成之后,如果發(fā)現(xiàn)并沒有拆到一個(gè)完整的數(shù)據(jù)包,這個(gè)時(shí)候又分兩種情況 ? //1.一個(gè)是拆包器什么數(shù)據(jù)也沒讀取,可能數(shù)據(jù)還不夠業(yè)務(wù)拆包器處理,直接break等待新的數(shù)據(jù) ? //2.拆包器已讀取部分?jǐn)?shù)據(jù),說明解碼器仍然在工作,繼續(xù)解碼 ?protected abstract void decode ? ? (ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception; ? ? //我們看下子類FixedLengthFrameDecoder#decode方法 ?@Override ? ?protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { ? ? ? ?//判斷返回的字節(jié)不為空就加入 out ? ? ? ?Object decoded = decode(ctx, in); ? ? ? ?if (decoded != null) { ? ? ? ? ? ?out.add(decoded); ? ? ? } ? } ? ? protected Object decode( ? ? ? ? ? ?@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { ? ? //判斷累加器中的字節(jié)數(shù)小于固定長度的字節(jié)長度 ? ? ? ?if (in.readableBytes() < frameLength) { ? ? ? ? ? ?//返回空 ? ? ? ? ? ?return null; ? ? ? } else { ? ? ? ? ? ?//否則返回可讀的字節(jié)數(shù) 這里很重要 ? ? ? ? ? ?return in.readRetainedSlice(frameLength); ? ? ? } ? } ? ? ?protected void decodeLast ? ? ? (ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { ? ? ? ?if (in.isReadable()) { ? ? ? ? ? ?decodeRemovalReentryProtection(ctx, in, out); ? ? ? } ? } } ```

decode() 是用戶必須實(shí)現(xiàn)的抽象方法,在該方法在調(diào)用時(shí)需要傳入接收的數(shù)據(jù) ByteBuf,及用來添加編碼后消息的 List。

由于 TCP 粘包問題,ByteBuf 中可能包含多個(gè)有效的報(bào)文,或者不夠一個(gè)完整的報(bào)文。

Netty 會(huì)重復(fù)回調(diào) decode() 方法,直到?jīng)]有解碼出新的完整報(bào)文可以添加到 List 當(dāng)中,或者 ByteBuf 沒有更多可讀取的數(shù)據(jù)為止。

如果此時(shí) List 的內(nèi)容不為空,那么會(huì)傳遞給 ChannelPipeline 中的下一個(gè)ChannelInboundHandler。觸發(fā)channelRead方法。

此外 ByteToMessageDecoder 還定義了 decodeLast() 方法。

為什么抽象解碼器要比編碼器多一個(gè) decodeLast() 方法呢?

因?yàn)?decodeLast 在 Channel 關(guān)閉后會(huì)被調(diào)用一次,主要用于處理 ByteBuf 最后剩余的字節(jié)數(shù)據(jù)。

Netty 中 decodeLast 的默認(rèn)實(shí)現(xiàn)只是簡單調(diào)用了 decode() 方法。如果有特殊的業(yè)務(wù)需求,則可以通過重寫 decodeLast() 方法擴(kuò)展自定義邏輯。

ByteToMessageDecoder 還有一個(gè)抽象子類是 ReplayingDecoder。

它封裝了緩沖區(qū)的管理,在讀取緩沖區(qū)數(shù)據(jù)時(shí),你無須再對字節(jié)長度進(jìn)行檢查。因?yàn)槿绻麤]有足夠長度的字節(jié)數(shù)據(jù),ReplayingDecoder 將終止解碼操作。

ReplayingDecoder 的性能相比直接使用 ByteToMessageDecoder 要慢,大部分情況下并不推薦使用 ReplayingDecoder。

二次解碼器MessageToMessageDecoder

MessageToMessageDecoder實(shí)際上是Nety的二次解碼器,從SocketChannel讀取到的TCP數(shù)據(jù)報(bào)是ByteBuffer,先將解碼為Java對象,再二次解碼為POJO對象,因此稱之為二次解碼器。 以HTTP+XML協(xié)議棧為例,第一次解碼是將字節(jié)數(shù)組解碼成HttpRequest對象,然后對HttpRequest消息中的消息體字符串進(jìn)行二次解碼,將XML格式的字符串解碼為POJO對象。 由于二次解碼器是將一個(gè)POJO解碼為另一個(gè)POJO,一般不涉及半包處理。

MessageToMessageDecoder 與 ByteToMessageDecoder 作用類似。

都是將一種消息類型的編碼成另外一種消息類型。

與 ByteToMessageDecoder 不同的是 MessageToMessageDecoder 并不會(huì)對數(shù)據(jù)報(bào)文進(jìn)行緩存,它主要用作轉(zhuǎn)換消息模型。

比較推薦的做法是使用 ByteToMessageDecoder 解析 TCP 協(xié)議,解決拆包/粘包問題。解析得到有效的 ByteBuf 數(shù)據(jù),然后傳遞給后續(xù)的 MessageToMessageDecoder 做數(shù)據(jù)對象的轉(zhuǎn)換,具體流程如下圖所示。

22.Netty源碼之解碼器,.net,網(wǎng)絡(luò)

三種常用的解碼器

FixedLengthFrameDecoder

DelimiterBasedFrameDecoder

LengthFieldBasedFrameDecoder

固定長度:FixedLengthFrameDecoder

public class FixedLengthFrameDecoder extends ByteToMessageDecoder { ? ? ?private final int frameLength; ? ? ?public FixedLengthFrameDecoder(int frameLength) { ? ? ? ?checkPositive(frameLength, "frameLength"); ? ? ? ?this.frameLength = frameLength; ? } ? ? ?@Override ? ?protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { ? ? ? ?Object decoded = decode(ctx, in); ? ? ? ?if (decoded != null) { ? ? ? ? ? ?out.add(decoded); ? ? ? } ? } ? ? ? ?protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ? ? ? ?//判斷讀取到的數(shù)據(jù)是否小于定義數(shù)據(jù)的固定長度 ? ? ? ?if (in.readableBytes() < frameLength) { ? ? ? ? ? ?//小于不處理 ? ? ? ? ? ?return null; ? ? ? } else { ? ? ? ? //否則只處理frameLength個(gè)長度的數(shù)據(jù) ? ? ? ? ? ?return in.readRetainedSlice(frameLength); ? ? ? } ? } } ?

通信協(xié)議實(shí)戰(zhàn)★

在之前通信協(xié)議設(shè)計(jì)中我們提到了協(xié)議的基本要素并給出了一個(gè)較為通用的協(xié)議示例。

下面我們通過 Netty 的編輯碼框架實(shí)現(xiàn)該協(xié)議的解碼器,加深我們對 Netty 編解碼框架的理解。

其實(shí)dubbo和rocketMq都是這種方式。

在實(shí)現(xiàn)協(xié)議編碼器之前,我們首先需要清楚一個(gè)問題:如何判斷 ByteBuf 是否存在完整的報(bào)文?

最常用的做法就是通過讀取消息長度 dataLength 進(jìn)行判斷。

如果 ByteBuf 的可讀數(shù)據(jù)長度小于 dataLength,說明 ByteBuf 還不夠獲取一個(gè)完整的報(bào)文。

在該協(xié)議前面的消息頭部分包含了魔數(shù)、協(xié)議版本號(hào)、數(shù)據(jù)長度等固定字段,共 14 個(gè)字節(jié)。

固定字段長度和數(shù)據(jù)長度可以作為我們判斷消息完整性的依據(jù),具體編碼器實(shí)現(xiàn)邏輯示例如下:

java /* +---------------------------------------------------------------+ ? | 魔數(shù) 2byte | 協(xié)議版本號(hào) 1byte | 序列化算法 1byte | 報(bào)文類型 1byte | ? +---------------------------------------------------------------+ ? | 狀態(tài) 1byte | ? ? ? 保留字段 4byte ? ? | ? ? 數(shù)據(jù)長度 4byte ? ? | ? +---------------------------------------------------------------+ ? | ? ? ? ? ? ? ? ? ? 數(shù)據(jù)內(nèi)容 (長度不定) ? ? ? ? ? ? ? ? ? ? ? ? | ? +---------------------------------------------------------------+ */ ? @Override ? public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { ? ? ?// 判斷 ByteBuf 可讀取字節(jié) ? ?if (in.readableBytes() < 14) { ? ? ? ?return; ? } ? ? ?in.markReaderIndex(); // 標(biāo)記 ByteBuf 讀指針位置 ? ?in.skipBytes(2); // 跳過魔數(shù) ? ?in.skipBytes(1); // 跳過協(xié)議版本號(hào) ? ?byte serializeType = in.readByte(); ? ?in.skipBytes(1); // 跳過報(bào)文類型 ? ?in.skipBytes(1); // 跳過狀態(tài)字段 ? ?in.skipBytes(4); // 跳過保留字段 ? ?int dataLength = in.readInt(); ? ?if (in.readableBytes() < dataLength) { ? ? ? ?in.resetReaderIndex(); // 重置 ByteBuf 讀指針位置 ? ? ? ?return; ? } ? ? ?byte[] data = new byte[dataLength]; ? ?in.readBytes(data); ? ?SerializeService serializeService = getSerializeServiceByType(serializeType); ? ?Object obj = serializeService.deserialize(data); ? ?if (obj != null) { ? ? ? ?out.add(obj); ? } } ?

總結(jié)

Netty 提供了一組 ChannelHandler 實(shí)現(xiàn)的抽象類,在項(xiàng)目開發(fā)中基于這些抽象類實(shí)現(xiàn)自定義的編解碼器具備較好的可擴(kuò)展性,最后我們通過具體示例協(xié)議的實(shí)戰(zhàn)加深了對編解碼器的理解。

當(dāng)然 Netty 在編解碼方面所做的工作遠(yuǎn)不止于此。它還提供了豐富的開箱即用的編解碼器,下節(jié)課我們便一起探索實(shí)用的編解碼技巧。文章來源地址http://www.zghlxwxcb.cn/news/detail-624307.html

到了這里,關(guān)于22.Netty源碼之解碼器的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 深入了解Transformer:從編碼器到解碼器的神經(jīng)網(wǎng)絡(luò)之旅

    深入了解Transformer:從編碼器到解碼器的神經(jīng)網(wǎng)絡(luò)之旅

    自2017年問世以來,Transformer模型在自然語言處理(NLP)領(lǐng)域引發(fā)了一場革命。它的獨(dú)特設(shè)計(jì)和高效性能使其成為了解決復(fù)雜語言任務(wù)的關(guān)鍵工具。 (1)自注意力機(jī)制 Transformer的核心在于自注意力機(jī)制。它允許模型在處理每個(gè)詞時(shí)考慮句子中的所有其他詞,從而有效捕獲長距離依

    2024年01月17日
    瀏覽(26)
  • FPGA硬件png圖片解碼器,支持所有顏色類型解碼,提供工程源碼和技術(shù)支持

    FPGA硬件png圖片解碼器,支持所有顏色類型解碼,提供工程源碼和技術(shù)支持

    png 是僅次于jpg的第二常見的圖象壓縮格式。png支持透明通道(A通道),支持無損壓縮,支持索引RGB(基于調(diào)色板的有損壓縮)。在色彩豐富的數(shù)碼照片中,png只能獲得1~4倍的壓縮比。在人工合成圖(例如平面設(shè)計(jì))中,png能獲得10倍以上的壓縮比。 本設(shè)計(jì)使用system verilog語言

    2023年04月17日
    瀏覽(23)
  • 解碼器 | 基于 Transformers 的編碼器-解碼器模型

    基于 transformer 的編碼器-解碼器模型是 表征學(xué)習(xí) 和 模型架構(gòu) 這兩個(gè)領(lǐng)域多年研究成果的結(jié)晶。本文簡要介紹了神經(jīng)編碼器-解碼器模型的歷史,更多背景知識(shí),建議讀者閱讀由 Sebastion Ruder 撰寫的這篇精彩 博文。此外,建議讀者對 自注意力 (self-attention) 架構(gòu) 有一個(gè)基本了解

    2024年02月08日
    瀏覽(25)
  • 【計(jì)算機(jī)視覺 | 目標(biāo)檢測】術(shù)語理解9:AIGC的理解,對比學(xué)習(xí),解碼器,Mask解碼器,耦合蒸餾,半耦合,圖像編碼器和組合解碼器的耦合優(yōu)化

    【計(jì)算機(jī)視覺 | 目標(biāo)檢測】術(shù)語理解9:AIGC的理解,對比學(xué)習(xí),解碼器,Mask解碼器,耦合蒸餾,半耦合,圖像編碼器和組合解碼器的耦合優(yōu)化

    AIGC指的是使用人工智能技術(shù)自動(dòng)生成的各類數(shù)字內(nèi)容,包括文本、圖像、音頻、視頻等。它利用機(jī)器學(xué)習(xí)模型進(jìn)行智能化內(nèi)容生成。 主要的技術(shù)手段包括: 自然語言生成(NLG):使用RNN、GPT等語言模型生成文本。 生成對抗網(wǎng)絡(luò)(GAN):使用GAN生成高質(zhì)量圖片。 自動(dòng)語音合成(TTS):使用

    2024年02月04日
    瀏覽(20)
  • 編碼器 | 基于 Transformers 的編碼器-解碼器模型

    基于 transformer 的編碼器-解碼器模型是 表征學(xué)習(xí) 和 模型架構(gòu) 這兩個(gè)領(lǐng)域多年研究成果的結(jié)晶。本文簡要介紹了神經(jīng)編碼器-解碼器模型的歷史,更多背景知識(shí),建議讀者閱讀由 Sebastion Ruder 撰寫的這篇精彩 博文。此外,建議讀者對 自注意力 (self-attention) 架構(gòu) 有一個(gè)基本了解

    2024年02月08日
    瀏覽(28)
  • 【Transformer系列(1)】encoder(編碼器)和decoder(解碼器)

    【Transformer系列(1)】encoder(編碼器)和decoder(解碼器)

    前言 這個(gè)專欄我們開始學(xué)習(xí)transformer,自推出以來transformer在深度學(xué)習(xí)中占有重要地位,不僅在NLP領(lǐng)域,在CV領(lǐng)域中也被廣泛應(yīng)用,尤其是2021年,transformer在CV領(lǐng)域可謂大殺四方。 在論文的學(xué)習(xí)之前,我們先來介紹一些專業(yè)術(shù)語。本篇就讓我們先來認(rèn)識(shí)一下encoder和decoder吧!

    2024年03月25日
    瀏覽(22)
  • ffmpeg中的avs解碼器綜述

    ffmpeg中的avs解碼器綜述

    最近拿了一個(gè)avs的視頻流,用硬件可以解碼,但是ffmpeg自帶的卻無法解碼。 所以研究了一下,首先看ffmpeg的avs解碼器: 可以看到avs有兩個(gè),第一個(gè)是avs 第二個(gè)是cavs. 我們先用avs來解碼,解碼的視頻是通過【 avs編碼器 】編碼的: 結(jié)果發(fā)現(xiàn)有問題,尺寸本來是640 360,結(jié)果被強(qiáng)

    2024年02月08日
    瀏覽(23)
  • flutter 視頻解碼器fijkplayer使用

    ? ? ? ?本人做視頻監(jiān)控項(xiàng)目的時(shí)候,需要去展示視頻流到用戶端,一開始使用flutter自帶的VideoPlayer播放監(jiān)控視頻,一開始沒有發(fā)現(xiàn)有什么問題,因?yàn)槭褂枚嗟氖茿ndroid模擬器,一直沒有使用iso模擬器或者真機(jī)測試能不能播放,直到開發(fā)接近尾聲,在ios模擬器上測試的時(shí)候發(fā)現(xiàn)

    2023年04月10日
    瀏覽(25)
  • 【NLP概念源和流】 06-編碼器-解碼器模型(6/20 部分)

    【NLP概念源和流】 06-編碼器-解碼器模型(6/20 部分)

    ????????在機(jī)器翻譯等任務(wù)中,我們必須從一系列輸入詞映射到一系列輸出詞。讀者必須注意,這與“序列標(biāo)記”不同,在“序列標(biāo)記”中,該任務(wù)是將序列中的每個(gè)單詞映射到預(yù)定義的類,如詞性或命名實(shí)體任務(wù)。 作者生成 ????????在上面的

    2024年02月14日
    瀏覽(51)
  • ffmpeg視頻解碼器的配置選項(xiàng)含義

    lowres 是 AVCodecContext 結(jié)構(gòu)體中的一個(gè)成員變量,用于指定編解碼器的降低分辨率級(jí)別。 在某些情況下,為了加快編解碼的速度或減少計(jì)算資源的消耗,可以通過設(shè)置 lowres 參數(shù)來降低編解碼器的分辨率級(jí)別。這將導(dǎo)致編解碼器在處理視頻時(shí)使用較低的分辨率,從而減少計(jì)算量

    2024年02月22日
    瀏覽(58)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包