此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。
本文为Dubbo领域模型、调用链及调用方式
本系列文章中所使用的框架版本为Spring Boot 2.0.3-RELEASE,Spring 5.0.7-RELEASE,Dubbo 2.6.2。
Dubbo领域模型
在 Dubbo 的核心领域模型中:
Protocol
是服务域,它是Invoker
暴露和引用的主功能入口,它负责Invoker
的生命周期管理。Invoker
是实体域,它是Dubbo
的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起invoke
调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。Invocation
是会话域,它持有调用过程中的变量,比如方法名,参数等。
Dubbo调用链
Dubbo协议请求数据发送全调用链
- 由实体域(
Invoker
)执行会话域(Invocation
)的集群实现 - 拦截器(
Filter
)链的拦截过滤 - 监听器(
Listener
)的监听指向 - Dubbo调用(
DubboInvoker
)的实现 - 协议头交互客户端(
HeaderExchangeClient
) - Netty通信
Dubbo协议请求数据发送全调用链
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
proxy0#sayHello(String)
—> InvokerInvocationHandler#invoke(Object, Method, Object[])
—> MockClusterInvoker#invoke(Invocation)
—> AbstractClusterInvoker#invoke(Invocation)
—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
—> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用
—> ListenerInvokerWrapper#invoke(Invocation)
—> AbstractInvoker#invoke(Invocation)
—> DubboInvoker#doInvoke(Invocation) //Dubbo同异步调用
—> ReferenceCountExchangeClient#request(Object, int)
—> HeaderExchangeClient#request(Object, int) //协议头交互Client
—> HeaderExchangeChannel#request(Object, int)
—> AbstractPeer#send(Object)
—> AbstractClient#send(Object, boolean)
—> NettyChannel#send(Object, boolean) //Netty通信
—> NioClientSocketChannel#write(Object)
DubboInvoker
Dubbo 的底层IO操作都是异步的。
Consumer
端发起调用后,得到一个Future
对象。 对于同步调用,业务线程通过Future#get(timeout)
,阻塞等待Provider
端将结果返回;timeout
则是Consumer
端定义的超时时间。
RpcInvocation
设置相关参数path
和version
- 获取
ExchangeClient
- 同步/异步调用模式
- 同步调用模式:框架获得
DefaultFuture
对象后,会立即调用get
方法进行等待 - 异步调用模式:将该对象封装到
FutureAdapter
实例中,并将FutureAdapter
实例设置到RpcContext
中,供用户使用
- 同步调用模式:框架获得
FutureAdapter
是一个适配器,用于将Dubbo
中的ResponseFuture
与JDK
中的Future
进行适配。
DubboInvoker:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class DubboInvoker<T> extends AbstractInvoker<T> {
private final ExchangeClient[] clients;
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
// 设置 path 和 version 到 attachment 中
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
// 从 clients 数组中获取 ExchangeClient
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 获取异步配置
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// isOneway 为 true,表示“单向”通信
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 异步无返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// 发送请求
currentClient.send(inv, isSent);
// 设置上下文中的 future 字段为 null
RpcContext.getContext().setFuture(null);
// 返回一个空的 RpcResult
return new RpcResult();
}
// 异步有返回值
else if (isAsync) {
// 发送请求,并得到一个 ResponseFuture 实例
ResponseFuture future = currentClient.request(inv, timeout);
// 设置 future 到上下文中
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
// 暂时返回一个空结果
return new RpcResult();
}
// 同步调用
else {
RpcContext.getContext().setFuture(null);
// 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(..., "Invoke remote method timeout....");
} catch (RemotingException e) {
throw new RpcException(..., "Failed to invoke remote method: ...");
}
}
// 省略其他方法
}
Dubbo调用方式
Dubbo 缺省协议采用单一长连接,底层实现是 Netty
的 NIO
异步通讯机制;基于这种机制,Dubbo 实现了以下几种调用方式:
- 同步调用
- 异步调用
- 参数回调
- 事件通知
下述只描述异步调用,其他调用方式详细描述及例子见Dubbo 关于同步/异步调用的几种方式
异步调用
基于Dubbo
底层的异步NIO
实现异步调用,对于Provider
响应时间较长的场景是必须的,它能有效利用Consumer
端的资源,相对于Consumer
端使用多线程来说开销较小。
Provider 端接口
1
2
3
public interface AsyncService {
String goodbye(String name);
}
Consumer 配置
1
2
3
<dubbo:reference id="asyncService" interface="com.alibaba.dubbo.samples.async.api.AsyncService">
<dubbo:method name="goodbye" async="true"/>
</dubbo:reference>
需要异步调用的方法,均需要使用 <dubbo:method/>
标签进行描述。
Consumer 端发起调用
1
2
3
4
5
AsyncService service = ...;
String result = service.goodbye("samples");// 这里的返回值为空,请不要使用
Future<String> future = RpcContext.getContext().getFuture();
... // 业务线程可以开始做其他事情
result = future.get(); // 阻塞需要获取异步结果时,也可以使用 get(timeout, unit) 设置超时时间
参考资料: