亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Python多進程與多線程編程及GIL詳解

系統 1580 0

介紹如何使用python的multiprocess和threading模塊進行多線程和多進程編程。

Python的多進程編程與multiprocess模塊

python的多進程編程主要依靠multiprocess模塊。我們先對比兩段代碼,看看多進程編程的優勢。我們模擬了一個非常耗時的任務,計算8的20次方,為了使這個任務顯得更耗時,我們還讓它sleep 2秒。第一段代碼是單進程計算(代碼如下所示),我們按順序執行代碼,重復計算2次,并打印出總共耗時。

            
              import
            
            
               time

            
            
              import
            
            
               os

            
            
              def
            
            
               long_time_task():
    
            
            
              print
            
            (
            
              '
            
            
              當前進程: {}
            
            
              '
            
            
              .format(os.getpid()))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))


            
            
              if
            
            
              __name__
            
             == 
            
              "
            
            
              __main__
            
            
              "
            
            
              :
    
            
            
              print
            
            (
            
              '
            
            
              當前母進程: {}
            
            
              '
            
            
              .format(os.getpid()))
    start 
            
            =
            
               time.time()
    
            
            
              for
            
             i 
            
              in
            
             range(2
            
              ):
        long_time_task()

    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              用時{}秒
            
            
              "
            
            .format((end-start)))
          

輸出結果如下,總共耗時4秒,至始至終只有一個進程14236。看來電腦計算8的20次方基本不費時。

          當前母進程: 14236
          
當前進程: 14236
結果: 1152921504606846976
當前進程: 14236
結果: 1152921504606846976
用時4.01080060005188秒

第2段代碼是多進程計算代碼。我們利用multiprocess模塊的Process方法創建了兩個新的進程p1和p2來進行并行計算。Process方法接收兩個參數, 第一個是target,一般指向函數名,第二個時args,需要向函數傳遞的參數。對于創建的新進程,調用start()方法即可讓其開始。我們可以使用os.getpid()打印出當前進程的名字。

            
              from
            
             multiprocessing 
            
              import
            
            
               Process

            
            
              import
            
            
               os

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task(i):
    
            
            
              print
            
            (
            
              '
            
            
              子進程: {} - 任務{}
            
            
              '
            
            
              .format(os.getpid(), i))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    
            
            
              print
            
            (
            
              '
            
            
              當前母進程: {}
            
            
              '
            
            
              .format(os.getpid()))
    start 
            
            =
            
               time.time()
    p1 
            
            = Process(target=long_time_task, args=(1
            
              ,))
    p2 
            
            = Process(target=long_time_task, args=(2
            
              ,))
    
            
            
              print
            
            (
            
              '
            
            
              等待所有子進程完成。
            
            
              '
            
            
              )
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時{}秒
            
            
              "
            
            .format((end - start)))
          

輸出結果如下所示,耗時變為2秒,時間減了一半,可見并發執行的時間明顯比順序執行要快很多。你還可以看到盡管我們只創建了兩個進程,可實際運行中卻包含里1個母進程和2個子進程。之所以我們使用join()方法就是為了讓母進程阻塞,等待子進程都完成后才打印出總共耗時,否則輸出時間只是母進程執行的時間。

          當前母進程: 6920
          
等待所有子進程完成。
子進程: 17020 - 任務1
子進程: 5904 - 任務2
結果: 1152921504606846976
結果: 1152921504606846976
總共用時2.131091356277466秒

知識點:

  • 新創建的進程與進程的切換都是要耗資源的,所以平時工作中進程數不能開太大。

  • 同時可以運行的進程數一般受制于CPU的核數。

  • 除了使用Process方法,我們還可以使用Pool類創建多進程。

?

利用multiprocess模塊的Pool類創建多進程

很多時候系統都需要創建多個進程以提高CPU的利用率,當數量較少時,可以手動生成一個個Process實例。當進程數量很多時,或許可以利用循環,但是這需要程序員手動管理系統中并發進程的數量,有時會很麻煩。這時進程池Pool就可以發揮其功效了。可以通過傳遞參數限制并發進程的數量,默認值為CPU的核數。?

Pool類可以提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,如果進程池還沒有滿,就會創建一個新的進程來執行請求。如果池滿,請求就會告知先等待,直到池中有進程結束,才會創建新的進程來執行這些請求。?

下面介紹一下multiprocessing 模塊下的Pool類的幾個方法:

1.apply_async

函數原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

其作用是向進程池提交需要執行的函數及參數,?各個進程采用非阻塞(異步)的調用方式,即每個子進程只管運行自己的,不管其它進程是否已經完成。

2.map()

函數原型:map(func, iterable[, chunksize=None])

Pool類中的map方法,與內置的map函數用法行為基本一致,它會使進程阻塞直到結果返回。?注意:雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。

3.map_async()

函數原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的。其有關事項見apply_async。

4.close()

關閉進程池(pool),使其不在接受新的任務。

5. terminate()

結束工作進程,不在處理未處理的任務。

6.join()

主進程阻塞等待子進程的退出, join方法要在close或terminate之后使用。

?

下例是一個簡單的multiprocessing.Pool類的實例。因為小編我的CPU是4核的,一次最多可以同時運行4個進程,所以我開啟了一個容量為4的進程池。4個進程需要計算5次,你可以想象4個進程并行4次計算任務后,還剩一次計算任務(任務4)沒有完成,系統會等待4個進程完成后重新安排一個進程來計算。

            
              from
            
             multiprocessing 
            
              import
            
            
               Pool, cpu_count

            
            
              import
            
            
               os

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task(i):
    
            
            
              print
            
            (
            
              '
            
            
              子進程: {} - 任務{}
            
            
              '
            
            
              .format(os.getpid(), i))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    
            
            
              print
            
            (
            
              "
            
            
              CPU內核數:{}
            
            
              "
            
            
              .format(cpu_count()))
    
            
            
              print
            
            (
            
              '
            
            
              當前母進程: {}
            
            
              '
            
            
              .format(os.getpid()))
    start 
            
            =
            
               time.time()
    p 
            
            = Pool(4
            
              )
    
            
            
              for
            
             i 
            
              in
            
             range(5
            
              ):
        p.apply_async(long_time_task, args
            
            =
            
              (i,))
    
            
            
              print
            
            (
            
              '
            
            
              等待所有子進程完成。
            
            
              '
            
            
              )
    p.close()
    p.join()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時{}秒
            
            
              "
            
            .format((end - start)))
          

知識點: ?

  • 對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close()或terminate()方法,讓其不再接受新的Process了。

?輸出結果如下所示,5個任務(每個任務大約耗時2秒)使用多進程并行計算只需4.37秒,, 耗時減少了60%。

          CPU內核數:4
          
當前母進程: 2556
等待所有子進程完成。
子進程: 16480 - 任務0
子進程: 15216 - 任務1
子進程: 15764 - 任務2
子進程: 10176 - 任務3
結果: 1152921504606846976
結果: 1152921504606846976
子進程: 15216 - 任務4
結果: 1152921504606846976
結果: 1152921504606846976
結果: 1152921504606846976
總共用時4.377134561538696秒

?相信大家都知道python解釋器中存在GIL(全局解釋器鎖), 它的作用就是保證同一時刻只有一個線程可以執行代碼。由于GIL的存在,很多人認為python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。然而這并意味著python多線程編程沒有意義哦,請繼續閱讀下文。

? ?多進程間的數據共享與通信

?通常,進程之間是相互獨立的,每個進程都有獨立的內存。通過共享內存(nmap模塊),進程之間可以共享對象,使多個進程可以訪問同一個變量(地址相同,變量名可能不同)。多進程共享資源必然會導致進程間相互競爭,所以應該盡最大可能防止使用共享狀態。還有一種方式就是使用隊列queue來實現不同進程間的通信或數據共享,這一點和多線程編程類似。

            
              from
            
             multiprocessing 
            
              import
            
            
               Process, Queue

            
            
              import
            
            
               os, time, random

            
            
              #
            
            
               寫數據進程執行的代碼:
            
            
              def
            
            
               write(q):
    
            
            
              print
            
            (
            
              '
            
            
              Process to write: {}
            
            
              '
            
            
              .format(os.getpid()))
    
            
            
              for
            
             value 
            
              in
            
             [
            
              '
            
            
              A
            
            
              '
            
            , 
            
              '
            
            
              B
            
            
              '
            
            , 
            
              '
            
            
              C
            
            
              '
            
            
              ]:
        
            
            
              print
            
            (
            
              '
            
            
              Put %s to queue...
            
            
              '
            
             %
            
               value)
        q.put(value)
        time.sleep(random.random())

            
            
              #
            
            
               讀數據進程執行的代碼:
            
            
              def
            
            
               read(q):
    
            
            
              print
            
            (
            
              '
            
            
              Process to read:{}
            
            
              '
            
            
              .format(os.getpid()))
    
            
            
              while
            
            
               True:
        value 
            
            =
            
               q.get(True)
        
            
            
              print
            
            (
            
              '
            
            
              Get %s from queue.
            
            
              '
            
             %
            
               value)

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    
            
            
              #
            
            
               父進程創建Queue,并傳給各個子進程:
            
            
   q =
            
               Queue()
    pw 
            
            = Process(target=write, args=
            
              (q,))
    pr 
            
            = Process(target=read, args=
            
              (q,))
    
            
            
              #
            
            
               啟動子進程pw,寫入:
            
            
                  pw.start()
    
            
            
              #
            
            
               啟動子進程pr,讀取:
            
            
                  pr.start()
    
            
            
              #
            
            
               等待pw結束:
            
            
                  pw.join()
    
            
            
              #
            
            
               pr進程里是死循環,無法等待其結束,只能強行終止:
            
            
    pr.terminate()
          

下例這段代碼中中創建了2個獨立進程,一個負責寫(pw), 一個負責讀(pr), 實現了共享一個隊列queue。

輸出結果如下所示:

          Process to write: 3036
          
Put A to queue...
Process to read:9408
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

? Python的多線程編程與threading模塊

?python 3中的多進程編程主要依靠threading模塊。創建新線程與創建新進程的方法非常類似。threading.Thread方法可以接收兩個參數, 第一個是target,一般指向函數名,第二個時args,需要向函數傳遞的參數。對于創建的新線程,調用start()方法即可讓其開始。我們還可以使用current_thread().name打印出當前線程的名字。 下例中我們使用多線程技術重構之前的計算代碼。

            
              import
            
            
               threading

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task(i):
    
            
            
              print
            
            (
            
              '
            
            
              當前子線程: {} - 任務{}
            
            
              '
            
            
              .format(threading.current_thread().name, i))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    start 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              '
            
            
              這是主線程:{}
            
            
              '
            
            
              .format(threading.current_thread().name))
    t1 
            
            = threading.Thread(target=long_time_task, args=(1
            
              ,))
    t2 
            
            = threading.Thread(target=long_time_task, args=(2
            
              ,))
    t1.start()
    t2.start()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時{}秒
            
            
              "
            
            .format((end -
            
               start)))
              

下面是輸出結果。為什么總耗時居然是0秒??我們可以明顯看到主線程和子線程其實是獨立運行的,主線程根本沒有等子線程完成,而是自己結束后就打印了消耗時間。主線程結束后,子線程仍在獨立運行,這顯然不是我們想要的。

          這是主線程:MainThread
          
當前子線程: Thread-1 - 任務1
當前子線程: Thread-2 - 任務2
總共用時0.0017192363739013672秒
結果: 1152921504606846976
結果: 1152921504606846976

如果要實現主線程和子線程的同步,我們必需使用join方法(代碼如下所示)。

            
              import
            
            
               threading

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task(i):
    
            
            
              print
            
            (
            
              '
            
            
              當前子線程: {} 任務{}
            
            
              '
            
            
              .format(threading.current_thread().name, i))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    start 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              '
            
            
              這是主線程:{}
            
            
              '
            
            
              .format(threading.current_thread().name))
    thread_list 
            
            =
            
               []
    
            
            
              for
            
             i 
            
              in
            
             range(1, 3
            
              ):
        t 
            
            = threading.Thread(target=long_time_task, args=
            
              (i, ))
        thread_list.append(t)
    
            
            
              for
            
             t 
            
              in
            
            
               thread_list:
        t.start()
    
            
            
              for
            
             t 
            
              in
            
            
               thread_list:
        t.join()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時{}秒
            
            
              "
            
            .format((end - start)))
          

修改代碼后的輸出如下所示。這時你可以看到主線程在等子線程完成后才答應出總消耗時間(2秒),比正常順序執行代碼(4秒)還是節省了不少時間。

          這是主線程:MainThread
          
當前子線程: Thread - 1 任務1
當前子線程: Thread - 2 任務2
結果: 1152921504606846976
結果: 1152921504606846976
總共用時2.0166890621185303秒

當我們設置多線程時,主線程會創建多個子線程,在python中,默認情況下主線程和子線程獨立運行互不干涉。如果希望讓主線程等待子線程實現線程的同步,我們需要使用join()方法。如果我們希望一個主線程結束時不再執行子線程,我們應該怎么辦呢? 我們可以使用t.setDaemon(True),代碼如下所示。

            
              import
            
            
               threading

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task():
    
            
            
              print
            
            (
            
              '
            
            
              當子線程: {}
            
            
              '
            
            
              .format(threading.current_thread().name))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    start 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              '
            
            
              這是主線程:{}
            
            
              '
            
            
              .format(threading.current_thread().name))
    
            
            
              for
            
             i 
            
              in
            
             range(5
            
              ):
        t 
            
            = threading.Thread(target=long_time_task, args=
            
              ())
        t.setDaemon(True)
        t.start()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時{}秒
            
            
              "
            
            .format((end - start)))
          

通過繼承Thread類重寫run方法創建新線程

?除了使用Thread()方法創建新的線程外,我們還可以通過繼承Thread類重寫run方法創建新的線程,這種方法更靈活。下例中我們自定義的類為MyThread, 隨后我們通過該類的實例化創建了2個子線程。

            
              #
            
            
              -*- encoding:utf-8 -*-
            
            
              import
            
            
               threading

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task(i):
    time.sleep(
            
            2
            
              )
    
            
            
              return
            
             8**20

            
              class
            
            
               MyThread(threading.Thread):
    
            
            
              def
            
            
              __init__
            
            (self, func, args , name=
            
              ''
            
            
              , ):
        threading.Thread.
            
            
              __init__
            
            
              (self)
        self.func 
            
            =
            
               func
        self.args 
            
            =
            
               args
        self.name 
            
            =
            
               name
        self.result 
            
            =
            
               None
    
            
            
              def
            
            
               run(self):
        
            
            
              print
            
            (
            
              '
            
            
              開始子進程{}
            
            
              '
            
            
              .format(self.name))
        self.result 
            
            =
            
               self.func(self.args[0],)
        
            
            
              print
            
            (
            
              "
            
            
              結果: {}
            
            
              "
            
            
              .format(self.result))
        
            
            
              print
            
            (
            
              '
            
            
              結束子進程{}
            
            
              '
            
            
              .format(self.name))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    start 
            
            =
            
               time.time()
    threads 
            
            =
            
               []
    
            
            
              for
            
             i 
            
              in
            
             range(1, 3
            
              ):
        t 
            
            =
            
               MyThread(long_time_task, (i,), str(i))
        threads.append(t)
    
            
            
              for
            
             t 
            
              in
            
            
               threads:
        t.start()
    
            
            
              for
            
             t 
            
              in
            
            
               threads:
        t.join()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時{}秒
            
            
              "
            
            .format((end - start)))
          

輸出結果如下所示:

          開始子進程1
          
開始子進程2
結果: 1152921504606846976
結果: 1152921504606846976
結束子進程1
結束子進程2
總共用時2.005445718765259秒

? 不同線程間的數據共享

一個進程所含的不同線程間共享內存,這就意味著任何一個變量都可以被任何一個線程修改,因此線程之間共享數據最大的危險在于多個線程同時改一個變量,把內容給改亂了。如果不同線程間有共享的變量,其中一個方法就是在修改前給其上一把鎖lock,確保一次只有一個線程能修改它。threading.lock()方法可以輕易實現對一個共享變量的鎖定,修改完后release供其它線程使用。比如下例中賬戶余額balance是一個共享變量,使用lock可以使其不被改亂。

            
              #
            
            
               -*- coding: utf-8 -*
            
            
              import
            
            
               threading

            
            
              class
            
            
               Account:
    
            
            
              def
            
            
              __init__
            
            
              (self):
        self.balance 
            
            =
            
               0
    
            
            
              def
            
            
               add(self, lock):
        
            
            
              #
            
            
               獲得鎖
            
            
                      lock.acquire()
        
            
            
              for
            
             i 
            
              in
            
             range(0, 100000
            
              ):
            self.balance 
            
            += 1
        
            
              #
            
            
               釋放鎖
            
            
                      lock.release()
    
            
            
              def
            
            
               delete(self, lock):
        
            
            
              #
            
            
               獲得鎖
            
            
                      lock.acquire()
        
            
            
              for
            
             i 
            
              in
            
             range(0, 100000
            
              ):
            self.balance 
            
            -= 1
            
            
              #
            
            
               釋放鎖
            
            
                      lock.release()

            
            
              if
            
            
              __name__
            
             == 
            
              "
            
            
              __main__
            
            
              "
            
            
              :
    account 
            
            =
            
               Account()
    lock 
            
            =
            
               threading.Lock()
    
            
            
              #
            
            
               創建線程
            
            
   thread_add = threading.Thread(target=account.add, args=(lock,), name=
            
              '
            
            
              Add
            
            
              '
            
            
              )
    thread_delete 
            
            = threading.Thread(target=account.delete, args=(lock,), name=
            
              '
            
            
              Delete
            
            
              '
            
            
              )
    
            
            
              #
            
            
               啟動線程
            
            
                 thread_add.start()
    thread_delete.start()
    
            
            
              #
            
            
               等待線程結束
            
            
                 thread_add.join()
    thread_delete.join()
    
            
            
              print
            
            (
            
              '
            
            
              The final balance is: {}
            
            
              '
            
            .format(account.balance))
          

?

另一種實現不同線程間數據共享的方法就是使用消息隊列queue。不像列表,queue是線程安全的,可以放心使用,見下文。

? 使用queue隊列通信-經典的生產者和消費者模型

下例中創建了兩個線程,一個負責生成,一個負責消費,所生成的產品存放在queue里,實現了不同線程間溝通。

            
              from
            
             queue 
            
              import
            
            
               Queue

            
            
              import
            
            
               random, threading, time

            
            
              #
            
            
               生產者類
            
            
              class
            
            
               Producer(threading.Thread):
    
            
            
              def
            
            
              __init__
            
            
              (self, name, queue):
        threading.Thread.
            
            
              __init__
            
            (self, name=
            
              name)
        self.queue 
            
            =
            
               queue
    
            
            
              def
            
            
               run(self):
        
            
            
              for
            
             i 
            
              in
            
             range(1, 5
            
              ):
            
            
            
              print
            
            (
            
              "
            
            
              {} is producing {} to the queue!
            
            
              "
            
            
              .format(self.getName(), i))
            self.queue.put(i)
            time.sleep(random.randrange(
            
            10) / 5
            
              )
        
            
            
              print
            
            (
            
              "
            
            
              %s finished!
            
            
              "
            
             %
            
               self.getName())

            
            
              #
            
            
               消費者類
            
            
              class
            
            
               Consumer(threading.Thread):
    
            
            
              def
            
            
              __init__
            
            
              (self, name, queue):
        threading.Thread.
            
            
              __init__
            
            (self, name=
            
              name)
        self.queue 
            
            =
            
               queue

    
            
            
              def
            
            
               run(self):
        
            
            
              for
            
             i 
            
              in
            
             range(1, 5
            
              ):
            val 
            
            =
            
               self.queue.get()
            
            
            
              print
            
            (
            
              "
            
            
              {} is consuming {} in the queue.
            
            
              "
            
            
              .format(self.getName(), val))
            time.sleep(random.randrange(
            
            10
            
              ))
        
            
            
              print
            
            (
            
              "
            
            
              %s finished!
            
            
              "
            
             %
            
               self.getName())

            
            
              def
            
            
               main():
    queue 
            
            =
            
               Queue()
    producer 
            
            = Producer(
            
              '
            
            
              Producer
            
            
              '
            
            
              , queue)
    consumer 
            
            = Consumer(
            
              '
            
            
              Consumer
            
            
              '
            
            
              , queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    
            
            
              print
            
            (
            
              '
            
            
              All threads finished!
            
            
              '
            
            
              )

            
            
              if
            
            
              __name__
            
             == 
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    main()
            
          

隊列queue的put方法可以將一個對象obj放入隊列中。如果隊列已滿,此方法將阻塞至隊列有空間可用為止。queue的get方法一次返回隊列中的一個成員。如果隊列為空,此方法將阻塞至隊列中有成員可用為止。queue同時還自帶emtpy(), full()等方法來判斷一個隊列是否為空或已滿,但是這些方法并不可靠,因為多線程和多進程,在返回結果和使用結果之間,隊列中可能添加/刪除了成員。

? Python多進程和多線程哪個快?

?由于GIL的存在,很多人認為Python多進程編程更快,針對多核CPU,理論上來說也是采用多進程更能有效利用資源。網上很多人已做過比較,我直接告訴你結論吧。

  • 對CPU密集型代碼(比如循環計算) - 多進程效率更高

  • 對IO密集型代碼(比如文件操作,網絡爬蟲) - 多線程效率更高。?

為什么是這樣呢?其實也不難理解。對于IO密集型操作,大部分消耗時間其實是等待時間,在等待時間中CPU是不需要工作的,那你在此期間提供雙CPU資源也是利用不上的,相反對于CPU密集型代碼,2個CPU干活肯定比一個CPU快很多。那么為什么多線程會對IO密集型代碼有用呢?這因是為python碰到等待會釋放GIL供新的線程使用,實現了線程間的切換。

?

GIL是什么

首先需要明確的一點是GIL并不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標準,但是可以用不同的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念里CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。所以這里要先明確一點:GIL并不是Python的特性,Python完全可以不依賴于GIL。

GIL:? 一個防止多線程并發執行機器碼的一個Mutex,乍一看就是個BUG般存在的全局鎖嘛!別急,我們下面慢慢的分析。

為什么會有GIL

由于物理上得限制,各CPU廠商在核心頻率上的比賽已經被多核所取代。為了更有效的利用多核處理器的性能,就出現了多線程的編程方式,而隨之帶來的就是線程間數據一致性和狀態同步的困難。即使在CPU內部的Cache也不例外,為了有效解決多份緩存之間的數據同步時各廠商花費了不少心思,也不可避免的帶來了一定的性能損失。

Python當然也逃不開,為了利用多核,Python開始支持多線程。而解決多線程之間數據完整性和狀態同步的最簡單方法自然就是加鎖。 于是有了GIL這把超級大鎖,而當越來越多的代碼庫開發者接受了這種設定后,他們開始大量依賴這種特性(即默認python內部對象是thread-safe的,無需在實現時考慮額外的內存鎖和同步操作)。

慢慢的這種實現方式被發現是蛋疼且低效的。但當大家試圖去拆分和去除GIL的時候,發現大量庫代碼開發者已經重度依賴GIL而非常難以去除了。有多難?做個類比,像MySQL這樣的“小項目”為了把Buffer Pool Mutex這把大鎖拆分成各個小鎖也花了從5.5到5.6再到5.7多個大版為期近5年的時間,本且仍在繼續。MySQL這個背后有公司支持且有固定開發團隊的產品走的如此艱難,那又更何況Python這樣核心開發和代碼貢獻者高度社區化的團隊呢?

所以簡單的說GIL的存在更多的是歷史原因。如果推到重來,多線程的問題依然還是要面對,但是至少會比目前GIL這種方式會更優雅。

? GIL的影響

從上文的介紹和官方的定義來看,GIL無疑就是一把全局排他鎖。毫無疑問全局鎖的存在會對多線程的效率有不小影響。甚至就幾乎等于Python是個單線程的程序。
那么讀者就會說了,全局鎖只要釋放的勤快效率也不會差啊。只要在進行耗時的IO操作的時候,能釋放GIL,這樣也還是可以提升運行效率的嘛。或者說再差也不會比單線程的效率差吧。理論上是這樣,而實際上呢?Python比你想的更糟。

下面我們就對比下Python在多線程和單線程下得效率對比。測試方法很簡單,一個循環1億次的計數器函數。一個通過單線程執行兩次,一個多線程執行。最后比較執行總時間。測試環境為雙核的Mac pro。注:為了減少線程庫本身性能損耗對測試結果帶來的影響,這里單線程的代碼同樣使用了線程。只是順序的執行兩次,模擬單線程。

順序執行的單線程(single_thread.py)

                    
                      #
                    
                    
                      ! /usr/bin/python
                    
                    
                      from
                    
                     threading 
                    
                      import
                    
                    
                       Thread

                    
                    
                      import
                    
                    
                       time
 

                    
                    
                      def
                    
                    
                       my_counter():
    i 
                    
                    =
                    
                       0
    
                    
                    
                      for
                    
                     _ 
                    
                      in
                    
                     range(100000000
                    
                      ):
        i 
                    
                    = i + 1
    
                    
                      return
                    
                    
                       True
 

                    
                    
                      def
                    
                    
                       main():
    thread_array 
                    
                    =
                    
                       {}
    start_time 
                    
                    =
                    
                       time.time()
    
                    
                    
                      for
                    
                     tid 
                    
                      in
                    
                     range(2
                    
                      ):
        t 
                    
                    = Thread(target=
                    
                      my_counter)
        t.start()
        t.join()
    end_time 
                    
                    =
                    
                       time.time()
    
                    
                    
                      print
                    
                    (
                    
                      "
                    
                    
                      Total time: {}
                    
                    
                      "
                    
                    .format(end_time -
                    
                       start_time))
 

                    
                    
                      if
                    
                    
                      __name__
                    
                     == 
                    
                      '
                    
                    
                      __main__
                    
                    
                      '
                    
                    
                      :
    main()
                    
                  

同時執行的兩個并發線程(multi_thread.py)

                    
                      #
                    
                    
                      ! /usr/bin/python
                    
                    
                      from
                    
                     threading 
                    
                      import
                    
                    
                       Thread

                    
                    
                      import
                    
                    
                       time
 

                    
                    
                      def
                    
                    
                       my_counter():
    i 
                    
                    =
                    
                       0
    
                    
                    
                      for
                    
                     _ 
                    
                      in
                    
                     range(100000000
                    
                      ):
        i 
                    
                    = i + 1
    
                    
                      return
                    
                    
                       True
 

                    
                    
                      def
                    
                    
                       main():
    thread_array 
                    
                    =
                    
                       {}
    start_time 
                    
                    =
                    
                       time.time()
    
                    
                    
                      for
                    
                     tid 
                    
                      in
                    
                     range(2
                    
                      ):
        t 
                    
                    = Thread(target=
                    
                      my_counter)
        t.start()
        thread_array[tid] 
                    
                    =
                    
                       t
    
                    
                    
                      for
                    
                     i 
                    
                      in
                    
                     range(2
                    
                      ):
        thread_array[i].join()
    end_time 
                    
                    =
                    
                       time.time()
    
                    
                    
                      print
                    
                    (
                    
                      "
                    
                    
                      Total time: {}
                    
                    
                      "
                    
                    .format(end_time -
                    
                       start_time))
 

                    
                    
                      if
                    
                    
                      __name__
                    
                     == 
                    
                      '
                    
                    
                      __main__
                    
                    
                      '
                    
                    
                      :
    main()
                    
                  

python在多線程的情況下居然比單線程整整慢了45%。按照之前的分析,即使是有GIL全局鎖的存在,串行化的多線程也應該和單線程有一樣的效率才對。那么怎么會有這么糟糕的結果呢?

讓我們通過GIL的實現原理來分析這其中的原因。

當前GIL設計的缺陷

基于pcode數量的調度方式

按照Python社區的想法,操作系統本身的線程調度已經非常成熟穩定了,沒有必要自己搞一套。所以Python的線程就是C語言的一個pthread,并通過操作系統調度算法進行調度(例如linux是CFS)。為了讓各個線程能夠平均利用CPU時間,python會計算當前已執行的微代碼數量,達到一定閾值后就強制釋放GIL。而這時也會觸發一次操作系統的線程調度(當然是否真正進行上下文切換由操作系統自主決定)。

偽代碼

                    
                      while
                    
                    
                       True:
    acquire GIL
    
                    
                    
                      for
                    
                     i 
                    
                      in
                    
                     1000
                    
                      :
        do something
    release GIL
    
                    
                    /* Give Operating System a chance to do thread scheduling */
                  

這種模式在只有一個CPU核心的情況下毫無問題。任何一個線程被喚起時都能成功獲得到GIL(因為只有釋放了GIL才會引發線程調度)。但當CPU有多個核心的時候,問題就來了。從偽代碼可以看到,從release GIL到acquire GIL之間幾乎是沒有間隙的。所以當其他在其他核心上的線程被喚醒時,大部分情況下主線程已經又再一次獲取到GIL了。這個時候被喚醒執行的線程只能白白的浪費CPU時間,看著另一個線程拿著GIL歡快的執行著。然后達到切換時間后進入待調度狀態,再被喚醒,再等待,以此往復惡性循環。

PS:當然這種實現方式是原始而丑陋的,Python的每個版本中也在逐漸改進GIL和線程調度之間的互動關系。例如先嘗試持有GIL在做線程上下文切換,在IO等待時釋放GIL等嘗試。但是無法改變的是GIL的存在使得操作系統線程調度的這個本來就昂貴的操作變得更奢侈了。
關于GIL影響的擴展閱讀

為了直觀的理解GIL對于多線程帶來的性能影響,這里直接借用的一張測試結果圖(見下圖)。圖中表示的是兩個線程在雙核CPU上得執行情況。兩個線程均為CPU密集型運算線程。綠色部分表示該線程在運行,且在執行有用的計算,紅色部分為線程被調度喚醒,但是無法獲取GIL導致無法進行有效運算等待的時間。

由圖可見,GIL的存在導致多線程無法很好的立即多核CPU的并發處理能力。

那么Python的IO密集型線程能否從多線程中受益呢?我們來看下面這張測試結果。顏色代表的含義和上圖一致。白色部分表示IO線程處于等待。可見,當IO線程收到數據包引起終端切換后,仍然由于一個CPU密集型線程的存在,導致無法獲取GIL鎖,從而進行無盡的循環等待。
Python多進程與多線程編程及GIL詳解_第1張圖片

簡單的總結下就是:Python的多線程在多核CPU上,只對于IO密集型計算產生正面效果;而當有至少有一個CPU密集型線程存在,那么多線程效率會由于GIL而大幅下降。

如何避免受到GIL的影響

說了那么多,如果不說解決方案就僅僅是個科普帖,然并卵。GIL這么爛,有沒有辦法繞過呢?我們來看看有哪些現成的方案。

用multiprocess替代Thread

multiprocess庫的出現很大程度上是為了彌補thread庫因為GIL而低效的缺陷。它完整的復制了一套thread所提供的接口方便遷移。唯一的不同就是它使用了多進程而不是多線程。每個進程有自己的獨立的GIL,因此也不會出現進程之間的GIL爭搶。

當然multiprocess也不是萬能良藥。它的引入會增加程序實現時線程間數據通訊和同步的困難。就拿計數器來舉例子,如果我們要多個線程累加同一個變量,對于thread來說,申明一個global變量,用thread.Lock的context包裹住三行就搞定了。而multiprocess由于進程之間無法看到對方的數據,只能通過在主線程申明一個Queue,put再get或者用share memory的方法。這個額外的實現成本使得本來就非常痛苦的多線程程序編碼,變得更加痛苦了。

用其他解析器

之前也提到了既然GIL只是CPython的產物,那么其他解析器是不是更好呢?沒錯,像JPython和IronPython這樣的解析器由于實現語言的特性,他們不需要GIL的幫助。然而由于用了Java/C#用于解析器實現,他們也失去了利用社區眾多C語言模塊有用特性的機會。所以這些解析器也因此一直都比較小眾。畢竟功能和性能大家在初期都會選擇前者,Done is better than perfect。

所以沒救了么?

當然Python社區也在非常努力的不斷改進GIL,甚至是嘗試去除GIL。并在各個小版本中有了不少的進步。

另一個改進Reworking the GIL
– 將切換顆粒度從基于opcode計數改成基于時間片計數
– 新增線程優先級功能(高優先級線程可以迫使其他線程釋放所持有的GIL鎖)
– 避免最近一次釋放GIL鎖的線程再次被立即調度

總結

Python GIL其實是功能和性能之間權衡后的產物,它尤其存在的合理性,也有較難改變的客觀因素。從本問的分析中,我們可以做以下一些簡單的總結:

      • 因為GIL的存在,只有IO Bound場景下得多線程會得到較好的性能
      • 如果對并行計算性能較高的程序可以考慮把核心部分也成C模塊,或者索性用其他語言實現
      • GIL在較長一段時間內將會繼續存在,但是會不斷對其進行改進
參考資料:

一文看懂Python多進程與多線程編程

python中的GIL詳解


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 亚洲国产精品aa在线看 | 欧美特级毛片aaaa | 精品99视频 | 国产乱人视频在线播放不卡 | 国产精品久久毛片蜜月 | 久久综合国产 | 色综合久久综合欧美综合 | 四虎视频国产精品免费入口 | 欧美日韩亚洲国产 | 欧美激情级毛片 | 色老头成人免费视频天天综合 | 亚洲国产高清美女在线观看 | 2021国产成人综合亚洲精品 | 欧美另类久久久精品 | 亚洲国产视频在线 | 久久精品国产一区二区 | 99视频在线精品免费 | 久久久成人啪啪免费网站 | 欧美日韩中字 | 不卡网站| 快播激情| 亚洲va精品中文字幕动漫 | 欧美日韩中文视频 | 日韩经典欧美精品一区 | 狠狠丁香激情久久综合 | 欧美xxx网站 | 久久国产欧美另类久久久 | 91香蕉网站 | aaaa一级片| 久久久久久久久中文字幕 | 日本aaaa级毛片在线看 | 女女女女女女bbbbbb级毛片 | 国产成人麻豆精品 | 欧美精品久久久久久久免费观看 | 天天天天躁天天天天碰 | 国产精品久久久久久久久 | 五月天亚洲 | 97国产在线视频公开免费 | 免费一级a毛片夜夜看 | 婷婷伊人五月 | 97在线碰碰观看免费高清 |