Dubbo源码解析——异步支持之Provider端
前言
继上篇Consumer
端的异步,我们再看看Provider
端如何异步。先看个例子:
1 | <bean id="asyncService" class="com.alibaba.dubbo.samples.async.impl.AsyncServiceImpl"/> |
注意这里async=true
表明这是一个开启P端异步的服务。P端的代码:
1 | public class AsyncServiceImpl implements AsyncService { |
这里的用法,如果使用过Servlet 3.0
的同学应该会比较熟悉,用法很类似。注意这里我们在P端新开了一个线程,如果直接用RpcContext.getContext
是拿不到原来的上下文的,需要asyncContext.signalContextSwitch
一下,把调用的上下文切换到新线程里。关于最后的return
,其实这里return什么都无所谓的,因为最终内容会使用asyncContext.write
写出去的内容。
代码
看完了使用我们分析一下P端异步的实现方式。我们在XML中给P的Service
打了一个async=true
,这个表示可以开启异步。
我们看下DubboProtocol
中的内部类,其中有一个requestHandler
,这个类是真正处理之后把值返回的地方,看下这个类的reply
方法:
1 | if (message instanceof Invocation) { |
这里我们要看下AbstractProxyInvoker#invoke
方法了,P端的ServiceImpl
其实也是代理,是AbstractProxyInvoker
的实现类,里面有这样一段代码:
1 | RpcContext rpcContext = RpcContext.getContext(); |
这里我们看到,如果开启异步,真正的执行的结果会是一个AsyncRpcResult
,这个AsyncRpcResult
包含了一个Future
,我们回到reply
方法中,可以看到,这个Future
是一个CompletableFuture<Object> future = new CompletableFuture<>();
。
那好了,总结上面的几步,就是开启异步的时候,我们会在AsyncContextImpl
里面初始化一个CompletableFuture
,真正执行doInvoke
之后,会把这个CompletableFuture
包裹在AsyncRpcResult
中返回。
看下AsyncRpcResult
的构造函数:
1 | // 在我们开启服务端异步的情况下,三个入参的情况: |
解释一下,入参第一个Future
,就是我们AsyncContextImpl
中的那个Future
,在我们的P端执行asyncContext.write
时,就会触发它的whenComplete
回调。这里其实就是把我们P端服务实现的值传递给一个rFuture
。
这里其实就是把我们的AsyncContextImpl
中的Future
(又可以称为值Future
表示维护的是一个值,即服务端执行结果,比如一个String)和resultFuture
(可以称为ResultFuture
,即Dubbo一次Rpc执行的Result
,这里是一个RpcResult
)关联起来,具体的联系就是:当值Future
执行完成时(P端调用asyncContext.write
时),调用ResultFuture
的complete
,并且把write
进去的值包装城RpcResult
传递给ResultFuture
。
这样我们的关系就理顺了,这里其实还是有点绕的,原因还是由于Dubbo自身的设计问题(写出去的都必须是Result
)。P端异步的流程总结一下就是:
- P端开启异步,这里会设置一个
值Future
在AsyncContextImpl
里,用来监听P端执行的结果。 - 这个
值Future
同样被设置到一个新创建的AsyncRpcResult
中,并且同时在AsyncRpcResult
中初始化了一个ResultFuture
。 - 给
值Future
设置回调,当值Future
完成的时候,就把这个值包装成一个RpcResult
,并且执行ResultFuture
的complete
方法,把ResultFuture
给结束掉。
顺着我们的思路,那是不是说ResultFuture
应该有一个回调(whenComplete
函数),当ResultFuture
结束的时候,就把Result
写回到C端呢?
继续看回到dubbo执行完成之后的部分,及reply
返回给上级之后干了什么,这里看下HeaderExchangeHandler#handleRequest
:
1 | // 这个future是AsyncRpcResult中的ResultFuture |
果然就像我们说的那样,我们handler.reply(channel, msg)
会返回一个CompletableFuture
,这个CompletableFuture
结束时就把内容写回C端。
而这个CompletableFuture
,我们看回到AsyncRpcResult
中,就是入参的第二个参数,是被new出来的CompletableFuture
。这个CompletableFuture
是在AsyncRpcResult
构造函数里的第一个入参(值Future
)执行whenComplete
回调时,被complete
的。
而第一个入参是我们AsyncContextImpl
中的那个Future
,二者是同一个引用。这个AsyncContextImpl
就是我们在P端实现类里执行asyncContext.write
时被complete
的,这样,我们就从正反两个方向明白了整个异步的执行过程。