mirror of
https://github.com/mozilla/gecko-dev.git
synced 2024-12-02 01:48:05 +00:00
Bug 609398 - Get rid of partial sync [r=mconnor]
This commit is contained in:
parent
fb0b837cdf
commit
80f1e34958
@ -62,6 +62,10 @@ MULTI_DESKTOP_SYNC: 60 * 60 * 1000, // 1 hour
|
||||
MULTI_MOBILE_SYNC: 5 * 60 * 1000, // 5 minutes
|
||||
PARTIAL_DATA_SYNC: 60 * 1000, // 1 minute
|
||||
|
||||
// 50 is hardcoded here because of URL length restrictions.
|
||||
// (GUIDs can be up to 64 chars long)
|
||||
MOBILE_BATCH_SIZE: 50,
|
||||
|
||||
// score thresholds for early syncs
|
||||
SINGLE_USER_THRESHOLD: 1000,
|
||||
MULTI_DESKTOP_THRESHOLD: 500,
|
||||
|
@ -282,7 +282,6 @@ Engine.prototype = {
|
||||
|
||||
function SyncEngine(name) {
|
||||
Engine.call(this, name || "SyncEngine");
|
||||
this.loadToFetch();
|
||||
}
|
||||
SyncEngine.prototype = {
|
||||
__proto__: Engine.prototype,
|
||||
@ -322,19 +321,6 @@ SyncEngine.prototype = {
|
||||
Svc.Prefs.set(this.name + ".lastSync", "0");
|
||||
},
|
||||
|
||||
get toFetch() this._toFetch,
|
||||
set toFetch(val) {
|
||||
this._toFetch = val;
|
||||
Utils.jsonSave("toFetch/" + this.name, this, val);
|
||||
},
|
||||
|
||||
loadToFetch: function loadToFetch() {
|
||||
// Initialize to empty if there's no file
|
||||
this._toFetch = [];
|
||||
Utils.jsonLoad("toFetch/" + this.name, this, Utils.bind2(this, function(o)
|
||||
this._toFetch = o));
|
||||
},
|
||||
|
||||
// Create a new record using the store and add in crypto fields
|
||||
_createRecord: function SyncEngine__createRecord(id) {
|
||||
let record = this._store.createRecord(id, this.engineURL + "/" + id);
|
||||
@ -439,23 +425,19 @@ SyncEngine.prototype = {
|
||||
this._delete = {};
|
||||
},
|
||||
|
||||
// Generate outgoing records
|
||||
// Process incoming records
|
||||
_processIncoming: function SyncEngine__processIncoming() {
|
||||
this._log.trace("Downloading & applying server changes");
|
||||
|
||||
// Figure out how many total items to fetch this sync; do less on mobile.
|
||||
// 50 is hardcoded here because of URL length restrictions.
|
||||
// (GUIDs can be up to 64 chars long)
|
||||
let fetchNum = Infinity;
|
||||
|
||||
let batchSize = Infinity;
|
||||
let newitems = new Collection(this.engineURL, this._recordObj);
|
||||
if (Svc.Prefs.get("client.type") == "mobile") {
|
||||
fetchNum = 50;
|
||||
newitems.sort = "index";
|
||||
batchSize = MOBILE_BATCH_SIZE;
|
||||
}
|
||||
newitems.newer = this.lastSync;
|
||||
newitems.full = true;
|
||||
newitems.limit = fetchNum;
|
||||
newitems.limit = batchSize;
|
||||
|
||||
let count = {applied: 0, reconciled: 0};
|
||||
let handled = [];
|
||||
@ -502,16 +484,13 @@ SyncEngine.prototype = {
|
||||
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
|
||||
throw resp;
|
||||
}
|
||||
|
||||
// Subtract out the number of items we just got
|
||||
fetchNum -= handled.length;
|
||||
}
|
||||
|
||||
// Check if we got the maximum that we requested; get the rest if so
|
||||
// Mobile: check if we got the maximum that we requested; get the rest if so.
|
||||
let toFetch = [];
|
||||
if (handled.length == newitems.limit) {
|
||||
let guidColl = new Collection(this.engineURL);
|
||||
guidColl.newer = this.lastSync;
|
||||
guidColl.sort = "index";
|
||||
|
||||
let guids = guidColl.get();
|
||||
if (!guids.success)
|
||||
@ -521,20 +500,18 @@ SyncEngine.prototype = {
|
||||
// were already waiting and prepend the new ones
|
||||
let extra = Utils.arraySub(guids.obj, handled);
|
||||
if (extra.length > 0)
|
||||
this.toFetch = extra.concat(Utils.arraySub(this.toFetch, extra));
|
||||
toFetch = extra.concat(Utils.arraySub(toFetch, extra));
|
||||
}
|
||||
|
||||
// Process any backlog of GUIDs if we haven't fetched too many this sync
|
||||
while (this.toFetch.length > 0 && fetchNum > 0) {
|
||||
// Mobile: process any backlog of GUIDs
|
||||
while (toFetch.length) {
|
||||
// Reuse the original query, but get rid of the restricting params
|
||||
newitems.limit = 0;
|
||||
newitems.newer = 0;
|
||||
|
||||
// Get the first bunch of records and save the rest for later
|
||||
let minFetch = Math.min(150, this.toFetch.length, fetchNum);
|
||||
newitems.ids = this.toFetch.slice(0, minFetch);
|
||||
this.toFetch = this.toFetch.slice(minFetch);
|
||||
fetchNum -= minFetch;
|
||||
newitems.ids = toFetch.slice(0, batchSize);
|
||||
toFetch = toFetch.slice(batchSize);
|
||||
|
||||
// Reuse the existing record handler set earlier
|
||||
let resp = newitems.get();
|
||||
@ -548,7 +525,7 @@ SyncEngine.prototype = {
|
||||
this.lastSync = this.lastModified;
|
||||
|
||||
this._log.info(["Records:", count.applied, "applied,", count.reconciled,
|
||||
"reconciled,", this.toFetch.length, "left to fetch"].join(" "));
|
||||
"reconciled."].join(" "));
|
||||
},
|
||||
|
||||
/**
|
||||
@ -788,7 +765,6 @@ SyncEngine.prototype = {
|
||||
|
||||
_resetClient: function SyncEngine__resetClient() {
|
||||
this.resetLastSync();
|
||||
this.toFetch = [];
|
||||
},
|
||||
|
||||
wipeServer: function wipeServer(ignoreCrypto) {
|
||||
|
@ -64,48 +64,17 @@ function test_lastSync() {
|
||||
}
|
||||
}
|
||||
|
||||
function test_toFetch() {
|
||||
_("SyncEngine.toFetch corresponds to file on disk");
|
||||
let engine = makeSteamEngine();
|
||||
try {
|
||||
// Ensure pristine environment
|
||||
do_check_eq(engine.toFetch.length, 0);
|
||||
|
||||
// Write file to disk
|
||||
let toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
|
||||
engine.toFetch = toFetch;
|
||||
do_check_eq(engine.toFetch, toFetch);
|
||||
let fakefile = syncTesting.fakeFilesystem.fakeContents[
|
||||
"weave/toFetch/steam.json"];
|
||||
do_check_eq(fakefile, JSON.stringify(toFetch));
|
||||
|
||||
// Read file from disk
|
||||
toFetch = [Utils.makeGUID(), Utils.makeGUID()];
|
||||
syncTesting.fakeFilesystem.fakeContents["weave/toFetch/steam.json"]
|
||||
= JSON.stringify(toFetch);
|
||||
engine.loadToFetch();
|
||||
do_check_eq(engine.toFetch.length, 2);
|
||||
do_check_eq(engine.toFetch[0], toFetch[0]);
|
||||
do_check_eq(engine.toFetch[1], toFetch[1]);
|
||||
} finally {
|
||||
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
|
||||
}
|
||||
}
|
||||
|
||||
function test_resetClient() {
|
||||
_("SyncEngine.resetClient resets lastSync and toFetch");
|
||||
_("SyncEngine.resetClient resets lastSync");
|
||||
let engine = makeSteamEngine();
|
||||
try {
|
||||
// Ensure pristine environment
|
||||
do_check_eq(Svc.Prefs.get("steam.lastSync"), undefined);
|
||||
do_check_eq(engine.toFetch.length, 0);
|
||||
|
||||
engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
|
||||
engine.lastSync = 123.45;
|
||||
|
||||
engine.resetClient();
|
||||
do_check_eq(engine.lastSync, 0);
|
||||
do_check_eq(engine.toFetch.length, 0);
|
||||
} finally {
|
||||
syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
|
||||
Svc.Prefs.resetBranch("");
|
||||
@ -128,14 +97,12 @@ function test_wipeServer() {
|
||||
|
||||
try {
|
||||
// Some data to reset.
|
||||
engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
|
||||
engine.lastSync = 123.45;
|
||||
|
||||
_("Wipe server data and reset client.");
|
||||
engine.wipeServer(true);
|
||||
do_check_eq(steamCollection.payload, undefined);
|
||||
do_check_eq(engine.lastSync, 0);
|
||||
do_check_eq(engine.toFetch.length, 0);
|
||||
|
||||
_("We passed a truthy arg earlier in which case it doesn't wipe the crypto collection.");
|
||||
do_check_eq(steamCrypto.payload, PAYLOAD);
|
||||
@ -153,7 +120,6 @@ function run_test() {
|
||||
test_url_attributes();
|
||||
test_syncID();
|
||||
test_lastSync();
|
||||
test_toFetch();
|
||||
test_resetClient();
|
||||
test_wipeServer();
|
||||
}
|
||||
|
@ -473,7 +473,6 @@ function test_processIncoming_emptyServer() {
|
||||
// Merely ensure that this code path is run without any errors
|
||||
engine._processIncoming();
|
||||
do_check_eq(engine.lastSync, 0);
|
||||
do_check_eq(engine.toFetch.length, 0);
|
||||
|
||||
} finally {
|
||||
server.stop(do_test_finished);
|
||||
@ -676,14 +675,22 @@ function test_processIncoming_reconcile() {
|
||||
}
|
||||
|
||||
|
||||
function test_processIncoming_fetchNum() {
|
||||
_("SyncEngine._processIncoming doesn't fetch everything at ones on mobile clients");
|
||||
function test_processIncoming_mobile_batchSize() {
|
||||
_("SyncEngine._processIncoming doesn't fetch everything at once on mobile clients");
|
||||
|
||||
Svc.Prefs.set("clusterURL", "http://localhost:8080/");
|
||||
Svc.Prefs.set("username", "foo");
|
||||
Svc.Prefs.set("client.type", "mobile");
|
||||
let crypto_steam = new ServerWBO('steam');
|
||||
|
||||
// A collection that logs each GET
|
||||
let collection = new ServerCollection();
|
||||
collection.get_log = [];
|
||||
collection._get = collection.get;
|
||||
collection.get = function (options) {
|
||||
this.get_log.push(options);
|
||||
return this._get(options);
|
||||
};
|
||||
|
||||
// Let's create some 234 server side records. They're all at least
|
||||
// 10 minutes old.
|
||||
@ -707,72 +714,31 @@ function test_processIncoming_fetchNum() {
|
||||
|
||||
try {
|
||||
|
||||
// On a mobile client, the first sync will only get the first 50
|
||||
// objects from the server
|
||||
// On a mobile client, we get new records from the server in batches of 50.
|
||||
engine._processIncoming();
|
||||
do_check_eq([id for (id in engine._store.items)].length, 50);
|
||||
do_check_eq([id for (id in engine._store.items)].length, 234);
|
||||
do_check_true('record-no-0' in engine._store.items);
|
||||
do_check_true('record-no-49' in engine._store.items);
|
||||
do_check_eq(engine.toFetch.length, 234 - 50);
|
||||
|
||||
|
||||
// The next sync will get another 50 objects, assuming the server
|
||||
// hasn't got any new data.
|
||||
engine._processIncoming();
|
||||
do_check_eq([id for (id in engine._store.items)].length, 100);
|
||||
do_check_true('record-no-50' in engine._store.items);
|
||||
do_check_true('record-no-99' in engine._store.items);
|
||||
do_check_eq(engine.toFetch.length, 234 - 100);
|
||||
|
||||
|
||||
// Now let's say there are some new items on the server
|
||||
for (i=0; i < 5; i++) {
|
||||
let id = 'new-record-no-' + i;
|
||||
let payload = encryptPayload({id: id, denomination: "New record No. " + i});
|
||||
let wbo = new ServerWBO(id, payload);
|
||||
wbo.modified = Date.now()/1000 - 60*i;
|
||||
collection.wbos[id] = wbo;
|
||||
}
|
||||
// Let's tell the engine the server has got newer data. This is
|
||||
// normally done by the WeaveSvc after retrieving info/collections.
|
||||
engine.lastModified = Date.now() / 1000 + 1;
|
||||
|
||||
// Now we'll fetch another 50 items, but 5 of those are the new
|
||||
// ones, so we've only fetched another 45 of the older ones.
|
||||
engine._processIncoming();
|
||||
do_check_eq([id for (id in engine._store.items)].length, 150);
|
||||
do_check_true('new-record-no-0' in engine._store.items);
|
||||
do_check_true('new-record-no-4' in engine._store.items);
|
||||
do_check_true('record-no-100' in engine._store.items);
|
||||
do_check_true('record-no-144' in engine._store.items);
|
||||
do_check_eq(engine.toFetch.length, 234 - 100 - 45);
|
||||
|
||||
|
||||
// Now let's modify a few existing records on the server so that
|
||||
// they have to be refetched.
|
||||
collection.wbos['record-no-3'].modified = Date.now()/1000 + 1;
|
||||
collection.wbos['record-no-41'].modified = Date.now()/1000 + 1;
|
||||
collection.wbos['record-no-122'].modified = Date.now()/1000 + 1;
|
||||
|
||||
// Once again we'll tell the engine that the server's got newer data
|
||||
// and once again we'll fetch 50 items, but 3 of those are the
|
||||
// existing records, so we're only fetching 47 new ones.
|
||||
engine.lastModified = Date.now() / 1000 + 2;
|
||||
engine._processIncoming();
|
||||
do_check_eq([id for (id in engine._store.items)].length, 197);
|
||||
do_check_true('record-no-145' in engine._store.items);
|
||||
do_check_true('record-no-191' in engine._store.items);
|
||||
do_check_eq(engine.toFetch.length, 234 - 100 - 45 - 47);
|
||||
|
||||
|
||||
// Finally let's fetch the rest, making sure that will fetch
|
||||
// everything up to the last record.
|
||||
while(engine.toFetch.length) {
|
||||
engine._processIncoming();
|
||||
}
|
||||
do_check_eq([id for (id in engine._store.items)].length, 234 + 5);
|
||||
do_check_true('record-no-233' in engine._store.items);
|
||||
|
||||
// Verify that the right number of GET requests with the right
|
||||
// kind of parameters were made.
|
||||
do_check_eq(collection.get_log.length,
|
||||
Math.ceil(234 / MOBILE_BATCH_SIZE) + 1);
|
||||
do_check_eq(collection.get_log[0].full, 1);
|
||||
do_check_eq(collection.get_log[0].limit, MOBILE_BATCH_SIZE);
|
||||
do_check_eq(collection.get_log[1].full, undefined);
|
||||
do_check_eq(collection.get_log[1].limit, undefined);
|
||||
for (let i = 1; i <= Math.floor(234 / MOBILE_BATCH_SIZE); i++) {
|
||||
do_check_eq(collection.get_log[i+1].full, 1);
|
||||
do_check_eq(collection.get_log[i+1].limit, undefined);
|
||||
if (i < Math.floor(234 / MOBILE_BATCH_SIZE))
|
||||
do_check_eq(collection.get_log[i+1].ids.length, MOBILE_BATCH_SIZE);
|
||||
else
|
||||
do_check_eq(collection.get_log[i+1].ids.length, 234 % MOBILE_BATCH_SIZE);
|
||||
}
|
||||
|
||||
} finally {
|
||||
server.stop(do_test_finished);
|
||||
Svc.Prefs.resetBranch("");
|
||||
@ -1162,7 +1128,7 @@ function run_test() {
|
||||
test_processIncoming_emptyServer();
|
||||
test_processIncoming_createFromServer();
|
||||
test_processIncoming_reconcile();
|
||||
test_processIncoming_fetchNum();
|
||||
test_processIncoming_mobile_batchSize();
|
||||
test_uploadOutgoing_toEmptyServer();
|
||||
test_uploadOutgoing_failed();
|
||||
test_uploadOutgoing_MAX_UPLOAD_RECORDS();
|
||||
|
Loading…
Reference in New Issue
Block a user