Remove dependency on metrics-lib's log package (1/4).

- Copy types from metrics-lib to this code base.
 - Update package and import statements.
 - Copy remaining parts of metrics-lib's FileType.
This commit is contained in:
Karsten Loesing 2019-11-23 17:13:22 +01:00
parent cc3aa57e57
commit 859476ecae
9 changed files with 647 additions and 6 deletions

View File

@ -4,8 +4,8 @@
package org.torproject.metrics.collector.persist;
import org.torproject.descriptor.WebServerAccessLog;
import org.torproject.descriptor.log.InternalWebServerAccessLog;
import org.torproject.metrics.collector.webstats.FileType;
import org.torproject.metrics.collector.webstats.InternalWebServerAccessLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -12,6 +12,8 @@ import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
@ -68,11 +70,53 @@ public enum FileType {
.newInstance(os);
}
/**
* Compresses the given bytes in memory and returns the compressed bytes.
*/
public byte[] compress(byte[] bytes) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (OutputStream os = this.outputStream(baos)) {
os.write(bytes);
os.flush();
}
return baos.toByteArray();
}
/**
* Compresses the given InputStream and returns an OutputStream.
*/
public OutputStream compress(OutputStream os) throws Exception {
return this.outputStream(os);
}
/**
* Decompresses the given InputStream and returns an OutputStream.
*/
public InputStream decompress(InputStream is) throws Exception {
return this.inputStream(is);
}
/**
* Decompresses the given bytes in memory and returns the decompressed bytes.
*
* @since 2.2.0
*/
public byte[] decompress(byte[] bytes) throws Exception {
if (0 == bytes.length) {
return bytes;
}
try (InputStream is
= this.inputStream(new ByteArrayInputStream(bytes));
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
int readByte = is.read();
while (readByte > 0) {
baos.write(readByte);
readByte = is.read();
}
baos.flush();
return baos.toByteArray();
}
}
}

View File

@ -0,0 +1,63 @@
/* Copyright 2017--2018 The Tor Project
* See LICENSE for licensing information */
package org.torproject.metrics.collector.webstats;
import org.torproject.descriptor.DescriptorParseException;
import org.torproject.descriptor.LogDescriptor;
/**
* This interface provides methods for internal use only.
*
* @since 2.2.0
*/
public interface InternalLogDescriptor extends LogDescriptor {
/** Logfile name parts separator. */
String SEP = "_";
/**
* Validate log lines.
*
* @since 2.2.0
*/
void validate() throws DescriptorParseException;
/**
* Set the {@code Validator} that will perform the validation on log
* lines.
*
* <p>Usually set by the implementing class.</p>
*
* @since 2.2.0
*/
void setValidator(Validator validator);
/**
* Set the descriptor's bytes.
*
* @since 2.2.0
*/
void setRawDescriptorBytes(byte[] bytes);
/** Return the descriptor's preferred compression. */
String getCompressionType();
/**
* Provides a single function for validating a single log line.
*
* @since 2.2.0
*/
interface Validator {
/**
* Verifies a log line.
*
* @since 2.2.0
*/
boolean validate(String line);
}
}

View File

@ -0,0 +1,17 @@
/* Copyright 2018 The Tor Project
* See LICENSE for licensing information */
package org.torproject.metrics.collector.webstats;
/**
* This interface provides methods for internal use only.
*
* @since 2.2.0
*/
public interface InternalWebServerAccessLog extends InternalLogDescriptor {
/** The log's name should include this string. */
String MARKER = "access.log";
}

View File

@ -0,0 +1,166 @@
/* Copyright 2017--2018 The Tor Project
* See LICENSE for licensing information */
package org.torproject.metrics.collector.webstats;
import org.torproject.descriptor.Descriptor;
import org.torproject.descriptor.DescriptorParseException;
import org.torproject.descriptor.LogDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Base class for log descriptors.
*
* @since 2.2.0
*/
public abstract class LogDescriptorImpl
implements LogDescriptor, InternalLogDescriptor {
/** The log's file name should contain this string. */
public static final String MARKER = ".log";
private static final int unrecognizedLinesLimit = 3;
private static final Logger log
= LoggerFactory.getLogger(LogDescriptorImpl.class);
private static Pattern filenamePattern = Pattern.compile(
"(?:\\S*)" + MARKER + SEP + "(?:[0-9a-zA-Z]*)(?:\\.?)([a-zA-Z2]*)");
private final File descriptorFile;
/** Byte array for plain, i.e. uncompressed, log data. */
private byte[] logBytes;
private FileType fileType;
private List<String> unrecognizedLines = new ArrayList<>();
private Validator validator = (String line) -> true;
/**
* This constructor performs basic operations on the given bytes.
*
* <p>An unknown compression type (see {@link #getCompressionType})
* is interpreted as missing compression. In this case the bytes
* will be compressed to the given compression type.</p>
*
* @since 2.2.0
*/
protected LogDescriptorImpl(byte[] logBytes, File descriptorFile,
String logName, FileType defaultCompression)
throws DescriptorParseException {
this.logBytes = logBytes;
this.descriptorFile = descriptorFile;
try {
Matcher mat = filenamePattern.matcher(logName);
if (!mat.find()) {
throw new DescriptorParseException(
"Log file name doesn't comply to standard: " + logName);
}
this.fileType = FileType.findType(mat.group(1).toUpperCase());
if (FileType.PLAIN == this.fileType) {
this.fileType = defaultCompression;
this.logBytes = this.fileType.compress(this.logBytes);
}
} catch (Exception ex) {
throw new DescriptorParseException("Cannot parse file "
+ logName + " from file " + descriptorFile.getName(), ex);
}
}
@Override
public InputStream decompressedByteStream() throws DescriptorParseException {
try {
return this.fileType.decompress(new ByteArrayInputStream(this.logBytes));
} catch (Exception ex) {
throw new DescriptorParseException("Cannot provide deflated stream of "
+ this.descriptorFile + ".", ex);
}
}
@Override
public void validate() throws DescriptorParseException {
try (BufferedReader br = new BufferedReader(
new InputStreamReader(decompressedByteStream()))) {
this.unrecognizedLines.addAll(br.lines().parallel().filter((line)
-> null != line && !line.isEmpty() && !validator.validate(line))
.limit(unrecognizedLinesLimit).collect(Collectors.toList()));
} catch (Exception ex) {
throw new DescriptorParseException("Cannot validate log lines.", ex);
}
}
/**
* Assemble a LogDescriptor.
*
* @since 2.2.0
*/
public static List<Descriptor> parse(byte[] logBytes,
File descriptorFile, String logName) throws DescriptorParseException {
if (logName.contains(InternalWebServerAccessLog.MARKER)) {
return Arrays.asList(new Descriptor[]{
new WebServerAccessLogImpl(logBytes, descriptorFile, logName)});
} else {
throw new DescriptorParseException("Cannot parse file " + logName
+ " from file " + descriptorFile.getName());
}
}
@Override
public void setValidator(Validator validator) {
this.validator = validator;
}
@Override
public String getCompressionType() {
return this.fileType.name().toLowerCase();
}
@Override
public byte[] getRawDescriptorBytes() {
return this.logBytes;
}
@Override
public void setRawDescriptorBytes(byte[] bytes) {
this.logBytes = bytes;
}
@Override
public int getRawDescriptorLength() {
return this.logBytes.length;
}
@Override
public List<String> getAnnotations() {
return Collections.emptyList();
}
@Override
public List<String> getUnrecognizedLines() {
return this.unrecognizedLines;
}
@Override
public File getDescriptorFile() {
return descriptorFile;
}
}

View File

@ -3,7 +3,7 @@
package org.torproject.metrics.collector.webstats;
import static org.torproject.descriptor.log.WebServerAccessLogImpl.MARKER;
import static org.torproject.metrics.collector.webstats.WebServerAccessLogImpl.MARKER;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -12,10 +12,6 @@ import static java.util.stream.Collectors.summingLong;
import org.torproject.descriptor.DescriptorParseException;
import org.torproject.descriptor.Method;
import org.torproject.descriptor.WebServerAccessLog;
import org.torproject.descriptor.log.InternalLogDescriptor;
import org.torproject.descriptor.log.InternalWebServerAccessLog;
import org.torproject.descriptor.log.WebServerAccessLogImpl;
import org.torproject.descriptor.log.WebServerAccessLogLine;
import org.torproject.metrics.collector.conf.Configuration;
import org.torproject.metrics.collector.conf.Key;
import org.torproject.metrics.collector.conf.SourceType;

View File

@ -0,0 +1,173 @@
/* Copyright 2017--2018 The Tor Project
* See LICENSE for licensing information */
package org.torproject.metrics.collector.webstats;
import org.torproject.descriptor.DescriptorParseException;
import org.torproject.descriptor.WebServerAccessLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
/**
* Implementation of web server access log descriptors.
*
* <p>Defines sanitization and validation for web server access logs.</p>
*
* @since 2.2.0
*/
public class WebServerAccessLogImpl extends LogDescriptorImpl
implements InternalWebServerAccessLog, WebServerAccessLog {
private static final Logger log
= LoggerFactory.getLogger(WebServerAccessLogImpl.class);
/** The log's name should include this string. */
public static final String MARKER = InternalWebServerAccessLog.MARKER;
/** The mandatory web server log descriptor file name pattern. */
public static final Pattern filenamePattern
= Pattern.compile("(\\S*)" + SEP + "(\\S*)" + SEP + "" + MARKER
+ SEP + "(\\d*)(?:\\.?)([a-zA-Z]*)");
private final String physicalHost;
private final String virtualHost;
private final LocalDate logDate;
private boolean validate = true;
/**
* Creates a WebServerAccessLog from the given bytes and filename.
*
* <p>The given bytes are read, whereas the file is not read.</p>
*
* <p>The path of the given file has to be compliant to the following
* naming pattern
* {@code
* <virtualHost>-<physicalHost>-access.log-<yyyymmdd>.<compression>},
* where an unknown compression type (see {@link #getCompressionType})
* is interpreted as missing compression. In this case the bytes
* will be compressed to the default compression type.
* The immediate parent name is taken to be the physical host collecting the
* logs.</p>
*/
protected WebServerAccessLogImpl(byte[] logBytes, File file, String logName)
throws DescriptorParseException {
this(logBytes, file, logName, FileType.XZ);
}
/** For internal use only. */
public WebServerAccessLogImpl(byte[] bytes, String filename,
boolean validate) throws DescriptorParseException {
this(bytes, null, filename, FileType.XZ, validate);
}
/** For internal use only. */
public WebServerAccessLogImpl(byte[] bytes, File sourceFile, String filename,
boolean validate) throws DescriptorParseException {
this(bytes, sourceFile, filename, FileType.XZ, validate);
}
private WebServerAccessLogImpl(byte[] logBytes, File file, String logName,
FileType defaultCompression) throws DescriptorParseException {
this(logBytes, file, logName, defaultCompression, true);
}
private WebServerAccessLogImpl(byte[] logBytes, File file, String logName,
FileType defaultCompression, boolean validate)
throws DescriptorParseException {
super(logBytes, file, logName, defaultCompression);
try {
String fn = Paths.get(logName).getFileName().toString();
Matcher mat = filenamePattern.matcher(fn);
if (!mat.find()) {
throw new DescriptorParseException(
"WebServerAccessLog file name doesn't comply to standard: " + fn);
}
this.virtualHost = mat.group(1);
this.physicalHost = mat.group(2);
if (null == this.virtualHost || null == this.physicalHost
|| this.virtualHost.isEmpty() || this.physicalHost.isEmpty()) {
throw new DescriptorParseException(
"WebServerAccessLog file name doesn't comply to standard: " + fn);
}
String ymd = mat.group(3);
this.logDate = LocalDate.parse(ymd, DateTimeFormatter.BASIC_ISO_DATE);
this.setValidator((line)
-> WebServerAccessLogLine.makeLine(line).isValid());
if (validate) {
this.validate();
}
} catch (DescriptorParseException dpe) {
throw dpe; // escalate
} catch (Exception pe) {
throw new DescriptorParseException(
"Cannot parse WebServerAccessLog file: " + logName, pe);
}
}
@Override
public String getPhysicalHost() {
return this.physicalHost;
}
@Override
public String getVirtualHost() {
return this.virtualHost;
}
@Override
public LocalDate getLogDate() {
return this.logDate;
}
private static final int LISTLIMIT = Integer.MAX_VALUE / 2;
/** Returns a stream of all valid log lines. */
@Override
public Stream<WebServerAccessLog.Line> logLines()
throws DescriptorParseException {
try (BufferedReader br = new BufferedReader(new InputStreamReader(
this.decompressedByteStream()))) {
List<List<WebServerAccessLogLine>> lists = new ArrayList<>();
List<WebServerAccessLogLine> currentList = new ArrayList<>();
lists.add(currentList);
String lineStr = br.readLine();
int count = 0;
while (null != lineStr) {
WebServerAccessLogLine wsal = WebServerAccessLogLine.makeLine(lineStr);
if (wsal.isValid()) {
currentList.add(wsal);
count++;
}
if (count >= LISTLIMIT) {
currentList = new ArrayList<>();
lists.add(currentList);
count = 0;
}
lineStr = br.readLine();
}
br.close();
return lists.stream().flatMap(Collection::stream);
} catch (Exception ex) {
throw new DescriptorParseException("Cannot retrieve log lines.", ex);
}
}
}

View File

@ -0,0 +1,182 @@
/* Copyright 2018 The Tor Project
* See LICENSE for licensing information */
package org.torproject.metrics.collector.webstats;
import org.torproject.descriptor.Method;
import org.torproject.descriptor.WebServerAccessLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class WebServerAccessLogLine implements WebServerAccessLog.Line {
private static final Logger log = LoggerFactory
.getLogger(WebServerAccessLogLine.class);
private static final String DATE_PATTERN = "dd/MMM/yyyy";
private static final String DASH = "-";
private static final DateTimeFormatter dateTimeFormatter
= DateTimeFormatter.ofPattern(DATE_PATTERN + ":HH:mm:ss xxxx");
private static Pattern logLinePattern = Pattern.compile(
"^((?:\\d{1,3}\\.){3}\\d{1,3}) (\\S+) (\\S+) "
+ "\\[([\\w/]+)([\\w:]+)(\\s[+\\-]\\d{4})\\] "
+ "\"([A-Z]+) ([^\"]+) ([A-Z]+/\\d\\.\\d)\" "
+ "(\\d{3}) (\\d+|-)(.*)");
private static Map<String, String> ipMap
= Collections.synchronizedMap(new HashMap<>());
private static Map<LocalDate, LocalDate> dateMap
= Collections.synchronizedMap(new HashMap<>(500));
private static Map<String, String> protocolMap
= Collections.synchronizedMap(new HashMap<>());
private static Map<String, String> requestMap
= Collections.synchronizedMap(new HashMap<>(50_000));
private String ip;
private int response;
private String request;
private Method method;
private LocalDate date;
private int size = -1;
private boolean valid = false;
private String protocol;
/** Returns a log line string. Possibly empty. */
@Override
public String toLogString() {
if (!this.valid) {
return "";
}
return toString();
}
@Override
public String toString() {
return String.format("%s - - [%s:00:00:00 +0000] \"%s %s %s\" %d %s",
this.ip, this.getDateString(), this.method.name(), this.request,
this.protocol, this.response, this.size < 0 ? DASH : this.size);
}
/** Only used internally during sanitization.
* Returns the string of the date using 'dd/MMM/yyyy' format. */
public String getDateString() {
return this.date.format(DateTimeFormatter.ofPattern(DATE_PATTERN));
}
@Override
public String getIp() {
return this.ip;
}
/** Only used internally during sanitization. */
public void setIp(String ip) {
this.ip = fromMap(ip, ipMap);
}
@Override
public Method getMethod() {
return this.method;
}
@Override
public String getProtocol() {
return this.protocol;
}
@Override
public String getRequest() {
return this.request;
}
@Override
public Optional<Integer> getSize() {
return this.size < 0 ? Optional.empty() : Optional.of(this.size);
}
@Override
public int getResponse() {
return this.response;
}
/** Only used internally during sanitization. */
public void setRequest(String request) {
this.request = fromMap(request, requestMap);
}
@Override
public LocalDate getDate() {
return this.date;
}
@Override
public boolean isValid() {
return this.valid;
}
/** Creates a Line from a string. */
public static WebServerAccessLogLine makeLine(String line) {
WebServerAccessLogLine res = new WebServerAccessLogLine();
try {
Matcher mat = logLinePattern.matcher(line);
if (mat.find()) {
res.response = Integer.valueOf(mat.group(10));
res.method = Method.valueOf(mat.group(7));
String dateTimeString = mat.group(4) + mat.group(5) + mat.group(6);
res.date = fromMap(ZonedDateTime.parse(dateTimeString,
dateTimeFormatter).withZoneSameInstant(ZoneOffset.UTC)
.toLocalDate(), dateMap);
res.ip = fromMap(mat.group(1), ipMap);
res.request = fromMap(mat.group(8), requestMap);
res.protocol = fromMap(mat.group(9), protocolMap);
if (DASH.equals(mat.group(11))) {
res.size = -1;
} else {
res.size = Integer.valueOf(mat.group(11));
}
res.valid = true;
}
} catch (Throwable th) {
log.debug("Unmatchable line: '{}'.", line, th);
return new WebServerAccessLogLine();
}
return res;
}
private static <T> T fromMap(T val, Map<T, T> map) {
synchronized (map) {
map.putIfAbsent(Objects.requireNonNull(val), val);
return map.get(val);
}
}
@Override
public boolean equals(Object other) {
if (other instanceof WebServerAccessLogLine) {
return this.toLogString()
.equals(((WebServerAccessLogLine)other).toLogString());
}
return false;
}
@Override
public int hashCode() {
return this.toLogString().hashCode();
}
}