目錄
一、基于netty創(chuàng)建udp服務(wù)端以及對(duì)應(yīng)通道設(shè)置關(guān)鍵
二、發(fā)送數(shù)據(jù)
三、netty中的ChannelOption常用參數(shù)說明
1、ChannelOption.SO_BACKLOG
2、ChannelOption.SO_REUSEADDR
3、ChannelOption.SO_KEEPALIVE
4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
5、ChannelOption.SO_LINGER
6、ChannelOption.TCP_NODELAY
一、基于netty創(chuàng)建udp服務(wù)端以及對(duì)應(yīng)通道設(shè)置關(guān)鍵
@Configuration
@RefreshScope
public class NettyUdpServer {
@Value("${netty.server.udpPort}")
private int port;
private EventLoopGroup bossGroup;//主線程
private Channel channel;//通道
private ChannelFuture future; //回調(diào)
@Autowired
private DataCollector dataCollector;;
public Channel start() throws InterruptedException {
//判斷是否支持Epoll模式,從而創(chuàng)建不同的線程組
bossGroup = Epoll.isAvailable()? new EpollEventLoopGroup() : new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
//linux平臺(tái)下增加SO_REUSEPORT特性提高性能,支持多個(gè)進(jìn)程或者線程綁定到同一個(gè)端口,提高服務(wù)器程序的吞吐性能
if(Epoll.isAvailable()) {
//設(shè)置反應(yīng)器線程組
b.group(bossGroup)
.handler(new EpollUdpServerInitializer(dataCollector))
//設(shè)置nio類型的通道
.channel(EpollDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_RCVBUF, 1024 * 1024)
.option(EpollChannelOption.SO_REUSEPORT, true);
}else{
//設(shè)置反應(yīng)器線程組
b.group(bossGroup)
.handler(new UdpServerInitializer(dataCollector))
//設(shè)置nio類型的通道
.channel(NioDatagramChannel.class)
//設(shè)置通道的參數(shù)
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_REUSEADDR, true);
}
//Channel channel = server.bind(port).sync().channel();
//開始綁定服務(wù)器,通過調(diào)用sync()同步方法阻塞直到綁定成功
//ChannelFuture channelFuture = b.bind(port).sync();
//等待通道關(guān)閉的異步任務(wù)結(jié)束
//ChannelFuture closeFuture = channelFuture.channel().closeFuture();
//closeFuture.sync();
ChannelFuture f = b.bind(port).sync();
channel = f.channel();
if(f.isSuccess()){
//MasterSelector registry = new MasterSelector("","netty-services", port);
System.out.println("UDP服務(wù)器啟動(dòng),監(jiān)聽在端口:" + port);
}else {
channel.closeFuture().sync();
}
} finally {
//bossGroup.shutdownGracefully().sync();
}
System.out.println("Udp服務(wù)器啟動(dòng),監(jiān)聽在端口:"+port);
return channel;
}
}
以上代碼中Epoll.isAvailable()用戶判斷是window還是linux環(huán)境,linux環(huán)境默認(rèn)采用Epoll相關(guān)通道,所以顯式設(shè)置EpollDatagramChannel通道。在處理(handler)的設(shè)置中要根據(jù)不同的通道設(shè)置初始化的通道類型:
linux環(huán)境下EpollDatagramChannel通道設(shè)置?.handler(new EpollUdpServerInitializer(dataCollector))具體代碼
public class EpollUdpServerInitializer extends ChannelInitializer<EpollDatagramChannel> {
private final DataCollector dataCollector;
public EpollUdpServerInitializer(DataCollector dataCollector) {
this.dataCollector = dataCollector;
}
@Override
protected void initChannel(EpollDatagramChannel epollDatagramChannel) throws Exception {
epollDatagramChannel.pipeline()
//添加netty空閑超時(shí)檢查的支持
.addLast(new UdpServerHandler(dataCollector));
}
要使 通過服務(wù)器端通過EpollDatagramChannel通道發(fā)送數(shù)據(jù),客戶端能夠正常接收到數(shù)據(jù),下圖中標(biāo)紅的泛型通道類要與服務(wù)器端設(shè)置的通道類一致
同意要支持Nio類型通道為NioDatagramChannel.class時(shí),通道初始化為:
public class UdpServerInitializer extends ChannelInitializer<NioDatagramChannel> {
private final DataCollector dataCollector;
public UdpServerInitializer(DataCollector dataCollector) {
this.dataCollector = dataCollector;
}
@Override
protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
nioDatagramChannel.pipeline()
//添加netty空閑超時(shí)檢查的支持
.addLast(new UdpServerHandler(dataCollector));
}
}
?要使 通過服務(wù)器端通過NioDatagramChannel通道發(fā)送數(shù)據(jù),客戶端能夠正常接收到數(shù)據(jù),下圖中標(biāo)紅的泛型通道類要與服務(wù)器端設(shè)置的通道類一致
二、發(fā)送數(shù)據(jù)
關(guān)鍵代碼,采用writeAndFlush發(fā)送數(shù)據(jù),注意:要發(fā)送udp數(shù)據(jù)報(bào),
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
/**設(shè)置最大消息大小*/
private static final int MAX_MESSAGE_SIZE = 2048;
/**線程池*/
private ExecutorService executorService;
private final DataCollector dataCollector;
public UdpServerHandler(DataCollector dataCollector) {
this.dataCollector = dataCollector;
//根據(jù)當(dāng)前系統(tǒng)可用的處理器數(shù)量創(chuàng)建一個(gè)固定長(zhǎng)度的線程池
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket datagramPacket) throws Exception {
ByteBuf buffer = datagramPacket.content();
//確保不會(huì)超出最大消息大小
if(buffer.readableBytes() > MAX_MESSAGE_SIZE) {
buffer.release();
return;
}
UdpDatagram udpDatagram = parseUdpDatagram(buffer);
UdpDatagram respUdpDatagram = dataCollector.processUdpDatagram(udpDatagram);
if (null != respUdpDatagram) {
handleReceivedData(ctx, respUdpDatagram, datagramPacket);
}
}
/**
* 處理接收到的數(shù)據(jù)
* @param ctx
* @param udpDatagram
*/
public void handleReceivedData(ChannelHandlerContext ctx, UdpDatagram udpDatagram, DatagramPacket datagramPacket) throws ExecutionException, InterruptedException {
Channel channel = ctx.channel();
if (log.isInfoEnabled()) {
log.info("received udp message: sessionId: {}, opCode: {}, short messageId: {}",
ctx.channel().id(), udpDatagram.getMessageTypeEnum(), udpDatagram.getShortMessageId());
}
byte[] payloadBytes = udpDatagram.getPayloadBytes();
ByteBuf copiedBuffer = Unpooled.copiedBuffer(payloadBytes);
ChannelFuture channelFuture = channel.writeAndFlush(new DatagramPacket(copiedBuffer.retain(), datagramPacket.sender()));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
// 數(shù)據(jù)發(fā)送成功
log.info("數(shù)據(jù)發(fā)送成功:sender host: {}, sender port:{}, sender address:{}",datagramPacket.sender().getHostName(),datagramPacket.sender().getPort(), datagramPacket.sender().getAddress());
} else {
// 數(shù)據(jù)發(fā)送失敗
log.error("數(shù)據(jù)發(fā)送失敗: {}",channelFuture.cause().getStackTrace());
channelFuture.cause().printStackTrace();
}
}
});
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
dataCollector.tcpConnect(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (log.isWarnEnabled()) {
log.warn("udp session throw an exception, sessionId:{} exception message: {}",
ctx.channel().id().asLongText(), cause.getMessage());
}
}
//當(dāng)客戶端關(guān)閉鏈接時(shí)關(guān)閉通道
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
dataCollector.tcpChannelDisconnect(ctx.channel());
}
}
處理類繼承SimpleChannelInboundHandler類泛型類為DatagramPacket?
?writeAndFlush方法中發(fā)送的數(shù)據(jù)類型要是DatagramPacket
三、netty中的ChannelOption常用參數(shù)說明
1、ChannelOption.SO_BACKLOG
ChannelOption.SO_BACKLOG對(duì)應(yīng)的是tcp/ip協(xié)議listen函數(shù)中的backlog參數(shù)。函數(shù)listen(int socketfd, int backlog)用來初始化服務(wù)端可連接隊(duì)列。
服務(wù)端處理客戶端連接請(qǐng)求是順序處理的,所以同一時(shí)間只能處理一個(gè)客戶端連接,多個(gè)客戶端來的時(shí)候,服務(wù)端將不能處理的客戶端連接請(qǐng)求放在隊(duì)列中等待處理,backlog參數(shù)指定了隊(duì)列的大小。
2、ChannelOption.SO_REUSEADDR
ChanneOption.SO_REUSEADDR對(duì)應(yīng)于套接字選項(xiàng)中的SO_REUSEADDR,這個(gè)參數(shù)表示允許重復(fù)使用本地地址和端口。
比如,某個(gè)服務(wù)器進(jìn)程占用了TCP的80端口進(jìn)行監(jiān)聽,此時(shí)再次監(jiān)聽該端口就會(huì)返回錯(cuò)誤,使用該參數(shù)就可以解決問題,該參數(shù)允許共用該端口,這個(gè)在服務(wù)器程序中比較常使用。
比如某個(gè)進(jìn)程非正常退出,該程序占用的端口可能要被占用一段時(shí)間才能允許其他進(jìn)程使用,而且程序死掉以后,內(nèi)核一需要一定的時(shí)間才能夠釋放此端口,不設(shè)置SO_REUSEADDR就無法正常使用該端口。
3、ChannelOption.SO_KEEPALIVE
Channeloption.SO_KEEPALIVE參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_KEEPALIVE,該參數(shù)用于設(shè)置TCP連接,當(dāng)設(shè)置該選項(xiàng)以后,連接會(huì)測(cè)試鏈接的狀態(tài),這個(gè)選項(xiàng)用于可能長(zhǎng)時(shí)間沒有數(shù)據(jù)交流的連接。
當(dāng)設(shè)置該選項(xiàng)以后,如果在兩小時(shí)內(nèi)沒有數(shù)據(jù)的通信時(shí),TCP會(huì)自動(dòng)發(fā)送一個(gè)活動(dòng)探測(cè)數(shù)據(jù)報(bào)文。
4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_SNDBUF,ChannelOption.SO_RCVBUF參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_RCVBUF這兩個(gè)參數(shù)用于操作發(fā)送緩沖區(qū)大小和接受緩沖區(qū)大小。
接收緩沖區(qū)用于保存網(wǎng)絡(luò)協(xié)議站內(nèi)收到的數(shù)據(jù),直到應(yīng)用程序讀取成功,發(fā)送緩沖區(qū)用于保存發(fā)送數(shù)據(jù),直到發(fā)送成功。
5、ChannelOption.SO_LINGER
ChannelOption.SO_LINGER參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_LINGER,Linux內(nèi)核默認(rèn)的處理方式是當(dāng)用戶調(diào)用close()方法的時(shí)候,函數(shù)返回,在可能的情況下,盡量發(fā)送數(shù)據(jù),不一定保證會(huì)發(fā)送剩余的數(shù)據(jù),造成了數(shù)據(jù)的不確定性,使用SO_LINGER可以阻塞close()的調(diào)用時(shí)間,直到數(shù)據(jù)完全發(fā)送。
6、ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的TCP_NODELAY,該參數(shù)的使用與Nagle算法有關(guān)。
Nagle算法是將小的數(shù)據(jù)包組裝為更大的幀然后進(jìn)行發(fā)送,而不是輸入一次發(fā)送一次,因此在數(shù)據(jù)包不足的時(shí)候會(huì)等待其他數(shù)據(jù)的到來,組裝成大的數(shù)據(jù)包進(jìn)行發(fā)送,雖然該算法有效提高了網(wǎng)絡(luò)的有效負(fù)載,但是卻造成了延時(shí)。
而該參數(shù)的作用就是禁止使用Nagle算法,使用于小數(shù)據(jù)即時(shí)傳輸。和TCP_NODELAY相對(duì)應(yīng)的是TCP_CORK,該選項(xiàng)是需要等到發(fā)送的數(shù)據(jù)量最大的時(shí)候,一次性發(fā)送數(shù)據(jù),適用于文件傳輸。
SO_BROADCAST | 對(duì)應(yīng)套接字層的套接字:SO_BROADCAST,將消息發(fā)送到廣播地址。 如果目標(biāo)中指定的接口支持廣播數(shù)據(jù)包,則啟用此選項(xiàng)可讓應(yīng)用程序發(fā)送廣播消息。 |
SO_KEEPALIVE | 對(duì)應(yīng)套接字層的套接字:SO_KEEPALIVE,保持連接。 在空閑套接字上發(fā)送探測(cè),以驗(yàn)證套接字是否仍處于活動(dòng)狀態(tài)。 |
SO_SNDBUF | 對(duì)應(yīng)套接字層的套接字:SO_SNDBUF,設(shè)置發(fā)送緩沖區(qū)的大小。 |
SO_RCVBUF | 對(duì)應(yīng)套接字層的套接字:SO_RCVBUF,獲取接收緩沖區(qū)的大小。 |
SO_REUSEADDR | 對(duì)應(yīng)套接字層的套接字:SO_REUSEADDR,本地地址復(fù)用。 啟用此選項(xiàng)允許綁定已使用的本地地址。 |
SO_LINGER | 對(duì)應(yīng)套接字層的套接字:SO_LINGER,延遲關(guān)閉連接。 啟用此選項(xiàng),在調(diào)用close時(shí)如果存在未發(fā)送的數(shù)據(jù)時(shí),在close期間將阻止調(diào)用應(yīng)用程序,直到數(shù)據(jù)被傳輸或連接超時(shí)。 |
SO_BACKLOG | 對(duì)應(yīng)TCP/IP協(xié)議中<font color=red>backlog</font>參數(shù),<font color=red>backlog</font>即連接隊(duì)列,設(shè)置TCP中的連接隊(duì)列大小。如果隊(duì)列滿了,會(huì)發(fā)送一個(gè)ECONNREFUSED錯(cuò)誤信息給C端,即“ Connection refused”。 |
SO_TIMEOUT | 等待客戶連接的超時(shí)時(shí)間。 |
IP_TOS | 對(duì)應(yīng)套接字層的套接字:IP_TOS,在IP標(biāo)頭中設(shè)置服務(wù)類型(TOS)和優(yōu)先級(jí)。 |
IP_MULTICAST_ADDR | 對(duì)應(yīng)IP層的套接字選項(xiàng):IP_MULTICAST_IF,設(shè)置應(yīng)發(fā)送多播數(shù)據(jù)報(bào)的傳出接口。 |
IP_MULTICAST_IF | 對(duì)應(yīng)IP層的套接字選項(xiàng):IP_MULTICAST_IF2,設(shè)置應(yīng)發(fā)送多播數(shù)據(jù)報(bào)的IPV6傳出接口。 |
IP_MULTICAST_TTL | 對(duì)應(yīng)IP層的套接字選項(xiàng):IP_MULTICAST_TTL,在傳出的 多播數(shù)據(jù)報(bào)的IP頭中設(shè)置生存時(shí)間(TTL)。 |
IP_MULTICAST_LOOP_DISABLED | 取消 指定應(yīng)將 傳出的多播數(shù)據(jù)報(bào)的副本 回傳到發(fā)送主機(jī),只要它是多播組的成員即可。 |
TCP_NODELAY | 對(duì)應(yīng)TCP層的套接字選項(xiàng):TCP_NODELAY,指定TCP是否遵循<font color=#35b998>Nagle算法</font> 決定何時(shí)發(fā)送數(shù)據(jù)。Nagle算法代表通過減少必須發(fā)送包的個(gè)數(shù)來增加網(wǎng)絡(luò)軟件系統(tǒng)的效率。即盡可能發(fā)送大塊數(shù)據(jù)避免網(wǎng)絡(luò)中充斥著大量的小數(shù)據(jù)塊。如果要追求高實(shí)時(shí)性,需要設(shè)置關(guān)閉Nagle算法;如果需要追求減少網(wǎng)絡(luò)交互次數(shù),則設(shè)置開啟Nagle算法。 |
??
?ChannelOption通用配置
參數(shù) | 說明 |
ALLOCATOR | ByteBuf的分配器,默認(rèn)值為ByteBufAllocator.DEFAULT。 |
RCVBUF_ALLOCATOR | 用于Channel分配接受Buffer的分配器,默認(rèn)值為AdaptiveRecvByteBufAllocator.DEFAULT,是一個(gè)自適應(yīng)的接受緩沖區(qū)分配器,能根據(jù)接受到的數(shù)據(jù)自動(dòng)調(diào)節(jié)大小。可選值為FixedRecvByteBufAllocator,固定大小的接受緩沖區(qū)分配器。文章來源:http://www.zghlxwxcb.cn/news/detail-845530.html |
MESSAGE_SIZE_ESTIMATOR | 消息大小估算器,默認(rèn)為DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder為實(shí)際大小,F(xiàn)ileRegion估算值為0。該值估算的字節(jié)數(shù)在計(jì)算水位時(shí)使用,F(xiàn)ileRegion為0可知FileRegion不影響高低水位。文章來源地址http://www.zghlxwxcb.cn/news/detail-845530.html |
CONNECT_TIMEOUT_MILLIS | 連接超時(shí)毫秒數(shù),默認(rèn)值30000毫秒即30秒。 |
WRITE_SPIN_COUNT | 一個(gè)Loop寫操作執(zhí)行的最大次數(shù),默認(rèn)值為16。也就是說,對(duì)于大數(shù)據(jù)量的寫操作至多進(jìn)行16次,如果16次仍沒有全部寫完數(shù)據(jù),此時(shí)會(huì)提交一個(gè)新的寫任務(wù)給EventLoop,任務(wù)將在下次調(diào)度繼續(xù)執(zhí)行。這樣,其他的寫請(qǐng)求才能被響應(yīng)不會(huì)因?yàn)閱蝹€(gè)大數(shù)據(jù)量寫請(qǐng)求而耽誤。 |
WRITE_BUFFER_WATER_MARK | |
ALLOW_HALF_CLOSURE | 一個(gè)連接的遠(yuǎn)端關(guān)閉時(shí)本地端是否關(guān)閉,默認(rèn)值為False。值為False時(shí),連接自動(dòng)關(guān)閉;為True時(shí),觸發(fā)ChannelInboundHandler的userEventTriggered()方法,事件為ChannelInputShutdownEvent。 |
AUTO_READ | 自動(dòng)讀取,默認(rèn)值為True。Netty只在必要的時(shí)候才設(shè)置關(guān)心相應(yīng)的I/O事件。對(duì)于讀操作,需要調(diào)用channel.read()設(shè)置關(guān)心的I/O事件為OP_READ,這樣若有數(shù)據(jù)到達(dá)才能讀取以供用戶處理。該值為True時(shí),每次讀操作完畢后會(huì)自動(dòng)調(diào)用channel.read(),從而有數(shù)據(jù)到達(dá)便能讀??;否則,需要用戶手動(dòng)調(diào)用channel.read()。需要注意的是:當(dāng)調(diào)用config.setAutoRead(boolean)方法時(shí),如果狀態(tài)由false變?yōu)閠rue,將會(huì)調(diào)用channel.read()方法讀取數(shù)據(jù);由true變?yōu)閒alse,將調(diào)用config.autoReadCleared()方法終止數(shù)據(jù)讀取。 |
到了這里,關(guān)于netty構(gòu)建udp服務(wù)器以及發(fā)送報(bào)文到客戶端客戶端詳細(xì)案例的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!