Reduce memory footprint and wall time.

Adapt to the latest changes in metrics-lib (task-25329) and exploit the
high redundancy of web logs (e.g., a 3G file might contain only 350
distinct lines). This avoids OOM and array-out-of-bounds exceptions for
large files (>2G) and yields a speed-up of roughly 50%: the earlier
66 min are down to 34 min for the meronense & weschniakowii files plus
two larger files.
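
A minimal sketch of the counting idea (the class and method names here
are illustrative, not part of the commit): instead of materializing and
sorting every sanitized line, collapse the stream into a sorted map from
distinct line to number of occurrences, as the new SanitizeWeblogs code
below does.

import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingByConcurrent;

import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

class LineCountSketch {

  /** Collapse possibly millions of highly redundant lines into a few
   *  hundred (line, count) entries; the TreeMap keeps them sorted,
   *  replacing the old sort over the full line list. */
  static Map<String, Long> countDistinct(Stream<String> lines) {
    return new TreeMap<>(lines.parallel()
        .collect(groupingByConcurrent(line -> line, counting())));
  }
}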

There is a BATCH constant that could be tuned for processing speed; it
is logged for each webstats module run. It is currently set to 100k,
which was chosen more or less arbitrarily and used for all tests. A
test run using 500k didn't show significant differences.
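
The batching scheme bounds how many copies of one line are materialized
at a time when the counts are expanded back into log lines for storage.
A minimal sketch, with the XZ compression step omitted and illustrative
names:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BatchWriteSketch {

  private static final int BATCH = 100_000;

  /** Write each distinct line `count` times, materializing at most
   *  BATCH joined copies at once. */
  static byte[] expand(Map<String, Long> lines) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (Map.Entry<String, Long> entry : lines.entrySet()) {
      long count = entry.getValue();
      // One pre-built chunk of BATCH copies is reused for all full
      // batches of this line.
      byte[] batch = bytesFor(entry.getKey(), BATCH);
      while (count > 0) {
        if (count > BATCH) {
          out.write(batch);
          count -= BATCH;
        } else {
          out.write(bytesFor(entry.getKey(), count));
          break;
        }
      }
    }
    return out.toByteArray();
  }

  /** `times` newline-terminated copies of the given line. */
  private static byte[] bytesFor(String line, long times) {
    return Stream.generate(() -> line).limit(times)
        .collect(Collectors.joining("\n", "", "\n"))
        .getBytes(StandardCharsets.UTF_8);
  }
}

Peak memory is thus proportional to BATCH times the line length rather
than to the largest per-line count, which is what makes the constant
tunable for speed without risking OOM.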
iwakeh 2018-02-20 16:30:09 +00:00
parent fbb35f75da
commit 8557bf6255
2 changed files with 51 additions and 18 deletions

src/main/java/org/torproject/collector/persist/WebServerAccessLogPersistence.java

@@ -5,7 +5,6 @@ package org.torproject.collector.persist;
 
 import org.torproject.descriptor.WebServerAccessLog;
-import org.torproject.descriptor.internal.FileType;
 import org.torproject.descriptor.log.InternalLogDescriptor;
 import org.torproject.descriptor.log.InternalWebServerAccessLog;
 
 import org.slf4j.Logger;
@@ -30,13 +29,6 @@ public class WebServerAccessLogPersistence
 
   /** Prepare storing the given descriptor. */
   public WebServerAccessLogPersistence(WebServerAccessLog desc) {
     super(desc, new byte[0]);
-    byte[] compressedBytes = null;
-    try { // The descriptor bytes have to be stored compressed.
-      compressedBytes = COMPRESSION.compress(desc.getRawDescriptorBytes());
-      ((InternalLogDescriptor) desc).setRawDescriptorBytes(compressedBytes);
-    } catch (Exception ex) {
-      log.warn("Cannot compress {}. Storing uncompressed.", ex);
-    }
     calculatePaths();
   }

src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java

@@ -3,6 +3,7 @@
 package org.torproject.collector.webstats;
 
 import static java.util.stream.Collectors.counting;
+import static java.util.stream.Collectors.groupingByConcurrent;
 import static java.util.stream.Collectors.toList;
@@ -17,6 +18,7 @@ import org.torproject.collector.persist.WebServerAccessLogPersistence;
 import org.torproject.descriptor.DescriptorParseException;
 import org.torproject.descriptor.Method;
 import org.torproject.descriptor.WebServerAccessLog;
+import org.torproject.descriptor.internal.FileType;
 import org.torproject.descriptor.log.InternalLogDescriptor;
 import org.torproject.descriptor.log.InternalWebServerAccessLog;
 import org.torproject.descriptor.log.WebServerAccessLogImpl;
@@ -26,8 +28,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.LocalDate;
@@ -40,6 +43,7 @@ import java.util.SortedSet;
 import java.util.StringJoiner;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -87,6 +91,7 @@ public class SanitizeWeblogs extends CollecTorMain {
     Set<SourceType> sources = this.config.getSourceTypeSet(
         Key.WebstatsSources);
     if (sources.contains(SourceType.Local)) {
+      log.info("Processing logs using batch value {}.", BATCH);
       findCleanWrite(this.config.getPath(Key.WebstatsLocalOrigins));
       PersistenceUtils.cleanDirectory(this.config.getPath(Key.RecentPath));
     }
@@ -126,24 +131,60 @@ public class SanitizeWeblogs extends CollecTorMain {
     String name = new StringJoiner(InternalLogDescriptor.SEP)
         .add(virtualHost).add(physicalHost)
         .add(InternalWebServerAccessLog.MARKER)
-        .add(date.format(DateTimeFormatter.BASIC_ISO_DATE)).toString();
+        .add(date.format(DateTimeFormatter.BASIC_ISO_DATE))
+        .toString() + "." + FileType.XZ.name().toLowerCase();
     log.debug("Sanitizing {}.", name);
-    List<String> retainedLines = lines
+    Map<String, Long> retainedLines = new TreeMap<>(lines
         .stream().parallel().map((line) -> sanitize(line, date))
-        .filter((line) -> line.isPresent()).map((line) -> line.get())
-        .collect(toList());
-    retainedLines.sort(null);
+        .filter((line) -> line.isPresent())
+        .map((line) -> line.get())
+        .collect(groupingByConcurrent(line -> line, counting())));
+    lines.clear(); // not needed anymore
     try {
       WebServerAccessLogPersistence walp
           = new WebServerAccessLogPersistence(
-              new WebServerAccessLogImpl(retainedLines, name, false));
+              new WebServerAccessLogImpl(toCompressedBytes(retainedLines),
+              name, false));
       log.debug("Storing {}.", name);
       walp.storeOut(this.outputPathName);
       walp.storeRecent(this.recentPathName);
     } catch (DescriptorParseException dpe) {
       log.error("Cannot store log descriptor {}.", name, dpe);
     } catch (Throwable th) { // catch all else
       log.error("Serious problem. Cannot store log descriptor {}.", name, th);
     }
-    lines.clear();
   }
+
+  private static final int BATCH = 100_000;
+
+  static byte[] toCompressedBytes(Map<String, Long> lines)
+      throws DescriptorParseException {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        OutputStream os = FileType.XZ.outputStream(baos)) {
+      for (Map.Entry<String, Long> entry : lines.entrySet()) {
+        long count = entry.getValue();
+        byte[] batch = bytesFor(entry.getKey(), BATCH);
+        while (count > 0) {
+          if (count > BATCH) {
+            os.write(batch);
+            count -= BATCH;
+          } else {
+            os.write(bytesFor(entry.getKey(), count));
+            break;
+          }
+        }
+      }
+      os.flush();
+      os.close();
+      return baos.toByteArray();
+    } catch (Exception ex) {
+      throw new DescriptorParseException(ex.getMessage());
+    }
+  }
+
+  private static byte[] bytesFor(String line, long times) {
+    return Stream.generate(() -> line).limit(times)
+        .collect(Collectors.joining("\n", "", "\n")).getBytes();
+  }
+
   static Optional<String> sanitize(WebServerAccessLogLine logLine,
@@ -186,8 +227,8 @@ public class SanitizeWeblogs extends CollecTorMain {
   private Stream<WebServerAccessLogLine> lineStream(LogMetadata metadata) {
     log.debug("Processing file {}.", metadata.path);
     try (BufferedReader br
-        = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
-        metadata.fileType.decompress(Files.readAllBytes(metadata.path)))))) {
+        = new BufferedReader(new InputStreamReader(
+        metadata.fileType.decompress(Files.newInputStream(metadata.path))))) {
       return br.lines()
           .map((String line) -> WebServerAccessLogLine.makeLine(line))
           .collect(toList()).stream();