通常在使用Spark算子函數(shù),比如使用map()或者reduce函數(shù)我們向函數(shù)傳入條件時(shí),函數(shù)內(nèi)部可以使用驅(qū)動(dòng)程序中定義的變量,但是這樣會(huì)使集群中所有任務(wù)都會(huì)得到變量新的副本,這些副本的更新不會(huì)傳播回驅(qū)動(dòng)程序,導(dǎo)致讀寫共享變量效率低下或者內(nèi)存溢出,為了解決這個(gè)問題Spark提供了兩種共享變量類型:廣播變量和累加器
- 廣播變量:用來(lái)高效分發(fā)較大對(duì)象,只能在Driver定義,不能在Executor端定義,同時(shí)RDD不存儲(chǔ)數(shù)據(jù)所以不能廣播出去
- 累加器:用來(lái)對(duì)信息進(jìn)行聚合,常用場(chǎng)景reduce()
不使用廣播變量,直接定義一個(gè)變量list,然后在filter()來(lái)判斷元素是否存在list中,實(shí)現(xiàn)代碼如下:
from pyspark import SparkContext
if __name__ == '__main__':
sc = SparkContext(appName="broadcast", master="local[*]")
list = [2, 4, 6, 8]
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
print(rdd.filter(lambda x: list.__contains__(x)).collect())
# [2, 4]
sc.stop()
定義的list變量在驅(qū)動(dòng)端Driver創(chuàng)建的,但是要到Executor端運(yùn)行,Driver端會(huì)把list以task形式拷貝到Executor端,如果有很多task那么就會(huì)有很多l(xiāng)ist復(fù)制過(guò)去,這個(gè)list非常大的時(shí)候就會(huì)造成內(nèi)存溢出,關(guān)系圖如下所示:
使用廣播變量,變量只會(huì)被發(fā)送到各節(jié)點(diǎn)一次,同時(shí)存放在Executor的BlockManager中,實(shí)現(xiàn)代碼如下:
from pyspark import SparkContext
if __name__ == '__main__':
sc = SparkContext(appName="broadcast", master="local[*]")
list = [2, 4, 6, 8]
bclist = sc.broadcast(list)
data = [1, 2, 3]
rdd = sc.parallelize(data)
print(rdd.map(lambda x: bclist.value[x]).collect())
# [4, 6, 8]
sc.stop()
關(guān)系圖如下:
累加器,對(duì)作業(yè)執(zhí)行過(guò)程事件進(jìn)行計(jì)數(shù),實(shí)現(xiàn)代碼如下:
from pyspark import SparkContext
if __name__ == '__main__':
sc = SparkContext(appName="broadcast", master="local[*]")
data = [1, 2, 3]
rdd = sc.parallelize(data)
accumulator = sc.accumulator(0)
rdd.map(lambda x: accumulator.add(1)).collect()
print(accumulator.value)
# 3
sc.stop()
關(guān)系圖如下:
累加器在Driver端定義賦初始值,在Executor端更新,最后在Driver端讀取最后的值。
?
Spark學(xué)習(xí)目錄:
- Spark學(xué)習(xí)實(shí)例1(Python):?jiǎn)卧~統(tǒng)計(jì) Word Count
- Spark學(xué)習(xí)實(shí)例2(Python):加載數(shù)據(jù)源Load Data Source
- Spark學(xué)習(xí)實(shí)例3(Python):保存數(shù)據(jù)Save Data
- Spark學(xué)習(xí)實(shí)例4(Python):RDD轉(zhuǎn)換 Transformations
- Spark學(xué)習(xí)實(shí)例5(Python):RDD執(zhí)行 Actions
- Spark學(xué)習(xí)實(shí)例6(Python):共享變量Shared Variables
- Spark學(xué)習(xí)實(shí)例7(Python):RDD、DataFrame、DataSet相互轉(zhuǎn)換
- Spark學(xué)習(xí)實(shí)例8(Python):輸入源實(shí)時(shí)處理 Input Sources Streaming
- Spark學(xué)習(xí)實(shí)例9(Python):窗口操作 Window Operations
?
?
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
