RingBuffer实现定时器

看到一篇讲定时器的文章

很多时候,业务有定时任务或者定时超时的需求,当任务量很大时,可能需要维护大量的timer,或者进行低效的扫描。

例如:58到家APP实时消息通道系统,对每个用户会维护一个APP到服务器的TCP连接,用来实时收发消息,对这个TCP连接,有这样一个需求:“如果连续30s没有请求包(例如登录,消息,keepalive包),服务端就要将这个用户的状态置为离线”。

其中,单机TCP同时在线量约在10w级别,keepalive请求包大概30s一次,吞吐量约在3000qps。

一般来说怎么实现这类需求呢?

环形队列法

1)30s超时,就创建一个index从0到30的环形队列(本质是个数组)

2)环上每一个slot是一个Set ,任务集合

3)同时还有一个Map

1)启动一个timer,每隔1s,在上述环形队列中移动一格,0->1->2->3…->29->30->0…

2)有一个Current Index指针来标识刚检测过的slot

  1. 当有某用户uid有请求包到达时:

1)从Map结构中,查找出这个uid存储在哪一个slot里

2)从这个slot的Set结构中,删除这个uid

3)将uid重新加入到新的slot中,具体是哪一个slot呢 =>Current Index指针所指向的上一个slot,因为这个slot,会被timer在30s之后扫描到

(4)更新Map,这个uid对应slot的index值

  1. 哪些元素会被超时掉呢?

Current Index每秒种移动一个slot,这个slot对应的Set 中所有uid都应该被集体超时!如果最近30s有请求包来到,一定被放到Current Index的前一个slot了,Current Index所在的slot对应Set中所有元素,都是最近30s没有请求包来到的。

所以,当没有超时时,Current Index扫到的每一个slot的Set中应该都没有元素。

  1. 优势:

(1)只需要1个timer

(2)timer每1s只需要一次触发,消耗CPU很低

(3)批量超时,Current Index扫到的slot,Set中所有元素都应该被超时掉

disrupter中的ringbuffer

文章的最后还说Netty中的一个工具类一一HashedWheelTimer也是类似的原理,

这让我想到了之前看过的消息队列框架disruper也用到了ringbuffer。

python实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# -*- coding: utf-8 -*-
class RingBuffer(object):
def __init__(self, timeout):
self.timeout = timeout
self.slot_tasks = {}
"""
slot0: {id: value}
slot1: {id: value}
"""
self.init_slot_tasks()
self.task_slot_map = {}
# self.cursor = 1
# 觉得应该是从 0->timeout->0
self.cursor = 0
# 初始放slot的化环形队列(用dict代替数组实现)
# 每个slot都是一个set{id : 任务}
def init_slot_tasks(self):
# for i in range(1, self.timeout + 1):
for i in range(self.timeout + 1):
self.slot_tasks[i] = {}
# 为task和slot建立map
def set_task_slot(self, d, slot):
if self.task_slot_map.get(d, None):
del self.task_slot_map[d]
self.task_slot_map[d] = slot
def add_slot_task(self, k, ts):
slot = self.before_cursor
_dict = self.slot_tasks.get(slot, {})
_dict[k] = ts
return slot
def del_slot_task(self, k):
slot_index = self.task_slot_map.get(k)
_dict = self.slot_taks.get(slot_index, {})
if _dict.get(k):
del _dict[k]
def next(self):
if self.cursor == self.timeout:
self.cursor = 0
return self.cursor
self.cursor += 1
return self.cursor
# 只读属性
@property
def before_cursor(self):
if self.cursor == 0:
return self.timeout
return self.cursor - 1
@property
def now_cursor(self):
return self.cursor

多线程测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# -*- coding: utf-8 -*-
import time
import threading
from ringbuffertimer import RingBuffer
def excute(rb):
import copy
while True:
print "tip"
time.sleep(1)
cur = rb.next()
res = rb.slot_tasks.get(cur)
res = copy.deepcopy(res)
rb.slot_tasks[cur] = {}
print res
def feed(rb):
time.sleep(1)
for i in range(100000):
ts = int(time.time())
rb.del_slot_task(i)
slot = rb.add_slot_task(i, ts)
rb.set_task_slot(i, slot)
time.sleep(0.1)
def main():
dis = []
rb = RingBuffer(30)
thread_1 = threading.Thread(target=excute, args=(rb,))
thread_1.setDaemon(True)
thread_1.start()
thread_2 = threading.Thread(target=feed, args=(rb,))
thread_2.setDaemon(True)
thread_2.start()
dis.append(thread_1)
dis.append(thread_2)
for t in dis:
t.join()
if __name__ == "__main__":
main()

参考

峰云就她了的BLOG