@wraps(func) defwrapper(*args, **kw): g = func(*args, **kw) next(g) return g
return wrapper
@coroutine defgenerator(): l = [] whileTrue: value = yield if value == 'CLOSE': break l.append(value) return l
if __name__ == '__main__': g = generator() value = None for i in ('hello', 'world', 'CLOSE'): try: g.send(i) except StopIteration as e: # 捕获生成器停止的异常 value = e.value # 获取其返回值 print('END') print(value)
输出如下
1 2
END ['hello', 'world']
使用 yield from
这个可以实现对内部生成器的迭代,例子如下
1 2 3 4 5 6 7 8 9 10 11 12 13
defchain(*args): """ 使用 yield from 进行迭代器的嵌套迭代 :param args: :return: """ for iter_obj in args: yieldfrom iter_obj
if __name__ == '__main__': for i in chain({'one', 'two'}, list('ace')): print(i, end=' ')
输出如下
1
one two a c e
这种 chain迭代器有官方的实现
1 2 3 4 5 6 7 8
from itertools import chain
if __name__ == '__main__': c = chain({'one', 'two'}, list('ace')) print(c) # <itertools.chain object at 0x1033a2b38>
for i in c: print(i, end=' ') # two one a c
输出如下
1 2
<itertools.chain object at 0x102aa2588> one two a c e
defsub_coro(): """子生成器函数,真正做事的函数""" l = [] whileTrue: value = yield if value == 'CLOSE': break l.append(value) return sorted(l)
defdele_coro(): whileTrue: l = yieldfrom sub_coro() print('排序后的列表:', l) print('-------------------')
defmain(): fake = Faker().country_code nest_country_list = [[fake() for i in range(3)] for j in range(3)] for country_list in nest_country_list: print('国家代号列表:', country_list) c = dele_coro() next(c) for country in country_list: c.send(country) c.send('CLOSE')
if __name__ == '__main__': loop = asyncio.get_event_loop() workers = [] for i in range(1, 4): workers.append(loop.create_task(worker('{}'.format(i), i))) gather = asyncio.gather(*workers)
start = time.time() loop.run_until_complete(gather) print('all is finish, time usage: {}s'.format(time.time() - start))
输出如下
1 2 3 4 5 6 7
worker 1 start work worker 1 finish work, took 1s worker 2 start work worker 2 finish work, took 2s worker 3 start work worker 3 finish work, took 3s all is finish, time usage: 6.003150701522827s
if __name__ == '__main__': loop = asyncio.get_event_loop() workers = [] for i in range(1, 4): workers.append(loop.create_task(worker('{}'.format(i), i))) gather = asyncio.gather(*workers)
start = time.time() loop.run_until_complete(gather) print('all is finish, time usage: {}s'.format(time.time() - start))
输入如下
1 2 3 4 5 6 7
worker 1 start work worker 2 start work worker 3 start work worker 1 finish work, took 1s worker 2 finish work, took 2s worker 3 finish work, took 3s all is finish, time usage: 3.004673957824707
if __name__ == '__main__': loop = asyncio.get_event_loop() tasks = [] for i in range(1, 4): task = loop.create_task(worker('{}'.format(i), i)) task.add_done_callback(functools.partial(cb, '{}'.format(i))) tasks.append(task) gather = asyncio.gather(*tasks)
start = time.time() loop.run_until_complete(gather) print('all is finish, time usage: {}s'.format(time.time() - start))
输出结果如下
1 2 3 4 5 6 7
worker 1 start work worker 2 start work worker 3 start work worker 1 state: FINISHED , took 1s worker 2 state: FINISHED , took 2s worker 3 state: FINISHED , took 3s all is finish, time usage: 3.003952980041504s
if __name__ == '__main__': loop = asyncio.get_event_loop() coroutines = [work(i, i) for i in range(1, 4)] try: loop.run_until_complete(asyncio.gather(*coroutines)) except KeyboardInterrupt: tasks = asyncio.Task.all_tasks() for i in tasks: print('取消任务:{}'.format(i._state)) print('取消状态:{}'.format(i.cancel())) finally: loop.close()
[2] start [1] start [normal_func] triggered by call_soon [1] after 1s stop [normal_func] triggered by call_later [normal_func] triggered by call_at [2] after 4s stop