Use CharQueue in Buffer

This commit is contained in:
Henrik Rydgård 2024-11-21 23:40:27 +01:00
parent cb27df02f3
commit d1d7ddf310
3 changed files with 47 additions and 32 deletions

View File

@ -9,9 +9,7 @@
char *Buffer::Append(size_t length) { char *Buffer::Append(size_t length) {
if (length > 0) { if (length > 0) {
size_t old_size = data_.size(); return data_.push_back_write(length);
data_.resize(old_size + length);
return &data_[0] + old_size;
} else { } else {
return nullptr; return nullptr;
} }
@ -33,8 +31,11 @@ void Buffer::Append(const char *str) {
void Buffer::Append(const Buffer &other) { void Buffer::Append(const Buffer &other) {
size_t len = other.size(); size_t len = other.size();
if (len > 0) { if (len > 0) {
char *dest = Append(len); // Append other to the current buffer.
memcpy(dest, &other.data_[0], len); other.data_.iterate_blocks([&](const char *data, size_t size) {
data_.push_back(data, size);
return true;
});
} }
} }
@ -57,8 +58,8 @@ void Buffer::Take(size_t length, std::string *dest) {
} }
void Buffer::Take(size_t length, char *dest) { void Buffer::Take(size_t length, char *dest) {
memcpy(dest, &data_[0], length); size_t retval = data_.pop_front_bulk(dest, length);
data_.erase(data_.begin(), data_.begin() + length); _dbg_assert_(retval == length);
} }
int Buffer::TakeLineCRLF(std::string *dest) { int Buffer::TakeLineCRLF(std::string *dest) {
@ -79,7 +80,7 @@ void Buffer::Skip(size_t length) {
ERROR_LOG(Log::IO, "Truncating length in Buffer::Skip()"); ERROR_LOG(Log::IO, "Truncating length in Buffer::Skip()");
length = data_.size(); length = data_.size();
} }
data_.erase(data_.begin(), data_.begin() + length); data_.skip(length);
} }
int Buffer::SkipLineCRLF() { int Buffer::SkipLineCRLF() {
@ -92,9 +93,10 @@ int Buffer::SkipLineCRLF() {
} }
} }
// This relies on having buffered data!
int Buffer::OffsetToAfterNextCRLF() { int Buffer::OffsetToAfterNextCRLF() {
for (int i = 0; i < (int)data_.size() - 1; i++) { for (int i = 0; i < (int)data_.size() - 1; i++) {
if (data_[i] == '\r' && data_[i + 1] == '\n') { if (data_.peek(i) == '\r' && data_.peek(i + 1) == '\n') {
return i + 2; return i + 2;
} }
} }
@ -124,7 +126,11 @@ bool Buffer::FlushToFile(const Path &filename) {
if (!f) if (!f)
return false; return false;
if (!data_.empty()) { if (!data_.empty()) {
fwrite(&data_[0], 1, data_.size(), f); // Write the buffer to the file.
data_.iterate_blocks([=](const char *blockData, size_t blockSize) {
return fwrite(blockData, 1, blockSize, f) == blockSize;
});
data_.clear();
} }
fclose(f); fclose(f);
return true; return true;
@ -132,5 +138,8 @@ bool Buffer::FlushToFile(const Path &filename) {
void Buffer::PeekAll(std::string *dest) { void Buffer::PeekAll(std::string *dest) {
dest->resize(data_.size()); dest->resize(data_.size());
memcpy(&(*dest)[0], &data_[0], data_.size()); data_.iterate_blocks(([=](const char *blockData, size_t blockSize) {
dest->append(blockData, blockSize);
return true;
}));
} }

View File

@ -4,6 +4,7 @@
#include <vector> #include <vector>
#include "Common/Common.h" #include "Common/Common.h"
#include "Common/Data/Collections/CharQueue.h"
class Path; class Path;
@ -72,12 +73,12 @@ public:
// Utilities. Try to avoid checking for size. // Utilities. Try to avoid checking for size.
size_t size() const { return data_.size(); } size_t size() const { return data_.size(); }
bool empty() const { return size() == 0; } bool empty() const { return size() == 0; }
void clear() { data_.resize(0); } void clear() { data_.clear(); }
bool IsVoid() const { return void_; } bool IsVoid() const { return void_; }
protected: protected:
// TODO: Find a better internal representation, like a cord. // Custom queue implementation.
std::vector<char> data_; CharQueue data_;
bool void_ = false; bool void_ = false;
private: private:

View File

@ -36,27 +36,32 @@ void RequestProgress::Update(int64_t downloaded, int64_t totalBytes, bool done)
bool Buffer::FlushSocket(uintptr_t sock, double timeout, bool *cancelled) { bool Buffer::FlushSocket(uintptr_t sock, double timeout, bool *cancelled) {
static constexpr float CANCEL_INTERVAL = 0.25f; static constexpr float CANCEL_INTERVAL = 0.25f;
for (size_t pos = 0, end = data_.size(); pos < end; ) {
bool ready = false; data_.iterate_blocks([&](const char *data, size_t size) {
double endTimeout = time_now_d() + timeout; for (size_t pos = 0, end = size; pos < end; ) {
while (!ready) { bool ready = false;
if (cancelled && *cancelled) double endTimeout = time_now_d() + timeout;
return false; while (!ready) {
ready = fd_util::WaitUntilReady(sock, CANCEL_INTERVAL, true); if (cancelled && *cancelled)
if (!ready && time_now_d() > endTimeout) { return false;
ERROR_LOG(Log::IO, "FlushSocket timed out"); ready = fd_util::WaitUntilReady(sock, CANCEL_INTERVAL, true);
if (!ready && time_now_d() > endTimeout) {
ERROR_LOG(Log::IO, "FlushSocket timed out");
return false;
}
}
int sent = send(sock, &data[pos], end - pos, MSG_NOSIGNAL);
// TODO: Do we need some retry logic here, instead of just giving up?
if (sent < 0) {
ERROR_LOG(Log::IO, "FlushSocket failed to send: %d", errno);
return false; return false;
} }
pos += sent;
} }
int sent = send(sock, &data_[pos], end - pos, MSG_NOSIGNAL); return true;
// TODO: Do we need some retry logic here, instead of just giving up? });
if (sent < 0) {
ERROR_LOG(Log::IO, "FlushSocket failed to send: %d", errno); data_.clear();
return false;
}
pos += sent;
}
data_.resize(0);
return true; return true;
} }