欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

簡(jiǎn)單談?wù)凴xJava和多線程并發(fā)

 更新時(shí)間:2017年03月03日 09:48:08   作者:zjutkz  
認(rèn)識(shí)RxJava已經(jīng)有一段時(shí)間了,但是一直沒有機(jī)會(huì)在項(xiàng)目中嘗試,最近在新的項(xiàng)目里引進(jìn)了RxJava寫一些事件處理,在review代碼的時(shí)候發(fā)現(xiàn)了一些和多線程并發(fā)相關(guān)的問(wèn)題,所以寫了這篇文章,需要的朋友可以參考借鑒,下面來(lái)一起看看吧。

前言

相信對(duì)于RxJava,大家應(yīng)該都很熟悉,他最核心的兩個(gè)字就是異步,誠(chéng)然,它對(duì)異步的處理非常的出色,但是異步絕對(duì)不等于并發(fā),更不等于線程安全,如果把這幾個(gè)概念搞混了,錯(cuò)誤的使用RxJava,是會(huì)來(lái)帶非常多的問(wèn)題的。

RxJava與并發(fā)

首先讓我們來(lái)看一段RxJava協(xié)議的原文:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

如上所述,RxJava對(duì)多線程并發(fā)其實(shí)并沒有做非常的多保護(hù),這段話中說(shuō),如果多個(gè)Observables從多個(gè)線程中發(fā)射數(shù)據(jù),必須要滿足happens-before原則。

下面來(lái)看一個(gè)簡(jiǎn)單的例子:

final PublishSubject<Integer> subject = PublishSubject.create();

subject.subscribe(new Subscriber<Integer>() {
 @Override
 public void onCompleted() {

 }

 @Override
 public void onError(Throwable e) {

 }

 @Override
 public void onNext(Integer integer) {
  unSafeCount = unSafeCount + integer;
  Log.d("TAG", "onNext: " + unSafeCount);
 }
});

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
 @Override
 public void onClick(View v) {
  final int unit = 1;
  for(int i = 0;i < 10;i++) {
   new Thread(new Runnable() {
    @Override
    public void run() {
     for (int j = 0; j < 1000; j++) {
      subject.onNext(unit);
     }
    }
   }).start();
  }
 }
});

這是一個(gè)最典型的多線程問(wèn)題,從10個(gè)線程中發(fā)射數(shù)據(jù)并相加,這樣最終得到的答案是小于10000的。雖然使用了RxJava,但是這樣的使用對(duì)于并發(fā)是沒有意義的,因?yàn)镽xJava并沒有去處理并發(fā)帶來(lái)的問(wèn)題。我們可以看下subject的onNext方法的源碼,里面很簡(jiǎn)單,就是調(diào)用了對(duì)應(yīng)observer的onNext方法而已。不止是這樣,絕大多數(shù)的Subject都是線程不安全的,所以當(dāng)你在使用這樣的類的時(shí)候(典型場(chǎng)景就是自制的RxBus),如果從多個(gè)線程中發(fā)射數(shù)據(jù),那你就要小心了。

對(duì)于這樣的問(wèn)題,有兩種解決方案:

第一種就是簡(jiǎn)單的使用傳統(tǒng)的解決方法,比如用AtomicInteger代替int。

第二種則是使用RxJava的解決方案,在這里就是用SerializedSubject去代替Subject:

final PublishSubject<Integer> subject = PublishSubject.create();

subject.subscribe(new Subscriber<Integer>() {
 @Override
 public void onCompleted() {

 }

 @Override
 public void onError(Throwable e) {

 }

 @Override
 public void onNext(Integer integer) {
  unSafeCount = unSafeCount + integer;
  count.addAndGet(integer);

  Log.d("TAG", "onNext: " + count);
 }
});

final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject);

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
 @Override
 public void onClick(View v) {
  final int unit = 1;

  for(int i = 0;i < 10;i++){
   new Thread(new Runnable() {
    @Override
    public void run() {
     for(int j = 0;j < 1000;j++){
      ser.onNext(unit);
     }
    }
   }).start();
  }
 }
});

可以看一下SerializedSubject的onNext方法做了什么:

@Override
public void onNext(T t) {
 if (terminated) {
  return;
 }
 synchronized (this) {
  if (terminated) {
   return;
  }
  if (emitting) {
   FastList list = queue;
   if (list == null) {
    list = new FastList();
    queue = list;
   }
   list.add(nl.next(t));
   return;
  }
  emitting = true;
 }
 try {
  actual.onNext(t);
 } catch (Throwable e) {
  terminated = true;
  Exceptions.throwOrReport(e, actual, t);
  return;
 }
 for (;;) {
  for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
   FastList list;
   synchronized (this) {
    list = queue;
    if (list == null) {
     emitting = false;
     return;
    }
    queue = null;
   }
   for (Object o : list.array) {
    if (o == null) {
     break;
    }
    try {
     if (nl.accept(actual, o)) {
      terminated = true;
      return;
     }
    } catch (Throwable e) {
     terminated = true;
     Exceptions.throwIfFatal(e);
     actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
     return;
    }
   }
  }
 }
}

處理方式很簡(jiǎn)單,如果有其他線程在發(fā)射數(shù)據(jù),那就將數(shù)據(jù)放置到隊(duì)列中,等待下次發(fā)射。這保證了同一時(shí)間只會(huì)有一個(gè)線程調(diào)用onNext,onComplete和onError這些方法。

但是這樣操作顯然是會(huì)造成性能的影響的,所以RxJava并不會(huì)把所有的操作都打上線程安全的標(biāo)簽。

在這里就要引申出一個(gè)問(wèn)題,那就是使用者對(duì)create方法的濫用,其實(shí)這個(gè)方法不應(yīng)該被使用者頻繁的調(diào)用的,因?yàn)槟惚仨氁⌒牡奶幚硭械臄?shù)據(jù)發(fā)射,接收的邏輯。相反的,使用已有的操作符能很好的解決這個(gè)問(wèn)題,所以下次大家在遇到問(wèn)題的時(shí)候不要簡(jiǎn)單的使用create去自己寫,而是應(yīng)該想想有沒有現(xiàn)成的操作符可以完成相應(yīng)的需求。

RxJava中的一些操作符

RxJava中有一些操作符也和多線程并發(fā)有關(guān),下面讓我來(lái)講一講merge和concat,以及他們的一些變種操作符。

對(duì)于多線程發(fā)射數(shù)據(jù),有時(shí)候我們需要得到的結(jié)果也保持和發(fā)射時(shí)候一樣的順序,這個(gè)時(shí)候如果我們使用merge這個(gè)操作符去結(jié)合多個(gè)發(fā)射源,那么就會(huì)產(chǎn)生一定的問(wèn)題了(例子中做了非常不好的示范——使用了create操作符,請(qǐng)大家不要學(xué)習(xí)這樣的寫法,這里單純是為了求證結(jié)果)。

Observable o1 = Observable.create(new Observable.OnSubscribe<Integer>() {
 @Override
 public void call(final Subscriber<? super Integer> subscriber) {
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     Thread.sleep(1000);
     subscriber.onNext(1);
     subscriber.onCompleted();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }).start();
 }
});
Observable o2 = Observable.create(new Observable.OnSubscribe<Integer>() {
 @Override
 public void call(Subscriber<? super Integer> subscriber) {
  subscriber.onNext(2);
  subscriber.onCompleted();
 }
});

Observable.merge(o1,o2)
  .subscribe(new Subscriber<Integer>() {
   @Override
   public void onCompleted() {

   }

   @Override
   public void onError(Throwable e) {

   }

   @Override
   public void onNext(Integer i) {
    Log.d("TAG", "onNext: " + i);
   }
  });

對(duì)于這樣的場(chǎng)景,我們得到的答案將是2,1而不是先得到o1發(fā)射的數(shù)據(jù),再獲取o2的數(shù)據(jù)。

究其原因,就是因?yàn)閙erge其實(shí)就是給什么傳什么,也不會(huì)去管數(shù)據(jù)發(fā)射的順序:

@Override
public void onNext(Observable<? extends T> t) {
  if (t == null) {
    return;
  }
  if (t == Observable.empty()) {
    emitEmpty();
  } else
  if (t instanceof ScalarSynchronousObservable) {
    tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
  } else {
    InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
    addInner(inner);
    t.unsafeSubscribe(inner);
    emit();
  }
}

可以看到在經(jīng)過(guò)lift操作之后,對(duì)應(yīng)的中間人MergeSubscriber的onNext,沒有什么多余的代碼,所以在多個(gè)Observable從多線程中發(fā)射數(shù)據(jù)的時(shí)候,順序當(dāng)然不能得到保證。

一個(gè)單詞說(shuō)明這個(gè)問(wèn)題:interleaving——交錯(cuò)。merge后的數(shù)據(jù)源可能是交錯(cuò)的。由于merge有這樣數(shù)據(jù)交錯(cuò)的問(wèn)題,所以它的變種—flatMap也會(huì)有同樣的問(wèn)題。

對(duì)于這樣的場(chǎng)景,我們可以使用concat操作符來(lái)完成:

Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.

根據(jù)文檔,我們知道concat操作符是一個(gè)接一個(gè)的處理數(shù)據(jù)源的數(shù)據(jù)的。

if (wip.getAndIncrement() != 0) {
  return;
}

final int delayErrorMode = this.delayErrorMode;

for (;;) {
  if (actual.isUnsubscribed()) {
    return;
  }

  if (!active) {
    if (delayErrorMode == BOUNDARY) {
      if (error.get() != null) {
        Throwable ex = ExceptionsUtils.terminate(error);
        if (!ExceptionsUtils.isTerminated(ex)) {
          actual.onError(ex);
        }
        return;
      }
    }

    boolean mainDone = done;
    Object v = queue.poll();
    boolean empty = v == null;

    if (mainDone && empty) {
      Throwable ex = ExceptionsUtils.terminate(error);
      if (ex == null) {
        actual.onCompleted();
      } else
      if (!ExceptionsUtils.isTerminated(ex)) {
        actual.onError(ex);
      }
      return;
    }

    if (!empty) {

      Observable<? extends R> source;

      try {
        source = mapper.call(NotificationLite.<T>instance().getValue(v));
      } catch (Throwable mapperError) {
        Exceptions.throwIfFatal(mapperError);
        drainError(mapperError);
        return;
      }

      if (source == null) {
        drainError(new NullPointerException("The source returned by the mapper was null"));
        return;
      }

      if (source != Observable.empty()) {

        if (source instanceof ScalarSynchronousObservable) {
          ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;

          active = true;

          arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
        } else {
          ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
          inner.set(innerSubscriber);

          if (!innerSubscriber.isUnsubscribed()) {
            active = true;

            source.unsafeSubscribe(innerSubscriber);
          } else {
            return;
          }
        }
        request(1);
      } else {
        request(1);
        continue;
      }
    }
  }
  if (wip.decrementAndGet() == 0) {
    break;
  }
}

通過(guò)源碼我們可以知道,active字段就保證了如果上一個(gè)數(shù)據(jù)源還沒有發(fā)射完數(shù)據(jù),就會(huì)一直在for循環(huán)中等待,直到上一個(gè)數(shù)據(jù)源發(fā)射完了數(shù)據(jù)重置了active字段。

對(duì)于concat,其實(shí)還存在一個(gè)問(wèn)題,那就是多個(gè)Observable變成了串行,會(huì)大大的增加整個(gè)RxJava事件流的處理時(shí)間,對(duì)于這個(gè)場(chǎng)景,我們可以使用concatEager來(lái)解決。concatEager的源碼就不帶大家分析了,有興趣的同學(xué)可以自行查看。

總結(jié)

這篇文章比較短,講的東西也比較淺顯,其實(shí)就是討論了一下RxJava中多線程并發(fā)的幾個(gè)問(wèn)題。最后我想說(shuō),RxJava并不是什么高大上的東西,在你的項(xiàng)目引入之前,要考慮一下是否真的有必要這么做。就算真的有場(chǎng)景需要RxJava,也請(qǐng)不要一口氣把項(xiàng)目中所有的操作都換成RxJava,一些簡(jiǎn)單的操作不一定需要使用RxJava的操作符的實(shí)現(xiàn),用了反而降低了代碼的可讀性,切勿為了使用Rx而使用Rx。

好了,以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作能帶來(lái)一定的幫助,如果有疑問(wèn)大家可以留言交流。

相關(guān)文章

  • SpringMVC實(shí)現(xiàn)注解式權(quán)限驗(yàn)證的實(shí)例

    SpringMVC實(shí)現(xiàn)注解式權(quán)限驗(yàn)證的實(shí)例

    本篇文章主要介紹了SpringMVC實(shí)現(xiàn)注解式權(quán)限驗(yàn)證的實(shí)例,可以使用Spring MVC中的action攔截器來(lái)實(shí)現(xiàn),具有一定的參考價(jià)值,有興趣的可以了解下。
    2017-02-02
  • Java計(jì)算器核心算法代碼實(shí)現(xiàn)

    Java計(jì)算器核心算法代碼實(shí)現(xiàn)

    今天小編就為大家分享一篇關(guān)于Java計(jì)算器核心算法代碼實(shí)現(xiàn),小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧
    2019-01-01
  • 深入了解Java設(shè)計(jì)模式之策略模式

    深入了解Java設(shè)計(jì)模式之策略模式

    策略模式屬于Java-設(shè)計(jì)模式中行為模式之一,該模式定義了一系列算法,并將每個(gè)算法封裝起來(lái),使它們可以相互替換。本文將通過(guò)示例詳細(xì)講解這一模式,需要的可以參考一下
    2022-09-09
  • Maven配置倉(cāng)庫(kù)的方法步驟

    Maven配置倉(cāng)庫(kù)的方法步驟

    本文主要介紹了Maven配置倉(cāng)庫(kù)的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • SpringBoot中實(shí)現(xiàn)Druid前端監(jiān)控界面自動(dòng)登錄功能

    SpringBoot中實(shí)現(xiàn)Druid前端監(jiān)控界面自動(dòng)登錄功能

    這篇文章主要介紹了SpringBoot中實(shí)現(xiàn)Druid前端監(jiān)控界面自動(dòng)登錄功能,需要的朋友可以參考下
    2024-08-08
  • java使用FastJson解析Json數(shù)據(jù)

    java使用FastJson解析Json數(shù)據(jù)

    本篇文章主要介紹了java使用FastJson解析Json數(shù)據(jù),fastjson 是一個(gè)性能極好的用 Java 語(yǔ)言實(shí)現(xiàn)的 JSON 解析器和生成器,有興趣的可以了解一下。
    2017-02-02
  • 利用Java獲取文件名、類名、方法名和行號(hào)的方法小結(jié)

    利用Java獲取文件名、類名、方法名和行號(hào)的方法小結(jié)

    這篇文章運(yùn)用實(shí)例代碼給大家介紹了利用Java怎樣獲取文件名、類名、方法名和行號(hào),有需要的可以參考借鑒,下面一起來(lái)看看吧。
    2016-08-08
  • Spring?Boot中@Import三種使用方式實(shí)例詳解

    Spring?Boot中@Import三種使用方式實(shí)例詳解

    這篇文章主要介紹了Spring?Boot中@Import三種使用方式,主要有引入普通類,引入importSelector的實(shí)現(xiàn)類及引入importBeanDefinitionRegister的實(shí)現(xiàn)類,結(jié)合實(shí)例代碼給大家講解的非常詳細(xì),需要的朋友可以參考下
    2022-11-11
  • 如何實(shí)現(xiàn)Java中一個(gè)簡(jiǎn)單的LinkedList

    如何實(shí)現(xiàn)Java中一個(gè)簡(jiǎn)單的LinkedList

    LinkedList與ArrayList都是List接口的具體實(shí)現(xiàn)類。下面將介紹如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單的LinkedList,具有很好的參考價(jià)值,下面跟著小編一起來(lái)看下吧
    2017-02-02
  • Java權(quán)重隨機(jī)的實(shí)現(xiàn)方法

    Java權(quán)重隨機(jī)的實(shí)現(xiàn)方法

    這篇文章主要介紹了Java權(quán)重隨機(jī)的實(shí)現(xiàn)方法,實(shí)例分析了權(quán)重隨機(jī)算法的原理與完整實(shí)現(xiàn)方法,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-01-01

最新評(píng)論