mirror of
https://github.com/Heretek-AI/openclaw.git
synced 2026-07-01 22:34:00 -04:00
feat(curiosity-engine): upgrade anomaly-detector to ESM with better-sqlite3, Phase 2 temporal clustering, auto-correlation, cascading failure detection, 7-day baseline deviation
This commit is contained in:
+360
-427
@@ -1,478 +1,411 @@
|
||||
#!/usr/bin/env node
|
||||
import fs from "fs";
|
||||
import path from "path";
|
||||
import { fileURLToPath } from "url";
|
||||
import Database from "better-sqlite3";
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = path.dirname(__filename);
|
||||
/**
|
||||
* Anomaly Detector Module - Phase 2: Anomaly Enhancement
|
||||
*
|
||||
* Monitors error logs, rate limits, failures with advanced pattern detection.
|
||||
* Implements temporal clustering, severity scoring, and baseline deviation analysis.
|
||||
*
|
||||
* @module anomaly-detector
|
||||
*/
|
||||
|
||||
const fs = require("fs");
|
||||
const path = require("path");
|
||||
const sqlite3 = require("sqlite3").verbose();
|
||||
|
||||
// Configuration
|
||||
const WORKSPACE = process.env.WORKSPACE || path.join(process.env.HOME, ".openclaw/workspace");
|
||||
const LOG_DIR = path.join(WORKSPACE, "logs");
|
||||
const CURIOSITY_DIR = path.join(WORKSPACE, ".curiosity");
|
||||
const ANOMALY_DB = path.join(CURIOSITY_DIR, "anomalies.db");
|
||||
const WINDOWS = { short: 5 * 60 * 1000, medium: 60 * 60 * 1000, long: 24 * 60 * 60 * 1000 };
|
||||
const ERROR_TYPE_WEIGHTS = {
|
||||
memory_pressure: 2.5,
|
||||
disk_space: 2.5,
|
||||
auth_failure: 2.0,
|
||||
network: 1.8,
|
||||
ratelimit: 1.5,
|
||||
timeout: 1.5,
|
||||
unknown: 0.5,
|
||||
};
|
||||
if (!fs.existsSync(CURIOSITY_DIR)) fs.mkdirSync(CURIOSITY_DIR, { recursive: true });
|
||||
function initDB() {
|
||||
const db = new Database(ANOMALY_DB);
|
||||
db.exec(
|
||||
"CREATE TABLE IF NOT EXISTS anomalies (id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT DEFAULT (datetime('now')), source TEXT NOT NULL, error_type TEXT, count INTEGER DEFAULT 1, severity TEXT DEFAULT 'low', score REAL DEFAULT 0, processed INTEGER DEFAULT 0);CREATE TABLE IF NOT EXISTS error_chains (id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT DEFAULT (datetime('now')), chain TEXT NOT NULL, occurrences INTEGER DEFAULT 1, avg_interval_ms INTEGER DEFAULT 0, confidence REAL DEFAULT 0);CREATE TABLE IF NOT EXISTS hourly_error_counts (id INTEGER PRIMARY KEY AUTOINCREMENT, hour_timestamp TEXT NOT NULL, error_type TEXT NOT NULL, count INTEGER DEFAULT 0, UNIQUE(hour_timestamp, error_type));CREATE TABLE IF NOT EXISTS daily_error_counts (id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT NOT NULL, error_type TEXT NOT NULL, total_count INTEGER DEFAULT 0, UNIQUE(date, error_type));CREATE INDEX IF NOT EXISTS idx_anomalies_timestamp ON anomalies(timestamp);CREATE INDEX IF NOT EXISTS idx_anomalies_error_type ON anomalies(error_type);CREATE INDEX IF NOT EXISTS idx_chains_timestamp ON error_chains(timestamp);",
|
||||
);
|
||||
return db;
|
||||
|
||||
// Ensure directories exist
|
||||
if (!fs.existsSync(CURIOSITY_DIR)) {
|
||||
fs.mkdirSync(CURIOSITY_DIR, { recursive: true });
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize anomaly database
|
||||
*/
|
||||
function initDB() {
|
||||
return new Promise((resolve, reject) => {
|
||||
const db = new sqlite3.Database(ANOMALY_DB, (err) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
db.run(
|
||||
`
|
||||
CREATE TABLE IF NOT EXISTS anomalies (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
|
||||
source TEXT NOT NULL,
|
||||
error_type TEXT,
|
||||
count INTEGER DEFAULT 1,
|
||||
severity TEXT DEFAULT 'low',
|
||||
score REAL DEFAULT 0,
|
||||
processed INTEGER DEFAULT 0
|
||||
)
|
||||
`,
|
||||
(err) => {
|
||||
if (err) reject(err);
|
||||
else resolve(db);
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Scan log files for error patterns
|
||||
* @returns {Array} Array of error entries
|
||||
*/
|
||||
function scanLogFiles() {
|
||||
const errors = [];
|
||||
if (!fs.existsSync(LOG_DIR)) return errors;
|
||||
for (const logFile of fs.readdirSync(LOG_DIR).filter((f) => f.endsWith(".log"))) {
|
||||
|
||||
if (!fs.existsSync(LOG_DIR)) {
|
||||
return errors;
|
||||
}
|
||||
|
||||
const logFiles = fs.readdirSync(LOG_DIR).filter((f) => f.endsWith(".log"));
|
||||
|
||||
logFiles.forEach((logFile) => {
|
||||
const logPath = path.join(LOG_DIR, logFile);
|
||||
try {
|
||||
const content = fs.readFileSync(path.join(LOG_DIR, logFile), "utf8");
|
||||
for (const line of content.split("\n")) {
|
||||
if (isErrorLine(line))
|
||||
const content = fs.readFileSync(logPath, "utf8");
|
||||
const lines = content.split("\n");
|
||||
|
||||
lines.forEach((line) => {
|
||||
if (isErrorLine(line)) {
|
||||
errors.push({
|
||||
source: logFile,
|
||||
line,
|
||||
timestamp: extractTimestamp(line),
|
||||
type: classifyError(line),
|
||||
raw: line.slice(0, 200),
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
console.error("Error reading:", logFile, e.message);
|
||||
}
|
||||
});
|
||||
} catch (err) {
|
||||
console.error("Error reading log file:", logFile, err.message);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return errors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a log line represents an error
|
||||
* @param {string} line - Log line
|
||||
* @returns {boolean} True if error
|
||||
*/
|
||||
function isErrorLine(line) {
|
||||
return /error|warn|fail|timeout|ETIMEDOUT|429|rate.limit|401|403|unauthorized|exception|critical|fatal|panic/i.test(
|
||||
line,
|
||||
);
|
||||
const errorPatterns = [
|
||||
/error/i,
|
||||
/fail/i,
|
||||
/timeout/i,
|
||||
/ETIMEDOUT/i,
|
||||
/429/i,
|
||||
/rate.?limit/i,
|
||||
/401/i,
|
||||
/403/i,
|
||||
/unauthorized/i,
|
||||
/exception/i,
|
||||
/critical/i,
|
||||
];
|
||||
|
||||
return errorPatterns.some((pattern) => pattern.test(line));
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract timestamp from log line
|
||||
* @param {string} line - Log line
|
||||
* @returns {string} Timestamp
|
||||
*/
|
||||
function extractTimestamp(line) {
|
||||
for (const p of [
|
||||
/\[(\d{4}-\d{2}-\d{2}T[\d:]+(?:\.\d+)?Z?)\]/,
|
||||
/(\d{4}-\d{2}-\d{2}T[\d:]+)/,
|
||||
/(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})/,
|
||||
]) {
|
||||
const m = line.match(p);
|
||||
if (m) return new Date(m[1].replace(" ", "T")).toISOString();
|
||||
}
|
||||
return new Date().toISOString();
|
||||
const timestampMatch = line.match(/^\[([^\]]+)\]/);
|
||||
return timestampMatch ? timestampMatch[1] : new Date().toISOString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Classify error type from log line
|
||||
* @param {string} line - Log line
|
||||
* @returns {string} Error type
|
||||
*/
|
||||
function classifyError(line) {
|
||||
if (/timeout|ETIMEDOUT/i.test(line)) return "timeout";
|
||||
if (/429|rate.limit/i.test(line)) return "ratelimit";
|
||||
if (/429|rate.?limit/i.test(line)) return "ratelimit";
|
||||
if (/401|403|unauthorized|auth/i.test(line)) return "auth_failure";
|
||||
if (/disk|space|ENOSPC/i.test(line)) return "disk_space";
|
||||
if (/memory|oom|heap|ENOMEM/i.test(line)) return "memory_pressure";
|
||||
if (/network|ECONN|EHOSTUNREACH/i.test(line)) return "network";
|
||||
if (/discord|webhook/i.test(line)) return "discord_error";
|
||||
if (/git/i.test(line)) return "git_error";
|
||||
if (/sqlite|database/i.test(line)) return "database_error";
|
||||
if (/disk|space|storage/i.test(line)) return "disk_space";
|
||||
if (/memory|oom|heap/i.test(line)) return "memory_pressure";
|
||||
if (/network|connection|ECONN/i.test(line)) return "network";
|
||||
return "unknown";
|
||||
}
|
||||
function clusterByTimeWindow(errors) {
|
||||
const now = Date.now();
|
||||
const clusters = { short: [], medium: [], long: [] };
|
||||
const windows = { short: new Map(), medium: new Map(), long: new Map() };
|
||||
for (const error of errors) {
|
||||
const ts = new Date(error.timestamp).getTime();
|
||||
if (now - ts > WINDOWS.long) continue;
|
||||
for (const [key, ms] of [
|
||||
["short", WINDOWS.short],
|
||||
["medium", WINDOWS.medium],
|
||||
["long", WINDOWS.long],
|
||||
]) {
|
||||
const slot = Math.floor(ts / ms);
|
||||
if (!windows[key].has(slot))
|
||||
windows[key].set(slot, { start: slot * ms, end: (slot + 1) * ms, errors: [] });
|
||||
windows[key].get(slot).errors.push(error);
|
||||
}
|
||||
}
|
||||
for (const key of Object.keys(windows)) {
|
||||
clusters[key] = Array.from(windows[key].values())
|
||||
.sort((a, b) => a.start - b.start)
|
||||
.map((w) => ({ ...w, count: w.errors.length, types: groupByType(w.errors) }));
|
||||
}
|
||||
return clusters;
|
||||
}
|
||||
function groupByType(errors) {
|
||||
const groups = {};
|
||||
for (const err of errors) groups[err.type] = (groups[err.type] || 0) + 1;
|
||||
return groups;
|
||||
}
|
||||
function scoreAnomaly(cluster, baselineStats, chainEvidence) {
|
||||
baselineStats = baselineStats || {};
|
||||
chainEvidence = chainEvidence || [];
|
||||
if (!cluster || cluster.count === 0)
|
||||
return { score: 0, factors: {}, isSignificant: false, recommendation: "No anomalies detected" };
|
||||
const factors = {};
|
||||
const errorTypes = Object.keys(cluster.types);
|
||||
const totalCount = cluster.count;
|
||||
const windowMs = cluster.end - cluster.start;
|
||||
const baseline = baselineStats.avgPerHour || 1;
|
||||
const observedRate = totalCount / (windowMs / 3600000);
|
||||
factors.deviation = Math.min(
|
||||
3,
|
||||
Math.max(0, baseline > 0 ? (observedRate - baseline) / baseline : observedRate),
|
||||
);
|
||||
factors.frequency = Math.min(2, (totalCount / (windowMs / 1000)) * 10);
|
||||
factors.clustering = Math.min(2, (1 - errorTypes.length / Math.max(1, totalCount)) * 2);
|
||||
factors.cascade =
|
||||
chainEvidence.length > 0
|
||||
? Math.min(1, chainEvidence.reduce((s, c) => s + c.confidence, 0) / chainEvidence.length) * 2
|
||||
: 0;
|
||||
factors.typeWeight =
|
||||
errorTypes.map((t) => ERROR_TYPE_WEIGHTS[t] || 0.5).reduce((a, b) => a + b, 0) /
|
||||
Math.max(1, errorTypes.length);
|
||||
const rawScore =
|
||||
factors.deviation * 1.5 +
|
||||
factors.frequency * 1.0 +
|
||||
factors.clustering * 0.8 +
|
||||
factors.cascade * 1.2 +
|
||||
factors.typeWeight * 0.5;
|
||||
const score = Math.min(10, Math.max(0, rawScore));
|
||||
const isSignificant = score >= 5 || factors.deviation >= 2 || factors.cascade >= 1.5;
|
||||
const fvals = Object.values(factors);
|
||||
const favg = fvals.reduce((a, b) => a + b, 0) / fvals.length;
|
||||
const fvar = fvals.reduce((s, v) => s + Math.pow(v - favg, 2), 0) / fvals.length;
|
||||
const consistency = Math.max(0, 1 - Math.sqrt(fvar) / Math.max(1, favg + 0.1));
|
||||
const confidence = Math.min(1, Math.min(1, totalCount / 10) * 0.6 + consistency * 0.4);
|
||||
return {
|
||||
score,
|
||||
isSignificant,
|
||||
factors,
|
||||
confidence,
|
||||
recommendation: generateRecommendation(errorTypes, factors),
|
||||
};
|
||||
}
|
||||
function generateRecommendation(errorTypes, factors) {
|
||||
const topType = errorTypes[0] || "unknown";
|
||||
const score =
|
||||
(factors.deviation || 0) +
|
||||
(factors.frequency || 0) +
|
||||
(factors.clustering || 0) +
|
||||
(factors.cascade || 0);
|
||||
const recs = {
|
||||
timeout: "Investigate network connectivity or increase timeout thresholds.",
|
||||
ratelimit: "Implement exponential backoff and request throttling.",
|
||||
auth_failure: "Verify credentials and rotate tokens.",
|
||||
disk_space: "Clean old logs, rotate archives, expand storage.",
|
||||
memory_pressure: "Profile heap usage and check for memory leaks.",
|
||||
network: "Check network connectivity, DNS, firewall rules.",
|
||||
discord_error: "Verify Discord API status and webhook URLs.",
|
||||
git_error: "Check SSH keys and remote URLs.",
|
||||
database_error: "Check database connectivity and query performance.",
|
||||
unknown: "Investigate error source with broader log analysis.",
|
||||
};
|
||||
const base = recs[topType] || recs.unknown;
|
||||
if ((factors.cascade || 0) >= 1.5)
|
||||
return "[CASCADE] " + base + " Chain: " + errorTypes.join("->");
|
||||
if (score >= 8) return "[CRITICAL] " + base;
|
||||
if (score >= 5) return "[HIGH] " + base;
|
||||
if (score >= 3) return "[MEDIUM] " + base;
|
||||
return "[LOW] " + base;
|
||||
}
|
||||
function detectCascadingFailures(errors) {
|
||||
if (errors.length < 3) return [];
|
||||
const sorted = [...errors].sort(
|
||||
(a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime(),
|
||||
);
|
||||
const CHAIN_WINDOW_MS = 60 * 1000;
|
||||
const chainMap = new Map();
|
||||
for (let i = 0; i < sorted.length - 1; i++) {
|
||||
const chain = [sorted[i].type];
|
||||
const timestamps = [new Date(sorted[i].timestamp).getTime()];
|
||||
for (let j = i + 1; j < sorted.length; j++) {
|
||||
const gap = new Date(sorted[j].timestamp).getTime() - timestamps[timestamps.length - 1];
|
||||
if (gap > CHAIN_WINDOW_MS) break;
|
||||
chain.push(sorted[j].type);
|
||||
timestamps.push(new Date(sorted[j].timestamp).getTime());
|
||||
}
|
||||
if (chain.length >= 3) {
|
||||
const intervals = [];
|
||||
for (let k = 1; k < timestamps.length; k++) intervals.push(timestamps[k] - timestamps[k - 1]);
|
||||
const avgInterval = intervals.reduce((a, b) => a + b, 0) / intervals.length;
|
||||
const chainKey = chain.join("->");
|
||||
if (chainMap.has(chainKey)) {
|
||||
chainMap.get(chainKey).count++;
|
||||
} else chainMap.set(chainKey, { chain, count: 1, avgInterval });
|
||||
}
|
||||
}
|
||||
const chains = [];
|
||||
for (const [chainKey, data] of chainMap) {
|
||||
const confidence =
|
||||
Math.min(1, data.count / 5) * 0.7 + Math.max(0, 1 - data.avgInterval / CHAIN_WINDOW_MS) * 0.3;
|
||||
if (confidence >= 0.3)
|
||||
chains.push({
|
||||
chain: chainKey,
|
||||
types: data.chain,
|
||||
occurrences: data.count,
|
||||
avgIntervalMs: Math.round(data.avgInterval),
|
||||
confidence: Math.round(confidence * 100) / 100,
|
||||
isCascading: confidence >= 0.6,
|
||||
|
||||
/**
|
||||
* Get 7-day rolling average of errors
|
||||
* @param {string} errorType - Error type to analyze
|
||||
* @returns {number} Average errors per day
|
||||
*/
|
||||
function get7DayRollingAverage(errorType) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const db = new sqlite3.Database(ANOMALY_DB, sqlite3.OPEN_READONLY, (err) => {
|
||||
if (err) {
|
||||
resolve(0); // No data yet
|
||||
return;
|
||||
}
|
||||
|
||||
const query = `
|
||||
SELECT AVG(daily_count) as avg
|
||||
FROM (
|
||||
SELECT DATE(timestamp) as date, SUM(count) as daily_count
|
||||
FROM anomalies
|
||||
WHERE error_type = ?
|
||||
AND timestamp >= datetime('now', '-7 days')
|
||||
GROUP BY DATE(timestamp)
|
||||
)
|
||||
`;
|
||||
|
||||
db.get(query, [errorType], (err, row) => {
|
||||
db.close();
|
||||
if (err) reject(err);
|
||||
else resolve(row?.avg || 0);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Score anomaly using heuristic algorithm
|
||||
* @param {Array} errors - Array of error entries
|
||||
* @returns {Object} Anomaly score result
|
||||
*/
|
||||
function scoreAnomaly(errors) {
|
||||
if (errors.length === 0) {
|
||||
return { score: 0, isSignificant: false, recommendation: "No anomalies detected" };
|
||||
}
|
||||
return chains.sort((a, b) => b.confidence - a.confidence);
|
||||
|
||||
const timeWindow = 3600 * 1000; // 1 hour in ms
|
||||
const frequency = errors.length / timeWindow;
|
||||
|
||||
const severityWeights = {
|
||||
critical: 3,
|
||||
high: 2,
|
||||
medium: 1,
|
||||
low: 0.5,
|
||||
unknown: 0.5,
|
||||
};
|
||||
|
||||
const primarySeverity = errors[0]?.severity || "low";
|
||||
const severityWeight = severityWeights[primarySeverity] || 0.5;
|
||||
|
||||
// Calculate baseline deviation
|
||||
const baseline = 0.1; // Default baseline if no historical data
|
||||
const deviation = frequency > baseline ? (frequency - baseline) / baseline : 0;
|
||||
|
||||
const score = deviation * severityWeight;
|
||||
const isSignificant = deviation > 2.0; // 2σ deviation threshold
|
||||
|
||||
const recommendation = generateRecommendation(errors, score);
|
||||
|
||||
return {
|
||||
score: Math.min(10, score),
|
||||
isSignificant,
|
||||
deviation,
|
||||
frequency,
|
||||
recommendation,
|
||||
};
|
||||
}
|
||||
function recordChains(db, chains) {
|
||||
try {
|
||||
const insert = db.prepare(
|
||||
"INSERT INTO error_chains (chain, occurrences, avg_interval_ms, confidence) VALUES (?, ?, ?, ?)",
|
||||
);
|
||||
for (const chain of chains)
|
||||
insert.run(
|
||||
JSON.stringify(chain.types),
|
||||
chain.occurrences,
|
||||
chain.avgIntervalMs,
|
||||
chain.confidence,
|
||||
);
|
||||
} catch (e) {}
|
||||
|
||||
/**
|
||||
* Generate remediation recommendation
|
||||
* @param {Array} errors - Error entries
|
||||
* @param {number} score - Anomaly score
|
||||
* @returns {string} Recommendation
|
||||
*/
|
||||
function generateRecommendation(errors, score) {
|
||||
if (errors.length === 0) return "No action required";
|
||||
|
||||
const errorType = errors[0].type;
|
||||
|
||||
const recommendations = {
|
||||
timeout:
|
||||
score > 5
|
||||
? "Critical: Investigate network connectivity or increase timeout thresholds"
|
||||
: "Warning: Monitor timeout frequency, consider implementing retry logic",
|
||||
|
||||
ratelimit:
|
||||
score > 5
|
||||
? "Critical: Implement exponential backoff and request throttling"
|
||||
: "Warning: Add rate limit handling with graceful degradation",
|
||||
|
||||
auth_failure:
|
||||
score > 5
|
||||
? "Critical: Verify credentials, rotate tokens, audit auth subsystem"
|
||||
: "Warning: Check token expiration and refresh logic",
|
||||
|
||||
disk_space:
|
||||
score > 5
|
||||
? "Critical: Clean old logs or expand storage immediately"
|
||||
: "Warning: Monitor disk usage, implement log rotation",
|
||||
|
||||
memory_pressure:
|
||||
score > 5
|
||||
? "Critical: Investigate memory leaks, restart services, profile heap"
|
||||
: "Warning: Monitor memory trends, consider increasing limits",
|
||||
|
||||
network:
|
||||
score > 5
|
||||
? "Critical: Check network connectivity, DNS, firewall rules"
|
||||
: "Warning: Implement connection pooling and retry logic",
|
||||
|
||||
unknown: "Investigate error source and implement appropriate handling",
|
||||
};
|
||||
|
||||
return recommendations[errorType] || recommendations.unknown;
|
||||
}
|
||||
function calculateBaseline(db, errorType) {
|
||||
errorType = errorType || "all";
|
||||
try {
|
||||
const cond = errorType !== "all" ? "AND error_type = ?" : "";
|
||||
const q =
|
||||
"SELECT error_type, SUM(count) as total_count, AVG(daily_avg) as avg_per_day, MAX(daily_max) as max_in_day, COUNT(DISTINCT date) as days_with_data FROM (SELECT error_type, DATE(timestamp) as date, SUM(count) as daily_avg, MAX(count) as daily_max FROM anomalies WHERE timestamp >= datetime('now', '-7 days') " +
|
||||
cond +
|
||||
" GROUP BY error_type, DATE(timestamp)) GROUP BY error_type";
|
||||
const stmt = db.prepare(q);
|
||||
const rows = errorType !== "all" ? stmt.all(errorType) : stmt.all();
|
||||
if (!rows || rows.length === 0)
|
||||
return { avgPerHour: 0.1, stdDev: 0.1, maxExpected: 0.5, dataPoints: 0, type: errorType };
|
||||
const totalErrors = rows.reduce((s, r) => s + (r.total_count || 0), 0);
|
||||
const totalDays = Math.max(
|
||||
1,
|
||||
rows.reduce((s, r) => s + (r.days_with_data || 0), 0),
|
||||
);
|
||||
const avgPerDay = totalErrors / totalDays;
|
||||
const avgPerHour = avgPerDay / 24;
|
||||
const stdDev = Math.sqrt(avgPerDay) || 0.1;
|
||||
const maxExpected = avgPerHour + (2 * stdDev) / 24;
|
||||
return {
|
||||
avgPerHour,
|
||||
stdDev: stdDev / 24,
|
||||
maxExpected,
|
||||
dataPoints: totalDays,
|
||||
byType: rows.reduce((m, r) => {
|
||||
m[r.error_type] = {
|
||||
avgPerDay: r.avg_per_day || 0,
|
||||
maxInDay: r.daily_max || 0,
|
||||
days: r.days_with_data || 0,
|
||||
};
|
||||
return m;
|
||||
}, {}),
|
||||
type: errorType,
|
||||
};
|
||||
} catch (e) {
|
||||
return {
|
||||
avgPerHour: 0.1,
|
||||
stdDev: 0.1,
|
||||
maxExpected: 0.5,
|
||||
dataPoints: 0,
|
||||
type: errorType,
|
||||
error: e.message,
|
||||
};
|
||||
}
|
||||
}
|
||||
function updateHourlyAggregates(db, errors) {
|
||||
const hourMap = new Map();
|
||||
for (const err of errors) {
|
||||
const ts = new Date(err.timestamp);
|
||||
const hour =
|
||||
new Date(ts.getFullYear(), ts.getMonth(), ts.getDate(), ts.getHours())
|
||||
.toISOString()
|
||||
.slice(0, 13) + ":00:00";
|
||||
const key = hour + "|" + err.type;
|
||||
hourMap.set(key, (hourMap.get(key) || 0) + 1);
|
||||
}
|
||||
try {
|
||||
const insert = db.prepare(
|
||||
"INSERT INTO hourly_error_counts (hour_timestamp, error_type, count) VALUES (?, ?, ?) ON CONFLICT(hour_timestamp, error_type) DO UPDATE SET count = count + excluded.count",
|
||||
);
|
||||
for (const [key, count] of hourMap) {
|
||||
const [h, t] = key.split("|");
|
||||
insert.run(h, t, count);
|
||||
}
|
||||
} catch (e) {}
|
||||
}
|
||||
async function detectAnomalies(options) {
|
||||
|
||||
/**
|
||||
* Detect anomalies with temporal clustering
|
||||
* @param {Object} options - Detection options
|
||||
* @returns {Object} Anomaly detection report
|
||||
*/
|
||||
async function detectAnomalies(options = {}) {
|
||||
const { timeWindow = 3600 * 1000 } = options; // Default 1 hour
|
||||
|
||||
const errors = scanLogFiles();
|
||||
if (errors.length === 0)
|
||||
return {
|
||||
timestamp: new Date().toISOString(),
|
||||
status: "clean",
|
||||
total_errors: 0,
|
||||
anomalies: [],
|
||||
chains: [],
|
||||
baseline: null,
|
||||
};
|
||||
let db = null;
|
||||
try {
|
||||
db = initDB();
|
||||
} catch (e) {
|
||||
console.error("DB init failed:", e.message);
|
||||
}
|
||||
if (db)
|
||||
try {
|
||||
updateHourlyAggregates(db, errors);
|
||||
} catch {}
|
||||
const clusters = clusterByTimeWindow(errors);
|
||||
const baselineStats = db
|
||||
? calculateBaseline(db, "all")
|
||||
: { avgPerHour: 0.1, stdDev: 0.1, maxExpected: 0.5 };
|
||||
const chains = detectCascadingFailures(errors);
|
||||
if (db)
|
||||
try {
|
||||
recordChains(db, chains);
|
||||
} catch {}
|
||||
|
||||
// Group errors by type
|
||||
const errorGroups = {};
|
||||
errors.forEach((err) => {
|
||||
if (!errorGroups[err.type]) {
|
||||
errorGroups[err.type] = [];
|
||||
}
|
||||
errorGroups[err.type].push(err);
|
||||
});
|
||||
|
||||
// Score each error group
|
||||
const anomalies = [];
|
||||
for (const [windowKey, windowClusters] of Object.entries(clusters)) {
|
||||
for (const cluster of windowClusters) {
|
||||
const relevantChains = chains.filter((c) => cluster.types[c.types && c.types[0]] > 0);
|
||||
const scored = scoreAnomaly(cluster, baselineStats, relevantChains);
|
||||
if (scored.isSignificant || scored.score > 0)
|
||||
anomalies.push({
|
||||
window: windowKey,
|
||||
start: new Date(cluster.start).toISOString(),
|
||||
end: new Date(cluster.end).toISOString(),
|
||||
...scored,
|
||||
count: cluster.count,
|
||||
types: cluster.types,
|
||||
sampleErrors: cluster.errors
|
||||
.slice(0, 3)
|
||||
.map((e) => ({ type: e.type, source: e.source, raw: e.raw })),
|
||||
});
|
||||
for (const [type, groupErrors] of Object.entries(errorGroups)) {
|
||||
const scoreResult = scoreAnomaly(groupErrors);
|
||||
|
||||
if (scoreResult.isSignificant) {
|
||||
anomalies.push({
|
||||
type,
|
||||
count: groupErrors.length,
|
||||
score: scoreResult.score,
|
||||
severity: classifySeverity(scoreResult.score),
|
||||
recommendation: scoreResult.recommendation,
|
||||
errors: groupErrors.slice(0, 5), // Sample errors
|
||||
});
|
||||
}
|
||||
}
|
||||
if (db) {
|
||||
try {
|
||||
const insertAnomaly = db.prepare(
|
||||
"INSERT INTO anomalies (source, error_type, count, severity, score) VALUES (?, ?, ?, ?, ?)",
|
||||
);
|
||||
for (const a of anomalies.filter((a) => a.isSignificant))
|
||||
insertAnomaly.run(
|
||||
"cluster:" + a.window,
|
||||
Object.keys(a.types)[0] || "unknown",
|
||||
a.count,
|
||||
classifySeverity(a.score),
|
||||
a.score,
|
||||
);
|
||||
} catch (e) {}
|
||||
try {
|
||||
db.close();
|
||||
} catch {}
|
||||
}
|
||||
anomalies.sort((a, b) => b.score - a.score);
|
||||
|
||||
// Record to database
|
||||
await recordAnomalies(anomalies);
|
||||
|
||||
return {
|
||||
timestamp: new Date().toISOString(),
|
||||
status: anomalies.length > 0 ? "anomaly_detected" : "clean",
|
||||
anomalies,
|
||||
total_errors: errors.length,
|
||||
timeWindows: Object.fromEntries(
|
||||
Object.entries(clusters).map(([k, v]) => [
|
||||
k,
|
||||
{ clusterCount: v.length, totalErrors: v.reduce((s, c) => s + c.count, 0) },
|
||||
]),
|
||||
),
|
||||
anomalies: anomalies.slice(0, 20),
|
||||
chains: chains.slice(0, 10),
|
||||
baseline: {
|
||||
avgPerHour: Math.round(baselineStats.avgPerHour * 100) / 100,
|
||||
maxExpected: Math.round(baselineStats.maxExpected * 100) / 100,
|
||||
dataPoints: baselineStats.dataPoints,
|
||||
},
|
||||
significant_count: anomalies.length,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Classify severity from score
|
||||
* @param {number} score - Anomaly score
|
||||
* @returns {string} Severity level
|
||||
*/
|
||||
function classifySeverity(score) {
|
||||
if (score >= 8) return "critical";
|
||||
if (score >= 5) return "high";
|
||||
if (score >= 3) return "medium";
|
||||
return "low";
|
||||
}
|
||||
function generateReport(result) {
|
||||
const { status, total_errors, anomalies, chains, baseline, timeWindows } = result;
|
||||
let r =
|
||||
"=== Phase 2 Anomaly Detection Report ===\nTimestamp: " +
|
||||
result.timestamp +
|
||||
"\nStatus: " +
|
||||
status.toUpperCase() +
|
||||
"\n\n--- Time Window Clusters ---\n";
|
||||
for (const [window, data] of Object.entries(timeWindows))
|
||||
r += " " + window + ": " + data.clusterCount + " clusters, " + data.totalErrors + " errors\n";
|
||||
r += "\n";
|
||||
if (baseline)
|
||||
r +=
|
||||
"--- Baseline (7-day rolling) ---\n Avg/hr: " +
|
||||
baseline.avgPerHour +
|
||||
" | Max expected: " +
|
||||
baseline.maxExpected +
|
||||
"\n Data points: " +
|
||||
baseline.dataPoints +
|
||||
" days\n\n";
|
||||
if (chains.length > 0) {
|
||||
r += "--- CASCADING FAILURES (" + chains.length + ") ---\n";
|
||||
for (const c of chains)
|
||||
r +=
|
||||
" " +
|
||||
(c.isCascading ? "[CASCADE]" : "[possible]") +
|
||||
" " +
|
||||
c.chain +
|
||||
"\n Occ: " +
|
||||
c.occurrences +
|
||||
" | Interval: " +
|
||||
c.avgIntervalMs +
|
||||
"ms | Conf: " +
|
||||
(c.confidence * 100).toFixed(0) +
|
||||
"%\n";
|
||||
r += "\n";
|
||||
}
|
||||
if (anomalies.length > 0) {
|
||||
r += "--- SIGNIFICANT ANOMALIES (" + anomalies.length + ") ---\n";
|
||||
for (const a of anomalies.slice(0, 10)) {
|
||||
const badge = a.score >= 8 ? "[CRIT]" : a.score >= 5 ? "[HIGH]" : "[MED]";
|
||||
r +=
|
||||
" " +
|
||||
badge +
|
||||
" [" +
|
||||
a.window +
|
||||
"] Score: " +
|
||||
a.score.toFixed(1) +
|
||||
" | Count: " +
|
||||
a.count +
|
||||
"\n Types: " +
|
||||
Object.entries(a.types)
|
||||
.map(([t, c]) => t + " x" + c)
|
||||
.join(", ") +
|
||||
"\n " +
|
||||
a.recommendation +
|
||||
"\n Factors: dev=" +
|
||||
(a.factors.deviation || 0).toFixed(1) +
|
||||
", freq=" +
|
||||
(a.factors.frequency || 0).toFixed(1) +
|
||||
", clus=" +
|
||||
(a.factors.clustering || 0).toFixed(1) +
|
||||
", casc=" +
|
||||
(a.factors.cascade || 0).toFixed(1) +
|
||||
"\n\n";
|
||||
}
|
||||
} else r += "No significant anomalies detected\n\n";
|
||||
r += "Total errors scanned: " + total_errors + "\n=== End Anomaly Detection ===\n";
|
||||
return r;
|
||||
|
||||
/**
|
||||
* Record anomalies to database
|
||||
* @param {Array} anomalies - Anomaly records
|
||||
*/
|
||||
function recordAnomalies(anomalies) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const db = new sqlite3.Database(ANOMALY_DB, (err) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
const stmt = db.prepare(`
|
||||
INSERT INTO anomalies (source, error_type, count, severity, score)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
`);
|
||||
|
||||
anomalies.forEach((anomaly) => {
|
||||
stmt.run(["logs", anomaly.type, anomaly.count, anomaly.severity, anomaly.score]);
|
||||
});
|
||||
|
||||
stmt.finalize((err) => {
|
||||
db.close();
|
||||
if (err) reject(err);
|
||||
else resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
export {
|
||||
|
||||
/**
|
||||
* Generate human-readable report
|
||||
* @param {Object} result - Anomaly detection result
|
||||
* @returns {string} Formatted report
|
||||
*/
|
||||
function generateReport(result) {
|
||||
let report = "=== Anomaly Detection Report ===\n";
|
||||
report += `Timestamp: ${result.timestamp}\n\n`;
|
||||
report += `Total errors scanned: ${result.total_errors}\n`;
|
||||
report += `Significant anomalies: ${result.significant_count}\n\n`;
|
||||
|
||||
if (result.anomalies.length > 0) {
|
||||
report += "⚠️ SIGNIFICANT ANOMALIES:\n";
|
||||
result.anomalies.forEach((anomaly) => {
|
||||
report += ` Type: ${anomaly.type}\n`;
|
||||
report += ` Count: ${anomaly.count}\n`;
|
||||
report += ` Score: ${anomaly.score.toFixed(2)}\n`;
|
||||
report += ` Severity: ${anomaly.severity.toUpperCase()}\n`;
|
||||
report += ` Recommendation: ${anomaly.recommendation}\n\n`;
|
||||
});
|
||||
} else {
|
||||
report += "✅ No significant anomalies detected\n";
|
||||
}
|
||||
|
||||
report += "\n=== End Anomaly Detection ===\n";
|
||||
|
||||
return report;
|
||||
}
|
||||
|
||||
// CLI execution
|
||||
if (require.main === module) {
|
||||
initDB()
|
||||
.then(async () => {
|
||||
const args = process.argv.slice(2);
|
||||
const jsonOutput = args.includes("--json") || args.includes("-j");
|
||||
|
||||
const result = await detectAnomalies();
|
||||
|
||||
if (jsonOutput) {
|
||||
console.log(JSON.stringify(result, null, 2));
|
||||
} else {
|
||||
console.log(generateReport(result));
|
||||
}
|
||||
})
|
||||
.catch(console.error);
|
||||
}
|
||||
|
||||
// Export for module usage
|
||||
module.exports = {
|
||||
detectAnomalies,
|
||||
scanLogFiles,
|
||||
clusterByTimeWindow,
|
||||
detectCascadingFailures,
|
||||
calculateBaseline,
|
||||
scoreAnomaly,
|
||||
scanLogFiles,
|
||||
get7DayRollingAverage,
|
||||
generateReport,
|
||||
initDB,
|
||||
WINDOWS,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user