python技巧全面总结

闲来无事,总结一下常用到的python技巧

目录

  • 数据结构
  • 迭代协议
  • 字符串
  • I/O
  • 数据处理
  • 类和对象
  • 并发编程
  • 装饰器

数据结构

2.1 Pythonic 筛选数据

  1. [for … if …]

  2. Filiter(lambda)

2.2 Dict命名

  1. NAME, AGE, SEX, EMAIL = 0, 1, 2, 3
    Student[NAME]
    Student[AGE]

  2. from collections import namedtuple
    Student = namedtuple(‘Student’, [‘name’, ‘age’, ‘sex’, ‘email’])
    s = Student(‘jim’, 16, ‘male’, ‘djdiji@qq.com’)

2.3 统计Dict Counter

from collections import Counter
c2 = Counter(data)
c2.most_common(3)

2.4 Dict 按照value排序

{‘a’: 93, ‘c’: 89, ‘b’: 70, ‘y’: 88, ‘x’: 94, ‘z’: 78}

  1. zz = zip(d.itervalues(), d.iterkeys())

  2. sorted(d.items(), key = lambda x: x[1])

2.5 Dict公共键

  1. s1.viewkeys() & s2.viewkeys()
    set([‘s’])

  2. map(dict.viewkeys, [s1, s2, s3])
    [dict_keys([‘a’, ‘s’, ‘b’, ‘c’, ‘d’]), dict_keys([‘s’, ‘e’, ‘g’, ‘f’]), dict_keys([‘a’, ‘c’, ‘s’, ‘g’, ‘f’])]
    reduce(lambda a, b:a & b, map(dict.viewkeys, [s1, s2, s3]))
    set([‘s’])

2.6 Dict保持有序(append)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from time import time
from random import randint
from collections import OrderedDict
d = OrderedDict()
players = list('ABCDEFGH')
start = time()
for i in xrange(8):
raw_input()
p = players.pop(randint(0, 7-i))
end = time()
print i + 1, p, end - start, d[p] = (i + 1, end - start)
print
print '-' * 20

2.7 实现用户历史记录

猜数字小游戏,实现一个容量为n的历史记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from random import randint
from collections import deque
N = randint(0, 100)
history = deque([], 5)
def guess(k):
if k == N:
print 'Right'
return True
if k < N:
print '%s is less than N' % k
else:
print '%s is greater than N' % k
return False
while 1:
line = raw_input("input a number")

colletions deque + pickle(储存或者load一个python对象到文件中)

三、迭代协议

3.1 实现可迭代对象

3.2 实现迭代器对象

实现一个迭代器对象WeatherItertor, next方法每次返回一个城市气温
实现一个迭代器对象WeatherIterable, __iter__ 方法每次返回一个迭代器对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# -*- coding: utf-8 -*-
import requests
from collections import Iterable, Iterator
# 气温迭代器
class WeatherIterator(Iterator):
def __init__(self, cities):
self.cities = cities
self.index = 0
def getWeather(self, city):
r = requests.get(u'http://wthrcdn.etouch.cn/weather_mini?city=' + city)
data = r.json()['data']['forecast'][0]
return '%s : %s, %s' % (city, data['low'], data['high'])
def next(self):
if self.index == len(self.cities):
raise StopIteration
city = self.cities[self.index]
self.index += 1
return self.getWeather(city)
class WeatherIterable(Iterable):
def __init__(self, cities):
self.cities = cities
def __iter__(self):
return WeatherIterator(self.cities)
if __name__ == '__main__':
for x in WeatherIterable([u'北京', u'上海', u'广州', u'长春']):
print x

3.3 如何用生成器函数实现迭代器对象

实现一个可迭代器类,打印范围内的所有素数

将该类的 __iter__ 方法实现成生成器函数,每次yield返回一个素数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class PrimeNumbers:
def __init__(self, start, end):
self.start = start
self.end = end
def isPrimeNum(self, k):
if k < 2:
return False
for i in xrange(2, k):
if k % i == 0:
return False
return True
def __iter__(self):
for k in xrange(self.start, self.end+1):
if self.isPrimeNum(k):
yield k
if __name__ == '__main__':
for x in PrimeNumbers(1, 100):
print x

3.4 如何进行反向迭代和如何实现反向迭代

实现一个连续的浮点数发生器FloatRange(类似于xrange)
for x in reversed(l):
print x

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# iter(l) 正向
# reversed(l) 反向
class FloatRnage:
def __init__(self, start, end, step):
self.start = start
self.end = end
self.step = step
def __iter__(self):
t = self.start
while t <= self.end:
yield t
t += self.step
def __reversed__(self):
t = self.end
while t >= self.start:
yield t
t -= self.step
if __name__ == '__main__':
for x in FloatRnage(1.0, 4.0, 0.5):
print x
for x in reversed(FloatRnage(1.0, 4.0, 0.5)):
print x

3.5 如何对迭代器做切片操作

使用itertools中的islice(注意它会消耗迭代器对象)

3.6 如何在一个for语句中迭代多个可迭代对象

  1. 并行 -> 同时迭代多个各科成绩列表,统计学生成绩总分 -> 使用zip(之前实现dict按value排序)

  2. 串行 -> 迭代多个英语成绩列表,统计分数大于90分的人数 -> 使用itertools中的chain连接多个可迭代对象

四、字符串

4.1 如何拆分含有多种分隔符的字符串

  1. 多次使用str.split()

  2. 使用正则表达式re.split() -> re.split(r’’, str)

4.2 如何判断字符串a是否以字符串b开头或者结尾

str.startswith() str.endswith() (!函数参数只能是元组)

4.3 如何调整字符串中文本的格式

yyyy—mm-dd -> mm/dd/yyyy

使用正则表达式re.sub()替换

  1. re.sub(‘(\d{4})-(\d{2})-(\d{2})’, ‘\2/\3/\1’, a.txt)

  2. re.sub(‘(?P\d{4})-(?P\d{2})-(?P\d{2})’, ‘?g//’, a.txt)

4.4 如何将多个小字符串拼接成一个大的字符串

UDP分包

  1. ‘+’ 拼接 (运算符重载 __add__)

  2. ‘’.join([str(x) for x in l])
    使用生成器表达式减少开销:
    ‘’.join(str(x) for x in l)

4.5 如何对字符串进行左,右,居中对齐

python3 -> print(‘{0:^}’).fromat(s)

python2 ->

  1. str.ljust(), str.rjust(), str.center()

  2. format ‘<20’, ‘="">20’, ‘^20’

4.6 如何去掉字符串中不需要的字符

  1. 使用strip() lstrip() rstrip() 去掉两端字符

  2. 使用切片 + 拼接的方式

  3. 使用str.replace() 或者正则表达式re.sub()

  4. 使用str.translate(table, [, deletechars]) -> string (一个字符映射到另一个字符)

字符映射:
import string
s = ‘abc1230323xyz’
string.maketrans(‘abcxyz’, ‘xyzabc’) (xyz映射成abc, abc映射成xyz)
s.translate(string.maketrans(‘abcxyz’, ‘xyzabc’))

删除字符:
s.translate(None, ‘\r\t\n’)

五、文件操作

5.1 如何读写文本文件

python2和python3中字符串的差异
str -> bytes
unicode -> str

python2 -> 写入文件前对Unicode编码,读入文件后对二进制字符串解码
python3 -> open函数指定’t’的文本模式,endcoding指定编码格式

5.2 如何处理二进制文件

分析一个WVA音频文件的头部信息
open()指定参数为’b’
使用readinto()读入准备好的buffer中
struct模块的 unpack()解析二进制文件
array模块的 array()

5.3 如何设置文件的缓冲

全缓冲 -> open() 的 buffering参数设置为 > 1
行缓冲 -> open() 的 buffering参数设置为1
无缓冲 -> open() 的 buffering参数设置为0

5.4 如何将文件映射到内存

使用mmap模块中的 mmap(),传递一个fd
mmap.mmap(f.fileno(), 0, access = mmap.ACCESS_WRITE, offset = mmap.PAGESIZE * 4) (跳过4个页面)

5.5 如何访问一个文件状态

文件的类型?
文件的访问权限?
文件的三个时间: 文件的最后访问,修改,节点状态更改(chmod)?
文件的字节数?

1.
系统调用 os模块中 stat fstat(like stat, but for open fd) lstat(donnot follow sybolic links)

  1. 快捷函数 os.path

5.6 如何使用临时文件

tempfile模块中的 TemporaryFile NamedTemporaryFile

5.7 如何读写csv文件数据

csv模块中 reader writer

5.8 如何读写json文件数据

json模块中 loads() dumps() load(文件) dump(文件)

5.9 如何解析简单的xml文档

使用标准库的xml.etree.ElementTree 中的 parse

5.10 如何构建xml文档

使用标准库的xml.etree.ElementTree 中的 write

5.11 如何读写excel文件

使用第三方库 xlrd 读,xlwt 写

六、类和对象

6.1 如何派生不可变类型,并修改其实例化行为

新类型的元组,只保留>0的int
IntTuple([1, -1, ‘abc’, 6, [‘x’, ‘y’], 3]) -> (1, 6, 3)

定义类IntTuple继承内置tuple,并实现new,修改其实例化行为

1
2
3
4
5
6
7
8
9
10
11
class IntTuple(tuple):
def __new__(cls, iterable):
g = (for x in iterable if isinstance(x, int) and x > 0)
return super(IntTuple, cls).__new__(cls, g)
def __init__(self, iterable):
super(IntTuple, self).__init__(iterable)
t = IntTuple([1, -1, 'abc', 6, ['x', 'y'], 3])
print t

6.2 如何为创建大量实例节省内存

网络游戏的player实例

定义类的 __slots__ 属性,它是用来声明实例属性名字的列表

6.3 如何让对象支持上下文管理

with open…

让TelnetClient的实例支持上下文管理协议,不必调用cleanup()(关闭socket并把历史记录写入文件)而是自动清理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from telnetlib import telnetlib
from sys import stdin, stdout
from collections import deque
class TelnetClient(object):
def __init__(self, addr, port=23):
self.addr = addr
self.port = port
self.tn = None
def start(self):
self.tn = Telnet(self.addr, self.port)
self.history = deque()
# user
t = self.tn.read_until('login: ')
stdout.write(t)
user = stdin.readline()
self.tn.write(user)
# password
t = self.tn.read_until('Password: ')
if t.startswith(user[:-1]):
t = t[len(user) + 1:]
stdout.write(t)
self.tn.write(stdin.readline())
t = self.tn.read_until('$ ')
stdout.write(t)
while 1:
uinput = stdin.readline()
if not uinput:
break
self.history.append(uinput)
self.tn.write(uinput)
t = self.tn.read_until('$ ')
stdout.write(t[len(uinput) + 1:])
def cleanup(self):
self.tn.close()
self.tn = None
with open(self.addr + '_history.txt', 'w') as f:
f.writelines(self.history)
if __name__ == '__main__':
client = TelnetClient('127.0.0.1')
print '\nstart...'
client.start()
print '\ncleanup'
client.cleanup()


实现上下文管理协议,需要在open()的开始前和结束后定义 __enter__, __exit__ 实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from telnetlib import telnetlib
from sys import stdin, stdout
from collections import deque
class TelnetClient(object):
def __init__(self, addr, port=23):
self.addr = addr
self.port = port
self.tn = None
def start(self):
# user
t = self.tn.read_until('login: ')
stdout.write(t)
user = stdin.readline()
self.tn.write(user)
# password
t = self.tn.read_until('Password: ')
if t.startswith(user[:-1]):
t = t[len(user) + 1:]
stdout.write(t)
self.tn.write(stdin.readline())
t = self.tn.read_until('$ ')
stdout.write(t)
while 1:
uinput = stdin.readline()
if not uinput:
break
self.history.append(uinput)
self.tn.write(uinput)
t = self.tn.read_until('$ ')
stdout.write(t[len(uinput) + 1:])
def __enter__(self):
self.tn = Telnet(self.addr, self.port)
self.history = deque()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.tn.close()
self.tn = None
with open(self.addr + '_history.txt', 'w') as f:
f.writelines(self.history)
if __name__ == '__main__':
with TelnetClient('127.0.0.1') as client:
client.start()

6.4 如何创建可管理的对象属性

形式上是访问属性,其实是调用方法,避免使用set get
使用property属性去创建可管理属性,fset fget fdel 对应相应属性访问

  1. 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    class Mat(object):
    """动态读,class 内部只存 temp 的值,每次取 pro 的时候实时计算"""
    def __init__(self, t):
    self.tmp = t
    @property
    def pro(self):
    return self.tmp * self.tmp
    a = Mat(5)
    print (a.tmp, a.pro)
    a.tmp = 10
    print (a.tmp, a.pro)
    # 不加属性
    # 5 <bound method Mat.pro of <__main__.Mat object at 0x1090144a8>>
    # 10 <bound method Mat.pro of <__main__.Mat object at 0x1090144a8>>
    # 加了之后
    # 5 25
    # 10 100

  1. 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    class Mat2(object):
    """两个都存,写入的时候两个都改"""
    def __init__(self, t):
    self.tmp = t
    @property
    def tmp(self):
    return self._tmp
    @tmp.setter
    def tmp(self, v):
    self._tmp = v
    self.pro = self.Pro(v)
    def Pro(self, t):
    return t * t
    a = Mat2(5)
    print (a.tmp, a.pro)
    a.tmp = 10
    print (a.tmp, a.pro)
    # 5 25
    # 10 100

6.5 如何让类支持比较操作

  1. 比较运算符重载,即实现
    ``__le__``, ``__gt__``, ``__ge__``, ``__eq__``, ``__ne__``
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    2.
    使用标准库functools的类装饰器total_ordering可以简化此进程
    ```python
    # -*- coding: utf-8 -*-
    from functools import total_ordering
    from abc import ABCMeta, abstractmethod
    # 抽象基类
    @total_ordering
    class Shape(object):
    @abstractmethod
    def area(self):
    pass
    # 只用实现等于和小于
    def __eq__(self, obj):
    if not isinstance(obj, Shape):
    raise TypeError('obj is not Shape')
    return self.area() == obj.area()
    def __lt__(self, obj):
    if not isinstance(obj, Shape):
    raise TypeError('obj is not Shape')
    return self.area() < obj.area()
    @total_ordering
    class Rectangle(Shape):
    def __init__(self, w, h):
    self.w = w
    self.h = h
    def area(self):
    return self.w * self.h
    class Circle(Shape):
    def __init__(self, r):
    self.r = r
    def area(self):
    return self.r ** 2 * 3.14

6.6 如何使用描述符对实例属性进行检查

类似C C++ 对实例属性进行检查 age -> int name -> string
对要检查的实例属性实现 __get__ __set__ __delete__ 并在 __set__ 中进行 isinstance()检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Attr(object):
def __init__(self, name, type_):
self.name = name
self.type_ = type_
def __get__(self, instance, cls):
return instance.__dict__[self.name]
def __set__(self, instance, value):
if not isinstance(value, self.type_):
raise TypeError('expected an %s' % self.type_)
instance.__dict__[self.name] = value
def __delete__(self, instance):
raise AttributeError("can't delete this attr")
class Person(object):
name = Attr('name', str)
age = Attr('age', int)
height = Attr('height', float)
weight = Attr('weight', float)
s = Person()
s.name = 'Bob'
s.age = 17
s.height = 1.82
s.weight = 52.5

6.7 如何在环状数据结构中管理内存

环形数据结构存在循环引用:父节点引用子节点,子节点也引用父节点,同时del两个节点,两个对象不能立即被回收

使用标准库weakref,创建一种能访问对象但是不增加引用计数的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import weakref
class Data(object):
def __init__(self, value, owner):
self.owner = weakref.ref(owner)
self.value = value
def __str__(self):
return "%s's data, value is %s" % (self.owner(), self.value)
def __del__(self):
print 'in Data.__del__'
class Node(object):
def __init__(self, value):
self.data = Data(value, self)
def __del__(self):
print 'in Node.__del__'
node = Node(100)
del node
raw_input('wait...')

6.8 如何通过实例化方法名字的字符串调用方法

  1. 使用内置函数getattr(),通过名字在实例上获取方法对象

  2. 使用标准库operator下的methodcaller()函数调用

七、并发编程

7.1 如何使用多线程

使用标准库中的Threading.Thread创建线程

7.2 如何线程间通信

由于全局锁的存在,多线程进行CPU密集型操作并不能提高效率
所以可以采用多线程进行I/O操作 Download
单线程进行CPU密集操作 Convert
使用标准库Queue.Queue,它是一个线程安全队列,
Download线程把下载数据放入队列,Convert线程从队列里提取数据

7.3 如何在线程间进行事件通知

线程间的时间通知,可以使用标准库Threading.Ecent:

1.等待事件一端调用wait,等待事件
2.通知时间一端调用set,通知事件

7.4 如何使用线程本地数据

web监控服务器,每个线程连接一个客户端,支持多客户端

threading.local函数可以创建线程本地数据空间,其下属性对每个线程独立存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import os, cv2, time, struct, threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import TCPServer, ThreadingTCPServer
from threading import Thread, RLock
from select import select
class JpegStreamer(Thread):
def __init__(self, camera):
Thread.__init__(self)
self.cap = cv2.VideoCapture(camera)
self.lock = RLock()
self.pipes = {}
def register(self):
pr, pw = os.pipe()
self.lock.acquire()
self.pipes[pr] = pw
self.lock.release()
return pr
def unregister(self, pr):
self.lock.acquire()
pw = self.pipes.pop(pr)
self.lock.release()
os.close(pr)
os.close(pw)
def capture(self):
cap = self.cap
while cap.isOpened():
ret, frame = cap.read()
if ret:
#ret, data = cv2.imencode('.jpg', frame)
ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
yield data.tostring()
def send(self, frame):
n = struct.pack('l', len(frame))
self.lock.acquire()
if len(self.pipes):
_, pipes, _ = select([], iter(self.pipes.values()), [], 1)
for pipe in pipes:
os.write(pipe, n)
os.write(pipe, frame)
self.lock.release()
def run(self):
for frame in self.capture():
self.send(frame)
class JpegRetriever(object):
def __init__(self, streamer):
self.streamer = streamer
self.local = threading.local()
def retrieve(self):
while True:
ns = os.read(self.local.pipe, 8)
n = struct.unpack('l', ns)[0]
data = os.read(self.local.pipe, n)
yield data
def __enter__(self):
if hasattr(self.local, 'pipe'):
raise RuntimeError()
self.local.pipe = streamer.register()
return self.retrieve()
def __exit__(self, *args):
self.streamer.unregister(self.local.pipe)
del self.local.pipe
return True
class Handler(BaseHTTPRequestHandler):
retriever = None
@staticmethod
def setJpegRetriever(retriever):
Handler.retriever = retriever
def do_GET(self):
if self.retriever is None:
raise RuntimeError('no retriver')
if self.path != '/':
return
self.send_response(200)
self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=abcde')
self.end_headers()
with self.retriever as frames:
for frame in frames:
self.send_frame(frame)
def send_frame(self, frame):
s = '--abcde\r\n'
s += 'Content-Type: image/jpeg\r\n'
s += 'Content-Length: %s\r\n\r\n' % len(frame)
self.wfile.write(s.encode('ascii'))
self.wfile.write(frame)
if __name__ == '__main__':
streamer = JpegStreamer(0)
streamer.start()
retriever = JpegRetriever(streamer)
Handler.setJpegRetriever(retriever)
print('Start server...')
httpd = ThreadingTCPServer(('', 9000), Handler)
httpd.serve_forever()

7.5 如何实现线程池

python3中有线程池实现
使用标准库中concurrent.futures下的ThreadPoolExecutor对象的submit和map方法可以用来启动线程池中线程执行任务

使用线程池代替7.4中例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import os, cv2, time, struct, threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import TCPServer, ThreadingTCPServer
from threading import Thread, RLock
from select import select
class JpegStreamer(Thread):
def __init__(self, camera):
Thread.__init__(self)
self.cap = cv2.VideoCapture(camera)
self.lock = RLock()
self.pipes = {}
def register(self):
pr, pw = os.pipe()
self.lock.acquire()
self.pipes[pr] = pw
self.lock.release()
return pr
def unregister(self, pr):
self.lock.acquire()
pw = self.pipes.pop(pr)
self.lock.release()
os.close(pr)
os.close(pw)
def capture(self):
cap = self.cap
while cap.isOpened():
ret, frame = cap.read()
if ret:
#ret, data = cv2.imencode('.jpg', frame)
ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
yield data.tostring()
def send(self, frame):
n = struct.pack('l', len(frame))
self.lock.acquire()
if len(self.pipes):
_, pipes, _ = select([], iter(self.pipes.values()), [], 1)
for pipe in pipes:
os.write(pipe, n)
os.write(pipe, frame)
self.lock.release()
def run(self):
for frame in self.capture():
self.send(frame)
class JpegRetriever(object):
def __init__(self, streamer):
self.streamer = streamer
self.local = threading.local()
def retrieve(self):
while True:
ns = os.read(self.local.pipe, 8)
n = struct.unpack('l', ns)[0]
data = os.read(self.local.pipe, n)
yield data
def __enter__(self):
if hasattr(self.local, 'pipe'):
raise RuntimeError()
self.local.pipe = streamer.register()
return self.retrieve()
def __exit__(self, *args):
self.streamer.unregister(self.local.pipe)
del self.local.pipe
return True
class Handler(BaseHTTPRequestHandler):
retriever = None
@staticmethod
def setJpegRetriever(retriever):
Handler.retriever = retriever
def do_GET(self):
if self.retriever is None:
raise RuntimeError('no retriver')
if self.path != '/':
return
self.send_response(200)
self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=abcde')
self.end_headers()
with self.retriever as frames:
for frame in frames:
self.send_frame(frame)
def send_frame(self, frame):
s = '--abcde\r\n'
s += 'Content-Type: image/jpeg\r\n'
s += 'Content-Length: %s\r\n\r\n' % len(frame)
self.wfile.write(s.encode('ascii'))
self.wfile.write(frame)
from concurrent.futures import ThreadPoolExecutor
class ThreadingPoolTCPServer(ThreadingTCPServer):
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, max_thread_num=100):
super().__init__(server_address, RequestHandlerClass, bind_and_activate)
self.executor = ThreadPoolExecutor(max_thread_num)
def process_request(self, request, client_address):
self.executor.submit(self.process_request_thread, request, client_address)
if __name__ == '__main__':
streamer = JpegStreamer(0)
streamer.start()
retriever = JpegRetriever(streamer)
Handler.setJpegRetriever(retriever)
print('Start server...')
httpd = ThreadingPoolTCPServer(('', 9000), Handler, max_thread_num=3)
httpd.serve_forever()

7.6 如何使用多进程

由于GIL,处理CPU密集型,推荐使用多进程

使用标准库multiprocessing.Process,它可以启动子进程执行任务,操作接口,进程间通信,进程间同步等都与Threading.Thread类似

八、装饰器

8.1 如何使用函数装饰器

定义装饰器函数,用它来生成一个在原函数基础添加了新功能的函数,替代原函数

8.2 如何为被装饰的函数保存元数据

在函数中保存着一些元数据,如
f.__name__ : 函数名字
f.__doc__ : 函数文档字符串
f.__moudle__ : 函数所属模块
f.__dict__ : 属性字典
f.__defaults__: 默认参数元组

我们在使用装饰器后,再访问以上属性时,看到的是内部包裹函数的元数据,原来函数的元数据便丢失掉了,如何解决呢?

使用标准库functools中的装饰器wraps装饰内部包裹函数,可以制定将原函数的某些属性,更新到包裹函数上面

8.3 如何实现带参数的装饰器

实现一个检查函数参数类型的装饰器

python3 提取函数签名
inspect.signature()
带参数的装饰器,也就是根据参数定制化一个装饰器。可以看成生产装饰器的工厂,每次调用typeassert,返回一个特定的装饰器,然后用它去修饰其他函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from inspect import signature
def typeassert(*ty_args, **ty_kargs):
def decorator(func):
# func -> a, b
# d = {'a': int, 'b': str}
sig = signature(func)
btypes = sig.bind_partial(*ty_args, **ty_kargs).arguments
def wrapper(*args, **kargs):
# arg in d, instance(arg, d[arg])
for name, obj in sig.bind(*args, **kargs).arguments.items():
if name in btypes:
if not isinstance(obj, btypes):
raise TypeError('"%s" must be "%s"' % (name, btypes[name]))
return func(*args, **kargs)
return wrapper
return decorator
@typeassert(int, str, list)
def f(a, b, c):
print (a, b, c)
f(1, 'abc', [1, 2, 3])
f(1, 2, [1, 2, 3])

8.4 如何实现属性可修改的函数装饰器

为分析程序内哪些函数开销比较大,我们定义一个带timeout参数的函数装饰器:
1.统计被装饰函数单次调用运行时间
2.时间大于timeout的,将此次函数调用记录到log中
3.运行时可修改timeout的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from functools import wraps
import time
import logging
def warn(timeout):
timeout = [timeout]
def decorator(func):
def wrapper(*args, **kargs):
start = time.time()
res = func(*args, **kargs)
used = time.time() - start
if used > timeout[0]:
msg = '"%s": %s > %s' % (func.__name__, used, timeout[0])
logging.warn(msg)
return res
def setTimeout(k):
# nonlocal timeout # python3
# timeout = k
timeout[0] = k
wrapper.setTimeout = setTimeout
return wrapper
return decorator
from random import randint
@warn(1.5)
def test():
print ('In test')
while randint(0, 1):
time.sleep(0.5)
for _ in range(30):
test()
test.setTimeout(1)
for _ in range(30):
test()