diff --git a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java index 22a3113..3976919 100644 --- a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java +++ b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java @@ -90,62 +90,17 @@ public class ArchiveReader { filesInInputDir.add(f); } } else { - if (rdp != null) { - try { - BufferedInputStream bis = null; - if (keepImportHistory - && archivesImportHistory.contains(pop.getName())) { - ignoredFiles++; - continue; - } else if (pop.getName().endsWith(".tar.bz2")) { - logger.warn("Cannot parse compressed tarball " - + pop.getAbsolutePath() + ". Skipping."); - continue; - } else if (pop.getName().endsWith(".bz2")) { - FileInputStream fis = new FileInputStream(pop); - BZip2CompressorInputStream bcis = - new BZip2CompressorInputStream(fis); - bis = new BufferedInputStream(bcis); - } else { - FileInputStream fis = new FileInputStream(pop); - bis = new BufferedInputStream(fis); - } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - int len; - byte[] data = new byte[1024]; - while ((len = bis.read(data, 0, 1024)) >= 0) { - baos.write(data, 0, len); - } - bis.close(); - byte[] allData = baos.toByteArray(); - boolean stored = rdp.parse(allData); - if (!stored) { - filesToRetry.add(pop); - continue; - } - if (keepImportHistory) { - archivesImportHistory.add(pop.getName()); - } - parsedFiles++; - } catch (IOException e) { - problems.add(pop); - if (problems.size() > 3) { - break; - } - } - } - } - } - for (File pop : filesToRetry) { - /* TODO We need to parse microdescriptors ourselves, rather than - * RelayDescriptorParser, because only we know the valid-after - * time(s) of microdesc consensus(es) containing this - * microdescriptor. However, this breaks functional abstraction - * pretty badly. */ - if (rdp != null) { try { BufferedInputStream bis = null; - if (pop.getName().endsWith(".bz2")) { + if (keepImportHistory + && archivesImportHistory.contains(pop.getName())) { + ignoredFiles++; + continue; + } else if (pop.getName().endsWith(".tar.bz2")) { + logger.warn("Cannot parse compressed tarball " + + pop.getAbsolutePath() + ". Skipping."); + continue; + } else if (pop.getName().endsWith(".bz2")) { FileInputStream fis = new FileInputStream(pop); BZip2CompressorInputStream bcis = new BZip2CompressorInputStream(fis); @@ -162,72 +117,11 @@ public class ArchiveReader { } bis.close(); byte[] allData = baos.toByteArray(); - BufferedReader br = new BufferedReader(new StringReader( - new String(allData, "US-ASCII"))); - String line; - do { - line = br.readLine(); - } while (line != null && line.startsWith("@")); - br.close(); - if (line == null) { - logger.debug("We were given an empty descriptor for " - + "parsing. Ignoring."); + boolean stored = rdp.parse(allData); + if (!stored) { + filesToRetry.add(pop); continue; } - if (!line.equals("onion-key")) { - logger.debug("Skipping non-recognized descriptor."); - continue; - } - SimpleDateFormat parseFormat = - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - parseFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - String ascii = null; - try { - ascii = new String(allData, "US-ASCII"); - } catch (UnsupportedEncodingException e) { - /* No way that US-ASCII is not supported. */ - } - int start = -1; - int end = -1; - String startToken = "onion-key\n"; - while (end < ascii.length()) { - start = ascii.indexOf(startToken, end); - if (start < 0) { - break; - } - end = ascii.indexOf(startToken, start + 1); - if (end < 0) { - end = ascii.length(); - if (end <= start) { - break; - } - } - byte[] descBytes = new byte[end - start]; - System.arraycopy(allData, start, descBytes, 0, end - start); - String digest256Base64 = Base64.encodeBase64String( - DigestUtils.sha256(descBytes)).replaceAll("=", ""); - String digest256Hex = DigestUtils.sha256Hex(descBytes); - if (!this.microdescriptorValidAfterTimes.containsKey( - digest256Hex)) { - logger.debug("Could not store microdescriptor '" - + digest256Hex + "', which was not contained in a " - + "microdesc consensus."); - continue; - } - for (String validAfterTime : - this.microdescriptorValidAfterTimes.get(digest256Hex)) { - try { - long validAfter = - parseFormat.parse(validAfterTime).getTime(); - rdp.storeMicrodescriptor(descBytes, digest256Hex, - digest256Base64, validAfter); - } catch (ParseException e) { - logger.warn("Could not parse " - + "valid-after time '" + validAfterTime + "'. Not " - + "storing microdescriptor.", e); - } - } - } if (keepImportHistory) { archivesImportHistory.add(pop.getName()); } @@ -240,6 +134,108 @@ public class ArchiveReader { } } } + for (File pop : filesToRetry) { + /* TODO We need to parse microdescriptors ourselves, rather than + * RelayDescriptorParser, because only we know the valid-after + * time(s) of microdesc consensus(es) containing this + * microdescriptor. However, this breaks functional abstraction + * pretty badly. */ + try { + BufferedInputStream bis = null; + if (pop.getName().endsWith(".bz2")) { + FileInputStream fis = new FileInputStream(pop); + BZip2CompressorInputStream bcis = + new BZip2CompressorInputStream(fis); + bis = new BufferedInputStream(bcis); + } else { + FileInputStream fis = new FileInputStream(pop); + bis = new BufferedInputStream(fis); + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + int len; + byte[] data = new byte[1024]; + while ((len = bis.read(data, 0, 1024)) >= 0) { + baos.write(data, 0, len); + } + bis.close(); + byte[] allData = baos.toByteArray(); + BufferedReader br = new BufferedReader(new StringReader( + new String(allData, "US-ASCII"))); + String line; + do { + line = br.readLine(); + } while (line != null && line.startsWith("@")); + br.close(); + if (line == null) { + logger.debug("We were given an empty descriptor for " + + "parsing. Ignoring."); + continue; + } + if (!line.equals("onion-key")) { + logger.debug("Skipping non-recognized descriptor."); + continue; + } + SimpleDateFormat parseFormat = + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + parseFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + String ascii = null; + try { + ascii = new String(allData, "US-ASCII"); + } catch (UnsupportedEncodingException e) { + /* No way that US-ASCII is not supported. */ + } + int start = -1; + int end = -1; + String startToken = "onion-key\n"; + while (end < ascii.length()) { + start = ascii.indexOf(startToken, end); + if (start < 0) { + break; + } + end = ascii.indexOf(startToken, start + 1); + if (end < 0) { + end = ascii.length(); + if (end <= start) { + break; + } + } + byte[] descBytes = new byte[end - start]; + System.arraycopy(allData, start, descBytes, 0, end - start); + String digest256Base64 = Base64.encodeBase64String( + DigestUtils.sha256(descBytes)).replaceAll("=", ""); + String digest256Hex = DigestUtils.sha256Hex(descBytes); + if (!this.microdescriptorValidAfterTimes.containsKey( + digest256Hex)) { + logger.debug("Could not store microdescriptor '" + + digest256Hex + "', which was not contained in a " + + "microdesc consensus."); + continue; + } + for (String validAfterTime : + this.microdescriptorValidAfterTimes.get(digest256Hex)) { + try { + long validAfter = + parseFormat.parse(validAfterTime).getTime(); + rdp.storeMicrodescriptor(descBytes, digest256Hex, + digest256Base64, validAfter); + } catch (ParseException e) { + logger.warn("Could not parse " + + "valid-after time '" + validAfterTime + "'. Not " + + "storing microdescriptor.", e); + } + } + } + if (keepImportHistory) { + archivesImportHistory.add(pop.getName()); + } + parsedFiles++; + } catch (IOException e) { + problems.add(pop); + if (problems.size() > 3) { + break; + } + } + } if (problems.isEmpty()) { logger.debug("Finished importing files in directory " + archivesDirectory + "/."); diff --git a/src/main/java/org/torproject/metrics/collector/relaydescs/CachedRelayDescriptorReader.java b/src/main/java/org/torproject/metrics/collector/relaydescs/CachedRelayDescriptorReader.java index 5b5d218..737344b 100644 --- a/src/main/java/org/torproject/metrics/collector/relaydescs/CachedRelayDescriptorReader.java +++ b/src/main/java/org/torproject/metrics/collector/relaydescs/CachedRelayDescriptorReader.java @@ -134,17 +134,15 @@ public class CachedRelayDescriptorReader { /* Parse the cached consensus if we haven't parsed it before * (but regardless of whether it's stale or not). */ - if (rdp != null) { - String digest = Hex.encodeHexString(DigestUtils.sha1( - allData)); - if (!lastImportHistory.contains(digest) - && !currentImportHistory.contains(digest)) { - rdp.parse(allData); - } else { - dumpStats.append(" (skipped)"); - } - currentImportHistory.add(digest); + String digest = Hex.encodeHexString(DigestUtils.sha1( + allData)); + if (!lastImportHistory.contains(digest) + && !currentImportHistory.contains(digest)) { + rdp.parse(allData); + } else { + dumpStats.append(" (skipped)"); } + currentImportHistory.add(digest); } else if (f.getName().equals("v3-status-votes")) { int parsedNum = 0; int skippedNum = 0; @@ -161,18 +159,16 @@ public class CachedRelayDescriptorReader { byte[] rawNetworkStatusBytes = new byte[next - start]; System.arraycopy(allData, start, rawNetworkStatusBytes, 0, next - start); - if (rdp != null) { - String digest = Hex.encodeHexString(DigestUtils.sha1( - rawNetworkStatusBytes)); - if (!lastImportHistory.contains(digest) - && !currentImportHistory.contains(digest)) { - rdp.parse(rawNetworkStatusBytes); - parsedNum++; - } else { - skippedNum++; - } - currentImportHistory.add(digest); + String digest = Hex.encodeHexString(DigestUtils.sha1( + rawNetworkStatusBytes)); + if (!lastImportHistory.contains(digest) + && !currentImportHistory.contains(digest)) { + rdp.parse(rawNetworkStatusBytes); + parsedNum++; + } else { + skippedNum++; } + currentImportHistory.add(digest); } start = next; } @@ -208,18 +204,16 @@ public class CachedRelayDescriptorReader { end += endToken.length(); byte[] descBytes = new byte[end - start]; System.arraycopy(allData, start, descBytes, 0, end - start); - if (rdp != null) { - String digest = Hex.encodeHexString(DigestUtils.sha1( - descBytes)); - if (!lastImportHistory.contains(digest) - && !currentImportHistory.contains(digest)) { - rdp.parse(descBytes); - parsedNum++; - } else { - skippedNum++; - } - currentImportHistory.add(digest); + String digest = Hex.encodeHexString(DigestUtils.sha1( + descBytes)); + if (!lastImportHistory.contains(digest) + && !currentImportHistory.contains(digest)) { + rdp.parse(descBytes); + parsedNum++; + } else { + skippedNum++; } + currentImportHistory.add(digest); } dumpStats.append("\n" + f.getName() + ": parsed " + parsedNum + ", skipped " + skippedNum + " "