Modernize legacy module and rename it to bwhist.

Changes include using the same mechanisms for configuration, calling
the database aggregation function, querying the database, and writing
results that we already use in the ipv6servers and other modules.

Configuration options can now be changed via the following Java
properties:

bwhist.descriptors
bwhist.database
bwhist.history
bwhist.output

The legacy.config file, if one exists, will be ignored.

Part of #28116.
This commit is contained in:
Karsten Loesing 2018-11-14 10:39:24 +01:00
parent ca5fa45df0
commit f8fa108d18
8 changed files with 212 additions and 208 deletions

View File

@ -315,7 +315,7 @@
<antcall target="collectdescs" />
<antcall target="connbidirect" />
<antcall target="onionperf" />
<antcall target="legacy" />
<antcall target="bwhist" />
<antcall target="advbwdist" />
<antcall target="hidserv" />
<antcall target="clients" />
@ -340,39 +340,9 @@
<antcall target="run-java" />
</target>
<!-- Provides legacy.config file from template. -->
<target name="legacy-create-config" >
<copy file="${resources}/legacy.config.template"
tofile="${basedir}/legacy.config"/>
</target>
<!-- Expects legacy.config file in the base directory. -->
<target name="legacy" >
<property name="module.name" value="servers" />
<property name="localmoddir" value="${modulebase}/${module.name}" />
<property name="statsdir"
value="${localmoddir}/stats" />
<mkdir dir="${statsdir}" />
<copy file="${basedir}/legacy.config"
tofile="${localmoddir}/config"/>
<target name="bwhist" >
<property name="module.name" value="bwhist" />
<antcall target="run-java" />
<exec executable="psql"
dir="${localmoddir}"
failonerror="true" >
<arg value="--dbname=tordir"/>
<arg value="-c SELECT * FROM refresh_all();" />
</exec>
<exec executable="psql"
dir="${localmoddir}"
failonerror="true" >
<arg value="-c COPY (SELECT * FROM stats_bandwidth) TO STDOUT WITH CSV HEADER;" />
<arg value="--dbname=tordir"/>
<arg value="--output=${statsdir}/bandwidth.csv" />
</exec>
</target>
<target name="advbwdist">
@ -503,7 +473,7 @@
<fileset dir="${modulebase}/onionperf/stats" includes="*.csv" />
<fileset dir="${modulebase}/connbidirect/stats" includes="connbidirect2.csv" />
<fileset dir="${modulebase}/advbwdist/stats" includes="advbwdist.csv" />
<fileset dir="${modulebase}/servers/stats" includes="*.csv" />
<fileset dir="${modulebase}/bwhist/stats" includes="*.csv" />
<fileset dir="${modulebase}/hidserv/stats" includes="hidserv.csv" />
<fileset dir="${modulebase}/clients/stats"
includes="clients*.csv userstats-combined.csv" />

View File

@ -0,0 +1,18 @@
/* Copyright 2011--2018 The Tor Project
* See LICENSE for licensing information */
package org.torproject.metrics.stats.bwhist;
/**
 * Configuration options of the bwhist module. Each option is read from a Java
 * system property when this class is initialized and falls back to a
 * hard-coded default if the property is not set.
 */
public class Configuration {

  private static final String DEFAULT_DESCRIPTORS = "../../shared/in/";

  private static final String DEFAULT_DATABASE = "jdbc:postgresql:tordir";

  private static final String DEFAULT_HISTORY = "status/read-descriptors";

  private static final String DEFAULT_OUTPUT = "stats/";

  /** Value of property <code>bwhist.descriptors</code>: base directory of
   * the descriptors to read. */
  static String descriptors
      = System.getProperty("bwhist.descriptors", DEFAULT_DESCRIPTORS);

  /** Value of property <code>bwhist.database</code>: JDBC URL of the
   * database. */
  static String database
      = System.getProperty("bwhist.database", DEFAULT_DATABASE);

  /** Value of property <code>bwhist.history</code>: path of the descriptor
   * reader's history file. */
  static String history
      = System.getProperty("bwhist.history", DEFAULT_HISTORY);

  /** Value of property <code>bwhist.output</code>: directory that output
   * files are written to. */
  static String output
      = System.getProperty("bwhist.output", DEFAULT_OUTPUT);
}

View File

@ -0,0 +1,56 @@
/* Copyright 2011--2018 The Tor Project
* See LICENSE for licensing information */
package org.torproject.metrics.stats.bwhist;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.file.Paths;
import java.util.Arrays;
/**
 * Entry point of the bwhist module: imports relay descriptors into the
 * database, aggregates the imported data there, and writes the aggregated
 * bandwidth statistics to a CSV file.
 */
public class Main {

  private static final Logger log = LoggerFactory.getLogger(Main.class);

  /** Descriptor sub directories, given as path elements below the configured
   * descriptors directory, that this module reads. */
  private static final String[][] paths = {
      {"recent", "relay-descriptors", "consensuses"},
      {"recent", "relay-descriptors", "extra-infos"},
      {"archive", "relay-descriptors", "consensuses"},
      {"archive", "relay-descriptors", "extra-infos"}};

  /**
   * Executes this data-processing module.
   *
   * @param args Command-line arguments, which are not used.
   * @throws Exception If aggregating, querying, or writing results fails.
   */
  public static void main(String[] args) throws Exception {

    log.info("Starting bwhist module.");

    log.info("Reading descriptors and inserting relevant parts into the "
        + "database.");
    File[] descriptorDirectories = Arrays.stream(paths)
        .map(path -> Paths.get(Configuration.descriptors, path).toFile())
        .toArray(File[]::new);
    File historyFile = new File(Configuration.history);
    RelayDescriptorDatabaseImporter database
        = new RelayDescriptorDatabaseImporter(descriptorDirectories,
        historyFile, Configuration.database);
    try {
      database.importRelayDescriptors();
      /* Flush and commit batched inserts that the importer has not written
       * yet, so that the aggregation below operates on all imported data. */
      database.commit();

      log.info("Aggregating database entries.");
      database.aggregate();
      /* Commit the aggregation results before the connection is closed.
       * NOTE(review): commit() also logs import statistics, so these are now
       * logged twice per run -- confirm that this is acceptable. */
      database.commit();

      log.info("Querying aggregated statistics from the database.");
      new Writer().write(Paths.get(Configuration.output, "bandwidth.csv"),
          database.queryBandwidth());
    } finally {
      /* Release the connection even if one of the steps above failed. */
      log.info("Closing database connection.");
      database.closeConnection();
    }

    log.info("Terminating bwhist module.");
  }
}

View File

@ -1,7 +1,7 @@
/* Copyright 2011--2018 The Tor Project
* See LICENSE for licensing information */
package org.torproject.metrics.stats.servers;
package org.torproject.metrics.stats.bwhist;
import org.torproject.descriptor.Descriptor;
import org.torproject.descriptor.DescriptorReader;
@ -20,6 +20,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@ -27,6 +28,7 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
@ -108,22 +110,19 @@ public final class RelayDescriptorDatabaseImporter {
private boolean importIntoDatabase = true;
private List<File> archivesDirectories;
private File[] descriptorDirectories;
private File statsDirectory;
private File historyFile;
/**
* Initialize database importer by connecting to the database and
* preparing statements.
*/
public RelayDescriptorDatabaseImporter(String connectionUrl,
List<File> archivesDirectories, File statsDirectory) {
public RelayDescriptorDatabaseImporter(File[] descriptorDirectories,
File historyFile, String connectionUrl) {
if (archivesDirectories == null || statsDirectory == null) {
throw new IllegalArgumentException();
}
this.archivesDirectories = archivesDirectories;
this.statsDirectory = statsDirectory;
this.descriptorDirectories = descriptorDirectories;
this.historyFile = historyFile;
if (connectionUrl != null) {
try {
@ -520,29 +519,20 @@ public final class RelayDescriptorDatabaseImporter {
/** Imports relay descriptors into the database. */
public void importRelayDescriptors() {
log.info("Importing files in directories " + archivesDirectories
+ "/...");
if (!this.archivesDirectories.isEmpty()) {
DescriptorReader reader =
DescriptorSourceFactory.createDescriptorReader();
reader.setMaxDescriptorsInQueue(10);
File historyFile = new File(statsDirectory,
"database-importer-relay-descriptor-history");
reader.setHistoryFile(historyFile);
for (Descriptor descriptor : reader.readDescriptors(
this.archivesDirectories.toArray(
new File[this.archivesDirectories.size()]))) {
if (descriptor instanceof RelayNetworkStatusConsensus) {
this.addRelayNetworkStatusConsensus(
(RelayNetworkStatusConsensus) descriptor);
} else if (descriptor instanceof ExtraInfoDescriptor) {
this.addExtraInfoDescriptor((ExtraInfoDescriptor) descriptor);
}
DescriptorReader reader =
DescriptorSourceFactory.createDescriptorReader();
reader.setMaxDescriptorsInQueue(10);
reader.setHistoryFile(this.historyFile);
for (Descriptor descriptor : reader.readDescriptors(
this.descriptorDirectories)) {
if (descriptor instanceof RelayNetworkStatusConsensus) {
this.addRelayNetworkStatusConsensus(
(RelayNetworkStatusConsensus) descriptor);
} else if (descriptor instanceof ExtraInfoDescriptor) {
this.addExtraInfoDescriptor((ExtraInfoDescriptor) descriptor);
}
reader.saveHistoryFile(historyFile);
}
log.info("Finished importing relay descriptors.");
reader.saveHistoryFile(this.historyFile);
}
private void addRelayNetworkStatusConsensus(
@ -583,9 +573,9 @@ public final class RelayDescriptorDatabaseImporter {
}
/**
* Close the relay descriptor database connection.
* Commit any non-committed parts.
*/
public void closeConnection() {
public void commit() {
/* Log stats about imported descriptors. */
log.info("Finished importing relay descriptors: {} network status entries "
@ -609,21 +599,84 @@ public final class RelayDescriptorDatabaseImporter {
}
}
/* Commit any stragglers before closing. */
/* Commit any stragglers. */
if (this.conn != null) {
try {
this.csH.executeBatch();
this.conn.commit();
} catch (SQLException e) {
} catch (SQLException e) {
log.warn("Could not commit final records to database", e);
}
try {
this.conn.close();
} catch (SQLException e) {
log.warn("Could not close database connection.", e);
}
}
/**
 * Call the refresh_all() database function to aggregate newly imported data.
 *
 * @throws SQLException If creating or executing the statement fails.
 */
void aggregate() throws SQLException {
  /* Closing the statement also releases the result set it produced. */
  try (Statement st = this.conn.createStatement()) {
    st.executeQuery("SELECT refresh_all()");
  }
}
/**
 * Query aggregated bandwidth statistics from <code>stats_bandwidth</code>.
 *
 * <p>The first element of the returned list contains the column names, and
 * each following element contains the values of one result row in the same
 * order. Boolean and numeric values that are <code>NULL</code> in the
 * database are returned as <code>null</code> elements.</p>
 *
 * @return Column names followed by one <code>String</code> array per row.
 * @throws SQLException If querying the database fails.
 */
List<String[]> queryBandwidth() throws SQLException {
  String columns = "date, isexit, isguard, bwread, bwwrite, dirread, "
      + "dirwrite";
  String[] columnNames = columns.split(", ");
  List<String[]> statistics = new ArrayList<>();
  statistics.add(columnNames);
  Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"),
      Locale.US);
  String queryString = "SELECT " + columns + " FROM stats_bandwidth";
  try (Statement st = this.conn.createStatement();
      ResultSet rs = st.executeQuery(queryString)) {
    while (rs.next()) {
      String[] outputLine = new String[columnNames.length];
      /* NOTE(review): assumes that date is never NULL -- confirm against the
       * database schema. */
      outputLine[0] = rs.getDate("date", calendar).toLocalDate().toString();
      outputLine[1] = getBooleanFromResultSet(rs, "isexit");
      outputLine[2] = getBooleanFromResultSet(rs, "isguard");
      outputLine[3] = getLongFromResultSet(rs, "bwread");
      outputLine[4] = getLongFromResultSet(rs, "bwwrite");
      outputLine[5] = getLongFromResultSet(rs, "dirread");
      outputLine[6] = getLongFromResultSet(rs, "dirwrite");
      statistics.add(outputLine);
    }
  }
  return statistics;
}
/**
 * Read the <code>boolean</code> value of the given column in the current row
 * of the given <code>ResultSet</code> and format it as <code>String</code>.
 *
 * @return <code>"t"</code> for <code>true</code>, <code>"f"</code> for
 *     <code>false</code>, or <code>null</code> if the database value was
 *     <code>NULL</code>.
 * @throws SQLException If reading from the result set fails.
 */
private static String getBooleanFromResultSet(ResultSet rs,
    String columnLabel) throws SQLException {
  boolean value = rs.getBoolean(columnLabel);
  /* The value has to be read first, because wasNull() reports on the column
   * that was read last. */
  if (rs.wasNull()) {
    return null;
  }
  return value ? "t" : "f";
}
/**
 * Read the <code>long</code> value of the given column in the current row of
 * the given <code>ResultSet</code> and format it as <code>String</code>.
 *
 * @return The decimal representation of the value, or <code>null</code> if
 *     the database value was <code>NULL</code>.
 * @throws SQLException If reading from the result set fails.
 */
private static String getLongFromResultSet(ResultSet rs, String columnLabel)
    throws SQLException {
  long value = rs.getLong(columnLabel);
  /* The value has to be read first, because wasNull() reports on the column
   * that was read last. */
  if (rs.wasNull()) {
    return null;
  }
  return Long.toString(value);
}
/**
 * Close the relay descriptor database connection, if there is one.
 *
 * <p>A failure to close the connection is logged as a warning and not
 * reported to the caller.</p>
 */
public void closeConnection() {
  /* There may be no connection to close, which is the same case that
   * commit() guards against. */
  if (this.conn == null) {
    return;
  }
  try {
    this.conn.close();
  } catch (SQLException e) {
    log.warn("Could not close database connection.", e);
  }
}
}

View File

@ -0,0 +1,42 @@
/* Copyright 2018 The Tor Project
* See LICENSE for licensing information */
package org.torproject.metrics.stats.bwhist;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
/**
 * Writer that takes output lines, each given as an array of column values,
 * and writes them as comma-separated lines to a file.
 *
 * <p>This class does not add a column header line of its own. Callers that
 * want one pass it as the first output line.</p>
 */
class Writer {

  /**
   * Write output lines to the given file, creating the file's parent
   * directory first if it does not exist yet.
   *
   * @param filePath File to write, which is replaced if it exists.
   * @param outputLines Output lines to write, with <code>null</code> column
   *     values being written as empty strings.
   * @throws IOException If the parent directory cannot be created or the
   *     file cannot be written.
   */
  void write(Path filePath, Iterable<String[]> outputLines)
      throws IOException {
    File parentFile = filePath.toFile().getParentFile();
    if (null != parentFile && !parentFile.exists()
        && !parentFile.mkdirs()) {
      throw new IOException("Unable to create parent directory of output "
          + "file. Not writing this file.");
    }
    List<String> formattedOutputLines = new ArrayList<>();
    for (String[] outputLine : outputLines) {
      formattedOutputLines.add(formatOutputLine(outputLine));
    }
    Files.write(filePath, formattedOutputLines, StandardCharsets.UTF_8);
  }

  /** Join the given column values with commas, writing <code>null</code>
   * values as empty strings and an empty array as an empty line. */
  private static String formatOutputLine(String[] outputLine) {
    StringBuilder formattedOutputLine = new StringBuilder();
    for (int i = 0; i < outputLine.length; i++) {
      /* Only put separators between values, so that lines without any values
       * do not need special treatment. */
      if (i > 0) {
        formattedOutputLine.append(',');
      }
      if (null != outputLine[i]) {
        formattedOutputLine.append(outputLine[i]);
      }
    }
    return formattedOutputLine.toString();
  }
}

View File

@ -1,87 +0,0 @@
/* Copyright 2011--2018 The Tor Project
* See LICENSE for licensing information */
package org.torproject.metrics.stats.servers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Configuration of this module: starts with hard-coded defaults and overrides
 * them with the settings found in a file named <code>config</code> in the
 * working directory, if that file exists.
 */
public class Configuration {

  private static final Logger log
      = LoggerFactory.getLogger(Configuration.class);

  private List<File> directoryArchivesDirectories = new ArrayList<>();

  private String relayDescriptorDatabaseJdbc =
      "jdbc:postgresql://localhost/tordir?user=metrics&password=password";

  /**
   * Initializes this configuration class by reading the config file, if
   * present.
   *
   * <p>Lines starting with <code>#</code> and empty lines are skipped. An
   * unrecognized key, a key without value, or a read error terminates the
   * JVM with exit status 1.</p>
   */
  public Configuration() {
    File configFile = new File("config");
    if (!configFile.exists()) {
      log.warn("Could not find config file.");
      return;
    }
    try (BufferedReader br = new BufferedReader(new FileReader(configFile))) {
      String line;
      while ((line = br.readLine()) != null) {
        if (line.startsWith("DirectoryArchivesDirectory")) {
          this.directoryArchivesDirectories.add(new File(parseValue(line)));
        } else if (line.startsWith("RelayDescriptorDatabaseJDBC")) {
          this.relayDescriptorDatabaseJdbc = parseValue(line);
        } else if (!line.startsWith("#") && line.length() > 0) {
          log.error("Configuration file contains unrecognized "
              + "configuration key in line '{}'! Exiting!", line);
          System.exit(1);
        }
      }
    } catch (IOException e) {
      log.error("Unknown problem while reading config file! Exiting!", e);
      System.exit(1);
    }
  }

  /** Returns the value following the configuration key and a space in the
   * given line, or terminates the JVM if the line contains no value. */
  private static String parseValue(String line) {
    String[] parts = line.split(" ");
    if (parts.length < 2) {
      log.warn("Configuration file contains configuration key without value in "
          + "line '{}'. Exiting!", line);
      System.exit(1);
    }
    return parts[1];
  }

  /**
   * Returns directories containing archived descriptors.
   *
   * @return The directories given in the config file, or the default
   *     consensuses and extra-infos directories if none were given.
   */
  public List<File> getDirectoryArchivesDirectories() {
    if (this.directoryArchivesDirectories.isEmpty()) {
      String prefix = "../../shared/in/recent/relay-descriptors/";
      return Arrays.asList(new File(prefix + "consensuses/"),
          new File(prefix + "extra-infos/"));
    } else {
      return this.directoryArchivesDirectories;
    }
  }

  /** Returns the JDBC string of the relay descriptor database. */
  public String getRelayDescriptorDatabaseJdbc() {
    return this.relayDescriptorDatabaseJdbc;
  }
}

View File

@ -1,40 +0,0 @@
/* Copyright 2011--2018 The Tor Project
* See LICENSE for licensing information */
package org.torproject.metrics.stats.servers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
/**
 * Entry point of this module: reads the configuration, imports relay
 * descriptors into the database, and closes the database connection.
 */
public class Main {

  private static Logger log = LoggerFactory.getLogger(Main.class);

  /** Executes this data-processing module. */
  public static void main(String[] args) {

    log.info("Starting ERNIE.");

    /* Defaults, possibly overridden by the config file. */
    Configuration configuration = new Configuration();
    String jdbcString = configuration.getRelayDescriptorDatabaseJdbc();

    /* The importer is handed the stats directory, which it uses for
     * temporary files. */
    RelayDescriptorDatabaseImporter importer
        = new RelayDescriptorDatabaseImporter(jdbcString,
        configuration.getDirectoryArchivesDirectories(), new File("stats"));
    importer.importRelayDescriptors();
    importer.closeConnection();

    log.info("Terminating ERNIE.");
  }
}

View File

@ -1,8 +0,0 @@
## Paths (relative or absolute) to directories to import directory archives from
#DirectoryArchivesDirectory /srv/metrics.torproject.org/metrics/shared/in/recent/relay-descriptors/consensuses/
#DirectoryArchivesDirectory /srv/metrics.torproject.org/metrics/shared/in/recent/relay-descriptors/server-descriptors/
#DirectoryArchivesDirectory /srv/metrics.torproject.org/metrics/shared/in/recent/relay-descriptors/extra-infos/
#
## JDBC string for relay descriptor database
#RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=metrics&password=password
#