diff --git a/toolkit/modules/Sqlite.jsm b/toolkit/modules/Sqlite.jsm index 09d645b1b078..760ca1cd2a99 100644 --- a/toolkit/modules/Sqlite.jsm +++ b/toolkit/modules/Sqlite.jsm @@ -219,6 +219,9 @@ function ConnectionData(connection, identifier, options={}) { // Increments for each executed statement for the life of the connection. this._statementCounter = 0; + // Increments whenever we request a unique operation id. + this._operationsCounter = 0; + this._hasInProgressTransaction = false; // Manages a chain of transactions promises, so that new transactions // always happen in queue to the previous ones. It never rejects. @@ -237,6 +240,10 @@ function ConnectionData(connection, identifier, options={}) { this._deferredClose = PromiseUtils.defer(); this._closeRequested = false; + // An AsyncShutdown barrier used to make sure that we wait until clients + // are done before shutting down the connection. + this._barrier = new AsyncShutdown.Barrier(`${this._identifier}: waiting for clients`); + Barriers.connections.client.addBlocker( this._identifier + ": waiting for shutdown", this._deferredClose.promise, @@ -264,6 +271,112 @@ function ConnectionData(connection, identifier, options={}) { ConnectionData.byId = new Map(); ConnectionData.prototype = Object.freeze({ + /** + * Run a task, ensuring that its execution will not be interrupted by shutdown. + * + * As the operations of this module are asynchronous, a sequence of operations, + * or even an individual operation, can still be pending when the process shuts + * down. If any of this operations is a write, this can cause data loss, simply + * because the write has not been completed (or even started) by shutdown. + * + * To avoid this risk, clients are encouraged to use `executeBeforeShutdown` for + * any write operation, as follows: + * + * myConnection.executeBeforeShutdown("Bookmarks: Removing a bookmark", + * Task.async(function*(db) { + * // The connection will not be closed and shutdown will not proceed + * // until this task has completed. + * + * // `db` exposes the same API as `myConnection` but provides additional + * // logging support to help debug hard-to-catch shutdown timeouts. + * + * yield db.execute(...); + * })); + * + * @param {string} name A human-readable name for the ongoing operation, used + * for logging and debugging purposes. + * @param {function(db)} task A function that takes as argument a Sqlite.jsm + * db and returns a Promise. + */ + executeBeforeShutdown: function(name, task) { + if (!name) { + throw new TypeError("Expected a human-readable name as first argument"); + } + if (typeof task != "function") { + throw new TypeError("Expected a function as second argument"); + } + if (this._closeRequested) { + throw new Error(`${this._identifier}: cannot execute operation ${name}, the connection is already closing`); + } + + // Status, used for AsyncShutdown crash reports. + let status = { + // The latest command started by `task`, either as a + // sql string, or as one of "" or "". + command: "", + + // `true` if `command` was started but not completed yet. + isPending: false, + }; + + // An object with the same API as `this` but with + // additional logging. To keep logging simple, we + // assume that `task` is not running several queries + // concurrently. + let loggedDb = Object.create(this, { + execute: { + value: Task.async(function*(sql, ...rest) { + status.isPending = true; + status.command = sql; + try { + return (yield this.execute(sql, ...rest)); + } finally { + status.isPending = false; + } + }.bind(this)) + }, + close: { + value: Task.async(function*() { + status.isPending = false; + status.command = ""; + try { + return (yield this.close()); + } finally { + status.isPending = false; + } + }.bind(this)) + }, + executeCached: { + value: Task.async(function*(sql, ...rest) { + status.isPending = false; + status.command = sql; + try { + return (yield this.executeCached(sql, ...rest)); + } finally { + status.isPending = false; + } + }.bind(this)) + }, + }); + + let promiseResult = task(loggedDb); + if (!promiseResult || typeof promiseResult != "object" || !("then" in promiseResult)) { + throw new TypeError("Expected a Promise"); + } + let key = `${this._identifier}: ${name} (${this._getOperationId()})`; + let promiseComplete = promiseResult.catch(() => {}); + this._barrier.client.addBlocker(key, promiseComplete, { + fetchState: () => status + }); + + return Task.spawn(function*() { + try { + return (yield promiseResult); + } finally { + this._barrier.client.removeBlocker(key, promiseComplete) + } + }.bind(this)); + }, close: function () { this._closeRequested = true; @@ -274,20 +387,12 @@ ConnectionData.prototype = Object.freeze({ this._log.debug("Request to close connection."); this._clearIdleShrinkTimer(); - // We need to take extra care with transactions during shutdown. - // - // If we don't have a transaction in progress, we can proceed with shutdown - // immediately. - if (!this._hasInProgressTransaction) { + return this._barrier.wait().then(() => { + if (!this._dbConn) { + return; + } return this._finalize(); - } - - // If instead we do have a transaction in progress, it might be rollback-ed - // automaticall by closing the connection. Regardless, we wait for its - // completion, next enqueued transactions will be rejected. - this._log.warn("Transaction in progress at time of close. Rolling back."); - - return this._transactionQueue.then(() => this._finalize()); + }); }, clone: function (readOnly=false) { @@ -304,7 +409,9 @@ ConnectionData.prototype = Object.freeze({ return cloneStorageConnection(options); }, - + _getOperationId: function() { + return this._operationsCounter++; + }, _finalize: function () { this._log.debug("Finalizing connection."); // Cancel any pending statements. @@ -529,6 +636,10 @@ ConnectionData.prototype = Object.freeze({ // Atomically update the queue before anyone else has a chance to enqueue // further transactions. this._transactionQueue = promise.catch(ex => { console.error(ex) }); + + // Make sure that we do not shutdown the connection during a transaction. + this._barrier.client.addBlocker(`Transaction (${this._getOperationId()})`, + this._transactionQueue); return promise; }, @@ -801,8 +912,8 @@ function openConnection(options) { } Services.storage.openAsyncDatabase(file, dbOptions, (status, connection) => { if (!connection) { - log.warn("Could not open connection: " + status); - reject(new Error("Could not open connection: " + status)); + log.warn(`Could not open connection to ${path}: ${status}`); + reject(new Error(`Could not open connection to ${path}: ${status}`)); return; } log.info("Connection opened"); @@ -1095,6 +1206,10 @@ OpenedConnection.prototype = Object.freeze({ return this._connectionData.clone(readOnly); }, + executeBeforeShutdown: function(name, task) { + return this._connectionData.executeBeforeShutdown(name, task); + }, + /** * Execute a SQL statement and cache the underlying statement object. *