Bug 1351673 - Use a single-threaded work queue to process batching downloader work items r=rnewman

Before we'd recurse instead while fetching multiple batches, overflowing the stack on older devices.

MozReview-Commit-ID: 37BG6zGBdn0

--HG--
extra : rebase_source : 2e9d2eeeba247454051e9fe4ab875d9f9ca5e2d4
This commit is contained in:
Grigory Kruglov 2017-09-21 16:53:03 -04:00
parent 3dc36a71a4
commit 2e039ddbf5
4 changed files with 41 additions and 97 deletions

View File

@ -883,7 +883,6 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil
'sync/crypto/PersistedCrypto5Keys.java',
'sync/CryptoKeysChangedException.java',
'sync/CryptoRecord.java',
'sync/DelayedWorkTracker.java',
'sync/delegates/ClientsDataDelegate.java',
'sync/delegates/FreshStartDelegate.java',
'sync/delegates/GlobalSessionCallback.java',

View File

@ -1,69 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync;
import org.mozilla.gecko.background.common.log.Logger;
/**
* A little class to allow us to maintain a count of extant
* things (in our case, callbacks that need to fire), and
* some work that we want done when that count hits 0.
*
* @author rnewman
*
*/
public class DelayedWorkTracker {
private static final String LOG_TAG = "DelayedWorkTracker";
protected Runnable workItem = null;
protected int outstandingCount = 0;
public int incrementOutstanding() {
Logger.trace(LOG_TAG, "Incrementing outstanding.");
synchronized(this) {
return ++outstandingCount;
}
}
public int decrementOutstanding() {
Logger.trace(LOG_TAG, "Decrementing outstanding.");
Runnable job = null;
int count;
synchronized(this) {
if ((count = --outstandingCount) == 0 &&
workItem != null) {
job = workItem;
workItem = null;
} else {
return count;
}
}
job.run();
// In case it's changed.
return getOutstandingOperations();
}
public int getOutstandingOperations() {
synchronized(this) {
return outstandingCount;
}
}
public void delayWorkItem(Runnable item) {
Logger.trace(LOG_TAG, "delayWorkItem.");
boolean runnableNow = false;
synchronized(this) {
Logger.trace(LOG_TAG, "outstandingCount: " + outstandingCount);
if (outstandingCount == 0) {
runnableNow = true;
} else {
if (workItem != null) {
throw new IllegalStateException("Work item already set!");
}
workItem = item;
}
}
if (runnableNow) {
Logger.trace(LOG_TAG, "Running item now.");
item.run();
}
}
}

View File

@ -12,7 +12,6 @@ import android.support.annotation.VisibleForTesting;
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.DelayedWorkTracker;
import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
@ -29,6 +28,8 @@ import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
@ -58,7 +59,6 @@ public class BatchingDownloader {
private static final String DEFAULT_SORT_ORDER = "index";
private final RepositorySession repositorySession;
private final DelayedWorkTracker workTracker = new DelayedWorkTracker();
private final Uri baseCollectionUri;
private final long fetchDeadline;
private final boolean allowMultipleBatches;
@ -73,6 +73,8 @@ public class BatchingDownloader {
protected final Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
/* @GuardedBy("this") */ private String lastModified;
private final ExecutorService taskQueue = Executors.newSingleThreadExecutor();
public BatchingDownloader(
AuthHeaderProvider authHeaderProvider,
Uri baseCollectionUri,
@ -91,7 +93,7 @@ public class BatchingDownloader {
}
@VisibleForTesting
protected static String flattenIDs(String[] guids) {
/* package-private */ static String flattenIDs(String[] guids) {
// Consider using Utils.toDelimitedString if and when the signature changes
// to Collection<String> guids.
if (guids.length == 0) {
@ -110,18 +112,23 @@ public class BatchingDownloader {
}
@VisibleForTesting
protected void fetchWithParameters(long newer,
long batchLimit,
boolean full,
String sort,
String ids,
SyncStorageCollectionRequest request,
RepositorySessionFetchRecordsDelegate fetchRecordsDelegate)
protected void fetchWithParameters(final long newer,
final long batchLimit,
final boolean full,
final String sort,
final String ids,
final SyncStorageCollectionRequest request,
final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate)
throws URISyntaxException, UnsupportedEncodingException {
request.delegate = new BatchingDownloaderDelegate(this, fetchRecordsDelegate, request,
newer, batchLimit, full, sort, ids);
this.pending.add(request);
request.get();
runTaskOnQueue(new Runnable() {
@Override
public void run() {
request.delegate = new BatchingDownloaderDelegate(BatchingDownloader.this, fetchRecordsDelegate, request,
newer, batchLimit, full, sort, ids);
pending.add(request);
request.get();
}
});
}
@VisibleForTesting
@ -215,10 +222,10 @@ public class BatchingDownloader {
Logger.warn(LOG_TAG, "Failed to reset resume context while completing a batch");
}
this.workTracker.delayWorkItem(new Runnable() {
runTaskOnQueue(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
Logger.debug(LOG_TAG, "onFetchCompleted running.");
fetchRecordsDelegate.onFetchCompleted();
}
});
@ -240,9 +247,10 @@ public class BatchingDownloader {
// We need to make another batching request!
// Let the delegate know that a batch fetch just completed before we proceed.
// This operation needs to run after every call to onFetchedRecord for this batch has been
// processed, hence the delayWorkItem call.
this.workTracker.delayWorkItem(new Runnable() {
// Beware that while this operation will run after every call to onFetchedRecord returned,
// it's not guaranteed that the 'sink' session actually processed all of the fetched records.
// See Bug https://bugzilla.mozilla.org/show_bug.cgi?id=1351673#c28 for details.
runTaskOnQueue(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Running onBatchCompleted.");
@ -265,16 +273,21 @@ public class BatchingDownloader {
if (!this.stateProvider.commit()) {
Logger.warn(LOG_TAG, "Failed to commit repository state while handling request creation error");
}
this.workTracker.delayWorkItem(new Runnable() {
runTaskOnQueue(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
Logger.debug(LOG_TAG, "onFetchCompleted running.");
fetchRecordsDelegate.onFetchFailed(e);
}
});
}
}
@VisibleForTesting
/* package-private */ void runTaskOnQueue(Runnable task) {
taskQueue.execute(task);
}
private void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
final Exception ex) {
handleFetchFailed(fetchRecordsDelegate, ex, null);
@ -304,7 +317,7 @@ public class BatchingDownloader {
}
}
this.workTracker.delayWorkItem(new Runnable() {
runTaskOnQueue(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Running onFetchFailed.");
@ -315,8 +328,6 @@ public class BatchingDownloader {
public void onFetchedRecord(CryptoRecord record,
RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
this.workTracker.incrementOutstanding();
try {
fetchRecordsDelegate.onFetchedRecord(record);
// NB: changes to stateProvider are committed in either onFetchCompleted or handleFetchFailed.
@ -326,8 +337,6 @@ public class BatchingDownloader {
} catch (Exception ex) {
Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
throw new RuntimeException(ex);
} finally {
this.workTracker.decrementOutstanding();
}
}
@ -362,7 +371,7 @@ public class BatchingDownloader {
}
@VisibleForTesting
public static URI buildCollectionURI(Uri baseCollectionUri, boolean full, long newer, long limit, String sort, String ids, String offset) throws URISyntaxException {
/* package-private */ static URI buildCollectionURI(Uri baseCollectionUri, boolean full, long newer, long limit, String sort, String ids, String offset) throws URISyntaxException {
Uri.Builder uriBuilder = baseCollectionUri.buildUpon();
if (full) {

View File

@ -214,6 +214,11 @@ public class BatchingDownloaderTest {
this.offset = offset;
return super.makeSyncStorageCollectionRequest(newer, batchLimit, full, sort, ids, offset);
}
@Override
void runTaskOnQueue(Runnable task) {
task.run();
}
}
static class MockSever15Repository extends Server15Repository {