gecko-dev/dom/push/PushServiceHttp2.jsm
Andrew McCreight 5dec0e0beb Bug 1432992, part 1 - Remove definitions of Ci, Cr, Cc, and Cu. r=florian
This patch was autogenerated by my decomponents.py

It covers almost every file with the extension js, jsm, html, py,
xhtml, or xul.

It removes blank lines after removed lines, when the removed lines are
preceded by either blank lines or the start of a new block. The "start
of a new block" is defined fairly hackily: either the line starts with
//, ends with */, ends with {, <![CDATA[, """ or '''. The first two
cover comments, the third one covers JS, the fourth covers JS embedded
in XUL, and the final two cover JS embedded in Python. This also
applies if the removed line was the first line of the file.

It covers the pattern matching cases like "var {classes: Cc,
interfaces: Ci, utils: Cu, results: Cr} = Components;". It'll remove
the entire thing if they are all either Ci, Cr, Cc or Cu, or it will
remove the appropriate ones and leave the residue behind. If there's
only one behind, then it will turn it into a normal, non-pattern
matching variable definition. (For instance, "const { classes: Cc,
Constructor: CC, interfaces: Ci, utils: Cu } = Components" becomes
"const CC = Components.Constructor".)

MozReview-Commit-ID: DeSHcClQ7cG

--HG--
extra : rebase_source : d9c41878036c1ef7766ef5e91a7005025bc1d72b
2018-02-06 09:36:57 -08:00

813 lines
24 KiB
JavaScript

/* jshint moz: true, esnext: true */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
const {PushDB} = ChromeUtils.import("resource://gre/modules/PushDB.jsm");
const {PushRecord} = ChromeUtils.import("resource://gre/modules/PushRecord.jsm");
ChromeUtils.import("resource://gre/modules/XPCOMUtils.jsm");
ChromeUtils.import("resource://gre/modules/Services.jsm");
ChromeUtils.import("resource://gre/modules/NetUtil.jsm");
ChromeUtils.import("resource://gre/modules/IndexedDBHelper.jsm");
ChromeUtils.import("resource://gre/modules/Timer.jsm");
const {
PushCrypto,
concatArray,
} = ChromeUtils.import("resource://gre/modules/PushCrypto.jsm");
this.EXPORTED_SYMBOLS = ["PushServiceHttp2"];
XPCOMUtils.defineLazyGetter(this, "console", () => {
let {ConsoleAPI} = ChromeUtils.import("resource://gre/modules/Console.jsm", {});
return new ConsoleAPI({
maxLogLevelPref: "dom.push.loglevel",
prefix: "PushServiceHttp2",
});
});
const prefs = Services.prefs.getBranch("dom.push.");
const kPUSHHTTP2DB_DB_NAME = "pushHttp2";
const kPUSHHTTP2DB_DB_VERSION = 5; // Change this if the IndexedDB format changes
const kPUSHHTTP2DB_STORE_NAME = "pushHttp2";
/**
* A proxy between the PushService and connections listening for incoming push
* messages. The PushService can silence messages from the connections by
* setting PushSubscriptionListener._pushService to null. This is required
* because it can happen that there is an outstanding push message that will
* be send on OnStopRequest but the PushService may not be interested in these.
* It's easier to stop listening than to have checks at specific points.
*/
var PushSubscriptionListener = function(pushService, uri) {
console.debug("PushSubscriptionListener()");
this._pushService = pushService;
this.uri = uri;
};
PushSubscriptionListener.prototype = {
QueryInterface: function (aIID) {
if (aIID.equals(Ci.nsIHttpPushListener) ||
aIID.equals(Ci.nsIStreamListener)) {
return this;
}
throw Components.results.NS_ERROR_NO_INTERFACE;
},
getInterface: function(aIID) {
return this.QueryInterface(aIID);
},
onStartRequest: function(aRequest, aContext) {
console.debug("PushSubscriptionListener: onStartRequest()");
// We do not do anything here.
},
onDataAvailable: function(aRequest, aContext, aStream, aOffset, aCount) {
console.debug("PushSubscriptionListener: onDataAvailable()");
// Nobody should send data, but just to be sure, otherwise necko will
// complain.
if (aCount === 0) {
return;
}
let inputStream = Cc["@mozilla.org/scriptableinputstream;1"]
.createInstance(Ci.nsIScriptableInputStream);
inputStream.init(aStream);
var data = inputStream.read(aCount);
},
onStopRequest: function(aRequest, aContext, aStatusCode) {
console.debug("PushSubscriptionListener: onStopRequest()");
if (!this._pushService) {
return;
}
this._pushService.connOnStop(aRequest,
Components.isSuccessCode(aStatusCode),
this.uri);
},
onPush: function(associatedChannel, pushChannel) {
console.debug("PushSubscriptionListener: onPush()");
var pushChannelListener = new PushChannelListener(this);
pushChannel.asyncOpen2(pushChannelListener);
},
disconnect: function() {
this._pushService = null;
}
};
/**
* The listener for pushed messages. The message data is collected in
* OnDataAvailable and send to the app in OnStopRequest.
*/
var PushChannelListener = function(pushSubscriptionListener) {
console.debug("PushChannelListener()");
this._mainListener = pushSubscriptionListener;
this._message = [];
this._ackUri = null;
};
PushChannelListener.prototype = {
onStartRequest: function(aRequest, aContext) {
this._ackUri = aRequest.URI.spec;
},
onDataAvailable: function(aRequest, aContext, aStream, aOffset, aCount) {
console.debug("PushChannelListener: onDataAvailable()");
if (aCount === 0) {
return;
}
let inputStream = Cc["@mozilla.org/binaryinputstream;1"]
.createInstance(Ci.nsIBinaryInputStream);
inputStream.setInputStream(aStream);
let chunk = new ArrayBuffer(aCount);
inputStream.readArrayBuffer(aCount, chunk);
this._message.push(chunk);
},
onStopRequest: function(aRequest, aContext, aStatusCode) {
console.debug("PushChannelListener: onStopRequest()", "status code",
aStatusCode);
if (Components.isSuccessCode(aStatusCode) &&
this._mainListener &&
this._mainListener._pushService) {
let headers = {
encryption_key: getHeaderField(aRequest, "Encryption-Key"),
crypto_key: getHeaderField(aRequest, "Crypto-Key"),
encryption: getHeaderField(aRequest, "Encryption"),
encoding: getHeaderField(aRequest, "Content-Encoding"),
};
let msg = concatArray(this._message);
this._mainListener._pushService._pushChannelOnStop(this._mainListener.uri,
this._ackUri,
headers,
msg);
}
}
};
function getHeaderField(aRequest, name) {
try {
return aRequest.getRequestHeader(name);
} catch(e) {
// getRequestHeader can throw.
return null;
}
}
var PushServiceDelete = function(resolve, reject) {
this._resolve = resolve;
this._reject = reject;
};
PushServiceDelete.prototype = {
onStartRequest: function(aRequest, aContext) {},
onDataAvailable: function(aRequest, aContext, aStream, aOffset, aCount) {
// Nobody should send data, but just to be sure, otherwise necko will
// complain.
if (aCount === 0) {
return;
}
let inputStream = Cc["@mozilla.org/scriptableinputstream;1"]
.createInstance(Ci.nsIScriptableInputStream);
inputStream.init(aStream);
var data = inputStream.read(aCount);
},
onStopRequest: function(aRequest, aContext, aStatusCode) {
if (Components.isSuccessCode(aStatusCode)) {
this._resolve();
} else {
this._reject(new Error("Error removing subscription: " + aStatusCode));
}
}
};
var SubscriptionListener = function(aSubInfo, aResolve, aReject,
aServerURI, aPushServiceHttp2) {
console.debug("SubscriptionListener()");
this._subInfo = aSubInfo;
this._resolve = aResolve;
this._reject = aReject;
this._data = '';
this._serverURI = aServerURI;
this._service = aPushServiceHttp2;
this._ctime = Date.now();
this._retryTimeoutID = null;
};
SubscriptionListener.prototype = {
onStartRequest: function(aRequest, aContext) {},
onDataAvailable: function(aRequest, aContext, aStream, aOffset, aCount) {
console.debug("SubscriptionListener: onDataAvailable()");
// We do not expect any data, but necko will complain if we do not consume
// it.
if (aCount === 0) {
return;
}
let inputStream = Cc["@mozilla.org/scriptableinputstream;1"]
.createInstance(Ci.nsIScriptableInputStream);
inputStream.init(aStream);
this._data.concat(inputStream.read(aCount));
},
onStopRequest: function(aRequest, aContext, aStatus) {
console.debug("SubscriptionListener: onStopRequest()");
// Check if pushService is still active.
if (!this._service.hasmainPushService()) {
this._reject(new Error("Push service unavailable"));
return;
}
if (!Components.isSuccessCode(aStatus)) {
this._reject(new Error("Error listening for messages: " + aStatus));
return;
}
var statusCode = aRequest.QueryInterface(Ci.nsIHttpChannel).responseStatus;
if (Math.floor(statusCode / 100) == 5) {
if (this._subInfo.retries < prefs.getIntPref("http2.maxRetries")) {
this._subInfo.retries++;
var retryAfter = retryAfterParser(aRequest);
this._retryTimeoutID = setTimeout(_ =>
{
this._reject(
{
retry: true,
subInfo: this._subInfo
});
this._service.removeListenerPendingRetry(this);
this._retryTimeoutID = null;
}, retryAfter);
this._service.addListenerPendingRetry(this);
} else {
this._reject(new Error("Unexpected server response: " + statusCode));
}
return;
} else if (statusCode != 201) {
this._reject(new Error("Unexpected server response: " + statusCode));
return;
}
var subscriptionUri;
try {
subscriptionUri = aRequest.getResponseHeader("location");
} catch (err) {
this._reject(new Error("Missing Location header"));
return;
}
console.debug("onStopRequest: subscriptionUri", subscriptionUri);
var linkList;
try {
linkList = aRequest.getResponseHeader("link");
} catch (err) {
this._reject(new Error("Missing Link header"));
return;
}
var linkParserResult;
try {
linkParserResult = linkParser(linkList, this._serverURI);
} catch (e) {
this._reject(e);
return;
}
if (!subscriptionUri) {
this._reject(new Error("Invalid Location header"));
return;
}
try {
let uriTry = Services.io.newURI(subscriptionUri);
} catch (e) {
console.error("onStopRequest: Invalid subscription URI",
subscriptionUri);
this._reject(new Error("Invalid subscription endpoint: " +
subscriptionUri));
return;
}
let reply = new PushRecordHttp2({
subscriptionUri: subscriptionUri,
pushEndpoint: linkParserResult.pushEndpoint,
pushReceiptEndpoint: linkParserResult.pushReceiptEndpoint,
scope: this._subInfo.record.scope,
originAttributes: this._subInfo.record.originAttributes,
systemRecord: this._subInfo.record.systemRecord,
appServerKey: this._subInfo.record.appServerKey,
ctime: Date.now(),
});
this._resolve(reply);
},
abortRetry: function() {
if (this._retryTimeoutID != null) {
clearTimeout(this._retryTimeoutID);
this._retryTimeoutID = null;
} else {
console.debug("SubscriptionListener.abortRetry: aborting non-existent retry?");
}
},
};
function retryAfterParser(aRequest) {
var retryAfter = 0;
try {
var retryField = aRequest.getResponseHeader("retry-after");
if (isNaN(retryField)) {
retryAfter = Date.parse(retryField) - (new Date().getTime());
} else {
retryAfter = parseInt(retryField, 10) * 1000;
}
retryAfter = (retryAfter > 0) ? retryAfter : 0;
} catch(e) {}
return retryAfter;
}
function linkParser(linkHeader, serverURI) {
var linkList = linkHeader.split(',');
if ((linkList.length < 1)) {
throw new Error("Invalid Link header");
}
var pushEndpoint;
var pushReceiptEndpoint;
linkList.forEach(link => {
var linkElems = link.split(';');
if (linkElems.length == 2) {
if (linkElems[1].trim() === 'rel="urn:ietf:params:push"') {
pushEndpoint = linkElems[0].substring(linkElems[0].indexOf('<') + 1,
linkElems[0].indexOf('>'));
} else if (linkElems[1].trim() === 'rel="urn:ietf:params:push:receipt"') {
pushReceiptEndpoint = linkElems[0].substring(linkElems[0].indexOf('<') + 1,
linkElems[0].indexOf('>'));
}
}
});
console.debug("linkParser: pushEndpoint", pushEndpoint);
console.debug("linkParser: pushReceiptEndpoint", pushReceiptEndpoint);
// Missing pushReceiptEndpoint is allowed.
if (!pushEndpoint) {
throw new Error("Missing push endpoint");
}
var pushURI = Services.io.newURI(pushEndpoint, null, serverURI);
var pushReceiptURI;
if (pushReceiptEndpoint) {
pushReceiptURI = Services.io.newURI(pushReceiptEndpoint, null,
serverURI);
}
return {
pushEndpoint: pushURI.spec,
pushReceiptEndpoint: (pushReceiptURI) ? pushReceiptURI.spec : "",
};
}
/**
* The implementation of the WebPush.
*/
this.PushServiceHttp2 = {
_mainPushService: null,
_serverURI: null,
// Keep information about all connections, e.g. the channel, listener...
_conns: {},
_started: false,
// Set of SubscriptionListeners that are pending a subscription retry attempt.
_listenersPendingRetry: new Set(),
newPushDB: function() {
return new PushDB(kPUSHHTTP2DB_DB_NAME,
kPUSHHTTP2DB_DB_VERSION,
kPUSHHTTP2DB_STORE_NAME,
"subscriptionUri",
PushRecordHttp2);
},
hasmainPushService: function() {
return this._mainPushService !== null;
},
validServerURI: function(serverURI) {
if (serverURI.scheme == "http") {
return !!prefs.getBoolPref("testing.allowInsecureServerURL", false);
}
return serverURI.scheme == "https";
},
connect: function(subscriptions) {
this.startConnections(subscriptions);
},
isConnected: function() {
return this._mainPushService != null;
},
disconnect: function() {
this._shutdownConnections(false);
},
_makeChannel: function(aUri) {
var chan = NetUtil.newChannel({uri: aUri, loadUsingSystemPrincipal: true})
.QueryInterface(Ci.nsIHttpChannel);
var loadGroup = Cc["@mozilla.org/network/load-group;1"]
.createInstance(Ci.nsILoadGroup);
chan.loadGroup = loadGroup;
return chan;
},
/**
* Subscribe new resource.
*/
register: function(aRecord) {
console.debug("subscribeResource()");
return this._subscribeResourceInternal({
record: aRecord,
retries: 0
})
.then(result =>
PushCrypto.generateKeys()
.then(([publicKey, privateKey]) => {
result.p256dhPublicKey = publicKey;
result.p256dhPrivateKey = privateKey;
result.authenticationSecret = PushCrypto.generateAuthenticationSecret();
this._conns[result.subscriptionUri] = {
channel: null,
listener: null,
countUnableToConnect: 0,
lastStartListening: 0,
retryTimerID: 0,
};
this._listenForMsgs(result.subscriptionUri);
return result;
})
);
},
_subscribeResourceInternal: function(aSubInfo) {
console.debug("subscribeResourceInternal()");
return new Promise((resolve, reject) => {
var listener = new SubscriptionListener(aSubInfo,
resolve,
reject,
this._serverURI,
this);
var chan = this._makeChannel(this._serverURI.spec);
chan.requestMethod = "POST";
chan.asyncOpen2(listener);
})
.catch(err => {
if ("retry" in err) {
return this._subscribeResourceInternal(err.subInfo);
} else {
throw err;
}
})
},
_deleteResource: function(aUri) {
return new Promise((resolve,reject) => {
var chan = this._makeChannel(aUri);
chan.requestMethod = "DELETE";
chan.asyncOpen2(new PushServiceDelete(resolve, reject));
});
},
/**
* Unsubscribe the resource with a subscription uri aSubscriptionUri.
* We can't do anything about it if it fails, so we don't listen for response.
*/
_unsubscribeResource: function(aSubscriptionUri) {
console.debug("unsubscribeResource()");
return this._deleteResource(aSubscriptionUri);
},
/**
* Start listening for messages.
*/
_listenForMsgs: function(aSubscriptionUri) {
console.debug("listenForMsgs()", aSubscriptionUri);
if (!this._conns[aSubscriptionUri]) {
console.warn("listenForMsgs: We do not have this subscription",
aSubscriptionUri);
return;
}
var chan = this._makeChannel(aSubscriptionUri);
var conn = {};
conn.channel = chan;
var listener = new PushSubscriptionListener(this, aSubscriptionUri);
conn.listener = listener;
chan.notificationCallbacks = listener;
try {
chan.asyncOpen2(listener);
} catch (e) {
console.error("listenForMsgs: Error connecting to push server.",
"asyncOpen2 failed", e);
conn.listener.disconnect();
chan.cancel(Cr.NS_ERROR_ABORT);
this._retryAfterBackoff(aSubscriptionUri, -1);
return;
}
this._conns[aSubscriptionUri].lastStartListening = Date.now();
this._conns[aSubscriptionUri].channel = conn.channel;
this._conns[aSubscriptionUri].listener = conn.listener;
},
_ackMsgRecv: function(aAckUri) {
console.debug("ackMsgRecv()", aAckUri);
return this._deleteResource(aAckUri);
},
init: function(aOptions, aMainPushService, aServerURL) {
console.debug("init()");
this._mainPushService = aMainPushService;
this._serverURI = aServerURL;
return Promise.resolve();
},
_retryAfterBackoff: function(aSubscriptionUri, retryAfter) {
console.debug("retryAfterBackoff()");
var resetRetryCount = prefs.getIntPref("http2.reset_retry_count_after_ms");
// If it was running for some time, reset retry counter.
if ((Date.now() - this._conns[aSubscriptionUri].lastStartListening) >
resetRetryCount) {
this._conns[aSubscriptionUri].countUnableToConnect = 0;
}
let maxRetries = prefs.getIntPref("http2.maxRetries");
if (this._conns[aSubscriptionUri].countUnableToConnect >= maxRetries) {
this._shutdownSubscription(aSubscriptionUri);
this._resubscribe(aSubscriptionUri);
return;
}
if (retryAfter !== -1) {
// This is a 5xx response.
this._conns[aSubscriptionUri].countUnableToConnect++;
this._conns[aSubscriptionUri].retryTimerID =
setTimeout(_ => this._listenForMsgs(aSubscriptionUri), retryAfter);
return;
}
retryAfter = prefs.getIntPref("http2.retryInterval") *
Math.pow(2, this._conns[aSubscriptionUri].countUnableToConnect);
retryAfter = retryAfter * (0.8 + Math.random() * 0.4); // add +/-20%.
this._conns[aSubscriptionUri].countUnableToConnect++;
this._conns[aSubscriptionUri].retryTimerID =
setTimeout(_ => this._listenForMsgs(aSubscriptionUri), retryAfter);
console.debug("retryAfterBackoff: Retry in", retryAfter);
},
// Close connections.
_shutdownConnections: function(deleteInfo) {
console.debug("shutdownConnections()");
for (let subscriptionUri in this._conns) {
if (this._conns[subscriptionUri]) {
if (this._conns[subscriptionUri].listener) {
this._conns[subscriptionUri].listener._pushService = null;
}
if (this._conns[subscriptionUri].channel) {
try {
this._conns[subscriptionUri].channel.cancel(Cr.NS_ERROR_ABORT);
} catch (e) {}
}
this._conns[subscriptionUri].listener = null;
this._conns[subscriptionUri].channel = null;
if (this._conns[subscriptionUri].retryTimerID > 0) {
clearTimeout(this._conns[subscriptionUri].retryTimerID);
}
if (deleteInfo) {
delete this._conns[subscriptionUri];
}
}
}
},
// Start listening if subscriptions present.
startConnections: function(aSubscriptions) {
console.debug("startConnections()", aSubscriptions.length);
for (let i = 0; i < aSubscriptions.length; i++) {
let record = aSubscriptions[i];
this._mainPushService.ensureCrypto(record).then(record => {
this._startSingleConnection(record);
}, error => {
console.error("startConnections: Error updating record",
record.keyID, error);
});
}
},
_startSingleConnection: function(record) {
console.debug("_startSingleConnection()");
if (typeof this._conns[record.subscriptionUri] != "object") {
this._conns[record.subscriptionUri] = {channel: null,
listener: null,
countUnableToConnect: 0,
retryTimerID: 0};
}
if (!this._conns[record.subscriptionUri].conn) {
this._listenForMsgs(record.subscriptionUri);
}
},
// Close connection and notify apps that subscription are gone.
_shutdownSubscription: function(aSubscriptionUri) {
console.debug("shutdownSubscriptions()");
if (typeof this._conns[aSubscriptionUri] == "object") {
if (this._conns[aSubscriptionUri].listener) {
this._conns[aSubscriptionUri].listener._pushService = null;
}
if (this._conns[aSubscriptionUri].channel) {
try {
this._conns[aSubscriptionUri].channel.cancel(Cr.NS_ERROR_ABORT);
} catch (e) {}
}
delete this._conns[aSubscriptionUri];
}
},
uninit: function() {
console.debug("uninit()");
this._abortPendingSubscriptionRetries();
this._shutdownConnections(true);
this._mainPushService = null;
},
_abortPendingSubscriptionRetries: function() {
this._listenersPendingRetry.forEach((listener) => listener.abortRetry());
this._listenersPendingRetry.clear();
},
unregister: function(aRecord) {
this._shutdownSubscription(aRecord.subscriptionUri);
return this._unsubscribeResource(aRecord.subscriptionUri);
},
reportDeliveryError: function(messageID, reason) {
console.warn("reportDeliveryError: Ignoring message delivery error",
messageID, reason);
},
/** Push server has deleted subscription.
* Re-subscribe - if it succeeds send update db record and send
* pushsubscriptionchange,
* - on error delete record and send pushsubscriptionchange
* TODO: maybe pushsubscriptionerror will be included.
*/
_resubscribe: function(aSubscriptionUri) {
this._mainPushService.getByKeyID(aSubscriptionUri)
.then(record => this.register(record)
.then(recordNew => {
if (this._mainPushService) {
this._mainPushService
.updateRegistrationAndNotifyApp(aSubscriptionUri, recordNew)
.catch(Cu.reportError);
}
}, error => {
if (this._mainPushService) {
this._mainPushService
.dropRegistrationAndNotifyApp(aSubscriptionUri)
.catch(Cu.reportError);
}
})
);
},
connOnStop: function(aRequest, aSuccess,
aSubscriptionUri) {
console.debug("connOnStop() succeeded", aSuccess);
var conn = this._conns[aSubscriptionUri];
if (!conn) {
// there is no connection description that means that we closed
// connection, so do nothing. But we should have already deleted
// the listener.
return;
}
conn.channel = null;
conn.listener = null;
if (!aSuccess) {
this._retryAfterBackoff(aSubscriptionUri, -1);
} else if (Math.floor(aRequest.responseStatus / 100) == 5) {
var retryAfter = retryAfterParser(aRequest);
this._retryAfterBackoff(aSubscriptionUri, retryAfter);
} else if (Math.floor(aRequest.responseStatus / 100) == 4) {
this._shutdownSubscription(aSubscriptionUri);
this._resubscribe(aSubscriptionUri);
} else if (Math.floor(aRequest.responseStatus / 100) == 2) { // This should be 204
setTimeout(_ => this._listenForMsgs(aSubscriptionUri), 0);
} else {
this._retryAfterBackoff(aSubscriptionUri, -1);
}
},
addListenerPendingRetry: function(aListener) {
this._listenersPendingRetry.add(aListener);
},
removeListenerPendingRetry: function(aListener) {
if (!this._listenersPendingRetry.remove(aListener)) {
console.debug("removeListenerPendingRetry: listener not in list?");
}
},
_pushChannelOnStop: function(aUri, aAckUri, aHeaders, aMessage) {
console.debug("pushChannelOnStop()");
this._mainPushService.receivedPushMessage(
aUri, "", aHeaders, aMessage, record => {
// Always update the stored record.
return record;
}
)
.then(_ => this._ackMsgRecv(aAckUri))
.catch(err => {
console.error("pushChannelOnStop: Error receiving message",
err);
});
},
};
function PushRecordHttp2(record) {
PushRecord.call(this, record);
this.subscriptionUri = record.subscriptionUri;
this.pushReceiptEndpoint = record.pushReceiptEndpoint;
}
PushRecordHttp2.prototype = Object.create(PushRecord.prototype, {
keyID: {
get() {
return this.subscriptionUri;
},
},
});
PushRecordHttp2.prototype.toSubscription = function() {
let subscription = PushRecord.prototype.toSubscription.call(this);
subscription.pushReceiptEndpoint = this.pushReceiptEndpoint;
return subscription;
};