引言
對于 Python 來說,并不缺少并發選項,其標準庫中包括了對線程、進程和異步 I/O 的支持。在許多情況下,通過創建諸如異步、線程和子進程之類的高層模塊,Python 簡化了各種并發方法的使用。除了標準庫之外,還有一些第三方的解決方案,例如 Twisted、Stackless 和進程模塊。本文重點關注于使用 Python 的線程,并使用了一些實際的示例進行說明。雖然有許多很好的聯機資源詳細說明了線程 API,但本文嘗試提供一些實際的示例,以說明一些常見的線程使用模式。
全局解釋器鎖 (Global Interpretor Lock) 說明 Python 解釋器并不是線程安全的。當前線程必須持有全局鎖,以便對 Python 對象進行安全地訪問。因為只有一個線程可以獲得 Python 對象/C API,所以解釋器每經過 100 個字節碼的指令,就有規律地釋放和重新獲得鎖。解釋器對線程切換進行檢查的頻率可以通過 sys.setcheckinterval() 函數來進行控制。
此外,還將根據潛在的阻塞 I/O 操作,釋放和重新獲得鎖。有關更詳細的信息,請參見參考資料部分中的 Gil and Threading State 和 Threading the Global Interpreter Lock。
需要說明的是,因為 GIL,CPU 受限的應用程序將無法從線程的使用中受益。使用 Python 時,建議使用進程,或者混合創建進程和線程。
首先弄清進程和線程之間的區別,這一點是非常重要的。線程與進程的不同之處在于,它們共享狀態、內存和資源。對于線程來說,這個簡單的區別既是它的優勢,又是它的缺點。一方面,線程是輕量級的,并且相互之間易于通信,但另一方面,它們也帶來了包括死鎖、爭用條件和高復雜性在內的各種問題。幸運的是,由于 GIL 和隊列模塊,與采用其他的語言相比,采用 Python 語言在線程實現的復雜性上要低得多。
使用 Python 線程
要繼續學習本文中的內容,我假定您已經安裝了 Python 2.5 或者更高版本,因為本文中的許多示例都將使用 Python 語言的新特性,而這些特性僅出現于 Python2.5 之后。要開始使用 Python 語言的線程,我們將從簡單的 "Hello World" 示例開始:
hello_threads_example
import threading import datetime class ThreadClass(threading.Thread): def run(self): now = datetime.datetime.now() print "%s says Hello World at time: %s" % (self.getName(), now) for i in range(2): t = ThreadClass() t.start()
如果運行這個示例,您將得到下面的輸出:
# python hello_threads.py Thread-1 says Hello World at time: 2008-05-13 13:22:50.252069 Thread-2 says Hello World at time: 2008-05-13 13:22:50.252576
仔細觀察輸出結果,您可以看到從兩個線程都輸出了 Hello World 語句,并都帶有日期戳。如果分析實際的代碼,那么將發現其中包含兩個導入語句;一個語句導入了日期時間模塊,另一個語句導入線程模塊。類 ThreadClass 繼承自 threading.Thread,也正因為如此,您需要定義一個 run 方法,以此執行您在該線程中要運行的代碼。在這個 run 方法中唯一要注意的是,self.getName() 是一個用于確定該線程名稱的方法。
最后三行代碼實際地調用該類,并啟動線程。如果注意的話,那么會發現實際啟動線程的是 t.start()。在設計線程模塊時考慮到了繼承,并且線程模塊實際上是建立在底層線程模塊的基礎之上的。對于大多數情況來說,從 threading.Thread 進行繼承是一種最佳實踐,因為它創建了用于線程編程的常規 API。
使用線程隊列
如前所述,當多個線程需要共享數據或者資源的時候,可能會使得線程的使用變得復雜。線程模塊提供了許多同步原語,包括信號量、條件變量、事件和鎖。當這些選項存在時,最佳實踐是轉而關注于使用隊列。相比較而言,隊列更容易處理,并且可以使得線程編程更加安全,因為它們能夠有效地傳送單個線程對資源的所有訪問,并支持更加清晰的、可讀性更強的設計模式。
在下一個示例中,您將首先創建一個以串行方式或者依次執行的程序,獲取網站的 URL,并顯示頁面的前 1024 個字節。有時使用線程可以更快地完成任務,下面就是一個典型的示例。首先,讓我們使用 urllib2 模塊以獲取這些頁面(一次獲取一個頁面),并且對代碼的運行時間進行計時:
URL 獲取序列
import urllib2 import time hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com", "http://ibm.com", "http://apple.com"] start = time.time() #grabs urls of hosts and prints first 1024 bytes of page for host in hosts: url = urllib2.urlopen(host) print url.read(1024) print "Elapsed Time: %s" % (time.time() - start)
在運行以上示例時,您將在標準輸出中獲得大量的輸出結果。但最后您將得到以下內容:
Elapsed Time: 2.40353488922
讓我們仔細分析這段代碼。您僅導入了兩個模塊。首先,urllib2 模塊減少了工作的復雜程度,并且獲取了 Web 頁面。然后,通過調用 time.time(),您創建了一個開始時間值,然后再次調用該函數,并且減去開始值以確定執行該程序花費了多長時間。最后分析一下該程序的執行速度,雖然“2.5 秒”這個結果并不算太糟,但如果您需要檢索數百個 Web 頁面,那么按照這個平均值,就需要花費大約 50 秒的時間。研究如何創建一種可以提高執行速度的線程化版本:
URL 獲取線程化
#!/usr/bin/env python import Queue import threading import urllib2 import time hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com", "http://ibm.com", "http://apple.com"] queue = Queue.Queue() class ThreadUrl(threading.Thread): """Threaded Url Grab""" def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: #grabs host from queue host = self.queue.get() #grabs urls of hosts and prints first 1024 bytes of page url = urllib2.urlopen(host) print url.read(1024) #signals to queue job is done self.queue.task_done() start = time.time() def main(): #spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadUrl(queue) t.setDaemon(True) t.start() #populate queue with data for host in hosts: queue.put(host) #wait on the queue until everything has been processed queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
對于這個示例,有更多的代碼需要說明,但與第一個線程示例相比,它并沒有復雜多少,這正是因為使用了隊列模塊。在 Python 中使用線程時,這個模式是一種很常見的并且推薦使用的方式。具體工作步驟描述如下:
- ??? 創建一個 Queue.Queue() 的實例,然后使用數據對它進行填充。
- ??? 將經過填充數據的實例傳遞給線程類,后者是通過繼承 threading.Thread 的方式創建的。
- ??? 生成守護線程池。
- ??? 每次從隊列中取出一個項目,并使用該線程中的數據和 run 方法以執行相應的工作。
- ??? 在完成這項工作之后,使用 queue.task_done() 函數向任務已經完成的隊列發送一個信號。
- ??? 對隊列執行 join 操作,實際上意味著等到隊列為空,再退出主程序。
在使用這個模式時需要注意一點:通過將守護線程設置為 true,將允許主線程或者程序僅在守護線程處于活動狀態時才能夠退出。這種方式創建了一種簡單的方式以控制程序流程,因為在退出之前,您可以對隊列執行 join 操作、或者等到隊列為空。隊列模塊文檔詳細說明了實際的處理過程,請參見參考資料:
??? join()
??? 保持阻塞狀態,直到處理了隊列中的所有項目為止。在將一個項目添加到該隊列時,未完成的任務的總數就會增加。當使用者線程調用 task_done() 以表示檢索了該項目、并完成了所有的工作時,那么未完成的任務的總數就會減少。當未完成的任務的總數減少到零時,join() 就會結束阻塞狀態。
使用多個隊列
因為上面介紹的模式非常有效,所以可以通過連接附加線程池和隊列來進行擴展,這是相當簡單的。在上面的示例中,您僅僅輸出了 Web 頁面的開始部分。而下一個示例則將返回各線程獲取的完整 Web 頁面,然后將結果放置到另一個隊列中。然后,對加入到第二個隊列中的另一個線程池進行設置,然后對 Web 頁面執行相應的處理。這個示例中所進行的工作包括使用一個名為 Beautiful Soup 的第三方 Python 模塊來解析 Web 頁面。使用這個模塊,您只需要兩行代碼就可以提取所訪問的每個頁面的 title 標記,并將其打印輸出。
多隊列數據挖掘網站
import Queue import threading import urllib2 import time from BeautifulSoup import BeautifulSoup hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com", "http://ibm.com", "http://apple.com"] queue = Queue.Queue() out_queue = Queue.Queue() class ThreadUrl(threading.Thread): """Threaded Url Grab""" def __init__(self, queue, out_queue): threading.Thread.__init__(self) self.queue = queue self.out_queue = out_queue def run(self): while True: #grabs host from queue host = self.queue.get() #grabs urls of hosts and then grabs chunk of webpage url = urllib2.urlopen(host) chunk = url.read() #place chunk into out queue self.out_queue.put(chunk) #signals to queue job is done self.queue.task_done() class DatamineThread(threading.Thread): """Threaded Url Grab""" def __init__(self, out_queue): threading.Thread.__init__(self) self.out_queue = out_queue def run(self): while True: #grabs host from queue chunk = self.out_queue.get() #parse the chunk soup = BeautifulSoup(chunk) print soup.findAll(['title']) #signals to queue job is done self.out_queue.task_done() start = time.time() def main(): #spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadUrl(queue, out_queue) t.setDaemon(True) t.start() #populate queue with data for host in hosts: queue.put(host) for i in range(5): dt = DatamineThread(out_queue) dt.setDaemon(True) dt.start() #wait on the queue until everything has been processed queue.join() out_queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
如果運行腳本的這個版本,您將得到下面的輸出:
# python url_fetch_threaded_part2.py [ ] [ ] [ ] [ ] [ ] Elapsed Time: 3.75387597084
分析這段代碼時您可以看到,我們添加了另一個隊列實例,然后將該隊列傳遞給第一個線程池類 ThreadURL。接下來,對于另一個線程池類 DatamineThread,幾乎復制了完全相同的結構。在這個類的 run 方法中,從隊列中的各個線程獲取 Web 頁面、文本塊,然后使用 Beautiful Soup 處理這個文本塊。在這個示例中,使用 Beautiful Soup 提取每個頁面的 title 標記、并將其打印輸出。可以很容易地將這個示例推廣到一些更有價值的應用場景,因為您掌握了基本搜索引擎或者數據挖掘工具的核心內容。一種思想是使用 Beautiful Soup 從每個頁面中提取鏈接,然后按照它們進行導航。
總結
本文研究了 Python 的線程,并且說明了如何使用隊列來降低復雜性和減少細微的錯誤、并提高代碼可讀性的最佳實踐。盡管這個基本模式比較簡單,但可以通過將隊列和線程池連接在一起,以便將這個模式用于解決各種各樣的問題。在最后的部分中,您開始研究如何創建更復雜的處理管道,它可以用作未來項目的模型。參考資料部分提供了很多有關常規并發性和線程的極好的參考資料。
最后,還有很重要的一點需要指出,線程并不能解決所有的問題,對于許多情況,使用進程可能更為合適。特別是,當您僅需要創建許多子進程并對響應進行偵聽時,那么標準庫子進程模塊可能使用起來更加容易。有關更多的官方說明文檔,請參考參考資料部分。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
