引言
System.Threading.Channels 是.NET Core 3.0 后推出的新的集合類型, 具有異步API,高性能,線程安全等特點(diǎn),它提供一個(gè)異步數(shù)據(jù)集合,可用于生產(chǎn)者和消費(fèi)者之前的數(shù)據(jù)異步傳遞。
它提供如下方法:
BoundedChannelOptions | Provides options that control the behavior of bounded Channel<T> instances. 提供通道的行為控制 有限通道 |
Channel | Provides static methods for creating channels. 提供創(chuàng)建通道的靜態(tài)方法 |
Channel<T> | Provides a base class for channels that support reading and writing elements of type 泛型通道,寫入和讀取方類型都為 T |
Channel<TWrite,TRead> | Provides a base class for channels that support reading elements of type 泛型通道,分別指定寫入和讀取方的類型 |
ChannelClosedException | Exception thrown when a channel is used after it's been closed. 通道在關(guān)閉后被調(diào)用時(shí)會(huì)拋出此異常 |
ChannelOptions | Provides options that control the behavior of channel instances. 提供通道的行為控制 |
ChannelReader<T> | Provides a base class for reading from a channel. 通道讀取方的基類 |
ChannelWriter<T> | Provides a base class for writing to a channel. 通道寫入方的基類 |
UnboundedChannelOptions | Provides options that control the behavior of unbounded Channel 提供通道的行為控制 無(wú)限制通道 |
BoundedChannelFullMode | Specifies the behavior to use when writing to a bounded channel that is already full. 當(dāng)通道容量達(dá)到最大時(shí) 控制通道的寫入規(guī)則 有限通道 |
靜態(tài)類 Channel
通道的創(chuàng)建方法由靜態(tài)類Channel提供,只需要指定通道類型、控制條件和數(shù)據(jù)類型
CreateBounded<T>(BoundedChannelOptions) | Creates a channel with the specified maximum capacity. 創(chuàng)建具有指定配置的通道 |
CreateBounded<T>(BoundedChannelOptions, Action<T>) | Creates a channel subject to the provided options. 通道容量已滿,再寫入時(shí)會(huì)觸發(fā)Action<T> |
CreateBounded<T>(Int32) | Creates a channel with the specified maximum capacity. 創(chuàng)建具有指定容量的通道 |
CreateUnbounded<T>() | Creates an unbounded channel usable by any number of readers and writers concurrently. 創(chuàng)建無(wú)限制通道,可供任意數(shù)量的讀取和寫入方同時(shí)操作 |
CreateUnbounded<T>(UnboundedChannelOptions) | Creates an unbounded channel subject to the provided options. 創(chuàng)建具有指定配置的無(wú)限制通道 |
BoundedChannelFullMode
DropNewest | 1 | 刪除并忽略通道中的最新項(xiàng),以便為要寫入的項(xiàng)留出空間。 |
DropOldest | 2 | 刪除并忽略通道中的最舊項(xiàng),以便為要寫入的項(xiàng)留出空間。 |
DropWrite | 3 | 刪除要寫入的項(xiàng)。 |
Wait | 0 | 等待空間可用以便完成寫入操作。 |
?ChannelOption
AllowSynchronousContinuations | 如果通道上執(zhí)行的操作能以同步方式調(diào)用已訂閱掛起異步操作的通知的延續(xù),則為 true ;如果應(yīng)以異步方式調(diào)用所有延續(xù),則為 false 。 |
SingleReader | 如果通道中的讀取器需要保證一次最多僅執(zhí)行一個(gè)讀取操作,則為 |
SingleWriter | 如果寫入到通道的編寫器需要保證一次最多僅執(zhí)行一個(gè)寫入操作,則為 |
BoundedChannelOptions
BoundedChannelOptions(int Capcity) | 初始化選項(xiàng)。 |
AllowSynchronousContinuations | 如果通道上執(zhí)行的操作能以同步方式調(diào)用已訂閱掛起異步操作的通知的延續(xù),則為 |
Capacity | 獲取或設(shè)置有限通道可能會(huì)存儲(chǔ)的最大項(xiàng)數(shù)。 |
FullMode | 獲取或設(shè)置通道已滿時(shí)由寫入操作引起的行為。BoundedChannelFullMode枚舉 |
SingleReader | 如果通道中的讀取器需要保證一次最多僅執(zhí)行一個(gè)讀取操作,則為 |
SingleWriter | 如果寫入到通道的編寫器需要保證一次最多僅執(zhí)行一個(gè)寫入操作,則為 |
?UnBoundedChannelOptions
AllowSynchronousContinuations | 如果通道上執(zhí)行的操作能以同步方式調(diào)用已訂閱掛起異步操作的通知的延續(xù),則為 true ;如果應(yīng)以異步方式調(diào)用所有延續(xù),則為 false 。 |
SingleReader | 如果通道中的讀取器需要保證一次最多僅執(zhí)行一個(gè)讀取操作,則為 |
SingleWriter | 如果寫入到通道的編寫器需要保證一次最多僅執(zhí)行一個(gè)寫入操作,則為 |
Writer
Complete(Exception) | Mark the channel as being complete, meaning no more items will be written to it. 標(biāo)記通道即將關(guān)閉,意味著沒(méi)有更多的數(shù)據(jù)需要寫入 |
TryComplete(Exception) | Attempts to mark the channel as being completed, meaning no more data will be written to it. 嘗試關(guān)閉通道,意味著沒(méi)有更多的數(shù)據(jù)需要寫入,成功返回 true,反之false |
TryWrite(T) | Attempts to write the specified item to the channel. 嘗試寫入數(shù)據(jù),成功返回 true ,反之 false |
WaitToWriteAsync(CancellationToken) | Returns a ValueTask<TResult> that will complete when space is available to write an item. 將等待有可用空間寫入項(xiàng)時(shí)完成寫入并返回 |
WriteAsync(T, CancellationToken) | Asynchronously writes an item to the channel. |
Reader
ReadAllAsync(CancellationToken) | Creates an IAsyncEnumerable<T> that enables reading all of the data from the channel.創(chuàng)建的異步可枚舉項(xiàng) |
ReadAsync(CancellationToken) | Asynchronously reads an item from the channel. |
TryPeek(T) | Attempts to peek at an item from the channel. |
TryRead(T) | 如果項(xiàng)已讀取到,則為 true ;否則為 false
|
WaitToReadAsync(CancellationToken) | 它將在有數(shù)據(jù)可供讀取時(shí)完成,并返回 |
示例
// 創(chuàng)建有限容量的channel
var channel1 = Channel.CreateBounded<string>(100);
var option = new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait//容量滿時(shí),等待空位再寫入
};
// 創(chuàng)建指定配置的有限通道
var channel2 = Channel.CreateBounded<string>(option);
await channel.Writer.WriteAsync("hello");//寫入
string res = await channel.Reader.ReadAsync();//讀取
//自動(dòng)循環(huán)讀取,內(nèi)部阻塞,當(dāng)寫入方調(diào)用Complete時(shí),會(huì)自動(dòng)退出循環(huán)
await foreach(var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine(item);
}
//捕獲到異常 退出
try
{
while(!channel.Reader.Completion.IsCompleted)
{
var message = await channel.Reader.ReadAsync();
Console.WriteLine(message);
}
}
catch(ChannelClosedException)
{
Console.WriteLine("channel closed");
}
有意思的是當(dāng)寫入方Writer調(diào)用Complete方法時(shí),Reader.Completion.Completed并不會(huì)立即被標(biāo)記,而是通道內(nèi)沒(méi)有數(shù)據(jù)時(shí)再被標(biāo)記。
此外,當(dāng)寫入方Writer調(diào)用Complete方法時(shí),await foreach會(huì)自動(dòng)退出循環(huán)。但上述第二種讀取方式會(huì)報(bào)錯(cuò)ChannelClosedException,需要Try Catch包起來(lái)。
.Net Framework 4.8 沒(méi)有ReadAllAsync方法,ReadAkkAsync的實(shí)現(xiàn)其實(shí)是:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-821717.html
while (await channel.Reader.WaitToReadAsync())
{
while (channel.Reader.TryRead(out var item))
{
Console.WriteLine(item);
}
}
可以用上述方式避免報(bào)錯(cuò)。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-821717.html
到了這里,關(guān)于System.Threading.Channels 高性能異步隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!