More S3ObjectStream WIP.

Add some caching and ListObjects.
This commit is contained in:
Jean-Philip Desjardins 2018-02-19 16:46:16 -05:00
parent bc3a78fcda
commit 69c215e7b3
4 changed files with 101 additions and 10 deletions

View File

@ -1,5 +1,6 @@
#include "http/HttpClientFactory.h"
#include "string_format.h"
#include "xml/Parser.h"
#include <openssl/sha.h>
#include <openssl/hmac.h>
#include <map>
@ -216,8 +217,48 @@ HeadObjectResult CAmazonS3Client::HeadObject(std::string objectName)
}
HeadObjectResult result;
auto contentLength = response.headers.find("Content-Length");
result.contentLength = atol(contentLength->second.c_str());
auto contentLengthIterator = response.headers.find("Content-Length");
if(contentLengthIterator != std::end(response.headers))
{
result.contentLength = atol(contentLengthIterator->second.c_str());
}
auto etagIterator = response.headers.find("ETag");
if(etagIterator != std::end(response.headers))
{
result.etag = etagIterator->second;
}
return result;
}
ListObjectsResult CAmazonS3Client::ListObjects(std::string bucketName)
{
Request rq;
rq.method = Framework::Http::HTTP_VERB::GET;
rq.uri = "/";
auto response = ExecuteRequest(rq);
if(response.statusCode != Framework::Http::HTTP_STATUS_CODE::OK)
{
throw std::runtime_error("Failed to list objects");
}
ListObjectsResult result;
auto documentNode = std::unique_ptr<Framework::Xml::CNode>(Framework::Xml::CParser::ParseDocument(response.data));
auto contentsNodes = documentNode->SelectNodes("ListBucketResult/Contents");
for(const auto& contentsNode : contentsNodes)
{
Object object;
if(auto keyNode = contentsNode->Select("Key"))
{
object.key = keyNode->GetInnerText();
}
result.objects.push_back(object);
}
return result;
}

View File

@ -29,6 +29,17 @@ struct GetObjectResult
struct HeadObjectResult
{
uint64 contentLength = 0;
std::string etag;
};
struct Object
{
std::string key;
};
struct ListObjectsResult
{
std::vector<Object> objects;
};
class CAmazonS3Client
@ -39,6 +50,7 @@ public:
GetBucketLocationResult GetBucketLocation(const GetBucketLocationRequest&);
GetObjectResult GetObject(const GetObjectRequest&);
HeadObjectResult HeadObject(std::string);
ListObjectsResult ListObjects(std::string);
private:
struct Request

View File

@ -4,9 +4,13 @@
#include "AmazonS3Client.h"
#include "Singleton.h"
#include "AppConfig.h"
#include "PathUtils.h"
#include "string_format.h"
#include "StdStreamUtils.h"
#define PREF_S3_OBJECTSTREAM_ACCESSKEYID "s3.objectstream.accesskeyid"
#define PREF_S3_OBJECTSTREAM_SECRETACCESSKEY "s3.objectstream.secretaccesskey"
#define CACHE_PATH "s3objectstream_cache"
class CS3Config : public CSingleton<CS3Config>
{
@ -32,30 +36,46 @@ CS3ObjectStream::CS3ObjectStream(const char* bucketName, const char* objectName)
: m_bucketName(bucketName)
, m_objectName(objectName)
{
Framework::PathUtils::EnsurePathExists(GetCachePath());
GetObjectInfo();
}
uint64 CS3ObjectStream::Read(void* buffer, uint64 size)
{
auto range = std::make_pair(m_objectPosition, m_objectPosition + size - 1);
auto readCacheFilePath = GetCachePath() / GenerateReadCacheKey(range);
#ifdef _TRACEGET
static FILE* output = fopen("getobject.log", "wb");
fprintf(output, "%ld,%ld,%ld\r\n", range.first, range.second, size);
fflush(output);
#endif
if(boost::filesystem::exists(readCacheFilePath))
{
auto readCacheFileStream = Framework::CreateInputStdStream(readCacheFilePath.native());
auto cacheRead = readCacheFileStream.Read(buffer, size);
assert(cacheRead == size);
return size;
}
assert(size > 0);
CAmazonS3Client client(CS3Config::GetInstance().GetAccessKeyId(), CS3Config::GetInstance().GetSecretAccessKey(), m_bucketRegion);
GetObjectRequest request;
request.object = m_objectName;
request.bucket = m_bucketName;
request.range = std::make_pair(m_objectPosition, m_objectPosition + size - 1);
request.range = range;
auto objectContent = client.GetObject(request);
assert(objectContent.data.size() == size);
memcpy(buffer, objectContent.data.data(), size);
m_objectPosition += size;
#ifdef _TRACEGET
static FILE* output = fopen("getobject.log", "wb");
fprintf(output, "%ld,%ld,%ld\r\n",
request.range.first, request.range.second, size);
fflush(output);
#endif
{
auto readCacheFileStream = Framework::CreateOutputStdStream(readCacheFilePath.native());
readCacheFileStream.Write(objectContent.data.data(), size);
}
return 0;
return size;
}
uint64 CS3ObjectStream::Write(const void*, uint64)
@ -90,6 +110,16 @@ bool CS3ObjectStream::IsEOF()
return (m_objectPosition == m_objectSize);
}
boost::filesystem::path CS3ObjectStream::GetCachePath()
{
return CAppConfig::GetInstance().GetBasePath() / CACHE_PATH;
}
std::string CS3ObjectStream::GenerateReadCacheKey(const std::pair<uint64, uint64>& range) const
{
return string_format("%s-%ld-%ld", m_objectEtag.c_str(), range.first, range.second);
}
void CS3ObjectStream::GetObjectInfo()
{
#if 0
@ -111,4 +141,5 @@ void CS3ObjectStream::GetObjectInfo()
request.bucket = m_bucketName;
auto objectHeader = client.HeadObject(m_objectName);
m_objectSize = objectHeader.contentLength;
m_objectEtag = objectHeader.etag;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include "Stream.h"
#include "boost_filesystem_def.h"
class CS3ObjectStream : public Framework::CStream
{
@ -14,11 +15,17 @@ public:
bool IsEOF() override;
private:
static boost::filesystem::path GetCachePath();
std::string GenerateReadCacheKey(const std::pair<uint64, uint64>&) const;
void GetObjectInfo();
std::string m_bucketName;
std::string m_bucketRegion;
std::string m_objectName;
//Object Metadata
uint64 m_objectSize = 0;
std::string m_objectEtag;
uint64 m_objectPosition = 0;
};