Remember processed files between module runs.

The three recently added modules to archive Snowflake statistics,
bridge pool assignments, and BridgeDB metrics have in common that they
process any input files regardless of whether they already processed
them before.

The problem is that the input files processed by these modules are
either never removed (Snowflake statistics) or only removed manually
by the operator (bridge pool assignments and BridgeDB metrics).

The effect is that non-recent BridgeDB metrics and bridge pool
assignments are being placed in the indexed/recent/ directory in the
next execution after they are deleted for being older than 72 hours.
The same would happen with Snowflake statistics after the operator
removes them from the out/ directory.

The fix is to use a state file containing file names of previously
processed files and only process a file not found in there. This is
the same approach as taken for bridge descriptor tarballs.
This commit is contained in:
Karsten Loesing 2020-01-07 13:30:28 +01:00
parent d2a74b676a
commit 741401a0da
5 changed files with 100 additions and 0 deletions

View File

@ -4,6 +4,9 @@
- Give up on periodically checking the configuration file for
updates and reloading it in case of changes.
- Avoid reprocessing webstats files.
- Remember processed files between module runs for archived
Snowflake statistics, bridge pool assignments, and BridgeDB
metrics.
* Minor changes
- Remove dependency on metrics-lib's internal package.

View File

@ -24,7 +24,9 @@ import java.nio.file.Paths;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.SortedSet;
import java.util.Stack;
import java.util.TreeSet;
public class BridgedbMetricsProcessor extends CollecTorMain {
@ -39,6 +41,11 @@ public class BridgedbMetricsProcessor extends CollecTorMain {
*/
private File inputDirectory;
/**
* File containing file names of previously parsed BridgeDB metrics files.
*/
private Path parsedBridgedbMetricsFile;
/**
* Directory for writing BridgeDB statistics files to be archived in tarballs.
*/
@ -88,11 +95,19 @@ public class BridgedbMetricsProcessor extends CollecTorMain {
protected void startProcessing() throws ConfigurationException {
logger.info("Starting BridgeDB statistics module of CollecTor.");
this.initializeConfiguration();
SortedSet<Path> previouslyProcessedFiles = this.readProcessedFiles(
this.parsedBridgedbMetricsFile);
SortedSet<Path> processedFiles = new TreeSet<>();
logger.info("Reading BridgeDB statistics files in {}.",
this.inputDirectory);
for (Descriptor descriptor
: DescriptorSourceFactory.createDescriptorReader()
.readDescriptors(this.inputDirectory)) {
processedFiles.add(descriptor.getDescriptorFile().toPath());
if (previouslyProcessedFiles.contains(
descriptor.getDescriptorFile().toPath())) {
continue;
}
if (descriptor instanceof BridgedbMetrics) {
BridgedbMetrics bridgedbMetrics = (BridgedbMetrics) descriptor;
BridgedbMetricsPersistence persistence
@ -114,6 +129,7 @@ public class BridgedbMetricsProcessor extends CollecTorMain {
}
logger.info("Cleaning up directory {} containing recent files.",
this.recentPathName);
this.writeProcessedFiles(this.parsedBridgedbMetricsFile, processedFiles);
this.cleanUpRsyncDirectory();
logger.info("Finished processing BridgeDB statistics file(s).");
}
@ -123,6 +139,8 @@ public class BridgedbMetricsProcessor extends CollecTorMain {
* storing them in instance attributes.
*/
private void initializeConfiguration() throws ConfigurationException {
this.parsedBridgedbMetricsFile = this.config.getPath(Key.StatsPath)
.resolve("processed-bridgedb-metrics");
this.outputPathName = config.getPath(Key.OutputPath).toString();
this.recentPathName = config.getPath(Key.RecentPath).toString();
this.inputDirectory =

View File

@ -25,6 +25,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.DateTimeException;
import java.time.Instant;
@ -36,8 +37,10 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.Stack;
import java.util.TreeMap;
import java.util.TreeSet;
public class BridgePoolAssignmentsProcessor extends CollecTorMain {
@ -53,6 +56,11 @@ public class BridgePoolAssignmentsProcessor extends CollecTorMain {
*/
private File assignmentsDirectory;
/**
* File containing file names of previously parsed assignments files.
*/
private Path parsedBridgePoolAssignmentsFile;
/**
* Directory containing sanitized bridge pool assignments for tarballs.
*/
@ -117,9 +125,16 @@ public class BridgePoolAssignmentsProcessor extends CollecTorMain {
protected void startProcessing() throws ConfigurationException {
logger.info("Starting bridge-pool-assignments module of CollecTor.");
this.initializeConfiguration();
SortedSet<Path> previouslyProcessedFiles = this.readProcessedFiles(
this.parsedBridgePoolAssignmentsFile);
SortedSet<Path> processedFiles = new TreeSet<>();
List<File> assignmentFiles = this.listAssignmentFiles();
LocalDateTime latestPublished = null;
for (File assignmentFile : assignmentFiles) {
processedFiles.add(assignmentFile.toPath());
if (previouslyProcessedFiles.contains(assignmentFile.toPath())) {
continue;
}
logger.info("Processing bridge pool assignment file '{}'...",
assignmentFile.getAbsolutePath());
SortedMap<LocalDateTime, SortedMap<String, String>>
@ -161,6 +176,8 @@ public class BridgePoolAssignmentsProcessor extends CollecTorMain {
+ "published at {}, which is more than 5:30 hours in the past.",
latestPublished);
}
this.writeProcessedFiles(this.parsedBridgePoolAssignmentsFile,
processedFiles);
this.cleanUpRsyncDirectory();
logger.info("Finished processing bridge pool assignment file(s).");
}
@ -170,6 +187,8 @@ public class BridgePoolAssignmentsProcessor extends CollecTorMain {
* storing them in instance attributes.
*/
private void initializeConfiguration() throws ConfigurationException {
this.parsedBridgePoolAssignmentsFile = this.config.getPath(Key.StatsPath)
.resolve("parsed-bridge-pool-assignments");
this.outputPathName = Paths.get(config.getPath(Key.OutputPath).toString(),
"bridge-pool-assignments").toString();
this.recentPathName = Paths.get(config.getPath(Key.RecentPath).toString(),

View File

@ -16,9 +16,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
public abstract class CollecTorMain extends SyncManager
@ -128,5 +133,47 @@ public abstract class CollecTorMain extends SyncManager
+ ioe.getMessage(), ioe);
}
}
/**
 * Read file names of previously processed files from the given state
 * file.
 *
 * <p>If the state file does not exist yet (e.g., on the first module
 * run), or if reading it fails, an empty set is returned, which means
 * all input files will be treated as new.</p>
 *
 * @param stateFile State file to read file names from.
 * @return File names of processed files.
 */
public SortedSet<Path> readProcessedFiles(Path stateFile) {
  SortedSet<Path> result = new TreeSet<>();
  if (!Files.exists(stateFile)) {
    return result;
  }
  try {
    Files.readAllLines(stateFile)
        .forEach(fileName -> result.add(Paths.get(fileName)));
  } catch (IOException e) {
    logger.warn("I/O error while reading processed files.", e);
  }
  return result;
}
/**
 * Write file names of processed files to the given state file, replacing
 * any earlier contents.
 *
 * <p>Creates the state file's parent directory if it does not exist yet.
 * I/O errors are logged and otherwise ignored, which means a failed write
 * may cause input files to be re-processed in the next run.</p>
 *
 * @param stateFile State file to write file names to.
 * @param processedFiles File names of processed files.
 */
public void writeProcessedFiles(Path stateFile,
    SortedSet<Path> processedFiles) {
  List<String> lines = new ArrayList<>();
  for (Path processedFile : processedFiles) {
    lines.add(processedFile.toString());
  }
  try {
    /* Ensure the parent directory exists before writing. Guard against a
     * null parent (a state file given as a bare file name), which would
     * otherwise throw a NullPointerException; Files.createDirectories()
     * itself is a no-op if the directory already exists, so no exists()
     * check on the state file is needed. */
    Path parentDirectory = stateFile.getParent();
    if (null != parentDirectory) {
      Files.createDirectories(parentDirectory);
    }
    Files.write(stateFile, lines);
  } catch (IOException e) {
    logger.warn("I/O error while writing processed files.", e);
  }
}
}

View File

@ -23,6 +23,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.SortedSet;
@ -71,6 +73,11 @@ public class SnowflakeStatsDownloader extends CollecTorMain {
}
logger.debug("Finished downloading {}.", url);
Path parsedSnowflakeStatsFile = this.config.getPath(Key.StatsPath)
.resolve("processed-snowflake-stats");
SortedSet<Path> previouslyProcessedFiles = this.readProcessedFiles(
parsedSnowflakeStatsFile);
SortedSet<Path> processedFiles = new TreeSet<>();
DescriptorParser descriptorParser =
DescriptorSourceFactory.createDescriptorParser();
SortedSet<LocalDateTime> snowflakeStatsEnds = new TreeSet<>();
@ -85,6 +92,11 @@ public class SnowflakeStatsDownloader extends CollecTorMain {
= new SnowflakeStatsPersistence(snowflakeStats);
File tarballFile = new File(outputPathName + "/"
+ persistence.getStoragePath());
Path relativeFileName = Paths.get(tarballFile.getName());
processedFiles.add(relativeFileName);
if (previouslyProcessedFiles.contains(relativeFileName)) {
continue;
}
if (tarballFile.exists()) {
continue;
}
@ -106,6 +118,7 @@ public class SnowflakeStatsDownloader extends CollecTorMain {
snowflakeStatsEnds.last());
}
this.writeProcessedFiles(parsedSnowflakeStatsFile, processedFiles);
this.cleanUpRsyncDirectory();
}