Bug 1634626 - Refactor interruption in Golden Gate. r=markh,tcsc

This commit removes the `nsICancelable` return values from all
`mozIBridgedSyncEngine` methods, and replaces them with a
`mozIInterruptible` interface that can be implemented by store
classes that support interrupting.

The `nsICancelable` pattern was intended to make each operation
interruptible, without affecting the others. But we can't guarantee
that with SQLite, because it only has a way to interrupt all
running statements on a connection, not specific ones. Further,
this pattern doesn't match what we currently do in
application-services (a-s), where we create an internal "interrupt
scope" for each operation, and hand out an "interrupt handle" for
interrupting all in-progress operations.

Storage classes like `StorageSyncArea` can opt in to interruption
by implementing `mozIInterruptible`. It's a separate interface to
protect against accidental misuse: because it interrupts all
statements on the connection, it might lose writes if the current
operation is a `set`, for example. But it's useful for testing and
debugging, so we still expose it.

This commit also changes Golden Gate ferries to hold weak references to
the `BridgedEngine`, so that they don't block teardown.

Differential Revision: https://phabricator.services.mozilla.com/D73413
This commit is contained in:
Lina Cambridge 2020-05-04 21:32:29 +00:00
parent 3ceed5d372
commit a9b590300f
18 changed files with 213 additions and 283 deletions

View File

@ -20,7 +20,7 @@ tag = "v0.2.4"
[source."https://github.com/mozilla/application-services"]
git = "https://github.com/mozilla/application-services"
replace-with = "vendored-sources"
rev = "d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7"
rev = "43d69b250d8185ebc53e887b747d85a2a53c7298"
[source."https://github.com/mozilla-spidermonkey/jsparagus"]
git = "https://github.com/mozilla-spidermonkey/jsparagus"

14
Cargo.lock generated
View File

@ -1255,7 +1255,7 @@ checksum = "ff511d5dc435d703f4971bc399647c9bc38e20cb41452e3b9feb4765419ed3f3"
[[package]]
name = "error-support"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7"
source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298"
dependencies = [
"failure",
]
@ -2193,7 +2193,7 @@ dependencies = [
[[package]]
name = "interrupt-support"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7"
source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298"
[[package]]
name = "intl-memoizer"
@ -3128,7 +3128,7 @@ dependencies = [
[[package]]
name = "nss_build_common"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7"
source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298"
[[package]]
name = "nsstring"
@ -4271,7 +4271,7 @@ dependencies = [
[[package]]
name = "sql-support"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7"
source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298"
dependencies = [
"ffi-support",
"interrupt-support",
@ -4468,7 +4468,7 @@ dependencies = [
[[package]]
name = "sync-guid"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7"
source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298"
dependencies = [
"base64 0.12.0",
"rand",
@ -4479,7 +4479,7 @@ dependencies = [
[[package]]
name = "sync15-traits"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7"
source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298"
dependencies = [
"failure",
"ffi-support",
@ -5194,7 +5194,7 @@ dependencies = [
[[package]]
name = "webext-storage"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7#d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7"
source = "git+https://github.com/mozilla/application-services?rev=43d69b250d8185ebc53e887b747d85a2a53c7298#43d69b250d8185ebc53e887b747d85a2a53c7298"
dependencies = [
"error-support",
"failure",

View File

@ -13,5 +13,6 @@ XPIDL_MODULE = 'services'
XPIDL_SOURCES += [
'mozIBridgedSyncEngine.idl',
'mozIInterruptible.idl',
'mozIServicesLogger.idl',
]

View File

@ -5,7 +5,6 @@
#include "mozIServicesLogger.idl"
#include "nsISupports.idl"
interface nsICancelable;
interface nsIVariant;
// A generic callback called with a result. Variants are automatically unboxed
@ -112,28 +111,28 @@ interface mozIBridgedSyncEngine : nsISupports {
// method that can be used to interrupt staging. Typically, engines will
// stage incoming records in an SQLite temp table, and merge them with the
// local database when `apply` is called.
nsICancelable storeIncoming(in Array<AUTF8String> incomingRecordsAsJSON,
in mozIBridgedSyncEngineCallback callback);
void storeIncoming(in Array<AUTF8String> incomingRecordsAsJSON,
in mozIBridgedSyncEngineCallback callback);
// Applies all the staged records, and calls the `callback` with
// outgoing records to upload. This will always be called after
// `storeIncoming`, and only once per sync. Application should be atomic:
// either all incoming records apply successfully, or none.
nsICancelable apply(in mozIBridgedSyncEngineApplyCallback callback);
void apply(in mozIBridgedSyncEngineApplyCallback callback);
// Notifies the engine that Sync successfully uploaded the records with the
// given IDs. This method may be called multiple times per sync, once per
// batch upload. This will always be called after `apply`.
nsICancelable setUploaded(in long long newTimestampMillis,
in Array<AUTF8String> uploadedIds,
in mozIBridgedSyncEngineCallback callback);
void setUploaded(in long long newTimestampMillis,
in Array<AUTF8String> uploadedIds,
in mozIBridgedSyncEngineCallback callback);
// Notifies the engine that syncing has finished, and the engine shouldn't
// expect any more `setUploaded` calls. At this point, any outgoing records
// that weren't passed to `setUploaded` should be assumed failed. This is
// guaranteed to be called even if the sync fails. This will only be called
// once per sync.
nsICancelable syncFinished(in mozIBridgedSyncEngineCallback callback);
void syncFinished(in mozIBridgedSyncEngineCallback callback);
// Resets all local Sync metadata, including the sync ID, last sync time,
// and any change flags, but preserves all data. After a reset, the engine will

View File

@ -0,0 +1,13 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "nsISupports.idl"
// Interrupts all pending operations on a data store, if possible. This is
// provided as a separate interface because the store may want to hide this
// implementation from callers, as interrupting a write can cause data loss.
[scriptable, uuid(1c06bfd3-76b1-46fa-a64a-db682d478374)]
interface mozIInterruptible : nsISupports {
// Interrupts all in-progress operations on the store. Note that this
// interrupts every pending statement on the underlying connection, not a
// specific operation, so it can abort an in-flight write (for example, a
// `set`) and lose data; callers should only use it for testing/debugging.
void interrupt();
};

View File

@ -8,13 +8,13 @@ edition = "2018"
[dependencies]
atomic_refcell = "0.1"
cstr = "0.1"
interrupt-support = { git = "https://github.com/mozilla/application-services", rev = "d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" }
interrupt-support = { git = "https://github.com/mozilla/application-services", rev = "43d69b250d8185ebc53e887b747d85a2a53c7298" }
log = "0.4"
moz_task = { path = "../../../xpcom/rust/moz_task" }
nserror = { path = "../../../xpcom/rust/nserror" }
nsstring = { path = "../../../xpcom/rust/nsstring" }
storage_variant = { path = "../../../storage/variant" }
sync15-traits = { git = "https://github.com/mozilla/application-services", rev = "d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" }
sync15-traits = { git = "https://github.com/mozilla/application-services", rev = "43d69b250d8185ebc53e887b747d85a2a53c7298" }
xpcom = { path = "../../../xpcom/rust/xpcom" }
[dependencies.thin-vec]

View File

@ -2,30 +2,28 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use std::sync::Arc;
use nsstring::nsCString;
use storage_variant::VariantType;
use xpcom::{interfaces::nsIVariant, RefPtr};
/// An operation that runs on the background thread, and optionally passes a
/// result to its callback.
pub enum Ferry<S> {
pub enum Ferry {
Initialize,
LastSync,
SetLastSync(i64),
SyncId,
ResetSyncId,
EnsureCurrentSyncId(String),
StoreIncoming(Vec<String>, Arc<S>),
SetUploaded(i64, Vec<String>, Arc<S>),
SyncFinished(Arc<S>),
StoreIncoming(Vec<String>),
SetUploaded(i64, Vec<String>),
SyncFinished,
Reset,
Wipe,
Finalize,
}
impl<S> Ferry<S> {
impl Ferry {
/// Returns the operation name for debugging and labeling the task
/// runnable.
pub fn name(&self) -> &'static str {
@ -38,7 +36,7 @@ impl<S> Ferry<S> {
Ferry::EnsureCurrentSyncId(_) => concat!(module_path!(), "ensureCurrentSyncId"),
Ferry::StoreIncoming { .. } => concat!(module_path!(), "storeIncoming"),
Ferry::SetUploaded { .. } => concat!(module_path!(), "setUploaded"),
Ferry::SyncFinished(_) => concat!(module_path!(), "sync"),
Ferry::SyncFinished => concat!(module_path!(), "syncFinished"),
Ferry::Reset => concat!(module_path!(), "reset"),
Ferry::Wipe => concat!(module_path!(), "wipe"),
Ferry::Finalize => concat!(module_path!(), "finalize"),
@ -56,8 +54,8 @@ pub enum FerryResult {
Null,
}
impl From<()> for FerryResult {
fn from(_: ()) -> FerryResult {
impl Default for FerryResult {
fn default() -> Self {
FerryResult::Null
}
}

View File

@ -113,5 +113,5 @@ pub use error::{Error, Result};
// Re-export items from `interrupt-support` and `sync15-traits`, so that
// consumers of `golden_gate` don't have to depend on them.
pub use interrupt_support::{Interrupted, Interruptee};
pub use sync15_traits::BridgedEngine;
pub use sync15_traits::{ApplyResults, BridgedEngine};
pub use task::{ApplyTask, FerryTask};

View File

@ -2,10 +2,13 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use std::{fmt::Write, mem, sync::Arc};
use std::{
fmt::Write,
mem,
sync::{Arc, Weak},
};
use atomic_refcell::AtomicRefCell;
use interrupt_support::Interruptee;
use moz_task::{DispatchOptions, Task, TaskRunnable, ThreadPtrHandle, ThreadPtrHolder};
use nserror::nsresult;
use nsstring::{nsACString, nsCString};
@ -24,17 +27,16 @@ use crate::ferry::{Ferry, FerryResult};
/// A ferry task sends (or ferries) an operation to a bridged engine on a
/// background thread or task queue, and ferries back an optional result to
/// a callback.
pub struct FerryTask<N: ?Sized + BridgedEngine, S> {
engine: Arc<N>,
ferry: Ferry<S>,
pub struct FerryTask<N: ?Sized + BridgedEngine> {
engine: Weak<N>,
ferry: Ferry,
callback: ThreadPtrHandle<mozIBridgedSyncEngineCallback>,
result: AtomicRefCell<Result<FerryResult, N::Error>>,
}
impl<N, S> FerryTask<N, S>
impl<N> FerryTask<N>
where
N: ?Sized + BridgedEngine + Send + Sync + 'static,
S: Interruptee + Send + Sync + 'static,
N::Error: BridgedError,
{
/// Creates a task to initialize the engine.
@ -42,7 +44,7 @@ where
pub fn for_initialize(
engine: &Arc<N>,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
Self::with_ferry(engine, Ferry::Initialize, callback)
}
@ -51,7 +53,7 @@ where
pub fn for_last_sync(
engine: &Arc<N>,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
Self::with_ferry(engine, Ferry::LastSync, callback)
}
@ -61,7 +63,7 @@ where
engine: &Arc<N>,
last_sync_millis: i64,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
Self::with_ferry(engine, Ferry::SetLastSync(last_sync_millis), callback)
}
@ -70,7 +72,7 @@ where
pub fn for_sync_id(
engine: &Arc<N>,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
Self::with_ferry(engine, Ferry::SyncId, callback)
}
@ -80,7 +82,7 @@ where
pub fn for_reset_sync_id(
engine: &Arc<N>,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
Self::with_ferry(engine, Ferry::ResetSyncId, callback)
}
@ -92,7 +94,7 @@ where
engine: &Arc<N>,
new_sync_id: &nsACString,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
Self::with_ferry(
engine,
Ferry::EnsureCurrentSyncId(std::str::from_utf8(new_sync_id)?.into()),
@ -104,9 +106,8 @@ where
pub fn for_store_incoming(
engine: &Arc<N>,
incoming_cleartexts: &[nsCString],
signal: &Arc<S>,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
let incoming_cleartexts = incoming_cleartexts.iter().try_fold(
Vec::with_capacity(incoming_cleartexts.len()),
|mut cleartexts, cleartext| -> error::Result<_> {
@ -117,11 +118,7 @@ where
Ok(cleartexts)
},
)?;
Self::with_ferry(
engine,
Ferry::StoreIncoming(incoming_cleartexts, Arc::clone(signal)),
callback,
)
Self::with_ferry(engine, Ferry::StoreIncoming(incoming_cleartexts), callback)
}
/// Creates a task to mark a subset of outgoing records as uploaded. This
@ -131,9 +128,8 @@ where
engine: &Arc<N>,
server_modified_millis: i64,
uploaded_ids: &[nsCString],
signal: &Arc<S>,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
let uploaded_ids = uploaded_ids.iter().try_fold(
Vec::with_capacity(uploaded_ids.len()),
|mut ids, id| -> error::Result<_> {
@ -143,7 +139,7 @@ where
)?;
Self::with_ferry(
engine,
Ferry::SetUploaded(server_modified_millis, uploaded_ids, Arc::clone(signal)),
Ferry::SetUploaded(server_modified_millis, uploaded_ids),
callback,
)
}
@ -154,10 +150,9 @@ where
#[inline]
pub fn for_sync_finished(
engine: &Arc<N>,
signal: &Arc<S>,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
Self::with_ferry(engine, Ferry::SyncFinished(Arc::clone(signal)), callback)
) -> error::Result<FerryTask<N>> {
Self::with_ferry(engine, Ferry::SyncFinished, callback)
}
/// Creates a task to reset all local Sync state for the engine, without
@ -166,7 +161,7 @@ where
pub fn for_reset(
engine: &Arc<N>,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
Self::with_ferry(engine, Ferry::Reset, callback)
}
@ -175,7 +170,7 @@ where
pub fn for_wipe(
engine: &Arc<N>,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
Self::with_ferry(engine, Ferry::Wipe, callback)
}
@ -184,7 +179,7 @@ where
pub fn for_finalize(
engine: &Arc<N>,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
Self::with_ferry(engine, Ferry::Finalize, callback)
}
@ -193,12 +188,12 @@ where
/// background thread.
fn with_ferry(
engine: &Arc<N>,
ferry: Ferry<S>,
ferry: Ferry,
callback: &mozIBridgedSyncEngineCallback,
) -> error::Result<FerryTask<N, S>> {
) -> error::Result<FerryTask<N>> {
let name = ferry.name();
Ok(FerryTask {
engine: Arc::clone(engine),
engine: Arc::downgrade(engine),
ferry,
callback: ThreadPtrHolder::new(
cstr!("mozIBridgedSyncEngineCallback"),
@ -218,46 +213,68 @@ where
}
}
impl<N, S> Task for FerryTask<N, S>
impl<N> FerryTask<N>
where
N: ?Sized + BridgedEngine,
N::Error: BridgedError,
{
/// Runs the task on the background thread. This is split out into its own
/// method to make error handling easier.
fn inner_run(&self) -> Result<FerryResult, N::Error> {
let engine = match self.engine.upgrade() {
Some(outer) => outer,
None => return Err(Error::DidNotRun(self.ferry.name()).into()),
};
Ok(match &self.ferry {
Ferry::Initialize => {
engine.initialize()?;
FerryResult::default()
}
Ferry::LastSync => FerryResult::LastSync(engine.last_sync()?),
Ferry::SetLastSync(last_sync_millis) => {
engine.set_last_sync(*last_sync_millis)?;
FerryResult::default()
}
Ferry::SyncId => FerryResult::SyncId(engine.sync_id()?),
Ferry::ResetSyncId => FerryResult::AssignedSyncId(engine.reset_sync_id()?),
Ferry::EnsureCurrentSyncId(new_sync_id) => {
FerryResult::AssignedSyncId(engine.ensure_current_sync_id(&*new_sync_id)?)
}
Ferry::StoreIncoming(incoming_cleartexts) => {
engine.store_incoming(incoming_cleartexts.as_slice())?;
FerryResult::default()
}
Ferry::SetUploaded(server_modified_millis, uploaded_ids) => {
engine.set_uploaded(*server_modified_millis, uploaded_ids.as_slice())?;
FerryResult::default()
}
Ferry::SyncFinished => {
engine.sync_finished()?;
FerryResult::default()
}
Ferry::Reset => {
engine.reset()?;
FerryResult::default()
}
Ferry::Wipe => {
engine.wipe()?;
FerryResult::default()
}
Ferry::Finalize => {
engine.finalize()?;
FerryResult::default()
}
})
}
}
impl<N> Task for FerryTask<N>
where
N: ?Sized + BridgedEngine,
S: Interruptee,
N::Error: BridgedError,
{
fn run(&self) {
*self.result.borrow_mut() = match &self.ferry {
Ferry::Initialize => self.engine.initialize().map(FerryResult::from),
Ferry::LastSync => self.engine.last_sync().map(FerryResult::LastSync),
Ferry::SetLastSync(last_sync_millis) => self
.engine
.set_last_sync(*last_sync_millis)
.map(FerryResult::from),
Ferry::SyncId => self.engine.sync_id().map(FerryResult::SyncId),
Ferry::ResetSyncId => self.engine.reset_sync_id().map(FerryResult::AssignedSyncId),
Ferry::EnsureCurrentSyncId(new_sync_id) => self
.engine
.ensure_current_sync_id(&*new_sync_id)
.map(FerryResult::AssignedSyncId),
Ferry::StoreIncoming(incoming_cleartexts, signal) => self
.engine
.store_incoming(incoming_cleartexts.as_slice(), signal.as_ref())
.map(FerryResult::from),
Ferry::SetUploaded(server_modified_millis, uploaded_ids, signal) => self
.engine
.set_uploaded(
*server_modified_millis,
uploaded_ids.as_slice(),
signal.as_ref(),
)
.map(FerryResult::from),
Ferry::SyncFinished(signal) => self
.engine
.sync_finished(signal.as_ref())
.map(FerryResult::from),
Ferry::Reset => self.engine.reset().map(FerryResult::from),
Ferry::Wipe => self.engine.wipe().map(FerryResult::from),
Ferry::Finalize => self.engine.finalize().map(FerryResult::from),
};
*self.result.borrow_mut() = self.inner_run();
}
fn done(&self) -> Result<(), nsresult> {
@ -280,39 +297,45 @@ where
/// An apply task ferries incoming records to an engine on a background
/// thread, and ferries back records to upload. It's separate from
/// `FerryTask` because its callback type is different.
pub struct ApplyTask<N: ?Sized + BridgedEngine, S> {
engine: Arc<N>,
signal: Arc<S>,
pub struct ApplyTask<N: ?Sized + BridgedEngine> {
engine: Weak<N>,
callback: ThreadPtrHandle<mozIBridgedSyncEngineApplyCallback>,
result: AtomicRefCell<Result<ApplyResults, N::Error>>,
}
impl<N, S> ApplyTask<N, S>
impl<N> ApplyTask<N>
where
N: ?Sized + BridgedEngine,
N::Error: BridgedError,
{
/// Returns the task name for debugging.
pub fn name() -> &'static str {
concat!(module_path!(), "apply")
}
/// Runs the task on the background thread.
fn inner_run(&self) -> Result<ApplyResults, N::Error> {
let engine = match self.engine.upgrade() {
Some(outer) => outer,
None => return Err(Error::DidNotRun(Self::name()).into()),
};
Ok(engine.apply()?)
}
}
impl<N, S> ApplyTask<N, S>
impl<N> ApplyTask<N>
where
N: ?Sized + BridgedEngine + Send + Sync + 'static,
S: Interruptee + Send + Sync + 'static,
N::Error: BridgedError,
{
/// Creates a task. The `callback` is bound to the current thread, and will
/// be called once, after the records are applied on the background thread.
pub fn new(
engine: &Arc<N>,
signal: &Arc<S>,
callback: &mozIBridgedSyncEngineApplyCallback,
) -> error::Result<ApplyTask<N, S>> {
) -> error::Result<ApplyTask<N>> {
Ok(ApplyTask {
engine: Arc::clone(engine),
signal: Arc::clone(signal),
engine: Arc::downgrade(engine),
callback: ThreadPtrHolder::new(
cstr!("mozIBridgedSyncEngineApplyCallback"),
RefPtr::new(callback),
@ -329,14 +352,13 @@ where
}
}
impl<N, S> Task for ApplyTask<N, S>
impl<N> Task for ApplyTask<N>
where
N: ?Sized + BridgedEngine,
S: Interruptee,
N::Error: BridgedError,
{
fn run(&self) {
*self.result.borrow_mut() = self.engine.apply(self.signal.as_ref());
*self.result.borrow_mut() = self.inner_run();
}
fn done(&self) -> Result<(), nsresult> {

View File

@ -47,11 +47,7 @@ class BridgedStore {
// pass attributes like `modified` and `sortindex`, which are not part
// of the cleartext.
let incomingCleartexts = chunk.map(record => record.cleartextToString());
await promisifyWithSignal(
null,
this.engine._bridge.storeIncoming,
incomingCleartexts
);
await promisify(this.engine._bridge.storeIncoming, incomingCleartexts);
}
// Array of failed records.
return [];
@ -194,10 +190,6 @@ function BridgedEngine(bridge, name, service) {
this._bridge = bridge;
this._bridge.logger = new LogAdapter(this._log);
// The maximum amount of time that we should wait for the bridged engine
// to apply incoming records before aborting.
this._applyTimeoutMillis = 5 * 60 * 60 * 1000; // 5 minutes
}
BridgedEngine.prototype = {
@ -308,7 +300,7 @@ BridgedEngine.prototype = {
// keep the records around (for example, in a temp table), and automatically
// write them back on `syncFinished`?
await this.initialize();
await promisifyWithSignal(null, this._bridge.syncFinished);
await promisify(this._bridge.syncFinished);
},
/**
@ -332,34 +324,18 @@ BridgedEngine.prototype = {
await super._processIncoming(newitems);
await this.initialize();
// TODO: We could consider having a per-sync watchdog instead; for
// example, time out after 5 minutes total, including any network
// latency. `promisifyWithSignal` makes this flexible.
let watchdog = this._newWatchdog();
watchdog.start(this._applyTimeoutMillis);
try {
let outgoingRecords = await promisifyWithSignal(
watchdog.signal,
this._bridge.apply
);
let changeset = {};
for (let record of outgoingRecords) {
// TODO: It would be nice if we could pass the cleartext through as-is
// here, too, instead of parsing and re-wrapping for `BridgedRecord`.
let cleartext = JSON.parse(record);
changeset[cleartext.id] = {
synced: false,
cleartext,
};
}
this._modified.replace(changeset);
} finally {
watchdog.stop();
if (watchdog.abortReason) {
this._log.warn(`Aborting bookmark merge: ${watchdog.abortReason}`);
}
let outgoingRecords = await promisify(this._bridge.apply);
let changeset = {};
for (let record of outgoingRecords) {
// TODO: It would be nice if we could pass the cleartext through as-is
// here, too, instead of parsing and re-wrapping for `BridgedRecord`.
let cleartext = JSON.parse(record);
changeset[cleartext.id] = {
synced: false,
cleartext,
};
}
this._modified.replace(changeset);
},
/**
@ -370,12 +346,7 @@ BridgedEngine.prototype = {
*/
async _onRecordsWritten(succeeded, failed, serverModifiedTime) {
await this.initialize();
await promisifyWithSignal(
null,
this._bridge.setUploaded,
serverModifiedTime,
succeeded
);
await promisify(this._bridge.setUploaded, serverModifiedTime, succeeded);
},
async _createTombstone() {
@ -426,35 +397,6 @@ function promisify(func, ...params) {
});
}
// Like `promisify`, but takes an `AbortSignal` for cancelable
// operations.
function promisifyWithSignal(signal, func, ...params) {
if (!signal) {
return promisify(func, ...params);
}
return new Promise((resolve, reject) => {
if (signal.aborted) {
// TODO: Record more specific operation names, so we can see which
// ones get interrupted most in telemetry.
throw new InterruptedError("Interrupted before starting operation");
}
function onAbort() {
signal.removeEventListener("abort", onAbort);
op.cancel(Cr.NS_ERROR_ABORT);
}
let op = func(...params, {
handleSuccess(result) {
signal.removeEventListener("abort", onAbort);
resolve(result);
},
handleError(code, message) {
reject(transformError(code, message));
},
});
signal.addEventListener("abort", onAbort);
});
}
class BridgedChangeset extends Changeset {
// Only `_reconcile` calls `getModifiedTimestamp` and `has`, and the buffered
// engine does its own reconciliation.

View File

@ -105,7 +105,6 @@ add_task(async function test_interface() {
},
].map(r => JSON.stringify(r));
CommonUtils.nextTick(() => callback.handleSuccess(outgoingRecords));
return { cancel() {} };
}
setUploaded(millis, ids, callback) {
@ -115,7 +114,6 @@ add_task(async function test_interface() {
);
this.uploadedIDs.push(...ids);
CommonUtils.nextTick(() => callback.handleSuccess());
return { cancel() {} };
}
syncFinished(callback) {
@ -125,7 +123,6 @@ add_task(async function test_interface() {
);
this.wasSynced = true;
CommonUtils.nextTick(() => callback.handleSuccess());
return { cancel() {} };
}
reset(callback) {

View File

@ -1 +1 @@
{"files":{"Cargo.toml":"656c4c4af39bcf924098be33996360250f9610ee3a4090b8152b68bdad03c46e","README.md":"396105211d8ce7f40b05d8062d7ab55d99674555f3ac81c061874ae26656ed7e","src/bridged_engine.rs":"b4d45cd43db3e5926df614ae9706b8d1a5bb96860577463d05b56a4213532ec1","src/changeset.rs":"442aa92b5130ec0f8f2b0054acb399c547380e0060015cbf4ca7a72027440d54","src/client.rs":"6be4f550ade823fafc350c5490e031f90a4af833a9bba9739b05568464255a74","src/lib.rs":"c1ca44e7bb6477b8018bd554479021dbf52754e64577185b3f7e208ae45bf754","src/payload.rs":"09db1a444e7893990a4f03cb16263b9c15abc9e48ec4f1343227be1b490865a5","src/request.rs":"9e656ec487e53c7485643687e605d73bb25e138056e920d6f4b7d63fc6a8c460","src/server_timestamp.rs":"43d1b98a90e55e49380a0b66c209c9eb393e2aeaa27d843a4726d93cdd4cea02","src/store.rs":"10e215dd24270b6bec10903ac1d5274ce997eb437134f43be7de44e36fb9d1e4","src/telemetry.rs":"027befb099a6fcded3457f7e566296548a0898ff613267190621856b9ef288f6"},"package":null}
{"files":{"Cargo.toml":"656c4c4af39bcf924098be33996360250f9610ee3a4090b8152b68bdad03c46e","README.md":"396105211d8ce7f40b05d8062d7ab55d99674555f3ac81c061874ae26656ed7e","src/bridged_engine.rs":"3387cea6cdfe9cfbc837d52e79a5d8e5be75e9d468a030760e8e7b3c6784462d","src/changeset.rs":"442aa92b5130ec0f8f2b0054acb399c547380e0060015cbf4ca7a72027440d54","src/client.rs":"6be4f550ade823fafc350c5490e031f90a4af833a9bba9739b05568464255a74","src/lib.rs":"c1ca44e7bb6477b8018bd554479021dbf52754e64577185b3f7e208ae45bf754","src/payload.rs":"09db1a444e7893990a4f03cb16263b9c15abc9e48ec4f1343227be1b490865a5","src/request.rs":"9e656ec487e53c7485643687e605d73bb25e138056e920d6f4b7d63fc6a8c460","src/server_timestamp.rs":"43d1b98a90e55e49380a0b66c209c9eb393e2aeaa27d843a4726d93cdd4cea02","src/store.rs":"10e215dd24270b6bec10903ac1d5274ce997eb437134f43be7de44e36fb9d1e4","src/telemetry.rs":"027befb099a6fcded3457f7e566296548a0898ff613267190621856b9ef288f6"},"package":null}

View File

@ -2,10 +2,6 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use std::{sync::Mutex, sync::MutexGuard, sync::PoisonError};
use interrupt_support::Interruptee;
/// A bridged Sync engine implements all the methods needed to support
/// Desktop Sync.
pub trait BridgedEngine {
@ -51,31 +47,22 @@ pub trait BridgedEngine {
/// times per sync, once for each batch. Implementations can use the
/// signal to check if the operation was aborted, and cancel any
/// pending work.
fn store_incoming(
&self,
incoming_cleartexts: &[String],
signal: &dyn Interruptee,
) -> Result<(), Self::Error>;
fn store_incoming(&self, incoming_cleartexts: &[String]) -> Result<(), Self::Error>;
/// Applies all staged records, reconciling changes on both sides and
/// resolving conflicts. Returns a list of records to upload.
fn apply(&self, signal: &dyn Interruptee) -> Result<ApplyResults, Self::Error>;
fn apply(&self) -> Result<ApplyResults, Self::Error>;
/// Indicates that the given record IDs were uploaded successfully to the
/// server. This is called multiple times per sync, once for each batch
/// upload.
fn set_uploaded(
&self,
server_modified_millis: i64,
ids: &[String],
signal: &dyn Interruptee,
) -> Result<(), Self::Error>;
fn set_uploaded(&self, server_modified_millis: i64, ids: &[String]) -> Result<(), Self::Error>;
/// Indicates that all records have been uploaded. At this point, any record
/// IDs marked for upload that haven't been passed to `set_uploaded`, can be
/// assumed to have failed: for example, because the server rejected a record
/// with an invalid TTL or sort index.
fn sync_finished(&self, signal: &dyn Interruptee) -> Result<(), Self::Error>;
fn sync_finished(&self) -> Result<(), Self::Error>;
/// Resets all local Sync state, including any change flags, mirrors, and
/// the last sync time, such that the next sync is treated as a first sync
@ -122,76 +109,3 @@ impl From<Vec<String>> for ApplyResults {
}
}
}
/// A blanket implementation of `BridgedEngine` for any `Mutex<BridgedEngine>`.
/// This is provided for convenience, since we expect most bridges to hold
/// their engines in an `Arc<Mutex<impl BridgedEngine>>`.
impl<E> BridgedEngine for Mutex<E>
where
E: BridgedEngine,
E::Error: for<'a> From<PoisonError<MutexGuard<'a, E>>>,
{
type Error = E::Error;
fn initialize(&self) -> Result<(), Self::Error> {
self.lock()?.initialize()
}
fn last_sync(&self) -> Result<i64, Self::Error> {
self.lock()?.last_sync()
}
fn set_last_sync(&self, millis: i64) -> Result<(), Self::Error> {
self.lock()?.set_last_sync(millis)
}
fn store_incoming(
&self,
incoming_cleartexts: &[String],
signal: &dyn Interruptee,
) -> Result<(), Self::Error> {
self.lock()?.store_incoming(incoming_cleartexts, signal)
}
fn apply(&self, signal: &dyn Interruptee) -> Result<ApplyResults, Self::Error> {
self.lock()?.apply(signal)
}
fn set_uploaded(
&self,
server_modified_millis: i64,
ids: &[String],
signal: &dyn Interruptee,
) -> Result<(), Self::Error> {
self.lock()?
.set_uploaded(server_modified_millis, ids, signal)
}
fn sync_finished(&self, signal: &dyn Interruptee) -> Result<(), Self::Error> {
self.lock()?.sync_finished(signal)
}
fn reset(&self) -> Result<(), Self::Error> {
self.lock()?.reset()
}
fn wipe(&self) -> Result<(), Self::Error> {
self.lock()?.wipe()
}
fn finalize(&self) -> Result<(), Self::Error> {
self.lock()?.finalize()
}
fn sync_id(&self) -> Result<Option<String>, Self::Error> {
self.lock()?.sync_id()
}
fn reset_sync_id(&self) -> Result<String, Self::Error> {
self.lock()?.reset_sync_id()
}
fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String, Self::Error> {
self.lock()?.ensure_current_sync_id(new_sync_id)
}
}

View File

@ -1 +1 @@
{"files":{"Cargo.toml":"23ed53b7db21b1015cbb1deafe950f80068691dfe7f2256e9f9237a94910a4d8","README.md":"1fd617294339930ee1ad5172377648b268cce0216fc3971facbfe7c6839e9ab1","build.rs":"2b827a62155a3d724cdb4c198270ea467439e537403f82fa873321ac55a69a63","sql/create_schema.sql":"d50b22cb17fc5d4e2aa4d001e853bd2f67eb3ffdbb1ac29013067dceacaec80e","src/api.rs":"578862ecc35d1bfa0c96329702cc7ffd743d749a170d808a729f91f584b33b3d","src/db.rs":"8933ce788e1044aa654eb0ed177596e327057dd49001287d3fbfcf57cc6bb728","src/error.rs":"86ba215ec5a889d1ccca9dcd141e42f75914744a4803598ccf3894da4a7f7475","src/lib.rs":"79685854649ffd0c1664f67cd0264f86c93f49e974de217a94a005b9d9d1c39d","src/schema.rs":"ac475285fb065fdf639d71e34d4e12663eb0650eefe3e151d223eb4fe3a667ef","src/store.rs":"686003abb14ffd1ef06f8a09a74c463ffaaf8e283ae7b92809122806e7085013"},"package":null}
{"files":{"Cargo.toml":"23ed53b7db21b1015cbb1deafe950f80068691dfe7f2256e9f9237a94910a4d8","README.md":"1fd617294339930ee1ad5172377648b268cce0216fc3971facbfe7c6839e9ab1","build.rs":"2b827a62155a3d724cdb4c198270ea467439e537403f82fa873321ac55a69a63","sql/create_schema.sql":"d50b22cb17fc5d4e2aa4d001e853bd2f67eb3ffdbb1ac29013067dceacaec80e","src/api.rs":"578862ecc35d1bfa0c96329702cc7ffd743d749a170d808a729f91f584b33b3d","src/db.rs":"6cf7fdf2a60a640aa5ae9e9b9fc86a45580ec2ce80947fc8991d7da56bc972da","src/error.rs":"86ba215ec5a889d1ccca9dcd141e42f75914744a4803598ccf3894da4a7f7475","src/lib.rs":"79685854649ffd0c1664f67cd0264f86c93f49e974de217a94a005b9d9d1c39d","src/schema.rs":"ac475285fb065fdf639d71e34d4e12663eb0650eefe3e151d223eb4fe3a667ef","src/store.rs":"39ba1aa7e68a58e7a59e747e04144694f9835d88f0cd6514cb0fd8f25285fcd9"},"package":null}

View File

@ -7,11 +7,12 @@ use crate::schema;
use rusqlite::types::{FromSql, ToSql};
use rusqlite::Connection;
use rusqlite::OpenFlags;
use sql_support::ConnExt;
use sql_support::{ConnExt, SqlInterruptHandle, SqlInterruptScope};
use std::fs;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::result;
use std::sync::{atomic::AtomicUsize, Arc};
use url::Url;
/// A `StorageDb` wraps a read-write SQLite connection, and handles schema
@ -23,6 +24,7 @@ use url::Url;
/// store. It's still a bit overkill, but there's only so many yaks in a day.
pub struct StorageDb {
    // The read-write SQLite connection this store wraps.
    writer: Connection,
    // Counter shared (via `Arc::clone`) with every `SqlInterruptHandle` and
    // `SqlInterruptScope` handed out for this connection — see
    // `interrupt_handle` and `begin_interrupt_scope` below. Presumably the
    // handle bumps/flags it so active scopes can notice an interruption;
    // the exact semantics live in sql-support.
    interrupt_counter: Arc<AtomicUsize>,
}
impl StorageDb {
/// Create a new, or fetch an already open, StorageDb backed by a file on disk.
@ -50,7 +52,10 @@ impl StorageDb {
let conn = Connection::open_with_flags(db_path.clone(), flags)?;
match init_sql_connection(&conn, true) {
Ok(()) => Ok(Self { writer: conn }),
Ok(()) => Ok(Self {
writer: conn,
interrupt_counter: Arc::new(AtomicUsize::new(0)),
}),
Err(e) => {
// like with places, failure to upgrade means "you lose your data"
if let ErrorKind::DatabaseUpgradeError = e.kind() {
@ -63,6 +68,29 @@ impl StorageDb {
}
}
/// Returns an interrupt handle for this database connection. This handle
/// should be handed out to consumers that want to interrupt long-running
/// operations. It's FFI-safe, and `Send + Sync`, since it only makes sense
/// to use from another thread. Calling `interrupt` on the handle sets a
/// flag on all currently active interrupt scopes.
pub fn interrupt_handle(&self) -> SqlInterruptHandle {
SqlInterruptHandle::new(
self.writer.get_interrupt_handle(),
self.interrupt_counter.clone(),
)
}
/// Creates an object that knows when it's been interrupted. A new interrupt
/// scope should be created inside each method that does long-running
/// database work, like batch writes. This is the other side of a
/// `SqlInterruptHandle`: when a handle is interrupted, it flags all active
/// interrupt scopes as interrupted, too, so that they can abort pending
/// work as soon as possible.
// NOTE(review): nothing in this crate calls this yet — presumably it's here
// for upcoming batch-write work, hence the dead-code allowance. Confirm
// before removing.
#[allow(dead_code)]
pub fn begin_interrupt_scope(&self) -> SqlInterruptScope {
    SqlInterruptScope::new(self.interrupt_counter.clone())
}
/// Closes the database connection. If there are any unfinalized prepared
/// statements on the connection, `close` will fail and the `StorageDb` will
/// be returned to the caller so that it can retry, drop (via `mem::drop`)
@ -71,9 +99,19 @@ impl StorageDb {
/// Keep in mind that dropping the connection tries to close it again, and
/// panics on error.
/// Closes the database connection, consuming `self`. SQLite refuses to
/// close a connection that still has unfinalized prepared statements, so on
/// failure the `StorageDb` is handed back to the caller together with the
/// error, letting it retry, drop (via `mem::drop`), or leak the connection.
/// Keep in mind that dropping tries to close again, and panics on error.
///
/// Note: as rendered, this span contained both the pre- and post-patch
/// bodies concatenated (invalid Rust); only the corrected, destructuring
/// version is kept here.
pub fn close(self) -> result::Result<(), (StorageDb, Error)> {
    // Destructure up front: `Connection::close` consumes the connection,
    // and we need to hold on to the interrupt counter so we can rebuild a
    // `StorageDb` if closing fails.
    let StorageDb {
        writer,
        interrupt_counter,
    } = self;
    writer.close().map_err(|(writer, err)| {
        (
            StorageDb {
                writer,
                interrupt_counter,
            },
            err.into(),
        )
    })
}
}

View File

@ -9,6 +9,7 @@ use std::path::Path;
use std::result;
use serde_json::Value as JsonValue;
use sql_support::SqlInterruptHandle;
/// A store is used to access `storage.sync` data. It manages an underlying
/// database connection, and exposes methods for reading and writing storage
@ -42,6 +43,11 @@ impl Store {
})
}
/// Returns an interrupt handle for this store.
///
/// Delegates to the underlying `StorageDb`; the handle can be sent to
/// another thread to interrupt operations running against this store's
/// database connection. Note that interruption applies to the whole
/// connection, not to a specific operation.
pub fn interrupt_handle(&self) -> SqlInterruptHandle {
    self.db.interrupt_handle()
}
/// Sets one or more JSON key-value pairs for an extension ID. Returns a
/// list of changes, with existing and new values for each key in `val`.
pub fn set(&self, ext_id: &str, val: JsonValue) -> Result<StorageChanges> {

View File

@ -16,4 +16,4 @@ xpcom = { path = "../../../../../xpcom/rust/xpcom" }
serde = "1"
serde_json = "1"
storage_variant = { path = "../../../../../storage/variant" }
webext-storage = { git = "https://github.com/mozilla/application-services", rev = "d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7" }
webext-storage = { git = "https://github.com/mozilla/application-services", rev = "43d69b250d8185ebc53e887b747d85a2a53c7298" }

View File

@ -55,7 +55,7 @@ unic-langid = { version = "0.8", features = ["likelysubtags"] }
unic-langid-ffi = { path = "../../../../intl/locale/rust/unic-langid-ffi" }
fluent-langneg = { version = "0.12.1", features = ["cldr"] }
fluent-langneg-ffi = { path = "../../../../intl/locale/rust/fluent-langneg-ffi" }
webext-storage = { git = "https://github.com/mozilla/application-services", rev = "d8a50bb0b010ab7289fe2d2864cc3eb9687a52e7", optional = true }
webext-storage = { git = "https://github.com/mozilla/application-services", rev = "43d69b250d8185ebc53e887b747d85a2a53c7298", optional = true }
golden_gate = { path = "../../../../services/sync/golden_gate", optional = true }
# Note: `modern_sqlite` means rusqlite's bindings file be for a sqlite with