MapReduce計算框架下的可迭代式數據處理方法
2023-05-04 09:39:31
MapReduce計算框架下的可迭代式數據處理方法
【專利摘要】本發明提出一種MapReduce計算框架下的可迭代式數據處理方法,包括以下步驟:S10、讀取原始數據,將原始數據解析成獨立數據項;S20、採用Shuffle?Grouping機制將所述輸入數據分發至各線程或進程進行處理;S30、對數據進行哈希重組、排序,並採用Fields?Grouping機制將排序後數據分發至各線程或進程;S40、各線程或進程實時對緩衝池中數據進行排序、分組;S50、將數據發送至線程或進程進行處理;S60、將返回的計算結果解析成獨立數據項,重複步驟S20至S50,直至發出表示停止迭代的數據項。本發明可保持MapReduce的計算性能不因迭代而受到影響,也降低了虛擬機的創建和銷毀的開銷。
【專利說明】MapReduce計算框架下的可迭代式數據處理方法
【【技術領域】】
[0001]本發明涉及一種MapReduce計算框架下的可迭代式數據處理方法。
【【背景技術】】
[0002]在大數據時代,數據量成爆炸式增長,這對數據的計算處理提出了極高的要求。Hadoop生態圈的提出為海量數據的大規模計算和分布式可靠存儲提供了一個強大的工具。在Hadoop中,MapReduce是一個為海量數據計算提供可靠、易用、可規模化的關鍵組件,所基於的MapReduce計算框架對於許多數據分析和計算方法十分友好,這使得MapReduce計算框架在海量數據分析中具有廣泛的應用。然而,在現實使用中,MapReduce計算框架下迭代計算執行受到了 Hadoop生態圈實現的制約,從而導致迭代計算性能受到影響。
[0003]在MapReduce計算框架下,數據集MassiveDataSet會被劃分為若干個數據塊,然後每個Map處理一個數據塊,並輸出一個由key-value對組成的隊列,在shuffle階段,會對所有key-value對執行哈希重組和根據key排序,組成key_value_list對,然後在Reduce階段,每個key_value_list對都會被單獨處理並輸出結果。
[0004]MapReduce計算框架下迭代計算受到以下限制:(I)兩個MapReduce任務之間的中間數據必須被寫回Hadoop Distributed File System中,因此導致性能受到損失。(2)Map和Reduce自身無法迭代執行,如果要求迭代計算的話,需要串聯兩個MapReduce,導致Java虛擬機的創建和銷毀開銷,影響性能。為解決以上問題,現有技術採用串聯多個MapReduce任務,然而依然存在如下缺點: (I)兩個MapReduce任務之間的中間數據必須被寫回HadoopDistributed FileSystem (HDFS)中,因此導致性能損失。(2)Map和Reduce自身無法迭代執行,如果要求迭代計算的話,需要串聯兩個MapReduce,導致Java虛擬機的創建和銷毀開銷,影響性能。
【
【發明內容】
】
[0005]本發明旨在解決上述現有技術中存在的問題,提出一種MapReduce計算框架下的可迭代式數據處理方法。
[0006]本發明提出的MapReduce計算框架下的可迭代式數據處理方法,包括以下步驟:S10、ReadNode從Hadoop分布式文件系統中讀取原始數據,並將所述原始數據解析成獨立數據項,以所述獨立數據項作為MapNode的輸入數據;S20、MapNode採用ShuffIe Grouping機制將所述輸入數據分發至MapNode的各線程或進程進行處理,對於每一所述獨立數據項輸出〈key, value〉格式數據;S30、ShuffIeNode對〈key, value〉進行哈希重組、基於key值執行排序,並採用FieldsGrouping機制將排序後的〈key, value〉分發至ShuffleNode的各線程或進程;S40、ShuffleNode的各線程或進程實時將〈key, value〉存入本地KVlist緩衝池,直至收到表示數據發送完畢的〈key, value),基於key值對KVlist緩衝池中的〈key, value〉進行排序、分組,對每一分組輸出{i,〈key, value_list>}格式數據,其中,i為當前線程或進程編號;S50、ReduceNode將{i,〈key, value_list>}發送至其第i個線程或進程進行處理,輸出〈key』 , value』 > ;S60>CoordinateNode 接收並緩衝〈key』 , value』 > 直至接收到表示發送完畢的數據項,CoordinateNode將基於〈key』,value』 >的計算結果返回至ReadNode, ReadNode將所述計算結果解析成獨立數據項,重複步驟S20至S50進行迭代,直至ReduceNode發出表示停止迭代的數據項,CoordinateNode退出。
[0007]本發明提出的MapReduce計算框架下的可迭代式數據處理方法基於流式計算實現可迭代MapReduce計算框架,可保持MapReduce的計算性能不會因為迭代而受到影響。該方法使中間數據不用寫回分布式文件系統,也避免java虛擬機的創建和銷毀的開銷,並可以支持更靈活和更高效的數據分析和處理算法的實現。
【【專利附圖】
【附圖說明】】
[0008]圖1為本發明提出的的MapReduce計算框架下的可迭代式數據處理方法流程圖。
[0009]圖2為為本發明實施例之一的MapReduce計算框架下的可迭代式數據處理方法拓撲結構圖。
[0010]圖3為為本發明實施例之二的MapReduce計算框架下的可迭代式數據處理方法拓撲結構圖。
[0011]圖4為為本發明實施例之三的MapReduce計算框架下的可迭代式數據處理方法拓撲結構圖。 【【具體實施方式】】
[0012]下面結合具體實施例及附圖對本發明作進一步詳細說明。下面詳細描述本發明的實施例,所述實施例的示例在附圖中示出,其中自始至終相同或類似的標號表示相同或類似的元件或具有相同或類似功能的元件。下面通過參考附圖描述的實施例是示例性的,僅用於解釋本發明的技術方案,而不應當理解為對本發明的限制。
[0013]在本發明的描述中,術語「內」、「外」、「縱向」、「橫向」、「上」、「下」、「頂」、「底」等指
示的方位或位置關係為基於附圖所示的方位或位置關係,僅是為了便於描述本發明而不是要求本發明必須以特定的方位構造和操作,因此不應當理解為對本發明的限制。
[0014]本發明提供一種MapReduce計算框架下的可迭代式數據處理方法。如圖1所示,該方法包括以下步驟:S10、ReadNode從Hadoop分布式文件系統中讀取原始數據,並將所述原始數據解析成獨立數據項,以所述獨立數據項作為MapNode的輸入數據;S20、MapNode採用Shuffle Grouping機制將所述輸入數據分發至MapNode的各線程或進程進行處理,對於每一所述獨立數據項輸出〈key, value〉格式數據;S30、ShuffleNode對〈key, value〉進行哈希重組、基於key值執行排序,並採用Fields Grouping機制將排序後的〈key, value〉分發至ShuffleNode的各線程或進程;S40、Shuff IeNode的各線程或進程實時將〈key, value〉存入本地KVlist緩衝池,直至收到表示數據發送完畢的〈key, value),基於key值對KVlist緩衝池中的〈key, value〉進行排序、分組,對每一分組輸出{i,〈key, value_list>}格式數據,其中,i為當前線程或進程編號;S50、ReduceNode將{i, }發送至其第i個線程或進程進行處理,輸出〈key』 , value』 > ;S60> CoordinateNode接收並緩衝〈key』,value』 >直至接收到表示發送完畢的數據項,CoordinateNode將基於〈key』,value』 >的計算結果返回至ReadNode, ReadNode將所述計算結果解析成獨立數據項,重複步驟S20至S50進行迭代,直至ReduceNode發出表示停止迭代的數據項,CoordinateNode 退出。
[0015]具體地,可一併參照圖2。本發明提出的MapReduce計算框架下的可迭代式數據處理方法基於流式計算,即使用流式計算實現MapReduce計算框架的Map階段、Shuffle階段、Reduce階段並使用流式計算實現迭代機制。
[0016]本發明提出的MapReduce計算框架下的可迭代式數據處理方法的整個拓撲結構由五種節點構成:ReadNode, MapNode, ShuffleNode, ReduceNode, CoordinateNode。ReadNode負責從分布式文件系統(Hadoop Distributed FileSystem, HDFS)中讀取原始數據,並解析成一個個獨立的數據項輸入拓撲結構中;MapNode負責實現MapReduce計算框架的Map階段,該節點的線程或進程數量決定了 Map數量;ShuffleNode負責實現MapReduce計算框架的Shuffle階段,該節點的線程或進程數量等於ReduceNode的線程或進程數量;ReduceNode負責實現MapReduce計算框架的Reduce階段,該節點的線程或進程數量決定了Reduce數量。CoordinateNode負責執行迭代時的數據收集和數據同步。
[0017]其中,MapNode用於實現MapReduce計算框架中的Map階段,MapNode接收ReadNode輸出的數據項,優選地,MapNode中的線程或進程數就是Map數量,這與「Hadoop的MapReduce計算框架的Map數量由數據塊數量決定」不一樣;由於數據集中的數據項之間沒有關聯性,所以為了平衡各個線程或進程的計算負載,MapNode使用shuffIeGrouping機制來分發其所接收到的數據給各個線程或進程,對於每個數據項,MapNode執行計算並輸出一個〈key, value〉;當MapNode接收到一個特定表示數據集發送完畢的數據項時,貝U輸出一個特定表示數據發送完畢的〈key, value) ;ShuffleNode用於實現MapReduce計算框架中的Shuffle階段,ShuffleNode接收MapNode輸出的數據項,優選地,ShuffleNode中的線程或進程數等於ReduceNode的線程或者進程數,Shuff IeNode中一個線程的所有輸出都必須由ReduceNode中任一對應的線程接收;ShuffleNode負責對所接收到的〈key, value〉進行哈希重組,並且根據key執行排序,因此ShuffleNode使用FieldsGrouping來分發所接收到的〈key, value〉;ShuffleNode的每個線程或進程,對於接收到的一個〈key, value〉,先將其放入本地的一個緩衝池(KVlist),直到接收到一個特定表示所有數據發送完畢的〈key, value〉,當ShuffleNode的一個線程或進程接收到一個特定表示所有數據發送完畢的〈key, value)後,則基於key,先對KVlist中所有〈key, value)進行排序,然後再把所有〈key, value)分組,把相同key的〈key, value)歸到一組。對於一個分組,生成一個〈key, value_list>,最終輸出{i,〈key, value_list>},其中,i為當前線程或進程的編號;當一個線程處理完KVlist後,輸出一個特定表示數據發送完畢的數據項。
[0018]ReduceNode用於實現MapReduce計算框架中的Reduce階段,ReduceNode接收ShuffleNode輸出的數據項:對於{i,〈key, value_list>},則發送到ReduceNode的第i個線程或進程
[0019]對於ReduceNode的一個線程或進程來說,每接收到一個{i,〈key, value_list>},則處理一個〈key, value_list>並輸出以〈key』 , value』 >格式輸出結果。
[0020]CoordinateNode負責迭代機制的數據緩衝、數據同步和數據計算。CoordinateNode對於接收到的數據分配給其內部各個線程或進程的機制由具體應用而定。
[0021]當一個節點Nodei需要執行迭代操作時,Nodei先把數據項發送到CoordinateNode中,CoordinateNode會接收並緩衝所有數據項直到接收到一個特定表示發送完畢的數據項,然後Coordinate會基於所接收到的數據項執行計算,並把計算結果返回給Nodei,直到Nodei發出一個特定表示停止迭代的數據項時,CoordinateNode將退出。
[0022]其中圖2所示為:CoordinateNode接收並緩衝〈key』 , value』 >直至接收到表示發送完畢的數據項,CoordinateNode將基於〈key』,value』 >的計算結果返回至ReadNode, ReadNode將所述計算結果解析成獨立數據項,重複步驟S20至S50進行迭代,直至ReduceNode發出表示停止迭代的數據項,CoordinateNode退出
[0023]圖3所示為在執行步驟S20之後,CoordinateNode接收並緩衝〈key, value〉直至接收到表示發送完畢的數據項,CoordinateNode將基於〈key, value)的計算結果返回至MapNode,再以該計算結果作為MapNode的輸入數據重新執行步驟S20。
[0024]圖4所示為在執行步驟S40之後,CoordinateNode接收並緩衝{i,〈key, value_list〉}直至接收到表示發送完畢的數據項,CoordinateNode將基於{i,〈key, value_list〉}的計算結果返回至ShuffleNode,再以該計算結果作為ShuffleNode的輸入數據重新執行步驟S30以及S40。
[0025]對於圖3所描述的實施例,舉例而言,對於一個數據集Set,首先ReadNode會接收並解析成一個個數據項DataEntryi,然後發送給MapNode ;MapNode把接收到的數據項按照ShuffleGrouping的方式分發給其內部的各個線程,對於MapNode中的一個線程來說,處理一個數據項,輸出{type,〈key, value〉},其中type為一個4位的標識符,描述了是否需要迭代,數據是否發送完畢等額外信息。
[0026]CoordinateNodel和ShuffleNode都會收到MapNode的輸出,如果輸入的數據項中type表示需要迭代,則ShuffleNode會忽略所接收到的數據項。CoordinateNode將會接收數據項並緩存到一個數組中,直到接收到一個特定表示數據發送完畢。當數據接收完畢後,CoordinateNode開始處理數據項數組並輸出一個〈key, value)描述處理結果,MapNode接收CoordinateNode的輸出,並再次進行處理。
[0027]迭代終止時,MapNode需要在輸出的{type,〈key, value〉}的type中設置相應位以表示迭代結束。
[0028]ShuffleNode接收MapNode的輸出,同樣首先檢查type,如果type表示迭代終止,則ShuffleNode開始接收數據項{type,〈key, value〉}並緩存到一個數組中,直到接收到一個數據項,其type表示數據發送完畢。當數據接收完畢後,ShufTleNode首先基於key,對數據項數組中所有的〈key, value)進行排序,然後進行分組,把相同key的〈key, value)放到一個分組中,然後對於每個分組生成〈key, value_list> (value_list為一個value鍊表),然後輸出!type』,〈key, value_list>}。
[0029]ReduceNode 接收到 ShuffleNode 的輸出的數據項{type,,〈key, value_list>},然後處理並以〈key』 , value』 >形式輸出結果。
[0030]CoordinateNode2 接收 ReduceNode 的輸出,和 CoordinateNodel — 樣,CoordinateNode2接收數據項並緩存到一個數組中,直到接收到一個特定表示數據發送完畢。當數據接收完畢後,CoordinateNode2開始處理數據項數組並以〈key, value〉形式描述處理結果並輸出。
[0031]ReadNode接收CoordinateNode2的輸出並向整個框架再次輸入需要處理的數據。[0032]本發明提出的MapReduce計算框架下的可迭代式數據處理方法基於流式計算實現可迭代MapReduce計算框架,可保持MapReduce的計算性能不會因為迭代而受到影響。該方法使中間數據不用寫回分布式文件系統,也避免java虛擬機的創建和銷毀的開銷,並可以支持更靈活和更高效的數據分析和處理算法的實現。
[0033]本發明提出的MapReduce計算框架下的可迭代式數據處理方法已通過使用Storm流式計算工具實現,實驗效果良好。
[0034]雖然本發明參照當前的較佳實施方式進行了描述,但本領域的技術人員應能理解,上述較佳實施方式僅用來解釋和說明本發明的技術方案,而並非用來限定本發明的保護範圍,任何在本發明的精神和原則範圍之內,所做的任何修飾、等效替換、變形、改進等,均應包含在本發明的權利要求保護範圍之內。
【權利要求】
1.一種MapReduce計算框架下的可迭代式數據處理方法,包括以下步驟: S10、ReadNode從Hadoop分布式文件系統中讀取原始數據,並將所述原始數據解析成獨立數據項,以所述獨立數據項作為MapNode的輸入數據; S20、MapNode採用Shuffle Grouping機制將所述輸入數據分發至MapNode的各線程或進程進行處理,對於每一所述獨立數據項輸出〈key,value〉格式數據; S30>ShuffleNode對〈key, value)進行哈希重組、基於key值執行排序,並採用FieldsGrouping機制將排序後的〈key, value〉分發至ShuffleNode的各線程或進程; S40、ShuffIeNode的各線程或進程實時將〈key, value〉存入本地KVlist緩衝池,直至收到表示數據發送完畢的〈key, value〉,基於key值對KVlist緩衝池中的〈key, value)進行排序、分組,對每一分組輸出{i,〈key,ValUe_liSt>}格式數據,其中,i為當前線程或進程編號; S50、ReduceNode將{i,〈key, value_list>}發送至其第i個線程或進程進行處理,輸出〈key,, value,> ; S60、CoordinateNode接收並緩衝〈key』,value』〉直至接收到表示發送完畢的數據項,CoordinateNode將基於〈key』 , value』 >的計算結果返回至ReadNode,ReadNode將所述計算結果解析成獨立數據項,重複步驟S20至S50進行迭代,直至ReduceNode發出表示停止迭代的數據項,CoordinateNode退出。
2.根據權利要求1所述的MapReduce計算框架下的可迭代式數據處理方法,其特徵在於,在執行步驟S20之後,CoordinateNode接收並緩衝〈key, value)直至接收到表示發送完畢的數據項,CoordinateNode將基於〈key, value〉的計算結果返回至MapNode ,再以該計算結果作為MapNode的輸入數據重新執行步驟S20。
3.根據權利要求1所述的MapReduce計算框架下的可迭代式數據處理方法,其特徵在於,在執行步驟S40之後,CoordinateNode接收並緩衝{i,〈key, value_list>}直至接收到表示發送完畢的數據項,CoordinateNode將基於{i,〈key, value_list>}的計算結果返回至ShuffleNode,再以該計算結果作為ShuffleNode的輸入數據重新執行步驟S30以及S40。
4.根據權利要求1所述的MapReduce計算框架下的可迭代式數據處理方法,其特徵在於,MapNode中的線程或進程數量即為Map數量。
5.根據權利要求1所述的MapReduce計算框架下的可迭代式數據處理方法,其特徵在於,當MapNode接收到表示所述獨立數據項發送完畢的數據項時,則輸出表示數據項發送完畢的〈key, value〉。
6.根據權利要求1所述的MapReduce計算框架下的可迭代式數據處理方法,其特徵在於,ShuffleNode中的線程或進程數與ReduceNode中的線程或進程數相等。
7.根據權利要求6所述的MapReduce計算框架下的可迭代式數據處理方法,其特徵在於,ShuffleNode中每一線程或進程的所有輸出均由ReduceNode中一線程或進程接收。
【文檔編號】G06F9/455GK103699442SQ201310686716
【公開日】2014年4月2日 申請日期:2013年12月12日 優先權日:2013年12月12日
【發明者】鄒瑜斌, 張帆, 須成忠 申請人:深圳先進技術研究院