新四季網

flink簡單理解(深入了解ProcessFunction的狀態操作)

2023-04-17 13:21:35

學習Flink的ProcessFunction過程中,官方文檔中涉及狀態處理的時候,不止一次提到只適用於keyed stream的元素,如下圖紅框所示:

之前寫過一些flink應用,keyed stream常用但不是必須用的,所以產生了疑問:

為何只有keyed stream的元素能讀寫狀態?每個key對應的狀態是如何操作的?Flink的"狀態"先去回顧Flink"狀態"的知識點;官方文檔說就兩種狀態:keyed state和operator state:

如上圖,keyed stream的元素是具有key的特徵,與ProcessFunction的操作狀態時要求匹配,其他steam的元素由於沒有key的特徵,所以也就沒有"狀態"一說了;另一種狀態是Operator State,如下圖,這是和多並行度計算時的算子實例綁定的,例如當前算子消費kafka的某個分區的最新offset,而ProcessFunction是用來處理stream元素的,不會涉及到Operator State:

官方demo

為了學習ProcessFunction就去看官方demo,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html ,簡單說說這個demo的功能:

數據源在不間斷的產生單詞,每個單詞對應一個Tuple2的實例;數據源被keyBy方法轉成KeyedStream,key是Tuple2實例的f0欄位;一個KeyedProcessFunction的子類CountWithTimeoutFunction,被用來處理KeyedStream的每個元素,處理的邏輯:為每個key維護一個狀態,狀態的內容是這個key的出現次數和最後一次出現時間;如果那個key連續一分鐘沒有出現,KeyedProcessFunction就向下遊發送這個元素;

以上就是官方demo的功能,本來是想通過demo來加深認識,結果看完不但沒有明白,反而更暈了,下圖是我對demo代碼的疑惑:

從上圖可見我的疑惑,這裡再複述一下:

入參value是Tuple2類型,假設其f0欄位等於aaa,那麼processElement方法的作用,就是取出aaa的狀態,更新後保存;從代碼上看,state.value返回了aaa的狀態,這個value方法並沒有將aaa作為入參,那怎麼做到返回aaa的狀態呢?如果下一個入參value的f0欄位等於bbb了,這個state.value能返回bbb的狀態嗎?對更新狀態的代碼state.update(current)也是同樣的疑惑;然後又產生了新的疑惑:成員變量state難道是一直在變?每執行一次processElement,都會變成該key對應的state實例?先反思為何會有上述疑惑上述疑惑產生的原因,應該是受到平時使用HashMap的影響,HashMap獲取值就是在調用get方法時指定key,設置值也是在put時指定key,所以看到state.value方法沒有用key做入參就不習慣了要消除這種不適應,要做的第一件事就是提醒自己:processElement是在框架內運行的,很多數據在之前已經由框架準備好了;接下來要做的,就是把框架準備數據的邏輯看一遍,除了弄明白自己的問題,由於ProcessFunction屬於最低階抽象(如下圖的最下方位置),看懂了這些,其實也是在了解DataStream/DataSet API的設計思路:

跟蹤源碼如下圖,讓我們從一個斷點的堆棧開始吧,這是在執行上面demo中的processElement方法之前的一個斷點,可見根源是個線程的run方法,也就是KeyedProcessFunction對應的算子執行任務的線程:

上面的堆棧不必每一層都細看,只關注重要的部分,下圖這段很重要:StreamTask.run方法中,有個無限循環(猜測是每次執行processInput方法都處理KeyedStream的一個元素):

如下圖,StreamOneInputProcessor.processInput方法取出KeyedStream的一個元素,調用processElement方法,並將此元素作為入參,再結合上一幅圖可以看出:在編寫KeyedProcessFunction子類的時候,KeyedStream的每個元素都會作為入參,在調用你重寫的processElement方法時傳進去;這一點,在做ProcessFunction和KeyedProcessFunction開發時都是要格外注意的:

接下來到了最關鍵的地方了,下圖紅框中的streamOperator.setKeyContextElement1(record)會解答我前面的疑惑,一定要進去看個清楚,(後面的黃線上的代碼,您應該猜到了,裡面其實就是調用demo中的processElement方法)

下圖中,AbstractStreamOperator.setKeyContextElement給出了答案:對於KeyedStream的每個元素,都會在這裡算出key,再調用setCurrentKey保存這個key

展開setCurrentKey,如下圖,發現key的保存和當前狀態的存儲策略(StateBackend)有關,我這裡是默認策略HeapKeyedStateBackend

最終,根據當前元素得到的key會在StateBackend的keyContext對象中找地方保存,StateBackend的具體實現和Flink設置有關,我這裡是保存到了InternalKeyContextImpl實例的currentKey變量中:

代碼讀到這裡,對我前面的疑惑,您應該能推測出答案了:state.value裡面會通過StateBackend的keyContext取出剛才保存的key,接下來就能像HashMap那樣根據key查出該key的狀態了,接下來是愉快的印證我們推測的過程;在state.value代碼位置打斷點一次看個明白,如下圖,果然,state裡面有StateBackend的keyContext對象的引用,訪問剛才保存的key就不成問題了:

展開state.value方法如下,簡單明了,直接拿keyContext保存的key作為入參去取對應的狀態:

再展開上面的get方法,可見最終是從stateMap中取得的,而這個stateMap的具體實現是CopyOnWriteStateMap類型的實例:

代碼讀到這裡,只剩最後一處需要印證了:更新狀態的state.update(current)方法,應該也是以StateBackend的keyContext中的key作為自己的key,再將入參的current作為value,更新到stateMap中,來吧,一起印證這個推測;展開方法,看到的是stateTable.put方法(前面剛看過stateTable的get方法,穩了):

stateTable.put方法裡面和前面的get方法一樣,直接拿keyContext保存的key作為自己的key:

最終是調用了stateMap.put方法,將數據保存在CopyOnWriteStateMap實例中:

得益於Flink代碼自身規範、清晰的設計和實現,再加上IDEA強大的debug功能,整個閱讀和分析過程十分順利,這其中的收穫會逐漸在今後深入學習DataStreamAPI的過程中見效;

最後,根據上面的分析過程繪製了一幅簡陋的流程圖,希望能幫助您加快理解:

歡迎關注我的公眾號:程式設計師欣宸

,
同类文章
葬禮的夢想

葬禮的夢想

夢見葬禮,我得到了這個夢想,五個要素的五個要素,水火只好,主要名字在外面,職業生涯良好,一切都應該對待他人治療誠意,由於小,吉利的冬天夢想,秋天的夢是不吉利的
找到手機是什麼意思?

找到手機是什麼意思?

找到手機是什麼意思?五次選舉的五個要素是兩名士兵的跡象。與他溝通很好。這是非常財富,它擅長運作,職業是仙人的標誌。單身男人有這個夢想,主要生活可以有人幫忙
我不怎麼想?

我不怎麼想?

我做了什麼意味著看到米飯烹飪?我得到了這個夢想,五線的主要土壤,但是Tu Ke水是錢的跡象,職業生涯更加真誠。他真誠地誠實。這是豐富的,這是夏瑞的巨星
夢想你的意思是什麼?

夢想你的意思是什麼?

你是什​​麼意思夢想的夢想?夢想,主要木材的五個要素,水的跡象,主營業務,主營業務,案子應該抓住魅力,不能疏忽,春天夢想的吉利夢想夏天的夢想不幸。詢問學者夢想
拯救夢想

拯救夢想

拯救夢想什麼意思?你夢想著拯救人嗎?拯救人們的夢想有一個現實,也有夢想的主觀想像力,請參閱週宮官方網站拯救人民夢想的詳細解釋。夢想著敵人被拯救出來
2022愛方向和生日是在[質量個性]中

2022愛方向和生日是在[質量個性]中

[救生員]有人說,在出生88天之前,胎兒已經知道哪天的出生,如何有優質的個性,將走在什麼樣的愛情之旅,將與生活生活有什么生活。今天
夢想切割剪裁

夢想切割剪裁

夢想切割剪裁什麼意思?你夢想切你的手是好的嗎?夢想切割手工切割手有一個真正的影響和反應,也有夢想的主觀想像力。請參閱官方網站夢想的細節,以削減手
夢想著親人死了

夢想著親人死了

夢想著親人死了什麼意思?你夢想夢想你的親人死嗎?夢想有一個現實的影響和反應,還有夢想的主觀想像力,請參閱夢想世界夢想死亡的親屬的詳細解釋
夢想搶劫

夢想搶劫

夢想搶劫什麼意思?你夢想搶劫嗎?夢想著搶劫有一個現實的影響和反應,也有夢想的主觀想像力,請參閱週恭吉夢官方網站的詳細解釋。夢想搶劫
夢想缺乏缺乏紊亂

夢想缺乏缺乏紊亂

夢想缺乏缺乏紊亂什麼意思?你夢想缺乏異常藥物嗎?夢想缺乏現實世界的影響和現實,還有夢想的主觀想像,請看官方網站的夢想組織缺乏異常藥物。我覺得有些東西缺失了