Dubbo传输层及交换层(二)

接上篇:《Dubbo传输层及交换层(一)

Exchange层

Exchange层是对Transport层的封装,让传输的二进制数据转换为Dubbo可以识别的Request / Response,这个我们从开篇的交互示意图就可以看出来。下面我们看下Exchange的类结构。

Exchange层的每个组件,都对应这Transport层的组件,分别为 Handler、Channel、Server、Client。我们首先看下ExchangeHandler,其继承了TelnetHandler和ChannelHandler,新增了reply方法如下所示:

ExchangeHandler

public interface ExchangeHandler extends ChannelHandler, TelnetHandler {
  /**
   * reply.
   */
  Object reply(ExchangeChannel channel, Object request) throws RemotingException;
}

其子类有个抽象的 ExchangeHandlerAdapter,这个Adapter中的 reply方法是个空实现,最终在DubboProtocol中实现了reply方法,实现Dubbo协议相关的相应方法,这个方法是在Server端接收到消息后(received方法中)调用的,对于DubboProtocol的代码这里不详细展开。

最后我们看到了HeaderExchangeHandler,它并不是继承了ExchangeHandler,而是以组合的方式获取DubboProtocol中的实现。但是其继承了ChannelHandler接口,这主要是为了封装上层的ChannelHandler给底层的通讯框架使用(如Netty)。下面罗列了HeaderExchangeHandler的主要代码:

private final ExchangeHandler handler;

public HeaderExchangeHandler(ExchangeHandler handler) {
  if (handler == null) {
    throw new IllegalArgumentException("handler == null");
  }
  this.handler = handler;
}

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
  Response res = new Response(req.getId(), req.getVersion());
  if (req.isHeartbeat()) {
    res.setHeartbeat(true);
    return res;
  }

  if (req.isBroken()) {
    Object data = req.getData();

    String msg;
    if (data == null) {
      msg = null;
    } else if (data instanceof Throwable) {
      msg = StringUtils.toString((Throwable) data);
    } else {
      msg = data.toString();
    }
    res.setErrorMessage("Fail to decode request due to: " + msg);
    res.setStatus(Response.BAD_REQUEST);

    return res;
  }

  // find handler by message class.
  Object msg = req.getData();
  if (handler == null) {// no handler.
    res.setStatus(Response.SERVICE_NOT_FOUND);
    res.setErrorMessage("InvokeHandler not found, Unsupported protocol object: " + msg);
  } else {
    try {
      // handle data.
      Object result = handler.reply(channel, msg);
      res.setStatus(Response.OK);
      res.setResult(result);
    } catch (Throwable e) {
      res.setStatus(Response.SERVICE_ERROR);
      res.setErrorMessage(StringUtils.toString(e));
    }
  }
  return res;
}

可以看出 handleRequest()方法中拿到了ExchangeCodec解码出来的请求对象 Request,然后封装响应报文,最终会调用DubboProtocol中的reply() 方法获取到服务端的invoker 桩对象,然后执行对应的业务逻辑,拿到结果result 后,封装返回Response。

对于HeaderExchangeHandler作为Transport层的入参,最终注入到 Transport层的Handler的实现如下:

public class HeaderExchanger implements Exchanger {
  
  public static final String NAME = "header";

  @Override
  public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeClient(Transporters.connect(url, new HeaderExchangeHandler(handler)));
  }

  @Override
  public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new HeaderExchangeHandler(handler)));
  }

}

ExchangeChannel

下面我们看一下ExchangeChannel,从类图上可以看出,它继承了Channel接口。然后新增了和Dubbo协议相关的方法。如下方的request(Object request) 中的request对象就是业务层的请求,在HeaderExchangeChannel中封装成了Dubbo协议的Request对象。它的返回值是ResponseFuture,这是通过Future模式,让RPC请求同步转异步,对于ResponseFuture这里不展开阐述。

public interface ExchangeChannel extends Channel {

  /**
   * send request.
   * 
   * @param request
   * @return
   * @throws RemotingException
   */
  ResponseFuture request(Object request) throws RemotingException;

  /**
   * send request.
   * 
   * @param request
   * @param timeout
   * @return
   * @throws RemotingException
   */
  ResponseFuture request(Object request, int timeout) throws RemotingException;

  /**
   * get message handler.
   * 
   * @return message handler
   */
  ExchangeHandler getExchangeHandler();

  /**
   * graceful close.
   * 
   * @param timeout
   */
  @Override
  void close(int timeout);

}

对于HeaderExchangeChannel的实现,还是相对比较简单的,这里我们简单的看下它的sent方法,也就是直接对业务的请求message,封装成了Dubbo协议的Request对象:

@Override
public void send(Object message, boolean sent) throws RemotingException {
  if (closed) {
    throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
  }
  if (message instanceof Request || message instanceof Response || message instanceof String) {
    channel.send(message, sent);
  } else {
    Request request = new Request();
    request.setVersion("2.0.0");
    request.setTwoWay(false);
    request.setData(message);
    channel.send(request, sent);
  }
}

ExchangeClient

对于ExchangeClient而言,其中没有定义接口方法,仅仅是继承了Client和ExchangeChannel。也可以简单的理解为拥有了消息的发送能力即可。下面我们看下HeaderExchangeClient的主要实现,构造器中的Client是Transport层的Client(如NettyClient)。其含有2个属性,Transport层的Client和ExchangeChannel。我们知道HeaderExchangeChannel是用来收发Dubbo的Request/Response的,因此这里的主要属性是基于NettyClient构造的ExchangeChannel。

public class HeaderExchangeClient implements ExchangeClient {

  private final Client client;
  private final ExchangeChannel channel;

  public HeaderExchangeClient(Client client) {
    if (client == null) {
      throw new IllegalArgumentException("client == null");
    }
    this.client = client;
    this.channel = new HeaderExchangeChannel(client);
  }
}

ExchangeServer

ExchangeServer接口仅仅继承了Server接口,并新增了对Exchange层的ExchangeChannel的管理,仅此而已。

public interface ExchangeServer extends Server {

  /**
   * get channels.
   * 
   * @return channels
   */
  Collection<ExchangeChannel> getExchangeChannels();

  /**
   * get channel.
   * 
   * @param remoteAddress
   * @return channel
   */
  ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);

}

下面我们看下HeaderExchangeServer的简单实现,我们可以看到在构造的时候,会启动一个心跳的任务去检测客户端的连接是否正常。

public class HeaderExchangeServer implements ExchangeServer {

  private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, new NamedThreadFactory("dubbo-remoting-server-heartbeat", true));
  // 心跳定时器
  private ScheduledFuture<?> heatbeatTimer;
  // 心跳超时,毫秒。缺省0,不会执行心跳。
  private int heartbeat;
  private int heartbeatTimeout;
  private final Server server;
  private volatile boolean closed = false;

  public HeaderExchangeServer(Server server) {
    if (server == null) {
      throw new IllegalArgumentException("server == null");
    }
    this.server = server;
    this.heartbeat = server.getUrl().getIntParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT);
    this.heartbeatTimeout = server.getUrl().getIntParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    if (heartbeatTimeout < heartbeat * 2) {
      throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    startHeatbeatTimer();
  }
}

请求响应详细流程


从图上我们可以看出,上图一共分为4层。最下面一层是通信框架层,也就是直接和Netty组件交互的一层。其中 Channel 指的是 Netty 框架的Channel,也就是直接发送数据的通道。另外,NettyHandler 是继承Netty框架的Handler组件,而NettyHandler 本身又组合了上层的Handler,最终完成对底层网络事件的上层业务逻辑处理。其类的组合形式见如下示意图:


最下面的一层是Transport层,这一层的NettyServer 和 NettyClient(Dubbo的NettyChannel的子类) 接口都是 Dubbo自定义的。对于Channel而言,是组合了底层Netty框架的Channel,并在此基础上增加了attributes 的属性。对于ChannelHandler 最终的实现由 NettyClient / NettyServer 承载,而最终NettyClient / NettyServer 又会作为ChannelHandler的形式,作为客户端 / 服务端启动的入参,传入底层的Netty框架层,就如上图所示的那样。除此之外,这一层还有对于ChannelHandler的异步处理的封装层,也就是ChannelHandlerWrapper。

再往上的一层是Exchange层,这一层可以认为就是应用层了。其中收发的数据都是Dubbo协议对应的 Request / Response对象,并且对于发送的请求有异步转同步的处理等等。最上层是Protocol层,这一层主要是实现了ExchangeHandler的reply() 方法,通常用来实现,当Server端接收到了客户端的请求后,用来返回响应报文的(Response)。

对于HeaderExchangeChannel,组合了下层的 NettyChannel,并对上层发过来的 Object message 原始对象做了Dubbo的Request对象封装,也就是Dubbo请求协议的封装。然后再发送出去。由此看来对于 Channel 的封装,相对于ChannelHandler的封装嵌套是反着的,如下所示:



参考:Dubbo 2.6.0 源代码、Dubbo 2.0.7 源代码