CLOUD: Rewrite NetworkReadStream

Now it is based on MemoryReadWriteStream, which is introduced by this
commit. This stream is using ring buffer and is dynamically increasing
its size when necessary.
This commit is contained in:
Alexander Tkachev 2016-05-16 01:05:40 +06:00
parent 01abba4f1d
commit 9c22b7cc64
10 changed files with 119 additions and 46 deletions

View File

@ -23,7 +23,7 @@
#define FORBIDDEN_SYMBOL_ALLOW_ALL
#include "backends/cloud/dropbox/curlrequest.h"
#include "backends/cloud/curl/networkreadstream.h"
#include "backends/networking/curl/networkreadstream.h"
#include "common/debug.h"
#include <curl/curl.h>
@ -38,7 +38,7 @@ CurlRequest::~CurlRequest() {
if (_stream) delete _stream;
}
bool CurlRequest::handle(ConnectionManager& manager) {
bool CurlRequest::handle(Networking::ConnectionManager &manager) {
if (_firstTime) {
_stream = manager.makeRequest(_url);
_firstTime = false;

View File

@ -25,22 +25,23 @@
#include "backends/cloud/request.h"
namespace Cloud {
namespace Networking {
class NetworkReadStream;
}
namespace Cloud {
namespace Dropbox {
class CurlRequest : public Cloud::Request {
bool _firstTime;
const char *_url;
NetworkReadStream *_stream;
Networking::NetworkReadStream *_stream;
public:
CurlRequest(Callback cb, const char *url);
virtual ~CurlRequest();
virtual bool handle(ConnectionManager& manager);
virtual bool handle(Networking::ConnectionManager &manager);
};
} //end of namespace Dropbox

View File

@ -23,7 +23,7 @@
#ifndef BACKENDS_CLOUD_REQUEST_H
#define BACKENDS_CLOUD_REQUEST_H
#include "backends/cloud/curl/connectionmanager.h"
#include "backends/networking/curl/connectionmanager.h"
namespace Cloud {
@ -47,7 +47,7 @@ public:
* @return true if request's work is complete and it may be removed from Storage's list
*/
virtual bool handle(ConnectionManager& manager) = 0;
virtual bool handle(Networking::ConnectionManager &manager) = 0;
};
} //end of namespace Cloud

View File

@ -26,7 +26,7 @@
#include "common/str.h"
#include "common/array.h"
#include "backends/cloud/request.h"
#include "backends/cloud/curl/connectionmanager.h"
#include "backends/networking/curl/connectionmanager.h"
namespace Cloud {
@ -36,7 +36,7 @@ class Storage {
protected:
Common::Array<Request *> _requests;
ConnectionManager _connectionManager;
Networking::ConnectionManager _connectionManager;
virtual void addRequest(Request *request); //starts the timer if it's not started
virtual void handler();

View File

@ -24,9 +24,13 @@ MODULE_OBJS += \
cloud/manager.o \
cloud/storage.o \
cloud/dropbox/dropboxstorage.o \
cloud/dropbox/curlrequest.o \
cloud/curl/connectionmanager.o \
cloud/curl/networkreadstream.o
cloud/dropbox/curlrequest.o
endif
ifdef USE_LIBCURL
MODULE_OBJS += \
networking/curl/connectionmanager.o \
networking/curl/networkreadstream.o
endif
ifdef USE_ELF_LOADER

View File

@ -22,12 +22,12 @@
#define FORBIDDEN_SYMBOL_ALLOW_ALL
#include "backends/cloud/curl/connectionmanager.h"
#include "backends/cloud/curl/networkreadstream.h"
#include "backends/networking/curl/connectionmanager.h"
#include "backends/networking/curl/networkreadstream.h"
#include "common/debug.h"
#include <curl/curl.h>
namespace Cloud {
namespace Networking {
ConnectionManager::ConnectionManager(): _multi(0) {
curl_global_init(CURL_GLOBAL_ALL);
@ -60,9 +60,8 @@ void ConnectionManager::handle() {
if (stream) stream->done();
debug("ConnectionManager: SUCCESS (%d - %s)", curlMsg->data.result, curl_easy_strerror(curlMsg->data.result));
curl_multi_remove_handle(_multi, e);
}
else {
curl_multi_remove_handle(_multi, e);
} else {
debug("ConnectionManager: FAILURE (CURLMsg (%d))", curlMsg->msg);
//TODO: notify stream on this case also
}

View File

@ -20,14 +20,14 @@
*
*/
#ifndef BACKENDS_CLOUD_CURL_CONNECTIONMANAGER_H
#define BACKENDS_CLOUD_CURL_CONNECTIONMANAGER_H
#ifndef BACKENDS_NETWORKING_CURL_CONNECTIONMANAGER_H
#define BACKENDS_NETWORKING_CURL_CONNECTIONMANAGER_H
#include "common/str.h"
typedef void CURLM;
namespace Cloud {
namespace Networking {
class NetworkReadStream;

View File

@ -22,15 +22,15 @@
#define FORBIDDEN_SYMBOL_ALLOW_ALL
#include "backends/cloud/curl/networkreadstream.h"
#include "backends/networking/curl/networkreadstream.h"
#include "common/debug.h"
#include <curl/curl.h>
namespace Cloud {
namespace Networking {
static size_t curlDataCallback(char *d, size_t n, size_t l, void *p) {
NetworkReadStream *stream = (NetworkReadStream *)p;
if (stream) return stream->dataCallback(d, n, l);
if (stream) return stream->write(d, n*l);
return 0;
}
@ -53,18 +53,13 @@ bool NetworkReadStream::eos() const {
}
uint32 NetworkReadStream::read(void *dataPtr, uint32 dataSize) {
uint32 available = _bytes.size();
uint32 actuallyRead = MemoryReadWriteStream::read(dataPtr, dataSize);
if (available == 0) {
if (actuallyRead == 0) {
if (_requestComplete) _eos = true;
return 0;
}
char *data = (char *)dataPtr;
uint32 actuallyRead = (dataSize < available ? dataSize : available);
for (uint32 i = 0; i < actuallyRead; ++i) data[i] = _bytes[i];
data[actuallyRead] = 0;
_bytes.erase(0, actuallyRead);
return actuallyRead;
}
@ -72,12 +67,4 @@ void NetworkReadStream::done() {
_requestComplete = true;
}
size_t NetworkReadStream::dataCallback(char *d, size_t n, size_t l) {
//TODO: return CURL_WRITEFUNC_PAUSE if _bytes is too long
//TODO: remember https://curl.haxx.se/libcurl/c/curl_easy_pause.html (Memory Use / compressed data case)
//TODO: if using pause, don't forget to unpause it somehow from read() up there
_bytes += Common::String(d, n*l);
return n*l;
}
} //end of namespace Cloud

View File

@ -20,20 +20,20 @@
*
*/
#ifndef BACKENDS_CLOUD_CURL_NETWORKREADSTREAM_H
#define BACKENDS_CLOUD_CURL_NETWORKREADSTREAM_H
#ifndef BACKENDS_NETWORKING_CURL_NETWORKREADSTREAM_H
#define BACKENDS_NETWORKING_CURL_NETWORKREADSTREAM_H
#include "common/memstream.h"
#include "common/stream.h"
#include "common/str.h"
typedef void CURL;
namespace Cloud {
namespace Networking {
class NetworkReadStream: public Common::ReadStream {
class NetworkReadStream: public Common::MemoryReadWriteStream {
CURL *_easy;
bool _eos, _requestComplete;
Common::String _bytes;
public:
NetworkReadStream(const char *url);
@ -68,7 +68,6 @@ public:
virtual uint32 read(void *dataPtr, uint32 dataSize);
void done();
size_t dataCallback(char *d, size_t n, size_t l);
};
} //end of namespace Cloud

View File

@ -209,6 +209,89 @@ public:
bool seek(int32 offset, int whence = SEEK_SET);
};
/**
* MemoryStream based on RingBuffer. Grows if has insufficient buffer size.
*/
class MemoryReadWriteStream : public WriteStream {
private:
uint32 _capacity;
uint32 _size;
byte *_data;
uint32 _writePos, _readPos, _pos, _length;
DisposeAfterUse::Flag _disposeMemory;
void ensureCapacity(uint32 new_len) {
if (new_len <= _capacity)
return;
byte *old_data = _data;
uint32 oldCapacity = _capacity;
_capacity = MAX(new_len + 32, _capacity * 2);
_data = (byte *)malloc(_capacity);
if (old_data) {
// Copy old data
if (_readPos < _writePos) {
memcpy(_data, old_data + _readPos, _writePos - _readPos);
_writePos -= _readPos;
_readPos = 0;
} else {
memcpy(_data, old_data + _readPos, oldCapacity - _readPos);
memcpy(_data + oldCapacity - _readPos, old_data, _writePos);
_writePos += oldCapacity - _readPos;
_readPos = 0;
}
free(old_data);
}
}
public:
MemoryReadWriteStream(DisposeAfterUse::Flag disposeMemory = DisposeAfterUse::NO) : _capacity(0), _size(0), _data(0), _writePos(0), _readPos(0), _pos(0), _length(0), _disposeMemory(disposeMemory) {}
~MemoryReadWriteStream() {
if (_disposeMemory)
free(_data);
}
uint32 write(const void *dataPtr, uint32 dataSize) {
ensureCapacity(_length + dataSize);
if (_writePos + dataSize < _capacity) {
memcpy(_data + _writePos, dataPtr, dataSize);
} else {
memcpy(_data + _writePos, dataPtr, _capacity - _writePos);
const byte *shiftedPtr = (const byte *)dataPtr + _capacity - _writePos;
memcpy(_data, shiftedPtr, dataSize - (_capacity - _writePos));
}
_writePos = (_writePos + dataSize) % _capacity;
_pos += dataSize;
_length += dataSize;
if (_pos > _size)
_size = _pos;
return dataSize;
}
uint32 read(void *dataPtr, uint32 dataSize) {
uint32 length = _length;
if (length < dataSize) dataSize = length;
if (dataSize == 0 || _capacity == 0) return 0;
if (_readPos + dataSize < _capacity) {
memcpy(dataPtr, _data + _readPos, dataSize);
} else {
memcpy(dataPtr, _data + _readPos, _capacity - _readPos);
byte *shiftedPtr = (byte *)dataPtr + _capacity - _readPos;
memcpy(shiftedPtr, _data, dataSize - (_capacity - _readPos));
}
_readPos = (_readPos + dataSize) % _capacity;
_length -= dataSize;
return dataSize;
}
uint32 pos() const { return _pos - _length; } //'read' position in the stream
uint32 size() const { return _size; } //that's also 'write' position in the stream, as it's append-only
byte *getData() { return _data; }
};
} // End of namespace Common
#endif