Dubbo领域模型、调用链及调用方式

微服务框架(十八)

Posted by Tillend on January 29, 2019

  此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。

  本文为Dubbo领域模型、调用链及调用方式

本系列文章中所使用的框架版本为Spring Boot 2.0.3-RELEASE,Spring 5.0.7-RELEASE,Dubbo 2.6.2。

Dubbo领域模型

在 Dubbo 的核心领域模型中:

  • Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。
  • Invoker是实体域,它是Dubbo的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起invoke调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
  • Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。

Dubbo调用链

Dubbo协议请求数据发送全调用链

  1. 由实体域(Invoker)执行会话域(Invocation)的集群实现
  2. 拦截器(Filter)链的拦截过滤
  3. 监听器(Listener)的监听指向
  4. Dubbo调用(DubboInvoker)的实现
  5. 协议头交互客户端(HeaderExchangeClient)
  6. Netty通信

Dubbo协议请求数据发送全调用链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
proxy0#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多个 Filter 调用
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)    //Dubbo同异步调用
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)	//协议头交互Client
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)   //Netty通信
                              —> NioClientSocketChannel#write(Object)

DubboInvoker

Dubbo 的底层IO操作都是异步的。Consumer端发起调用后,得到一个Future对象。 对于同步调用,业务线程通过Future#get(timeout),阻塞等待Provider端将结果返回;timeout则是Consumer端定义的超时时间。

  • RpcInvocation设置相关参数pathversion
  • 获取ExchangeClient
  • 同步/异步调用模式
    1. 同步调用模式:框架获得DefaultFuture对象后,会立即调用get方法进行等待
    2. 异步调用模式:将该对象封装到FutureAdapter实例中,并将FutureAdapter实例设置到RpcContext中,供用户使用

FutureAdapter 是一个适配器,用于将Dubbo中的ResponseFutureJDK 中的Future进行适配。

DubboInvoker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class DubboInvoker<T> extends AbstractInvoker<T> {
    
    private final ExchangeClient[] clients;
    
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // 设置 path 和 version 到 attachment 中
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            // 从 clients 数组中获取 ExchangeClient
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 获取异步配置
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // isOneway 为 true,表示“单向”通信
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // 异步无返回值
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // 发送请求
                currentClient.send(inv, isSent);
                // 设置上下文中的 future 字段为 null
                RpcContext.getContext().setFuture(null);
                // 返回一个空的 RpcResult
                return new RpcResult();
            } 

            // 异步有返回值
            else if (isAsync) {
                // 发送请求,并得到一个 ResponseFuture 实例
                ResponseFuture future = currentClient.request(inv, timeout);
                // 设置 future 到上下文中
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                // 暂时返回一个空结果
                return new RpcResult();
            } 

            // 同步调用
            else {
                RpcContext.getContext().setFuture(null);
                // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(..., "Invoke remote method timeout....");
        } catch (RemotingException e) {
            throw new RpcException(..., "Failed to invoke remote method: ...");
        }
    }
    
    // 省略其他方法
}

Dubbo调用方式

Dubbo 缺省协议采用单一长连接,底层实现是 NettyNIO 异步通讯机制;基于这种机制,Dubbo 实现了以下几种调用方式:

  • 同步调用
  • 异步调用
  • 参数回调
  • 事件通知

下述只描述异步调用,其他调用方式详细描述及例子见Dubbo 关于同步/异步调用的几种方式

异步调用

基于Dubbo底层的异步NIO实现异步调用,对于Provider响应时间较长的场景是必须的,它能有效利用Consumer端的资源,相对于Consumer端使用多线程来说开销较小。

Provider 端接口

1
2
3
public interface AsyncService {
    String goodbye(String name);
}

Consumer 配置

1
2
3
<dubbo:reference id="asyncService" interface="com.alibaba.dubbo.samples.async.api.AsyncService">
    <dubbo:method name="goodbye" async="true"/>
</dubbo:reference>

需要异步调用的方法,均需要使用 <dubbo:method/>标签进行描述。

Consumer 端发起调用

1
2
3
4
5
AsyncService service = ...;
String result = service.goodbye("samples");// 这里的返回值为空,请不要使用
Future<String> future = RpcContext.getContext().getFuture();
... // 业务线程可以开始做其他事情
result = future.get(); // 阻塞需要获取异步结果时,也可以使用 get(timeout, unit) 设置超时时间

参考资料:

  1. Dubbo 关于同步/异步调用的几种方式
  2. 服务调用过程
  3. 框架设计