Bug 815339 - Extract PromiseWorker from OS.File. r=froydnj

This commit is contained in:
David Rajchenbach-Teller 2012-12-03 20:26:16 -05:00
parent 963d5254ad
commit 97fc5ed662
4 changed files with 216 additions and 186 deletions

View File

@ -38,3 +38,4 @@ libs::
$(NSINSTALL) $(srcdir)/osfile_async_front.jsm $(FINAL_TARGET)/modules/osfile
$(NSINSTALL) $(srcdir)/osfile_async_worker.js $(FINAL_TARGET)/modules/osfile
$(NSINSTALL) $(srcdir)/osfile_shared_front.jsm $(FINAL_TARGET)/modules/osfile
$(NSINSTALL) $(srcdir)/_PromiseWorker.jsm $(FINAL_TARGET)/modules/osfile

View File

@ -0,0 +1,182 @@
/**
* Thin wrapper around a ChromeWorker that wraps postMessage/onmessage/onerror
* as promises.
*
* Not for public use yet.
*/
"use strict";
this.EXPORTED_SYMBOLS = ["PromiseWorker"];
// The library of promises.
Components.utils.import("resource://gre/modules/commonjs/promise/core.js", this);
/**
* An implementation of queues (FIFO).
*
* The current implementation uses one array, runs in O(n ^ 2), and is optimized
* for the case in which queues are generally short.
*/
let Queue = function Queue() {
this._array = [];
};
Queue.prototype = {
pop: function pop() {
return this._array.pop();
},
push: function push(x) {
return this._array.push(x);
},
isEmpty: function isEmpty() {
return this._array.length == 0;
}
};
/**
* An object responsible for dispatching messages to
* a chrome worker and routing the responses.
*
* @param {string} url The url containing the source code for this worker,
* as in constructor ChromeWorker.
* @param {Function=} log Optionally, a logging function.
*
* @constructor
*/
function PromiseWorker(url, log) {
if (typeof url != "string") {
throw new TypeError("Expecting a string");
}
if (log != null && typeof log != "function") {
throw new TypeError("Expecting either null or a function");
}
this._log = log;
this._url = url;
/**
* The queue of deferred, waiting for the completion of their
* respective job by the worker.
*
* Each item in the list may contain an additional field |closure|,
* used to store strong references to value that must not be
* garbage-collected before the reply has been received (e.g.
* arrays).
*
* @type {Queue<{deferred:deferred, closure:*=}>}
*/
this._queue = new Queue();
/**
* The number of the current message.
*
* Used for debugging purposes.
*/
this._id = 0;
}
PromiseWorker.prototype = {
/**
* Instantiate the worker lazily.
*/
get _worker() {
delete this._worker;
let worker = new ChromeWorker(this._url);
let self = this;
Object.defineProperty(this, "_worker", {value:
worker
});
/**
* Receive errors that are not instances of OS.File.Error, propagate
* them to the listeners.
*
* The worker knows how to serialize errors that are instances
* of |OS.File.Error|. These are treated by |worker.onmessage|.
* However, for other errors, we rely on DOM's mechanism for
* serializing errors, which transmits these errors through
* |worker.onerror|.
*
* @param {Error} error Some JS error.
*/
worker.onerror = function onerror(error) {
if (self._log) {
self._log("Received uncaught error from worker", JSON.stringify(error.message), error.message);
}
error.preventDefault();
let {deferred} = self._queue.pop();
deferred.reject(error);
};
/**
* Receive messages from the worker, propagate them to the listeners.
*
* Messages must have one of the following shapes:
* - {ok: some_value} in case of success
* - {fail: some_error} in case of error, where
* some_error is an instance of |PromiseWorker.WorkerError|
*
* Messages may also contain a field |id| to help
* with debugging.
*
* @param {*} msg The message received from the worker.
*/
worker.onmessage = function onmessage(msg) {
if (self._log) {
self._log("Received message from worker", JSON.stringify(msg.data));
}
let handler = self._queue.pop();
let deferred = handler.deferred;
let data = msg.data;
if (data.id != handler.id) {
throw new Error("Internal error: expecting msg " + handler.id + ", " +
" got " + data.id + ": " + JSON.stringify(msg.data));
}
if ("ok" in data) {
deferred.resolve(data.ok);
} else if ("StopIteration" in data) {
// We have received a StopIteration error
deferred.reject(StopIteration);
} if ("fail" in data) {
// We have received an error that was serialized by the
// worker.
deferred.reject(new PromiseWorker.WorkerError(data.fail));
}
};
return worker;
},
/**
* Post a message to a worker.
*
* @param {string} fun The name of the function to call.
* @param {Array} array The contents of the message.
* @param {*=} closure An object holding references that should not be
* garbage-collected before the message treatment is complete.
*
* @return {promise}
*/
post: function post(fun, array, closure) {
let deferred = Promise.defer();
let id = ++this._id;
let message = {fun: fun, args: array, id: id};
if (this._log) {
this._log("Posting message", JSON.stringify(message));
}
this._queue.push({deferred:deferred, closure: closure, id: id});
this._worker.postMessage(message);
if (this._log) {
this._log("Message posted");
}
return deferred.promise;
}
};
/**
* An error that has been serialized by the worker.
*
* @constructor
*/
PromiseWorker.WorkerError = function WorkerError(data) {
this.data = data;
};
this.PromiseWorker = PromiseWorker;

View File

@ -34,12 +34,12 @@ const DEBUG = false;
// The constructor for file errors.
let OSError;
if (OS.Constants.Win) {
Components.utils.import("resource://gre/modules/osfile/osfile_win_allthreads.jsm");
Components.utils.import("resource://gre/modules/osfile/ospath_win_back.jsm");
Components.utils.import("resource://gre/modules/osfile/osfile_win_allthreads.jsm", this);
Components.utils.import("resource://gre/modules/osfile/ospath_win_back.jsm", this);
OSError = OS.Shared.Win.Error;
} else if (OS.Constants.libc) {
Components.utils.import("resource://gre/modules/osfile/osfile_unix_allthreads.jsm");
Components.utils.import("resource://gre/modules/osfile/ospath_unix_back.jsm");
Components.utils.import("resource://gre/modules/osfile/osfile_unix_allthreads.jsm", this);
Components.utils.import("resource://gre/modules/osfile/ospath_unix_back.jsm", this);
OSError = OS.Shared.Unix.Error;
} else {
throw new Error("I am neither under Windows nor under a Posix system");
@ -47,10 +47,17 @@ if (OS.Constants.Win) {
let Type = OS.Shared.Type;
// The library of promises.
Components.utils.import("resource://gre/modules/commonjs/promise/core.js");
Components.utils.import("resource://gre/modules/commonjs/promise/core.js", this);
// The implementation of communications
Components.utils.import("resource://gre/modules/osfile/_PromiseWorker.jsm", this);
/**
* Return a shallow clone of the enumerable properties of an object
* Return a shallow clone of the enumerable properties of an object.
*
* We use this whenever normalizing options requires making (shallow)
* changes to an option object. The copy ensures that we do not modify
* a client-provided object by accident.
*/
let clone = function clone(object) {
let result = {};
@ -65,189 +72,23 @@ let clone = function clone(object) {
*/
const noOptions = {};
/**
* An implementation of queues (FIFO).
*
* The current implementation uses two arrays and runs in O(n * log(n)).
* It is optimized for the case in which many items are enqueued sequentially.
*/
let Queue = function Queue() {
// The array to which the following |push| operations will add elements.
// If |null|, |this._pushing| will receive a new array.
// @type {Array|null}
this._pushing = null;
// The array from which the following |pop| operations will remove elements.
// If |null|, |this._popping| will receive |this._pushing|
// @type {Array|null}
this._popping = null;
// The number of items in |this._popping| that have been popped already
this._popindex = 0;
};
Queue.prototype = {
/**
* Push a new element
*/
push: function push(x) {
if (!this._pushing) {
this._pushing = [];
}
this._pushing.push({ value: x });
},
/**
* Pop an element.
*
* If the queue is empty, raise |Error|.
*/
pop: function pop() {
if (!this._popping) {
if (!this._pushing) {
throw new Error("Queue is empty");
}
this._popping = this._pushing;
this._pushing = null;
this._popindex = 0;
}
let result = this._popping[this._popindex];
delete this._popping[this._popindex];
++this._popindex;
if (this._popindex >= this._popping.length) {
this._popping = null;
}
return result.value;
}
};
/**
* An object responsible for dispatching messages to
* a worker and routing the responses.
*
* In this implementation, the Scheduler uses only
* one worker.
*/
let worker = new PromiseWorker(
"resource://gre/modules/osfile/osfile_async_worker.js",
DEBUG?LOG:null);
let Scheduler = {
/**
* Instantiate the worker lazily.
*/
get _worker() {
delete this._worker;
let worker = new ChromeWorker("osfile_async_worker.js");
let self = this;
Object.defineProperty(this, "_worker", {value:
worker
});
/**
* Receive errors that are not instances of OS.File.Error, propagate
* them to the listeners.
*
* The worker knows how to serialize errors that are instances
* of |OS.File.Error|. These are treated by |worker.onmessage|.
* However, for other errors, we rely on DOM's mechanism for
* serializing errors, which transmits these errors through
* |worker.onerror|.
*
* @param {Error} error Some JS error.
*/
worker.onerror = function onerror(error) {
if (DEBUG) {
LOG("Received uncaught error from worker", JSON.stringify(error.message), error.message);
}
error.preventDefault();
let {deferred} = self._queue.pop();
deferred.reject(error);
};
/**
* Receive messages from the worker, propagate them to the listeners.
*
* Messages must have one of the following shapes:
* - {ok: some_value} in case of success
* - {fail: some_error} in case of error, where
* some_error can be deserialized by
* |OS.File.Error.fromMsg|
*
* Messages may also contain a field |id| to help
* with debugging.
*
* @param {*} msg The message received from the worker.
*/
worker.onmessage = function onmessage(msg) {
if (DEBUG) {
LOG("Received message from worker", JSON.stringify(msg.data));
}
let handler = self._queue.pop();
let deferred = handler.deferred;
let data = msg.data;
if (data.id != handler.id) {
throw new Error("Internal error: expecting msg " + handler.id + ", " +
" got " + data.id + ": " + JSON.stringify(msg.data));
}
if ("ok" in data) {
deferred.resolve(data.ok);
} else if ("fail" in data) {
let error;
try {
error = OS.File.Error.fromMsg(data.fail);
} catch (x) {
LOG("Cannot decode OS.File.Error", data.fail, data.id);
deferred.reject(x);
return;
post: function post(...args) {
let promise = worker.post.apply(worker, args);
return promise.then(
null,
function onError(error) {
// Decode any serialized error
if (error instanceof PromiseWorker.WorkerError) {
throw OS.File.Error.fromMsg(error.data);
} else {
throw error;
}
deferred.reject(error);
} else {
throw new Error("Message does not respect protocol: " +
data.toSource());
}
};
return worker;
},
/**
* The queue of deferred, waiting for the completion of their
* respective job by the worker.
*
* Each item in the list may contain an additional field |closure|,
* used to store strong references to value that must not be
* garbage-collected before the reply has been received (e.g.
* arrays).
*
* @type {Queue<{deferred:deferred, closure:*=}>}
*/
_queue: new Queue(),
/**
* The number of the current message.
*
* Used for debugging purposes.
*/
_id: 0,
/**
* Post a message to a worker.
*
* @param {string} fun The name of the function to call.
* @param array The contents of the message.
* @param closure An object holding references that should not be
* garbage-collected before the message treatment is complete.
*
* @return {promise}
*/
post: function post(fun, array, closure) {
let deferred = Promise.defer();
let id = ++this._id;
let message = {fun: fun, args: array, id: id};
if (DEBUG) {
LOG("Posting message", JSON.stringify(message));
}
this._queue.push({deferred:deferred, closure: closure, id: id});
this._worker.postMessage(message);
if (DEBUG) {
LOG("Message posted");
}
return deferred.promise;
);
}
};

View File

@ -63,6 +63,12 @@ if (this.Components) {
LOG("Sending positive reply", JSON.stringify(result), "id is", id);
}
self.postMessage({ok: result, id:id});
} else if (exn == StopIteration) {
// StopIteration cannot be serialized automatically
if (DEBUG) {
LOG("Sending back StopIteration");
}
self.postMessage({StopIteration: true, id: id});
} else if (exn instanceof exports.OS.File.Error) {
if (DEBUG) {
LOG("Sending back OS.File error", exn, "id is", id);