足球资料库数据/孙祥/nba五佳球/足球直播哪个平台好 - cctv5今日现场直播

首頁 > 知識庫 > 正文

Spark的性能調(diào)優(yōu)
2016-01-20 17:49:38   來源:四火   評論:0 點擊:

本文是對Spark調(diào)優(yōu)的一個總結(jié),從內(nèi)存、CPU、序列化與傳輸、文件讀寫和任務(wù)五個方面進(jìn)行了講解調(diào)優(yōu)的過程。

基本概念和原則

首先,要搞清楚Spark的幾個基本概念和原則,否則系統(tǒng)的性能調(diào)優(yōu)無從談起:

  • 每一臺host上面可以并行N個worker,每一個worker下面可以并行M個executor,task們會被分配到executor上面去執(zhí)行。Stage指的是一組并行運行的task,stage內(nèi)部是不能出現(xiàn)shuffle的,因為shuffle的就像籬笆一樣阻止了并行task的運行,遇到shuffle就意味著到了stage的邊界。
  • CPU的core數(shù)量,每個executor可以占用一個或多個core,可以通過觀察CPU的使用率變化來了解計算資源的使用情況,例如,很常見的一種浪費是一個executor占用了多個core,但是總的CPU使用率卻不高(因為一個executor并不總能充分利用多核的能力),這個時候可以考慮讓么個executor占用更少的core,同時worker下面增加更多的executor,或者一臺host上面增加更多的worker來增加并行執(zhí)行的executor的數(shù)量,從而增加CPU利用率。但是增加executor的時候需要考慮好內(nèi)存消耗,因為一臺機(jī)器的內(nèi)存分配給越多的executor,每個executor的內(nèi)存就越小,以致出現(xiàn)過多的數(shù)據(jù)spill over甚至out of memory的情況。
  • partition和parallelism,partition指的就是數(shù)據(jù)分片的數(shù)量,每一次task只能處理一個partition的數(shù)據(jù),這個值太小了會導(dǎo)致每片數(shù)據(jù)量太大,導(dǎo)致內(nèi)存壓力,或者諸多executor的計算能力無法利用充分;但是如果太大了則會導(dǎo)致分片太多,執(zhí)行效率降低。在執(zhí)行action類型操作的時候(比如各種reduce操作),partition的數(shù)量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進(jìn)行reduce類操作的時候,默認(rèn)返回數(shù)據(jù)的paritition數(shù)量(而在進(jìn)行map類操作的時候,partition數(shù)量通常取自parent RDD中較大的一個,而且也不會涉及shuffle,因此這個parallelism的參數(shù)沒有影響)。所以說,這兩個概念密切相關(guān),都是涉及到數(shù)據(jù)分片的,作用方式其實是統(tǒng)一的。通過spark.default.parallelism可以設(shè)置默認(rèn)的分片數(shù)量,而很多RDD的操作都可以指定一個partition參數(shù)來顯式控制具體的分片數(shù)量。
  • 上面這兩條原理上看起來很簡單,但是卻非常重要,根據(jù)硬件和任務(wù)的情況選擇不同的取值。想要取一個放之四海而皆準(zhǔn)的配置是不現(xiàn)實的??催@樣幾個例子:(1)實踐中跑的EMR Spark job,有的特別慢,查看CPU利用率很低,我們就嘗試減少每個executor占用CPU core的數(shù)量,增加并行的executor數(shù)量,同時配合增加分片,整體上增加了CPU的利用率,加快數(shù)據(jù)處理速度。(2)發(fā)現(xiàn)某job很容易發(fā)生內(nèi)存溢出,我們就增大分片數(shù)量,從而減少了每片數(shù)據(jù)的規(guī)模,同時還減少并行的executor數(shù)量,這樣相同的內(nèi)存資源分配給數(shù)量更少的executor,相當(dāng)于增加了每個task的內(nèi)存分配,這樣運行速度可能慢了些,但是總比OOM強(qiáng)。(3)數(shù)據(jù)量特別少,有大量的小文件生成,就減少文件分片,沒必要創(chuàng)建那么多task,這種情況,如果只是最原始的input比較小,一般都能被注意到;但是,如果是在運算過程中,比如應(yīng)用某個reduceBy或者某個filter以后,數(shù)據(jù)大量減少,這種低效情況就很少被留意到。
  • 最后再補(bǔ)充一點,隨著參數(shù)和配置的變化,性能的瓶頸是變化的,在分析問題的時候不要忘記。例如在每臺機(jī)器上部署的executor數(shù)量增加的時候,性能一開始是增加的,同時也觀察到CPU的平均使用率在增加;但是隨著單臺機(jī)器上的executor越來越多,性能下降了,因為隨著executor的數(shù)量增加,被分配到每個executor的內(nèi)存數(shù)量減小,在內(nèi)存里直接操作的越來越少,spill over到磁盤上的數(shù)據(jù)越來越多,自然性能就變差了。

下面給這樣一個直觀的例子,當(dāng)前總的cpu利用率并不高:

但是經(jīng)過根據(jù)上述原則的的調(diào)整之后,可以顯著發(fā)現(xiàn)cpu總利用率增加了:

其次,涉及性能調(diào)優(yōu)我們經(jīng)常要改配置,在Spark里面有三種常見的配置方式,雖然有些參數(shù)的配置是可以互相替代,但是作為最佳實踐,還是需要遵循不同的情形下使用不同的配置:

  1. 設(shè)置環(huán)境變量,這種方式主要用于和環(huán)境、硬件相關(guān)的配置;
  2. 命令行參數(shù),這種方式主要用于不同次的運行會發(fā)生變化的參數(shù),用雙橫線開頭;
  3. 代碼里面(比如Scala)顯式設(shè)置(SparkConf對象),這種配置通常是application級別的配置,一般不改變。

舉一個配置的具體例子。slave、worker和executor之間的比例調(diào)整。我們經(jīng)常需要調(diào)整并行的executor的數(shù)量,那么簡單說有兩種方式:

  1. 每個worker內(nèi)始終跑一個executor,但是調(diào)整單臺slave上并行的worker的數(shù)量。比如,SPARK_WORKER_INSTANCES可以設(shè)置每個slave的worker的數(shù)量,但是在改變這個參數(shù)的時候,比如改成2,一定要相應(yīng)設(shè)置SPARK_WORKER_CORES的值,讓每個worker使用原有一半的core,這樣才能讓兩個worker一同工作;
  2. 每臺slave內(nèi)始終只部署一個worker,但是worker內(nèi)部署多個executor。我們是在YARN框架下采用這個調(diào)整來實現(xiàn)executor數(shù)量改變的,一種典型辦法是,一個host只跑一個worker,然后配置spark.executor.cores為host上CPU core的N分之一,同時也設(shè)置spark.executor.memory為host上分配給Spark計算內(nèi)存的N分之一,這樣這個host上就能夠啟動N個executor。
    有的配置在不同的MR框架/工具下是不一樣的,比如YARN下有的參數(shù)的默認(rèn)取值就不同,這點需要注意。

明確這些基礎(chǔ)的事情以后,再來一項一項看性能調(diào)優(yōu)的要點。

內(nèi)存

Memory Tuning,Java對象會占用原始數(shù)據(jù)2~5倍甚至更多的空間。最好的檢測對象內(nèi)存消耗的辦法就是創(chuàng)建RDD,然后放到cache里面去,然后在UI上面看storage的變化;當(dāng)然也可以使用SizeEstimator來估算。使用-XX:+UseCompressedOops選項可以壓縮指針(8字節(jié)變成4字節(jié))。在調(diào)用collect等等API的時候也要小心——大塊數(shù)據(jù)往內(nèi)存拷貝的時候心里要清楚。內(nèi)存要留一些給操作系統(tǒng),比如20%,這里面也包括了OS的buffer cache,如果預(yù)留得太少了,會見到這樣的錯誤:

“ Required executor memory (235520+23552 MB) is above the max threshold (241664 MB) of this cluster! Please increase the value of ‘yarn.scheduler.maximum-allocation-mb’.

或者干脆就沒有這樣的錯誤,但是依然有因為內(nèi)存不足導(dǎo)致的問題,有的會有警告,比如這個:

“ 16/01/13 23:54:48 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

有的時候連這樣的日志都見不到,而是見到一些不清楚原因的executor丟失信息:

“ Exception in thread “main” org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 17.0 failed 4 times, most recent failure: Lost task 12.3 in stage 17.0 (TID 1257, ip-10-184-192-56.ec2.internal): ExecutorLostFailure (executor 79 lost)

Reduce Task的內(nèi)存使用。在某些情況下reduce task特別消耗內(nèi)存,比如當(dāng)shuffle出現(xiàn)的時候,比如sortByKey、groupByKey、reduceByKey和join等,要在內(nèi)存里面建立一個巨大的hash table。其中一個解決辦法是增大level of parallelism,這樣每個task的輸入規(guī)模就相應(yīng)減小。另外,注意shuffle的內(nèi)存上限設(shè)置,有時候有足夠的內(nèi)存,但是shuffle內(nèi)存不夠的話,性能也是上不去的。我們在有大量數(shù)據(jù)join等操作的時候,shuffle的內(nèi)存上限經(jīng)常配置到executor的50%。

注意原始input的大小,有很多操作始終都是需要某類全集數(shù)據(jù)在內(nèi)存里面完成的,那么并非拼命增加parallelism和partition的值就可以把內(nèi)存占用減得非常小的。我們遇到過某些性能低下甚至OOM的問題,是改變這兩個參數(shù)所難以緩解的。但是可以通過增加每臺機(jī)器的內(nèi)存,或者增加機(jī)器的數(shù)量都可以直接或間接增加內(nèi)存總量來解決。

在選擇EC2機(jī)器類型的時候,要明確瓶頸(可以借由測試來明確),比如我們遇到的情況就是使用r3.8 xlarge和c3.8 xlarge選擇的問題,運算能力相當(dāng),前者比后者貴50%,但是內(nèi)存是后者的5倍。

另外,有一些RDD的API,比如cache,persist,都會把數(shù)據(jù)強(qiáng)制放到內(nèi)存里面,如果并不明確這樣做帶來的好處,就不要用它們。

CPU

Level of Parallelism。指定它以后,在進(jìn)行reduce類型操作的時候,默認(rèn)partition的數(shù)量就被指定了。這個參數(shù)在實際工程中通常是必不可少的,一般都要根據(jù)input和每個executor內(nèi)存的大小來確定。設(shè)置level of parallelism或者屬性spark.default.parallelism來改變并行級別,通常來說,每一個CPU核可以分配2~3個task。

CPU core的訪問模式是共享還是獨占。即CPU核是被同一host上的executor共享還是瓜分并獨占。比如,一臺機(jī)器上共有32個CPU core的資源,同時部署了兩個executor,總內(nèi)存是50G,那么一種方式是配置spark.executor.cores為16,spark.executor.memory為20G,這樣由于內(nèi)存的限制,這臺機(jī)器上會部署兩個executor,每個都使用20G內(nèi)存,并且各使用“獨占”的16個CPU core資源;而在內(nèi)存資源不變的前提下,也可以讓這兩個executor“共享”這32個core。根據(jù)我的測試,獨占模式的性能要略好與共享模式。

GC調(diào)優(yōu)。打印GC信息:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps。要記得默認(rèn)60%的executor內(nèi)存可以被用來作為RDD的緩存,因此只有40%的內(nèi)存可以被用來作為對象創(chuàng)建的空間,這一點可以通過設(shè)置spark.storage.memoryFraction改變。如果有很多小對象創(chuàng)建,但是這些對象在不完全GC的過程中就可以回收,那么增大Eden區(qū)會有一定幫助。如果有任務(wù)從HDFS拷貝數(shù)據(jù),內(nèi)存消耗有一個簡單的估算公式——比如HDFS的block size是64MB,工作區(qū)內(nèi)有4個task拷貝數(shù)據(jù),而解壓縮一個block要增大3倍大小,那么估算內(nèi)存消耗就是:4*3*64MB。另外,工作中遇到過這樣的一個問題:GC默認(rèn)情況下有一個限制,默認(rèn)是GC時間不能超過2%的CPU時間,但是如果大量對象創(chuàng)建(在Spark里很容易出現(xiàn),代碼模式就是一個RDD轉(zhuǎn)下一個RDD),就會導(dǎo)致大量的GC時間,從而出現(xiàn)“OutOfMemoryError: GC overhead limit exceeded”,對于這個,可以通過設(shè)置-XX:-UseGCOverheadLimit關(guān)掉它。

序列化和傳輸

Data Serialization,默認(rèn)使用的是Java Serialization,這個程序員最熟悉,但是性能、空間表現(xiàn)都比較差。還有一個選項是Kryo Serialization,更快,壓縮率也更高,但是并非支持任意類的序列化。在Spark UI上能夠看到序列化占用總時間開銷的比例,如果這個比例高的話可以考慮優(yōu)化內(nèi)存使用和序列化。

Broadcasting Large Variables。在task使用靜態(tài)大對象的時候,可以把它broadcast出去。Spark會打印序列化后的大小,通常來說如果它超過20KB就值得這么做。有一種常見情形是,一個大表join一個小表,把小表broadcast后,大表的數(shù)據(jù)就不需要在各個node之間瘋跑,安安靜靜地呆在本地等小表broadcast過來就好了。

Data Locality。數(shù)據(jù)和代碼要放到一起才能處理,通常代碼總比數(shù)據(jù)要小一些,因此把代碼送到各處會更快。Data Locality是數(shù)據(jù)和處理的代碼在屋里空間上接近的程度:PROCESS_LOCAL(同一個JVM)、NODE_LOCAL(同一個node,比如數(shù)據(jù)在HDFS上,但是和代碼在同一個node)、NO_PREF、RACK_LOCAL(不在同一個server,但在同一個機(jī)架)、ANY。當(dāng)然優(yōu)先級從高到低,但是如果在空閑的executor上面沒有未處理數(shù)據(jù)了,那么就有兩個選擇:

(1)要么等如今繁忙的CPU閑下來處理盡可能“本地”的數(shù)據(jù),
(2)要么就不等直接啟動task去處理相對遠(yuǎn)程的數(shù)據(jù)。
默認(rèn)當(dāng)這種情況發(fā)生Spark會等一會兒(spark.locality),即策略(1),如果繁忙的CPU停不下來,就會執(zhí)行策略(2)。

代碼里對大對象的引用。在task里面引用大對象的時候要小心,因為它會隨著task序列化到每個節(jié)點上去,引發(fā)性能問題。只要序列化的過程不拋出異常,引用對象序列化的問題事實上很少被人重視。如果,這個大對象確實是需要的,那么就不如干脆把它變成RDD好了。絕大多數(shù)時候,對于大對象的序列化行為,是不知不覺發(fā)生的,或者說是預(yù)期之外的,比如在我們的項目中有這樣一段代碼:

rdd.map(r => { println(BackfillTypeIndex) }) 

其實呢,它等價于這樣:

rdd.map(r => { println(this.BackfillTypeIndex) }) 

不要小看了這個this,有時候它的序列化是非常大的開銷。

對于這樣的問題,一種最直接的解決方法就是:

val dereferencedVariable = this.BackfillTypeIndex rdd.map(r => println(dereferencedVariable)) // "this" is not serialized 

相關(guān)地,注解@transient用來標(biāo)識某變量不要被序列化,這對于將大對象從序列化的陷阱中排除掉是很有用的。另外,注意class之間的繼承層級關(guān)系,有時候一個小的case class可能來自一棵大樹。

文件讀寫

文件存儲和讀取的優(yōu)化。比如對于一些case而言,如果只需要某幾列,使用rcfile和parquet這樣的格式會大大減少文件讀取成本。再有就是存儲文件到S3上或者HDFS上,可以根據(jù)情況選擇更合適的格式,比如壓縮率更高的格式。另外,特別是對于shuffle特別多的情況,考慮留下一定量的額外內(nèi)存給操作系統(tǒng)作為操作系統(tǒng)的buffer cache,比如總共50G的內(nèi)存,JVM最多分配到40G多一點。

文件分片。比如在S3上面就支持文件以分片形式存放,后綴是partXX。使用coalesce方法來設(shè)置分成多少片,這個調(diào)整成并行級別或者其整數(shù)倍可以提高讀寫性能。但是太高太低都不好,太低了沒法充分利用S3并行讀寫的能力,太高了則是小文件太多,預(yù)處理、合并、連接建立等等都是時間開銷啊,讀寫還容易超過throttle。

任務(wù)

Spark的Speculation。通過設(shè)置spark.speculation等幾個相關(guān)選項,可以讓Spark在發(fā)現(xiàn)某些task執(zhí)行特別慢的時候,可以在不等待完成的情況下被重新執(zhí)行,最后相同的task只要有一個執(zhí)行完了,那么最快執(zhí)行完的那個結(jié)果就會被采納。

減少Shuffle。其實Spark的計算往往很快,但是大量開銷都花在網(wǎng)絡(luò)和IO上面,而shuffle就是一個典型。舉個例子,如果(k, v1) join (k, v2) => (k, v3),那么,這種情況其實Spark是優(yōu)化得非常好的,因為需要join的都在一個node的一個partition里面,join很快完成,結(jié)果也是在同一個node(這一系列操作可以被放在同一個stage里面)。但是如果數(shù)據(jù)結(jié)構(gòu)被設(shè)計為(obj1) join (obj2) => (obj3),而其中的join條件為obj1.column1 == obj2.column1,這個時候往往就被迫shuffle了,因為不再有同一個key使得數(shù)據(jù)在同一個node上的強(qiáng)保證。在一定要shuffle的情況下,盡可能減少shuffle前的數(shù)據(jù)規(guī)模,比如這個避免groupByKey的例子。下面這個比較的圖片來自Spark Summit 2013的一個演講,講的是同一件事情:

Repartition。運算過程中數(shù)據(jù)量時大時小,選擇合適的partition數(shù)量關(guān)系重大,如果太多partition就導(dǎo)致有很多小任務(wù)和空任務(wù)產(chǎn)生;如果太少則導(dǎo)致運算資源沒法充分利用,必要時候可以使用repartition來調(diào)整,不過它也不是沒有代價的,其中一個最主要代價就是shuffle。再有一個常見問題是數(shù)據(jù)大小差異太大,這種情況主要是數(shù)據(jù)的partition的key其實取值并不均勻造成的(默認(rèn)使用HashPartitioner),需要改進(jìn)這一點,比如重寫hash算法。測試的時候想知道partition的數(shù)量可以調(diào)用rdd.partitions().size()獲知。

Task時間分布。關(guān)注Spark UI,在Stage的詳情頁面上,可以看得到shuffle寫的總開銷,GC時間,當(dāng)前方法棧,還有task的時間花費。如果你發(fā)現(xiàn)task的時間花費分布太散,就是說有的花費時間很長,有的很短,這就說明計算分布不均,需要重新審視數(shù)據(jù)分片、key的hash、task內(nèi)部的計算邏輯等等,瓶頸出現(xiàn)在耗時長的task上面。

重用資源。有的資源申請開銷巨大,而且往往相當(dāng)有限,比如建立連接,可以考慮在partition建立的時候就創(chuàng)建好(比如使用mapPartition方法),這樣對于每個partition內(nèi)的每個元素的操作,就只要重用這個連接就好了,不需要重新建立連接。

編者按:本文由作者授權(quán)轉(zhuǎn)載,原文地址,基于作者本身的經(jīng)驗以及官方和別的工程師的總結(jié),作者提供了可供參考的文檔:官方調(diào)優(yōu)文檔Tuning Spark,Spark配置的官方文檔,Spark Programming Guide,JVMGC調(diào)優(yōu)文檔,JVM性能調(diào)優(yōu)文檔,How-to: Tune Your Apache Spark Jobs part-1 & part-2。


感謝杜小芳對本文的審校。

給InfoQ中文站投稿或者參與內(nèi)容翻譯工作,請郵件至editors@cn.infoq.com。也歡迎大家通過新浪微博(@InfoQ@丁曉昀),微信(微信號:InfoQChina)關(guān)注我們,并與我們的編輯和其他讀者朋友交流(歡迎加入InfoQ讀者交流群InfoQ好讀者(已滿),InfoQ讀者交流群(#2)InfoQ好讀者)。

相關(guān)熱詞搜索:Spark performance tuning 架構(gòu) & 設(shè)計 語言 & 開發(fā) 他山之石

上一篇:OpenSSH新曝出嚴(yán)重Bug,影響廣泛。
下一篇:百度開源其人工智能系統(tǒng):Warp-CTC

分享到: 收藏