mirror of
https://github.com/mozilla/gecko-dev.git
synced 2024-11-30 00:01:50 +00:00
Bug 1120380 - Update the retention logic for archived pings. r=gfritzsche
This commit is contained in:
parent
ef942e6027
commit
2a044e1348
@ -60,13 +60,6 @@ this.TelemetryArchive = {
|
||||
promiseArchivePing: function(ping) {
|
||||
return TelemetryArchiveImpl.promiseArchivePing(ping);
|
||||
},
|
||||
|
||||
/**
|
||||
* Used in tests only to fake a restart of the module.
|
||||
*/
|
||||
_testReset: function() {
|
||||
TelemetryArchiveImpl._testReset();
|
||||
},
|
||||
};
|
||||
|
||||
/**
|
||||
@ -81,12 +74,6 @@ function shouldArchivePings() {
|
||||
let TelemetryArchiveImpl = {
|
||||
_logger: null,
|
||||
|
||||
// Tracks the archived pings in a Map of (id -> {timestampCreated, type}).
|
||||
// We use this to cache info on archived pings to avoid scanning the disk more than once.
|
||||
_archivedPings: new Map(),
|
||||
// Whether we already scanned the archived pings on disk.
|
||||
_scannedArchiveDirectory: false,
|
||||
|
||||
get _log() {
|
||||
if (!this._logger) {
|
||||
this._logger = Log.repository.getLoggerWithMessagePrefix(LOGGER_NAME, LOGGER_PREFIX);
|
||||
@ -95,11 +82,6 @@ let TelemetryArchiveImpl = {
|
||||
return this._logger;
|
||||
},
|
||||
|
||||
_testReset: function() {
|
||||
this._archivedPings = new Map();
|
||||
this._scannedArchiveDirectory = false;
|
||||
},
|
||||
|
||||
promiseArchivePing: function(ping) {
|
||||
if (!shouldArchivePings()) {
|
||||
this._log.trace("promiseArchivePing - archiving is disabled");
|
||||
@ -113,27 +95,11 @@ let TelemetryArchiveImpl = {
|
||||
}
|
||||
}
|
||||
|
||||
const creationDate = new Date(ping.creationDate);
|
||||
if (this._archivedPings.has(ping.id)) {
|
||||
const data = this._archivedPings.get(ping.id);
|
||||
if (data.timestampCreated > creationDate.getTime()) {
|
||||
this._log.error("promiseArchivePing - trying to overwrite newer ping with the same id");
|
||||
return Promise.reject(new Error("trying to overwrite newer ping with the same id"));
|
||||
} else {
|
||||
this._log.warn("promiseArchivePing - overwriting older ping with the same id");
|
||||
}
|
||||
}
|
||||
|
||||
this._archivedPings.set(ping.id, {
|
||||
timestampCreated: creationDate.getTime(),
|
||||
type: ping.type,
|
||||
});
|
||||
|
||||
return TelemetryStorage.saveArchivedPing(ping);
|
||||
},
|
||||
|
||||
_buildArchivedPingList: function() {
|
||||
let list = [for (p of this._archivedPings) {
|
||||
_buildArchivedPingList: function(archivedPingsMap) {
|
||||
let list = [for (p of archivedPingsMap) {
|
||||
id: p[0],
|
||||
timestampCreated: p[1].timestampCreated,
|
||||
type: p[1].type,
|
||||
@ -147,33 +113,13 @@ let TelemetryArchiveImpl = {
|
||||
promiseArchivedPingList: function() {
|
||||
this._log.trace("promiseArchivedPingList");
|
||||
|
||||
if (this._scannedArchiveDirectory) {
|
||||
return Promise.resolve(this._buildArchivedPingList())
|
||||
}
|
||||
|
||||
return TelemetryStorage.loadArchivedPingList().then((loadedInfo) => {
|
||||
// Add the ping info from scanning to the existing info.
|
||||
// We might have pings added before lazily loading this list.
|
||||
for (let [id, info] of loadedInfo) {
|
||||
this._archivedPings.set(id, {
|
||||
timestampCreated: info.timestampCreated,
|
||||
type: info.type,
|
||||
});
|
||||
}
|
||||
|
||||
this._scannedArchiveDirectory = true;
|
||||
return this._buildArchivedPingList();
|
||||
return TelemetryStorage.loadArchivedPingList().then(loadedInfo => {
|
||||
return this._buildArchivedPingList(loadedInfo);
|
||||
});
|
||||
},
|
||||
|
||||
promiseArchivedPingById: function(id) {
|
||||
this._log.trace("promiseArchivedPingById - id: " + id);
|
||||
const data = this._archivedPings.get(id);
|
||||
if (!data) {
|
||||
this._log.trace("promiseArchivedPingById - no ping with id: " + id);
|
||||
return Promise.reject(new Error("TelemetryArchive.promiseArchivedPingById - no ping with id " + id));
|
||||
}
|
||||
|
||||
return TelemetryStorage.loadArchivedPing(id, data.timestampCreated, data.type);
|
||||
return TelemetryStorage.loadArchivedPing(id);
|
||||
},
|
||||
};
|
||||
|
@ -174,6 +174,7 @@ this.TelemetryController = Object.freeze({
|
||||
*/
|
||||
reset: function() {
|
||||
Impl._clientID = null;
|
||||
TelemetryStorage.reset();
|
||||
return this.setup();
|
||||
},
|
||||
/**
|
||||
@ -1067,6 +1068,10 @@ let Impl = {
|
||||
this._clientID = yield ClientID.getClientID();
|
||||
Preferences.set(PREF_CACHED_CLIENTID, this._clientID);
|
||||
|
||||
// Purge the pings archive by removing outdated pings. We don't wait for this
|
||||
// task to complete, but TelemetryStorage blocks on it during shutdown.
|
||||
TelemetryStorage.runCleanPingArchiveTask();
|
||||
|
||||
Telemetry.asyncFetchTelemetryData(function () {});
|
||||
this._delayedInitTaskDeferred.resolve();
|
||||
} catch (e) {
|
||||
|
@ -17,6 +17,7 @@ Cu.import("resource://gre/modules/Services.jsm", this);
|
||||
Cu.import("resource://gre/modules/XPCOMUtils.jsm", this);
|
||||
Cu.import("resource://gre/modules/osfile.jsm", this);
|
||||
Cu.import("resource://gre/modules/Task.jsm", this);
|
||||
Cu.import("resource://gre/modules/TelemetryUtils.jsm", this);
|
||||
Cu.import("resource://gre/modules/Promise.jsm", this);
|
||||
|
||||
XPCOMUtils.defineLazyModuleGetter(this, 'Deprecated',
|
||||
@ -52,6 +53,9 @@ const OVERDUE_PING_FILE_AGE = 7 * 24 * 60 * 60 * 1000; // 1 week
|
||||
// Maximum number of pings to save.
|
||||
const MAX_LRU_PINGS = 50;
|
||||
|
||||
// Maxmimum time, in milliseconds, archive pings should be retained.
|
||||
const MAX_ARCHIVED_PINGS_RETENTION_MS = 180 * 24 * 60 * 60 * 1000; // 180 days
|
||||
|
||||
// The number of outstanding saved pings that we have issued loading
|
||||
// requests for.
|
||||
let pingsLoaded = 0;
|
||||
@ -69,6 +73,13 @@ let pendingPings = [];
|
||||
|
||||
let isPingDirectoryCreated = false;
|
||||
|
||||
/**
|
||||
* This is a policy object used to override behavior for testing.
|
||||
*/
|
||||
let Policy = {
|
||||
now: () => new Date(),
|
||||
};
|
||||
|
||||
this.TelemetryStorage = {
|
||||
get MAX_PING_FILE_AGE() {
|
||||
return MAX_PING_FILE_AGE;
|
||||
@ -109,12 +120,34 @@ this.TelemetryStorage = {
|
||||
* Load an archived ping from disk.
|
||||
*
|
||||
* @param {string} id The pings id.
|
||||
* @param {number} timestampCreated The pings creation timestamp.
|
||||
* @param {string} type The pings type.
|
||||
* @return {promise<object>} Promise that is resolved with the ping data.
|
||||
*/
|
||||
loadArchivedPing: function(id, timestampCreated, type) {
|
||||
return TelemetryStorageImpl.loadArchivedPing(id, timestampCreated, type);
|
||||
loadArchivedPing: function(id) {
|
||||
return TelemetryStorageImpl.loadArchivedPing(id);
|
||||
},
|
||||
|
||||
/**
|
||||
* Clean the pings archive by removing old pings.
|
||||
* This will scan the archive directory.
|
||||
*
|
||||
* @return {Promise} Resolved when the cleanup task completes.
|
||||
*/
|
||||
runCleanPingArchiveTask: function() {
|
||||
return TelemetryStorageImpl.runCleanPingArchiveTask();
|
||||
},
|
||||
|
||||
/**
|
||||
* Reset the storage state in tests.
|
||||
*/
|
||||
reset: function() {
|
||||
return TelemetryStorageImpl.reset();
|
||||
},
|
||||
|
||||
/**
|
||||
* Test method that allows waiting on the archive clean task to finish.
|
||||
*/
|
||||
testCleanupTaskPromise: function() {
|
||||
return (TelemetryStorageImpl._archiveCleanTask || Promise.resolve());
|
||||
},
|
||||
|
||||
/**
|
||||
@ -398,6 +431,19 @@ let TelemetryStorageImpl = {
|
||||
// Used to serialize aborted session ping writes to disk.
|
||||
_abortedSessionSerializer: new SaveSerializer(),
|
||||
|
||||
// Tracks the archived pings in a Map of (id -> {timestampCreated, type}).
|
||||
// We use this to cache info on archived pings to avoid scanning the disk more than once.
|
||||
_archivedPings: new Map(),
|
||||
// Track the archive loading task to prevent multiple tasks from being executed.
|
||||
_archiveCleanTaskArchiveLoadingTask: null,
|
||||
// Track the archive cleanup task.
|
||||
_archiveCleanTask: null,
|
||||
// Whether we already scanned the archived pings on disk.
|
||||
_scannedArchiveDirectory: false,
|
||||
|
||||
// Track the shutdown process to bail out of the clean up task quickly.
|
||||
_shutdown: false,
|
||||
|
||||
get _log() {
|
||||
if (!this._logger) {
|
||||
this._logger = Log.repository.getLoggerWithMessagePrefix(LOGGER_NAME, LOGGER_PREFIX);
|
||||
@ -412,7 +458,11 @@ let TelemetryStorageImpl = {
|
||||
* @return {Promise} Promise that is resolved when shutdown is complete.
|
||||
*/
|
||||
shutdown: Task.async(function*() {
|
||||
this._shutdown = true;
|
||||
yield this._abortedSessionSerializer.flushTasks();
|
||||
// If the archive cleaning task is running, block on it. It should bail out as soon
|
||||
// as possible.
|
||||
yield this._archiveCleanTask;
|
||||
}),
|
||||
|
||||
/**
|
||||
@ -423,24 +473,44 @@ let TelemetryStorageImpl = {
|
||||
*/
|
||||
saveArchivedPing: Task.async(function*(ping) {
|
||||
const creationDate = new Date(ping.creationDate);
|
||||
if (this._archivedPings.has(ping.id)) {
|
||||
const data = this._archivedPings.get(ping.id);
|
||||
if (data.timestampCreated > creationDate.getTime()) {
|
||||
this._log.error("saveArchivedPing - trying to overwrite newer ping with the same id");
|
||||
return Promise.reject(new Error("trying to overwrite newer ping with the same id"));
|
||||
} else {
|
||||
this._log.warn("saveArchivedPing - overwriting older ping with the same id");
|
||||
}
|
||||
}
|
||||
|
||||
// Get the archived ping path and append the lz4 suffix to it (so we have 'jsonlz4').
|
||||
const filePath = getArchivedPingPath(ping.id, creationDate, ping.type) + "lz4";
|
||||
yield OS.File.makeDir(OS.Path.dirname(filePath), { ignoreExisting: true,
|
||||
from: OS.Constants.Path.profileDir });
|
||||
yield this.savePingToFile(ping, filePath, /*overwrite*/ true, /*compressed*/ true);
|
||||
|
||||
this._archivedPings.set(ping.id, {
|
||||
timestampCreated: creationDate.getTime(),
|
||||
type: ping.type,
|
||||
});
|
||||
}),
|
||||
|
||||
/**
|
||||
* Load an archived ping from disk.
|
||||
*
|
||||
* @param {string} id The pings id.
|
||||
* @param {number} timestampCreated The pings creation timestamp.
|
||||
* @param {string} type The pings type.
|
||||
* @return {promise<object>} Promise that is resolved with the ping data.
|
||||
*/
|
||||
loadArchivedPing: Task.async(function*(id, timestampCreated, type) {
|
||||
this._log.trace("loadArchivedPing - id: " + id + ", timestampCreated: " + timestampCreated + ", type: " + type);
|
||||
const path = getArchivedPingPath(id, new Date(timestampCreated), type);
|
||||
loadArchivedPing: Task.async(function*(id) {
|
||||
this._log.trace("loadArchivedPing - id: " + id);
|
||||
|
||||
const data = this._archivedPings.get(id);
|
||||
if (!data) {
|
||||
this._log.trace("loadArchivedPing - no ping with id: " + id);
|
||||
return Promise.reject(new Error("TelemetryStorage.loadArchivedPing - no ping with id " + id));
|
||||
}
|
||||
|
||||
const path = getArchivedPingPath(id, new Date(data.timestampCreated), data.type);
|
||||
const pathCompressed = path + "lz4";
|
||||
|
||||
try {
|
||||
@ -472,6 +542,95 @@ let TelemetryStorageImpl = {
|
||||
yield OS.File.remove(pathCompressed, {ignoreAbsent: true});
|
||||
}),
|
||||
|
||||
/**
|
||||
* Clean the pings archive by removing old pings.
|
||||
*
|
||||
* @return {Promise} Resolved when the cleanup task completes.
|
||||
*/
|
||||
runCleanPingArchiveTask: function() {
|
||||
// If there's an archive cleaning task already running, return it.
|
||||
if (this._archiveCleanTask) {
|
||||
return this._archiveCleanTask;
|
||||
}
|
||||
|
||||
// Make sure to clear |_archiveCleanTask| once done.
|
||||
let clear = () => this._archiveCleanTask = null;
|
||||
// Since there's no archive cleaning task running, start it.
|
||||
this._archiveCleanTask = this.cleanArchiveTask().then(clear, clear);
|
||||
return this._archiveCleanTask;
|
||||
},
|
||||
|
||||
cleanArchiveTask: Task.async(function*() {
|
||||
this._log.trace("cleanArchiveTask");
|
||||
|
||||
if (!(yield OS.File.exists(gPingsArchivePath))) {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Policy.now().getTime();
|
||||
let dirIterator = new OS.File.DirectoryIterator(gPingsArchivePath);
|
||||
let subdirs = (yield dirIterator.nextBatch()).filter(e => e.isDir);
|
||||
|
||||
// Keep track of the newest removed month to update the cache, if needed.
|
||||
let newestRemovedMonth = null;
|
||||
|
||||
// Walk through the monthly subdirs of the form <YYYY-MM>/
|
||||
for (let dir of subdirs) {
|
||||
if (this._shutdown) {
|
||||
this._log.trace("cleanArchiveTask - Terminating the clean up task due to shutdown");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!isValidArchiveDir(dir.name)) {
|
||||
this._log.warn("cleanArchiveTask - skipping invalidly named subdirectory " + dir.path);
|
||||
continue;
|
||||
}
|
||||
|
||||
const archiveDate = getDateFromArchiveDir(dir.name);
|
||||
if (!archiveDate) {
|
||||
this._log.warn("cleanArchiveTask - skipping invalid subdirectory date " + dir.path);
|
||||
continue;
|
||||
}
|
||||
|
||||
// If this archive directory is older than 180 days, remove it.
|
||||
if (!TelemetryUtils.areTimesClose(archiveDate.getTime(), now,
|
||||
MAX_ARCHIVED_PINGS_RETENTION_MS)) {
|
||||
try {
|
||||
yield OS.File.removeDir(dir.path);
|
||||
|
||||
// Update the newest removed month.
|
||||
if (archiveDate > newestRemovedMonth) {
|
||||
newestRemovedMonth = archiveDate;
|
||||
}
|
||||
} catch (ex) {
|
||||
this._log.error("cleanArchiveTask - Unable to remove " + dir.path, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If the archive directory was already scanned, filter the ping archive cache.
|
||||
if (this._scannedArchiveDirectory && newestRemovedMonth) {
|
||||
// Scan the archive cache for pings older than the newest directory pruned above.
|
||||
for (let [id, info] of this._archivedPings) {
|
||||
const timestampCreated = new Date(info.timestampCreated);
|
||||
if (timestampCreated.getTime() > newestRemovedMonth.getTime()) {
|
||||
continue;
|
||||
}
|
||||
// Remove outdated pings from the cache.
|
||||
this._archivedPings.delete(id);
|
||||
}
|
||||
}
|
||||
}),
|
||||
|
||||
/**
|
||||
* Reset the storage state in tests.
|
||||
*/
|
||||
reset: function() {
|
||||
this._shutdown = false;
|
||||
this._scannedArchiveDirectory = false;
|
||||
this._archivedPings = new Map();
|
||||
},
|
||||
|
||||
/**
|
||||
* Get a list of info on the archived pings.
|
||||
* This will scan the archive directory and grab basic data about the existing
|
||||
@ -479,26 +638,45 @@ let TelemetryStorageImpl = {
|
||||
*
|
||||
* @return {promise<sequence<object>>}
|
||||
*/
|
||||
loadArchivedPingList: Task.async(function*() {
|
||||
this._log.trace("loadArchivedPingList");
|
||||
loadArchivedPingList: function() {
|
||||
// If there's an archive loading task already running, return it.
|
||||
if (this._archiveScanningTask) {
|
||||
return this._archiveScanningTask;
|
||||
}
|
||||
|
||||
if (this._scannedArchiveDirectory) {
|
||||
this._log.trace("loadArchivedPingList - Archive already scanned, hitting cache.");
|
||||
return Promise.resolve(this._archivedPings);
|
||||
}
|
||||
|
||||
// Make sure to clear |_archiveScanningTask| once done.
|
||||
let clear = pingList => {
|
||||
this._archiveScanningTask = null;
|
||||
return pingList;
|
||||
};
|
||||
// Since there's no archive loading task running, start it.
|
||||
this._archiveScanningTask = this._scanArchive().then(clear, clear);
|
||||
return this._archiveScanningTask;
|
||||
},
|
||||
|
||||
_scanArchive: Task.async(function*() {
|
||||
this._log.trace("_scanArchive");
|
||||
|
||||
if (!(yield OS.File.exists(gPingsArchivePath))) {
|
||||
return new Map();
|
||||
}
|
||||
|
||||
let archivedPings = new Map();
|
||||
let dirIterator = new OS.File.DirectoryIterator(gPingsArchivePath);
|
||||
let subdirs = (yield dirIterator.nextBatch()).filter(e => e.isDir);
|
||||
|
||||
// Walk through the monthly subdirs of the form <YYYY-MM>/
|
||||
for (let dir of subdirs) {
|
||||
const dirRegEx = /^[0-9]{4}-[0-9]{2}$/;
|
||||
if (!dirRegEx.test(dir.name)) {
|
||||
this._log.warn("loadArchivedPingList - skipping invalidly named subdirectory " + dir.path);
|
||||
if (!isValidArchiveDir(dir.name)) {
|
||||
this._log.warn("_scanArchive - skipping invalidly named subdirectory " + dir.path);
|
||||
continue;
|
||||
}
|
||||
|
||||
this._log.trace("loadArchivedPingList - checking in subdir: " + dir.path);
|
||||
this._log.trace("_scanArchive - checking in subdir: " + dir.path);
|
||||
let pingIterator = new OS.File.DirectoryIterator(dir.path);
|
||||
let pings = (yield pingIterator.nextBatch()).filter(e => !e.isDir);
|
||||
|
||||
@ -511,26 +689,28 @@ let TelemetryStorageImpl = {
|
||||
}
|
||||
|
||||
// In case of conflicts, overwrite only with newer pings.
|
||||
if (archivedPings.has(data.id)) {
|
||||
const overwrite = data.timestamp > archivedPings.get(data.id).timestampCreated;
|
||||
this._log.warn("loadArchivedPingList - have seen this id before: " + data.id +
|
||||
if (this._archivedPings.has(data.id)) {
|
||||
const overwrite = data.timestamp > this._archivedPings.get(data.id).timestampCreated;
|
||||
this._log.warn("_scanArchive - have seen this id before: " + data.id +
|
||||
", overwrite: " + overwrite);
|
||||
if (!overwrite) {
|
||||
continue;
|
||||
}
|
||||
|
||||
yield this._removeArchivedPing(data.id, data.timestampCreated, data.type)
|
||||
.catch((e) => this._log.warn("loadArchivedPingList - failed to remove ping", e));
|
||||
.catch((e) => this._log.warn("_scanArchive - failed to remove ping", e));
|
||||
}
|
||||
|
||||
archivedPings.set(data.id, {
|
||||
this._archivedPings.set(data.id, {
|
||||
timestampCreated: data.timestamp,
|
||||
type: data.type,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return archivedPings;
|
||||
// Mark the archive as scanned, so we no longer hit the disk.
|
||||
this._scannedArchiveDirectory = true;
|
||||
return this._archivedPings;
|
||||
}),
|
||||
|
||||
/**
|
||||
@ -915,3 +1095,30 @@ function getArchivedPingPath(aPingId, aDate, aType) {
|
||||
let fileName = [aDate.getTime(), aPingId, aType, "json"].join(".");
|
||||
return OS.Path.join(archivedPingDir, fileName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a directory name is in the "YYYY-MM" format.
|
||||
* @param {String} aDirName The name of the pings archive directory.
|
||||
* @return {Boolean} True if the directory name is in the right format, false otherwise.
|
||||
*/
|
||||
function isValidArchiveDir(aDirName) {
|
||||
const dirRegEx = /^[0-9]{4}-[0-9]{2}$/;
|
||||
return dirRegEx.test(aDirName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a date object from an archive directory name.
|
||||
* @param {String} aDirName The name of the pings archive directory. Must be in the YYYY-MM
|
||||
* format.
|
||||
* @return {Object} A Date object or null if the dir name is not valid.
|
||||
*/
|
||||
function getDateFromArchiveDir(aDirName) {
|
||||
let [year, month] = aDirName.split("-");
|
||||
year = parseInt(year);
|
||||
month = parseInt(month);
|
||||
// Make sure to have sane numbers.
|
||||
if (!Number.isFinite(month) || !Number.isFinite(year) || month < 1 || month > 12) {
|
||||
return null;
|
||||
}
|
||||
return new Date(year, month - 1, 1, 0, 0, 0);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user