python异步编程

学习实验楼课程《Python 异步编程》的总结

从线程到协程

yield 生成器

生成器运行结束时一定会抛出 StopIteration 异常,for 循环会自动捕获并处理这个异常;该异常的 value 属性就是生成器函数的 return 值。

生成器还可以用内置函数 next() 逐步迭代。生成器每次执行到 yield 语句都会暂停,这一点至关重要:后面协程中的 IO 等待正是发生在这里。

1
2
3
4
5
6
7
8
9
10
11
from typing import Generator


def task_generator(task_num: int) -> Generator[str, None, None]:
    """Lazily produce the labels 'task 0' .. 'task <task_num - 1>'.

    Labels are created on demand: the generator pauses after handing out
    each one and resumes only when the consumer asks for the next.
    """
    yield from ('task {}'.format(index) for index in range(task_num))


if __name__ == '__main__':
    # Drain the generator; end=' ' keeps all labels on one line.
    for label in task_generator(10):
        print(label, end=' ')

输出

1
task 0 task 1 task 2 task 3 task 4 task 5 task 6 task 7 task 8 task 9

协程的四种状态

  • GEN_CREATED 创建完成,等待执行
  • GEN_RUNNING 解释器正在执行(这个状态在下面的示例程序中无法看到)
  • GEN_SUSPENDED 在 yield 表达式处暂停
  • GEN_CLOSED 执行结束,生成器已关闭
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import inspect
from typing import Generator


def generator() -> Generator[int, str, None]:
    """Collect words sent in by the caller and report how many have arrived.

    Every ``send(word)`` is echoed and stored, and the value yielded back is
    the number of words stored so far (0 when primed with ``next``).
    Throwing ``ValueError`` into the generator prints the collected words and
    leaves it running, waiting for more input.
    """
    received = []
    while True:
        try:
            # Execution parks on this yield until the caller calls send();
            # the running count handed back is simply len(received).
            word = yield len(received)
            print('got str: {}'.format(word))
            received.append(word)
        except ValueError:
            print('OVER, all the words: {}'.format(received))


if __name__ == '__main__':
    g = generator()
    print(inspect.getgeneratorstate(g))  # GEN_CREATED

    # next() primes the generator: it runs up to the first yield and returns 0.
    print('word count: {}'.format(next(g)))  # word count: 0
    print(inspect.getgeneratorstate(g))  # GEN_SUSPENDED

    print('word count: {}'.format(g.send('Hello')))  # word count: 1
    print('word count: {}'.format(g.send('World')))  # word count: 2

    # The ValueError is raised at the paused yield and handled inside the
    # generator (it prints "OVER ..."), so the generator keeps running.
    g.throw(ValueError)  # OVER
    g.close()
    print(inspect.getgeneratorstate(g))  # GEN_CLOSED

输出如下

1
2
3
4
5
6
7
8
9
GEN_CREATED
word count: 0
GEN_SUSPENDED
got str: Hello
word count: 1
got str: World
word count: 2
OVER, all the words: ['Hello', 'World']
GEN_CLOSED

预激活生成器以及协程的返回值

这里用一个装饰器(decorator)在创建生成器后自动调用一次 next(),让它预先运行到第一个 yield 处,之后就可以直接 send() 了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from functools import wraps


def coroutine(func):
    """Decorator that primes a generator-based coroutine.

    The returned factory creates the generator and immediately advances it
    to its first ``yield``, so callers can ``send()`` values straight away
    without calling ``next()`` themselves.

    :param func: generator function to wrap
    :return: factory that returns the already-primed generator
    """

    @wraps(func)
    def primed(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # run up to the first yield
        return gen

    return primed


@coroutine
def generator():
    """Accumulate every value sent in until the sentinel 'CLOSE' arrives.

    The generator is already primed by ``@coroutine``. Sending 'CLOSE' makes
    it return the accumulated list, which the caller receives as
    ``StopIteration.value``.
    """
    collected = []
    while True:
        item = yield
        if item == 'CLOSE':
            return collected
        collected.append(item)


if __name__ == '__main__':
    coro = generator()
    value = None
    for word in ('hello', 'world', 'CLOSE'):
        try:
            coro.send(word)
        except StopIteration as stop:  # raised once the generator returns
            value = stop.value  # the generator's return value rides on the exception
            print('END')
    print(value)

输出如下

1
2
END
['hello', 'world']

使用 yield from

yield from 可以把迭代委托给内部的生成器(或任意可迭代对象),例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
def chain(*args):
    """Yield the items of each iterable in ``args``, one iterable after another.

    A hand-written equivalent of ``itertools.chain``: ``yield from`` hands
    iteration over to each inner iterable in turn.

    :param args: iterables to concatenate
    :return: generator over all of their items
    """
    for source in args:
        yield from source


if __name__ == '__main__':
    # A set has no fixed iteration order, so 'one'/'two' may come out either way round.
    for item in chain({'one', 'two'}, list('ace')):
        print(item, end=' ')

输出如下(集合 {'one', 'two'} 的遍历顺序不固定,one 和 two 的先后可能不同)

1
one two a c e

这种 chain 迭代器在标准库 itertools 中已有官方实现

1
2
3
4
5
6
7
8
from itertools import chain

if __name__ == '__main__':
    c = chain({'one', 'two'}, list('ace'))
    print(c)  # <itertools.chain object at 0x...>

    # Prints e.g. "one two a c e"; the set's iteration order is not fixed,
    # so 'one' and 'two' may appear in either order.
    for i in c:
        print(i, end=' ')

输出如下(对象地址和 one、two 的先后顺序每次运行可能不同)

1
2
<itertools.chain object at 0x102aa2588>
one two a c e

yield from 实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from faker import Faker

def sub_coro():
    """Sub-generator that does the real work.

    Collects every value sent in until the sentinel 'CLOSE' arrives, then
    returns the collected values in sorted order. The caller receives them as
    ``StopIteration.value``, or as the value of a ``yield from`` expression.
    """
    bucket = []
    while True:
        item = yield
        if item == 'CLOSE':
            return sorted(bucket)
        bucket.append(item)


def dele_coro():
    """Delegating generator that sits between the caller and sub_coro.

    Each round starts a fresh sub_coro. ``yield from`` forwards every send()
    straight to it and evaluates to its return value once 'CLOSE' is sent.
    That value is printed, and then a new round begins.
    """
    while True:
        ordered = yield from sub_coro()
        print('排序后的列表:', ordered)
        print('-------------------')


def main():
    """Feed three random batches of three country codes through dele_coro.

    For each batch:
    - a new delegating generator is created and primed;
    - every code is sent in;
    - the final 'CLOSE' makes the sub-generator finish, so dele_coro prints
      the sorted batch.
    """
    make_code = Faker().country_code
    batches = [[make_code() for _ in range(3)] for _ in range(3)]
    for batch in batches:
        print('国家代号列表:', batch)
        delegate = dele_coro()
        next(delegate)  # prime: advance into sub_coro's first yield
        for code in batch:
            delegate.send(code)
        delegate.send('CLOSE')


if __name__ == '__main__':
    main()

输出结果如下

1
2
3
4
5
6
7
8
9
国家代号列表: ['SL', 'IE', 'SD']
排序后的列表: ['IE', 'SD', 'SL']
-------------------
国家代号列表: ['HU', 'NE', 'KM']
排序后的列表: ['HU', 'KM', 'NE']
-------------------
国家代号列表: ['TO', 'LI', 'PL']
排序后的列表: ['LI', 'PL', 'TO']
-------------------

asyncio模块

基础知识

使用 asyncio 中的 asyncio.coroutine 修饰器来注明其为异步函数,可加入事件循环(event loop)中。注意:该修饰器自 Python 3.8 起已弃用,并在 Python 3.11 中被移除,新代码应直接用 async def 定义协程。


阻塞式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import time


async def worker(name: str, sleep_time: float):
    """Deliberately *blocking* worker, for contrast with the asyncio.sleep version.

    ``time.sleep`` never yields control to the event loop. While one worker
    sleeps, no other task can run, so the workers execute one after another.

    ``@asyncio.coroutine`` was deprecated in Python 3.8 and removed in 3.11,
    so this is written as a native ``async def`` coroutine instead.
    """
    print('worker {} start work'.format(name))
    time.sleep(sleep_time)  # blocks the whole event-loop thread
    print('worker {} finish work, took {}s'.format(name, sleep_time))


if __name__ == '__main__':
    # asyncio.get_event_loop() warns when no event loop is current (3.12+)
    # and raises RuntimeError in 3.14+, so create and install the loop
    # explicitly, and always close it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        workers = [loop.create_task(worker('{}'.format(i), i)) for i in range(1, 4)]
        gather = asyncio.gather(*workers)

        start = time.perf_counter()  # monotonic clock, meant for measuring durations
        loop.run_until_complete(gather)
        print('all is finish, time usage: {}s'.format(time.perf_counter() - start))
    finally:
        loop.close()

输出如下

1
2
3
4
5
6
7
worker 1 start work
worker 1 finish work, took 1s
worker 2 start work
worker 2 finish work, took 2s
worker 3 start work
worker 3 finish work, took 3s
all is finish, time usage: 6.003150701522827s

异步式

这里用 async def 定义协程,并把 time.sleep 换成了 await asyncio.sleep:等待期间协程会把控制权交还给事件循环,其他任务得以并发执行,所以总耗时约等于最长的那个任务(约 3 秒),而不是三者之和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
import time


async def worker(name: str, sleep_time: float):
    """Non-blocking worker: announce the start, await asyncio.sleep, announce the finish.

    Awaiting asyncio.sleep suspends only this coroutine. The event loop can
    run the other workers while this one waits.
    """
    print('worker {} start work'.format(name))
    pause = asyncio.sleep(sleep_time)
    await pause
    print('worker {} finish work, took {}s'.format(name, sleep_time))


if __name__ == '__main__':
    # asyncio.get_event_loop() warns when no event loop is current (3.12+)
    # and raises RuntimeError in 3.14+, so create and install the loop
    # explicitly, and always close it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        workers = [loop.create_task(worker('{}'.format(i), i)) for i in range(1, 4)]
        gather = asyncio.gather(*workers)

        start = time.perf_counter()  # monotonic clock, meant for measuring durations
        loop.run_until_complete(gather)
        print('all is finish, time usage: {}s'.format(time.perf_counter() - start))
    finally:
        loop.close()

输出如下

1
2
3
4
5
6
7
worker 1 start work
worker 2 start work
worker 3 start work
worker 1 finish work, took 1s
worker 2 finish work, took 2s
worker 3 finish work, took 3s
all is finish, time usage: 3.004673957824707s

为其添加回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import functools
import time


async def worker(name: str, sleep_time: float) -> float:
    """Announce the start, sleep without blocking the loop, then report the duration.

    :return: ``sleep_time`` itself, so a done-callback can read it from the task
    """
    print('worker {} start work'.format(name))
    # asyncio.sleep hands back its ``result`` argument once the delay elapses.
    return await asyncio.sleep(sleep_time, result=sleep_time)


def cb(name: str, task: asyncio.Task):
    """Done-callback for a worker task: print its final state and result.

    The worker name is bound with ``functools.partial``, and the event loop
    passes the finished task. Only the public Future API is used
    (``cancelled()``, ``done()``, ``exception()``, ``result()``). The private
    ``_state``/``_result`` attributes are implementation details.
    """
    if task.cancelled():
        state, result = 'CANCELLED', None
    elif task.done():
        state = 'FINISHED'
        # result() would re-raise a failed worker's exception inside the
        # callback, so only read it when the task succeeded.
        result = task.result() if task.exception() is None else None
    else:
        state, result = 'PENDING', None
    print('worker {} state: {} , took {}s'.format(name, state, result))


if __name__ == '__main__':
    # asyncio.get_event_loop() warns when no event loop is current (3.12+)
    # and raises RuntimeError in 3.14+, so create and install the loop
    # explicitly, and always close it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        tasks = []
        for i in range(1, 4):
            name = '{}'.format(i)
            task = loop.create_task(worker(name, i))
            # partial binds the worker name; the loop supplies the task itself.
            task.add_done_callback(functools.partial(cb, name))
            tasks.append(task)
        gather = asyncio.gather(*tasks)

        start = time.perf_counter()  # monotonic clock, meant for measuring durations
        loop.run_until_complete(gather)
        print('all is finish, time usage: {}s'.format(time.perf_counter() - start))
    finally:
        loop.close()

输出结果如下

1
2
3
4
5
6
7
worker 1 start work
worker 2 start work
worker 3 start work
worker 1 state: FINISHED , took 1s
worker 2 state: FINISHED , took 2s
worker 3 state: FINISHED , took 3s
all is finish, time usage: 3.003952980041504s

异步编程

取消异步任务

只有尚未完成(PENDING 状态)的任务才可以取消;对已经处于 FINISHED 状态的任务调用 cancel() 会返回 False,取消不会生效


一个简单的实例

使用 KeyboardInterrupt 来接收终止事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio


async def work(id, t):
    """Print a start message, wait ``t`` seconds without blocking, then report ``id`` as done."""
    print('Working...')
    await asyncio.sleep(t)
    done_msg = 'Work {} done'.format(id)
    print(done_msg)


if __name__ == '__main__':
    # Create and install the loop explicitly: asyncio.get_event_loop() with
    # no current loop warns on 3.12+ and raises on 3.14+.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    coroutines = [work(i, i) for i in range(1, 4)]
    main_future = asyncio.gather(*coroutines)
    try:
        loop.run_until_complete(main_future)
    except KeyboardInterrupt:
        # Ctrl+C: run_until_complete has already exited, so loop.stop() would
        # do nothing and closing now would abandon the still-pending tasks.
        # Cancel the gather (which cancels every child task), then run the
        # loop once more so the cancellation is actually processed.
        main_future.cancel()
        try:
            loop.run_until_complete(main_future)
        except asyncio.CancelledError:
            pass
    finally:
        loop.close()

在任务执行过程中按下 ctrl+c 中断事件循环,还未完成的任务(这里是 Work 3)不会再继续执行

1
2
3
4
5
6
Working...
Working...
Working...
Work 1 done
Work 2 done
^C

显示任务的详细信息

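通过 asyncio.Task.all_tasks 获取全部任务并获取它们的状态。注意:asyncio.Task.all_tasks() 在 Python 3.9 中已被移除,新版本应改用 asyncio.all_tasks(loop);它只返回尚未完成的任务,因此下面输出中 FINISHED 那一行在新版本中不会出现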

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio


async def work(id, t):
    """Print a start message, wait ``t`` seconds without blocking, then report ``id`` as done."""
    print('Working...')
    await asyncio.sleep(t)
    done_msg = 'Work {} done'.format(id)
    print(done_msg)


if __name__ == '__main__':
    # Create and install the loop explicitly: asyncio.get_event_loop() with
    # no current loop warns on 3.12+ and raises on 3.14+.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    coroutines = [work(i, i) for i in range(1, 4)]
    main_future = asyncio.gather(*coroutines)
    try:
        loop.run_until_complete(main_future)
    except KeyboardInterrupt:
        # asyncio.Task.all_tasks() was removed in Python 3.9. The module-level
        # asyncio.all_tasks(loop) replaces it and returns only this loop's
        # tasks that have not finished yet.
        for task in asyncio.all_tasks(loop):
            # Derive the state from the public API instead of task._state.
            if task.cancelled():
                state = 'CANCELLED'
            elif task.done():
                state = 'FINISHED'
            else:
                state = 'PENDING'
            print('取消任务:{}'.format(state))
            print('取消状态:{}'.format(task.cancel()))
        # cancel() only requests cancellation; run the loop again so the
        # tasks really finish before the loop is closed.
        try:
            loop.run_until_complete(main_future)
        except asyncio.CancelledError:
            pass
    finally:
        loop.close()

效果如下,在执行过程中输入 ctrl+c

1
2
3
4
5
6
7
8
9
10
Working...
Working...
Working...
Work 1 done
^C取消任务:PENDING
取消状态:True
取消任务:FINISHED
取消状态:False
取消任务:PENDING
取消状态:True

排定任务

asyncio 的事件循环(event loop)提供了一系列方法来安排函数的执行时机

  • run_forever loop一直执行直到调用 loop.stop()函数
  • call_soon 将普通的函数加入到事件循环中,在事件循环的下一次迭代中尽快执行(并不是在调用处立即执行)
  • call_later 将普通的函数加入到事件循环中,延时执行
  • call_at 将普通的函数加入到事件循环中, 在某时刻执行,配合 loop.time() 使用

下面是一个整合的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import functools


def normal_func(name: str):
    """Plain (non-async) callback scheduled via call_soon / call_later / call_at.

    :param name: label identifying which scheduling call triggered it
    """
    message = '[normal_func] triggered by {}'.format(name)
    print(message)


def loop_stop(loop: asyncio.AbstractEventLoop, future: asyncio.Future):
    """Done-callback that stops ``loop`` once ``future`` has completed.

    ``add_done_callback`` always passes the finished future, so the second
    parameter must exist even though this callback does not use it.
    """
    del future  # required by the callback signature, otherwise unused
    loop.stop()


async def work(t: int, name: str):
    """Print a start marker, sleep ``t`` seconds without blocking the loop, then print a stop marker."""
    start_line = '[{}] start'.format(name)
    print(start_line)
    await asyncio.sleep(t)
    print('[{}] after {}s stop'.format(name, t))


if __name__ == '__main__':
    # Coroutines can be put on the loop with asyncio.ensure_future,
    # loop.create_task or loop.run_until_complete; here asyncio.gather
    # wraps both coroutines in tasks.
    # Create and install the loop explicitly: asyncio.get_event_loop() with
    # no current loop warns on 3.12+ and raises on 3.14+, and gather() below
    # is called before the loop runs, so it must be able to find this loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        start = loop.time()
        tasks = asyncio.gather(work(1, '1'), work(4, '2'))

        loop.call_soon(normal_func, 'call_soon')  # next loop iteration
        loop.call_later(2, normal_func, 'call_later')  # 2s after this call
        loop.call_at(start + 3, normal_func, 'call_at')  # at loop time start + 3

        # Stop the otherwise endless run_forever() once both workers finish.
        tasks.add_done_callback(functools.partial(loop_stop, loop))
        loop.run_forever()  # runs until loop.stop() is called
    finally:
        loop.close()  # only valid once the loop has stopped

结果如下

1
2
3
4
5
6
7
[2] start
[1] start
[normal_func] triggered by call_soon
[1] after 1s stop
[normal_func] triggered by call_later
[normal_func] triggered by call_at
[2] after 4s stop