Avoid reprocessing webstats files.

Web servers typically provide us with the last 14 days of request
logs. We shouldn't process the whole 14 days over and over. Instead,
we should only process new log files and any other log files
containing log lines from newly written dates.

In some cases web servers stop serving a given virtual host or stop
acting as a web server at all. Even then we're left with 14 days of
logs per virtual host. Ideally, these logs would get cleaned up, but
until that's the case, we should at least not reprocess them over and
over.

To avoid reprocessing webstats files, we introduce a new state file
that records the log dates contained in each input file. We use that
state file to determine which previously processed webstats files
need to be reprocessed, so that we can still write complete daily
logs.
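For illustration, the state file introduced below (file name
"processed-webstats", resolved against the configured StatsPath)
contains one line per log date and input file, in the format written
by writeProcessedWebstats: an ISO date, a comma, and the input file's
path. A sketch with hypothetical paths:

2019-12-08,/srv/collector/in/webstats/meronense.torproject.org/metrics.torproject.org-access.log-20191210.xz
2019-12-09,/srv/collector/in/webstats/meronense.torproject.org/metrics.torproject.org-access.log-20191210.xz
2019-12-10,/srv/collector/in/webstats/meronense.torproject.org/metrics.torproject.org-access.log-20191211.xz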
Karsten Loesing 2019-12-11 12:22:40 +01:00
parent 3002d6bc6b
commit d48163379c
4 changed files with 170 additions and 22 deletions

CHANGELOG.md

@@ -3,6 +3,7 @@
  * Medium changes
    - Give up on periodically checking the configuration file for
      updates and reloading it in case of changes.
+   - Avoid reprocessing webstats files.
 
  * Minor changes
    - Remove dependency on metrics-lib's internal package.

LogMetadata.java

@@ -81,5 +81,22 @@ public class LogMetadata {
     }
     return Optional.ofNullable(metadata);
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    LogMetadata that = (LogMetadata) other;
+    return path.toString().equals(that.path.toString());
+  }
+
+  @Override
+  public int hashCode() {
+    return path.hashCode();
+  }
 }
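These overrides matter because SanitizeWeblogs (next file) keys
HashMaps on LogMetadata, so a file found on disk in the current run
must match the entry recreated from the state file. A minimal sketch
of that key behavior, assuming it sits in the same package as
LogMetadata (package declaration omitted); the path is hypothetical,
and create() only returns a value for file names it can parse:

import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class LogMetadataKeySketch {
  public static void main(String[] args) {
    String file = "/in/meronense.torproject.org/"
        + "metrics.torproject.org-access.log-20191210.xz"; // hypothetical
    Optional<LogMetadata> fromDisk = LogMetadata.create(Paths.get(file));
    Optional<LogMetadata> fromStateFile = LogMetadata.create(Paths.get(file));
    if (fromDisk.isPresent() && fromStateFile.isPresent()) {
      Map<LogMetadata, Set<LocalDate>> processed = new HashMap<>();
      processed.put(fromStateFile.get(), new HashSet<>());
      // Path-based equals/hashCode: two distinct instances for the same
      // file act as the same map key, so the lookup succeeds.
      System.out.println(processed.containsKey(fromDisk.get())); // true
    }
  }
}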

SanitizeWeblogs.java

@@ -25,14 +25,18 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -56,8 +60,10 @@ public class SanitizeWeblogs extends CollecTorMain {
   private static final int LIMIT = 2;
   private static final String WEBSTATS = "webstats";
 
-  private String outputPathName;
-  private String recentPathName;
+  private Path outputDirectory;
+  private Path recentDirectory;
+  private Path processedWebstatsFile;
   private boolean limits;
 
   /**
@@ -84,14 +90,22 @@ public class SanitizeWeblogs extends CollecTorMain {
     try {
       Files.createDirectories(this.config.getPath(Key.OutputPath));
       Files.createDirectories(this.config.getPath(Key.RecentPath));
-      this.outputPathName = this.config.getPath(Key.OutputPath).toString();
-      this.recentPathName = this.config.getPath(Key.RecentPath).toString();
+      Files.createDirectories(this.config.getPath(Key.StatsPath));
+      this.outputDirectory = this.config.getPath(Key.OutputPath);
+      this.recentDirectory = this.config.getPath(Key.RecentPath);
+      this.processedWebstatsFile = this.config.getPath(Key.StatsPath)
+          .resolve("processed-webstats");
       this.limits = this.config.getBool(Key.WebstatsLimits);
       Set<SourceType> sources = this.config.getSourceTypeSet(
           Key.WebstatsSources);
       if (sources.contains(SourceType.Local)) {
         log.info("Processing logs using batch value {}.", BATCH);
-        findCleanWrite(this.config.getPath(Key.WebstatsLocalOrigins));
+        Map<LogMetadata, Set<LocalDate>> previouslyProcessedWebstats
+            = this.readProcessedWebstats();
+        Map<LogMetadata, Set<LocalDate>> newlyProcessedWebstats
+            = this.findCleanWrite(this.config.getPath(Key.WebstatsLocalOrigins),
+            previouslyProcessedWebstats);
+        this.writeProcessedWebstats(newlyProcessedWebstats);
         long cutOffMillis = System.currentTimeMillis()
             - 3L * 24L * 60L * 60L * 1000L;
         PersistenceUtils.cleanDirectory(this.config.getPath(Key.RecentPath),
@@ -103,7 +117,32 @@ public class SanitizeWeblogs extends CollecTorMain {
     }
   }
 
-  private void findCleanWrite(Path dir) {
+  private Map<LogMetadata, Set<LocalDate>> readProcessedWebstats() {
+    Map<LogMetadata, Set<LocalDate>> processedWebstats = new HashMap<>();
+    if (Files.exists(this.processedWebstatsFile)) {
+      try {
+        for (String line : Files.readAllLines(this.processedWebstatsFile)) {
+          String[] lineParts = line.split(",", 2);
+          Optional<LogMetadata> logMetadata
+              = LogMetadata.create(Paths.get(lineParts[1]));
+          if (logMetadata.isPresent()) {
+            processedWebstats.putIfAbsent(logMetadata.get(), new HashSet<>());
+            LocalDate containedLogDate = LocalDate.parse(lineParts[0]);
+            processedWebstats.get(logMetadata.get()).add(containedLogDate);
+          }
+        }
+      } catch (IOException e) {
+        log.error("Cannot read state file {}.", this.processedWebstatsFile, e);
+      }
+      log.debug("Read state file containing {} log files.",
+          processedWebstats.size());
+    }
+    return processedWebstats;
+  }
+
+  private Map<LogMetadata, Set<LocalDate>> findCleanWrite(Path dir,
+      Map<LogMetadata, Set<LocalDate>> previouslyProcessedWebstats) {
+    Map<LogMetadata, Set<LocalDate>> newlyProcessedWebstats = new HashMap<>();
     LogFileMap fileMapIn = new LogFileMap(dir);
     log.info("Found log files for {} virtual hosts.", fileMapIn.size());
     for (Map.Entry<String,TreeMap<String,TreeMap<LocalDate,LogMetadata>>>
@@ -113,24 +152,76 @@ public class SanitizeWeblogs extends CollecTorMain {
         : virtualEntry.getValue().entrySet()) {
       String physicalHost = physicalEntry.getKey();
       log.info("Processing logs for {} on {}.", virtualHost, physicalHost);
-      Map<LocalDate, Map<String, Long>> linesByDate
-          = physicalEntry.getValue().values().stream().parallel()
-          .flatMap(metadata -> sanitzedLineStream(metadata).entrySet()
-          .stream())
-          .collect(groupingBy(Map.Entry::getKey,
-              reducing(Collections.emptyMap(), Map.Entry::getValue,
-              (e1, e2) -> Stream.concat(e1.entrySet().stream(), e2.entrySet()
-              .stream())
-              .collect(groupingByConcurrent(Map.Entry::getKey,
-              summingLong(Map.Entry::getValue))))));
-      LocalDate[] interval = determineInterval(linesByDate.keySet());
-      linesByDate.entrySet().stream()
-          .filter((entry) -> entry.getKey().isAfter(interval[0])
-          && entry.getKey().isBefore(interval[1])).parallel()
+      /* Go through current input log files for given virtual and physical
+       * host, and either look up contained log dates from the last execution,
+       * or parse files to memory now. */
+      Map<LocalDate, Map<String, Long>> sanitizedLinesByDate
+          = new HashMap<>();
+      Set<LogMetadata> previouslyReadFiles = new HashSet<>();
+      for (LogMetadata logMetadata : physicalEntry.getValue().values()) {
+        Set<LocalDate> containedLogDates;
+        if (previouslyProcessedWebstats.containsKey(logMetadata)) {
+          containedLogDates = previouslyProcessedWebstats.get(logMetadata);
+          for (LocalDate date : containedLogDates) {
+            sanitizedLinesByDate.putIfAbsent(date, new TreeMap<>());
+          }
+          previouslyReadFiles.add(logMetadata);
+        } else {
+          containedLogDates = sanitizeWebstatsLog(sanitizedLinesByDate,
+              logMetadata);
+        }
+        newlyProcessedWebstats.put(logMetadata, containedLogDates);
+      }
+      /* Determine log dates that are safe to be written to disk now and that
+       * we didn't write to disk before. */
+      Set<LocalDate> storeDates = new HashSet<>();
+      LocalDate[] interval = determineInterval(sanitizedLinesByDate.keySet());
+      for (LocalDate newDate : sanitizedLinesByDate.keySet()) {
+        if (newDate.isAfter(interval[0]) && newDate.isBefore(interval[1])) {
+          WebServerAccessLogPersistence walp
+              = new WebServerAccessLogPersistence(
+              new WebServerAccessLogImpl(virtualHost, physicalHost, newDate));
+          Path outputPath = this.outputDirectory
+              .resolve(walp.getStoragePath());
+          if (!Files.exists(outputPath)) {
+            storeDates.add(newDate);
+          }
+        }
+      }
+      /* Reprocess previously read files containing log dates that we're going
+       * to write to disk below. */
+      for (LogMetadata previouslyReadFile : previouslyReadFiles) {
+        if (!Collections.disjoint(storeDates,
+            newlyProcessedWebstats.get(previouslyReadFile))) {
+          sanitizeWebstatsLog(sanitizedLinesByDate, previouslyReadFile);
+        }
+      }
+      /* Write sanitized log files to disk. */
+      sanitizedLinesByDate.entrySet().stream()
+          .filter((entry) -> storeDates.contains(entry.getKey())).parallel()
           .forEach((entry) -> storeSortedAndForget(virtualHost, physicalHost,
               entry.getKey(), entry.getValue()));
     }
   }
+    return newlyProcessedWebstats;
+  }
+
+  private Set<LocalDate> sanitizeWebstatsLog(
+      Map<LocalDate, Map<String, Long>> sanitizedLinesByDate,
+      LogMetadata logFile) {
+    Map<LocalDate, Map<String, Long>> newlySanitizedLinesByDate
+        = sanitzedLineStream(logFile);
+    for (Map.Entry<LocalDate, Map<String, Long>> e
+        : newlySanitizedLinesByDate.entrySet()) {
+      sanitizedLinesByDate.putIfAbsent(e.getKey(), new TreeMap<>());
+      Map<String, Long> newlySanitizedLines
+          = sanitizedLinesByDate.get(e.getKey());
+      for (Map.Entry<String, Long> e1 : e.getValue().entrySet()) {
+        newlySanitizedLines.put(e1.getKey(),
+            newlySanitizedLines.getOrDefault(e1.getKey(), 0L) + e1.getValue());
+      }
+    }
+    return newlySanitizedLinesByDate.keySet();
+  }
   }
 
   private void storeSortedAndForget(String virtualHost, String physicalHost,
@@ -149,8 +240,8 @@ public class SanitizeWeblogs extends CollecTorMain {
         new WebServerAccessLogImpl(toCompressedBytes(retainedLines),
             new File(name), name));
       log.debug("Storing {}.", name);
-      walp.storeOut(this.outputPathName);
-      walp.storeRecent(this.recentPathName);
+      walp.storeOut(this.outputDirectory.toString());
+      walp.storeRecent(this.recentDirectory.toString());
     } catch (DescriptorParseException dpe) {
       log.error("Cannot store log desriptor {}.", name, dpe);
     } catch (Throwable th) { // catch all else
@@ -279,5 +370,25 @@ public class SanitizeWeblogs extends CollecTorMain {
     return Collections.emptyMap();
   }
 
+  private void writeProcessedWebstats(
+      Map<LogMetadata, Set<LocalDate>> newlyProcessedWebstats) {
+    try {
+      if (!Files.exists(this.processedWebstatsFile.getParent())) {
+        Files.createDirectories(this.processedWebstatsFile.getParent());
+      }
+      List<String> lines = new ArrayList<>();
+      for (Map.Entry<LogMetadata, Set<LocalDate>> e
+          : newlyProcessedWebstats.entrySet()) {
+        for (LocalDate logLineDate : e.getValue()) {
+          lines.add(String.format("%s,%s", logLineDate, e.getKey().path));
+        }
+      }
+      Files.write(this.processedWebstatsFile, lines);
+    } catch (IOException e) {
+      log.error("Cannot write state file {}.", this.processedWebstatsFile, e);
+    }
+    log.debug("Wrote state file containing {} log files.",
+        newlyProcessedWebstats.size());
+  }
 }
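To make the reprocessing rule in findCleanWrite concrete, a
standalone sketch with hypothetical dates: an input file parsed in an
earlier run contained 2019-12-08 and 2019-12-09, and the current run
decides that 2019-12-09 is now safe to write. Collections.disjoint
detects the overlap, so the old file gets parsed again and the daily
log for 2019-12-09 comes out complete:

import java.time.LocalDate;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ReprocessingDecisionSketch {
  public static void main(String[] args) {
    // Dates the current run is about to write to disk (hypothetical).
    Set<LocalDate> storeDates =
        new HashSet<>(Collections.singleton(LocalDate.parse("2019-12-09")));
    // Dates recorded for an input file read in a previous run.
    Set<LocalDate> datesInPreviouslyReadFile = new HashSet<>(Arrays.asList(
        LocalDate.parse("2019-12-08"), LocalDate.parse("2019-12-09")));
    // The sets share 2019-12-09, so the file must be sanitized again.
    System.out.println(
        !Collections.disjoint(storeDates, datesInPreviouslyReadFile)); // true
  }
}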

WebServerAccessLogImpl.java

@@ -104,6 +104,25 @@ public class WebServerAccessLogImpl implements WebServerAccessLog {
     }
   }
 
+  /**
+   * Creates an empty WebServerAccessLog from the given filename parts.
+   *
+   * <p>This instance is not intended to be written to disk, as it doesn't have
+   * any content. The main intention of this instance is to compute storage
+   * paths.</p>
+   *
+   * @param virtualHost Virtual host name.
+   * @param physicalHost Physical host name.
+   * @param logDate Log date.
+   */
+  protected WebServerAccessLogImpl(String virtualHost, String physicalHost,
+      LocalDate logDate) {
+    this.descriptorFile = null;
+    this.virtualHost = virtualHost;
+    this.physicalHost = physicalHost;
+    this.logDate = logDate;
+  }
+
   @Override
   public InputStream decompressedByteStream() throws DescriptorParseException {
     try {
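As used in findCleanWrite above, such a content-less instance only
serves to derive the storage path of a daily log and to check whether
that file already exists in the output directory. A condensed sketch;
the output directory is hypothetical, and because the constructor is
protected, this code would have to live in the same package as
WebServerAccessLogImpl (package declaration omitted):

import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDate;

public class StoragePathSketch {
  static boolean alreadyStored(Path outputDirectory) {
    // Content-less descriptor, used only to compute the storage path.
    WebServerAccessLogPersistence walp = new WebServerAccessLogPersistence(
        new WebServerAccessLogImpl("metrics.torproject.org", // virtual host
            "meronense.torproject.org",                      // physical host
            LocalDate.of(2019, 12, 9)));                     // log date
    Path outputPath = outputDirectory.resolve(walp.getStoragePath());
    return Files.exists(outputPath); // true means: skip this date
  }
}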