Netty中的Handler
# Netty那点事(五)讲讲Handler
至上部分为止,我觉得Netty的架构部分已经差不多说完了,还有些细节,可以在实践中慢慢掌握。
但是对于实践来说,Netty还有不容忽视的一部分:Netty提供了大量的ChannelHandler,可以完成不同的任务。用好它们,会使Netty在你手里更加得心应手!
# 业务多线程执行
# OrderedMemoryAwareThreadPoolExecutor
# 使用Netty 4.1进行内存感知通道处理(Memory-aware Channel handling with Netty 4.1)
在4(3.x)之前的Netty版本中,有一种方法可以通过执行程序OrderedMemoryAwareThreadPoolExecutor内存来进行通道处理,并使用OrderedMemoryAwareThreadPoolExecutor执行程序对其进行排序,以执行给定Channel 。 3.x中的OrderedMemoryAwareThreadPoolExecutor将负责为通道排序事件处理,即使它们可以由不同的线程执行,也可以限制Channel使用的总内存。 如果通道内存(由于排队事件)超过某个阈值,则会阻止事件的执行,直到释放内存为止。
然而,在4.x中,没有这样的机制。 新的线程模型确实提供了对已执行事件的排序(因为特定通道的事件由单个线程执行),但似乎没有办法限制任何EventExecutorGroup单个Channel消耗的内存。 这意味着,如果无法做到这一点,发送到某个特定Channel的大量事件可能会耗尽服务器上的内存。 虽然我还没有对此事进行测试,但我认为在这里询问Netty 4.x的情况是否值得。
所以我的问题基本上是:
在Netty 4.x中使用带有ChannelHandler的EventExecutorGroup时,有没有办法限制单个Channel消耗的内存?
这种情况是可能的。
但是,Netty为您的频道提供了ChannelOption.WRITE_BUFFER_WATER_MARK选项。 因此,当您在某个频道写入太快并且待处理消息队列超过ChannelOption.WRITE_BUFFER_WATER_MARK限制时,您写入的频道将变得不可写。 所以你可以用以下方法保护你的代码
if (channel.isWritable()) {
}
要么
if (ctx.channel().isWritable()) {
}
因此,当通道繁忙或缓慢消耗事件时,可防止内存耗尽。
您还可以为生成事件的通道更改ChannelOption.AUTO_READ ,并使用以下方法手动处理此事件:
ctx.channel().config().setAutoRead(false);
因此,您的服务器将停止从生成它们的通道中读取事件。 这是展示这种方式的pull请求 。
# ExecutionHandler
如果业务处理handler耗时长,将严重影响可支持的并发数。
针对这一问题,经过学习,发现了可以使用ExecutionHandler来优化。
先来回顾一下没有使用ExecutionHandler优化的流程:
1)Boss线程(接收到客户端连接)->生成Channel->交给Worker线程池处理。
2)某个被分配到任务的Worker线程->读完已接收的数据到ChannelBuffer->触发ChannelPipeline中的ChannelHandler链来处理业务逻辑。
注意:执行ChannelHandler链的整个过程是同步的,如果业务逻辑的耗时较长,会将导致Work线程长时间被占用得不到释放,从而影响了整个服务器的并发处理能力。
一、引入ExecutionHandler优化
//HttpServerPipelineFactory.java
private final ExecutionHandler executionHandler = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
public class HttpServerPipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("execution", executionHandler);
pipeline.addLast("handler", new HttpServerHandler());
return pipeline;
}
}
当我们引入ExecutionHandler后,原本同步的ChannelHandler链在经过 ExecutionHandler后就结束了,它会被ChannelFactory的worker线程池所回收,而剩下的ChannelHandler链将由ExecutionHandler的线程池接手处理。
对于ExecutionHandler需要的线程池模型,Netty提供了两种可选:
1) MemoryAwareThreadPoolExecutor 通过对线程池内存的使用控制,可控制Executor中待处理任务的上限(超过上限时,后续进来的任务将被阻塞),并可控制单个Channel待处理任务的上限,防止内存溢出错误。但是它不维持同一Channel的ChannelEvents秩序,当经过ExecutionHandler后的ChannelHandler链中有不止一个Handler时,这些事件驱动存在混乱的可能。例如:
----------------------------------------> Timeline ------------------------------------->
Thread X: --- Channel A (Event 2) --- Channel A (Event 1) ----------------------------->
Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) --->
Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->
2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子类。除了MemoryAwareThreadPoolExecutor 的功能之外,它还可以保证同一Channel中处理的事件流的顺序性(不同Channel使用不同的key保持事件顺序),这主要是控制事件在异步处理模式下可能出现的错误事件顺序,但它并不保证同一 Channel中的事件都在一个线程中执行(通常也没必要)。例如:
----------------------------------------> Timeline ---------------------------------------->`` ``Thread X: --- Channel A (Event ``1``) --. .-- Channel B (Event ``2``) --- Channel B (Event ``3``) --->`` ``\ /`` ``X`` ``/ \`` ``Thread Y: --- Channel B (Event ``1``) --``' '``-- Channel A (Event ``2``) --- Channel A (Event ``3``) --->
二、具有可伸缩性的OrderedMemoryAwareThreadPoolExecutor****使用策略
在大多数情况下,我们会使用OrderedMemoryAwareThreadPoolExecutor,它的构造函数要求我们提供线程池的大小,在上面的代码中,我们使用了16这个具体的值,是一种很不好的写法,通常情况下,我们会使用配置文件使之可变,但是在实际部署时,并不能保证实施人员能很好的去调整,故提供如下的一种写法:
double coefficient = 0.8; //系数
int numberOfCores = Runtime.getRuntime().availableProcessors();
int poolSize = (int)(numberOfCores / (1 - coefficient));
我们可以使用poolSize取代OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)中的那个16,因为当一个系统被开发出来后,它是CPU密集型还是IO密集型是可评估的,通过评估其密集型,调整系数即可:CPU密集型接近0,IO密集型接近1。
# 粘包/分包
# 1. DelimiterBasedFrameDecoder使用特殊字符作为分割,如果使用的话,注意特殊字符不能在真正要传输的内容中出现,
客户端
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup ();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap ();
//设置相关参数
bootstrap.group (group) //设置线程组
.channel (NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现
.handler (new ChannelInitializer<SocketChannel> () {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//字符串编码器
//channel.pipeline ().addLast (new StringEncoder ());
//long编码器
// channel.pipeline ().addLast (new OutEncoder ());
ByteBuf delimiter = Unpooled.copiedBuffer ("$_$".getBytes ());
channel.pipeline ().addLast (new DelimiterBasedFrameDecoder (1024, delimiter));
//加入处理器
channel.pipeline ().addLast (new NettyClientHandler ());
}
});
System.out.println ("netty client start");
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect ("127.0.0.1", 9000).sync ();
//对关闭通道进行监听
channelFuture.channel ().closeFuture ().sync ();
} finally {
group.shutdownGracefully ();
}
}
}
客户端的handler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println ("发送消息");
for (int i = 0; i < 2; i++) {
//拆包用分隔符
ByteBuf buf = Unpooled.copiedBuffer ("HelloServer$_$", CharsetUtil.UTF_8);
ctx.writeAndFlush (buf);
}
ctx.fireChannelActive ();
}
//当通道有读取事件时会触发,即服务端发送数据给客户端
//msg就额是接受的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = ( ByteBuf ) msg;
//因为已经使用了StringDecoder传过来的已经string类型
System.out.println ("收到服务端的消息:" + buf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace ();
ctx.close ();
}
}
服务端
public static void main(String[] args) throws Exception {
//创建两个线程组bossGroup和workerGroup,
//NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup (1);
EventLoopGroup workerGroup = new NioEventLoopGroup (8);
try {
//创建服务器端的引导对象
ServerBootstrap bootstrap = new ServerBootstrap ();
//将两个线程组放进引导对象
bootstrap.group (bossGroup, workerGroup)
//NioServerSocketChannel服务端channel
.channel (NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option (ChannelOption.SO_BACKLOG, 1024)
.childHandler (new ChannelInitializer<SocketChannel> () {//创建通道初始化对象,设置初始化参数
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//每一个链接过来(每一次创建channel)都会执行这个方法
System.out.println ("初始化pipeline");
//编码,将String转码为ByteBuf
ch.pipeline ().addLast ("encode", new StringEncoder ());
//用分割符处理粘包问题,分隔符可以是多个,
// 1024代表1024个字节内还没有找到分隔符抛出异常,TooLongFrameException,
//如果后续的Handler重写了exceptionCaught方法就会调用exceptionCaught方法
//DelimiterBasedFrameDecoder分割是转义成ByteBuf才分割的,所以如果添加了StringDecoder的话,
// 要把StringDecoder放在DelimiterBasedFrameDecoder的后面
ByteBuf delimiter = Unpooled.copiedBuffer ("$_$".getBytes ());
ch.pipeline ().addLast (new DelimiterBasedFrameDecoder (1024, delimiter));
//解码,将ByteBuf解码为String
ch.pipeline ().addLast ("decode", new StringDecoder ());
//handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
ch.pipeline ().addLast (new NettyServerHandler ());
}
});
System.out.println ("netty server start。。");
//绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
//启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
//我们可以去掉sync,添加事件监听,如果链接成功失败相对应的处理
ChannelFuture cf = bootstrap.bind (9000);
//给cf注册监听器,监听我们关心的事件
cf.addListener (new ChannelFutureListener () {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess ()) {
//这里只是简单的打印,
System.out.println ("监听端口9000成功");
} else {
System.out.println ("监听端口9000失败");
}
}
});
//对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
cf.channel ().closeFuture ().sync ();
} finally {
bossGroup.shutdownGracefully ();
workerGroup.shutdownGracefully ();
}
}
}
服务端的handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println ("channelActive打印" + Thread.currentThread ().getName () + ctx.channel ().remoteAddress ());
//如果此handler后续还有handler的话,只有调用了fireXXX才能向下继续调用
ctx.fireChannelActive ();
}
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//使用了Stingdecoder所以这里接受到消息就是string类型的,不需要再次抓交换
System.out.println ("客户端发送消息是:" +msg);
//这是返回给客户端消息,如果使用StringEncoder,直接发String类型就可以,如果没有使用就先转为ByteBuf
ByteBuf buf1 = Unpooled.copiedBuffer ("HelloClient$_$", CharsetUtil.UTF_8);
Channel channel = ctx.channel ();
channel.writeAndFlush (buf1);
}
/**
* 数据读取完毕处理方法,这个方法个人认为不太好用,
*
* @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
* System.out.println ("服务端接受消息结束");
* ByteBuf buf = Unpooled.copiedBuffer ("HelloClient$_$", CharsetUtil.UTF_8);
* ctx.writeAndFlush (buf);
* }
*/
@Override
//处理异常, 一般是需要关闭通道
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println (cause.getMessage ());
ctx.close ();
}
}
# 2. FixedLengthFrameDecoder定长,这个也不常用
//每一个链接过来(每一次创建channel)都会执行这个方法
System.out.println ("初始化pipeline");
//编码,将String转码为ByteBuf
ch.pipeline ().addLast ("encode", new StringEncoder ());
//固定接受10个字节,如果超过这个字节数,超出的部分存在tcp的缓存中,等待下一次传输,超出的部分不会丢弃掉
ch.pipeline ().addLast (new FixedLengthFrameDecoder (10));
//解码,将ByteBuf解码为String
ch.pipeline ().addLast ("decode", new StringDecoder ());
//handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
ch.pipeline ().addLast (new NettyServerHandler ());
客户端发送的消息为HelloServer11个字节,可以看到超出的r会放到下一次的消息中, 一个汉字为3个字节,所以很难在实际项目中使用 客户端发送消息是:HelloServe 客户端发送消息是:rHelloServ
# 3. LineBasedFrameDecoder 换行符在发送消息加上换行符 \n
System.out.println ("初始化pipeline");
//编码,将String转码为ByteBuf
ch.pipeline ().addLast ("encode", new StringEncoder ());
//如果字节超过1024还没有找到换行符则抛出异常
ch.pipeline ().addLast (new LineBasedFrameDecoder (1024));
//解码,将ByteBuf解码为String
ch.pipeline ().addLast ("decode", new StringDecoder ());
//handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
ch.pipeline ().addLast (new NettyServerHandler ());
# 4. LengthFieldBasedFrameDecoder和LengthFieldPrepender
LengthFieldPrepender编码器,将发送消息的前面加上请求体的字节长度 LengthFieldBasedFrameDecoder获取请求头的长度,根据长度获取请求体的信息 个人认为这个比较常用 客户端
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup ();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap ();
//设置相关参数
bootstrap.group (group) //设置线程组
.channel (NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现
.handler (new ChannelInitializer<SocketChannel> () {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//规定标记消息提长度所占字节数
channel.pipeline().addLast(new LengthFieldPrepender (2));
channel.pipeline ().addLast (new NettyClientHandler ());
}
});
System.out.println ("netty client start");
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect ("127.0.0.1", 9000).sync ();
//对关闭通道进行监听
channelFuture.channel ().closeFuture ().sync ();
} finally {
group.shutdownGracefully ();
}
}
}
客户端handler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println ("发送消息");
for (int i = 0; i < 2; i++) {
//拆包用分隔符
ByteBuf buf = Unpooled.copiedBuffer ("HelloServer"+i, CharsetUtil.UTF_8);
ctx.writeAndFlush (buf);
}
ctx.fireChannelActive ();
}
//当通道有读取事件时会触发,即服务端发送数据给客户端
//msg就额是接受的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = ( ByteBuf ) msg;
//因为已经使用了StringDecoder传过来的已经string类型
System.out.println ("收到服务端的消息:" + buf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace ();
ctx.close ();
}
}
服务端
public class NettyServer {
public static void main(String[] args) throws Exception {
//创建两个线程组bossGroup和workerGroup,
//NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup (1);
EventLoopGroup workerGroup = new NioEventLoopGroup (8);
try {
//创建服务器端的引导对象
ServerBootstrap bootstrap = new ServerBootstrap ();
//将两个线程组放进引导对象
bootstrap.group (bossGroup, workerGroup)
//NioServerSocketChannel服务端channel
.channel (NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option (ChannelOption.SO_BACKLOG, 1024)
.childHandler (new ChannelInitializer<SocketChannel> () {//创建通道初始化对象,设置初始化参数
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//每一个链接过来(每一次创建channel)都会执行这个方法
System.out.println ("初始化pipeline");
// 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,因为其需要对接收到的数据
// 进行长度字段解码,这里也会对数据进行粘包和拆包处理
//maxFrameLength:指定了每个包所能传递的最大数据包大小;
//lengthFieldOffset:指定了长度字段在字节码中的偏移量;
//lengthFieldLength:指定了长度字段所占用的字节长度;
//lengthAdjustment:对一些不仅包含有消息头和消息体的数据进行消息头的长度的调整,这样就可以只得到消息体的数据,这里的lengthAdjustment指定的就是消息头的长度;
//initialBytesToStrip:对于长度字段在消息头中间的情况,可以通过initialBytesToStrip忽略掉消息头以及长度字段占用的字节。
//1024最大数据包长度,包括长度所占的字节数
//0,因为第一字符开始就是长度
//2消息体长度所占字节数
//0因为第一个字符就是长度
//2去掉长度所占字节数,获取剩下的消息体
//注2就是下面LengthFieldPrepender中的规定长度所占的字节数
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder (1024, 0, 2, 0, 2));
// LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
ch.pipeline().addLast(new LengthFieldPrepender (2));
//handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
//StringDecoder要放到LengthFieldBasedFrameDecoder后面,对得到消息体的ByteBuf转码为String类型,个人认为这个比较常用
ch.pipeline ().addLast ("encode", new StringDecoder ());
ch.pipeline ().addLast (new NettyServerHandler ());
}
});
System.out.println ("netty server start。。");
//绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
//启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
//我们可以去掉sync,添加事件监听,如果链接成功失败相对应的处理
ChannelFuture cf = bootstrap.bind (9000);
//给cf注册监听器,监听我们关心的事件
cf.addListener (new ChannelFutureListener () {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess ()) {
//这里只是简单的打印,
System.out.println ("监听端口9000成功");
} else {
System.out.println ("监听端口9000失败");
}
}
});
//对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
cf.channel ().closeFuture ().sync ();
} finally {
bossGroup.shutdownGracefully ();
workerGroup.shutdownGracefully ();
}
}
}
服务端handler
/**
* 继承了ChannelInboundHandlerAdapter 属于Inbound,输入的处理器
*/
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println ("channelActive打印" + Thread.currentThread ().getName () + ctx.channel ().remoteAddress ());
//如果此handler后续还有handler的话,只有调用了fireXXX才能向下继续调用
ctx.fireChannelActive ();
}
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//使用了Stingdecoder所以这里接受到消息就是string类型的,不需要再次抓交换
System.out.println ("客户端发送消息是:" +msg);
}
/**
* 数据读取完毕处理方法,这个方法个人认为不太好用,
*
* @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
* System.out.println ("服务端接受消息结束");
* ByteBuf buf = Unpooled.copiedBuffer ("HelloClient$_$", CharsetUtil.UTF_8);
* ctx.writeAndFlush (buf);
* }
*/
@Override
//处理异常, 一般是需要关闭通道
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println (cause.getMessage ());
ctx.close ();
}
}
# TCP粘包和拆包
TCP是面向连接的, 面向流的, 提供可靠性服务, 收发两端(客户端和服务器端) 都有一一成对的Socket,因此发送端为了将多个发给接收端的包, 更有效的发给对方, 使用了优化算法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包, 这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的
由于TCP无消息保护边界, 需要在接收端处理消息边界问题, 也就是我们所说的粘包,拆包问题,看一张图
示意图TCP粘包,拆包图解
对图的说明
假设客户端分别发送了两个数据包D1和D2给服务端, 由于服务端一次读取到字节数是不确定的,故有可能存在以下四种情况
服务端分别两次读取到了两个独立的数据包, 分别是D1 和 D2, 没有粘包和拆包
服务端一次接收到了两个数据包D1和D2粘在了一起,称之为TCP粘包
服务端分两次读取到了数据包, 第一次读取到了完整的D1包和D2包的部分内容, 第二次读取到了D2包的剩余部分, 称之为TCP拆包
服务器分两次读取到了数据包, 第一次读取到了D1包的部分内容D1_1, 第二次读取到了D1包的剩余部分D1_2, 和完整的D2包
# TCP粘包和拆包现象实例
在编写Netty程序时, 如果没有做处理,就会发生粘包和拆包问题
看一个具体的实例
NettyServer
package com.daicy.netty.netty.tcp;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture sync = serverBootstrap.bind("127.0.0.1", 7000).sync();
System.out.println("server is ready ......");
sync.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
static class NettyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
count++;
System.out.println("服务器第"+count+"次接收到来自客户端的数据:" + new String(bytes, StandardCharsets.UTF_8));
// 服务器回送数据给客户端 回送随机的UUID给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer(UUID.randomUUID().toString(),StandardCharsets.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
}
NettyClient
package com.daicy.netty.netty.tcp;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.StandardCharsets;
public class NettyClient {
public static void main(String[] args) {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyClientHandler());
}
});
ChannelFuture sync = bootstrap.connect("127.0.0.1", 7000).sync();
sync.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
static class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连续发送10条数据
for (int i = 0; i < 10; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server!" + i, StandardCharsets.UTF_8));
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
// 接收服务器的返回
count++;
System.out.println("客户端第"+count+"次接收服务端的回送:" + new String(bytes, StandardCharsets.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
}
执行结果
Server
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
server is ready ......
服务器第1次接收到来自客户端的数据:hello,server!0hello,server!1hello,server!2hello,server!3hello,server!4hello,server!5hello,server!6hello,server!7hello,server!8hello,server!9
服务器第1次接收到来自客户端的数据:hello,server!0
服务器第2次接收到来自客户端的数据:hello,server!1
服务器第3次接收到来自客户端的数据:hello,server!2hello,server!3hello,server!4
服务器第4次接收到来自客户端的数据:hello,server!5hello,server!6
服务器第5次接收到来自客户端的数据:hello,server!7hello,server!8hello,server!9
Client1
客户端第1次接收服务端的回送:84653e99-0e7f-431d-a897-c215af959a3b
Client2
客户端第1次接收服务端的回送:6f3b0e79-2f40-4066-bb6b-80f988ecec116b6bbd94-b345-46d6-8d36-a114534331a850628e04-ece1-4f58-b684-d30189f6cf26b2139027-6bda-4d40-9238-9fc0e59bc7a64b568ffe-f616-4f48-8f1c-05ecf3e817ee
分析:
服务器启动后到server is ready ……
第一个客户端启动后 TCP将10次发送直接封包成一次直接发送,所以导致了服务器一次就收到了所有的数据,产生了TCP粘包,拆包的问题
第二客户端启动后 TCP将10次发送分别封装成了5次请求,产生粘包,拆包问题
# TCP粘包和拆包解决方案
- 使用自定义协议 + 编解码器来解决
- 关键就是要解决 服务器每次读取数据长度的问题, 这个问题解决, 就不会出现服务器多读或少读数据的问题,从而避免TCP粘包和拆包
# TCP粘包, 拆包解决方案实现
- 要求客户端发送5个Message对象, 客户端每次发送一个Message对象
- 服务器端每次接收一个Message, 分5次进行解码, 每读到一个Message, 会回复一个Message对象给客户端
# 新建协议MessageProtocol
package com.daicy.netty.netty.protocoltcp;
/**
* 消息协议
*/
public class MessageProtocol {
private int length;
private byte[] content;
public MessageProtocol() {
}
public MessageProtocol(int length, byte[] content) {
this.length = length;
this.content = content;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
# 新建编码器
package com.daicy.netty.netty.protocoltcp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* 自定义协议编码器
*/
public class MyMessageProtocolEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
// System.out.println("自定义协议---->开始编码");
// 开始发送数据
out.writeInt(msg.getLength()); // 优先发送长度,定义边界
out.writeBytes(msg.getContent());
// System.out.println("自定义协议---->编码完成");
}
}
# 新建解码器
package com.daicy.netty.netty.protocoltcp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class MyMessageProtocolDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// System.out.println("自定义协议---->开始解码");
// 获取定义的边界长度
int length = in.readInt();
if(in.readableBytes() >= length){
// 根据长度读取数据
byte[] bytes = new byte[length];
in.readBytes(bytes);
// 反构造成MessageProtocol
MessageProtocol messageProtocol = new MessageProtocol(length, bytes);
out.add(messageProtocol);
// System.out.println("自定义协议---->解码完成");
}else{
// 内容长度不够
}
}
}
# 新建服务器端
package com.daicy.netty.netty.protocoltcp;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 加入自定义协议编解码器
pipeline.addLast(new MyMessageProtocolDecoder());
pipeline.addLast(new MyMessageProtocolEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture sync = serverBootstrap.bind("127.0.0.1", 7000).sync();
System.out.println("server is ready ......");
sync.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
static class NettyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
byte[] bytes = msg.getContent();
count++;
System.out.println("服务器第"+count+"次接收到来自客户端的数据:" + new String(bytes, StandardCharsets.UTF_8));
// 服务器回送数据给客户端 回送随机的UUID给客户端
byte[] s = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
ctx.writeAndFlush(new MessageProtocol(s.length,s));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
}
# 新建客户端
package com.daicy.netty.netty.protocoltcp;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.StandardCharsets;
public class NettyClient {
public static void main(String[] args) {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 加入自定义分割符号
// ByteBuf delimiter = Unpooled.copiedBuffer("\r\n".getBytes());
// pipeline.addFirst(new DelimiterBasedFrameDecoder(8192, delimiter));
// 添加自定义协议编解码器
pipeline.addLast(new MyMessageProtocolDecoder());
pipeline.addLast(new MyMessageProtocolEncoder());
pipeline.addLast(new NettyClientHandler());
}
});
ChannelFuture sync = bootstrap.connect("127.0.0.1", 7000).sync();
sync.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
static class NettyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连续发送10条数据
for (int i = 0; i < 10; i++) {
String msg = "今天天气冷, 打火锅" + i;
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
// 使用自定义协议
MessageProtocol messageProtocol = new MessageProtocol(bytes.length, bytes);
ctx.writeAndFlush(messageProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
byte[] bytes = msg.getContent();
// 接收服务器的返回
count++;
System.out.println("客户端第"+count+"次接收服务端的回送:" + new String(bytes, StandardCharsets.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
}
# 测试
发送10次
服务器端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
server is ready ......
服务器第1次接收到来自客户端的数据:今天天气冷, 打火锅0
......
服务器第10次接收到来自客户端的数据:今天天气冷, 打火锅9
客户端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
客户端第1次接收服务端的回送:a6b69f1c-daba-435a-802a-c19a6350ca94
......
客户端第10次接收服务端的回送:5af5c297-8668-48aa-b8c4-35656142f591
ok,没有问题, 但是真的没有问题吗?答案是有问题
# FAQ
发送1000次
# 修改客户端发送消息数量
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连续发送10条数据
for (int i = 0; i < 1000; i++) {
......
}
}
# 重新测试
服务器端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
server is ready ......
服务器第1次接收到来自客户端的数据:今天天气冷, 打火锅0
......
服务器第31次接收到来自客户端的数据:今天天气冷, 打火锅30
服务器第32次接收到来自客户端的数据:今天天气冷, 打火锅31
io.netty.handler.codec.DecoderException: java.lang.NegativeArraySizeException
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1412)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:943)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
at com.daicy.netty.netty.protocoltcp.MyMessageProtocolDecoder.decode(MyMessageProtocolDecoder.java:17)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
... 16 more
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(1022) + length(4) exceeds writerIndex(1024): PooledUnsafeDirectByteBuf(ridx: 1022, widx: 1024, cap: 1024)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459)
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:359)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1407)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:925)
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(1022) + length(4) exceeds writerIndex(1024): PooledUnsafeDirectByteBuf(ridx: 1022, widx: 1024, cap: 1024)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1403)
at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:786)
at com.daicy.netty.netty.protocoltcp.MyMessageProtocolDecoder.decode(MyMessageProtocolDecoder.java:14)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
... 17 more
what ? 直接报错了, 数组下标越界, 读索引1022 + 长度4 > 写缩影1024了
这个是什么问题呢 ? 我看网上关于这个BUG的解决方案很少,基本没有, 好多都是贴问题的, 我翻了将近1个小时,才找到一个大佬写的一篇文章解决了, 感谢大佬
博客地址:
https://blog.csdn.net/u011035407/article/details/80454511
问题描述:
这样在刚开始的工作中数据包传输没有问题,不过数据包的大小超过512b的时候就会抛出异常了。
# 解决方案
配合解码器DelimiterBasedFrameDecoder一起使用,在数据包的末尾使用换行符\n表示本次数据包已经结束,当DelimiterBasedFrameDecoder把数据切割之后,再使用ByteToMessageDecoder实现decode方法把数据流转换为Message对象。
我们在ChannelPipeline加入DelimiterBasedFrameDecoder解码器
客户端和服务器端都加
//使用\n作为分隔符
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
在MessageToByteEncoder的实现方法encode()增加out.writeBytes(new byte[]{’\n’});
//在写出字节流的末尾增加\n表示数据结束
out.writeBytes(new byte[]{'\n'});
这时候就可以愉快的继续处理数据了。 等我还没有高兴半天的时候,问题又来了。还是一样的问题
等等等,,,怎么又报错了,不是已经加了黏包处理了吗??,解决问题把,首先看解析的数据包结构
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 01 01 00 00 00 06 00 00 01 0a 7b 22 69 64 22 |...........{"id"|
|00000010| 3a 33 2c 22 75 73 65 72 6e 61 6d 65 22 3a 22 31 |:3,"username":"1|
|00000020| 38 35 30 30 33 34 30 31 36 39 22 2c 22 6e 69 63 |8500340169","nic|
|00000030| 6b 6e 61 6d 65 22 3a 22 e4 bb 96 e5 9b 9b e5 a4 |kname":"........|
|00000040| a7 e7 88 b7 22 2c 22 72 6f 6f 6d 49 64 22 3a 31 |....","roomId":1|
|00000050| 35 32 37 32 33 38 35 36 39 34 37 34 2c 22 74 65 |527238569474,"te|
|00000060| 61 6d 4e 61 6d 65 22 3a 22 e4 bf 84 e7 bd 97 e6 |amName":".......|
|00000070| 96 af 22 2c 22 75 6e 69 74 73 22 3a 7b 22 75 6e |..","units":{"un|
|00000080| 69 74 31 22 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 |it1":{"x":10.0,"|
|00000090| 79 22 3a 31 30 2e 30 7d 2c 22 75 6e 69 74 32 22 |y":10.0},"unit2"|
|000000a0| 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 79 22 3a 31 |:{"x":10.0,"y":1|
|000000b0| 30 2e 30 7d 2c 22 75 6e 69 74 33 22 3a 7b 22 78 |0.0},"unit3":{"x|
|000000c0| 22 3a 31 30 2e 30 2c 22 79 22 3a 31 30 2e 30 7d |":10.0,"y":10.0}|
|000000d0| 2c 22 75 6e 69 74 34 22 3a 7b 22 78 22 3a 31 30 |,"unit4":{"x":10|
|000000e0| 2e 30 2c 22 79 22 3a 31 30 2e 30 7d 2c 22 75 6e |.0,"y":10.0},"un|
|000000f0| 69 74 35 22 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 |it5":{"x":10.0,"|
|00000100| 79 22 3a 31 30 2e 30 7d 7d 2c 22 73 74 61 74 75 |y":10.0}},"statu|
|00000110| 73 22 3a 31 7d 0a |s":1}. |
+--------+-------------------------------------------------+----------------+
接收到的数据是完整的没错,但是还是报错了,而且数据结尾的字节的确是0a,转化成字符就是\n没有问题啊。
在ByteToMessageDecoder的decode方法里打印ByteBuf buf的长度之后,问题找到了 长度 : 10
这就是说在进入到ByteToMessageDecoder这个解码器的时候,数据包已经只剩下10个长度了,那么长的数据被上个解码器DelimiterBasedFrameDecoder隔空劈开了- -。问题出现在哪呢,看上面那块字节流的字节,找到第11个字节,是0a。。。。因为不是标准的json格式,最前面使用了3个字节 加上2个int长度的属性,所以 数据包头应该是11个字节长。
而DelimiterBasedFrameDecoder在读到第11个字节的时候读成了\n,自然而然的就认为这个数据包已经结束了,而数据进入到ByteToMessageDecoder的时候就会因为规定的body长度不等于length长度而出现问题。
思来想去 不实用\n 这样的单字节作为换行符,很容易在数据流中遇到,转而使用\r\n俩字节来处理,而这俩字节出现在前面两个int长度中的几率应该很小。
# 最终解决
在客户端和服务器端的pipeline中添加 以 “\r\n” 定义为边界的符号来标识数据包结束
//这里使用自定义分隔符
ByteBuf delimiter = Unpooled.copiedBuffer("\r\n".getBytes());
pipeline.addFirst(new DelimiterBasedFrameDecoder(8192, delimiter));
Server端
Client端
编码器中发送结束位置增加
//这里最后修改使用\r\n
out.writeBytes(new byte[]{'\r','\n'});
再次运行程序 数据包可以正常接收了。
# 最终测试
服务器端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
server is ready ......
服务器第1次接收到来自客户端的数据:今天天气冷, 打火锅0
......
服务器第999次接收到来自客户端的数据:今天天气冷, 打火锅998
服务器第1000次接收到来自客户端的数据:今天天气冷, 打火锅999
客户端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
客户端第1次接收服务端的回送:48fa6d78-8079-4700-b488-ca2af9eb3f8c
......
客户端第999次接收服务端的回送:581da47b-d77b-4972-af11-6d33057f6610
客户端第1000次接收服务端的回送:0014e906-69cb-4900-9409-f4d1af9148dd
# 总结
以前使用netty的时候也仅限于和硬件交互,而当时的硬件受限于成本问题是一条一条处理数据包的,所以基本上不会考虑黏包问题
然后就是ByteToMessageDecoder和MessageToByteEncoder两个类是比较底层实现数据流处理的,并没有带有拆包黏包的处理机制,需要自己在数据包头规定包的长度,而且无法处理过大的数据包,因为我一开始首先使用了这种方式处理数据,所以后来就没有再换成DelimiterBasedFrameDecoder加 StringDecoder来解析数据包,最后使用json直接转化为对象。
# 编码/解码
# ReplayingDecoder
# ReplayingDecoder
的原理
ReplayingDecoder
继承了ByteToMessageDecoder
,但是使用ReplayingDecoder
的好处在于:ReplayingDecoder
在处理数据时可以认为所有的数据(ByteBuf) 已经接收完毕,而不用判断接收数据的长度。
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder
使用了特殊的ByteBuf
:ReplayingDecoderByteBuf
,当数据不够时会抛出一类特殊的错误,然后ReplayingDecoder
会重置readerIndex
并且再次调用decode
方法。- 泛型
<S>
使用 枚举Enum
来表示状态,其内部存在状态管理。如果是无状态的,则使用Void
。
# 继承基类ByteToMessageDecoder
的方式
下面是一个用来解码带有长整型(Long
)数据头head的解码器:
public class LongHeaderFrameDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
//总字节数<8,不够Long的长度,返回
if (buf.readableBytes() < 8) {
return;
}
buf.markReaderIndex();
//读取head的值,例如6,说明body的长度是6个字节
int length = buf.readLong();
//body的总字节数不够6,返回
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return;
}
//读取6个长度的body
out.add(buf.readBytes(length));
}
}
从以上代码可以看出,在decode
方法中需要对数据的长度做判断,依据ByteBuf
的readerIndex
来获取真实数据,逻辑比较复杂。
# 继承基类ReplayingDecoder
的方式
如果以上的例子选择继承ReplayingDecoder
,那逻辑会非常简单。由于不存在状态管理,所以泛型使用Void
。
public class LongHeaderFrameDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
// 读取head的值,例如6,说明body的长度是6个字节
int length = buf.readLong();
// 读取6个长度的body
out.add(buf.readBytes(length));
}
}
# 状态管理和checkpoint
方法
状态可以使用枚举Enum
来表示,如:
public enum MyDecoderState {
READ_HEAD,
READ_BODY;
}
当调用checkpoint(MyDecoderState state)
时,ReplayingDecoder
会将当前readerIndex
赋值给int
类型的成员变量checkpoint
,在后续数据读取过程中方便重置。
protected void checkpoint(S state) {
checkpoint();
state(state);
}
protected void checkpoint() {
checkpoint = internalBuffer().readerIndex();
}
使用状态管理后的LongHeaderFrameDecoder
:
public class LongHeaderFrameDecoder
extends ReplayingDecoder<MyDecoderState> {
// HEAD的长度
private int length;
public LongHeaderFrameDecoder() {
// 初始状态是读取头部HEAD
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_HEAD:
length = buf.readLong();
checkpoint(MyDecoderState.READ_BODY);
case READ_BODY:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_BODY);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
# 一、设计模式为啥老是用不好?
想要写出更屌的代码,提高代码的健壮性和可扩展性,那么设计模式可谓是必学的技能。
关于学习设计模式,大家可能都觉得设计模式的概念太过于抽象,理解起来有点费劲;又或者看的时候是理解了,但是写起代码时,却毫无头绪,压根不知道可以套用哪个设计模式。
对,可以看到我使用了 “套” 这个字眼,正是因为我们无法深入理解设计模式的设计理念和使用场景,所以我们往往是想让我们的代码套用设计模式,而不理会业务场景是否合适。
关于设计模式的学习,我不会推荐任何书,因为我自己也没看过,哈哈哈。我看过的是龙哥的设计模式系列文章,里面的文章不但会介绍设计模式的概念,也会用非常有趣的场景去讲解设计模式的设计理念,下面先分享一波链接:龙哥设计模式全集 (opens new window)。
对于我自己而言,关于设计模式的使用,除非是非常深刻的理解了,又或者某种设计模式的使用场景非常的清晰明确(例如创建型设计模式中的单例模式、结构型设计模式中的组合模式、行为型设计模式中的策略模式等等),不然我也不知道该如何使用,和什么时候使用。
# 二、在阅读开源框架源码中学习设计模式!
想学习设计模式的使用方式,何不研究一下各大优秀的开源框架的源码。
想更深层次的理解设计模式,往往阅读优秀的框架和中间件的源码是非常好的方式。优秀的开源框架和中间件,里面都使用了大量的设计模式,使得框架的实用性、可扩展性和性能非常的高。
很巧,今天在工作的空余时间中,我继续阅读一本关于并发的书,并看到关于 Netty 的内置解码器,其中最常用的有 ReplayingDecoder,它是 ByteToMessageDecoder 的子类,作用是: 在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节;若ByteBuf中有足够的字节,则会正常读取;反之,如果没有足够的字节,则会停止解码。
它是如何做到自主控制解码的时机的呢?其实底层是使用了 ReplayingDecoderByteBuf 这个继承于 ByteBuf 的实现类。而它使用了装饰器设计模式。
# 1、在 Netty 中如何自定义实现整数解码器?
# 1.1、ByteToMessageDecoder:
我们需要自定义类需要继承 ByteToMessageDecoder 抽象类,然后重写 decode 方法即可。
看代码:
/**
* @author daichangya
* @desc
* @date 2020/8/21
*/
public class MyIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
while (byteBuf.readableBytes() >= 4){
int num = byteBuf.readInt();
System.out.println("解码出一个整数:"+num);
list.add(num);
}
}
}
我们可以看到非常的简单,就是不断地判断缓冲区里的的可读字节数是否大于等于4(Java 中整数的大小);如果是的话就读取4个字节大小的内容,然后放到结果集里面。
# 1.2、ReplayingDecoder:
我们需要自定义类需要继承 ReplayingDecoder 类,然后重写 decode 方法即可。
看代码:
/**
* @author daichangya
* @desc
* @date 2020/8/21
*/
public class MyIntegerDecoder2 extends ReplayingDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
int num = byteBuf.readInt();
System.out.println("解码出一个整数:"+num);
list.add(num);
}
}
这个实现更加简单,那就是去掉判断,直接调用 ByteBuf 的 readInt() 方法去获取整数即可。
# 1.3、测试用例:
1.3.1、自定义业务处理器:
先创建一个业务处理器 IntegerProcessHandler,用于处理上面的自定义解码器解码之后的 Java Integer 整数。其功能是:读取上一站的入站数据,把它转换成整数,并且输出到Console控制台上。
码如下:
/**
* @author daichangya
* @desc
* @date 2020/8/21
*/
public class IntegerProcessorHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Integer integer = (Integer) msg;
System.out.println("打印出一个整数:"+integer);
}
}
这个业务处理器非常的简单,直接继承 ChannelInBoundHandlerAdapter,然户重写 channelRead() 方法即可。
1.3.2、利用 EmbeddedChannel 进行测试:
为了测试入站处理器,需要确保通道能接收到 ByteBuf 入站数据。这里调用 writeInbound 方法,模拟入站数据的写入,向嵌入式通道 EmbeddedChannel 写入100次 ByteBuf 入站缓冲;每一次写入仅仅包含一个整数。
EmbeddedChannel 的 writeInbound 方法模拟入站数据,会被流水线上的两个入站处理器所接收和处理。接着,这些入站的二进制字节被解码成一个一个的整数,然后逐个地输出到控制台上。
看代码:
public class Test{
public static void main(String[] args){
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
@Override
protected void initChannel(EmbeddedChannel channel) throws Exception {
// 继承 ByteToMessageDecoder 抽象类的自定义解码器
// channel.pipeline().addLast(new MyIntegerDecoder()).addLast(new IntegerProcessorHandler());
// 继承 ReplayingDecoder 类的自定义解码器
channel.pipeline().addLast(new MyIntegerDecoder2()).addLast(new IntegerProcessorHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 0;j < 20;j++){
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeInt(j);
channel.writeInbound(byteBuf);
}
ThreadUtil.sleep(Integer.MAX_VALUE);
}
}
通过测试,两个自定义 Decoder 都是没问题的。而他们的最大不同点在于:继承抽象类 ByteToMessageDecoder 的解码器需要判断可读字节数是否大于等于4,大于等于才可以读取一个整数出来;而继承 ReplayingDecoder 的解码器直接调用 readInt() 方法即可。
# 2、解读 ReplayingDecoder 的原理
其实其中的原理非常的简单,我们可以直接从 ReplayingDecoder 的源码入手:
# 2.1、ReplayingDecoder的构造函数:
首先是构造函数,此处我们用了无参构造函数:
protected ReplayingDecoder() {
this((Object)null);
}
protected ReplayingDecoder(S initialState) {
this.replayable = new ReplayingDecoderByteBuf();
this.checkpoint = -1;
this.state = initialState;
}
我们可以看到,主要是初始化了 ReplayingDecoderByteBuf(其实就是加了点料的 ByteBuf)、checkpoint(读指针下标) 和 state。我们这篇文章不需要理会 state 属性,这个属性是稍微高级一点的用法。 我们最需要关注的是 ReplayingDecoderByteBuf 这个类。
# 2.2、继续探讨 ReplayingDecoderByteBuf:
那么接下来看看 ReplayingDecoderByteBuf 的源码。
2.2.1、ReplayingDecoderByteBuf 的属性:
final class ReplayingDecoderByteBuf extends ByteBuf {
private static final Signal REPLAY;
private ByteBuf buffer;
private boolean terminated;
private SwappedByteBuf swapped;
static final ReplayingDecoderByteBuf EMPTY_BUFFER;
ReplayingDecoderByteBuf() {
}
//...
}
我们可以看到,它继承了 ByteBuf 抽象类,并且里面包含一个 ByteBuf 类型的 buffer 属性,剩余的其他属性暂时不需要看懂。
2.2.2、瞧一瞧 readInt() 方法:
那么接下来,我们就是直接看 ReplayingDecoderByteBuf 的 readInt() 方法了,因为我们知道,在上面的自定义解码器 MyIntegerDecoder2 的 decode() 方法中,只需要直接调用 ByteBuf(也就是 ReplayingDecoderByteBuf) 的 readInt() 方法即可解码一个整数。
public int readInt() {
this.checkReadableBytes(4);
return this.buffer.readInt();
}
readInt() 方法非常简单,首先是调用 checkReadableBytes() 方法,并且传入 4。根据方法名,我们就可以猜到,先判断缓冲区中是否有4个可读字节;如果是的话,就调用 buffer 的 readInt() 方法,读取一个整数。
2.2.3、继续看看 checkReadableBytes() 方法:
代码如下:
private void checkReadableBytes(int readableBytes) {
if (this.buffer.readableBytes() < readableBytes) {
throw REPLAY;
}
}
方法非常简单,其实和我们上面的 MyIntegerDecoder 一样,就是判断缓冲区中是否有 4个字节的可读数据,如果不是的话,则抛出异常。
2.2.4、Signal 异常:
而我们最需要关注的就是这个异常,这个异常是 ReplayingDecoder 的静态成员变量。它是继承了 error 的异常类,是 netty 提供配合 ReplayingDecoder 一起使用的。
至于如何使用,我们可以看到 ReplayingDecoder 的 callDecode() 方法:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 调用 ReplayingDecoderByteBuf 的 setCumulation() 方法,使用 ReplayingDecoderByteBuf 装饰 ByteBuf
this.replayable.setCumulation(in);
try {
while(in.isReadable()) {
int oldReaderIndex = this.checkpoint = in.readerIndex();
int outSize = out.size();
if (outSize > 0) {
// 将结果集流到下一个 InBoundChannel
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
S oldState = this.state;
int oldInputLength = in.readableBytes();
try {
// 调用自定义解码器的 decode() 方法进行解码
this.decodeRemovalReentryProtection(ctx, this.replayable, out);
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes() && oldState == this.state) {
throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() must consume the inbound data or change its state if it did not decode anything.");
}
continue;
}
} catch (Signal var10) {
// 如果不是 Sinal 异常,则往外抛
var10.expect(REPLAY);
if (!ctx.isRemoved()) {
// 设置读指针为原来的位置
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
in.readerIndex(checkpoint);
}
}
break;
}
// ......
}
} catch (DecoderException var11) {
throw var11;
} catch (Exception var12) {
throw new DecoderException(var12);
}
}
到这里,我们可以捋一下思路:
- 当缓冲区数据流到继承 ReplayingDecoder 的解码器时,会先判断结果集是否有数据,如果有则流入到下一个 InBoundChannel;
- 接着会调用自定义解码器的 decode() 方法,而这里就是是直接调用 ByteBuf 的 readInt() 方法,即 ReplayingDecoderByteBuf 的 readInt() 方法;里面会先判断可读字节大小是否大于 4,如果大于则读取,否则抛出 Signal 这个 Error 类型的异常。
- 如果 ReplayingDecoder 捕捉 Signal 这个异常,会先判断 checkpoint(即读指针下标不) 是否为零,如果不是则重新设置读指针下标,然后跳出读循环。
ReplayingDecoder 能做到自主控制解码的时机,是因为使用 ReplayingDecoderByteBuf 对 ByteBuf 进行修饰,在调用 ByteBuf 的方法前,会先调用自己的判断逻辑,这也就是我们常说的装饰器模式。
# 三、装饰器模式的特点
首先,被装饰的类和装饰类都是继承同一个类(抽象类)或实现同一个接口。
接着,被装饰类会作为装饰类的成员变量。
最后,在执行被装饰类的方法前后,可能会调用装饰类的方法。
场景总结:
装饰器模式常用于这么一个场景:在不修改类的状态(属性或行为)下,对类的功能进行扩展!
当然啦,这是我自己个人的总结,大家可去阅读专业的书籍来证实这是否正确。如果有更好的总结,可以留言给我,让我也学习学习~