?
?
?
?1、常見并發(fā)類型
I/ O密集型:
藍(lán)色框表示程序執(zhí)行工作的時間,紅色框表示等待I/O操作完成的時間。此圖沒有按比例顯示,因為internet上的請求可能比CPU指令要多花費幾個數(shù)量級的時間,所以你的程序可能會花費大部分時間進(jìn)行等待。
?CPU密集型:
IO密集型程序?qū)r間花在cpu計算上。
常見并發(fā)類型以及區(qū)別:
?
?
?2、同步版本
?我們將使用requests訪問100個網(wǎng)頁,使用同步的方式,requests的請求是同步的,所以代碼就很好寫了。
同步的版本代碼邏輯簡單,編寫也會很相對容易。
import requests import time def download_site(url,session): with session.get(url) as response: print (len(response.content)) def download_all_site(sites): with requests.Session() as session: for url in sites: download_site(url,session) if __name__ == " __main__ " : sites = [ " https://www.baidu.com " , " https://www.jython.org " ] * 50 start_time = time.time() download_all_site(sites) end_time = time.time() print ( " 執(zhí)行時間:%s " % (end_time - start_time) + " 秒 " ) # download_site()只從一個URL下載內(nèi)容并打印其大小 # 需要知道的是我們這里沒有使用requests.get(),而使用了session.get(),我們使用requests.Session()創(chuàng)建了一個Session對象,每次請求使用了session.get(url,因為可以讓requests運用一些神奇的網(wǎng)絡(luò)小技巧,從而真正使程序加速。 # 執(zhí)行時間:33.91123294830322秒
?
?
?3、多線程
?ThreadPoolExecutor,: ThreadPoolExecutor =Thread+Pool+ Executor。
你已經(jīng)了解了Thread部分。那只是我們之前提到的一個思路。Pool部分是開始變得有趣的地方。這個對象將創(chuàng)建一個線程池,其中的每個線程都可以并發(fā)運行。最后,Executor是控制線程池中的每個線程如何以及何時運行的部分。它將在線程池中執(zhí)行請求。
對我們很有幫助的是,標(biāo)準(zhǔn)庫將ThreadPoolExecutor實現(xiàn)為一個上下文管理器,因此你可以使用with語法來管理Threads池的創(chuàng)建和釋放。
一旦有了ThreadPoolExecutor,你就可以使用它方便的.map()方法。此方法在列表中的每個站點上運行傳入函數(shù)。最重要的是,它使用自己管理的線程池自動并發(fā)地運行它們。
來自其他語言,甚至Python 2的人可能想知道,在處理threading時,管理你習(xí)慣的細(xì)節(jié)的常用對象和函數(shù)在哪里,比如Thread.start()、Thread.join()和Queue。
這些都還在那里,你可以使用它們來實現(xiàn)對線程運行方式的精細(xì)控制。但是,從Python 3.2開始,標(biāo)準(zhǔn)庫添加了一個更高級別的抽象,稱為Executor,如果你不需要精細(xì)控制,它可以為你管理許多細(xì)節(jié)。
本例中另一個有趣的更改是,每個線程都需要創(chuàng)建自己的request . Session()對象。當(dāng)你查看requests的文檔時,不一定就能很容易地看出,但在閱讀這個問題(https://github.com/requests/requests/issues/2766? )時,你會清晰地發(fā)現(xiàn)每個線程都需要一個單獨的Session。
這是threading中有趣且困難的問題之一。因為操作系統(tǒng)可以控制任務(wù)何時中斷,何時啟動另一個任務(wù),所以線程之間共享的任何數(shù)據(jù)都需要被保護(hù)起來,或者說是線程安全的。不幸的是,requests . Session()不是線程安全的。
根據(jù)數(shù)據(jù)是什么以及如何你使用它們,有幾種策略可以使數(shù)據(jù)訪問變成線程安全的。其中之一是使用線程安全的數(shù)據(jù)結(jié)構(gòu),比如來自 Python的queue模塊的Queue。
這些對象使用低級基本數(shù)據(jù)類型,比如threading.Lock,以確保只有一個線程可以同時訪問代碼塊或內(nèi)存塊。你可以通過ThreadPoolExecutor對象間接地使用此策略。
import requests import concurrent.futures import threading import time # 創(chuàng)建線程池 thread_local= threading.local() def get_session(): if not getattr(thread_local, " session " ,None): thread_local.session = requests.Session() return thread_local.session def download_site(url): session = get_session() with session.get(url) as response: print (len(response.content)) def download_all_site(sites): with concurrent.futures.ThreadPoolExecutor(max_workers =5 ) as exector: exector.map(download_site,sites) if __name__ == " __main__ " : sites = [ " https://www.baidu.com " , " https://www.jython.org " ] * 50 start_time = time.time() download_all_site(sites) end_time = time.time() print ( " 執(zhí)行時間:%s " % (end_time - start_time) + " 秒 " ) # 執(zhí)行時間:6.152076244354248秒
?
這里要使用的另一種策略是線程本地存儲。Threading.local()會創(chuàng)建一個對象,它看起來像一個全局對象但又是特定于每個線程的。在我們的示例中,這是通過threadLocal和get_session()完成的:
ThreadLocal是threading模塊中專門用來解決這個問題的。它看起來有點奇怪,但是你只想創(chuàng)建其中一個對象,而不是為每個線程創(chuàng)建一個對象。對象本身將負(fù)責(zé)從不同的線程到不同的數(shù)據(jù)的分開訪問。
當(dāng)get_session()被調(diào)用時,它所查找的session是特定于它所運行的線程的。因此,每個線程都將在第一次調(diào)用get_session()時創(chuàng)建一個單個的會話,然后在整個生命周期中對每個后續(xù)調(diào)用使用該會話。
最后,簡要介紹一下選擇線程的數(shù)量。你可以看到示例代碼使用了5個線程。隨意改變這個數(shù)字,看看總時間是如何變化的。你可能認(rèn)為每次下載只有一個線程是最快的,但至少在我的系統(tǒng)上不是這樣。我在5到10個線程之間找到了最快的結(jié)果。如果超過這個值,那么創(chuàng)建和銷毀線程的額外開銷就會抵消程序節(jié)省的時間。
這里比較困難的答案是,從一個任務(wù)到另一個任務(wù)的正確線程數(shù)不是一個常量。需要進(jìn)行一些實驗來得到。
?注意:request . Session()不是線程安全的。這意味著,如果多個線程使用同一個Session,那么在某些地方可能會發(fā)生上面描述的交互類型問題。
?
多線程代碼的執(zhí)行時序表:
?
?
?4、異步IO
asyncio的一般概念是一個單個的Python對象,稱為事件循環(huán),它控制每個任務(wù)如何以及何時運行。事件循環(huán)會關(guān)注每個任務(wù)并知道它處于什么狀態(tài)。在實際中,任務(wù)可以處于許多狀態(tài),但現(xiàn)在我們假設(shè)一個簡化的只有兩種狀態(tài)的事件循環(huán)。
就緒狀態(tài)將表明一個任務(wù)有工作要做,并且已經(jīng)準(zhǔn)備好運行,而等待狀態(tài)意味著該任務(wù)正在等待一些外部工作完成,例如網(wǎng)絡(luò)操作。
我們簡化的事件循環(huán)維護(hù)兩個任務(wù)列表,每一個對應(yīng)這些狀態(tài)。它會選擇一個就緒的任務(wù),然后重新啟動它。該任務(wù)處于完全控制之中,直到它配合地將控制權(quán)交還給事件循環(huán)為止。
當(dāng)正在運行的任務(wù)將控制權(quán)交還給事件循環(huán)時,事件循環(huán)將該任務(wù)放入就緒或等待列表中,然后遍歷等待列表中的每個任務(wù),以查看I/O操作完成后某個任務(wù)是否已經(jīng)就緒。時間循環(huán)知道就緒列表中的任務(wù)仍然是就緒的,因為它知道它們還沒有運行。
一旦所有的任務(wù)都重新排序到正確的列表中,事件循環(huán)將選擇下一個要運行的任務(wù),然后重復(fù)這個過程。我們簡化的事件循環(huán)會選擇等待時間最長的任務(wù)并運行該任務(wù)。此過程會一直重復(fù),直到事件循環(huán)結(jié)束。
asyncio的一個重要之處在于,如果沒有刻意去釋放控制權(quán),任務(wù)是永遠(yuǎn)不會放棄控制權(quán)的。它們在操作過程中從不會被打斷。這使得我們在asyncio中比在threading中能更容易地共享資源。你不必?fù)?dān)心代碼是否是線程安全的。
import time import asyncio from aiohttp import ClientSession async def download_site(session,url): global i try : async with session.get(url) as response: i =i+1 print (i) return await response.read() except Exception as e: pass async def download_all_site(sites): async with ClientSession() as session: tasks = [] for url in sites: task = asyncio.create_task(download_site(session,url)) tasks.append(task) result = await asyncio.gather(*tasks) # 等待一組協(xié)程運行結(jié)束并接收結(jié)果 print (result) if __name__ == " __main__ " : i = 0 sites = [ " http://www.360kuai.com/ " , " https://www.jython.org " ] * 50 start_time = time.time() asyncio.run(download_all_site(sites)) end_time = time.time() print ( " 執(zhí)行時間:%s " % (end_time - start_time) + " 秒 " )
#執(zhí)行時間:5.29184889793396秒
異步IO的執(zhí)行時序表:
asyncio版本的問題
此時asyncio有兩個問題。你需要特殊的異步版本的庫來充分利用asycio。如果你只是使用requests下載站點,那么速度會慢得多,因為requests的設(shè)計目的不是通知事件循環(huán)它被阻塞了。隨著時間的推移,這個問題變得微不足道,因為越來越多的庫包含了asyncio。
另一個更微妙的問題是,如果其中一個任務(wù)不合作,那么協(xié)作多任務(wù)處理的所有優(yōu)勢都將不存在。代碼中的一個小錯誤可能會導(dǎo)致任務(wù)運行超時并長時間占用處理器,使需要運行的其他任務(wù)無法運行。如果一個任務(wù)沒有將控制權(quán)交還給事件循環(huán),則事件循環(huán)無法中斷它。
考慮到這一點,我們來開始討論一種完全不同的并發(fā)性——multiprocessing。
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
