Netty源码分析——AUTOREAD
前言
这个是设置在Channel
上的一个属性,主要控制管道的自动读。可能对于很多初学者来说,都对这个Channel
的自动读表示困惑,这篇文章主要来看下这个自动读如何工作以及什么时候关闭自动读(自动读默认开启)。
上代码
这个要追溯到我们的读操作了,我们以服务端为例,服务端先是有个叫Boss的Reacotr
线程,不断的轮训ACCEPT
事件,如果轮训到,就创建一个Channel
并且注册READ
事件,把这个Channel
分配给一个叫Worker
的Reactor
线程。这个逻辑不说了,详细的内容看之前的:Boss和Worker。
分配给Worker
的Channel
,在NIO模式下是NioSocketChannel
,我们看下它的读取操作:
1 | // sth... |
我们讲过之前的大部分步骤,这里看这个pipeline.fireChannelReadComplete
。这里会从HeadContext
节点开始向后传递,看下HeadContext#channelReadComplete
:
1 | ctx.fireChannelReadComplete(); |
这里看这个readIfIsAutoRead
,这里出现了自动读类似的字眼。具体的方法内容也很简单,如果Channel
设置了自动读,就执行channel.read()
。这个方法我们之前也讲过了,最终传递给tail
,再从tail
传递给head
,然后执行unsafe.beginRead()
,最终是给这个NioSocketChannel
注册一个READ
事件。
注册以后,这个Channel就可以继续在一个Worker Reactor
线程里,继续做读操作。
注意,这个管道能够在一次读操作之后继续读的要求,就算我们的主题——AUTOREAD
,关于如何设置自动读,就不细说了。那我们到此就可以推断出这个自动读的作用:在第一次读操作结束之后,是否还进行读操作。这里需要注意,如果这个NioSocketChannel
第一次被注册到Worker
上,就算这个管道被设置为非自动读,也是会进行一次读操作的,换句话说,每个管道都会至少进行一次读(除非客户端就根本没东西写给服务端,那么当然就不会进行读操作)。
作用和使用场景
说完了原理,我们说下什么时候把这个自动读关掉,已经什么时候重新开启。
这个其实是做流控用的。举个例子,我们服务端有个线程池,固定大小500个线程。这时候我们可能有很多的客户端链接,一下子把线程池撑满了,这时候我们可以关掉一部分管道的自动读。
以上场景可能不是非常恰当,我可以在管道中设置一个流控Handler
。这个Handler
每次读到数据的时候,就看看线程池的size,如果超过某个值,我们就关闭这个管道的自动读:channel.config().setAutoRead(false)
。然后继续传递。等到这个管道读结束了,就不会再出发下次读了,这样当然也不会占用我们的线程池了。
这里我们注意一个问题,我们不从Channel
里读数据,并不代表这个Channel
关闭了。我们可以用一个定时任务检测我们的线程池,如果低于某个值,我们调用Channel
的channel.config().setAutoRead(true)
即可开启自动读。
这里可能各位有个问题,之前我们也说了,调用readIfIsAutoRead
(或者说fireChannelReadComplete
)是在一次读结束,但是之前我们设置了禁止自动读,那么自然也没人来执行ctx.fireChannelReadComplete
了,这时候我把Channel
的自动读打开也没用啊,因为没人能触发readIfIsAutoRead
给这个Channel
注册自动读了。
这里还是要追一下代码,看下DefaultChannelConfig#setAutoRead
:
1 | boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1; |
这里我们看到,如果把管道的自动读设置为true的时候,是会主动调用一次channel.read()
来进行READ事件的注册的。
注意
但是使用自动读也要注意一件事情。
自动读如果关闭后,对端发送FIN
的时候,接收端应用层也是感知不到的。这样带来一个后果就是对端发送了FIN
,然后内核将这个socket
的状态变成CLOSE_WAIT
。但是因为应用层感知不到,所以应用层一直没有调用close
。这样的socket
就会长期处于CLOSE_WAIT
状态。特别是一些使用连接池的应用,如果将连接归还给连接池后,一定要记着自动读一定是打开的。不然就会有大量的连接处于CLOSE_WAIT
状态。
说白了就是,如果服务端关闭自动读,但是关闭之后内核中的socket
收到了客户端传来的关闭命令,这时候应用层没有从Channel
中读取数据,自然也就不知道客户端已经要求关闭了。这里要特别注意。
后记
之前在读代码的时候也有个疑问,在AbstractNioByteChannel.NioByteUnsafe#read
方法中是这样进行读操作的:
1 | do{ |
这里分析一下,我截取一部分有用的代码:
1 | // 这里的allocHandle是AdaptiveRecvByteBufAllocator.HandleImpl |
我会分析两个功能的代码:
- 关于
allocHandle.continueReading
如何判断读操作结束。 - 具体解释一下
readPending
这个标识的作用。
先看第一个。allocHandle.reset
操作中,设置了一个maxMessagesPerRead
,这个用来标识一次读取操作最多能读取多少次消息。注意这里的单位是次数不少字节数,我们每次pipeline.fireChannelRead(byteBuf)
前都会增加一次读取的消息的次数。这个值默认是16,具体的设置这个值的链路我就不展开说了,源头在AbstractNioByteChannel#METADATA
这个成员。
另外说一下RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
这里这个allocHandle
默认是AdaptiveRecvByteBufAllocator.HandleImpl
,也就是说默认的Allocator
是AdaptiveRecvByteBufAllocator
,这个源头在:
1 | public DefaultChannelConfig(Channel channel) { |
说完这些我们看下如何确定读操作可以继续。按照上述代码来看,先是重置计数器,然后分配ByteBuf
这两步没有什么特殊的。allocHandle.lastBytesRead(doReadBytes(byteBuf))
继续看到这里,doReadBytes
会返回实际读取的字节数,我们看下实现:
1 | final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); |
这里会记录这次读取试图读取的字节数,也就是ByteBuf
的容量,记录这个有什么用我们后面继续看。
看下lastBytesRead
方法:
1 | if (bytes == attemptedBytesRead()) { |
这里先去判断是否这次读取的字节数等于试图读取的字节数。这句有注释,如果我们读取的字节数正好就是我们试图读的字节数,这时候记录一下,将来调整ByteBuf
的时候,就可以根据之前的记录调整ByteBuf
的大小。
这个解释有点绕,举个生活化的例子:加入有个水桶,这就好比我们的socket
,这个水桶里的水我们都不知道,我们现在用杯子从里面取(这就是读取操作),第一次我们用1000毫升的水杯取,取完水杯里就500毫升,说明水桶里的水就500毫升。那么如果我们如果取了1000毫升出来,说明两种情况:
- 水桶里就1000毫升水,我们一次恰恰好好取完了,水桶空了。
- 水桶里不止1000毫升,我们一次没取完。
这个时候我们怎么办呢,我们一般下次取的时候会换个2000毫升的更大的水杯去取。为什么要这么做,因为对于程序而言,性能更好的方式是每次尽可能多的读取字节,减少读取次数。说到这里我们基本也可以猜到netty判断读操作是否结束的条件之一就是:我们本次实际读的数据量是不是本次试图读的数据量。我们把它bring back到刚才的例子上就是:我们本次用杯子取的水的量是不是就是杯子的容量。
继续看我们读操作:
1 | // 读取的字节数小于等于0,分两种,等于0和小于0 |
这是一种本次读取没读到数据或者EOF了,就结束本次读。否则往下进行,执行allocHandle.incMessagesRead(1);
,增加一次读取次数,我们之前说过,默认情况下最多一次读取操作读取16次数据。那么我们又可以推断出另一个判断读取结束的条件了:已经进行的读数据次数是否超过16(16是默认值)。我们现在看一下continueReading
方法,重头戏,最终会走到这里:
1 | public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { |
maybeMoreDataSupplier.get()
用的是attemptedBytesRead == lastBytesRead;
,我们把条件拆开看:
- 是否配置了自动读,如果没有配置自动读,结束这次读操作,可以看到
autoRead
如果设置为false
,也至少会进行一次读。 respectMaybeMoreData
目前都是true,我们先不看。看maybeMoreDataSupplier.get()
,其实就是attemptedBytesRead == lastBytesRead;
,就是我们刚刚说的,如果试图读取的数据等于上次读取的数据,如果等于,说明我们的水杯(ByteBuf
)在这次取(读)操作中取满了,那么猜测可能还有数据没读完,就继续读。totalMessages < maxMessagePerRead
,之前说过了,默认情况下读取不能超过16次。totalBytesRead > 0
,这个totalBytesRead
表示总共读取的字节数,必须大于0才继续读,等于0表示一共读取的数据都小于等于0,就不读了。
ReadPending
继续看下个细节,关于readPending
标识符的作用。在读结束的finally
里,我们可以看到:
1 | if (!readPending && !config.isAutoRead()) { |
如果没有readPending
并且不需要自动读,就移除读的key,下次select
就不会select
到这个Channel
。
autoRead
上面已经说了,这里主要看readPending
为了解决什么问题。这里要往上看读操作中pipeline.fireChannelReadComplete();
和pipeline.fireChannelRead(byteBuf);
。我们存在一些情况下,需要在channelReadComplete
和channelRead
中操作channel.read()
方法,channel.read()
会给管道增加读的key,让reactor
线程能够取出这个channel
做读操作。那么问题就来了,因为removeReadOp
是在finally
中,所以我们在channelReadComplete
和channelRead
中进行的channel.read()
都将失效。我们为了让它不失效,增加了一个readPending
,标识现在有一个读操作待操作,不能移除读的key。
我们看看什么时候这个readPending
会被设置为true
,调用channel.read()
会最终委托到pipeline
最后传递给HeadContext
,这是老的知识点,最终会进行unsafe.beginRead();
,最后再AbstractNioChannel#doBeginRead
中设置读的key。这个方法里面就会设置readPending = true;
。这样,我们如果在channelReadComplete
中进行channel.read()
,就会被这个readPending
设置为true
,这样在finally
中就不会移除读的key了。
那么我们什么时候设置readPending
为false
呢,是在所有读操作结束的时候。readPending
指的是否有待处理的读操作,所以读完一次数据,我们就设置一次readPending=false
。当然我们shutdown
之类的时候也会设置这个值为false
,这里不展开了,有兴趣的可以去看看代码。