關(guān)于Openfire集群源碼的分析
本文介紹了openfire的相關(guān)內(nèi)容,這個東西現(xiàn)在用的人好像不多了。算了,我們看看具體內(nèi)容。
openfire是什么?
Openfire 采用Java開發(fā),開源的實時協(xié)作(RTC)服務(wù)器基于XMPP(Jabber)協(xié)議。Openfire安裝和使用都非常簡單,并利用Web進行管理。單臺服務(wù)器可支持上萬并發(fā)用戶。由于是采用開放的XMPP協(xié)議,您可以使用各種支持XMPP協(xié)議的IM客戶端軟件登陸服務(wù)。如果你想輕易地構(gòu)建高效率的即時通信服務(wù)器,那就選擇它吧!
openfire能做什么?
我們要了解Openfire,首先要了解XMPP協(xié)議,因為Openfire是用Java語言編寫的,基于XMPP協(xié)議、開源的實時協(xié)作的服務(wù)器。Openfire具有跨平臺的能力,Openfire與客戶端采用的是C/S架構(gòu),一個服務(wù)器要負責(zé)為連接在其上的客戶端提供服務(wù)。Openfire客戶端有spark,pidgin, Miranda IM,iChat等,用戶如果自己開發(fā)客戶端,可以采用遵循GPL的開源Client端API--Smack。Openfire服務(wù)器端支持插件開發(fā),如果開發(fā)者需要添加新的服務(wù),可以開發(fā)出自己的插件后,安裝至服務(wù)器,就可以提供服務(wù),如查找聯(lián)系人服務(wù)就是以插件的形式提供的。
openfire如果用戶量增加后為了解決吞吐量問題,需要引入集群,在openfire中提供了集群的支持,另外也實現(xiàn)了兩個集群插件:hazelcast和clustering。為了了解情況集群的工作原理,我就沿著openfire的源代碼進行了分析,也是一次學(xué)習(xí)的過程。
首先理解集群的一些簡單概念
集群的目的是讓多個實例像一個實例一樣運行,這樣就可以通過增長實例來增長計算能力。也就是所謂的分布式計算問題,這其中最為關(guān)注的一個特性就是——CAP理論,也就是所謂的一致性、可用性、分區(qū)容錯性。集群中最核心解決的問題就是CAP。
CAP綜合理解就是我上面寫的,多個實例像一個實例一樣運行。
所以所謂集群就是把一些數(shù)據(jù)共享或者同步到不同的實例上,這樣系統(tǒng)使用同樣的算法,取的結(jié)果當(dāng)然應(yīng)該是相同啦。所以一些數(shù)據(jù)庫的主從復(fù)制,緩存數(shù)據(jù)集群都是類似這種解決方法。只是代碼實現(xiàn)質(zhì)量和處理規(guī)模的問題。
有了這個基礎(chǔ)我們再來看看openfire是怎么解決這個問題的。
openfire的集群設(shè)計
1、哪些需要進行集群間的同步
對于openfire而言,有這幾方面的數(shù)據(jù)需要進行保證集群間的同步:數(shù)據(jù)庫存的數(shù)據(jù)、緩存數(shù)據(jù)、session。貌似就這些吧?
數(shù)據(jù)庫
因為對于openfire來說基本上是透明的,所以這塊就交給數(shù)據(jù)庫本身來實現(xiàn)。
緩存數(shù)據(jù)
緩存是存在內(nèi)存里的,所以這部分是要同步的
session
session在openfire并不需要所有實例同步,但是需要做用戶路由緩存,否則發(fā)消息時找不到對應(yīng)的會話。由此用戶路由還是要同步的。
2、緩存的設(shè)計
緩存接口
openfire里對緩存的數(shù)據(jù)容器提供了一個包裝接口,這個接口提供了緩存數(shù)據(jù)的基本方法,用于統(tǒng)一數(shù)據(jù)操作。
publicinterface Cache<K,V> extends java.util.Map<K,V>
如果不開啟集群時緩存的默認緩存容器類是:public class DefaultCache<K, V> ,實際上DefaultCache就是用一個Hashmap來存數(shù)據(jù)的。
緩存工廠類
為了保證緩存是可以擴展的,提供了一個工廠類:
publicclass CacheFactory
CacheFactory類中會管理所有的緩存容器,如下代碼:
/** * Returns the named cache, creating it as necessary. * * @param name the name of the cache to create. * @return the named cache, creating it as necessary. */ @SuppressWarnings("unchecked") publicstaticsynchronized <T extends Cache> T createCache(String name) { T cache = (T) caches.get(name); if (cache != null) { return cache; } cache = (T) cacheFactoryStrategy.createCache(name); log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name); return wrapCache(cache, name); }
上面代碼中會通過緩存工廠策略對象來創(chuàng)建一個緩存容器,最后warpCache方法會將此容器放入到caches中。
緩存工廠類的策略
在CacheFactory中默認是使用一個DefaultLocalCacheStrategy來完成緩存創(chuàng)建的。另外還提供了在集群條件下的緩存策略接入。也就是通過實例化不同的策略來切換緩存管理方案。比如后面要提到的hazelcast就是通過這個來替換了本地緩存策略的。從接口的設(shè)計上來看,openfire的緩存策略也就是為了集群與非集群的實現(xiàn)。
3、集群的設(shè)計
在openfire中的集群主要包括:集群管理、數(shù)據(jù)同步管理、集群計算任務(wù)。
集群管理者
在openfire中主要是一個類來實現(xiàn):ClusterManager,在ClusterManager中實現(xiàn)了集群實例的加入、退出管理,因為沒有使用主從結(jié)構(gòu),所以ClusterManager實現(xiàn)了一個無中心管理,不知道我理解的對不對。因為只要當(dāng)前實實例啟用了集群,ClusterManager就會主動的加載集群管理并與其他的集群進行同步。
startup
startup是啟動集群的方法,代碼:
publicstaticsynchronizedvoid startup() { if (isClusteringEnabled() && !isClusteringStarted()) { initEventDispatcher(); CacheFactory.startClustering(); } }
首先要判斷是否開啟了集群并且當(dāng)前集群實例未運行時才去啟動。
先是初始化了事件分發(fā)器,用于處理集群的同步事情。
然后就是調(diào)用CacheFactory的startClustering來運行集群。在startClustering方法中主要是這幾個事情:
會使用集群的緩存工廠策略來啟動,同時使自己加入到集群中。
開啟一個線程用于同步緩存的狀態(tài)
在前面startup中的initEventDispatcher方法,在這里會注冊一個分發(fā)線程監(jiān)聽到集群事件,收到事件后會執(zhí)行joinedCluster或者leftCluster的操作,joinedCluster就是加入到集群中的意思。
在joinedCluster時會將本地的緩存容器都轉(zhuǎn)換為集群緩存。由此便完成了集群的初始化并加入到集群中了。
shutdown
shutdown相對簡單點就是退出集群,并且將緩存工廠恢復(fù)為本地緩存。
同步管理
上面主要是講了如何管理集群,接著比較重要的就是如何在集群間同步數(shù)據(jù)呢?這部分主要是看具體的分布式計算系統(tǒng)的實現(xiàn)了,從openfire來說就是將數(shù)據(jù)放到集群緩存中,然后通過集群組件來完成的,比如使用hazelcast。
因為使用緩存來解決,所以在CacheFactory中才會有這些么多關(guān)于集群的處理代碼,特別是對于緩存策略的切換,以及集群任務(wù)處理都在CacheFactory作為接口方法向外公開。這樣也把集群的實現(xiàn)透明了。
集群計算任務(wù)
在這之前一直沒有提到集群中的計算問題,因為既然有了集群是不是可以利用集群的優(yōu)勢進行一些并行計算呢?這部分我倒沒有太過確定,只是看到相關(guān)的代碼所以簡單列一下。
在CacheFactory類中有幾個方法:doClusterTask、doSynchronousClusterTask,這兩個都是overload方法,參數(shù)有所不同而已。這幾個方法就是用于執(zhí)行一些計算任務(wù)的。就看一下doClusterTask:
public static void doClusterTask(final ClusterTask<?> task) { cacheFactoryStrategy.doClusterTask(task); }
這里有個限定就是必須是ClusterTask派生的類才行,看看它的定義:
public interface ClusterTask<V> extends Runnable, Externalizable { V getResult(); }
主要是為了異步執(zhí)行和序列化,異步是因為不能阻塞,而序列化當(dāng)然就是為了能在集群中傳送。
再看CacheFactory的doClusterTask方法可以發(fā)現(xiàn),它只不過是代理了緩存策略工廠的doClusterTask,具體的實現(xiàn)還是要看集群實現(xiàn)的。
看一看hazelcast的實現(xiàn)簡單理解openfire集群
在openfire中有集群的插件實現(xiàn),這里就以hazelcast為例子簡單的做一下分析與學(xué)習(xí)。
緩存策略工廠類(ClusteredCacheFactory)
ClusteredCacheFactory實現(xiàn)了CacheFactoryStrategy,代碼如下:
publicclass ClusteredCacheFactory implements CacheFactoryStrategy {
首先是startCluster方法用于啟動集群,主要完成幾件事情:
設(shè)置緩存序列化工具類,ClusterExternalizableUtil。這個是用于集群間數(shù)據(jù)復(fù)制時的序列化工具
設(shè)置遠程session定位器,RemoteSessionLocator,因為session不同步,所以它主要是用于多實例間的session讀取
設(shè)置遠程包路由器ClusterPacketRouter,這樣就可以在集群中發(fā)送消息了
加載Hazelcast的實例設(shè)置NodeID,以及設(shè)置ClusterListener
在前面說起集群啟動時提到了緩存切換,那具體實現(xiàn)時是如何做的呢?
因為集群啟動后就要是CacheFactory.joinedCluster方法來加入集群的??匆幌录尤氲拇a:
/** * Notification message indicating that this JVM has joined a cluster. */ @SuppressWarnings("unchecked") publicstaticsynchronizedvoid joinedCluster() { cacheFactoryStrategy = clusteredCacheFactoryStrategy; // Loop through local caches and switch them to clustered cache (copy content)for (Cache cache : getAllCaches()) { // skip local-only cachesif (localOnly.contains(cache.getName())) continue; CacheWrapper cacheWrapper = ((CacheWrapper) cache); Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName()); clusteredCache.putAll(cache); cacheWrapper.setWrappedCache(clusteredCache); } clusteringStarting = false; clusteringStarted = true; log.info("Clustering started; cache migration complete"); }
這里可以看到會讀取所有的緩存容器并一個個的使用Wrapper包裝一下,然后用同樣的緩存名稱去createCache一個新的Cache,這步使用的是切換后的集群緩存策略工廠,也就是說會使用ClusteredCacheFactory去創(chuàng)建新的緩存容器。最后再將cache寫入到新的clusteredCache 里,這樣就完成了緩存的切換。
當(dāng)然這里還是要看一下ClusteredCacheFactory的createCache實現(xiàn):
public Cache createCache(String name) { // Check if cluster is being started upwhile (state == State.starting) { // Wait until cluster is fully started (or failed)try { Thread.sleep(250); } catch (InterruptedException e) { // Ignore } } if (state == State.stopped) { thrownew IllegalStateException("Cannot create clustered cache when not in a cluster"); } returnnew ClusteredCache(name, hazelcast.getMap(name)); }
這里使用的是ClusteredCache,而且最重要的是傳入的第二個map參數(shù)換成了hazelcast的了,這樣之后再訪問這個緩存容器時已經(jīng)不再是原先的本地Cache了,已經(jīng)是hazelcast的map對象。hazelcast會自動對map的數(shù)據(jù)進行同步管理,這也就完成了緩存同步的功能。
集群計算
那就看hazelcast的實現(xiàn)吧,在ClusteredCacheFactory中doClusterTask舉個例子吧
publicvoid doClusterTask(final ClusterTask task) { if (cluster == null) { return; } Set<Member> members = new HashSet<Member>(); Member current = cluster.getLocalMember(); for(Member member : cluster.getMembers()) { if (!member.getUuid().equals(current.getUuid())) { members.add(member); } } if (members.size() > 0) { // Asynchronously execute the task on the other cluster members logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName()); hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers( new CallableTask<Object>(task), members); } else { logger.warn("No cluster members selected for cluster task " + task.getClass().getName()); } }
過程就是,先獲取到集群中的實例成員,當(dāng)然要排除自己。然后hazelcast提供了ExecutorService來執(zhí)行這個task,方法就是submiteToMembers。這樣就提交了一個運算任務(wù)。只不過具體是如何分配計算并匯集結(jié)果倒真不太清楚。
總結(jié)
花了一天時間看了一下openfire的集群,順手就寫了一篇文章,確實也到了一些東西。和一些網(wǎng)友溝通中好像目前大家更愿意使用redies來完成緩存共享,以及通過代理來實現(xiàn)集群,而不愿意使用openfire的集群方案。這部分我沒有遇到如何大的并發(fā)量需求確實不知道區(qū)別在哪里。以后有機會還是動手試試寫一個redies的插件。
相關(guān)文章
Java實現(xiàn)注冊登錄與郵箱發(fā)送賬號驗證激活功能
這篇文章主要介紹了Java實現(xiàn)注冊登錄與郵箱發(fā)送賬號驗證激活功能,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2022-12-12關(guān)于Java使用Http輕量級請求庫Unirest的方法
這篇文章主要介紹了關(guān)于Java使用Http輕量級請求庫Unirest的方法,Unirest 是一個輕量級的 HTTP 請求庫,可發(fā)起 GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS 請求,支持 Node、Ruby、Java、PHP、Python、Objective-C、.NET 等多種語言,需要的朋友可以參考下2023-08-08mybatis-plus常用注解@TableId和@TableField的用法
本文主要介紹了mybatis-plus常用注解@TableId和@TableField的用法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04Java 客戶端向服務(wù)端上傳mp3文件數(shù)據(jù)的實例代碼
這篇文章主要介紹了Java 客戶端向服務(wù)端上傳mp3文件數(shù)據(jù)的實例代碼,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下2018-09-09關(guān)于File與MultipartFile的用法概述
這篇文章主要介紹了關(guān)于File與MultipartFile的用法概述,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09