Dubbo源码解析——异步支持之Consumer端
前言
之前版本的异步非常不优雅,需要各种配置、各种坑不说,获取Future
的方式还是需要从Context
里获取,这次的改动在原有的基础上,主要提供了更加便捷的异步使用方式。
目前dubbo已经支持Consumer
端和Provider
端异步。Provider
端的异步是类似Servlet 3.0
的异步方式做的。这篇文章就来解读一下dubbo的Consumer
对异步的实现方式。
对异步的使用,可以参考一下文档:地址
Consumer
这里看个例子,非常简单:
1 | // 接口: |
这里要稍微往前追溯一点了,要追溯到dubbo形成代理的部分,这部分简单点看,入口在AbstractProxyFactory#getProxy
里,会调用子类的getProxy(invoker, interfaces)
,默认实现是Javassist
:
1 | return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); |
这里其实每个方法都是用Invoker
进行执行,但是Invoker
返回的都是Result
,是通过Result.recreate()
方法把Result
中的值吐出来的,这点要记住,后面会提到。
真正执行Invoke
默认还是在DubboInvoker
里。看下doInvoke
:
1 | // 是否是异步的,这里我们在创建代理的时候打上了标记,所以这里是true |
这里我们看到,同步相对于异步,同步就是最后调用了一下ResponseFuture#get()
等待结果,异步就是把ResponseFuture
封装成了FutureAdapter
,然后在外面再包上一层AsyncRpcResult
后返回。
这里我们先不看后面,把目前为止的流程,结合例子捋一遍。我们rpc调用这次返回的本身就是一个CompletableFuture
,并且给这个Future
设置了回调,这个Future
我们之前也强调过了,就是我们创建出来的FutureAdapter
,这个FutureAdapter
被包在AsyncRpcResult
返回了。
问题有几个:
invoke
时候创建FutureAdapter
是怎么被反馈给C端作为实际上的返回值的呢?FutureAdapter
什么时候被调用的complete
方法呢?
第一个问题其实已经有答案了,说回我们在一开始强调过了recreate
方法,这个方法就是返回实际上的值,那么AsyncRpcResult
的recreate
方法是什么样的呢:return valueFuture;
。这个valueFuture
是AsyncRpcResult
的第一个入参,即doInvoke
时候生成的futureAdapter
。这样整个就打通了,流程就是:代理生成的过程中invoker.invoke(invocation).recreate()
,这时候invoke
返回的是AsyncRpcResult
,然后recreate
方法返回我们创建出来的FutureAdapter
作为CompletableFuture
使用。
第二个问题。我们要先了解一下ResponseFuture
的回调机制了。默认实现是DefaultFuture
。C端在收到P端返回的Result
的时候,会调用HeaderExchangeHandler#received
方法,处理之后调用DefaultFuture.received(channel, response);
,从response
中拿到调用id之后,获取对应的DefaultFuture
,然后执行DefaultFuture#doReceived
:
1 | lock.lock(); |
就是把response
赋值以后,如果有回调需要调用则调用回调。
我们猜测一下,C端异步就是通过这个callback
最终回调到用户代码里的那个CompletableFuture
(实际上返回的是futureAdapter
)的。
这里我们要看一下FutureAdapter
的构造函数:
1 |
|
谜团解开。在DubboInvoker
的doInvoke
中,我们创建了一个FutureAdapter
,这个实例将来通过AsyncRpcResult#recreate
返回给用户。这个FutureAdapter
中维护了一个DefaultFuture
。这个DefaultFuture
在接受到P端的返回值的时候,会执行一个回调,回调的时候,会执行维护了这个DefaultFuture
的FutureAdapter
的complete
把远端的值返回回来。这样,用户在代码中设置了的whenComplete
就会被触发并且执行了。
细节
各位看过来了!这里有个细节。今天也是请教了一下阿里的陆龟大神才明白(不得不说在社区里请教各位大神就是方便)。
我们看下doInvoke
中的一段代码:
1 | ResponseFuture future = currentClient.request(inv, timeout); |
第一句执行request
(实际上就把请求发出去了,但是是异步的)。第二句就是之前说的,初始化FutureAdapter
将来返回给用户,初始化的过程中我们也说过了,会设置callback
,将来调用这个FutureAdapter
的complete
。
问题来了,要是currentClient.request
发送出去的数据已经返回,但是FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
还没执行,怎么办?没执行这句,callback
就不会被设置到ResponseFuture
上,自然也不会执行FutureAdapter
的complete
了。这样是不是FutureAdapter
的complete
永远不会被执行?
要么说自己还是经验不够。这种地方其实之前梁飞大大就已经考虑到了。这里取咨询了一下阿里的陆龟,我们看下ResponseFuture#setCallback
方法就知道答案了:
1 | if (isDone()) { |
发现了么,如果设置的时候发现Future
已经结束了,就直接执行一次callback
,否则就加锁设置回调,如果加锁之后,Future
已经结束,同样最终直接执行回调。
这种设计非常值得借鉴。宗旨就是,不管设置回调是在Future
结束前还是结束后,回调都应该被执行。
至此,Consumer
端异步就已经结束了。可以期待下一篇的Provider
端的异步实现了。需要提醒大家的是,一定要多多debug代码,我的文章也只是给大家一个看代码的思路,真正理解和融会贯通,还要不断地debug源码才行。