關(guān)于Openfire集群源碼的分析
本文介紹了openfire的相關(guān)內(nèi)容,這個(gè)東西現(xiàn)在用的人好像不多了。算了,我們看看具體內(nèi)容。
openfire是什么?
Openfire 采用Java開(kāi)發(fā),開(kāi)源的實(shí)時(shí)協(xié)作(RTC)服務(wù)器基于XMPP(Jabber)協(xié)議。Openfire安裝和使用都非常簡(jiǎn)單,并利用Web進(jìn)行管理。單臺(tái)服務(wù)器可支持上萬(wàn)并發(fā)用戶。由于是采用開(kāi)放的XMPP協(xié)議,您可以使用各種支持XMPP協(xié)議的IM客戶端軟件登陸服務(wù)。如果你想輕易地構(gòu)建高效率的即時(shí)通信服務(wù)器,那就選擇它吧!
openfire能做什么?
我們要了解Openfire,首先要了解XMPP協(xié)議,因?yàn)镺penfire是用Java語(yǔ)言編寫的,基于XMPP協(xié)議、開(kāi)源的實(shí)時(shí)協(xié)作的服務(wù)器。Openfire具有跨平臺(tái)的能力,Openfire與客戶端采用的是C/S架構(gòu),一個(gè)服務(wù)器要負(fù)責(zé)為連接在其上的客戶端提供服務(wù)。Openfire客戶端有spark,pidgin, Miranda IM,iChat等,用戶如果自己開(kāi)發(fā)客戶端,可以采用遵循GPL的開(kāi)源Client端API--Smack。Openfire服務(wù)器端支持插件開(kāi)發(fā),如果開(kāi)發(fā)者需要添加新的服務(wù),可以開(kāi)發(fā)出自己的插件后,安裝至服務(wù)器,就可以提供服務(wù),如查找聯(lián)系人服務(wù)就是以插件的形式提供的。
openfire如果用戶量增加后為了解決吞吐量問(wèn)題,需要引入集群,在openfire中提供了集群的支持,另外也實(shí)現(xiàn)了兩個(gè)集群插件:hazelcast和clustering。為了了解情況集群的工作原理,我就沿著openfire的源代碼進(jìn)行了分析,也是一次學(xué)習(xí)的過(guò)程。
首先理解集群的一些簡(jiǎn)單概念
集群的目的是讓多個(gè)實(shí)例像一個(gè)實(shí)例一樣運(yùn)行,這樣就可以通過(guò)增長(zhǎng)實(shí)例來(lái)增長(zhǎng)計(jì)算能力。也就是所謂的分布式計(jì)算問(wèn)題,這其中最為關(guān)注的一個(gè)特性就是——CAP理論,也就是所謂的一致性、可用性、分區(qū)容錯(cuò)性。集群中最核心解決的問(wèn)題就是CAP。
CAP綜合理解就是我上面寫的,多個(gè)實(shí)例像一個(gè)實(shí)例一樣運(yùn)行。
所以所謂集群就是把一些數(shù)據(jù)共享或者同步到不同的實(shí)例上,這樣系統(tǒng)使用同樣的算法,取的結(jié)果當(dāng)然應(yīng)該是相同啦。所以一些數(shù)據(jù)庫(kù)的主從復(fù)制,緩存數(shù)據(jù)集群都是類似這種解決方法。只是代碼實(shí)現(xiàn)質(zhì)量和處理規(guī)模的問(wèn)題。
有了這個(gè)基礎(chǔ)我們?cè)賮?lái)看看openfire是怎么解決這個(gè)問(wèn)題的。
openfire的集群設(shè)計(jì)
1、哪些需要進(jìn)行集群間的同步
對(duì)于openfire而言,有這幾方面的數(shù)據(jù)需要進(jìn)行保證集群間的同步:數(shù)據(jù)庫(kù)存的數(shù)據(jù)、緩存數(shù)據(jù)、session。貌似就這些吧?
數(shù)據(jù)庫(kù)
因?yàn)閷?duì)于openfire來(lái)說(shuō)基本上是透明的,所以這塊就交給數(shù)據(jù)庫(kù)本身來(lái)實(shí)現(xiàn)。
緩存數(shù)據(jù)
緩存是存在內(nèi)存里的,所以這部分是要同步的
session
session在openfire并不需要所有實(shí)例同步,但是需要做用戶路由緩存,否則發(fā)消息時(shí)找不到對(duì)應(yīng)的會(huì)話。由此用戶路由還是要同步的。
2、緩存的設(shè)計(jì)
緩存接口
openfire里對(duì)緩存的數(shù)據(jù)容器提供了一個(gè)包裝接口,這個(gè)接口提供了緩存數(shù)據(jù)的基本方法,用于統(tǒng)一數(shù)據(jù)操作。
publicinterface Cache<K,V> extends java.util.Map<K,V>
如果不開(kāi)啟集群時(shí)緩存的默認(rèn)緩存容器類是:public class DefaultCache<K, V> ,實(shí)際上DefaultCache就是用一個(gè)Hashmap來(lái)存數(shù)據(jù)的。
緩存工廠類
為了保證緩存是可以擴(kuò)展的,提供了一個(gè)工廠類:
publicclass CacheFactory
CacheFactory類中會(huì)管理所有的緩存容器,如下代碼:
/** * Returns the named cache, creating it as necessary. * * @param name the name of the cache to create. * @return the named cache, creating it as necessary. */ @SuppressWarnings("unchecked") publicstaticsynchronized <T extends Cache> T createCache(String name) { T cache = (T) caches.get(name); if (cache != null) { return cache; } cache = (T) cacheFactoryStrategy.createCache(name); log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name); return wrapCache(cache, name); }
上面代碼中會(huì)通過(guò)緩存工廠策略對(duì)象來(lái)創(chuàng)建一個(gè)緩存容器,最后warpCache方法會(huì)將此容器放入到caches中。
緩存工廠類的策略
在CacheFactory中默認(rèn)是使用一個(gè)DefaultLocalCacheStrategy來(lái)完成緩存創(chuàng)建的。另外還提供了在集群條件下的緩存策略接入。也就是通過(guò)實(shí)例化不同的策略來(lái)切換緩存管理方案。比如后面要提到的hazelcast就是通過(guò)這個(gè)來(lái)替換了本地緩存策略的。從接口的設(shè)計(jì)上來(lái)看,openfire的緩存策略也就是為了集群與非集群的實(shí)現(xiàn)。
3、集群的設(shè)計(jì)
在openfire中的集群主要包括:集群管理、數(shù)據(jù)同步管理、集群計(jì)算任務(wù)。
集群管理者
在openfire中主要是一個(gè)類來(lái)實(shí)現(xiàn):ClusterManager,在ClusterManager中實(shí)現(xiàn)了集群實(shí)例的加入、退出管理,因?yàn)闆](méi)有使用主從結(jié)構(gòu),所以ClusterManager實(shí)現(xiàn)了一個(gè)無(wú)中心管理,不知道我理解的對(duì)不對(duì)。因?yàn)橹灰?dāng)前實(shí)實(shí)例啟用了集群,ClusterManager就會(huì)主動(dòng)的加載集群管理并與其他的集群進(jìn)行同步。
startup
startup是啟動(dòng)集群的方法,代碼:
publicstaticsynchronizedvoid startup() { if (isClusteringEnabled() && !isClusteringStarted()) { initEventDispatcher(); CacheFactory.startClustering(); } }
首先要判斷是否開(kāi)啟了集群并且當(dāng)前集群實(shí)例未運(yùn)行時(shí)才去啟動(dòng)。
先是初始化了事件分發(fā)器,用于處理集群的同步事情。
然后就是調(diào)用CacheFactory的startClustering來(lái)運(yùn)行集群。在startClustering方法中主要是這幾個(gè)事情:
會(huì)使用集群的緩存工廠策略來(lái)啟動(dòng),同時(shí)使自己加入到集群中。
開(kāi)啟一個(gè)線程用于同步緩存的狀態(tài)
在前面startup中的initEventDispatcher方法,在這里會(huì)注冊(cè)一個(gè)分發(fā)線程監(jiān)聽(tīng)到集群事件,收到事件后會(huì)執(zhí)行joinedCluster或者leftCluster的操作,joinedCluster就是加入到集群中的意思。
在joinedCluster時(shí)會(huì)將本地的緩存容器都轉(zhuǎn)換為集群緩存。由此便完成了集群的初始化并加入到集群中了。
shutdown
shutdown相對(duì)簡(jiǎn)單點(diǎn)就是退出集群,并且將緩存工廠恢復(fù)為本地緩存。
同步管理
上面主要是講了如何管理集群,接著比較重要的就是如何在集群間同步數(shù)據(jù)呢?這部分主要是看具體的分布式計(jì)算系統(tǒng)的實(shí)現(xiàn)了,從openfire來(lái)說(shuō)就是將數(shù)據(jù)放到集群緩存中,然后通過(guò)集群組件來(lái)完成的,比如使用hazelcast。
因?yàn)槭褂镁彺鎭?lái)解決,所以在CacheFactory中才會(huì)有這些么多關(guān)于集群的處理代碼,特別是對(duì)于緩存策略的切換,以及集群任務(wù)處理都在CacheFactory作為接口方法向外公開(kāi)。這樣也把集群的實(shí)現(xiàn)透明了。
集群計(jì)算任務(wù)
在這之前一直沒(méi)有提到集群中的計(jì)算問(wèn)題,因?yàn)榧热挥辛思菏遣皇强梢岳眉旱膬?yōu)勢(shì)進(jìn)行一些并行計(jì)算呢?這部分我倒沒(méi)有太過(guò)確定,只是看到相關(guān)的代碼所以簡(jiǎn)單列一下。
在CacheFactory類中有幾個(gè)方法:doClusterTask、doSynchronousClusterTask,這兩個(gè)都是overload方法,參數(shù)有所不同而已。這幾個(gè)方法就是用于執(zhí)行一些計(jì)算任務(wù)的。就看一下doClusterTask:
public static void doClusterTask(final ClusterTask<?> task) { cacheFactoryStrategy.doClusterTask(task); }
這里有個(gè)限定就是必須是ClusterTask派生的類才行,看看它的定義:
public interface ClusterTask<V> extends Runnable, Externalizable { V getResult(); }
主要是為了異步執(zhí)行和序列化,異步是因?yàn)椴荒茏枞蛄谢?dāng)然就是為了能在集群中傳送。
再看CacheFactory的doClusterTask方法可以發(fā)現(xiàn),它只不過(guò)是代理了緩存策略工廠的doClusterTask,具體的實(shí)現(xiàn)還是要看集群實(shí)現(xiàn)的。
看一看hazelcast的實(shí)現(xiàn)簡(jiǎn)單理解openfire集群
在openfire中有集群的插件實(shí)現(xiàn),這里就以hazelcast為例子簡(jiǎn)單的做一下分析與學(xué)習(xí)。
緩存策略工廠類(ClusteredCacheFactory)
ClusteredCacheFactory實(shí)現(xiàn)了CacheFactoryStrategy,代碼如下:
publicclass ClusteredCacheFactory implements CacheFactoryStrategy {
首先是startCluster方法用于啟動(dòng)集群,主要完成幾件事情:
設(shè)置緩存序列化工具類,ClusterExternalizableUtil。這個(gè)是用于集群間數(shù)據(jù)復(fù)制時(shí)的序列化工具
設(shè)置遠(yuǎn)程session定位器,RemoteSessionLocator,因?yàn)閟ession不同步,所以它主要是用于多實(shí)例間的session讀取
設(shè)置遠(yuǎn)程包路由器ClusterPacketRouter,這樣就可以在集群中發(fā)送消息了
加載Hazelcast的實(shí)例設(shè)置NodeID,以及設(shè)置ClusterListener
在前面說(shuō)起集群?jiǎn)?dòng)時(shí)提到了緩存切換,那具體實(shí)現(xiàn)時(shí)是如何做的呢?
因?yàn)榧簡(jiǎn)?dòng)后就要是CacheFactory.joinedCluster方法來(lái)加入集群的??匆幌录尤氲拇a:
/** * Notification message indicating that this JVM has joined a cluster. */ @SuppressWarnings("unchecked") publicstaticsynchronizedvoid joinedCluster() { cacheFactoryStrategy = clusteredCacheFactoryStrategy; // Loop through local caches and switch them to clustered cache (copy content)for (Cache cache : getAllCaches()) { // skip local-only cachesif (localOnly.contains(cache.getName())) continue; CacheWrapper cacheWrapper = ((CacheWrapper) cache); Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName()); clusteredCache.putAll(cache); cacheWrapper.setWrappedCache(clusteredCache); } clusteringStarting = false; clusteringStarted = true; log.info("Clustering started; cache migration complete"); }
這里可以看到會(huì)讀取所有的緩存容器并一個(gè)個(gè)的使用Wrapper包裝一下,然后用同樣的緩存名稱去createCache一個(gè)新的Cache,這步使用的是切換后的集群緩存策略工廠,也就是說(shuō)會(huì)使用ClusteredCacheFactory去創(chuàng)建新的緩存容器。最后再將cache寫入到新的clusteredCache 里,這樣就完成了緩存的切換。
當(dāng)然這里還是要看一下ClusteredCacheFactory的createCache實(shí)現(xiàn):
public Cache createCache(String name) { // Check if cluster is being started upwhile (state == State.starting) { // Wait until cluster is fully started (or failed)try { Thread.sleep(250); } catch (InterruptedException e) { // Ignore } } if (state == State.stopped) { thrownew IllegalStateException("Cannot create clustered cache when not in a cluster"); } returnnew ClusteredCache(name, hazelcast.getMap(name)); }
這里使用的是ClusteredCache,而且最重要的是傳入的第二個(gè)map參數(shù)換成了hazelcast的了,這樣之后再訪問(wèn)這個(gè)緩存容器時(shí)已經(jīng)不再是原先的本地Cache了,已經(jīng)是hazelcast的map對(duì)象。hazelcast會(huì)自動(dòng)對(duì)map的數(shù)據(jù)進(jìn)行同步管理,這也就完成了緩存同步的功能。
集群計(jì)算
那就看hazelcast的實(shí)現(xiàn)吧,在ClusteredCacheFactory中doClusterTask舉個(gè)例子吧
publicvoid doClusterTask(final ClusterTask task) { if (cluster == null) { return; } Set<Member> members = new HashSet<Member>(); Member current = cluster.getLocalMember(); for(Member member : cluster.getMembers()) { if (!member.getUuid().equals(current.getUuid())) { members.add(member); } } if (members.size() > 0) { // Asynchronously execute the task on the other cluster members logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName()); hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers( new CallableTask<Object>(task), members); } else { logger.warn("No cluster members selected for cluster task " + task.getClass().getName()); } }
過(guò)程就是,先獲取到集群中的實(shí)例成員,當(dāng)然要排除自己。然后hazelcast提供了ExecutorService來(lái)執(zhí)行這個(gè)task,方法就是submiteToMembers。這樣就提交了一個(gè)運(yùn)算任務(wù)。只不過(guò)具體是如何分配計(jì)算并匯集結(jié)果倒真不太清楚。
總結(jié)
花了一天時(shí)間看了一下openfire的集群,順手就寫了一篇文章,確實(shí)也到了一些東西。和一些網(wǎng)友溝通中好像目前大家更愿意使用redies來(lái)完成緩存共享,以及通過(guò)代理來(lái)實(shí)現(xiàn)集群,而不愿意使用openfire的集群方案。這部分我沒(méi)有遇到如何大的并發(fā)量需求確實(shí)不知道區(qū)別在哪里。以后有機(jī)會(huì)還是動(dòng)手試試寫一個(gè)redies的插件。
相關(guān)文章
Java流操作之?dāng)?shù)據(jù)流實(shí)例代碼
這篇文章主要介紹了Java流操作之?dāng)?shù)據(jù)流實(shí)例代碼,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01java教程之對(duì)象序列化使用基礎(chǔ)示例詳解
所謂對(duì)象序列化就是將對(duì)象的狀態(tài)轉(zhuǎn)換成字節(jié)流,以后可以通過(guò)這些值再生成相同狀態(tài)的對(duì)象,下面詳細(xì)介紹一下java對(duì)象的序列化使用方法2014-01-01Java實(shí)現(xiàn)注冊(cè)登錄與郵箱發(fā)送賬號(hào)驗(yàn)證激活功能
這篇文章主要介紹了Java實(shí)現(xiàn)注冊(cè)登錄與郵箱發(fā)送賬號(hào)驗(yàn)證激活功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧2022-12-12關(guān)于Java使用Http輕量級(jí)請(qǐng)求庫(kù)Unirest的方法
這篇文章主要介紹了關(guān)于Java使用Http輕量級(jí)請(qǐng)求庫(kù)Unirest的方法,Unirest 是一個(gè)輕量級(jí)的 HTTP 請(qǐng)求庫(kù),可發(fā)起 GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS 請(qǐng)求,支持 Node、Ruby、Java、PHP、Python、Objective-C、.NET 等多種語(yǔ)言,需要的朋友可以參考下2023-08-08mybatis-plus常用注解@TableId和@TableField的用法
本文主要介紹了mybatis-plus常用注解@TableId和@TableField的用法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04Java 客戶端向服務(wù)端上傳mp3文件數(shù)據(jù)的實(shí)例代碼
這篇文章主要介紹了Java 客戶端向服務(wù)端上傳mp3文件數(shù)據(jù)的實(shí)例代碼,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-09-09關(guān)于File與MultipartFile的用法概述
這篇文章主要介紹了關(guān)于File與MultipartFile的用法概述,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09