From 271de31d510f4a1aa8a30b0dbe1b83419198d180 Mon Sep 17 00:00:00 2001 From: arkon Date: Thu, 24 Dec 2020 16:39:28 -0500 Subject: [PATCH] Migrate Kitsu API to coroutines and kotlinx.serialization --- app/build.gradle | 2 - .../tachiyomi/data/track/kitsu/Kitsu.kt | 15 +- .../tachiyomi/data/track/kitsu/KitsuApi.kt | 241 ++++++++---------- .../tachiyomi/data/track/kitsu/KitsuModels.kt | 51 ++-- .../data/track/myanimelist/MyAnimeList.kt | 19 +- .../tachiyomi/util/lang/RxCoroutineBridge.kt | 37 ++- 6 files changed, 184 insertions(+), 181 deletions(-) diff --git a/app/build.gradle b/app/build.gradle index 4b658d35c..f058c0f5f 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -175,8 +175,6 @@ dependencies { final retrofit_version = '2.9.0' implementation "com.squareup.retrofit2:retrofit:$retrofit_version" implementation "com.jakewharton.retrofit:retrofit2-kotlinx-serialization-converter:0.8.0" - implementation "com.squareup.retrofit2:converter-gson:$retrofit_version" - implementation "com.squareup.retrofit2:adapter-rxjava:$retrofit_version" // JSON final kotlin_serialization_version = '1.0.1' diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/Kitsu.kt b/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/Kitsu.kt index 412f04b55..32c47892e 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/Kitsu.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/Kitsu.kt @@ -7,6 +7,7 @@ import eu.kanade.tachiyomi.R import eu.kanade.tachiyomi.data.database.models.Track import eu.kanade.tachiyomi.data.track.TrackService import eu.kanade.tachiyomi.data.track.model.TrackSearch +import eu.kanade.tachiyomi.util.lang.runAsObservable import rx.Completable import rx.Observable import uy.kohesive.injekt.injectLazy @@ -69,15 +70,15 @@ class Kitsu(private val context: Context, id: Int) : TrackService(id) { } override fun add(track: Track): Observable { - return api.addLibManga(track, getUserId()) + return runAsObservable({ api.addLibManga(track, getUserId()) }) } override fun update(track: Track): Observable { - return api.updateLibManga(track) + return runAsObservable({ api.updateLibManga(track) }) } override fun bind(track: Track): Observable { - return api.findLibManga(track, getUserId()) + return runAsObservable({ api.findLibManga(track, getUserId()) }) .flatMap { remoteTrack -> if (remoteTrack != null) { track.copyPersonalFrom(remoteTrack) @@ -92,11 +93,11 @@ class Kitsu(private val context: Context, id: Int) : TrackService(id) { } override fun search(query: String): Observable> { - return api.search(query) + return runAsObservable({ api.search(query) }) } override fun refresh(track: Track): Observable { - return api.getLibManga(track) + return runAsObservable({ api.getLibManga(track) }) .map { remoteTrack -> track.copyPersonalFrom(remoteTrack) track.total_chapters = remoteTrack.total_chapters @@ -105,9 +106,9 @@ class Kitsu(private val context: Context, id: Int) : TrackService(id) { } override fun login(username: String, password: String): Completable { - return api.login(username, password) + return runAsObservable({ api.login(username, password) }) .doOnNext { interceptor.newAuth(it) } - .flatMap { api.getCurrentUser() } + .flatMap { runAsObservable({ api.getCurrentUser() }) } .doOnNext { userId -> saveCredentials(username, userId) } .doOnError { logout() } .toCompletable() diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/KitsuApi.kt b/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/KitsuApi.kt index 63e55c65d..eafb6e5c1 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/KitsuApi.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/KitsuApi.kt @@ -1,21 +1,22 @@ package eu.kanade.tachiyomi.data.track.kitsu -import com.github.salomonbrys.kotson.array -import com.github.salomonbrys.kotson.get -import com.github.salomonbrys.kotson.int -import com.github.salomonbrys.kotson.jsonObject -import com.github.salomonbrys.kotson.obj -import com.github.salomonbrys.kotson.string -import com.google.gson.GsonBuilder -import com.google.gson.JsonObject +import com.jakewharton.retrofit2.converter.kotlinx.serialization.asConverterFactory import eu.kanade.tachiyomi.data.database.models.Track import eu.kanade.tachiyomi.data.track.model.TrackSearch import eu.kanade.tachiyomi.network.POST +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.buildJsonObject +import kotlinx.serialization.json.int +import kotlinx.serialization.json.jsonArray +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive +import kotlinx.serialization.json.put +import kotlinx.serialization.json.putJsonObject import okhttp3.FormBody +import okhttp3.MediaType.Companion.toMediaType import okhttp3.OkHttpClient import retrofit2.Retrofit -import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory -import retrofit2.converter.gson.GsonConverterFactory import retrofit2.http.Body import retrofit2.http.Field import retrofit2.http.FormUrlEncoded @@ -26,7 +27,6 @@ import retrofit2.http.PATCH import retrofit2.http.POST import retrofit2.http.Path import retrofit2.http.Query -import rx.Observable class KitsuApi(private val client: OkHttpClient, interceptor: KitsuInterceptor) { @@ -35,196 +35,179 @@ class KitsuApi(private val client: OkHttpClient, interceptor: KitsuInterceptor) private val rest = Retrofit.Builder() .baseUrl(baseUrl) .client(authClient) - .addConverterFactory(GsonConverterFactory.create(GsonBuilder().serializeNulls().create())) - .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) + .addConverterFactory(jsonConverter) .build() .create(Rest::class.java) private val searchRest = Retrofit.Builder() .baseUrl(algoliaKeyUrl) .client(authClient) - .addConverterFactory(GsonConverterFactory.create()) - .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) + .addConverterFactory(jsonConverter) .build() .create(SearchKeyRest::class.java) private val algoliaRest = Retrofit.Builder() .baseUrl(algoliaUrl) .client(client) - .addConverterFactory(GsonConverterFactory.create()) - .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) + .addConverterFactory(jsonConverter) .build() .create(AgoliaSearchRest::class.java) - fun addLibManga(track: Track, userId: String): Observable { - return Observable.defer { - // @formatter:off - val data = jsonObject( - "type" to "libraryEntries", - "attributes" to jsonObject( - "status" to track.toKitsuStatus(), - "progress" to track.last_chapter_read - ), - "relationships" to jsonObject( - "user" to jsonObject( - "data" to jsonObject( - "id" to userId, - "type" to "users" - ) - ), - "media" to jsonObject( - "data" to jsonObject( - "id" to track.media_id, - "type" to "manga" - ) - ) - ) - ) - - rest.addLibManga(jsonObject("data" to data)) - .map { json -> - track.media_id = json["data"]["id"].int - track + suspend fun addLibManga(track: Track, userId: String): Track { + val data = buildJsonObject { + putJsonObject("data") { + put("type", "libraryEntries") + putJsonObject("attributes") { + put("status", track.toKitsuStatus()) + put("progress", track.last_chapter_read) } + putJsonObject("relationships") { + putJsonObject("user") { + putJsonObject("data") { + put("id", userId) + put("type", "users") + } + } + putJsonObject("media") { + putJsonObject("data") { + put("id", track.media_id) + put("type", "manga") + } + } + } + } + } + + val json = rest.addLibManga(data) + track.media_id = json["data"]!!.jsonObject["id"]!!.jsonPrimitive.int + return track + } + + suspend fun updateLibManga(track: Track): Track { + val data = buildJsonObject { + putJsonObject("data") { + put("type", "libraryEntries") + put("id", track.media_id) + putJsonObject("attributes") { + put("status", track.toKitsuStatus()) + put("progress", track.last_chapter_read) + put("ratingTwenty", track.toKitsuScore()) + } + } + } + + rest.updateLibManga(track.media_id, data) + return track + } + + suspend fun search(query: String): List { + val json = searchRest.getKey() + val key = json["media"]!!.jsonObject["key"]!!.jsonPrimitive.content + return algoliaSearch(key, query) + } + + private suspend fun algoliaSearch(key: String, query: String): List { + val jsonObject = buildJsonObject { + put("params", "query=$query$algoliaFilter") + } + val json = algoliaRest.getSearchQuery(algoliaAppId, key, jsonObject) + val data = json["hits"]!!.jsonArray + return data.map { KitsuSearchManga(it.jsonObject) } + .filter { it.subType != "novel" } + .map { it.toTrack() } + } + + suspend fun findLibManga(track: Track, userId: String): Track? { + val json = rest.findLibManga(track.media_id, userId) + val data = json["data"]!!.jsonArray + return if (data.size > 0) { + val manga = json["included"]!!.jsonArray[0].jsonObject + KitsuLibManga(data[0].jsonObject, manga).toTrack() + } else { + null } } - fun updateLibManga(track: Track): Observable { - return Observable.defer { - // @formatter:off - val data = jsonObject( - "type" to "libraryEntries", - "id" to track.media_id, - "attributes" to jsonObject( - "status" to track.toKitsuStatus(), - "progress" to track.last_chapter_read, - "ratingTwenty" to track.toKitsuScore() - ) - ) - // @formatter:on - - rest.updateLibManga(track.media_id, jsonObject("data" to data)) - .map { track } + suspend fun getLibManga(track: Track): Track { + val json = rest.getLibManga(track.media_id) + val data = json["data"]!!.jsonArray + return if (data.size > 0) { + val manga = json["included"]!!.jsonArray[0].jsonObject + KitsuLibManga(data[0].jsonObject, manga).toTrack() + } else { + throw Exception("Could not find manga") } } - fun search(query: String): Observable> { - return searchRest - .getKey().map { json -> - json["media"].asJsonObject["key"].string - }.flatMap { key -> - algoliaSearch(key, query) - } - } - - private fun algoliaSearch(key: String, query: String): Observable> { - val jsonObject = jsonObject("params" to "query=$query$algoliaFilter") - return algoliaRest - .getSearchQuery(algoliaAppId, key, jsonObject) - .map { json -> - val data = json["hits"].array - data.map { KitsuSearchManga(it.obj) } - .filter { it.subType != "novel" } - .map { it.toTrack() } - } - } - - fun findLibManga(track: Track, userId: String): Observable { - return rest.findLibManga(track.media_id, userId) - .map { json -> - val data = json["data"].array - if (data.size() > 0) { - val manga = json["included"].array[0].obj - KitsuLibManga(data[0].obj, manga).toTrack() - } else { - null - } - } - } - - fun getLibManga(track: Track): Observable { - return rest.getLibManga(track.media_id) - .map { json -> - val data = json["data"].array - if (data.size() > 0) { - val manga = json["included"].array[0].obj - KitsuLibManga(data[0].obj, manga).toTrack() - } else { - throw Exception("Could not find manga") - } - } - } - - fun login(username: String, password: String): Observable { + suspend fun login(username: String, password: String): OAuth { return Retrofit.Builder() .baseUrl(loginUrl) .client(client) - .addConverterFactory(GsonConverterFactory.create()) - .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) + .addConverterFactory(jsonConverter) .build() .create(LoginRest::class.java) .requestAccessToken(username, password) } - fun getCurrentUser(): Observable { - return rest.getCurrentUser().map { it["data"].array[0]["id"].string } + suspend fun getCurrentUser(): String { + return rest.getCurrentUser()["data"]!!.jsonArray[0].jsonObject["id"]!!.jsonPrimitive.content } private interface Rest { @Headers("Content-Type: application/vnd.api+json") @POST("library-entries") - fun addLibManga( + suspend fun addLibManga( @Body data: JsonObject - ): Observable + ): JsonObject @Headers("Content-Type: application/vnd.api+json") @PATCH("library-entries/{id}") - fun updateLibManga( + suspend fun updateLibManga( @Path("id") remoteId: Int, @Body data: JsonObject - ): Observable + ): JsonObject @GET("library-entries") - fun findLibManga( + suspend fun findLibManga( @Query("filter[manga_id]", encoded = true) remoteId: Int, @Query("filter[user_id]", encoded = true) userId: String, @Query("include") includes: String = "manga" - ): Observable + ): JsonObject @GET("library-entries") - fun getLibManga( + suspend fun getLibManga( @Query("filter[id]", encoded = true) remoteId: Int, @Query("include") includes: String = "manga" - ): Observable + ): JsonObject @GET("users") - fun getCurrentUser( + suspend fun getCurrentUser( @Query("filter[self]", encoded = true) self: Boolean = true - ): Observable + ): JsonObject } private interface SearchKeyRest { @GET("media/") - fun getKey(): Observable + suspend fun getKey(): JsonObject } private interface AgoliaSearchRest { @POST("query/") - fun getSearchQuery(@Header("X-Algolia-Application-Id") appid: String, @Header("X-Algolia-API-Key") key: String, @Body json: JsonObject): Observable + suspend fun getSearchQuery(@Header("X-Algolia-Application-Id") appid: String, @Header("X-Algolia-API-Key") key: String, @Body json: JsonObject): JsonObject } private interface LoginRest { @FormUrlEncoded @POST("oauth/token") - fun requestAccessToken( + suspend fun requestAccessToken( @Field("username") username: String, @Field("password") password: String, @Field("grant_type") grantType: String = "password", @Field("client_id") client_id: String = clientId, @Field("client_secret") client_secret: String = clientSecret - ): Observable + ): OAuth } companion object { @@ -238,6 +221,8 @@ class KitsuApi(private val client: OkHttpClient, interceptor: KitsuInterceptor) private const val algoliaAppId = "AWQO5J657S" private const val algoliaFilter = "&facetFilters=%5B%22kind%3Amanga%22%5D&attributesToRetrieve=%5B%22synopsis%22%2C%22canonicalTitle%22%2C%22chapterCount%22%2C%22posterImage%22%2C%22startDate%22%2C%22subtype%22%2C%22endDate%22%2C%20%22id%22%5D" + private val jsonConverter = Json { ignoreUnknownKeys = true }.asConverterFactory("application/json".toMediaType()) + fun mangaUrl(remoteId: Int): String { return baseMangaUrl + remoteId } diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/KitsuModels.kt b/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/KitsuModels.kt index 48e5d3871..91b3b94df 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/KitsuModels.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/KitsuModels.kt @@ -1,32 +1,31 @@ package eu.kanade.tachiyomi.data.track.kitsu import androidx.annotation.CallSuper -import com.github.salomonbrys.kotson.byInt -import com.github.salomonbrys.kotson.byString -import com.github.salomonbrys.kotson.nullInt -import com.github.salomonbrys.kotson.nullObj -import com.github.salomonbrys.kotson.nullString -import com.github.salomonbrys.kotson.obj -import com.google.gson.JsonObject import eu.kanade.tachiyomi.data.database.models.Track import eu.kanade.tachiyomi.data.track.TrackManager import eu.kanade.tachiyomi.data.track.model.TrackSearch +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.contentOrNull +import kotlinx.serialization.json.int +import kotlinx.serialization.json.intOrNull +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive import java.text.SimpleDateFormat import java.util.Date import java.util.Locale class KitsuSearchManga(obj: JsonObject) { - val id by obj.byInt - private val canonicalTitle by obj.byString - private val chapterCount = obj.get("chapterCount").nullInt - val subType = obj.get("subtype").nullString - val original = obj.get("posterImage").nullObj?.get("original")?.asString - private val synopsis by obj.byString - private var startDate = obj.get("startDate").nullString?.let { + val id = obj["id"]!!.jsonPrimitive.int + private val canonicalTitle = obj["canonicalTitle"]!!.jsonPrimitive.content + private val chapterCount = obj["chapterCount"]?.jsonPrimitive?.intOrNull + val subType = obj["subtype"]?.jsonPrimitive?.contentOrNull + val original = obj["posterImage"]?.jsonObject?.get("original")?.jsonPrimitive?.content + private val synopsis = obj["synopsis"]!!.jsonPrimitive.content + private var startDate = obj["startDate"]?.jsonPrimitive?.contentOrNull?.let { val outputDf = SimpleDateFormat("yyyy-MM-dd", Locale.US) outputDf.format(Date(it.toLong() * 1000)) } - private val endDate = obj.get("endDate").nullString + private val endDate = obj["endDate"]?.jsonPrimitive?.contentOrNull @CallSuper fun toTrack() = TrackSearch.create(TrackManager.KITSU).apply { @@ -47,17 +46,17 @@ class KitsuSearchManga(obj: JsonObject) { } class KitsuLibManga(obj: JsonObject, manga: JsonObject) { - val id by manga.byInt - private val canonicalTitle by manga["attributes"].byString - private val chapterCount = manga["attributes"].obj.get("chapterCount").nullInt - val type = manga["attributes"].obj.get("mangaType").nullString.orEmpty() - val original by manga["attributes"].obj["posterImage"].byString - private val synopsis by manga["attributes"].byString - private val startDate = manga["attributes"].obj.get("startDate").nullString.orEmpty() - private val libraryId by obj.byInt("id") - val status by obj["attributes"].byString - private val ratingTwenty = obj["attributes"].obj.get("ratingTwenty").nullString - val progress by obj["attributes"].byInt + val id = manga["id"]!!.jsonPrimitive.int + private val canonicalTitle = manga["attributes"]!!.jsonObject["canonicalTitle"]!!.jsonPrimitive.content + private val chapterCount = manga["attributes"]!!.jsonObject["chapterCount"]?.jsonPrimitive?.intOrNull + val type = manga["attributes"]!!.jsonObject["mangaType"]?.jsonPrimitive?.contentOrNull.orEmpty() + val original = manga["attributes"]!!.jsonObject["original"]!!.jsonObject["posterImage"]!!.jsonPrimitive.content + private val synopsis = manga["attributes"]!!.jsonObject["synopsis"]!!.jsonPrimitive.content + private val startDate = manga["attributes"]!!.jsonObject["startDate"]?.jsonPrimitive?.contentOrNull.orEmpty() + private val libraryId = obj["id"]!!.jsonPrimitive.int + val status = obj["attributes"]!!.jsonObject["status"]!!.jsonPrimitive.content + private val ratingTwenty = obj["attributes"]!!.jsonObject["ratingTwenty"]?.jsonPrimitive?.contentOrNull + val progress = obj["attributes"]!!.jsonObject["progress"]!!.jsonPrimitive.int fun toTrack() = TrackSearch.create(TrackManager.KITSU).apply { media_id = libraryId diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/track/myanimelist/MyAnimeList.kt b/app/src/main/java/eu/kanade/tachiyomi/data/track/myanimelist/MyAnimeList.kt index 9191e4c8c..3b6f8c51e 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/track/myanimelist/MyAnimeList.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/track/myanimelist/MyAnimeList.kt @@ -6,7 +6,7 @@ import eu.kanade.tachiyomi.R import eu.kanade.tachiyomi.data.database.models.Track import eu.kanade.tachiyomi.data.track.TrackService import eu.kanade.tachiyomi.data.track.model.TrackSearch -import kotlinx.coroutines.Dispatchers +import eu.kanade.tachiyomi.util.lang.runAsObservable import kotlinx.coroutines.runBlocking import kotlinx.serialization.decodeFromString import kotlinx.serialization.encodeToString @@ -68,24 +68,24 @@ class MyAnimeList(private val context: Context, id: Int) : TrackService(id) { } override fun add(track: Track): Observable { - return runAsObservable { api.addItemToList(track) } + return runAsObservable({ api.addItemToList(track) }) } override fun update(track: Track): Observable { - return runAsObservable { api.updateItem(track) } + return runAsObservable({ api.updateItem(track) }) } override fun bind(track: Track): Observable { // TODO: change this to call add and update like the other trackers? - return runAsObservable { api.getListItem(track) } + return runAsObservable({ api.getListItem(track) }) } override fun search(query: String): Observable> { - return runAsObservable { api.search(query) } + return runAsObservable({ api.search(query) }) } override fun refresh(track: Track): Observable { - return runAsObservable { api.getListItem(track) } + return runAsObservable({ api.getListItem(track) }) } override fun login(username: String, password: String) = login(password) @@ -122,11 +122,4 @@ class MyAnimeList(private val context: Context, id: Int) : TrackService(id) { null } } - - private fun runAsObservable(block: suspend () -> T): Observable { - return Observable.fromCallable { runBlocking(Dispatchers.IO) { block() } } - .subscribeOn(Schedulers.io()) - .observeOn(AndroidSchedulers.mainThread()) - .map { it } - } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxCoroutineBridge.kt b/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxCoroutineBridge.kt index 8ec5ccfa2..67d8e195b 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxCoroutineBridge.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxCoroutineBridge.kt @@ -119,7 +119,8 @@ suspend fun Single.await(): T = suspendCancellableCoroutine { cont -> suspend fun Observable.awaitFirst(): T = first().awaitOne() @OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) -suspend fun Observable.awaitFirstOrDefault(default: T): T = firstOrDefault(default).awaitOne() +suspend fun Observable.awaitFirstOrDefault(default: T): T = + firstOrDefault(default).awaitOne() @OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) suspend fun Observable.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne() @@ -137,7 +138,8 @@ suspend fun Observable.awaitLast(): T = last().awaitOne() @OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class) suspend fun Observable.awaitSingle(): T = single().awaitOne() -suspend fun Observable.awaitSingleOrDefault(default: T): T = singleOrDefault(default).awaitOne() +suspend fun Observable.awaitSingleOrDefault(default: T): T = + singleOrDefault(default).awaitOne() suspend fun Observable.awaitSingleOrNull(): T? = singleOrDefault(null).awaitOne() @@ -203,9 +205,9 @@ fun Flow.asObservable(backpressureMode: Emitter.BackpressureMode = return Observable.create( { emitter -> /* - * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if - * asObservable is already invoked from unconfined - */ + * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if + * asObservable is already invoked from unconfined + */ val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) { try { collect { emitter.onNext(it) } @@ -224,3 +226,28 @@ fun Flow.asObservable(backpressureMode: Emitter.BackpressureMode = backpressureMode ) } + +fun runAsObservable( + block: suspend () -> T, + backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE +): Observable { + return Observable.create( + { emitter -> + val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) { + try { + emitter.onNext(block()) + emitter.onCompleted() + } catch (e: Throwable) { + // Ignore `CancellationException` as error, since it indicates "normal cancellation" + if (e !is CancellationException) { + emitter.onError(e) + } else { + emitter.onCompleted() + } + } + } + emitter.setCancellation { job.cancel() } + }, + backpressureMode + ) +}