RxJava (3)


Получаем данные из Room с помощью RxJava

Привет! Сегодня расскажу как читать данные из sqlite базы с помощью Room и RxJava

Перед началом добавьте необходимые зависимости в build.gradle:

// Room components
implementation "android.arch.persistence.room:runtime:$rootProject.roomVersion"
implementation "android.arch.persistence.room:rxjava2:$rootProject.roomVersion"
annotationProcessor "android.arch.persistence.room:compiler:$rootProject.roomVersion"
androidTestImplementation "android.arch.persistence.room:testing:$rootProject.roomVersion"

// Lifecycle components
implementation "android.arch.lifecycle:extensions:$rootProject.archLifecycleVersion"
implementation "android.arch.lifecycle:reactivestreams:$rootProject.archLifecycleVersion"
annotationProcessor "android.arch.lifecycle:compiler:$rootProject.archLifecycleVersion"

// Rx
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.1'

Теперь можем взять базу из assets и определить класс Dao для данных:

@Database(entities = {Verse.class}, version = 1, exportSchema = false)
public abstract class MyRoomDatabase extends RoomDatabase {
    public abstract MyDataDao myDataDao();

    private static MyRoomDatabase INSTANCE;

    static MyRoomDatabase getDatabase(final Context context) {
        if (INSTANCE == null) {
            synchronized (MyRoomDatabase.class) {
                if (INSTANCE == null) {
                    INSTANCE = Room.databaseBuilder(context.getApplicationContext(),
                            MyRoomDatabase.class, "data.sqlite3") // get db from assets
                            .openHelperFactory(new AssetSQLiteOpenHelperFactory())
                            .build();

                }
            }
        }
        return INSTANCE;
    }

    public void destroyInstance() {
        synchronized (MyRoomDatabase.class) {
            INSTANCE = null;
        }
    }
}

@Dao
public interface MyDataDao {

    @Query("SELECT * from verses ORDER BY RANDOM() LIMIT 1")
    LiveData<Verse> getRandomVerse();

    @Query("SELECT * from verses ORDER BY RANDOM() LIMIT 1")
        // same request
    Flowable<Verse> getRxRandomVerse();
}

Мне нравится как Room позволяет удобно получать Flowable и LiveData прямо из коробки.
Все что нужно — это предоставить запрос в аннотации.
Далее мы объявляем класс репозитория следующим образом:

class MyRepository(context: Context?) {

    private val mDataDao: MyDataDao
    private val db: MyRoomDatabase? = MyRoomDatabase.getDatabase(context)

    init {
        mDataDao = db!!.myDataDao()
    }

    fun getRxRandomVerse(): Flowable<Verse> {
        return mDataDao.getRxRandomVerse
    }

    fun getDb(context: Context?): MyRoomDatabase? {
        return db ?: MyRoomDatabase.getDatabase(context)
    }

    fun closeDb() {
        db?.close()
    }
}

Наконец, получаем данные в презентере и активити:

class MainActivityPresenter {

    private var mRepository: MyRepository? = null

    fun attach(mainActivityView: MainActivityView, context: Context) {
        this.mainActivityView = mainActivityView
        this.mRepository = MyRepository(context)
    }

    fun getRandomVerse() {
        mainActivityView!!.showProgressBar()
        mRepository.getRxRandomVerse()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ verse ->
                    mainActivityView!!.hideProgressBar()
                    updateUI(verse?.toString())
                }, { error ->
                    mainActivityView!!.hideProgressBar()
                    error.printStackTrace()
                })
    }
}

class MainActivity : MainActivityPresenter.MainActivityView {

    ...

    override fun updateUI(verse: String) {
        textViewVerse.text = verse
    }

    ...
}

Вот и все в общем-то. Достаточно просто и удобно. Работу этого кода в действии вы можете в моем новом приложении генератора случайных цитат священного писания.
Если у Вас есть вопросы и комментарии, пишите.




RxJava. ConcatMap для зависимых Observable

Расскажу сегодня, как оператор concatMap помогает преобразовать имеющуюся последовательную зависимую синхронную цепочку методов в реактивную и многопоточную.
Предположим, мы запрашиваем булево значение, и выполняем по результату какое-то UI событие:

  ...
  boolean state = mPresenter.getBooleanState(context)
  if(state) {
      showViewA();
    } else {
      showViewB();
  }

Класс, предоставляющий данные, пусть это будет презентер, имеет зависимую структуру методов, выполнение второго зависит от результата выполнения первого:

public class MyPresenter<MyView> {

  ...

  private int getIntValue(Context context) {
      int retValue = someCalculationMethod(context);
      return retValue;
  }
  public boolean getBooleanState(Context context) {
      int intValue = getIntValue(context);
      return performSomeCalculationWith(intValue);
  }
}

Если метод someCalculationMethod выполняется моментально, то мы можем выполнить его в главном потоке. Если же это, например, запрос к серверу, и на нем можно ожидать задержку по времени, то следует вывести его в отдельный фоновый поток.
С помощью Rx это можно легко сделать, предварительно преобразовав методы getIntValue и getBooleanState в Observable:

  private Observable<Integer> getIntValue(Context context) {
      return Observable.fromCallable(() -> {
         int retValue = someCalculationMethod(context);
         return retValue;
      });
  }

  private Observable<Boolean> getBooleanState(Context context, Integer intValue) {
     return Observable.fromCallable(() -> {
        boolean retValue = performSomeCalculationWith(intValue);
        return retValue;
     });
  }

Как же получить Observable метода getBooleanState, не имея результатов getIntValue? Вся магия  по объединению двух Observable реализуется с помощью оператора concatMap

  private Observable<Boolean> rxGetBooleanState(Context context) {
      Observable integerObservable = getIntValue(context)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

      Observable retObservable = integerObservable
        .concatMap(intValue -> getBooleanState(context, intValue));
      return retObservable;
  }

ConcatMap работает аналогично flatMap. Важная отличительная особенность — сохранение последовательности элементов.

Обратимся к документации

Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then emitting the items that result from concatenating those resulting Observables.

Посмотрим для наглядности на схемы работы оператора concatMap:

concatMap

concatMap scheme

concatMap создает одну цепочку, и выполняет на элементах исходного Observable заданную функцию (тоже Observable), сохраняя порядок элементов. Итоговый Observable содержит преобразованные элементы в соответствующей последовательности.

Теперь нам остается только вызвать наш метод из требуемого контекста:

Disposable disposable = mPresenter.rxGetBooleanState(this)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { state ->
            if (required) {
                showViewA();
            } else {
               showViewB();
            }
        }

Update
На самом деле, наиболее простой способ выполнить два зависимых Observable — это оператор .flatMap, как описано здесь: https://github.com/ReactiveX/RxJava/issues/442


getIntValue(context).flatMap( intValue -> {
   return getBooleanState(context, intValue)
})



RxJava. Использование оператора share

Сегодня расскажу про удобный реактивный подход в обработке ввода текста в EditText. В качестве примера возьмем поисковую строку в SearchView.

Задача

Поиск обычно занимает какое-то время. Неважно, ходим мы за данными на сервер, или читаем локальную базу. Пользователь может вводить символы в поисковую строку гораздо быстрее, чем будут отрабатывать такие запросы. Поэтому нам нужно установить какое-то пороговое значение временной задержки, чаще которой не вызывать поиск. Помимо этого, нам может потребоваться отображать в UI изменения в режиме реального времени, без задержек.

Решение

Без использования Rx в Android этого можно добиться с помощью обычного Runnable и Handler, что не очень удобно и громоздко. Благодаря библиотеке RxBinding мы можем применить известный оператор debounce и получить изящное решение в виде:

RxSearchView.queryTextChanges(searchView) 
 .debounce(500, TimeUnit.MILLISECONDS) // задержка в 500 мс 
 .subscribe(query -> mPresenter.searchRequest(query));

Теперь, независимо от скорости ввода текста, запросы на сервер будут уходить не чаще двух раз в секунду.
Однако, что делать, если нам потребуется делать обновления в UI при каждом вводе нового символа? Например, менять подсказку. При текущей реализации эти обновления UI будут происходить с той же задержкой в 500 мс, что совсем не user-friendly.

На наше счастье, в Rx есть возможность широковещательного уведомления нескольких подписчиков. Для этого нужно вызвать оператор .share() на нашем Observable и вуаля!

Observable<String> sharedTextChanges = RxSearchView.queryTextChages(searchViw).share()
 
sharedTextChanges 
 .debounce(500, TimeUnit.MILLISECONDS) // use debounce 
 .subscribe(query -> mPresenter.searchRequest(query)); 
 
sharedTextChanges 
 .subscribe(query -> mPresenter.updateUI(query));

Мы два раза подписались на Observable: с debounce и без него. Теперь UI обновляется в режиме реального времени, при каждом вводе символа, а запросы на сервер уходят не чаще раза в 500 мс.

Что под капотом?

Оператор .share() — это обертка на цепью операторов .publish().refcount(). Они позволяет “расшарить” испускаемые потоком объекты. Рассмотрим эти операторы подробнее.

Оператор .publish( ) — превращает Observable в ConnectableObservable.

rxjava publish operator scheme

“ConnectedObservable” — это такой Observable, который не выпускает данные, пока на нем не вызовут оператор .connect(). Таким образом, мы можем дождаться, пока все Subscriber-ы не подпишутся на Observable, и только затем испускать данные.

Оператор .refcount() берет на себя функции по управлению несколькими подписчиками. Согласно документации,

Returns an Observable that stays connected to this ConnectableObservable as long as there is at least one subscription to this ConnectableObservable.

RxJava operator refcount

refcount() — держит в памяти количество подписчиков на результирующий Observable и не отключается от источника ConnectedObservable пока все не отпишутся.

 

Вот и все. Очень эффективно и элегантно, как всегда с rx. Надеюсь, этот пост оказался полезным для вас!