Bug 1817904 (part 1) - Vendor a new application-services. r=skhamis,glandium

Differential Revision: https://phabricator.services.mozilla.com/D173242
This commit is contained in:
Mark Hammond 2023-03-23 03:28:07 +00:00
parent 60d4a6edc5
commit 333488dcfc
13 changed files with 367 additions and 206 deletions

View File

@ -75,9 +75,9 @@ git = "https://github.com/mozilla-spidermonkey/jsparagus"
rev = "64ba08e24749616de2344112f226d1ef4ba893ae"
replace-with = "vendored-sources"
[source."git+https://github.com/mozilla/application-services?rev=fe2867dbe82a2aaa85a856648107be94b1534683"]
[source."git+https://github.com/mozilla/application-services?rev=86c84c217036c12283d19368867323a66bf35883"]
git = "https://github.com/mozilla/application-services"
rev = "fe2867dbe82a2aaa85a856648107be94b1534683"
rev = "86c84c217036c12283d19368867323a66bf35883"
replace-with = "vendored-sources"
[source."git+https://github.com/mozilla/audioipc?rev=fb7a2b12ced3b43e6a268621989c6191d1ed7e39"]

22
Cargo.lock generated
View File

@ -1609,7 +1609,7 @@ dependencies = [
[[package]]
name = "error-support"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=fe2867dbe82a2aaa85a856648107be94b1534683#fe2867dbe82a2aaa85a856648107be94b1534683"
source = "git+https://github.com/mozilla/application-services?rev=86c84c217036c12283d19368867323a66bf35883#86c84c217036c12283d19368867323a66bf35883"
dependencies = [
"error-support-macros",
"lazy_static",
@ -1621,7 +1621,7 @@ dependencies = [
[[package]]
name = "error-support-macros"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=fe2867dbe82a2aaa85a856648107be94b1534683#fe2867dbe82a2aaa85a856648107be94b1534683"
source = "git+https://github.com/mozilla/application-services?rev=86c84c217036c12283d19368867323a66bf35883#86c84c217036c12283d19368867323a66bf35883"
dependencies = [
"proc-macro2",
"quote",
@ -2732,7 +2732,7 @@ dependencies = [
[[package]]
name = "interrupt-support"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=fe2867dbe82a2aaa85a856648107be94b1534683#fe2867dbe82a2aaa85a856648107be94b1534683"
source = "git+https://github.com/mozilla/application-services?rev=86c84c217036c12283d19368867323a66bf35883#86c84c217036c12283d19368867323a66bf35883"
dependencies = [
"lazy_static",
"parking_lot 0.12.999",
@ -3854,7 +3854,7 @@ dependencies = [
[[package]]
name = "nss_build_common"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=fe2867dbe82a2aaa85a856648107be94b1534683#fe2867dbe82a2aaa85a856648107be94b1534683"
source = "git+https://github.com/mozilla/application-services?rev=86c84c217036c12283d19368867323a66bf35883#86c84c217036c12283d19368867323a66bf35883"
[[package]]
name = "nsstring"
@ -5072,7 +5072,7 @@ dependencies = [
[[package]]
name = "sql-support"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=fe2867dbe82a2aaa85a856648107be94b1534683#fe2867dbe82a2aaa85a856648107be94b1534683"
source = "git+https://github.com/mozilla/application-services?rev=86c84c217036c12283d19368867323a66bf35883#86c84c217036c12283d19368867323a66bf35883"
dependencies = [
"ffi-support",
"interrupt-support",
@ -5254,7 +5254,7 @@ dependencies = [
[[package]]
name = "sync-guid"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=fe2867dbe82a2aaa85a856648107be94b1534683#fe2867dbe82a2aaa85a856648107be94b1534683"
source = "git+https://github.com/mozilla/application-services?rev=86c84c217036c12283d19368867323a66bf35883#86c84c217036c12283d19368867323a66bf35883"
dependencies = [
"base64",
"rand 0.8.5",
@ -5265,7 +5265,7 @@ dependencies = [
[[package]]
name = "sync15"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=fe2867dbe82a2aaa85a856648107be94b1534683#fe2867dbe82a2aaa85a856648107be94b1534683"
source = "git+https://github.com/mozilla/application-services?rev=86c84c217036c12283d19368867323a66bf35883#86c84c217036c12283d19368867323a66bf35883"
dependencies = [
"anyhow",
"error-support",
@ -5295,7 +5295,7 @@ dependencies = [
[[package]]
name = "tabs"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=fe2867dbe82a2aaa85a856648107be94b1534683#fe2867dbe82a2aaa85a856648107be94b1534683"
source = "git+https://github.com/mozilla/application-services?rev=86c84c217036c12283d19368867323a66bf35883#86c84c217036c12283d19368867323a66bf35883"
dependencies = [
"anyhow",
"error-support",
@ -6072,7 +6072,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "viaduct"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=fe2867dbe82a2aaa85a856648107be94b1534683#fe2867dbe82a2aaa85a856648107be94b1534683"
source = "git+https://github.com/mozilla/application-services?rev=86c84c217036c12283d19368867323a66bf35883#86c84c217036c12283d19368867323a66bf35883"
dependencies = [
"ffi-support",
"log",
@ -6229,14 +6229,16 @@ dependencies = [
[[package]]
name = "webext-storage"
version = "0.1.0"
source = "git+https://github.com/mozilla/application-services?rev=fe2867dbe82a2aaa85a856648107be94b1534683#fe2867dbe82a2aaa85a856648107be94b1534683"
source = "git+https://github.com/mozilla/application-services?rev=86c84c217036c12283d19368867323a66bf35883#86c84c217036c12283d19368867323a66bf35883"
dependencies = [
"anyhow",
"error-support",
"ffi-support",
"interrupt-support",
"lazy_static",
"log",
"nss_build_common",
"parking_lot 0.12.999",
"rusqlite",
"serde",
"serde_derive",

View File

@ -170,12 +170,12 @@ warp = { git = "https://github.com/glandium/warp", rev = "4af45fae95bc98b0eba1ef
cssparser = { git = "https://github.com/servo/rust-cssparser", rev = "45bc47e2bcb846f1efb5aea156be5fe7d18624bf" }
# application-services overrides to make updating them all simpler.
interrupt-support = { git = "https://github.com/mozilla/application-services", rev = "fe2867dbe82a2aaa85a856648107be94b1534683" }
sql-support = { git = "https://github.com/mozilla/application-services", rev = "fe2867dbe82a2aaa85a856648107be94b1534683" }
sync15 = { git = "https://github.com/mozilla/application-services", rev = "fe2867dbe82a2aaa85a856648107be94b1534683" }
tabs = { git = "https://github.com/mozilla/application-services", rev = "fe2867dbe82a2aaa85a856648107be94b1534683" }
viaduct = { git = "https://github.com/mozilla/application-services", rev = "fe2867dbe82a2aaa85a856648107be94b1534683" }
webext-storage = { git = "https://github.com/mozilla/application-services", rev = "fe2867dbe82a2aaa85a856648107be94b1534683" }
interrupt-support = { git = "https://github.com/mozilla/application-services", rev = "86c84c217036c12283d19368867323a66bf35883" }
sql-support = { git = "https://github.com/mozilla/application-services", rev = "86c84c217036c12283d19368867323a66bf35883" }
sync15 = { git = "https://github.com/mozilla/application-services", rev = "86c84c217036c12283d19368867323a66bf35883" }
tabs = { git = "https://github.com/mozilla/application-services", rev = "86c84c217036c12283d19368867323a66bf35883" }
viaduct = { git = "https://github.com/mozilla/application-services", rev = "86c84c217036c12283d19368867323a66bf35883" }
webext-storage = { git = "https://github.com/mozilla/application-services", rev = "86c84c217036c12283d19368867323a66bf35883" }
# Patch mio 0.6 to use winapi 0.3 and miow 0.3, getting rid of winapi 0.2.
# There is not going to be new version of mio 0.6, mio now being >= 0.7.11.

View File

@ -1 +1 @@
{"files":{"Cargo.toml":"0102c944a52a98d1b7c6ba7a5f3ec69e28e5a05f0c58a743f0c5ba020988122a","README.md":"6d4ff5b079ac5340d18fa127f583e7ad793c5a2328b8ecd12c3fc723939804f2","src/bso/content.rs":"d2d650f4932e9a7068a25dd7df0085b92cd8976a0635320e6ae306d5a425075c","src/bso/crypto.rs":"27602dcccb37d3a55620ee4e16b705da455d49af575de115c7c79c0178eb1d6d","src/bso/mod.rs":"09e723dc7e99295ecafdcadffaf604d66ea27cf2b7f1fd9ab3cac4f4698ff6a7","src/bso/test_utils.rs":"4ec5a2df5e1c0ec14dc770681e959bdcef6ef04f6fde435999197f46a8ae4831","src/client/coll_state.rs":"4301526b987532cd7ebd5d089980c9524aa29360e08fa0c1f5437696ed354822","src/client/coll_update.rs":"627e2266c5c8f1c5e0bc83061fa14c1287c7d17c78b47882543521f75543b499","src/client/collection_keys.rs":"c27b2277a3a52033b58ab01490fc2ea7007494195dd5e6dc2c6931a4ca96795a","src/client/mod.rs":"8f588d4a035cf79d96f2500f06d5651c1a7c566127c456ffa5429811ddce3fd6","src/client/request.rs":"8841524e37d8195867bdf6ba98c75f610cf47a4644adeebd6372cc6713f2260a","src/client/state.rs":"4e31193ef2471c1dfabf1c6a391bcb95e14ddb45855786a4194ff187d5c9347c","src/client/status.rs":"f445a8765dac9789444e23b5145148413407bb1d18a15ef56682243997f591bf","src/client/storage_client.rs":"8de72d4ba3ca4f68c8e1898466de83a2b543545a18679800cb4f7fbda2dc3183","src/client/sync.rs":"a04478c4e31bbf39f593decfc4844bfb7e678907031f0a93d1e005cf0047ebb4","src/client/sync_multiple.rs":"3729d4afd90ab1bd9982a3506252c99d8f37619cc1792ef4feba352ad01a7192","src/client/token.rs":"b268759d31e0fe17e0e2a428694cd9a317fcfbdd52f023d5d8c7cc6f00f1a102","src/client/util.rs":"71cc70ee41f821f53078675e636e9fad9c6046fa1a989e37f5487e340a2277d6","src/client_types.rs":"3c3cac1540b92482f43660d9e43bdde8481c4cc1a98253a68c80e791231f5976","src/clients_engine/engine.rs":"0eaa078978c95fc96284b48a7458ebff49f0aa70b1148ef019b9350b5ba4d0d4","src/clients_engine/mod.rs":"461729e6f89b66b2cbd89b041a03d4d6a8ba582284ed4f3015cb13e1a0c6da97","src/clients_engine/record.rs":"69357413571d688eea3a5207f9b88088cde285b9373c7bd4ea1e018dbc823dd2","src/clients_engine/ser.rs":"9796e44ed7daf04f22afbb51238ac25fd0de1438b72181351b4ca29fd70fd429","src/device_type.rs":"fe217453f19b374abcc10e9f503b25f4f712b555497bebe5aefcf2e9b258d28e","src/enc_payload.rs":"aa3eea7df49b24cd59831680a47c417b73a3e36e6b0f3f4baf14ca66bd68be6b","src/engine/bridged_engine.rs":"9c0d602b3553932e77a87caba9262d3a0fc146500c6d46f1770273be6636d064","src/engine/changeset.rs":"949c520e508da0158b2df9fa20235f942ad8096cebbfc942b0fae41c35830c6b","src/engine/mod.rs":"cb638c2170c3785785877dfd059f26fbbfeae5edc599745dd861e4c299d310ad","src/engine/request.rs":"5923025fb9550178339f880a1bf8526d8e853e7a0b2bce6d9d687cc808ac0085","src/engine/sync_engine.rs":"9a5c6993dcceec8f82ead97a6d1840c7ed7dc6b326f7234c77f18848b6baf836","src/error.rs":"a45cfe02e6301f473c34678b694943c1a04308b8c292c6e0448bf495194c3b5e","src/key_bundle.rs":"ff8b10b95add934ecbc434b37ed089805886828ed159fd38bd692d1f01d06f7f","src/lib.rs":"eca1fa801820238141c4badeeec45d430aaec8b2ce088446ef94456d51014889","src/record_types.rs":"02bb3d352fb808131d298f9b90d9c95b7e9e0138b97c5401f3b9fdacc5562f44","src/server_timestamp.rs":"0020f31971ccbfc485894cabc3087459d42252b86d7de07f2136997864b0373b","src/telemetry.rs":"35e0313a052f16326e451e3d6e371337c1d71a471f32234ad9649fc1fa9f2237"},"package":null}
{"files":{"Cargo.toml":"0102c944a52a98d1b7c6ba7a5f3ec69e28e5a05f0c58a743f0c5ba020988122a","README.md":"6d4ff5b079ac5340d18fa127f583e7ad793c5a2328b8ecd12c3fc723939804f2","src/bso/content.rs":"d2d650f4932e9a7068a25dd7df0085b92cd8976a0635320e6ae306d5a425075c","src/bso/crypto.rs":"27602dcccb37d3a55620ee4e16b705da455d49af575de115c7c79c0178eb1d6d","src/bso/mod.rs":"09e723dc7e99295ecafdcadffaf604d66ea27cf2b7f1fd9ab3cac4f4698ff6a7","src/bso/test_utils.rs":"4ec5a2df5e1c0ec14dc770681e959bdcef6ef04f6fde435999197f46a8ae4831","src/client/coll_state.rs":"4301526b987532cd7ebd5d089980c9524aa29360e08fa0c1f5437696ed354822","src/client/coll_update.rs":"627e2266c5c8f1c5e0bc83061fa14c1287c7d17c78b47882543521f75543b499","src/client/collection_keys.rs":"c27b2277a3a52033b58ab01490fc2ea7007494195dd5e6dc2c6931a4ca96795a","src/client/mod.rs":"8f588d4a035cf79d96f2500f06d5651c1a7c566127c456ffa5429811ddce3fd6","src/client/request.rs":"8841524e37d8195867bdf6ba98c75f610cf47a4644adeebd6372cc6713f2260a","src/client/state.rs":"4e31193ef2471c1dfabf1c6a391bcb95e14ddb45855786a4194ff187d5c9347c","src/client/status.rs":"f445a8765dac9789444e23b5145148413407bb1d18a15ef56682243997f591bf","src/client/storage_client.rs":"8de72d4ba3ca4f68c8e1898466de83a2b543545a18679800cb4f7fbda2dc3183","src/client/sync.rs":"a04478c4e31bbf39f593decfc4844bfb7e678907031f0a93d1e005cf0047ebb4","src/client/sync_multiple.rs":"3729d4afd90ab1bd9982a3506252c99d8f37619cc1792ef4feba352ad01a7192","src/client/token.rs":"b268759d31e0fe17e0e2a428694cd9a317fcfbdd52f023d5d8c7cc6f00f1a102","src/client/util.rs":"71cc70ee41f821f53078675e636e9fad9c6046fa1a989e37f5487e340a2277d6","src/client_types.rs":"3c3cac1540b92482f43660d9e43bdde8481c4cc1a98253a68c80e791231f5976","src/clients_engine/engine.rs":"0eaa078978c95fc96284b48a7458ebff49f0aa70b1148ef019b9350b5ba4d0d4","src/clients_engine/mod.rs":"461729e6f89b66b2cbd89b041a03d4d6a8ba582284ed4f3015cb13e1a0c6da97","src/clients_engine/record.rs":"69357413571d688eea3a5207f9b88088cde285b9373c7bd4ea1e018dbc823dd2","src/clients_engine/ser.rs":"9796e44ed7daf04f22afbb51238ac25fd0de1438b72181351b4ca29fd70fd429","src/device_type.rs":"fe217453f19b374abcc10e9f503b25f4f712b555497bebe5aefcf2e9b258d28e","src/enc_payload.rs":"aa3eea7df49b24cd59831680a47c417b73a3e36e6b0f3f4baf14ca66bd68be6b","src/engine/bridged_engine.rs":"5c0748358a7b3040442bc81122400ab7b26acf728b194dbeee8a2166dc653e11","src/engine/changeset.rs":"949c520e508da0158b2df9fa20235f942ad8096cebbfc942b0fae41c35830c6b","src/engine/mod.rs":"cb638c2170c3785785877dfd059f26fbbfeae5edc599745dd861e4c299d310ad","src/engine/request.rs":"5923025fb9550178339f880a1bf8526d8e853e7a0b2bce6d9d687cc808ac0085","src/engine/sync_engine.rs":"9a5c6993dcceec8f82ead97a6d1840c7ed7dc6b326f7234c77f18848b6baf836","src/error.rs":"a45cfe02e6301f473c34678b694943c1a04308b8c292c6e0448bf495194c3b5e","src/key_bundle.rs":"ff8b10b95add934ecbc434b37ed089805886828ed159fd38bd692d1f01d06f7f","src/lib.rs":"eca1fa801820238141c4badeeec45d430aaec8b2ce088446ef94456d51014889","src/record_types.rs":"02bb3d352fb808131d298f9b90d9c95b7e9e0138b97c5401f3b9fdacc5562f44","src/server_timestamp.rs":"0020f31971ccbfc485894cabc3087459d42252b86d7de07f2136997864b0373b","src/telemetry.rs":"35e0313a052f16326e451e3d6e371337c1d71a471f32234ad9649fc1fa9f2237"},"package":null}

View File

@ -2,8 +2,9 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use crate::bso::{IncomingBso, OutgoingBso};
use anyhow::Result;
use crate::bso::{IncomingBso, OutgoingBso};
use crate::Guid;
/// A BridgedEngine acts as a bridge between application-services, rust
@ -15,28 +16,25 @@ use crate::Guid;
/// implemented in Rust use a different shape (specifically, the
/// [SyncEngine](crate::SyncEngine) trait), so this BridgedEngine trait adapts
/// between the 2.
pub trait BridgedEngine {
/// The type returned for errors.
type Error;
pub trait BridgedEngine: Send + Sync {
/// Returns the last sync time, in milliseconds, for this engine's
/// collection. This is called before each sync, to determine the lower
/// bound for new records to fetch from the server.
fn last_sync(&self) -> Result<i64, Self::Error>;
fn last_sync(&self) -> Result<i64>;
/// Sets the last sync time, in milliseconds. This is called throughout
/// the sync, to fast-forward the stored last sync time to match the
/// timestamp on the uploaded records.
fn set_last_sync(&self, last_sync_millis: i64) -> Result<(), Self::Error>;
fn set_last_sync(&self, last_sync_millis: i64) -> Result<()>;
/// Returns the sync ID for this engine's collection. This is only used in
/// tests.
fn sync_id(&self) -> Result<Option<String>, Self::Error>;
fn sync_id(&self) -> Result<Option<String>>;
/// Resets the sync ID for this engine's collection, returning the new ID.
/// As a side effect, implementations should reset all local Sync state,
/// as in `reset`.
fn reset_sync_id(&self) -> Result<String, Self::Error>;
fn reset_sync_id(&self) -> Result<String>;
/// Ensures that the locally stored sync ID for this engine's collection
/// matches the `new_sync_id` from the server. If the two don't match,
@ -45,48 +43,48 @@ pub trait BridgedEngine {
/// `new_sync_id`, or a different one if the engine wants to force other
/// devices to reset their Sync state for this collection the next time they
/// sync.
fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String, Self::Error>;
fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String>;
/// Tells the tabs engine about recent FxA devices. A bit of a leaky abstration as it only
/// makes sense for tabs.
/// The arg is a json serialized `ClientData` struct.
fn prepare_for_sync(&self, _client_data: &str) -> Result<(), Self::Error> {
fn prepare_for_sync(&self, _client_data: &str) -> Result<()> {
Ok(())
}
/// Indicates that the engine is about to start syncing. This is called
/// once per sync, and always before `store_incoming`.
fn sync_started(&self) -> Result<(), Self::Error>;
fn sync_started(&self) -> Result<()>;
/// Stages a batch of incoming Sync records. This is called multiple
/// times per sync, once for each batch. Implementations can use the
/// signal to check if the operation was aborted, and cancel any
/// pending work.
fn store_incoming(&self, incoming_records: Vec<IncomingBso>) -> Result<(), Self::Error>;
fn store_incoming(&self, incoming_records: Vec<IncomingBso>) -> Result<()>;
/// Applies all staged records, reconciling changes on both sides and
/// resolving conflicts. Returns a list of records to upload.
fn apply(&self) -> Result<ApplyResults, Self::Error>;
fn apply(&self) -> Result<ApplyResults>;
/// Indicates that the given record IDs were uploaded successfully to the
/// server. This is called multiple times per sync, once for each batch
/// upload.
fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<(), Self::Error>;
fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<()>;
/// Indicates that all records have been uploaded. At this point, any record
/// IDs marked for upload that haven't been passed to `set_uploaded`, can be
/// assumed to have failed: for example, because the server rejected a record
/// with an invalid TTL or sort index.
fn sync_finished(&self) -> Result<(), Self::Error>;
fn sync_finished(&self) -> Result<()>;
/// Resets all local Sync state, including any change flags, mirrors, and
/// the last sync time, such that the next sync is treated as a first sync
/// with all new local data. Does not erase any local user data.
fn reset(&self) -> Result<(), Self::Error>;
fn reset(&self) -> Result<()>;
/// Erases all local user data for this collection, and any Sync metadata.
/// This method is destructive, and unused for most collections.
fn wipe(&self) -> Result<(), Self::Error>;
fn wipe(&self) -> Result<()>;
}
// TODO: We should replace this with OutgoingChangeset to reduce the number

View File

@ -1 +1 @@
{"files":{"Cargo.toml":"bd0335ac0aea7342d8349b621f07d6270af430dd3b8d8cb84b4a04097f9aa0b9","README.md":"c48b8f391ef822c4f3971b5f453a1e7b43bea232752d520460d2f04803aead1a","build.rs":"33e61b811b19ed2b58e319cc65d5988bed258d2c4fea2d706301184c59847a0f","src/error.rs":"ac3d450f0ba6a855c37fa2dd829004b22dce5ad4416ebec66a3d7d6212bdbcd7","src/lib.rs":"7208f78955e015ef8bab7916307e551cd3c1bd56d7fe14f8b53cd53bc4b38555","src/schema.rs":"2b7b51f3c2edc0ca603495c10b917603fd9ac791c4a366080e40d090b13b91f2","src/storage.rs":"4ad235bda076a85660f7825aea518c0d1ae458e7eddf47b1cf1167ca5a52385f","src/store.rs":"ab0b6214b30b0f0fa7c6a89098ff3db1a8f76264f6711c4481c0be460afe522b","src/sync/bridge.rs":"705a24934c90494c2074288d03266504a866c4f04cb89c4e17b762cb498d621e","src/sync/engine.rs":"f2c5e4a2cb400ca33b649a078fc56e59a01a81a6e16e42b126e8b00d22b5eeea","src/sync/full_sync.rs":"47660644b3a66f3adff1130bb64e2ef40cb29749a40bc28d71246641d28ad080","src/sync/mod.rs":"81f98303b2a60aa6c6ed0d9777a95d6bbff46e57a494a0172d81c5819296802e","src/sync/record.rs":"896ebc6aa213bac9e6170c7e0b7dbee322f62e5f6c28462cb6da0bfe8ce938ba","src/tabs.udl":"a555fe11b5fa7ea9aefa7d7be31906a63b31cbc16b9b7f5ad952fd0e08ba5c61","uniffi.toml":"5156701368f0b5856e658143714a43058385c8ac53bee72d7a5a332b576dfb82"},"package":null}
{"files":{"Cargo.toml":"bd0335ac0aea7342d8349b621f07d6270af430dd3b8d8cb84b4a04097f9aa0b9","README.md":"c48b8f391ef822c4f3971b5f453a1e7b43bea232752d520460d2f04803aead1a","build.rs":"33e61b811b19ed2b58e319cc65d5988bed258d2c4fea2d706301184c59847a0f","src/error.rs":"ac3d450f0ba6a855c37fa2dd829004b22dce5ad4416ebec66a3d7d6212bdbcd7","src/lib.rs":"7208f78955e015ef8bab7916307e551cd3c1bd56d7fe14f8b53cd53bc4b38555","src/schema.rs":"2b7b51f3c2edc0ca603495c10b917603fd9ac791c4a366080e40d090b13b91f2","src/storage.rs":"4ad235bda076a85660f7825aea518c0d1ae458e7eddf47b1cf1167ca5a52385f","src/store.rs":"ab0b6214b30b0f0fa7c6a89098ff3db1a8f76264f6711c4481c0be460afe522b","src/sync/bridge.rs":"c8b6ccf6081499539d8a36a25ede1ce19d36a66a81b2e1abec58fa9c0a72856e","src/sync/engine.rs":"f2c5e4a2cb400ca33b649a078fc56e59a01a81a6e16e42b126e8b00d22b5eeea","src/sync/full_sync.rs":"47660644b3a66f3adff1130bb64e2ef40cb29749a40bc28d71246641d28ad080","src/sync/mod.rs":"81f98303b2a60aa6c6ed0d9777a95d6bbff46e57a494a0172d81c5819296802e","src/sync/record.rs":"896ebc6aa213bac9e6170c7e0b7dbee322f62e5f6c28462cb6da0bfe8ce938ba","src/tabs.udl":"a555fe11b5fa7ea9aefa7d7be31906a63b31cbc16b9b7f5ad952fd0e08ba5c61","uniffi.toml":"5156701368f0b5856e658143714a43058385c8ac53bee72d7a5a332b576dfb82"},"package":null}

View File

@ -2,12 +2,11 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use std::sync::{Arc, Mutex};
use crate::error::{ApiResult, TabsApiError};
use crate::error::TabsApiError;
use crate::sync::engine::TabsSyncImpl;
use crate::TabsStore;
use error_support::handle_error;
use anyhow::Result;
use std::sync::{Arc, Mutex};
use sync15::bso::{IncomingBso, OutgoingBso};
use sync15::engine::{ApplyResults, BridgedEngine, CollSyncIds, EngineSyncAssociation};
use sync15::{ClientData, ServerTimestamp};
@ -45,10 +44,7 @@ impl BridgedEngineImpl {
}
impl BridgedEngine for BridgedEngineImpl {
type Error = TabsApiError;
#[handle_error(crate::Error)]
fn last_sync(&self) -> ApiResult<i64> {
fn last_sync(&self) -> Result<i64> {
Ok(self
.sync_impl
.lock()
@ -58,8 +54,7 @@ impl BridgedEngine for BridgedEngineImpl {
.as_millis())
}
#[handle_error(crate::Error)]
fn set_last_sync(&self, last_sync_millis: i64) -> ApiResult<()> {
fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
self.sync_impl
.lock()
.unwrap()
@ -67,16 +62,14 @@ impl BridgedEngine for BridgedEngineImpl {
Ok(())
}
#[handle_error(crate::Error)]
fn sync_id(&self) -> ApiResult<Option<String>> {
fn sync_id(&self) -> Result<Option<String>> {
Ok(match self.sync_impl.lock().unwrap().get_sync_assoc()? {
EngineSyncAssociation::Connected(id) => Some(id.coll.to_string()),
EngineSyncAssociation::Disconnected => None,
})
}
#[handle_error(crate::Error)]
fn reset_sync_id(&self) -> ApiResult<String> {
fn reset_sync_id(&self) -> Result<String> {
let new_id = SyncGuid::random().to_string();
let new_coll_ids = CollSyncIds {
global: SyncGuid::empty(),
@ -89,8 +82,7 @@ impl BridgedEngine for BridgedEngineImpl {
Ok(new_id)
}
#[handle_error(crate::Error)]
fn ensure_current_sync_id(&self, sync_id: &str) -> ApiResult<String> {
fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
let mut sync_impl = self.sync_impl.lock().unwrap();
let assoc = sync_impl.get_sync_assoc()?;
if matches!(assoc, EngineSyncAssociation::Connected(c) if c.coll == sync_id) {
@ -102,29 +94,26 @@ impl BridgedEngine for BridgedEngineImpl {
};
sync_impl.reset(&EngineSyncAssociation::Connected(new_coll_ids))?;
}
Ok(sync_id.to_string()) // this is a bit odd, why the result?
Ok(sync_id.to_string())
}
#[handle_error(crate::Error)]
fn prepare_for_sync(&self, client_data: &str) -> ApiResult<()> {
fn prepare_for_sync(&self, client_data: &str) -> Result<()> {
let data: ClientData = serde_json::from_str(client_data)?;
self.sync_impl.lock().unwrap().prepare_for_sync(data)
Ok(self.sync_impl.lock().unwrap().prepare_for_sync(data)?)
}
fn sync_started(&self) -> ApiResult<()> {
fn sync_started(&self) -> Result<()> {
// This is a no-op for the Tabs Engine
Ok(())
}
#[handle_error(crate::Error)]
fn store_incoming(&self, incoming: Vec<IncomingBso>) -> ApiResult<()> {
fn store_incoming(&self, incoming: Vec<IncomingBso>) -> Result<()> {
// Store the incoming payload in memory so we can use it in apply
*(self.incoming.lock().unwrap()) = incoming;
Ok(())
}
#[handle_error(crate::Error)]
fn apply(&self) -> ApiResult<ApplyResults> {
fn apply(&self) -> Result<ApplyResults> {
let mut incoming = self.incoming.lock().unwrap();
// We've a reference to a Vec<> but it's owned by the mutex - swap the mutex owned
// value for an empty vec so we can consume the original.
@ -141,22 +130,20 @@ impl BridgedEngine for BridgedEngineImpl {
})
}
#[handle_error(crate::Error)]
fn set_uploaded(&self, server_modified_millis: i64, ids: &[SyncGuid]) -> ApiResult<()> {
self.sync_impl
fn set_uploaded(&self, server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> {
Ok(self
.sync_impl
.lock()
.unwrap()
.sync_finished(ServerTimestamp::from_millis(server_modified_millis), ids)
.sync_finished(ServerTimestamp::from_millis(server_modified_millis), ids)?)
}
#[handle_error(crate::Error)]
fn sync_finished(&self) -> ApiResult<()> {
fn sync_finished(&self) -> Result<()> {
*(self.incoming.lock().unwrap()) = Vec::default();
Ok(())
}
#[handle_error(crate::Error)]
fn reset(&self) -> ApiResult<()> {
fn reset(&self) -> Result<()> {
self.sync_impl
.lock()
.unwrap()
@ -164,8 +151,7 @@ impl BridgedEngine for BridgedEngineImpl {
Ok(())
}
#[handle_error(crate::Error)]
fn wipe(&self) -> ApiResult<()> {
fn wipe(&self) -> Result<()> {
self.sync_impl.lock().unwrap().wipe()?;
Ok(())
}
@ -181,37 +167,36 @@ impl TabsBridgedEngine {
Self { bridge_impl }
}
pub fn last_sync(&self) -> ApiResult<i64> {
pub fn last_sync(&self) -> Result<i64> {
self.bridge_impl.last_sync()
}
pub fn set_last_sync(&self, last_sync: i64) -> ApiResult<()> {
pub fn set_last_sync(&self, last_sync: i64) -> Result<()> {
self.bridge_impl.set_last_sync(last_sync)
}
pub fn sync_id(&self) -> ApiResult<Option<String>> {
pub fn sync_id(&self) -> Result<Option<String>> {
self.bridge_impl.sync_id()
}
pub fn reset_sync_id(&self) -> ApiResult<String> {
pub fn reset_sync_id(&self) -> Result<String> {
self.bridge_impl.reset_sync_id()
}
pub fn ensure_current_sync_id(&self, sync_id: &str) -> ApiResult<String> {
pub fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
self.bridge_impl.ensure_current_sync_id(sync_id)
}
pub fn prepare_for_sync(&self, client_data: &str) -> ApiResult<()> {
pub fn prepare_for_sync(&self, client_data: &str) -> Result<()> {
self.bridge_impl.prepare_for_sync(client_data)
}
pub fn sync_started(&self) -> ApiResult<()> {
pub fn sync_started(&self) -> Result<()> {
self.bridge_impl.sync_started()
}
// Decode the JSON-encoded IncomingBso's that UniFFI passes to us
#[handle_error(crate::Error)]
fn convert_incoming_bsos(&self, incoming: Vec<String>) -> ApiResult<Vec<IncomingBso>> {
fn convert_incoming_bsos(&self, incoming: Vec<String>) -> Result<Vec<IncomingBso>> {
let mut bsos = Vec::with_capacity(incoming.len());
for inc in incoming {
bsos.push(serde_json::from_str::<IncomingBso>(&inc)?);
@ -220,8 +205,7 @@ impl TabsBridgedEngine {
}
// Encode OutgoingBso's into JSON for UniFFI
#[handle_error(crate::Error)]
fn convert_outgoing_bsos(&self, outgoing: Vec<OutgoingBso>) -> ApiResult<Vec<String>> {
fn convert_outgoing_bsos(&self, outgoing: Vec<OutgoingBso>) -> Result<Vec<String>> {
let mut bsos = Vec::with_capacity(outgoing.len());
for e in outgoing {
bsos.push(serde_json::to_string(&e)?);
@ -229,34 +213,42 @@ impl TabsBridgedEngine {
Ok(bsos)
}
pub fn store_incoming(&self, incoming: Vec<String>) -> ApiResult<()> {
pub fn store_incoming(&self, incoming: Vec<String>) -> Result<()> {
self.bridge_impl
.store_incoming(self.convert_incoming_bsos(incoming)?)
}
pub fn apply(&self) -> ApiResult<Vec<String>> {
pub fn apply(&self) -> Result<Vec<String>> {
let apply_results = self.bridge_impl.apply()?;
self.convert_outgoing_bsos(apply_results.records)
}
pub fn set_uploaded(&self, server_modified_millis: i64, guids: Vec<SyncGuid>) -> ApiResult<()> {
pub fn set_uploaded(&self, server_modified_millis: i64, guids: Vec<SyncGuid>) -> Result<()> {
self.bridge_impl
.set_uploaded(server_modified_millis, &guids)
}
pub fn sync_finished(&self) -> ApiResult<()> {
pub fn sync_finished(&self) -> Result<()> {
self.bridge_impl.sync_finished()
}
pub fn reset(&self) -> ApiResult<()> {
pub fn reset(&self) -> Result<()> {
self.bridge_impl.reset()
}
pub fn wipe(&self) -> ApiResult<()> {
pub fn wipe(&self) -> Result<()> {
self.bridge_impl.wipe()
}
}
impl From<anyhow::Error> for TabsApiError {
fn from(value: anyhow::Error) -> Self {
TabsApiError::UnexpectedTabsError {
reason: value.to_string(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1 +1 @@
{"files":{"Cargo.toml":"d9c464a00d5d3fc7ec33074a685fde01563c7b3c7a9af8f2806642d1f8f9b65c","README.md":"1fd617294339930ee1ad5172377648b268cce0216fc3971facbfe7c6839e9ab1","build.rs":"2bea192a782a5ebe3d3ec0ca6dae2d51844eb7ad163a38a2a62fbfe6dd3c34d8","sql/create_schema.sql":"a17311a407ec10e033886b7125da4c8b84bc6d761f6b28edc9594de430e1d964","sql/create_sync_temp_tables.sql":"860ede362c94feb47d85522553fa2852f9bdb9f9b025d6438dd5dee3d4acd527","sql/tests/create_schema_v1.sql":"77cf0c90eaac3e1aea626537147e1b8ec349b68d6076c92fa7ae402aac613050","src/api.rs":"f3e6f8065089df06ef4b8ce093727154f96afb7ea168b083d942a30266e7dbf8","src/db.rs":"72b2df354785278af7f87baa0ee4231df9abe2831c4b1413ac760f7efd80b519","src/error.rs":"29a0e2f62d7bebe71eb7c0e41fe14390b8e82c873a0f7a6c673c80b2b1b20409","src/ffi.rs":"670088d3a13a7349e751489197a3bb123990db69fccf8b815831e9bf5901afc6","src/lib.rs":"324300143818ad545f7e85f9bb5dba03ca45e9002e110d824a3639b5213d8763","src/migration.rs":"8d92f82b2ba38e1039fd054c8c75078a6b896a0d3cdc1a52571456b25a32c9c3","src/schema.rs":"cce3ed593809c3e47bbc050e6c2795cecd1b1ce7d6e39da633123e7a0614a213","src/store.rs":"8d69eda0f461106102cdf5754584d51963929f7d50dbdb8197d829f95337aa37","src/sync/bridge.rs":"ddd402d72c7a2e1d13497bd2c1a4767f24a38a10693bc8dc9115c1d68d615ccd","src/sync/incoming.rs":"dd77c64e2ade4f39cba258decab6d3db8ad0b5f513aa018efbd56b9869a021d9","src/sync/mod.rs":"bd1bc5c428dfda6aee7efe53b6e74b8015da5129a303638a21ca8d63516e4061","src/sync/outgoing.rs":"dacb77b956f2546fd60a89367927a199d9b662b17201d0781145f7405b61fdce","src/sync/sync_tests.rs":"f3846ca7e463315ba9788826613b987ddcff7b21672ff257a98769ee94f4191a"},"package":null}
{"files":{"Cargo.toml":"407fd109394698e912b776a2b316bed2f9def38287052d88a66f9f1ba2be53b4","README.md":"1fd617294339930ee1ad5172377648b268cce0216fc3971facbfe7c6839e9ab1","build.rs":"2bea192a782a5ebe3d3ec0ca6dae2d51844eb7ad163a38a2a62fbfe6dd3c34d8","sql/create_schema.sql":"a17311a407ec10e033886b7125da4c8b84bc6d761f6b28edc9594de430e1d964","sql/create_sync_temp_tables.sql":"860ede362c94feb47d85522553fa2852f9bdb9f9b025d6438dd5dee3d4acd527","sql/tests/create_schema_v1.sql":"77cf0c90eaac3e1aea626537147e1b8ec349b68d6076c92fa7ae402aac613050","src/api.rs":"f3e6f8065089df06ef4b8ce093727154f96afb7ea168b083d942a30266e7dbf8","src/db.rs":"931459b6b71f96e91d588d2f568a2165adb3b6bd47eb98cdd5ed6a3526ed0a62","src/error.rs":"7857c368d72872c09b1dbf47e8b364e8e4b6007ff0798f8cab4a44ebe997955e","src/ffi.rs":"670088d3a13a7349e751489197a3bb123990db69fccf8b815831e9bf5901afc6","src/lib.rs":"324300143818ad545f7e85f9bb5dba03ca45e9002e110d824a3639b5213d8763","src/migration.rs":"8d92f82b2ba38e1039fd054c8c75078a6b896a0d3cdc1a52571456b25a32c9c3","src/schema.rs":"cce3ed593809c3e47bbc050e6c2795cecd1b1ce7d6e39da633123e7a0614a213","src/store.rs":"94f29198cba0b44d050ffd208bc0254fe3036a30ec574d36c9bf26d8b34d20c9","src/sync/bridge.rs":"d4a2dfe6c567d5afe24319e29b0056d38d92f3507793c96e874814bb289bfa73","src/sync/incoming.rs":"dd77c64e2ade4f39cba258decab6d3db8ad0b5f513aa018efbd56b9869a021d9","src/sync/mod.rs":"bd1bc5c428dfda6aee7efe53b6e74b8015da5129a303638a21ca8d63516e4061","src/sync/outgoing.rs":"dacb77b956f2546fd60a89367927a199d9b662b17201d0781145f7405b61fdce","src/sync/sync_tests.rs":"f3846ca7e463315ba9788826613b987ddcff7b21672ff257a98769ee94f4191a"},"package":null}

View File

@ -18,9 +18,11 @@ readme = "README.md"
license = "MPL-2.0"
[dependencies]
anyhow = "1.0"
ffi-support = "0.4"
lazy_static = "1.4"
log = "0.4"
parking_lot = ">=0.11,<=0.12"
serde = "1"
serde_derive = "1"
serde_json = "1"

View File

@ -5,6 +5,7 @@
use crate::error::*;
use crate::schema;
use interrupt_support::{SqlInterruptHandle, SqlInterruptScope};
use parking_lot::Mutex;
use rusqlite::types::{FromSql, ToSql};
use rusqlite::Connection;
use rusqlite::OpenFlags;
@ -12,7 +13,6 @@ use sql_support::open_database::open_database_with_flags;
use sql_support::ConnExt;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::result;
use std::sync::Arc;
use url::Url;
@ -69,24 +69,18 @@ impl StorageDb {
/// Closes the database connection. If there are any unfinalized prepared
/// statements on the connection, `close` will fail and the `StorageDb` will
/// be returned to the caller so that it can retry, drop (via `mem::drop`)
// or leak (`mem::forget`) the connection.
///
/// Keep in mind that dropping the connection tries to close it again, and
/// panics on error.
pub fn close(self) -> result::Result<(), (StorageDb, Error)> {
let StorageDb {
writer,
interrupt_handle,
} = self;
writer.close().map_err(|(writer, err)| {
(
StorageDb {
writer,
interrupt_handle,
},
err.into(),
)
/// remain open and the connection will be leaked - we used to return the
/// underlying connection so the caller can retry but (a) that's very tricky
/// in an Arc<Mutex<>> world and (b) we never actually took advantage of
/// that retry capability.
pub fn close(self) -> Result<()> {
self.writer.close().map_err(|(writer, err)| {
// In rusqlite 0.28.0 and earlier, if we just let `writer` drop,
// the close would panic on failure.
// Later rusqlite versions will not panic, but this behavior doesn't
// hurt there.
std::mem::forget(writer);
err.into()
})
}
}
@ -105,6 +99,53 @@ impl DerefMut for StorageDb {
}
}
// We almost exclusively use this ThreadSafeStorageDb
pub struct ThreadSafeStorageDb {
db: Mutex<StorageDb>,
// This "outer" interrupt_handle not protected by the mutex means
// consumers can interrupt us when the mutex is held - which it always will
// be if we are doing anything interruptable!
interrupt_handle: Arc<SqlInterruptHandle>,
}
impl ThreadSafeStorageDb {
pub fn new(db: StorageDb) -> Self {
Self {
interrupt_handle: db.interrupt_handle(),
db: Mutex::new(db),
}
}
pub fn interrupt_handle(&self) -> Arc<SqlInterruptHandle> {
Arc::clone(&self.interrupt_handle)
}
pub fn begin_interrupt_scope(&self) -> Result<SqlInterruptScope> {
Ok(self.interrupt_handle.begin_interrupt_scope()?)
}
pub fn into_inner(self) -> StorageDb {
self.db.into_inner()
}
}
// Deref to a Mutex<StorageDb>, which is how we will use ThreadSafeStorageDb most of the time
impl Deref for ThreadSafeStorageDb {
type Target = Mutex<StorageDb>;
#[inline]
fn deref(&self) -> &Mutex<StorageDb> {
&self.db
}
}
// Also implement AsRef<SqlInterruptHandle> so that we can interrupt this at shutdown
impl AsRef<SqlInterruptHandle> for ThreadSafeStorageDb {
fn as_ref(&self) -> &SqlInterruptHandle {
&self.interrupt_handle
}
}
pub(crate) mod sql_fns {
use rusqlite::{functions::Context, Result};
use sync_guid::Guid as SyncGuid;
@ -227,6 +268,10 @@ pub mod test {
let counter = ATOMIC_COUNTER.fetch_add(1, Ordering::Relaxed);
StorageDb::new_memory(&format!("test-api-{}", counter)).expect("should get an API")
}
pub fn new_mem_thread_safe_storage_db() -> Arc<ThreadSafeStorageDb> {
Arc::new(ThreadSafeStorageDb::new(new_mem_db()))
}
}
#[cfg(test)]

View File

@ -49,8 +49,15 @@ pub enum ErrorKind {
#[error("Error opening database: {0}")]
OpenDatabaseError(#[from] sql_support::open_database::Error),
#[error("Sync Payload Error: {0}")]
IncomingPayloadError(#[from] sync15::Error),
// When trying to close a connection but we aren't the exclusive owner of the containing Arc<>
#[error("Other shared references to this connection are alive")]
OtherConnectionReferencesExist,
#[error("The storage database has been closed")]
DatabaseConnectionClosed,
#[error("Sync Error: {0}")]
SyncError(String),
}
error_support::define_error! {
@ -60,7 +67,6 @@ error_support::define_error! {
(IoError, std::io::Error),
(InterruptedError, Interrupted),
(Utf8Error, std::str::Utf8Error),
(IncomingPayloadError, sync15::Error),
(OpenDatabaseError, sql_support::open_database::Error),
}
}

View File

@ -3,12 +3,11 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use crate::api::{self, StorageChanges};
use crate::db::StorageDb;
use crate::db::{StorageDb, ThreadSafeStorageDb};
use crate::error::*;
use crate::migration::{migrate, MigrationInfo};
use crate::sync;
use std::path::Path;
use std::result;
use std::sync::Arc;
use interrupt_support::SqlInterruptHandle;
@ -25,24 +24,30 @@ use serde_json::Value as JsonValue;
/// create its own database connection, using up extra memory and CPU cycles,
/// and causing write contention. For this reason, you should only call
/// `Store::new()` (or `webext_store_new()`, from the FFI) once.
///
/// Note that our Db implementation is behind an Arc<> because we share that
/// connection with our sync engines - ie, these engines also hold an Arc<>
/// around the same object.
pub struct Store {
db: StorageDb,
db: Arc<ThreadSafeStorageDb>,
}
impl Store {
/// Creates a store backed by a database at `db_path`. The path can be a
/// file path or `file:` URI.
pub fn new(db_path: impl AsRef<Path>) -> Result<Self> {
let db = StorageDb::new(db_path)?;
Ok(Self {
db: StorageDb::new(db_path)?,
db: Arc::new(ThreadSafeStorageDb::new(db)),
})
}
/// Creates a store backed by an in-memory database.
#[cfg(test)]
pub fn new_memory(db_path: &str) -> Result<Self> {
let db = StorageDb::new_memory(db_path)?;
Ok(Self {
db: StorageDb::new_memory(db_path)?,
db: Arc::new(ThreadSafeStorageDb::new(db)),
})
}
@ -54,7 +59,8 @@ impl Store {
/// Sets one or more JSON key-value pairs for an extension ID. Returns a
/// list of changes, with existing and new values for each key in `val`.
pub fn set(&self, ext_id: &str, val: JsonValue) -> Result<StorageChanges> {
let tx = self.db.unchecked_transaction()?;
let db = self.db.lock();
let tx = db.unchecked_transaction()?;
let result = api::set(&tx, ext_id, val)?;
tx.commit()?;
Ok(result)
@ -62,7 +68,8 @@ impl Store {
/// Returns information about per-extension usage
pub fn usage(&self) -> Result<Vec<crate::UsageInfo>> {
api::usage(&self.db)
let db = self.db.lock();
api::usage(&db)
}
/// Returns the values for one or more keys `keys` can be:
@ -83,7 +90,8 @@ impl Store {
/// `serde_json::Value::Object`).
pub fn get(&self, ext_id: &str, keys: JsonValue) -> Result<JsonValue> {
// Don't care about transactions here.
api::get(&self.db, ext_id, keys)
let db = self.db.lock();
api::get(&db, ext_id, keys)
}
/// Deletes the values for one or more keys. As with `get`, `keys` can be
@ -91,7 +99,8 @@ impl Store {
/// of changes, where each change contains the old value for each deleted
/// key.
pub fn remove(&self, ext_id: &str, keys: JsonValue) -> Result<StorageChanges> {
let tx = self.db.unchecked_transaction()?;
let db = self.db.lock();
let tx = db.unchecked_transaction()?;
let result = api::remove(&tx, ext_id, keys)?;
tx.commit()?;
Ok(result)
@ -101,7 +110,8 @@ impl Store {
/// a list of changes, where each change contains the old value for each
/// deleted key.
pub fn clear(&self, ext_id: &str) -> Result<StorageChanges> {
let tx = self.db.unchecked_transaction()?;
let db = self.db.lock();
let tx = db.unchecked_transaction()?;
let result = api::clear(&tx, ext_id)?;
tx.commit()?;
Ok(result)
@ -110,18 +120,45 @@ impl Store {
/// Returns the bytes in use for the specified items (which can be null,
/// a string, or an array)
pub fn get_bytes_in_use(&self, ext_id: &str, keys: JsonValue) -> Result<usize> {
api::get_bytes_in_use(&self.db, ext_id, keys)
let db = self.db.lock();
api::get_bytes_in_use(&db, ext_id, keys)
}
/// Returns a bridged sync engine for Desktop for this store.
pub fn bridged_engine(&self) -> sync::BridgedEngine<'_> {
pub fn bridged_engine(&self) -> sync::BridgedEngine {
sync::BridgedEngine::new(&self.db)
}
/// Closes the store and its database connection. See the docs for
/// `StorageDb::close` for more details on when this can fail.
pub fn close(self) -> result::Result<(), (Store, Error)> {
self.db.close().map_err(|(db, err)| (Store { db }, err))
pub fn close(self) -> Result<()> {
// Even though this consumes `self`, the fact we use an Arc<> means
// we can't guarantee we can actually consume the inner DB - so do
// the best we can.
let shared: ThreadSafeStorageDb = match Arc::try_unwrap(self.db) {
Ok(shared) => shared,
_ => {
// The only way this is possible is if the sync engine has an operation
// running - but that shouldn't be possible in practice because desktop
// uses a single "task queue" such that the close operation can't possibly
// be running concurrently with any sync or storage tasks.
// If this *could* get hit, rusqlite will attempt to close the DB connection
// as it is dropped, and if that close fails, then rusqlite 0.28.0 and earlier
// would panic - but even that only happens if prepared statements are
// not finalized, which ruqlite also does.
// tl;dr - this should be impossible. If it was possible, rusqlite might panic,
// but we've never seen it panic in practice other places we don't close
// connections, and the next rusqlite version will not panic anyway.
// So this-is-fine.jpg
log::warn!("Attempting to close a store while other DB references exist.");
return Err(ErrorKind::OtherConnectionReferencesExist.into());
}
};
// consume the mutex and get back the inner.
let db = shared.into_inner();
db.close()
}
/// Gets the changes which the current sync applied. Should be used
@ -130,7 +167,8 @@ impl Store {
/// that were applied.
/// The result is a Vec of already JSON stringified changes.
pub fn get_synced_changes(&self) -> Result<Vec<sync::SyncedExtensionChange>> {
sync::get_synced_changes(&self.db)
let db = self.db.lock();
sync::get_synced_changes(&db)
}
/// Migrates data from a database in the format of the "old" kinto
@ -139,11 +177,12 @@ impl Store {
///
/// Note that `filename` isn't normalized or canonicalized.
pub fn migrate(&self, filename: impl AsRef<Path>) -> Result<()> {
let tx = self.db.unchecked_transaction()?;
let db = self.db.lock();
let tx = db.unchecked_transaction()?;
let result = migrate(&tx, filename.as_ref())?;
tx.commit()?;
// Failing to store this information should not cause migration failure.
if let Err(e) = result.store(&self.db) {
if let Err(e) = result.store(&db) {
debug_assert!(false, "Migration error: {:?}", e);
log::warn!("Failed to record migration telmetry: {}", e);
}
@ -153,7 +192,8 @@ impl Store {
/// Read-and-delete (e.g. `take` in rust parlance, see Option::take)
/// operation for any MigrationInfo stored in this database.
pub fn take_migration_info(&self) -> Result<Option<MigrationInfo>> {
let tx = self.db.unchecked_transaction()?;
let db = self.db.lock();
let tx = db.unchecked_transaction()?;
let result = MigrationInfo::take(&tx)?;
tx.commit()?;
Ok(result)
@ -172,7 +212,7 @@ pub mod test {
pub fn new_mem_store() -> Store {
Store {
db: crate::db::test::new_mem_db(),
db: Arc::new(ThreadSafeStorageDb::new(crate::db::test::new_mem_db())),
}
}
}

View File

@ -2,13 +2,14 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use anyhow::Result;
use rusqlite::Transaction;
use std::sync::{Arc, Weak};
use sync15::bso::IncomingBso;
use sync15::engine::ApplyResults;
use sync_guid::Guid as SyncGuid;
use crate::db::{delete_meta, get_meta, put_meta, StorageDb};
use crate::error::{Error, Result};
use crate::db::{delete_meta, get_meta, put_meta, ThreadSafeStorageDb};
use crate::schema;
use crate::sync::incoming::{apply_actions, get_incoming, plan_incoming, stage_incoming};
use crate::sync::outgoing::{get_outgoing, record_uploaded, stage_outgoing};
@ -20,14 +21,21 @@ const SYNC_ID_META_KEY: &str = "sync_id";
/// `storage.sync` store work with Desktop's Sync implementation.
/// Conceptually, it's similar to `sync15::Store`, which we
/// should eventually rename and unify with this trait (#2841).
pub struct BridgedEngine<'a> {
db: &'a StorageDb,
///
/// Unlike most of our other implementation which hold a strong reference
/// to the store, this engine keeps a weak reference in an attempt to keep
/// the desktop semantics as close as possible to what they were when the
/// engines all took lifetime params to ensure they don't outlive the store.
pub struct BridgedEngine {
db: Weak<ThreadSafeStorageDb>,
}
impl<'a> BridgedEngine<'a> {
impl BridgedEngine {
/// Creates a bridged engine for syncing.
pub fn new(db: &'a StorageDb) -> Self {
BridgedEngine { db }
pub fn new(db: &Arc<ThreadSafeStorageDb>) -> Self {
BridgedEngine {
db: Arc::downgrade(db),
}
}
fn do_reset(&self, tx: &Transaction<'_>) -> Result<()> {
@ -38,42 +46,56 @@ impl<'a> BridgedEngine<'a> {
delete_meta(tx, LAST_SYNC_META_KEY)?;
Ok(())
}
fn thread_safe_storage_db(&self) -> Result<Arc<ThreadSafeStorageDb>> {
self.db
.upgrade()
.ok_or_else(|| crate::error::ErrorKind::DatabaseConnectionClosed.into())
}
}
impl<'a> sync15::engine::BridgedEngine for BridgedEngine<'a> {
type Error = Error;
impl sync15::engine::BridgedEngine for BridgedEngine {
fn last_sync(&self) -> Result<i64> {
Ok(get_meta(self.db, LAST_SYNC_META_KEY)?.unwrap_or(0))
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
Ok(get_meta(&db, LAST_SYNC_META_KEY)?.unwrap_or(0))
}
fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
put_meta(self.db, LAST_SYNC_META_KEY, &last_sync_millis)?;
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
put_meta(&db, LAST_SYNC_META_KEY, &last_sync_millis)?;
Ok(())
}
fn sync_id(&self) -> Result<Option<String>> {
get_meta(self.db, SYNC_ID_META_KEY)
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
Ok(get_meta(&db, SYNC_ID_META_KEY)?)
}
fn reset_sync_id(&self) -> Result<String> {
let tx = self.db.unchecked_transaction()?;
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let tx = db.unchecked_transaction()?;
let new_id = SyncGuid::random().to_string();
self.do_reset(&tx)?;
put_meta(self.db, SYNC_ID_META_KEY, &new_id)?;
put_meta(&tx, SYNC_ID_META_KEY, &new_id)?;
tx.commit()?;
Ok(new_id)
}
fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
let current: Option<String> = get_meta(self.db, SYNC_ID_META_KEY)?;
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let current: Option<String> = get_meta(&db, SYNC_ID_META_KEY)?;
Ok(match current {
Some(current) if current == sync_id => current,
_ => {
let tx = self.db.unchecked_transaction()?;
let tx = db.unchecked_transaction()?;
self.do_reset(&tx)?;
let result = sync_id.to_string();
put_meta(self.db, SYNC_ID_META_KEY, &result)?;
put_meta(&tx, SYNC_ID_META_KEY, &result)?;
tx.commit()?;
result
}
@ -81,13 +103,17 @@ impl<'a> sync15::engine::BridgedEngine for BridgedEngine<'a> {
}
fn sync_started(&self) -> Result<()> {
schema::create_empty_sync_temp_tables(self.db)?;
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
schema::create_empty_sync_temp_tables(&db)?;
Ok(())
}
fn store_incoming(&self, incoming_bsos: Vec<IncomingBso>) -> Result<()> {
let signal = self.db.begin_interrupt_scope()?;
let tx = self.db.unchecked_transaction()?;
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let signal = db.begin_interrupt_scope()?;
let tx = db.unchecked_transaction()?;
let incoming_content: Vec<_> = incoming_bsos
.into_iter()
.map(IncomingBso::into_content::<super::WebextRecord>)
@ -98,9 +124,11 @@ impl<'a> sync15::engine::BridgedEngine for BridgedEngine<'a> {
}
fn apply(&self) -> Result<ApplyResults> {
let signal = self.db.begin_interrupt_scope()?;
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let signal = db.begin_interrupt_scope()?;
let tx = self.db.unchecked_transaction()?;
let tx = db.unchecked_transaction()?;
let incoming = get_incoming(&tx)?;
let actions = incoming
.into_iter()
@ -110,12 +138,14 @@ impl<'a> sync15::engine::BridgedEngine for BridgedEngine<'a> {
stage_outgoing(&tx)?;
tx.commit()?;
Ok(get_outgoing(self.db, &signal)?.into())
Ok(get_outgoing(&db, &signal)?.into())
}
fn set_uploaded(&self, _server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> {
let signal = self.db.begin_interrupt_scope()?;
let tx = self.db.unchecked_transaction()?;
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let signal = db.begin_interrupt_scope()?;
let tx = db.unchecked_transaction()?;
record_uploaded(&tx, ids, &signal)?;
tx.commit()?;
@ -123,12 +153,16 @@ impl<'a> sync15::engine::BridgedEngine for BridgedEngine<'a> {
}
fn sync_finished(&self) -> Result<()> {
schema::create_empty_sync_temp_tables(self.db)?;
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
schema::create_empty_sync_temp_tables(&db)?;
Ok(())
}
fn reset(&self) -> Result<()> {
let tx = self.db.unchecked_transaction()?;
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let tx = db.unchecked_transaction()?;
self.do_reset(&tx)?;
delete_meta(&tx, SYNC_ID_META_KEY)?;
tx.commit()?;
@ -136,7 +170,9 @@ impl<'a> sync15::engine::BridgedEngine for BridgedEngine<'a> {
}
fn wipe(&self) -> Result<()> {
let tx = self.db.unchecked_transaction()?;
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let tx = db.unchecked_transaction()?;
// We assume the meta table is only used by sync.
tx.execute_batch(
"DELETE FROM storage_sync_data; DELETE FROM storage_sync_mirror; DELETE FROM meta;",
@ -146,10 +182,17 @@ impl<'a> sync15::engine::BridgedEngine for BridgedEngine<'a> {
}
}
impl From<anyhow::Error> for crate::error::Error {
fn from(value: anyhow::Error) -> Self {
crate::error::ErrorKind::SyncError(value.to_string()).into()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::test::new_mem_db;
use crate::db::test::new_mem_thread_safe_storage_db;
use crate::db::StorageDb;
use sync15::engine::BridgedEngine;
fn query_count(conn: &StorageDb, table: &str) -> u32 {
@ -160,94 +203,115 @@ mod tests {
}
// Sets up mock data for the tests here.
fn setup_mock_data(engine: &super::BridgedEngine<'_>) -> Result<()> {
engine.db.execute(
"INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
VALUES ('ext-a', 'invalid-json', 2)",
[],
)?;
engine.db.execute(
"INSERT INTO storage_sync_mirror (guid, ext_id, data)
VALUES ('guid', 'ext-a', '3')",
[],
)?;
fn setup_mock_data(engine: &super::BridgedEngine) -> Result<()> {
{
let shared = engine.thread_safe_storage_db()?;
let db = shared.lock();
db.execute(
"INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
VALUES ('ext-a', 'invalid-json', 2)",
[],
)?;
db.execute(
"INSERT INTO storage_sync_mirror (guid, ext_id, data)
VALUES ('guid', 'ext-a', '3')",
[],
)?;
}
engine.set_last_sync(1)?;
let shared = engine.thread_safe_storage_db()?;
let db = shared.lock();
// and assert we wrote what we think we did.
assert_eq!(query_count(engine.db, "storage_sync_data"), 1);
assert_eq!(query_count(engine.db, "storage_sync_mirror"), 1);
assert_eq!(query_count(engine.db, "meta"), 1);
assert_eq!(query_count(&db, "storage_sync_data"), 1);
assert_eq!(query_count(&db, "storage_sync_mirror"), 1);
assert_eq!(query_count(&db, "meta"), 1);
Ok(())
}
// Assuming a DB setup with setup_mock_data, assert it was correctly reset.
fn assert_reset(engine: &super::BridgedEngine<'_>) -> Result<()> {
fn assert_reset(engine: &super::BridgedEngine) -> Result<()> {
// A reset never wipes data...
assert_eq!(query_count(engine.db, "storage_sync_data"), 1);
let shared = engine.thread_safe_storage_db()?;
let db = shared.lock();
assert_eq!(query_count(&db, "storage_sync_data"), 1);
// But did reset the change counter.
let cc = engine.db.query_row_and_then(
let cc = db.query_row_and_then(
"SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';",
[],
|row| row.get::<_, u32>(0),
)?;
assert_eq!(cc, 1);
// But did wipe the mirror...
assert_eq!(query_count(engine.db, "storage_sync_mirror"), 0);
assert_eq!(query_count(&db, "storage_sync_mirror"), 0);
// And the last_sync should have been wiped.
assert!(get_meta::<i64>(engine.db, LAST_SYNC_META_KEY)?.is_none());
assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_none());
Ok(())
}
// Assuming a DB setup with setup_mock_data, assert it has not been reset.
fn assert_not_reset(engine: &super::BridgedEngine<'_>) -> Result<()> {
assert_eq!(query_count(engine.db, "storage_sync_data"), 1);
let cc = engine.db.query_row_and_then(
fn assert_not_reset(engine: &super::BridgedEngine) -> Result<()> {
let shared = engine.thread_safe_storage_db()?;
let db = shared.lock();
assert_eq!(query_count(&db, "storage_sync_data"), 1);
let cc = db.query_row_and_then(
"SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';",
[],
|row| row.get::<_, u32>(0),
)?;
assert_eq!(cc, 2);
assert_eq!(query_count(engine.db, "storage_sync_mirror"), 1);
assert_eq!(query_count(&db, "storage_sync_mirror"), 1);
// And the last_sync should remain.
assert!(get_meta::<i64>(engine.db, LAST_SYNC_META_KEY)?.is_some());
assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_some());
Ok(())
}
#[test]
fn test_wipe() -> Result<()> {
let db = new_mem_db();
let engine = super::BridgedEngine::new(&db);
let strong = new_mem_thread_safe_storage_db();
let engine = super::BridgedEngine::new(&strong);
setup_mock_data(&engine)?;
engine.wipe()?;
assert_eq!(query_count(engine.db, "storage_sync_data"), 0);
assert_eq!(query_count(engine.db, "storage_sync_mirror"), 0);
assert_eq!(query_count(engine.db, "meta"), 0);
let shared = engine.thread_safe_storage_db()?;
let db = shared.lock();
assert_eq!(query_count(&db, "storage_sync_data"), 0);
assert_eq!(query_count(&db, "storage_sync_mirror"), 0);
assert_eq!(query_count(&db, "meta"), 0);
Ok(())
}
#[test]
fn test_reset() -> Result<()> {
let db = new_mem_db();
let engine = super::BridgedEngine::new(&db);
let strong = new_mem_thread_safe_storage_db();
let engine = super::BridgedEngine::new(&strong);
setup_mock_data(&engine)?;
put_meta(engine.db, SYNC_ID_META_KEY, &"sync-id".to_string())?;
put_meta(
&engine.thread_safe_storage_db()?.lock(),
SYNC_ID_META_KEY,
&"sync-id".to_string(),
)?;
engine.reset()?;
assert_reset(&engine)?;
// Only an explicit reset kills the sync-id, so check that here.
assert_eq!(get_meta::<String>(engine.db, SYNC_ID_META_KEY)?, None);
assert_eq!(
get_meta::<String>(&engine.thread_safe_storage_db()?.lock(), SYNC_ID_META_KEY)?,
None
);
Ok(())
}
#[test]
fn test_ensure_missing_sync_id() -> Result<()> {
let db = new_mem_db();
let engine = super::BridgedEngine::new(&db);
let strong = new_mem_thread_safe_storage_db();
let engine = super::BridgedEngine::new(&strong);
setup_mock_data(&engine)?;
@ -261,12 +325,16 @@ mod tests {
#[test]
fn test_ensure_new_sync_id() -> Result<()> {
let db = new_mem_db();
let engine = super::BridgedEngine::new(&db);
let strong = new_mem_thread_safe_storage_db();
let engine = super::BridgedEngine::new(&strong);
setup_mock_data(&engine)?;
put_meta(engine.db, SYNC_ID_META_KEY, &"old-id".to_string())?;
put_meta(
&engine.thread_safe_storage_db()?.lock(),
SYNC_ID_META_KEY,
&"old-id".to_string(),
)?;
assert_not_reset(&engine)?;
assert_eq!(engine.sync_id()?, Some("old-id".to_string()));
@ -280,13 +348,17 @@ mod tests {
#[test]
fn test_ensure_same_sync_id() -> Result<()> {
let db = new_mem_db();
let engine = super::BridgedEngine::new(&db);
let strong = new_mem_thread_safe_storage_db();
let engine = super::BridgedEngine::new(&strong);
setup_mock_data(&engine)?;
assert_not_reset(&engine)?;
put_meta(engine.db, SYNC_ID_META_KEY, &"sync-id".to_string())?;
put_meta(
&engine.thread_safe_storage_db()?.lock(),
SYNC_ID_META_KEY,
&"sync-id".to_string(),
)?;
engine.ensure_current_sync_id("sync-id")?;
// should not have reset.
@ -296,11 +368,15 @@ mod tests {
#[test]
fn test_reset_sync_id() -> Result<()> {
let db = new_mem_db();
let engine = super::BridgedEngine::new(&db);
let strong = new_mem_thread_safe_storage_db();
let engine = super::BridgedEngine::new(&strong);
setup_mock_data(&engine)?;
put_meta(engine.db, SYNC_ID_META_KEY, &"sync-id".to_string())?;
put_meta(
&engine.thread_safe_storage_db()?.lock(),
SYNC_ID_META_KEY,
&"sync-id".to_string(),
)?;
assert_eq!(engine.sync_id()?, Some("sync-id".to_string()));
let new_id = engine.reset_sync_id()?;