RxJava. Обработка ошибок от нескольких Observable

Предположим, нам надо выполнить параллельно несколько запросов на сервер с помощью, RxJava и Retrofit.
Скорее всего вы будете использовать такие объединяющие операторы: как combineLatest или zip. В итоге у вас получится
примерно такая цепочка:

    val firstObservable = ServerApi().firstRequest( requestParams1 )
    val secondObservable = ServerApi().secondRequest( requestParams2 )
    val thirdObservable = ServerApi().thirdRequest( requestParams3 )

    // объединяем Observable-ы в один с помощью combineLatest
    Observable.combineLatest(
        firstObservable,
        secondObservable,
        thirdObservable,
        Function3<FirstResponse, SecondResponse, ThirdResponse, CombinedResult> { firstResponse, secondResponse, thirdResponse ->
            // объединяем и создаем результат для Function3
            CombinedResult( firstResponse, secondResponse, thirdResponse )
        }
    )
    .flatMap { combinedResult ->
        // здесь выполняем необходимые операции в случае необходимости
        Observable.just(combinedResult) // return original observable unchanged
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ combinedResult ->
        // end of request
        // show result
    }, {
       it.printStackTrace()
    })

В этой цепочке есть проблема. Если один из Observable-ов выбросит HttpException или любой другой Throwable, то вся цепочка прекратит выполнение в onError, и мы потеряем успешные данные от других запросов.
Как избежать этого? Все что нам нужно — это корректно обрабатывать ошибки от каждого из Observable-ов.
Для этого есть удобный оператор onErrorReturn() — он возвращает новый Observable в случае ошибки, без прекращения выполнения цепочки.
добавьте его вызов после каждого параметра (эммитера) в операторе combineLatest следующим образом:

    Observable.combineLatest(
        firstObservable
            .onErrorReturn { responseFromException(it) },
        secondObservable
            .onErrorReturn { responseFromException(it) },
    ....

onErrorReturn — это удобный оператор, который эмитит новый объект вместо того, чтобы ломать цепочку в случае возникновения Exception-а

Для создания новых объектов, которые мы будем эмитить, напишите специальный метод или фабрику на дженериках, подходящую под ваши бизнес-требования:

    ...
    // FirstResponse, SecondResponse and ThirdResponse should inherit from BaseResponse
    fun <T : Any> responseFromException(throwable: Throwable): BaseResponse<T> {
        var errorResponse: BaseResponse<T> = BaseResponse()
        if (throwable is HttpException) {
            errorResponse.code = it.code()
            errorResponse.message = it.message()
        }
      return errorResponse
     }

Вот и все. С помощью одной строчки для каждого из Observable-ов мы сделали нашу цепочку устойчивой к ошибкам.




Комментариев нет


You can leave the first : )



Добавить комментарий