RxJava. ConcatMap для зависимых Observable

Расскажу сегодня, как оператор concatMap помогает преобразовать имеющуюся последовательную зависимую синхронную цепочку методов в реактивную и многопоточную.
Предположим, мы запрашиваем булево значение, и выполняем по результату какое-то UI событие:

  ...
  boolean state = mPresenter.getBooleanState(context)
  if(state) {
      showViewA();
    } else {
      showViewB();
  }

Класс, предоставляющий данные, пусть это будет презентер, имеет зависимую структуру методов, выполнение второго зависит от результата выполнения первого:

public class MyPresenter<MyView> {

  ...

  private int getIntValue(Context context) {
      int retValue = someCalculationMethod(context);
      return retValue;
  }
  public boolean getBooleanState(Context context) {
      int intValue = getIntValue(context);
      return performSomeCalculationWith(intValue);
  }
}

Если метод someCalculationMethod выполняется моментально, то мы можем выполнить его в главном потоке. Если же это, например, запрос к серверу, и на нем можно ожидать задержку по времени, то следует вывести его в отдельный фоновый поток.
С помощью Rx это можно легко сделать, предварительно преобразовав методы getIntValue и getBooleanState в Observable:

  private Observable<Integer> getIntValue(Context context) {
      return Observable.fromCallable(() -> {
         int retValue = someCalculationMethod(context);
         return retValue;
      });
  }

  private Observable<Boolean> getBooleanState(Context context, Integer intValue) {
     return Observable.fromCallable(() -> {
        boolean retValue = performSomeCalculationWith(intValue);
        return retValue;
     });
  }

Как же получить Observable метода getBooleanState, не имея результатов getIntValue? Вся магия  по объединению двух Observable реализуется с помощью оператора concatMap

  private Observable<Boolean> rxGetBooleanState(Context context) {
      Observable integerObservable = getIntValue(context)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

      Observable retObservable = integerObservable
        .concatMap(intValue -> getBooleanState(context, intValue));
      return retObservable;
  }

ConcatMap работает аналогично flatMap. Важная отличительная особенность — сохранение последовательности элементов.

Обратимся к документации

Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then emitting the items that result from concatenating those resulting Observables.

Посмотрим для наглядности на схемы работы оператора concatMap:

concatMap

concatMap scheme

concatMap создает одну цепочку, и выполняет на элементах исходного Observable заданную функцию (тоже Observable), сохраняя порядок элементов. Итоговый Observable содержит преобразованные элементы в соответствующей последовательности.

Теперь нам остается только вызвать наш метод из требуемого контекста:

Disposable disposable = mPresenter.rxGetBooleanState(this)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { state ->
            if (required) {
                showViewA();
            } else {
               showViewB();
            }
        }

Update
На самом деле, наиболее простой способ выполнить два зависимых Observable — это оператор .flatMap, как описано здесь: https://github.com/ReactiveX/RxJava/issues/442


getIntValue(context).flatMap( intValue -> {
   return getBooleanState(context, intValue)
})



1 комментарий

Добавить комментарий