diff --git a/.cargo/config.in b/.cargo/config.in index 12ead67ca232..29ef98f12e37 100644 --- a/.cargo/config.in +++ b/.cargo/config.in @@ -20,7 +20,7 @@ tag = "v0.2.4" [source."https://github.com/mozilla/application-services"] git = "https://github.com/mozilla/application-services" replace-with = "vendored-sources" -rev = "d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" +rev = "43d69b250d8185ebc53e887b747d85a2a53c7298" [source."https://github.com/mozilla-spidermonkey/jsparagus"] git = "https://github.com/mozilla-spidermonkey/jsparagus" diff --git a/Cargo.lock b/Cargo.lock index 7a8ed22190c6..799861ddee12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1255,7 +1255,7 @@ checksum = "ff511d5dc435d703f4971bc399647c9bc38e20cb41452e3b9feb4765419ed3f3" [[package]] name = "error-support" version = "0.1.0" -source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" +source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298" dependencies = [ "failure", ] @@ -2193,7 +2193,7 @@ dependencies = [ [[package]] name = "interrupt-support" version = "0.1.0" -source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" +source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298" [[package]] name = "intl-memoizer" @@ -3128,7 +3128,7 @@ dependencies = [ [[package]] name = "nss_build_common" version = "0.1.0" -source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" +source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298" [[package]] name = "nsstring" @@ -4271,7 +4271,7 @@ dependencies = [ [[package]] name = "sql-support" version = "0.1.0" -source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" +source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298" dependencies = [ "ffi-support", "interrupt-support", @@ -4468,7 +4468,7 @@ dependencies = [ [[package]] name = "sync-guid" version = "0.1.0" -source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" +source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298" dependencies = [ "base64 0.12.0", "rand", @@ -4479,7 +4479,7 @@ dependencies = [ [[package]] name = "sync15-traits" version = "0.1.0" -source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" +source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298" dependencies = [ "failure", "ffi-support", @@ -5194,7 +5194,7 @@ dependencies = [ [[package]] name = "webext-storage" version = "0.1.0" -source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" +source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298" dependencies = [ "error-support", "failure", diff --git a/services/interfaces/moz.build b/services/interfaces/moz.build index 84aa06e4661a..bce96642c53e 100644 --- a/services/interfaces/moz.build +++ b/services/interfaces/moz.build @@ -13,5 +13,6 @@ XPIDL_MODULE = 'services' XPIDL_SOURCES += [ 'mozIBridgedSyncEngine.idl', + 'mozIInterruptible.idl', 'mozIServicesLogger.idl', ] diff --git a/services/interfaces/mozIBridgedSyncEngine.idl b/services/interfaces/mozIBridgedSyncEngine.idl index 89b784cb6c23..1f6e315e9151 100644 --- a/services/interfaces/mozIBridgedSyncEngine.idl +++ b/services/interfaces/mozIBridgedSyncEngine.idl @@ -5,7 +5,6 @@ #include "mozIServicesLogger.idl" #include "nsISupports.idl" -interface nsICancelable; interface nsIVariant; // A generic callback called with a result. Variants are automatically unboxed @@ -112,28 +111,28 @@ interface mozIBridgedSyncEngine : nsISupports { // method that can be used to interrupt staging. Typically, engines will // stage incoming records in an SQLite temp table, and merge them with the // local database when `apply` is called. - nsICancelable storeIncoming(in Array incomingRecordsAsJSON, - in mozIBridgedSyncEngineCallback callback); + void storeIncoming(in Array incomingRecordsAsJSON, + in mozIBridgedSyncEngineCallback callback); // Applies all the staged records, and calls the `callback` with // outgoing records to upload. This will always be called after // `storeIncoming`, and only once per sync. Application should be atomic: // either all incoming records apply successfully, or none. - nsICancelable apply(in mozIBridgedSyncEngineApplyCallback callback); + void apply(in mozIBridgedSyncEngineApplyCallback callback); // Notifies the engine that Sync successfully uploaded the records with the // given IDs. This method may be called multiple times per sync, once per // batch upload. This will always be called after `apply`. - nsICancelable setUploaded(in long long newTimestampMillis, - in Array uploadedIds, - in mozIBridgedSyncEngineCallback callback); + void setUploaded(in long long newTimestampMillis, + in Array uploadedIds, + in mozIBridgedSyncEngineCallback callback); // Notifies the engine that syncing has finished, and the engine shouldn't // expect any more `setUploaded` calls. At this point, any outgoing records // that weren't passed to `setUploaded` should be assumed failed. This is // guaranteed to be called even if the sync fails. This will only be called // once per sync. - nsICancelable syncFinished(in mozIBridgedSyncEngineCallback callback); + void syncFinished(in mozIBridgedSyncEngineCallback callback); // Resets all local Sync metadata, including the sync ID, last sync time, // and any change flags, but preserves all data. After a reset, the engine will diff --git a/services/interfaces/mozIInterruptible.idl b/services/interfaces/mozIInterruptible.idl new file mode 100644 index 000000000000..2894c428d651 --- /dev/null +++ b/services/interfaces/mozIInterruptible.idl @@ -0,0 +1,13 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "nsISupports.idl" + +// Interrupts all pending operations on a data store, if possible. This is +// provided as a separate interface because the store may want to hide this +// implementation from callers, as interrupting a write can cause data loss. +[scriptable, uuid(1c06bfd3-76b1-46fa-a64a-db682d478374)] +interface mozIInterruptible : nsISupports { + void interrupt(); +}; diff --git a/services/sync/golden_gate/Cargo.toml b/services/sync/golden_gate/Cargo.toml index 100ac16fed4c..b6d74e5a6d73 100644 --- a/services/sync/golden_gate/Cargo.toml +++ b/services/sync/golden_gate/Cargo.toml @@ -8,13 +8,13 @@ edition = "2018" [dependencies] atomic_refcell = "0.1" cstr = "0.1" -interrupt-support = { git = "https://github.com/mozilla/application-services", rev = "d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" } +interrupt-support = { git = "https://github.com/mozilla/application-services", rev = "43d69b250d8185ebc53e887b747d85a2a53c7298" } log = "0.4" moz_task = { path = "../../../xpcom/rust/moz_task" } nserror = { path = "../../../xpcom/rust/nserror" } nsstring = { path = "../../../xpcom/rust/nsstring" } storage_variant = { path = "../../../storage/variant" } -sync15-traits = { git = "https://github.com/mozilla/application-services", rev = "d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" } +sync15-traits = { git = "https://github.com/mozilla/application-services", rev = "43d69b250d8185ebc53e887b747d85a2a53c7298" } xpcom = { path = "../../../xpcom/rust/xpcom" } [dependencies.thin-vec] diff --git a/services/sync/golden_gate/src/ferry.rs b/services/sync/golden_gate/src/ferry.rs index 07796cb81eb4..ad703f7574f8 100644 --- a/services/sync/golden_gate/src/ferry.rs +++ b/services/sync/golden_gate/src/ferry.rs @@ -2,30 +2,28 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use std::sync::Arc; - use nsstring::nsCString; use storage_variant::VariantType; use xpcom::{interfaces::nsIVariant, RefPtr}; /// An operation that runs on the background thread, and optionally passes a /// result to its callback. -pub enum Ferry { +pub enum Ferry { Initialize, LastSync, SetLastSync(i64), SyncId, ResetSyncId, EnsureCurrentSyncId(String), - StoreIncoming(Vec, Arc), - SetUploaded(i64, Vec, Arc), - SyncFinished(Arc), + StoreIncoming(Vec), + SetUploaded(i64, Vec), + SyncFinished, Reset, Wipe, Finalize, } -impl Ferry { +impl Ferry { /// Returns the operation name for debugging and labeling the task /// runnable. pub fn name(&self) -> &'static str { @@ -38,7 +36,7 @@ impl Ferry { Ferry::EnsureCurrentSyncId(_) => concat!(module_path!(), "ensureCurrentSyncId"), Ferry::StoreIncoming { .. } => concat!(module_path!(), "storeIncoming"), Ferry::SetUploaded { .. } => concat!(module_path!(), "setUploaded"), - Ferry::SyncFinished(_) => concat!(module_path!(), "sync"), + Ferry::SyncFinished => concat!(module_path!(), "syncFinished"), Ferry::Reset => concat!(module_path!(), "reset"), Ferry::Wipe => concat!(module_path!(), "wipe"), Ferry::Finalize => concat!(module_path!(), "finalize"), @@ -56,8 +54,8 @@ pub enum FerryResult { Null, } -impl From<()> for FerryResult { - fn from(_: ()) -> FerryResult { +impl Default for FerryResult { + fn default() -> Self { FerryResult::Null } } diff --git a/services/sync/golden_gate/src/lib.rs b/services/sync/golden_gate/src/lib.rs index 5df3eeaa5573..e975111514d7 100644 --- a/services/sync/golden_gate/src/lib.rs +++ b/services/sync/golden_gate/src/lib.rs @@ -113,5 +113,5 @@ pub use error::{Error, Result}; // Re-export items from `interrupt-support` and `sync15-traits`, so that // consumers of `golden_gate` don't have to depend on them. pub use interrupt_support::{Interrupted, Interruptee}; -pub use sync15_traits::BridgedEngine; +pub use sync15_traits::{ApplyResults, BridgedEngine}; pub use task::{ApplyTask, FerryTask}; diff --git a/services/sync/golden_gate/src/task.rs b/services/sync/golden_gate/src/task.rs index 9b98d370975c..6c0a3e41a866 100644 --- a/services/sync/golden_gate/src/task.rs +++ b/services/sync/golden_gate/src/task.rs @@ -2,10 +2,13 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use std::{fmt::Write, mem, sync::Arc}; +use std::{ + fmt::Write, + mem, + sync::{Arc, Weak}, +}; use atomic_refcell::AtomicRefCell; -use interrupt_support::Interruptee; use moz_task::{DispatchOptions, Task, TaskRunnable, ThreadPtrHandle, ThreadPtrHolder}; use nserror::nsresult; use nsstring::{nsACString, nsCString}; @@ -24,17 +27,16 @@ use crate::ferry::{Ferry, FerryResult}; /// A ferry task sends (or ferries) an operation to a bridged engine on a /// background thread or task queue, and ferries back an optional result to /// a callback. -pub struct FerryTask { - engine: Arc, - ferry: Ferry, +pub struct FerryTask { + engine: Weak, + ferry: Ferry, callback: ThreadPtrHandle, result: AtomicRefCell>, } -impl FerryTask +impl FerryTask where N: ?Sized + BridgedEngine + Send + Sync + 'static, - S: Interruptee + Send + Sync + 'static, N::Error: BridgedError, { /// Creates a task to initialize the engine. @@ -42,7 +44,7 @@ where pub fn for_initialize( engine: &Arc, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { Self::with_ferry(engine, Ferry::Initialize, callback) } @@ -51,7 +53,7 @@ where pub fn for_last_sync( engine: &Arc, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { Self::with_ferry(engine, Ferry::LastSync, callback) } @@ -61,7 +63,7 @@ where engine: &Arc, last_sync_millis: i64, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { Self::with_ferry(engine, Ferry::SetLastSync(last_sync_millis), callback) } @@ -70,7 +72,7 @@ where pub fn for_sync_id( engine: &Arc, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { Self::with_ferry(engine, Ferry::SyncId, callback) } @@ -80,7 +82,7 @@ where pub fn for_reset_sync_id( engine: &Arc, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { Self::with_ferry(engine, Ferry::ResetSyncId, callback) } @@ -92,7 +94,7 @@ where engine: &Arc, new_sync_id: &nsACString, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { Self::with_ferry( engine, Ferry::EnsureCurrentSyncId(std::str::from_utf8(new_sync_id)?.into()), @@ -104,9 +106,8 @@ where pub fn for_store_incoming( engine: &Arc, incoming_cleartexts: &[nsCString], - signal: &Arc, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { let incoming_cleartexts = incoming_cleartexts.iter().try_fold( Vec::with_capacity(incoming_cleartexts.len()), |mut cleartexts, cleartext| -> error::Result<_> { @@ -117,11 +118,7 @@ where Ok(cleartexts) }, )?; - Self::with_ferry( - engine, - Ferry::StoreIncoming(incoming_cleartexts, Arc::clone(signal)), - callback, - ) + Self::with_ferry(engine, Ferry::StoreIncoming(incoming_cleartexts), callback) } /// Creates a task to mark a subset of outgoing records as uploaded. This @@ -131,9 +128,8 @@ where engine: &Arc, server_modified_millis: i64, uploaded_ids: &[nsCString], - signal: &Arc, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { let uploaded_ids = uploaded_ids.iter().try_fold( Vec::with_capacity(uploaded_ids.len()), |mut ids, id| -> error::Result<_> { @@ -143,7 +139,7 @@ where )?; Self::with_ferry( engine, - Ferry::SetUploaded(server_modified_millis, uploaded_ids, Arc::clone(signal)), + Ferry::SetUploaded(server_modified_millis, uploaded_ids), callback, ) } @@ -154,10 +150,9 @@ where #[inline] pub fn for_sync_finished( engine: &Arc, - signal: &Arc, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { - Self::with_ferry(engine, Ferry::SyncFinished(Arc::clone(signal)), callback) + ) -> error::Result> { + Self::with_ferry(engine, Ferry::SyncFinished, callback) } /// Creates a task to reset all local Sync state for the engine, without @@ -166,7 +161,7 @@ where pub fn for_reset( engine: &Arc, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { Self::with_ferry(engine, Ferry::Reset, callback) } @@ -175,7 +170,7 @@ where pub fn for_wipe( engine: &Arc, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { Self::with_ferry(engine, Ferry::Wipe, callback) } @@ -184,7 +179,7 @@ where pub fn for_finalize( engine: &Arc, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { Self::with_ferry(engine, Ferry::Finalize, callback) } @@ -193,12 +188,12 @@ where /// background thread. fn with_ferry( engine: &Arc, - ferry: Ferry, + ferry: Ferry, callback: &mozIBridgedSyncEngineCallback, - ) -> error::Result> { + ) -> error::Result> { let name = ferry.name(); Ok(FerryTask { - engine: Arc::clone(engine), + engine: Arc::downgrade(engine), ferry, callback: ThreadPtrHolder::new( cstr!("mozIBridgedSyncEngineCallback"), @@ -218,46 +213,68 @@ where } } -impl Task for FerryTask +impl FerryTask +where + N: ?Sized + BridgedEngine, + N::Error: BridgedError, +{ + /// Runs the task on the background thread. This is split out into its own + /// method to make error handling easier. + fn inner_run(&self) -> Result { + let engine = match self.engine.upgrade() { + Some(outer) => outer, + None => return Err(Error::DidNotRun(self.ferry.name()).into()), + }; + Ok(match &self.ferry { + Ferry::Initialize => { + engine.initialize()?; + FerryResult::default() + } + Ferry::LastSync => FerryResult::LastSync(engine.last_sync()?), + Ferry::SetLastSync(last_sync_millis) => { + engine.set_last_sync(*last_sync_millis)?; + FerryResult::default() + } + Ferry::SyncId => FerryResult::SyncId(engine.sync_id()?), + Ferry::ResetSyncId => FerryResult::AssignedSyncId(engine.reset_sync_id()?), + Ferry::EnsureCurrentSyncId(new_sync_id) => { + FerryResult::AssignedSyncId(engine.ensure_current_sync_id(&*new_sync_id)?) + } + Ferry::StoreIncoming(incoming_cleartexts) => { + engine.store_incoming(incoming_cleartexts.as_slice())?; + FerryResult::default() + } + Ferry::SetUploaded(server_modified_millis, uploaded_ids) => { + engine.set_uploaded(*server_modified_millis, uploaded_ids.as_slice())?; + FerryResult::default() + } + Ferry::SyncFinished => { + engine.sync_finished()?; + FerryResult::default() + } + Ferry::Reset => { + engine.reset()?; + FerryResult::default() + } + Ferry::Wipe => { + engine.wipe()?; + FerryResult::default() + } + Ferry::Finalize => { + engine.finalize()?; + FerryResult::default() + } + }) + } +} + +impl Task for FerryTask where N: ?Sized + BridgedEngine, - S: Interruptee, N::Error: BridgedError, { fn run(&self) { - *self.result.borrow_mut() = match &self.ferry { - Ferry::Initialize => self.engine.initialize().map(FerryResult::from), - Ferry::LastSync => self.engine.last_sync().map(FerryResult::LastSync), - Ferry::SetLastSync(last_sync_millis) => self - .engine - .set_last_sync(*last_sync_millis) - .map(FerryResult::from), - Ferry::SyncId => self.engine.sync_id().map(FerryResult::SyncId), - Ferry::ResetSyncId => self.engine.reset_sync_id().map(FerryResult::AssignedSyncId), - Ferry::EnsureCurrentSyncId(new_sync_id) => self - .engine - .ensure_current_sync_id(&*new_sync_id) - .map(FerryResult::AssignedSyncId), - Ferry::StoreIncoming(incoming_cleartexts, signal) => self - .engine - .store_incoming(incoming_cleartexts.as_slice(), signal.as_ref()) - .map(FerryResult::from), - Ferry::SetUploaded(server_modified_millis, uploaded_ids, signal) => self - .engine - .set_uploaded( - *server_modified_millis, - uploaded_ids.as_slice(), - signal.as_ref(), - ) - .map(FerryResult::from), - Ferry::SyncFinished(signal) => self - .engine - .sync_finished(signal.as_ref()) - .map(FerryResult::from), - Ferry::Reset => self.engine.reset().map(FerryResult::from), - Ferry::Wipe => self.engine.wipe().map(FerryResult::from), - Ferry::Finalize => self.engine.finalize().map(FerryResult::from), - }; + *self.result.borrow_mut() = self.inner_run(); } fn done(&self) -> Result<(), nsresult> { @@ -280,39 +297,45 @@ where /// An apply task ferries incoming records to an engine on a background /// thread, and ferries back records to upload. It's separate from /// `FerryTask` because its callback type is different. -pub struct ApplyTask { - engine: Arc, - signal: Arc, +pub struct ApplyTask { + engine: Weak, callback: ThreadPtrHandle, result: AtomicRefCell>, } -impl ApplyTask +impl ApplyTask where N: ?Sized + BridgedEngine, + N::Error: BridgedError, { /// Returns the task name for debugging. pub fn name() -> &'static str { concat!(module_path!(), "apply") } + + /// Runs the task on the background thread. + fn inner_run(&self) -> Result { + let engine = match self.engine.upgrade() { + Some(outer) => outer, + None => return Err(Error::DidNotRun(Self::name()).into()), + }; + Ok(engine.apply()?) + } } -impl ApplyTask +impl ApplyTask where N: ?Sized + BridgedEngine + Send + Sync + 'static, - S: Interruptee + Send + Sync + 'static, N::Error: BridgedError, { /// Creates a task. The `callback` is bound to the current thread, and will /// be called once, after the records are applied on the background thread. pub fn new( engine: &Arc, - signal: &Arc, callback: &mozIBridgedSyncEngineApplyCallback, - ) -> error::Result> { + ) -> error::Result> { Ok(ApplyTask { - engine: Arc::clone(engine), - signal: Arc::clone(signal), + engine: Arc::downgrade(engine), callback: ThreadPtrHolder::new( cstr!("mozIBridgedSyncEngineApplyCallback"), RefPtr::new(callback), @@ -329,14 +352,13 @@ where } } -impl Task for ApplyTask +impl Task for ApplyTask where N: ?Sized + BridgedEngine, - S: Interruptee, N::Error: BridgedError, { fn run(&self) { - *self.result.borrow_mut() = self.engine.apply(self.signal.as_ref()); + *self.result.borrow_mut() = self.inner_run(); } fn done(&self) -> Result<(), nsresult> { diff --git a/services/sync/modules/bridged_engine.js b/services/sync/modules/bridged_engine.js index cfe16734c06c..4421a688f503 100644 --- a/services/sync/modules/bridged_engine.js +++ b/services/sync/modules/bridged_engine.js @@ -47,11 +47,7 @@ class BridgedStore { // pass attributes like `modified` and `sortindex`, which are not part // of the cleartext. let incomingCleartexts = chunk.map(record => record.cleartextToString()); - await promisifyWithSignal( - null, - this.engine._bridge.storeIncoming, - incomingCleartexts - ); + await promisify(this.engine._bridge.storeIncoming, incomingCleartexts); } // Array of failed records. return []; @@ -194,10 +190,6 @@ function BridgedEngine(bridge, name, service) { this._bridge = bridge; this._bridge.logger = new LogAdapter(this._log); - - // The maximum amount of time that we should wait for the bridged engine - // to apply incoming records before aborting. - this._applyTimeoutMillis = 5 * 60 * 60 * 1000; // 5 minutes } BridgedEngine.prototype = { @@ -308,7 +300,7 @@ BridgedEngine.prototype = { // keep the records around (for example, in a temp table), and automatically // write them back on `syncFinished`? await this.initialize(); - await promisifyWithSignal(null, this._bridge.syncFinished); + await promisify(this._bridge.syncFinished); }, /** @@ -332,34 +324,18 @@ BridgedEngine.prototype = { await super._processIncoming(newitems); await this.initialize(); - // TODO: We could consider having a per-sync watchdog instead; for - // example, time out after 5 minutes total, including any network - // latency. `promisifyWithSignal` makes this flexible. - let watchdog = this._newWatchdog(); - watchdog.start(this._applyTimeoutMillis); - - try { - let outgoingRecords = await promisifyWithSignal( - watchdog.signal, - this._bridge.apply - ); - let changeset = {}; - for (let record of outgoingRecords) { - // TODO: It would be nice if we could pass the cleartext through as-is - // here, too, instead of parsing and re-wrapping for `BridgedRecord`. - let cleartext = JSON.parse(record); - changeset[cleartext.id] = { - synced: false, - cleartext, - }; - } - this._modified.replace(changeset); - } finally { - watchdog.stop(); - if (watchdog.abortReason) { - this._log.warn(`Aborting bookmark merge: ${watchdog.abortReason}`); - } + let outgoingRecords = await promisify(this._bridge.apply); + let changeset = {}; + for (let record of outgoingRecords) { + // TODO: It would be nice if we could pass the cleartext through as-is + // here, too, instead of parsing and re-wrapping for `BridgedRecord`. + let cleartext = JSON.parse(record); + changeset[cleartext.id] = { + synced: false, + cleartext, + }; } + this._modified.replace(changeset); }, /** @@ -370,12 +346,7 @@ BridgedEngine.prototype = { */ async _onRecordsWritten(succeeded, failed, serverModifiedTime) { await this.initialize(); - await promisifyWithSignal( - null, - this._bridge.setUploaded, - serverModifiedTime, - succeeded - ); + await promisify(this._bridge.setUploaded, serverModifiedTime, succeeded); }, async _createTombstone() { @@ -426,35 +397,6 @@ function promisify(func, ...params) { }); } -// Like `promisify`, but takes an `AbortSignal` for cancelable -// operations. -function promisifyWithSignal(signal, func, ...params) { - if (!signal) { - return promisify(func, ...params); - } - return new Promise((resolve, reject) => { - if (signal.aborted) { - // TODO: Record more specific operation names, so we can see which - // ones get interrupted most in telemetry. - throw new InterruptedError("Interrupted before starting operation"); - } - function onAbort() { - signal.removeEventListener("abort", onAbort); - op.cancel(Cr.NS_ERROR_ABORT); - } - let op = func(...params, { - handleSuccess(result) { - signal.removeEventListener("abort", onAbort); - resolve(result); - }, - handleError(code, message) { - reject(transformError(code, message)); - }, - }); - signal.addEventListener("abort", onAbort); - }); -} - class BridgedChangeset extends Changeset { // Only `_reconcile` calls `getModifiedTimestamp` and `has`, and the buffered // engine does its own reconciliation. diff --git a/services/sync/tests/unit/test_bridged_engine.js b/services/sync/tests/unit/test_bridged_engine.js index 16ab9e198fa7..2cb541788b58 100644 --- a/services/sync/tests/unit/test_bridged_engine.js +++ b/services/sync/tests/unit/test_bridged_engine.js @@ -105,7 +105,6 @@ add_task(async function test_interface() { }, ].map(r => JSON.stringify(r)); CommonUtils.nextTick(() => callback.handleSuccess(outgoingRecords)); - return { cancel() {} }; } setUploaded(millis, ids, callback) { @@ -115,7 +114,6 @@ add_task(async function test_interface() { ); this.uploadedIDs.push(...ids); CommonUtils.nextTick(() => callback.handleSuccess()); - return { cancel() {} }; } syncFinished(callback) { @@ -125,7 +123,6 @@ add_task(async function test_interface() { ); this.wasSynced = true; CommonUtils.nextTick(() => callback.handleSuccess()); - return { cancel() {} }; } reset(callback) { diff --git a/third_party/rust/sync15-traits/.cargo-checksum.json b/third_party/rust/sync15-traits/.cargo-checksum.json index 42e30d9d8068..8e734571ba53 100644 --- a/third_party/rust/sync15-traits/.cargo-checksum.json +++ b/third_party/rust/sync15-traits/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"656c4c4af39bcf924098be33996360250f9610ee3a4090b8152b68bdad03c46e","README.md":"396105211d8ce7f40b05d8062d7ab55d99674555f3ac81c061874ae26656ed7e","src/bridged_engine.rs":"b4d45cd43db3e5926df614ae9706b8d1a5bb96860577463d05b56a4213532ec1","src/changeset.rs":"442aa92b5130ec0f8f2b0054acb399c547380e0060015cbf4ca7a72027440d54","src/client.rs":"6be4f550ade823fafc350c5490e031f90a4af833a9bba9739b05568464255a74","src/lib.rs":"c1ca44e7bb6477b8018bd554479021dbf52754e64577185b3f7e208ae45bf754","src/payload.rs":"09db1a444e7893990a4f03cb16263b9c15abc9e48ec4f1343227be1b490865a5","src/request.rs":"9e656ec487e53c7485643687e605d73bb25e138056e920d6f4b7d63fc6a8c460","src/server_timestamp.rs":"43d1b98a90e55e49380a0b66c209c9eb393e2aeaa27d843a4726d93cdd4cea02","src/store.rs":"10e215dd24270b6bec10903ac1d5274ce997eb437134f43be7de44e36fb9d1e4","src/telemetry.rs":"027befb099a6fcded3457f7e566296548a0898ff613267190621856b9ef288f6"},"package":null} \ No newline at end of file +{"files":{"Cargo.toml":"656c4c4af39bcf924098be33996360250f9610ee3a4090b8152b68bdad03c46e","README.md":"396105211d8ce7f40b05d8062d7ab55d99674555f3ac81c061874ae26656ed7e","src/bridged_engine.rs":"3387cea6cdfe9cfbc837d52e79a5d8e5be75e9d468a030760e8e7b3c6784462d","src/changeset.rs":"442aa92b5130ec0f8f2b0054acb399c547380e0060015cbf4ca7a72027440d54","src/client.rs":"6be4f550ade823fafc350c5490e031f90a4af833a9bba9739b05568464255a74","src/lib.rs":"c1ca44e7bb6477b8018bd554479021dbf52754e64577185b3f7e208ae45bf754","src/payload.rs":"09db1a444e7893990a4f03cb16263b9c15abc9e48ec4f1343227be1b490865a5","src/request.rs":"9e656ec487e53c7485643687e605d73bb25e138056e920d6f4b7d63fc6a8c460","src/server_timestamp.rs":"43d1b98a90e55e49380a0b66c209c9eb393e2aeaa27d843a4726d93cdd4cea02","src/store.rs":"10e215dd24270b6bec10903ac1d5274ce997eb437134f43be7de44e36fb9d1e4","src/telemetry.rs":"027befb099a6fcded3457f7e566296548a0898ff613267190621856b9ef288f6"},"package":null} \ No newline at end of file diff --git a/third_party/rust/sync15-traits/src/bridged_engine.rs b/third_party/rust/sync15-traits/src/bridged_engine.rs index 6aba5e2798f7..51b3c882b9fd 100644 --- a/third_party/rust/sync15-traits/src/bridged_engine.rs +++ b/third_party/rust/sync15-traits/src/bridged_engine.rs @@ -2,10 +2,6 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use std::{sync::Mutex, sync::MutexGuard, sync::PoisonError}; - -use interrupt_support::Interruptee; - /// A bridged Sync engine implements all the methods needed to support /// Desktop Sync. pub trait BridgedEngine { @@ -51,31 +47,22 @@ pub trait BridgedEngine { /// times per sync, once for each batch. Implementations can use the /// signal to check if the operation was aborted, and cancel any /// pending work. - fn store_incoming( - &self, - incoming_cleartexts: &[String], - signal: &dyn Interruptee, - ) -> Result<(), Self::Error>; + fn store_incoming(&self, incoming_cleartexts: &[String]) -> Result<(), Self::Error>; /// Applies all staged records, reconciling changes on both sides and /// resolving conflicts. Returns a list of records to upload. - fn apply(&self, signal: &dyn Interruptee) -> Result; + fn apply(&self) -> Result; /// Indicates that the given record IDs were uploaded successfully to the /// server. This is called multiple times per sync, once for each batch /// upload. - fn set_uploaded( - &self, - server_modified_millis: i64, - ids: &[String], - signal: &dyn Interruptee, - ) -> Result<(), Self::Error>; + fn set_uploaded(&self, server_modified_millis: i64, ids: &[String]) -> Result<(), Self::Error>; /// Indicates that all records have been uploaded. At this point, any record /// IDs marked for upload that haven't been passed to `set_uploaded`, can be /// assumed to have failed: for example, because the server rejected a record /// with an invalid TTL or sort index. - fn sync_finished(&self, signal: &dyn Interruptee) -> Result<(), Self::Error>; + fn sync_finished(&self) -> Result<(), Self::Error>; /// Resets all local Sync state, including any change flags, mirrors, and /// the last sync time, such that the next sync is treated as a first sync @@ -122,76 +109,3 @@ impl From> for ApplyResults { } } } - -/// A blanket implementation of `BridgedEngine` for any `Mutex`. -/// This is provided for convenience, since we expect most bridges to hold -/// their engines in an `Arc>`. -impl BridgedEngine for Mutex -where - E: BridgedEngine, - E::Error: for<'a> From>>, -{ - type Error = E::Error; - - fn initialize(&self) -> Result<(), Self::Error> { - self.lock()?.initialize() - } - - fn last_sync(&self) -> Result { - self.lock()?.last_sync() - } - - fn set_last_sync(&self, millis: i64) -> Result<(), Self::Error> { - self.lock()?.set_last_sync(millis) - } - - fn store_incoming( - &self, - incoming_cleartexts: &[String], - signal: &dyn Interruptee, - ) -> Result<(), Self::Error> { - self.lock()?.store_incoming(incoming_cleartexts, signal) - } - - fn apply(&self, signal: &dyn Interruptee) -> Result { - self.lock()?.apply(signal) - } - - fn set_uploaded( - &self, - server_modified_millis: i64, - ids: &[String], - signal: &dyn Interruptee, - ) -> Result<(), Self::Error> { - self.lock()? - .set_uploaded(server_modified_millis, ids, signal) - } - - fn sync_finished(&self, signal: &dyn Interruptee) -> Result<(), Self::Error> { - self.lock()?.sync_finished(signal) - } - - fn reset(&self) -> Result<(), Self::Error> { - self.lock()?.reset() - } - - fn wipe(&self) -> Result<(), Self::Error> { - self.lock()?.wipe() - } - - fn finalize(&self) -> Result<(), Self::Error> { - self.lock()?.finalize() - } - - fn sync_id(&self) -> Result, Self::Error> { - self.lock()?.sync_id() - } - - fn reset_sync_id(&self) -> Result { - self.lock()?.reset_sync_id() - } - - fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result { - self.lock()?.ensure_current_sync_id(new_sync_id) - } -} diff --git a/third_party/rust/webext-storage/.cargo-checksum.json b/third_party/rust/webext-storage/.cargo-checksum.json index cc8c37b8629f..2c8be8f7214a 100644 --- a/third_party/rust/webext-storage/.cargo-checksum.json +++ b/third_party/rust/webext-storage/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"23ed53b7db21b1015cbb1deafe950f80068691dfe7f2256e9f9237a94910a4d8","README.md":"1fd617294339930ee1ad5172377648b268cce0216fc3971facbfe7c6839e9ab1","build.rs":"2b827a62155a3d724cdb4c198270ea467439e537403f82fa873321ac55a69a63","sql/create_schema.sql":"d50b22cb17fc5d4e2aa4d001e853bd2f67eb3ffdbb1ac29013067dceacaec80e","src/api.rs":"578862ecc35d1bfa0c96329702cc7ffd743d749a170d808a729f91f584b33b3d","src/db.rs":"8933ce788e1044aa654eb0ed177596e327057dd49001287d3fbfcf57cc6bb728","src/error.rs":"86ba215ec5a889d1ccca9dcd141e42f75914744a4803598ccf3894da4a7f7475","src/lib.rs":"79685854649ffd0c1664f67cd0264f86c93f49e974de217a94a005b9d9d1c39d","src/schema.rs":"ac475285fb065fdf639d71e34d4e12663eb0650eefe3e151d223eb4fe3a667ef","src/store.rs":"686003abb14ffd1ef06f8a09a74c463ffaaf8e283ae7b92809122806e7085013"},"package":null} \ No newline at end of file +{"files":{"Cargo.toml":"23ed53b7db21b1015cbb1deafe950f80068691dfe7f2256e9f9237a94910a4d8","README.md":"1fd617294339930ee1ad5172377648b268cce0216fc3971facbfe7c6839e9ab1","build.rs":"2b827a62155a3d724cdb4c198270ea467439e537403f82fa873321ac55a69a63","sql/create_schema.sql":"d50b22cb17fc5d4e2aa4d001e853bd2f67eb3ffdbb1ac29013067dceacaec80e","src/api.rs":"578862ecc35d1bfa0c96329702cc7ffd743d749a170d808a729f91f584b33b3d","src/db.rs":"6cf7fdf2a60a640aa5ae9e9b9fc86a45580ec2ce80947fc8991d7da56bc972da","src/error.rs":"86ba215ec5a889d1ccca9dcd141e42f75914744a4803598ccf3894da4a7f7475","src/lib.rs":"79685854649ffd0c1664f67cd0264f86c93f49e974de217a94a005b9d9d1c39d","src/schema.rs":"ac475285fb065fdf639d71e34d4e12663eb0650eefe3e151d223eb4fe3a667ef","src/store.rs":"39ba1aa7e68a58e7a59e747e04144694f9835d88f0cd6514cb0fd8f25285fcd9"},"package":null} \ No newline at end of file diff --git a/third_party/rust/webext-storage/src/db.rs b/third_party/rust/webext-storage/src/db.rs index 1e1411a8ecb7..f5f6ac59cf61 100644 --- a/third_party/rust/webext-storage/src/db.rs +++ b/third_party/rust/webext-storage/src/db.rs @@ -7,11 +7,12 @@ use crate::schema; use rusqlite::types::{FromSql, ToSql}; use rusqlite::Connection; use rusqlite::OpenFlags; -use sql_support::ConnExt; +use sql_support::{ConnExt, SqlInterruptHandle, SqlInterruptScope}; use std::fs; use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; use std::result; +use std::sync::{atomic::AtomicUsize, Arc}; use url::Url; /// A `StorageDb` wraps a read-write SQLite connection, and handles schema @@ -23,6 +24,7 @@ use url::Url; /// store. It's still a bit overkill, but there's only so many yaks in a day. pub struct StorageDb { writer: Connection, + interrupt_counter: Arc, } impl StorageDb { /// Create a new, or fetch an already open, StorageDb backed by a file on disk. @@ -50,7 +52,10 @@ impl StorageDb { let conn = Connection::open_with_flags(db_path.clone(), flags)?; match init_sql_connection(&conn, true) { - Ok(()) => Ok(Self { writer: conn }), + Ok(()) => Ok(Self { + writer: conn, + interrupt_counter: Arc::new(AtomicUsize::new(0)), + }), Err(e) => { // like with places, failure to upgrade means "you lose your data" if let ErrorKind::DatabaseUpgradeError = e.kind() { @@ -63,6 +68,29 @@ impl StorageDb { } } + /// Returns an interrupt handle for this database connection. This handle + /// should be handed out to consumers that want to interrupt long-running + /// operations. It's FFI-safe, and `Send + Sync`, since it only makes sense + /// to use from another thread. Calling `interrupt` on the handle sets a + /// flag on all currently active interrupt scopes. + pub fn interrupt_handle(&self) -> SqlInterruptHandle { + SqlInterruptHandle::new( + self.writer.get_interrupt_handle(), + self.interrupt_counter.clone(), + ) + } + + /// Creates an object that knows when it's been interrupted. A new interrupt + /// scope should be created inside each method that does long-running + /// database work, like batch writes. This is the other side of a + /// `SqlInterruptHandle`: when a handle is interrupted, it flags all active + /// interrupt scopes as interrupted, too, so that they can abort pending + /// work as soon as possible. + #[allow(dead_code)] + pub fn begin_interrupt_scope(&self) -> SqlInterruptScope { + SqlInterruptScope::new(self.interrupt_counter.clone()) + } + /// Closes the database connection. If there are any unfinalized prepared /// statements on the connection, `close` will fail and the `StorageDb` will /// be returned to the caller so that it can retry, drop (via `mem::drop`) @@ -71,9 +99,19 @@ impl StorageDb { /// Keep in mind that dropping the connection tries to close it again, and /// panics on error. pub fn close(self) -> result::Result<(), (StorageDb, Error)> { - self.writer - .close() - .map_err(|(writer, err)| (StorageDb { writer }, err.into())) + let StorageDb { + writer, + interrupt_counter, + } = self; + writer.close().map_err(|(writer, err)| { + ( + StorageDb { + writer, + interrupt_counter, + }, + err.into(), + ) + }) } } diff --git a/third_party/rust/webext-storage/src/store.rs b/third_party/rust/webext-storage/src/store.rs index 0cc561d524e7..251fc5b35209 100644 --- a/third_party/rust/webext-storage/src/store.rs +++ b/third_party/rust/webext-storage/src/store.rs @@ -9,6 +9,7 @@ use std::path::Path; use std::result; use serde_json::Value as JsonValue; +use sql_support::SqlInterruptHandle; /// A store is used to access `storage.sync` data. It manages an underlying /// database connection, and exposes methods for reading and writing storage @@ -42,6 +43,11 @@ impl Store { }) } + /// Returns an interrupt handle for this store. + pub fn interrupt_handle(&self) -> SqlInterruptHandle { + self.db.interrupt_handle() + } + /// Sets one or more JSON key-value pairs for an extension ID. Returns a /// list of changes, with existing and new values for each key in `val`. pub fn set(&self, ext_id: &str, val: JsonValue) -> Result { diff --git a/toolkit/components/extensions/storage/webext_storage_bridge/Cargo.toml b/toolkit/components/extensions/storage/webext_storage_bridge/Cargo.toml index f28687700411..185d30dc9ca7 100644 --- a/toolkit/components/extensions/storage/webext_storage_bridge/Cargo.toml +++ b/toolkit/components/extensions/storage/webext_storage_bridge/Cargo.toml @@ -16,4 +16,4 @@ xpcom = { path = "../../../../../xpcom/rust/xpcom" } serde = "1" serde_json = "1" storage_variant = { path = "../../../../../storage/variant" } -webext-storage = { git = "https://github.com/mozilla/application-services", rev = "d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" } +webext-storage = { git = "https://github.com/mozilla/application-services", rev = "43d69b250d8185ebc53e887b747d85a2a53c7298" } diff --git a/toolkit/library/rust/shared/Cargo.toml b/toolkit/library/rust/shared/Cargo.toml index 59bb94f6215b..88acbd0054ac 100644 --- a/toolkit/library/rust/shared/Cargo.toml +++ b/toolkit/library/rust/shared/Cargo.toml @@ -55,7 +55,7 @@ unic-langid = { version = "0.8", features = ["likelysubtags"] } unic-langid-ffi = { path = "../../../../intl/locale/rust/unic-langid-ffi" } fluent-langneg = { version = "0.12.1", features = ["cldr"] } fluent-langneg-ffi = { path = "../../../../intl/locale/rust/fluent-langneg-ffi" } -webext-storage = { git = "https://github.com/mozilla/application-services", rev = "d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7", optional = true } +webext-storage = { git = "https://github.com/mozilla/application-services", rev = "43d69b250d8185ebc53e887b747d85a2a53c7298", optional = true } golden_gate = { path = "../../../../services/sync/golden_gate", optional = true } # Note: `modern_sqlite` means rusqlite's bindings file be for a sqlite with