欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

關(guān)于Openfire集群源碼的分析

 更新時(shí)間:2017年09月23日 17:16:22   作者:5207  
這篇文章主要介紹了關(guān)于Openfire集群源碼的分析,內(nèi)容比較詳細(xì),具有一定參考價(jià)值,需要的朋友可以了解下。

本文介紹了openfire的相關(guān)內(nèi)容,這個(gè)東西現(xiàn)在用的人好像不多了。算了,我們看看具體內(nèi)容。

openfire是什么?

    Openfire 采用Java開(kāi)發(fā),開(kāi)源的實(shí)時(shí)協(xié)作(RTC)服務(wù)器基于XMPP(Jabber)協(xié)議。Openfire安裝和使用都非常簡(jiǎn)單,并利用Web進(jìn)行管理。單臺(tái)服務(wù)器可支持上萬(wàn)并發(fā)用戶。由于是采用開(kāi)放的XMPP協(xié)議,您可以使用各種支持XMPP協(xié)議的IM客戶端軟件登陸服務(wù)。如果你想輕易地構(gòu)建高效率的即時(shí)通信服務(wù)器,那就選擇它吧!

openfire能做什么?

         我們要了解Openfire,首先要了解XMPP協(xié)議,因?yàn)镺penfire是用Java語(yǔ)言編寫的,基于XMPP協(xié)議、開(kāi)源的實(shí)時(shí)協(xié)作的服務(wù)器。Openfire具有跨平臺(tái)的能力,Openfire與客戶端采用的是C/S架構(gòu),一個(gè)服務(wù)器要負(fù)責(zé)為連接在其上的客戶端提供服務(wù)。Openfire客戶端有spark,pidgin, Miranda IM,iChat等,用戶如果自己開(kāi)發(fā)客戶端,可以采用遵循GPL的開(kāi)源Client端API--Smack。Openfire服務(wù)器端支持插件開(kāi)發(fā),如果開(kāi)發(fā)者需要添加新的服務(wù),可以開(kāi)發(fā)出自己的插件后,安裝至服務(wù)器,就可以提供服務(wù),如查找聯(lián)系人服務(wù)就是以插件的形式提供的。

openfire如果用戶量增加后為了解決吞吐量問(wèn)題,需要引入集群,在openfire中提供了集群的支持,另外也實(shí)現(xiàn)了兩個(gè)集群插件:hazelcast和clustering。為了了解情況集群的工作原理,我就沿著openfire的源代碼進(jìn)行了分析,也是一次學(xué)習(xí)的過(guò)程。
首先理解集群的一些簡(jiǎn)單概念

集群的目的是讓多個(gè)實(shí)例像一個(gè)實(shí)例一樣運(yùn)行,這樣就可以通過(guò)增長(zhǎng)實(shí)例來(lái)增長(zhǎng)計(jì)算能力。也就是所謂的分布式計(jì)算問(wèn)題,這其中最為關(guān)注的一個(gè)特性就是——CAP理論,也就是所謂的一致性、可用性、分區(qū)容錯(cuò)性。集群中最核心解決的問(wèn)題就是CAP。

CAP綜合理解就是我上面寫的,多個(gè)實(shí)例像一個(gè)實(shí)例一樣運(yùn)行。

所以所謂集群就是把一些數(shù)據(jù)共享或者同步到不同的實(shí)例上,這樣系統(tǒng)使用同樣的算法,取的結(jié)果當(dāng)然應(yīng)該是相同啦。所以一些數(shù)據(jù)庫(kù)的主從復(fù)制,緩存數(shù)據(jù)集群都是類似這種解決方法。只是代碼實(shí)現(xiàn)質(zhì)量和處理規(guī)模的問(wèn)題。

有了這個(gè)基礎(chǔ)我們?cè)賮?lái)看看openfire是怎么解決這個(gè)問(wèn)題的。

openfire的集群設(shè)計(jì)

1、哪些需要進(jìn)行集群間的同步

對(duì)于openfire而言,有這幾方面的數(shù)據(jù)需要進(jìn)行保證集群間的同步:數(shù)據(jù)庫(kù)存的數(shù)據(jù)、緩存數(shù)據(jù)、session。貌似就這些吧?

數(shù)據(jù)庫(kù)

因?yàn)閷?duì)于openfire來(lái)說(shuō)基本上是透明的,所以這塊就交給數(shù)據(jù)庫(kù)本身來(lái)實(shí)現(xiàn)。

緩存數(shù)據(jù)

緩存是存在內(nèi)存里的,所以這部分是要同步的

session

session在openfire并不需要所有實(shí)例同步,但是需要做用戶路由緩存,否則發(fā)消息時(shí)找不到對(duì)應(yīng)的會(huì)話。由此用戶路由還是要同步的。

2、緩存的設(shè)計(jì)

緩存接口

openfire里對(duì)緩存的數(shù)據(jù)容器提供了一個(gè)包裝接口,這個(gè)接口提供了緩存數(shù)據(jù)的基本方法,用于統(tǒng)一數(shù)據(jù)操作。

publicinterface Cache<K,V> extends java.util.Map<K,V>

如果不開(kāi)啟集群時(shí)緩存的默認(rèn)緩存容器類是:public class DefaultCache<K, V> ,實(shí)際上DefaultCache就是用一個(gè)Hashmap來(lái)存數(shù)據(jù)的。

緩存工廠類

為了保證緩存是可以擴(kuò)展的,提供了一個(gè)工廠類:

publicclass CacheFactory

CacheFactory類中會(huì)管理所有的緩存容器,如下代碼:

/**
   * Returns the named cache, creating it as necessary.
   *
   * @param name     the name of the cache to create.
   * @return the named cache, creating it as necessary.
   */
  @SuppressWarnings("unchecked")
  publicstaticsynchronized <T extends Cache> T createCache(String name) {
    T cache = (T) caches.get(name);
    if (cache != null) {
      return cache;
    }
    cache = (T) cacheFactoryStrategy.createCache(name);
    log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name);
    return wrapCache(cache, name);
  }

上面代碼中會(huì)通過(guò)緩存工廠策略對(duì)象來(lái)創(chuàng)建一個(gè)緩存容器,最后warpCache方法會(huì)將此容器放入到caches中。

緩存工廠類的策略

在CacheFactory中默認(rèn)是使用一個(gè)DefaultLocalCacheStrategy來(lái)完成緩存創(chuàng)建的。另外還提供了在集群條件下的緩存策略接入。也就是通過(guò)實(shí)例化不同的策略來(lái)切換緩存管理方案。比如后面要提到的hazelcast就是通過(guò)這個(gè)來(lái)替換了本地緩存策略的。從接口的設(shè)計(jì)上來(lái)看,openfire的緩存策略也就是為了集群與非集群的實(shí)現(xiàn)。

3、集群的設(shè)計(jì)

在openfire中的集群主要包括:集群管理、數(shù)據(jù)同步管理、集群計(jì)算任務(wù)。

集群管理者

在openfire中主要是一個(gè)類來(lái)實(shí)現(xiàn):ClusterManager,在ClusterManager中實(shí)現(xiàn)了集群實(shí)例的加入、退出管理,因?yàn)闆](méi)有使用主從結(jié)構(gòu),所以ClusterManager實(shí)現(xiàn)了一個(gè)無(wú)中心管理,不知道我理解的對(duì)不對(duì)。因?yàn)橹灰?dāng)前實(shí)實(shí)例啟用了集群,ClusterManager就會(huì)主動(dòng)的加載集群管理并與其他的集群進(jìn)行同步。

startup

startup是啟動(dòng)集群的方法,代碼:

publicstaticsynchronizedvoid startup() {
    if (isClusteringEnabled() && !isClusteringStarted()) {
      initEventDispatcher();
      CacheFactory.startClustering();
    }
  }
 

首先要判斷是否開(kāi)啟了集群并且當(dāng)前集群實(shí)例未運(yùn)行時(shí)才去啟動(dòng)。

先是初始化了事件分發(fā)器,用于處理集群的同步事情。

然后就是調(diào)用CacheFactory的startClustering來(lái)運(yùn)行集群。在startClustering方法中主要是這幾個(gè)事情:

會(huì)使用集群的緩存工廠策略來(lái)啟動(dòng),同時(shí)使自己加入到集群中。

開(kāi)啟一個(gè)線程用于同步緩存的狀態(tài)

在前面startup中的initEventDispatcher方法,在這里會(huì)注冊(cè)一個(gè)分發(fā)線程監(jiān)聽(tīng)到集群事件,收到事件后會(huì)執(zhí)行joinedCluster或者leftCluster的操作,joinedCluster就是加入到集群中的意思。

在joinedCluster時(shí)會(huì)將本地的緩存容器都轉(zhuǎn)換為集群緩存。由此便完成了集群的初始化并加入到集群中了。

shutdown

shutdown相對(duì)簡(jiǎn)單點(diǎn)就是退出集群,并且將緩存工廠恢復(fù)為本地緩存。

同步管理

上面主要是講了如何管理集群,接著比較重要的就是如何在集群間同步數(shù)據(jù)呢?這部分主要是看具體的分布式計(jì)算系統(tǒng)的實(shí)現(xiàn)了,從openfire來(lái)說(shuō)就是將數(shù)據(jù)放到集群緩存中,然后通過(guò)集群組件來(lái)完成的,比如使用hazelcast。

因?yàn)槭褂镁彺鎭?lái)解決,所以在CacheFactory中才會(huì)有這些么多關(guān)于集群的處理代碼,特別是對(duì)于緩存策略的切換,以及集群任務(wù)處理都在CacheFactory作為接口方法向外公開(kāi)。這樣也把集群的實(shí)現(xiàn)透明了。

集群計(jì)算任務(wù)

在這之前一直沒(méi)有提到集群中的計(jì)算問(wèn)題,因?yàn)榧热挥辛思菏遣皇强梢岳眉旱膬?yōu)勢(shì)進(jìn)行一些并行計(jì)算呢?這部分我倒沒(méi)有太過(guò)確定,只是看到相關(guān)的代碼所以簡(jiǎn)單列一下。

在CacheFactory類中有幾個(gè)方法:doClusterTask、doSynchronousClusterTask,這兩個(gè)都是overload方法,參數(shù)有所不同而已。這幾個(gè)方法就是用于執(zhí)行一些計(jì)算任務(wù)的。就看一下doClusterTask:

 public static void doClusterTask(final ClusterTask<?> task) {
    cacheFactoryStrategy.doClusterTask(task);
  }

這里有個(gè)限定就是必須是ClusterTask派生的類才行,看看它的定義:

public interface ClusterTask<V> extends Runnable, Externalizable {
  V getResult();
}

主要是為了異步執(zhí)行和序列化,異步是因?yàn)椴荒茏枞蛄谢?dāng)然就是為了能在集群中傳送。

再看CacheFactory的doClusterTask方法可以發(fā)現(xiàn),它只不過(guò)是代理了緩存策略工廠的doClusterTask,具體的實(shí)現(xiàn)還是要看集群實(shí)現(xiàn)的。

看一看hazelcast的實(shí)現(xiàn)簡(jiǎn)單理解openfire集群

在openfire中有集群的插件實(shí)現(xiàn),這里就以hazelcast為例子簡(jiǎn)單的做一下分析與學(xué)習(xí)。

緩存策略工廠類(ClusteredCacheFactory)

ClusteredCacheFactory實(shí)現(xiàn)了CacheFactoryStrategy,代碼如下:

publicclass ClusteredCacheFactory implements CacheFactoryStrategy {

首先是startCluster方法用于啟動(dòng)集群,主要完成幾件事情:

設(shè)置緩存序列化工具類,ClusterExternalizableUtil。這個(gè)是用于集群間數(shù)據(jù)復(fù)制時(shí)的序列化工具

設(shè)置遠(yuǎn)程session定位器,RemoteSessionLocator,因?yàn)閟ession不同步,所以它主要是用于多實(shí)例間的session讀取

設(shè)置遠(yuǎn)程包路由器ClusterPacketRouter,這樣就可以在集群中發(fā)送消息了

加載Hazelcast的實(shí)例設(shè)置NodeID,以及設(shè)置ClusterListener

在前面說(shuō)起集群?jiǎn)?dòng)時(shí)提到了緩存切換,那具體實(shí)現(xiàn)時(shí)是如何做的呢?

因?yàn)榧簡(jiǎn)?dòng)后就要是CacheFactory.joinedCluster方法來(lái)加入集群的??匆幌录尤氲拇a:

/**
   * Notification message indicating that this JVM has joined a cluster.
   */
  @SuppressWarnings("unchecked")
  publicstaticsynchronizedvoid joinedCluster() {
    cacheFactoryStrategy = clusteredCacheFactoryStrategy;
    // Loop through local caches and switch them to clustered cache (copy content)for (Cache cache : getAllCaches()) {
      // skip local-only cachesif (localOnly.contains(cache.getName())) continue;
      CacheWrapper cacheWrapper = ((CacheWrapper) cache);
      Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
      clusteredCache.putAll(cache);
      cacheWrapper.setWrappedCache(clusteredCache);
    }
    clusteringStarting = false;
    clusteringStarted = true;
    log.info("Clustering started; cache migration complete");
  }

這里可以看到會(huì)讀取所有的緩存容器并一個(gè)個(gè)的使用Wrapper包裝一下,然后用同樣的緩存名稱去createCache一個(gè)新的Cache,這步使用的是切換后的集群緩存策略工廠,也就是說(shuō)會(huì)使用ClusteredCacheFactory去創(chuàng)建新的緩存容器。最后再將cache寫入到新的clusteredCache 里,這樣就完成了緩存的切換。

當(dāng)然這里還是要看一下ClusteredCacheFactory的createCache實(shí)現(xiàn):

public Cache createCache(String name) {
    // Check if cluster is being started upwhile (state == State.starting) {
      // Wait until cluster is fully started (or failed)try {
        Thread.sleep(250);
      }
      catch (InterruptedException e) {
        // Ignore
      }
    }
    if (state == State.stopped) {
      thrownew IllegalStateException("Cannot create clustered cache when not in a cluster");
    }
    returnnew ClusteredCache(name, hazelcast.getMap(name));
  }

這里使用的是ClusteredCache,而且最重要的是傳入的第二個(gè)map參數(shù)換成了hazelcast的了,這樣之后再訪問(wèn)這個(gè)緩存容器時(shí)已經(jīng)不再是原先的本地Cache了,已經(jīng)是hazelcast的map對(duì)象。hazelcast會(huì)自動(dòng)對(duì)map的數(shù)據(jù)進(jìn)行同步管理,這也就完成了緩存同步的功能。

集群計(jì)算

那就看hazelcast的實(shí)現(xiàn)吧,在ClusteredCacheFactory中doClusterTask舉個(gè)例子吧

publicvoid doClusterTask(final ClusterTask task) {
    if (cluster == null) { return; }
    Set<Member> members = new HashSet<Member>();
    Member current = cluster.getLocalMember();
    for(Member member : cluster.getMembers()) {
      if (!member.getUuid().equals(current.getUuid())) {
        members.add(member);
      }
    }
    if (members.size() > 0) {
      // Asynchronously execute the task on the other cluster members
      logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName());
      hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(
        new CallableTask<Object>(task), members);
    } else {
        logger.warn("No cluster members selected for cluster task " + task.getClass().getName());
    }
  }

過(guò)程就是,先獲取到集群中的實(shí)例成員,當(dāng)然要排除自己。然后hazelcast提供了ExecutorService來(lái)執(zhí)行這個(gè)task,方法就是submiteToMembers。這樣就提交了一個(gè)運(yùn)算任務(wù)。只不過(guò)具體是如何分配計(jì)算并匯集結(jié)果倒真不太清楚。

總結(jié)

花了一天時(shí)間看了一下openfire的集群,順手就寫了一篇文章,確實(shí)也到了一些東西。和一些網(wǎng)友溝通中好像目前大家更愿意使用redies來(lái)完成緩存共享,以及通過(guò)代理來(lái)實(shí)現(xiàn)集群,而不愿意使用openfire的集群方案。這部分我沒(méi)有遇到如何大的并發(fā)量需求確實(shí)不知道區(qū)別在哪里。以后有機(jī)會(huì)還是動(dòng)手試試寫一個(gè)redies的插件。

相關(guān)文章

  • Java流操作之?dāng)?shù)據(jù)流實(shí)例代碼

    Java流操作之?dāng)?shù)據(jù)流實(shí)例代碼

    這篇文章主要介紹了Java流操作之?dāng)?shù)據(jù)流實(shí)例代碼,具有一定借鑒價(jià)值,需要的朋友可以參考下
    2018-01-01
  • 詳談Springfox與swagger的整合使用

    詳談Springfox與swagger的整合使用

    下面小編就為大家?guī)?lái)一篇詳談Springfox與swagger的整合使用。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-08-08
  • java教程之對(duì)象序列化使用基礎(chǔ)示例詳解

    java教程之對(duì)象序列化使用基礎(chǔ)示例詳解

    所謂對(duì)象序列化就是將對(duì)象的狀態(tài)轉(zhuǎn)換成字節(jié)流,以后可以通過(guò)這些值再生成相同狀態(tài)的對(duì)象,下面詳細(xì)介紹一下java對(duì)象的序列化使用方法
    2014-01-01
  • Java實(shí)現(xiàn)注冊(cè)登錄與郵箱發(fā)送賬號(hào)驗(yàn)證激活功能

    Java實(shí)現(xiàn)注冊(cè)登錄與郵箱發(fā)送賬號(hào)驗(yàn)證激活功能

    這篇文章主要介紹了Java實(shí)現(xiàn)注冊(cè)登錄與郵箱發(fā)送賬號(hào)驗(yàn)證激活功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧
    2022-12-12
  • 關(guān)于Java使用Http輕量級(jí)請(qǐng)求庫(kù)Unirest的方法

    關(guān)于Java使用Http輕量級(jí)請(qǐng)求庫(kù)Unirest的方法

    這篇文章主要介紹了關(guān)于Java使用Http輕量級(jí)請(qǐng)求庫(kù)Unirest的方法,Unirest 是一個(gè)輕量級(jí)的 HTTP 請(qǐng)求庫(kù),可發(fā)起 GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS 請(qǐng)求,支持 Node、Ruby、Java、PHP、Python、Objective-C、.NET 等多種語(yǔ)言,需要的朋友可以參考下
    2023-08-08
  • java郵件亂碼的徹底解決方案

    java郵件亂碼的徹底解決方案

    在本篇文章里小編給大家整理的是關(guān)于java郵件亂碼的徹底解決方案,需要的朋友們可以學(xué)習(xí)下。
    2019-12-12
  • mybatis-plus常用注解@TableId和@TableField的用法

    mybatis-plus常用注解@TableId和@TableField的用法

    本文主要介紹了mybatis-plus常用注解@TableId和@TableField的用法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • Java 客戶端向服務(wù)端上傳mp3文件數(shù)據(jù)的實(shí)例代碼

    Java 客戶端向服務(wù)端上傳mp3文件數(shù)據(jù)的實(shí)例代碼

    這篇文章主要介紹了Java 客戶端向服務(wù)端上傳mp3文件數(shù)據(jù)的實(shí)例代碼,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2018-09-09
  • 關(guān)于File與MultipartFile的用法概述

    關(guān)于File與MultipartFile的用法概述

    這篇文章主要介紹了關(guān)于File與MultipartFile的用法概述,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • maven的生命周期及常用命令介紹

    maven的生命周期及常用命令介紹

    maven是一個(gè)項(xiàng)目構(gòu)建和管理的工具,提供了幫助管理 構(gòu)建、文檔、報(bào)告、依賴、scms、發(fā)布、分發(fā)的方法。下面通過(guò)本文給大家分享maven的生命周期及常用命令介紹,需要的朋友參考下吧
    2017-11-11

最新評(píng)論