作者:chen_h
微信號 & QQ:862251340
微信公眾號:coderpai
(一)機器學習中的集成學習入門
(二)bagging 方法
(三)使用Python進行交易的隨機森林算法
(四)Python中隨機森林的實現與解釋
(五)如何用 Python 從頭開始實現 Bagging 算法
(六)如何利用Python從頭開始實現隨機森林算法
介紹
隨機森林是集成學習中一個主要的算法。簡而言之,集成方法是一種將幾個弱學習器的預測結果進行組合,最終形成一個強學習器的方法。可以直觀的猜測一下,隨機森林通過減少過擬合來達到比決策樹更好的效果。決策樹和隨機森林都可用于回歸和分類問題。在這篇文章中,我們利用隨機森林來解決一些問題。
理論
在開始編寫代碼之前,我們需要了解一些基本理論:
1.特征bagging:自舉過程是一種從原始樣本中進行又放回的采樣。在特征 bagging 過程中,我們從原始特征中進行隨機特征采樣,并且把采樣到的特征傳遞到不同的樹上面。(不采用放回的采集,因為具有冗余特征是沒有意義的)。這樣做事為了減少樹之間的相關性。我們的目標就是制作高度不相關的決策樹。
2.聚合:使隨機森林比決策樹更好的核心是聚合不相關的樹。我們的想法是創建幾個淺層的樹模型,然后將它們平均化以創建更好的隨機森林,這樣可以將一些隨機誤差的平均值變為零。在回歸的情況下,我們可以平均每個樹的預測(平均值),而在分類問題的情況下,我們可以簡單的取每個樹投票的大多數類別。
Python 代碼
要從頭開始編碼我們的隨機森林,我們將遵循自上而下的方法。我們將從一個黑盒子開始,并進一步將其分解為幾個黑盒子,抽象級別越來越低,細節越來越多,直到我們最終達到不再抽象的程度。
隨機森林類
我們正在創建一個隨機森林回歸器,如果你想創建一個分類器,那么只需要對此代碼進行細微的調整就行了。首先,我們需要知道我們的黑盒子的輸入和輸出是什么,所以我們需要知道定義我們的隨機森林的參數是:
- x:訓練集的自變量。為了保持簡單,我不單獨創建一個 fit 方法,因此基類構造函數將接受訓練集;
- y:監督學習所需的相應因變量(隨機森林是一種監督學習技術);
- n_trees:我們合作創建隨機森林的不相關樹的數量;
- n_features:要采樣并傳遞到每棵樹的要素數量,這是特征bagging 發生的位置。它可以是 sqrt ,log2 或者整數。在 sqrt 的情況下,對于每個樹采樣的特征的數量是總特征的平方根,在 log2 的情況下是總特征的對數基數 2;
- sample_size:隨機選擇并傳遞到每個樹的行數。這通常等于總行數,但在某些情況下可以減少以提高性能并降低樹的相關性(樹的 bagging 方法是一種完全獨立的機器學習技術);
- depth:每個決策樹的深度。更高的深度意味著更多的分裂,這增加了每棵樹的過度擬合傾向,但由于我們聚集了幾個不相關的樹木,所以過度擬合單個樹木幾乎不會對整個森林造成干擾;
- min_leaf:節點中導致進一步拆分所需的最小行數。降低 min_leaf,樹的深度會越高;
讓我們開始定義我們的隨機森林類。
class
RandomForest
(
)
:
def
__init__
(
self
,
x
,
y
,
n_trees
,
n_features
,
sample_sz
,
depth
=
10
,
min_leaf
=
5
)
:
np
.
random
.
seed
(
12
)
if
n_features
==
'sqrt'
:
self
.
n_features
=
int
(
np
.
sqrt
(
x
.
shape
[
1
]
)
)
elif
n_features
==
'log2'
:
self
.
n_features
=
int
(
np
.
log2
(
x
.
shape
[
1
]
)
)
else
:
self
.
n_features
=
n_features
print
(
self
.
n_features
,
"sha: "
,
x
.
shape
[
1
]
)
self
.
x
,
self
.
y
,
self
.
sample_sz
,
self
.
depth
,
self
.
min_leaf
=
x
,
y
,
sample_sz
,
depth
,
min_leaf
self
.
trees
=
[
self
.
create_tree
(
)
for
i
in
range
(
n_trees
)
]
def
create_tree
(
self
)
:
idxs
=
np
.
random
.
permutation
(
len
(
self
.
y
)
)
[
:
self
.
sample_sz
]
f_idxs
=
np
.
random
.
permutation
(
self
.
x
.
shape
[
1
]
)
[
:
self
.
n_features
]
return
DecisionTree
(
self
.
x
.
iloc
[
idxs
]
,
self
.
y
[
idxs
]
,
self
.
n_features
,
f_idxs
,
idxs
=
np
.
array
(
range
(
self
.
sample_sz
)
)
,
depth
=
self
.
depth
,
min_leaf
=
self
.
min_leaf
)
def
predict
(
self
,
x
)
:
return
np
.
mean
(
[
t
.
predict
(
x
)
for
t
in
self
.
trees
]
,
axis
=
0
)
def
std_agg
(
cnt
,
s1
,
s2
)
:
return
math
.
sqrt
(
(
s2
/
cnt
)
-
(
s1
/
cnt
)
**
2
)
- __init__:構造函數只需借助我們的參數定義隨機森林并創建所需數量的樹;
- creat_tree:通過調用 Decision Tree 類的構造函數創建一個新的決策樹。現在假設它是一個黑盒子。我們稍后會寫關于它的代碼。每棵樹都會受到一個隨機的特征子集(特征 bagging)和一組隨機的行;
- Predict:我們的隨機森林預測只是所有決策樹預測的平均值;
如果我們能夠神奇的創建樹,那么想想隨機森林是多么容易。現在我們降低抽象級別并編寫代碼來創建決策樹。
決策樹類
決策樹將具有以下參數:
- indxs:此參數用于跟蹤原始集的哪些索引向右移動,哪些索引轉到左側樹。因此,每個樹都有這個參數 indxs,它存儲它包含的行的索引。通過平均這些行來進行預測。
- min_leaf:葉節點上需要的最小行樣本。每個葉節點的行樣本都小于 min_leaf ,因為它們不能再分割。
- depth:每棵樹內可能的最大深度或者最大分割數。
class
DecisionTree
(
)
:
def
__init__
(
self
,
x
,
y
,
n_features
,
f_idxs
,
idxs
,
depth
=
10
,
min_leaf
=
5
)
:
self
.
x
,
self
.
y
,
self
.
idxs
,
self
.
min_leaf
,
self
.
f_idxs
=
x
,
y
,
idxs
,
min_leaf
,
f_idxs
self
.
depth
=
depth
self
.
n_features
=
n_features
self
.
n
,
self
.
c
=
len
(
idxs
)
,
x
.
shape
[
1
]
self
.
val
=
np
.
mean
(
y
[
idxs
]
)
self
.
score
=
float
(
'inf'
)
self
.
find_varsplit
(
)
def
find_varsplit
(
self
)
:
#Will make it recursive later
for
i
in
self
.
f_idxs
:
self
.
find_better_split
(
i
)
def
find_better_split
(
self
,
var_idx
)
:
#Lets write it later
pass
for
i
in
range
(
0
,
self
.
n
-
self
.
min_leaf
-
1
)
:
xi
,
yi
=
sort_x
[
i
]
,
sort_y
[
i
]
lhs_cnt
+=
1
;
rhs_cnt
-=
1
lhs_sum
+=
yi
;
rhs_sum
-=
yi
lhs_sum2
+=
yi
**
2
;
rhs_sum2
-=
yi
**
2
if
i
<
self
.
min_leaf
or
xi
==
sort_x
[
i
+
1
]
:
continue
lhs_std
=
std_agg
(
lhs_cnt
,
lhs_sum
,
lhs_sum2
)
rhs_std
=
std_agg
(
rhs_cnt
,
rhs_sum
,
rhs_sum2
)
curr_score
=
lhs_std
*
lhs_cnt
+
rhs_std
*
rhs_cnt
if
curr_score
<
self
.
score
:
self
.
var_idx
,
self
.
score
,
self
.
split
=
var_idx
,
curr_score
,
xi
@
property
def
split_name
(
self
)
:
return
self
.
x
.
columns
[
self
.
var_idx
]
@
property
def
split_col
(
self
)
:
return
self
.
x
.
values
[
self
.
idxs
,
self
.
var_idx
]
@
property
def
is_leaf
(
self
)
:
return
self
.
score
==
float
(
'inf'
)
or
self
.
depth
<=
0
def
predict
(
self
,
x
)
:
return
np
.
array
(
[
self
.
predict_row
(
xi
)
for
xi
in
x
]
)
def
predict_row
(
self
,
xi
)
:
if
self
.
is_leaf
:
return
self
.
val
t
=
self
.
lhs
if
xi
[
self
.
var_idx
]
<=
self
.
split
else
self
.
rhs
return
t
.
predict_row
(
xi
)
我們使用屬性裝飾器使我們的代碼更加簡潔。
- __init__:決策樹構造函數。它有幾個有趣的片段可供研究:
a. 如果 idxs 為 None:idxs = np.arange(len(y)),如果我們沒有在這個特定樹的計算中指定行的索引,只需占用所有行;
b. self.val = np.mean(y[idxs]) 每個決策樹預測一個值,該值是它所持有的所有行的平均值。變量 self.val 保存樹的每個節點的預測。對于根節點,該值將僅僅是所有觀察值的平均值,因為它保留了所有行,因為我們尚未進行拆分。我在這里使用了“節點”這個詞,因為本質上決策樹只是一個節點,左邊是決策樹,右邊也是決策樹。
c. Self.score = float(“inf”) 節點的得分是根據它如何 “劃分” 原始數據集來進行計算的。我們稍后會定義這個 “好”,我們現在假設我們有辦法測量這樣的數量。此外,我們的節點將得分設置為無窮大,因為我們尚未進行任何拆分,因此我們存在的拆分無線差,表明任何拆分都優于不拆分。
d. self.find_varsplit() 我們首先進行拆分!
- find_varsplit:我們使用暴力方法找到最佳分裂。此函數按順序循環遍歷所有列,并在他們之間找到最佳分割。這個函數仍然不完整,因為它只進行一次拆分,后來我們擴展這個函數,為每個拆分做出左右決策,直到我們到達葉子節點。
- split_name:一個屬性裝飾器,用于返回我們要拆分的列的名稱。var_idx 是此列的索引,我們將在 find_better_split 函數中計算此索引以及我們拆分的列的值。
- split_col:一個屬性裝飾器,用于返回索引 var_idx 處的列,其中元素位于 indxs 變量給出的索引處。基本上,將列與選定的行隔離。
- find_better_split:這個函數是在某個列中找到最好的分割,這很復雜,所以我們在上面的代碼中把它看做是一個黑盒子。讓我們稍后再定義它。
- is_leaf:葉節點是從未進行過分割的節點,因此它具有無限分數,因此該函數用于標識葉節點。同樣,如果我們已經越過了最大深度,即 self.depth <= 0 ,它就是一個葉子節點,因為我們不能再深入了。
如何找到最好的分割點?
決策樹通過基于某些條件遞歸的將數據分為兩半來進行訓練。如果測試集在每列中有 10 列,每列有 10 個數據點,則總共可以進行 10*10 = 100 次拆分,我們手頭的任務是找到哪些拆分是最適合我們的數據。
我們根據將數據分為兩半,然后使得兩者中的每一個數據都是非常“相似的”。增加這種相似性的一種方法是減少兩半的方差或者標準偏差。因此,我們希望最小化兩邊標準差的加權平均值。我們使用貪婪算法通過將數據劃分為列中每個值的兩半來找到拆分,并計算兩半的標準偏差的加權平均值以找到最小值。
為了加快速度,我們可以復制一個列并對其進行排序,通過在第 n+1 個索引處使用 sum 的值和由第 n 個索引分割創建的兩半值的平方和來分割加權平均值來計算加權平能均值。這是基于以下標準偏差公式:
下面的圖像以圖形方式展示了分數計算的過程,每個圖像中的最后一列是表示分割得分的單個數字,即左右標準偏差的加權平均值。
我們繼續對每列進行排序:
現在我們按順序進行拆分:
index = 0
index = 1
Index = 2 (best split)
Index = 3
index = 4
index=5
通過簡單的貪婪算法,我們發現在 index = 2 時進行的拆分是最好的拆分,因為它得分最低。我們稍后對所有列執行相同的步驟并將它們全部比較以貪婪算法找到最小值。
以下是上述圖示表示的簡單代碼:
def
std_agg
(
cnt
,
s1
,
s2
)
:
return
math
.
sqrt
(
(
s2
/
cnt
)
-
(
s1
/
cnt
)
**
2
)
def
find_better_split
(
self
,
var_idx
)
:
x
,
y
=
self
.
x
.
values
[
self
.
idxs
,
var_idx
]
,
self
.
y
[
self
.
idxs
]
sort_idx
=
np
.
argsort
(
x
)
sort_y
,
sort_x
=
y
[
sort_idx
]
,
x
[
sort_idx
]
rhs_cnt
,
rhs_sum
,
rhs_sum2
=
self
.
n
,
sort_y
.
sum
(
)
,
(
sort_y
**
2
)
.
sum
(
)
lhs_cnt
,
lhs_sum
,
lhs_sum2
=
0
,
0
.
,
0
.
for
i
in
range
(
0
,
self
.
n
-
self
.
min_leaf
-
1
)
:
xi
,
yi
=
sort_x
[
i
]
,
sort_y
[
i
]
lhs_cnt
+=
1
;
rhs_cnt
-=
1
lhs_sum
+=
yi
;
rhs_sum
-=
yi
lhs_sum2
+=
yi
**
2
;
rhs_sum2
-=
yi
**
2
if
i
<
self
.
min_leaf
or
xi
==
sort_x
[
i
+
1
]
:
continue
lhs_std
=
std_agg
(
lhs_cnt
,
lhs_sum
,
lhs_sum2
)
rhs_std
=
std_agg
(
rhs_cnt
,
rhs_sum
,
rhs_sum2
)
curr_score
=
lhs_std
*
lhs_cnt
+
rhs_std
*
rhs_cnt
if
curr_score
<
self
.
score
:
self
.
var_idx
,
self
.
score
,
self
.
split
=
var_idx
,
curr_score
,
xi
上面的代碼我們需要一些解釋:
- 函數 std_agg 使用平方和的值來計算標準偏差;
- curr_score = lhs_std*lhs_cnt + rhs_std*rhs_cnt 每次迭代的分割得分只是兩個標準差的加權平均值。較低的分數有助于降低方差,較低的方差有助于對類似數據進行分組,從而實現更好的預測;
-
if curr_score
現在我們知道如何為所選列找到最佳拆分,我們需要遞歸的為每個決策樹進行拆分。對于每一棵樹,我們找到最好的列和它的值,然后我們遞歸的制作兩個決策樹,知道我們到達葉子及誒單。為此,我們將不完整的函數 find_varsplit 進行擴展:
def
find_varsplit
(
self
)
:
for
i
in
self
.
f_idxs
:
self
.
find_better_split
(
i
)
if
self
.
is_leaf
:
return
x
=
self
.
split_col
lhs
=
np
.
nonzero
(
x
<=
self
.
split
)
[
0
]
rhs
=
np
.
nonzero
(
x
>
self
.
split
)
[
0
]
lf_idxs
=
np
.
random
.
permutation
(
self
.
x
.
shape
[
1
]
)
[
:
self
.
n_features
]
rf_idxs
=
np
.
random
.
permutation
(
self
.
x
.
shape
[
1
]
)
[
:
self
.
n_features
]
self
.
lhs
=
DecisionTree
(
self
.
x
,
self
.
y
,
self
.
n_features
,
lf_idxs
,
self
.
idxs
[
lhs
]
,
depth
=
self
.
depth
-
1
,
min_leaf
=
self
.
min_leaf
)
self
.
rhs
=
DecisionTree
(
self
.
x
,
self
.
y
,
self
.
n_features
,
rf_idxs
,
self
.
idxs
[
rhs
]
,
depth
=
self
.
depth
-
1
,
min_leaf
=
self
.
min_leaf
)
完結
最后我們給出完整代碼:
class
RandomForest
(
)
:
def
__init__
(
self
,
x
,
y
,
n_trees
,
n_features
,
sample_sz
,
depth
=
10
,
min_leaf
=
5
)
:
np
.
random
.
seed
(
12
)
if
n_features
==
'sqrt'
:
self
.
n_features
=
int
(
np
.
sqrt
(
x
.
shape
[
1
]
)
)
elif
n_features
==
'log2'
:
self
.
n_features
=
int
(
np
.
log2
(
x
.
shape
[
1
]
)
)
else
:
self
.
n_features
=
n_features
print
(
self
.
n_features
,
"sha: "
,
x
.
shape
[
1
]
)
self
.
x
,
self
.
y
,
self
.
sample_sz
,
self
.
depth
,
self
.
min_leaf
=
x
,
y
,
sample_sz
,
depth
,
min_leaf
self
.
trees
=
[
self
.
create_tree
(
)
for
i
in
range
(
n_trees
)
]
def
create_tree
(
self
)
:
idxs
=
np
.
random
.
permutation
(
len
(
self
.
y
)
)
[
:
self
.
sample_sz
]
f_idxs
=
np
.
random
.
permutation
(
self
.
x
.
shape
[
1
]
)
[
:
self
.
n_features
]
return
DecisionTree
(
self
.
x
.
iloc
[
idxs
]
,
self
.
y
[
idxs
]
,
self
.
n_features
,
f_idxs
,
idxs
=
np
.
array
(
range
(
self
.
sample_sz
)
)
,
depth
=
self
.
depth
,
min_leaf
=
self
.
min_leaf
)
def
predict
(
self
,
x
)
:
return
np
.
mean
(
[
t
.
predict
(
x
)
for
t
in
self
.
trees
]
,
axis
=
0
)
def
std_agg
(
cnt
,
s1
,
s2
)
:
return
math
.
sqrt
(
(
s2
/
cnt
)
-
(
s1
/
cnt
)
**
2
)
class
DecisionTree
(
)
:
def
__init__
(
self
,
x
,
y
,
n_features
,
f_idxs
,
idxs
,
depth
=
10
,
min_leaf
=
5
)
:
self
.
x
,
self
.
y
,
self
.
idxs
,
self
.
min_leaf
,
self
.
f_idxs
=
x
,
y
,
idxs
,
min_leaf
,
f_idxs
self
.
depth
=
depth
print
(
f_idxs
)
# print(self.depth)
self
.
n_features
=
n_features
self
.
n
,
self
.
c
=
len
(
idxs
)
,
x
.
shape
[
1
]
self
.
val
=
np
.
mean
(
y
[
idxs
]
)
self
.
score
=
float
(
'inf'
)
self
.
find_varsplit
(
)
def
find_varsplit
(
self
)
:
for
i
in
self
.
f_idxs
:
self
.
find_better_split
(
i
)
if
self
.
is_leaf
:
return
x
=
self
.
split_col
lhs
=
np
.
nonzero
(
x
<=
self
.
split
)
[
0
]
rhs
=
np
.
nonzero
(
x
>
self
.
split
)
[
0
]
lf_idxs
=
np
.
random
.
permutation
(
self
.
x
.
shape
[
1
]
)
[
:
self
.
n_features
]
rf_idxs
=
np
.
random
.
permutation
(
self
.
x
.
shape
[
1
]
)
[
:
self
.
n_features
]
self
.
lhs
=
DecisionTree
(
self
.
x
,
self
.
y
,
self
.
n_features
,
lf_idxs
,
self
.
idxs
[
lhs
]
,
depth
=
self
.
depth
-
1
,
min_leaf
=
self
.
min_leaf
)
self
.
rhs
=
DecisionTree
(
self
.
x
,
self
.
y
,
self
.
n_features
,
rf_idxs
,
self
.
idxs
[
rhs
]
,
depth
=
self
.
depth
-
1
,
min_leaf
=
self
.
min_leaf
)
def
find_better_split
(
self
,
var_idx
)
:
x
,
y
=
self
.
x
.
values
[
self
.
idxs
,
var_idx
]
,
self
.
y
[
self
.
idxs
]
sort_idx
=
np
.
argsort
(
x
)
sort_y
,
sort_x
=
y
[
sort_idx
]
,
x
[
sort_idx
]
rhs_cnt
,
rhs_sum
,
rhs_sum2
=
self
.
n
,
sort_y
.
sum
(
)
,
(
sort_y
**
2
)
.
sum
(
)
lhs_cnt
,
lhs_sum
,
lhs_sum2
=
0
,
0
.
,
0
.
for
i
in
range
(
0
,
self
.
n
-
self
.
min_leaf
-
1
)
:
xi
,
yi
=
sort_x
[
i
]
,
sort_y
[
i
]
lhs_cnt
+=
1
;
rhs_cnt
-=
1
lhs_sum
+=
yi
;
rhs_sum
-=
yi
lhs_sum2
+=
yi
**
2
;
rhs_sum2
-=
yi
**
2
if
i
<
self
.
min_leaf
or
xi
==
sort_x
[
i
+
1
]
:
continue
lhs_std
=
std_agg
(
lhs_cnt
,
lhs_sum
,
lhs_sum2
)
rhs_std
=
std_agg
(
rhs_cnt
,
rhs_sum
,
rhs_sum2
)
curr_score
=
lhs_std
*
lhs_cnt
+
rhs_std
*
rhs_cnt
if
curr_score
<
self
.
score
:
self
.
var_idx
,
self
.
score
,
self
.
split
=
var_idx
,
curr_score
,
xi
@
property
def
split_name
(
self
)
:
return
self
.
x
.
columns
[
self
.
var_idx
]
@
property
def
split_col
(
self
)
:
return
self
.
x
.
values
[
self
.
idxs
,
self
.
var_idx
]
@
property
def
is_leaf
(
self
)
:
return
self
.
score
==
float
(
'inf'
)
or
self
.
depth
<=
0
def
predict
(
self
,
x
)
:
return
np
.
array
(
[
self
.
predict_row
(
xi
)
for
xi
in
x
]
)
def
predict_row
(
self
,
xi
)
:
if
self
.
is_leaf
:
return
self
.
val
t
=
self
.
lhs
if
xi
[
self
.
var_idx
]
<=
self
.
split
else
self
.
rhs
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
