用於多路數據流相關的可擴縮處理的系統和方法
2023-06-08 22:49:41 2
專利名稱::用於多路數據流相關的可擴縮處理的系統和方法
技術領域:
:本發明一般涉及一種改進的數據處理系統,並且特別地,涉及一種用於處理數據流的計算機實現的方法和裝置。更特別地,本發明涉及一種用於多路數據流相關的可擴縮處理的計算機實現的方法、裝置以及計算機可用程序代碼。
背景技術:
:流處理計算應用是滿足關於數據的一些限制的應用,在該應用中數據以信息流的形式進入系統。注意到正在處理的數據的容量可能太大而無法存儲;因此,信息流要求對動態數據流進行複雜的實時處理,例如傳感器數據分析以及網絡業務監控。流處理計算應用的例子包括視頻處理、音頻處理、流式資料庫以及傳感器網絡。在這些應用中,來自外部源的數據流流入數據流管理系統,其在此由不同的連續查詢算子處理。為了支持無界流,流處理系統將滑動窗與每個流相關聯。滑動窗含有在流上最近到達的數據項。窗口可以是基於時間的,例如在最後60秒中到達的視頻幀,或者是基於數字的,例如最後1000個視頻幀。最重要的連續查詢算子之一是在多個不同的數據流上的滑動窗連接。滑動窗連接的輸出含有滿足預先定義的連接判定(joinpredicate)的、並且在其相應的窗口中被同時顯現的相關元組的所有集合。一些實例應用包括在不同的新聞視頻流中搜索類似的圖像用於熱門話題檢測,以及在不同的網絡業務流中對源/目的地址進行相關用於入侵檢測。基於關鍵字的同等連接(equijoin)可能不太有效,因為很多流相關應用要求比關鍵字比較更複雜的連接判定。例如,在新聞視頻相關應用中,連接條件是兩個圖像的40維分類值之間的距離是否低於閾值。因而,不同流的相關數據意味著在不同流上找到滿足一個或多個預先定義的相關判定的那些數據。處理多路流連接的主要挑戰在於在多個大容量並且時變的數據流上實時地進行大量的連接比較。給定高流速和大窗口尺寸,加窗的流連接可能會有大的存儲需求。此外,諸如圖像比較的一些查詢處理還可以是中央處理器密集的。單個主機可能由於多路流連接工作負荷而易於過載。
發明內容本說明性實施例提供了一種用於處理多路流相關的計算機實現的方法、裝置以及計算機可用程序代碼。接收流數據用於相關。形成用於連續地將多路流相關工作負荷劃分成較小的工作負荷片的任務。所述較小的工作負荷片中的每一個均可以由單個主機來處理。將所述流數據發送至不同的主機用於相關處理。在所附權利要求中闡述了被認為是說明性實施例的特性的新穎的特徵。然而,當結合附圖閱讀時,通過參照以下對說明性實施例的具體描述,將最好地理解說明性實施例本身,以及使用的優選模式及其更多的目的和優勢,其中圖1是可以在其中實現說明性實施例的數據處理系統的網絡的圖形表示;圖2是可以在其中實現說明性實施例的數據處理系統的框圖;圖3描述了依照說明性實施例的流處理的例子;圖4是依照說明性實施例的滑動窗流連接算子模型的示圖;圖5是依照說明性實施例的分布式多路流連接執行模型的示圖;圖6A-6B是依照說明性實施例的相關感知元組路由方案的示圖;圖7是依照說明性實施例的對準元組路由模型的示圖;圖8是依照說明性實施例的約束元組路由模型的示圖;圖9是依照說明性實施例的集中式連接方法的流程圖;圖10是依照說明性實施例的分布式連接方法的流程圖;圖11是依照說明性實施例的對準元組路由方法的流程圖;圖12A-12B是依照說明性實施例的約束元組路由方法的流程圖;圖13是依照說明性實施例的多路流連接算法;圖14是依照說明性實施例的對準元組路由算法;以及圖15是依照說明性實施例的約束元組路由算法。具體實施例方式現參照附圖,並且特別參照圖1-2,提供了可以在其中實現說明性實施例的數據處理環境的示例圖。應當理解圖1-2僅是示例性的,並不旨在主張或暗示對關於可以在其中實現不同實施例的環境的任何限制。可以對所描述的環境進行很多修改。現參照附圖,圖1描述了可以在其中實現說明性實施例的數據處理系統的網絡的圖形表示。網絡數據處理系統100是可以在其中實現實施例的計算機網絡。網絡數據處理系統100含有網絡102,其是用於在網絡數據處理系統100內連在一起的各種設備和計算機之間提供通信鏈路的介質。網絡102可以包括諸如有線、無線通信鏈路或光纜的連接。在所描述的例子中,伺服器104和伺服器106與存儲單元108一起連接至網絡102。另外,客戶機110、112和114連接至網絡102。這些客戶機110、112和114可以是,例如個人計算機或網絡計算機。在所描述的例子中,伺服器104向客戶機110、112和114提供諸如引導文件、作業系統映像以及應用的數據。在該例中,客戶機110、112和114是伺服器104的客戶機。網絡數據處理系統100可以包括附加的伺服器、客戶機和未示出的其它設備。在所描述的例子中,網絡數據處理系統100是具有網絡102的網際網路,網絡102代表使用傳輸控制協議/網際協議(TCP/IP)協議組相互通信的全世界的網絡和網關的集合。處於網際網路核心的是主節點或主計算機之間的高速數據通信線路的主幹線,包括路由數據和消息的數千商業的、政府的、教育的和其它的計算機系統。當然,網絡數據處理系統100還可以實現為許多不同類型的網絡,舉例來說,例如內聯網、區域網(LAN)或廣域網(WAN)。圖1意在舉例,並不作為對不同實施例的結構限制。現參照圖2,示出了可以在其中實現說明性實施例的數據處理系統的框圖。數據處理系統200是諸如圖1中的伺服器104或客戶機110的計算機的例子,可以在其中針對說明性實施例設置實現過程的計算機可用代碼或指令。在所描述的例子中,數據處理系統200使用包括北橋和存儲控制器集線器(MCH)202以及南橋和輸入/輸出(I/O)控制器集線器(ICH)204的集線器體系結構。處理器206、主存儲器208以及圖形處理器210耦合於北橋和存儲控制器集線器202。舉例來說,圖形處理器210可以通過加速圖形埠(AGP)耦合於MCH。在所描述的例子中,區域網(LAN)適配器212耦合於南橋和I/O控制器集線器204,並且音頻適配器216、鍵盤和滑鼠適配器220、數據機222、只讀存儲器(ROM)224、通用串行總線(USB)埠和其它通信埠232,以及PCI/PCIe設備234通過總線238耦合於南橋和I/O控制器集線器204,並且硬磁碟驅動器(HDD)226和CD-ROM驅動器230通過總線240耦合於南橋和I/O控制器集線器204。PCI/PCIe設備可以包括,例如乙太網適配器、附加卡(add-incards)以及用於筆記本計算機的PC卡。PCI使用卡式總線控制器,而PCIe並不使用。ROM224可以是,例如閃速二進位輸入/輸出系統(BIOS)。硬磁碟驅動器226和CD-ROM驅動器230可以使用,例如集成驅動器電子電路(IDE)或串行高級技術配件(SATA)接口。超級I/O(SIO)設備236可以耦合於南橋和I/O控制器集線器204。作業系統在處理器206上運行,並且協調和提供圖2中數據處理系統200內的各種組件的控制。作業系統可以是市場上可獲得的作業系統,例如MicrosoftWindowsXP(Microsoft和Windows是微軟公司在美國、其它國家或二者的商標)。面向對象的編程系統,例如JavaTM編程系統,可以結合作業系統運行,並且從在數據處理系統200上執行的Java程序或應用向作業系統提供調用(Java和所有基於Java的商標是SunMicrosystems公司在美國、其它國家或二者的商標)。用於作業系統的指令、面向對象的編程系統以及應用或程序位於諸如硬磁碟驅動器226的存儲設備,並且可以被加載到主存儲器208給處理器206執行。說明性實施例的過程可以由處理器206使用計算機實現的指令來實現,舉例來說,其可以位於諸如主存儲器208、只讀存儲器224的存儲器中,或者位於一個或多個外圍設備中。圖1-2中的硬體可以取決於實現而變化。除了圖1-2中所描述的硬體之外,或者代替圖1-2中所描述的硬體,可以使用其它的內部硬體或外圍設備,例如閃速存儲器、等效非易失性存儲器或光碟驅動器等。此外,說明性實施例的過程可以應用於多處理器數據處理系統。在一些說明性的例子中,數據處理系統200可以是個人數字助理(PDA),其通常被配置具有閃速存儲器,以提供用於存儲作業系統文件和/或用戶產生的數據的非易失性存儲器。總線系統可以包括一個或多個總線,例如系統總線、I/O總線和PCI總線。當然,可以使用在依附於構造或體系結構的不同組件或設備之間提供數據傳輸的任何類型的通信構造或體系結構來實現總線系統。通信單元可以包括用於發送和接收數據的一個或多個設備,例如數據機或網絡適配器。舉例來說,存儲器可以是主存儲器208或者諸如可以在北橋和存儲控制器集線器202中找到的高速緩衝存儲器。處理單元可以包括一個或多個處理器或CPU。圖1-2中所描述的例子以及上述例子並不意味著暗示結構限制。例如,數據處理系統200除了採取PDA的形式之外,還可以是平板計算機(tabletcomputer)、膝上型計算機或電話設備。說明性實施例提供了基於對處理單元、原始流(primalstream)以及用戶對輸出數據的要求的形式描述而自動創建工作流的過程。該過程能夠快速適應新近可用的原始流、處理單元以及其它變化的參數、環境或條件,而不過度使系統資源有所負擔,並且沒有人工交互。此外,可以將工作流轉換成可以在Web服務執行環境中執行的格式。圖3描述了依照說明性實施例的流處理的例子。在該例中,當特定股票很可能要超過預定值的時候,將相關結果350傳遞給已經請求被通知的用戶。在一個例子中,相關結果是具有相關的股票價格變化的一組股票。在這些說明性例子中,原始流或廣播流包括貿易310、電視新聞320以及無線電330。在所描述的例子中,應用組件包括股票分析312、運動圖像專家組4(MPEG-4)多路分用器322、圖像分析324、語音到文本(speech-to-text)326、文本分析328、語音到文本332、文本分析334,以及多路流連接340。流處理應用可以由使用可用的原始流的現有應用組件組成,以便應用組件產生滿足用戶需求的結果。因而,股票分析312接收信息流(貿易310)並且向多路流連接340輸出結果。在該例中,MPEG-4多路分用器322接收廣播流(電視新聞320),並且向圖像分析324、文本分析328和語音到文本326輸出。而語音到文本326向文本分析328輸出。圖像分析324和文本分析328向多路流連接340輸出。語音到文本332接收原始流(無線電330),並且向文本分析334輸出。而文本分析334向多路流連接340輸出。多路流連接340以相關結果350的形式提供輸出。在一個實施例中,可以將流特性編碼為以流對象參數化的變數(fluent)和判定(predicate)。在編程中,判定是評估表達並且基於數據的條件提供真或假的回答的陳述。這些條件被表示為關於流特性的邏輯表達式。變數是比判定更一般的函數。變數可以從不同於判定的布爾域的域取值。在文獻中變數也被稱為函數。將組件描述編碼為以輸入和輸出流對象參數化的動作。動作的前提由輸入流上轉換的輸入埠要求組成,並且動作效應利用與輸出埠關聯的轉換公式計算輸出流對象的特性。通過基於共享對應於埠的例示動作參數之間的流對象來標識輸入-輸出埠連接,然後將由規劃系統產生的作為動作序列的計劃轉換成工作流。說明性實施例提供了一種用於處理多路流連接的可擴縮的分布式解決方案的計算機實現的方法、裝置和計算機可用程序代碼。由於很多流相關應用比關鍵字比較要求更複雜的連接判定,因此考慮諸如同等連接和非同等連接的一般流連接。說明性實施例針對能夠分布式執行多路流連接的相關感知元組路由框架。該分布方案可以觀察滑動窗連接語義。滑動窗連接的輸出含有滿足預先定義的連接判定的、並且在其相應的窗口中被同時顯現的相關元組的所有集合。由於相關約束,對於維持滑動窗連接語義來說,分布開銷是不可避免的。因此,說明性實施例針對在具有最小分布開銷的一組分布式主機中多路流連接算子的工作負荷的最優分布。分布開銷指的是使用或過度使用系統資源。例如,分布開銷可以包括所消耗的處理器時間、存儲空間以及處理輸入數據流所需要的網絡帶寬。在一個說明性實施例中,對準元組路由(ATR)方法或方案使用流劃分實現多路流連接的分布式執行。對準元組路由動態地選擇最快的輸入流作為主流,並且將其它流的元組路由與主流對準,以符合相關約束。對準元組路由將輸入流分成被路由至不同主機用於連接處理的分段。分段是流的一部分或片段。連接處理也被稱為相關處理。對準元組路由可以用於保持連接語義。此外,對準元組路由的開銷獨立於主機的數目。相反,對準元組路由開銷僅與滑動窗大小以及從流的速率有關。因此,對準元組路由適於從流具有低速率並且滑動窗大小不是很大的情況。在另一說明性實施例中,約束元組路由(CTR)方案在相關約束下分別路由不同流的元組。約束元組路由使用流劃分和算子劃分二者來分布多路流連接算子。與對準元組路由不同,約束元組路由允許將很大的多路連接劃分成由不同主機執行的一組較小的多路連接。約束元組路由方法、過程和算法針對的是利用輕負荷主機的最小集合覆蓋相關元組的問題。約束元組路由用於保持連接語義,並且具有獨立於主機數目的開銷。不同於對準元組路由,約束元組路由的開銷獨立於滑動窗大小,這使得約束元組路由更適於具有大滑動窗規格的連接算子。為支持連續流,流處理系統將滑動窗與每個流相關聯。窗口含有稱為元組的流上最近到達的數據項。元組是一組值或流數據。流中的數據也稱為流數據,並且其是以比特、字、數字或形成於一個或多個數據流的其它流式數據的形式所接收的信息。窗口可以是基於時間的或基於元組的。基於時間的窗口可以是,例如,在最後10分鐘內到達的元組,而基於元組的窗口可以是,例如,最後1000個元組。重要的連續查詢算子之一是兩個流(流S1和流S2)之間的滑動窗連接。該窗口連接的輸出含有來自於流S1和流S2的每對這樣的元組,即其滿足連接判定並且在其相應的窗口中被同時顯現。連接判定是關於兩個元組之間的一個或多個共同屬性的比較函數。基本連接判定是兩個元組s1和s2之間關於共同屬性A的相等比較,表示為s1·A=s2·A。然而,可以將說明性方案應用於任何一般的連接判定。滑動窗連接具有很多應用。例如,考慮兩個流,其中一個流含有電話呼叫記錄,而另一個流含有股票交易記錄。可以使用這樣的滑動窗連接來產生交易欺詐警報,即該滑動窗連接運作以便在可疑電話呼叫與異常交易記錄之間關於共同屬性「貿易標識符」進行相關或連接。圖4是依照說明性實施例的滑動窗流連接算子模型的示圖。圖4用於描述滑動窗連接的語義,並且給出集中式加窗連接處理過程。可以在諸如圖1的伺服器104和106或客戶機110、112和114的伺服器或客戶機中實現多路流連接算子400。多路流連接算子400包括表示為Si的各種數據流,包括流1402、流2404、流3406和流4408。數據流由表示為si∈Si的一連串元組或數據項組成。每個流可以具有可變的數據到達速率。例如,ri表示流Si在當前時間周期的平均到達速率。在動態流環境中,平均流速率ri可以隨時間變化。在該例中,每個元組si∈Si攜帶時間戳si·t來表示元組si到達流Si的時間。諸如Si[t1,t2]的語言表示在時間[t1,t2]期間到達流Si的所有元組。為了處理無限數據流,將滑動窗與每個流相關聯,用於將連接處理的範圍限制在新近到達的元組。例如,Si[Wi]表示流Si上的滑動窗,其中Wi表示單位時間的窗口長度。在時間t,如果si在時間間隔[t-Wi,t]內到達Si,則si屬於Si[Wi]。因此,可以將Si[Wi]認為是Si[t-Wi,t]的縮寫。將n,n≥2個輸入流中的多路流連接算子400表示為多路流連接算子400的輸出包括所有的元組群(s1,s2,...,sn),以便在時間si·t,si∈Si,sk∈Sk[Wk],1≤k≤n,k≠i,s1,...,sn滿足預先定義的連接判定θ(s1,...,sn)。連接判定可以是諸如多維空間中的距離函數的任何一般的函數。例如,在圖4中,考慮連接結果,包括在時間8到達的流3406的元組s3410,表示為s38410。sit表示在時間t到達Si的元組。多路流連接算子400將流3406的s38410與在時間8的包括在滑動窗S1[W1]、S2[W2]和S4[W4]中的所有元組進行比較。例如,在多路流連接算子400中,元組s38410需要首先與S4[4,7]412中的元組連接。圖5是依照說明性實施例的分布式多路流連接執行模型的示圖。多路流連接經常是資源密集的。例如,多路流連接可以具有對緩衝所有滑動窗中的元組的大存儲需求,以及對多個輸入流中的大量連接探測的快速處理的需求。單個主機可能由於多路流連接查詢處理而易於過載。用於處理多路流連接的可擴縮的分布式流處理系統500針對這些問題。可擴縮的分布式流處理系統500包括擴散算子502OPd、融合算子504OPf,以及通過高速網絡508而連接的一群伺服器主機506V1。該伺服器主機群集506可以包括伺服器,例如通過諸如圖1的網絡102的網絡而互聯的伺服器104和106。融合算子504和擴散算子502是可以在諸如圖2的數據處理系統200的通用計算機上實現的軟體模塊。擴散算子502可以將多路流連接的工作負荷加速到多個分布式主機上。融合算子504將分散的連接結果融為完整的相關結果,例如圖3的相關結果350。擴散算子502將來自於輸入流512的輸入元組510動態路由至不同的伺服器主機506用於連接處理,而融合算子504可以將分散的連接結果508融為完整的查詢應答514。與連接算子不同,擴散算子502進行簡單的元組路由計算,並且要求對輸入流512的小緩衝。擴散算子502的處理時間常常比連接計算的處理時間少幾個以上的數量級。因而,擴散算子502不是可擴縮的分布式流處理系統500的瓶頸。對分布式連接執行的一個基本要求是保持連接結果的正確性。元組路由方案不應當遺漏任何的連接結果或產生重複的連接結果。然而,強制(brute-force)元組路由方案可能違犯多流相關約束或相關約束。在先前的例子中,在圖4的多路流連接算子400中,元組s38410需要首先與S4[4,7]412中的元組連接。通過分布式執行方案可以將圖4的S4[4,7]412中的元組分散到諸如伺服器主機506的不同主機上。假設元組s44;s45;s46位於v1506上,並且元組s45;s46;s47位於v2506上。如果將s38發送至v1506或v2506,則遺漏了一些連接結果。如果將s38發送至v1506和v2506二者,則可能產生重複的連接結果。為了保持滑動窗連接語義,必須仔細設計元組路由方案以滿足相關約束定義1給定n路連接算子任何(s1,s2,...,sn),si∈Si,1≤k≤n,其必須一次且僅一次由出現於相同主機上的Ji相關。在示出分布式多路連接執行需要複製元組或路由中間連接結果以符合相關約束的證明中,令來自於n個輸入流的元組表示需要被相關的輸入流。給定相反的假設,即既未將元組複製於多個主機上,也未將中間結果路由經過不同的主機。考慮每兩個連續的元組siti和sjtj,0≤tj-ti≤Wi,如果i≠j,那麼由於sjtj需要相關siti,因此需要將sjtj路由至與siti的相同的主機上。如果i=j,則由於sjtj需要和與siti連接的那些元組連接,因此也需要將sjtj路由至與siti的相同的主機上。然後,將n個輸入流的所有相關的元組都路由至相同的主機,這就成為集中式連接執行。因而,分布式連接執行需要在不同的主機之間複製元組或路由中間結果,這被稱為擴散開銷。這些開銷元組和路由操作可能消耗處理器時間、存儲空間以及系統中的網絡帶寬。因此,擴散算子的目標是在相關約束下實現最優的分布式連接執行。相關約束在形式上定義如下定義2給定連接算子和m個主機{v1,...,vm},將每個元組最優地路由至一個或多個主機,以便(1)滿足相關約束,(2)最優地平衡不同主機的工作負荷,以及(3)最小化擴散開銷。圖6A-6B是依照說明性實施例的相關感知元組路由方案的示圖。圖6A-6B表示用於多路流連接算子的可擴縮處理的一組相關感知元組路由方案。圖6A-6B的方案允許單個連接算子利用多個主機602的資源,或者允許多個連接算子以細粒度共享多個主機602的資源。粒度指示工作負荷劃分的大小。較細粒度意味著可以將相關工作負荷分成可以分布於不同主機上的較小的片。主機可以是多個互聯的伺服器,例如圖1的伺服器104和106。任務是計算機系統承擔以實現預定目標的過程。多路流相關工作負荷是計算機資源的總量,例如處理需求、存儲器,以及在多個輸入流上進行相關處理所需要的帶寬。連續優化過程是連續地調整計算機系統的操作,以在動態計算環境中實現最優性能,從而解決連續優化問題。諸如相關處理的連接處理包括計算機系統需要承擔以便將一個流的元組與所有其它流的元組相關的一組操作。在高級別,該組相關感知元組路由方案以二維完成分布式多路流連接執行(1)流劃分將輸入流分成被路由至不同主機602的分段;以及(2)算子劃分將多路連接算子分成利用不同主機602上處理的相關來計算的分算子。圖6A是單獨進行流劃分的稱為對準元組路由(ATR)的簡單相關感知元組路由方案。圖6A描述了使用四個主機602執行三路連接算子的對準元組路由方案600的分布快照(distributionsnapshot)。為了符合相關約束,對準元組路由方案600調整所有輸入流的元組路由,並且複製流劃分的邊界周圍的元組子集。對準元組路由方案600是不經過不同主機602路由中間結果的單跳(one-hop)路由過程。每個主機602僅僅只是在所有輸入元組的子集上進行整個連接計算。例如,在圖6A中,主機602v1,v2,v3和v4分別在於時間[t,t+T]、[t+T,t+2T]、[t+2T,t+3T]和[t+3T,t+4T]內到達的元組上執行連接計算。然而,如果多路連接算子將很多輸入流與大的滑動窗相關,則對準元組路由的分布粒度可能會太粗糙,導致低效率的資源利用和大的複製開銷。稱為約束元組路由(CTR)方案604的圖6B的第二元組路由方案探究流劃分和算子劃分二者。不同於對準元組路由方案600,約束元組路由方案604通過允許經過不同的主機602路由中間連接結果606,可以分離多個主機602中的多路連接計算的執行。因此,約束元組路由方案604可以減少複製開銷,因為對準元組路由系統不必保證在第一路由跳轉中所有相關的元組都位於相同的主機上。例如,在圖6B中,約束元組路由方案604首先連接主機v1上的元組S1和S2,並且然後將中間結果路由至主機v2以連接第三個流S3。相比於對準元組路由方案600,約束元組路由方案604具有路由中間結果606的額外開銷,但卻可以以較細粒度實現較好的負荷分布。在圖6B中,約束元組路由方案604通過允許將一個多路連接計算分成可以在不同主機上執行的多個兩路連接來進行算子分割。下面在圖11和圖14中進一步解釋了對準元組路由方案、方法和算法的細節。下面在圖12和圖15中進一步解釋了約束元組路由方案、方法和算法的細節。圖7是依照說明性實施例的對準元組路由模型的示圖。圖7給出了諸如圖6A的對準元組路由方案600的對準元組路由方案的設計細節。對準元組路由模型700同時相互配合地路由連接算子的輸入流的元組。對準元組路由模型700使用流劃分實現分布式流連接執行。對準元組路由模型700包括主流702、從流704和從流706。V1指示主機,W1指示滑動窗大小,並且T指示分段長度。主流702是具有最高數據流速率的流。當主流702的速率變的慢於從流之一的時候,對準元組路由採用轉換階段(transitionphase)來改變主流702。對準元組路由是一種用於解決連續優化問題的方案。對準元組路由動態地選擇一個輸入流作為主流,並且根據時間戳將所有其它輸入流的元組與主流對準。相比之下,為了符合相關約束,基於對主流的劃分將其它的流劃分成重疊的分段,其被稱為從流704和706。通常,對準元組路由連續地將所有輸入流分成不同的分段,每個分段含有在特定時間周期內到達的元組。基於相關約束將從流704和706的分段與主流702的分段對準。將屬於已對準的分段的元組路由至相同的主機用於產生連接結果。圖8是依照說明性實施例的約束元組路由模型的示圖。圖8給出了包括諸如圖6B的約束元組路由方案604的約束元組路由方案的設計細節的約束元組路由(CTR)模型800。約束元組路由模型800是一種獨立地路由不同流的元組的方案。對於流1802、流2804和流3806的每個輸入流來說,約束元組路由模型800基於所有先前的相關元組的布局進行路由判定。約束元組路由是另一種用於解決連續優化問題的方案。約束元組路由模型800分別路由不同輸入流的元組,而不是像對準元組路由方法那樣同時路由來自不同輸入的元組。圖8示出了用於三路流連接算子的約束元組路由方案。對於具有探測序列的任何元組si∈Si,1≤i≤n,約束元組路由基於先前的相關元組的布局為元組si以及所有的中間連接結果1≤k≤n-1進行路由判定。為了避免要求所有的連接算子進行路由計算,將約束元組路由實現為計算si的整個路由路徑從而與其它n-1個流連接的源路由過程。每個元組攜帶其路由路徑以表示其需要訪問來產生連接結果的主機的集合。為了減少路由計算開銷,約束元組路由將每個輸入流上的元組分組成段,並且將每個分段作為整體路由至不同的主機。因而,約束元組路由僅需要計算每個分段的路由而不是每個元組的路由。分段長度表示負荷平衡粒度與路由開銷之間的折衷。約束元組路由還維護記錄先前所路由的分段的布局的路由表。如果分段不需要基於多路流連接語義而與任何未來的分段相關,則從路由表刪除分段信息。圖9是依照說明性實施例的集中式連接方法的流程圖。圖9的方法可以在諸如圖4的多路流連接算子400的多路流連接算子中實現。通過接收用於所連接的流的元組開始過程(步驟902)。例如,步驟902的原始數據流可以由輸入流緩衝器接收。步驟902的原始流可以是諸如圖4的流1402、流2404、流3406和流4408的流。接下來,該過程選擇一個流以基於時間戳處理(步驟904)。例如,根據當前所緩衝的元組的時間戳,該流是下一元組si。接下來,該過程根據所選擇的流的時間戳標記所有其它流中的過期元組(步驟906)。該過程接下來從所有的流移除已被所有其它的流處理並且被標記為過期的過期元組(步驟908)。步驟906和908用於查找步驟904中所選擇的流的連接順序。接下來,該過程基於連接順序產生連接結果(步驟910)。該過程然後確定流連接是否是完整的(步驟912)。如果流連接不是完整的,則該過程接收用於所連接的流的元組(步驟902)。在步驟910中,該過程還可以更新指針pi以指出步驟904中將要選擇的、在輸入流緩衝器Qi中的下一個流或元組。如所示,重複該過程以繼續處理所有的流。如果在步驟912的確定中流連接是完整的,則該過程結束。圖10是依照說明性實施例的多路流連接方法的流程圖。圖10的方法可以在諸如圖5的可擴縮的分布式流處理系統500的多路流連接系統中實現。通過接收用於所連接的流的元組開始過程(步驟1002)。該過程然後在擴散算子處為每個元組計算路由路徑(步驟1004)。接下來,該過程基於由擴散算子計算的路由路徑將元組路由至一個或多個主機(步驟1006)。接下來,該過程在不同的主機並行地進行集中式連接(步驟1008)。主機可以是諸如圖5的伺服器主機506的伺服器主機。集中式連接可以包括圖9的方法和步驟。接下來,該過程在融合算子處基於連接標識聚集連接結果(步驟1010)。融合算子可以是諸如圖5的融合算子504的算子。該過程然後確定流連接是否是完整的(步驟1012)。如果流連接是完整的,則該過程終止。如果流連接在步驟1012不是完整的,則該過程接收用於所連接的流的元組(步驟1002)。當已經處理了所有輸入元組的時候,就已完全產生了完整的連接結果。圖11是依照說明性實施例的對準元組路由方法的流程圖。可以使用諸如圖7的對準元組路由模型700的對準元組路由模型實現圖11中所描述的步驟。連續地重複圖11的過程來處理輸入元組。通過接收元組Si開始過程(步驟1102)。接下來,該過程確定元組是否屬於主流SA(步驟1104)。步驟1104的確定基於元組的流標識以及主流的標識。如果元組屬於主流,則該過程確定是否開始新的分段(步驟1106)。步驟1106的確定基於元組的時間戳以及當前分段的開始/結束時間。如果確定開始新的分段,則該過程存儲最後選擇的主機vblast(步驟1110)。接下來,該過程選擇新的主機Vb(步驟1112)。該過程然後將Si發送至新選擇的主機Vb(步驟1114)。此後,該過程將更新的分段開始時間更新為t=t+T(步驟1116)。接下來,該過程確定流連接是否是完整的(步驟1109)。返回步驟1106,如果該過程確定不開始新的分段,則該過程將Si發送至為當前分段選擇的主機(步驟1108)。接下來,該過程確定流連接是否是完整的(步驟1109)。如果該過程確定流連接是完整的,則該過程終止。如果在步驟1109中該過程確定流連接不是完整的,則該過程接收元組Si(步驟1102)。返回步驟1104,如果在步驟1104中該過程確定元組並不屬於主流SA,則該過程確定是否開始新的分段(步驟1118)。步驟1118的確定基於元組的時間戳以及當前分段的開始/結束時間。如果該過程確定不開始新的分段,則該過程首先將Si發送至為當前分段選擇的主機Vb(步驟1120)。接下來,該過程確定Si是否在t+WA之前到達(步驟1122)。如果Si確實是在t+WA之前到達的,則該過程將Si發送至為最後的分段選擇的主機vblast(步驟1124)。接下來,該過程確定流連接是否是完整的(步驟1109)。如果在步驟1122中Si不是在t+WA之前到達的,則該過程確定流連接是否是完整的(步驟1109)。如果在步驟1118中該過程確定開始新的分段,則該過程將Si[t+T-Wi,t+T]衝刷(flush)至Vb(步驟1126)。接下來,該過程將Si發送至Vb和Vblast(步驟1128)。接下來,該過程將分段開始時間更新為t=t+T(步驟1130)。接下來,該過程確定流連接是否是完整的(步驟1109)。圖14的偽代碼中進一步解釋了圖11的過程。圖12A-12B是依照說明性實施例的約束元組路由方法的流程圖。可以使用諸如圖8的約束元組路由模型800的約束元組路由模型實現圖12A-12B。通過接收元組Si開始過程(步驟1202)。接下來,該過程確定是否開始新的分段(步驟1204)。如果該過程確定開始新的分段,則該過程檢索探測序列(步驟1206)。接下來,該過程以v0=初始化第一路由跳轉(步驟1208)。接下來,該過程設置k=1(步驟1210)。接下來,該過程確定是否k<n(步驟1214)。如果k>n,則該過程更新路由表路徑(步驟1216)。接下來,該過程將分段開始時間更新為t=t+T(步驟1218)。接下來,該過程將新的分段的位置信息添加到路由表中(步驟1220)。接下來,該過程確定流連接是否是完整的(步驟1221)。如果流連接是完整的,則該過程終止。如果在步驟1221中流連接不是完整的,則該過程接收元組Si(步驟1202)。如果在步驟1214中k<n,則該過程檢索Sik[Wik]中分段的位置(步驟1222)。接下來,該過程移除由前一跳Vk-1所覆蓋的那些分段(步驟1224)。接下來,該過程為Sik[Wik]計算最小的集合覆蓋(步驟1226)。接下來,該過程為避免重複而注釋路由路徑(步驟1228)。接下來,該過程將Vk附加到路由路徑P(步驟1230)。接下來,該過程設置k=k+1(步驟1232)。該過程然後返回確定是否k<n(步驟1214)。返回步驟1204,如果在步驟1204中該過程確定不開始新的分段,則該過程檢查路由表以獲得Si的當前分段的路由路徑(步驟1234)。接下來,該過程以路由路徑注釋Si(步驟1236)。接下來,該過程將Si的副本發送至第一路由跳轉中的每個主機(步驟1238)。接下來,該過程確定流連接是否是完整的(步驟1239)。如果流連接是完整的,則該過程終止。如果在步驟1239中流連接不是完整的,則該過程接收元組Si(步驟1202)。圖15的偽代碼中進一步解釋了圖12A-12B的過程。圖13是依照說明性實施例的多路流連接算法。多路流連接算法1300是用於在單個主機上處理多路流查詢的集中式算法。多路流連接算法1300可以通過諸如圖4的多路流連接算子400的算子實現。在諸如圖9的步驟的過程中可以實現多路流連接算子的基本步驟。系統對每一個輸入流Si維持一個隊列Qi,1≤i≤n,用於緩衝輸入元組。Qi可以是輸入緩衝器。當新的元組si∈Si到達的時候,如果本地主機上的存儲空間沒有滿,則將新的元組插入相應的隊列Qi。否則,系統要麼捨棄新近到達的元組,要麼用新近到達的元組替換緩衝器中舊的元組。以時間順序處理所有隊列中的元組。例如,如果si·t<sj·t,那麼首先處理si。每個隊列Qi都保持指針指向由連接算子當前處理的元組的緩衝器中的元組。如果當前正在處理的元組是si,則連接算子將si與所有其它的流Sj[Wj],1≤j≤n,j≠i進行比較,以產生包括si的所有的連接結果。第j個流的滑動窗Sj[Wj]包括在時間si·t-Wk與si·t之間到達Sj的所有元組sj。兩個元組之間的每個連接判定評估稱為一個連接探測。基於不同的流之間的連接選擇性動態地判定si∈Si的連接順序[11,1,10]。對si的連接處理從其自身開始,並且選擇與Si具有最小選擇性的流Sj作為下一跳。然後,利用與Sj具有最少連接選擇性的下一個所選擇的流Sk連接所有的中間結果例如,在圖4中,當前正在處理的元組是在時間8到達流S3的s38410。S3的探測序列是S3→S4[W4]→S1[W1]→S2[W2]。因而,基於S3與S4之間的連接判定θ3,4首先將s2與S4[W4]進行比較。將中間結果與S1[W1]進行比較。最後,將與S2[W2]進行比較。圖14是依照說明性實施例的對準元組路由算法。對準元組路由算法1400有差別地處理連接算子的輸入流,並且根據一個選擇的主流來對準所有流的元組路由操作。對準元組路由算法1400可以在諸如圖7的其中S1是主流的、用於三路流連接算子的對準元組路由模型700的路由模型中實現。對準元組路由算法的基本步驟可以在諸如圖11的步驟的過程中實現。對準元組路由算法1400動態地選擇一個輸入流作為主流,表示為SA,並且根據時間戳將所有其它輸入流的元組與主流對準。擴散算子將主流SA切成或劃分成不相交的分段。不相交的分段是彼此不具有共同元組的分段。將一個分段中所有的元組路由至相同的主機,而基於諸如最小負荷最先(LLF)的特定的調度策略將不同的分段路由至不同的主機。相比之下,為了符合相關約束,將其它n-1個流基於對主流的劃分而劃分成重疊的分段,其被稱為從流。重疊的分段是彼此具有至少一個共同元組的分段。通常,對準元組路由連續地將所有輸入流分成不同的分段,每個分段含有在特定的時間周期內到達的元組。Si[t,t+T]表示包括在時間[t,t+T]內到達Si的所有元組的Si的分段,其中t稱為分段的開始時間,並且T稱為分段長度。基於相關約束將從流的分段與主流的分段對準。將屬於已對準的分段的元組路由至相同的主機用於產生連接結果。例如,圖7示出了用於主流S1和兩個從流S2、S3的對準元組路由算法的路由結果。為了便於說明,假定ri=1元組/秒,i=1,2,3,並且分段長度T=5,並且三個滑動窗的大小W1=2,W2=2和W3=3。擴散算子將主流S1分成被分別路由至主機v1、v2和v3的不相交的分段。將從流S2劃分成重疊的分段S2[1,7]到v1、S2[4,12]到v2,以及S2[9,17]到v3。將從流S3也劃分成重疊的分段S3[1,7]到v1、S3[3,14]到v2,以及S3[8,17]到v3。圖14描述了用於擴散連接算子的負荷的具體的對準元組路由算法1400步驟。圖14示出了用於使用m個主機{v1,...,vm}處理J的對準元組路由算法1400的偽代碼。區段1402描述了用於主流SA的路由步驟。當擴散算子接收到來自於SA的元組sA的時候,其首先根據sA的時間戳sA·t檢查sA是否屬於當前分段SA[t,t+T]。如果t≤sA·t<t+T,則sA屬於當前分段,並且將其路由至在分段的開始時間t所選擇的主機vi。如果sA·t≥t+T,則對準元組路由開始新的分段,並且選擇新的主機作為該新的分段的路由目的地。對準元組路由遵循最小負荷最先(LLF)策略來為每個分段選擇主機。因為在分布式連接處理系統中考慮了諸如處理器、存儲器和網絡帶寬的不同資源,所以組合度量wi表示主機vi的負荷條件。對於每種資源類型Ri,對準元組路由算法1400定義負荷指示器Ri=URiCRi,]]>其中URi和CRi分別表示主機vi上的資源Ri的使用和容量。將負荷值wi定義如下wi=ω1Φcpu+ω2·Φmemory+ω3·Φbandwith其中i=13i=1,]]>0≤ωi≤1表示可以由系統動態配置的不同資源類型的重要性。基於對主流的劃分,對準元組路由將所有的從流分成重疊的分段用於保持相關約束。對於從流Si,i≠A,如果對準元組路由將分段SA[t,t+T]路由至主機vk,則對準元組路由將分段Si[t-Wi,t+T+WA]路由至相同的主機vk,以便符合相關約束。類似地,如果對準元組路由將主流的下一分段SA[t+T,t+2T]發送至主機vj,則對準元組路由需要將從流的分段Si[t+T-Wi,t+2T+WA],1≤i≤n,i≠A發送至相同的主機vj。因而,將在時間周期[t+T-Wi,t+T+WA]之間到達Si的元組發送至vi和vj二者。重複元組的數目是ri·(WA+Wi)。對準元組路由算法1400假設擴散算子具有緩衝容量以高速緩衝Si[t-Wi,t],1≤i≤n,i≠A中的元組。如果該假設不成立,則將對每個分段SA[t,t+T]的主機選擇移位至較早的時間t-Wj,其中Wj表示所有從流中最大的滑動窗。例如,在圖7中,在時間t=3選擇對第二分段的布局。然後,對準元組路由將S2[4,7]和S3[3,7]中的元組路由至主機v1和v2二者。通過示出對準元組路由與原始連接算子產生相同的連接結果集合證明了對準元組路由算法1400的正確性。C(J)和C′(J)分別表示由原始連接算子以及使用對準元組路由算法的分布式處理方案所產生的連接結果的集合。通過示出C(J)=C′(J)證明了對準元組路由算法的正確性。定理A給定多路流連接算子令C(J)和C′(J)分別表示由原始連接算子以及由使用對準元組路由算法的分布式處理方案所產生的連接結果的集合。由此,C(J)=C′(J)。證明概略通過示出si,1≤i≤n證明C(J)C′(J),如果那麼這是通過示出如果對準元組路由將si發送至伺服器vi,則對準元組路由也將sik∈Sik[Wik],1≤k≤n-1發送至vi來證明的。首先考慮si屬於主流的情況。假設將si∈Si[t,t+T]發送至主機vi。對準元組路由算法也將Sik[t-Wik,t+T+Wi]發送至vi。另一方面,滑動窗Sik[Wik]包括Sik[si·t-Wik,si·t]中所有的元組。因為si·t∈[t,t+T],所以Sik[si·t-Wik,Sik[si·t-Wik,si·t]Sik[t-Wik,t+T+Wi]也是正確的。因而,對準元組路由也將Sik[Wik],1≤k≤n-1中所有的元組發送至主機vi。當考慮si屬於從流的時候,證明sA∈SA[WA],其中SA表示主流,對準元組路由在sA被發送的機器上發送si的副本。假設sA屬於分段SA[t,t+T],並且被發送至vi。對準元組路由也將分段Si[t-Wi,t+T+WA]發送至vi。通過證明si∈Si[t-Wi,t+T+WA],則因為sA屬於分段SA[t,t+T],所以得到t≤sA·t<t+T。因而,結果是si·t≥sA·t≥t以及si·t<sA·t+WA<t+T+WA。因而,結果是t≤si·t<t+T+WA。從而si屬於分段Si[t-Wi,t+T+WA],其也被發送至vi。通過證明sj∈Sj[Wj],其中Sj表示從流,對準元組路由將si的副本和si發送至相同的主機。假設si屬於與被路由至vi的主流分段SA[t,t+T]對準的分段Si[t,t+T]。因而,對準元組路由也將Sj[t-Wj,t+T+WA]發送至vi。接下來,通過證明Sj[Wj]=Sj[si·t-Wj,si·t]Sj[t-Wj,t+T+WA],則因為t≤si·t<t+T,得出t-Wj≤si·t-Wj以及si·t<t+T+WA。因而,Sj[Wj]Sj[t-Wj,t+T+WA]也被發送至vi。得出結論,即si,si和Sik[Wik],1≤k≤n-1出現在相同的主機上。因而,C(J)C′(J)。接下來證明C′(J)C(J)。首先,由擴散連接算子所產生的C′(J)中的任何連接結果都遵循多路流連接語義,這也應當出現在C(J)中。其次,由於主流上的任何元組sA∈SA不會出現在兩個不同的主機上,所以對準元組路由並不產生重複的連接結果。因而,C′(J)C(J)。結合C(J)C′(J)和C′(J)C(J),得到結果C(J)=C′(J)。□還可以分析對準元組路由算法的開銷。相比於原始輸入流,由於從流的部分重複,對準元組路由將更多的元組推入了系統中。將對準元組路由算法的開銷定義為每單位時間由對準元組路由產生的額外元組的數目。分布式流處理系統需要消耗一部分網絡帶寬、CPU時間以及存儲空間,用於傳輸、處理以及緩衝那些開銷數據。定理B給定多路流連接算子令SA表示當前的主流。令T表示分段長度。令ri,1≤i≤n表示流Si的平均速率。令OATR表示對準元組路由算法的平均開銷。由此,OATR=i=1,iAnWi+WT.]]>證明概略對於在時間周期T上的每個分段SA[t,t+T],以及每個從流Si,1≤i≤n,i≠A,對準元組路由引入比原始從流Si更多的元組ri·(Wi+WA)。因而,對於每個分段長度T,由對準元組路由算法產生的額外元組的總數是。因而,每單位時間由對準元組路由算法產生的額外元組的平均數目是以上分析揭示了對準元組路由算法的有趣的特性。對準元組路由算法的開銷獨立於用於處理連接算子的主機數。該特性使得對準元組路由特別適合於大規模流處理群集,其中可以從大量主機搜集可用資源而沒有過多的負荷擴散開銷。各種自適應方案可以用於優化動態流環境中對準元組路由算法的性能。根據定理B,觀察到對準元組路由算法的開銷與分段長度成反比。由於在較長的時間周期上分攤了重疊分段的成本,因此較大的分段長度引入較少的負荷擴散開銷。然而,由於大分段中的大量元組被強制到相同的主機,因此大的分段長度限制了負荷平衡粒度。因此,對準元組路由自適應地調整或觸發分段長度T的分段自適應,以便在動態流環境中維持最優的性能。對準元組路由採用基於採樣的仿形(profiling)過程來找到當系統條件改變時的最優分段長度。令T表示當前的分段長度,並且ΔT表示自適應步長值。自適應過程測試T+ΔT和T-ΔT二者。如果T+ΔT的性能較好,則最優分段長度應當大於當前的分段長度。系統逐漸增加分段長度,直到所測量的系統性能達到其峰值。否則,如果T-ΔT產生較好的性能,則系統逐漸降低分段長度以搜索最優值。對準元組路由總是在一個分段的末尾改變分段長度,以保證自適應不違犯相關約束。對準元組路由的開銷僅與從流的速率有關,而獨立於主流的速率。在動態流環境中,每個輸入流的速率可以隨時間動態地改變。因此,對準元組路由根據定理B動態地選擇具有最小負荷擴散開銷的主流。主流應當總是具有最高速率的流。當主流的速率變得慢於從流之一的時候,對準元組路由採用轉換階段來改變主流。類似於分段自適應,總是在一個分段的末尾觸發流角色交換以符合相關約束。圖15是依照說明性實施例的約束元組路由算法。約束元組路由算法1500可以在諸如圖8的約束元組路由模型800的路由模型中實現。約束元組路由算法1500的基本步驟可以在諸如圖12A-B的步驟的過程中實現。約束元組路由1500在區段1502中為具有探測序列的任何元組si∈Si,1≤i≤n進行路由判定,基於先前的相關元組的布局,約束元組路由為元組si以及所有的中間連接結果1≤k≤n-1進行路由判定。為了避免要求所有的連接算子進行路由計算,將約束元組路由作為計算si的整個路由路徑以便與其它n-1個流連接的源路由算法來實現。每個元組攜帶其路由路徑以表示其需要訪問來產生連接結果的主機的集合。為了減少路由計算開銷,約束元組路由將每個輸入流上的元組分組成段,並且將每個分段作為整體路由至不同的主機。因而,約束元組路由僅需要計算每個分段的路由而不是每個元組的路由。分段長度表示負荷平衡粒度與路由開銷之間的折衷。約束元組路由還維護記錄先前所路由的分段的布局的路由表。如果分段不需要基於多路流連接語義與任何未來的分段相關,則從路由表刪除該分段的信息。在區段1504中,約束元組路由為需要與滑動窗Si1[Wi1]中的元組連接的分段ηi=Si[t,t+T]進行路由判定。約束元組路由首先得到Si1[Wi1]=Si1[t-Wi1,t+T]中所有元組的位置。為最小化開銷,約束元組路由選擇可以覆蓋所有的相關元組的最小主機集合V1{v1,...,vm}。將以上問題用公式表達為將詳細描述的加權最小集合覆蓋問題。在約束元組路由期間,連接順序用於將多路相關處理分成多個較小的算子。具體地,將n路連接算子劃分成(n-1)個2路連接算子每個2路連接算子都可以在不同的主機上執行。在每跳查找相關分段的位置。在每跳計算覆蓋所有相關分段的最小主機集合。約束元組路由然後將ηi的第一路由跳轉設置為V1中的所有主機。可以將分段ηi=Si[t,t+T]保存在V1中每個主機上的存儲緩衝器Qi中,直到根據連接語義不再需要其元組。約束元組路由還更新路由表以記錄分段ηi=Si[t,t+T]位於V1中的一組主機上。例如,在圖8中,約束元組路由計算S1[9,10]的路由,其探測序列是s1→S2[W2]→S3[W3]。約束元組路由得到S2[W2]中所有相關元組的布局S2[1,2]在v1、v2上;S2[3,4]在v2、v3上;S3[5,6]在v1、v4上,等等。約束元組路由然後選擇可以覆蓋S2[W2]中所有元組的最小主機集合V1={v2,v4}。因此,約束元組路由將S1[9,10]的路由路徑上的第一跳設置為V1={v2,v4}。約束元組路由還在路由表中添加條目,指定將分段S1[9,10]置於主機V1={v2,v4}。接下來,約束元組路由需要將中間結果路由至覆蓋Si2[Wi2]中所有元組的主機。類似於第一步,約束元組路由首先獲取Si2[Wi2]中所有相關元組的位置。然而,為了最小化經過不同主機傳輸中間結果的開銷,對的路由判定應當考慮的當前位置。給定第一路由跳轉V1={v1,...,vk},約束元組路由首先消除Si2[Wi2]中已經被V1中的主機覆蓋的那些元組。合理性在於當前位於V1中的主機上的任何中間結果應當與Si2[Wi2]中局部可用的元組連接。然後,約束元組路由計算最小主機集合V2以覆蓋Si2[Wi2]中那些剩餘的元組。然而,與原始元組不同,並不將中間結果緩衝於存儲隊列用於與其它元組連接。因而,約束元組路由並不需要將中間結果的布局記錄在路由表中。例如,在圖8中,第二路由跳轉是為當前位於主機v2、v4的中間結果選擇一組主機。然後,由於S3[3,4]和S3[7,8]已經被主機v2覆蓋,因此約束元組路由將其移除。接下來,基於剩餘元組的位置,例如在{v3,v5}上的S3[1,2]、在{v5,v6}上的S3[5,6]、在{v6,v7}上的S3[9,10],約束元組路由計算最小主機集合V2={v5,v6}作為分段S1[9,10]的第二路由跳轉。重複以上計算直到約束元組路由計算出中所有n-1個探測步驟的主機集合V1,...,Vn-1。然後,約束元組路由通過為分段ηi插入條目來更新路由表,其中分段ηi的位置是由V1所指定的主機集合。在區段1504中,約束元組路由算法1500以路由路徑V1→V2...→Vn-1注釋每個元組si∈Si[t,t+T]。圖14示出了用於使用m個主機{v1,...,vm}處理連接算子的約束元組路由算法的偽代碼。另一算法可以用於諸如圖12A-B的過程的最優主機集合選擇。最優主機選擇算法的目的是為每個路由跳轉Vk,1≤k≤n-1選擇最佳主機集合。第k個路由跳轉的目標是在χo=si與Sik[Wik]中所有的元組之間產生所有的連接結果。假設滑動窗Sik[Wik]包括表示為E={η1,..,ηe}的一組分段。從路由表檢索每個分段的布局信息。每個分段ηz,1≤z≤e分布在主機集合Uz{v1,...,vm}上。約束元組路由然後將分段布局信息轉換成主機覆蓋信息。例如,假設ηz分布在主機集合Uz上,Uz中的每個主機覆蓋分段ηz。讓我們表示為Y=Yz=1mUz.]]>對於每個主機vi∈Y,其覆蓋形成E的子集的一組分段,表示為AiE。因為目標是實現平衡的負荷分布,所以將Xk-1分布於可以覆蓋Sik[Wik]中所有相關元組的最小數目的最少負荷主機。因而,權值wi與每個子集Ai相關聯。權值wj是主機vi的負荷值wi,其由負荷值的方程來定義。因此,將最優主機選擇問題用公式表達為加權最小集合覆蓋問題定義給定基本集合(groundset)E={η1,...,ηe},子集A1,...,AkE,以及每個子集Ai的成本wi,目標是找到最小集合覆蓋I{1,...,K},以便∪j∈IAj=E以及最小。根據I導出主機集合Vk。例如,如果集合覆蓋I={1,2},那麼Vk={v1,v2}。最小集合覆蓋問題是眾所周知的NP難解問題(NP-hardproblem)。因此,約束元組路由使用貪婪啟發式(greedyheuristic)算法來找到最小集合覆蓋。基本思想是選擇具有argminj,Aj0wj|Aj|]]>的最小值的子集Aj,其中|Aj|表示集合Aj的基數。將Aj添加到集合覆蓋I中,並且通過移除包括在Aj中的那些元素來更新每個剩餘的子集。重複將Aj添加到集合的過程,直到所選擇的集合覆蓋I包括E={η1,...,ηe}中所有的分段。然而,以上方案可能進行多餘的連接計算。假設當前所選擇的主機集合是Vk。對於任何分段ηz∈Sik[Wik],其被置於主機集合Uz={vz1,...,vz1}上。如果集合Vk和Uz含有多於一個的共同主機(即,|Vk∩Uz|>1),則在含於|Vk∩Uz|中的不同主機處多餘地計算了Xk-1與ηz之間的連接探測。這樣的多餘計算可能會導致多餘的連接結果。為了解決該問題,注釋由Xk-1的不同副本所攜帶的路由路徑,以保證僅由一個主機執行每個連接探測。為了與Sik[Wik]中所有的元組相關,將Xk-1的副本發送至Vk中的所有主機。對於位於Vz中的主機上的ηz∈{η1,...,ηe},如果|Vk∩Uz|>1,則選擇來自於Vk∩Uz的最小負荷主機Vj執行Xi與ηz之間的連接探測。對於任何其它的主機vj∈Vk∩Uz,以標誌(vj/ηz)注釋路由路徑,其意味著的任何中間結果元組並未與主機vj上的ηz連接。通過證明約束元組路由與原始連接算子產生相同的連接結果的集合,示出了約束元組路由算法的正確性。C(J)和C′(J)分別表示由原始連接算子以及由使用約束元組路由算法的分布式處理方案所產生的連接結果的集合。通過示出C(J)=C′(J),證明了約束元組路由算法的正確性。定理C給定多路流連接算子令C(J)和C′(J)分別表示由原始連接算子以及由使用約束元組路由算法的分布式處理方案所產生的連接結果的集合。由此,C(J)=C′(J)。證明概略通過示出如果si,1≤i≤n,那麼首先證明了C(J)C′(J)。這是通過證明更強的命題來證明的k,1≤k≤n-1,是由約束元組路由產生的。使用數學歸納法(1)證明當k=1時命題成立。由於約束元組路由將si發送至覆蓋Si1[Wi1]中所有元組的主機集合V1,因此約束元組路由產生(2)假設該命題對於某個k,1≤k≤n-2是成立的,證明該命題對於k+1是成立的。根據該假設,約束元組路由產生由於Sik+1[Wik+1]中所有的元組要麼與Xk共置(co-located),要麼由第k+1個路由跳轉Vk+1中的主機所覆蓋,因此中所有的結果元組都是由約束元組路由產生的。因此,C(J)C′(J)。接下來,證明C′(J)C(J)。首先,由擴散連接算子所產生的C′(J)中的任何連接結果都遵循多路流連接語義,這也應當出現在C(J)中。其次,證明約束元組路由並不產生任何重複的結果。由於約束元組路由實現重複避免,因此任何結果元組1≤k≤n-1都僅由單個主機產生一次。因而,C′(J)C(J)。結合C(J)C′(J)和C′(J)C(J),得到結果C(J)=C′(J)。□將約束元組路由算法1500的開銷定義為每單位時間由約束元組路由所產生的額外數據元組的數目。不同於進行單跳路由的對準元組路由,約束元組路由進行多跳路由,其不僅需要在多個主機上複製原始輸入流的元組,而且需要經過不同主機傳輸中間結果。因而,約束元組路由的開銷由兩部分組成(1)通過將分段si[t,t+T]發送至由第一路由跳轉V1所指定的多個主機來複製原始輸入流;(2)將中間結果傳輸至由Vk所指定的多個主機。對於開銷的第一部分,系統需要花費額外的處理、存儲器以及網絡帶寬,用於那些開銷數據。然而,由於並未將中間結果存儲在存儲緩衝器中,因此中間結果的開銷僅引起CPU和帶寬花費。定理D給定多路流連接算子令ri,1≤i≤n表示流Si的平均速率。令T表示分段長度。Si的探測序列表示為Si1,...,Sin-1。令σi,j定義Si與Sj之間的連接選擇性。將Si,1≤i≤n中元組的複製品的平均數目表示為Mi。將中間結果的複製品的平均數目表示為Mik。令OCTR表示原始數據流的平均複製開銷。令O*CTR表示中間結果的平均開銷。由此,OCTR*=i=1nk=2n-2Mik(ijj=1k-1ij,ij+1)(rij=1k-1rijWij).]]>證明概略對於每個分段Si[t,t+T],1≤i≤n,在T的時間周期內,約束元組路由相比於原始輸入流發送(Mi-1)·ri·T個額外元組。因而,每單位時間由約束元組路由算法所產生的額外元組的平均數目是對於每個分段Si[t,t+T],1≤i≤n,其需要與Si1[Wi1],...,Sin-1[Win-1]連接。從產生的中間結果的數目是σi,i1(ri·T)·(ri1·Wi1)。將每個中間結果發送至Mi2以連接Si2[Wi2]。用於的中間結果的開銷是Mik·σi,i1(ri·T)·(ri1·Wi1)·σi1,i2·(ri2·Wi2)...σik-1,ik(rik·Wik)。因而,用於計算的中間結果的總數是對於所有的n個輸入流,每單位時間由約束元組路由算法產生的中間結果的總數是類似於對準元組路由方案,約束元組路由的開銷也獨立於用於執行多路流連接算子的主機{v1,...,vm}的總數。因而,約束元組路由允許連接算子利用分布式流處理系統中所有可用的主機而沒有過多的開銷。約束元組路由的開銷取決於兩個新的參數Mi和Mik,其定義了用於路由原始元組和中間結果元組的主機集合的平均數目。由於我們的最優主機集合選擇算法總是選擇最小的主機集合來符合相關約束。Mi或Mik的值常常遠小於總的主機數。不同於對準元組路由的複製開銷OATR,原始流的複製開銷OCTR獨立於滑動窗大小。因此,當連接算子採用大的滑動窗的時候,約束元組路由可以具有比對準元組路由少的開銷。儘管相比於對準元組路由,約束元組路由具有額外的中間結果開銷,然而由於在實際應用中連接選擇性常常很小,因此中間結果開銷O*CTR並不顯著。對準元組路由與約束元組路由之間的其它的不同在於對準元組路由有差別地處理n個輸入流,而約束元組路由平等地處理所有的輸入流。因而,對準元組路由更適於在具有小滑動窗的一個快流和一組慢流中連接的情況,而約束元組路由則在所有的輸入流都具有類似的速率並且連接算子採用大的滑動窗的時候效果最好。此外,約束元組路由需要維護路由表,該路由表記錄新近所路由的分段的位置。儘管分段長度不影響約束元組路由的複製開銷,然而分段長度決定路由表的大小以及路由計算開銷。因而,說明性實施例提供了一種用於在流處理環境中自動規劃的方法。當應用於流處理規劃問題的時候,所描述的搜索方法實現了相比於其它規劃方法顯著改善的可擴縮性。通過實現對複雜多路流連接的精確處理,改善了可擴縮性。通過使用並行處理加速了處理。另外,說明性實施例適應數據流波動。本發明可以採取全硬體實施例、全軟體實施例或者既含有硬體元素又含有軟體元素的實施例的形式。在優選的實施例中,以軟體實現本發明,其包括但不限於固件、常駐軟體、微碼等。此外,本發明可以採取可訪問於計算機可用或計算機可讀介質的電腦程式產品的形式,該計算機可用或計算機可讀介質提供由計算機或任何指令執行系統使用的或者與計算機或任何指令執行系統相連的程序代碼。對於該描述來說,計算機可用或計算機可讀介質可以是可以容納、存儲、通信、傳播或傳送由指令執行系統、裝置或設備使用的或者與指令執行系統、裝置或設備相連的程序的任何實體裝置。介質可以是電子、磁性、光學、電磁、紅外或半導體系統(或裝置或設備)或者傳播介質。計算機可讀介質的例子包括半導體或固態存儲器、磁帶、可移動計算機磁碟、隨機訪問存儲器(RAM)、只讀存儲器(ROM)、硬磁碟和光碟。光碟的當前的例子包括壓縮磁碟-只讀存儲器(CD-ROM)、壓縮磁碟-讀/寫(CD-R/W)和DVD。適於存儲和/或執行程序代碼的數據處理系統將包括通過系統總線直接或間接耦合於存儲元件的至少一個處理器。存儲元件可以包括在程序代碼的實際執行期間所使用的局部存儲器、大容量存儲器,以及為了減少在執行期間必須從大容量存儲器檢索代碼的次數而提供至少一些程序代碼的臨時存儲的高速緩衝存儲器。輸入/輸出或I/O設備(包括但不限於鍵盤、顯示器、指點設備等)可以直接地或者通過插入I/O控制器耦合於系統。還可以將網絡適配器耦合於系統,從而使得數據處理系統能夠適於通過介入專用或公用網絡耦合於其它的數據處理系統或遠程印表機或存儲設備。數據機、電纜數據機和乙太網卡僅僅是幾種當前可用類型的網絡適配器。已經出於說明和描述的目的給出了對說明性實施列的描述,但並不旨在以所公開的形式窮舉或限制本發明。對本領域的普通技術人員來說,很多修改和變形是顯而易見的。所選擇和描述的實施例是為了最好地解釋本發明的原理、實際應用,以及使本領域的普通技術人員能夠針對各種實施例以及適於預期的特定用途的各種修改理解本發明。權利要求1.一種用於處理多路流相關的計算機實現的方法,所述計算機實現的方法包括接收流數據用於相關;形成用於連續地將多路流相關工作負荷劃分成較小的工作負荷片的任務,其中所述較小的工作負荷片中的每一個均可以由單個主機處理;以及將所述流數據發送至不同的主機用於相關處理。2.根據權利要求1的計算機實現的方法,其中所述形成步驟用於進行流劃分,並且其進一步包括將輸入流分成分段;以及將所述分段分布到所述不同的主機上用於相關處理。3.根據權利要求1的計算機實現的方法,其中所述形成步驟用於進行算子劃分,並且其進一步包括將多路相關算子分成多個較小的算子;以及在不同的分布式主機上執行不同的較小的算子。4.根據權利要求1的計算機實現的方法,其中通過多個輸入流上的滑動窗連接實現所述多路流相關。5.根據權利要求1的計算機實現的方法,其中所述形成步驟用於形成連續優化問題,並且其包括標識相關約束;計算每個流數據的路由路徑,用於確保滿足所述相關約束、平衡所述不同主機的工作負荷,以及最小化所述流數據的複製開銷。6.根據權利要求2的計算機實現的方法,其進一步包括選擇具有最高速率的輸入流作為主流,並且將其它的流指示為從流;將所述主流劃分成不相交的分段;以及將所述從流劃分成重疊的分段以符合相關約束。7.根據權利要求6的計算機實現的方法,其中所述選擇步驟在運行時是動態改變的。8.根據權利要求7的計算機實現的方法,其中所述選擇步驟進一步包括響應於所述主流慢於所述其它流中的任何一個,觸發流角色交換,用於選擇所述具有最高速率的輸入流作為所述主流,並且將所述其它的流指示為從流;以及根據所述相關約束採用轉換階段來實現所述流角色交換。9.根據權利要求6的計算機實現的方法,其中所述將所述主流劃分成不相交的分段的步驟用於調整所述不相交的分段的分段長度,並且其進一步包括當系統條件改變時觸發分段自適應;進行基於採樣的仿形過程,以便搜索新的分段長度。10.根據權利要求3的計算機實現的方法,其中所述算子劃分進一步包括確定連接順序,以便將所述多路相關處理分成多跳;在每跳查找相關分段的位置;以及在每跳計算覆蓋所有所述相關分段的最小主機集合。11.根據權利要求10的計算機實現的方法,其中由在不同流之間所觀察到的連接選擇性來確定所述連接順序。12.根據權利要求10的計算機實現的方法,其中使用路由表進行所述查找步驟,其中所述路由表包括不同輸入流的滑動窗中相關分段的布局信息。13.根據權利要求10的計算機實現的方法,其中所述計算步驟進一步包括考慮前一跳的主機集合選擇;以及重新使用由所述前一跳選擇的主機,用於最小化中間結果傳輸。14.一種系統,其包括數據處理系統,所述數據處理系統用於接收來自於輸入流的流數據進行相關,以及用於連續地將多路流相關工作負荷劃分成較小的工作負荷片的任務形成,其中所述較小的工作負荷片中的每一個均可以由單個主機處理,其中所述任務用於進行算子劃分,以便將多路相關算子分成多個較小的算子、在分布式主機上執行不同的較小的算子,以及用於進行流劃分,以便將輸入流分成分段,並且將所述分段分布到主機上進行相關處理;以及多個主機,所述多個主機可操作連接至所述數據處理系統;其中所述數據處理系統將所述流數據發送至所述多個主機,用於所述相關處理。15.根據權利要求14的系統,其中所述數據處理系統選擇具有最高速率的輸入流作為主流,並且將其它的流指示為從流,將所述主流劃分成不相交的分段,並且將所述從流劃分成重疊的分段以符合相關約束。16.一種用於處理多路流相關的計算機系統,所述計算機系統包括接收裝置,所述接收裝置用於接收流數據用於相關;形成裝置,所述形成裝置用於形成連續地將多路流相關工作負荷劃分成較小的工作負荷片的任務,其中所述較小的工作負荷片中的每一個均可以由單個主機處理;以及發送裝置,所述發送裝置用於將所述流數據發送至不同的主機用於相關處理。全文摘要一種用於處理多路流相關的計算機實現的方法、裝置以及計算機可用程序代碼。接收流數據用於相關。形成用於連續地將多路流相關工作負荷劃分成較小的工作負荷片的任務。所述較小的工作負荷片中的每一個均可以由單個主機處理。將所述流數據發送至不同的主機用於相關處理。文檔編號H04N7/26GK101067792SQ200710084738公開日2007年11月7日申請日期2007年2月28日優先權日2006年5月4日發明者顧曉暉,王海勳,P·S-L·渝申請人:國際商業機器公司