Dubbo传输层及交换层(一)

从上一篇文章Dubbo的编解码方式可以看出,在远程调用这一块,Dubbo主要分为2层:传输层和交换层。其中传输层(Transport)只负责对二进制数据的收发,交换层(Exchange)负责对Dubbo协议的编解码,然后调用Transport层的接口收发数据,其大致流程如下所示:

【交换层/传输层收发数据图】

在介绍Dubbo的Channel和ChannelHandler之前,我们先看一下Netty的框架设计。

Netty中的Channel是对网络Socket的封装,通过Channel可以和网络对端进行数据的收发。在数据收发的过程中,会经过入站和出站的ChannelHandler处理。通常来说在连接建立/数据发送/数据接受等阶段,在ChannelHandler会产生对应的事件回调。这样ChannelHandler可以根据事件的类型,执行具体的处理逻辑。

而Dubbo的网络层也是参考了Netty的设计,重新定义了Channel和ChannelHandler,下面我们依次看一下其类之间的关系及具体的代码实现。

Channel

从上面的继承关系可以看出,Channel继承了Endpoint接口,Endpoint是对一个网络节点的抽象,具有着数据收发,获取Dubbo的URL等功能。而Channel在Endpoint的基础上,又增加了属性存储的方法。

public interface Endpoint {

    /**
     * get url.
     */
    URL getUrl();

    /**
     * get channel handler.
     */
    ChannelHandler getChannelHandler();

    /**
     * get local address.
     */
    InetSocketAddress getLocalAddress();

    /**
     * send message.
     */
    void send(Object message) throws RemotingException;

    /**
     * send message.
     * @param sent    是否已发送完成
     */
    void send(Object message, boolean sent) throws RemotingException;

    /**
     * close the channel.
     */
    void close();

    /**
     * Graceful close the channel.
     */
    void close(int timeout);

    /**
     * is closed.
     */
    boolean isClosed();

}

public interface Channel extends Endpoint {

    /**
     * get remote address.
     */
    InetSocketAddress getRemoteAddress();

    /**
     * is connected.
     */
    boolean isConnected();

    /**
     * has attribute.
     */
    boolean hasAttribute(String key);

    /**
     * get attribute.
     */
    Object getAttribute(String key);

    /**
     * set attribute.
     */
    void setAttribute(String key, Object value);

    /**
     * remove attribute.
     */
    void removeAttribute(String key);

}

而对于AbstractChannel,其实现了Channel接口之外,又继承了AbstractPeer,AbstractChannel类本身并没有逻辑性的代码。下面我们看下AbstractPeer的实现:

private final ChannelHandler handler;
private volatile URL url;
private volatile boolean closed;

public AbstractPeer(URL url, ChannelHandler handler) {
	if (url == null) {
		throw new IllegalArgumentException("url == null");
	}
	if (handler == null) {
		throw new IllegalArgumentException("handler == null");
	}
	this.url = url;
	this.handler = handler;
}

AbstractPeer是后面即将谈到的Server/Client的抽象父类,从上面的属性可以看出其有着ChannelHandler和URL。下面我们看下NettyChannel的实现:

private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
private final org.jboss.netty.channel.Channel channel;
private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();

private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
	super(url, handler);
	if (channel == null) {
		throw new IllegalArgumentException("netty channel == null;");
	}
	this.channel = channel;
}

首先NettyChannel有个static的变量,里面保存着Netty的Channel到自身对象的映射Map,然后就是组合了Netty的Channel和当前Dubbo的Channel属性的attributes。也即是说Dubbo的Channel通过组合通信框架(如Netty)的Channel,来实现自身通信的功能。

ChannelHandler

对于ChannelHandler来说,从名字上就可以看出是对Channel上产生的一些列事件,所产生的事件回调触发。这个我们可以类比Netty的ChannelHandler,也可以看下Dubbo的ChannelHandler定义:

public interface ChannelHandler {

    /**
     * on channel connected.
     */
    void connected(Channel channel) throws RemotingException;

    /**
     * on channel disconnected.
     */
    void disconnected(Channel channel) throws RemotingException;

    /**
     * on message sent.
     */
    void sent(Channel channel, Object message) throws RemotingException;

    /**
     * on message received.
     */
    void received(Channel channel, Object message) throws RemotingException;

    /**
     * on exception caught.
     */
    void caught(Channel channel, Throwable exception) throws RemotingException;

}

从上面接口声明可以看出,当有连接建立或断开,数据接收或发送等网络事件触发时,会回调ChannelHandler的对应的方法,让调用方执行相应的业务逻辑。

对于ChannelHandler而言下面主要分2个方面讲述:基于ChannelHandler所形成的Server和Client和ChannelHandler触发的执行模式。

Server和Client

从上图可以看出,AbstractPeer具有Endpoint(网络节点)和ChannelHandler(网络事件回调)的能力。在AbstractPeer的基础上,AbstractEndpoint又增加了编码器和重置超时时间的能力。其核心代码如下所示:

private Codec codec;
private int timeout;
private int connectTimeout;

public AbstractEndpoint(URL url, ChannelHandler handler) {
	super(url, handler);
	this.codec = ExtensionLoader.getExtensionLoader(Codec.class).getExtension(url.getParameter(Constants.CODEC_KEY, "telnet"));
	this.timeout = url.getPositiveIntParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
	this.connectTimeout = url.getPositiveIntParameter(Constants.CONNECT_TIMEOUT_KEY, timeout);
}

对于Server端,我们看下AbstractServer所实现的一个接口Server。

public interface Server extends Endpoint, Resetable {

    /**
     * is bound.
     */
    boolean isBound();

    /**
     * get channels.
     */
    Collection<Channel> getChannels();

    /**
     * get channel.
     */
    Channel getChannel(InetSocketAddress remoteAddress);

}

除了网络节点和网络事件处理的能力之外,在Server接口上又增加了,获取连接到服务端的所有Channel,和根据IP获取对应的Channel等。对于Server这条线,我们下面看下AbstractServer的实现:

ExecutorService executor;
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
// 最大连接数
private int accepts;
private int idleTimeout = 600; //600 seconds

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
	super(url, handler);
	localAddress = getUrl().toInetSocketAddress();

	String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
	int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
	if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
		bindIp = NetUtils.ANYHOST;
	}
	bindAddress = new InetSocketAddress(bindIp, bindPort);
	this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
	this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
	try {
	    // 开启服务, 监听端口
		doOpen();
		if (logger.isInfoEnabled()) {
			logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
		}
	} catch (Throwable t) {
		throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
				+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
	}
	//fixme replace this with better method
	DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
	executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

从AbstractServer的核心代码可以看出,抽象类提供了创建的模板方法,其中包含了回调子类的doOpen(),去完成一个真正的端口监听。从属性上看,上面有服务端限制连接的最大数accepts和绑定的IP等信息。再往下走,就到了具体的实现类了。我们看一下Netty4的实现(dubbo 2.6.0):

private Map<String, Channel> channels; // <ip:port, channel>
private ServerBootstrap bootstrap;
private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    // 对ChannelHandler做包装,形成异步化
	super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

@Override
protected void doOpen() throws Throwable {
	NettyHelper.setNettyLoggerFactory();

	bootstrap = new ServerBootstrap();

	bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
	workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
			new DefaultThreadFactory("NettyServerWorker", true));

	final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
	channels = nettyServerHandler.getChannels();

	bootstrap.group(bossGroup, workerGroup)
			.channel(NioServerSocketChannel.class)
			.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
			.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
			.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
			.childHandler(new ChannelInitializer<NioSocketChannel>() {
				@Override
				protected void initChannel(NioSocketChannel ch) throws Exception {
					NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
					ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
					        // 设置编解码
							.addLast("decoder", adapter.getDecoder())
							.addLast("encoder", adapter.getEncoder())
							// 设置处理请求响应的Handler
							.addLast("handler", nettyServerHandler);
				}
			});
	// 绑定IP
	ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
	channelFuture.syncUninterruptibly();
	channel = channelFuture.channel();
}

从上面的Netty4的实现可以看出,在构造器中,直接调用了AbstractServer的构造器。不过对ChannelHandler做了包装处理,形成了异步处理的效果。然后AbstractServer会调用子类的doOpen()方法,进入具体的IP绑定和服务启动。这里对于Netty4本身的一些API就不做过多的解释了,这里我们来看下在pipeline中设置的编解码及处理请求响应的Handler。

对于编解码的Handler,这里面使用了NettyCodecAdapter去封装了编码和解码器,这两个编解码器分别是作为内部类实现的,这里我们先看下NettyCodecAdapter的核心属性:

private final ChannelHandler encoder = new InternalEncoder();
private final ChannelHandler decoder = new InternalDecoder();
private final Codec2 codec;
private final URL url;
private final com.alibaba.dubbo.remoting.ChannelHandler handler;

public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
	this.codec = codec;
	this.url = url;
	this.handler = handler;
}

可以看出,除了编码和解码器,还有具体的编解码实现Codec2,这里面通常是DubboCodec。其他的属性都是辅助编码实现的,下面我们分别看下InternalEncode和InternalDecode的实现

private class InternalEncoder extends MessageToByteEncoder {
	@Override
	protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
		com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
		Channel ch = ctx.channel();
		NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
		try {
			codec.encode(channel, buffer, msg);
		} finally {
			NettyChannel.removeChannelIfDisconnected(ch);
		}
	}
}

编码器还是相对比较简单的,对于Netty4的适配,直接继承了MessageToByteEncoder。在重写encode的方法里,直接使用了Codec2(通常为DubboCodec,新版本使用的DubboCountCodec也是对DubboCodec的简单封装)的编码。对于InternalDecoder而言,实现如下:

private class InternalDecoder extends ByteToMessageDecoder {
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
		ChannelBuffer message = new NettyBackedChannelBuffer(input);
		NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
		Object msg;
		int saveReaderIndex;

		try {
			// decode object.
			do {
				saveReaderIndex = message.readerIndex();
				try {
					msg = codec.decode(channel, message);
				} catch (IOException e) {
					throw e;
				}
				if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
					message.readerIndex(saveReaderIndex);
					break;
				} else {
					//is it possible to go here ?
					if (saveReaderIndex == message.readerIndex()) {
						throw new IOException("Decode without read data.");
					}
					if (msg != null) {
						out.add(msg);
					}
				}
			} while (message.readable());
		} finally {
			NettyChannel.removeChannelIfDisconnected(ctx.channel());
		}
	}
}

可以看出,当解码时如果此时接收到的字节数不完整,此时会解码器会返回 NEED_MORE_INPUT,此时会继续接受数据,直到接收到完整的报文,最终完成解码过程。