Allow to resume downloads when a connection is again available

This commit is contained in:
inorichi 2015-11-07 16:34:22 +01:00
parent 17c60644dd
commit 0f372ba069
5 changed files with 106 additions and 34 deletions

View File

@ -7,10 +7,13 @@ import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonReader;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List; import java.util.List;
import eu.kanade.mangafeed.data.models.Chapter; import eu.kanade.mangafeed.data.models.Chapter;
@ -18,6 +21,7 @@ import eu.kanade.mangafeed.data.models.Download;
import eu.kanade.mangafeed.data.models.DownloadQueue; import eu.kanade.mangafeed.data.models.DownloadQueue;
import eu.kanade.mangafeed.data.models.Manga; import eu.kanade.mangafeed.data.models.Manga;
import eu.kanade.mangafeed.data.models.Page; import eu.kanade.mangafeed.data.models.Page;
import eu.kanade.mangafeed.data.services.DownloadService;
import eu.kanade.mangafeed.events.DownloadChaptersEvent; import eu.kanade.mangafeed.events.DownloadChaptersEvent;
import eu.kanade.mangafeed.sources.base.Source; import eu.kanade.mangafeed.sources.base.Source;
import eu.kanade.mangafeed.util.DiskUtils; import eu.kanade.mangafeed.util.DiskUtils;
@ -39,7 +43,7 @@ public class DownloadManager {
private PublishSubject<Download> downloadsQueueSubject; private PublishSubject<Download> downloadsQueueSubject;
private BehaviorSubject<Integer> threadsNumber; private BehaviorSubject<Integer> threadsNumber;
private Subscription downloadsSubscription; private Subscription downloadsSubscription;
private Subscription threadNumberSubscription; private Subscription threadsNumberSubscription;
private DownloadQueue queue; private DownloadQueue queue;
private transient boolean isQueuePaused; private transient boolean isQueuePaused;
@ -53,21 +57,19 @@ public class DownloadManager {
this.gson = new Gson(); this.gson = new Gson();
queue = new DownloadQueue(); queue = new DownloadQueue();
initializeDownloadsSubscription();
} }
private void initializeDownloadsSubscription() { public void initializeSubscriptions() {
if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed()) if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed())
downloadsSubscription.unsubscribe(); downloadsSubscription.unsubscribe();
if (threadNumberSubscription != null && !threadNumberSubscription.isUnsubscribed()) if (threadsNumberSubscription != null && !threadsNumberSubscription.isUnsubscribed())
threadNumberSubscription.unsubscribe(); threadsNumberSubscription.unsubscribe();
downloadsQueueSubject = PublishSubject.create(); downloadsQueueSubject = PublishSubject.create();
threadsNumber = BehaviorSubject.create(); threadsNumber = BehaviorSubject.create();
threadNumberSubscription = preferences.getDownloadTheadsObservable() threadsNumberSubscription = preferences.getDownloadTheadsObservable()
.filter(n -> !isQueuePaused) .filter(n -> !isQueuePaused)
.doOnNext(n -> isQueuePaused = (n == 0)) .doOnNext(n -> isQueuePaused = (n == 0))
.subscribe(threadsNumber::onNext); .subscribe(threadsNumber::onNext);
@ -80,6 +82,18 @@ public class DownloadManager {
e -> Timber.e(e.fillInStackTrace(), e.getMessage())); e -> Timber.e(e.fillInStackTrace(), e.getMessage()));
} }
public void destroySubscriptions() {
if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed()) {
downloadsSubscription.unsubscribe();
downloadsSubscription = null;
}
if (threadsNumberSubscription != null && !threadsNumberSubscription.isUnsubscribed()) {
threadsNumberSubscription.unsubscribe();
threadsNumberSubscription = null;
}
}
// Create a download object for every chapter in the event and add them to the downloads queue // Create a download object for every chapter in the event and add them to the downloads queue
public void onDownloadChaptersEvent(DownloadChaptersEvent event) { public void onDownloadChaptersEvent(DownloadChaptersEvent event) {
final Manga manga = event.getManga(); final Manga manga = event.getManga();
@ -90,7 +104,6 @@ public class DownloadManager {
if (!isChapterDownloaded(download)) { if (!isChapterDownloaded(download)) {
queue.add(download); queue.add(download);
downloadsQueueSubject.onNext(download);
} }
} }
} }
@ -139,7 +152,7 @@ public class DownloadManager {
.pullPageListFromNetwork(download.chapter.url) .pullPageListFromNetwork(download.chapter.url)
.doOnNext(pages -> download.pages = pages) .doOnNext(pages -> download.pages = pages)
.doOnNext(pages -> savePageList(download)) : .doOnNext(pages -> savePageList(download)) :
// Or if the file exists, start from here // Or if the page list already exists, start from the file
Observable.just(download.pages); Observable.just(download.pages);
return pageListObservable return pageListObservable
@ -169,14 +182,19 @@ public class DownloadManager {
} }
return pageObservable return pageObservable
.doOnNext(p -> p.setImagePath(imagePath.getAbsolutePath())) // When the image is ready, set image path, progress (just in case) and status
.doOnNext(p -> p.setStatus(Page.READY)) .doOnNext(p -> {
p.setImagePath(imagePath.getAbsolutePath());
p.setProgress(100);
p.setStatus(Page.READY);
})
// If the download fails, mark this page as error
.doOnError(e -> page.setStatus(Page.ERROR)) .doOnError(e -> page.setStatus(Page.ERROR))
// Allow to download the remaining images // Allow to download the remaining images
.onErrorResumeNext(e -> Observable.just(page)); .onErrorResumeNext(e -> Observable.just(page));
} }
// Download the image // Download the image and save it to the filesystem
private Observable<Page> downloadImage(final Page page, Source source, File chapterDir, String imageFilename) { private Observable<Page> downloadImage(final Page page, Source source, File chapterDir, String imageFilename) {
return source.getImageProgressResponse(page) return source.getImageProgressResponse(page)
.flatMap(resp -> { .flatMap(resp -> {
@ -192,9 +210,15 @@ public class DownloadManager {
// Get the filename for an image given the page // Get the filename for an image given the page
private String getImageFilename(Page page) { private String getImageFilename(Page page) {
return page.getImageUrl().substring( String url;
page.getImageUrl().lastIndexOf("/") + 1, try {
page.getImageUrl().length()); url = new URL(page.getImageUrl()).getPath();
} catch (MalformedURLException e) {
url = page.getImageUrl();
}
return url.substring(
url.lastIndexOf("/") + 1,
url.length());
} }
private boolean isImageDownloaded(File imagePath) { private boolean isImageDownloaded(File imagePath) {
@ -205,10 +229,12 @@ public class DownloadManager {
private void onDownloadCompleted(final Download download) { private void onDownloadCompleted(final Download download) {
checkDownloadIsSuccessful(download); checkDownloadIsSuccessful(download);
savePageList(download); savePageList(download);
if (areAllDownloadsFinished()) {
DownloadService.stop(context);
}
} }
private void checkDownloadIsSuccessful(final Download download) { private void checkDownloadIsSuccessful(final Download download) {
int expectedProgress = download.pages.size() * 100;
int actualProgress = 0; int actualProgress = 0;
int status = Download.DOWNLOADED; int status = Download.DOWNLOADED;
// If any page has an error, the download result will be error // If any page has an error, the download result will be error
@ -216,8 +242,7 @@ public class DownloadManager {
actualProgress += page.getProgress(); actualProgress += page.getProgress();
if (page.getStatus() == Page.ERROR) status = Download.ERROR; if (page.getStatus() == Page.ERROR) status = Download.ERROR;
} }
// If the download is successful, it's safer to use the expected progress download.totalProgress = actualProgress;
download.totalProgress = (status == Download.DOWNLOADED) ? expectedProgress : actualProgress;
download.setStatus(status); download.setStatus(status);
} }
@ -227,15 +252,17 @@ public class DownloadManager {
File chapterDir = getAbsoluteChapterDirectory(source, manga, chapter); File chapterDir = getAbsoluteChapterDirectory(source, manga, chapter);
File pagesFile = new File(chapterDir, PAGE_LIST_FILE); File pagesFile = new File(chapterDir, PAGE_LIST_FILE);
JsonReader reader = null;
try { try {
if (pagesFile.exists()) { if (pagesFile.exists()) {
JsonReader reader = new JsonReader(new FileReader(pagesFile.getAbsolutePath())); reader = new JsonReader(new FileReader(pagesFile.getAbsolutePath()));
Type collectionType = new TypeToken<List<Page>>() {}.getType(); Type collectionType = new TypeToken<List<Page>>() {}.getType();
pages = gson.fromJson(reader, collectionType); pages = gson.fromJson(reader, collectionType);
reader.close();
} }
} catch (Exception e) { } catch (FileNotFoundException e) {
Timber.e(e.fillInStackTrace(), e.getMessage()); Timber.e(e.fillInStackTrace(), e.getMessage());
} finally {
if (reader != null) try { reader.close(); } catch (IOException e) { /* Do nothing */ }
} }
return pages; return pages;
} }
@ -250,14 +277,15 @@ public class DownloadManager {
File chapterDir = getAbsoluteChapterDirectory(source, manga, chapter); File chapterDir = getAbsoluteChapterDirectory(source, manga, chapter);
File pagesFile = new File(chapterDir, PAGE_LIST_FILE); File pagesFile = new File(chapterDir, PAGE_LIST_FILE);
FileOutputStream out; FileOutputStream out = null;
try { try {
out = new FileOutputStream(pagesFile); out = new FileOutputStream(pagesFile);
out.write(gson.toJson(pages).getBytes()); out.write(gson.toJson(pages).getBytes());
out.flush(); out.flush();
out.close(); } catch (IOException e) {
} catch (Exception e) {
Timber.e(e.fillInStackTrace(), e.getMessage()); Timber.e(e.fillInStackTrace(), e.getMessage());
} finally {
if (out != null) try { out.close(); } catch (IOException e) { /* Do nothing */ }
} }
} }
@ -291,12 +319,37 @@ public class DownloadManager {
return queue; return queue;
} }
public void pauseDownloads() { public boolean areAllDownloadsFinished() {
threadsNumber.onNext(0); for (Download download : queue.get()) {
if (download.getStatus() <= Download.DOWNLOADING)
return false;
}
return true;
} }
public void resumeDownloads() { public void resumeDownloads() {
isQueuePaused = false; isQueuePaused = false;
threadsNumber.onNext(preferences.getDownloadThreads()); threadsNumber.onNext(preferences.getDownloadThreads());
} }
public void pauseDownloads() {
threadsNumber.onNext(0);
}
public void startDownloads() {
if (downloadsSubscription == null || threadsNumberSubscription == null)
initializeSubscriptions();
for (Download download : queue.get()) {
if (download.getStatus() != Download.DOWNLOADED) {
download.setStatus(Download.QUEUE);
downloadsQueueSubject.onNext(download);
}
}
}
public void stopDownloads() {
destroySubscriptions();
}
} }

View File

@ -30,6 +30,14 @@ public class DownloadQueue {
return queue; return queue;
} }
public void clearSuccessfulDownloads() {
for (Download download : queue) {
if (download.getStatus() == Download.DOWNLOADED) {
remove(download);
}
}
}
public Observable<Download> getActiveDownloads() { public Observable<Download> getActiveDownloads() {
return Observable.from(queue) return Observable.from(queue)
.filter(download -> download.getStatus() == Download.DOWNLOADING); .filter(download -> download.getStatus() == Download.DOWNLOADING);

View File

@ -69,6 +69,10 @@ public class Page implements NetworkHelper.ProgressListener {
return progress; return progress;
} }
public void setProgress(int value) {
progress = value;
}
@Override @Override
public void update(long bytesRead, long contentLength, boolean done) { public void update(long bytesRead, long contentLength, boolean done) {
progress = (int) ((100 * bytesRead) / contentLength); progress = (int) ((100 * bytesRead) / contentLength);

View File

@ -13,9 +13,9 @@ import de.greenrobot.event.EventBus;
import eu.kanade.mangafeed.App; import eu.kanade.mangafeed.App;
import eu.kanade.mangafeed.data.helpers.DownloadManager; import eu.kanade.mangafeed.data.helpers.DownloadManager;
import eu.kanade.mangafeed.events.DownloadChaptersEvent; import eu.kanade.mangafeed.events.DownloadChaptersEvent;
import eu.kanade.mangafeed.util.AndroidComponentUtil;
import eu.kanade.mangafeed.util.ContentObservable; import eu.kanade.mangafeed.util.ContentObservable;
import eu.kanade.mangafeed.util.EventBusHook; import eu.kanade.mangafeed.util.EventBusHook;
import eu.kanade.mangafeed.util.NetworkUtil;
import rx.Subscription; import rx.Subscription;
public class DownloadService extends Service { public class DownloadService extends Service {
@ -28,14 +28,17 @@ public class DownloadService extends Service {
context.startService(new Intent(context, DownloadService.class)); context.startService(new Intent(context, DownloadService.class));
} }
public static boolean isRunning(Context context) { public static void stop(Context context) {
return AndroidComponentUtil.isServiceRunning(context, DownloadService.class); context.stopService(new Intent(context, DownloadService.class));
} }
@Override @Override
public void onCreate() { public void onCreate() {
super.onCreate(); super.onCreate();
App.get(this).getComponent().inject(this); App.get(this).getComponent().inject(this);
// An initial event will be fired when subscribed.
// This will cause the following download events to start or wait for a connection
listenNetworkChanges(); listenNetworkChanges();
EventBus.getDefault().registerSticky(this); EventBus.getDefault().registerSticky(this);
@ -50,6 +53,7 @@ public class DownloadService extends Service {
public void onDestroy() { public void onDestroy() {
EventBus.getDefault().unregister(this); EventBus.getDefault().unregister(this);
networkChangeSubscription.unsubscribe(); networkChangeSubscription.unsubscribe();
downloadManager.destroySubscriptions();
super.onDestroy(); super.onDestroy();
} }
@ -68,7 +72,11 @@ public class DownloadService extends Service {
IntentFilter intentFilter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION); IntentFilter intentFilter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
networkChangeSubscription = ContentObservable.fromBroadcast(this, intentFilter) networkChangeSubscription = ContentObservable.fromBroadcast(this, intentFilter)
.subscribe(state -> { .subscribe(state -> {
// TODO if (NetworkUtil.isNetworkConnected(this)) {
downloadManager.startDownloads();
} else {
downloadManager.stopDownloads();
}
}); });
} }

View File

@ -130,12 +130,11 @@ public final class DiskUtils {
try { try {
bufferedSink = Okio.buffer(Okio.sink(writeFile)); bufferedSink = Okio.buffer(Okio.sink(writeFile));
bufferedSink.writeAll(bufferedSource); bufferedSink.writeAll(bufferedSource);
} finally { bufferedSink.close();
if (bufferedSource != null) { } catch (Exception e) {
bufferedSource.close();
}
if (bufferedSink != null) { if (bufferedSink != null) {
bufferedSink.close(); bufferedSink.close();
writeFile.delete();
} }
} }