diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadService.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadService.kt index 06420659b..c7056370e 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadService.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadService.kt @@ -8,11 +8,9 @@ import android.os.IBinder import android.os.PowerManager import androidx.annotation.StringRes import androidx.core.content.ContextCompat -import com.jakewharton.rxrelay.BehaviorRelay import eu.kanade.domain.download.service.DownloadPreferences import eu.kanade.tachiyomi.R import eu.kanade.tachiyomi.data.notification.Notifications -import eu.kanade.tachiyomi.util.lang.plusAssign import eu.kanade.tachiyomi.util.lang.withUIContext import eu.kanade.tachiyomi.util.system.acquireWakeLock import eu.kanade.tachiyomi.util.system.isConnectedToWifi @@ -32,7 +30,6 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import logcat.LogPriority import ru.beryukhov.reactivenetwork.ReactiveNetwork -import rx.subscriptions.CompositeSubscription import uy.kohesive.injekt.injectLazy /** @@ -44,11 +41,6 @@ class DownloadService : Service() { companion object { - /** - * Relay used to know when the service is running. - */ - val runningRelay: BehaviorRelay = BehaviorRelay.create(false) - private val _isRunning = MutableStateFlow(false) val isRunning = _isRunning.asStateFlow() @@ -83,7 +75,6 @@ class DownloadService : Service() { } private val downloadManager: DownloadManager by injectLazy() - private val downloadPreferences: DownloadPreferences by injectLazy() /** @@ -91,62 +82,58 @@ class DownloadService : Service() { */ private lateinit var wakeLock: PowerManager.WakeLock - private lateinit var subscriptions: CompositeSubscription private lateinit var ioScope: CoroutineScope - /** - * Called when the service is created. - */ override fun onCreate() { super.onCreate() ioScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) startForeground(Notifications.ID_DOWNLOAD_CHAPTER_PROGRESS, getPlaceholderNotification()) wakeLock = acquireWakeLock(javaClass.name) - runningRelay.call(true) _isRunning.value = true - subscriptions = CompositeSubscription() listenDownloaderState() listenNetworkChanges() } - /** - * Called when the service is destroyed. - */ override fun onDestroy() { ioScope?.cancel() - runningRelay.call(false) _isRunning.value = false - subscriptions.unsubscribe() downloadManager.stopDownloads() wakeLock.releaseIfNeeded() super.onDestroy() } - /** - * Not used. - */ + // Not used override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { return START_NOT_STICKY } - /** - * Not used. - */ + // Not used override fun onBind(intent: Intent): IBinder? { return null } + private fun stopDownloads(@StringRes string: Int) { + downloadManager.stopDownloads(getString(string)) + } + /** * Listens to network changes. - * - * @see onNetworkStateChanged */ private fun listenNetworkChanges() { ReactiveNetwork() .observeNetworkConnectivity(applicationContext) .onEach { withUIContext { - onNetworkStateChanged() + if (isOnline()) { + if (downloadPreferences.downloadOnlyOverWifi().get() && !isConnectedToWifi()) { + stopDownloads(R.string.download_notifier_text_only_wifi) + } else { + val started = downloadManager.startDownloads() + if (!started) stopSelf() + } + } else { + stopDownloads(R.string.download_notifier_no_network) + } } } .catch { error -> @@ -159,41 +146,20 @@ class DownloadService : Service() { .launchIn(ioScope) } - /** - * Called when the network state changes. - */ - private fun onNetworkStateChanged() { - if (isOnline()) { - if (downloadPreferences.downloadOnlyOverWifi().get() && !isConnectedToWifi()) { - stopDownloads(R.string.download_notifier_text_only_wifi) - } else { - val started = downloadManager.startDownloads() - if (!started) stopSelf() - } - } else { - stopDownloads(R.string.download_notifier_no_network) - } - } - - private fun stopDownloads(@StringRes string: Int) { - downloadManager.stopDownloads(getString(string)) - } - /** * Listens to downloader status. Enables or disables the wake lock depending on the status. */ private fun listenDownloaderState() { - subscriptions += downloadManager.runningRelay - .doOnError { - /* Swallow wakelock error */ - } - .subscribe { running -> - if (running) { + _isRunning + .onEach { isRunning -> + if (isRunning) { wakeLock.acquireIfNeeded() } else { wakeLock.releaseIfNeeded() } } + .catch { /* Ignore errors */ } + .launchIn(ioScope) } /** diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt index 06c3587eb..137e7dfd0 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt @@ -75,13 +75,14 @@ class DownloadQueue( Observable.from(this).filter { download -> download.status == Download.State.DOWNLOADING } @Deprecated("Use getStatusAsFlow instead") - fun getStatusObservable(): Observable = statusSubject + private fun getStatusObservable(): Observable = statusSubject .startWith(getActiveDownloads()) .onBackpressureBuffer() fun getStatusAsFlow(): Flow = getStatusObservable().asFlow() - fun getUpdatedObservable(): Observable> = updatedRelay.onBackpressureBuffer() + @Deprecated("Use getUpdatedAsFlow instead") + private fun getUpdatedObservable(): Observable> = updatedRelay.onBackpressureBuffer() .startWith(Unit) .map { this } @@ -94,7 +95,7 @@ class DownloadQueue( } @Deprecated("Use getProgressAsFlow instead") - fun getProgressObservable(): Observable { + private fun getProgressObservable(): Observable { return statusSubject.onBackpressureBuffer() .startWith(getActiveDownloads()) .flatMap { download -> @@ -113,9 +114,7 @@ class DownloadQueue( .filter { it.status == Download.State.DOWNLOADING } } - fun getProgressAsFlow(): Flow { - return getProgressObservable().asFlow() - } + fun getProgressAsFlow(): Flow = getProgressObservable().asFlow() private fun setPagesSubject(pages: List?, subject: PublishSubject?) { pages?.forEach { it.setStatusSubject(subject) } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt index 078e756e6..6521f1ed1 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt @@ -11,7 +11,6 @@ import eu.kanade.tachiyomi.util.system.logcat import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.catch -import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/library/LibraryPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/library/LibraryPresenter.kt index 0451d55b0..f6974ac3d 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/library/LibraryPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/library/LibraryPresenter.kt @@ -46,7 +46,6 @@ import eu.kanade.tachiyomi.source.SourceManager import eu.kanade.tachiyomi.source.model.SManga import eu.kanade.tachiyomi.source.online.HttpSource import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter -import eu.kanade.tachiyomi.util.lang.combineLatest import eu.kanade.tachiyomi.util.lang.launchIO import eu.kanade.tachiyomi.util.lang.launchNonCancellable import eu.kanade.tachiyomi.util.removeCovers @@ -687,6 +686,10 @@ class LibraryPresenter( state.selection = items.filterNot { it in selection } } + private fun Observable.combineLatest(o2: Observable, combineFn: (T, U) -> R): Observable { + return Observable.combineLatest(this, o2, combineFn) + } + sealed class Dialog { data class ChangeCategory(val manga: List, val initialSelection: List>) : Dialog() data class DeleteManga(val manga: List) : Dialog() diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt index 8ea2ed57a..a40b5e9e7 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt @@ -9,17 +9,14 @@ import eu.kanade.tachiyomi.util.lang.launchIO import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow -import rx.Observable -import rx.Subscription -import rx.android.schedulers.AndroidSchedulers -import rx.subscriptions.CompositeSubscription +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.combine import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get class MorePresenter( private val downloadManager: DownloadManager = Injekt.get(), preferences: BasePreferences = Injekt.get(), - ) : BasePresenter() { val downloadedOnly = preferences.downloadedOnly().asState() @@ -28,58 +25,26 @@ class MorePresenter( private var _state: MutableStateFlow = MutableStateFlow(DownloadQueueState.Stopped) val downloadQueueState: StateFlow = _state.asStateFlow() - private var isDownloading: Boolean = false - private var downloadQueueSize: Int = 0 - private var untilDestroySubscriptions = CompositeSubscription() - override fun onCreate(savedState: Bundle?) { super.onCreate(savedState) - if (untilDestroySubscriptions.isUnsubscribed) { - untilDestroySubscriptions = CompositeSubscription() - } - - initDownloadQueueSummary() - } - - override fun onDestroy() { - super.onDestroy() - untilDestroySubscriptions.unsubscribe() - } - - private fun initDownloadQueueSummary() { - // Handle running/paused status change - DownloadService.runningRelay - .observeOn(AndroidSchedulers.mainThread()) - .subscribeUntilDestroy { isRunning -> - isDownloading = isRunning - updateDownloadQueueState() - } - - // Handle queue progress updating - downloadManager.queue.getUpdatedObservable() - .observeOn(AndroidSchedulers.mainThread()) - .subscribeUntilDestroy { - downloadQueueSize = it.size - updateDownloadQueueState() - } - } - - private fun updateDownloadQueueState() { + // Handle running/paused status change and queue progress updating presenterScope.launchIO { - val pendingDownloadExists = downloadQueueSize != 0 - _state.value = when { - !pendingDownloadExists -> DownloadQueueState.Stopped - !isDownloading && !pendingDownloadExists -> DownloadQueueState.Paused(0) - !isDownloading && pendingDownloadExists -> DownloadQueueState.Paused(downloadQueueSize) - else -> DownloadQueueState.Downloading(downloadQueueSize) - } + combine( + DownloadService.isRunning, + downloadManager.queue.getUpdatedAsFlow(), + ) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) } + .collectLatest { (isDownloading, downloadQueueSize) -> + val pendingDownloadExists = downloadQueueSize != 0 + _state.value = when { + !pendingDownloadExists -> DownloadQueueState.Stopped + !isDownloading && !pendingDownloadExists -> DownloadQueueState.Paused(0) + !isDownloading && pendingDownloadExists -> DownloadQueueState.Paused(downloadQueueSize) + else -> DownloadQueueState.Downloading(downloadQueueSize) + } + } } } - - private fun Observable.subscribeUntilDestroy(onNext: (T) -> Unit): Subscription { - return subscribe(onNext).also { untilDestroySubscriptions.add(it) } - } } sealed class DownloadQueueState { diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt index ccbd5dfe4..46f131c93 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt @@ -33,7 +33,6 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.distinctUntilChanged -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch import logcat.LogPriority diff --git a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxExtensions.kt b/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxExtensions.kt index e46cc909a..dc162bf2c 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxExtensions.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxExtensions.kt @@ -1,11 +1,6 @@ package eu.kanade.tachiyomi.util.lang -import rx.Observable import rx.Subscription import rx.subscriptions.CompositeSubscription operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription) - -fun Observable.combineLatest(o2: Observable, combineFn: (T, U) -> R): Observable { - return Observable.combineLatest(this, o2, combineFn) -}