數據流處理的方法和裝置與流程
2023-06-14 10:34:11 6

本發明涉及數據處理技術領域,特別涉及一種數據流處理的方法和裝置。
背景技術:
目前,數據流可以應用到很多場景中,例如,數據流可以應用到欺詐監測、精準營銷推薦、罪案災情預測中等。由於數據流對數據處理的及時性要求非常高,因此,接收到數據流時,應立刻通過數據處理任務對數據流進行處理,也即流計算。
在通過數據處理任務對數據流進行處理之前,為數據處理任務分配內存空間和窗口計算算子邏輯,內存空間用於存儲該數據處理任務處理的數據流,窗口計算算子邏輯用於對內存空間中的數據流進行處理。例如,數據處理任務為求和運算時,窗口計算算子邏輯就是對內存空間中的數據流進行求和運算。當接收到該數據處理任務對應的數據流時,管理設備將該數據流存儲在該數據處理任務的內存空間中,當執行該數據處理任務時,管理設備從該內存空間讀取該數據流,通過窗口計算算子邏輯對該數據流進行處理。
在實現本發明的過程中,發明人發現現有技術至少存在以下問題:
由於多個數據處理任務可能對同一個數據流進行處理,然而在上述技術中會在每個數據處理任務的內存空間都存儲該數據流,浪費內存空間,導致空間利用率低。
技術實現要素:
為了解決現有技術的問題,本發明實施例提供了一種數據流處理的方法和裝置。技術方案如下:
第一方面,本發明實施例提供了一種數據流處理的方法,所述方法包括:
接收第一數據處理任務的第一數據流,以及,接收第二數據處理任務的第二數據流;
如果所述第一數據流和所述第二數據流相同,在共享內存空間中存儲所述第一數據流和所述第二數據流中的任一數據流,將所述任一數據流在所述共享內存空間中的位置索引存儲到第一內存空間和第二內存空間中,所述第一內存空間為所述第一數據處理任務的內存空間,所述第二內存空間為所述第二數據處理任務的內存空間;
當執行所述第一數據處理任務或所述第二數據處理任務時,根據所述位置索引,從所述共享內存空間中讀取所述任一數據流,根據所述任一數據流執行所述第一數據處理任務或所述第二數據處理任務。
本發明實施例所示的方案中,管理設備接收第一數據流和第二數據流後,如果第一數據流和第二數據流相同,管理設備在共享內存空間中存儲第一數據流和第二數據流中的任一數據流,並將該任一數據流在共享內存空間中的位置索引存儲到第一內存空間和第二內存空間中,當執行第一數據處理任務或第二數據處理任務時,根據該位置索引,從該共享內存空間中讀取第一數據流或者第二數據流。由於共享內存空間中只存儲了第一數據流和第二數據流中的任一數據流,避免了相同數據流的重複存儲,從而節省了內存空間,提高了內存空間的利用率。
在一種可能的設計中,所述在共享內存空間中存儲所述第一數據流和所述第二數據流中的任一數據流之前,所述方法還包括:
當檢測到所述第一數據處理任務或者所述第二數據處理任務是當前任務隊列的第一個數據處理任務時,創建所述共享內存空間。
本發明實施例所示的方案中,管理設備檢測到第一數據處理任務或者第二數據處理任務是當前任務隊列的第一個數據處理任務時,才創建共享內存空間,使得該當前任務隊列的多個數據處理任務對應一個共享內存空間,節省了內存空間。
在一種可能的設計中,所述根據所述任一數據流執行所述第一數據處理任務或所述第二數據處理任務之後,所述方法還包括:
當檢測到所述第一數據處理任務或者所述第二數據處理任務是所述當前任務隊列的最後一個數據處理任務,銷毀所述共享內存空間。
本發明實施例所示的方案中,管理設備檢測到第一數據處理任務或者第二數據處理任務是當前任務隊列的最後一個數據處理任務時,便銷毀該共享內存空間,提高了共享內存空間的使用率。
在一種可能的設計中,所述在共享內存空間中存儲所述第一數據流和所述第二數據流中的任一數據流之前,所述方法還包括:
獲取所述第一數據流的第一數據屬性和所述第二數據流的第二數據屬性;
根據所述第一數據屬性,確定所述第一數據屬性對應的所述共享內存空間,以及,根據所述第二數據屬性,確定所述第二數據屬性對應的所述共享內存空間。
本發明實施例所示的方案中,根據第一數據流的第一數據屬性,確定第一數據屬性對應的共享內存空間,以及,根據第二數據流的第二數據屬性,確定第二數據屬性對應的共享內存空間,提高了數據屬性相同的數據流中相同數據流合併的可能性,進一步提高了共享內存空間的利用率。
在一種可能的設計中,所述如果所述第一數據流和所述第二數據流相同,在共享內存空間中存儲所述第一數據流和所述第二數據流中的任一數據流,包括:
將所述第一數據流和所述第二數據流存儲到所述共享內存空間中;
當檢測到所述第一數據流和所述第二數據流相同時,刪除所述第一數據流或者所述第二數據流。
本發明實施例所示的方案中,管理設備先將第一數據流和第二數據流存儲到共享內存空間中,然後判斷第一數據流和第二數據流是否相同,保證了第一數據處理任務和第二數據處理任務的實時性;如果第一數據流和第二數據流相同,管理設備刪除第一數據流或者第二數據流,節省了共享內存空間佔用的空間的同時,提高了管理設備執行第一數據處理任務和第二數據處理任務的效率。
在一種可能的設計中,所述方法還包括:
獲取所述第一數據流的第一標識和所述第二數據流的第二標識;
如果所述第一標識和所述第二標識相同,確定所述第一數據流和所述第二數據流相同。
本發明實施例所示的方案中,管理設備根據第一數據流的第一標識和第二數據流的第二標識,確定第一數據流和第二數據流是否相同,提高了確定第一數據流和第二數據流是否相同的準確性。
在一種可能的設計中,所述將所述任一數據流在所述共享內存空間中的位置索引存儲到第一內存空間和第二內存空間中,包括:
如果刪除所述第一數據流,將所述第二數據流在所述共享內存空間中的位置索引存儲到所述第一內存空間和所述第二內存空間中;或者,
如果刪除所述第二數據流,將所述第一數據流在所述共享內存空間中的位置索引存儲到所述第一內存空間和所述第二內存空間中。
本發明實施例所示的方案中,管理設備將未被刪除的第二數據流在共享內存空間中的位置索引存儲到第一內存空間和第二內存空間中,或者,將未被刪除的第一數據流在共享內存空間中的位置索引存儲到第一內存空間和第二內存空間中,使得管理設備執行第一數據處理任務或者第二數據處理任務時,仍能夠根據該位置索引讀取該第一數據流或者第二數據流,提高了管理設備執行第一數據處理任務或者第二數據處理任務的準確性。
在一種可能的設計中,所述將所述任一數據流在所述共享內存空間中的位置索引存儲到第一內存空間和第二內存空間中之後,所述方法還包括:
將所述位置索引的當前引用狀態修改為被所述第一數據處理任務和所述第二數據處理任務引用;
當所述當前引用狀態為無數據處理任務引用時,刪除所述任一數據流。
本發明實施例所示的方案中,第一數據流或者第二數據流的位置索引的當前引用狀態為無數據處理任務引用時,刪除該位置索引對應的第一數據流或者第二數據流,進一步節省了共享內存空間佔用的內存空間,提高了共享內存空間的利用率。
在一種可能的設計中,所述方法還包括:
獲取所述共享內存空間的總容量和所述共享內存空間當前存儲的數據流的數據量;
根據所述總容量和所述數據量,調整所述共享內存空間的總容量大小。
本發明實施例所示的方案中,管理設備可以根據共享內存空間當前存儲的數據流的數據量和該共享內存空間的總容量,調整該共享內存空間的總容量大小,使得管理設備能夠靈活調整該共享內存空間總容量大小,進一步提高了該共享內存空間的利用率。
第二方面,本發明實施例提供了一種數據流處理裝置,該數據流處理裝置包括至少一個單元,至少一個單元用於實現上述第一方面或第一方面中任意一種可能的實現方式所提供的數據流處理方法。
第三方面,本發明實施例提供了一種數據流處理設備,該數據流處理設備包括:處理器和存儲器,用於執行上述第一方面或第一方面中任意一種可能的實現方式所提供的數據流處理方法。
第四方面,本發明實施例提供了一種計算機存儲介質,用於存儲上述數據流處理所用的計算機軟體指令,其包含用於執行上述方面為管理設備所涉及的程序。
上述本發明實施例第二到第四方面所獲得的技術效果與第一方面中對應的技術手段獲得的技術效果近似,在這裡不再贅述。
綜上所述,本發明實施例提供的技術方案帶來的有益效果是:
本發明實施例所示的方案中,管理設備接收第一數據流和第二數據流後,如果第一數據流和第二數據流相同,管理設備在共享內存空間中存儲第一數據流和第二數據流中的任一數據流,並將該任一數據流在共享內存空間中的位置索引存儲到第一內存空間和第二內存空間中,當執行第一數據處理任務或第二數據處理任務時,根據該位置索引,從該共享內存空間中讀取第一數據流或者第二數據流。由於共享內存空間中只存儲了第一數據流和第二數據流中的任一數據流,避免了相同數據流的重複存儲,從而節省了內存空間,提高了內存空間的利用率。
附圖說明
圖1是本發明實施例提供的一種數據流處理的實施環境圖;
圖2是本發明實施例提供的一種管理設備的結構方框圖;
圖3是本發明實施例提供的一種數據流處理的方法流程圖;
圖4是本發明實施例提供的一種共享內存空間結構圖;
圖5是本發明實施例提供的一種確定共享內存空間的示例圖;
圖6是本發明實施例提供的一種共享內存空間示意圖;
圖7是本發明實施例提供的一種共享內存空間示意圖;
圖8是本發明實施例提供的一種數據流處理的裝置方框圖;
圖9是本發明實施例提供的一種數據流處理的裝置方框圖;
圖10是本發明實施例提供的一種數據流處理的裝置方框圖;
圖11是本發明實施例提供的一種數據流處理的裝置方框圖;
圖12是本發明實施例提供的一種數據流處理的裝置方框圖;
圖13是本發明實施例提供的一種數據流處理的裝置方框圖。
具體實施方式
為使本發明實施例的目的、技術方案和優點更加清楚,下面將結合附圖對本發明實施方式作進一步地詳細描述。
圖1是本發明實施例提供的一種數據流處理的實施環境圖。實施環境包括管理設備、共享內存空間、第一數據處理任務和第二數據處理任務。該管理設備用於接收第一數據處理任務的第一數據流和第二數據處理任務的第二數據流,並根據該第一數據流執行第一數據處理任務,根據該第二數據流執行第二數據處理任務。該共享內存空間用於存儲第一數據流和第二數據流,管理設備接收第一數據流和第二數據流後,將該第一數據流和第二數據流存儲在共享內存空間中;第一數據流為管理設備執行第一數據處理任務時引用的數據流,第二數據流為管理設備執行第二數據處理任務時引用的數據流。其中,第一數據處理任務可以為一個或多個數據處理任務,第二數據處理任務也可以為一個或多個數據處理任務。
管理設備執行第一數據處理任務或者第二數據處理任務之前,如果第一數據處理任務或者第二數據處理任務是當前任務隊列的第一個數據處理任務,管理設備創建一個共享存儲空間,該當前任務隊列為第一數據處理任務和第二數據處理任務所在的任務隊列,該共享內存空間用於存儲當前任務隊列中第一數據處理任務的第一數據流和第二數據處理任務的第二數據流。
如圖1所示,管理設備執行第一數據處理任務或者第二數據處理任務時,將第一數據流和第二數據流存儲在共享內存空間中,並為第一數據處理任務分配一個第一內存空間和第一窗口計算算子邏輯,為第二數據處理任務分配一個第二內存空間和第二窗口計算算子邏輯。其中,該第一內存空間用於存儲該第一數據流在共享內存空間中的第一位置索引;該第二內存空間用於存儲該第二數據流在共享內存空間中的第二位置索引(為了區分第一數據流對應的位置索引和第二數據流對應的位置索引,將第一數據流對應的位置索引稱為第一位置索引,將第二數據流對應的位置索引稱為第二位置索引);第一窗口計算算子邏輯用於管理設備根據該第一窗口計算算子邏輯對第一數據流進行處理,第二窗口計算算子邏輯用於管理設備根據該第二窗口計算算子邏輯對第二數據流進行處理。
為了節省內存空間,提高空間利用率,如果共享存儲空間中存儲的第一數據流和第二數據流相同,管理設備在共享內存空間中刪除第一數據流或者第二數據流;同時,管理設備將未刪除的第一數據流的第一位置索引的當前引用狀態修改為被第一數據處理任務和第二數據處理任務引用,或者,將未刪除的第二數據流的第二位置索引的當前引用狀態修改為被第一數據處理任務和第二數據處理任務引用。
當第一位置索引的當前引用狀態或者第二位置索引的當前引用狀態為無數據處理任務引用時,刪除第一數據流或者第二數據流。管理設備執行第一數據處理任務或者第二數據處理任務結束時,如果第一數據處理任務或者該第二數據處理任務是當前任務隊列的最後一個數據處理任務,管理設備銷毀該共享內存空間。
其中,該管理設備可以為具備數據流處理功能的處理器、控制器等,也可以為集成在計算機或其它設備上的具備數據流處理功能的處理模塊、控制模塊等。
參見圖2,其示出了本發明實施例提供的一種管理設備1,該管理設備1包括收發器11和存儲器12,該管理設備1還可以包括處理器13和網絡接口14。其中,存儲器12和網絡接口14分別與處理器13連接;存儲器12用於存儲程序代碼,程序代碼包括計算機操作指令,處理器13和收發器11用於執行存儲器12中存儲的程序代碼,用於實現數據流的相關處理,並可以通過網絡接口14與基站或其他管理設備進行交互。
處理器13包括一個或者一個以上處理核心。處理器13通過運行軟體程序以及單元,從而執行下述數據流處理的方法。
在一個可能的設計中,該管理設備1還可以包括總線15等部件。其中,存儲器12與網絡接口14分別通過總線15與處理器13和收發器11相連。
存儲器12可用於存儲軟體程序以及單元。具體的,存儲器12可存儲作業系統121、至少一個功能所需的應用程式單元122。作業系統121可以是實時作業系統(Real Time eXecutive,RTX)、LINUX、UNIX、WINDOWS或OS X之類的作業系統。
本發明實施例提供了一種數據流處理的方法,該方法的執行主體為管理設備。參見圖3,該方法包括:
步驟301:管理設備接收第一數據處理任務的第一數據流,以及,接收第二數據處理任務的第二數據流。
本步驟中,由於管理設備在窗口中執行第一數據處理任務和第二數據處理任務,因此,本步驟中管理設備接收第一數據處理任務的第一數據流,以及,接收第二數據處理任務的第二數據流的步驟可以為:管理設備檢測第一數據處理任務的第一數據流是否進入第一數據處理任務的窗口,以及,管理設備檢測第二數據處理任務的第二數據流是否進入第二數據處理任務的窗口;當檢測到第一數據流進入第一數據處理任務的窗口時,管理設備接收第一數據流,以及,當檢測到第二數據流進入第二數據處理任務的窗口時,管理設備接收第二數據流。
管理設備接收第一數據流和第二數據流後,通過以下步驟302,將第一數據流和/或第二數據流存儲到共享內存空間中。
步驟302:如果第一數據流和第二數據流相同,管理設備在共享內存空間中存儲第一數據流和第二數據流中的任一數據流,管理設備將該任一數據流在共享內存空間中的位置索引存儲到第一內存空間和第二內存空間中,該第一內存空間為該第一數據處理任務的內存空間,該第二內存空間為該第二數據處理任務的內存空間。
管理設備接收到第一數據流和第二數據流時,管理設備可以先將第一數據流和第二數據流存儲到共享內存空間中,即刻開始執行第一數據處理任務和第二數據處理任務,並在後臺檢測第一數據流和第二數據流是否相同,從而進行數據流合併,也即以下第一種實現方式。管理設備還可以先只將第一數據流和第二數據流中的一個數據流存儲到共享內存空間中,檢測另一個數據流和共享內存空間中已存儲的數據流是否相同,從而確定是否存儲另一個數據流,也即以下第二種實現方式。
對於第一種實現方式,如圖4所示,本發明實施例中,為了提高第一內存空間和第二內存空間的利用率,管理設備將第一數據流和第二數據流存儲到共享存儲空間,將該第一數據流的第一位置索引存儲到該第一內存空間,將該第二數據流的第二位置索引存儲到該第二內存空間。
本發明實施例中,由於管理設備分別將第一數據處理任務的第一數據流和第二數據處理任務的第二數據流存儲在共享內存空間中,而當前任務隊列中第一數據處理任務和第二數據處理任務可能引用同一數據流,導致共享內存空間中重複存儲數據流,為了提高共享存儲空間的空間利用率,管理設備檢測第一數據流和第二數據流是否相同,如果第一數據流和第二數據流相同,管理設備在將第一數據流和第二數據流合併為一個數據流。
本發明實施例的一種可能實現方式中,管理設備在共享內存空間中存儲第一數據流和第二數據流中的任一數據流時,為了保證第一數據處理任務和第二數據處理任務的實時性,管理設備接收第一數據流和第二數據流後,先將該第一數據流和該第二數據流存儲到共享存儲空間中,即刻開始執行第一數據處理任務和第二數據處理任務,並在後臺檢測第一數據流和第二數據流是否相同,如果相同,管理設備刪除第一數據流或者第二數據流,進而將共享存儲空間中重複存儲的多個數據流合併為一個數據流,提高了共享存儲空間的空間利用率。
因此,本步驟中,如果第一數據流和第二數據流相同,管理設備在共享內存空間中存儲第一數據流和第二數據流中的任一數據流的步驟可以通過以下步驟3021-3024實現。
步驟3021:管理設備將第一數據流和第二數據流存儲到該共享內存空間中。
本步驟中,管理設備接收第一數據流和第二數據流後,直接將該第一數據流和第二數據流存儲到共享內存空間中,同時,管理設備將該第一數據流的第一位置索引存儲到該第一內存空間,將該第二數據流的第二位置索引存儲到該第二內存空間,第一位置索引用於管理設備根據該第一位置索引,從共享內存空間中讀取該第一位置索引對應的第一數據流;第二位置索引用於管理設備根據該第二位置索引,從共享內存空間中讀取該第二位置索引對應的第二數據流。
需要說明的是,數據流的位置索引可以為該數據流在共享內存空間的存儲路徑,管理設備存儲了每個數據流的位置索引和該數據流的第三標識的對應關係,便於管理設備根據該數據流的第三標識,從第三標識和位置索引的對應關係中,獲取該數據流的位置索引,進而根據該位置索引,從共享內存空間中讀取該數據流。其中,該第三標識可以根據用戶需要設置並更改,本發明實施例對此不做具體限定,例如,該第三標識可以為數據流的編號、名稱等。
本發明實施例的一種可能實現方式中,如圖4所示,管理設備在共享內存空間中存儲第一位置索引的當前引用狀態,該當前引用狀態用於記錄共享存儲空間中的該第一數據流被當前任務隊列引用的狀態;管理設備在共享內存空間中存儲第二位置索引的當前引用狀態,該當前引用狀態用於記錄共享存儲空間中的該第二數據流被當前任務隊列引用的狀態。
因此,管理設備將第一數據流的第一位置索引存儲到第一內存空間的步驟可以為:管理設備將第一數據流的第一位置索引存儲到第一內存空間,將該第一位置索引的當前引用狀態存儲到共享內存空間。
需要說明的是,位置索引的當前引用狀態可以用引用計數表示,管理設備讀取該數據流之前,該數據流的位置索引的當前引用狀態為初始狀態,該數據流的位置索引的當前引用狀態為無數據處理任務引用。對應的,管理設備將該數據流的引用計數記為初始數值。
當該位置索引對應的數據流被數據處理任務引用時,管理設備將該位置索引的引用計數增加第一預設數值。因此,管理設備將該第一位置索引的當前引用狀態存儲到共享內存空間的步驟可以為:管理設備將該第一位置索引的引用計數增加第一預設數值,在共享內存空間中存儲該第一位置索引的引用計數。
當該位置索引對應的數據流被數據處理任務引用結束時,管理設備將位置索引的引用計數減少第一預設數值。
其中,引用計數的初始數值、第一預設數值可以根據用戶需要設置並更改,本發明實施例對此不作具體限定。
例如,初始數值可以為0,第一預設數值可以為1,管理設備讀取第一數據流之前,第一位置索引的引用計數為0,第一數據流被數據處理任務引用時,管理設備將該第一位置索引的引用計數增加1,此時,該第一位置索引的引用計數為1。
再如,如果第一位置索引的引用計數為3,即第一數據流當前被當前任務隊列中的3個數據處理任務引用,當第一數據流出被3個數據處理任務中其中一個數據處理任務引用結束時,管理設備將第一位置索引的引用計數減1,此時,第一位置索引的引用計數變為2。
步驟3022:管理設備獲取第一數據流的第一標識和第二數據流的第二標識,如果該第一標識和該第二標識相同,確定第一數據流和第二數據流相同。
本步驟中,第一數據流的第一標識可以為該第一數據流的ID(Identity,身份標識號碼),第二數據流的第二標識可以為該第二數據流的ID。管理設備可以通過標識算法計算出第一數據流的ID和第二數據流的ID,通過該第一數據流的ID和第二數據流的ID確定第一數據流和第二數據流是否相同。
因此,本步驟可以為:管理設備根據標識算法,計算得到第一數據流的ID和第二數據流的ID,判斷第一數據流的ID和第二數據流的ID是否相同,如果第一數據流的ID和第二數據流的ID相同,確定第一數據流和第二數據流相同;如果第一數據流的ID和第二數據流的ID不相同,確定第一數據流和第二數據流不相同。
其中,該標識算法可以根據用戶需要設置並更改,本發明實施例對該標識算法不作具體限定。例如,該標識算法可以為哈希算法,還可以為CRUSH(Controlled Replication Under Scalable Hashing,可擴展的偽隨機數據分布)算法。
例如,以哈希算法為例進行說,數據流的ID可以為該數據流的哈希值,此時,管理設備獲取第一數據流的第一標識和第二數據流的第二標識,如果該第一標識和該第二標識相同,確定第一數據流和第二數據流相同的步驟可以為:管理設備通過該哈希算法,分別計算出第一數據流的哈希值和第二數據流的哈希值,管理設備判斷第一數據流的哈希值和第二數據流的哈希值是否相同,如果第一數據流的哈希值和第二數據流的哈希值相同,確定第一數據流和第二數據流相同;如果第一數據流的哈希值和第二數據流的哈希值不相同,確定第一數據流和第二數據流不相同。
步驟3023:當檢測到第一數據流和第二數據流相同時,管理設備刪除第一數據流或者第二數據流。
本步驟中,為了避免因重複存儲導致共享內存空間存儲的數據流冗餘,提高共享內存空間的利用率,管理設備可以刪除第一數據流,只存儲第二數據流,或者,刪除第二數據流,只存儲第一數據流。
步驟3024:如果刪除該第一數據流,管理設備將該第二數據流在該共享內存空間中的位置索引存儲到該第一內存空間和該第二內存空間中,或者,如果刪除該第二數據流,管理設備將該第一數據流在該共享內存空間中的位置索引存儲到該第一內存空間和該第二內存空間中。
本步驟中,由於管理設備將第一數據流和第二數據流合併為一個數據流,因此,管理設備需要更新第一數據流和第二數據流中被刪除的第一數據流的第一位置索引,或者被刪除的第二數據流的第二位置索引。
如果管理設備刪除第一數據流,本步驟可以為:管理設備刪除第一內存空間中的第一位置索引,將第二位置索引作為第一數據流的位置索引存儲在第一內存空間中。
如果管理設備刪除第二數據流,本步驟可以為:管理設備刪除第二內存空間中的第二位置索引,將第一位置索引作為第二數據流的位置索引存儲在第二內存空間中。
這樣,即使在共享內存空間中刪除了第一數據流(或者第二數據流),管理設備仍能根據第一內存空間(或者第二內存空間)中的更新後的位置索引從共享內存空間中讀取第一數據處理任務(或者第二數據處理任務)需要引用的數據流。
本發明實施例的一種可能實現方式中,由於管理設備在共享內存空間中存儲了第一位置索引的當前引用狀態和第二位置索引的當前引用狀態,因此,管理設備需要在第一位置索引和第二位置索引中,更新未被刪除的第一位置索引的當前引用狀態,或者更新未被刪除的第二位置索引的當前引用狀態。本步驟可以為:管理設備將該第一位置索引的當前引用狀態修改為被第一數據處理任務和第二數據處理任務引用,或者將該第二位置索引的當前引用狀態修改為被第一數據處理任務和第二數據處理任務引用。
由於當前引用狀態可以用引用計數表示,如果管理設備刪除第一數據流,本步驟可以為:管理設備將第二位置索引的引用計數增加第一預設數值。
如果管理設備刪除第二數據流,本步驟可以為:管理設備將第一位置索引的引用計數增加第一預設數值。
例如,以引用計數的初始數值為0、第一預設數值為1為例,第一位置索引和第二位置索引的當前引用計數均為1,如果管理設備刪除第一數據流,則需將第二位置索引的引用計數更新為2。
本發明實施例的一種可能實現方式中,為了提高共享內存空間的利用率,管理設備清理共享內存空間中的無數據處理任務引用的第一數據流(或者第二數據流)。本步驟可以為:當第一位置索引的當前引用狀態為無數據處理任務引用時,管理設備刪除第一數據流,或者,當第二位置索引的當前引用狀態為無數據處理任務引用時,管理設備刪除第二數據流。
本步驟中,為了提高管理設備對無數據處理任務引用的第一數據流(或者第二數據流)的清理效率,管理設備無需實時檢測共享內存空間中每個數據流對應的位置索引的當前引用狀態,僅需通過檢測被第一數據處理任務或者第二數據處理任務引用結束的第一數據流(或者第二數據流),獲取該第一位置索引(或者第二位置索引)的當前引用狀態,進而確定是否刪除第一數據流(或者第二數據流)。
因此,當第一位置索引當前引用狀態為無數據處理任務引用時,管理設備刪除第一數據流的步驟可以為:當檢測到第一數據流被第一數據處理任務引用結束,即檢測到第一數據流出第一數據處理任務的窗口時,管理設備獲取第一位置索引的當前引用狀態,根據該第一位置索引的當前引用狀態,確定是否刪除第一數據流:如果該第一位置索引的當前引用狀態為無數據處理任務引用,管理設備刪除該第一數據流;如果該第一位置索引的當前引用狀態為仍被數據處理任務引用,管理設備不刪除該第一數據流。
當第二位置索引當前引用狀態為無數據處理任務引用時,管理設備刪除第二數據流的步驟同上述步驟的實現方式一致,此處不再一一贅述。
其中,用引用計數表示該第一位置索引的當前引用狀態時,本步驟中,管理設備根據該第一位置索引的當前引用狀態,確定是否刪除第一數據流的步驟可以為:管理設備檢測第一位置索引的引用計數是否與初始數值相同,如果該第一位置索引的引用計數與初始數值相同,說明該第一位置索引的當前引用狀態為無數據處理任務引用,管理設備刪除第一數據流;如果該第一位置索引的引用計數與初始數值不相同,說明該第一位置索引的當前引用狀態為仍被數據處理任務引用,管理設備不刪除第一數據流。
例如,以引用計數的初始數值為0、第一預設數值為1為例,管理設備檢測到第一數據流出第一數據處理任務的窗口時,管理設備獲取第一位置索引的引用計數,如果第一位置索引的引用計數為0,管理設備刪除該第一數據流;如果第一位置索引的引用計數為1,說明該第一數據流仍被當前任務隊列中的其它數據處理任務引用,管理設備不刪除該第一數據流。
對於第二種實現方式,本步驟可以為:管理設備將第一數據流存儲到共享內存空間中;檢測第二數據流和共享內存空間中已存儲的第一數據流是否相同,如果相同,丟棄第二數據流,將第一數據流在共享內存空間中的第一位置索引存儲到第一內存空間和第二內存空間中。如果不相同,將第二數據流也存儲到共享內存空間中,將第一數據流在共享內存空間中的第一位置索引存儲到第一內存空間中,將第二數據流在共享內存空間中的第二位置索引存儲到第二內存空間中;或者,
管理設備將第二數據流存儲到共享內存空間中;檢測第一數據流和共享內存空間中已存儲的第二數據流是否相同;如果相同,丟棄第一數據流,將第二數據流在共享內存空間中的第二位置索引存儲到第一內存空間和第二內存空間中。如果不相同,將第二數據流也存儲到共享內存空間中,將第一數據流在共享內存空間中的第一位置索引存儲到第一內存空間中,將第二數據流在共享內存空間中的第二位置索引存儲到第二內存空間中。
其中,管理設備確定第一數據流和第二數據流是否相同的步驟與步驟3022實現方式一致,此處不再一一贅述。
本發明實施例的一種可能實現方式中,為了進一步提高共享內存空間的利用率,管理設備將第一數據流和第二數據流中的任一數據流存儲到共享內存空間後,還可以繼續將該任一數據流中的相同數據合併為一個數據。
本發明實施例的一種可能實現方式中,為了提高數據流處理的可靠性,防止數據丟失造成數據流處理中斷,管理設備接收到第一數據流和第二數據流時,還可以將該第一數據流和第二數據流持久化存儲到第三內存空間中。當共享內存空間中存儲的第一數據流或者第二數據流丟失時,管理設備從該第三內存空間中獲取該第一數據流或者第二數據流。
在本步驟之前,管理設備需要確定一個共享內存空間,在確定共享內存空間時,管理設備可以根據第一數據流的數據屬性和第二數據流的數據屬性選擇一個共享內存空間。管理設備還可以為每個任務隊列創建一個共享內存空間。
管理設備根據第一數據流的數據屬性和第二數據流的數據屬性選擇一個共享內存空間的步驟可以為:管理設備獲取第一數據流的第一數據屬性和第二數據流的第二數據屬性,根據該第一數據屬性,確定該第一數據屬性對應的共享內存空間,以及,根據該第二數據屬性,確定該第二數據屬性對應的共享內存空間。
其中,第一數據屬性可以為該第一數據流的某一數據特徵,第二數據屬性可以為第二數據流的某一數據特徵。例如,第一數據流和第二數據流均為交易產生的數據流,如圖5所示,該數據屬性可以為交易地,管理設備獲取第一數據流的第一數據屬性和第二數據流的第二數據屬性,將第一數據流和第二數據流中第一數據屬性第二數據屬性均為國外的數據存儲到第一共享內存空間中,將第一數據流和第二數據流中第一數據屬性和第二數據屬性均為中國的數據流存儲到第二共享內存空間中。這樣改進後,第一共享內存空間存儲的交易地為國外的多個數據流中,相同數據流可以合併為一個數據流。例如,管理設備將第一共享內存空間中交易金額均為20000的兩個數據流合併為一個數據流,這樣,管理設備只需存儲相同數據中的一個數據流,提高了共享內存空間的使用率。
本發明實施例提供的一種可能的設計中,管理設備為每個任務隊列創建一個共享內存空間,管理設備將該當前任務隊列中第一數據處理任務的第一數據流和第二數據處理任務的第二數據流存儲到該共享內存空間中。
管理設備檢測第一數據處理任務或者第二數據處理任務是否為當前任務隊列的第一個數據處理任務,如果第一數據處理任務或者第二數據處理任務為當前任務隊列的第一個數據處理任務,管理設備創建共享內存空間,後續步驟中,管理設備將第一數據流和第二數據流存儲到該共享內存空間中;如果第一數據處理任務或者第二數據處理任務不是當前任務隊列的第一個數據處理任務,說明管理設備執行當前任務隊列的第一個數據處理任務時,已經創建了共享內存空間,後續步驟中,管理設備將第一數據流和第二數據流存儲到已經創建的共享內存空間中。
如圖6所示,該當前任務隊列可以運行在多個進程間,一個進程可以對應運行一個數據處理任務,當前任務隊列的第一個數據處理任務對應的進程為多個進程中的首個進程。
如圖7所示,該當前任務隊列也可以運行在同一進程的不同線程間,一個線程可以對應運行一個數據處理任務,當前任務隊列的第一個數據處理任務對應的線程為多個線程中的首個線程。
以該當前任務隊列運行在多個進程間為例進行說明,管理設備執行第一數據處理任務或者第二數據處理任務時,啟動該第一數據處理任務或者第二數據處理任務所在的進程。
因此,管理設備檢測第一數據處理任務或者第二數據處理任務是否為當前任務隊列的第一個數據處理任務的步驟可以為:管理設備通過進程間通信,檢測第一數據處理任務(或者第二數據處理任務)所在進程是否為首個進程;當檢測到該進程不是多個進程中的首個進程時,管理設備確定第一數據處理任務(或者第二數據處理任務)不是當前任務隊列的第一個數據處理任務;當檢測到該進程為多個進程中的首個進程時,管理設備確定第一數據處理任務(或者第二數據處理任務)為當前任務隊列的第一個數據處理任務。
當該多個數據處理任務運行在一個進程的多個線程間時,管理設備檢測第一數據處理任務或者第二數據處理任務是否為當前任務隊列的第一個數據處理任務的步驟與該當前任務隊列運行在多個進程間時的實現方式一致,此處不再一一贅述。
本步驟中,管理設備預先存儲了該共享內存空間的預設容量,因此,管理設備創建共享內存空間的步驟可以為:管理設備獲取該共享內存空間的預設容量,根據該預設容量,創建總容量為該預設容量的共享內存空間。
其中,該預設容量可以根據用戶需要設置並更改,本發明實施例對此不作具體限定,例如,該預設容量可以為500MB(MByte,兆字節)、50GB(Gigabyte,吉字節)、2TB(Terabyte,太字節)等。
步驟303:當執行該第一數據處理任務或該第二數據處理任務時,管理設備根據該位置索引,從該共享內存空間中讀取該任一數據流,根據該任一數據流執行該第一數據處理任務或該第二數據處理任務。
本步驟中,該位置索引為更新後的第一數據流的第一位置索引或第二數據流的第二位置索引;該任一數據流為第一數據流和第二數據流中未被刪除的數據流。
如果管理設備刪除第一數據流,管理設備根據第二數據流的第三標識,從第三標識和位置索引的對應關係中,獲取該第二數據流的第二位置索引,根據該第二位置索引,讀取第二數據流,進而,管理設備根據第二數據處理任務的第二窗口計算算子邏輯,對該第二數據流進行處理。
本發明實施例的一種可能實現方式中,為了提高該共享內存空間的使用率,管理設備可以實時檢測該共享內存空間中存儲的數據流的數據量,根據該數據量,實時調整該共享內存空間的總容量大小。本步驟可以為:管理設備獲取該共享內存空間的總容量和該共享內存空間當前存儲的數據流的數據量,管理設備根據該總容量和該數據量,調整該共享內存空間的總容量大小。
本步驟中,管理設備根據該總容量和該數據量,調整該共享內存空間的總容量大小時,管理設備可以通過計算當前存儲的數據流的數據量和共享內存空間當前的總容量的比值,調整該共享內存空間的總容量大小,使得該比值的大小在預設數值範圍內;其中比值為管理設備用該當前存儲的數據量除以該總容量得到該數據量和該總容量的比值。
因此,管理設備根據該總容量和該數據量,調整該共享內存空間的總容量大小的步驟可以為:管理設備計算該數據量和該總容量的比值,當該比值大於第二預設數值時,增大該共享內存空間當前的總容量,當該比值小於第三預設數值時,減小該共享內存空間當前的總容量。
其中,第二預設數值、第三預設數值可以根據需要設備並更改,本發明實施例對此不作具體限定。
例如,以第二預設數值為80%、第三預設數值為40%為例,該共享內存空間當前的總容量大小為500GB,如果共享內存空間當前存儲的數據流的數據量為100GB,此時,需減小該總容量的大小,可以將該總容量減小至200GB;如果共享內存空間當前存儲的數據流的數據量為450GB,此時,需增大該總容量的大小,可以將該總容量增大至600GB。
本發明實施例中,管理設備可以每隔預設周期獲取該共享內存空間當前存儲的數據流的數據量和共享內存空間的總容量。該預設周期可以根據用戶需要設置並更改,本發明實施例對此不作具體限定。例如,該預設周期可以為2秒、10毫秒等。
本發明實施例中,為了減少共享內存空間佔用的內存,管理設備執行完第一數據處理任務和第二數據處理任務時,可以通過以下步驟a和b,當檢測到該第一數據處理任務或者該第二數據處理任務是該當前任務隊列的最後一個數據處理任務,管理設備銷毀該共享內存空間,包括:
步驟a:管理設備檢測第一數據處理任務或者第二數據處理任務是否為當前任務隊列的最後一個數據處理任務。
由於該當前任務隊列可以運行在多個進程間,也可以運行在一個進程的多個線程間。該當前任務隊列運行在多個進程間時,當前任務隊列的最後一個數據處理任務對應的進程為多個進程中的最後一個進程;該當前任務隊列運行在一個進程的多個線程間時,當前任務隊列的最後一個數據處理任務對應的線程為多個線程中的最後一個線程。
因此,該當前任務隊列運行在多個進程間時,本步驟可以為:管理設備通過進程間通信,檢測第一數據處理任務(或者第二數據處理任務)所在進程是否為最後一個進程;當檢測到該進程不是多個進程中的最後一個進程時,管理設備確定第一數據處理任務(或者第二數據處理任務)不是當前任務隊列的最後一個數據處理任務;當檢測到該進程為多個進程中的最後一個進程時,管理設備確定第一數據處理任務(或者第二數據處理任務)為當前任務隊列的最後一個數據處理任務,進而,通過以下步驟b,銷毀共享內存空間。
當該多個數據處理任務運行在一個進程的多個線程間時,管理設備檢測第一數據處理任務或者第二數據處理任務是否為當前任務隊列的最後一個數據處理任務的步驟與該當前任務隊列運行在多個進程間時的實現方式一致,此處不再一一贅述。
步驟b:如果第一數據處理任務或者第二數據處理任務是當前任務隊列的最後一個數據處理任務,管理設備銷毀共享內存空間。
本步驟中,當檢測到第一數據處理任務(或者第二數據處理任務)所在進程為最後一個進程時,管理設備銷毀該進程的同時,銷毀該進程對應的共享內存空間,結束。
當檢測到第一數據處理任務(或者第二數據處理任務)所在進程不是最後一個進程時,管理設備直接銷毀該進程,不銷毀該進程對應的共享內存空間,結束。
本發明實施例所示的方案中,管理設備接收第一數據流和第二數據流後,如果第一數據流和第二數據流相同,管理設備在共享內存空間中存儲第一數據流和第二數據流中的任一數據流,並將該任一數據流在共享內存空間中的位置索引存儲到第一內存空間和第二內存空間中,當執行第一數據處理任務或第二數據處理任務時,根據該位置索引,從該共享內存空間中讀取第一數據流或者第二數據流。由於共享內存空間中只存儲了第一數據流和第二數據流中的任一數據流,避免了相同數據流的重複存儲,從而節省了內存空間,提高了內存空間的利用率。
下述為本本發明實施例裝置實施例,可以用於執行本發明實施例方法實施例。對於本發明實施例裝置實施例中未披露的細節,請參照本發明實施例方法實施例。
圖8是本發明實施例提供的一種數據流處理的裝置的結構方框圖,該裝置可以通過軟體、硬體或者兩者的結合實現成為管理設備的部分或者全部。
該裝置包括接收單元401、存儲單元402、讀取單元403和執行單元404;
接收單元401用於執行上述實施例中的步驟301及其可選方案。
存儲單元402用於執行上述實施例中的步驟302及其可選方案。
讀取單元403用於執行上述實施例中的步驟303及其可選方案。
執行單元404用於執行上述實施例中的步驟303及其可選方案。
參見圖9,該裝置還可以包括創建單元405、銷毀單元406;
創建單元405用於執行上述實施例中的步驟301及其可選方案。
銷毀單元406用於執行上述實施例中的步驟303及其可選方案。
參見圖10,該裝置還可以包括第一獲取單元407、第一確定單元408;
第一獲取單元407用於執行上述實施例中的步驟302及其可選方案。
第一確定單元408用於執行上述實施例中的步驟302及其可選方案。
參見圖11,該裝置還可以包括第二獲取單元409、第二確定單元410;
第二獲取單元409用於執行上述實施例中的步驟302及其可選方案。
第二確定單元410用於執行上述實施例中的步驟302及其可選方案。
參見圖12,該裝置還可以包括修改單元411、刪除單元412;
修改單元411用於執行上述實施例中的步驟302及其可選方案。
刪除單元412用於執行上述實施例中的步驟302及其可選方案。
參見圖13,該裝置還可以包括第三獲取單元413、調整單元414;
第三獲取單元413用於執行上述實施例中的步驟303及其可選方案。
調整單元414用於執行上述實施例中的步驟303及其可選方案。
需要說明的是:上述實施例提供的數據流處理的裝置在數據流處理時,僅以上述各功能模塊的劃分進行舉例說明,實際應用中,可以根據需要而將上述功能分配由不同的功能模塊完成,即將裝置的內部結構劃分成不同的功能模塊,以完成以上描述的全部或者部分功能。另外,上述實施例提供的數據流處理的裝置與數據流處理的方法實施例屬於同一構思,其具體實現過程詳見方法實施例,這裡不再贅述。
本領域普通技術人員可以理解實現上述實施例的全部或部分步驟可以通過硬體來完成,也可以通過程序來指令相關的硬體完成,所述的程序可以存儲於一種計算機可讀存儲介質中,上述提到的存儲介質可以是只讀存儲器,磁碟或光碟等。
以上所述僅為本發明的較佳實施例,並不用以限制本發明,凡在本發明的精神和原則之內,所作的任何修改、等同替換、改進等,均應包含在本發明的保護範圍之內。