欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

關(guān)于Openfire集群源碼的分析

 更新時間:2017年09月23日 17:16:22   作者:5207  
這篇文章主要介紹了關(guān)于Openfire集群源碼的分析,內(nèi)容比較詳細,具有一定參考價值,需要的朋友可以了解下。

本文介紹了openfire的相關(guān)內(nèi)容,這個東西現(xiàn)在用的人好像不多了。算了,我們看看具體內(nèi)容。

openfire是什么?

    Openfire 采用Java開發(fā),開源的實時協(xié)作(RTC)服務(wù)器基于XMPP(Jabber)協(xié)議。Openfire安裝和使用都非常簡單,并利用Web進行管理。單臺服務(wù)器可支持上萬并發(fā)用戶。由于是采用開放的XMPP協(xié)議,您可以使用各種支持XMPP協(xié)議的IM客戶端軟件登陸服務(wù)。如果你想輕易地構(gòu)建高效率的即時通信服務(wù)器,那就選擇它吧!

openfire能做什么?

         我們要了解Openfire,首先要了解XMPP協(xié)議,因為Openfire是用Java語言編寫的,基于XMPP協(xié)議、開源的實時協(xié)作的服務(wù)器。Openfire具有跨平臺的能力,Openfire與客戶端采用的是C/S架構(gòu),一個服務(wù)器要負責(zé)為連接在其上的客戶端提供服務(wù)。Openfire客戶端有spark,pidgin, Miranda IM,iChat等,用戶如果自己開發(fā)客戶端,可以采用遵循GPL的開源Client端API--Smack。Openfire服務(wù)器端支持插件開發(fā),如果開發(fā)者需要添加新的服務(wù),可以開發(fā)出自己的插件后,安裝至服務(wù)器,就可以提供服務(wù),如查找聯(lián)系人服務(wù)就是以插件的形式提供的。

openfire如果用戶量增加后為了解決吞吐量問題,需要引入集群,在openfire中提供了集群的支持,另外也實現(xiàn)了兩個集群插件:hazelcast和clustering。為了了解情況集群的工作原理,我就沿著openfire的源代碼進行了分析,也是一次學(xué)習(xí)的過程。
首先理解集群的一些簡單概念

集群的目的是讓多個實例像一個實例一樣運行,這樣就可以通過增長實例來增長計算能力。也就是所謂的分布式計算問題,這其中最為關(guān)注的一個特性就是——CAP理論,也就是所謂的一致性、可用性、分區(qū)容錯性。集群中最核心解決的問題就是CAP。

CAP綜合理解就是我上面寫的,多個實例像一個實例一樣運行。

所以所謂集群就是把一些數(shù)據(jù)共享或者同步到不同的實例上,這樣系統(tǒng)使用同樣的算法,取的結(jié)果當(dāng)然應(yīng)該是相同啦。所以一些數(shù)據(jù)庫的主從復(fù)制,緩存數(shù)據(jù)集群都是類似這種解決方法。只是代碼實現(xiàn)質(zhì)量和處理規(guī)模的問題。

有了這個基礎(chǔ)我們再來看看openfire是怎么解決這個問題的。

openfire的集群設(shè)計

1、哪些需要進行集群間的同步

對于openfire而言,有這幾方面的數(shù)據(jù)需要進行保證集群間的同步:數(shù)據(jù)庫存的數(shù)據(jù)、緩存數(shù)據(jù)、session。貌似就這些吧?

數(shù)據(jù)庫

因為對于openfire來說基本上是透明的,所以這塊就交給數(shù)據(jù)庫本身來實現(xiàn)。

緩存數(shù)據(jù)

緩存是存在內(nèi)存里的,所以這部分是要同步的

session

session在openfire并不需要所有實例同步,但是需要做用戶路由緩存,否則發(fā)消息時找不到對應(yīng)的會話。由此用戶路由還是要同步的。

2、緩存的設(shè)計

緩存接口

openfire里對緩存的數(shù)據(jù)容器提供了一個包裝接口,這個接口提供了緩存數(shù)據(jù)的基本方法,用于統(tǒng)一數(shù)據(jù)操作。

publicinterface Cache<K,V> extends java.util.Map<K,V>

如果不開啟集群時緩存的默認緩存容器類是:public class DefaultCache<K, V> ,實際上DefaultCache就是用一個Hashmap來存數(shù)據(jù)的。

緩存工廠類

為了保證緩存是可以擴展的,提供了一個工廠類:

publicclass CacheFactory

CacheFactory類中會管理所有的緩存容器,如下代碼:

/**
   * Returns the named cache, creating it as necessary.
   *
   * @param name     the name of the cache to create.
   * @return the named cache, creating it as necessary.
   */
  @SuppressWarnings("unchecked")
  publicstaticsynchronized <T extends Cache> T createCache(String name) {
    T cache = (T) caches.get(name);
    if (cache != null) {
      return cache;
    }
    cache = (T) cacheFactoryStrategy.createCache(name);
    log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name);
    return wrapCache(cache, name);
  }

上面代碼中會通過緩存工廠策略對象來創(chuàng)建一個緩存容器,最后warpCache方法會將此容器放入到caches中。

緩存工廠類的策略

在CacheFactory中默認是使用一個DefaultLocalCacheStrategy來完成緩存創(chuàng)建的。另外還提供了在集群條件下的緩存策略接入。也就是通過實例化不同的策略來切換緩存管理方案。比如后面要提到的hazelcast就是通過這個來替換了本地緩存策略的。從接口的設(shè)計上來看,openfire的緩存策略也就是為了集群與非集群的實現(xiàn)。

3、集群的設(shè)計

在openfire中的集群主要包括:集群管理、數(shù)據(jù)同步管理、集群計算任務(wù)。

集群管理者

在openfire中主要是一個類來實現(xiàn):ClusterManager,在ClusterManager中實現(xiàn)了集群實例的加入、退出管理,因為沒有使用主從結(jié)構(gòu),所以ClusterManager實現(xiàn)了一個無中心管理,不知道我理解的對不對。因為只要當(dāng)前實實例啟用了集群,ClusterManager就會主動的加載集群管理并與其他的集群進行同步。

startup

startup是啟動集群的方法,代碼:

publicstaticsynchronizedvoid startup() {
    if (isClusteringEnabled() && !isClusteringStarted()) {
      initEventDispatcher();
      CacheFactory.startClustering();
    }
  }
 

首先要判斷是否開啟了集群并且當(dāng)前集群實例未運行時才去啟動。

先是初始化了事件分發(fā)器,用于處理集群的同步事情。

然后就是調(diào)用CacheFactory的startClustering來運行集群。在startClustering方法中主要是這幾個事情:

會使用集群的緩存工廠策略來啟動,同時使自己加入到集群中。

開啟一個線程用于同步緩存的狀態(tài)

在前面startup中的initEventDispatcher方法,在這里會注冊一個分發(fā)線程監(jiān)聽到集群事件,收到事件后會執(zhí)行joinedCluster或者leftCluster的操作,joinedCluster就是加入到集群中的意思。

在joinedCluster時會將本地的緩存容器都轉(zhuǎn)換為集群緩存。由此便完成了集群的初始化并加入到集群中了。

shutdown

shutdown相對簡單點就是退出集群,并且將緩存工廠恢復(fù)為本地緩存。

同步管理

上面主要是講了如何管理集群,接著比較重要的就是如何在集群間同步數(shù)據(jù)呢?這部分主要是看具體的分布式計算系統(tǒng)的實現(xiàn)了,從openfire來說就是將數(shù)據(jù)放到集群緩存中,然后通過集群組件來完成的,比如使用hazelcast。

因為使用緩存來解決,所以在CacheFactory中才會有這些么多關(guān)于集群的處理代碼,特別是對于緩存策略的切換,以及集群任務(wù)處理都在CacheFactory作為接口方法向外公開。這樣也把集群的實現(xiàn)透明了。

集群計算任務(wù)

在這之前一直沒有提到集群中的計算問題,因為既然有了集群是不是可以利用集群的優(yōu)勢進行一些并行計算呢?這部分我倒沒有太過確定,只是看到相關(guān)的代碼所以簡單列一下。

在CacheFactory類中有幾個方法:doClusterTask、doSynchronousClusterTask,這兩個都是overload方法,參數(shù)有所不同而已。這幾個方法就是用于執(zhí)行一些計算任務(wù)的。就看一下doClusterTask:

 public static void doClusterTask(final ClusterTask<?> task) {
    cacheFactoryStrategy.doClusterTask(task);
  }

這里有個限定就是必須是ClusterTask派生的類才行,看看它的定義:

public interface ClusterTask<V> extends Runnable, Externalizable {
  V getResult();
}

主要是為了異步執(zhí)行和序列化,異步是因為不能阻塞,而序列化當(dāng)然就是為了能在集群中傳送。

再看CacheFactory的doClusterTask方法可以發(fā)現(xiàn),它只不過是代理了緩存策略工廠的doClusterTask,具體的實現(xiàn)還是要看集群實現(xiàn)的。

看一看hazelcast的實現(xiàn)簡單理解openfire集群

在openfire中有集群的插件實現(xiàn),這里就以hazelcast為例子簡單的做一下分析與學(xué)習(xí)。

緩存策略工廠類(ClusteredCacheFactory)

ClusteredCacheFactory實現(xiàn)了CacheFactoryStrategy,代碼如下:

publicclass ClusteredCacheFactory implements CacheFactoryStrategy {

首先是startCluster方法用于啟動集群,主要完成幾件事情:

設(shè)置緩存序列化工具類,ClusterExternalizableUtil。這個是用于集群間數(shù)據(jù)復(fù)制時的序列化工具

設(shè)置遠程session定位器,RemoteSessionLocator,因為session不同步,所以它主要是用于多實例間的session讀取

設(shè)置遠程包路由器ClusterPacketRouter,這樣就可以在集群中發(fā)送消息了

加載Hazelcast的實例設(shè)置NodeID,以及設(shè)置ClusterListener

在前面說起集群啟動時提到了緩存切換,那具體實現(xiàn)時是如何做的呢?

因為集群啟動后就要是CacheFactory.joinedCluster方法來加入集群的??匆幌录尤氲拇a:

/**
   * Notification message indicating that this JVM has joined a cluster.
   */
  @SuppressWarnings("unchecked")
  publicstaticsynchronizedvoid joinedCluster() {
    cacheFactoryStrategy = clusteredCacheFactoryStrategy;
    // Loop through local caches and switch them to clustered cache (copy content)for (Cache cache : getAllCaches()) {
      // skip local-only cachesif (localOnly.contains(cache.getName())) continue;
      CacheWrapper cacheWrapper = ((CacheWrapper) cache);
      Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
      clusteredCache.putAll(cache);
      cacheWrapper.setWrappedCache(clusteredCache);
    }
    clusteringStarting = false;
    clusteringStarted = true;
    log.info("Clustering started; cache migration complete");
  }

這里可以看到會讀取所有的緩存容器并一個個的使用Wrapper包裝一下,然后用同樣的緩存名稱去createCache一個新的Cache,這步使用的是切換后的集群緩存策略工廠,也就是說會使用ClusteredCacheFactory去創(chuàng)建新的緩存容器。最后再將cache寫入到新的clusteredCache 里,這樣就完成了緩存的切換。

當(dāng)然這里還是要看一下ClusteredCacheFactory的createCache實現(xiàn):

public Cache createCache(String name) {
    // Check if cluster is being started upwhile (state == State.starting) {
      // Wait until cluster is fully started (or failed)try {
        Thread.sleep(250);
      }
      catch (InterruptedException e) {
        // Ignore
      }
    }
    if (state == State.stopped) {
      thrownew IllegalStateException("Cannot create clustered cache when not in a cluster");
    }
    returnnew ClusteredCache(name, hazelcast.getMap(name));
  }

這里使用的是ClusteredCache,而且最重要的是傳入的第二個map參數(shù)換成了hazelcast的了,這樣之后再訪問這個緩存容器時已經(jīng)不再是原先的本地Cache了,已經(jīng)是hazelcast的map對象。hazelcast會自動對map的數(shù)據(jù)進行同步管理,這也就完成了緩存同步的功能。

集群計算

那就看hazelcast的實現(xiàn)吧,在ClusteredCacheFactory中doClusterTask舉個例子吧

publicvoid doClusterTask(final ClusterTask task) {
    if (cluster == null) { return; }
    Set<Member> members = new HashSet<Member>();
    Member current = cluster.getLocalMember();
    for(Member member : cluster.getMembers()) {
      if (!member.getUuid().equals(current.getUuid())) {
        members.add(member);
      }
    }
    if (members.size() > 0) {
      // Asynchronously execute the task on the other cluster members
      logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName());
      hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(
        new CallableTask<Object>(task), members);
    } else {
        logger.warn("No cluster members selected for cluster task " + task.getClass().getName());
    }
  }

過程就是,先獲取到集群中的實例成員,當(dāng)然要排除自己。然后hazelcast提供了ExecutorService來執(zhí)行這個task,方法就是submiteToMembers。這樣就提交了一個運算任務(wù)。只不過具體是如何分配計算并匯集結(jié)果倒真不太清楚。

總結(jié)

花了一天時間看了一下openfire的集群,順手就寫了一篇文章,確實也到了一些東西。和一些網(wǎng)友溝通中好像目前大家更愿意使用redies來完成緩存共享,以及通過代理來實現(xiàn)集群,而不愿意使用openfire的集群方案。這部分我沒有遇到如何大的并發(fā)量需求確實不知道區(qū)別在哪里。以后有機會還是動手試試寫一個redies的插件。

相關(guān)文章

  • Java流操作之?dāng)?shù)據(jù)流實例代碼

    Java流操作之?dāng)?shù)據(jù)流實例代碼

    這篇文章主要介紹了Java流操作之?dāng)?shù)據(jù)流實例代碼,具有一定借鑒價值,需要的朋友可以參考下
    2018-01-01
  • 詳談Springfox與swagger的整合使用

    詳談Springfox與swagger的整合使用

    下面小編就為大家?guī)硪黄斦凷pringfox與swagger的整合使用。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-08-08
  • java教程之對象序列化使用基礎(chǔ)示例詳解

    java教程之對象序列化使用基礎(chǔ)示例詳解

    所謂對象序列化就是將對象的狀態(tài)轉(zhuǎn)換成字節(jié)流,以后可以通過這些值再生成相同狀態(tài)的對象,下面詳細介紹一下java對象的序列化使用方法
    2014-01-01
  • Java實現(xiàn)注冊登錄與郵箱發(fā)送賬號驗證激活功能

    Java實現(xiàn)注冊登錄與郵箱發(fā)送賬號驗證激活功能

    這篇文章主要介紹了Java實現(xiàn)注冊登錄與郵箱發(fā)送賬號驗證激活功能,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧
    2022-12-12
  • 關(guān)于Java使用Http輕量級請求庫Unirest的方法

    關(guān)于Java使用Http輕量級請求庫Unirest的方法

    這篇文章主要介紹了關(guān)于Java使用Http輕量級請求庫Unirest的方法,Unirest 是一個輕量級的 HTTP 請求庫,可發(fā)起 GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS 請求,支持 Node、Ruby、Java、PHP、Python、Objective-C、.NET 等多種語言,需要的朋友可以參考下
    2023-08-08
  • java郵件亂碼的徹底解決方案

    java郵件亂碼的徹底解決方案

    在本篇文章里小編給大家整理的是關(guān)于java郵件亂碼的徹底解決方案,需要的朋友們可以學(xué)習(xí)下。
    2019-12-12
  • mybatis-plus常用注解@TableId和@TableField的用法

    mybatis-plus常用注解@TableId和@TableField的用法

    本文主要介紹了mybatis-plus常用注解@TableId和@TableField的用法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • Java 客戶端向服務(wù)端上傳mp3文件數(shù)據(jù)的實例代碼

    Java 客戶端向服務(wù)端上傳mp3文件數(shù)據(jù)的實例代碼

    這篇文章主要介紹了Java 客戶端向服務(wù)端上傳mp3文件數(shù)據(jù)的實例代碼,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下
    2018-09-09
  • 關(guān)于File與MultipartFile的用法概述

    關(guān)于File與MultipartFile的用法概述

    這篇文章主要介紹了關(guān)于File與MultipartFile的用法概述,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • maven的生命周期及常用命令介紹

    maven的生命周期及常用命令介紹

    maven是一個項目構(gòu)建和管理的工具,提供了幫助管理 構(gòu)建、文檔、報告、依賴、scms、發(fā)布、分發(fā)的方法。下面通過本文給大家分享maven的生命周期及常用命令介紹,需要的朋友參考下吧
    2017-11-11

最新評論