From f8fa108d183968540eca529250cb142f8216ce8c Mon Sep 17 00:00:00 2001 From: Karsten Loesing Date: Wed, 14 Nov 2018 10:39:24 +0100 Subject: [PATCH] Modernize legacy module and rename it to bwhist. Changes include using similar mechanisms for configuration, calling the database aggregation function, querying the database, and writing results as we're using in the ipv6servers and other modules. Configuration options can now be changed via the following Java properties: bwhist.descriptors bwhist.database bwhist.history bwhist.output The legacy.config file, if one exists, will be ignored. Part of #28116. --- build.xml | 38 +---- .../metrics/stats/bwhist/Configuration.java | 18 +++ .../torproject/metrics/stats/bwhist/Main.java | 56 ++++++++ .../RelayDescriptorDatabaseImporter.java | 131 ++++++++++++------ .../metrics/stats/bwhist/Writer.java | 42 ++++++ .../metrics/stats/servers/Configuration.java | 87 ------------ .../metrics/stats/servers/Main.java | 40 ------ src/main/resources/legacy.config.template | 8 -- 8 files changed, 212 insertions(+), 208 deletions(-) create mode 100644 src/main/java/org/torproject/metrics/stats/bwhist/Configuration.java create mode 100644 src/main/java/org/torproject/metrics/stats/bwhist/Main.java rename src/main/java/org/torproject/metrics/stats/{servers => bwhist}/RelayDescriptorDatabaseImporter.java (84%) create mode 100644 src/main/java/org/torproject/metrics/stats/bwhist/Writer.java delete mode 100644 src/main/java/org/torproject/metrics/stats/servers/Configuration.java delete mode 100644 src/main/java/org/torproject/metrics/stats/servers/Main.java delete mode 100644 src/main/resources/legacy.config.template diff --git a/build.xml b/build.xml index b95550d..a391416 100644 --- a/build.xml +++ b/build.xml @@ -315,7 +315,7 @@ - + @@ -340,39 +340,9 @@ - - - - - - - - - - - - - - + + - - - - - - - - - - - @@ -503,7 +473,7 @@ - + diff --git a/src/main/java/org/torproject/metrics/stats/bwhist/Configuration.java b/src/main/java/org/torproject/metrics/stats/bwhist/Configuration.java new file mode 100644 index 0000000..2a0fbc5 --- /dev/null +++ b/src/main/java/org/torproject/metrics/stats/bwhist/Configuration.java @@ -0,0 +1,18 @@ +/* Copyright 2011--2018 The Tor Project + * See LICENSE for licensing information */ + +package org.torproject.metrics.stats.bwhist; + +/** Configuration options parsed from Java properties with reasonable hard-coded + * defaults. */ +public class Configuration { + static String descriptors = System.getProperty("bwhist.descriptors", + "../../shared/in/"); + static String database = System.getProperty("bwhist.database", + "jdbc:postgresql:tordir"); + static String history = System.getProperty("bwhist.history", + "status/read-descriptors"); + static String output = System.getProperty("bwhist.output", + "stats/"); +} + diff --git a/src/main/java/org/torproject/metrics/stats/bwhist/Main.java b/src/main/java/org/torproject/metrics/stats/bwhist/Main.java new file mode 100644 index 0000000..61c1435 --- /dev/null +++ b/src/main/java/org/torproject/metrics/stats/bwhist/Main.java @@ -0,0 +1,56 @@ +/* Copyright 2011--2018 The Tor Project + * See LICENSE for licensing information */ + +package org.torproject.metrics.stats.bwhist; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.file.Paths; +import java.util.Arrays; + +/** + * Coordinate downloading and parsing of descriptors and extraction of + * statistically relevant data for later processing with R. + */ +public class Main { + + private static Logger log = LoggerFactory.getLogger(Main.class); + + private static String[][] paths = { + {"recent", "relay-descriptors", "consensuses"}, + {"recent", "relay-descriptors", "extra-infos"}, + {"archive", "relay-descriptors", "consensuses"}, + {"archive", "relay-descriptors", "extra-infos"}}; + + /** Executes this data-processing module. */ + public static void main(String[] args) throws Exception { + + log.info("Starting bwhist module."); + + log.info("Reading descriptors and inserting relevant parts into the " + + "database."); + File[] descriptorDirectories = Arrays.stream(paths).map((String[] path) + -> Paths.get(Configuration.descriptors, path).toFile()) + .toArray(File[]::new); + File historyFile = new File(Configuration.history); + RelayDescriptorDatabaseImporter database + = new RelayDescriptorDatabaseImporter(descriptorDirectories, + historyFile, Configuration.database); + database.importRelayDescriptors(); + + log.info("Aggregating database entries."); + database.aggregate(); + + log.info("Querying aggregated statistics from the database."); + new Writer().write(Paths.get(Configuration.output, "bandwidth.csv"), + database.queryBandwidth()); + + log.info("Closing database connection."); + database.closeConnection(); + + log.info("Terminating bwhist module."); + } +} + diff --git a/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java b/src/main/java/org/torproject/metrics/stats/bwhist/RelayDescriptorDatabaseImporter.java similarity index 84% rename from src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java rename to src/main/java/org/torproject/metrics/stats/bwhist/RelayDescriptorDatabaseImporter.java index d1ae43c..a6cf0cc 100644 --- a/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java +++ b/src/main/java/org/torproject/metrics/stats/bwhist/RelayDescriptorDatabaseImporter.java @@ -1,7 +1,7 @@ /* Copyright 2011--2018 The Tor Project * See LICENSE for licensing information */ -package org.torproject.metrics.stats.servers; +package org.torproject.metrics.stats.bwhist; import org.torproject.descriptor.Descriptor; import org.torproject.descriptor.DescriptorReader; @@ -20,6 +20,7 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -27,6 +28,7 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.SortedSet; @@ -108,22 +110,19 @@ public final class RelayDescriptorDatabaseImporter { private boolean importIntoDatabase = true; - private List archivesDirectories; + private File[] descriptorDirectories; - private File statsDirectory; + private File historyFile; /** * Initialize database importer by connecting to the database and * preparing statements. */ - public RelayDescriptorDatabaseImporter(String connectionUrl, - List archivesDirectories, File statsDirectory) { + public RelayDescriptorDatabaseImporter(File[] descriptorDirectories, + File historyFile, String connectionUrl) { - if (archivesDirectories == null || statsDirectory == null) { - throw new IllegalArgumentException(); - } - this.archivesDirectories = archivesDirectories; - this.statsDirectory = statsDirectory; + this.descriptorDirectories = descriptorDirectories; + this.historyFile = historyFile; if (connectionUrl != null) { try { @@ -520,29 +519,20 @@ public final class RelayDescriptorDatabaseImporter { /** Imports relay descriptors into the database. */ public void importRelayDescriptors() { - log.info("Importing files in directories " + archivesDirectories - + "/..."); - if (!this.archivesDirectories.isEmpty()) { - DescriptorReader reader = - DescriptorSourceFactory.createDescriptorReader(); - reader.setMaxDescriptorsInQueue(10); - File historyFile = new File(statsDirectory, - "database-importer-relay-descriptor-history"); - reader.setHistoryFile(historyFile); - for (Descriptor descriptor : reader.readDescriptors( - this.archivesDirectories.toArray( - new File[this.archivesDirectories.size()]))) { - if (descriptor instanceof RelayNetworkStatusConsensus) { - this.addRelayNetworkStatusConsensus( - (RelayNetworkStatusConsensus) descriptor); - } else if (descriptor instanceof ExtraInfoDescriptor) { - this.addExtraInfoDescriptor((ExtraInfoDescriptor) descriptor); - } + DescriptorReader reader = + DescriptorSourceFactory.createDescriptorReader(); + reader.setMaxDescriptorsInQueue(10); + reader.setHistoryFile(this.historyFile); + for (Descriptor descriptor : reader.readDescriptors( + this.descriptorDirectories)) { + if (descriptor instanceof RelayNetworkStatusConsensus) { + this.addRelayNetworkStatusConsensus( + (RelayNetworkStatusConsensus) descriptor); + } else if (descriptor instanceof ExtraInfoDescriptor) { + this.addExtraInfoDescriptor((ExtraInfoDescriptor) descriptor); } - reader.saveHistoryFile(historyFile); } - - log.info("Finished importing relay descriptors."); + reader.saveHistoryFile(this.historyFile); } private void addRelayNetworkStatusConsensus( @@ -583,9 +573,9 @@ public final class RelayDescriptorDatabaseImporter { } /** - * Close the relay descriptor database connection. + * Commit any non-commited parts. */ - public void closeConnection() { + public void commit() { /* Log stats about imported descriptors. */ log.info("Finished importing relay descriptors: {} network status entries " @@ -609,21 +599,84 @@ public final class RelayDescriptorDatabaseImporter { } } - /* Commit any stragglers before closing. */ + /* Commit any stragglers. */ if (this.conn != null) { try { this.csH.executeBatch(); this.conn.commit(); - } catch (SQLException e) { + } catch (SQLException e) { log.warn("Could not commit final records to database", e); } - try { - this.conn.close(); - } catch (SQLException e) { - log.warn("Could not close database connection.", e); + } + } + + /** Call the refresh_all() function to aggregate newly imported data. */ + void aggregate() throws SQLException { + Statement st = this.conn.createStatement(); + st.executeQuery("SELECT refresh_all()"); + } + + /** Query the servers_platforms view. */ + List queryBandwidth() throws SQLException { + List statistics = new ArrayList<>(); + String columns = "date, isexit, isguard, bwread, bwwrite, dirread, " + + "dirwrite"; + statistics.add(columns.split(", ")); + Statement st = this.conn.createStatement(); + Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), + Locale.US); + String queryString = "SELECT " + columns + " FROM stats_bandwidth"; + try (ResultSet rs = st.executeQuery(queryString)) { + while (rs.next()) { + String[] outputLine = new String[7]; + outputLine[0] = rs.getDate("date", calendar).toLocalDate().toString(); + outputLine[1] = getBooleanFromResultSet(rs, "isexit"); + outputLine[2] = getBooleanFromResultSet(rs, "isguard"); + outputLine[3] = getLongFromResultSet(rs, "bwread"); + outputLine[4] = getLongFromResultSet(rs, "bwwrite"); + outputLine[5] = getLongFromResultSet(rs, "dirread"); + outputLine[6] = getLongFromResultSet(rs, "dirwrite"); + statistics.add(outputLine); } } + return statistics; + } + + /** Retrieve the boolean value of the designated column in the + * current row of the given ResultSet object and format it as a + * String object with "t" for true and + * "f" for false, or return null if the + * retrieved value was NULL. */ + private static String getBooleanFromResultSet(ResultSet rs, + String columnLabel) throws SQLException { + boolean result = rs.getBoolean(columnLabel); + if (rs.wasNull()) { + return null; + } else { + return result ? "t" : "f"; + } + } + + /** Retrieve the long value of the designated column in the + * current row of the given ResultSet object and format it as a + * String object, or return null if the retrieved + * value was NULL. */ + private static String getLongFromResultSet(ResultSet rs, String columnLabel) + throws SQLException { + long result = rs.getLong(columnLabel); + return rs.wasNull() ? null : String.valueOf(result); + } + + /** + * Close the relay descriptor database connection. + */ + public void closeConnection() { + try { + this.conn.close(); + } catch (SQLException e) { + log.warn("Could not close database connection.", e); + } } } diff --git a/src/main/java/org/torproject/metrics/stats/bwhist/Writer.java b/src/main/java/org/torproject/metrics/stats/bwhist/Writer.java new file mode 100644 index 0000000..1ac1fd9 --- /dev/null +++ b/src/main/java/org/torproject/metrics/stats/bwhist/Writer.java @@ -0,0 +1,42 @@ +/* Copyright 2018 The Tor Project + * See LICENSE for licensing information */ + +package org.torproject.metrics.stats.bwhist; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** Writer that takes output line objects and writes them to a file, preceded + * by a column header line. */ +class Writer { + + /** Write output lines to the given file. */ + void write(Path filePath, Iterable outputLines) + throws IOException { + File parentFile = filePath.toFile().getParentFile(); + if (null != parentFile && !parentFile.exists()) { + if (!parentFile.mkdirs()) { + throw new IOException("Unable to create parent directory of output " + + "file. Not writing this file."); + } + } + List formattedOutputLines = new ArrayList<>(); + for (String[] outputLine : outputLines) { + StringBuilder formattedOutputLine = new StringBuilder(); + for (String outputLinePart : outputLine) { + formattedOutputLine.append(','); + if (null != outputLinePart) { + formattedOutputLine.append(outputLinePart); + } + } + formattedOutputLines.add(formattedOutputLine.substring(1)); + } + Files.write(filePath, formattedOutputLines, StandardCharsets.UTF_8); + } +} + diff --git a/src/main/java/org/torproject/metrics/stats/servers/Configuration.java b/src/main/java/org/torproject/metrics/stats/servers/Configuration.java deleted file mode 100644 index b6ee397..0000000 --- a/src/main/java/org/torproject/metrics/stats/servers/Configuration.java +++ /dev/null @@ -1,87 +0,0 @@ -/* Copyright 2011--2018 The Tor Project - * See LICENSE for licensing information */ - -package org.torproject.metrics.stats.servers; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.net.MalformedURLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Initialize configuration with hard-coded defaults, overwrite with - * configuration in config file, if exists, and answer Main.java about our - * configuration. - */ -public class Configuration { - - private static Logger log = LoggerFactory.getLogger(Configuration.class); - - private List directoryArchivesDirectories = new ArrayList<>(); - - private String relayDescriptorDatabaseJdbc = - "jdbc:postgresql://localhost/tordir?user=metrics&password=password"; - - /** Initializes this configuration class. */ - public Configuration() { - - /* Read config file, if present. */ - File configFile = new File("config"); - if (!configFile.exists()) { - log.warn("Could not find config file."); - return; - } - String line = null; - try (BufferedReader br = new BufferedReader(new FileReader(configFile))) { - while ((line = br.readLine()) != null) { - if (line.startsWith("DirectoryArchivesDirectory")) { - this.directoryArchivesDirectories.add(new File(line.split(" ")[1])); - } else if (line.startsWith("RelayDescriptorDatabaseJDBC")) { - this.relayDescriptorDatabaseJdbc = line.split(" ")[1]; - } else if (!line.startsWith("#") && line.length() > 0) { - log.error("Configuration file contains unrecognized " - + "configuration key in line '{}'! Exiting!", line); - System.exit(1); - } - } - } catch (ArrayIndexOutOfBoundsException e) { - log.warn("Configuration file contains configuration key without value in " - + "line '{}'. Exiting!", line); - System.exit(1); - } catch (MalformedURLException e) { - log.warn("Configuration file contains illegal URL or IP:port pair in " - + "line '{}'. Exiting!", line); - System.exit(1); - } catch (NumberFormatException e) { - log.warn("Configuration file contains illegal value in line '{}' with " - + "legal values being 0 or 1. Exiting!", line); - System.exit(1); - } catch (IOException e) { - log.error("Unknown problem while reading config file! Exiting!", e); - System.exit(1); - } - } - - /** Returns directories containing archived descriptors. */ - public List getDirectoryArchivesDirectories() { - if (this.directoryArchivesDirectories.isEmpty()) { - String prefix = "../../shared/in/recent/relay-descriptors/"; - return Arrays.asList(new File(prefix + "consensuses/"), - new File(prefix + "extra-infos/")); - } else { - return this.directoryArchivesDirectories; - } - } - - public String getRelayDescriptorDatabaseJdbc() { - return this.relayDescriptorDatabaseJdbc; - } -} - diff --git a/src/main/java/org/torproject/metrics/stats/servers/Main.java b/src/main/java/org/torproject/metrics/stats/servers/Main.java deleted file mode 100644 index 1454418..0000000 --- a/src/main/java/org/torproject/metrics/stats/servers/Main.java +++ /dev/null @@ -1,40 +0,0 @@ -/* Copyright 2011--2018 The Tor Project - * See LICENSE for licensing information */ - -package org.torproject.metrics.stats.servers; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; - -/** - * Coordinate downloading and parsing of descriptors and extraction of - * statistically relevant data for later processing with R. - */ -public class Main { - - private static Logger log = LoggerFactory.getLogger(Main.class); - - /** Executes this data-processing module. */ - public static void main(String[] args) { - - log.info("Starting ERNIE."); - - // Initialize configuration - Configuration config = new Configuration(); - - // Define stats directory for temporary files - File statsDirectory = new File("stats"); - - // Import relay descriptors - RelayDescriptorDatabaseImporter rddi = new RelayDescriptorDatabaseImporter( - config.getRelayDescriptorDatabaseJdbc(), - config.getDirectoryArchivesDirectories(), statsDirectory); - rddi.importRelayDescriptors(); - rddi.closeConnection(); - - log.info("Terminating ERNIE."); - } -} - diff --git a/src/main/resources/legacy.config.template b/src/main/resources/legacy.config.template deleted file mode 100644 index e2e0dac..0000000 --- a/src/main/resources/legacy.config.template +++ /dev/null @@ -1,8 +0,0 @@ -## Relative paths to directories to import directory archives from -#DirectoryArchivesDirectory /srv/metrics.torproject.org/metrics/shared/in/recent/relay-descriptors/consensuses/ -#DirectoryArchivesDirectory /srv/metrics.torproject.org/metrics/shared/in/recent/relay-descriptors/server-descriptors/ -#DirectoryArchivesDirectory /srv/metrics.torproject.org/metrics/shared/in/recent/relay-descriptors/extra-infos/ -# -## JDBC string for relay descriptor database -#RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=metrics&password=password -#