Allow to start/stop queue from download queue fragment. DownloadQueue now extends from ArrayList.
This commit is contained in:
@@ -42,11 +42,11 @@ public class DownloadManager {
|
||||
|
||||
private PublishSubject<Download> downloadsQueueSubject;
|
||||
private BehaviorSubject<Integer> threadsNumber;
|
||||
private BehaviorSubject<Boolean> runningSubject;
|
||||
private Subscription downloadsSubscription;
|
||||
private Subscription threadsNumberSubscription;
|
||||
|
||||
private DownloadQueue queue;
|
||||
private volatile boolean isQueuePaused;
|
||||
private volatile boolean isRunning;
|
||||
|
||||
public static final String PAGE_LIST_FILE = "index.json";
|
||||
@@ -61,9 +61,10 @@ public class DownloadManager {
|
||||
|
||||
downloadsQueueSubject = PublishSubject.create();
|
||||
threadsNumber = BehaviorSubject.create();
|
||||
runningSubject = BehaviorSubject.create();
|
||||
}
|
||||
|
||||
public void initializeSubscriptions() {
|
||||
private void initializeSubscriptions() {
|
||||
if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed())
|
||||
downloadsSubscription.unsubscribe();
|
||||
|
||||
@@ -71,8 +72,6 @@ public class DownloadManager {
|
||||
threadsNumberSubscription.unsubscribe();
|
||||
|
||||
threadsNumberSubscription = preferences.getDownloadTheadsObservable()
|
||||
.filter(n -> !isQueuePaused)
|
||||
.doOnNext(n -> isQueuePaused = (n == 0))
|
||||
.subscribe(threadsNumber::onNext);
|
||||
|
||||
downloadsSubscription = downloadsQueueSubject
|
||||
@@ -86,11 +85,17 @@ public class DownloadManager {
|
||||
}
|
||||
}, e -> Timber.e(e.getCause(), e.getMessage()));
|
||||
|
||||
isRunning = true;
|
||||
if (!isRunning) {
|
||||
isRunning = true;
|
||||
runningSubject.onNext(true);
|
||||
}
|
||||
}
|
||||
|
||||
public void destroySubscriptions() {
|
||||
isRunning = false;
|
||||
if (isRunning) {
|
||||
isRunning = false;
|
||||
runningSubject.onNext(false);
|
||||
}
|
||||
|
||||
if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed()) {
|
||||
downloadsSubscription.unsubscribe();
|
||||
@@ -131,7 +136,7 @@ public class DownloadManager {
|
||||
// Prepare the download. Returns true if the chapter is already downloaded
|
||||
private boolean prepareDownload(Download download) {
|
||||
// If the chapter is already queued, don't add it again
|
||||
for (Download queuedDownload : queue.get()) {
|
||||
for (Download queuedDownload : queue) {
|
||||
if (download.chapter.id.equals(queuedDownload.chapter.id))
|
||||
return true;
|
||||
}
|
||||
@@ -376,28 +381,22 @@ public class DownloadManager {
|
||||
}
|
||||
|
||||
public boolean areAllDownloadsFinished() {
|
||||
for (Download download : queue.get()) {
|
||||
for (Download download : queue) {
|
||||
if (download.getStatus() <= Download.DOWNLOADING)
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public void resumeDownloads() {
|
||||
isQueuePaused = false;
|
||||
threadsNumber.onNext(preferences.getDownloadThreads());
|
||||
}
|
||||
|
||||
public void pauseDownloads() {
|
||||
threadsNumber.onNext(0);
|
||||
}
|
||||
|
||||
public boolean startDownloads() {
|
||||
if (queue.isEmpty())
|
||||
return false;
|
||||
|
||||
boolean hasPendingDownloads = false;
|
||||
if (downloadsSubscription == null || threadsNumberSubscription == null)
|
||||
initializeSubscriptions();
|
||||
|
||||
for (Download download : queue.get()) {
|
||||
for (Download download : queue) {
|
||||
if (download.getStatus() != Download.DOWNLOADED) {
|
||||
if (download.getStatus() != Download.QUEUE) download.setStatus(Download.QUEUE);
|
||||
if (!hasPendingDownloads) hasPendingDownloads = true;
|
||||
@@ -409,15 +408,15 @@ public class DownloadManager {
|
||||
|
||||
public void stopDownloads() {
|
||||
destroySubscriptions();
|
||||
for (Download download : queue.get()) {
|
||||
for (Download download : queue) {
|
||||
if (download.getStatus() == Download.DOWNLOADING) {
|
||||
download.setStatus(Download.ERROR);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return isRunning;
|
||||
public BehaviorSubject<Boolean> getRunningSubject() {
|
||||
return runningSubject;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ public class DownloadService extends Service {
|
||||
|
||||
private PowerManager.WakeLock wakeLock;
|
||||
private Subscription networkChangeSubscription;
|
||||
private Subscription queueRunningSubscription;
|
||||
|
||||
public static void start(Context context) {
|
||||
context.startService(new Intent(context, DownloadService.class));
|
||||
@@ -40,6 +41,7 @@ public class DownloadService extends Service {
|
||||
|
||||
createWakeLock();
|
||||
|
||||
listenQueueRunningChanges();
|
||||
EventBus.getDefault().registerSticky(this);
|
||||
listenNetworkChanges();
|
||||
}
|
||||
@@ -52,6 +54,7 @@ public class DownloadService extends Service {
|
||||
@Override
|
||||
public void onDestroy() {
|
||||
EventBus.getDefault().unregister(this);
|
||||
queueRunningSubscription.unsubscribe();
|
||||
networkChangeSubscription.unsubscribe();
|
||||
downloadManager.destroySubscriptions();
|
||||
destroyWakeLock();
|
||||
@@ -67,8 +70,6 @@ public class DownloadService extends Service {
|
||||
public void onEvent(DownloadChaptersEvent event) {
|
||||
EventBus.getDefault().removeStickyEvent(event);
|
||||
downloadManager.onDownloadChaptersEvent(event);
|
||||
if (downloadManager.isRunning())
|
||||
acquireWakeLock();
|
||||
}
|
||||
|
||||
private void listenNetworkChanges() {
|
||||
@@ -79,15 +80,22 @@ public class DownloadService extends Service {
|
||||
// If there are no remaining downloads, destroy the service
|
||||
if (!downloadManager.startDownloads())
|
||||
stopSelf();
|
||||
else
|
||||
acquireWakeLock();
|
||||
} else {
|
||||
downloadManager.stopDownloads();
|
||||
releaseWakeLock();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void listenQueueRunningChanges() {
|
||||
queueRunningSubscription = downloadManager.getRunningSubject()
|
||||
.subscribe(running -> {
|
||||
if (running)
|
||||
acquireWakeLock();
|
||||
else
|
||||
releaseWakeLock();
|
||||
});
|
||||
}
|
||||
|
||||
private void createWakeLock() {
|
||||
wakeLock = ((PowerManager)getSystemService(POWER_SERVICE)).newWakeLock(
|
||||
PowerManager.PARTIAL_WAKE_LOCK, "DownloadService:WakeLock");
|
||||
|
||||
@@ -8,29 +8,28 @@ import eu.kanade.mangafeed.data.source.model.Page;
|
||||
import rx.Observable;
|
||||
import rx.subjects.PublishSubject;
|
||||
|
||||
public class DownloadQueue {
|
||||
public class DownloadQueue extends ArrayList<Download> {
|
||||
|
||||
private List<Download> queue;
|
||||
private PublishSubject<Download> statusSubject;
|
||||
|
||||
public DownloadQueue() {
|
||||
queue = new ArrayList<>();
|
||||
super();
|
||||
statusSubject = PublishSubject.create();
|
||||
}
|
||||
|
||||
public void add(Download download) {
|
||||
public boolean add(Download download) {
|
||||
download.setStatusSubject(statusSubject);
|
||||
download.setStatus(Download.QUEUE);
|
||||
queue.add(download);
|
||||
return super.add(download);
|
||||
}
|
||||
|
||||
public void remove(Download download) {
|
||||
queue.remove(download);
|
||||
super.remove(download);
|
||||
download.setStatusSubject(null);
|
||||
}
|
||||
|
||||
public void remove(Chapter chapter) {
|
||||
for (Download download : queue) {
|
||||
for (Download download : this) {
|
||||
if (download.chapter.id.equals(chapter.id)) {
|
||||
remove(download);
|
||||
break;
|
||||
@@ -38,12 +37,8 @@ public class DownloadQueue {
|
||||
}
|
||||
}
|
||||
|
||||
public List<Download> get() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
public Observable<Download> getActiveDownloads() {
|
||||
return Observable.from(queue)
|
||||
return Observable.from(this)
|
||||
.filter(download -> download.getStatus() == Download.DOWNLOADING);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user