Clean up download state logic in MorePresenter
This commit is contained in:
parent
bd9a08c73d
commit
3bfbd58402
@ -8,11 +8,9 @@ import android.os.IBinder
|
||||
import android.os.PowerManager
|
||||
import androidx.annotation.StringRes
|
||||
import androidx.core.content.ContextCompat
|
||||
import com.jakewharton.rxrelay.BehaviorRelay
|
||||
import eu.kanade.domain.download.service.DownloadPreferences
|
||||
import eu.kanade.tachiyomi.R
|
||||
import eu.kanade.tachiyomi.data.notification.Notifications
|
||||
import eu.kanade.tachiyomi.util.lang.plusAssign
|
||||
import eu.kanade.tachiyomi.util.lang.withUIContext
|
||||
import eu.kanade.tachiyomi.util.system.acquireWakeLock
|
||||
import eu.kanade.tachiyomi.util.system.isConnectedToWifi
|
||||
@ -32,7 +30,6 @@ import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import logcat.LogPriority
|
||||
import ru.beryukhov.reactivenetwork.ReactiveNetwork
|
||||
import rx.subscriptions.CompositeSubscription
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
|
||||
/**
|
||||
@ -44,11 +41,6 @@ class DownloadService : Service() {
|
||||
|
||||
companion object {
|
||||
|
||||
/**
|
||||
* Relay used to know when the service is running.
|
||||
*/
|
||||
val runningRelay: BehaviorRelay<Boolean> = BehaviorRelay.create(false)
|
||||
|
||||
private val _isRunning = MutableStateFlow(false)
|
||||
val isRunning = _isRunning.asStateFlow()
|
||||
|
||||
@ -83,7 +75,6 @@ class DownloadService : Service() {
|
||||
}
|
||||
|
||||
private val downloadManager: DownloadManager by injectLazy()
|
||||
|
||||
private val downloadPreferences: DownloadPreferences by injectLazy()
|
||||
|
||||
/**
|
||||
@ -91,62 +82,58 @@ class DownloadService : Service() {
|
||||
*/
|
||||
private lateinit var wakeLock: PowerManager.WakeLock
|
||||
|
||||
private lateinit var subscriptions: CompositeSubscription
|
||||
private lateinit var ioScope: CoroutineScope
|
||||
|
||||
/**
|
||||
* Called when the service is created.
|
||||
*/
|
||||
override fun onCreate() {
|
||||
super.onCreate()
|
||||
ioScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
|
||||
startForeground(Notifications.ID_DOWNLOAD_CHAPTER_PROGRESS, getPlaceholderNotification())
|
||||
wakeLock = acquireWakeLock(javaClass.name)
|
||||
runningRelay.call(true)
|
||||
_isRunning.value = true
|
||||
subscriptions = CompositeSubscription()
|
||||
listenDownloaderState()
|
||||
listenNetworkChanges()
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the service is destroyed.
|
||||
*/
|
||||
override fun onDestroy() {
|
||||
ioScope?.cancel()
|
||||
runningRelay.call(false)
|
||||
_isRunning.value = false
|
||||
subscriptions.unsubscribe()
|
||||
downloadManager.stopDownloads()
|
||||
wakeLock.releaseIfNeeded()
|
||||
super.onDestroy()
|
||||
}
|
||||
|
||||
/**
|
||||
* Not used.
|
||||
*/
|
||||
// Not used
|
||||
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
|
||||
return START_NOT_STICKY
|
||||
}
|
||||
|
||||
/**
|
||||
* Not used.
|
||||
*/
|
||||
// Not used
|
||||
override fun onBind(intent: Intent): IBinder? {
|
||||
return null
|
||||
}
|
||||
|
||||
private fun stopDownloads(@StringRes string: Int) {
|
||||
downloadManager.stopDownloads(getString(string))
|
||||
}
|
||||
|
||||
/**
|
||||
* Listens to network changes.
|
||||
*
|
||||
* @see onNetworkStateChanged
|
||||
*/
|
||||
private fun listenNetworkChanges() {
|
||||
ReactiveNetwork()
|
||||
.observeNetworkConnectivity(applicationContext)
|
||||
.onEach {
|
||||
withUIContext {
|
||||
onNetworkStateChanged()
|
||||
if (isOnline()) {
|
||||
if (downloadPreferences.downloadOnlyOverWifi().get() && !isConnectedToWifi()) {
|
||||
stopDownloads(R.string.download_notifier_text_only_wifi)
|
||||
} else {
|
||||
val started = downloadManager.startDownloads()
|
||||
if (!started) stopSelf()
|
||||
}
|
||||
} else {
|
||||
stopDownloads(R.string.download_notifier_no_network)
|
||||
}
|
||||
}
|
||||
}
|
||||
.catch { error ->
|
||||
@ -159,41 +146,20 @@ class DownloadService : Service() {
|
||||
.launchIn(ioScope)
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the network state changes.
|
||||
*/
|
||||
private fun onNetworkStateChanged() {
|
||||
if (isOnline()) {
|
||||
if (downloadPreferences.downloadOnlyOverWifi().get() && !isConnectedToWifi()) {
|
||||
stopDownloads(R.string.download_notifier_text_only_wifi)
|
||||
} else {
|
||||
val started = downloadManager.startDownloads()
|
||||
if (!started) stopSelf()
|
||||
}
|
||||
} else {
|
||||
stopDownloads(R.string.download_notifier_no_network)
|
||||
}
|
||||
}
|
||||
|
||||
private fun stopDownloads(@StringRes string: Int) {
|
||||
downloadManager.stopDownloads(getString(string))
|
||||
}
|
||||
|
||||
/**
|
||||
* Listens to downloader status. Enables or disables the wake lock depending on the status.
|
||||
*/
|
||||
private fun listenDownloaderState() {
|
||||
subscriptions += downloadManager.runningRelay
|
||||
.doOnError {
|
||||
/* Swallow wakelock error */
|
||||
}
|
||||
.subscribe { running ->
|
||||
if (running) {
|
||||
_isRunning
|
||||
.onEach { isRunning ->
|
||||
if (isRunning) {
|
||||
wakeLock.acquireIfNeeded()
|
||||
} else {
|
||||
wakeLock.releaseIfNeeded()
|
||||
}
|
||||
}
|
||||
.catch { /* Ignore errors */ }
|
||||
.launchIn(ioScope)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -75,13 +75,14 @@ class DownloadQueue(
|
||||
Observable.from(this).filter { download -> download.status == Download.State.DOWNLOADING }
|
||||
|
||||
@Deprecated("Use getStatusAsFlow instead")
|
||||
fun getStatusObservable(): Observable<Download> = statusSubject
|
||||
private fun getStatusObservable(): Observable<Download> = statusSubject
|
||||
.startWith(getActiveDownloads())
|
||||
.onBackpressureBuffer()
|
||||
|
||||
fun getStatusAsFlow(): Flow<Download> = getStatusObservable().asFlow()
|
||||
|
||||
fun getUpdatedObservable(): Observable<List<Download>> = updatedRelay.onBackpressureBuffer()
|
||||
@Deprecated("Use getUpdatedAsFlow instead")
|
||||
private fun getUpdatedObservable(): Observable<List<Download>> = updatedRelay.onBackpressureBuffer()
|
||||
.startWith(Unit)
|
||||
.map { this }
|
||||
|
||||
@ -94,7 +95,7 @@ class DownloadQueue(
|
||||
}
|
||||
|
||||
@Deprecated("Use getProgressAsFlow instead")
|
||||
fun getProgressObservable(): Observable<Download> {
|
||||
private fun getProgressObservable(): Observable<Download> {
|
||||
return statusSubject.onBackpressureBuffer()
|
||||
.startWith(getActiveDownloads())
|
||||
.flatMap { download ->
|
||||
@ -113,9 +114,7 @@ class DownloadQueue(
|
||||
.filter { it.status == Download.State.DOWNLOADING }
|
||||
}
|
||||
|
||||
fun getProgressAsFlow(): Flow<Download> {
|
||||
return getProgressObservable().asFlow()
|
||||
}
|
||||
fun getProgressAsFlow(): Flow<Download> = getProgressObservable().asFlow()
|
||||
|
||||
private fun setPagesSubject(pages: List<Page>?, subject: PublishSubject<Int>?) {
|
||||
pages?.forEach { it.setStatusSubject(subject) }
|
||||
|
@ -11,7 +11,6 @@ import eu.kanade.tachiyomi.util.system.logcat
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.update
|
||||
import kotlinx.coroutines.launch
|
||||
|
@ -46,7 +46,6 @@ import eu.kanade.tachiyomi.source.SourceManager
|
||||
import eu.kanade.tachiyomi.source.model.SManga
|
||||
import eu.kanade.tachiyomi.source.online.HttpSource
|
||||
import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter
|
||||
import eu.kanade.tachiyomi.util.lang.combineLatest
|
||||
import eu.kanade.tachiyomi.util.lang.launchIO
|
||||
import eu.kanade.tachiyomi.util.lang.launchNonCancellable
|
||||
import eu.kanade.tachiyomi.util.removeCovers
|
||||
@ -687,6 +686,10 @@ class LibraryPresenter(
|
||||
state.selection = items.filterNot { it in selection }
|
||||
}
|
||||
|
||||
private fun <T, U, R> Observable<T>.combineLatest(o2: Observable<U>, combineFn: (T, U) -> R): Observable<R> {
|
||||
return Observable.combineLatest(this, o2, combineFn)
|
||||
}
|
||||
|
||||
sealed class Dialog {
|
||||
data class ChangeCategory(val manga: List<Manga>, val initialSelection: List<CheckboxState<Category>>) : Dialog()
|
||||
data class DeleteManga(val manga: List<Manga>) : Dialog()
|
||||
|
@ -9,17 +9,14 @@ import eu.kanade.tachiyomi.util.lang.launchIO
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import rx.Observable
|
||||
import rx.Subscription
|
||||
import rx.android.schedulers.AndroidSchedulers
|
||||
import rx.subscriptions.CompositeSubscription
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
|
||||
class MorePresenter(
|
||||
private val downloadManager: DownloadManager = Injekt.get(),
|
||||
preferences: BasePreferences = Injekt.get(),
|
||||
|
||||
) : BasePresenter<MoreController>() {
|
||||
|
||||
val downloadedOnly = preferences.downloadedOnly().asState()
|
||||
@ -28,58 +25,26 @@ class MorePresenter(
|
||||
private var _state: MutableStateFlow<DownloadQueueState> = MutableStateFlow(DownloadQueueState.Stopped)
|
||||
val downloadQueueState: StateFlow<DownloadQueueState> = _state.asStateFlow()
|
||||
|
||||
private var isDownloading: Boolean = false
|
||||
private var downloadQueueSize: Int = 0
|
||||
private var untilDestroySubscriptions = CompositeSubscription()
|
||||
|
||||
override fun onCreate(savedState: Bundle?) {
|
||||
super.onCreate(savedState)
|
||||
|
||||
if (untilDestroySubscriptions.isUnsubscribed) {
|
||||
untilDestroySubscriptions = CompositeSubscription()
|
||||
}
|
||||
|
||||
initDownloadQueueSummary()
|
||||
}
|
||||
|
||||
override fun onDestroy() {
|
||||
super.onDestroy()
|
||||
untilDestroySubscriptions.unsubscribe()
|
||||
}
|
||||
|
||||
private fun initDownloadQueueSummary() {
|
||||
// Handle running/paused status change
|
||||
DownloadService.runningRelay
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribeUntilDestroy { isRunning ->
|
||||
isDownloading = isRunning
|
||||
updateDownloadQueueState()
|
||||
}
|
||||
|
||||
// Handle queue progress updating
|
||||
downloadManager.queue.getUpdatedObservable()
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribeUntilDestroy {
|
||||
downloadQueueSize = it.size
|
||||
updateDownloadQueueState()
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateDownloadQueueState() {
|
||||
// Handle running/paused status change and queue progress updating
|
||||
presenterScope.launchIO {
|
||||
val pendingDownloadExists = downloadQueueSize != 0
|
||||
_state.value = when {
|
||||
!pendingDownloadExists -> DownloadQueueState.Stopped
|
||||
!isDownloading && !pendingDownloadExists -> DownloadQueueState.Paused(0)
|
||||
!isDownloading && pendingDownloadExists -> DownloadQueueState.Paused(downloadQueueSize)
|
||||
else -> DownloadQueueState.Downloading(downloadQueueSize)
|
||||
}
|
||||
combine(
|
||||
DownloadService.isRunning,
|
||||
downloadManager.queue.getUpdatedAsFlow(),
|
||||
) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) }
|
||||
.collectLatest { (isDownloading, downloadQueueSize) ->
|
||||
val pendingDownloadExists = downloadQueueSize != 0
|
||||
_state.value = when {
|
||||
!pendingDownloadExists -> DownloadQueueState.Stopped
|
||||
!isDownloading && !pendingDownloadExists -> DownloadQueueState.Paused(0)
|
||||
!isDownloading && pendingDownloadExists -> DownloadQueueState.Paused(downloadQueueSize)
|
||||
else -> DownloadQueueState.Downloading(downloadQueueSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun <T> Observable<T>.subscribeUntilDestroy(onNext: (T) -> Unit): Subscription {
|
||||
return subscribe(onNext).also { untilDestroySubscriptions.add(it) }
|
||||
}
|
||||
}
|
||||
|
||||
sealed class DownloadQueueState {
|
||||
|
@ -33,7 +33,6 @@ import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.receiveAsFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import logcat.LogPriority
|
||||
|
@ -1,11 +1,6 @@
|
||||
package eu.kanade.tachiyomi.util.lang
|
||||
|
||||
import rx.Observable
|
||||
import rx.Subscription
|
||||
import rx.subscriptions.CompositeSubscription
|
||||
|
||||
operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription)
|
||||
|
||||
fun <T, U, R> Observable<T>.combineLatest(o2: Observable<U>, combineFn: (T, U) -> R): Observable<R> {
|
||||
return Observable.combineLatest(this, o2, combineFn)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user