前言:
無論什么樣的并行計算方式,其終極目的都是為了有效利用多機多核的計算能力,并能靈活滿足各種需求。相對于傳統基于單機編寫的運行程序,如果使用該方式改寫為多機并行程序,能夠充分利用多機多核cpu的資源,使得運行效率得到大幅度提升,那么這是一個好的靠譜的并行計算方式,反之,又難使用又難直接看出并行計算優勢,還要耗費大量學習成本,那就不是一個好的方式。
由于并行計算在互聯網應用的業務場景都比較復雜,如海量數據商品搜索、廣告點擊算法、用戶行為挖掘,關聯推薦模型等等,如果以真實場景舉例,初學者很容易被業務本身的復雜度繞暈了頭。因此,我們需要一個通俗易懂的例子來直接看到并行計算的優勢。
數字排列組合是個經典的算法問題,它很通俗易懂,適合不懂業務的人學習,我們通過它來發現和運用并行計算的優勢,可以得到一個很直觀的體會,并留下深刻的印象。問題如下:
請寫一個程序,輸入M,然后打印出M個數字的所有排列組合(每個數字為1,2,3,4中的一個)。比如:M=3,輸出:
1,1,1
1,1,2
……
4,4,4
共64個
注意:這里是使用計算機遍歷出所有排列組合,而不是求總數,如果只求總數,可以直接利用數學公式進行計算了。
一、單機解決方案:
通常,我們在一臺電腦上寫這樣的排列組合算法,一般用遞歸或者迭代來做,我們先分別看看這兩種方案。
1) 單機遞歸
可以將n(1<=n<=4)看做深度,輸入的m看做廣度,得到以下遞歸函數(完整代碼見附件CombTest.java)?
public void comb(String str){ for(int i=1;i<n+1;i++){ if(str.length()==m-1){ System.out.println(str+i); total++; }else comb(str+i); } }
但是當m數字很大時,會超出單臺機器的計算局限導致緩慢,太大數字的排列組合在一臺計算機上幾乎很難運行出,
不光是排列組合問題,其他類似遍歷求解的遞歸或回溯等算法也都存在這個問題,如何突破單機計算性能的問題一直困擾著我們。
2) 單機迭代
我們觀察到,求的m個數字的排列組合,實際上都可以在m-1的結果基礎上得到。
比如m=1,得到排列為1,2,3,4,記錄該結果為r(1)
m=2, 可以由(1,2,3,4)* r(1) = 11,12,13,14,21,22,…,43,44得到, 記錄該結果為r(2)
由此,r(m) =(1,2,3,4)*r(m-1)
如果我們從1開始計算,每輪結果保存到一個中間變量中,反復迭代這個中間變量,直到算出m的結果為止,這樣看上去也可行,仿佛還更簡單。
但是如果我們估計一下這個中間變量的大小,估計會嚇一跳,因為當m=14的時候,結果已經上億了,一億個數字,每個數字有14位長,并且為了得到m=15 的結果,我們需要將m=14的結果存儲在內存變量中用于迭代計算,無論以什么格式存,幾乎都會遭遇到單臺機器的內存局限,如果排列組合數字繼續增大下去,結果便會內存溢出了。
二、分布式并行計算解決方案:
我們看看如何利用多臺計算機來解決該問題,同樣以遞歸和迭代的方式進行分析。
1) 多機遞歸
做分布式并行計算的核心是需要改變傳統的編程設計觀念,將算法重新設計按多機進行拆分和合并,有效利用多機并行計算優勢去完成結果。
我們觀察到,將一個n深度m廣度的遞歸結果記錄為 r(n,m),那么它可以由(1,2,…n)*r(n,m-1)得到:
r(n,m)=1*r(n,m-1)+2*r(n,m-1)+…+n*r(n,m-1)
假設我們有n臺計算機,每臺計算機的編號依次為1到n,那么每臺計算機實際上只要計算r(n,m-1)的結果就夠了,這里實際上將遞歸降了一級, 并且讓多機并行計算。
如果我們有更多的計算機,假設有n*n臺計算機,那么:
r(n,m)=11*r(n,m-2)+12*r(n,m-2)+…+nn*r(n,m-2)
拆分到n*n臺計算機上就將遞歸降了兩級了
可以推斷,只要我們的機器足夠多,能夠線性擴充下去,我們的遞歸復雜度會逐漸降級,并且并行計算的能力會逐漸增強。
這里是進行拆分設計的分析是假設每臺計算機只跑1個實例,實際上每臺計算機可以跑多個實例(如上圖),我們下面的例子可以看到,這種并行計算的方式相對傳統單機遞歸有大幅度的效率提升。
這里使用fourinone框架設計分布式并行計算,第一次使用可以參考
分布式計算上手demo指南
, 開發包下載地址:
http://www.skycn.com/soft/68321.html
ParkServerDemo:負責工人注冊和分布式協調
CombCtor:是一個包工頭實現,它負責接收用戶輸入的m,并將m保存到變量comb,和線上工人總數wknum一起傳給各個工人,下達計算命令,并在計算完成后累加每個工人的結果數量得到一個結果總數。
CombWorker:是一個工人實現,它接收到工頭發的comb和wknum參數用于遞歸條件,并且通過獲取自己在集群的位置index,做為遞歸初始條件用于降級,它找到一個排列組合會直接在本機輸出,但是計數保存到total,然后將本機的total發給包工頭統計總體數量。
運行步驟:
為了方便演示,我們在一臺計算機上運行:
1、啟動ParkServerDemo:它的IP端口已經在配置文件的PARK部分的SERVERS指定。
2、啟動4個CombWorker實例:傳入2個參數,依次是ip或者域名、端口(如果在同一臺機器可以ip相同,但是端口不同),這里啟動4個工人是由于1<=n<=4,每個工人實例剛好可以通過集群位置 index進行任務拆分。
3、運行CombCtor查看計算時間和結果
下面是在一臺普通4cpu雙核2.4Ghz內存4g開發機上和單機遞歸CombTest的測試對比
通過測試結果我們可以看到:
1、可以推斷,由于單機的性能限制,無法完成m值很大的計算。
2、同是單機環境下,并行計算相對于傳統遞歸提升了將近1.6倍的效率,隨著m的值越大,節省的時間越多。
3、單機遞歸的CPU利用率不高,平均20-30%,在多核時代沒有充分利用機器資源,造成cpu閑置浪費,而并行計算則能打滿cpu,充分利用機器資源。
4、如果是多機分布式并行計算,在4臺機器上,采用4*4的16個實例完成計算,效率還會成倍提升,而且機器數量越多,計算越快。
5、單機遞歸實現和運行簡單,使用c或者java寫個main函數完成即可,而分布式并行程序,則需要利用并行框架,以包工頭+多個工人的全新并行計算思想去完成。
2) 多機迭代
我們最后看看如何構思多機分布式迭代方式實現。
思路一:
根據單機迭代的特點,我們可以將n臺計算機編號為1到n
第一輪統計各工人發送編號給工頭,工頭合并得到第一輪結果{1,2,3,…,n}
第二輪,工頭將第一輪結果發給各工人做為計算輸入條件,各工人根據自己編號累加,返回結果給工頭合并,得到第二輪結果:{11,12,13,1n,…,n1,n2,n3,nn}
這樣迭代下去,直到m輪結束,如上圖所示。
但很快就會發現,工頭合并每輪結果是個很大的瓶頸,很容易內存不夠導致計算崩潰。
思路二:
如果對思路一改進,各工人不發中間結果給工頭合并,而采取工人之間互相合并方式,將中間結果按編號分類,通過receive方式(工人互相合并及receive使用可參見
sayhello demo
),將屬于其他工人編號的數據發給對方。這樣一定程度避免了工頭成為瓶頸,但是經過實踐發現,隨著迭代變大,中間結果數據越來越大,工人合并耗用網絡也越來越大,如果中間結果保存在各工人內存中,隨著m變的更大,仍然存在內存溢出危險。
思路三:
繼續改進思路二,將中間結果變量不保存內存中,而每次寫入文件(
詳見Fourinone2.0對分布式文件的簡化操作
),這樣能避免內存問題,但是增加了大量的文件io消耗。雖然能運行出結果,但是并不高效。
總結:
或許分布式迭代在這里并不是最好的做法,上面的多機遞歸更合適。由于迭代計算的特點,需要將中間結果進行保存,做為下一輪計算的條件,如果為了利用多機并行計算優勢,又需要反復合并產生中間結果,所以導致對內存、帶寬、文件io的耗用很大,處理不當容易造成性能低下。
我們早已經進入多cpu多核時代,但是我們的傳統程序設計和算法還停留在過去單機應用,因此合理利用并行計算的優勢來改進傳統軟件設計思想,能為我們帶來更大效率的提升。
以下是分布式并行遞歸的demo源碼:
001
|
// CombTest
|
002
|
import
java.util.Date;
|
003
|
public
class
CombTest
|
004
|
{
|
005
|
????
int
m=
0
,n=
0
,total=
0
;
|
006
|
????
CombTest(
int
n,
int
m){
|
007
|
????????
this
.m=m;
|
008
|
????????
this
.n=n;
|
009
|
????
}
|
010
|
????
public
void
comb(String str)
|
011
|
????
{
|
012
|
????????
for
(
int
i=
1
;i<n+
1
;i++){
|
013
|
????????????
if
(str.length()==m-
1
){
|
014
|
????????????????
//System.out.println(str+i);//打印出組合序列
|
015
|
????????????????
total++;
|
016
|
????????????
}
|
017
|
????????????
else
|
018
|
????????????????
comb(str+i);
|
019
|
????????
}
|
020
|
????
}
|
021
|
????
?
|
022
|
????
public
static
void
main(String[] args)
|
023
|
????
{
|
024
|
????????
CombTest ct =
new
CombTest(Integer.parseInt(args[
0
]), Integer.parseInt(args[
1
]));
|
025
|
????????
long
begin = (
new
Date()).getTime();
|
026
|
????????
ct.comb(
""
);
|
027
|
????????
System.out.println(
"total:"
+ct.total);
|
028
|
????????
long
end = (
new
Date()).getTime();
|
029
|
????????
System.out.println(
"time:"
+(end-begin)/
1000
+
"s"
);
|
030
|
????
}
|
031
|
}
|
032
|
? |
033
|
// ParkServerDemo
|
034
|
import
com.fourinone.BeanContext;
|
035
|
public
class
ParkServerDemo{
|
036
|
????
public
static
void
main(String[] args){
|
037
|
????????
BeanContext.startPark();
|
038
|
????
}
|
039
|
}
|
040
|
? |
041
|
// CombCtor
|
042
|
import
com.fourinone.Contractor;
|
043
|
import
com.fourinone.WareHouse;
|
044
|
import
com.fourinone.WorkerLocal;
|
045
|
import
java.util.Date;
|
046
|
public
class
CombCtor
extends
Contractor
|
047
|
{
|
048
|
????
public
WareHouse giveTask(WareHouse wh)
|
049
|
????
{
|
050
|
????????
WorkerLocal[] wks = getWaitingWorkers(
"CombWorker"
);
|
051
|
????????
System.out.println(
"wks.length:"
+wks.length+
";"
+wh);
|
052
|
????????
wh.setObj(
"wknum"
,wks.length);
|
053
|
????????
WareHouse[] hmarr = doTaskBatch(wks, wh);
//批量執行任務,所有工人完成才返回
|
054
|
????????
int
total=
0
;
|
055
|
????????
for
(WareHouse hm:hmarr)
|
056
|
????????????
total+=(Integer)hm.getObj(
"total"
);
|
057
|
????????
System.out.println(
"total:"
+total);
|
058
|
????????
return
wh;
|
059
|
????
}
|
060
|
????
?
|
061
|
????
public
static
void
main(String[] args)
|
062
|
????
{
|
063
|
????????
CombCtor a =
new
CombCtor();
|
064
|
????????
WareHouse wh =
new
WareHouse(
"comb"
, Integer.parseInt(args[
0
]));
|
065
|
????????
long
begin = (
new
Date()).getTime();
|
066
|
????????
a.doProject(wh);
|
067
|
????????
long
end = (
new
Date()).getTime();
|
068
|
????????
System.out.println(
"time:"
+(end-begin)/
1000
+
"s"
);
|
069
|
????????
a.exit();
|
070
|
????
}
|
071
|
}
|
072
|
? |
073
|
//CombWorker
|
074
|
import
com.fourinone.MigrantWorker;
|
075
|
import
com.fourinone.WareHouse;
|
076
|
public
class
CombWorker
extends
MigrantWorker
|
077
|
{
|
078
|
????
private
int
m=
0
,n=
0
,total=
0
,index=-
1
;
|
079
|
? |
080
|
????
public
WareHouse doTask(WareHouse wh)
|
081
|
????
{
|
082
|
????????
total=
0
;
|
083
|
????????
n = (Integer)wh.getObj(
"wknum"
);
|
084
|
????????
m = (Integer)wh.getObj(
"comb"
);
|
085
|
????????
index = getSelfIndex()+
1
;
|
086
|
????????
System.out.println(
"index:"
+index);
|
087
|
????????
comb(index+
""
);
|
088
|
????????
System.out.println(
"total:"
+total);
|
089
|
????????
return
new
WareHouse(
"total"
,total);
|
090
|
????
}
|
091
|
????
?
|
092
|
????
public
void
comb(String str)
|
093
|
????
{
|
094
|
????????
for
(
int
i=
1
;i<n+
1
;i++){
|
095
|
????????????
if
(str.length()==m-
1
){
|
096
|
????????????????
//System.out.println(str+i);//打印出組合序列
|
097
|
????????????????
total++;
|
098
|
????????????
}
|
099
|
????????????
else
|
100
|
????????????????
comb(str+i);
|
101
|
????????
}
|
102
|
????
}
|
103
|
????
?
|
104
|
????
public
static
void
main(String[] args)
|
105
|
????
{
|
106
|
????????
CombWorker mw =
new
CombWorker();
|
107
|
????????
mw.waitWorking(args[
0
],Integer.parseInt(args[
1
]),
"CombWorker"
);
|
108
|
????
}
|
109
|
}
|
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
