基於nio和io的面向發布訂閱系統的傳輸方法
2023-10-06 02:04:14 1
專利名稱::基於nio和io的面向發布訂閱系統的傳輸方法
技術領域:
:本發明涉及數據傳輸領域,主要是一種基於NIO和IO的面向發布訂閱系統的傳輸方法。
背景技術:
:發布/訂閱(publish/subscribe,簡稱pub/sub)系統是一種分布式中間件系統,是隨著複雜大規模的網絡計算而出現的一種技術,用於為分布式系統中的各參與者提供一種鬆耦合的通信方式。在大規模、高度動態的分布式系統中,參與者在時間以及空間位置上通常都是動態變化的,比如在出差途中使用掌上電腦等行動裝置,頻繁的加入和退出網絡,因此需要一種鬆耦合的方式參與消息交互和協同工作。而發布/訂閱系統的鬆耦合的特性,恰能解決這個問題。如圖1所示,一個發布/訂閱系統中一般由消息生產者(publisher),消息消費者(subscriber)和事件通知服務組成。在發布/訂閱系統中,消息通常被稱為"事件",消息生產者將"事件"發送給事件通知服務;消息消費者則向事件通知服務發出一個"訂閱條件",表示對系統的哪些事件感興趣;而事件通知服務則保證將發布的事件及時、可靠地送給所有感興趣的消息消費者。而發布者和消費者在進行消息交互時,都不需要知道對方的存在,使得它們在時間和空間被完全解耦,從而能夠滿足大規模、動態的分布式系統的需要。而在傳統的發布/訂閱系統中,事件的生產者和消費者之間的傳輸多採用基於java的老式IO(輸入/輸出)方式。在這種方式中,服務端將在開始時建立一個監聽socket(套接字),socket是主要用來進行客戶端和伺服器端通訊的工具,當簡歷一個監聽socket後,就等待客戶端請求,客戶連接後,產生會話,會話完成後,關閉連接;而客戶端通過socket對服務端發出連接請求,一旦連接成功,打開會話,會話完成,關閉socket。在這種傳輸系統中,系統的瓶頸通常在10讀寫方面,在打開一個IO通道後,read方法將一直等待在埠一邊讀取字節內容,如果沒有內容進來,read也是一直等待,這會影響我們程序繼續做其他事情,那麼改進做法就是開設線程,讓線程去等待,但是這樣做也是相當耗費資源的,尤其是在大規模分布式系統中,如果我們要接受大量的客戶端的連接,必須要開設大量的線程,而這將是對資源的嚴重浪費。而基於java的NIO(newIO)的出現,則改變了這種現狀,使網絡傳輸的協議不再集中在IO讀寫上。NIO最大的特點有兩個一個是塊狀讀寫;另外一個是多路復用技術(multiplex)。第一個特點改變了老式10中,每次讀寫只有一個字節(stream)或兩個字節(reader)的方式,提高了讀寫的速度;第二個特點改變了原來讀寫socket時候阻塞的同步狀況,使socket的讀寫具有異步的功能。JavaNIO非阻塞技術實際是採取Reactor模式,或者說是Observer模式為我們監察IO埠,如果有內容進來,會自動通知我們,這樣,我們就不必開啟多個線程死等,從外界看,實現了流暢的IO讀寫,不堵塞了。NK)有一個主要的類Selector,這個類就好像一個觀察者,只要我們把需要探知的socketchannel(套接字通道)告訴Selector,我們接著做別的事情,當有事件發生時,他會通知我們,傳回一組SelectionKey,我們讀取這些Key,就會獲得我們剛剛註冊過的socketchannel,然後,我們從這個Channel中讀取數據,放心,包準能夠讀到,接著我們可以處理這些數據。在我們使用NIO的時候,cpu時間多會消耗在Selector的select操作上(NIO的多路復用),而select操作主要調用的是poll函數,這個操作是一個輪詢socket的同步操作,它的主要功能就是不斷地去查詢向該選擇器註冊的socket,看看有沒有可做的操作,如有,則返回,沒有,繼續不斷地輪詢。也就說,分配的cpu的時間基本上被消耗在poll上了。之所以如此,是因為沒有東西可以寫入或從socket中讀取;那麼,就應該給"提供讀寫數據的線程"更多的cpu時間,使得每一次的poll都有可作的操作,使時間耗費在讀寫socket之上,而不是poll之上。然而,從實際情況中看來,poll線程顯然比"提供讀寫數據的線程(wr線程)"佔用了更多的cpu時間,假設他們的優先級相同,也假設作業系統是公平的分配cpu時間,那麼它們得到的cpu時間應該是一樣的,此時,造成它們佔用cpu時間不同的原因就很可能是這樣在相同的時間內,wr線程提供的數據量根本不夠poll線程處理,poll線程處理在相同的時間內處理這些數據綽綽有餘,那麼,poll線程更多的時間就耗費在輪詢上了。到這裡,我們假設的是這兩個線程分配到了相同的時間,所以,即使poll線程大部分時間是在做poll輪詢操作,那麼所佔用的時間最大值也是有限的,不會超過線程分配得到的時間。但是,如果此時儘管wr線程得到了cpu時間,但是它在處理過程中主動放棄cpu(比如執行wait操作),那麼,在cpu時間總量相等的情況下,由於poll線程沒有執行wait操作,因此,就會得到更多的cpu時間,這樣,大部分的cpu時間都給poll的輪詢操作了。如果我們可以如果使wr線程不斷的運行,而使poll線程在沒有數據的時候放棄cpu時間,就應該可以減少poll線程的輪詢操作,而使wr線程得到更多的cpu時間,提供更多的可供讀寫的數據,提高讀寫的效率。然而,由於poll輪詢操作是一個不間斷的過程,是sun公司提供的底層的api,是無法修改的。所以,就要放棄poll操作,也就是要放棄選擇器。
發明內容本發明要解決上述技術所存在的缺陷,提供一種基於NIO和IO的面向發布訂閱系統的傳輸方法。本發明解決其技術問題所採用的技術方案這種基於NIO和IO的面向發布訂閱系統的傳輸方法,我們的傳輸協議在服務端摒棄老式的基於流的IO,而採用了JDK1.4中的NIO,而在客戶端仍採用老式的I/O。在服務端我們啟用唯一的接收socket連接的線程,這個接收線程包含有一個Selector對象和一個讀線程組和寫線程組,Selector對象主要從客戶端捕捉到各種事件(比如向伺服器端寫入數據),讀線程組和寫線程組用來接收來自客戶端的socket連接以及向客戶端寫入數據,這個Selector對象註冊了一個ServerSocketChannel(伺服器套接字通道)對象,這個ServerSocketChannel對象用來接收來自客戶端的socket連接。每個讀線程或寫線程分別對應一個socket連接,並將每次接收到的socket都封裝起來成為一個可讀寫數據的SelectableChannel對象,調用Java的regist函數將其再註冊到唯一的Selector對象上,當客戶端連接關閉的時候,將連接從讀寫線程中註銷,註銷後,讀寫線程則可以綁定新的連接,這樣就可以使用固定數目的線程處理任意多的客戶端連接。對於選中的用來讀寫數據的Channel(通道),包括ReadableByteChannel(讀通道,用來從客戶端讀取數據)和WritableByteChannel(寫通道,用來向客戶端寫入數據)進行的操作是非阻塞的,因而所有的線程都可以真正用來發送、接收消息。在客戶端連接數量很大的情況下,NIO可以保證線程上下文切換時較小的開銷。而在客戶端,因為我們每個客戶端一般只有一個到服務端的連接,所以摒棄NIO,釆用老式IO操作。主要過程包括如下步驟1)啟動伺服器端接收SocketChannel連接的接收線程,並啟動讀、寫線程組;2)客戶端進行操作時,打開一個Socket連接到伺服器端,伺服器端的接收線程開始接收連接,並把接收到的SocketChannd向伺服器端的讀寫線程註冊,即將連接的SocketChannel綁定到讀寫線程上;3)客戶端將要發送的消息作為數據報的一部分,每個數據報格式為數據報頭、消息體,其中數據報頭包含一個特殊的標誌符和表示消息大小的一個整數,而後將數據報加碼為二進位流,然後通過Socket調用write函數將數據報發送到伺服器端;4)伺服器端讀線程從Socket中讀取標誌數據報的二進位流,如讀取數據為數據報頭,則新建一個數據報,並將此流解碼為數據報;然後伺服器處理此數據報,並生成返回客戶端的數據報,利用NIO的WriteableByteChannel,將處理後得到的數據報加碼後變為二進位流,發送到客戶端;5)客戶端從Socket讀取此二進位流,同上解碼為數據報,並根據數據報格式判斷數據報是否需要客戶端處理器進行處理,如果需要處理,則將此數據報進行處理,否則,什麼也不做。按照以上步驟,就完成了一個消息從消息生產者發送到事件通知服務,再從事件通知服務發送到消息訂閱者的過程。本發明有益的效果是基於Java的IO和NIO的混合使用,將NIO用在服務端可以以非阻塞的形式處理連入socket,減少線程數量,從而減少資源的消耗,在客戶端利用老式IO與服務端連接,進而大大提高了消息發送的速度,改進了整個系統的性能。對於大規模分布式系統有著重要的意義。圖1為發布/訂閱系統的概念結構示意圖。圖2為本發明的傳輸協議的整體流程圖。附圖標記在第五步之前,我們應該確定用戶名,並且檢査連接;④接受線程;⑩服務端發報機;^客戶端發報機;[M1讀寫線程^^客戶端處理器;A連接代理;O解碼器;^^服務端處理器;§數據報對列口客戶端連接代理;0客戶端讀線程;|SDS|服務端發報服務;O客戶端發報服務。具體實施例方式下面結合附圖和實施例對本發明作進一步介紹圖1中,Client2和CUent3為消息訂閱者,兩個客戶端向事件通知服務訂閱消息,而Clientl為消息發布者,負責向事件通知服務發送消息,當事件通知服務接收到消息時,就會向消息訂閱者發送其訂閱的消息。此時的消息訂閱者和消息發布者統稱為客戶端,而事件通知服務即為服務端。如圖2所示,整個系統由客戶端和服務端組成,並啟動客戶端和服務端各自的操作來完成對消息的整個傳輸過程。上述各部分具體功能如下-接收線程伺服器端監視線程,接收客戶端連接,並把其註冊到相應的讀寫線程上服務端發報機調用連接向伺服器端發送數據報。客戶端發報機向客戶端發送數據報。讀寫線程服務端線程,調用連接代理讀取客戶端數據並將數據寫回客戶端。客戶端處理器處理伺服器端返回數據報。連接代理負責向客戶端寫入數據報,並讀取數據報。解碼器將二進位流轉換為數據報,以及將數據報轉換為二進位流。服務端處理器在伺服器端處理從客戶端接收到的數據報。數據報隊列存儲數據報。客戶端連接代理負責向客戶端寫入數據報,並讀取數據報。客戶端讀線程讀取服務端發送的數據報。服務端發報服務啟動服務端服務,等待客戶端連接。客戶端發報服務啟動客戶端服務,與服務端建立連接,開始通訊。整個過程可描述如下首先啟動服務端服務,這個過程分為2個主要的步驟1、啟動接收SocketChannel連接的接收線程(圖中接收線程);2、啟動處理"讀""寫"任務的讀寫線程(圖中讀寫線程);在步驟1中,伺服器的啟動可以根據設定參數來確定同時啟動讀寫線程數目,並只啟動一個接收線程來接收客戶端的連接。其中讀寫線程裡維持一個鍊表,用來存放接收到的客戶端連接。當服務端啟動以後,服務端就會有一個接收線程監聽來自客戶端的socket連接,一旦有來自客戶端的socket連接,則讀、寫線程就會開始工作,即通過channel讀取socket中數據並將數據寫入soclcet。然後就開始創建客戶端到服務端的連接,這個過程分為5個主要的步驟1、用戶使用通過JNDI査找到的連接工廠來創建連接;2、根據査找到的連接類型從服務端發報機工廠中獲得相應的服務端發報機(如圖);3、調用服務端發報機的方法,從客戶端打開一個Socket連接到伺服器端;4、伺服器端的接收線程接收連接,並把接收到的SocketChannel向伺服器端的讀寫線程註冊;5、啟動客戶端的通訊層服務,並客戶端讀線程(如圖)。在步驟1中,連接工程主要的功能為建立與伺服器端的連接,當連接建立好後,才可以發送、接收消息,其中連接可根據配置文件從連接工廠中創建。在步驟2中,連接類型可從配置文件中得到,而服務端發報機工廠為一創建發報機的對象,發報機則用來向服務端發送消息。在步驟3中,我們給服務端發報機定義了發送數據報的方法,利用socket定義的接口write來發送各種數據報。在發送之前,這些數據報都經過了加碼處理,變為二進位流發送到伺服器端。在步驟4中,伺服器端的接收線程不停査詢看是否有新的連接,如果有新的連接,則將此連接加入讀寫線程的鍊表中,並喚醒讀寫線程來處理鍊表的連接。步驟5中的通訊層服務啟動後才可以讀取從伺服器端發回的消息,讀線程主要用來讀取從服務端發回的數據,並交給解碼器(如圖)處理。當客戶端與服務端建立連接後,就開始消息的收發過程,此過程分為5個主要的步驟1、客戶端產生消息,把消息封裝成數據報中後,調用客戶端連接代理(如圖)來輸出數據報;2、連接代理把數據報發往伺服器端;3、伺服器端的讀寫線程從Socket中讀到數據,把數據提交給解碼器解碼;4、解碼器解碼得到數據報之後,判斷這個數據報是否需要提交給伺服器端的處理器處理,如果不需要,什麼都不做。如果需要,則服務端的處理器會調用相應的方法來處理這個數據報。5、處理完這個數據報之後,通過連接代理把數據報的回執發往客戶端。步驟1中產生消息主要是根據SUN公司制定的JMS規範中提供的的API(標準用戶接口)生產的,根據JMS規範,在生成消息的時候,可以給消息設置屬性。生成消息後,根據前面提到的加碼方法,將本消息封裝為一個數據報。步驟2中的發送方式主要為調用socket中的標準write0函數。步驟3中的讀寫線程讀取二進位數據,並將此二進位數據交給解碼器解碼,將二進位文件還原為一個標準的數據報。步驟4中,數據報可分為多種類型,有的數據報中封裝消息,有的數據報中封裝請求。而伺服器端的根據不同的數據報類型會有不同的處理方式,比如有的數據報需要需要提取出消息後將消息存儲到資料庫中。步驟5中,連接代理會調用解碼器將數據報回執解碼為二進位數據,通過前面提到的SelectableChannel的write方法發往客戶端。當服務端發送數據報後,客戶端會接收服務端發來的數據,並進行處理1、客戶端端的讀線程從Socket中讀到數據,把數據提交給解碼器解碼;2、解碼器解碼得到數據報之後,判斷這個數據報是否需要提交給客戶端的處理器處理,如果不需要,則什麼也不做。如果需要,則客戶端的處理器會調用相應的方法來處理這個數據報。步驟1中,客戶端通過socket讀取一段二進位數據,並根據解碼器將此二進位數據解為相應的數據報。步驟2中,根據數據報類型進行處理的時候,根據數據報類型的不同採取不同的處理方式。例如-.如果數據報中包含有消息,則將此消息提取出來。這樣,就完成了一個消息從消息發布者發送到事件通知服務,再從事件通知服務發送到消息訂閱者的過程。上述實施例用來解釋說明本發明,而不是對本發明進行限制,在本發明的精神和權利要求的保護範圍內,對本發明作出的任何修改和改變,都落入本發明的保護範圍。權利要求1、一種基於NIO和IO的面向發布訂閱系統的傳輸方法,其特徵在於在服務端採用了JDK1.4中的NIO,啟用唯一的接收socket連接的線程,這個接收線程包含有一個Selector對象和一個讀線程組和寫線程組,Selector對象主要從客戶端捕捉到各種事件,讀線程組和寫線程組用來接收來自客戶端的socket連接以及向客戶端寫入數據,這個Selector對象註冊了一個ServerSocketChannel對象,這個ServerSocketChannel對象用來接收來自客戶端的socket連接;每個讀線程或寫線程分別對應一個socket連接,並將每次接收到的socket都封裝起來成為一個可讀寫數據的SelectableChannel對象,調用java的regist函數將其再註冊到唯一的Selector對象上,當客戶端連接關閉的時候,將連接從讀寫線程中註銷,註銷後,讀寫線程則可以綁定新的連接;對於選中的用來讀寫數據的Channel,包括ReadableByteChannel和WritableByteChannel進行的操作是非阻塞的;在客戶端採用基於流的IO操作,按照以上步驟,就完成了一個消息從消息生產者發送到事件通知服務,再從事件通知服務發送到消息訂閱者的過程。2、根據權利要求1所述的基於NIO和IO的面向發布訂閱系統的傳輸方法,其特徵在於在客戶端採用基於流的IO操作,主要過程包括如下步驟1)啟動伺服器端接收SocketChannel連接的接收線程,並啟動讀、寫線程組;2)客戶端進行操作時,打開一個Socket連接到伺服器端,伺服器端的接收線程開始接收連接,並把接收到的SocketChannel向伺服器端的讀寫線程註冊,即將連接的SocketChannel綁定到讀寫線程上;3)客戶端將要發送的消息作為數據報的一部分,每個數據報格式為數據報頭、消息體,其中數據報頭包含一個特殊的標誌符和表示消息大小的一個整數,而後將數據報加碼為二進位流,然後通過Socket調用write函數將數據報發送到伺服器端;4)伺服器端讀線程從Socket中讀取標誌數據報的二進位流,如讀取數據為數據報頭,則新建一個數據報,並將此流解碼為數據報;然後伺服器處理此數據報,並生成返回客戶端的數據報,利用NIO的WriteableByteChaimel,將處理後得到的數據報加碼後變為二進位流,發送到客戶端;5)客戶端從Socket讀取此二進位流,同上解碼為數據報,並根據數據報格式判斷數據報是否需要客戶端處理器進行處理,如果需要處理,則將此數據報進行處理,否則,什麼也不做。全文摘要本發明涉及一種基於NIO和IO的面向發布訂閱系統的傳輸方法,在服務端採用NIO,啟用唯一的接收socket連接的線程,這個接收線程包含有Selector對象、讀線程組和寫線程組,Selector對象從客戶端捕捉到各種事件,讀線程組和寫線程組用來接收來自客戶端的socket連接以及向客戶端寫入數據;每個讀線程或寫線程分別對應一個socket連接,並將每次接收到的socket都封裝起來成為一個可讀寫數據的對象,將其再註冊到唯一的Selector對象上,當客戶端連接關閉的時,將連接從讀寫線程中註銷,註銷後,讀寫線程則可以綁定新的連接。本發明優點是基於java的IO和NIO的混合使用,將NIO用在服務端減少線程數量,從而減少資源的消耗,在客戶端利用IO與服務端連接,進而大大提高了消息發送的速度。文檔編號H04L29/08GK101651698SQ20071016475公開日2010年2月17日申請日期2007年12月12日優先權日2007年12月12日發明者健吳,吳朝暉,尹建偉,施東材,瑩李,楨王,鄧水光申請人:浙江大學