From e6190683dd39b818e0f15f4d1684f68a291f5758 Mon Sep 17 00:00:00 2001 From: len Date: Sun, 10 Jul 2016 12:14:30 +0200 Subject: [PATCH] Observable calls can now be retried, previously all retries were failing --- .../data/network/OkHttpExtensions.kt | 47 ++++++++++++++----- .../ui/reader/ReaderSettingsDialog.kt | 4 +- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/network/OkHttpExtensions.kt b/app/src/main/java/eu/kanade/tachiyomi/data/network/OkHttpExtensions.kt index 32b39d912..3e8ab812b 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/network/OkHttpExtensions.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/network/OkHttpExtensions.kt @@ -5,24 +5,49 @@ import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.Response import rx.Observable -import rx.subscriptions.Subscriptions -import java.io.IOException +import rx.Producer +import rx.Subscription +import java.util.concurrent.atomic.AtomicBoolean fun Call.asObservable(): Observable { return Observable.create { subscriber -> - subscriber.add(Subscriptions.create { cancel() }) + // Since Call is a one-shot type, clone it for each new subscriber. + val call = if (!isExecuted) this else { + // TODO use clone method in OkHttp 3.5 + val field = javaClass.getDeclaredField("client").apply { isAccessible = true } + val client = field.get(this) as OkHttpClient + client.newCall(request()) + } - try { - val response = execute() - if (!subscriber.isUnsubscribed) { - subscriber.onNext(response) - subscriber.onCompleted() + // Wrap the call in a helper which handles both unsubscription and backpressure. + val requestArbiter = object : AtomicBoolean(), Producer, Subscription { + override fun request(n: Long) { + if (n == 0L || !compareAndSet(false, true)) return + + try { + val response = call.execute() + if (!subscriber.isUnsubscribed) { + subscriber.onNext(response) + subscriber.onCompleted() + } + } catch (error: Exception) { + if (!subscriber.isUnsubscribed) { + subscriber.onError(error) + } + } } - } catch (error: IOException) { - if (!subscriber.isUnsubscribed) { - subscriber.onError(error) + + override fun unsubscribe() { + call.cancel() + } + + override fun isUnsubscribed(): Boolean { + return call.isCanceled } } + + subscriber.add(requestArbiter) + subscriber.setProducer(requestArbiter) } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/ReaderSettingsDialog.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/ReaderSettingsDialog.kt index 8e96f31a2..75f78806b 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/ReaderSettingsDialog.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/ReaderSettingsDialog.kt @@ -39,8 +39,7 @@ class ReaderSettingsDialog : DialogFragment() { override fun onViewCreated(view: View, savedState: Bundle?) = with(view) { viewer.onItemSelectedListener = IgnoreFirstSpinnerListener { position -> - subscriptions += Observable.timer(250, MILLISECONDS) - .observeOn(AndroidSchedulers.mainThread()) + subscriptions += Observable.timer(250, MILLISECONDS, AndroidSchedulers.mainThread()) .subscribe { (activity as ReaderActivity).presenter.updateMangaViewer(position) activity.recreate() @@ -50,7 +49,6 @@ class ReaderSettingsDialog : DialogFragment() { rotation_mode.onItemSelectedListener = IgnoreFirstSpinnerListener { position -> subscriptions += Observable.timer(250, MILLISECONDS) - .observeOn(AndroidSchedulers.mainThread()) .subscribe { preferences.rotation().set(position + 1) }