bytebuf 分配失败

来源:7-1 内存分配概述

正义柔情永在

2018-10-23

包格式: | 标识位(0x7e) | 消息头 | 消息体 | 校验位 | 标识位(0x7e) |
@Override
protected void initChannel(SocketChannel ch) throws Exception {

                    ChannelPipeline pipeline = ch.pipeline();

                    pipeline.addLast(new ByteToMessageDecoder() {

                        //每一个单独的包
                        ByteBuf pack = null;

                        //是否为空包
                        boolean isEmptyPack = true;

                        private void buildBuffer(ChannelHandlerContext ctx){

                            if( this.pack == null || this.pack.refCnt()==0 ){//如果已经被回收了,那么重建
                                System.out.println("创建");
                                this.pack = ctx.alloc().buffer();
                            }else{
                                System.out.println("清理");
                                /*
                                 *  使用者在业务handler中忘记回收,那么这里帮忙回收,采用的方式不是重建,只是clear()做到复用
                                 *  如果是第一次解码或者已经将上一次的包发送给下一个handler了,总之,不是半包的情况,那么就进行clear
                                 *
                                 *  不能清理半包数据,半包数据需要等待下一次读取够一个完整的包
                                 */
                                if(isEmptyPack){
                                    this.pack.clear();
                                }
                            }

                        }

                        @Override
                        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

                            this.buildBuffer(ctx);

                            //空轮询阈值
                            int maxReadHeadNum = 1024;

                            while(in.readableBytes()>0 && isEmptyPack){
                                if(--maxReadHeadNum ==0 && isEmptyPack){
                                    //说明有大量非法数据上来.这里在生产环境下直接抛出异常
                                    throw new RuntimeException("大量空轮训,没有读取到任何包头标示位信息");
                                }
                                byte data = in.readByte();
                                if(data == (byte)0x7e){
                                    this.pack.writeByte(data);
                                    this.isEmptyPack = false;
                                    break;
                                }
                            }

                            while (in.readableBytes()>0 && !isEmptyPack){
                                byte data = in.readByte();
                                if(data == (byte)0x7e){ //包尾
                                    this.pack.writeByte(data);
                                    out.add(this.pack);
                                    this.isEmptyPack = true;
                                    break;
                                }else if(data == (byte)0x7d){ //特殊字节
                                    if(in.readableBytes()>0){
                                        byte nextData = in.readByte();
                                        if(nextData == (byte)0x02){
                                            this.pack.writeByte((byte)0x7e);
                                        }else if(nextData == (byte)0x01){
                                            this.pack.writeByte((byte)0x7d);
                                        }
                                    }else{
                                        in.readerIndex(in.readerIndex()-1);
                                        break;//此处必须break ,等待下一次缓冲区的数据足够多在触发decode,不然极限情况下可能陷入死循环
                                    }
                                }else{
                                    this.pack.writeByte(data);
                                }
                            }
                        }
                    });

                    pipeline.addLast(new ChannelInboundHandlerAdapter(){
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf msg1 = (ByteBuf) msg;
                            System.out.println(ByteBufUtil.hexDump(msg1));
                         //   ReferenceCountUtil.release(msg); //只有在此处释放才不会报错
                        }

                    });
                }
            });

为了做到不反复重置buf的读游标.将不够一个完整的包临时存入pack 中, 每次用完之后判断如果当前pack已经被回收了.那么重建,如果没有回收,则清理读写游标即可.
目前这种方式的代码有如下问题:
后面的Handler如果是ChannelInboundHandlerAdapter那么没有问题.如果是SimpleChannelInboundHandler的 话,那么执行几次之后decode形参中的buf就读取不到任何数据了(buf.readableBytes()==0).这是什么原因

写回答

1回答

闪电侠

2018-10-26

SimpleChannelInboundHandler 会自动释放内存,和adapter是有区别的

0
1
正义柔情永在
老师,事例代码有点乱,可能我没叙述清楚。simple是会释放,但是我会通过判断引用计数是否等于零。如果为零,就会重建一个。但是依然会报错。且并不是第一次执行报错。我测试了好几次。好像是读到256个字节后以后就不读了。假设一个包32个字节,那么会读取八次后,decode中buffer就读不到数据了。而此时客户端仍然不停的发送数据。而且新建的buffer无论初始容量分配多大。就是读取256个字节后,程序卡住。
2018-10-26
共1条回复

Java读源码之Netty深入剖析

解析netty各大组件细节,百万级性能调优,设计模式实际运用

2334 学习 · 283 问题

查看课程