多任務(wù)編程
意義 :充分利用計算機(jī)的資源提高程序的運(yùn)行效率
定義 :通過應(yīng)用程序利用計算機(jī)多個核心,達(dá)到同時執(zhí)行多個任務(wù)的目的
實施方案
:
多進(jìn)程
、
多線程
并行 :多個計算機(jī)核心 并行的 同時處理 多個任務(wù)
并發(fā) :內(nèi)核在 多個任務(wù)間 不斷 切換 ,達(dá)到好像內(nèi)核在同時處理多個任務(wù)的運(yùn)行效果
進(jìn)程 :程序在計算機(jī)中運(yùn)行一次的過程
程序 :是一個可執(zhí)行文件,是靜態(tài)的,占有磁盤,不占有計算機(jī)運(yùn)行資源
進(jìn)程 :進(jìn)程是一個動態(tài)的過程描述,占有CPU內(nèi)存等計算機(jī)資源的,有一定的生命周期
* 同一個程序的不同執(zhí)行過程是不同的進(jìn)程,因為分配的計算機(jī)資源等均不同
父子進(jìn)程 : 系統(tǒng)中每一個進(jìn)程(除了系統(tǒng)初始化進(jìn)程)都有唯一的父進(jìn)程,可以有0個或多個子進(jìn)
程。父子進(jìn)程關(guān)系便于進(jìn)程管理。
進(jìn)程
CPU時間片: 如果一個進(jìn)程在某個時間點被計算機(jī)分配了內(nèi)核,我們稱為該進(jìn)程在CPU時間片上。
PCB( 進(jìn)程控制塊):存放進(jìn)程消息的空間
進(jìn)程ID(PID):進(jìn)程在操作系統(tǒng)中的唯一編號,由系統(tǒng)自動分配
進(jìn)程信息包括:進(jìn)程PID,進(jìn)程占有的內(nèi)存位置,創(chuàng)建時間,創(chuàng)建用戶. . . . . . . .??
進(jìn)程特征:
- 進(jìn)程是操作系統(tǒng)分配計算機(jī)資源的最小單位
- 每一個進(jìn)程都有自己單獨的 虛擬 內(nèi)存空間
- 進(jìn)程間的執(zhí)行相互獨立,互不影響
進(jìn)程的狀態(tài)
1、 三態(tài)
- 就緒態(tài): 進(jìn)程具備執(zhí)行條件, 等待系統(tǒng)分配CPU
- 運(yùn)行態(tài): 進(jìn)程 占有CPU處理器 ,處于運(yùn)行狀態(tài)
- 等待態(tài): 進(jìn)程 暫時不具備運(yùn)行條件 ,需要 阻塞 等待,讓出CPU
2、 五態(tài)(增加新建態(tài)和終止態(tài))
- 新建態(tài): 創(chuàng)建一個 新的進(jìn)程 ,獲取資源的過程
- 終止態(tài): 進(jìn)程結(jié)束 釋放資源的過程
查看進(jìn)程樹: pstree
查看父進(jìn)程PID: ? ps -ajx
linux查看進(jìn)程命令: ps -aux
有一列為STAT為進(jìn)程的狀態(tài)
D 等待態(tài) (不可中斷等待)(
阻塞
)
S 等待態(tài) (可中斷等待)(
睡眠
)
T 等待態(tài) (
暫停
狀態(tài))
R 運(yùn)行態(tài) (就緒態(tài)運(yùn)行態(tài))
Z 僵尸態(tài)
+ 前臺進(jìn)程(能在終端顯示出現(xiàn)象的)
< 高優(yōu)先級
N 低優(yōu)先級
l 有多線程的
s 會話組組長
?
os.fork創(chuàng)建進(jìn)程
pid = os.fork()
功能:創(chuàng)建一個子進(jìn)程
返回值:創(chuàng)建成功在原有的進(jìn)程中返回子進(jìn)程的PID,在子進(jìn)程中返回0;創(chuàng)建失敗返回一個負(fù)數(shù)
父子進(jìn)程通常會根據(jù)fork返回值的差異選擇執(zhí)行不同的代碼(使用if結(jié)構(gòu))
import os from time import sleep pid = os.fork() if pid < 0: print ( " 創(chuàng)建進(jìn)程失敗 " ) # 子進(jìn)程執(zhí)行部分 elif pid == 0: print ( " 新進(jìn)程創(chuàng)建成功 " ) # 父進(jìn)程執(zhí)行部分 else : sleep( 1 ) print ( " 原來的進(jìn)程 " ) print ( " 程序執(zhí)行完畢 " ) # 新進(jìn)程創(chuàng)建成功 # 原來的進(jìn)程 # 程序執(zhí)行完畢
- 子進(jìn)程會復(fù)制父進(jìn)程全部代碼段 (包括fork前的代碼) 但是子進(jìn)程僅從fork的下一句開始執(zhí)行
- 父進(jìn)程不一定先執(zhí)行 (進(jìn)程之間相互獨立,互不影響)
- 父子進(jìn)程各有自己的屬性特征,比如:PID號PCB內(nèi)存空間
- 父進(jìn)程fork之前開辟的空間子進(jìn)程同樣擁有,但是進(jìn)程之間相互獨立,互不影響.
父子進(jìn)程的變量域
import os from time import sleep a = 1 pid = os.fork() if pid < 0: print ( " 創(chuàng)建進(jìn)程失敗 " ) elif pid == 0: print ( " 子進(jìn)程 " ) print ( " a = " ,a) a = 10000 print ( " a = " ,a) else : sleep( 1 ) print ( " 父進(jìn)程 " ) print ( " parent a : " ,a) # a = 1 # 子進(jìn)程 # a = 1 # a = 10000 # 父進(jìn)程 # parent a : 1
進(jìn)程ID和退出函數(shù)
os.getpid() ? 獲取當(dāng)前進(jìn)程的PID號
返回值:返回PID號
os.getppid()? 獲取父類進(jìn)程的進(jìn)程號
返回值:返回PID號
import os pid = os.fork() if pid < 0: print ( " Error " ) elif pid == 0: print ( " Child PID: " , os.getpid()) # 26537 print ( " Get parent PID: " , os.getppid()) # 26536 else : print ( " Get child PID: " , pid) # 26537 print ( " Parent PID: " , os.getpid()) # 26536
os._exit(status) ?? 退出進(jìn)程
參數(shù):進(jìn)程的退出狀態(tài) 整數(shù)
sys.exit([status]) ?? 退出進(jìn)程
參數(shù):默認(rèn)為0 整數(shù)則表示退出狀態(tài);符串則表示退出時打印內(nèi)容
sys.exit([status])可以通過 捕獲 SystemExit 異常 阻止退出
import os,sys # os._exit(0) # 退出進(jìn)程 try : sys.exit( " 退出 " ) except SystemExit as e: print ( " 退出原因: " ,e) # 退出原因: 退出
孤兒和僵尸
孤兒進(jìn)程
父進(jìn)程先于子進(jìn)程退出,此時子進(jìn)程就會變成孤兒進(jìn)程
孤兒進(jìn)程會被系統(tǒng)指定的進(jìn)程收養(yǎng),即系統(tǒng)進(jìn)程會成為該孤兒進(jìn)程新的父進(jìn)程。孤兒進(jìn)程退出時該父進(jìn)程會處理退出狀態(tài)
僵尸進(jìn)程
子進(jìn)程先與父進(jìn)程退出,父進(jìn)程沒有處理子進(jìn)程退出狀態(tài),此時子進(jìn)程成為僵尸進(jìn)程
僵尸進(jìn)程已經(jīng)結(jié)束,但是會滯留部分PCB信息在內(nèi)存,大量的僵尸會消耗系統(tǒng)資源,應(yīng)該盡量避免
如何避免僵尸進(jìn)程的產(chǎn)生
父進(jìn)程處理子進(jìn)程退出狀態(tài)
pid, status = os.wait()
功能:在父進(jìn)程中 阻塞等待 處理子進(jìn)程的退出
返回值: pid? ? ?退出的子進(jìn)程的PID號
status 子進(jìn)程的退出狀態(tài)
import os, sys pid = os.fork() if pid < 0: print ( " Error " ) elif pid == 0: print ( " Child process " , os.getpid()) # Child process 27248 sys.exit(1 ) else : pid, status = os.wait() # 阻塞等待子進(jìn)程退出 print ( " pid : " , pid) # pid : 27248 # 還原退出狀態(tài) print ( " status: " , os.WEXITSTATUS(status)) # status: 1 while True: pass
創(chuàng)建二級子進(jìn)程
- 父進(jìn)程創(chuàng)建子進(jìn)程等待子進(jìn)程退出
- 子進(jìn)程創(chuàng)建二級子進(jìn)程,然后馬上退出
- 二級子進(jìn)程成為孤兒,處理具體事件
import os from time import sleep def fun1(): sleep( 3 ) print ( " 第一件事情 " ) def fun2(): sleep( 4 ) print ( " 第二件事情 " ) pid = os.fork() if pid < 0: print ( " Create process error " ) elif pid == 0: # 子進(jìn)程 pid0 = os.fork() # 創(chuàng)建二級進(jìn)程 if pid0 < 0: print ( " 創(chuàng)建二級進(jìn)程失敗 " ) elif pid0 == 0: # 二級子進(jìn)程 fun2() # 做第二件事 else : # 二級進(jìn)程 os._exit(0) # 二級進(jìn)程退出 else : os.wait() fun1() # 做第一件事 # 第一件事情 # 第二件事情
通過信號處理子進(jìn)程退出
原理: 子進(jìn)程退出時會發(fā)送信號給父進(jìn)程,如果父進(jìn)程忽略子進(jìn)程信號, 則系統(tǒng)就會自動處 理子進(jìn)程退出。
方法: 使用signal模塊在父進(jìn)程創(chuàng)建子進(jìn)程前寫如下語句 :
import signal
signal.signal(signal.SIGCHLD,signal.SIG_IGN)
特點 : 非阻塞,不會影響父進(jìn)程運(yùn)行。可以處理所有子進(jìn)程退出
Multiprocessing創(chuàng)建進(jìn)程
步驟:
- 需要將要做的事情封裝成函數(shù)
- multiprocessing.Process創(chuàng)建進(jìn)程,并綁定函數(shù)
- start啟動進(jìn)程
- join回收進(jìn)程 ?
p = multiprocessing.Process(target, [name], [args], [kwargs]) ?
創(chuàng)建進(jìn)程對象
參數(shù):
- target : 要綁定的函數(shù)名
- name : 給進(jìn)程起的名稱 (默認(rèn)Process-1)
- args:?元組?用來給target函數(shù)傳參
- kwargs :?字典?用來給target函數(shù)鍵值傳參
p.start()
功能
?: 啟動進(jìn)程 自動運(yùn)行terget綁定函數(shù)。此時進(jìn)程被創(chuàng)建
p.join([timeout])
功能
: 阻塞等待子進(jìn)程退出,最后回收進(jìn)程
參數(shù)
: 超時時間
multiprocessing的注意事項:
- 使用multiprocessing創(chuàng)建進(jìn)程子進(jìn)程同樣復(fù)制父進(jìn)程的全部內(nèi)存空間,之后有自己獨立的空間,執(zhí)行上互不干擾
- 如果不使用join回收可能會產(chǎn)生僵尸進(jìn)程
- 一般父進(jìn)程功能就是創(chuàng)建子進(jìn)程回收子進(jìn)程,所有事件交給子進(jìn)程完成
- multiprocessing創(chuàng)建的子進(jìn)程無法使用ptint
import multiprocessing as mp from time import sleep import os a = 1 def fun(): sleep( 2 ) print ( " 子進(jìn)程事件 " ,os.getpid()) global a a = 10000 print ( " a = " ,a) p = mp.Process(target = fun) # 創(chuàng)建進(jìn)程對象 p.start() # 啟動進(jìn)程 sleep(3 ) print ( " 這是父進(jìn)程 " ) p.join() # 回收進(jìn)程 print ( " parent a: " ,a) # 子進(jìn)程事件 5434 # a = 10000 # 這是父進(jìn)程 # parent a: 1 Process(target)
multiprocessing進(jìn)程屬性
p.name ? ? 進(jìn)程名稱
p.pid 對應(yīng)子進(jìn)程的PID號
p.is_alive() ? 查看子進(jìn)程是否在生命周期
p.daemon? ? ? ? ?設(shè)置父子進(jìn)程的退出關(guān)系
如果等于True則子進(jìn)程會隨父進(jìn)程的退出而結(jié)束,就不用使用 join(),必須要求在start()前設(shè)置
進(jìn)程池
引言:如果有大量的任務(wù)需要多進(jìn)程完成,而任務(wù)周期又比較短且需要頻繁創(chuàng)建。此時可能產(chǎn)生大量進(jìn)程頻繁創(chuàng)建銷毀的情況,消耗計算機(jī)資源較大,這個時候就需要進(jìn)程池技術(shù)
進(jìn)程池的原理:創(chuàng)建一定數(shù)量的進(jìn)程來處理事件,事件處理完進(jìn)程不退出而是繼續(xù)處理其他事件,直到所有事件全都處理完畢統(tǒng)一銷毀。增加進(jìn)程的重復(fù)利用,降低資源消耗。 ?
1.創(chuàng)建進(jìn)程池,在池內(nèi)放入適當(dāng)數(shù)量的進(jìn)程
from multiprocessing import Pool
Pool(processes) 創(chuàng)建進(jìn)程池對象
- 參數(shù):進(jìn)程數(shù)量
- 返回 :?指定進(jìn)程數(shù)量,默認(rèn)根據(jù)系統(tǒng)自動判定 ?
2.將事件封裝函數(shù),放入到進(jìn)程池
pool.apply_async(fun,args,kwds) 將事件放入進(jìn)程池執(zhí)行
參數(shù):
- fun 要執(zhí)行的事件函數(shù)
- args 以元組為fun傳參
- kwds 以字典為fun傳參
返回值 : ?
- 返回一個事件對象 通過get()屬性函數(shù)可以獲取fun的返回值?
3.關(guān)閉進(jìn)程池
?pool.close() 關(guān)閉進(jìn)程池,無法再加入事件
4.回收進(jìn)程
pool.join() 回收進(jìn)程池 ?
from multiprocessing import Pool from time import sleep,ctime pool = Pool(4) # 創(chuàng)建進(jìn)程池 # 進(jìn)程池事件 def worker(msg): sleep( 2 ) print (msg) return ctime() # 向進(jìn)程池添加執(zhí)行事件 for i in range(4 ): msg = " Hello %d " % i # r 代表func事件的一個對象 r = pool.apply_async(func=worker,args= (msg,)) pool.close() # 關(guān)閉進(jìn)程池 pool.join() # 回收進(jìn)程池 # Hello 3 # Hello 2 # Hello 0 # Hello 1
進(jìn)程間通信(IPC)
由于進(jìn)程間空間獨立,資源無法共享,此時在進(jìn)程間通信就需要專門的通信方法。
進(jìn)程間通信方法 : 管道 消息隊列 共享內(nèi)存 信號信號量 套接字
管道通信(Pipe)
通信原理:在內(nèi)存中開辟管道空間,生成管道操作對象,多個進(jìn)程使用同一個管道對象進(jìn)行讀寫即可實現(xiàn)通信
from multiprocessing import Pipe
fd1, fd2 = Pipe(duplex = True)
- 功能:創(chuàng)建管道
- 參數(shù):默認(rèn)表示雙向管道,如果為False 表示單向管道
- 返回值:表示管道兩端的讀寫對象;如果是雙向管道均可讀寫;如果是單向管道fd1只讀 fd2只寫
fd.recv()
- 功能 : 從管道獲取內(nèi)容
- 返回值:獲取到的數(shù)據(jù),當(dāng)管道為空則阻塞
fd.send(data)
- 功能: 向管道寫入內(nèi)容
- 參數(shù): 要寫入的數(shù)據(jù)?
注意:
- multiprocessing中管道通信只能用于父子關(guān)系進(jìn)程中?
- 管道對象在父進(jìn)程中創(chuàng)建,子進(jìn)程通過父進(jìn)程獲取 ?
from multiprocessing import Pipe, Process fd1, fd2 = Pipe() # 創(chuàng)建管道,默認(rèn)雙向管道 def fun1(): data = fd1.recv() # 從管道獲取消息 print ( " 管道2傳給管道1的數(shù)據(jù) " , data) inpu = " 跟你說句悄悄話 " fd1.send(inpu) def fun2(): fd2.send( " 肥水不流外人天 " ) data = fd2.recv() print ( " 管道1傳給管道2的數(shù)據(jù) " , data) p1 = Process(target= fun1) P2 = Process(target= fun2) p1.start() P2.start() p1.join() P2.join() # 管道2傳給管道1的數(shù)據(jù) 肥水不流外人天 # 管道1傳給管道2的數(shù)據(jù) 跟你說句悄悄話
消息隊列
從內(nèi)存中開辟隊列結(jié)構(gòu)空間,多個進(jìn)程可以向隊列投放消息,在取出來的時候按照 先進(jìn)先出 順序取出 ?
q = Queue(maxsize = 0)
創(chuàng)建隊列對象
- maxsize :默認(rèn)表示系統(tǒng)自動分配隊列空間;如果傳入正整數(shù)則表示最多存放多少條消息
- 返回值 : 隊列對象
q.put(data,[block,timeout])
向隊列中存入消息
- data: 存放消息( python 數(shù)據(jù)類型)
- block: 默認(rèn)為 True 表示當(dāng)前隊列滿的時候阻塞,設(shè)置為 False 則表示非阻塞
- timeout: 當(dāng) block 為 True 表示超時時間
返回值:返回獲取的消息
q.get([block,timeout])
從隊列取出消息
- 參數(shù):block 設(shè)置是否阻塞 False為非阻塞;timeout 超時檢測
- 返回值: 返回獲取到的內(nèi)容
q.full() 判斷隊列是否為滿
q.empty() 判斷隊列是否為空
q.qsize() 判斷當(dāng)前隊列有多少消息?
q.close() 關(guān)閉隊列
from multiprocessing import Process, Queue from time import sleep from random import randint # 創(chuàng)建消息隊列 q = Queue(3 ) # 請求進(jìn)程 def request(): for i in range(2 ): x = randint(0, 100 ) y = randint(0, 100 ) q.put((x, y)) # 處理進(jìn)程 def handle(): while True: sleep( 1 ) try : x, y = q.get(timeout=2 ) except : break else : print ( " %d + %d = %d " % (x, y, x + y)) p1 = Process(target= request) p2 = Process(target= handle) p1.start() p2.start() p1.join() p2.join() # 12 + 61 = 73 # 69 + 48 = 117
共享內(nèi)存?
在內(nèi)存中開辟一段空間,存儲數(shù)據(jù),對多個進(jìn)程可見,每次寫入共享內(nèi)存中的數(shù)據(jù)會覆蓋之前的內(nèi)容,效率高,速度快
from multiprocessing import Value, Array
obj = Value(ctype,obj)
功能 :開辟共享內(nèi)存空間?
參數(shù) :ctype 字符串 要轉(zhuǎn)變的c的數(shù)據(jù)類型,對比類型對照表
obj 共享內(nèi)存的初始化數(shù)據(jù)
返回:共享內(nèi)存對象
from multiprocessing import Process,Value import time from random import randint # 創(chuàng)建共享內(nèi)存 money = Value( ' i ' , 5000 ) # 修改共享內(nèi)存 def man(): for i in range(30 ): time.sleep( 0.2 ) money.value += randint(1, 1000 ) def girl(): for i in range(30 ): time.sleep( 0.15 ) money.value -= randint(100, 800 ) m = Process(target= man) g = Process(target= girl) m.start() g.start() m.join() g.join() print ( " 一月余額: " , money.value) # 獲取共享內(nèi)存值 # 一月余額: 4264
obj = Array(ctype,obj)
功能 :開辟共享內(nèi)存
參數(shù) :ctype 要轉(zhuǎn)化的c的類型
obj 要存入共享的數(shù)據(jù)
如果是列表 將列表存入共享內(nèi)存,要求數(shù)據(jù)類型一致
如果是正整數(shù) 表示開辟幾個數(shù)據(jù)空間
from multiprocessing import Process, Array # 創(chuàng)建共享內(nèi)存 # shm = Array('i',[1,2,3]) # shm = Array('i',3) # 表示開辟三個空間的列表 shm = Array( ' c ' ,b " hello " ) # 字節(jié)串 def fun(): # 共享內(nèi)存對象可迭代 for i in shm: print (i) shm[0] = b ' H ' p = Process(target= fun) p.start() p.join() for i in shm: # 子進(jìn)程修改,父進(jìn)程中也跟著修改 print (i) print (shm.value) # 打印字節(jié)串 b'Hello'
信號量(信號燈集)
通信原理:給定一個數(shù)量對多個進(jìn)程可見。多個進(jìn)程都可以操作該數(shù)量增減,并根據(jù)數(shù)量值決定自己的行為。
from multiprocessing import Semaphore
sem = Semaphore(num)
創(chuàng)建信號量對象
- 參數(shù) : 信號量的初始值
- 返回值 : 信號量對象
sem.acquire()?
將信號量減1 當(dāng)信號量為0時阻塞
sem.release()?
將信號量加1
sem.get_value()
?獲取信號量數(shù)量
from multiprocessing import Process, Semaphore sem = Semaphore(3) # 創(chuàng)建信號量,最多允許3個任務(wù)同時執(zhí)行 def rnewu(): sem.acquire() # 每執(zhí)行一次減少一個信號量 print ( " 執(zhí)行任務(wù).....執(zhí)行完成 " ) sem.release() # 執(zhí)行完成后增加信號量 for i in range(3): # 有3個人想要執(zhí)行任務(wù) p = Process(target= rnewu) p.start() p.join()
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
