From 41e7d3ecde9539c87d3371280217d6487c804bfa Mon Sep 17 00:00:00 2001 From: "Unknown W. Brackets" Date: Wed, 25 May 2016 19:02:38 -0700 Subject: [PATCH] http: Switch to using input/output sinks. This should be more performant than reading one byte at a time in a loop. --- CMakeLists.txt | 2 + ext/native/Android.mk | 1 + ext/native/native.vcxproj | 2 + ext/native/native.vcxproj.filters | 6 + ext/native/net/http_headers.cpp | 45 ++-- ext/native/net/http_headers.h | 7 +- ext/native/net/http_server.cpp | 58 +++-- ext/native/net/http_server.h | 19 +- ext/native/net/sinks.cpp | 364 ++++++++++++++++++++++++++++++ ext/native/net/sinks.h | 72 ++++++ 10 files changed, 519 insertions(+), 57 deletions(-) create mode 100644 ext/native/net/sinks.cpp create mode 100644 ext/native/net/sinks.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 15388e93da..0a6d58f3ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1001,6 +1001,8 @@ add_library(native STATIC ext/native/net/http_client.h ext/native/net/resolve.cpp ext/native/net/resolve.h + ext/native/net/sinks.cpp + ext/native/net/sinks.h ext/native/net/url.cpp ext/native/net/url.h ext/native/profiler/profiler.cpp diff --git a/ext/native/Android.mk b/ext/native/Android.mk index 1f3eea6530..87e0004273 100644 --- a/ext/native/Android.mk +++ b/ext/native/Android.mk @@ -62,6 +62,7 @@ LOCAL_SRC_FILES :=\ net/http_server.cpp \ net/http_headers.cpp \ net/resolve.cpp \ + net/sinks.cpp \ net/url.cpp \ profiler/profiler.cpp \ thread/executor.cpp \ diff --git a/ext/native/native.vcxproj b/ext/native/native.vcxproj index f750063f7c..ba4fccf8a6 100644 --- a/ext/native/native.vcxproj +++ b/ext/native/native.vcxproj @@ -264,6 +264,7 @@ + @@ -723,6 +724,7 @@ + diff --git a/ext/native/native.vcxproj.filters b/ext/native/native.vcxproj.filters index 269ac5835d..c3226a5147 100644 --- a/ext/native/native.vcxproj.filters +++ b/ext/native/native.vcxproj.filters @@ -308,6 +308,9 @@ math + + net + @@ -745,6 +748,9 @@ thin3d + + net + diff --git 
a/ext/native/net/http_headers.cpp b/ext/native/net/http_headers.cpp index 4819239da4..de05f2f978 100644 --- a/ext/native/net/http_headers.cpp +++ b/ext/native/net/http_headers.cpp @@ -6,6 +6,7 @@ #include "base/logging.h" #include "base/stringutil.h" #include "file/fd_util.h" +#include "net/sinks.h" namespace http { @@ -121,30 +122,26 @@ int RequestHeader::ParseHttpHeader(const char *buffer) { return 0; } -void RequestHeader::ParseHeaders(int fd) { - int line_count = 0; - // Loop through request headers. - while (true) { - if (!fd_util::WaitUntilReady(fd, 5.0)) { // Wait max 5 secs. - // Timed out or error. - ok = false; - return; - } - char buffer[1024]; - fd_util::ReadLine(fd, buffer, 1023); - StringTrimEndNonAlphaNum(buffer); - if (buffer[0] == '\0') - break; - ParseHttpHeader(buffer); - line_count++; - if (type == SIMPLE) { - // Done! - ILOG("Simple: Done parsing http request."); - break; - } - } - ILOG("finished parsing request."); - ok = line_count > 1; +void RequestHeader::ParseHeaders(net::InputSink *sink) { + int line_count = 0; + std::string line; + while (sink->ReadLine(line)) { + if (line.length() == 0) { + // Blank line, this means end of headers. + break; + } + + ParseHttpHeader(line.c_str()); + line_count++; + if (type == SIMPLE) { + // Done! + ILOG("Simple: Done parsing http request."); + break; + } + } + + ILOG("finished parsing request."); + ok = line_count > 1; } } // namespace http diff --git a/ext/native/net/http_headers.h b/ext/native/net/http_headers.h index d333e1f91a..ccc3a09a2e 100644 --- a/ext/native/net/http_headers.h +++ b/ext/native/net/http_headers.h @@ -3,6 +3,10 @@ #include "base/buffer.h" +namespace net { +class InputSink; +}; + namespace http { class RequestHeader { @@ -12,6 +16,7 @@ class RequestHeader { // Public variables since it doesn't make sense // to bother with accessors for all these. int status; + // Intentional misspelling. 
char *referer; char *user_agent; char *resource; @@ -29,7 +34,7 @@ class RequestHeader { }; Method method; bool ok; - void ParseHeaders(int fd); + void ParseHeaders(net::InputSink *sink); bool GetParamValue(const char *param_name, std::string *value) const; private: int ParseHttpHeader(const char *buffer); diff --git a/ext/native/net/http_server.cpp b/ext/native/net/http_server.cpp index 68170251ea..16c9aec70f 100644 --- a/ext/native/net/http_server.cpp +++ b/ext/native/net/http_server.cpp @@ -2,6 +2,7 @@ #ifdef _WIN32 +#define NOMINMAX #include #include #include @@ -15,8 +16,11 @@ #include /* inet (3) funtions */ #include /* misc. UNIX functions */ +#define closesocket close + #endif +#include #include #include @@ -25,50 +29,48 @@ #include "base/buffer.h" #include "file/fd_util.h" #include "net/http_server.h" +#include "net/sinks.h" #include "thread/executor.h" namespace http { Request::Request(int fd) : fd_(fd) { - in_buffer_ = new Buffer; - out_buffer_ = new Buffer; - header_.ParseHeaders(fd_); + in_ = new net::InputSink(fd); + out_ = new net::OutputSink(fd); + header_.ParseHeaders(in_); - if (header_.ok) { - // Read the rest, too. 
- if (header_.content_length >= 0) { - in_buffer_->Read(fd_, header_.content_length); - } - ILOG("The request carried with it %i bytes", (int)in_buffer_->size()); - } else { - Close(); - } + if (header_.ok) { + ILOG("The request carried with it %i bytes", (int)header_.content_length); + } else { + Close(); + } } Request::~Request() { Close(); - CHECK(in_buffer_->empty()); - delete in_buffer_; - CHECK(out_buffer_->empty()); - delete out_buffer_; + CHECK(in_->Empty()); + delete in_; + CHECK(out_->Empty()); + delete out_; } void Request::WriteHttpResponseHeader(int status, int size) const { - Buffer *buffer = out_buffer_; + net::OutputSink *buffer = Out(); buffer->Printf("HTTP/1.0 %d OK\r\n", status); - buffer->Append("Server: SuperDuperServer v0.1\r\n"); - buffer->Append("Content-Type: text/html\r\n"); + buffer->Push("Server: SuperDuperServer v0.1\r\n"); + buffer->Push("Content-Type: text/html\r\n"); + buffer->Push("Connection: close\r\n"); if (size >= 0) { buffer->Printf("Content-Length: %i\r\n", size); } - buffer->Append("\r\n"); + buffer->Push("\r\n"); } void Request::WritePartial() const { CHECK(fd_); - out_buffer_->Flush(fd_); + out_->Flush(); } void Request::Write() { @@ -79,7 +81,7 @@ void Request::Write() { void Request::Close() { if (fd_) { - close(fd_); + closesocket(fd_); fd_ = 0; } } @@ -139,7 +141,12 @@ void Server::HandleConnection(int conn_fd) { return; } HandleRequestDefault(request); - request.WritePartial(); + + // TODO: Way to mark the content body as read, read it here if never read. + // This allows the handler to stream if need be. + + // TODO: Could handle keep alive here. 
+ request.Write(); } void Server::HandleRequest(const Request &request) { @@ -157,12 +164,13 @@ void Server::HandleRequestDefault(const Request &request) { ILOG("No handler for '%s', falling back to 404.", request.resource()); const char *payload = "404 not found\r\n"; request.WriteHttpResponseHeader(404, (int)strlen(payload)); - request.out_buffer()->Append(payload); + request.Out()->Push(payload); } void Server::HandleListing(const Request &request) { + request.WriteHttpResponseHeader(200, -1); for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter) { - request.out_buffer()->Printf("%s", iter->first.c_str()); + request.Out()->Printf("%s\n", iter->first.c_str()); } } diff --git a/ext/native/net/http_server.h b/ext/native/net/http_server.h index 5d45722671..ab8abd95cc 100644 --- a/ext/native/net/http_server.h +++ b/ext/native/net/http_server.h @@ -8,6 +8,11 @@ #include "net/http_headers.h" #include "thread/executor.h" +namespace net { +class InputSink; +class OutputSink; +}; + namespace http { class Request { @@ -23,8 +28,8 @@ class Request { return header_.GetParamValue(param_name, value); } - Buffer *in_buffer() const { return in_buffer_; } - Buffer *out_buffer() const { return out_buffer_; } + net::InputSink *In() const { return in_; } + net::OutputSink *Out() const { return out_; } // TODO: Remove, in favor of PartialWrite and friends. int fd() const { return fd_; } @@ -38,11 +43,11 @@ class Request { // If size is negative, no Content-Length: line is written. void WriteHttpResponseHeader(int status, int size = -1) const; - private: - Buffer *in_buffer_; - Buffer *out_buffer_; - RequestHeader header_; - int fd_; +private: + net::InputSink *in_; + net::OutputSink *out_; + RequestHeader header_; + int fd_; }; // Register handlers on this class to serve stuff. 
diff --git a/ext/native/net/sinks.cpp b/ext/native/net/sinks.cpp
new file mode 100644
index 0000000000..158984e1a2
--- /dev/null
+++ b/ext/native/net/sinks.cpp
@@ -0,0 +1,372 @@
+// NOTE(review): the <...> header names below were missing from this copy of the patch and
+// were restored from the trailing comments and from usage - confirm against the upstream commit.
+
+#ifdef _WIN32
+
+#define NOMINMAX
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include <io.h>
+
+#else
+
+#include <sys/socket.h> /* socket definitions */
+#include <sys/types.h> /* socket types */
+#include <sys/wait.h> /* for waitpid() */
+#include <netinet/in.h> /* struct sockaddr_in */
+#include <arpa/inet.h> /* inet (3) functions */
+#include <unistd.h> /* misc. UNIX functions */
+
+#endif
+
+#include <algorithm>
+#include <cstdarg>
+
+#include "base/logging.h"
+#include "net/sinks.h"
+#include "file/fd_util.h"
+
+namespace net {
+
+InputSink::InputSink(size_t fd) : fd_(fd), read_(0), write_(0), valid_(0) {
+	fd_util::SetNonBlocking((int)fd_, true);
+}
+
+bool InputSink::ReadLineWithEnding(std::string &s) {
+	size_t newline = FindNewline();
+	if (newline == BUFFER_SIZE) {
+		Block();
+		newline = FindNewline();
+	}
+	if (newline == BUFFER_SIZE) {
+		// Timed out.
+		return false;
+	}
+
+	s.resize(newline + 1);
+	if (read_ + newline + 1 > BUFFER_SIZE) {
+		// Need to do two reads.
+		size_t chunk1 = BUFFER_SIZE - read_;
+		size_t chunk2 = read_ + newline + 1 - BUFFER_SIZE;
+		memcpy(&s[0], buf_ + read_, chunk1);
+		memcpy(&s[chunk1], buf_, chunk2);
+	} else {
+		memcpy(&s[0], buf_ + read_, newline + 1);
+	}
+	AccountDrain(newline + 1);
+
+	return true;
+}
+
+std::string InputSink::ReadLineWithEnding() {
+	std::string s;
+	ReadLineWithEnding(s);
+	return s;
+}
+
+bool InputSink::ReadLine(std::string &s) {
+	bool result = ReadLineWithEnding(s);
+	if (result) {
+		size_t l = s.length();
+		if (l >= 2 && s[l - 2] == '\r' && s[l - 1] == '\n') {
+			s.resize(l - 2);
+		} else if (l >= 1 && s[l - 1] == '\n') {
+			s.resize(l - 1);
+		}
+	}
+	return result;
+}
+
+std::string InputSink::ReadLine() {
+	std::string s;
+	ReadLine(s);
+	return s;
+}
+
+size_t InputSink::FindNewline() const {
+	// Technically, \r\n, but most parsers are lax... let's follow suit.
+	size_t until_end = std::min(valid_, BUFFER_SIZE - read_);
+	for (const char *p = buf_ + read_, *end = buf_ + read_ + until_end; p < end; ++p) {
+		if (*p == '\n') {
+			return p - (buf_ + read_);
+		}
+	}
+
+	// Were there more bytes wrapped around?
+	if (read_ + valid_ > BUFFER_SIZE) {
+		size_t wrapped = read_ + valid_ - BUFFER_SIZE;
+		for (const char *p = buf_, *end = buf_ + wrapped; p < end; ++p) {
+			if (*p == '\n') {
+				// Offset by the skipped portion before wrapping.
+				return (p - buf_) + until_end;
+			}
+		}
+	}
+
+	// Never found, return an invalid position to indicate.
+	return BUFFER_SIZE;
+}
+
+bool InputSink::TakeExact(char *buf, size_t bytes) {
+	while (bytes > 0) {
+		size_t drained = TakeAtMost(buf, bytes);
+		buf += drained;
+		bytes -= drained;
+
+		if (drained == 0) {
+			if (!Block()) {
+				// Timed out reading more bytes.
+				return false;
+			}
+		}
+	}
+
+	return true;
+}
+
+size_t InputSink::TakeAtMost(char *buf, size_t bytes) {
+	Fill();
+
+	// The least of: contiguous to read, actually populated in buffer, and wanted.
+	size_t avail = std::min(std::min(BUFFER_SIZE - read_, valid_), bytes);
+
+	if (avail != 0) {
+		memcpy(buf, buf_ + read_, avail);
+		AccountDrain(avail);
+	}
+
+	return avail;
+}
+
+bool InputSink::Skip(size_t bytes) {
+	while (bytes > 0) {
+		size_t drained = std::min(valid_, bytes);
+		AccountDrain(drained);
+		bytes -= drained;
+
+		// Nothing left to read? Get more.
+		if (drained == 0) {
+			if (!Block()) {
+				// Timed out reading more bytes.
+				return false;
+			}
+		}
+	}
+
+	return true;
+}
+
+void InputSink::Fill() {
+	// Avoid small reads if possible.
+	if (BUFFER_SIZE - valid_ > PRESSURE) {
+		// Whatever isn't valid and follows write_ is what's available.
+		size_t avail = BUFFER_SIZE - std::max(write_, valid_);
+
+		int bytes = recv(fd_, buf_ + write_, (int)avail, 0);
+		AccountFill(bytes);
+	}
+}
+
+bool InputSink::Block() {
+	if (!fd_util::WaitUntilReady((int)fd_, 5.0)) {
+		return false;
+	}
+
+	Fill();
+	return true;
+}
+
+void InputSink::AccountFill(int bytes) {
+	if (bytes < 0) {
+		ELOG("Error reading from socket");
+		return;
+	}
+
+	// Okay, move forward (might be by zero.)
+	valid_ += bytes;
+	write_ += bytes;
+	if (write_ >= BUFFER_SIZE) {
+		write_ -= BUFFER_SIZE;
+	}
+}
+
+void InputSink::AccountDrain(size_t bytes) {
+	valid_ -= bytes;
+	read_ += bytes;
+	if (read_ >= BUFFER_SIZE) {
+		read_ -= BUFFER_SIZE;
+	}
+}
+
+bool InputSink::Empty() {
+	return valid_ == 0;
+}
+
+OutputSink::OutputSink(size_t fd) : fd_(fd), read_(0), write_(0), valid_(0) {
+	fd_util::SetNonBlocking((int)fd_, true);
+}
+
+bool OutputSink::Push(const std::string &s) {
+	return Push(&s[0], s.length());
+}
+
+bool OutputSink::Push(const char *buf, size_t bytes) {
+	while (bytes > 0) {
+		size_t pushed = PushAtMost(buf, bytes);
+		buf += pushed;
+		bytes -= pushed;
+
+		if (pushed == 0) {
+			if (!Block()) {
+				// We couldn't write all the bytes.
+				return false;
+			}
+		}
+	}
+
+	return true;
+}
+
+// Pushes s followed by a CRLF ("\r\n") terminator. Returns false if either push fails.
+bool OutputSink::PushCRLF(const std::string &s) {
+	if (Push(s)) {
+		return Push("\r\n", 2);
+	}
+	return false;
+}
+
+size_t OutputSink::PushAtMost(const char *buf, size_t bytes) {
+	Drain();
+
+	// Look for contiguous free space after write_ that's valid.
+	size_t avail = std::min(BUFFER_SIZE - std::max(write_, valid_), bytes);
+
+	if (avail != 0) {
+		memcpy(buf_ + write_, buf, avail);
+		AccountPush(avail);
+	}
+
+	return avail;
+}
+
+// Formats directly into the free space after write_ when the output fits there, otherwise
+// formats into a temporary buffer and Push()es that. Returns false on a formatting error,
+// on output of BUFFER_SIZE bytes or more, or when the push fails.
+bool OutputSink::Printf(const char *fmt, ...) {
+	// Let's start by checking how much space we have.
+	size_t avail = BUFFER_SIZE - std::max(write_, valid_);
+
+	va_list args;
+	va_start(args, fmt);
+	// Make a backup in case we don't have sufficient space.
+	va_list backup;
+	va_copy(backup, args);
+
+	bool success = true;
+
+	int result = vsnprintf(buf_ + write_, avail, fmt, args);
+	// Compare as int so that a negative (error) result reaches the failure check below.
+	if (result >= (int)avail) {
+		// There wasn't enough space. Let's use a buffer instead.
+		// This could be caused by wraparound.
+		// args was consumed by the first vsnprintf, so format from the backup copy.
+		char temp[BUFFER_SIZE];
+		result = vsnprintf(temp, BUFFER_SIZE, fmt, backup);
+
+		if (result > 0 && result < (int)BUFFER_SIZE) {
+			// In case it did return the null terminator.
+			if (temp[result - 1] == '\0') {
+				result--;
+			}
+
+			success = Push(temp, result);
+			// We've written so there's nothing more.
+			result = 0;
+		}
+	}
+	va_end(args);
+	va_end(backup);
+
+	// Okay, did we actually write?
+	if (result >= (int)avail) {
+		// This means the result string was too big for the buffer.
+		ELOG("Not enough space to format output.");
+		return false;
+	} else if (result < 0) {
+		ELOG("vsnprintf failed.");
+		return false;
+	}
+
+	if (result > 0) {
+		AccountPush(result);
+	}
+
+	return success;
+}
+
+bool OutputSink::Block() {
+	if (!fd_util::WaitUntilReady((int)fd_, 5.0, true)) {
+		return false;
+	}
+
+	Drain();
+	return true;
+}
+
+// Sends everything that is buffered. Returns false if waiting for the socket (see Block) fails.
+bool OutputSink::Flush() {
+	while (valid_ > 0) {
+		size_t avail = std::min(BUFFER_SIZE - read_, valid_);
+
+		int bytes = send(fd_, buf_ + read_, (int)avail, 0);
+		AccountDrain(bytes);
+
+		// No progress (nothing sent, or the send failed): wait on the socket rather than spinning.
+		if (bytes <= 0) {
+			// This may also drain. Either way, keep looping.
+			if (!Block()) {
+				return false;
+			}
+		}
+	}
+
+	return true;
+}
+
+void OutputSink::Drain() {
+	// Avoid small writes if possible.
+	if (valid_ > PRESSURE) {
+		// Let's just do contiguous valid.
+		size_t avail = std::min(BUFFER_SIZE - read_, valid_);
+
+		int bytes = send(fd_, buf_ + read_, (int)avail, 0);
+		AccountDrain(bytes);
+	}
+}
+
+void OutputSink::AccountPush(size_t bytes) {
+	valid_ += bytes;
+	write_ += bytes;
+	if (write_ >= BUFFER_SIZE) {
+		write_ -= BUFFER_SIZE;
+	}
+}
+
+void OutputSink::AccountDrain(int bytes) {
+	if (bytes < 0) {
+		ELOG("Error writing to socket");
+		return;
+	}
+
+	valid_ -= bytes;
+	read_ += bytes;
+	if (read_ >= BUFFER_SIZE) {
+		read_ -= BUFFER_SIZE;
+	}
+}
+
+bool OutputSink::Empty() {
+	return valid_ == 0;
+}
+
+};
diff --git a/ext/native/net/sinks.h b/ext/native/net/sinks.h
new file mode 100644
index 0000000000..0138d74a6c
--- /dev/null
+++ b/ext/native/net/sinks.h
@@ -0,0 +1,76 @@
+#pragma once
+
+#include <string> // NOTE(review): header name restored from usage (std::string) - confirm.
+
+namespace net {
+
+// Buffered reader for a socket fd, backed by a fixed-size ring buffer.
+// The constructor switches the fd to non-blocking mode; waits for more data give up after 5 seconds.
+class InputSink {
+public:
+	InputSink(size_t fd);
+
+	bool ReadLine(std::string &s);
+	std::string ReadLine();
+	bool ReadLineWithEnding(std::string &s);
+	std::string ReadLineWithEnding();
+
+	// Read exactly this number of bytes, or fail.
+	bool TakeExact(char *buf, size_t bytes);
+	// Read whatever is convenient (may even return 0 bytes when there's more coming eventually.)
+	size_t TakeAtMost(char *buf, size_t bytes);
+	// Skip exactly this number of bytes, or fail.
+	bool Skip(size_t bytes);
+
+	bool Empty();
+
+private:
+	void Fill();
+	bool Block();
+	void AccountFill(int bytes);
+	void AccountDrain(size_t bytes);
+	size_t FindNewline() const;
+
+	static const size_t BUFFER_SIZE = 32 * 1024;
+	static const size_t PRESSURE = 8 * 1024;
+
+	size_t fd_;
+	char buf_[BUFFER_SIZE];
+	size_t read_;
+	size_t write_;
+	size_t valid_;
+};
+
+// Buffered writer for a socket fd, backed by a fixed-size ring buffer.
+// Data queued with Push/Printf is only guaranteed to have been sent once Flush() returns true.
+class OutputSink {
+public:
+	OutputSink(size_t fd);
+
+	bool Push(const std::string &s);
+	bool Push(const char *buf, size_t bytes);
+	size_t PushAtMost(const char *buf, size_t bytes);
+	bool PushCRLF(const std::string &s);
+	bool Printf(const char *fmt, ...);
+
+	bool Flush();
+
+	bool Empty();
+
+private:
+	void Drain();
+	bool Block();
+	void AccountPush(size_t bytes);
+	void AccountDrain(int bytes);
+
+	static const size_t BUFFER_SIZE = 32 * 1024;
+	static const size_t PRESSURE = 8 * 1024;
+
+	size_t fd_;
+	char buf_[BUFFER_SIZE];
+	size_t read_;
+	size_t write_;
+	size_t valid_;
+};
+
+};