mirror of
https://github.com/mozilla/gecko-dev.git
synced 2025-02-09 16:57:36 +00:00
Bug 1440022: initial implementation r=lina
MozReview-Commit-ID: GMnGfpUSnox --HG-- extra : rebase_source : 859c7b3d6c7ef70850b80b6a7356cef76ec66570
This commit is contained in:
parent
693edd476b
commit
bad5e0b59e
216
dom/push/PushBroadcastService.jsm
Normal file
216
dom/push/PushBroadcastService.jsm
Normal file
@ -0,0 +1,216 @@
|
||||
/* jshint moz: true, esnext: true */
|
||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
* You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
"use strict";
|
||||
|
||||
ChromeUtils.import("resource://gre/modules/osfile.jsm");
|
||||
ChromeUtils.import("resource://gre/modules/XPCOMUtils.jsm");
|
||||
ChromeUtils.defineModuleGetter(this, "JSONFile", "resource://gre/modules/JSONFile.jsm");
|
||||
|
||||
var EXPORTED_SYMBOLS = ["pushBroadcastService"];
|
||||
|
||||
// We are supposed to ignore any updates with this version.
|
||||
// FIXME: what is the actual "dummy" version?
|
||||
// See bug 1467550.
|
||||
const DUMMY_VERSION_STRING = "dummy";
|
||||
|
||||
XPCOMUtils.defineLazyGetter(this, "console", () => {
|
||||
let {ConsoleAPI} = ChromeUtils.import("resource://gre/modules/Console.jsm", {});
|
||||
return new ConsoleAPI({
|
||||
maxLogLevelPref: "dom.push.loglevel",
|
||||
prefix: "BroadcastService",
|
||||
});
|
||||
});
|
||||
ChromeUtils.defineModuleGetter(this, "PushService", "resource://gre/modules/PushService.jsm");
|
||||
|
||||
class InvalidSourceInfo extends Error {
|
||||
constructor(message) {
|
||||
super(message);
|
||||
this.name = 'InvalidSourceInfo';
|
||||
}
|
||||
}
|
||||
|
||||
const BROADCAST_SERVICE_VERSION = 1;
|
||||
|
||||
var BroadcastService = class {
|
||||
constructor(pushService, path) {
|
||||
this.pushService = pushService;
|
||||
this.jsonFile = new JSONFile({
|
||||
path,
|
||||
dataPostProcessor: this._initializeJSONFile,
|
||||
});
|
||||
this.initializePromise = this.jsonFile.load();
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the listeners from our on-disk format to the format
|
||||
* needed by a hello message.
|
||||
*/
|
||||
async getListeners() {
|
||||
await this.initializePromise;
|
||||
return Object.entries(this.jsonFile.data.listeners).reduce((acc, [k, v]) => {
|
||||
acc[k] = v.version;
|
||||
return acc;
|
||||
}, {});
|
||||
}
|
||||
|
||||
_initializeJSONFile(data) {
|
||||
if (!data.version) {
|
||||
data.version = BROADCAST_SERVICE_VERSION;
|
||||
}
|
||||
if (!data.hasOwnProperty("listeners")) {
|
||||
data.listeners = {};
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset to a state akin to what you would get in a new profile.
|
||||
* In particular, wipe anything from storage.
|
||||
*
|
||||
* Used mainly for testing.
|
||||
*/
|
||||
async _resetListeners() {
|
||||
await this.initializePromise;
|
||||
this.jsonFile.data = this._initializeJSONFile({});
|
||||
this.initializePromise = Promise.resolve();
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure that a sourceInfo is correct (has the expected fields).
|
||||
*/
|
||||
_validateSourceInfo(sourceInfo) {
|
||||
const {moduleURI, symbolName} = sourceInfo;
|
||||
if (typeof moduleURI !== "string") {
|
||||
throw new InvalidSourceInfo(`moduleURI must be a string (got ${typeof moduleURI})`);
|
||||
}
|
||||
if (typeof symbolName !== "string") {
|
||||
throw new InvalidSourceInfo(`symbolName must be a string (got ${typeof symbolName})`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an entry for a given listener if it isn't present, or update
|
||||
* one if it is already present.
|
||||
*
|
||||
* Note that this means only a single listener can be set for a
|
||||
* given subscription. This is a limitation in the current API that
|
||||
* stems from the fact that there can only be one source of truth
|
||||
* for the subscriber's version. As a workaround, you can define a
|
||||
* listener which calls multiple other listeners.
|
||||
*
|
||||
* @param {string} broadcastId The broadcastID to listen for
|
||||
* @param {string} version The most recent version we have for
|
||||
* updates from this broadcastID
|
||||
* @param {Object} sourceInfo A description of the handler for
|
||||
* updates on this broadcastID
|
||||
*/
|
||||
async addListener(broadcastId, version, sourceInfo) {
|
||||
console.info("addListener: adding listener", broadcastId, version, sourceInfo);
|
||||
await this.initializePromise;
|
||||
this._validateSourceInfo(sourceInfo);
|
||||
if (typeof version !== "string") {
|
||||
throw new TypeError("version should be a string");
|
||||
}
|
||||
const isNew = !this.jsonFile.data.listeners.hasOwnProperty(broadcastId);
|
||||
|
||||
// Update listeners before telling the pushService to subscribe,
|
||||
// in case it would disregard the update in the small window
|
||||
// between getting listeners and setting state to RUNNING.
|
||||
this.jsonFile.data.listeners[broadcastId] = {version, sourceInfo};
|
||||
this.jsonFile.saveSoon();
|
||||
|
||||
if (isNew) {
|
||||
await this.pushService.subscribeBroadcast(broadcastId, version);
|
||||
}
|
||||
}
|
||||
|
||||
async receivedBroadcastMessage(broadcasts) {
|
||||
console.info("receivedBroadcastMessage:", broadcasts);
|
||||
await this.initializePromise;
|
||||
for (const broadcastId in broadcasts) {
|
||||
const version = broadcasts[broadcastId];
|
||||
if (version === DUMMY_VERSION_STRING) {
|
||||
console.info("Ignoring", version, "because it's the dummy version");
|
||||
continue;
|
||||
}
|
||||
// We don't know this broadcastID. This is probably a bug?
|
||||
if (!this.jsonFile.data.listeners.hasOwnProperty(broadcastId)) {
|
||||
console.warn("receivedBroadcastMessage: unknown broadcastId", broadcastId);
|
||||
continue;
|
||||
}
|
||||
|
||||
const {sourceInfo} = this.jsonFile.data.listeners[broadcastId];
|
||||
try {
|
||||
this._validateSourceInfo(sourceInfo);
|
||||
} catch (e) {
|
||||
console.error("receivedBroadcastMessage: malformed sourceInfo", sourceInfo, e);
|
||||
continue;
|
||||
}
|
||||
|
||||
const {moduleURI, symbolName} = sourceInfo;
|
||||
|
||||
const module = {};
|
||||
try {
|
||||
ChromeUtils.import(moduleURI, module);
|
||||
} catch (e) {
|
||||
console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
|
||||
"because import of module", moduleURI,
|
||||
"failed", e);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!module[symbolName]) {
|
||||
console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
|
||||
"because module", moduleName, "missing attribute", symbolName);
|
||||
continue;
|
||||
}
|
||||
|
||||
const handler = module[symbolName];
|
||||
|
||||
if (!handler.receivedBroadcastMessage) {
|
||||
console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
|
||||
"because handler returned by", `${moduleURI}.${symbolName}`,
|
||||
"has no receivedBroadcastMessage method");
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
await handler.receivedBroadcastMessage(version, broadcastId);
|
||||
} catch (e) {
|
||||
console.error("receivedBroadcastMessage: handler for", broadcastId,
|
||||
"threw error:", e);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Broadcast message applied successfully. Update the version we
|
||||
// received if it's different than the one we had. We don't
|
||||
// enforce an ordering here (i.e. we use != instead of <)
|
||||
// because we don't know what the ordering of the service's
|
||||
// versions is going to be.
|
||||
if (this.jsonFile.data.listeners[broadcastId].version != version) {
|
||||
this.jsonFile.data.listeners[broadcastId].version = version;
|
||||
this.jsonFile.saveSoon();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// For test only.
|
||||
_saveImmediately() {
|
||||
return this.jsonFile._save();
|
||||
}
|
||||
}
|
||||
|
||||
function initializeBroadcastService() {
|
||||
// Fallback path for xpcshell tests.
|
||||
let path = "broadcast-listeners.json";
|
||||
if (OS.Constants.Path.profileDir) {
|
||||
// Real path for use in a real profile.
|
||||
path = OS.Path.join(OS.Constants.Path.profileDir, path);
|
||||
}
|
||||
return new BroadcastService(PushService, path);
|
||||
};
|
||||
|
||||
var pushBroadcastService = initializeBroadcastService();
|
@ -35,6 +35,7 @@ XPCOMUtils.defineLazyServiceGetter(this, "gPushNotifier",
|
||||
XPCOMUtils.defineLazyServiceGetter(this, "eTLDService",
|
||||
"@mozilla.org/network/effective-tld-service;1",
|
||||
"nsIEffectiveTLDService");
|
||||
ChromeUtils.defineModuleGetter(this, "pushBroadcastService", "resource://gre/modules/PushBroadcastService.jsm");
|
||||
|
||||
var EXPORTED_SYMBOLS = ["PushService"];
|
||||
|
||||
@ -216,13 +217,24 @@ var PushService = {
|
||||
}
|
||||
|
||||
let records = await this.getAllUnexpired();
|
||||
let broadcastListeners = await pushBroadcastService.getListeners();
|
||||
|
||||
// In principle, a listener could be added to the
|
||||
// pushBroadcastService here, after we have gotten listeners and
|
||||
// before we're RUNNING, but this can't happen in practice because
|
||||
// the only caller that can add listeners is PushBroadcastService,
|
||||
// and it waits on the same promise we are before it can add
|
||||
// listeners. If PushBroadcastService gets woken first, it will
|
||||
// update the value that is eventually returned from
|
||||
// getListeners.
|
||||
this._setState(PUSH_SERVICE_RUNNING);
|
||||
|
||||
if (records.length > 0 || prefs.get("alwaysConnect")) {
|
||||
// Connect if we have existing subscriptions, or if the always-on pref
|
||||
// is set.
|
||||
this._service.connect(records);
|
||||
// is set. We gate on the pref to let us do load testing before
|
||||
// turning it on for everyone, but if the user has push
|
||||
// subscriptions, we need to connect them anyhow.
|
||||
this._service.connect(records, broadcastListeners);
|
||||
}
|
||||
},
|
||||
|
||||
@ -462,13 +474,13 @@ var PushService = {
|
||||
if (options.serverURI) {
|
||||
// this is use for xpcshell test.
|
||||
|
||||
this._stateChangeProcessEnqueue(_ =>
|
||||
return this._stateChangeProcessEnqueue(_ =>
|
||||
this._changeServerURL(options.serverURI, STARTING_SERVICE_EVENT, options));
|
||||
|
||||
} else {
|
||||
// This is only used for testing. Different tests require connecting to
|
||||
// slightly different URLs.
|
||||
this._stateChangeProcessEnqueue(_ =>
|
||||
return this._stateChangeProcessEnqueue(_ =>
|
||||
this._changeServerURL(prefs.get("serverURL"), STARTING_SERVICE_EVENT));
|
||||
}
|
||||
},
|
||||
@ -740,6 +752,16 @@ var PushService = {
|
||||
});
|
||||
},
|
||||
|
||||
/**
|
||||
* Dispatches a broadcast notification to the BroadcastService.
|
||||
*/
|
||||
receivedBroadcastMessage(message) {
|
||||
pushBroadcastService.receivedBroadcastMessage(message.broadcasts)
|
||||
.catch(e => {
|
||||
console.error(e);
|
||||
});;
|
||||
},
|
||||
|
||||
/**
|
||||
* Updates a registration record after receiving a push message.
|
||||
*
|
||||
@ -1058,6 +1080,21 @@ var PushService = {
|
||||
});
|
||||
},
|
||||
|
||||
/*
|
||||
* Called only by the PushBroadcastService on the receipt of a new
|
||||
* subscription. Don't call this directly. Go through PushBroadcastService.
|
||||
*/
|
||||
async subscribeBroadcast(broadcastId, version) {
|
||||
if (this._state != PUSH_SERVICE_RUNNING) {
|
||||
// Ignore any request to subscribe before we send a hello.
|
||||
// We'll send all the broadcast listeners as part of the hello
|
||||
// anyhow.
|
||||
return;
|
||||
}
|
||||
|
||||
await this._service.sendSubscribeBroadcast(broadcastId, version);
|
||||
},
|
||||
|
||||
/**
|
||||
* Called on message from the child process.
|
||||
*
|
||||
|
@ -167,7 +167,7 @@ var PushServiceAndroidGCM = {
|
||||
// No action required.
|
||||
},
|
||||
|
||||
connect: function(records) {
|
||||
connect: function(records, broadcastListeners) {
|
||||
console.debug("connect:", records);
|
||||
// It's possible for the registration or subscriptions backing the
|
||||
// PushService to not be registered with the underlying AndroidPushService.
|
||||
@ -194,6 +194,10 @@ var PushServiceAndroidGCM = {
|
||||
});
|
||||
},
|
||||
|
||||
sendSubscribeBroadcast: async function(serviceId, version) {
|
||||
// Not implemented yet
|
||||
},
|
||||
|
||||
isConnected: function() {
|
||||
return this._mainPushService != null;
|
||||
},
|
||||
|
@ -426,10 +426,14 @@ var PushServiceHttp2 = {
|
||||
return serverURI.scheme == "https";
|
||||
},
|
||||
|
||||
connect: function(subscriptions) {
|
||||
connect: function(subscriptions, broadcastListeners) {
|
||||
this.startConnections(subscriptions);
|
||||
},
|
||||
|
||||
sendSubscribeBroadcast: async function(serviceId, version) {
|
||||
// Not implemented yet
|
||||
},
|
||||
|
||||
isConnected: function() {
|
||||
return this._mainPushService != null;
|
||||
},
|
||||
|
@ -314,6 +314,8 @@ var PushServiceWebSocket = {
|
||||
|
||||
this._mainPushService = mainPushService;
|
||||
this._serverURI = serverURI;
|
||||
// Filled in at connect() time
|
||||
this._broadcastListeners = null;
|
||||
|
||||
// Override the default WebSocket factory function. The returned object
|
||||
// must be null or satisfy the nsIWebSocketChannel interface. Used by
|
||||
@ -512,8 +514,9 @@ var PushServiceWebSocket = {
|
||||
}
|
||||
},
|
||||
|
||||
connect: function(records) {
|
||||
console.debug("connect()");
|
||||
connect: function(records, broadcastListeners) {
|
||||
console.debug("connect()", broadcastListeners);
|
||||
this._broadcastListeners = broadcastListeners;
|
||||
this._beginWSSetup();
|
||||
},
|
||||
|
||||
@ -566,6 +569,13 @@ var PushServiceWebSocket = {
|
||||
this._currentState = STATE_READY;
|
||||
prefs.observe("userAgentID", this);
|
||||
|
||||
// Handle broadcasts received in response to the "hello" message.
|
||||
if (reply.broadcasts) {
|
||||
// The reply isn't technically a broadcast message, but it has
|
||||
// the shape of a broadcast message (it has a broadcasts field).
|
||||
this._mainPushService.receivedBroadcastMessage(reply);
|
||||
}
|
||||
|
||||
this._dataEnabled = !!reply.use_webpush;
|
||||
if (this._dataEnabled) {
|
||||
this._mainPushService.getAllUnexpired().then(records =>
|
||||
@ -747,6 +757,10 @@ var PushServiceWebSocket = {
|
||||
}
|
||||
},
|
||||
|
||||
_handleBroadcastReply: function(reply) {
|
||||
this._mainPushService.receivedBroadcastMessage(reply);
|
||||
},
|
||||
|
||||
reportDeliveryError(messageID, reason) {
|
||||
console.debug("reportDeliveryError()");
|
||||
let code = kDELIVERY_REASON_TO_CODE[reason];
|
||||
@ -945,6 +959,7 @@ var PushServiceWebSocket = {
|
||||
|
||||
let data = {
|
||||
messageType: "hello",
|
||||
broadcasts: this._broadcastListeners,
|
||||
use_webpush: true,
|
||||
};
|
||||
|
||||
@ -1013,7 +1028,7 @@ var PushServiceWebSocket = {
|
||||
|
||||
// A whitelist of protocol handlers. Add to these if new messages are added
|
||||
// in the protocol.
|
||||
let handlers = ["Hello", "Register", "Unregister", "Notification"];
|
||||
let handlers = ["Hello", "Register", "Unregister", "Notification", "Broadcast"];
|
||||
|
||||
// Build up the handler name to call from messageType.
|
||||
// e.g. messageType == "register" -> _handleRegisterReply.
|
||||
@ -1112,6 +1127,17 @@ var PushServiceWebSocket = {
|
||||
}
|
||||
return request;
|
||||
},
|
||||
|
||||
sendSubscribeBroadcast(serviceId, version) {
|
||||
let data = {
|
||||
messageType: "broadcast_subscribe",
|
||||
broadcasts: {
|
||||
[serviceId]: version
|
||||
},
|
||||
};
|
||||
|
||||
this._queueRequest(data);
|
||||
},
|
||||
};
|
||||
|
||||
function PushRecordWebSocket(record) {
|
||||
|
@ -13,6 +13,7 @@ EXTRA_COMPONENTS += [
|
||||
]
|
||||
|
||||
EXTRA_JS_MODULES += [
|
||||
'PushBroadcastService.jsm',
|
||||
'PushCrypto.jsm',
|
||||
'PushDB.jsm',
|
||||
'PushRecord.jsm',
|
||||
|
Loading…
x
Reference in New Issue
Block a user