前言
從語法上來看,協(xié)程和生成器類似,都是定義體中包含yield關(guān)鍵字的函數(shù)。
yield在協(xié)程中的用法:
- 在協(xié)程中yield通常出現(xiàn)在表達(dá)式的右邊,例如:datum = yield,可以產(chǎn)出值,也可以不產(chǎn)出--如果yield關(guān)鍵字后面沒有表達(dá)式,那么生成器產(chǎn)出None.
- 協(xié)程可能從調(diào)用方接受數(shù)據(jù),調(diào)用方是通過send(datum)的方式把數(shù)據(jù)提供給協(xié)程使用,而不是next(...)函數(shù),通常調(diào)用方會把值推送給協(xié)程。
- 協(xié)程可以把控制器讓給中心調(diào)度程序,從而激活其他的協(xié)程
所以總體上在協(xié)程中把yield看做是控制流程的方式。
在前一篇《一文徹底搞懂Python可迭代(Iterable)、迭代器(Iterator)和生成器(Generator)的概念》 的文中,知道生成器(Generator)可由以下兩種方式定義:
- 列表生成器
- 使用yield定義的函數(shù)
在Python早期的版本中協(xié)程也是通過生成器來實現(xiàn)的,也就是基于生成器的協(xié)程(Generator-based Coroutines)。在前一篇介紹生成器的文章末尾舉了一個生產(chǎn)者-消費者的例子,就是基于生成器的協(xié)程來實現(xiàn)的。
def producer(c): n = 0 while n < 5: n += 1 print('producer {}'.format(n)) r = c.send(n) print('consumer return {}'.format(r)) def consumer(): r = '' while True: n = yield r if not n: return print('consumer {} '.format(n)) r = 'ok' if __name__ == '__main__': c = consumer() next(c) # 啟動consumer producer(c)
看了這段代碼,相信很多初學(xué)者和我一樣對基于生成器的協(xié)程實現(xiàn)其實很難馬上就能夠根據(jù)業(yè)務(wù)寫出自己的協(xié)程代碼。Python實現(xiàn)者們也注意到這個問題,因為它太不Pythonic了。而基于生成器的協(xié)程也將被廢棄,因此本文將重點介紹asyncio包的使用,以及涉及到的一些相關(guān)類概念。
注:
我使用的Python環(huán)境是3.7。
0x00 何為協(xié)程(Coroutine)
協(xié)程(Coroutine)是在線程中執(zhí)行的,可理解為微線程,但協(xié)程的切換沒有上下文的消耗,它比線程更加輕量些。一個協(xié)程可以隨時中斷自己讓另一個協(xié)程開始執(zhí)行,也可以從中斷處恢復(fù)并繼續(xù)執(zhí)行,它們之間的調(diào)度是由程序員來控制的(可以看本文開篇處生產(chǎn)者-消費者的代碼)。
定義一個協(xié)程
在Python3.5+版本新增了aysnc和await關(guān)鍵字,這兩個語法糖讓我們非常方便地定義和使用協(xié)程。
在函數(shù)定義時用async聲明就定義了一個協(xié)程。
import asyncio # 定義了一個簡單的協(xié)程 async def simple_async(): print('hello') await asyncio.sleep(1) # 休眠1秒 print('python') # 使用asynio中run方法運(yùn)行一個協(xié)程 asyncio.run(simple_async()) # 執(zhí)行結(jié)果為 # hello # python
在協(xié)程中如果要調(diào)用另一個協(xié)程就使用await。要注意await關(guān)鍵字要在async定義的函數(shù)中使用,而反過來async函數(shù)可以不出現(xiàn)await
# 定義了一個簡單的協(xié)程 async def simple_async(): print('hello') asyncio.run(simple_async()) # 執(zhí)行結(jié)果 # hello
asyncio.run()將運(yùn)行傳入的協(xié)程,負(fù)責(zé)管理asyncio事件循環(huán)。
除了run()方法可直接執(zhí)行協(xié)程外,還可以使用事件循環(huán)loop
async def do_something(index): print(f'start {time.strftime("%X")}', index) await asyncio.sleep(1) print(f'finished at {time.strftime("%X")}', index) def test_do_something(): # 生成器產(chǎn)生多個協(xié)程對象 task = [do_something(i) for i in range(5)] # 獲取一個事件循環(huán)對象 loop = asyncio.get_event_loop() # 在事件循環(huán)中執(zhí)行task列表 loop.run_until_complete(asyncio.wait(task)) loop.close() test_do_something() # 運(yùn)行結(jié)果 # start 00:04:03 3 # start 00:04:03 4 # start 00:04:03 1 # start 00:04:03 2 # start 00:04:03 0 # finished at 00:04:04 3 # finished at 00:04:04 4 # finished at 00:04:04 1 # finished at 00:04:04 2 # finished at 00:04:04 0
可以看出幾乎同時啟動了所有的協(xié)程。
其實翻閱源碼可知asyncio.run()的實現(xiàn)也是封裝了loop對象及其調(diào)用。而asyncio.run()每次都會創(chuàng)建一個新的事件循環(huán)對象用于執(zhí)行協(xié)程。
0x01 Awaitable對象
在Python中可等待(Awaitable)對象有:協(xié)程(corountine)、任務(wù)(Task)、Future。即這些對象可以使用await關(guān)鍵字進(jìn)行調(diào)用
await awaitable_object
1. 協(xié)程(Coroutine)
協(xié)程由async def聲明定義,一個協(xié)程可由另一個協(xié)程使用await進(jìn)行調(diào)用
async def nested(): print('in nested func') return 13 async def outer(): # 要使用await 關(guān)鍵字 才會執(zhí)行一個協(xié)程函數(shù)返回的協(xié)程對象 print(await nested()) asyncio.run(outer()) # 執(zhí)行結(jié)果 # in nested func # 13
如果在outer()方法中直接調(diào)用nested()而不使用await,將拋出一個RuntimeWarning
async def outer(): # 直接調(diào)用協(xié)程函數(shù)不會發(fā)生執(zhí)行,只是返回一個 coroutine 對象 nested() asyncio.run(outer())
運(yùn)行程序,控制臺將輸出以下信息
RuntimeWarning: coroutine 'nested' was never awaited
? nested()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
2. 任務(wù)(Task)
任務(wù)(Task)是可以用來并發(fā)地執(zhí)行協(xié)程。可以使用asyncio.create_task()將一個協(xié)程對象封裝成任務(wù),該任務(wù)將很快被排入調(diào)度隊列并執(zhí)行。
async def nested(): print('in nested func') return 13 async def create_task(): # create_task 將一個協(xié)程對象打包成一個 任務(wù)時,該協(xié)程就會被自動調(diào)度運(yùn)行 task = asyncio.create_task(nested()) # 如果要看到task的執(zhí)行結(jié)果 # 可以使用await等待協(xié)程執(zhí)行完成,并返回結(jié)果 ret = await task print(f'nested return {ret}') asyncio.run(create_task()) # 運(yùn)行結(jié)果 # in nested func # nested return 13
注:
關(guān)于并發(fā)下文還會詳細(xì)說明。
3. Future
Future是一種特殊的低層級(low-level)對象,它是異步操作的最終結(jié)果(eventual result)。
當(dāng)一個 Future 對象 被等待,這意味著協(xié)程將保持等待直到該 Future 對象在其他地方操作完畢。
通常在應(yīng)用層代碼不會直接創(chuàng)建Future對象。在某些庫和asyncio模塊中的會使用到該對象。
async def used_future_func(): await function_that_returns_a_future_object()
0x02 并發(fā)
1. Task
前面我們知道Task可以并發(fā)地執(zhí)行。? asyncio.create_task()就是一個把協(xié)程封裝成Task的方法。
async def do_after(what, delay): await asyncio.sleep(delay) print(what) # 利用asyncio.create_task創(chuàng)建并行任務(wù) async def corun(): task1 = asyncio.create_task(do_after('hello', 1)) # 模擬執(zhí)行1秒的任務(wù) task2 = asyncio.create_task(do_after('python', 2)) # 模擬執(zhí)行2秒的任務(wù) print(f'started at {time.strftime("%X")}') # 等待兩個任務(wù)都完成,兩個任務(wù)是并行的,所以總時間兩個任務(wù)中最大的執(zhí)行時間 await task1 await task2 print(f'finished at {time.strftime("%X")}') asyncio.run(corun()) # 運(yùn)行結(jié)果 # started at 23:41:08 # hello # python # finished at 23:41:10
task1是一個執(zhí)行1秒的任務(wù),task2是一個執(zhí)行2秒的任務(wù),兩個任務(wù)并發(fā)的執(zhí)行,總共消耗2秒。
2. gather
除了使用asyncio.create_task()外還可以使用asyncio.gather(),這個方法接收協(xié)程參數(shù)列表
async def do_after(what, delay): await asyncio.sleep(delay) print(what) async def gather(): print(f'started at {time.strftime("%X")}') # 使用gather可將多個協(xié)程傳入 await asyncio.gather( do_after('hello', 1), do_after('python', 2), ) print(f'finished at {time.strftime("%X")}') asyncio.run(gather()) # 運(yùn)行結(jié)果 # started at 23:47:50 # hello # python # finished at 23:47:52
兩個任務(wù)消耗的時間為其中消耗時間最長的任務(wù)。
0x03 引用
docs.python.org/3/library/a…
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,謝謝大家對腳本之家的支持。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
