Netty 4 Source Code Analysis: flush

This is an original article; please credit the source when reposting.

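A write in Netty consists of two steps:

  1. Write: store the msg in the ChannelOutboundBuffer.
  2. Flush: move the msg from the ChannelOutboundBuffer to the socket's send buffer.

The previous article analyzed write; this one continues with the second step, flush: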

//DefaultChannelHandlerContext
public ChannelHandlerContext flush() {
        final DefaultChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeFlush();
        } else {
            Runnable task = next.invokeFlushTask;
            if (task == null) {
                next.invokeFlushTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeFlush();
                    }
                };
            }
            executor.execute(task);
        }

        return this;
    }

    private void invokeFlush() {
        try {
            ((ChannelOutboundHandler) handler).flush(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }
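
The thread hand-off in flush() above can be sketched in isolation. This is a minimal illustration, not Netty code: LoopBoundFlusher, await and lastFlushThread are hypothetical names, and a plain single-threaded ExecutorService stands in for the EventExecutor.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a context bound to a single-threaded event loop.
final class LoopBoundFlusher {
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-loop");
        t.setDaemon(true);                  // do not keep the JVM alive
        return t;
    });
    private volatile Thread loopThread;
    private Runnable invokeFlushTask;       // created lazily, then reused
    volatile Thread lastFlushThread;        // records where invokeFlush() ran

    LoopBoundFlusher() {
        // Learn which thread is the loop thread so inEventLoop() can compare.
        await(() -> loopThread = Thread.currentThread());
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void flush() {
        if (inEventLoop()) {
            invokeFlush();                  // already on the loop thread: run inline
        } else {
            Runnable task = invokeFlushTask;
            if (task == null) {
                invokeFlushTask = task = this::invokeFlush;
            }
            loop.execute(task);             // hand off to the loop thread
        }
    }

    private void invokeFlush() {
        lastFlushThread = Thread.currentThread();
    }

    // Runs a task on the loop thread and waits for it; earlier tasks finish first.
    void await(Runnable task) {
        try {
            loop.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        loop.shutdown();
    }

    public static void main(String[] args) {
        LoopBoundFlusher f = new LoopBoundFlusher();
        f.flush();                          // called from main: queued on the loop
        f.await(() -> { });                 // wait until the queued flush has run
        System.out.println("flushed on " + f.lastFlushThread.getName());
        f.shutdown();
    }
}
```

Caching the Runnable mirrors the invokeFlushTask field in the listing: repeated flushes from foreign threads reuse one task object instead of allocating a new one each time.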

Because flush is an outbound event, it propagates toward the head of the pipeline and eventually reaches HeadHandler's flush method:

//HeadHandler        
public void flush(ChannelHandlerContext ctx) throws Exception {
            unsafe.flush();
        }

HeadHandler delegates to AbstractUnsafe's flush method:

//AbstractUnsafe
public void flush() {
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }
            outboundBuffer.addFlush();
            flush0();
        }

outboundBuffer is where step one stored the msg. Calling outboundBuffer.addFlush() sets the buffer's unflushed index to tail, so the messages awaiting this flush occupy the index range [flushed, unflushed) of the buffer array.

//ChannelOutboundBuffer
void addFlush() {
        unflushed = tail; 
    }
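
To make the [flushed, unflushed) range concrete, here is a small sketch of the three indices on a ring buffer. MiniOutboundBuffer is a hypothetical miniature, not Netty's class: it has a fixed capacity, no growth and no overflow check, and it stores bare objects instead of Entry instances.

```java
// Hypothetical miniature of the flushed/unflushed/tail bookkeeping.
final class MiniOutboundBuffer {
    // Length must be a power of two so that "& (length - 1)" acts as modulo.
    private final Object[] buffer = new Object[8];
    private int flushed;    // first message waiting to be written to the socket
    private int unflushed;  // first message not yet covered by a flush
    private int tail;       // slot where the next write will store its message

    void addMessage(Object msg) {
        buffer[tail] = msg;
        tail = tail + 1 & buffer.length - 1;
    }

    // Everything written so far becomes eligible for flushing.
    void addFlush() {
        unflushed = tail;
    }

    // Number of messages in [flushed, unflushed), correct across wrap-around.
    int size() {
        return unflushed - flushed & buffer.length - 1;
    }

    Object current() {
        return size() == 0 ? null : buffer[flushed];
    }

    boolean remove() {
        if (size() == 0) {
            return false;
        }
        buffer[flushed] = null;
        flushed = flushed + 1 & buffer.length - 1;
        return true;
    }

    public static void main(String[] args) {
        MiniOutboundBuffer b = new MiniOutboundBuffer();
        b.addMessage("a");
        b.addMessage("b");
        System.out.println("before addFlush: " + b.size());
        b.addFlush();
        b.addMessage("c");      // written after the flush mark, not yet flushable
        System.out.println("after addFlush: " + b.size());
    }
}
```

A message written after addFlush() stays outside the range until the next addFlush(), which is why a write alone never reaches the socket.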

The flush0 method then writes the messages in this range to the socket's send buffer.

//AbstractUnsafe
        protected void flush0() {
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            inFlush0 = true;

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
                    } else {
                        outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                outboundBuffer.failFlushed(t);
                if (t instanceof IOException) {
                    close(voidPromise());
                }
            } finally {
                inFlush0 = false;
            }
        }

The main logic is in the doWrite method:

//NioSocketChannel
 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            // Do non-gathering write for a single buffer case.
            final int msgCount = in.size();
            if (msgCount <= 1) {
                super.doWrite(in);
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            ByteBuffer[] nioBuffers = in.nioBuffers();
            if (nioBuffers == null) {
                super.doWrite(in);
                return;
            }

            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();

            final SocketChannel ch = javaChannel();
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes == 0) {
                    setOpWrite = true;
                    break;
                }
                expectedWrittenBytes -= localWrittenBytes;
                writtenBytes += localWrittenBytes;
                if (expectedWrittenBytes == 0) {
                    done = true;
                    break;
                }
            }

            if (done) {
                // Release all buffers
                for (int i = msgCount; i > 0; i --) {
                    in.remove();
                }

                // Finish the write loop if no new messages were flushed by in.remove().
                if (in.isEmpty()) {
                    clearOpWrite();
                    break;
                }
            } else {
                // Did not write all buffers completely.
                // Release the fully written buffers and update the indexes of the partially written buffer.

                for (int i = msgCount; i > 0; i --) {
                    final ByteBuf buf = (ByteBuf) in.current();
                    final int readerIndex = buf.readerIndex();
                    final int readableBytes = buf.writerIndex() - readerIndex;

                    if (readableBytes < writtenBytes) {
                        in.progress(readableBytes);
                        in.remove();
                        writtenBytes -= readableBytes;
                    } else if (readableBytes > writtenBytes) {
                        buf.readerIndex(readerIndex + (int) writtenBytes);
                        in.progress(writtenBytes);
                        break;
                    } else { // readableBytes == writtenBytes
                        in.progress(readableBytes);
                        in.remove();
                        break;
                    }
                }

                incompleteWrite(setOpWrite);
                break;
            }
        }
    }

 

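 The logic is as follows:

  1. If the size of the ChannelOutboundBuffer is <= 1, i.e. the pending msg occupies at most one entry of the buffer array (buffer is an Entry[] array; see the previous article), a gathering write is unnecessary and the doWrite method of the parent class AbstractNioByteChannel is called directly.
  2. If size > 1, i.e. the pending messages occupy at least two entries of the buffer array, in.nioBuffers() converts the ChannelOutboundBuffer's Entry[] array buffer: the msg stored in each Entry is converted from io.netty.buffer.ByteBuf to java.nio.ByteBuffer. The result is a ByteBuffer[] array, which is assigned to the variable nioBuffers.
  3. If the conversion returns null, i.e. some msg is not a ByteBuf, a gathering write cannot be used either, and the doWrite method of the parent class AbstractNioByteChannel is called directly.
  4. Otherwise, a gathering write is performed.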
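
For readers unfamiliar with gathering writes, the sketch below shows the underlying JDK call on its own: one write(ByteBuffer[], int, int) call sends several buffers in order. It uses a java.nio.channels.Pipe instead of a socket so that it is self-contained; GatheringWriteDemo and roundTrip are hypothetical names, and the pipe is left in blocking mode, unlike Netty's non-blocking SocketChannel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

final class GatheringWriteDemo {
    // Writes two buffers with one gathering call per loop pass, then reads them back.
    static String roundTrip(String first, String second) {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                ByteBuffer[] nioBuffers = {
                    ByteBuffer.wrap(first.getBytes(StandardCharsets.UTF_8)),
                    ByteBuffer.wrap(second.getBytes(StandardCharsets.UTF_8))
                };
                long expected = nioBuffers[0].remaining() + nioBuffers[1].remaining();
                long written = 0;
                while (written < expected) {
                    // Gathering write: the channel drains the buffers in array order.
                    written += sink.write(nioBuffers, 0, nioBuffers.length);
                }
                ByteBuffer in = ByteBuffer.allocate((int) expected);
                while (in.hasRemaining() && source.read(in) >= 0) {
                    // keep reading until every expected byte has arrived
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello, ", "world"));
    }
}
```

The benefit is fewer system calls: two buffers cost one write instead of two, which is why doWrite only takes this path when more than one entry is pending.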

a) First, super.doWrite(in), i.e. the doWrite method of the parent class AbstractNioByteChannel:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1;

        for (;;) {
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                clearOpWrite();
                break;
            }

            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                int readableBytes = buf.readableBytes();
                if (readableBytes == 0) {
                    in.remove();
                    continue;
                }

                if (!buf.isDirect()) {
                    ByteBufAllocator alloc = alloc();
                    if (alloc.isDirectBufferPooled()) {
                        // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
                        // We can do a better job by using our pooled allocator. If the current allocator does not
                        // pool a direct buffer, we rely on JDK's direct buffer pool.
                        buf = alloc.directBuffer(readableBytes).writeBytes(buf);
                        in.current(buf);
                    }
                }

                boolean setOpWrite = false;
                boolean done = false;
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    int localFlushedAmount = doWriteBytes(buf);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (!buf.isReadable()) {
                        done = true;
                        break;
                    }
                }

                in.progress(flushedAmount);

                if (done) {
                    in.remove();
                } else {
                    incompleteWrite(setOpWrite);
                    break;
                }
            } else if (msg instanceof FileRegion) {
                FileRegion region = (FileRegion) msg;
                boolean setOpWrite = false;
                boolean done = false;
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    long localFlushedAmount = doWriteFileRegion(region);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (region.transfered() >= region.count()) {
                        done = true;
                        break;
                    }
                }

                in.progress(flushedAmount);

                if (done) {
                    in.remove();
                } else {
                    incompleteWrite(setOpWrite);
                    break;
                }
            } else {
                throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
            }
        }
    }

 

protected final void incompleteWrite(boolean setOpWrite) {
        // Did not write completely.
        if (setOpWrite) {
            setOpWrite();
        } else {
            // Schedule flush again later so other tasks can be picked up in the meantime
            Runnable flushTask = this.flushTask;
            if (flushTask == null) {
                flushTask = this.flushTask = new Runnable() {
                    @Override
                    public void run() {
                        flush();
                    }
                };
            }
            eventLoop().execute(flushTask);
        }
    }

protected final void setOpWrite() {
        final SelectionKey key = selectionKey();
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }
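
The interest-set manipulation reduces to two bit operations. The sketch below works on plain int values, so no selector is needed. InterestOpsSketch is a hypothetical helper, and its clearOpWrite is an assumption about what the clearOpWrite() call in the listings does, since that method's body is not shown in this article.

```java
import java.nio.channels.SelectionKey;

final class InterestOpsSketch {
    // Adds OP_WRITE to an interest set, leaving the other bits untouched.
    static int setOpWrite(int interestOps) {
        return interestOps | SelectionKey.OP_WRITE;
    }

    // Removes OP_WRITE from an interest set (assumed behaviour of clearOpWrite()).
    static int clearOpWrite(int interestOps) {
        return interestOps & ~SelectionKey.OP_WRITE;
    }

    static boolean hasOpWrite(int interestOps) {
        return (interestOps & SelectionKey.OP_WRITE) != 0;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ;
        ops = setOpWrite(ops);
        System.out.println("write interest set: " + hasOpWrite(ops));
        ops = clearOpWrite(ops);
        System.out.println("write interest set: " + hasOpWrite(ops));
    }
}
```

Leaving OP_WRITE registered while there is nothing to send would make the selector report the channel as writable on every pass, which is why it is cleared as soon as the buffer drains.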

 

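Notes:

  • If the current msg is null, i.e. the msg stored at buffer[flushed] is null, all messages have been sent, so the OP_WRITE bit of the selectionKey must be cleared.
  • At this point Netty supports writing only two message types (ByteBuf and FileRegion); this article analyzes only ByteBuf.
  • First, buf.readableBytes() checks whether buf holds readable data, i.e. whether writerIndex - readerIndex > 0. If the result is 0, in.remove() is executed; otherwise the write on buf is attempted in a loop that resembles a spin lock.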
    //NioSocketChannel
    protected int doWriteBytes(ByteBuf buf) throws Exception {
            final int expectedWrittenBytes = buf.readableBytes();
            final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes);
            return writtenBytes;
    }
    //UnpooledHeapByteBuf
    public int readBytes(GatheringByteChannel out, int length) throws IOException {
            checkReadableBytes(length);
            int readBytes = getBytes(readerIndex, out, length, true);
            readerIndex += readBytes;
            return readBytes;
    }
    
    private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
            ensureAccessible();
            ByteBuffer tmpBuf;
            if (internal) {
                tmpBuf = internalNioBuffer();
            } else {
                tmpBuf = ByteBuffer.wrap(array);
            }
            return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));
        }
    

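Because the I/O is non-blocking, the number of bytes actually written to the send buffer, writtenBytes, may be smaller than the number of bytes buf expected to write, expectedWrittenBytes. If Netty stopped writing at this point and relied on the selector's asynchronous notification, the remaining data in buf could not be written promptly, because it would have to wait for the selector's next iteration. That means first handling every remaining event reported in the current iteration, then running the remaining tasks, and then calling select again before this channel's write event is processed; a thread context switch may also occur along the way. As a result, a msg written to the ChannelOutboundBuffer would see a considerable delay before being flushed to the socket's send buffer.

Netty therefore uses spin-lock-like logic and calls write several times within one loop, which makes it possible to write all the data in buf within a single flush call. The number of iterations is writeSpinCount, which defaults to 16.

However, if a write call returns 0, the send buffer has no space left at all. Further write calls would be wasted, since system calls are relatively expensive, so the loop exits and the OP_WRITE bit of the selectionKey is set in order to rely on the selector's asynchronous notification.

If the data is still not completely written after spinning, yet no write call returned 0, then every write did send some bytes. In this case flush should not simply exit and fall back on the selector's notification: the send buffer may still have space, and data may be left over only because the spin count was set too low. The remaining write is therefore submitted as an asynchronous task to the current thread's task queue. Once the remaining events selected in this iteration have been handled, the task can run without waiting for the next selector wakeup.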
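
The three possible outcomes of the spin loop can be summarised in a small sketch. SpinWriteSketch, Outcome and writeOnce are hypothetical names: writeOnce stands in for one non-blocking write call and returns the number of bytes the socket accepted.

```java
import java.util.function.IntSupplier;

final class SpinWriteSketch {
    enum Outcome { DONE, WAIT_FOR_OP_WRITE, RESCHEDULE_FLUSH }

    // Tries up to writeSpinCount writes and reports how the caller should continue.
    static Outcome spinWrite(int pendingBytes, int writeSpinCount, IntSupplier writeOnce) {
        for (int i = writeSpinCount - 1; i >= 0; i--) {
            int written = writeOnce.getAsInt();
            if (written == 0) {
                // Send buffer is full: stop spinning and register OP_WRITE.
                return Outcome.WAIT_FOR_OP_WRITE;
            }
            pendingBytes -= written;
            if (pendingBytes <= 0) {
                return Outcome.DONE;
            }
        }
        // Every write made progress but the spin budget ran out:
        // queue another flush task instead of waiting for the selector.
        return Outcome.RESCHEDULE_FLUSH;
    }

    public static void main(String[] args) {
        System.out.println(spinWrite(100, 16, () -> 50));
        System.out.println(spinWrite(100, 16, () -> 0));
        System.out.println(spinWrite(100, 16, () -> 1));
    }
}
```

WAIT_FOR_OP_WRITE corresponds to incompleteWrite(true) and RESCHEDULE_FLUSH to incompleteWrite(false) in the listing above.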

  • Once the msg has been written completely, in.remove() is called to clean up:
//ChannelOutboundBuffer
public boolean remove() {
        if (isEmpty()) {
            return false;
        }
        Entry e = buffer[flushed];
        Object msg = e.msg;
        if (msg == null) {
            return false;
        }
        ChannelPromise promise = e.promise;
        int size = e.pendingSize;
        e.clear();
        flushed = flushed + 1 & buffer.length - 1;
        safeRelease(msg);
        promise.trySuccess();
        decrementPendingOutboundBytes(size);
        return true;
    }

First, clear is called on the Entry at buffer[flushed]:

//Entry
public void clear() {
            buffers = null;
            buf = null;
            msg = null;
            promise = null;
            progress = 0;
            total = 0;
            pendingSize = 0;
            count = -1;
        }

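Then flushed is advanced by one, wrapping around the array through the bit mask (which relies on the array length being a power of two). Next, the msg is released via reference counting and its promise is marked as succeeded. Finally, here is the implementation of decrementPendingOutboundBytes: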

void decrementPendingOutboundBytes(int size) {
        // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets
        // recycled while process this method.
        Channel channel = this.channel;
        if (size == 0 || channel == null) {
            return;
        }

        long oldValue = totalPendingSize;
        long newWriteBufferSize = oldValue - size;
        while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {
            oldValue = totalPendingSize;
            newWriteBufferSize = oldValue - size;
        }

        int lowWaterMark = channel.config().getWriteBufferLowWaterMark();

        if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
            if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
                channel.pipeline().fireChannelWritabilityChanged();
            }
        }
    }

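This updates totalPendingSize, the size of the messages still waiting to be sent in the ChannelOutboundBuffer. If the new value is 0 or below the channel's low water mark, and the channel was previously marked unwritable, the channel is set to writable and a ChannelWritabilityChanged event is fired.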
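
The water-mark mechanism can be sketched with plain atomics. WaterMarkSketch is a hypothetical class, and a counter stands in for the pipeline event. The increment side is an assumption added for symmetry: this article only shows the decrement side, while the high-water-mark check belongs to the write path covered in the previous article.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

final class WaterMarkSketch {
    private final AtomicLong totalPendingSize = new AtomicLong();
    private final AtomicInteger writable = new AtomicInteger(1); // 1 = writable
    private final long lowWaterMark;
    private final long highWaterMark;
    int writabilityChangedEvents;   // stands in for fireChannelWritabilityChanged()

    WaterMarkSketch(long lowWaterMark, long highWaterMark) {
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
    }

    // Assumed counterpart on the write path: crossing the high mark flips to unwritable.
    void increment(long size) {
        long newSize = totalPendingSize.addAndGet(size);
        if (newSize > highWaterMark && writable.compareAndSet(1, 0)) {
            writabilityChangedEvents++;
        }
    }

    // Mirrors decrementPendingOutboundBytes: dropping below the low mark flips back.
    void decrement(long size) {
        long newSize = totalPendingSize.addAndGet(-size);
        if ((newSize == 0 || newSize < lowWaterMark) && writable.compareAndSet(0, 1)) {
            writabilityChangedEvents++;
        }
    }

    boolean isWritable() {
        return writable.get() == 1;
    }

    public static void main(String[] args) {
        WaterMarkSketch w = new WaterMarkSketch(32, 64);
        w.increment(100);
        System.out.println("writable after 100 pending: " + w.isWritable());
        w.decrement(80);
        System.out.println("writable after draining to 20: " + w.isWritable());
    }
}
```

The compare-and-set on the writable flag ensures the event fires only on an actual transition, not on every decrement that happens to be below the low water mark.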

 

b) Finally, the gathering write logic:

//NioSocketChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            // The non-gathering write logic was covered above, so that code is omitted here
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            final SocketChannel ch = javaChannel();
            long writtenBytes = 0;
            boolean done = false;
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes == 0) {
                    break;
                }
                expectedWrittenBytes -= localWrittenBytes;
                writtenBytes += localWrittenBytes;
                if (expectedWrittenBytes == 0) {
                    done = true;
                    break;
                }
            }
            if (done) {
                // Release all buffers
                for (int i = msgCount; i > 0; i --) {
                    in.remove();
                }
                // Finish the write loop if no new messages were flushed by in.remove().
                if (in.isEmpty()) {
                    clearOpWrite();
                    break;
                }
            } else {
                // Did not write all buffers completely.
                // Release the fully written buffers and update the indexes of the partially written buffer.
                for (int i = msgCount; i > 0; i --) {
                    final ByteBuf buf = (ByteBuf) in.current();
                    final int readerIndex = buf.readerIndex();
                    final int readableBytes = buf.writerIndex() - readerIndex;
                    if (readableBytes < writtenBytes) {
                        in.progress(readableBytes);
                        in.remove();
                        writtenBytes -= readableBytes;
                    } else if (readableBytes > writtenBytes) {
                        buf.readerIndex(readerIndex + (int) writtenBytes);
                        in.progress(writtenBytes);
                        break;
                    } else { // readableBytes == writtenBytes
                        in.progress(readableBytes);
                        in.remove();
                        break;
                    }
                }

                setOpWrite();
                break;
            }
        }
    }

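  Notes:

  • The gathering write uses the same spin-lock-like loop.
  • If all messages have been flushed to the send buffer, they are released. If the ChannelOutboundBuffer's isEmpty method then returns true (i.e. no new messages were added and flushed while this flush was running, for example by a listener triggered from in.remove(), so unflushed stayed unchanged and unflushed == flushed), the OP_WRITE bit of the selectionKey is cleared.
  • If some messages have not been flushed to the send buffer, each ByteBuf is processed in order from flushed to unflushed: a ByteBuf that was written completely is released; otherwise only its readerIndex is updated.
  • incompleteWrite(setOpWrite), which ends the incomplete branch in the full doWrite listing earlier (the abridged listing above shows setOpWrite() in its place), was covered before. If the last write call returned 0, the send buffer has no space left at all, so the loop exits and the OP_WRITE bit of the selectionKey is set in order to rely on the selector's asynchronous notification. If the data is still not completely written after several write calls, the remaining write is submitted as an asynchronous task to the current thread's task queue and waits to be scheduled.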
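
The bookkeeping after a partial gathering write can be sketched with plain java.nio.ByteBuffer objects. PartialWriteSketch and consume are hypothetical names, a Deque stands in for the flushed range of the ChannelOutboundBuffer, and position() plays the role of readerIndex. The sketch assumes, as in the listing, that the pending buffers' own indices were not advanced by the write itself.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

final class PartialWriteSketch {
    // Drops fully written buffers from the head of 'pending' and advances the
    // position of the first partially written one. Returns how many were removed.
    static int consume(Deque<ByteBuffer> pending, long writtenBytes) {
        int removed = 0;
        while (!pending.isEmpty()) {
            ByteBuffer buf = pending.peekFirst();
            int readableBytes = buf.remaining();
            if (readableBytes <= writtenBytes) {
                // Written completely: release it and account for its bytes.
                pending.removeFirst();
                removed++;
                writtenBytes -= readableBytes;
                if (writtenBytes == 0) {
                    break;
                }
            } else {
                // Written partially: only move the read position forward.
                buf.position(buf.position() + (int) writtenBytes);
                break;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Deque<ByteBuffer> pending = new ArrayDeque<>();
        pending.add(ByteBuffer.allocate(4));
        pending.add(ByteBuffer.allocate(4));
        pending.add(ByteBuffer.allocate(4));
        int removed = consume(pending, 6);   // 4 bytes of the first, 2 of the second
        System.out.println(removed + " removed, head has "
                + pending.peekFirst().remaining() + " bytes left");
    }
}
```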