From bd2cb97179de60dded147f1ec9cdb55f70f28e74 Mon Sep 17 00:00:00 2001 From: Two-Ai <81279822+Two-Ai@users.noreply.github.com> Date: Tue, 7 Feb 2023 22:13:19 -0500 Subject: [PATCH] Replace RxJava in DownloadQueue (#9016) * Misc cleanup - Replace !List.isEmpty with List.isNotEmpty - Remove redundant case in MoreScreenModel - Drop no-op StateFlow.catch - From lint warning: > SharedFlow never completes, so this operator typically has not > effect, it can only catch exceptions from 'onSubscribe' operator * Convert DownloadQueue queue to MutableStateFlow Replace delegation to a MutableList with an internal MutableStateFlow. In order to avoid modifying every usage of the queue as a list, add passthrough functions for the currently used list functions. This should be later refactored, possibly by inlining DownloadQueue into Downloader. DownloadQueue.updates was a SharedFlow which updated every time a change was made to the queue. This is now equivalent to the queue StateFlow. Simultaneous assignments to _state.value could cause concurrency issues. To avoid this, always modify the queue using _state.update. * Add Download.statusFlow/progressFlow progressFlow is based on the DownloadQueueScreenModel implementation rather than the DownloadQueue implementation. * Reimplement DownloadQueue.statusFlow/progressFlow Use StateFlow>.flatMapLatest() and List>.merge() to replicate the effect of PublishSubject. Use drop(1) to avoid re-emitting the state of each download each time the merged flow is recreated. * fixup! Reimplement DownloadQueue.statusFlow/progressFlow --- .../tachiyomi/data/download/Downloader.kt | 2 +- .../tachiyomi/data/download/model/Download.kt | 36 +++- .../data/download/model/DownloadQueue.kt | 154 +++++++----------- .../ui/download/DownloadQueueScreenModel.kt | 6 +- .../eu/kanade/tachiyomi/ui/more/MoreTab.kt | 5 +- .../eu/kanade/tachiyomi/source/model/Page.kt | 5 - 6 files changed, 92 insertions(+), 116 deletions(-) diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt index 9f350b7dd..887e62ef8 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt @@ -148,7 +148,7 @@ class Downloader( return } - if (notifier.paused && !queue.isEmpty()) { + if (notifier.paused && queue.isNotEmpty()) { notifier.onPaused() } else { notifier.onComplete() diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt index 38ddacddb..a66ff1103 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt @@ -5,7 +5,14 @@ import eu.kanade.domain.manga.interactor.GetManga import eu.kanade.tachiyomi.source.SourceManager import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.source.online.HttpSource -import rx.subjects.PublishSubject +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow import tachiyomi.domain.chapter.model.Chapter import tachiyomi.domain.manga.model.Manga import uy.kohesive.injekt.Injekt @@ -25,20 +32,31 @@ data class Download( @Transient var downloadedImages: Int = 0 - @Volatile @Transient - var status: State = State.NOT_DOWNLOADED + private val _statusFlow = MutableStateFlow(State.NOT_DOWNLOADED) + + @Transient + val statusFlow = _statusFlow.asStateFlow() + var status: State + get() = _statusFlow.value set(status) { - field = status - statusSubject?.onNext(this) - statusCallback?.invoke(this) + _statusFlow.value = status } @Transient - var statusSubject: PublishSubject? = null + val progressFlow = flow { + if (pages == null) { + emit(0) + while (pages == null) { + delay(50) + } + } - @Transient - var statusCallback: ((Download) -> Unit)? = null + val progressFlows = pages!!.map(Page::progressFlow) + emitAll(combine(progressFlows) { it.average().toInt() }) + } + .distinctUntilChanged() + .debounce(50) val progress: Int get() { diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt index f3d79c63a..f2877fa7e 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt @@ -1,69 +1,48 @@ package eu.kanade.tachiyomi.data.download.model -import eu.kanade.core.util.asFlow import eu.kanade.tachiyomi.data.download.DownloadStore -import eu.kanade.tachiyomi.source.model.Page -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.drop +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onStart -import kotlinx.coroutines.flow.receiveAsFlow -import kotlinx.coroutines.flow.shareIn -import rx.Observable -import rx.subjects.PublishSubject -import tachiyomi.core.util.lang.launchNonCancellable +import kotlinx.coroutines.flow.update import tachiyomi.domain.chapter.model.Chapter import tachiyomi.domain.manga.model.Manga -import java.util.concurrent.CopyOnWriteArrayList class DownloadQueue( private val store: DownloadStore, - private val queue: MutableList = CopyOnWriteArrayList(), -) : List by queue { - - private val scope = CoroutineScope(Dispatchers.IO) - - private val statusSubject = PublishSubject.create() - - private val _updates: Channel = Channel(Channel.UNLIMITED) - val updates = _updates.receiveAsFlow() - .onStart { emit(Unit) } - .map { queue } - .shareIn(scope, SharingStarted.Eagerly, 1) +) { + private val _state = MutableStateFlow>(emptyList()) + val state = _state.asStateFlow() fun addAll(downloads: List) { - downloads.forEach { download -> - download.statusSubject = statusSubject - download.statusCallback = ::setPagesFor - download.status = Download.State.QUEUE - } - queue.addAll(downloads) - store.addAll(downloads) - scope.launchNonCancellable { - _updates.send(Unit) + _state.update { + downloads.forEach { download -> + download.status = Download.State.QUEUE + } + store.addAll(downloads) + it + downloads } } fun remove(download: Download) { - val removed = queue.remove(download) - store.remove(download) - download.statusSubject = null - download.statusCallback = null - if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) { - download.status = Download.State.NOT_DOWNLOADED - } - if (removed) { - scope.launchNonCancellable { - _updates.send(Unit) + _state.update { + store.remove(download) + if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) { + download.status = Download.State.NOT_DOWNLOADED } + it - download } } fun remove(chapter: Chapter) { - find { it.chapter.id == chapter.id }?.let { remove(it) } + _state.value.find { it.chapter.id == chapter.id }?.let { remove(it) } } fun remove(chapters: List) { @@ -71,61 +50,50 @@ class DownloadQueue( } fun remove(manga: Manga) { - filter { it.manga.id == manga.id }.forEach { remove(it) } + _state.value.filter { it.manga.id == manga.id }.forEach { remove(it) } } fun clear() { - queue.forEach { download -> - download.statusSubject = null - download.statusCallback = null - if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) { - download.status = Download.State.NOT_DOWNLOADED - } - } - queue.clear() - store.clear() - scope.launchNonCancellable { - _updates.send(Unit) - } - } - - fun statusFlow(): Flow = getStatusObservable().asFlow() - - fun progressFlow(): Flow = getProgressObservable().asFlow() - - private fun getActiveDownloads(): Observable = - Observable.from(this).filter { download -> download.status == Download.State.DOWNLOADING } - - private fun getStatusObservable(): Observable = statusSubject - .startWith(getActiveDownloads()) - .onBackpressureBuffer() - - private fun getProgressObservable(): Observable { - return statusSubject.onBackpressureBuffer() - .startWith(getActiveDownloads()) - .flatMap { download -> - if (download.status == Download.State.DOWNLOADING) { - val pageStatusSubject = PublishSubject.create() - setPagesSubject(download.pages, pageStatusSubject) - return@flatMap pageStatusSubject - .onBackpressureBuffer() - .filter { it == Page.State.READY } - .map { download } - } else if (download.status == Download.State.DOWNLOADED || download.status == Download.State.ERROR) { - setPagesSubject(download.pages, null) + _state.update { + it.forEach { download -> + if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) { + download.status = Download.State.NOT_DOWNLOADED } - Observable.just(download) } - .filter { it.status == Download.State.DOWNLOADING } - } - - private fun setPagesFor(download: Download) { - if (download.status == Download.State.DOWNLOADED || download.status == Download.State.ERROR) { - setPagesSubject(download.pages, null) + store.clear() + emptyList() } } - private fun setPagesSubject(pages: List?, subject: PublishSubject?) { - pages?.forEach { it.statusSubject = subject } - } + fun statusFlow(): Flow = state + .flatMapLatest { downloads -> + downloads + .map { download -> + download.statusFlow.drop(1).map { download } + } + .merge() + } + .onStart { emitAll(getActiveDownloads()) } + + fun progressFlow(): Flow = state + .flatMapLatest { downloads -> + downloads + .map { download -> + download.progressFlow.drop(1).map { download } + } + .merge() + } + .onStart { emitAll(getActiveDownloads()) } + + private fun getActiveDownloads(): Flow = + _state.value.filter { download -> download.status == Download.State.DOWNLOADING }.asFlow() + + fun count(predicate: (Download) -> Boolean) = _state.value.count(predicate) + fun filter(predicate: (Download) -> Boolean) = _state.value.filter(predicate) + fun find(predicate: (Download) -> Boolean) = _state.value.find(predicate) + fun groupBy(keySelector: (Download) -> K) = _state.value.groupBy(keySelector) + fun isEmpty() = _state.value.isEmpty() + fun isNotEmpty() = _state.value.isNotEmpty() + fun none(predicate: (Download) -> Boolean) = _state.value.none(predicate) + fun toMutableList() = _state.value.toMutableList() } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt index b14f31085..95d95df32 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt @@ -14,7 +14,6 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow -import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.debounce @@ -22,8 +21,6 @@ import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch -import logcat.LogPriority -import tachiyomi.core.util.system.logcat import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get @@ -116,8 +113,7 @@ class DownloadQueueScreenModel( init { coroutineScope.launch { - downloadManager.queue.updates - .catch { logcat(LogPriority.ERROR, it) } + downloadManager.queue.state .map { downloads -> downloads .groupBy { it.source } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreTab.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreTab.kt index 77412f542..e422fa0fd 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreTab.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreTab.kt @@ -95,14 +95,13 @@ private class MoreScreenModel( coroutineScope.launchIO { combine( DownloadService.isRunning, - downloadManager.queue.updates, + downloadManager.queue.state, ) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) } .collectLatest { (isDownloading, downloadQueueSize) -> val pendingDownloadExists = downloadQueueSize != 0 _state.value = when { !pendingDownloadExists -> DownloadQueueState.Stopped - !isDownloading && !pendingDownloadExists -> DownloadQueueState.Paused(0) - !isDownloading && pendingDownloadExists -> DownloadQueueState.Paused(downloadQueueSize) + !isDownloading -> DownloadQueueState.Paused(downloadQueueSize) else -> DownloadQueueState.Downloading(downloadQueueSize) } } diff --git a/source-api/src/main/java/eu/kanade/tachiyomi/source/model/Page.kt b/source-api/src/main/java/eu/kanade/tachiyomi/source/model/Page.kt index efd169e88..7ce18934f 100644 --- a/source-api/src/main/java/eu/kanade/tachiyomi/source/model/Page.kt +++ b/source-api/src/main/java/eu/kanade/tachiyomi/source/model/Page.kt @@ -6,7 +6,6 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.serialization.Serializable import kotlinx.serialization.Transient -import rx.subjects.Subject @Serializable open class Page( @@ -28,7 +27,6 @@ open class Page( get() = _statusFlow.value set(value) { _statusFlow.value = value - statusSubject?.onNext(value) } @Transient @@ -42,9 +40,6 @@ open class Page( _progressFlow.value = value } - @Transient - var statusSubject: Subject? = null - override fun update(bytesRead: Long, contentLength: Long, done: Boolean) { progress = if (contentLength > 0) { (100 * bytesRead / contentLength).toInt()