Bug 1043863 - OpenedConnection.prototype.executeBeforeShutdown. r=mak

--HG--
extra : transplant_source : %19%8F%9Fx%F1%C3%3C%DE%A5%3C%97%3A%91%11%5D%C7h%D4%60%CA
This commit is contained in:
David Rajchenbach-Teller 2015-05-05 12:45:25 +02:00
parent 58f0e147c3
commit 849af1389a

View File

@ -219,6 +219,9 @@ function ConnectionData(connection, identifier, options={}) {
// Increments for each executed statement for the life of the connection.
this._statementCounter = 0;
// Increments whenever we request a unique operation id.
this._operationsCounter = 0;
this._hasInProgressTransaction = false;
// Manages a chain of transactions promises, so that new transactions
// always happen in queue to the previous ones. It never rejects.
@ -237,6 +240,10 @@ function ConnectionData(connection, identifier, options={}) {
this._deferredClose = PromiseUtils.defer();
this._closeRequested = false;
// An AsyncShutdown barrier used to make sure that we wait until clients
// are done before shutting down the connection.
this._barrier = new AsyncShutdown.Barrier(`${this._identifier}: waiting for clients`);
Barriers.connections.client.addBlocker(
this._identifier + ": waiting for shutdown",
this._deferredClose.promise,
@ -264,6 +271,112 @@ function ConnectionData(connection, identifier, options={}) {
ConnectionData.byId = new Map();
ConnectionData.prototype = Object.freeze({
/**
* Run a task, ensuring that its execution will not be interrupted by shutdown.
*
* As the operations of this module are asynchronous, a sequence of operations,
* or even an individual operation, can still be pending when the process shuts
* down. If any of this operations is a write, this can cause data loss, simply
* because the write has not been completed (or even started) by shutdown.
*
* To avoid this risk, clients are encouraged to use `executeBeforeShutdown` for
* any write operation, as follows:
*
* myConnection.executeBeforeShutdown("Bookmarks: Removing a bookmark",
* Task.async(function*(db) {
* // The connection will not be closed and shutdown will not proceed
* // until this task has completed.
*
* // `db` exposes the same API as `myConnection` but provides additional
* // logging support to help debug hard-to-catch shutdown timeouts.
*
* yield db.execute(...);
* }));
*
* @param {string} name A human-readable name for the ongoing operation, used
* for logging and debugging purposes.
* @param {function(db)} task A function that takes as argument a Sqlite.jsm
* db and returns a Promise.
*/
executeBeforeShutdown: function(name, task) {
if (!name) {
throw new TypeError("Expected a human-readable name as first argument");
}
if (typeof task != "function") {
throw new TypeError("Expected a function as second argument");
}
if (this._closeRequested) {
throw new Error(`${this._identifier}: cannot execute operation ${name}, the connection is already closing`);
}
// Status, used for AsyncShutdown crash reports.
let status = {
// The latest command started by `task`, either as a
// sql string, or as one of "<not started>" or "<closing>".
command: "<not started>",
// `true` if `command` was started but not completed yet.
isPending: false,
};
// An object with the same API as `this` but with
// additional logging. To keep logging simple, we
// assume that `task` is not running several queries
// concurrently.
let loggedDb = Object.create(this, {
execute: {
value: Task.async(function*(sql, ...rest) {
status.isPending = true;
status.command = sql;
try {
return (yield this.execute(sql, ...rest));
} finally {
status.isPending = false;
}
}.bind(this))
},
close: {
value: Task.async(function*() {
status.isPending = false;
status.command = "<close>";
try {
return (yield this.close());
} finally {
status.isPending = false;
}
}.bind(this))
},
executeCached: {
value: Task.async(function*(sql, ...rest) {
status.isPending = false;
status.command = sql;
try {
return (yield this.executeCached(sql, ...rest));
} finally {
status.isPending = false;
}
}.bind(this))
},
});
let promiseResult = task(loggedDb);
if (!promiseResult || typeof promiseResult != "object" || !("then" in promiseResult)) {
throw new TypeError("Expected a Promise");
}
let key = `${this._identifier}: ${name} (${this._getOperationId()})`;
let promiseComplete = promiseResult.catch(() => {});
this._barrier.client.addBlocker(key, promiseComplete, {
fetchState: () => status
});
return Task.spawn(function*() {
try {
return (yield promiseResult);
} finally {
this._barrier.client.removeBlocker(key, promiseComplete)
}
}.bind(this));
},
close: function () {
this._closeRequested = true;
@ -274,20 +387,12 @@ ConnectionData.prototype = Object.freeze({
this._log.debug("Request to close connection.");
this._clearIdleShrinkTimer();
// We need to take extra care with transactions during shutdown.
//
// If we don't have a transaction in progress, we can proceed with shutdown
// immediately.
if (!this._hasInProgressTransaction) {
return this._barrier.wait().then(() => {
if (!this._dbConn) {
return;
}
return this._finalize();
}
// If instead we do have a transaction in progress, it might be rollback-ed
// automaticall by closing the connection. Regardless, we wait for its
// completion, next enqueued transactions will be rejected.
this._log.warn("Transaction in progress at time of close. Rolling back.");
return this._transactionQueue.then(() => this._finalize());
});
},
clone: function (readOnly=false) {
@ -304,7 +409,9 @@ ConnectionData.prototype = Object.freeze({
return cloneStorageConnection(options);
},
_getOperationId: function() {
return this._operationsCounter++;
},
_finalize: function () {
this._log.debug("Finalizing connection.");
// Cancel any pending statements.
@ -529,6 +636,10 @@ ConnectionData.prototype = Object.freeze({
// Atomically update the queue before anyone else has a chance to enqueue
// further transactions.
this._transactionQueue = promise.catch(ex => { console.error(ex) });
// Make sure that we do not shutdown the connection during a transaction.
this._barrier.client.addBlocker(`Transaction (${this._getOperationId()})`,
this._transactionQueue);
return promise;
},
@ -801,8 +912,8 @@ function openConnection(options) {
}
Services.storage.openAsyncDatabase(file, dbOptions, (status, connection) => {
if (!connection) {
log.warn("Could not open connection: " + status);
reject(new Error("Could not open connection: " + status));
log.warn(`Could not open connection to ${path}: ${status}`);
reject(new Error(`Could not open connection to ${path}: ${status}`));
return;
}
log.info("Connection opened");
@ -1095,6 +1206,10 @@ OpenedConnection.prototype = Object.freeze({
return this._connectionData.clone(readOnly);
},
executeBeforeShutdown: function(name, task) {
return this._connectionData.executeBeforeShutdown(name, task);
},
/**
* Execute a SQL statement and cache the underlying statement object.
*