Bug 1403022 - Abort session on BatchingUploader failures r=rnewman

The main goal of these changes is to ensure we're not doing any unnecessary work
in the unahppy cases of BatchingUploader. We might fail in three general ways:
- encounter a 412 error
- encounter another type of HTTP error
- encounter a GUID in the "failed" array

Currently, in all of these cases, we de-facto abort the session, without performing
an actual abort. E.g. we won't commit a batch, we'll refuse to upload any still-flowing
records. This patch simplifies our unhappy-case behaviour: if something failed, actually
abort the session (triggering a shutdownNow of the work queues), declare store as failed, etc.

It's important to note that our "did the synchronization fail?" login in the SynchronizerSession
depends on the store failure counts, and so this patch maintains the "record failed to store"
delegate chain. However, these counts are largely meaningless. What does it mean to fail to store
50 records, if we abort on the 51st, and prevent the other 100 from flowing (and from being counted
as failed?).

This patch also fixes an omission in the verstion tracking logic:
- prior, if we encountered a record in the "failed" array, we'd continue on with the flow, won't upload
anything, mark the synchronization as failed, but we'd also call into 'onStoreCompleted' which will
trigger an update of syncVersion for outflowing records
- with this patch, we won't call into onStoreCompleted in the case above, and so won't update syncVersion
in case of such failures
- this is the correct behaviour for batching uploads (now enabled on all but one server), but possibly
non-optimal behaviour if batching isn't enabled. However, this behaviour should be safe from a data consistency
point of view regardless of the batching mode.

MozReview-Commit-ID: LIYCPaRX8JA

--HG--
extra : rebase_source : 110224b2db85a383635db933ec6c19b21af886e7
This commit is contained in:
Grigory Kruglov 2017-09-26 17:36:22 -04:00
parent 5f143a8673
commit 95e9c477dd
4 changed files with 59 additions and 41 deletions

View File

@ -15,6 +15,7 @@ import org.mozilla.gecko.sync.Server15PreviousPostFailedException;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
import org.mozilla.gecko.sync.repositories.domain.Record;
import java.util.ArrayList;
@ -132,15 +133,6 @@ public class BatchingUploader {
return;
}
// If a record or a payload failed, we won't let subsequent requests proceed.
// This means that we may bail much earlier.
if (payloadDispatcher.recordUploadFailed) {
sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
new Server15PreviousPostFailedException(), guid
);
return;
}
final JSONObject recordJSON = record.toJSONObject();
final String payloadField = (String) recordJSON.get(CryptoRecord.KEY_PAYLOAD);
@ -283,6 +275,10 @@ public class BatchingUploader {
}
}
/* package-local */ void abort() {
repositorySession.abort();
}
/**
* Allows tests to define their own PayloadDispatcher.
*/

View File

@ -35,10 +35,8 @@ class PayloadDispatcher {
private final Executor executor;
private final BatchingUploader uploader;
// For both of these flags:
// Written from sequentially running thread(s) on the SingleThreadExecutor `executor`.
// Read by many threads running concurrently on the records consumer thread pool.
volatile boolean recordUploadFailed = false;
final AtomicBoolean storeFailed = new AtomicBoolean(false);
PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified) {
@ -105,7 +103,7 @@ class PayloadDispatcher {
}
}
void lastPayloadFailed(Exception e) {
void payloadFailed(Exception e) {
doStoreFailed(e);
}
@ -131,7 +129,6 @@ class PayloadDispatcher {
void recordFailed(final Exception e, final String recordGuid) {
Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString());
recordUploadFailed = true;
uploader.sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
}
@ -147,7 +144,7 @@ class PayloadDispatcher {
if (storeFailed.getAndSet(true)) {
return;
}
recordUploadFailed = true;
uploader.abort();
uploader.sessionStoreDelegate.onStoreFailed(reason);
}
@ -209,7 +206,7 @@ class PayloadDispatcher {
// Instances of this class must be accessed from threads running on the `executor`.
private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider {
public boolean mayUpload() {
return !recordUploadFailed;
return !storeFailed.get();
}
}
}

View File

@ -10,6 +10,7 @@ import org.mozilla.gecko.sync.ExtendedJSONObject;
import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.NonArrayJSONException;
import org.mozilla.gecko.sync.NonObjectJSONException;
import org.mozilla.gecko.sync.Server15RecordPostFailedException;
import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncResponse;
@ -165,6 +166,8 @@ class PayloadUploadDelegate implements SyncStorageRequestDelegate {
for (String guid : failed.keySet()) {
dispatcher.recordFailed(guid);
}
dispatcher.payloadFailed(new Server15RecordPostFailedException());
return;
}
// GC
failed = null;
@ -197,9 +200,6 @@ class PayloadUploadDelegate implements SyncStorageRequestDelegate {
}
// GC
postedRecordGuids = null;
if (isLastPayload) {
dispatcher.lastPayloadFailed(e);
}
dispatcher.payloadFailed(e);
}
}

View File

@ -8,6 +8,7 @@ import android.net.Uri;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.HTTPFailureException;
@ -34,9 +35,13 @@ import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
import ch.boye.httpclientandroidlib.message.BasicStatusLine;
import static org.junit.Assert.*;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(TestRunner.class)
public class PayloadUploadDelegateTest {
private RepositorySession repositorySession;
private PayloadDispatcher payloadDispatcher;
private AuthHeaderProvider authHeaderProvider;
@ -44,7 +49,7 @@ public class PayloadUploadDelegateTest {
class MockPayloadDispatcher extends PayloadDispatcher {
public final HashMap<String, Exception> failedRecords = new HashMap<>();
public boolean didLastPayloadFail = false;
public boolean didPayloadFail = false;
public ArrayList<SyncStorageResponse> successResponses = new ArrayList<>();
public int commitPayloadsSucceeded = 0;
@ -79,13 +84,13 @@ public class PayloadUploadDelegateTest {
@Override
public void recordFailed(final Exception e, final String recordGuid) {
recordUploadFailed = true;
failedRecords.put(recordGuid, e);
}
@Override
public void lastPayloadFailed(Exception e) {
didLastPayloadFail = true;
public void payloadFailed(Exception e) {
didPayloadFail = true;
super.payloadFailed(e);
}
}
@ -124,11 +129,21 @@ public class PayloadUploadDelegateTest {
@Before
public void setUp() throws Exception {
sessionStoreDelegate = new MockRepositorySessionStoreDelegate();
sessionStoreDelegate = spy(new MockRepositorySessionStoreDelegate());
repositorySession = mock(RepositorySession.class);
payloadDispatcher = new MockPayloadDispatcher(
null,
new BatchingUploader(mock(RepositorySession.class), null, sessionStoreDelegate, Uri.parse("https://example.com"), 0L, mock(InfoConfiguration.class), mock(AuthHeaderProvider.class))
new BatchingUploader(
repositorySession,
null,
sessionStoreDelegate,
Uri.parse("https://example.com"),
0L,
mock(InfoConfiguration.class),
mock(AuthHeaderProvider.class)
)
);
authHeaderProvider = mock(AuthHeaderProvider.class);
@ -145,7 +160,7 @@ public class PayloadUploadDelegateTest {
// Test that non-2* responses aren't processed
payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(404, null, null));
assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didPayloadFail);
assertEquals(IllegalStateException.class,
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
assertEquals(IllegalStateException.class,
@ -163,7 +178,7 @@ public class PayloadUploadDelegateTest {
// Test that responses without X-Last-Modified header aren't processed
payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(200, null, null));
assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didPayloadFail);
assertEquals(IllegalStateException.class,
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
assertEquals(IllegalStateException.class,
@ -181,7 +196,7 @@ public class PayloadUploadDelegateTest {
// Test that we catch json processing errors
payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(200, "non json body", "123"));
assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didPayloadFail);
assertEquals(NonObjectJSONException.class,
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
assertEquals(NonObjectJSONException.class,
@ -235,7 +250,7 @@ public class PayloadUploadDelegateTest {
payloadUploadDelegate.handleRequestSuccess(
makeSyncStorageResponse(200, "{\"success\": [\"guid1\", \"guid2\", \"guid3\"]}", "123"));
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didPayloadFail);
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
@ -248,7 +263,7 @@ public class PayloadUploadDelegateTest {
payloadUploadDelegate.handleRequestSuccess(
makeSyncStorageResponse(200, "{\"success\": [\"guid4\", 5, \"guid6\"]}", "123"));
assertEquals(5, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didPayloadFail);
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
@ -288,12 +303,14 @@ public class PayloadUploadDelegateTest {
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertEquals(6, ((MockPayloadDispatcher) payloadDispatcher).committedGuids);
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didPayloadFail);
assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
assertFalse(payloadDispatcher.batchWhiteboard.getInBatchingMode());
// Encountering records in the "failed" array is treated as a store failure for all posted
// records in the non-batching mode as well.
postedGuids = new ArrayList<>();
postedGuids.add("guid7");
postedGuids.add("guid8");
@ -301,13 +318,14 @@ public class PayloadUploadDelegateTest {
authHeaderProvider, payloadDispatcher, postedGuids, false, true);
payloadUploadDelegate.handleRequestSuccess(
makeSyncStorageResponse(200, "{\"success\": [\"guid8\"], \"failed\": {\"guid7\": \"reason\"}}", "555"));
assertEquals(7, ((MockPayloadDispatcher) payloadDispatcher).committedGuids);
assertEquals(6, ((MockPayloadDispatcher) payloadDispatcher).committedGuids);
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertTrue(((MockPayloadDispatcher) payloadDispatcher).failedRecords.containsKey("guid7"));
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertEquals(4, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didPayloadFail);
assertTrue(payloadDispatcher.storeFailed.get());
assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
assertFalse(payloadDispatcher.batchWhiteboard.getInBatchingMode());
}
@ -362,7 +380,7 @@ public class PayloadUploadDelegateTest {
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertEquals(8, ((MockPayloadDispatcher) payloadDispatcher).committedGuids);
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didPayloadFail);
assertEquals(4, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
@ -385,13 +403,13 @@ public class PayloadUploadDelegateTest {
assertEquals(e, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1"));
assertEquals(e, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2"));
assertEquals(e, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid3"));
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didPayloadFail);
payloadUploadDelegate = new PayloadUploadDelegate(
authHeaderProvider, payloadDispatcher, postedGuids, false, true);
payloadUploadDelegate.handleRequestError(e);
assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didPayloadFail);
}
@Test
@ -413,13 +431,19 @@ public class PayloadUploadDelegateTest {
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2").getClass());
assertEquals(HTTPFailureException.class,
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid3").getClass());
verify(repositorySession, times(1)).abort();
verify(sessionStoreDelegate, times(1)).onStoreFailed((Exception) Mockito.any());
payloadUploadDelegate = new PayloadUploadDelegate(
authHeaderProvider, payloadDispatcher, postedGuids, false, true);
payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertTrue(payloadDispatcher.recordUploadFailed);
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didPayloadFail);
assertTrue(payloadDispatcher.storeFailed.get());
// Ensure we'd only ever call .abort and .onStoreFailed once.
verify(repositorySession, times(1)).abort();
verify(sessionStoreDelegate, times(1)).onStoreFailed((Exception) Mockito.any());
}
@Test
@ -435,10 +459,11 @@ public class PayloadUploadDelegateTest {
payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertTrue(payloadDispatcher.recordUploadFailed);
assertTrue(payloadDispatcher.storeFailed.get());
assertTrue(((MockRepositorySessionStoreDelegate) sessionStoreDelegate).storeFailedException instanceof CollectionConcurrentModificationException);
verify(repositorySession, times(1)).abort();
verify(sessionStoreDelegate, times(1)).onStoreFailed((Exception) Mockito.any());
}
@Test