在client向DataNode寫入block之前,會與NameNode有一次通信,由NameNode來選擇指定數目的DataNode來存放副本。具體的副本選擇策略在BlockPlacementPolicy接口中,其子類實現是BlockPlacementPolicyDefault。該類中會有多個chooseTarget()方法重載,但最終調用了下面的方法:
1
/**
2
* This is not part of the public API but is used by the unit tests.
3
*/
4
DatanodeDescriptor[] chooseTarget(
int
numOfReplicas,
5
DatanodeDescriptor writer,
6
List<DatanodeDescriptor>
chosenNodes,
7
HashMap<Node, Node>
excludedNodes,
8
long
blocksize) {
9
//
numOfReplicas:要選擇的副本個數
10
//
clusterMap.getNumOfLeaves():整個集群的DN個數
11
if
(numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0
) {
12
return
new
DatanodeDescriptor[0
];
13
}
14
15
//
excludedNodes:排除的DN(因為有些DN已經被選中,所以不再選擇他們)
16
if
(excludedNodes ==
null
) {
17
excludedNodes =
new
HashMap<Node, Node>
();
18
}
19
20
int
clusterSize =
clusterMap.getNumOfLeaves();
21
//
總的副本個數=已選擇的個數 + 指定的副本個數
22
int
totalNumOfReplicas = chosenNodes.size()+
numOfReplicas;
23
if
(totalNumOfReplicas > clusterSize) {
//
若總副本個數 > 整個集群的DN個數
24
numOfReplicas -= (totalNumOfReplicas-
clusterSize);
25
totalNumOfReplicas =
clusterSize;
26
}
27
28
//
計算每個一個rack能有多少個DN被選中
29
int
maxNodesPerRack =
30
(totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2
;
31
32
List<DatanodeDescriptor> results =
33
new
ArrayList<DatanodeDescriptor>
(chosenNodes);
34
for
(DatanodeDescriptor node:chosenNodes) {
35
//
add localMachine and related nodes to excludedNodes
36
addToExcludedNodes(node, excludedNodes);
37
adjustExcludedNodes(excludedNodes, node);
38
}
39
40
//
客戶端不是DN
41
if
(!
clusterMap.contains(writer)) {
42
writer=
null
;
43
}
44
45
boolean
avoidStaleNodes = (stats !=
null
&&
stats
46
.shouldAvoidStaleDataNodesForWrite());
47
48
//
選擇numOfReplicas個DN,并返回本地DN
49
DatanodeDescriptor localNode =
chooseTarget(numOfReplicas, writer,
50
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
51
52
results.removeAll(chosenNodes);
53
54
//
sorting nodes to form a pipeline
55
//
將選中的DN(result中的元素)組織成pipe
56
return
getPipeline((writer==
null
)?
localNode:writer,
57
results.toArray(
new
DatanodeDescriptor[results.size()]));
58
}
方法含義大概就如注釋中寫的,不過要注意其中的變量含義。在第48行,又 調用 chooseTarget()方法來選擇指定數目的DN(選中的DN存放在result中),并返回一個DN作為本地DN。下面分析這個方法。 ?
1
/*
choose <i>numOfReplicas</i> from all data nodes
*/
2
private
DatanodeDescriptor chooseTarget(
int
numOfReplicas,
3
DatanodeDescriptor writer, HashMap<Node, Node>
excludedNodes,
4
long
blocksize,
int
maxNodesPerRack, List<DatanodeDescriptor>
results,
5
boolean
avoidStaleNodes) {
6
7
if
(numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0
) {
8
return
writer;
9
}
10
int
totalReplicasExpected = numOfReplicas +
results.size();
11
12
int
numOfResults =
results.size();
13
boolean
newBlock = (numOfResults==0
);
14
if
(writer ==
null
&& !
newBlock) {
15
writer = results.get(0
);
16
}
17
18
//
Keep a copy of original excludedNodes
19
final
HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
20
new
HashMap<Node, Node>(excludedNodes) :
null
;
21
22
try
{
23
if
(numOfResults == 0) {
//
選擇本地DN
24
writer =
chooseLocalNode(writer, excludedNodes, blocksize,
25
maxNodesPerRack, results, avoidStaleNodes);
26
if
(--numOfReplicas == 0
) {
27
return
writer;
28
}
29
}
30
if
(numOfResults <= 1) {
//
選擇遠程rack上的DN
31
chooseRemoteRack(1, results.get(0
), excludedNodes, blocksize,
32
maxNodesPerRack, results, avoidStaleNodes);
33
if
(--numOfReplicas == 0
) {
34
return
writer;
35
}
36
}
37
if
(numOfResults <= 2
) {
38
if
(clusterMap.isOnSameRack(results.get(0), results.get(1))) {
//
若前兩個DN在同一個rack上
39
//已選擇的前兩個DN在同一個rack上,則
選擇與第1個DN不在同一個rack上的DN
40
chooseRemoteRack(1, results.get(0
), excludedNodes, blocksize,
41
maxNodesPerRack, results, avoidStaleNodes);
42
}
else
if
(newBlock){
43
//
選擇與第2個DN在同一個rack上的DN
44
chooseLocalRack(results.get(1
), excludedNodes, blocksize,
45
maxNodesPerRack, results, avoidStaleNodes);
46
}
else
{
47
//
選擇與write在同一個rack上的DN
48
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
49
results, avoidStaleNodes);
50
}
51
if
(--numOfReplicas == 0
) {
52
return
writer;
53
}
54
}
55
//
在整個集群中隨機選擇剩余的DN
56
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
57
maxNodesPerRack, results, avoidStaleNodes);
58
}
catch
(NotEnoughReplicasException e) {
59
FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
60
+ (totalReplicasExpected - results.size()) + " to reach "
61
+ totalReplicasExpected + "\n"
62
+
e.getMessage());
63
if
(avoidStaleNodes) {
64
//
Retry chooseTarget again, this time not avoiding stale nodes.
65
66
//
excludedNodes contains the initial excludedNodes and nodes that were
67
//
not chosen because they were stale, decommissioned, etc.
68
//
We need to additionally exclude the nodes that were added to the
69
//
result list in the successful calls to choose*() above.
70
for
(Node node : results) {
71
oldExcludedNodes.put(node, node);
72
}
73
//
Set numOfReplicas, since it can get out of sync with the result list
74
//
if the NotEnoughReplicasException was thrown in chooseRandom().
75
numOfReplicas = totalReplicasExpected -
results.size();
76
return
chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
77
maxNodesPerRack, results,
false
);
78
}
79
}
80
return
writer;
81
}
? 下面依次分析這3個DN的選擇過程。
1、選擇本地DN: chooseLocalNode() ?
1
/*
choose <i>localMachine</i> as the target.
2
* if <i>localMachine</i> is not available,
3
* choose a node on the same rack
4
* @return the chosen node
5
*/
6
protected
DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
7
HashMap<Node, Node> excludedNodes,
long
blocksize,
int
maxNodesPerRack,
8
List<DatanodeDescriptor> results,
boolean
avoidStaleNodes)
9
throws
NotEnoughReplicasException {
10
//
if no local machine, randomly choose one node
11
if
(localMachine ==
null
)
//
client端上沒有DN
12
//
從整個集群中隨機選擇一個DN作為本地DN
13
return
chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
14
maxNodesPerRack, results, avoidStaleNodes);
15
16
//
otherwise try local machine first
17
Node oldNode =
excludedNodes.put(localMachine, localMachine);
18
if
(oldNode ==
null
) {
//
was not in the excluded list
19
//
該client端的DN還沒有被選中時,判斷這個DN是否負載過重
20
if
(isGoodTarget(localMachine, blocksize, maxNodesPerRack,
false
,
21
results, avoidStaleNodes)) {
22
results.add(localMachine);
23
//
add localMachine and related nodes to excludedNode
24
addToExcludedNodes(localMachine, excludedNodes);
25
return
localMachine;
26
}
27
}
28
29
//
try a node on local rack
30
//
選擇與該client同rack的DN
31
return
chooseLocalRack(localMachine, excludedNodes, blocksize,
32
maxNodesPerRack, results, avoidStaleNodes);
33
}
? 本地DN的選擇分三步:
1.1)如果client上沒有DN,則從整個集群中隨機選擇一個DN(chooseRandom()方法),并判斷是否該DN是否負載過重(步驟如1.2);如果負載過重則重新隨機選擇一個。以此類推.....
1.2)如果該client有DN,則判斷該DN是否負載過重(isGoodTarget()方法),步驟如下:結點是否可用、結點是否在“stale”狀態、結點容量是否足夠、結點流量情況、該節點所在的機架中存放當前數據的DN是否過多;
1.3)如果前兩個條件都不滿足,則選擇與client同rack的DN(chooseLocalRack()方法)作為本地結點,步驟如下:
a )隨機選擇一個與client同rack的DN(步驟同1.1);
b)否則從整個集群中隨機選擇一個DN(步驟同1.1)。
這兩步需要解釋一下,他們的步驟與1.1都是相同的,那么怎么會得出不同的結果。原因在于傳給chooseRandom()方法的第一個參數。如果參數是“NodeBase.ROOT”,實質上就是"/",表示的是整個集群;如果是“localMachine.getNetworkLocation()”,則表示localMachine所在的rack。這樣,通過第一個參數就可以表示要進行選擇的范圍。在NetworkTopology接口中定義了DN與rack的關系,機架感知也是借此來實現。
?
2、選擇遠程rack上的DN:chooseRemoteRack()
1
/*
choose <i>numOfReplicas</i> nodes from the racks
2
* that <i>localMachine</i> is NOT on.
3
* if not enough nodes are available, choose the remaining ones
4
* from the local rack
5
*/
6
protected
void
chooseRemoteRack(
int
numOfReplicas,
7
DatanodeDescriptor localMachine,
8
HashMap<Node, Node>
excludedNodes,
9
long
blocksize,
10
int
maxReplicasPerRack,
11
List<DatanodeDescriptor>
results,
12
boolean
avoidStaleNodes)
13
throws
NotEnoughReplicasException {
14
int
oldNumOfReplicas =
results.size();
15
//
randomly choose one node from remote racks
16
try
{
17
//
選擇與localMachine不在同一個rack上的DN
18
chooseRandom(numOfReplicas, "~" +
localMachine.getNetworkLocation(),
19
excludedNodes, blocksize, maxReplicasPerRack, results,
20
avoidStaleNodes);
21
}
catch
(NotEnoughReplicasException e) {
22
//
選擇與localMachine在同一個rack上的DN
23
chooseRandom(numOfReplicas-(results.size()-
oldNumOfReplicas),
24
localMachine.getNetworkLocation(), excludedNodes, blocksize,
25
maxReplicasPerRack, results, avoidStaleNodes);
26
}
27
}
遠程DN的選擇分兩步:
2.1)從非本地rack上選擇一個DN(步驟同1.1);
2.2)否則從 本地 rack上選擇一個DN(步驟同1.1);
同樣,這兩步還是復用了chooseRandom()方法。2.1)的參數為"~" + localMachine.getNetworkLocation(),即在集群中除了localMachine所在的rack中選擇一個DN(“~”表示排除);2.2)的參數為“localMachine.getNetworkLocation()”,表示從localMachine所在的rack中選擇一個DN。這里很重要,可以看到, 選擇的第二個DN與第一個DN并不一定就在不同的rack 。
?
3、選擇第3個DN
代碼在 上面第二段代碼分析的第37~50行中,具體步驟如下:
3.1)如果前兩個DN在同一個rack上,則選擇一個與他們不在同一個rack上的DN,同步驟2; ?
3.2)否則,如果newBlock為true,則選擇與第二個DN同rack的DN,步驟同1.3; ?
3.3)否則,選擇與第一個DN同rack的DN,步驟同1.3;
?
4、 從整個集群中選擇剩余副本個數的DN,步驟同1.1。(代碼在上面第二段代碼分析的第56行)
?
最后返回到上面第一段代碼的最后部分,將這些選中的DN組織成pipeline。
?
通過上面的分析也就明白一個問題:網上經常會看到,有人說第三個DN是與第二個DN是同rack的,也有人說第三個DN是與第一個DN同rack的。那么到底哪個說法對呢?關鍵就看第二個DN的選擇,我在上面寫了,第二個DN可能是與第一個DN不在同一個rack,但也可能在同一個rack中,具體要根據當時集群中的情況來分析。所以不能簡單的認死理。
?
本文基于hadoop1.2.1
如有錯誤,還請指正
參考文章: http://blog.csdn.net/xhh198781/article/details/7109764
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
