RxJava. Использование оператора share

Сегодня расскажу про удобный реактивный подход в обработке ввода текста в EditText. В качестве примера возьмем поисковую строку в SearchView.

Задача

Поиск обычно занимает какое-то время. Неважно, ходим мы за данными на сервер, или читаем локальную базу. Пользователь может вводить символы в поисковую строку гораздо быстрее, чем будут отрабатывать такие запросы. Поэтому нам нужно установить какое-то пороговое значение временной задержки, чаще которой не вызывать поиск. Помимо этого, нам может потребоваться отображать в UI изменения в режиме реального времени, без задержек.

Решение

Без использования Rx в Android этого можно добиться с помощью обычного Runnable и Handler, что не очень удобно и громоздко. Благодаря библиотеке RxBinding мы можем применить известный оператор debounce и получить изящное решение в виде:

RxSearchView.queryTextChanges(searchView) 
 .debounce(500, TimeUnit.MILLISECONDS) // задержка в 500 мс 
 .subscribe(query -> mPresenter.searchRequest(query));

Теперь, независимо от скорости ввода текста, запросы на сервер будут уходить не чаще двух раз в секунду.
Однако, что делать, если нам потребуется делать обновления в UI при каждом вводе нового символа? Например, менять подсказку. При текущей реализации эти обновления UI будут происходить с той же задержкой в 500 мс, что совсем не user-friendly.

На наше счастье, в Rx есть возможность широковещательного уведомления нескольких подписчиков. Для этого нужно вызвать оператор .share() на нашем Observable и вуаля!

Observable<String> sharedTextChanges = RxSearchView.queryTextChages(searchViw).share()
 
sharedTextChanges 
 .debounce(500, TimeUnit.MILLISECONDS) // use debounce 
 .subscribe(query -> mPresenter.searchRequest(query)); 
 
sharedTextChanges 
 .subscribe(query -> mPresenter.updateUI(query));

Мы два раза подписались на Observable: с debounce и без него. Теперь UI обновляется в режиме реального времени, при каждом вводе символа, а запросы на сервер уходят не чаще раза в 500 мс.

Что под капотом?

Оператор .share() — это обертка на цепью операторов .publish().refcount(). Они позволяет “расшарить” испускаемые потоком объекты. Рассмотрим эти операторы подробнее.

Оператор .publish( ) — превращает Observable в ConnectableObservable.

rxjava publish operator scheme

“ConnectedObservable” — это такой Observable, который не выпускает данные, пока на нем не вызовут оператор .connect(). Таким образом, мы можем дождаться, пока все Subscriber-ы не подпишутся на Observable, и только затем испускать данные.

Оператор .refcount() берет на себя функции по управлению несколькими подписчиками. Согласно документации,

Returns an Observable that stays connected to this ConnectableObservable as long as there is at least one subscription to this ConnectableObservable.

RxJava operator refcount

refcount() — держит в памяти количество подписчиков на результирующий Observable и не отключается от источника ConnectedObservable пока все не отпишутся.

 

Вот и все. Очень эффективно и элегантно, как всегда с rx. Надеюсь, этот пост оказался полезным для вас!




Комментариев нет


You can leave the first : )



Добавить комментарий