Bug 1636256 - abort importJSONDump tasks in the remote settings worker at shutdown, r=leplatrem,asuth

To use a single transaction for `importJSONDump`, this commit:

- changes IDBHelpers' `executeIDB` method to take either a single store name
  or an array of store names identifying the `objectStore`s that the caller
  wants to use.
- uses that from RemoteSettingsWorker to start a single transaction using both
  the `records` and the `timestamps` store
- updates `bulkOperationHelper` to take an optional `completion` callback, in
  addition to the rejection callback, to be called when all the bulk
  operations are complete
- uses that optional argument from RemoteSettingsWorker's `importDumpIDB`
  (the actual implementation of the IDB access behind `importJSONDump`) to
  first bulk-import the records and then update the timestamp stored for
  that remote settings collection, as sketched after this list.
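
Condensed from the IDBHelpers and worker changes in the diff below, the new
calling convention looks roughly like this (`db`, `records`, `cid` and
`timestamp` stand in for values the caller already has):

  // One transaction covering both stores; `executeIDB` also still accepts
  // a single store name.
  let { transaction, promise } = IDBHelpers.executeIDB(
    db,
    ["records", "timestamps"],
    "readwrite",
    ([recordsStore, timestampStore], rejectTransaction) => {
      // `bulkOperationHelper` now takes { reject, completion } instead of a
      // bare rejection callback; `completion` runs after the last chunk
      // succeeds, while the transaction is still alive.
      IDBHelpers.bulkOperationHelper(
        recordsStore,
        {
          reject: rejectTransaction,
          completion() {
            timestampStore.put({ cid, value: timestamp });
          },
        },
        "put",
        records
      );
    }
  );
  await promise;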

Then to abort that single transaction, this commit:
- stores pending transactions in a set, similar to what Database.jsm already
  does, and removes items from that set when the `promise` from `executeIDB`
  either resolves or rejects.
- adds a `prepareShutdown` action on the RemoteSettingsWorker's `Agent` class,
  to be called by the jsm side of the worker manager when shutdown happens.
  When called, it sets a `gShutdown` flag and then iterates over the pending
  transactions and aborts all of them (see the sketch after this list).
- ensures that where code `await`s in the middle of an operation, it stops
  (throws) immediately if `gShutdown` has been set.
- adds a test to test_shutdown_handling.js to verify that this mechanism now
  stops pending import tasks in the worker.
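
In the worker, the bookkeeping boils down to something like the following
(condensed from the RemoteSettingsWorker.js diff below; `trackTransaction` is
an illustrative helper here, the real code tracks the transaction inline in
`importDumpIDB`):

  let gShutdown = false;
  let gPendingTransactions = new Set();

  // Keep a transaction in the pending set for the lifetime of its promise.
  function trackTransaction(transaction, promise) {
    gPendingTransactions.add(transaction);
    return promise.finally(() => gPendingTransactions.delete(transaction));
  }

  const Agent = {
    // Called from RemoteSettingsWorker.jsm once shutdown has started.
    async prepareShutdown() {
      gShutdown = true;
      // Clone the set first: aborting a transaction may remove it.
      for (let transaction of Array.from(gPendingTransactions)) {
        try {
          transaction.abort();
        } catch (ex) {
          // The transaction may already have finished; ignore.
        }
      }
    },
  };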


Finally, as a drive-by, this fixes an oversight in test_remote_settings_worker.js
where the second `.get()` call wasn't actually testing whether the
`importJSONDump` call in the worker had succeeded: if the collection was
empty, `.get()` would simply do the import itself. I noticed this when using
similar code in the shutdown test.
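
With `syncIfEmpty: false`, `.get()` only reads what is already in IndexedDB
instead of falling back to importing the dump itself, so the assertion now
really exercises the worker's import:

  // Before: with an empty collection, `.get()` imported the dump on its own,
  // so this passed even if the worker's importJSONDump had failed.
  // const after = await client.get();

  // After: only read what the worker actually wrote.
  const after = await client.get({ syncIfEmpty: false });
  Assert.ok(after.length > 0);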

Differential Revision: https://phabricator.services.mozilla.com/D74315
Gijs Kruitbosch 2020-05-11 12:53:23 +00:00
parent b8917ab3a4
commit b46afee956
6 changed files with 157 additions and 60 deletions

services/settings/Database.jsm

@@ -82,7 +82,9 @@ class Database {
         (store, rejectTransaction) => {
           IDBHelpers.bulkOperationHelper(
             store,
-            rejectTransaction,
+            {
+              reject: rejectTransaction,
+            },
             "put",
             toInsert.map(item => {
               return Object.assign({ _cid }, item);
@@ -104,7 +106,9 @@ class Database {
         (store, rejectTransaction) => {
           IDBHelpers.bulkOperationHelper(
             store,
-            rejectTransaction,
+            {
+              reject: rejectTransaction,
+            },
             "delete",
             toDelete.map(item => {
               return [_cid, item.id];

services/settings/IDBHelpers.jsm

@@ -41,7 +41,13 @@ class ShutdownError extends IndexedDBError {
 // machine and 10-15ms on a slow machine.
 // Every chunk waits for success before starting the next, and
 // the final chunk's completion will fire transaction.oncomplete .
-function bulkOperationHelper(store, reject, operation, list, listIndex = 0) {
+function bulkOperationHelper(
+  store,
+  { reject, completion },
+  operation,
+  list,
+  listIndex = 0
+) {
   try {
     const CHUNK_LENGTH = 250;
     const max = Math.min(listIndex + CHUNK_LENGTH, list.length);
@@ -54,11 +60,13 @@ function bulkOperationHelper(store, reject, operation, list, listIndex = 0) {
       request.onsuccess = bulkOperationHelper.bind(
         null,
         store,
-        reject,
+        { reject, completion },
         operation,
         list,
         listIndex
       );
+    } else if (completion) {
+      completion();
     }
     // otherwise, we're done, and the transaction will complete on its own.
   } catch (e) {
@@ -77,25 +85,28 @@ function bulkOperationHelper(store, reject, operation, list, listIndex = 0) {
  * Helper to wrap some IDBObjectStore operations into a promise.
  *
  * @param {IDBDatabase} db
- * @param {String} storeName
+ * @param {String|String[]} storeNames - either a string or an array of strings.
  * @param {String} mode
  * @param {function} callback
  * @param {String} description of the operation for error handling purposes.
  */
-function executeIDB(db, storeName, mode, callback, desc) {
-  const transaction = db.transaction([storeName], mode);
+function executeIDB(db, storeNames, mode, callback, desc) {
+  if (!Array.isArray(storeNames)) {
+    storeNames = [storeNames];
+  }
+  const transaction = db.transaction(storeNames, mode);
   let promise = new Promise((resolve, reject) => {
-    const store = transaction.objectStore(storeName);
+    let stores = storeNames.map(name => transaction.objectStore(name));
     let result;
     let rejectWrapper = e => {
-      reject(new IndexedDBError(e, desc || "execute()", storeName));
+      reject(new IndexedDBError(e, desc || "execute()", storeNames.join(", ")));
       try {
         transaction.abort();
       } catch (ex) {
         Cu.reportError(ex);
       }
     };
-    // Add all the handlers before using the store.
+    // Add all the handlers before using the stores.
     transaction.onerror = event =>
       reject(new IndexedDBError(event.target.error, desc || "execute()"));
     transaction.onabort = event =>
@@ -106,6 +117,10 @@ function executeIDB(db, storeName, mode, callback, desc) {
         )
       );
     transaction.oncomplete = event => resolve(result);
+    // Simplify access to a single datastore:
+    if (stores.length == 1) {
+      stores = stores[0];
+    }
     try {
       // Although this looks sync, once the callback places requests
       // on the datastore, it can independently keep the transaction alive and
@@ -116,7 +131,7 @@ function executeIDB(db, storeName, mode, callback, desc) {
       // In theory, exceptions thrown from onsuccess handlers should also
       // cause IndexedDB to abort the transaction, so this is a belt-and-braces
       // approach.
-      result = callback(store, rejectWrapper);
+      result = callback(stores, rejectWrapper);
     } catch (e) {
       rejectWrapper(e);
     }

services/settings/RemoteSettingsWorker.js

@@ -20,6 +20,8 @@ importScripts(
 const IDB_RECORDS_STORE = "records";
 const IDB_TIMESTAMPS_STORE = "timestamps";
 
+let gShutdown = false;
+
 const Agent = {
   /**
    * Return the canonical JSON serialization of the specified records.
@@ -64,6 +66,9 @@ const Agent = {
    */
   async importJSONDump(bucket, collection) {
     const { data: records } = await loadJSONDump(bucket, collection);
+    if (gShutdown) {
+      throw new Error("Can't import when we've started shutting down.");
+    }
     await importDumpIDB(bucket, collection, records);
     return records.length;
   },
@@ -107,6 +112,25 @@ const Agent = {
     const hashStr = Array.from(hashBytes, toHex).join("");
     return hashStr == hash;
   },
+
+  async prepareShutdown() {
+    gShutdown = true;
+    // Ensure we can iterate and abort (which may delete items) by cloning
+    // the list.
+    let transactions = Array.from(gPendingTransactions);
+    for (let transaction of transactions) {
+      try {
+        transaction.abort();
+      } catch (ex) {
+        // We can hit this case if the transaction has finished but
+        // we haven't heard about it yet.
+      }
+    }
+  },
+
+  _test_only_import(bucket, collection, records) {
+    return importDumpIDB(bucket, collection, records);
+  },
 };
 
 /**
@@ -140,10 +164,15 @@ async function loadJSONDump(bucket, collection) {
     // Return empty dataset if file is missing.
     return { data: [] };
   }
+  if (gShutdown) {
+    throw new Error("Can't import when we've started shutting down.");
+  }
   // Will throw if JSON is invalid.
   return response.json();
 }
 
+let gPendingTransactions = new Set();
+
 /**
  * Import the records into the Remote Settings Chrome IndexedDB.
  *
@@ -158,30 +187,47 @@ async function importDumpIDB(bucket, collection, records) {
   // we already tried to read the timestamp in `remote-settings.js`
   const db = await IDBHelpers.openIDB(false /* do not allow upgrades */);
 
-  // Each entry of the dump will be stored in the records store.
-  // They are indexed by `_cid`.
-  const cid = bucket + "/" + collection;
-  // We can just modify the items in-place, as we got them from loadJSONDump.
-  records.forEach(item => {
-    item._cid = cid;
-  });
-  await IDBHelpers.executeIDB(
-    db,
-    IDB_RECORDS_STORE,
-    "readwrite",
-    (store, rejectTransaction) => {
-      IDBHelpers.bulkOperationHelper(store, rejectTransaction, "put", records);
+  // try...finally to ensure we always close the db.
+  try {
+    if (gShutdown) {
+      throw new Error("Can't import when we've started shutting down.");
     }
-  ).promise;
 
-  // Store the highest timestamp as the collection timestamp (or zero if dump is empty).
-  const timestamp =
-    records.length === 0
-      ? 0
-      : Math.max(...records.map(record => record.last_modified));
-  await IDBHelpers.executeIDB(db, IDB_TIMESTAMPS_STORE, "readwrite", store =>
-    store.put({ cid, value: timestamp })
-  ).promise;
-  // Close now that we're done.
-  db.close();
+    // Each entry of the dump will be stored in the records store.
+    // They are indexed by `_cid`.
+    const cid = bucket + "/" + collection;
+    // We can just modify the items in-place, as we got them from loadJSONDump.
+    records.forEach(item => {
+      item._cid = cid;
+    });
+    // Store the highest timestamp as the collection timestamp (or zero if dump is empty).
+    const timestamp =
+      records.length === 0
+        ? 0
+        : Math.max(...records.map(record => record.last_modified));
+    let { transaction, promise } = IDBHelpers.executeIDB(
+      db,
+      [IDB_RECORDS_STORE, IDB_TIMESTAMPS_STORE],
+      "readwrite",
+      ([recordsStore, timestampStore], rejectTransaction) => {
+        IDBHelpers.bulkOperationHelper(
+          recordsStore,
+          {
+            reject: rejectTransaction,
+            completion() {
+              timestampStore.put({ cid, value: timestamp });
+            },
+          },
+          "put",
+          records
+        );
+      }
+    );
+    gPendingTransactions.add(transaction);
+    promise = promise.finally(() => gPendingTransactions.delete(transaction));
+    await promise;
+  } finally {
+    // Close now that we're done.
+    db.close();
+  }
 }

services/settings/RemoteSettingsWorker.jsm

@@ -56,11 +56,16 @@ class Worker {
   }
 
   async _execute(method, args = [], options = {}) {
-    if (gShutdown) {
+    // Check if we're shutting down.
+    if (gShutdown && method != "prepareShutdown") {
       throw new RemoteSettingsWorkerError("Remote Settings has shut down.");
     }
 
-    const { mustComplete = false } = options;
+    // Don't instantiate the worker to shut it down.
+    if (method == "prepareShutdown" && !this.worker) {
+      return null;
+    }
+    const { mustComplete = false } = options;
     // (Re)instantiate the worker if it was terminated.
     if (!this.worker) {
       this.worker = new ChromeWorker(this.source);
@@ -93,6 +98,11 @@ class Worker {
 
   _onWorkerMessage(event) {
     const { callbackId, result, error } = event.data;
+    // If we're shutting down, we may have already rejected this operation
+    // and removed its callback from our map:
+    if (!this.callbacks.has(callbackId)) {
+      return;
+    }
     const { resolve, reject } = this.callbacks.get(callbackId);
     if (error) {
       reject(new RemoteSettingsWorkerError(error));
@@ -140,6 +150,8 @@ class Worker {
     }
     // If there was something left, we'll stop as soon as we get messages from
     // those tasks, too.
+    // Let's hurry them along a bit:
+    this._execute("prepareShutdown");
   }
 
   stop() {

services/settings/test/unit/test_remote_settings_worker.js

@@ -53,8 +53,16 @@ add_task(async function test_import_json_dump_into_idb() {
   await RemoteSettingsWorker.importJSONDump("main", "language-dictionaries");
 
-  const after = await client.get();
+  const after = await client.get({ syncIfEmpty: false });
   Assert.ok(after.length > 0);
+
+  let lastModifiedStamp = await client.getLastModified();
+  Assert.equal(
+    lastModifiedStamp,
+    Math.max(...after.map(record => record.last_modified)),
+    "Should have correct last modified timestamp"
+  );
+
+  // Force a DB close for shutdown so we can delete the DB later.
+  Database._shutdownHandler();
 });

services/settings/test/unit/test_shutdown_handling.js

@@ -3,9 +3,12 @@ http://creativecommons.org/publicdomain/zero/1.0/ */
 
 "use strict";
 
-const { AppConstants } = ChromeUtils.import(
-  "resource://gre/modules/AppConstants.jsm"
+const { Services } = ChromeUtils.import("resource://gre/modules/Services.jsm");
+const { TestUtils } = ChromeUtils.import(
+  "resource://testing-common/TestUtils.jsm"
 );
 const { Database } = ChromeUtils.import(
   "resource://services-settings/Database.jsm"
 );
@@ -96,36 +99,45 @@ add_task(async function test_shutdown_immediate_abort() {
 });
 
 add_task(async function test_shutdown_worker() {
-  // Fallback for android:
-  let importPromise = Promise.resolve();
-  let client;
-  // Android has no dumps, so only run the import if we do have dumps:
-  if (AppConstants.platform != "android") {
-    client = new RemoteSettingsClient("language-dictionaries", {
-      bucketNamePref: "services.settings.default_bucket",
-    });
-    const before = await client.get({ syncIfEmpty: false });
-    Assert.equal(before.length, 0);
+  let client = new RemoteSettingsClient("language-dictionaries", {
+    bucketNamePref: "services.settings.default_bucket",
+  });
+  const before = await client.get({ syncIfEmpty: false });
+  Assert.equal(before.length, 0);
 
-    importPromise = RemoteSettingsWorker.importJSONDump(
-      "main",
-      "language-dictionaries"
-    );
-  }
+  let records = [{}];
+  let importPromise = RemoteSettingsWorker._execute(
+    "_test_only_import",
+    ["main", "language-dictionaries", records],
+    { mustComplete: true }
+  );
   let stringifyPromise = RemoteSettingsWorker.canonicalStringify(
     [],
     [],
     Date.now()
   );
+  // Change the idle time so we shut the worker down even though we can't
+  // set gShutdown from outside of the worker management code.
+  Services.prefs.setIntPref(
+    "services.settings.worker_idle_max_milliseconds",
+    1
+  );
   RemoteSettingsWorker._abortCancelableRequests();
   await Assert.rejects(
     stringifyPromise,
     /Shutdown/,
     "Should have aborted the stringify request at shutdown."
   );
-  await importPromise.catch(e => ok(false, "importing failed!"));
-  if (client) {
-    const after = await client.get();
-    Assert.ok(after.length > 0);
-  }
+  await Assert.rejects(
+    importPromise,
+    /shutting down/,
+    "Ensure imports get aborted during shutdown"
+  );
+  const after = await client.get({ syncIfEmpty: false });
+  Assert.equal(after.length, 0);
+  await TestUtils.waitForCondition(() => !RemoteSettingsWorker.worker);
+  Assert.ok(
+    !RemoteSettingsWorker.worker,
+    "Worker should have been terminated."
+  );
 });