1#include <s3api/clients/client.hpp>
6#include <boost/algorithm/string.hpp>
9#include <userver/http/common_headers.hpp>
10#include <userver/http/url.hpp>
11#include <userver/logging/log.hpp>
12#include <userver/utils/algo.hpp>
13#include <userver/utils/exception.hpp>
15#include <userver/s3api/authenticators/access_key.hpp>
17#include <s3api/s3_connection.hpp>
18#include <s3api/s3api_methods.hpp>
20USERVER_NAMESPACE_BEGIN
25const std::string kMeta =
"x-amz-meta-";
26const std::string kTagging =
"X-Amz-Tagging";
27constexpr const std::size_t kMaxS3Keys = 1000;
29constexpr http::
headers::PredefinedHeader kEtagHeader{
"ETag"};
31void SaveMeta(clients::http::Headers& headers,
const ClientImpl::Meta& meta) {
32 for (
const auto& [header, value] : meta) {
33 headers
[kMeta + header
] = value;
37void ReadMeta(
const clients::http::Headers& headers,
ClientImpl::Meta& meta) {
38 for (
const auto& [header, value] : headers) {
39 if (boost::istarts_with(header, kMeta)) {
40 meta
[header.substr(kMeta.length())
] = value;
45void SaveTags(clients::http::Headers& headers,
const std::vector<
ClientImpl::
Tag>& tags) {
47 for (
const auto& [key, value] : tags) {
48 size += key.size() + value.size() + 2;
50 std::string tag_values;
51 tag_values.reserve(size);
52 for (
const auto& [key, value] : tags) {
53 tag_values.append(key);
54 tag_values.append(
"=");
55 tag_values.append(value);
56 tag_values.append(
"&");
58 if (!tag_values.empty()) {
60 tag_values.pop_back();
62 headers
[kTagging
] = std::move(tag_values);
// Extends a URL under construction in `generated_url`.
// If req.req is non-empty, appends "/" + req.req. It then calls
// authenticator->Sign(req, expires_at) and checks whether the returned params
// are non-empty.
// NOTE(review): this copy is incomplete. The parameter lines for expires_at,
// req and authenticator (see the call in GeneratePresignedUrl) are missing, as
// is the code that writes `params` into the URL after the emptiness check.
65void AddQueryParamsToPresignedUrl(
66 std::ostringstream& generated_url,
71 if (!req.req.empty()) {
72 generated_url <<
"/" + req.req;
75 auto params = authenticator->Sign(req, expires_at);
77 if (!params.empty()) {
// Builds a presigned URL with virtual-host addressing.
// The URL is "<protocol><request.bucket>.<host>"; then the signed path/query
// part is appended by AddQueryParamsToPresignedUrl, with `expires_at`
// converted to time_t.
// NOTE(review): the `request` and `authenticator` parameter lines are missing
// from this copy. `protocol` is streamed directly before the bucket name, so it
// presumably already includes "://" — confirm with callers.
83std::string GeneratePresignedUrl(
85 std::string_view host,
86 std::string_view protocol,
87 const std::chrono::system_clock::time_point& expires_at,
90 std::ostringstream generated_url;
93 generated_url << protocol << request.bucket <<
"." << host;
94 const auto expires_at_time_t = std::chrono::system_clock::to_time_t(expires_at);
95 AddQueryParamsToPresignedUrl(generated_url, expires_at_time_t, request, std::move(authenticator));
96 return generated_url.str();
99bool IsS3ResponseTruncated(
const pugi::xml_node& list_bucket_result) {
100 const auto is_truncated = list_bucket_result.child(
"IsTruncated").child_value();
101 return std::string_view{is_truncated} ==
"true";
// Body of the S3 object-listing parser (its signature is not present in this copy).
// It parses `s3_response` as XML and sets `is_truncated` from
// ListBucketResult/IsTruncated. For every <Contents> element it appends
// ObjectMeta{Key, Size, LastModified} to `result`.
// NOTE(review): the error-handling statements around the parse-status check and
// the xpath_exception handler are partly missing in this copy.
106 pugi::xml_document xml;
107 const pugi::xml_parse_result parse_result = xml.load_string(s3_response.c_str());
108 if (parse_result.status != pugi::status_ok) {
110 "Failed to parse S3 list response as xml, error: {}, response: {}",
111 parse_result.description(),
116 const auto list_bucket_result = xml.child(
"ListBucketResult");
117 is_truncated = IsS3ResponseTruncated(list_bucket_result);
119 const auto items = list_bucket_result.children(
"Contents");
120 for (
const auto& item : items) {
121 const auto key = item.child(
"Key").child_value();
// NOTE(review): std::stoull throws std::invalid_argument / std::out_of_range
// on a missing or malformed <Size>. The handler below only catches
// pugi::xpath_exception, so those exceptions propagate to the caller.
122 const auto size = std::stoull(item.child(
"Size").child_value());
123 const auto last_modified = item.child(
"LastModified").child_value();
124 result.push_back(
ObjectMeta{key, size, last_modified});
126 }
catch (
const pugi::xpath_exception& ex) {
128 fmt::format(
"Bad xml structure for S3 list response, error: {}, response: {}", ex.what(), s3_response)
// Parses an S3 ListBucketResult XML document and returns the <Prefix> text of
// every <CommonPrefixes> entry (the "directories" of a delimiter listing).
// `is_truncated` is set from the <IsTruncated> element.
// NOTE(review): the statements that report a parse failure and the
// xpath_exception (and the final return of `result`) are partly missing in this copy.
134std::vector<std::string> ParseS3DirectoriesListResponse(
utils::
zstring_view s3_response,
bool& is_truncated) {
135 std::vector<std::string> result;
136 pugi::xml_document xml;
137 const pugi::xml_parse_result parse_result = xml.load_string(s3_response.c_str());
138 if (parse_result.status != pugi::status_ok) {
140 "Failed to parse S3 directories list response as xml, error: {}, "
142 parse_result.description(),
147 const auto list_bucket_result = xml.child(
"ListBucketResult");
148 is_truncated = IsS3ResponseTruncated(list_bucket_result);
150 const auto items = list_bucket_result.children(
"CommonPrefixes");
151 for (
const auto& item : items) {
152 result.push_back(item.child(
"Prefix").child_value());
154 }
catch (
const pugi::xpath_exception& ex) {
156 "Bad xml structure for S3 directories list response, error: {}, "
// Member-initializer list of the ClientImpl constructor. The constructor's
// signature and body are not present in this copy. It moves the connection,
// authenticator and bucket name into the corresponding members.
174 : conn_(std::move(s3conn)),
175 authenticator_{std::move(authenticator)},
176 bucket_(std::move(bucket))
191std::string_view
ClientImpl::GetBucketName()
const {
return bucket_; }
// Part of the put-object method. Its opening line and most of the
// api_methods::PutObject argument list are not present in this copy.
// It builds a PUT request, then:
//   - adds kMeta-prefixed headers when `meta` is set;
//   - adds the kTagging header when `tags` is set;
// and returns the RequestApi result for the "put_object" method.
194 std::string_view path,
196 const std::optional<Meta>& meta,
197 std::string_view content_type,
198 const std::optional<std::string>& content_disposition,
199 const std::optional<std::vector<
Tag>>& tags
201 auto req = api_methods::PutObject(
209 if (meta.has_value()) {
210 SaveMeta(req.headers, meta.value());
212 if (tags.has_value()) {
213 SaveTags(req.headers, tags.value());
215 return RequestApi(req,
"put_object");
218void ClientImpl::DeleteObject(std::string_view path)
const {
219 auto req = api_methods::DeleteObject(bucket_, path);
220 RequestApi(req,
"delete_object");
// Non-throwing wrapper around TryGetObject. On success it returns the object
// body wrapped in an optional. On failure it logs: INFO when the caught error's
// code() is 404, ERROR for other codes and for any other std::exception.
// NOTE(review): the remaining parameters, the `try` line, the first catch clause
// and the post-logging return are missing in this copy. The return presumably
// yields std::nullopt — confirm.
223std::optional<std::string>
ClientImpl::GetObject(
224 std::string_view path,
225 std::optional<std::string> version,
230 return std::make_optional(TryGetObject(path, std::move(version), headers_data, headers_request));
232 if (e.code() == 404) {
233 LOG_INFO() <<
"Can't get object with path: " << path <<
", object not found:" << e.what();
235 LOG_ERROR() <<
"Can't get object with path: " << path <<
", unknown error:" << e.what();
238 }
catch (
const std::exception& e) {
239 LOG_ERROR() <<
"Can't get object with path: " << path <<
", unknown error:" << e.what();
// Part of TryGetObject; its opening line and trailing parameters are not present
// in this copy. It builds a GET request for `path` (optionally pinned to
// `version`) and returns the RequestApi result for the "get_object" method,
// forwarding headers_data/headers_request. Errors from RequestApi are not caught here.
245 std::string_view path,
246 std::optional<std::string> version,
250 auto req = api_methods::GetObject(bucket_, path, std::move(version));
251 return RequestApi(req,
"get_object", headers_data, headers_request);
// Non-throwing wrapper around TryGetPartialObject (ranged read). It returns the
// requested bytes wrapped in an optional. Failures are logged: INFO for
// code() == 404, ERROR otherwise.
// NOTE(review): the remaining parameters, the `try` line, the first catch clause
// and the post-logging return are missing in this copy. The return presumably
// yields std::nullopt — confirm.
254std::optional<std::string>
ClientImpl::GetPartialObject(
255 std::string_view path,
256 std::string_view range,
257 std::optional<std::string> version,
262 return std::make_optional(TryGetPartialObject(path, range, std::move(version), headers_data, headers_request));
264 if (e.code() == 404) {
265 LOG_INFO() <<
"Can't get object with path: " << path <<
", object not found:" << e.what();
267 LOG_ERROR() <<
"Can't get object with path: " << path <<
", unknown error:" << e.what();
270 }
catch (
const std::exception& e) {
271 LOG_ERROR() <<
"Can't get object with path: " << path <<
", unknown error:" << e.what();
// Part of TryGetPartialObject; its opening line and trailing parameters are not
// present in this copy. It builds the same GET request as TryGetObject, applies
// `range` via api_methods::SetRange, and returns the RequestApi result for the
// "get_object" method.
277 std::string_view path,
278 std::string_view range,
279 std::optional<std::string> version,
283 auto req = api_methods::GetObject(bucket_, path, std::move(version));
284 api_methods::SetRange(req, range);
285 return RequestApi(req,
"get_object", headers_data, headers_request);
// Part of GetObjectHead; its signature is only partly present in this copy.
// It issues a HEAD-style request ("get_object_head") for `path`, collecting the
// response headers selected by `headers_request` into `headers_data`.
// Any std::exception is logged at INFO.
// NOTE(review): the lines after the log statement are missing here. They
// presumably return std::nullopt — confirm. On success the collected
// headers_data is returned.
289 std::string_view path,
293 auto req = api_methods::GetObjectHead(bucket_, path);
295 RequestApi(req,
"get_object_head", &headers_data, headers_request);
296 }
catch (
const std::exception& e) {
297 LOG_INFO() <<
"Can't get object with path: " << path <<
", error:" << e.what();
300 return std::make_optional(std::move(headers_data));
// Deprecated path-style presigned download URL: "<scheme>://<host>/<bucket>"
// followed by the signed path/query from AddQueryParamsToPresignedUrl.
// The scheme ("https" if use_ssl, else "http") is prepended only when the
// connection host does not already contain "://". Otherwise use_ssl has no effect.
// NOTE(review): the line between the parameter list and the body is missing in
// this copy. The bare [[deprecated]] carries no message; it presumably should
// point to GenerateDownloadUrlVirtualHostAddressing — confirm.
303[[deprecated]] std::string
ClientImpl::GenerateDownloadUrl(std::string_view path, time_t expires_at,
bool use_ssl)
305 auto req = api_methods::GetObject(bucket_, path);
307 std::ostringstream generated_url;
308 auto host = conn_->GetHost();
309 if (host.find(
"://") == std::string::npos) {
310 generated_url << (use_ssl ?
"https" :
"http") <<
"://";
312 generated_url << host;
314 if (!req.bucket.empty()) {
315 generated_url <<
"/" + req.bucket;
317 AddQueryParamsToPresignedUrl(generated_url, expires_at, req, authenticator_);
318 return generated_url.str();
// Builds a presigned GET (download) URL for `path` using virtual-host addressing
// via GeneratePresignedUrl.
// Throws NoBucketError when the request's bucket is empty, since the bucket name
// is part of the host name.
// NOTE(review): the line closing the parameter list is missing in this copy.
321std::string
ClientImpl::GenerateDownloadUrlVirtualHostAddressing(
322 std::string_view path,
323 const std::chrono::system_clock::time_point& expires_at,
324 std::string_view protocol
326 auto req = api_methods::GetObject(bucket_, path);
327 if (req.bucket.empty()) {
328 throw NoBucketError(
"presigned url for empty bucket string");
330 return GeneratePresignedUrl(req, conn_->GetHost(), protocol, expires_at, authenticator_);
// Builds a presigned PUT (upload) URL for `path` using virtual-host addressing.
// The request is built from `data` (copied into a std::string) and
// `content_type`, then passed to GeneratePresignedUrl. Throws NoBucketError
// when the request's bucket is empty.
// NOTE(review): the line closing the parameter list is missing in this copy.
333std::string
ClientImpl::GenerateUploadUrlVirtualHostAddressing(
334 std::string_view data,
335 std::string_view content_type,
336 std::string_view path,
337 const std::chrono::system_clock::time_point& expires_at,
338 std::string_view protocol
340 auto req = api_methods::PutObject(bucket_, path, std::string{data}, content_type);
341 if (req.bucket.empty()) {
342 throw NoBucketError(
"presigned url for empty bucket string");
344 return GeneratePresignedUrl(req, conn_->GetHost(), protocol, expires_at, authenticator_);
// Fragment of the request-signing step; its signature is not present in this copy.
// It returns early (body missing here) when there is no authenticator_.
// Otherwise it obtains auth headers from authenticator_->Auth(request) and
// uses find_if to detect an auth header whose name is already present in
// request.headers. It then move-inserts all auth headers into the request's
// headers.
// NOTE(review): the handling of a detected duplicate (lines after the
// `it != cend()` check) and the object receiving `.insert(...)` are missing here.
348 if (!authenticator_) {
353 auto auth_headers = authenticator_->Auth(request);
356 auto it = std::ranges::find_if(auth_headers, [&request](
const auto& header) {
357 return request.headers.contains(header.first);
360 if (it != auth_headers.cend()) {
366 .insert(std::make_move_iterator(std::begin(auth_headers))
, std::make_move_iterator(std::end(auth_headers))
);
// Fragment of ClientImpl::RequestApi; its signature is only partly present in
// this copy. It sends `request` through conn_->RequestApi under `method_name`.
// When headers_request.need_meta is set, it fills headers_data->meta with the
// kMeta-prefixed response headers. When headers_request.headers is set, it
// copies each requested header that is present in the response into
// headers_data->headers.
// NOTE(review): headers_data is dereferenced here. A null check presumably
// lives on the missing lines before this point — confirm.
371 std::string_view method_name,
377 auto response = conn_->RequestApi(request, method_name);
380 if (headers_request.need_meta) {
381 headers_data->meta.emplace();
382 ReadMeta(response->headers(), *headers_data->meta);
384 if (headers_request.headers) {
385 headers_data->headers.emplace();
386 for (
const auto& header : *headers_request.headers) {
387 if (
auto it = response->headers()
.find(header
); it != response->headers()
.end()) {
388 headers_data->headers->
emplace(it->first
, it->second
);
// Fetches one raw XML page of the bucket listing for prefix `path`.
// max_keys, marker and delimiter are passed through to
// api_methods::ListBucketContents; their parameter lines are partly missing in
// this copy. Returns the response body wrapped in an optional.
397std::optional<std::string>
ClientImpl::ListBucketContents(
398 std::string_view path,
401 std::string delimiter
403 auto req = api_methods::ListBucketContents(bucket_, path, max_keys, marker, delimiter);
404 std::string reply = RequestApi(req,
"list_bucket_contents");
408 return std::optional<std::string>{std::move(reply)};
// Lists all objects under `path_prefix` by paging with kMaxS3Keys per request.
// Each page is parsed into ObjectMeta entries and moved into the result. The
// key of the last collected object becomes the next page's marker.
// An empty response is logged as a warning.
// NOTE(review): the `result` declaration, the loop-termination logic
// (is_truncated / empty page) and the insert call are missing in this copy.
411std::vector<
ObjectMeta>
ClientImpl::ListBucketContentsParsed(std::string_view path_prefix)
const {
414 std::string marker{};
415 bool is_finished =
false;
416 while (!is_finished) {
417 auto response = ListBucketContents(path_prefix, kMaxS3Keys, marker, {});
419 LOG_WARNING() <<
"Empty S3 bucket listing response for path prefix " << path_prefix;
423 bool is_truncated =
false;
424 auto response_result = ParseS3ListResponse(*response, is_truncated);
425 if (response_result.empty()) {
433 std::make_move_iterator(response_result.begin()),
434 std::make_move_iterator(response_result.end())
436 marker = result.back().key;
// Lists the "directories" (CommonPrefixes) under `path_prefix` using delimiter
// "/", paging with kMaxS3Keys per request. The last collected prefix becomes
// the next page's marker. An empty response is logged as a warning.
// NOTE(review): the loop-termination logic and the insert call are missing in this copy.
441std::vector<std::string>
ClientImpl::ListBucketDirectories(std::string_view path_prefix)
const {
442 std::vector<std::string> result;
444 std::string marker{};
445 bool is_finished =
false;
446 while (!is_finished) {
447 auto response = ListBucketContents(path_prefix, kMaxS3Keys, marker,
"/");
449 LOG_WARNING() <<
"Empty S3 directory bucket listing response for path prefix " << path_prefix;
453 bool is_truncated =
false;
454 auto response_result = ParseS3DirectoriesListResponse(*response, is_truncated);
455 if (response_result.empty()) {
463 std::make_move_iterator(response_result.begin()),
464 std::make_move_iterator(response_result.end())
466 marker = result.back();
// Copies `key_from` (in this client's bucket) to `key_to` in `bucket_to`.
// It first fetches the source object's head, requesting only the Content-Type
// header and no metadata, so the copy keeps that content type. It then issues
// the "copy_object" request, adding kMeta-prefixed headers from `meta`.
// NOTE(review): the handling of a missing head / missing Content-Type (which
// guards `*content_type`) and the check guarding `*meta` are on lines missing
// from this copy — confirm both derefs are guarded.
473 std::string_view key_from,
474 std::string_view bucket_to,
475 std::string_view key_to,
476 const std::optional<Meta>& meta
478 const auto object_head = [&] {
480 header_request.headers.emplace();
481 header_request.headers->emplace(USERVER_NAMESPACE::http::
headers::kContentType);
482 header_request.need_meta =
false;
483 return GetObjectHead(key_from, header_request);
489 const auto content_type = [&object_head]() -> std::optional<std::string> {
490 if (!object_head->headers) {
495 *object_head->headers
,
496 USERVER_NAMESPACE::http::
headers::kContentType
503 auto req = api_methods::CopyObject(bucket_, key_from, bucket_to, key_to, *content_type);
505 SaveMeta(req.headers, *meta);
507 return RequestApi(req,
"copy_object");
// Same-bucket overload: copies `key_from` to `key_to` within this client's
// bucket by delegating to the four-argument CopyObject with bucket_ as target.
511 std::string_view key_from,
512 std::string_view key_to,
513 const std::optional<Meta>& meta
515 return CopyObject(key_from, bucket_, key_to, meta);
// Fragment of the create-multipart-upload action; its signature and parse/throw
// lines are not present in this copy. It wraps `request` into an internal API
// request for bucket_ and executes it as "create_multipart_upload". If parsing
// the response fails, it formats an error message containing exc.what() and
// request.key.
522 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
523 const auto api_response_body = RequestApi(api_request,
"create_multipart_upload");
527 fmt::format(
"failed to parse CreateMultipartUpload action response - {}; key: {}", exc.what(), request.key)
// Fragment of the upload-part action; its signature is not present in this copy.
// It executes the request as "upload_part", collecting the response headers
// listed in expected_headers, and returns the value of the kEtagHeader
// ("ETag") response header, moved out of the collected headers.
// NOTE(review): the bodies of the missing-headers / missing-ETag / empty-ETag
// branches, and the declaration of expected_headers (presumably containing
// kEtagHeader — confirm), are missing here.
533 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
538 RequestApi(api_request,
"upload_part", &response_headers_data, expected_headers);
539 if (!response_headers_data.headers) {
542 const auto iter = response_headers_data.headers->
find(kEtagHeader
);
543 if (iter == response_headers_data.headers->
end()) {
546 if (iter->second.empty()) {
549 return {std::move(iter->second)};
553 "failed to parse UploadPart action response - {}; upload_id '{}'; key '{}'",
// Fragment of the complete-multipart-upload action; its signature and
// parse/throw lines are not present in this copy. It executes the wrapped
// request as "complete_multipart_upload" and reports parse failures with the
// upload id and key.
564 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
565 const auto api_response_body = RequestApi(api_request,
"complete_multipart_upload");
569 "failed to parse CompleteMultipartUpload action response - {}; upload_id '{}'; key '{}'",
// Fragment of the abort-multipart-upload action; its signature is not present
// in this copy. It executes the wrapped request as "abort_multipart_upload" and
// discards the response body.
577 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
578 RequestApi(api_request,
"abort_multipart_upload");
// Fragment of the list-parts action; its signature and parse/throw lines are
// not present in this copy. It executes the wrapped request as "list_parts" and
// reports parse failures with the upload id and key.
583 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
584 const auto api_response_body = RequestApi(api_request,
"list_parts");
588 "failed to parse ListParts action response - {}; upload_id '{}'; key '{}'",
// Fragment of the list-multipart-uploads action; its signature and parse step
// are not present in this copy. It executes the wrapped request as
// "list_multipart_uploads" and throws MultipartUploadError when parsing the
// response fails.
599 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
600 const auto api_response_body = RequestApi(api_request,
"list_multipart_uploads");
603 throw MultipartUploadError(fmt::format(
"failed to parse ListMultipartUploads action response - {}", exc.what()));
// Factory overloads for the S3 client. Only their first lines, plus the
// return statement of the second overload, are present in this copy.
// The visible overload constructs a ClientImpl from s3conn, authenticator and
// bucket, and returns it as a shared pointer to the Client interface.
606ClientPtr GetS3Client(
618ClientPtr GetS3Client(
623 return std::static_pointer_cast<
Client>(std::make_shared<
ClientImpl>(s3conn, authenticator, bucket));