nginx配置
在nginx.conf文件中,events,http同級添加配置
stream {
upstream tcp {
server 127.0.0.1:8888 weight=1;
server 127.0.0.1:8889 weight=1;
}
server {
listen 8880;
proxy_pass tcp;
proxy_protocol on; #僅此一句重點,用以判斷獲取客戶端真實ip
}
}
啟動nginx服務(wù)
netty代碼
package com.alexyang.nettyandthread.netty2;
/**
* @author yqc
* @version 1.0
* @description netty+nginx
* @date 2023-06-30 9:22
*/
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
@Component
public class ServerNetty implements ApplicationRunner {
final static Logger log = LogManager.getLogger(ServerNetty.class);
private int port = 8888;
private String ip = "127.0.0.1";
public void start() throws InterruptedException {
NioEventLoopGroup boss = null;
NioEventLoopGroup worker = null;
try {
ServerBootstrap b = new ServerBootstrap();
boss = new NioEventLoopGroup();
worker = new NioEventLoopGroup();
b.group(boss, worker);
b.channel(NioServerSocketChannel.class);
b.localAddress(port);
b.option(ChannelOption.SO_KEEPALIVE, true);//是否開啟TCP心跳機制
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast("decoder",new DecodeProxy()); // 增加這個自定義的解碼器
channel.pipeline().addLast(new ServerHandler());
}
});
log.info("啟動 netty 服務(wù)端");
ChannelFuture future = b.bind(ip, port).sync();
log.info("服務(wù)器啟動成功,監(jiān)聽端口{}", future.channel().localAddress());
future.channel().closeFuture().sync();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
//關(guān)閉線程組,釋放資源
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("執(zhí)行.............");
start();
}
}
package com.alexyang.nettyandthread.netty2;
/**
* @author yqc
* @version 1.0
* @description nginx代理netty tcp服務(wù)端負載均衡,nginx stream要打開 proxy_protocol on; 配置
* @date 2023-06-30 10:09
*/
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.nio.charset.Charset;
import java.util.List;
/**
* @Description nginx代理netty tcp服務(wù)端負載均衡,nginx stream要打開 proxy_protocol on; 配置
*/
public class DecodeProxy extends ByteToMessageDecoder {
/**
* 保存客戶端IP
*/
public static AttributeKey<String> key = AttributeKey.valueOf("IP");
/**
* decode() 會根據(jù)接收的數(shù)據(jù),被調(diào)用多次,直到確定沒有新的元素添加到list,
* 或者是 ByteBuf 沒有更多的可讀字節(jié)為止。
* 如果 list 不為空,就會將 list 的內(nèi)容傳遞給下一個 handler
*
* @param ctx 上下文對象
* @param byteBuf 入站后的 ByteBuf
* @param out 將解碼后的數(shù)據(jù)傳遞給下一個 handler
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
/*消息打印--------------------------*/
byte[] bytes = printSz(byteBuf);
String message = new String(bytes, Charset.forName("UTF-8"));
//logger.info("從客戶端收到的字符串:" + message);
/*消息打印--------------------------*/
if (bytes.length > 0) {
//判斷是否有代理
if (message.indexOf("PROXY") != -1) {
//PROXY MSG: PROXY TCP4 192.168.12.52 192.168.12.52 1096 5680\r\n
System.out.println("PROXY MSG: " + message.substring(0, message.length() - 2));
if (message.indexOf("\n") != -1) {
String[] str = message.split("\n")[0].split(" ");
System.out.println("Real Client IP: " + str[2]);
Attribute<String> channelAttr = ctx.channel().attr(key);
//基于channel的屬性
if (null == channelAttr.get()) {
channelAttr.set(str[2]);
}
}
//清空數(shù)據(jù),重要不能省略
byteBuf.clear();
}
if (byteBuf.readableBytes() > 0) {
//logger.info("out add!!!");
out.add(byteBuf.readBytes(byteBuf.readableBytes()));
}
}
}
/**
* 打印byte數(shù)組
*
* @param newBuf
*/
public byte[] printSz(ByteBuf newBuf) {
ByteBuf copy = newBuf.copy();
byte[] bytes = new byte[copy.readableBytes()];
copy.readBytes(bytes);
//logger.info("字節(jié)數(shù)組打印:" + Arrays.toString(bytes));
return bytes;
}
}
package com.alexyang.nettyandthread.netty2;
/**
* @description ServerHandler
* @author yqc
* @date 2023-06-30 9:23
* @version 1.0
*/
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.Attribute;
import java.nio.charset.Charset;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("收到鏈接:" + ctx.channel().remoteAddress());
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Attribute<String> channelAttr = ctx.channel().attr(DecodeProxy.key);
//基于channel的屬性
if(null != channelAttr.get()){
System.out.println("IP地址--------------- :" + channelAttr.get());
}
ByteBuf in = (ByteBuf) msg;
System.out.println("收到客戶端" + ctx.channel().remoteAddress().toString() + "內(nèi)容:" + in.toString(Charset.forName("UTF-8")));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
啟動2個服務(wù)netty服務(wù)設(shè)置nginx中8888,8889端口。
使用tcp工具連接并發(fā)送數(shù)據(jù)測試文章來源:http://www.zghlxwxcb.cn/news/detail-736409.html
參考博客
參考鏈接1
參考鏈接2文章來源地址http://www.zghlxwxcb.cn/news/detail-736409.html
到了這里,關(guān)于Nginx+netty實現(xiàn)tcp負載均衡,獲取客戶端真實ip的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!