Skip to content

2019 09 14 netty案例,netty4.1源码分析篇五《一行简单的writeAndFlush都做了哪些事》

fuzhengwei edited this page May 16, 2020 · 1 revision

作者:小傅哥
博客:https://bugstack.cn - 原创系列专题

沉淀、分享、成长,让自己和他人都能有所收获!

前言介绍

对于使用netty的小伙伴来说,ctx.writeAndFlush()再熟悉不过了,它可以将我们的消息发送出去。那么它都执行了那些行为呢,是怎么将消息发送出去的呢。

                                                I/O Request
                                           via {@link Channel} or
                                       {@link ChannelHandlerContext}
                                                     |
 +---------------------------------------------------+---------------+
 |                           ChannelPipeline         |               |
 |                                                  \|/              |
 |    +---------------------+            +-----------+----------+    |
 |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 |    +----------+----------+            +-----------+----------+    |
 |              /|\                                  |               |
 |               |                                  \|/              |
 |    +----------+----------+            +-----------+----------+    |
 |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 |    +----------+----------+            +-----------+----------+    |
 |              /|\                                  .               |
 |               .                                   .               |
 | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 |        [ method call]                       [method call]         |
 |               .                                   .               |
 |               .                                  \|/              |
 |    +----------+----------+            +-----------+----------+    |
 |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 |    +----------+----------+            +-----------+----------+    |
 |              /|\                                  |               |
 |               |                                  \|/              |
 |    +----------+----------+            +-----------+----------+    |
 |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 |    +----------+----------+            +-----------+----------+    |
 |              /|\                                  |               |
 +---------------+-----------------------------------+---------------+
                 |                                  \|/
 +---------------+-----------------------------------+---------------+
 |               |                                   |               |
 |       [ Socket.read() ]                    [ Socket.write() ]     |
 |                                                                   |
 |  Netty Internal I/O Threads (Transport Implementation)            |
 +-------------------------------------------------------------------+

源码分析

1、由一行简单发送消息开始

发送消息的代码非常简单,也是我们非常常用的发送消息的方式ctx.writeAndFlush

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
	//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
	System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
	//通知客户端链消息发送成功
	String str = "客户端收到[微信公众号:bugstack虫洞栈]:" + new Date() + " " + msg + "\r\n";
	ctx.writeAndFlush(str);
}

2、跟进writeAndFlush | ChannelHandlerContext.writeAndFlush

AbstractChannelHandlerContext.java

@Override
public ChannelFuture writeAndFlush(Object msg) {
	return writeAndFlush(msg, newPromise());
}

@Override
public ChannelPromise newPromise() {
	return new DefaultChannelPromise(channel(), executor());
}

在这段代码中我们可以看到,writeAndFlush方法里提供了一个默认的newPromise()作为参数传递。{promise:v. 许诺;承诺;答应;保证;使很可能;预示}在Netty中发送消息是一个异步操作,那么可以通过往hannelPromise中注册回调监听listener来得到该操作是否成功。

在发送消息时添加监听

ctx.writeAndFlush("hi 微信公众号:bugstack虫洞栈 | 欢迎关注&获取专题源码", ctx.newProgressivePromise().addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        future.isSuccess();
    }
}));

3、继续向下一层跟进代码 | AbstractChannelHandlerContext.invokeWriteAndFlush

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

3.1、首先通过invokeHandler()判断通道处理器已添加到管道

Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
yet. If not return {@code false} and if called or could not detect return {@code true}.
If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.

3.2、执行消息处理 invokeWrite0;首先将消息内容放入输出缓冲区中[ChannelOutboundBuffer] invokeFlush0;然后将输出缓冲区中的数据通过socket发送到网络中

4、分析invokeWrite0执行内容 | AbstractChannelHandlerContext.invokeWrite0

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

((ChannelOutboundHandler) handler()).write是一个出站事件[ChannelOutboundHandler],会由ChannelOutboundHandlerAdapter处理;

/**
 * Calls {@link ChannelHandlerContext#write(Object, ChannelPromise)} to forward
 * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
 *
 * Sub-classes may override this method to change behavior.
 */
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

接下来会走到ChannelPipeline中,来执行网络数据发送;| DefaultChannelPipeline > HeadContext.write

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}

5、unsafe.write执行分析

unsafe是我们构建NioServerSocketChannel或NioSocketChannel对象时,一并构建一个成员属性,它会完成底层真正的网络操作等。NioServerSocketChannel中持有的unsafe成员变量是NioMessageUnsafe对象,而NioSocketChannel中持有的unsafe成员变量是NioSocketChannelUnsafe对象。这里我们要看的是NioSocketChannel的write流程

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of th
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, newWriteException(initialCloseCause));
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }
    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    outboundBuffer.addMessage(msg, size, promise);
}

https://github.com/netty/netty/issues/2362

  • 获取该NioSocketChannel的ChannelOutboundBuffer成员属性。(确切地来说ChannelOutboundBuffer是NioSocketChannelUnsafe对象中的成员属性,而NioSocketChannelUnsafe才是NioSocketChannel的成员属性。)每一个NioSocketChannel会维护一个它们自己的ChannelOutboundBuffer,用于存储待出站写请求。 判断该outboundBuffer是否为null,如果为null则说明该NioSocketChannel已经关闭了,那么就会标志该异步写操作为失败完成,并释放写消息后返回。

AbstractNioByteChannel.java | filterOutboundMessage过滤待发送的消息:

@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        return newDirectBuffer(buf);
    }
    if (msg instanceof FileRegion) {
        return msg;
    }
    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

过滤待发送的消息,只有ByteBuf(堆 or 非堆)以及 FileRegion可以进行最终的Socket网络传输,其他类型的数据是不支持的,会抛UnsupportedOperationException异常。并且会把堆ByteBuf转换为一个非堆的ByteBuf返回。也就说,最后会通过socket传输的对象时非堆的ByteBuf和FileRegion。 [size = pipeline.estimatorHandle().size(msg);]估计待发送数据的大小:

DefaultMessageSizeEstimator.java | 通过ByteBuf.readableBytes()判断消息内容大小,估计待发送消息数据的大小,如果是FileRegion的话直接饭0,否则返回ByteBuf中可读取字节数。

private static final class HandleImpl implements Handle {
    private final int unknownSize;
    private HandleImpl(int unknownSize) {
        this.unknownSize = unknownSize;
    }
    @Override
    public int size(Object msg) {
        if (msg instanceof ByteBuf) {
            return ((ByteBuf) msg).readableBytes();
        }
        if (msg instanceof ByteBufHolder) {
            return ((ByteBufHolder) msg).content().readableBytes()
        }
        if (msg instanceof FileRegion) {
            return 0;
        }
        return unknownSize;
    }
}

ChannelOutboundBuffer.java | ChannelOutboundBuffer.addMessage将消息加入outboundBuffer中等待发送

/**
 * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
 * the message was written.
 */
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

6、ChannelOutboundBuffer出栈

一个内部的数据结构,被AbstractChannel用于存储它的待出站写请求。 ChannelOutboundBuffer中有两个属性private Entry unflushedEntry、private Entry flushedEntry。它们都是用Entry对象通过next指针来维护的一个单向链表。以及一个private Entry tailEntry;对象表示始终指向最后一个Entry对象(即,最后加入到该ChannelOutboundBuffer中的写请求的数据消息) unflushedEntry表示还未刷新的ByteBuf的链表头;flushedEntry表示调用flush()操作时将会进行刷新的ByteBuf的链表头。

7、Entry对象

static final class Entry {
	private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
		@Override
		protected Entry newObject(Handle<Entry> handle) {
			return new Entry(handle);
		}
	};

	private final Handle<Entry> handle;
	Entry next;
	Object msg;
	ByteBuffer[] bufs;
	ByteBuffer buf;
	ChannelPromise promise;
	long progress;
	long total;
	int pendingSize;
	int count = -1;
	boolean cancelled;

	private Entry(Handle<Entry> handle) {
		this.handle = handle;
	}

	static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
		Entry entry = RECYCLER.get();
		entry.msg = msg;
		entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
		entry.total = total;
		entry.promise = promise;
		return entry;
	}

	int cancel() {
		if (!cancelled) {
			cancelled = true;
			int pSize = pendingSize;

			// release message and replace with an empty buffer
			ReferenceCountUtil.safeRelease(msg);
			msg = Unpooled.EMPTY_BUFFER;

			pendingSize = 0;
			total = 0;
			progress = 0;
			bufs = null;
			buf = null;
			return pSize;
		}
		return 0;
	}

	void recycle() {
		next = null;
		bufs = null;
		buf = null;
		msg = null;
		promise = null;
		progress = 0;
		total = 0;
		pendingSize = 0;
		count = -1;
		cancelled = false;
		handle.recycle(this);
	}

	Entry recycleAndGetNext() {
		Entry next = this.next;
		recycle();
		return next;
	}
}

Entry是ChannelOutboundBunffer的一个内部类,它是对真实的写消息数据以及其相关信息的一个封装。大致封装了如下信息: a) pendingSize:记录有该ByteBuf or ByteBufs 中待发送数据大小 和 对象本身内存大小 的累加和; b) promise:该异步写操作的ChannelPromise(用于在完成真是的网络层write后去标识异步操作的完成以及回调已经注册到该promise上的listeners); c) total:待发送数据包的总大小(该属性与pendingSize的区别在于,如果是待发送的是FileRegion数据对象,则pengdingSize中只有对象内存的大小,即真实的数据大小被记录为0;但total属性则是会记录FileRegion中数据大小,并且total属性是不包含对象内存大小,仅仅是对数据本身大小的记录); e) msg:原始消息对象的引用; f) count:写消息数据个数的记录(如果写消息数据是个数组的话,该值会大于1) 这里说明下,pendingSize属性记录的不单单是写请求数据的大小,记录的是这个写请求对象的大小。这是什么意思了?这里做个简单的介绍: 一个对象占用的内存大小除了实例数据(instance data),还包括对象头(header)以及对齐填充(padding)。所以一个对象所占的内存大小为『对象头 + 实例数据 + 对齐填充』,即

CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD

// Assuming a 64-bit JVM:
//  - 16 bytes object header
//  - 8 reference fields
//  - 2 long fields
//  - 2 int fields
//  - 1 boolean field
//  - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
        SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

假设的是64位操作系统下,且没有使用各种压缩选项的情况。对象头的长度占16字节;引用属性占8字节;long类型占8字节;int类型占4字节;boolean类型占1字节。同时,由于HotSpot VM的自动内存管理系统要求对象起始地址必须是8字节的整数倍,也就是说对象的大小必须是8字节的整数倍,如果最终字节数不为8的倍数,则padding会补足至8的倍数。

addMessage方法主要就是将请求写出的数据封装为Entry对象,然后加入到tailEntry和unflushedEntry中。 然后调用『incrementPendingOutboundBytes(entry.pendingSize, false);』对totalPendingSize属性以及unwritable字段做调整。 totalPendingSize字段记录了该ChannelOutboundBuffer中所有带发送Entry对象的占的总内存大小和所有带发送数据的大小。unwritable用来标示当前该Channel要发送的数据是否已经超过了设定 or 默认的WriteBufferWaterMark的high值。如果当前操作导致了待写出的数据(包括Entry对象大小以及真实需要传输数据的大小)超过了设置写缓冲区的高水位,那么将会触发fireChannelWritabilityChanged事件。


上一篇:netty案例,netty4.1源码分析篇四《ByteBuf的数据结构在使用方式中的剖析》

下一篇:netty案例,netty4.1源码分析篇六《Netty异步架构监听类Promise源码分析》

微信搜索「bugstack虫洞栈」公众号,关注后回复「rpc案例源码」获取本文源码&更多原创专题案例!

📝 首页

🌏 知识星球码农会锁

实战项目:「DDD+RPC分布式抽奖系统」、专属小册、问题解答、简历指导、架构图稿、视频课程

🐲 头条

⛳ 目录

  1. 源码 - :octocat: 公众号:bugstack虫洞栈 文章所涉及到的全部开源代码
  2. Java
  3. Spring
  4. 面向对象
  5. 中间件
  6. Netty 4.x
  7. 字节码编程
  8. 💯实战项目
  9. 部署 Dev-Ops
  10. 📚PDF 下载
  11. 关于

💋 精选

🐾 友链

建立本开源项目的初衷是基于个人学习与工作中对 Java 相关技术栈的总结记录,在这里也希望能帮助一些在学习 Java 过程中遇到问题的小伙伴,如果您需要转载本仓库的一些文章到自己的博客,请按照以下格式注明出处,谢谢合作。

作者:小傅哥
链接:https://bugstack.cn
来源:bugstack虫洞栈

2021年10月24日,小傅哥 的文章全部开源到代码库 CodeGuide 中,与同好同行,一起进步,共同维护。

这里我提供 3 种方式:

  1. 提出 Issue :在 Issue 中指出你觉得需要改进/完善的地方(能够独立解决的话,可以在提出 Issue 后再提交 PR )。
  2. 处理 Issue : 帮忙处理一些待处理的 Issue
  3. 提交 PR: 对于错别字/笔误这类问题可以直接提交PR,无需提交Issue 确认。

详细参考:CodeGuide 贡献指南 - 非常感谢你的支持,这里会留下你的足迹

  • 加群交流 本群的宗旨是给大家提供一个良好的技术学习交流平台,所以杜绝一切广告!由于微信群人满 100 之后无法加入,请扫描下方二维码先添加作者 “小傅哥” 微信(fustack),备注:加群。
微信:fustack

  • 公众号(bugstack虫洞栈) - 沉淀、分享、成长,专注于原创专题案例,以最易学习编程的方式分享知识,让自己和他人都能有所收获。
公众号:bugstack虫洞栈

感谢以下人员对本仓库做出的贡献或者对小傅哥的赞赏,当然不仅仅只有这些贡献者,这里就不一一列举了。如果你希望被添加到这个名单中,并且提交过 Issue 或者 PR,请与我联系。

Clone this wiki locally