RPC层服务暴露与引用实现

通过前面的章节,我们了解了Dubbo传输层的实现原理。那么后续的章节,我们将介绍一下Dubbo RPC层的代码实现,也就是对应Dubbo源代码的dubbo-rpc模块的代码。首先明确一点,本篇所讲述的服务暴露与引用不包含集群层的逻辑,仅包含RPC层的服务暴露与引用逻辑。

核心接口

在开始这个模块的讲解之前,我们先来看一下这一层的核心接口,也就是这一层对上层服务提供的能力入口。

Protocol

Protocol是RPC层服务暴露和引用的入口,其最重要的方法是:服务暴露与引用,接口定义如下:

@Extension("dubbo")
public interface Protocol {
  
  /**
   * get default port.
   * 
   * @return default port.
   */
  int getDefaultPort();

  /**
   * export. 服务提供者, 绑定本地端口
   * 
   * @param <T>
   * @param invoker
   * @return exporter
   * @throws RpcException
   */
  @Adaptive
  <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

  /**
   * refer. 服务消费端, 连接到服务端
   * 
   * @param <T>
   * @param type
   * @param url
   * @return invoker
   * @throws RpcException
   */
  @Adaptive
  <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

  /**
   * destory.
   */
  void destroy();

}

我们可以看到其中 export() 方法就是服务提供者,在本地启动了一个Server(如果是DubboProtocol) ,最后返回一个Exporter 的暴露引用。另外,refer() 方法是订阅了url指定的服务方法,type参数是制定了订阅服务的接口类型。

Exporter

Exporter是对暴露服务的引用,其中定义了获取服务实现类的Invoker(InvokerWrapper)和下线服务的方法,接口定义如下:

public interface Exporter<T> {
  
  /**
   * get invoker.
   * 
   * @return invoker
   */
  Invoker<T> getInvoker();
  
  /**
   * unexport.
   * 
   * <code>
   *   getInvoker().destroy();
   * </code>
   */
  void unexport();

}

Invoker

Invoker这个接口相对来说还是比较难理解的,其主要原因是Invoker会在Dubbo的各个层中出现,当然也包含客户端和服务端。首先,在客户端PRC层有个Invoker,这个Invoker是客户端的调用桩。具体的实现主要有InjvmInvoker和DubboInvoker,以DubboInvoker为例,这里的意思就是,将客户端对接口的调用转换为网络请求,并返回对应的响应值。其次,Dubbo是客户端负载均衡的,因此在Cluster集群层会有一个Invoker的调用,具体的实现有AvailableClusterInvoker、FailoverClusterInvoker、FailsafeClusterInvoker等。其主要功能是提升客户端重试的能力,提升了服务调用的稳定性。最后,在服务端有一个Invoker,以Dubbo协议传输为例,服务端有个抽象实现InvokerWrapper(注意:这里和InjvmProtocol所使用的InvokerWrapper不同)。这里就是上面Exporter接口返回的Invoker对象,最终的实现有JDKProxyFactory或JavassistProxyFactory。意思就是可以通过Invoker接口的invoker()方法,来达到调用目标服务实现类方法的目的,其实就是一个代理模式。

由此可以看出,Invoker接口承载着多重的含义,其接口定义如下:

public interface Invoker<T> {

  /**
   * get service interface.
   * 
   * @return service interface.
   */
  Class<T> getInterface();

  /**
   * get service url.
   * 
   * @return service url.
   */
  URL getUrl();
  
  /**
   * is available.
   * 
   * @return available.
   */
  boolean isAvailable();

  /**
   * invoke.
   * 
   * @param invocation
   * @return result
   * @throws RpcException
   */
  Result invoke(Invocation invocation) throws RpcException;

  /**
   * destroy.
   */
  void destroy();

}

上面定义了5个接口,可以看出分别提供的能力是:获取服务接口类型,获取服务提供者的URL,服务是否可用,服务调用和服务注销。最终一句话就是:这个接口定义的是服务提供方的接口描述,只不过客户端/服务端的不同层次上的含义略有差异。

Protocol包装器

在讲解服务暴露之前,我们先看下一Protocol接口的实现。在Dubbo SPI的 META-INF/services/com.alibaba.dubbo.rpc.Protocol 这个文件中我们可以看到,除了有具体的Protocol实现。还有2个包装器的实现:ProtocolFilterWrapper和ProtocolListenerWrapper的实现。在讲述Wrapper类之前,我们先看下Dubbo SPI的包装器是什么。当一个接口和其实现都交给Dubbo SPI管理之后,Dubbo SPI 会判断实现类是否有,入参为接口的构造器,如果有的话则为改接口的Wrapper。以ProtocolFilterWrapper为例,我们看下其实现如下,而对于真正的实现类InjvmProtocol或DubboProtocol而言,并没有入参为Protocol的构造器。

public ProtocolFilterWrapper(Protocol protocol){
  if (protocol == null) {
    throw new IllegalArgumentException("protocol == null");
  }
  this.protocol = protocol;
}

了解了Dubbo SPI的构建方式之后,我们可以看到如果出现了多个Wrapper,那个他们之间是怎样的一个包装关系呢?我们先看下META-INF/services/com.alibaba.dubbo.rpc.Protocol的内容如下,起始通常也就是Wrapper在最上面,然后Wrapper的包裹从上到下依次进行,也就是最外层是ProtocolFilterWrapper -> ProtocolListenerWrapper -> InjvmProtocol / DubboProtocol。

com.alibaba.dubbo.rpc.support.ProtocolFilterWrapper
com.alibaba.dubbo.rpc.support.ProtocolListenerWrapper
com.alibaba.dubbo.rpc.injvm.InjvmProtocol
com.alibaba.dubbo.rpc.dubbo.DubboProtocol

最终的包装效果就如下图所示:

ProtocolFilterWrapper

下面我们看下Wrapper到底做了哪些事情,首先看下ProtocolFilterWrapper,其暴露和引用服务的方法如下:

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
    // Dubbo注册中心服务, 不构建Filter链
    return protocol.export(invoker);
  }
  // 构建Filter链
  return protocol.export(buildInvokerChain(invoker, invoker.getUrl().getParameter(Constants.SERVICE_FILTER_KEY), RpcConstants.DEFAULT_SERVICE_FILTERS));
}

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    // Dubbo注册中心服务, 不构建Filter链
    return protocol.refer(type, url);
  }
  // 构建Filter链
  return buildInvokerChain(protocol.refer(type, url), url.getParameter(Constants.REFERENCE_FILTER_KEY), RpcConstants.DEFAULT_REFERENCE_FILTERS);
}

我们可以看到,在调用Protocol的暴露和引用之前,会构建invoker的过滤链,也就是执行Filter接口的多个实现。这个可以类比Serverlet的过滤器链来理解。其buildInvokerChain()的方法实现如下:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String config, List<String> defaults) {
  // 获取Filter的名称
  List<String> names = ConfigUtils.mergeValues(Filter.class, config, defaults);
  Invoker<T> last = invoker;
  if (names.size() > 0) {
    List<Filter> filters = new ArrayList<Filter>(names.size());
    // 通过SPI获取Filter实例
    for (String name : names) {
      filters.add(ExtensionLoader.getExtensionLoader(Filter.class).getExtension(name));
    }
    if (filters.size() > 0) {
      // 倒序遍历, 串联Filter
      for (int i = filters.size() - 1; i >= 0; i --) {
        final Filter filter = filters.get(i);
        final Invoker<T> next = last;
        last = new Invoker<T>() {

          @Override
          public Class<T> getInterface() {
            return invoker.getInterface();
          }

          @Override
          public URL getUrl() {
            return invoker.getUrl();
          }

          @Override
          public boolean isAvailable() {
            return invoker.isAvailable();
          }

          @Override
          public Result invoke(Invocation invocation) throws RpcException {
            return filter.invoke(next, invocation);
          }

          @Override
          public void destroy() {
            invoker.destroy();
          }

          @Override
          public String toString() {
            return invoker.toString();
          }
        };
      }
    }
  }
  return last;
}

上面构建Filter链的代码实现也很简单,首先获取到Filter的名称列表,然后通过SPI获取到Filter实例列表,最后倒序遍历Filter实例,将所有的Filter串联起来。最终形成的效果见下图所示:

ProtocolListenerWrapper

对于ProtocolListenerWrapper,其实现原理和Filter链的构建类似,下面是其服务暴露和引用的包装代码实现。

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
    return protocol.export(invoker);
  }
  return new ListenerExporterWrapper<T>(protocol.export(invoker), 
      buildServiceListeners(invoker.getUrl().getParameter(Constants.EXPORTER_LISTENER_KEY), RpcConstants.DEFAULT_EXPORTER_LISTENERS));
}

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    return protocol.refer(type, url);
  }
  return new ListenerInvokerWrapper<T>(protocol.refer(type, url), 
      buildReferenceListeners(url.getParameter(Constants.INVOKER_LISTENER_KEY), RpcConstants.DEFAULT_INVOKER_LISTENERS));
}

可以看出,监听器不是在服务暴露或者引用的时候触发的动作,而是通过ListenerExporterWrapper和ListenerInvokerWrapper对返回的结果做了一层包装。这2个包装器,分别对export()和refer()的结果做了包装,并增加了ExporterListener和InvokerListener的列表。并且是在Exporter.unexport()和Invoker.destory()被调用时触发的。实际上,Listener在Dubbo是一个很弱的东西,也就是对应的监听器链实现上没有具体有意义的监听器,而Dubbo本身只是预留了这种监听器的设计,在阅读源码时可以暂时忽略监听器的代码。

服务暴露

对于RPC层的服务暴露和引用,下面我们主要围绕着InjvmProtocol和DubboProtocol两个实现来讲解。首先我们先看一下Protocol的继承关系,如下所示。

从上图可以看出,Protocol的子类实现还有一个RegistryProtocol。这个实现是用于Dubbo自身启动服务作为注册中心服务的,这个实现是在dubbo-registry层的,这个我们后续讲解。下面我们先看一下Protocol接口的抽象实现AbstractProtocol。

AbstractProtocol

我们首先看一下AbstractProtocol的核心属性和方法,首先抽象类里面缓存了所有的暴露服务的Exporter Map和引用服务的Invoker Map。然后提供了公共的serviceKey()和destory()方法,代码如下,相对比较简单,此处不做过多的描述。

protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();

protected static String serviceKey(URL url) {
  return serviceKey(url.getPort(), url.getPath(), url.getParameter(Constants.VERSION_KEY), url.getParameter(Constants.GROUP_KEY));
}

@Override
public void destroy() {
  ...
}

InjvmProtocol

InjvmProtocol是相对比较简单的实现,Injvm从字面意义上可以看出,就是在一个JVM内部完成服务的暴露和引用。InjvmProtocol的服务暴露也是比较简单的,也就是直接将需要暴露的Invoker存放到本地缓存中,这个入参的Invoker其实就是InvokerWrapper。代码如下所示:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  return new InjvmExporter<T>(invoker, serviceKey(invoker.getUrl()), exporterMap);
}

上面的缓存直接放在了InjvmExporter的缓存中,代码如下:

class InjvmExporter<T> extends AbstractExporter<T> {

  private final String key;
  private final Map<String, Exporter<?>> exporterMap;

  InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap){
    super(invoker);
    this.key = key;
    this.exporterMap = exporterMap;
    exporterMap.put(key, this);
  }

  public void unexport() {
    super.unexport();
    exporterMap.remove(key);
  }

}

DubboProtocol

DubboProtocol 从字面意思上就可以看出,它是一个比较重要的实现类。下面是其主要属性和暴露服务的方法,serverMap 是一个服务key到服务提供者 ExchangeServer的映射。stubServiceMethodMap 是用于保存客户端回调时所暴露的回调服务的映射,但是实际上这个变量并没有被使用。对于requestHandler,这个内部实现类比较重要(代码示例中的实现已省略),起始也就是通过Invocation找到对应的服务的DubboExporter,然后调用服务并返回结果,当然其中会处理回调这种方式的响应。

private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger>
// consumer side export a stub service for dispatching event servicekey-stubmethods
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<String, String>();
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter(){...}

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  URL url = invoker.getUrl().addParameterIfAbsent(Constants.DOWNSTREAM_CODEC_KEY, DubboCodec.NAME);
  // find server.
  String key = url.getAddress();
  //client 也可以暴露一个只有server可以调用的服务。
  boolean isServer = url.getBooleanParameter(RpcConstants.IS_SERVER_KEY, true);
  if (isServer && !serverMap.containsKey(key)) {
    serverMap.put(key, initServer(url));
  }
  // export service.
  key = serviceKey(url);
  DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
  exporterMap.put(key, exporter);

  return exporter;
}

对于export() 这个方法而言,首先会获取到启动服务的相关参数,然后调用initServer()方法启动服务,并保存在serverMap中。最终启动的Server被包装成DubboExporter返回。下面我们简单的看下 initServer()的简化实现如下:

private ExchangeServer initServer(URL url) {
  url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
  ExchangeServer server;
  try {
    server = Exchangers.bind(url, requestHandler);
  } catch (RemotingException e) {
    throw new RpcException(e.getMessage(), e);
  }
  str = url.getParameter(Constants.CLIENT_KEY);
  if (str != null && str.length() > 0) {
    Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
    if (!supportedTypes.contains(str)) {
      throw new RpcException("Unsupported client type: " + str);
    }
  }
  return server;
}

上述代码中,其中最核心的2个点是,制定了通信框架的编解码方式。其次就是调用了 Exchanges.bind(url, requestHandler),启动了一个NettyServer(如通信框架为Netty),并指定了处理网络事件的Handler为requestHandler。

服务引用

服务的引用可以认为是,获取对服务提供者的调用句柄,这里面在Dubbo中指的就是Invoker。下面我们先看下InjvmProtocol的实现方式:

InjvmProtocol

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
  return new InjvmInvoker<T>(serviceType, url, serviceKey(url), exporterMap);
}

这里面引用服务直接返回了一个InjvmInvoker的实例,然后我们看下InjvmInvoker的实现。

class InjvmInvoker<T> extends AbstractInvoker<T> {

  private final String key;
  private final Map<String, Exporter<?>> exporterMap;

  InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap){
    super(type, url);
    this.key = key;
    this.exporterMap = exporterMap;
  }

  public Object doInvoke(Invocation invocation) throws Throwable {
    InjvmExporter<?> exporter = (InjvmExporter<?>) exporterMap.get(key);
    Result result;
    try {
      result = exporter.invoke(invocation, NetUtils.LOCALHOST, 0);
    } catch (Throwable e) {
      throw new RpcException(e);
    }
    return result.recreate();
  }
}

从上面代码可以看出,当调用方调用invoke()方法时,抽象父类的invoke()方法会调用doInvoke()方法。而最终的Invoker对象是从Exporter的本地map缓存中获取的,也就是暴露服务时的InvokerWrapper。

DubboProtocol

对于DubboProtocol的服务引用,其实和服务暴露的流程类似。也就是首先在本地启动一个ExchangeClient 连接到远程的ExchangeServer,这个可以创建多个客户端连接到Server,最后new 一个DubboInvoker返回给调用方。当然这个DubboInvoker就是客户端调用桩,其内部处理了异步转同步、多个客户端选其中一个调用服务的处理过程等。

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
  // find client.
  int channels = url.getPositiveIntParameter(Constants.CONNECTIONS_KEY, 1);
  ExchangeClient[] clients = new ExchangeClient[channels];
  for (int i = 0; i < clients.length; i++) {
    clients[i] = initClient(url);
  }
  // create rpc invoker.
  DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, clients);
  invokers.add(invoker);
  return invoker;
}

对于上面的initClient() 这个方法,简化后的代码如下。首先就是先进行参数校验和数据准备,如果客户端是可以延迟连接的,此时并不会直接 connect到服务端,而是直接 new 了一个延迟客户端,最终在第一次调用的时候会连接到服务端。如果不是延迟连接的,则直接调用 Exchanges.connect(),并指定了处理网络事件的 requestHandler。

private ExchangeClient initClient(URL url) {
  //设置连接应该是lazy的 
  if (url.getBooleanParameter(RpcConstants.LAZY_CONNECT_KEY)) {
    return new LazyConnectExchangeClient(url, requestHandler);
  }
  return Exchangers.connect(url, requestHandler);
}


参考:Dubbo 2.6.0 源代码、Dubbo 2.0.7 源代码