Replace RxJava in ChapterLoader and ReaderViewModel (#8915)
* Replace RxJava in ChapterLoader * Don't swallow CancellationException * Simplify loadChapter behavior * Add error handling to loadAdjacent
This commit is contained in:
parent
e7937fe562
commit
62480f090b
@ -6,7 +6,6 @@ import android.net.Uri
|
||||
import androidx.lifecycle.SavedStateHandle
|
||||
import androidx.lifecycle.ViewModel
|
||||
import androidx.lifecycle.viewModelScope
|
||||
import eu.kanade.core.util.asFlow
|
||||
import eu.kanade.domain.base.BasePreferences
|
||||
import eu.kanade.domain.chapter.interactor.GetChapterByMangaId
|
||||
import eu.kanade.domain.chapter.interactor.UpdateChapter
|
||||
@ -60,17 +59,15 @@ import eu.kanade.tachiyomi.util.storage.DiskUtil
|
||||
import eu.kanade.tachiyomi.util.storage.cacheImageDir
|
||||
import eu.kanade.tachiyomi.util.system.isOnline
|
||||
import eu.kanade.tachiyomi.util.system.logcat
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.awaitAll
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.filterNotNull
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.firstOrNull
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
@ -79,9 +76,6 @@ import kotlinx.coroutines.flow.update
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import logcat.LogPriority
|
||||
import rx.Observable
|
||||
import rx.Subscription
|
||||
import rx.android.schedulers.AndroidSchedulers
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import java.util.Date
|
||||
@ -141,11 +135,6 @@ class ReaderViewModel(
|
||||
*/
|
||||
private var chapterReadStartTime: Long? = null
|
||||
|
||||
/**
|
||||
* Subscription to prevent setting chapters as active from multiple threads.
|
||||
*/
|
||||
private var activeChapterSubscription: Subscription? = null
|
||||
|
||||
private var chapterToDownload: Download? = null
|
||||
|
||||
/**
|
||||
@ -279,54 +268,49 @@ class ReaderViewModel(
|
||||
val source = sourceManager.getOrStub(manga.source)
|
||||
loader = ChapterLoader(context, downloadManager, downloadProvider, manga, source)
|
||||
|
||||
getLoadObservable(loader!!, chapterList.first { chapterId == it.chapter.id })
|
||||
.asFlow()
|
||||
.first()
|
||||
loadChapter(loader!!, chapterList.first { chapterId == it.chapter.id })
|
||||
Result.success(true)
|
||||
} else {
|
||||
// Unlikely but okay
|
||||
Result.success(false)
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
if (e is CancellationException) {
|
||||
throw e
|
||||
}
|
||||
Result.failure(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an observable that loads the given [chapter] with this [loader]. This observable
|
||||
* handles main thread synchronization and updating the currently active chapters on
|
||||
* [viewerChaptersRelay], however callers must ensure there won't be more than one
|
||||
* subscription active by unsubscribing any existing [activeChapterSubscription] before.
|
||||
* Callers must also handle the onError event.
|
||||
* Loads the given [chapter] with this [loader] and updates the currently active chapters.
|
||||
* Callers must handle errors.
|
||||
*/
|
||||
private fun getLoadObservable(
|
||||
private suspend fun loadChapter(
|
||||
loader: ChapterLoader,
|
||||
chapter: ReaderChapter,
|
||||
): Observable<ViewerChapters> {
|
||||
return loader.loadChapter(chapter)
|
||||
.andThen(
|
||||
Observable.fromCallable {
|
||||
val chapterPos = chapterList.indexOf(chapter)
|
||||
): ViewerChapters {
|
||||
loader.loadChapter(chapter)
|
||||
|
||||
ViewerChapters(
|
||||
chapter,
|
||||
chapterList.getOrNull(chapterPos - 1),
|
||||
chapterList.getOrNull(chapterPos + 1),
|
||||
)
|
||||
},
|
||||
)
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.doOnNext { newChapters ->
|
||||
mutableState.update {
|
||||
// Add new references first to avoid unnecessary recycling
|
||||
newChapters.ref()
|
||||
it.viewerChapters?.unref()
|
||||
val chapterPos = chapterList.indexOf(chapter)
|
||||
val newChapters = ViewerChapters(
|
||||
chapter,
|
||||
chapterList.getOrNull(chapterPos - 1),
|
||||
chapterList.getOrNull(chapterPos + 1),
|
||||
)
|
||||
|
||||
chapterToDownload = cancelQueuedDownloads(newChapters.currChapter)
|
||||
it.copy(viewerChapters = newChapters)
|
||||
}
|
||||
withUIContext {
|
||||
mutableState.update {
|
||||
// Add new references first to avoid unnecessary recycling
|
||||
newChapters.ref()
|
||||
it.viewerChapters?.unref()
|
||||
|
||||
chapterToDownload = cancelQueuedDownloads(newChapters.currChapter)
|
||||
it.copy(viewerChapters = newChapters)
|
||||
}
|
||||
}
|
||||
return newChapters
|
||||
}
|
||||
|
||||
/**
|
||||
@ -339,17 +323,19 @@ class ReaderViewModel(
|
||||
logcat { "Loading ${chapter.chapter.url}" }
|
||||
|
||||
withIOContext {
|
||||
getLoadObservable(loader, chapter)
|
||||
.asFlow()
|
||||
.catch { logcat(LogPriority.ERROR, it) }
|
||||
.first()
|
||||
try {
|
||||
loadChapter(loader, chapter)
|
||||
} catch (e: Throwable) {
|
||||
if (e is CancellationException) {
|
||||
throw e
|
||||
}
|
||||
logcat(LogPriority.ERROR, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the user is going to load the prev/next chapter through the menu button. It
|
||||
* sets the [isLoadingAdjacentChapterRelay] that the view uses to prevent any further
|
||||
* interaction until the chapter is loaded.
|
||||
* Called when the user is going to load the prev/next chapter through the menu button.
|
||||
*/
|
||||
private suspend fun loadAdjacent(chapter: ReaderChapter) {
|
||||
val loader = loader ?: return
|
||||
@ -357,12 +343,18 @@ class ReaderViewModel(
|
||||
logcat { "Loading adjacent ${chapter.chapter.url}" }
|
||||
|
||||
mutableState.update { it.copy(isLoadingAdjacentChapter = true) }
|
||||
withIOContext {
|
||||
getLoadObservable(loader, chapter)
|
||||
.asFlow()
|
||||
.first()
|
||||
try {
|
||||
withIOContext {
|
||||
loadChapter(loader, chapter)
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
if (e is CancellationException) {
|
||||
throw e
|
||||
}
|
||||
logcat(LogPriority.ERROR, e)
|
||||
} finally {
|
||||
mutableState.update { it.copy(isLoadingAdjacentChapter = false) }
|
||||
}
|
||||
mutableState.update { it.copy(isLoadingAdjacentChapter = false) }
|
||||
}
|
||||
|
||||
/**
|
||||
@ -393,12 +385,15 @@ class ReaderViewModel(
|
||||
|
||||
val loader = loader ?: return
|
||||
withIOContext {
|
||||
loader.loadChapter(chapter)
|
||||
.doOnCompleted { eventChannel.trySend(Event.ReloadViewerChapters) }
|
||||
.onErrorComplete()
|
||||
.toObservable<Unit>()
|
||||
.asFlow()
|
||||
.firstOrNull()
|
||||
try {
|
||||
loader.loadChapter(chapter)
|
||||
} catch (e: Throwable) {
|
||||
if (e is CancellationException) {
|
||||
throw e
|
||||
}
|
||||
return@withIOContext
|
||||
}
|
||||
eventChannel.trySend(Event.ReloadViewerChapters)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -11,11 +11,9 @@ import eu.kanade.tachiyomi.source.Source
|
||||
import eu.kanade.tachiyomi.source.SourceManager
|
||||
import eu.kanade.tachiyomi.source.online.HttpSource
|
||||
import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter
|
||||
import eu.kanade.tachiyomi.util.lang.awaitSingle
|
||||
import eu.kanade.tachiyomi.util.lang.withIOContext
|
||||
import eu.kanade.tachiyomi.util.system.logcat
|
||||
import rx.Completable
|
||||
import rx.Observable
|
||||
import rx.android.schedulers.AndroidSchedulers
|
||||
import rx.schedulers.Schedulers
|
||||
|
||||
/**
|
||||
* Loader used to retrieve the [PageLoader] for a given chapter.
|
||||
@ -29,43 +27,40 @@ class ChapterLoader(
|
||||
) {
|
||||
|
||||
/**
|
||||
* Returns a completable that assigns the page loader and loads the its pages. It just
|
||||
* completes if the chapter is already loaded.
|
||||
* Assigns the chapter's page loader and loads the its pages. Returns immediately if the chapter
|
||||
* is already loaded.
|
||||
*/
|
||||
fun loadChapter(chapter: ReaderChapter): Completable {
|
||||
suspend fun loadChapter(chapter: ReaderChapter) {
|
||||
if (chapterIsReady(chapter)) {
|
||||
return Completable.complete()
|
||||
return
|
||||
}
|
||||
|
||||
return Observable.just(chapter)
|
||||
.doOnNext { chapter.state = ReaderChapter.State.Loading }
|
||||
.observeOn(Schedulers.io())
|
||||
.flatMap { readerChapter ->
|
||||
logcat { "Loading pages for ${chapter.chapter.name}" }
|
||||
|
||||
val loader = getPageLoader(readerChapter)
|
||||
chapter.state = ReaderChapter.State.Loading
|
||||
withIOContext {
|
||||
logcat { "Loading pages for ${chapter.chapter.name}" }
|
||||
try {
|
||||
val loader = getPageLoader(chapter)
|
||||
chapter.pageLoader = loader
|
||||
|
||||
loader.getPages().take(1).doOnNext { pages ->
|
||||
pages.forEach { it.chapter = chapter }
|
||||
}
|
||||
}
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.doOnError { chapter.state = ReaderChapter.State.Error(it) }
|
||||
.doOnNext { pages ->
|
||||
val pages = loader.getPages().awaitSingle()
|
||||
.onEach { it.chapter = chapter }
|
||||
|
||||
if (pages.isEmpty()) {
|
||||
throw Exception(context.getString(R.string.page_list_empty_error))
|
||||
}
|
||||
|
||||
chapter.state = ReaderChapter.State.Loaded(pages)
|
||||
|
||||
// If the chapter is partially read, set the starting page to the last the user read
|
||||
// otherwise use the requested page.
|
||||
if (!chapter.chapter.read) {
|
||||
chapter.requestedPage = chapter.chapter.last_page_read
|
||||
}
|
||||
|
||||
chapter.state = ReaderChapter.State.Loaded(pages)
|
||||
} catch (e: Throwable) {
|
||||
chapter.state = ReaderChapter.State.Error(e)
|
||||
throw e
|
||||
}
|
||||
.toCompletable()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user