userver: /data/code/userver/libraries/s3api/src/s3api/clients/client.cpp Source File
Loading...
Searching...
No Matches
client.cpp
1#include <s3api/clients/client.hpp>
2
3#include <sstream>
4
5#include <fmt/format.h>
6#include <boost/algorithm/string.hpp>
7#include <pugixml.hpp>
8
9#include <userver/http/common_headers.hpp>
10#include <userver/http/url.hpp>
11#include <userver/logging/log.hpp>
12#include <userver/utils/algo.hpp>
13#include <userver/utils/exception.hpp>
14
15#include <userver/s3api/authenticators/access_key.hpp>
16
17#include <s3api/s3_connection.hpp>
18#include <s3api/s3api_methods.hpp>
19
20USERVER_NAMESPACE_BEGIN
21
22namespace s3api {
23namespace {
24
25const std::string kMeta = "x-amz-meta-";
26const std::string kTagging = "X-Amz-Tagging";
27constexpr const std::size_t kMaxS3Keys = 1000;
28
29constexpr http::headers::PredefinedHeader kEtagHeader{"ETag"};
30
31void SaveMeta(clients::http::Headers& headers, const ClientImpl::Meta& meta) {
32 for (const auto& [header, value] : meta) {
33 headers[kMeta + header] = value;
34 }
35}
36
37void ReadMeta(const clients::http::Headers& headers, ClientImpl::Meta& meta) {
38 for (const auto& [header, value] : headers) {
39 if (boost::istarts_with(header, kMeta)) {
40 meta[header.substr(kMeta.length())] = value;
41 }
42 }
43}
44
45void SaveTags(clients::http::Headers& headers, const std::vector<ClientImpl::Tag>& tags) {
46 size_t size = 0;
47 for (const auto& [key, value] : tags) {
48 size += key.size() + value.size() + 2;
49 }
50 std::string tag_values;
51 tag_values.reserve(size);
52 for (const auto& [key, value] : tags) {
53 tag_values.append(key);
54 tag_values.append("=");
55 tag_values.append(value);
56 tag_values.append("&");
57 }
58 if (!tag_values.empty()) {
59 // pop last &
60 tag_values.pop_back();
61 }
62 headers[kTagging] = std::move(tag_values);
63}
64
65void AddQueryParamsToPresignedUrl(
66 std::ostringstream& generated_url,
67 time_t expires_at,
68 const Request& req,
69 std::shared_ptr<authenticators::Authenticator> authenticator
70) {
71 if (!req.req.empty()) {
72 generated_url << "/" + req.req;
73 }
74
75 auto params = authenticator->Sign(req, expires_at);
76
77 if (!params.empty()) {
78 generated_url << "?";
79 generated_url << USERVER_NAMESPACE::http::MakeQuery(params);
80 }
81}
82
83std::string GeneratePresignedUrl(
84 const Request& request,
85 std::string_view host,
86 std::string_view protocol,
87 const std::chrono::system_clock::time_point& expires_at,
88 std::shared_ptr<authenticators::Authenticator> authenticator
89) {
90 std::ostringstream generated_url;
91 // both internal (s3.mds(t)) and private (s3-private)
92 // balancers support virtual host addressing and https
93 generated_url << protocol << request.bucket << "." << host;
94 const auto expires_at_time_t = std::chrono::system_clock::to_time_t(expires_at);
95 AddQueryParamsToPresignedUrl(generated_url, expires_at_time_t, request, std::move(authenticator));
96 return generated_url.str();
97}
98
99std::vector<ObjectMeta> ParseS3ListResponse(std::string_view s3_response) {
100 std::vector<ObjectMeta> result;
101 pugi::xml_document xml;
102 const pugi::xml_parse_result parse_result = xml.load_string(s3_response.data());
103 if (parse_result.status != pugi::status_ok) {
104 throw ListBucketError(fmt::format(
105 "Failed to parse S3 list response as xml, error: {}, response: {}", parse_result.description(), s3_response
106 ));
107 }
108 try {
109 const auto items = xml.child("ListBucketResult").children("Contents");
110 for (const auto& item : items) {
111 const auto key = item.child("Key").child_value();
112 const auto size = std::stoull(item.child("Size").child_value());
113 const auto last_modified = item.child("LastModified").child_value();
114 result.push_back(ObjectMeta{key, size, last_modified});
115 }
116 } catch (const pugi::xpath_exception& ex) {
117 throw ListBucketError(
118 fmt::format("Bad xml structure for S3 list response, error: {}, response: {}", ex.what(), s3_response)
119 );
120 }
121 return result;
122}
123
124std::vector<std::string> ParseS3DirectoriesListResponse(std::string_view s3_response) {
125 std::vector<std::string> result;
126 pugi::xml_document xml;
127 const pugi::xml_parse_result parse_result = xml.load_string(s3_response.data());
128 if (parse_result.status != pugi::status_ok) {
129 throw ListBucketError(fmt::format(
130 "Failed to parse S3 directories list response as xml, error: {}, "
131 "response: {}",
132 parse_result.description(),
133 s3_response
134 ));
135 }
136 try {
137 const auto items = xml.child("ListBucketResult").children("CommonPrefixes");
138 for (const auto& item : items) {
139 result.push_back(item.child("Prefix").child_value());
140 }
141 } catch (const pugi::xpath_exception& ex) {
142 throw ListBucketError(fmt::format(
143 "Bad xml structure for S3 directories list response, error: {}, "
144 "response: {}",
145 ex.what(),
146 s3_response
147 ));
148 }
149 return result;
150}
151
152} // namespace
153
154void ClientImpl::UpdateConfig(ConnectionCfg&& config) { conn_->UpdateConfig(std::move(config)); }
155
156ClientImpl::ClientImpl(
157 std::shared_ptr<S3Connection> s3conn,
158 std::shared_ptr<authenticators::Authenticator> authenticator,
159 std::string bucket
160)
161 : conn_(std::move(s3conn)), authenticator_{std::move(authenticator)}, bucket_(std::move(bucket)) {}
162
163ClientImpl::ClientImpl(
164 std::shared_ptr<S3Connection> s3conn,
165 std::shared_ptr<authenticators::AccessKey> authenticator,
166 std::string bucket
167)
168 : ClientImpl(
169 std::move(s3conn),
170 std::static_pointer_cast<authenticators::Authenticator>(std::move(authenticator)),
171 std::move(bucket)
172 ) {}
173
174std::string_view ClientImpl::GetBucketName() const { return bucket_; }
175
176std::string ClientImpl::PutObject(
177 std::string_view path, //
178 std::string data, //
179 const std::optional<Meta>& meta, //
180 std::string_view content_type, //
181 const std::optional<std::string>& content_disposition,
182 const std::optional<std::vector<Tag>>& tags
183) const {
184 auto req = api_methods::PutObject( //
185 bucket_, //
186 path, //
187 std::move(data), //
188 content_type, //
189 content_disposition //
190 );
191
192 if (meta.has_value()) {
193 SaveMeta(req.headers, meta.value());
194 }
195 if (tags.has_value()) {
196 SaveTags(req.headers, tags.value());
197 }
198 return RequestApi(req, "put_object");
199}
200
201void ClientImpl::DeleteObject(std::string_view path) const {
202 auto req = api_methods::DeleteObject(bucket_, path);
203 RequestApi(req, "delete_object");
204}
205
206std::optional<std::string> ClientImpl::GetObject(
207 std::string_view path,
208 std::optional<std::string> version,
209 HeadersDataResponse* headers_data,
210 const HeaderDataRequest& headers_request
211) const {
212 try {
213 return std::make_optional(TryGetObject(path, std::move(version), headers_data, headers_request));
214 } catch (const clients::http::HttpException& e) {
215 if (e.code() == 404) {
216 LOG_INFO() << "Can't get object with path: " << path << ", object not found:" << e.what();
217 } else {
218 LOG_ERROR() << "Can't get object with path: " << path << ", unknown error:" << e.what();
219 }
220 return std::nullopt;
221 } catch (const std::exception& e) {
222 LOG_ERROR() << "Can't get object with path: " << path << ", unknown error:" << e.what();
223 return std::nullopt;
224 }
225}
226
227std::string ClientImpl::TryGetObject(
228 std::string_view path,
229 std::optional<std::string> version,
230 HeadersDataResponse* headers_data,
231 const HeaderDataRequest& headers_request
232) const {
233 auto req = api_methods::GetObject(bucket_, path, std::move(version));
234 return RequestApi(req, "get_object", headers_data, headers_request);
235}
236
237std::optional<std::string> ClientImpl::GetPartialObject(
238 std::string_view path,
239 std::string_view range,
240 std::optional<std::string> version,
241 HeadersDataResponse* headers_data,
242 const HeaderDataRequest& headers_request
243) const {
244 try {
245 return std::make_optional(TryGetPartialObject(path, range, std::move(version), headers_data, headers_request));
246 } catch (const clients::http::HttpException& e) {
247 if (e.code() == 404) {
248 LOG_INFO() << "Can't get object with path: " << path << ", object not found:" << e.what();
249 } else {
250 LOG_ERROR() << "Can't get object with path: " << path << ", unknown error:" << e.what();
251 }
252 return std::nullopt;
253 } catch (const std::exception& e) {
254 LOG_ERROR() << "Can't get object with path: " << path << ", unknown error:" << e.what();
255 return std::nullopt;
256 }
257}
258
259std::string ClientImpl::TryGetPartialObject(
260 std::string_view path,
261 std::string_view range,
262 std::optional<std::string> version,
263 HeadersDataResponse* headers_data,
264 const HeaderDataRequest& headers_request
265) const {
266 auto req = api_methods::GetObject(bucket_, path, std::move(version));
267 api_methods::SetRange(req, range);
268 return RequestApi(req, "get_object", headers_data, headers_request);
269}
270
271std::optional<ClientImpl::HeadersDataResponse>
272ClientImpl::GetObjectHead(std::string_view path, const HeaderDataRequest& headers_request) const {
273 HeadersDataResponse headers_data;
274 auto req = api_methods::GetObjectHead(bucket_, path);
275 try {
276 RequestApi(req, "get_object_head", &headers_data, headers_request);
277 } catch (const std::exception& e) {
278 LOG_INFO() << "Can't get object with path: " << path << ", error:" << e.what();
279 return std::nullopt;
280 }
281 return std::make_optional(std::move(headers_data));
282}
283
284[[deprecated]] std::string ClientImpl::GenerateDownloadUrl(std::string_view path, time_t expires_at, bool use_ssl)
285 const {
286 auto req = api_methods::GetObject(bucket_, path);
287
288 std::ostringstream generated_url;
289 auto host = conn_->GetHost();
290 if (host.find("://") == std::string::npos) {
291 generated_url << (use_ssl ? "https" : "http") << "://";
292 }
293 generated_url << host;
294
295 if (!req.bucket.empty()) {
296 generated_url << "/" + req.bucket;
297 }
298 AddQueryParamsToPresignedUrl(generated_url, expires_at, req, authenticator_);
299 return generated_url.str();
300}
301
302std::string ClientImpl::GenerateDownloadUrlVirtualHostAddressing(
303 std::string_view path,
304 const std::chrono::system_clock::time_point& expires_at,
305 std::string_view protocol
306) const {
307 auto req = api_methods::GetObject(bucket_, path);
308 if (req.bucket.empty()) {
309 throw NoBucketError("presigned url for empty bucket string");
310 }
311 return GeneratePresignedUrl(req, conn_->GetHost(), protocol, expires_at, authenticator_);
312}
313
314std::string ClientImpl::GenerateUploadUrlVirtualHostAddressing(
315 std::string_view data,
316 std::string_view content_type,
317 std::string_view path,
318 const std::chrono::system_clock::time_point& expires_at,
319 std::string_view protocol
320) const {
321 auto req = api_methods::PutObject(bucket_, path, std::string{data}, content_type);
322 if (req.bucket.empty()) {
323 throw NoBucketError("presigned url for empty bucket string");
324 }
325 return GeneratePresignedUrl(req, conn_->GetHost(), protocol, expires_at, authenticator_);
326}
327
328void ClientImpl::Auth(Request& request) const {
329 if (!authenticator_) {
330 // anonymous request
331 return;
332 }
333
334 auto auth_headers = authenticator_->Auth(request);
335
336 {
337 auto it = std::find_if(
338 auth_headers.cbegin(),
339 auth_headers.cend(),
340 [&request](const decltype(auth_headers)::value_type& header) {
341 return request.headers.count(std::get<0>(header));
342 }
343 );
344
345 if (it != auth_headers.cend()) {
346 throw AuthHeaderConflictError{std::string{"Conflict with auth header: "} + it->first};
347 }
348 }
349
350 request.headers.insert(
351 std::make_move_iterator(std::begin(auth_headers)), std::make_move_iterator(std::end(auth_headers))
352 );
353}
354
355std::string ClientImpl::RequestApi(
356 Request& request,
357 std::string_view method_name,
358 HeadersDataResponse* headers_data,
359 const HeaderDataRequest& headers_request
360) const {
361 Auth(request);
362
363 auto response = conn_->RequestApi(request, method_name);
364
365 if (headers_data) {
366 if (headers_request.need_meta) {
367 headers_data->meta.emplace();
368 ReadMeta(response->headers(), *headers_data->meta);
369 }
370 if (headers_request.headers) {
371 headers_data->headers.emplace();
372 for (const auto& header : *headers_request.headers) {
373 if (auto it = response->headers().find(header); it != response->headers().end()) {
374 headers_data->headers->emplace(it->first, it->second);
375 }
376 }
377 }
378 }
379
380 return response->body();
381}
382
383std::optional<std::string>
384ClientImpl::ListBucketContents(std::string_view path, int max_keys, std::string marker, std::string delimiter) const {
385 auto req = api_methods::ListBucketContents(bucket_, path, max_keys, marker, delimiter);
386 std::string reply = RequestApi(req, "list_bucket_contents");
387 if (reply.empty()) {
388 return std::nullopt;
389 }
390 return std::optional<std::string>{std::move(reply)};
391}
392
393std::vector<ObjectMeta> ClientImpl::ListBucketContentsParsed(std::string_view path_prefix) const {
394 std::vector<ObjectMeta> result;
395 // S3 doc: specifies the key to start with when listing objects in a bucket
396 std::string marker = "";
397 bool is_finished = false;
398 while (!is_finished) {
399 auto response = ListBucketContents(path_prefix, kMaxS3Keys, marker, {});
400 if (!response) {
401 LOG_WARNING() << "Empty S3 bucket listing response for path prefix " << path_prefix;
402 break;
403 }
404 auto response_result = ParseS3ListResponse(*response);
405 if (response_result.empty()) {
406 break;
407 }
408 if (response_result.size() < kMaxS3Keys) {
409 is_finished = true;
410 }
411 result.insert(
412 result.end(),
413 std::make_move_iterator(response_result.begin()),
414 std::make_move_iterator(response_result.end())
415 );
416 marker = result.back().key;
417 }
418 return result;
419}
420
421std::vector<std::string> ClientImpl::ListBucketDirectories(std::string_view path_prefix) const {
422 std::vector<std::string> result;
423 // S3 doc: specifies the key to start with when listing objects in a bucket
424 std::string marker = "";
425 bool is_finished = false;
426 while (!is_finished) {
427 auto response = ListBucketContents(path_prefix, kMaxS3Keys, marker, "/");
428 if (!response) {
429 LOG_WARNING() << "Empty S3 directory bucket listing response "
430 "for path prefix "
431 << path_prefix;
432 break;
433 }
434 auto response_result = ParseS3DirectoriesListResponse(*response);
435 if (response_result.empty()) {
436 break;
437 }
438 if (response_result.size() < kMaxS3Keys) {
439 is_finished = true;
440 }
441 result.insert(
442 result.end(),
443 std::make_move_iterator(response_result.begin()),
444 std::make_move_iterator(response_result.end())
445 );
446 marker = result.back();
447 }
448
449 return result;
450}
451
452std::string ClientImpl::CopyObject(
453 std::string_view key_from,
454 std::string_view bucket_to,
455 std::string_view key_to,
456 const std::optional<Meta>& meta
457) {
458 const auto object_head = [&] {
459 HeaderDataRequest header_request;
460 header_request.headers.emplace();
461 header_request.headers->emplace(USERVER_NAMESPACE::http::headers::kContentType);
462 header_request.need_meta = false;
463 return GetObjectHead(key_from, header_request);
464 }();
465 if (!object_head) {
466 USERVER_NAMESPACE::utils::LogErrorAndThrow("S3Api : Failed to get object head");
467 }
468
469 const auto content_type = [&object_head]() -> std::optional<std::string> {
470 if (!object_head->headers) {
471 return std::nullopt;
472 }
473
474 return USERVER_NAMESPACE::utils::FindOptional(
475 *object_head->headers, USERVER_NAMESPACE::http::headers::kContentType
476 );
477 }();
478 if (!content_type) {
479 USERVER_NAMESPACE::utils::LogErrorAndThrow("S3Api : Object head is missing `content-type` header");
480 }
481
482 auto req = api_methods::CopyObject(bucket_, key_from, bucket_to, key_to, *content_type);
483 if (meta) {
484 SaveMeta(req.headers, *meta);
485 }
486 return RequestApi(req, "copy_object");
487}
488
489std::string
490ClientImpl::CopyObject(std::string_view key_from, std::string_view key_to, const std::optional<Meta>& meta) {
491 return CopyObject(key_from, bucket_, key_to, meta);
492}
493
495 const multipart_upload::CreateMultipartUploadRequest& request
496) const try {
497 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
498 const auto api_response_body = RequestApi(api_request, "create_multipart_upload");
499 return multipart_upload::InitiateMultipartUploadResult::Parse(api_response_body);
500} catch (const ResponseParsingError& exc) {
502 fmt::format("failed to parse CreateMultipartUpload action response - {}; key: {}", exc.what(), request.key)
503 );
504}
505
506multipart_upload::UploadPartResult ClientImpl::UploadPart(const multipart_upload::UploadPartRequest& request) const
507 try {
508 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
509
510 const HeaderDataRequest expected_headers({{std::string(kEtagHeader)}}, false);
511 HeadersDataResponse response_headers_data;
512
513 RequestApi(api_request, "upload_part", &response_headers_data, expected_headers);
514 if (!response_headers_data.headers) {
515 throw ResponseParsingError("missing ETag header in response");
516 }
517 const auto iter = response_headers_data.headers->find(kEtagHeader);
518 if (iter == response_headers_data.headers->end()) {
519 throw ResponseParsingError("missing ETag header in response");
520 }
521 if (iter->second.empty()) {
522 throw ResponseParsingError("got empty ETag header value in response");
523 }
524 return {std::move(iter->second)};
525
526} catch (const ResponseParsingError& exc) {
527 throw MultipartUploadError(fmt::format(
528 "failed to parse UploadPart action response - {}; upload_id '{}'; key '{}'",
529 exc.what(),
530 request.upload_id,
531 request.key
532 ));
533}
534
536 const multipart_upload::CompleteMultipartUploadRequest& request
537) const try {
538 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
539 const auto api_response_body = RequestApi(api_request, "complete_multipart_upload");
540 return multipart_upload::CompleteMultipartUploadResult::Parse(api_response_body);
541} catch (const ResponseParsingError& exc) {
542 throw MultipartUploadError(fmt::format(
543 "failed to parse CompleteMultipartUpload action response - {}; upload_id '{}'; key '{}'",
544 exc.what(),
545 request.upload_id,
546 request.key
547 ));
548}
549
550void ClientImpl::AbortMultipartUpload(const multipart_upload::AbortMultipartUploadRequest& request) const {
551 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
552 RequestApi(api_request, "abort_multipart_upload");
553}
554
555multipart_upload::ListPartsResult ClientImpl::ListParts(const multipart_upload::ListPartsRequest& request) const try {
556 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
557 const auto api_response_body = RequestApi(api_request, "list_parts");
558 return multipart_upload::ListPartsResult::Parse(api_response_body);
559} catch (const ResponseParsingError& exc) {
560 throw MultipartUploadError(fmt::format(
561 "failed to parse ListParts action response - {}; upload_id '{}'; key '{}'",
562 exc.what(),
563 request.upload_id,
564 request.key
565 ));
566}
567
569 const multipart_upload::ListMultipartUploadsRequest& request
570) const try {
571 auto api_request = api_methods::CreateInternalApiRequest(bucket_, request);
572 const auto api_response_body = RequestApi(api_request, "list_multipart_uploads");
573 return multipart_upload::ListMultipartUploadsResult::Parse(api_response_body);
574} catch (const ResponseParsingError& exc) {
575 throw MultipartUploadError(fmt::format("failed to parse ListMultipartUploads action response - {}", exc.what()));
576}
577
578ClientPtr GetS3Client(
579 std::shared_ptr<S3Connection> s3conn,
580 std::shared_ptr<authenticators::AccessKey> authenticator,
581 std::string bucket
582) {
583 return GetS3Client(
584 std::move(s3conn), std::static_pointer_cast<authenticators::Authenticator>(authenticator), std::move(bucket)
585 );
586}
587
588ClientPtr GetS3Client(
589 std::shared_ptr<S3Connection> s3conn,
590 std::shared_ptr<authenticators::Authenticator> authenticator,
591 std::string bucket
592) {
593 return std::static_pointer_cast<Client>(std::make_shared<ClientImpl>(s3conn, authenticator, bucket));
594}
595
596} // namespace s3api
597
598USERVER_NAMESPACE_END