Netty与NIO
感觉很多文章都讲不到重点,很多都是API层面怎么用,很多同学搞IO这块看的也是一头雾水
今天来了兴致写一篇吧,一气呵成,前篇没有去查任何资料,全靠记忆与理解回顾,后篇是很早之前学习过程的笔记贴上,如有问题欢迎指正~
Netty用于高并发服务开发,基于NIO,要想明白Netty,要先知道NIO的情况,然后多了解下操作系统就OK了,不想说netty你的性能优化真的是各种数组替换hash结构的骚操作太多~
PS:这篇涉及IO模型、堆外内存、Netty分析、操作系统IO原理、内存分配、并发实践等内容,后续有空配图
NIO
在Java1.4的时候就提供了nio的支持,non-blocking I/O,好多也叫new I/O,主要是非阻塞的实现,本质还是同步的通信模型
这里就会有人问了,非阻塞不应该是异步么?这就是没分清楚的表现,这种问题还经常出现在面试题中(其实是我经常问。。。)
首先概念要先搞清楚
- 阻塞与非阻塞指的是调用方的情况,同步与异步指的是通信的模型
- 本质上你调用的IO通信接口都是操作系统提供的,操作系统的接口你调用之后就阻塞住了还是立即返回这叫阻塞非阻塞
- 而你调用系统之后,系统又与TCP与网卡交互,有数据之后操作系统会把数据传输到应用程序中的过程是通信过程,如果操作系统只提供了查询有没有数据的接口,你只能去拉数据确认有没有那这就是同步,如果系统提供回调通知的方式,来了数据会主动回调你,避免了你不停的去轮询这个就是异步通信,是个推的过程
- 其实IO上的阻塞总是有的,只是说应用处理上是不是阻塞的,如果可以把数据处理与接收请求2个事情分开这个事情就解了
先看看Java吧
- 1.4之前都是用BIO模式,还记得那些讨厌的装饰器模式的InputStream与OutputStream之间的嵌套使用么,那个就是,在Socket编程中基本都是服务端Socket调用accept方法来返回Socket对象才能操作客户端的请求,如果没有请求,那就要阻塞。这有2个问题,1是主线程只能卡主不能干别的,2是主线程accept到socket进来了还要去执行任务,这样下个socket请求就只能卡在TCP缓冲区里了要等这个处理完才能处理,服务端瞬间变为同步模式排队处理请求,所以BIO时代处理都是用线程池玩儿,这样虽然服务端监听accept还在卡主,但是多少是可以并发处理
- 1.4之后的nio,提供了一个selector方法,open一下,然后监听accept事件,虽然没有事件也是阻塞的,但是一旦有了事件会返回一个
List<SelectKey>
,这是一组Socket请求,然后你就可以while循环一个个去处理,当然你不是直接去处理,这就跟刚才没区别了,而是NIO中accept与用户读写事件完全分离了,accept之后如果想去读写数据可以再单独去订阅读写事件,那刚才接收到accept的主线程就直接用另一个selector去订阅read事件,这样刚才那个selector就可以独立去处理请求了。重要的是读写的时候也是一批List<SelectKey>
去拉取处理,这样相当于术业有专攻,有专门批量处理请求的,也有专门批量处理读写的,谁也可以不等着谁的流程。底层其实还是同步去确认系统数据的,只是API上通过监听分开,达到了非阻塞的目的,同时采用的是多路复用的IO模型,这个后续会讲 - 1.7之后提供了aio,这个就是刚才说的非阻塞异步模型了,不过使用的人真不多,netty曾经用过不过效率还没nio好,所以也弃用了,以我的感知认为其实在推拉模型中跨单元调用拉可能是大部分设计比较好的方式(不过也有特例,比如现在的Reactive模式,Disrupter模型)
说下NIO底层
- 其实nio底层采用的操作系统的epoll方法,这个epoll相当于在每个socket的队列中都插入epoll指针,自己也维护一个用户测注册epoll的回调,当网卡收到数据会中断操作系统来接收数据(中断是操作系统的机制,就是IO总是有限的,CPU忙到中间IO有读写CPU就要中断程序优先响应IO处理),操作系统会读取数据写入socket文件中(linux中设备、网络都相当于是文件,fd),这时候epoll就发现来数据了,它给每个socket都增加了队列监听,来了数据就去从系统的socket数据登记列表中查哪些socket有数据(操作系统维护了一个数据通知的表避免epoll把所有socket遍历一遍,低效),然后去拉取数据然后再通知到自己的队列中监听者回调数据通知,返回一批的selectKey事件告知来数据可以来取了
- 其实在epoll之前,还有select,与poll,select方法比较笨,监听与读写是一体的,而且要遍历socket才知道有哪些socket有数据,poll稍微优化了下,2002年诞生的epoll目前基本都在用,比如redis、netty、kafka,底层都是epoll。这里需要注意的是Java的API总是在说selector,但是底层是epoll不是select,此select非彼select,不要混淆。
- epoll也有2种触发模式,水平触发与边缘出发,边缘就是快到了就通知准备,JDK提供的NIO是采用水平出发方式,在netty中就提供了底层的边缘出发,效果更好,在netty中就是EventLoop是Epoll开头的那些类
理解IO模型
再来看看操作系统的IO模型吧
- nio刚才讲了其实是非阻塞同步,那与操作系统打交道,操作系统到底提供了几种,答案是5种,4种同步,1种异步
- 同步阻塞、同步非阻塞、多路复用、?、信号异步
- 其中多路复用是同步非阻塞的升级,同时监听多个Socket,也就是刚才说的epoll的模式,而信号异步就是aio,同步阻塞就是那个bio了,其他2种都是鸡肋
说白了,其实JDK中关于IO它自己还真没怎么去搞什么优化,大部分还就是包装操作系统的方法而已,要想学好IO,包括文件IO、网络IO,那还是学好操作系统才能完全搞明白
Channel与ByteBuf
接下来还要给大家介绍2个概念
1个是channel、1是个bytebuf对象,有时候是不是困惑这个堆外内存的情况?因为它不属于JVM堆管理的范畴,是直接操作系统中的内存,那为啥高性能的框架组件总喜欢直接操作操作系统的内存呢?这就涉及到进程与系统之间IO交互的问题了,一步步说明吧
- 操作系统与进程的内存是分开的2部分,一部分是内核态,一部分是用户态,所谓用户态就是每次分配一个进程的时候给你开辟一块进程的内存空间让你自己去玩,比如Java进程就开辟一块JVM用去分了栈私有空间与堆私有空间,这个就是Java的内存模型JMM,这都是一个进程自个儿定义的,怎么玩都没关系,而IO就不一样了,如果进程想去碰一个文件,或者一个网络上的传输的数据,那就要通过磁盘,然后到内存,内存还是先在内核内存,然后要copy到用户进程内存中才能使用,这个过程好处就是因为每个进程内存模型自己玩的,一旦数据跑到用户态内存中,比如Java中就变为堆上的对象了,可以用GC回收处理了,不用管太多,用户操作也简单,但是效率低,至少要copy一次,所以就诞生了直接去内核态里去读取内存与分配内存的骚操作,比如文件写入的缓存直接写到内存分配的内存中,或者读取直接读取内核内存中的数据,这样就减少了copy的过程,这就是大家一直在讲的零拷贝优化了。但是有个缺点,就是内核中的内存使用就要按操作系统的内存玩法去玩(一会提到指针问题),而且稍有不慎可能就堆外内存溢出了,所以一般提供的API都有一些限制,在Java中,这个东西就放在ByteBuffer对象中,可以通过ByteBuffer#allocateDirect分配,JVM还会限制堆外内存分配总大小,Unsafe#allocateMemory也是直接可以分配的,不过这东西更底层,Unsafe只能用特殊方法调用(我其他博客中有讲)
- 不过这个ByteBuffer操作是真的挺难用的,因为总要控制几个指针,一个是read,一个是write,还有一个总共的大小,还要各种重置清空的方法,read<write<总大小,这样来控制你写入多少读取多少,如果读过头了其实是错误的数据,JDK中也不允许这么玩,而且JDK还会保持一个引用好进行一定程度的回收,总之这个ByteBuffer就是可以用于操作系统内存了,其实也可以分配的堆中,相当于2个实现。这个回收相对而言netty做的就比较贴心,有个兜底的方式给你回收掉,后面会提
- 读取文件还有更多骚操作,比如直接把文件映射为内存映射,就是虚拟内存,在程序中读取虚拟内存加快速度
- Channel的理解:操作系统层面有channel,是为了避免CPU直接与IO打交道,直接开辟的内存读取的方式形成一个Channel通道,Java中的FileChannel,还有Nio中的网络Channel都是这个东西
所以要想真的让NIO更快,那必须用channel+对外内存ByteBuffer来处理,这个netty都是有的
PS:关于文件句柄限制、内存区域与优化的东西还有一些问题,netty章节会讲
Netty
大约3年前我还一直简单的认为netty只是对JavaNIO太难用的封装+个reactor模型而已,后来看了下源码与资料,发现果真如此(说好的转折呢?),不过netty对性能细节的把握真的是独到的,而且API还是比较灵活易用的
核心组件
- NioEventLoop/xxxEventLoop:事件循环,包含一个group里的公用线程池、selector引用
- Channel:是对socket请求的抽象概念,netty中的channel又对javanio的channel包装了一层概念
- Pipeline:可以理解为责任链中的chain的作用,里面可以添加ChannelHandler
- ChannelHandler:就是责任链中的比如filter的实现,来处理read数据与write数据
- ByteBuf:对JDK的ByteBuffer的封装,带有自动回收与复用的性能神器
程序流程
总结流程
Netty的整体流程可以分为服务端与请求处理端2部分
其中在执行服务端启动的时候,bind方法时就把服务端的EventLoopGroup,对应的Channel与pipeline触发完毕,调用服务端eventloop.execute方法启动执行监听
在bind中的init的时候也会去调用请求处理端Work的eventloop的execute方法启动监听
服务端eventloop在监听到selector有请求的时候会请求到请求处理端Work的eventloop中经过pipeline处理
监听请求:EventLoopGroup持有多个NioEventLoop,会无限循环找任务并监听端口的selector的请求
- 通过路由器chooser#next方法依次轮训选择,多个EventLoop共用一个ThreadPerTaskExecutor线程池,每个EventLoop并持有一个优化后的selector对象
- 在channel执行eventloop.execute的时候,内部会持有thread的引用,在非当前thread执行时封装task丢入到queue中,这样EventLoopGroup在执行的时候会先检查queue来执行,然后才去selector任务
发现请求,转换工作:当检测到有请求到来时会受到SelectKey,然后调用processSelectKey方法,内部用unsafe读取,最大读取16个channel,并且逐个包装为Netty的NioSocketChannel对象,workEventLoopGroup会去注册这个channel,然后读取byte数组遍历使用pipeline的fireChannelRead传递byte数据
业务处理:work的pipeline通过head、ServerBootstrapAcceptor、tail的调用完成请求处理,其中ServerBootstrapAcceptor包含了用户的ChannelHandler经过,每个ChannelHandler都持有ChannelHandlerContext来操作传播
- 读请求正向,写请求逆向,异常请求正向
Boss流程
bind接口
服务端启动主要流程就在bind接口中
- initAndRegister初始化:
- newChannel:通过用户传入的Channel,如NioServerSocketChannel反射实例
- 构造方法:初始化pipeline、config、unsafe,channel对象设置非阻塞configureBlocking(false)
- init:对channel设置options、attr、pipeline最后添加一个channelInitializer
- pipeline添加ServerHandler
- channel的eventloop.execute一个函数,pipeline最后添加ServerBootstrapAcceptor对象,传入childOptions,还要用户定义的childHandler(channelInitializer对象)
- register:调用group(childGroup,就是最初传入的NioEventLoop)的register这个channel对象
- 把channel中的eventloop=这个NioEventLoop
- NioChannel实现中就是启动Java的selector对象注册0兴趣,不关心任何事件,只是做个selector与channel的绑定,传递了this用于java selector回调channel方法用
- 然后触发2个回调,handlerRegisted、handlerAdded2个方法(要ServerHandler实现ChannelInboundHandlerAdapter),这里由于还没actived,所以不会触发激活
- newChannel:通过用户传入的Channel,如NioServerSocketChannel反射实例
- doBind方法
- doBind变为actived
- NioServerSocketChannel实现
- eventloop.execute添加一个bind绑定端口的任务
- NioServerSocketChannel实现
- pipeline的fireChannelActive
- 通知激活后,还会触发read绑定selector的操作,从0变为read
- doBind变为actived
Work流程
work的主要流程开始就是在服务端接收到请求时对work分配工作时操作的,关注pipeline
Pipeline
NioEventLoop中有processSelectKey方法
- 用unsafe读取,while循环只要继续读,内部通过JavaNio获取JavaChannel个数,读取while条件中最大读取16条,如果中途读不到数据也会break跳出,读取的Channel会包装为Netty的NioSocketChannel,内部会做一些事情
- 设置非阻塞,创建unsafe、id、pipeline
- parent就是通过那个通过反射创建的channel
- setTcpNoDelay(true):如果是false tcp会将小数据包转换为大数据包才发送,不过netty想尽可能快的让收到数据,所以禁止了,默认非android的默认都是禁止的
- 读取数据后,遍历readByte,然后调用pipeline的fireChannelRead方法传递byte数据
- pipeline调用顺序:head->ServerBootstrapAcceptor->tail,channel创建是诞生的pipeline
- head与tail是创建pipeline的时候就有了,tail是outbound处理,如果异常没处理,信息没处理这个就是兜底收尾的事情,head是个inbound,写都是用unsafe操作
- ChannelHandlerContext中具备属性存储,读事件传播,写事件传播的3个接口实现
- 在fireChannelRead方法中,用户编写时使用chilidHandler的ChannelInitializer的ch.pipeline去添加ChannelHandler实现,添加完后会把ChannelInitializer自己删除,用户自己设置的options与attrs都会设置到channel的config对象中(也是用户侧设置的),然后workgroup去注册这个Channel,注册方法内部会调用next函数找到一个NioEventLoop去执行,最终注册调用的是底层javanio的selector的注册0感兴趣事件,然后后面就是开始调用pipeline的head节点传播了,head会去注册读事件(这个逻辑与服务端启动的逻辑相似,dobind方法中在监听端口后也会触发一次读请求)
- 会先判断是否重复添加,添加是基于双向链表尾部操作,删除要先找到节点然后删除,添加删除都有回调
- ChannelHandler分为Outbound与Inbound,还有对应的adapter实现
- inbound是next正向传播到tail(tail也是in),如果buffer会自动释放,考虑周全,而outbound是逆向传播到head(head是out),而异常传播是inboundoutbound都是正向传播
- context.channel.write是从头传播(更常用),而直接context.write是从当前节点传播
公共部分
Channel
- AbstractChannel:持有unsafe、Pipeline、channel
- AbstractNioChannel:持有selectKey、selector
- AbstractNioByteChannel持有NioByteUnsafe,客户端的,监听read
- NioSocketChannel持有NioSocketChannelConfig
- AbstractNioMessageChannel持有NioMessageUnsafe,服务端的,监听accept
- NioServerSocketChannel持有NioServerSocketChannelConfig
- AbstractNioByteChannel持有NioByteUnsafe,客户端的,监听read
- AbstractNioChannel:持有selectKey、selector
客户端 服务端 都继承 说明都是基于selector
- 不过监听事件不一样,服务端监听accept、客户端监听read
- 还一个对应Unsafe不一样,主要是读写抽象不一样,客户端读数据byte,服务端读连接
EventLoopGroup
EventLoopGroup,默认不设置线程个数是0,会用2倍CPU线程数
- new 一个线程执行器ThreadPerTaskExecutor,实现Executor接口实现的线程池,factory里线程池开头小写命名线程名,创建的Thread是FastThreadLocalThread,这个Thread继承Thread,重写ThreadLocal为数组的实现,性能优化点之一
- 按线程个数初始化线程child
- 在NioEventLoopGroup中的newChild会new NioEventLoop,触发构造函数逻辑
- provider,openSelector产生selector引用持有,一个selector对应一个NioEventLoop
- 通过Class.forName传入System类加载器+sun.nio.ch.
- SelectorImpl获取Class对象,然后针对selector对象中的
Set<SelectKey>
默认的HashSet替换为netty自己的实现,用数组替代时间复杂度从On降到O1
- TaskQueue:newTaskQueue:mpscQueue,如果不是当前线程的任务会塞在队列里,代表外部线程用的任务队列
- provider,openSelector产生selector引用持有,一个selector对应一个NioEventLoop
- 在NioEventLoopGroup中的newChild会new NioEventLoop,触发构造函数逻辑
- chooser选择器构建,传入全部NioEventLoop对象
- chooser会调用NioEventLoopGroup#next:NioEventLoop[] ,每一个新连接是从0到N依次去绑定任务,最终从多个里面返回一个NioEventLoop
- 特定优化:如果是2的幂次方性能优化的策略:每次next时自增id+1,然后要选择EventExecutor[]数组(NioEventLoop的父类)中的一个要取模,2的倍数直接用&(长度-1)操作,可以二进制过滤出余数,如果是非2个倍数,直接%长度取模,&比%高效
- execute方法
- EventLoopGroup中的executer线程池会调用execute,内部run方法,如果是第一次这个channel会保存这个thread的引用,如果是第二次则会判断执行是否是当前线程,如果不是当前线程就封装为一个task放到queue串行执行
- 方法执行一个SingleThreadEventExecutor.run
- 这个run里会无限循环,然后去调用selector对象找SelectKeys数据IO数据,每次会统计select的时间
- 先检查任务按照截止时间排队是否到期,没有检查是否有队列任务,没有就执行真正的selector操作processSelectKey
- 阻塞1s获取,如果selectKeys不为空、被唤醒、有任务了,就终止select操作
- java的niobug就是不阻塞 然后就不停的轮训导致cpu100%。这里判断是否执行时间大于阻塞时间,如果小于阻塞时间说明 select 没有阻塞直接就返回了,超过512次就重新建立一个selector把所有事件转移到新的selector 这样可能就好了
- 处理异步队列中的任务
- 除了普通任务,还有定时任务,scheduled,有个PriorityQueue优先级队列,按照截止时间来排序
- 因为queue不是线程安全的,如果是在事件循环中就直接queue.add,如果不是就execute一个run把queue.add添加进去来保证queue的线程安全(netty如何保证异步串行无锁化?)
- 触发会把到期的定时任务放到taskQueue里面
- 无限循环,任务执行,nanoTime也是耗时操作,所以每执行64次,才去计算时间如果没有结束
- 方法执行一个SingleThreadEventExecutor.run
- EventLoopGroup中的executer线程池会调用execute,内部run方法,如果是第一次这个channel会保存这个thread的引用,如果是第二次则会判断执行是否是当前线程,如果不是当前线程就封装为一个task放到queue串行执行
内存分配
read只能读取写入的数据,capacity是空间最大值
Unsafe可以直接拿到对象内存地址,可以直接内存读写,非Unsafe是直接拿一个数组的一个索引值,Unsafe 这个是根据jdk来自动判断的,看是否能强行获取JDK内部的Unsafe来来操作,使用者不用关心。Unsafe的读写数据更快
Heap不需要手工释放在堆,Direct需要手工释放
另外2个维度是用实现来区分 Pool与Unpool,区别就是PooledByteBufAllcator,内部有PoolThreadCache,可以在Area上分配
- 提供了内存分配管理器ByteBufAllcator,有堆、直接内存分配的方法,主要区分 堆内与对外的接口
- tiny、small、normal3个不同大小的bytebuf 缓存多少个,每个thread到时候分配是直接拿走用,不用用的时候才去分配,这样更快
- 每个对象中持有一个handler用于回收,如果缓存没分配成功会直接分配
内存规格:比较复杂,最终得到效果:分配内存规格然后缓存,防止分配耗费性能
- tiny(0-512b)、small(512b-8k:512、1k、2k、4k)、normal(8k-16M:8k、16k、32k)、huge(超过16M) 就是这4个区间
- 操作系统规定16M要去申请一个Chunk,所有的内存申请向系统的单位是Chunk,分配1M也要先申请一个Chunk,然后再在Chunk里取一个
- 8K是一个Page ,16M有点大,拆分就用Page来取,2^11次方2048个Page,所以一个Chunk包含2048个page,一个page是8k,16k就是2个page
- subpage就是0-8k之间 还很大,netty定义的small、tiny、normal,而huge是直接走的非缓存的分配,每种类型的每个一个小规格都有一个queue存储
- Chunk链表,netty会分析使用情况 组成chunklist用于分配,分析使用率,还是用到了ThreadLocal隔离
解码
netty是基于TCP层之上的组件,这个与socket一样,客户端监听服务端IP+端口,而服务端需要绑定端口
所以接收到的数据是一个byte[]通过协议转换为报文内容,然后再处理,最终再转换为报文,转换为byte[]的过程,这个过程其实就像生产处理流水线一样,所以是放在Pipeline中的一个个ChannelHandler来处理的
ChannelHandler默认提供了一堆编解码的基础能力实现,比如
- 基于byte长度分割字符
- 基于\r\n或者\n的行分割字符
- 基于分隔符号,比如逗号、分号这种分隔符处理器
- 还有基于数据域的方式,先解析接下来数据长度,然后读取一段,再遇到一个长度数字再解析一段,里面有不少可以调节的参数
整体上这些处理器都具备一定容错能力,主要是比如一直读取不到(可以设置上限)分隔符就会抛弃到下一个分隔符的一段数据,实现层面是通过一个byte累加器来实现,在他们父类中
虽然整个业务逻辑都是ChannelHandler,但是也要区分下职责,比如有只负责读的,有只负责写的,也有读写都行的,那就是分别是inbound与outbound2个接口,inboud对应read的相关方法,outbound对应write的一些方法
write操作
- 如果直接是堆外就返回,否则就把内容 写到 堆外内存中的转换,写byte操作几个指针,一部分flush了,还有没有flush的,如果积累的没flush的数据超过64k就自旋锁+CAS阻塞不可写状态。每次flush就size-1,如果小于32k就设置为可写的状态,head会保证数据都塞到堆外内存去,最终数据转为JDK原生对象写入
性能工具
这2个工具类 也可以单独去用
- FastThreadLocal是自己维护了一个map,继承Thread,内部的ThreadLocalMap底层用数组实现提高效率,自动扩容
- 对象池Recycler,基于fastThreadlocal,bytebuf就是用这种方式回收,ThreadLocal中获取stack,然后pop,是个handler,如果handler没有会创建,里面包含value还有stack的引用,可以同线程 回收,也可以另一个线程会回收对象
- 另一个零copy的点:CompositeByteBuf,用addComponent取代ByteBuf#writeByte,可以把2个内存直接当连续的读取,这样减少的内存copy的代价,内部实现也是先找到组件,然后再找空间
实践
百万并发调优
模拟百万:用8000-8100端口,一个端口顶多6w请求,1024-65535 ,扣除常用端口,实际只有6w左右连接,单机也就6w,但是多客户端 连接 太费机器了,所以用多接口方案
一个文件一个句柄,突破
ulimit -n
查看句柄/etc/security/limits.conf
1
2* hard nofile 1000000
* soft nofile 1000000*代表当前用户,hard是真正限制,soft是警告限制,nofile表示能打开的最大连接数
任何用户最终能打开100w文件需要重启
一个系统的限制需要突破
cat /proc/sys/fs/file-max
/etc/sysctl.conf
添加一行1
fs.file-max=1000000
sysctl -p /etc/sysctl.conf
生效
cpu可能会标满
线程数可以调优
- 任务线程里自己搞个线程池把耗时代码隔离
- 在pipeline.addLast中可以添加businessGroup新建一个,整个Handler逻辑隔离
- 线程数要不停的尝试才能达到最优
技巧
- telnet 127.0.0.1 8888 连接netty服务端,然后输入内容测试服务端