gecko-dev/services/sync/modules/engines.js
Edward Lee 975a3a25f8 Bug 567371 - replace server and replace local options does not sync certain Passwords/bookmarks [r=mconnor]
Make sure to clear local cache when deleting crypto records from the server. Handle missing crypto by deleting any existing data and reuploading. Fix broken records by uploading new ones.
2010-05-21 12:15:58 -07:00

768 lines
24 KiB
JavaScript

/* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1/GPL 2.0/LGPL 2.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is Bookmarks Sync.
*
* The Initial Developer of the Original Code is Mozilla.
* Portions created by the Initial Developer are Copyright (C) 2007
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
* Dan Mills <thunder@mozilla.com>
* Myk Melez <myk@mozilla.org>
*
* Alternatively, the contents of this file may be used under the terms of
* either the GNU General Public License Version 2 or later (the "GPL"), or
* the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
* in which case the provisions of the GPL or the LGPL are applicable instead
* of those above. If you wish to allow use of your version of this file only
* under the terms of either the GPL or the LGPL, and not to allow others to
* use your version of this file under the terms of the MPL, indicate your
* decision by deleting the provisions above and replace them with the notice
* and other provisions required by the GPL or the LGPL. If you do not delete
* the provisions above, a recipient may use your version of this file under
* the terms of any one of the MPL, the GPL or the LGPL.
*
* ***** END LICENSE BLOCK ***** */
const EXPORTED_SYMBOLS = ['Engines', 'Engine', 'SyncEngine'];
const Cc = Components.classes;
const Ci = Components.interfaces;
const Cr = Components.results;
const Cu = Components.utils;
Cu.import("resource://weave/ext/Observers.js");
Cu.import("resource://weave/ext/Sync.js");
Cu.import("resource://weave/log4moz.js");
Cu.import("resource://weave/constants.js");
Cu.import("resource://weave/util.js");
Cu.import("resource://weave/resource.js");
Cu.import("resource://weave/identity.js");
Cu.import("resource://weave/stores.js");
Cu.import("resource://weave/trackers.js");
Cu.import("resource://weave/base_records/wbo.js");
Cu.import("resource://weave/base_records/keys.js");
Cu.import("resource://weave/base_records/crypto.js");
Cu.import("resource://weave/base_records/collection.js");
// Singleton service, holds registered engines.
// Exposed as 'Engines' on this module's scope through Utils.lazy, with
// EngineManagerSvc as the constructor (presumably instantiated on first
// access -- confirm against Utils.lazy).
Utils.lazy(this, 'Engines', EngineManagerSvc);
/**
 * Manager holding the registered engine instances, keyed by engine name.
 * Its logger is "Service.Engines"; the level name is read from the
 * "log.logger.service.engines" pref with "Debug" passed as the fallback.
 */
function EngineManagerSvc() {
  let logger = Log4Moz.repository.getLogger("Service.Engines");
  let levelName = Svc.Prefs.get("log.logger.service.engines", "Debug");
  logger.level = Log4Moz.Level[levelName];

  this._engines = {};
  this._log = logger;
}
EngineManagerSvc.prototype = {
  /**
   * Look up registered engines.
   *
   * @param name
   *        Engine name, or an array of engine names
   * @return The engine registered under the name (undefined when there is
   *         none), or for an array of names, an array holding only the
   *         engines that were found
   */
  get: function EngMgr_get(name) {
    // Return an array of engines if we have an array of names
    if (Utils.isArray(name)) {
      let engines = [];
      name.forEach(function(engineName) {
        let engine = this.get(engineName);
        if (engine)
          engines.push(engine);
      }, this);
      return engines;
    }

    let engine = this._engines[name];
    if (!engine)
      this._log.debug("Could not get engine: " + name);
    return engine;
  },

  /**
   * @return Array of every engine registered on this manager
   */
  getAll: function EngMgr_getAll() {
    // Read from this manager's own table, like get/register/unregister do,
    // instead of reaching through the global Engines singleton
    let engines = [];
    for (let name in this._engines)
      engines.push(this._engines[name]);
    return engines;
  },

  /**
   * @return Array of the registered engines whose "enabled" is truthy
   */
  getEnabled: function EngMgr_getEnabled() {
    return this.getAll().filter(function(engine) {
      return engine.enabled;
    });
  },

  /**
   * Register an Engine to the service. Alternatively, give an array of engine
   * objects to register.
   *
   * @param engineObject
   *        Engine object used to get an instance of the engine
   * @return The engine object if anything failed (undefined on success); for
   *         an array, an array of those per-engine results
   */
  register: function EngMgr_register(engineObject) {
    if (Utils.isArray(engineObject))
      return engineObject.map(this.register, this);

    try {
      let engine = new engineObject();
      let name = engine.name;
      if (name in this._engines)
        this._log.error("Engine '" + name + "' is already registered!");
      else
        this._engines[name] = engine;
    }
    catch(ex) {
      let mesg = ex.message ? ex.message : ex;
      // Best-effort lookup of a name on the constructor's prototype; any
      // missing step falls back to an empty string
      let name = engineObject || "";
      name = name.prototype || "";
      name = name.name || "";

      let out = "Could not initialize engine '" + name + "': " + mesg;
      dump(out);
      this._log.error(out);

      return engineObject;
    }
  },

  /**
   * Remove an engine from the manager.
   *
   * @param val
   *        Engine instance, or the name the engine was registered under
   */
  unregister: function EngMgr_unregister(val) {
    let name = val;
    if (val instanceof Engine)
      name = val.name;
    delete this._engines[name];
  }
};
/**
 * Base engine constructor.
 *
 * @param name
 *        Display-cased engine name; "Unnamed" is used when it is missing.
 *        The lowercase form is stored as this.name and used for the
 *        per-engine logging pref.
 */
function Engine(name) {
  this.Name = name || "Unnamed";
  // Lowercase the defaulted name so a missing name doesn't throw here
  this.name = this.Name.toLowerCase();

  this._notify = Utils.notify("weave:engine:");
  this._log = Log4Moz.repository.getLogger("Engine." + this.Name);
  let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
  this._log.level = Log4Moz.Level[level];

  this._tracker; // initialize tracker to load previously changed IDs
  this._log.debug("Engine initialized");
}
Engine.prototype = {
  // _storeObj and _trackerObj should be overridden in subclasses
  _storeObj: Store,
  _trackerObj: Tracker,

  // Name used in the "engine.<prefName>" pref; the engine name by default
  get prefName() { return this.name; },
  get enabled() { return Svc.Prefs.get("engine." + this.prefName, null); },
  set enabled(val) { Svc.Prefs.set("engine." + this.prefName, !!val); },

  get score() { return this._tracker.score; },

  // Create the store on first access, then replace this getter with one that
  // keeps returning that same instance
  get _store() {
    let store = new this._storeObj(this.Name);
    this.__defineGetter__("_store", function() { return store; });
    return store;
  },

  // Create the tracker on first access, then replace this getter with one
  // that keeps returning that same instance
  get _tracker() {
    let tracker = new this._trackerObj(this.Name);
    this.__defineGetter__("_tracker", function() { return tracker; });
    return tracker;
  },

  // Localized engine name when the string lookup works; otherwise the Name
  get displayName() {
    try {
      return Str.engines.get(this.name);
    } catch (e) {}

    return this.Name;
  },

  /**
   * Run this engine's _sync (wrapped by _notify as "sync") if the engine is
   * enabled. While it runs, every function reachable on the engine is wrapped
   * with a timer; afterwards the functions are restored and the per-function
   * timing stats are logged at debug level.
   *
   * Throws a string if the engine has no _sync method; anything thrown by the
   * wrapped _sync propagates after the stats are logged.
   */
  sync: function Engine_sync() {
    if (!this.enabled)
      return;

    if (!this._sync)
      throw "engine does not implement _sync method";

    let times = {};
    let wrapped = {};
    // Find functions in any point of the prototype chain
    for (let _name in this) {
      // Copy into a body-scoped binding for the wrapper closure below
      let name = _name;

      // Ignore certain constructors/functions
      if (name.search(/^_(.+Obj|notify)$/) == 0)
        continue;

      // Only track functions but skip the constructors
      if (typeof this[name] == "function") {
        times[name] = [];
        wrapped[name] = this[name];

        // Wrap the original function with a start/stop timer
        this[name] = function() {
          let start = Date.now();
          try {
            return wrapped[name].apply(this, arguments);
          }
          finally {
            times[name].push(Date.now() - start);
          }
        };
      }
    }

    try {
      this._notify("sync", this.name, this._sync)();
    }
    finally {
      // Restore original unwrapped functionality
      for (let name in wrapped)
        this[name] = wrapped[name];

      let stats = {};
      for (let name in times) {
        let time = times[name];

        // Figure out stats on the times unless there's nothing
        let num = time.length;
        if (num == 0)
          continue;

        // Track the min/max/sum of the values
        let stat = {
          num: num,
          sum: 0
        };
        time.forEach(function(val) {
          if (stat.min == null || val < stat.min)
            stat.min = val;
          if (stat.max == null || val > stat.max)
            stat.max = val;
          stat.sum += val;
        });

        stat.avg = Number((stat.sum / num).toFixed(2));
        stats[name] = stat;
      }

      stats.toString = function() {
        let sums = [];
        for (let name in this) {
          let stat = this[name];
          // Skips this toString function itself, which has no sum
          if (stat.sum != null)
            sums.push(name.replace(/^_/, "") + " " + stat.sum);
        }

        // Order certain functions first before any other random ones
        let nameOrder = ["sync", "processIncoming", "uploadOutgoing",
                         "syncStartup", "syncFinish"];
        let getPos = function(str) {
          let pos = nameOrder.indexOf(str.split(" ")[0]);
          return pos != -1 ? pos : Infinity;
        };
        // A sort comparator has to return negative/zero/positive; don't
        // subtract because two unlisted names would give Infinity - Infinity
        let order = function(a, b) {
          let posA = getPos(a);
          let posB = getPos(b);
          if (posA < posB)
            return -1;
          if (posA > posB)
            return 1;
          return 0;
        };

        return "Total (ms): " + sums.sort(order).join(", ");
      };

      this._log.debug(stats);
    }
  },

  /**
   * Get rid of any local meta-data
   */
  resetClient: function Engine_resetClient() {
    if (!this._resetClient)
      throw "engine does not implement _resetClient method";

    this._notify("reset-client", this.name, this._resetClient)();
  },

  // Reset the client, then wipe the store with the tracker ignoring the
  // resulting changes, and finally clear the tracker's changed IDs
  _wipeClient: function Engine__wipeClient() {
    this.resetClient();
    this._log.debug("Deleting all local data");
    this._tracker.ignoreAll = true;
    this._store.wipe();
    this._tracker.ignoreAll = false;
    this._tracker.clearChangedIDs();
  },

  /**
   * Delete all local data, run through _notify as "wipe-client"
   */
  wipeClient: function Engine_wipeClient() {
    this._notify("wipe-client", this.name, this._wipeClient)();
  }
};
/**
 * Engine that syncs with the server. Runs the Engine constructor with the
 * given name ("SyncEngine" when none is given), then loads the saved list of
 * records that are still waiting to be fetched.
 */
function SyncEngine(name) {
  let engineName = name || "SyncEngine";
  Engine.call(this, engineName);

  this.loadToFetch();
}
SyncEngine.prototype = {
__proto__: Engine.prototype,
_recordObj: CryptoWrapper,
version: 1,
get storageURL() Svc.Prefs.get("clusterURL") + Svc.Prefs.get("storageAPI") +
"/" + ID.get("WeaveID").username + "/storage/",
get engineURL() this.storageURL + this.name,
get cryptoMetaURL() this.storageURL + "crypto/" + this.name,
get metaURL() this.storageURL + "meta/global",
get syncID() {
// Generate a random syncID if we don't have one
let syncID = Svc.Prefs.get(this.name + ".syncID", "");
return syncID == "" ? this.syncID = Utils.makeGUID() : syncID;
},
set syncID(value) {
Svc.Prefs.set(this.name + ".syncID", value);
},
get lastSync() {
return parseFloat(Svc.Prefs.get(this.name + ".lastSync", "0"));
},
set lastSync(value) {
// Reset the pref in-case it's a number instead of a string
Svc.Prefs.reset(this.name + ".lastSync");
// Store the value as a string to keep floating point precision
Svc.Prefs.set(this.name + ".lastSync", value.toString());
},
resetLastSync: function SyncEngine_resetLastSync() {
this._log.debug("Resetting " + this.name + " last sync time");
Svc.Prefs.reset(this.name + ".lastSync");
Svc.Prefs.set(this.name + ".lastSync", "0");
},
get toFetch() this._toFetch,
set toFetch(val) {
this._toFetch = val;
Utils.jsonSave("toFetch/" + this.name, this, val);
},
loadToFetch: function loadToFetch() {
// Initialize to empty if there's no file
this._toFetch = [];
Utils.jsonLoad("toFetch/" + this.name, this, Utils.bind2(this, function(o)
this._toFetch = o));
},
// Create a new record using the store and add in crypto fields
_createRecord: function SyncEngine__createRecord(id) {
let record = this._store.createRecord(id);
record.id = id;
record.encryption = this.cryptoMetaURL;
return record;
},
// Any setup that needs to happen at the beginning of each sync.
// Makes sure crypto records and keys are all set up:
//  - gets the engine's crypto meta record and tries to unwrap its key
//  - compares the engine version/syncID stored in meta/global with ours
//  - when the crypto meta is missing or unusable, or the server data has an
//    older version, deletes the engine's server collection, resets the client
//    and uploads a newly generated crypto meta record
//  - when there is no lastSync, marks every local item as changed
// Throws a String with failureCode VERSION_OUT_OF_DATE if the server has a
// newer engine version, and the failed response with failureCode
// ENGINE_METARECORD_UPLOAD_FAIL if the new crypto meta can't be uploaded.
_syncStartup: function SyncEngine__syncStartup() {
this._log.trace("Ensuring server crypto records are there");
// Try getting/unwrapping the crypto record
let meta = CryptoMetas.get(this.cryptoMetaURL);
if (meta) {
try {
let pubkey = PubKeys.getDefaultKey();
let privkey = PrivKeys.get(pubkey.privateKeyUri);
meta.getKey(privkey, ID.get("WeaveCryptoID"));
}
catch(ex) {
// Indicate that we don't have a cryptometa to delete and reupload
this._log.debug("Purging bad data after failed unwrap crypto: " + ex);
meta = null;
}
}
// Determine if we need to wipe on outdated versions
let metaGlobal = Records.get(this.metaURL);
let engines = metaGlobal.payload.engines || {};
let engineData = engines[this.name] || {};
// Assume missing versions are 0 and wipe the server
if ((engineData.version || 0) < this.version) {
this._log.debug("Old engine data: " + [engineData.version, this.version]);
// Prepare to clear the server and upload everything
meta = null;
// Emptying the pref makes the syncID getter generate a new one when it is
// read just below
this.syncID = "";
// Set the newer version and newly generated syncID
engineData.version = this.version;
engineData.syncID = this.syncID;
// Put the new data back into meta/global and mark for upload
engines[this.name] = engineData;
metaGlobal.payload.engines = engines;
metaGlobal.changed = true;
}
// Don't sync this engine if the server has newer data
else if (engineData.version > this.version) {
let error = new String("New data: " + [engineData.version, this.version]);
error.failureCode = VERSION_OUT_OF_DATE;
throw error;
}
// Changes to syncID mean we'll need to upload everything
else if (engineData.syncID != this.syncID) {
this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]);
this.syncID = engineData.syncID;
this._resetClient();
};
// Delete any existing data and reupload on bad version or missing meta
if (meta == null) {
new Resource(this.engineURL).delete();
this._resetClient();
// Generate a new crypto record
let symkey = Svc.Crypto.generateRandomKey();
let pubkey = PubKeys.getDefaultKey();
meta = new CryptoMeta(this.cryptoMetaURL);
meta.addUnwrappedKey(pubkey, symkey);
let res = new Resource(meta.uri);
let resp = res.put(meta);
if (!resp.success) {
this._log.debug("Metarecord upload fail:" + resp);
resp.failureCode = ENGINE_METARECORD_UPLOAD_FAIL;
throw resp;
}
// Cache the crypto meta that we just put on the server
CryptoMetas.set(meta.uri, meta);
}
// Mark all items to be uploaded, but treat them as changed from long ago
if (!this.lastSync) {
this._log.debug("First sync, uploading all items");
for (let id in this._store.getAllIDs())
this._tracker.addChangedID(id, 0);
}
// Count the changed IDs by collecting the keys of changedIDs
let outnum = [i for (i in this._tracker.changedIDs)].length;
this._log.info(outnum + " outgoing items pre-reconciliation");
// Keep track of what to delete at the end of sync
this._delete = {};
},
// Generate outgoing records
_processIncoming: function SyncEngine__processIncoming() {
this._log.trace("Downloading & applying server changes");
// Figure out how many total items to fetch this sync; do less on mobile
let fetchNum = Infinity;
if (Svc.Prefs.get("client.type") == "mobile")
fetchNum = 50;
let newitems = new Collection(this.engineURL, this._recordObj);
newitems.newer = this.lastSync;
newitems.full = true;
newitems.sort = "index";
newitems.limit = fetchNum;
let count = {applied: 0, reconciled: 0};
let handled = [];
newitems.recordHandler = Utils.bind2(this, function(item) {
// Grab a later last modified if possible
if (this.lastModified == null || item.modified > this.lastModified)
this.lastModified = item.modified;
// Remember which records were processed
handled.push(item.id);
try {
item.decrypt(ID.get("WeaveCryptoID"));
if (this._reconcile(item)) {
count.applied++;
this._tracker.ignoreAll = true;
this._store.applyIncoming(item);
} else {
count.reconciled++;
this._log.trace("Skipping reconciled incoming item " + item.id);
}
}
catch(ex) {
this._log.warn("Error processing record: " + Utils.exceptionStr(ex));
// Upload a new record to replace the bad one if we have it
if (this._store.itemExists(item.id))
this._tracker.addChangedID(item.id, 0);
}
this._tracker.ignoreAll = false;
Sync.sleep(0);
});
// Only bother getting data from the server if there's new things
if (this.lastModified == null || this.lastModified > this.lastSync) {
let resp = newitems.get();
if (!resp.success) {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
}
// Subtract out the number of items we just got
fetchNum -= handled.length;
}
// Check if we got the maximum that we requested; get the rest if so
if (handled.length == newitems.limit) {
let guidColl = new Collection(this.engineURL);
guidColl.newer = this.lastSync;
guidColl.sort = "index";
let guids = guidColl.get();
if (!guids.success)
throw guids;
// Figure out which guids weren't just fetched then remove any guids that
// were already waiting and prepend the new ones
let extra = Utils.arraySub(guids.obj, handled);
if (extra.length > 0)
this.toFetch = extra.concat(Utils.arraySub(this.toFetch, extra));
}
// Process any backlog of GUIDs if we haven't fetched too many this sync
while (this.toFetch.length > 0 && fetchNum > 0) {
// Reuse the original query, but get rid of the restricting params
newitems.limit = 0;
newitems.newer = 0;
// Get the first bunch of records and save the rest for later
let minFetch = Math.min(150, this.toFetch.length, fetchNum);
newitems.ids = this.toFetch.slice(0, minFetch);
this.toFetch = this.toFetch.slice(minFetch);
fetchNum -= minFetch;
// Reuse the existing record handler set earlier
let resp = newitems.get();
if (!resp.success) {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
}
}
if (this.lastSync < this.lastModified)
this.lastSync = this.lastModified;
this._log.info(["Records:", count.applied, "applied,", count.reconciled,
"reconciled,", this.toFetch.length, "left to fetch"].join(" "));
},
/**
* Find a GUID of an item that is a duplicate of the incoming item but happens
* to have a different GUID
*
* This base implementation has an empty body, so it returns undefined, i.e.
* it never reports a dupe. _reconcile calls it and passes a truthy result on
* to _handleDupe.
*
* @param item
*        Incoming record to look for a local duplicate of
* @return GUID of the similar item; falsy otherwise
*/
_findDupe: function _findDupe(item) {
// By default, assume there's no dupe items for the engine
},
_isEqual: function SyncEngine__isEqual(item) {
let local = this._createRecord(item.id);
if (this._log.level <= Log4Moz.Level.Trace)
this._log.trace("Local record: " + local);
if (Utils.deepEquals(item.cleartext, local.cleartext)) {
this._log.trace("Local record is the same");
return true;
} else {
this._log.trace("Local record is different");
return false;
}
},
_deleteId: function _deleteId(id) {
this._tracker.removeChangedID(id);
// Remember this id to delete at the end of sync
if (this._delete.ids == null)
this._delete.ids = [id];
else
this._delete.ids.push(id);
},
_handleDupe: function _handleDupe(item, dupeId) {
// Prefer shorter guids; for ties, just do an ASCII compare
let preferLocal = dupeId.length < item.id.length ||
(dupeId.length == item.id.length && dupeId < item.id);
if (preferLocal) {
this._log.trace("Preferring local id: " + [dupeId, item.id]);
this._deleteId(item.id);
item.id = dupeId;
this._tracker.addChangedID(dupeId, 0);
}
else {
this._log.trace("Switching local id to incoming: " + [item.id, dupeId]);
this._store.changeItemID(dupeId, item.id);
this._deleteId(dupeId);
}
},
_reconcile: function SyncEngine__reconcile(item) {
if (this._log.level <= Log4Moz.Level.Trace)
this._log.trace("Incoming: " + item);
this._log.trace("Reconcile step 1: Check for conflicts");
if (item.id in this._tracker.changedIDs) {
// If the incoming and local changes are the same, skip
if (this._isEqual(item)) {
this._tracker.removeChangedID(item.id);
return false;
}
// Records differ so figure out which to take
let recordAge = Resource.serverTime - item.modified;
let localAge = Date.now() / 1000 - this._tracker.changedIDs[item.id];
this._log.trace("Record age vs local age: " + [recordAge, localAge]);
// Apply the record if the record is newer (server wins)
return recordAge < localAge;
}
this._log.trace("Reconcile step 2: Check for updates");
if (this._store.itemExists(item.id))
return !this._isEqual(item);
this._log.trace("Reconcile step 2.5: Don't dupe deletes");
if (item.deleted)
return true;
this._log.trace("Reconcile step 3: Find dupes");
let dupeId = this._findDupe(item);
if (dupeId)
this._handleDupe(item, dupeId);
// Apply the incoming item (now that the dupe is the right id)
return true;
},
// Upload outgoing records: creates and encrypts a record for every id in the
// tracker's changedIDs and posts them to the engine's collection in batches
// of MAX_UPLOAD_RECORDS. The tracker's changed IDs are cleared afterwards.
// Throws the failed response, with failureCode ENGINE_UPLOAD_FAIL, if a post
// does not succeed.
_uploadOutgoing: function SyncEngine__uploadOutgoing() {
// Count the changed IDs by collecting the keys of changedIDs
let outnum = [i for (i in this._tracker.changedIDs)].length;
if (outnum) {
this._log.trace("Preparing " + outnum + " outgoing records");
// collection we'll upload
let up = new Collection(this.engineURL);
let count = 0;
// Upload what we've got so far in the collection
// (desc is only used in the log message)
let doUpload = Utils.bind2(this, function(desc) {
this._log.info("Uploading " + desc + " of " + outnum + " records");
let resp = up.post();
if (!resp.success) {
this._log.debug("Uploading records failed: " + resp);
resp.failureCode = ENGINE_UPLOAD_FAIL;
throw resp;
}
// Record the modified time of the upload
let modified = resp.headers["X-Weave-Timestamp"];
if (modified > this.lastSync)
this.lastSync = modified;
// Empty the collection so the next batch starts fresh
up.clearRecords();
});
for (let id in this._tracker.changedIDs) {
try {
let out = this._createRecord(id);
if (this._log.level <= Log4Moz.Level.Trace)
this._log.trace("Outgoing: " + out);
out.encrypt(ID.get("WeaveCryptoID"));
up.pushData(out);
}
catch(ex) {
// A record that can't be created/encrypted is logged and left out; it
// still counts towards the batch size below
this._log.warn("Error creating record: " + Utils.exceptionStr(ex));
}
// Partial upload
if ((++count % MAX_UPLOAD_RECORDS) == 0)
doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out");
Sync.sleep(0);
}
// Final upload
if (count % MAX_UPLOAD_RECORDS > 0)
doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all");
}
this._tracker.clearChangedIDs();
},
// Any cleanup necessary.
// Save the current snapshot so as to calculate changes at next sync
_syncFinish: function SyncEngine__syncFinish() {
this._log.trace("Finishing up sync");
this._tracker.resetScore();
let doDelete = Utils.bind2(this, function(key, val) {
let coll = new Collection(this.engineURL, this._recordObj);
coll[key] = val;
coll.delete();
});
for (let [key, val] in Iterator(this._delete)) {
// Remove the key for future uses
delete this._delete[key];
// Send a simple delete for the property
if (key != "ids" || val.length <= 100)
doDelete(key, val);
else {
// For many ids, split into chunks of at most 100
while (val.length > 0) {
doDelete(key, val.slice(0, 100));
val = val.slice(100);
}
}
}
},
_sync: function SyncEngine__sync() {
try {
this._syncStartup();
Observers.notify("weave:engine:sync:status", "process-incoming");
this._processIncoming();
Observers.notify("weave:engine:sync:status", "upload-outgoing");
this._uploadOutgoing();
this._syncFinish();
}
catch (e) {
this._log.warn("Sync failed");
throw e;
}
},
_testDecrypt: function _testDecrypt() {
// Report failure even if there's nothing to decrypt
let canDecrypt = false;
// Fetch the most recently uploaded record and try to decrypt it
let test = new Collection(this.engineURL, this._recordObj);
test.limit = 1;
test.sort = "newest";
test.full = true;
test.recordHandler = function(record) {
record.decrypt(ID.get("WeaveCryptoID"));
canDecrypt = true;
};
// Any failure fetching/decrypting will just result in false
try {
this._log.trace("Trying to decrypt a record from the server..");
test.get();
}
catch(ex) {
this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex));
}
return canDecrypt;
},
// Forget local sync state: reset the last sync time pref to "0" and empty
// the backlog of GUIDs to fetch (the toFetch setter also saves the empty
// list through Utils.jsonSave).
_resetClient: function SyncEngine__resetClient() {
this.resetLastSync();
this.toFetch = [];
}
};