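Contents
- Python multiprocessing
  - Preface: multiprocessing
- 1. Process
  - Introduction to Process
  - Example 1.1: Create a function and run it as a single process
  - Example 1.2: Create functions and run them as multiple processes
  - Example 1.3: Define a process as a class
  - Example 1.4: Comparing the results with and without daemon
- 2. Lock
- 3. Semaphore
- 4. Event
- 5. Queue
- 6. Pipe
- 7. Pool
  - Example 7.1: Using a process pool (non-blocking)
  - Example 7.2: Using a process pool (blocking)
  - Example 7.3: Using a process pool and collecting the results
  - Example 7.4: Running several functions in one process pool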
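Python multiprocessing
Preface: multiprocessing
In CPython, the Global Interpreter Lock (GIL) lets only one thread at a time execute Python bytecode. Multithreading therefore cannot run CPU-bound Python code in parallel, and in most cases you need multiple processes to make full use of a multi-core CPU. Python provides an easy-to-use package for this, multiprocessing: you define a function and the package takes care of the rest. With it, turning a single-process program into a concurrent one is straightforward. multiprocessing supports child processes, communication and data sharing between processes, and several forms of synchronization, through components such as Process, Queue, Pipe and Lock.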
1. Process
Introduction to Process
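- Constructor: Process([group [, target [, name [, args [, kwargs]]]]]). target is the callable to run. args is the tuple of positional arguments passed to it, and kwargs is the dict of keyword arguments. name is the process name. group is not used and should always be None.
- Methods: is_alive(), join([timeout]), run(), start(), terminate(). A process is launched by calling start().
- Attributes: authkey, daemon, exitcode, name, pid. exitcode is None while the process is running; on Unix, a value of -N means the process was terminated by signal N. A daemon process is terminated automatically when its parent process exits, and it is not allowed to create child processes of its own. daemon must be set before start() is called.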
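The following minimal sketch makes the constructor arguments and attributes above concrete. It assumes Python 3.6+ for the f-strings, and the worker greet and its parameters are invented for illustration. The sketch passes both args and kwargs, names one process, sets daemon before start(), and reads exitcode after join().

```python
import multiprocessing
import sys


def greet(greeting, target="world", code=0):
    # Hypothetical worker: print a message, then exit with the given status code
    name = multiprocessing.current_process().name
    print(f"{greeting}, {target}! (from {name})")
    sys.exit(code)


def run_demo():
    ok = multiprocessing.Process(
        target=greet,
        name="greeter-ok",                     # name: custom process name
        args=("Hello",),                       # args: tuple of positional arguments
        kwargs={"target": "multiprocessing"},  # kwargs: dict of keyword arguments
    )
    failing = multiprocessing.Process(target=greet, args=("Bye",), kwargs={"code": 3})
    failing.daemon = True                      # daemon can only be changed before start()

    for p in (ok, failing):
        p.start()
    for p in (ok, failing):
        p.join()                               # exitcode stays None until the process ends

    return ok.name, ok.exitcode, failing.exitcode


if __name__ == "__main__":
    print(run_demo())  # ('greeter-ok', 0, 3)
```

The status passed to sys.exit() in the child becomes its exitcode. A normal return gives 0.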
Example 1.1: Create a function and run it as a single process
import multiprocessing
import time

def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())
------------------------------------------------
>>> p.pid: 1004
>>> p.name: Process-1
>>> p.is_alive: True
>>> The time is Mon Jul 29 21:31:11 2019
>>> The time is Mon Jul 29 21:31:14 2019
>>> The time is Mon Jul 29 21:31:17 2019
>>> The time is Mon Jul 29 21:31:20 2019
>>> The time is Mon Jul 29 21:31:23 2019
Example 1.2: Create functions and run them as multiple processes
import multiprocessing
import time

def worker_1(interval):
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")

def worker_2(interval):
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")

def worker_3(interval):
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker_1, args=(2,))
    p2 = multiprocessing.Process(target=worker_2, args=(3,))
    p3 = multiprocessing.Process(target=worker_3, args=(4,))
    p1.start()
    p2.start()
    p3.start()
    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END")
------------------------------------------------
>>> The number of CPU is:8
>>> child p.name:Process-3 p.id18208
>>> child p.name:Process-2 p.id1404
>>> child p.name:Process-1 p.id11684
>>> END
>>> worker_1
>>> worker_2
>>> worker_3
>>> end worker_1
>>> end worker_2
>>> end worker_3
Example 1.3: Define a process as a class
import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        # A subclass must call the base-class constructor before anything else
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()
------------------------------------------------
>>> the time is Mon Jul 29 21:43:07 2019
>>> the time is Mon Jul 29 21:43:10 2019
>>> the time is Mon Jul 29 21:43:13 2019
>>> the time is Mon Jul 29 21:43:16 2019
>>> the time is Mon Jul 29 21:43:19 2019
Note: when p.start() is called, run() is invoked automatically in the new process.
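You can check the note above directly. In this sketch, run() sends the PID it executes in back through a Queue. The class PidReporter is hypothetical, and the sketch assumes Python 3 for the zero-argument super(). Calling run() yourself executes it in the current process. start() executes it in a new one.

```python
import multiprocessing
import os


class PidReporter(multiprocessing.Process):
    # Hypothetical subclass: run() reports the PID of the process it executes in
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        self.queue.put(os.getpid())


def compare_pids():
    q = multiprocessing.Queue()

    direct = PidReporter(q)
    direct.run()                 # ordinary method call: runs in the current process
    pid_from_run = q.get()

    child = PidReporter(q)
    child.start()                # start() creates a new process, which then calls run()
    pid_from_start = q.get()
    child.join()

    return os.getpid(), pid_from_run, pid_from_start


if __name__ == "__main__":
    parent, via_run, via_start = compare_pids()
    print(parent == via_run)     # True
    print(parent == via_start)   # False
```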
Example 1.4: Comparing the results with and without daemon
1.4-1 Without the daemon attribute
import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print("end!")
------------------------------------------------
>>> end!
>>> work start:Tue Jul 29 21:29:10 2019
>>> work end:Tue Jul 29 21:29:13 2019
1.4-2 With the daemon attribute
import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    print("end!")
------------------------------------------------
>>> end!
Note: because the child process has daemon set, it is terminated as soon as the main process ends, before it gets a chance to print anything.
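The introduction also says that a daemon process may not create child processes. You can observe this by letting a daemonic and a non-daemonic child each try to start a grandchild. This minimal sketch assumes Python 3.6+ run without -O, because current CPython versions implement the check as an assert inside Process.start(), so it surfaces as AssertionError. The function names are illustrative only.

```python
import multiprocessing


def grandchild():
    pass


def try_to_spawn(queue):
    # Runs in a child process and tries to start a child of its own.
    # Assumption: CPython raises AssertionError here when the caller is daemonic.
    try:
        p = multiprocessing.Process(target=grandchild)
        p.start()
        p.join()
        queue.put("started")
    except AssertionError as exc:
        queue.put(f"refused: {exc}")


def spawn_from_child(daemon):
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=try_to_spawn, args=(q,), daemon=daemon)
    p.start()
    result = q.get()      # read the queue before join(), as the documentation recommends
    p.join()
    return result


if __name__ == "__main__":
    print(spawn_from_child(daemon=False))  # started
    print(spawn_from_child(daemon=True))   # starts with "refused:"
```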
1.4-3 Making a daemon process run to completion: join()
import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    p.join()    # the main process waits here until p has finished
    print("end!")
print("end!")
------------------------------------------------
>>> work start:Tue Jul 29 22:16:32 2019
>>> work end:Tue Jul 29 22:16:35 2019
>>> end!
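join() also accepts the optional timeout listed in the introduction. When the time is up, join(timeout) simply returns, so you need is_alive() to tell whether the process actually finished. A minimal sketch, where nap is a made-up helper:

```python
import multiprocessing
import time


def nap(seconds):
    time.sleep(seconds)


def join_with_timeout():
    p = multiprocessing.Process(target=nap, args=(2,))
    p.start()
    p.join(timeout=0.5)         # returns after at most 0.5 s, whether or not p has finished
    still_running = p.is_alive()
    p.join()                    # no timeout: wait until p has really exited
    return still_running, p.is_alive(), p.exitcode


if __name__ == "__main__":
    print(join_with_timeout())  # (True, False, 0)
```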
2. Lock
When several processes need to access a shared resource, a Lock can be used to prevent conflicting access.
import multiprocessing

def worker_with(lock, f):
    # The with statement acquires the lock on entry and releases it on exit
    with lock:
        with open(f, 'a+') as fs:
            n = 10
            while n > 1:
                fs.write("Lock acquired via with\n")
                n -= 1

def worker_no_with(lock, f):
    # Acquire the lock explicitly; finally guarantees that it is released
    lock.acquire()
    try:
        with open(f, 'a+') as fs:
            n = 10
            while n > 1:
                fs.write("Lock acquired directly\n")
                n -= 1
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target=worker_with, args=(lock, f))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    w.join()
    nw.join()
    print("end")
The console itself only prints "end". The lines below are the contents of file.txt. Each worker writes 9 lines while holding the lock, so the two blocks never interleave, although which worker gets the lock first can vary.
------------------------------------------------
>>> Lock acquired via with
>>> Lock acquired via with
>>> Lock acquired via with
>>> Lock acquired via with
>>> Lock acquired via with
>>> Lock acquired via with
>>> Lock acquired via with
>>> Lock acquired via with
>>> Lock acquired via with
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
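A file is not the only kind of shared resource. This further sketch keeps a counter in shared memory via multiprocessing.Value, and the helper names add_many and count_with_lock are hypothetical. The increment counter.value += 1 is a read-modify-write, so without the lock, concurrent processes could lose updates.

```python
import multiprocessing


def add_many(counter, lock, times):
    for _ in range(times):
        # counter.value += 1 reads, adds and writes back; the lock makes the three steps atomic
        with lock:
            counter.value += 1


def count_with_lock(workers=4, times=1000):
    lock = multiprocessing.Lock()
    # lock=False returns a raw shared int; all synchronization comes from the Lock above
    counter = multiprocessing.Value("i", 0, lock=False)
    procs = [multiprocessing.Process(target=add_many, args=(counter, lock, times))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(count_with_lock())  # 4000
```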
3. Semaphore
A Semaphore limits how many processes can access a shared resource at the same time, for example the maximum number of connections in a pool.
import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n")
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i*2))
        p.start()
------------------------------------------------
>>> Process-1acquire
>>> Process-1release
>>>
>>> Process-2acquire
>>> Process-3acquire
>>> Process-2release
>>>
>>> Process-5acquire
>>> Process-3release
>>>
>>> Process-4acquire
>>> Process-5release
>>>
>>> Process-4release
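The output above suggests that at most two processes held the semaphore at once. This sketch measures that instead of reading it off the log. It assumes two shared counters guarded by an extra Lock, and the helper names are made up for illustration.

```python
import multiprocessing
import time


def limited_task(sem, active, peak, guard):
    with sem:                            # at most `slots` processes get past this line at once
        with guard:
            active.value += 1
            peak.value = max(peak.value, active.value)
        time.sleep(0.2)                  # simulate work while holding a slot
        with guard:
            active.value -= 1


def peak_concurrency(slots=2, workers=5):
    sem = multiprocessing.Semaphore(slots)
    guard = multiprocessing.Lock()       # protects the two counters below
    active = multiprocessing.Value("i", 0, lock=False)
    peak = multiprocessing.Value("i", 0, lock=False)
    procs = [multiprocessing.Process(target=limited_task, args=(sem, active, peak, guard))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return peak.value


if __name__ == "__main__":
    print(peak_concurrency())  # never more than 2
```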
4. Event
An Event is used for synchronization between processes: one process sets a flag, and every process waiting on that flag is woken up.
import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()            # blocks until the event is set
    print("wait_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout: starting")
    e.wait(t)           # blocks for at most t seconds
    print("wait_for_event_timeout: e.is_set()->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name="block",
                                 target=wait_for_event,
                                 args=(e,))
    w2 = multiprocessing.Process(name="non-block",
                                 target=wait_for_event_timeout,
                                 args=(e, 2))
    w1.start()
    w2.start()
    time.sleep(3)
    e.set()
    print("main: event is set")
------------------------------------------------
>>> wait_for_event: starting
>>> wait_for_event_timeout: starting
>>> wait_for_event_timeout: e.is_set()->False
>>> main: event is set
>>> wait_for_event: e.is_set()->True
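Besides is_set(), wait() itself reports what happened: it returns True if the flag was set and False if the timeout expired first. This minimal sketch also uses clear() to reset the flag, and the helper names are hypothetical.

```python
import multiprocessing
import time


def wait_and_report(event, timeout, queue):
    # Event.wait() returns True if the flag is set, False if the timeout expired first
    queue.put(event.wait(timeout))


def event_wait_results():
    e = multiprocessing.Event()
    q = multiprocessing.Queue()

    early = multiprocessing.Process(target=wait_and_report, args=(e, 0.5, q))
    early.start()
    early_result = q.get()           # the event is never set, so wait() times out
    early.join()

    late = multiprocessing.Process(target=wait_and_report, args=(e, 10, q))
    late.start()
    time.sleep(0.5)
    e.set()                          # wakes every process blocked in e.wait()
    late_result = q.get()
    late.join()

    e.clear()                        # reset the flag so later wait() calls block again
    return early_result, late_result, e.is_set()


if __name__ == "__main__":
    print(event_wait_results())  # (False, True, False)
```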
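5. Queue
A Queue is a process-safe queue that can be used to pass data between processes. put() inserts an item into the queue and takes two optional parameters, block and timeout. If block is True (the default) and timeout is a positive number, put() waits at most timeout seconds for a free slot and raises queue.Full if none becomes available in time. If block is False and the queue is full, queue.Full is raised immediately. With the defaults (block=True, timeout=None), put() waits as long as necessary.
get() removes and returns an item from the queue. It takes the same two optional parameters, block and timeout. If block is True (the default) and timeout is a positive number, get() raises queue.Empty if no item arrives within timeout seconds. If block is False, there are two cases: if an item is available, get() returns it immediately; if the queue is empty, get() raises queue.Empty immediately. Full and Empty are the exception classes from the standard queue module. Example code for Queue: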
import multiprocessing
import queue  # provides the queue.Full and queue.Empty exceptions

def writer_proc(q):
    try:
        q.put(1, block=False)
    except queue.Full:
        print("queue is full")

def reader_proc(q):
    try:
        print(q.get(block=False))
    except queue.Empty:
        print("queue is empty")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))
    writer.start()
    writer.join()   # make sure the item has been written before the non-blocking get()
    reader = multiprocessing.Process(target=reader_proc, args=(q,))
    reader.start()
    reader.join()
------------------------------------------------
>>> 1
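You can trigger the exceptions described above deterministically within a single process. This sketch assumes a queue limited to one item with maxsize=1, and the function name is made up. multiprocessing.Queue hands items to a background feeder thread, so the first get() uses a timeout rather than block=False.

```python
import multiprocessing
import queue  # multiprocessing.Queue raises queue.Full / queue.Empty from this module


def full_and_empty_demo():
    q = multiprocessing.Queue(maxsize=1)
    events = []

    q.put("a")                           # fills the only slot
    try:
        q.put("b", block=False)          # no free slot and not allowed to wait
    except queue.Full:
        events.append("Full")

    try:
        q.put("b", timeout=0.2)          # waits up to 0.2 s for a free slot, then gives up
    except queue.Full:
        events.append("Full after timeout")

    events.append(q.get(timeout=1))      # returns "a" once the feeder thread has sent it

    try:
        q.get(block=False)               # nothing left, so this raises immediately
    except queue.Empty:
        events.append("Empty")
    return events


print(full_and_empty_demo())  # ['Full', 'Full after timeout', 'a', 'Empty']
```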
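6. Pipe
Pipe() returns a pair (conn1, conn2) representing the two ends of a pipe. It takes a duplex parameter. If duplex is True (the default), the pipe is full-duplex, meaning both conn1 and conn2 can send and receive. If duplex is False, conn1 can only receive messages and conn2 can only send them.
send() and recv() send and receive messages. For example, in full-duplex mode you can call conn1.send() to send a message and conn1.recv() to receive one. If there is no message to receive, recv() blocks until one arrives. If there is nothing left to receive and the other end has been closed, recv() raises EOFError.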
import multiprocessing
import time

def proc1(pipe):
    # Sender: these loops never end, so stop the example with Ctrl+C
    while True:
        for i in range(10000):
            print("send: %s" % (i))
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print("proc2 rev:", pipe.recv())
        time.sleep(1)

def proc3(pipe):
    while True:
        print("PROC3 rev:", pipe.recv())
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    # p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))
    p1.start()
    p2.start()
    # p3.start()
    p1.join()
    p2.join()
    # p3.join()
------------------------------------------------
>>> send: 0
>>> proc2 rev: 0
>>> send: 1
>>> proc2 rev: 1
>>> send: 2
>>> proc2 rev: 2
>>> send: 3
>>> proc2 rev: 3
>>> send: 4
>>> proc2 rev: 4
>>> send: 5
>>> proc2 rev: 5
>>> send: 6
>>> proc2 rev: 6
>>> send: 7
>>> proc2 rev: 7
>>> send: 8
>>> proc2 rev: 8
...
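The official documentation warns that data in a pipe may become corrupted if two processes read from or write to the same end at the same time. The sketch below therefore gives each end to exactly one process. It uses duplex=False and stops reading on EOFError once the sender has closed its end. The helper names producer and collect_until_eof are hypothetical.

```python
import multiprocessing


def producer(send_end, count):
    for i in range(count):
        send_end.send(i)
    send_end.close()                      # closing this end lets the reader see EOFError


def collect_until_eof(count=3):
    # duplex=False: the first connection can only receive, the second can only send
    recv_end, send_end = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=producer, args=(send_end, count))
    p.start()
    send_end.close()                      # the parent must also drop its own copy of the send end

    received = []
    while True:
        try:
            received.append(recv_end.recv())
        except EOFError:                  # nothing left and every send end is closed
            break
    p.join()
    return received


if __name__ == "__main__":
    print(collect_until_eof())  # [0, 1, 2]
```

If the parent forgot to close its copy of send_end, recv() would block forever instead of raising EOFError, because one send end would still be open.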
7. Pool
When you use Python for system administration, especially when operating on many directories at once or controlling many remote hosts, running the operations in parallel can save a lot of time. When the number of targets is small, you can simply create processes dynamically with Process from multiprocessing. A dozen or so is manageable, but with hundreds or thousands of targets, limiting the number of processes by hand becomes too tedious. This is where a process pool helps.
A Pool starts a fixed number of worker processes (by default os.cpu_count()) and reuses them for all submitted tasks. A newly submitted task is handed to an idle worker. If all workers are busy, the task waits in the pool's queue until a worker finishes its current task and picks it up.
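The simplest way to hand work to a pool is map(). This sketch assumes Python 3.3+ so the pool can be used as a context manager, and the helper names are made up. It also shows that 20 tasks are served by at most 3 reused worker processes.

```python
import multiprocessing
import os


def square(x):
    return x * x


def worker_pid(_):
    # Returns the PID of whichever pool worker ran this task
    return os.getpid()


def pool_demo(n=20, workers=3):
    with multiprocessing.Pool(processes=workers) as pool:
        squares = pool.map(square, range(5))        # blocks; results keep the input order
        pids = set(pool.map(worker_pid, range(n)))  # n tasks, but only `workers` processes
    return squares, len(pids)


if __name__ == "__main__":
    squares, distinct_workers = pool_demo()
    print(squares)            # [0, 1, 4, 9, 16]
    print(distinct_workers)   # at most 3
```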
Example 7.1: Using a process pool (non-blocking)
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        # Non-blocking: at most `processes` tasks run at once; when a worker finishes a task, it picks up the next one
        pool.apply_async(func, (msg, ))
    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    # close() must be called before join(), otherwise join() raises ValueError.
    # After close() no new tasks can be submitted, and join() waits for all worker processes to exit.
    pool.join()
    print("Sub-process(es) done.")
------------------------------------------------
>>> Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
>>> msg: hello 0
>>> msg: hello 1
>>> msg: hello 2
>>> end
>>> msg: hello 3
>>> end
>>> end
>>> end
>>> Sub-process(es) done.
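Function notes:
- apply_async(func[, args[, kwds[, callback]]]) is non-blocking, whereas apply(func[, args[, kwds]]) is blocking. Compare the outputs of Examples 7.1 and 7.2 to see the difference.
- close() closes the pool so that it no longer accepts new tasks.
- terminate() stops the worker processes immediately, without finishing outstanding tasks.
- join() blocks the main process until the worker processes exit. It must be called after close() or terminate().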
How it runs: a pool is created with 3 processes. range(4) produces four values, 0, 1, 2 and 3, and each one is submitted to the pool as a task. The pool has only 3 workers, so tasks 0, 1 and 2 start immediately, and task 3 has to wait until one of them finishes. That is why "msg: hello 3" appears after the first "end". Because apply_async() is non-blocking, the main process does not wait for the tasks. It finishes the for loop and immediately prints "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~", then waits at pool.join() for all workers to finish.
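The function notes above list only the callback parameter. In Python 3, apply_async() also accepts error_callback, available since Python 3.2. This sketch collects results and failures through these callbacks instead of calling get(). The task safe_div is hypothetical.

```python
import multiprocessing


def safe_div(a, b):
    return a / b


def run_with_callbacks():
    results, errors = [], []
    pool = multiprocessing.Pool(processes=2)
    for a, b in [(6, 3), (1, 0), (9, 3)]:
        # Both callbacks run in a helper thread of the parent process, not in the workers
        pool.apply_async(safe_div, (a, b),
                         callback=results.append,
                         error_callback=lambda exc: errors.append(type(exc).__name__))
    pool.close()     # no more tasks
    pool.join()      # waits for the workers; every callback has fired by the time it returns
    return sorted(results), errors


if __name__ == "__main__":
    print(run_with_callbacks())  # ([2.0, 3.0], ['ZeroDivisionError'])
```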
Example 7.2: Using a process pool (blocking)
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        # Blocking: apply() waits until func has returned, so the tasks run one after another
        pool.apply(func, (msg, ))
    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    # close() must be called before join(), otherwise join() raises ValueError.
    # After close() no new tasks can be submitted, and join() waits for all worker processes to exit.
    pool.join()
    print("Sub-process(es) done.")
------------------------------------------------
>>> msg: hello 0
>>> end
>>> msg: hello 1
>>> end
>>> msg: hello 2
>>> end
>>> msg: hello 3
>>> end
>>> Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
>>> Sub-process(es) done.
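Because apply() blocks, it can hand the function's return value straight back, with no AsyncResult involved. This minimal sketch also confirms that the work ran in a pool worker rather than in the main process. The task whoami is hypothetical, and Python 3.3+ is assumed for the with-statement.

```python
import multiprocessing
import os


def whoami(tag):
    # Hypothetical task: return the tag together with the PID of the worker that ran it
    return tag, os.getpid()


def apply_demo():
    with multiprocessing.Pool(processes=2) as pool:
        # apply() blocks until the task has finished and returns func's return value
        tag, worker_pid = pool.apply(whoami, ("task-1",))
    return tag, worker_pid != os.getpid()


if __name__ == "__main__":
    print(apply_demo())  # ('task-1', True)
```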
Example 7.3: Using a process pool and collecting the results
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in range(3):
        msg = "hello %d" % (i)
        # apply_async() returns an AsyncResult; its get() method returns func's return value
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print("Sub-process(es) done.")
------------------------------------------------
>>> msg: hello 0
>>> msg: hello 1
>>> msg: hello 2
>>> end
>>> end
>>> end
>>> ::: donehello 0
>>> ::: donehello 1
>>> ::: donehello 2
>>> Sub-process(es) done.
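You can also poll an AsyncResult instead of waiting on it. In this sketch, get(timeout) raises multiprocessing.TimeoutError if the result is not ready yet, and ready() reports whether the call has completed. The task slow_echo is hypothetical.

```python
import multiprocessing
import time


def slow_echo(msg, delay):
    time.sleep(delay)
    return msg


def poll_async_result():
    with multiprocessing.Pool(processes=1) as pool:
        res = pool.apply_async(slow_echo, ("done", 1.5))
        try:
            res.get(timeout=0.2)               # the task is still sleeping
            timed_out = False
        except multiprocessing.TimeoutError:
            timed_out = True
        ready_before = res.ready()             # still running, so False
        value = res.get()                      # blocks until the result arrives
        return timed_out, ready_before, value, res.ready()


if __name__ == "__main__":
    print(poll_async_result())  # (True, False, 'done', True)
```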
Example 7.4: Running several functions in one process pool
import multiprocessing
import os, time, random

def Lee():
    print("\nRun task Lee-%s" % (os.getpid()))  # os.getpid() returns the ID of the current process
    start = time.time()
    time.sleep(random.random() * 10)  # random.random() returns a random float in [0.0, 1.0)
    end = time.time()
    print('Task Lee, runs %0.2f seconds.' % (end - start))

def Marlon():
    print("\nRun task Marlon-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print('Task Marlon runs %0.2f seconds.' % (end - start))

def Allen():
    print("\nRun task Allen-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' % (end - start))

def Frank():
    print("\nRun task Frank-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' % (end - start))

if __name__ == '__main__':
    function_list = [Lee, Marlon, Allen, Frank]
    print("parent process %s" % (os.getpid()))
    pool = multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)  # submit each function; whenever a worker is free, it takes the next task
    print('Waiting for all subprocesses done...')
    pool.close()
    # close() must be called before join(), otherwise join() raises an error.
    # After close() no new tasks can be added, and join() waits for all worker processes to exit.
    pool.join()
    print('All subprocesses done.')
------------------------------------------------
>>> parent process 9828
>>> Waiting for all subprocesses done...
>>>
>>> Run task Lee-12948
>>>
>>> Run task Marlon-8948
>>>
>>> Run task Allen-18124
>>>
>>> Run task Frank-17404
>>> Task Frank runs 3.42 seconds.
>>> Task Lee, runs 6.69 seconds.
>>> Task Allen runs 8.38 seconds.
>>> Task Marlon runs 13.37 seconds.
>>> All subprocesses done.
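In Python 3.3+, Pool can also be used as a context manager. Leaving the with-block calls terminate(), not close() and join(), so results must be collected inside the block. This sketch relies on that behavior and uses imap_unordered() to run a list of different functions, as Example 7.4 does. slow_task, quick_task and the call helper are made up for illustration.

```python
import multiprocessing
import time


def slow_task():
    time.sleep(0.5)
    return "slow"


def quick_task():
    return "quick"


def call(func):
    # Runs in a worker; module-level functions are pickled by name, so they can be passed as data
    return func()


def run_functions(functions):
    with multiprocessing.Pool(processes=len(functions)) as pool:
        # imap_unordered() yields results as tasks finish, not in submission order.
        # Consume them inside the with-block: leaving it calls pool.terminate().
        return list(pool.imap_unordered(call, functions))


if __name__ == "__main__":
    print(run_functions([slow_task, quick_task]))
```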