很多时候,业务有定时任务或者定时超时的需求,当任务量很大时,可能需要维护大量的timer,或者进行低效的扫描。
例如:58到家APP实时消息通道系统,对每个用户会维护一个APP到服务器的TCP连接,用来实时收发消息,对这个TCP连接,有这样一个需求:“如果连续30s没有请求包(例如登录,消息,keepalive包),服务端就要将这个用户的状态置为离线”。
其中,单机TCP同时在线量约在10w级别,keepalive请求包大概30s一次,吞吐量约在3000qps。
一般来说怎么实现这类需求呢?
环形队列法
1)30s超时,就创建一个index从0到30的环形队列(本质是个数组)
2)环上每一个slot是一个Set ,任务集合
3)同时还有一个Map
1)启动一个timer,每隔1s,在上述环形队列中移动一格,0->1->2->3…->29->30->0…
2)有一个Current Index指针来标识刚检测过的slot
- 当有某用户uid有请求包到达时:
1)从Map结构中,查找出这个uid存储在哪一个slot里
2)从这个slot的Set结构中,删除这个uid
3)将uid重新加入到新的slot中,具体是哪一个slot呢 =>Current Index指针所指向的上一个slot,因为这个slot,会被timer在30s之后扫描到
(4)更新Map,这个uid对应slot的index值
- 哪些元素会被超时掉呢?
Current Index每秒种移动一个slot,这个slot对应的Set 中所有uid都应该被集体超时!如果最近30s有请求包来到,一定被放到Current Index的前一个slot了,Current Index所在的slot对应Set中所有元素,都是最近30s没有请求包来到的。
所以,当没有超时时,Current Index扫到的每一个slot的Set中应该都没有元素。
- 优势:
(1)只需要1个timer
(2)timer每1s只需要一次触发,消耗CPU很低
(3)批量超时,Current Index扫到的slot,Set中所有元素都应该被超时掉
disrupter中的ringbuffer
文章的最后还说Netty中的一个工具类一一HashedWheelTimer也是类似的原理,
这让我想到了之前看过的消息队列框架disruper也用到了ringbuffer。
python实现
|
|
多线程测试:
|
|