diff --git a/services/sync/modules/constants.js b/services/sync/modules/constants.js index 870582492621..42ab543450e6 100644 --- a/services/sync/modules/constants.js +++ b/services/sync/modules/constants.js @@ -62,6 +62,10 @@ MULTI_DESKTOP_SYNC: 60 * 60 * 1000, // 1 hour MULTI_MOBILE_SYNC: 5 * 60 * 1000, // 5 minutes PARTIAL_DATA_SYNC: 60 * 1000, // 1 minute +// 50 is hardcoded here because of URL length restrictions. +// (GUIDs can be up to 64 chars long) +MOBILE_BATCH_SIZE: 50, + // score thresholds for early syncs SINGLE_USER_THRESHOLD: 1000, MULTI_DESKTOP_THRESHOLD: 500, diff --git a/services/sync/modules/engines.js b/services/sync/modules/engines.js index 9ac3061c0b68..dc762beb0612 100644 --- a/services/sync/modules/engines.js +++ b/services/sync/modules/engines.js @@ -282,7 +282,6 @@ Engine.prototype = { function SyncEngine(name) { Engine.call(this, name || "SyncEngine"); - this.loadToFetch(); } SyncEngine.prototype = { __proto__: Engine.prototype, @@ -322,19 +321,6 @@ SyncEngine.prototype = { Svc.Prefs.set(this.name + ".lastSync", "0"); }, - get toFetch() this._toFetch, - set toFetch(val) { - this._toFetch = val; - Utils.jsonSave("toFetch/" + this.name, this, val); - }, - - loadToFetch: function loadToFetch() { - // Initialize to empty if there's no file - this._toFetch = []; - Utils.jsonLoad("toFetch/" + this.name, this, Utils.bind2(this, function(o) - this._toFetch = o)); - }, - // Create a new record using the store and add in crypto fields _createRecord: function SyncEngine__createRecord(id) { let record = this._store.createRecord(id, this.engineURL + "/" + id); @@ -439,23 +425,19 @@ SyncEngine.prototype = { this._delete = {}; }, - // Generate outgoing records + // Process incoming records _processIncoming: function SyncEngine__processIncoming() { this._log.trace("Downloading & applying server changes"); // Figure out how many total items to fetch this sync; do less on mobile. - // 50 is hardcoded here because of URL length restrictions. - // (GUIDs can be up to 64 chars long) - let fetchNum = Infinity; - + let batchSize = Infinity; let newitems = new Collection(this.engineURL, this._recordObj); if (Svc.Prefs.get("client.type") == "mobile") { - fetchNum = 50; - newitems.sort = "index"; + batchSize = MOBILE_BATCH_SIZE; } newitems.newer = this.lastSync; newitems.full = true; - newitems.limit = fetchNum; + newitems.limit = batchSize; let count = {applied: 0, reconciled: 0}; let handled = []; @@ -502,16 +484,13 @@ SyncEngine.prototype = { resp.failureCode = ENGINE_DOWNLOAD_FAIL; throw resp; } - - // Subtract out the number of items we just got - fetchNum -= handled.length; } - // Check if we got the maximum that we requested; get the rest if so + // Mobile: check if we got the maximum that we requested; get the rest if so. + let toFetch = []; if (handled.length == newitems.limit) { let guidColl = new Collection(this.engineURL); guidColl.newer = this.lastSync; - guidColl.sort = "index"; let guids = guidColl.get(); if (!guids.success) @@ -521,20 +500,18 @@ SyncEngine.prototype = { // were already waiting and prepend the new ones let extra = Utils.arraySub(guids.obj, handled); if (extra.length > 0) - this.toFetch = extra.concat(Utils.arraySub(this.toFetch, extra)); + toFetch = extra.concat(Utils.arraySub(toFetch, extra)); } - // Process any backlog of GUIDs if we haven't fetched too many this sync - while (this.toFetch.length > 0 && fetchNum > 0) { + // Mobile: process any backlog of GUIDs + while (toFetch.length) { // Reuse the original query, but get rid of the restricting params newitems.limit = 0; newitems.newer = 0; // Get the first bunch of records and save the rest for later - let minFetch = Math.min(150, this.toFetch.length, fetchNum); - newitems.ids = this.toFetch.slice(0, minFetch); - this.toFetch = this.toFetch.slice(minFetch); - fetchNum -= minFetch; + newitems.ids = toFetch.slice(0, batchSize); + toFetch = toFetch.slice(batchSize); // Reuse the existing record handler set earlier let resp = newitems.get(); @@ -548,7 +525,7 @@ SyncEngine.prototype = { this.lastSync = this.lastModified; this._log.info(["Records:", count.applied, "applied,", count.reconciled, - "reconciled,", this.toFetch.length, "left to fetch"].join(" ")); + "reconciled."].join(" ")); }, /** @@ -788,7 +765,6 @@ SyncEngine.prototype = { _resetClient: function SyncEngine__resetClient() { this.resetLastSync(); - this.toFetch = []; }, wipeServer: function wipeServer(ignoreCrypto) { diff --git a/services/sync/tests/unit/test_syncengine.js b/services/sync/tests/unit/test_syncengine.js index fb63bc62daaf..8badbc952875 100644 --- a/services/sync/tests/unit/test_syncengine.js +++ b/services/sync/tests/unit/test_syncengine.js @@ -64,48 +64,17 @@ function test_lastSync() { } } -function test_toFetch() { - _("SyncEngine.toFetch corresponds to file on disk"); - let engine = makeSteamEngine(); - try { - // Ensure pristine environment - do_check_eq(engine.toFetch.length, 0); - - // Write file to disk - let toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()]; - engine.toFetch = toFetch; - do_check_eq(engine.toFetch, toFetch); - let fakefile = syncTesting.fakeFilesystem.fakeContents[ - "weave/toFetch/steam.json"]; - do_check_eq(fakefile, JSON.stringify(toFetch)); - - // Read file from disk - toFetch = [Utils.makeGUID(), Utils.makeGUID()]; - syncTesting.fakeFilesystem.fakeContents["weave/toFetch/steam.json"] - = JSON.stringify(toFetch); - engine.loadToFetch(); - do_check_eq(engine.toFetch.length, 2); - do_check_eq(engine.toFetch[0], toFetch[0]); - do_check_eq(engine.toFetch[1], toFetch[1]); - } finally { - syncTesting = new SyncTestingInfrastructure(makeSteamEngine); - } -} - function test_resetClient() { - _("SyncEngine.resetClient resets lastSync and toFetch"); + _("SyncEngine.resetClient resets lastSync"); let engine = makeSteamEngine(); try { // Ensure pristine environment do_check_eq(Svc.Prefs.get("steam.lastSync"), undefined); - do_check_eq(engine.toFetch.length, 0); - engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()]; engine.lastSync = 123.45; engine.resetClient(); do_check_eq(engine.lastSync, 0); - do_check_eq(engine.toFetch.length, 0); } finally { syncTesting = new SyncTestingInfrastructure(makeSteamEngine); Svc.Prefs.resetBranch(""); @@ -128,14 +97,12 @@ function test_wipeServer() { try { // Some data to reset. - engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()]; engine.lastSync = 123.45; _("Wipe server data and reset client."); engine.wipeServer(true); do_check_eq(steamCollection.payload, undefined); do_check_eq(engine.lastSync, 0); - do_check_eq(engine.toFetch.length, 0); _("We passed a truthy arg earlier in which case it doesn't wipe the crypto collection."); do_check_eq(steamCrypto.payload, PAYLOAD); @@ -153,7 +120,6 @@ function run_test() { test_url_attributes(); test_syncID(); test_lastSync(); - test_toFetch(); test_resetClient(); test_wipeServer(); } diff --git a/services/sync/tests/unit/test_syncengine_sync.js b/services/sync/tests/unit/test_syncengine_sync.js index 6aeb566311ce..e52c6e3be072 100644 --- a/services/sync/tests/unit/test_syncengine_sync.js +++ b/services/sync/tests/unit/test_syncengine_sync.js @@ -473,7 +473,6 @@ function test_processIncoming_emptyServer() { // Merely ensure that this code path is run without any errors engine._processIncoming(); do_check_eq(engine.lastSync, 0); - do_check_eq(engine.toFetch.length, 0); } finally { server.stop(do_test_finished); @@ -676,14 +675,22 @@ function test_processIncoming_reconcile() { } -function test_processIncoming_fetchNum() { - _("SyncEngine._processIncoming doesn't fetch everything at ones on mobile clients"); +function test_processIncoming_mobile_batchSize() { + _("SyncEngine._processIncoming doesn't fetch everything at once on mobile clients"); Svc.Prefs.set("clusterURL", "http://localhost:8080/"); Svc.Prefs.set("username", "foo"); Svc.Prefs.set("client.type", "mobile"); let crypto_steam = new ServerWBO('steam'); + + // A collection that logs each GET let collection = new ServerCollection(); + collection.get_log = []; + collection._get = collection.get; + collection.get = function (options) { + this.get_log.push(options); + return this._get(options); + }; // Let's create some 234 server side records. They're all at least // 10 minutes old. @@ -707,72 +714,31 @@ function test_processIncoming_fetchNum() { try { - // On a mobile client, the first sync will only get the first 50 - // objects from the server + // On a mobile client, we get new records from the server in batches of 50. engine._processIncoming(); - do_check_eq([id for (id in engine._store.items)].length, 50); + do_check_eq([id for (id in engine._store.items)].length, 234); do_check_true('record-no-0' in engine._store.items); do_check_true('record-no-49' in engine._store.items); - do_check_eq(engine.toFetch.length, 234 - 50); - - - // The next sync will get another 50 objects, assuming the server - // hasn't got any new data. - engine._processIncoming(); - do_check_eq([id for (id in engine._store.items)].length, 100); do_check_true('record-no-50' in engine._store.items); - do_check_true('record-no-99' in engine._store.items); - do_check_eq(engine.toFetch.length, 234 - 100); - - - // Now let's say there are some new items on the server - for (i=0; i < 5; i++) { - let id = 'new-record-no-' + i; - let payload = encryptPayload({id: id, denomination: "New record No. " + i}); - let wbo = new ServerWBO(id, payload); - wbo.modified = Date.now()/1000 - 60*i; - collection.wbos[id] = wbo; - } - // Let's tell the engine the server has got newer data. This is - // normally done by the WeaveSvc after retrieving info/collections. - engine.lastModified = Date.now() / 1000 + 1; - - // Now we'll fetch another 50 items, but 5 of those are the new - // ones, so we've only fetched another 45 of the older ones. - engine._processIncoming(); - do_check_eq([id for (id in engine._store.items)].length, 150); - do_check_true('new-record-no-0' in engine._store.items); - do_check_true('new-record-no-4' in engine._store.items); - do_check_true('record-no-100' in engine._store.items); - do_check_true('record-no-144' in engine._store.items); - do_check_eq(engine.toFetch.length, 234 - 100 - 45); - - - // Now let's modify a few existing records on the server so that - // they have to be refetched. - collection.wbos['record-no-3'].modified = Date.now()/1000 + 1; - collection.wbos['record-no-41'].modified = Date.now()/1000 + 1; - collection.wbos['record-no-122'].modified = Date.now()/1000 + 1; - - // Once again we'll tell the engine that the server's got newer data - // and once again we'll fetch 50 items, but 3 of those are the - // existing records, so we're only fetching 47 new ones. - engine.lastModified = Date.now() / 1000 + 2; - engine._processIncoming(); - do_check_eq([id for (id in engine._store.items)].length, 197); - do_check_true('record-no-145' in engine._store.items); - do_check_true('record-no-191' in engine._store.items); - do_check_eq(engine.toFetch.length, 234 - 100 - 45 - 47); - - - // Finally let's fetch the rest, making sure that will fetch - // everything up to the last record. - while(engine.toFetch.length) { - engine._processIncoming(); - } - do_check_eq([id for (id in engine._store.items)].length, 234 + 5); do_check_true('record-no-233' in engine._store.items); + // Verify that the right number of GET requests with the right + // kind of parameters were made. + do_check_eq(collection.get_log.length, + Math.ceil(234 / MOBILE_BATCH_SIZE) + 1); + do_check_eq(collection.get_log[0].full, 1); + do_check_eq(collection.get_log[0].limit, MOBILE_BATCH_SIZE); + do_check_eq(collection.get_log[1].full, undefined); + do_check_eq(collection.get_log[1].limit, undefined); + for (let i = 1; i <= Math.floor(234 / MOBILE_BATCH_SIZE); i++) { + do_check_eq(collection.get_log[i+1].full, 1); + do_check_eq(collection.get_log[i+1].limit, undefined); + if (i < Math.floor(234 / MOBILE_BATCH_SIZE)) + do_check_eq(collection.get_log[i+1].ids.length, MOBILE_BATCH_SIZE); + else + do_check_eq(collection.get_log[i+1].ids.length, 234 % MOBILE_BATCH_SIZE); + } + } finally { server.stop(do_test_finished); Svc.Prefs.resetBranch(""); @@ -1162,7 +1128,7 @@ function run_test() { test_processIncoming_emptyServer(); test_processIncoming_createFromServer(); test_processIncoming_reconcile(); - test_processIncoming_fetchNum(); + test_processIncoming_mobile_batchSize(); test_uploadOutgoing_toEmptyServer(); test_uploadOutgoing_failed(); test_uploadOutgoing_MAX_UPLOAD_RECORDS();