本文從參考了網絡上的許多內容,主要為分布式進程及其的評論內容。
所謂分布式運算,既可以指在同一臺機器上利用多進程(線程)進行運算,又可以指將計算任務進行分解,利用多臺機器進行運算。本文中的分布式單指最后一種。
python的標準庫
multiprocessing
中存在一個
managers
的子模塊,該模塊支持將多進程分布到多臺機器上。選擇一個進程來調度任務,其他進程進行計算,從而實現分布式運算。而調度進程和計算進程之間的通信,是通過網絡來進行的,用到了python的
socket
模塊。
1. linux中的分布式運算
以參考文章中的例子來進行說明,首先寫一個調度程序:
# task_manager.py
import
random
,
queue
from
multiprocessing
.
managers
import
BaseManager
# 發送任務的隊列
task_queue
=
queue
.
Queue
(
)
# 接收結果的隊列
result_queue
=
queue
.
Queue
(
)
# 將兩個隊列注冊到網絡上,可以被其他機器訪問
BaseManager
.
register
(
'get_task_queue'
,
callable
=
lambda
:
task_queue
)
BaseManager
.
register
(
'get_result_queue'
,
callable
=
lambda
:
result_queue
)
# 實例化一個manager,綁定端口5000,ip為空表名為本地ip,設置驗證碼
manager
=
BaseManager
(
address
=
(
''
,
5000
)
,
authkey
=
b
'abc'
)
# 啟動manager
manager
.
start
(
)
# 獲取網絡中的queue對象
task
=
manager
.
get_task_queue
(
)
result
=
manager
.
get_result_queue
(
)
# 放幾個任務進去
for
i
in
range
(
10
)
:
n
=
random
.
randint
(
0
,
10000
)
print
(
'Put task %d...'
%
n
)
task
.
put
(
n
)
# 監聽是否有結果傳回
print
(
'Try get results...'
)
for
i
in
range
(
10
)
:
r
=
result
.
get
(
timeout
=
10
)
print
(
'Result: %s'
%
r
)
# 關閉
manager
.
shutdown
(
)
再寫一個執行任務的程序:
# task_worker.py
import
time
,
queue
from
multiprocessing
.
managers
import
BaseManager
# 獲取網絡上被master注冊的queue
BaseManager
.
register
(
'get_task_queue'
)
BaseManager
.
register
(
'get_result_queue'
)
# 連接到服務器,即任務分配進程的地址
server_address
=
'191.168.6.67'
print
(
f
'Connecting to server {server_address}...'
)
# 注意驗證碼要保持一致
m
=
BaseManager
(
address
=
(
server_address
,
5000
)
,
authkey
=
b
'abc'
)
m
.
connect
(
)
# 獲取queue對象
task
=
m
.
get_task_queue
(
)
result
=
m
.
get_result_queue
(
)
# 開始進行計算
for
i
in
range
(
10
)
:
try
:
n
=
task
.
get
(
timeout
=
1
)
# 獲取manager進程放入task中的值
print
(
f
'run task {n}*{n}'
)
r
=
f
'{n} * {n} = {n*n}'
time
.
sleep
(
1
)
result
.
put
(
r
)
# 將計算的結果放入result隊列中
except
queue
.
Empty
:
print
(
'task queue is empty.'
)
print
(
'worker exit.'
)
將上面兩個程序分別拷貝到局域網中的兩臺linux服務器上,先運行
task_manager.py
,隨即運行
task_worker.py
,就可以看到計算的結果了。
注意,在運行
task_worker.py
時,可能會提示一下錯誤:
OSError: [Errno 113] No route to host
這是由于系統設置了防火墻,屏蔽了通過設置的端口進行通信導致的。
我使用的linux是centos,要查看哪些端口開放,可以使用:
firewall-cmd --list-ports
開放5000端口,可以使用:
firewall-cmd --zone=public --add-port=5000/tcp --permanent
設置完成后運行:
firewall-cmd --reload
重啟防火墻進行生效。
執行完上述操作后,在linux(centos)下運行應該沒有問題。
2. windows中的分布式運算
注意,以上腳本在windows系統中是無法運行的,經過我的試驗,主要發現了兩個問題:
一、
lambda
定義函數的問題
在
task_manager.py
中,將隊列注冊到網絡上時,對于其中的
callable
參數,使用的是以
lambda
關鍵字定義的函數,在windows下這是行不通的。因此,需要顯示地定義兩個函數,然后傳遞給
callable
:
_task
=
queue
.
Queue
(
)
_result
=
queue
.
Queue
(
)
def
task_queue
(
)
:
return
_task
def
result_queue
(
)
:
return
_result
然后,修改
callable=lambda: task_queue
和
callable=lambda: result_queue
分別為
callable=task_queue
和
callable=result_queue
。
二、__name__問題
在windows下,
manager.start()
及其之后的命令需要放在
if __name__ == '__main__':
語句塊中執行,否則會報錯。
三、綁定地址問題
在linux下,實例化一個manager時,ip可以留空,默認為本機ip,但在windows下面行不通。可以指定為
localhost
或
127.0.0.1
,這樣,在同一臺機器上分別運行兩個腳本是沒有問題的,但將腳本分別放到兩臺機器上運行會出問題。
解決方法是,將ip指定為本機在局域網中的ip地址,如
191.168.1.123
,然后兩臺機器就可以正常工作了。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
