目錄
前言
正文
Channel概述
Channel 特點(diǎn)
Channel 接口方法
ChannelOutboundInvoker
AttributeMap
總結(jié)
前言
前兩篇文章我們已經(jīng)對(duì)Netty
進(jìn)行了簡(jiǎn)單的了解和架構(gòu)設(shè)計(jì)原理的剖析。
本篇文章我們就來(lái)開始對(duì)Netty
源碼的分析,首先我們來(lái)講解 Netty 中Channel
相關(guān)的功能和接口。
io.netty學(xué)習(xí)使用匯總
正文
Channel概述
Channel 顧名思義就是?
管道
,代表網(wǎng)絡(luò) Socket 或能夠進(jìn)行 I/O 操作的組件的關(guān)系。這些 I/O 操作包括讀、寫、連接和綁定。
簡(jiǎn)單的說(shuō),Channel 就是代表連接,實(shí)體之間的連接,程序之間的連接,文件之間的連接,設(shè)備之間的連接。同時(shí)它也是數(shù)據(jù)入站和出站的載體。
Netty 中的 Channel 為用戶提供了如下功能:
-
查詢當(dāng)前 Channel 的狀態(tài)。例如,是打開還是已連接狀態(tài)等。
-
提供 Channel 的參數(shù)配置。例如,接收緩沖區(qū)大小。
-
提供支持的 I/O 操作。例如,讀、寫、連接和綁定。
-
提供 ChannelPipeline。ChannelPipeline 用戶處理所有與 Channel 關(guān)聯(lián)的 I/O 事件和請(qǐng)求。
Channel 特點(diǎn)
I/O 操作都是異步的
Channel
中的所有 I/O 操作都是異步的,一經(jīng)調(diào)用就馬上返回,而不保證所請(qǐng)求的 I/O 操作在調(diào)用結(jié)束時(shí)已完成。相反,在調(diào)用時(shí)都將返回一個(gè)?ChannelFuture
實(shí)例,用來(lái)代表未來(lái)的結(jié)果。該實(shí)例會(huì)在 I/O 操作真正完成后通知用戶,然后就可以得到具體的 I/O操作的結(jié)果。
Channel 是分層的
一個(gè)?Channel
會(huì)有一個(gè)對(duì)應(yīng)的parent
,該parent
也是一個(gè)Channel
。并且根據(jù)?Channel
創(chuàng)建的不同,它的parent
也會(huì)不一樣。例如,在一個(gè)SocketChannel
連接上ServerSocketChannel
之后,該SocketChannel
的parent
就會(huì)是該ServerSocketChannel
。層次的結(jié)構(gòu)的語(yǔ)義取決于Channel
使用了何種傳輸實(shí)現(xiàn)方式。
向下轉(zhuǎn)型以下訪問(wèn)特定于輸出的操作
某些傳輸公開了特定于該傳輸?shù)南嚓P(guān)操作,因此可以將該Channel
向下轉(zhuǎn)換為子類型以調(diào)用此類操作。
釋放資源
一旦Channel
完成,調(diào)用ChannelOutboundInvoker.close()
或ChannelOutboundInvoker.close(ChannelPromise)
來(lái)釋放所有資源是非常重要的。這樣可以確保以適當(dāng)?shù)姆绞剑ㄒ晕募浔┽尫潘械馁Y源。
Channel 接口方法
以下是Channel接口的核心源碼:
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
//返回全局唯一的channel id
ChannelId id();
//返回該通道注冊(cè)的事件輪詢器
EventLoop eventLoop();
//返回該通道的父通道,如果是ServerSocketChannel實(shí)例則返回null,
//SocketChannel實(shí)例則返回對(duì)應(yīng)的ServerSocketChannel
Channel parent();
//返回該通道的配置參數(shù)
ChannelConfig config();
//端口是否處于open,通道默認(rèn)一創(chuàng)建isOpen方法就會(huì)返回true,close方法被調(diào)用后該方法返回false
boolean isOpen();
//是否已注冊(cè)到EventLoop
boolean isRegistered();
// 通道是否處于激活
boolean isActive();
// 返回channel的元數(shù)據(jù)
ChannelMetadata metadata();
// 服務(wù)器的ip地址
SocketAddress localAddress();
// remoteAddress 客戶端的ip地址
SocketAddress remoteAddress();
//通道的關(guān)閉憑證(許可),這里是多線程編程一種典型的設(shè)計(jì)模式,一個(gè)channle返回一個(gè)固定的
ChannelFuture closeFuture();
//是否可寫,如果通道的寫緩沖區(qū)未滿,即返回true,表示寫操作可以立即 操作緩沖區(qū),然后返回。
boolean isWritable();
long bytesBeforeUnwritable();
long bytesBeforeWritable();
Channel.Unsafe unsafe();
// 返回管道
ChannelPipeline pipeline();
// 返回ByteBuf內(nèi)存分配器
ByteBufAllocator alloc();
Channel read();
Channel flush();
public interface Unsafe {
Handle recvBufAllocHandle();
SocketAddress localAddress();
SocketAddress remoteAddress();
// 把channel注冊(cè)進(jìn)EventLoop
void register(EventLoop var1, ChannelPromise var2);
// 給channel綁定一個(gè) adress,
void bind(SocketAddress var1, ChannelPromise var2);
// Netty客戶端連接到服務(wù)端
void connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);
// 斷開連接,但不會(huì)釋放資源,該通道還可以再通過(guò)connect重新與服務(wù)器建立連接
void disconnect(ChannelPromise var1);
// 關(guān)閉通道,回收資源,該通道的生命周期完全結(jié)束
void close(ChannelPromise var1);
void closeForcibly();
// 取消注冊(cè)。
void deregister(ChannelPromise var1);
// 從channel中讀取IO數(shù)據(jù)
void beginRead();
// 往channe寫入數(shù)據(jù)
void write(Object var1, ChannelPromise var2);
void flush();
ChannelPromise voidPromise();
ChannelOutboundBuffer outboundBuffer();
}
}
從上述源碼可以看到Channel
接口繼承了AttributeMap
、ChannelOutboundInvoker
、Comparable<Channel>
外,還提供了很多接口。
ChannelOutboundInvoker
ChannelOutboundInvoker
接口聲明了所有的出站的網(wǎng)絡(luò)操作。
以下是ChannelOutboundInvoker
接口的核心源碼:
public interface ChannelOutboundInvoker {
ChannelFuture bind(SocketAddress var1);
ChannelFuture connect(SocketAddress var1);
ChannelFuture connect(SocketAddress var1, SocketAddress var2);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture bind(SocketAddress var1, ChannelPromise var2);
ChannelFuture connect(SocketAddress var1, ChannelPromise var2);
ChannelFuture connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);
ChannelFuture disconnect(ChannelPromise var1);
ChannelFuture close(ChannelPromise var1);
ChannelFuture deregister(ChannelPromise var1);
ChannelOutboundInvoker read();
ChannelFuture write(Object var1);
ChannelFuture write(Object var1, ChannelPromise var2);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object var1, ChannelPromise var2);
ChannelFuture writeAndFlush(Object var1);
ChannelPromise newPromise();
ChannelProgressivePromise newProgressivePromise();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable var1);
ChannelPromise voidPromise();
}
從上述源碼可以看到ChannelOutboundInvoker
接口中大部分方法的返回值都是ChannelFuture
及ChannelPromise
。因此,ChannelOutboundInvoker
接口的操作都是異步的。
ChannelFuture
與ChannelPromise
的差異在于,ChannelFuture
用于獲取異步的結(jié)果,而ChannelPromise
則是對(duì)ChannelFuture
進(jìn)行擴(kuò)展,支持寫的操作。因此,ChannelPromise
也被稱為可寫的ChannelFuture
。
AttributeMap
Channel
接口繼承了AttributeMap
接口,那么AttributeMap
接口的作用是什么呢?
以下是AttributeMap
接口的核心源碼:
public interface AttributeMap {
<T> Attribute<T> attr(AttributeKey<T> var1);
<T> boolean hasAttr(AttributeKey<T> var1);
}
從源碼可以看到,AttributeMap
其實(shí)就是類似于 Map 的鍵值對(duì),而鍵就是AttributeKey
類型,值就是Attribute
類型。換言之,AttributeMap
是用來(lái)存放屬性的。
我們知道每一個(gè)ChannelHandlerContext
都是ChannelHandler
和ChannelPipeline
之間連接的橋梁,每一個(gè)ChannelHandlerContext
都有屬于自己的上下文,也就說(shuō)每一個(gè)ChannelHandlerContext
上如果有AttributeMap
都是綁定上下文的,也就說(shuō)如果A的ChannelHandlerContext
中的AttributeMap
,是無(wú)法被B的ChannelHandlerContext
讀取到的。
Netty 提供了AttributeMap
的默認(rèn)實(shí)現(xiàn)類DefaultAttributeMap
。與 JDK 中?ConcurrentHashMap
相比,在高并發(fā)下DefaultAttributeMap
可以更加節(jié)省內(nèi)存。
DefaultAttributeMap
的核心源碼如下:
public class DefaultAttributeMap implements AttributeMap {
//以原子方式更新attributes變量的引用
private static final AtomicReferenceFieldUpdater<DefaultAttributeMap, AtomicReferenceArray> updater = AtomicReferenceFieldUpdater.newUpdater(DefaultAttributeMap.class, AtomicReferenceArray.class, "attributes");
//默認(rèn)桶的大小
private static final int BUCKET_SIZE = 4;
//掩碼 桶大小 3
private static final int MASK = 3;
//延遲初始化,節(jié)約內(nèi)存
private volatile AtomicReferenceArray<DefaultAttributeMap.DefaultAttribute<?>> attributes;
public <T> Attribute<T> attr(AttributeKey<T> key) {
ObjectUtil.checkNotNull(key, "key");
AtomicReferenceArray<DefaultAttributeMap.DefaultAttribute<?>> attributes = this.attributes;
if (attributes == null) {
//當(dāng)attributes為空時(shí)則創(chuàng)建它,默認(rèn)數(shù)組長(zhǎng)度4
attributes = new AtomicReferenceArray(4);
//原子方式更新attributes,如果attributes為null則把新創(chuàng)建的對(duì)象賦值
//原子方式更新解決了并發(fā)賦值問(wèn)題
if (!updater.compareAndSet(this, (Object)null, attributes)) {
attributes = this.attributes;
}
}
//根據(jù)key計(jì)算下標(biāo)
int i = index(key);
DefaultAttributeMap.DefaultAttribute<?> head = (DefaultAttributeMap.DefaultAttribute)attributes.get(i);
if (head == null) {
head = new DefaultAttributeMap.DefaultAttribute();
DefaultAttributeMap.DefaultAttribute<T> attr = new DefaultAttributeMap.DefaultAttribute(head, key);
head.next = attr;
attr.prev = head;
if (attributes.compareAndSet(i, (Object)null, head)) {
return attr;
}
//返回attributes數(shù)組中的第一個(gè)元素
head = (DefaultAttributeMap.DefaultAttribute)attributes.get(i);
}
//對(duì)head同步加鎖
synchronized(head) {
//curr 賦值為head head為鏈表結(jié)構(gòu)的第一個(gè)變量
DefaultAttributeMap.DefaultAttribute curr = head;
while(true) {
//依次獲取next
DefaultAttributeMap.DefaultAttribute<?> next = curr.next;
//next==null,說(shuō)明curr為最后一個(gè)元素
if (next == null) {
//創(chuàng)建一個(gè)新對(duì)象傳入head和key
DefaultAttributeMap.DefaultAttribute<T> attr = new DefaultAttributeMap.DefaultAttribute(head, key);
//curr后面指向attr
curr.next = attr;
//attr的前面指向curr
attr.prev = curr;
//返回新對(duì)象
return attr;
}
//如果next的key等于傳入的key,并且沒有被移除
if (next.key == key && !next.removed) {
//這直接返回next
return next;
}
//否則把curr變量指向下一個(gè)元素
curr = next;
}
}
}
public <T> boolean hasAttr(AttributeKey<T> key) {
ObjectUtil.checkNotNull(key, "key");
//attributes為null直接返回false
AtomicReferenceArray<DefaultAttributeMap.DefaultAttribute<?>> attributes = this.attributes;
if (attributes == null) {
return false;
} else {
//計(jì)算數(shù)組下標(biāo)
int i = index(key);
//獲取頭 為空直接返回false
DefaultAttributeMap.DefaultAttribute<?> head = (DefaultAttributeMap.DefaultAttribute)attributes.get(i);
if (head == null) {
return false;
} else {
//對(duì)head同步加鎖
synchronized(head) {
//從head的下一個(gè)開始,因?yàn)閔ead不存儲(chǔ)元素
for(DefaultAttributeMap.DefaultAttribute curr = head.next; curr != null; curr = curr.next) {
if (curr.key == key && !curr.removed) {
return true;
}
}
return false;
}
}
}
}
private static int index(AttributeKey<?> key) {
//與掩碼&運(yùn)算,數(shù)值肯定<=mask 正好是數(shù)組下標(biāo)
return key.id() & 3;
}
}
從上述源碼可以看出,DefaultAttributeMap
使用了AtomicReferenceArray
和synchronized
等來(lái)保證并發(fā)的安全。
AtomicReferenceArray
類提供了可以原子讀取和寫入的底層引用數(shù)組的操作,并且還包含高級(jí)原子操作。AtomicReferenceArray
支持對(duì)底層引用數(shù)組變量的原子操作。它具有獲取和設(shè)置的方法,如在變量上的讀取和寫入。compareAndSet()
方法用于判斷當(dāng)前值是否等于預(yù)期值,如果是,則以原子方式將位置i
的元素設(shè)置為給定的更新值。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-492237.html
總結(jié)
看完以上關(guān)于Channel
的介紹,相信你對(duì)Channel
應(yīng)該有了一定的認(rèn)識(shí),后面我們繼續(xù)分析Channel
相關(guān)的接口和源碼。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-492237.html
到了這里,關(guān)于io.netty學(xué)習(xí)(三)Channel 概述的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!