`
海浪儿
  • 浏览: 271487 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

支持连接池的netty client核心功能实现剖析

阅读更多

支持连接池的netty client核心功能实现剖析

 

本文为原创,转载请注明出处 

源码地址:

 https://github.com/zhangxianwu/light-netty-client

 

1、连接池

    由于TCP连接的建立和关闭分别会经历三次握手和四次挥手,而三次握手和四次挥手都是系统开销很大的操作。如果每次一个新的请求发起时,都为其新建一个连接,在请求处理完毕后,再将这个新的连接关闭,这样处理的代价是高昂的,尤其是在请求本身的处理逻辑比较简单时,那么新建和关闭连接的开销在整个请求处理中占的比例就会越大。

    因此,需要采用连接池,将连接缓存起来,以便后续的复用。

 

    一个TCP连接可用一个四元组标示(源IP、源端口、目的IP、目的端口),当必须为一个新的请求建立连接后,服务端处理完该请求并通过该连接发送响应,客户端接收到响应后,将该连接还回到池中。

 

1.1 连接的存储

    池采用ConcurrentHashMap<String, LinkedBlockingQueue<Channel>>存储连接:

    Key

    目的IP + 目的端口(对于一个固定的客户端来说,它所处的源IP和源端口是不变的);

    value

    如果只为每个目的IP和目的端口缓存一个连接,那么在高并发的场景或者请求本身处理比较耗时的情况下,请求获取连接的延时会比较严重,因此在当前请求从池里获取连接超时后,需要根据实际的需要新建连接,并将新建的连接存储下来,所以采用LinkedBlockingQueue<Channel>实现多个连接的存储。

    然而每个连接的存储会有一定的内存开销,所以在高并发的场景下,不能无限制的创建和存储连接,需要做最大数量的限制。如果能够从连接创建的地方做到最大数量限制,那么最终缓存的连接数量也就实现了最大数量的限制(最大连接数的控制在后面分析)。

 

1.2 连接的入池和出池

    a)连接何时入池:

    往pipeline添加一个handler(取名为NettyChannelPoolHandler),并继承SimpleChannelInboundHandler,实现channelRead0方法,当服务端返回响应,并被客户端decode后,会发出channelReadinbound事件,该事件的处理会经过channelRead0方法,在该方法中,如果发现decode后的msgHttpContent类型,且响应头中不包含“Connection: close”,则在通知调用方响应结果已接受后,将该连接返回到池中。

    说明:如果服务端tomcat设置了maxKeepAliveRequests参数,则一个keep-alive连接处理的请求数达到这个配置后,在该连接处理的最后一个请求的响应头中,就会设置“Connection: close”,表示连接会被关闭。所以这个连接不需要放回到池中。

    b)连接何时出池:

    1、新请求从池中获取连接

    2、连接被关闭

 

1.3 最大连接数的控制

    采用ConcurrentHashMap<String, Semaphore>记录当前每个目的IP和目的端口组合能够新建的连接数量。

    a)信号量的初始化:

    客户端初始化时可以指定每个目的IP和目的端口组合的最大允许存活的连接数量。如果未指定,则设置为默认值,譬如200,表示对于某个目的IP和目的端口组合,可以同时允许最大存活200个连接。

    b)信号量的减少:

    每次新建连接前,基于信号量做tryAquire操作,如果tryAquire成功,则再执行新建连接操作。

    c)信号量的恢复:

    新建连接失败或连接被关闭,则需要基于信号量的tryRelease操作进行恢复。

    在netty中,connect操作会返回channelFuture,为其添加listener:如果futureisSuccess返回false,则说明新建连接失败,需要恢复信号量;如果isSuccess返回true,则说明新建连接成功,此时为新建channelcloseFuture添加listener,执行信号量恢复操作。

    在新建连接成功后,会执行发送请求操作,即调用channelwriteAndFlush操作,该操作也会返回一个future,需要为该future添加CLOSE_ON_FAILUREnetty提供)这个listener

 

1.4 空闲连接处理:

    在业务低峰期,池中的连接大部分处于空闲状态,是一种浪费,因此需要对空闲连接进行清理。

    Netty提供了IdleStateHandler,通过指定允许的最大空闲时间,当某个连接空闲时间超过这个值后,会发出userEventTriggeredInbound事件,在NettyChannelPoolHandler中捕获该事件,如果发现事件的类型是IdleStateEvent,则调用channel.close()方法关闭连接,这样,在之前为该连接添加的listener就会收到close事件,然后将连接出池,并恢复控制最大连接数的信号量。

 

1.5 连接的获取策略:

    基于以下先后顺序获取连接:

    策略1:首先从池中获取连接(调用LinkedBlockingQueue.pool(),不等待,获取不到立即返回null),如果获取不到连接,则进入第二种策略

    策略2:创建新连接,如果信号量已用完或者创建连接失败,则进入第三种策略

    策略3:再次从池中获取连接(调用LinkedBlockingQueue.(long timeout, TimeUnit unit),等待timeoute时间后,如果获取不到连接,则返回null)。如果此时还是获取不到连接,则抛出获取连接失败的异常。

    当并发程度很大或者服务端处理请求比较耗时,如果信号量的初始值设置的比较小,则会导致部分请求获取连接有一定的延时,甚至会获取连接超时。此时,可以采用以下措施之一:

    a)增大信号量的初始值

    b)在策略2中,由调用方指定是否在信号量已用完的情况下,强制创建新连接。注意对于强制创建的连接,不需要执行信号量的acquirerelease操作,也不需要进行入池和出池操作。那么如何区分一个连接是正常创建的还是强制创建的呢?基于netty,可以通过channelattr(AttributeKey<T> key)方法进行标示。

 

2、如何通知调用方响应结果已收到

   在netty中,一切都是异步的,那么调用方通过客户端发起请求后,如何得知请求已处理完毕,响应结果已返回?

   在每次获取连接前,首先新建一个自定义的NettyResponseFuture,在获取连接后,将该future通过channelattr(AttributeKey<T> key)方法添加到channel中,然后在发送请求后,将该NettyResponseFuture立即返回给调用方。NettyResponseFuture中包含以下属性和方法:

private final CountDownLatch              latch       = new CountDownLatch(1);
    private volatile boolean                  isDone      = false;
    private volatile boolean                  isCancel    = false;
    private final AtomicBoolean               isProcessed = new AtomicBoolean(false);
    private volatile NettyHttpResponseBuilder responseBuilder;
    private volatile Channel                  channel;
    public boolean cancel(Throwable cause) {
        if (isProcessed.getAndSet(true)) {
            return false;
        }
        responseBuilder = new NettyHttpResponseBuilder();
        responseBuilder.setSuccess(false);
        responseBuilder.setCause(cause);
        isCancel = true;
        latch.countDown();
        return true;
    }

    public NettyHttpResponse get() throws InterruptedException, ExecutionException {
        latch.await();
        return responseBuilder.build();
    }

    public NettyHttpResponse get(long timeout, TimeUnit unit) throws TimeoutException,
                                                             InterruptedException {
        if (!latch.await(timeout, unit)) {
            throw new TimeoutException();
        }
        return responseBuilder.build();
    }

    public boolean done() {
        if (isProcessed.getAndSet(true)) {
            return false;
        }
        isDone = true;
        latch.countDown();
        return true;
    }

    通过get方法获取返回结果,在响应未返回时,latch.await()会一直阻塞。而latch.countdown只有在canceldone方法中被调用。那什么时候会调用canceldone方法呢呢?三个时机:

    1) connect失败,则connect返回的future中注册的listener会调用cancel方法;

    2) channel被关闭,则channel对应的的closefuture中注册的Listner会调用cancel方法;

    3) 服务端正常返回响应时,NettyChannelPoolHandlerchannelRead0方法会调用done方法

    说明:

    a)由于isProcessed可能会被nettyio线程和外部线程并发修改,因此采用atomicBooleancas操作进行修改

    b)由于isDoneisCancel只会被一个线程修改,因此不需要采用AtomicBoolean类型。但这两个属性会被其他线程访问,因此需要定义为volatile,保证线程间的可见性

对阿里电商交易平台感兴趣的,欢迎投简历至 xw_znwpu@163.com

分享到:
评论
4 楼 海浪儿 2016-07-11  
lisfeye_123456 写道
楼主的思路非常好,但是ConcurrentHashMap<String, LinkedBlockingQueue<Channel>>连接池里的Channel不能重用。当下次重用的时候,连接池中的Channel不能把请求发送的服务端,我怀疑连接池中的Channel已经被关闭,仔细阅读代码没有发现在客户端手动强制关闭Channel的操作,是不是服务端把已经建立好的Channel给强制关了?导致客户端所谓的长连接池只是一厢情愿的长连接,而服务端并不支持长连接。如果服务端要想支持长连接,使用Netty需要如何设置?望楼主帮忙解答一下,万分感谢!


首先如果服务端不支持长连接,那么客户端搞连接池确实没有意义
基于netty的服务端如何设置长连接,可以看下这篇文章里trustin的回复:
http://stackoverflow.com/questions/21358800/tcp-keep-alive-to-determine-if-client-disconnected-in-netty/21372593#21372593

Alternatively, you can enable SO_KEEPALIVE, but the keepalive interval of this option is usually OS-dependent and I would not recommend using it.

To help a user implement this sort of behavior relatively easily, Netty provides ReadTimeoutHandler. Configure your pipeline so that ReadTimeoutHandler raises an exception when there's no inbound traffic for a certain amount of time, and close the connection on the exception in your exceptionCaught() handler method.

目前light-netty-client里的逻辑是:如果连接被正常关闭了,或者空闲时间超过某个阈值都会将对应连接从连接池中剔除
3 楼 lisfeye_123456 2016-07-09  
楼主的思路非常好,但是ConcurrentHashMap<String, LinkedBlockingQueue<Channel>>连接池里的Channel不能重用。当下次重用的时候,连接池中的Channel不能把请求发送的服务端,我怀疑连接池中的Channel已经被关闭,仔细阅读代码没有发现在客户端手动强制关闭Channel的操作,是不是服务端把已经建立好的Channel给强制关了?导致客户端所谓的长连接池只是一厢情愿的长连接,而服务端并不支持长连接。如果服务端要想支持长连接,使用Netty需要如何设置?望楼主帮忙解答一下,万分感谢!
2 楼 lisfeye_123456 2016-07-09  
楼主的思路非常好,但是ConcurrentHashMap<String, LinkedBlockingQueue<Channel>>连接池里的Channel不能重用。当下次重用的时候,连接池中的Channel不能把请求发送的服务端,我怀疑连接池中的Channel已经被关闭,仔细阅读代码没有发现在客户端手动强制关闭Channel的操作,是不是服务端把已经建立好的Channel给强制关了?导致客户端所谓的长连接池只是一厢情愿的长连接,而服务端并支持长连接。如果服务端要想支持长连接,使用Netty需要如何设置?望楼主帮忙解答一下,万分感谢!
1 楼 0704681032 2015-02-22  
期望有朝一日能成为楼主这样的大牛~

相关推荐

    java开源包3

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    java开源包1

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    java开源包4

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    java开源包8

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    java开源包11

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    java开源包2

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    java开源包6

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    java开源包5

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    java开源包10

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    java开源包7

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    java开源包9

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    java开源包101

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    Java资源包01

    BoneCP 是一个高性能的开源java数据库连接池实现库。它的设计初衷就是为了提高数据库连接池的性能,根据某些测试数据发现,BoneCP是最快的连接池。BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加...

    JAVA上百实例源码以及开源项目源代码

    5个目标文件,演示Address EJB的实现,创建一个EJB测试客户端,得到名字上下文,查询jndi名,通过强制转型得到Home接口,getInitialContext()函数返回一个经过初始化的上下文,用client的getHome()函数调用Home接口...

    JAVA上百实例源码以及开源项目

    5个目标文件,演示Address EJB的实现,创建一个EJB测试客户端,得到名字上下文,查询jndi名,通过强制转型得到Home接口,getInitialContext()函数返回一个经过初始化的上下文,用client的getHome()函数调用Home接口...

Global site tag (gtag.js) - Google Analytics