1. 基本概念
1.1.1 彈性分散式數據集
RDD 是DPark 的核心概念,是DPark 使用的一種數據模型,RDD 的一個重要特徵就是在計算過程中,一個RDD 可以在不同的並行循環中被重複利用。
RDD 是一種支持容錯、可進行並行計算的元素集合。一個RDD 由多個分片(Split)組成,分片是計算過程中並行計算的基本單位。目前,DPark 支持從以下兩種數據源中獲取RDD。
(1)並行化數據集:可以將一個普通的Python 數據集合(如list)拆分為若干分片後組成一個RDD。
(2)分散式檔案系統中的單個或者多個檔案:即將分散式檔案系統中的檔案按照行拆分後,組成一個RDD,目前,DPark 支持兩種格式的檔案,即文本格式檔案和CSV 格式檔案。
從上述數據源生成的RDD以及在並行計算過程中新生成的RDD都支持相同的操作和變換。在DPark 中通過上述兩種數據源生成RDD 的具體方法如下。
① 從並行化數據集生成RDD。
對於並行化的數據集,可以調用DparkContext 的parallelize 函式得到,方法是:rdd = ctx.parallelize(seq, numSlices)。其中,參數numSlices 表示將數據集seq 劃分的分片數,DPark 會將每個分片作為一個任務傳送到集群上進行並行計算,對於基於Mesos 的集群節點,通常每個CPU 可以運行2~4 個任務。numSlices 針對不同的集群設定,也都有相應的默認值。
② 從分散式檔案系統生成RDD。
DPark 可以讀取從分散式檔案系統中的一個或多個檔案生成RDD,目前支持的檔案格式有文本檔案和CSV 格式檔案,而且將來會增加對更多檔案格式的支持,例如,支持從MySQL 中讀取數據。當然,用戶也可以自己按照RDD 的要求實現對應的RDD。
● 讀取文本檔案為RDD。
可以調用DparkContext 的textFile 方法從文本檔案生成RDD。text_rdd = ctx.textFile('data.txt', splitSize=64<<20)通過這種方式創建的RDD 中,每個元素為源檔案中的一行,包含行結尾的回車符,splitSize 指定RDD 中每個分片的大小,默認為64MB。
● 從CSV 格式檔案生成RDD。
可以調用DparkContext 的csvFile 方法從文本檔案生成RDD。csv_rdd = ctx.csvFile('data.csv', splitSize=32<<20, dialect='excel')通過這種方式生成的RDD 中,每個元素為源檔案中每一行分割後生成的數組。splitSize 指定RDD 中每個分片的大小,默認為32MB。dialect 參數指定csv 檔案中使用的分隔設定,具體請參見csv.reader,默認使用逗號('excel')分割。
1.1.2 共享變數
在 DPark 中,具體的計算過程發生在集群的每個計算節點上,所以DPark需要將RDD 的分片數據和計算函式(如map 和reduce 函式)進行序列化,通過網路傳送到計算節點上,然後在計算節點上執行反序列化,在這個過程中,計算操作所依賴的全局變數、模組和閉包等也會被複製到該計算節點上,所以,在計算節點中對普通變數的修改不會影響到主程式中的變數。
對於在計算過程中數據共享的需要,DPark 通過提供共享變數來實現。共享變數在不同的計算任務之間可以進行共享式的讀/寫,DPark 目前支持兩種類型的共享變數:唯讀廣播變數和只可以寫的累加器。
● 廣播變數。
DPark 支持將變數一次性傳送到集群的所有計算節點上,這種變數稱為廣播變數。使用廣播變數避免了變數在每次調用時需要通過網路傳輸,執行序列化和反序列化等操作。這種情況一般用在計算函式需要依賴一個特別大的數據集的時候。需要注意的是,被廣播的對象大小不能超出計算節點的記憶體限制。DPark 使用較高效率的廣播算法執行廣播變數在集群中的傳遞,目前支持分散式檔案系統和樹形結構兩種廣播算法。
●累加器。
累加器可以在執行數據量較小的任務的時候,用於收集任務產生的少量數據。累加器只支持add 操作,不支持刪除、更新等操作,DPark 默認使用的累加器可以支持數值類型、list 類型和dict 類型,用戶也可以自定義累加器。
DPark的計算模型
DPark 並行計算模型基於上節講到的DPark 的兩個基本概念,一是基於對RDD 的分散式計算,二是基於計算過程中能夠通過不同的機器訪問的共享變數。共享變數必須很容易在現存的分散式檔案系統中實現。RDD 能夠在多次循環計算過程中反覆被利用,所以DPark 支持將RDD 快取在計算節點的記憶體中以加快計算。
RDD 目前支持兩種類型的並行計算。
(1)變換
變換是將現有的RDD 通過運算變成另外一個RDD,例如,DPark 中的map、filter 操作就是該功能。
(2)操作
操作是指將現有的RDD 進行聚合運算,然後將計算結果立即返回給主程式,例如,DPark 中的求和操作count。需要說明的是,所有的RDD 的變換都是滯後的,當在一個RDD 上調用變換函式(如map)的時候,並沒有立即執行該計算,只是生成了一個攜帶計算信息的新的RDD,只有當RDD 需要將某個操作結果返回給主程式的時候,才執行真正的計算。這種設計有如下好處。
(1)提高效率:DPark 可以自動將多個變換操作進行合併,然後同時運行,這樣可以最大程度地減少數據傳輸量。
(2)容錯:在某個分片計算失敗的情況下,RDD 由於攜帶有計算信息,因此可以重新執行計算。
另外,RDD 支持一種很特別且很重要的變換,即快取(cache),當在某個RDD 上調用cache()函式的時候,每個計算節點會將分配給自己的計算分片的結果快取在記憶體中,當以後需要對該RDD 或者從該RDD 轉換來的RDD 進行操作的時候,計算節點就可以直接從記憶體中取得該RDD 的計算結果。很顯然,當整個計算過程需要對RDD 進行重複利用時,快取技術將大大提高計算性能。這種設計方法對於疊代式計算非常有幫助。
3. 工作原理
在前面DPark 和MapReduce 的比較一節中已經提到,DPark 和MapReduce的關鍵性區別在於處理數據流上。MapReduce 基於非循環的數據流模型,而DPark 對於需要重複使用數據集的疊代式算法具有較高的效率。這其中的關鍵是DPark 使用了一種特殊的數據模型,即RDD。
RDD 的設計目標是既保留了MapReduce 等數據流處理模型優點,例如,自動容錯、數據本地化、可拓展性強等,也添加了自己獨特的特性,即將一部分數據快取在記憶體中,以加快對這部分數據的查詢和計算的效率。
RDD 可以認為提供了一個高度限制的共享記憶體,RDD 唯讀或者只能從別的RDD 轉化而來。這些限制可以降低自動容錯需要的開銷,RDD 使用一種稱為“血統”的容錯機制,也就是每個RDD 都攜帶了它是如何從別的RDD 轉變過來的信息以及如何重建某一分片的信息。
在 RDD 這種數據模型出現之前,也有很多數據處理模型被創造出來解決非循環的數據模型中的問題。例如,Google 的Pregel(疊代圖計算框架)、Twister和HaLoop(疊代MapReduce 框架)等,這些模型的一個共同特徵就是套用場景有限,這些模型是根據企業(如Google)自己的業務需求而開發出來的,所以無法通用。而基於RDD 的DPark 提供了一種更加通用的疊代式並行計算框架,用戶可以顯示控制計算的中間結果,自由控制計算的流程。
RDD 的發明者已經在Spark 的基礎上實現了Pregel(100 行Scala 代碼)以及疊代MapReduce 框架(200 行Scala 代碼),從實現代碼行數上可以看到,Spark提供了一個很簡潔的編程模型。Spark 在其他框架不能很好適用的場景下也可以達到很好的效果。例如,在互動式的大數據查詢上。一些實踐和研究表明,Spark 在那些疊代式計算中比Hadoop 快20 倍,能夠在5~7s 的時間內互動式地查詢1TB 的數據(比Hadoop 快40 倍)。
4. 容錯機制
通常,實現分散式數據集容錯有兩種常用的方法,即數據檢查點和記錄更新。
考慮到RDD 的套用場景(面向大規模數據的存儲和分析),給數據設定檢查點代價較高,原因在於設定檢查點需要在集群節點之間進行大量數據的拷貝操作,而這種拷貝操作會受到集群頻寬的限制,而且頻寬是集群中的稀缺資源,拷貝操作會犧牲大量的頻寬,而且拷貝也增加了集群節點的存儲負載。
考慮到上述情況,DPark 使用記錄更新的方法實現容錯,但是更新所有記錄的方法也需要較高的代價,所以RDD 僅支持粗顆粒度變換,即僅記錄在單個分片上的單個操作,然後創建某個RDD 的變換序列存儲下來,即保存“血統”信息,當出現數據丟失的情況時,可以根據保存的“血統”信息重新構建數據,以此來達到數據容錯的功能。
當時由於RDD 在計算過程中需要進行多次變換,導致變換的序列很長,因此根據這些很長的變換序列恢復丟失的數據需要很長的時間,所以DPark 設計者建議在變換序列很長的情況下,適當地建立一些數據檢測點以加快實現容錯。DPark 目前還沒有實現自動判斷是否需要建立檢查點的機制,用戶可以通過saveAsTextFile 方法來手動設定數據檢查點。據DPark 網站描述,DPark 會在後來的升級版本中添加這一機制,實現更加自動化、更加友好的容錯機制。
5. 內部設計機制
在DPark 設計中,為了將來使RDD 支持更多類型的變換,而不用改變現存的任務調度機制,而且也為了保持RDD 的基於“血統”的容錯機制,RDD被設計了幾個通用接口,具體來講,每個RDD 必須包含如下4 方面的信息。
● 數據源分割後的數據分片信息,即原始碼中的split 變數。
● “血統”信息,即該RDD 所依賴的父RDD 信息以及二者之間的關係信息,即原始碼中的dependence 變數。
● 計算函式,即該RDD 如何從父RDD 經過計算轉變得來,即原始碼中的iterator(split)和compute 函式。
● 如何對數據進行分片和分片保持位置相關的元數據信息,即原始碼中的partitioner 和preferredLocations。
下面進行舉例說明。
例如,從分散式檔案系統中的檔案轉換得到的RDD,這些RDD 中的數據分片通過對源數據檔案進行分割後得到的,它們沒有父RDD,這些RDD 的計算函式只是讀取檔案中的每一行,然後返回給子RDD,對於通過map 函式轉換得到的RDD,會具有和父RDD 相同的數據分片。
上面所列的RDD 的四個通用信息如何表達父RDD 和子RDD 之間的關係是DPark 和Spark 必須要考慮的事情,在Spark 實現中,將這種依賴關係劃分為兩種類型:窄依賴和寬依賴。窄依賴是指子RDD 的每個數據分片只對父RDD 中的有限個數據分片有依賴關係,而且依賴的數量在規模上要和父RDD 分片數量差別很大。寬依賴是指子RDD 中的每個數據分片都可以對父RDD中的每個數據分片有依賴關係。例如,如圖2-20 左側所示,對於map,filter 變換操作就屬於窄依賴,由map 變換產生的子RDD 中的某個數據分片只對父RDD相應的數據分片有依賴,這就是一種窄依賴關係。但是對於groupByKey 變換操作,由於每個子RDD 中的數據分片對父RDD 中的所有數據分片都有依賴,所以這是一種寬依賴關係。
DPark 和Spark 對父子RDD 依賴關係進行分類的特性,主要是為了針對不同的依賴類型使用不同的任務調度機制和數據容錯機制,從而更加高效地進行計算。對於窄依賴關係,可以在計算節點上根據父RDD 中的數據分片計算得到子RDD 中相應的數據分片。對於寬依賴,意味著子RDD 中的數據分片的計算需要在父RDD 中的所有數據分片計算完成的情況下才可以進行。而且對於窄依賴來說,數據丟失或者出錯所需要的恢復時間要比寬依賴少很多,因為對於窄依賴來說,只有丟失的那些數據分片需要重新計算,而對於寬依賴,則需要對丟失的數據分片的所有祖先RDD 重新計算一遍。所以,DPark 和Spark 建議,對於有長“血統”鏈特別是存在寬依賴的情況下,需要在適當的時間設定一個數據檢查點來避免過長的數據恢復時間。
6. 任務調度機制
在DPark 實現中,設計者試圖利用RDD 的特性為所有的RDD 操作找到一種最有效的執行策略,任務調度器提供一個runJob 接口給RDD 使用,該接口接收三個參數,分別是RDD 對象本身、感興趣的部分數據分片和數據分片上的操作函式。當RDD 需要執行一個操作的時候,例如,對RDD 執行count、collect 等操作,DPark 就會調用runJob 接口來執行並行計算。
從總體來講,DPark 的任務調度機制很像Dryad 的任務調度方法,其區別在於DPark 在進行任務調度的時候會考慮RDD 的哪些數據分片需要快取在集群的哪些計算節點上,選擇的過程是這樣實現的,首先,RDD 根據自身攜帶的“血統”序列信息創建出一些階段stage,每個階段會儘可能多地包含可以連續運行的變換,即基於窄依賴的變換,一個stage 的邊界是那些需要在節點之間移動數據的寬依賴變換,或者是那些已經被快取的RDD。在如圖1 所示的整體計算的示例中表示了階段的分割,只有父階段完成了計算,子階段的計算才能開始,運行的時候每個數據分片分配一個任務,沒有父子關係的階段之間可以並行計算。圖1 中的Stage 1 和Stage 2 就可以並行計算,Stage 3 必須在Stage 1 和Stage2 計算完成後才能開始。
在進行具體任務分配的時候,調度器會根據數據本地化的原則來分配計算任務到計算節點,例如,某個計算任務需要訪問一個已經被快取的數據分片,那么調度器就將該任務分配給快取有該數據分片的計算節點來執行,這種調度策略可以最大程度地減少集群網路頻寬的占用,降低集群節點通信代價。
如果一個計算任務執行失敗,但是它所在階段的父階段的數據沒有丟失,那么調度器可以調度該任務到其他計算節點上重新運行。如果它所在的父階段已經不可用,那么調度器需要重新提交父階段中的所有需要重新計算的任務。
7. 共享變數的實現機制
共享變數是DPark 中除了RDD 外的另外一個重要的概念,目前在DPark中支持兩種類型的共享變數:廣播變數和累加器。下面簡單介紹兩種共享變數的實現機制。
(1)廣播變數的實現
在 DPark 中,當在程式中調用RDD 上的map、filter 等變換操作時,會傳遞一個變換函式給這些變換操作,DPark 在實際運行該函式的時候需要將該函式所需要的閉包序列化後通過網路傳送到計算節點上,在計算節點上計算函式就可以調用函式域的變數、函式等。但是如果閉包中的函式需要訪問一個較大的數據集,那么序列化閉包並在集群中傳送的方式所花費的代價就很高。考慮到這種情況,DPark 支持使用廣播變數,從而使用戶可以一次性將信息傳送到所有集群的計算節點上。目前實現的算法有基於檔案系統的廣播算法和樹形廣播算法。
在分散式檔案系統廣播算法中,當用戶為變數v 創建一個廣播變數bv 的時候,DPark 會為其創建一個唯一的廣播ID,然後將變數v 序列化後存儲在檔案系統中的一個檔案里。當傳遞閉包的時候,DPark 會將bv 而不是v 序列化後傳遞過去,而廣播變數bv 的序列化結果就是廣播ID。當用戶使用bv.value 中v的值時候,DPark 會在快取中檢查v 是否存在,如果存在,直接使用,如果不存在,就將對應的檔案反序列化後提供給計算節點使用。
在樹形廣播算法中,當用戶為變數v 創建一個廣播變數bv 時,DPark 也會給該廣播變數創建一個唯一的廣播ID,但是不會將v 序列化後存放到檔案系統的某一個檔案中,而是利用zeromq 在master 上綁定一個連線埠,當worker 需要讀取真實數據的時候,如果本地快取中沒有該數據,就訪問master 節點請求數據,當一個worker 獲取了數據,就可以將該數據傳播給其他worker,其傳播過程是一個N 叉樹形。
兩種算法中,一般來說,樹形廣播算法會更快一些,因為樹形廣播算法只有記憶體和網路I/O 操作,而分散式檔案系統算法不僅有記憶體和網路I/O 操作,還有磁碟操作,所以相對來說,速度會慢一些。在Spark 實現中,除了上述兩種廣播算法,還支持P2P 的廣播算法,但是DPark 目前還沒有實現該算法。
(2)累加器的實現
在 DPark 中創建累加器的時候,需要提供accumulator.AccumulatorParam這樣一個對象。該對象中有兩個值,分別代表累加器的0 值和加法操作定義。例如,對於數值型累加器,0 值就是數值0,加法定義就是普通的數值加法。但是對於列表型累加器,0 值就是空的[ ],加法就是extend 函式。當累加器被創建的時候,DPark 會給累加器創建一個唯一的ID,當累加器隨著計算任務被分配到計算節點上,在節點上會創建值為0 的累加器的副本,在計算節點上對累加器進行的任何操作都是在該副本上的操作,不會傳遞到主程式中的累加器上。當任務完成返回後,DPark 調度器會將這個副本累加器和主程式中的累加器進行合併,在所有的計算任務完成後,才能得到最終累加器的正確值。