一、基础概念

  1. IO与NIO
阻塞与非阻塞是描述进程在访问某个资源时,数据是否准备就绪的的一种处理方式。当数据没有准备就绪时:

阻塞:线程持续等待资源中数据准备完成,直到返回响应结果。
非阻塞:线程直接返回结果,不会持续等待资源准备数据结束后才响应结果。

同步与异步
同步与异步是指访问数据的机制,同步一般指主动请求并等待IO操作完成的方式。
异步则指主动请求数据后便可以继续处理其它任务,随后等待IO操作完毕的通知。
    普通水壶煮水,站在旁边,主动的看水开了没有?同步的阻塞
    普通水壶煮水,去干点别的事,每过一段时间去看看水开了没有,水没开就走人。 同步非阻塞
    响水壶煮水,站在旁边,不会每过一段时间主动看水开了没有。如果水开了,水壶自动通知他。 异步阻塞
    响水壶煮水,去干点别的事,如果水开了,水壶自动通知他。异步非阻塞

  1. 传统BIO模型

是一种同步阻塞IO,IO在进行读写时,该线程将被阻塞,线程无法进行其它操作。 IO流在读取时,会阻塞。直到发生以下情况:1、有数据可以读取。2、数据读取完成。3、发生异常

  1. 伪异步IO模型

以传统BIO模型为基础,通过线程池的方式维护所有的IO线程,实现相对高效的线程开销及管理

  1. NIO模型

NIO(JDK1.4)模型是一种同步非阻塞IO,主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(多路复用器)。传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(多路复用器)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。

IO是面向流的,NIO是面向缓冲区的,缓冲区数据可以前后移动

IO的各种流是阻塞的,这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道

5.三大组件

channel

FileChannel 文件
datachannel
soketchannel 客户端服务端都可以用
serverSocketchannel 服务端专用

Buffer

ByteBuffer

Selector

服务器端soket连接多个客户端,多线程版设计,缺点内存占用高,上下文切换成本高(保存状态恢复状态),16核cpu一次最多只能跑16线程。只适合连接数少场景。
线程池版设计,阻塞模式下,线程仅能处理一个socket连接。连接上啥事不干也会等待,仅适合短连接场景
selector版设计,作用就是配合一个线程管理多个channel获取channel上发生的事,这些channel是在非阻塞模式下工作,不会让线程吊死在一个channnel上,适合连接特别多,但是流量低的场景。



  1. ByteBuffer

正确使用姿势

1.向buffer中写数据 channel.read(buffer)
2.切换到读模式 buffer.flip()
3.从buffer中读数据,buffer.get()
4.切换到写模式 buffer.lear()或buffer.compact()
5.重复1~4步骤

重要属性

1. capacity 容量
2. position 指针
3. limit 写入限制

flip动作发生后,position切换为读取位置,limit切换为读取限制(前面写的position位置)
clear动作发生后,清空
compact方法,是把未读完的部分向前压缩,然后切换到写模式

常见方法

1. allocate: 空间分配,定义buffer的大小,不可变的
ByteBuffer.allocate(16); //java对内存 读写效率较低,收到GC影响
ByteBuffer.allocateDirect(16);//直接内存,读写效率高(一次拷贝)不受GC影响 分配的效率低

2. 向ByteBuffer写入数据
    调用channel的read方法:channnel.read(buffer)
    调用buffer的put方法:buffer.put();
3. 从ByteBuffer读数据
    调用channel的write方法:channel.write(buffer)
    调用buffer 自己的get方法:buffer.get() 该方法会让position读指针向后走,如果想重复读取数据可以调用rewind方法将position方法重置为0,或者调用get(position)获取索引i的内容,不会移动指针。

4. scatteringReads 作用:“onetwothree”将一串字符串读出来并分开,


二、使用方法

参考链接 https://www.cnblogs.com/AIPAOJIAO/p/10631551.html

/**
     * 初始化连接
     */
    public void connect() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                connectServer();
            }
        }).start();
    }
     private void connectServer() {
        try {
            group = new NioEventLoopGroup();
            bootstrap = new Bootstrap()
                    .option(ChannelOption.SO_KEEPALIVE, true)//长连接
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))//两个字节的最大保存值 255是一个字节的最大保存值
                    .channel(NioSocketChannel.class)
                    .group(group)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        ByteBuf delimiter = Unpooled.buffer(1);

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            delimiter.writeByte(0x7E);
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new DelimiterBasedFrameDecoder(65535, delimiter));
                            /**
                             * readerIdleTime 设置读超时时间,说白了就是这个时间内没有读到有报文,userEventTriggered被触发,(设置这个参数常用在服务器端)
                             * writerIdleTime 同样的道理,这个是写超时,这个就是今天提到自定义心跳要用的(设置在客户端)
                             * allIdleTime 这个不多说,读写多超时
                             */
                            pipeline.addLast(new IdleStateHandler(0, HEART_TIME, 0));//第二个参数,表示客户端在15s没有数据发出,会主动发一个心跳包给服务端
                            pipeline.addLast(new JTT808Decoder());
                            pipeline.addLast(new JTT808Encoder());
                            pipeline.addLast(new JTTLanZhouHandler(JTTLanZhouClient.this, listener));//处理数据接收
                            pipeline.addLast(new ConnectorIdleStateTrigger());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(ip, port));
            channelFuture.addListener(channelFutureListener);
        } catch (Exception e) {
            e.printStackTrace();
            if (listener != null) {
                listener.onConnectionSateChange(OnConnectionListener.DIS_CONNECT);
            }
        }
    }
    private ChannelFutureListener channelFutureListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (!channelFuture.isSuccess()) {
                Log.i(TAG, "链接失败----ChannelFutureListener");
               /* if (listener != null) {
                    listener.onConnectionSateChange(OnConnectionListener.DIS_CONNECT);
                }*/
                final EventLoop loop = channelFuture.channel().eventLoop();
                loop.schedule(new Runnable() {
                    @Override
                    public void run() {
                        reConnect();
                        if (listener != null) {
                            listener.onConnectionSateChange(OnConnectionListener.RE_CONNECT);
                        }
                    }
                }, 3, TimeUnit.SECONDS);
            } else {
                Log.i(TAG, "链接成功----ChannelFutureListener ");
                channel = channelFuture.channel();
                //开启周期性上报位置服务 5s
                JTTLanZhouManager.getInstance().periodUploadLocation();
            }
        }
    };
    public interface MyChannelFutureListener {
        void operationComplete(ChannelFuture channelFuture, int reconect);

        void reconect(int count);
    }

三、解析905格式报文

  1. 组包

``` /* * 陕标 * @return / public static byte[] generate905(int msgId,byte[] msgBody){ //=========================标识位==================================// byte[] flag = new byte[]{0x7E};

       //=========================消息头==================================//
       //[0,1]消息Id
       byte[] msgIdb = BitOperator.numToByteArray(msgId, 2);
       //信息属性--消息包大小
       byte[] length = BitOperator.numToByteArray(msgBody.length, 2);
       //ISU标识 10开头  通讯号 861130040179
           //10开头 2字节
           byte[] start = BitOperator.numToByteArray(01, 1);
           //厂商编号 1字节
           byte[] number = BitOperator.numToByteArray(31, 1);
           //设备类型 1字节
           byte[] type = BitOperator.numToByteArray(23, 1);
           //序列号 3字节
           byte[] serno = BitOperator.numToByteArray(456789, 3);
       byte[] isu = ByteUtil2.byteMergerAll(start, number, type,serno);
       byte[] terminalPhone = BCD8421Operater.string2Bcd("013123456789");
       //流水号
       byte[] flowNum = BitOperator.numToByteArray(SocketConfig.getSocketMsgCount(), 2);
       //消息头
       byte[] msgHeader = ByteUtil2.byteMergerAll(msgIdb, length, terminalPhone, flowNum);
       //=========================数据合并(消息头,消息体)=====================//
       byte[] bytes = ByteUtil2.byteMergerAll(msgHeader, msgBody);
       //=========================计算校验码==================================//
       String checkCodeHexStr = HexUtil2.getBCC(bytes);
       byte[] checkCode = HexUtil2.hexStringToByte(checkCodeHexStr);
       //=========================合并:消息头 消息体 校验码 得到总数据============//
       byte[] AllData = ByteUtil2.byteMergerAll(bytes, checkCode);

       //=========================转义 7E和7D==================================//
       // 转成16进制字符串
       String hexStr = HexUtil2.byte2HexStr(AllData);
       // 替换 7E和7D
       String replaceHexStr = hexStr.replaceAll(FLAG_7D, " 7D 01")
               .replaceAll(FLAG_7E, " 7D 02")
               // 最后去除空格
               .replaceAll(" ", "");

       //替换好后 转回byte[]
       byte[] replaceByte = HexUtil2.hexStringToByte(replaceHexStr);

       //=========================最终传输给服务器的数据==================================//
       return ByteUtil2.byteMergerAll(flag, replaceByte, flag);
   }

```

  1. 解包
/**
     * 解析数据
     *
     * @param byteBuf
     */
    private JTT905Bean resolve(ByteBuf byteBuf) {

        ByteBuf escapeBuf = escape7D(byteBuf);
       /* byte[] bytes = new byte[escapeBuf.readableBytes()];
        int readerIndex = escapeBuf.readerIndex();
        escapeBuf.getBytes(readerIndex, bytes);*/
        JTT905Bean jtt905Bean = new JTT905Bean();
        //解析消息头
        JTT905Bean.MsgHeader msgHeader = new JTT905Bean.MsgHeader();
        byte[] msgId = escapeBuf.readBytes(2).array();
        byte[] msgAttributes = escapeBuf.readBytes(2).array();
        byte[] terminalPhone = escapeBuf.readBytes(6).array();
        byte[] flowNum = escapeBuf.readBytes(2).array();

        msgHeader.setMsgId(msgId);
        msgHeader.setMsgAttributes(msgAttributes);
        msgHeader.setTerminalPhone(terminalPhone);
        msgHeader.setFlowNum(flowNum);

        //消息体长度
        int[] msgBodyAttr = resolveMsgBodyLength(msgAttributes);
        if (msgBodyAttr[0] == 1) {
            //TODO 分包
            escapeBuf.readBytes(4);
        }
        //消息体
        ByteBuf msgBody = escapeBuf.readBytes(msgBodyAttr[1]);
        //校验码
        byte checkCode = escapeBuf.readByte();

        jtt905Bean.setMsgHeader(msgHeader);
        jtt905Bean.setMsgBody(msgBody);
        jtt905Bean.setCheckCode(checkCode);
//        int checkSum4JT808 = getCheckSum4JT808(bytes, 0, bytes.length - 1);
       /* if(checkSum4JT808 != checkCode){
            Log.i(TAG,"校验码不一致");
            return null;
        }*/
        return jtt905Bean;
    }
    public int getCheckSum4JT808(byte[] bs, int start, int end) {
        int cs = 0;
        for (int i = start; i < end; i++) {
            cs ^= bs[i];
        }
        return cs;
    }
    /**
     * 转义 7D 02->7E  7D 01->7D
     *
     * @param byteBuf
     */
    private ByteBuf escape7D(ByteBuf byteBuf) {
        ByteBuf escapeBuf = Unpooled.buffer();
        int length = byteBuf.readableBytes();
        for (; byteBuf.readerIndex() < length; ) {
            byte b = byteBuf.readByte();
            if (b == 0x7D) {
                byte nextB = byteBuf.readByte();
                if (nextB == 0x02) {
                    escapeBuf.writeByte(0x7E);
                } else if (nextB == 0x01) {
                    escapeBuf.writeByte(0x7D);
                } else {
                    escapeBuf.writeByte(b);
                }
            } else {
                escapeBuf.writeByte(b);
            }
        }
        return escapeBuf;
    }

     /**
     * 解析消息体属性
     *
     * @return
     */
    private int[] resolveMsgBodyLength(byte[] msgAttributes) {
        ByteBuf msgAttr = Unpooled.buffer(16);
        for (byte attribute : msgAttributes) {
            msgAttr.writeBytes(ByteUtil.byteToBit(attribute));
        }
        //保留位
        msgAttr.readBytes(2);
        //是否分包
        byte subpackage = msgAttr.readByte();
        //加密方式
        byte[] encrypt = msgAttr.readBytes(3).array();
        //消息体长度
        byte[] bodyLength = msgAttr.readBytes(10).array();
        String bits = "";
        for (byte b : bodyLength) {
            bits += b;
        }
        int msgBodyLength = Integer.parseInt(bits, 2);
        return new int[]{subpackage, msgBodyLength};
    }