`
海浪儿
  • 浏览: 271509 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

netty4服务端启动源码分析-线程的创建

阅读更多

本文为原创,转载请注明出处

netty4服务端启动源码分析-线程的创建

 

本文分析Netty中boss和worker的线程的创建过程:

以下代码是服务端的启动代码,线程的创建就发生在其中。

EventLoopGroup bossGroup = new NioEventLoopGroup();

 

NioEventLoopGroup的类关系图如下:



 构造方法执行过程如下:

 

// NioEventLoopGroup
public NioEventLoopGroup() {
        this(0);
    }

public NioEventLoopGroup(int nThreads) {
        this(nThreads, null);
    }

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }

public NioEventLoopGroup(
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
        super(nThreads, threadFactory, selectorProvider);
    }

 看下父类MultithreadEventLoopGroup的构造方法

// MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

 注:如果没有指定创建的线程数量,则默认创建的线程个数为DEFAULT_EVENT_LOOP_THREADS,该数值为:处理器数量x2

 

再来看MultithreadEventLoopGroup的父类MultithreadEventExecutorGroup的构造方法

 

 /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
     * @param args              arguments which will passed to each {@link #newChild(ThreadFactory, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (threadFactory == null) {
            threadFactory = newDefaultThreadFactory();
        }

        children = new SingleThreadEventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(threadFactory, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    }

 变量children就是用来存放创建的线程的数组,里面每一个元素都通过children[i] = newChild(threadFactory, args)创建。而newChild方法则由子类NioEventLoopGroup实现

 

// NioEventLoopGroup
  protected EventExecutor newChild(
            ThreadFactory threadFactory, Object... args) throws Exception {
        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
    }

 每个元素的真实类型为NioEventLoop,而NioEventLoop的类关系图如下

 



 (注:其实有点变态的,譬如EventLoop继承EventLoopGroup,不知道是啥原因,仅仅是因为EventLoop里有个方法parent(),返回EventLoopGroup这个功能吗?后续待确认)

 

接着看NioEventLoop的构造函数:

 

// NioEventLoop
 NioEven NioEventLoop tLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
        super(parent, threadFactory, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }

 首先分析一下selector = openSelector()

 

 

// NioEventLoop
 private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }

        try {
            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

            Class<?> selectorImplClass =
                    Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());
            selectorImplClass.isAssignableFrom(selector.getClass());
            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);

            selectedKeysField.set(selector, selectedKeySet);
            publicSelectedKeysField.set(selector, selectedKeySet);

            selectedKeys = selectedKeySet;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable t) {
            selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
        }

        return selector;
    }

       这里对sun.nio.ch.SelectorImpl中的selectedKeys和publicSelectedKeys做了优化,NioEventLoop中的变量selectedKeys的类型是SelectedSelectionKeySet,有哪些优化呢?(内部用两个数组存储?初始分配数组大小置为1024避免频繁扩容?当大小超过1024时,对数组进行双倍扩容?)。

       利用反射,当注册到selector中的selectionKey已准备就绪时,selectedKeys中的元素就不会为空,后面会根据selectedKeys进行分发。

 

最后分析super(parent, threadFactory, false),即父类SingleThreadEventExecutor的构造函数

 /**
     * Create a new instance
     *
     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
     * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
     *                          executor thread
     */
    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }

        this.parent = parent;
        this.addTaskWakesUp = addTaskWakesUp;

        thread = threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    if (state < ST_SHUTTING_DOWN) {
                        state = ST_SHUTTING_DOWN;
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error(
                                "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            synchronized (stateLock) {
                                state = ST_TERMINATED;
                            }
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                        "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });

        taskQueue = newTaskQueue();
    }

    /**
     * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
     * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
     * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
     * implementation that does not support blocking operations at all.
     */
    protected Queue<Runnable> newTaskQueue() {
        return new LinkedBlockingQueue<Runnable>();
    }

 boss线程就在此处创建:thread = threadFactory.newThread(new Runnable()

同时也创建了线程的任务队列,是一个LinkedBlockingQueue结构。

SingleThreadEventExecutor.this.run()由子类NioEventLoop实现,后面的文章再进行分析

 

总结:

EventLoopGroup bossGroup = new NioEventLoopGroup()发生了以下事情:

      1、 为NioEventLoopGroup创建数量为:处理器个数 x 2的,类型为NioEventLoop的实例。每个NioEventLoop实例 都持有一个线程,以及一个类型为LinkedBlockingQueue的任务队列

      2、线程的执行逻辑由NioEventLoop实现

      3、每个NioEventLoop实例都持有一个selector,并对selector进行优化。

  • 大小: 22.3 KB
  • 大小: 27.6 KB
分享到:
评论
8 楼 kyorisvc2012 2018-01-18  
EventLoopGroup和EventLoop的继承关系感觉就是为了形成一个组合模式
7 楼 至尊宝_唯一 2016-10-25  
还是没有能理解selectedKeys的优化。。。
6 楼 zhouliangyc1121 2016-05-30  
受教啦 
5 楼 congqingbin 2014-08-23  
4.0.9包下载下来后怎么看源代码呢?
4 楼 Peng-Lee 2014-01-11  
写得不错,楼主
3 楼 海浪儿 2013-12-03  
谢谢
2 楼 rentaoshu 2013-12-03  
  好文,有种相见恨晚的感觉
1 楼 lxzh504 2013-08-16  
呵呵,这么多人浏览过就没有人评论?这么好的文章。顶一个。

相关推荐

    高清Netty5.0架构剖析和源码解读

    NIO客户端13 3.Netty源码分析16 3.1. 服务端创建16 3.1.1. 服务端启动辅助类ServerBootstrap16 3.1.2. NioServerSocketChannel 的注册21 3.1.3. 新的客户端接入25 3.2. 客户端创建28 3.2.1. 客户端连接辅助类...

    精通并发与netty视频教程(2018)视频教程

    54_Netty服务端初始化过程与反射在其中的应用分析 55_Netty提供的Future与ChannelFuture优势分析与源码讲解 56_Netty服务器地址绑定底层源码分析 57_Reactor模式透彻理解及其在Netty中的应用 58_Reactor模式与Netty...

    精通并发与netty 无加密视频

    第54讲:Netty服务端初始化过程与反射在其中的应用分析 第55讲:Netty提供的Future与ChannelFuture优势分析与源码讲解 第56讲:Netty服务器地址绑定底层源码分析 第57讲:Reactor模式透彻理解及其在Netty中的应用...

    精通并发与 netty 视频教程(2018)视频教程

    52_NioEventLoopGroup源码分析与线程数设定 53_Netty对Executor的实现机制源码分析 54_Netty服务端初始化过程与反射在其中的应用分析 55_Netty提供的Future与ChannelFuture优势分析与源码讲解 56_Netty服务器地址...

    Java视频教程 Java游戏服务器端开发 Netty NIO AIO Mina视频教程

    02、第二课netty服务端 03、第三课netty客户端 04、第四课netty线程模型源码分析(一) 05、第五课netty线程模型源码分析(二) 06、第六课netty5案例学习 07、第七课netty学习之心跳 08、第八课protocol buff学习...

    netty-starter:Netty权威指南学习之旅

    Netty学习day01:同步阻塞式I/O源码分析服务端TimerServerTimerServer根据传入的参数设置监听端口,如果没有入参,使用默认值8080。启动后会发现主线程阻塞在ServerSocket的accept()方法上:可通过JConsole查看线程...

    基于javatcpsocket通信的拆包和装包源码-NettyTree:网状树

    基于java tcp socket通信的拆包和装包源码 NettyTree 搭建一个基于Netty的通信框架 NIO:非阻塞式IO ...有一个专门的NIO线程-Acceptor线程用于监听服务端,接收客户端的TCP连接请求;   有一个NIO线程池,

    java开源包3

    SpeechLion 是一个语音识别程序,主要用来处理桌面命令,基于 Sphinx-4 语音识别引擎开发。用户可以通过该软件来控制 Linux 桌面,例如打开google搜索、鼠标点击、下一窗口、打开帮助、静音等操作。 Java发送短信包...

    JAVA上百实例源码以及开源项目源代码

     当用户发送第一次请求的时候,验证用户登录,创建一个该qq号和服务器端保持通讯连接得线程,启动该通讯线程,通讯完毕,关闭Scoket。  QQ客户端登录界面,中部有三个JPanel,有一个叫选项卡窗口管理。还可以更新...

    JAVA上百实例源码以及开源项目

     当用户发送第一次请求的时候,验证用户登录,创建一个该qq号和服务器端保持通讯连接得线程,启动该通讯线程,通讯完毕,关闭Scoket。  QQ客户端登录界面,中部有三个JPanel,有一个叫选项卡窗口管理。还可以更新...

    java开源包4

    SpeechLion 是一个语音识别程序,主要用来处理桌面命令,基于 Sphinx-4 语音识别引擎开发。用户可以通过该软件来控制 Linux 桌面,例如打开google搜索、鼠标点击、下一窗口、打开帮助、静音等操作。 Java发送短信包...

    java开源包8

    SpeechLion 是一个语音识别程序,主要用来处理桌面命令,基于 Sphinx-4 语音识别引擎开发。用户可以通过该软件来控制 Linux 桌面,例如打开google搜索、鼠标点击、下一窗口、打开帮助、静音等操作。 Java发送短信包...

    java开源包1

    SpeechLion 是一个语音识别程序,主要用来处理桌面命令,基于 Sphinx-4 语音识别引擎开发。用户可以通过该软件来控制 Linux 桌面,例如打开google搜索、鼠标点击、下一窗口、打开帮助、静音等操作。 Java发送短信包...

    java开源包10

    SpeechLion 是一个语音识别程序,主要用来处理桌面命令,基于 Sphinx-4 语音识别引擎开发。用户可以通过该软件来控制 Linux 桌面,例如打开google搜索、鼠标点击、下一窗口、打开帮助、静音等操作。 Java发送短信包...

    java开源包11

    SpeechLion 是一个语音识别程序,主要用来处理桌面命令,基于 Sphinx-4 语音识别引擎开发。用户可以通过该软件来控制 Linux 桌面,例如打开google搜索、鼠标点击、下一窗口、打开帮助、静音等操作。 Java发送短信包...

    java开源包2

    SpeechLion 是一个语音识别程序,主要用来处理桌面命令,基于 Sphinx-4 语音识别引擎开发。用户可以通过该软件来控制 Linux 桌面,例如打开google搜索、鼠标点击、下一窗口、打开帮助、静音等操作。 Java发送短信包...

    java开源包6

    SpeechLion 是一个语音识别程序,主要用来处理桌面命令,基于 Sphinx-4 语音识别引擎开发。用户可以通过该软件来控制 Linux 桌面,例如打开google搜索、鼠标点击、下一窗口、打开帮助、静音等操作。 Java发送短信包...

    java开源包5

    SpeechLion 是一个语音识别程序,主要用来处理桌面命令,基于 Sphinx-4 语音识别引擎开发。用户可以通过该软件来控制 Linux 桌面,例如打开google搜索、鼠标点击、下一窗口、打开帮助、静音等操作。 Java发送短信包...

Global site tag (gtag.js) - Google Analytics