Bug 1207744 - Track and re-send push unregister requests on reconnect. r=dragana

MozReview-Commit-ID: 2rFLm07n4EU

--HG--
extra : rebase_source : a93554e2320c0ff46e7b66d41f668ecaf75836d4
extra : source : f7e1ab1bd99c05c219fe75913f8f37ba39aec092
This commit is contained in:
Kit Cambridge 2016-06-08 06:13:15 -07:00
parent 9e36cfc751
commit 39cc699a48
12 changed files with 204 additions and 102 deletions

View File

@ -1023,10 +1023,8 @@ this.PushService = {
_sendRequest(action, ...params) {
if (this._state == PUSH_SERVICE_CONNECTION_DISABLE) {
return Promise.reject(new Error("Push service disabled"));
} else if (this._state == PUSH_SERVICE_ACTIVE_OFFLINE) {
if (this._service.serviceType() == "WebSocket" && action == "unregister") {
return Promise.resolve();
}
if (this._state == PUSH_SERVICE_ACTIVE_OFFLINE) {
return Promise.reject(new Error("Push service offline"));
}
// Ensure the backend is ready. `getByPageRecord` already checks this, but
@ -1203,12 +1201,13 @@ this.PushService = {
let reason = Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL;
return Promise.all([
this._sendUnregister(record, reason),
this._db.delete(record.keyID),
]).then(() => {
this._db.delete(record.keyID).then(record => {
if (record) {
gPushNotifier.notifySubscriptionModified(record.scope,
record.principal);
return true;
});
}
}),
]).then(([success]) => success);
});
},

View File

@ -59,10 +59,6 @@ this.PushServiceAndroidGCM = {
PushRecordAndroidGCM);
},
serviceType: function() {
return "AndroidGCM";
},
validServerURI: function(serverURI) {
if (!serverURI) {
return false;

View File

@ -430,10 +430,6 @@ this.PushServiceHttp2 = {
PushRecordHttp2);
},
serviceType: function() {
return "http2";
},
hasmainPushService: function() {
return this._mainPushService !== null;
},

View File

@ -143,10 +143,6 @@ this.PushServiceWebSocket = {
PushRecordWebSocket);
},
serviceType: function() {
return "WebSocket";
},
disconnect: function() {
this._shutdownWS();
},
@ -234,17 +230,15 @@ this.PushServiceWebSocket = {
requestTimedOut = true;
} else {
for (let [channelID, request] of this._registerRequests) {
for (let [key, request] of this._pendingRequests) {
let duration = now - request.ctime;
// If any of the registration requests time out, all the ones after it
// also made to fail, since we are going to be disconnecting the
// socket.
requestTimedOut |= duration > this._requestTimeout;
if (requestTimedOut) {
request.reject(new Error(
"Register request timed out for channel ID " + channelID));
this._registerRequests.delete(channelID);
request.reject(new Error("Request timed out: " + key));
this._pendingRequests.delete(key);
}
}
}
@ -278,7 +272,7 @@ this.PushServiceWebSocket = {
},
_ws: null,
_registerRequests: new Map(),
_pendingRequests: new Map(),
_currentState: STATE_SHUT_DOWN,
_requestTimeout: 0,
_requestTimeoutTimer: null,
@ -376,7 +370,7 @@ this.PushServiceWebSocket = {
}
if (shouldCancelPending) {
this._cancelRegisterRequests();
this._cancelPendingRequests();
}
if (this._notifyRequestQueue) {
@ -437,7 +431,7 @@ this.PushServiceWebSocket = {
/** Indicates whether we're waiting for pongs or requests. */
_hasPendingRequests() {
return this._lastPingTime > 0 || this._registerRequests.size > 0;
return this._lastPingTime > 0 || this._pendingRequests.size > 0;
},
/**
@ -622,7 +616,7 @@ this.PushServiceWebSocket = {
this._notifyRequestQueue();
this._notifyRequestQueue = null;
}
this._sendRegisterRequests();
this._sendPendingRequests();
};
function finishHandshake() {
@ -669,15 +663,10 @@ this.PushServiceWebSocket = {
*/
_handleRegisterReply: function(reply) {
console.debug("handleRegisterReply()");
if (typeof reply.channelID !== "string" ||
!this._registerRequests.has(reply.channelID)) {
return;
}
let tmp = this._registerRequests.get(reply.channelID);
this._registerRequests.delete(reply.channelID);
if (!this._hasPendingRequests()) {
this._requestTimeoutTimer.cancel();
let tmp = this._takeRequestForReply(reply);
if (!tmp) {
return;
}
if (reply.status == 200) {
@ -708,6 +697,18 @@ this.PushServiceWebSocket = {
}
},
_handleUnregisterReply(reply) {
console.debug("handleUnregisterReply()");
let request = this._takeRequestForReply(reply);
if (!request) {
return;
}
let success = reply.status === 200;
request.resolve(success);
},
_handleDataUpdate: function(update) {
let promise;
if (typeof update.channelID != "string") {
@ -845,9 +846,6 @@ this.PushServiceWebSocket = {
register(record) {
console.debug("register() ", record);
// start the timer since we now have at least one request
this._startRequestTimeoutTimer();
let data = {channelID: this._generateID(),
messageType: "register"};
@ -858,15 +856,7 @@ this.PushServiceWebSocket = {
});
}
return new Promise((resolve, reject) => {
this._registerRequests.set(data.channelID, {
record: record,
resolve: resolve,
reject: reject,
ctime: Date.now(),
});
this._queueRequest(data);
}).then(record => {
return this._sendRequestForReply(record, data).then(record => {
if (!this._dataEnabled) {
return record;
}
@ -883,15 +873,17 @@ this.PushServiceWebSocket = {
unregister(record, reason) {
console.debug("unregister() ", record, reason);
return Promise.resolve().then(_ => {
let code = kUNREGISTER_REASON_TO_CODE[reason];
if (!code) {
return Promise.reject(new Error('Invalid unregister reason'));
throw new Error('Invalid unregister reason');
}
let data = {channelID: record.channelID,
messageType: "unregister",
code: code};
this._queueRequest(data);
return Promise.resolve();
return this._sendRequestForReply(record, data);
});
},
_queueStart: Promise.resolve(),
@ -907,40 +899,68 @@ this.PushServiceWebSocket = {
.catch(_ => {});
},
/** Sends a request to the server. */
_send(data) {
if (this._currentState == STATE_READY) {
if (data.messageType != "register" ||
this._registerRequests.has(data.channelID)) {
// check if request has not been cancelled
if (this._currentState != STATE_READY) {
console.warn("send: Unexpected state; ignoring message",
this._currentState);
return;
}
if (!this._requestHasReply(data)) {
this._wsSendMessage(data);
return;
}
// If we're expecting a reply, check that we haven't cancelled the request.
let key = this._makePendingRequestKey(data);
if (!this._pendingRequests.has(key)) {
console.log("send: Request cancelled; ignoring message", key);
return;
}
this._wsSendMessage(data);
},
_sendRegisterRequests() {
/** Indicates whether a request has a corresponding reply from the server. */
_requestHasReply(data) {
return data.messageType == "register" || data.messageType == "unregister";
},
/**
* Sends all pending requests that expect replies. Called after the connection
* is established and the handshake is complete.
*/
_sendPendingRequests() {
this._enqueue(_ => {
for (let channelID of this._registerRequests.keys()) {
this._send({
messageType: "register",
channelID: channelID,
});
for (let request of this._pendingRequests.values()) {
this._send(request.data);
}
});
},
/** Queues an outgoing request, establishing a connection if necessary. */
_queueRequest(data) {
if (data.messageType != "register") {
if (this._currentState != STATE_READY && !this._notifyRequestQueue) {
console.debug("queueRequest()", data);
if (this._currentState == STATE_READY) {
// If we're ready, no need to queue; just send the request.
this._send(data);
return;
}
// Otherwise, we're still setting up. If we don't have a request queue,
// make one now.
if (!this._notifyRequestQueue) {
let promise = new Promise((resolve, reject) => {
this._notifyRequestQueue = resolve;
});
this._enqueue(_ => promise);
}
let isRequest = this._requestHasReply(data);
if (!isRequest) {
// Don't queue requests, since they're stored in `_pendingRequests`, and
// `_sendPendingRequests` will send them after reconnecting. Without this
// check, we'd send requests twice.
this._enqueue(_ => this._send(data));
} else if (this._currentState == STATE_READY) {
this._send(data);
}
if (!this._ws) {
@ -1059,7 +1079,7 @@ this.PushServiceWebSocket = {
// A whitelist of protocol handlers. Add to these if new messages are added
// in the protocol.
let handlers = ["Hello", "Register", "Notification"];
let handlers = ["Hello", "Register", "Unregister", "Notification"];
// Build up the handler name to call from messageType.
// e.g. messageType == "register" -> _handleRegisterReply.
@ -1105,11 +1125,58 @@ this.PushServiceWebSocket = {
/**
* Rejects all pending register requests with errors.
*/
_cancelRegisterRequests: function() {
for (let request of this._registerRequests.values()) {
request.reject(new Error("Register request aborted"));
_cancelPendingRequests() {
for (let request of this._pendingRequests.values()) {
request.reject(new Error("Request aborted"));
}
this._registerRequests.clear();
this._pendingRequests.clear();
},
/** Creates a case-insensitive map key for a request that expects a reply. */
_makePendingRequestKey(data) {
return (data.messageType + "|" + data.channelID).toLowerCase();
},
/** Sends a request and waits for a reply from the server. */
_sendRequestForReply(record, data) {
return Promise.resolve().then(_ => {
// start the timer since we now have at least one request
this._startRequestTimeoutTimer();
let key = this._makePendingRequestKey(data);
if (!this._pendingRequests.has(key)) {
let request = {
data: data,
record: record,
ctime: Date.now(),
};
request.promise = new Promise((resolve, reject) => {
request.resolve = resolve;
request.reject = reject;
});
this._pendingRequests.set(key, request);
this._queueRequest(data);
}
return this._pendingRequests.get(key).promise;
});
},
/** Removes and returns a pending request for a server reply. */
_takeRequestForReply(reply) {
if (typeof reply.channelID !== "string") {
return null;
}
let key = this._makePendingRequestKey(reply);
let request = this._pendingRequests.get(key);
if (!request) {
return null;
}
this._pendingRequests.delete(key);
if (!this._hasPendingRequests()) {
this._requestTimeoutTimer.cancel();
}
return request;
},
};

View File

@ -106,7 +106,11 @@
},
onUnregister(request) {
// Do nothing.
this.serverSendMsg(JSON.stringify({
messageType: "unregister",
channelID: request.channelID,
status: 200,
}));
},
onAck(request) {

View File

@ -406,6 +406,13 @@ var setUpServiceInParent = Task.async(function* (service, db) {
pushEndpoint: 'https://example.org/push/' + request.channelID,
}));
},
onUnregister(request) {
this.serverSendMsg(JSON.stringify({
messageType: 'unregister',
channelID: request.channelID,
status: 200,
}));
},
});
},
});

View File

@ -59,6 +59,11 @@ add_task(function* setup() {
delete unregisterDefers[request.channelID];
equal(request.code, 200,
'Expected manual unregister reason');
this.serverSendMsg(JSON.stringify({
messageType: 'unregister',
channelID: request.channelID,
status: 200,
}));
resolve();
},
});

View File

@ -63,6 +63,11 @@ add_task(function* setup() {
equal(request.code, 200,
'Expected manual unregister reason');
resolve();
this.serverSendMsg(JSON.stringify({
messageType: 'unregister',
status: 200,
channelID: request.channelID,
}));
},
});
},

View File

@ -115,6 +115,13 @@ add_task(function* setUp() {
uaid: userAgentID,
}));
},
onUnregister(request) {
this.serverSendMsg(JSON.stringify({
messageType: 'unregister',
channelID: request.channelID,
status: 200,
}));
},
});
},
});

View File

@ -120,6 +120,11 @@ add_task(function* setUp() {
equal(request.code, 202,
'Expected permission revoked unregister reason');
resolve();
this.serverSendMsg(JSON.stringify({
messageType: 'unregister',
status: 200,
channelID: request.channelID,
}));
},
onACK(request) {},
});

View File

@ -50,7 +50,8 @@ add_task(function* test_unregister_invalid_json() {
this.serverSendMsg(JSON.stringify({
messageType: 'hello',
status: 200,
uaid: userAgentID
uaid: userAgentID,
use_webpush: true,
}));
},
onUnregister(request) {
@ -61,21 +62,27 @@ add_task(function* test_unregister_invalid_json() {
}
});
// "unregister" is fire-and-forget: it's sent via _send(), not
// _sendRequest().
yield PushService.unregister({
yield rejects(
PushService.unregister({
scope: 'https://example.edu/page/1',
originAttributes: '',
});
}),
'Expected error for first invalid JSON response'
);
let record = yield db.getByKeyID(
'87902e90-c57e-4d18-8354-013f4a556559');
ok(!record, 'Failed to delete unregistered record');
yield PushService.unregister({
yield rejects(
PushService.unregister({
scope: 'https://example.net/page/1',
originAttributes: ChromeUtils.originAttributesToSuffix(
{ appId: Ci.nsIScriptSecurityManager.NO_APP_ID, inIsolatedMozBrowser: false }),
});
}),
'Expected error for second invalid JSON response'
);
record = yield db.getByKeyID(
'057caa8f-9b99-47ff-891c-adad18ce603e');
ok(!record,

View File

@ -5,11 +5,14 @@
const {PushDB, PushService, PushServiceWebSocket} = serviceExports;
const userAgentID = 'fbe865a6-aeb8-446f-873c-aeebdb8d493c';
const channelID = 'db0a7021-ec2d-4bd3-8802-7a6966f10ed8';
function run_test() {
do_get_profile();
setPrefs();
setPrefs({
userAgentID: userAgentID,
});
run_next_test();
}
@ -36,7 +39,8 @@ add_task(function* test_unregister_success() {
this.serverSendMsg(JSON.stringify({
messageType: 'hello',
status: 200,
uaid: 'fbe865a6-aeb8-446f-873c-aeebdb8d493c'
uaid: userAgentID,
use_webpush: true,
}));
},
onUnregister(request) {