Dubbo源码解析——异步支持之异步带来的问题
面临的问题
承接我之前的两篇文章,异步虽好,但是也为框架带来了新的复杂性。第一个问题是Filter
问题,第二个问题是上下文问题。第二个问题其实比较容易理解,我们在Invoke
的时候,有一个RpcContext
,我们以P端异步为例,这时候Invoke
里开了一个新线程去执行真正的业务逻辑,这时候我们就无法从新线程里获取RpcContext
了(ThreadLocal
不一样了)。
第一个问题比第二个复杂一点,解决方式也复杂一点点。我举个例子,P端有一些Filter
,比如ExceptionFilter
。Filter
的invoke
逻辑是这样的:
1 | public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { |
可以在执行前和后分别做一些事情,跟Spring的AOP有点像。ExceptionFilter
的作用记录invoke
结束之后的Result
中的异常。问题来了:如果是P端异步,invoker.invoke(invocation)
这步会立刻返回一个AsyncRpcResult
(为什么会返回这个类型的Result
在P端异步支持中说了,这里不细说),但是这个AsyncRpcResult
中是没有结果的,因为可能业务还没执行结束,那么这个时候很明显也没有异常需要记录,如果这时候过滤器就执行结束,那么很明显,有些功能就缺失了。
Filter失效问题代码分析
我们先看第一种,过滤器失效问题。
这里Dubbo引入了一种新的接口:PostProcessFilter
,里面也只有一个方法:Result postProcessResult(Result result, Invoker<?> invoker, Invocation invocation);
。
实现这个接口的类有一个AbstractPostProcessFilter
,主要的逻辑就是在这里实现的,我们看下这个类的postProcessResult
方法:
1 | if (result instanceof AsyncRpcResult) { |
如果结果是同步的,就直接执行doPostProcess
,如果是异步的,就给asyncResult
增加一个回调(asyncResult.thenApplyWithContext
),回调中执行doPostProcess
。继续看下这个asyncResult.thenApplyWithContext
:
1 | public void thenApplyWithContext(Function<Result, Result> fn) { |
这里这里把resultFuture
也就是ResultFuture
替换成了新的Future
,并且增加了回调,当resultFuture
结束的时候,会执行我们传入的Function
(传入的Function
主要是执行doPostProcess
方法)和两个上下文切换的Function
。
上下文切换的Function
我们一会再说。这里其实主要目的就是一个:在AsyncRpcResult
结束的时候,执行doPostProcess
方法。
我们再看看ExceptionFilter
的invoke
:
1 | // 这里跟源码有出入,为了看着更清晰。源码是把两句改成了一句,及没有局部变量r |
这里就是把invoke
的结果传递给postProcessResult
,而根据我们刚说的,postProcessResult
就是给AsyncRpcResult
增加一个回调,让这个异步Result结束以后调用doPostProcess
。那么我们可以看一下ExceptionFilter
的doPostProcess
方法,其实里面就是执行了记录日志的逻辑。
至此我们就明白,Filter
失效问题的解决方案,给AsyncRpcResult
增加回调(最终这个回调会被加到我们上一篇说的ResultFuture
方法上),在回调中执行doPostProcess
方法,doPostProcess
方法就是我们希望在执行结束后做的事情,这里的执行结束是指的真正的业务逻辑结束。
既然原理都清楚了,那么大家就清楚了:如果要扩展一个Dubbo的过滤器,还要能够处理异步的结果该怎么办了吧——继承一下AbstractPostProcessFilter
即可(参考ExceptionFilter
)
RpcContext失效问题代码分析
再看RpcContext
。Dubbo调用其实是Dubbo线程
,用户在P端开启异步实际上是用户线程
,我们用D线程
代表Dubbo线程
,U线程
代表用户线程
。
这里其实解决思路比较简单,D线程里的RpcContext
,我们取出来放到U线程
里就可以解决这个问题了。
看看Dubbo的解决方案,先看AsyncContextImpl
的构造函数,这个AsyncContextImpl
我们在P端异步的时候说过,如果xml里配置<dubbo:service async=true>
,C端调用这个服务的时候,就会提前在RpcContext
里初始化一个AsyncContextImpl
:
1 | public AsyncContextImpl(CompletableFuture<Object> future) { |
除了我们上篇说的设置Future
,还初始化了两个RpcContext
。注意,AsyncContextImpl
是在D线程里被初始化的,这时候RpcContext
还是生效的!
然后我们在使用的时候,可以在新线程里执行AsyncContext#signalContextSwitch
方法,切换上下文:
1 | public void signalContextSwitch() { |
这里storedContext
就是D线程中,那个有各种值的Context
,在U线程中执行,就会把当前线程的Context设置成D线程中的那个。这样,我们在新线程里就可以愉快的使用RpcContext
了。
同样可以在AsyncRpcResult
中发现类似的操作,这里就不展开说了,大家可以看看,其实是一样的套路,只不过改成用回调的方式设置,具体可以看看AsyncRpcResult#thenApplyWithContext
方法。
后记
这个后记我想了半天还是决定写一写,这个部分比较复杂,我也不知道自己能不能说明白。
我们先看回AsyncRpcResult
的构造方法,里面有这么一句:
1 | if (registerCallback) { |
可以注意到我留了一句注释在那里,这句的注释意思是,我们必须使用rFuture.complete
,而不能使用resultFuture
,因为在构建filter
链的时候,resultFuture
会变化(变成其他实例),但是由于rFuture
由于有闭包的存在,所以不会变化。
在哪里变化的呢,在这里AsyncRpcResult#thenApplyWithContext
。这个方法我们之前说过了,是用来注册filter
的回调用的。这里有几个问题:
- 为什么
resultFuture
会变化? - 就算
resultFuture
变化,又会有什么问题?
这里我们可以做个试验,我这里就展示代码了,completableFuture.thenApply
会返回一个新的completableFuture
,这样我们在this.resultFuture = resultFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));
就会导致resultFuture
不断变为新的实例。
第一个问题解决了,再看第二个。如果resultFuture
变化,使用resultFuture.complete
会有什么问题。答案是会导致回调失败。假设resultFuture
注册了个Filter
的回调:F1
。
还没开始注册的时候,resultFuture = rFuture
,注册了第一个F1
,resultFuture
就不再等于rFuture
了,而是(F1-Future(rFuture))
(F1-Future
表示注册F1返回的新的future
),这里举个例子:
1 | public static void main(String[] args) { |
也就是说,我们的回调不在起作用。那么怎么才能让它起作用呢?很简单,用老的cf1
去触发回调:
1 | public static void main(String[] args) { |
看到了吧,我们的resultFuture
在变化,如果不适用rFuture
而使用resultFuture
,就会导致我们的结果complete
之后,其他Filter
注册的回调也不生效。
那为什么rFuture
就可以呢,因为rFuture
在方法结束以后就不会变化了,(F1-Future(rFuture))
这种模式下,我们可以认为必须从内部往外传递才能出发Filter
注册的回调,这时候使用rFuture
就可以触发所有的回调了。
那么又引出一个新问题,为啥thenApplyWithContext
要这么写:this.resultFuture = resultFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));
,我们直接resultFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));
这么写是不是问题就解决了呢?我们依然可以触发所有Filter注册的回调,而且还可以使用resultFuture.complete
,而不用关注rFuture
和resultFuture
谁变了谁没变,多好。
答案是不行。
这里涉及到Dubbo的Filter
的Order
问题。什么意思,如果使用resultFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));
,假设我注册了三个Filter
,F1
,F2
和F3
,那么会变成当resultFuture
完成的时候,直接调用F1
,F2
和F3
,而且是并行的。
如果使用this.resultFuture = resultFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));
,会变成先执行F1
的回调,当F1
注册的回调结束之后再执行F2
,再F3
。
区别一看就知道了,很明显我们应该让F1
、F2
和F3
按照顺序执行,因为有些过滤器的执行优先级是非常高的而有些很低,如果我们并行执行很明显就打乱了回调的顺序。
我纠结这个问题纠结了大概有三四个周了,最近才完全搞明白。希望这里写的不会太乱。
再后记
这里吐个槽,我们试图改动框架代码的时候,至少要先尽可能理解框架为什么这么做。最近成为Committer
之后,理所当然的解决各种issue
,有很多人完全不理解为什么Dubbo的代码为啥要这么写,就直接吐槽,什么我觉得你们不严谨,我觉得你们这个代码不能这么写,我觉得这个那个。然后我一看他的github
,还是个初学者,非常固执,根本听不进去意见。
这种都是无意义的issue
,我并不是一竿子掀翻一船人,说什么初学者就不行,不要发表自己的意见。但是至少要充分评估自己的问题的正确性,凡事需谨慎,没实力的时候不要那么横。