From 69c215e7b33fc2affd284bc49ec815822ee7db12 Mon Sep 17 00:00:00 2001 From: Jean-Philip Desjardins Date: Mon, 19 Feb 2018 16:46:16 -0500 Subject: [PATCH] More S3ObjectStream WIP. Add some caching and ListObjects. --- Source/s3stream/AmazonS3Client.cpp | 45 ++++++++++++++++++++++++++-- Source/s3stream/AmazonS3Client.h | 12 ++++++++ Source/s3stream/S3ObjectStream.cpp | 47 +++++++++++++++++++++++++----- Source/s3stream/S3ObjectStream.h | 7 +++++ 4 files changed, 101 insertions(+), 10 deletions(-) diff --git a/Source/s3stream/AmazonS3Client.cpp b/Source/s3stream/AmazonS3Client.cpp index 8cfbfc66..63c8d434 100644 --- a/Source/s3stream/AmazonS3Client.cpp +++ b/Source/s3stream/AmazonS3Client.cpp @@ -1,5 +1,6 @@ #include "http/HttpClientFactory.h" #include "string_format.h" +#include "xml/Parser.h" #include #include #include @@ -216,8 +217,48 @@ HeadObjectResult CAmazonS3Client::HeadObject(std::string objectName) } HeadObjectResult result; - auto contentLength = response.headers.find("Content-Length"); - result.contentLength = atol(contentLength->second.c_str()); + + auto contentLengthIterator = response.headers.find("Content-Length"); + if(contentLengthIterator != std::end(response.headers)) + { + result.contentLength = atol(contentLengthIterator->second.c_str()); + } + + auto etagIterator = response.headers.find("ETag"); + if(etagIterator != std::end(response.headers)) + { + result.etag = etagIterator->second; + } + + return result; +} + +ListObjectsResult CAmazonS3Client::ListObjects(std::string bucketName) +{ + Request rq; + rq.method = Framework::Http::HTTP_VERB::GET; + rq.uri = "/"; + + auto response = ExecuteRequest(rq); + if(response.statusCode != Framework::Http::HTTP_STATUS_CODE::OK) + { + throw std::runtime_error("Failed to list objects"); + } + + ListObjectsResult result; + + auto documentNode = std::unique_ptr(Framework::Xml::CParser::ParseDocument(response.data)); + auto contentsNodes = documentNode->SelectNodes("ListBucketResult/Contents"); + for(const auto& contentsNode : contentsNodes) + { + Object object; + if(auto keyNode = contentsNode->Select("Key")) + { + object.key = keyNode->GetInnerText(); + } + result.objects.push_back(object); + } + return result; } diff --git a/Source/s3stream/AmazonS3Client.h b/Source/s3stream/AmazonS3Client.h index 6ddc2bb5..69fd724d 100644 --- a/Source/s3stream/AmazonS3Client.h +++ b/Source/s3stream/AmazonS3Client.h @@ -29,6 +29,17 @@ struct GetObjectResult struct HeadObjectResult { uint64 contentLength = 0; + std::string etag; +}; + +struct Object +{ + std::string key; +}; + +struct ListObjectsResult +{ + std::vector objects; }; class CAmazonS3Client @@ -39,6 +50,7 @@ public: GetBucketLocationResult GetBucketLocation(const GetBucketLocationRequest&); GetObjectResult GetObject(const GetObjectRequest&); HeadObjectResult HeadObject(std::string); + ListObjectsResult ListObjects(std::string); private: struct Request diff --git a/Source/s3stream/S3ObjectStream.cpp b/Source/s3stream/S3ObjectStream.cpp index 40778367..d513c3c3 100644 --- a/Source/s3stream/S3ObjectStream.cpp +++ b/Source/s3stream/S3ObjectStream.cpp @@ -4,9 +4,13 @@ #include "AmazonS3Client.h" #include "Singleton.h" #include "AppConfig.h" +#include "PathUtils.h" +#include "string_format.h" +#include "StdStreamUtils.h" #define PREF_S3_OBJECTSTREAM_ACCESSKEYID "s3.objectstream.accesskeyid" #define PREF_S3_OBJECTSTREAM_SECRETACCESSKEY "s3.objectstream.secretaccesskey" +#define CACHE_PATH "s3objectstream_cache" class CS3Config : public CSingleton { @@ -32,30 +36,46 @@ CS3ObjectStream::CS3ObjectStream(const char* bucketName, const char* objectName) : m_bucketName(bucketName) , m_objectName(objectName) { + Framework::PathUtils::EnsurePathExists(GetCachePath()); GetObjectInfo(); } uint64 CS3ObjectStream::Read(void* buffer, uint64 size) { + auto range = std::make_pair(m_objectPosition, m_objectPosition + size - 1); + auto readCacheFilePath = GetCachePath() / GenerateReadCacheKey(range); + +#ifdef _TRACEGET + static FILE* output = fopen("getobject.log", "wb"); + fprintf(output, "%ld,%ld,%ld\r\n", range.first, range.second, size); + fflush(output); +#endif + + if(boost::filesystem::exists(readCacheFilePath)) + { + auto readCacheFileStream = Framework::CreateInputStdStream(readCacheFilePath.native()); + auto cacheRead = readCacheFileStream.Read(buffer, size); + assert(cacheRead == size); + return size; + } + assert(size > 0); CAmazonS3Client client(CS3Config::GetInstance().GetAccessKeyId(), CS3Config::GetInstance().GetSecretAccessKey(), m_bucketRegion); GetObjectRequest request; request.object = m_objectName; request.bucket = m_bucketName; - request.range = std::make_pair(m_objectPosition, m_objectPosition + size - 1); + request.range = range; auto objectContent = client.GetObject(request); assert(objectContent.data.size() == size); memcpy(buffer, objectContent.data.data(), size); m_objectPosition += size; -#ifdef _TRACEGET - static FILE* output = fopen("getobject.log", "wb"); - fprintf(output, "%ld,%ld,%ld\r\n", - request.range.first, request.range.second, size); - fflush(output); -#endif + { + auto readCacheFileStream = Framework::CreateOutputStdStream(readCacheFilePath.native()); + readCacheFileStream.Write(objectContent.data.data(), size); + } - return 0; + return size; } uint64 CS3ObjectStream::Write(const void*, uint64) @@ -90,6 +110,16 @@ bool CS3ObjectStream::IsEOF() return (m_objectPosition == m_objectSize); } +boost::filesystem::path CS3ObjectStream::GetCachePath() +{ + return CAppConfig::GetInstance().GetBasePath() / CACHE_PATH; +} + +std::string CS3ObjectStream::GenerateReadCacheKey(const std::pair& range) const +{ + return string_format("%s-%ld-%ld", m_objectEtag.c_str(), range.first, range.second); +} + void CS3ObjectStream::GetObjectInfo() { #if 0 @@ -111,4 +141,5 @@ void CS3ObjectStream::GetObjectInfo() request.bucket = m_bucketName; auto objectHeader = client.HeadObject(m_objectName); m_objectSize = objectHeader.contentLength; + m_objectEtag = objectHeader.etag; } diff --git a/Source/s3stream/S3ObjectStream.h b/Source/s3stream/S3ObjectStream.h index c10b7379..9395535c 100644 --- a/Source/s3stream/S3ObjectStream.h +++ b/Source/s3stream/S3ObjectStream.h @@ -1,6 +1,7 @@ #pragma once #include "Stream.h" +#include "boost_filesystem_def.h" class CS3ObjectStream : public Framework::CStream { @@ -14,11 +15,17 @@ public: bool IsEOF() override; private: + static boost::filesystem::path GetCachePath(); + std::string GenerateReadCacheKey(const std::pair&) const; void GetObjectInfo(); std::string m_bucketName; std::string m_bucketRegion; std::string m_objectName; + + //Object Metadata uint64 m_objectSize = 0; + std::string m_objectEtag; + uint64 m_objectPosition = 0; };