testsuite/pytest_plugins/pytest_userver/s3api.py
import collections
from collections.abc import Mapping
from collections.abc import MutableMapping
import dataclasses
import datetime as dt
import hashlib
import os
import pathlib
import sys
import xml.etree.ElementTree

import dateutil.tz as tz


class _S3NoSuchUploadError(Exception):
    code = 'NoSuchUpload'
    message = 'The specified multipart upload does not exist.'

    def __str__(self):
        return _S3NoSuchUploadError.message


class _S3InvalidPartError(Exception):
    code = 'InvalidPart'
    message = (
        'One or more of the specified parts could not be found.'
        ' The part might not have been uploaded, or the specified'
        " ETag might not have matched the uploaded part's ETag."
    )

    def __str__(self):
        return _S3InvalidPartError.message


class _S3InvalidPartOrderError(Exception):
    code = 'InvalidPartOrder'
    message = (
        'The list of parts was not in ascending order.'
        ' The parts list must be specified in order by part number.'
    )

    def __str__(self):
        return _S3InvalidPartOrderError.message


class _S3EntityTooSmallError(Exception):
    code = 'EntityTooSmall'
    message = 'Your proposed upload is smaller than the minimum allowed object size.'

    def __str__(self):
        return _S3EntityTooSmallError.message


class _S3ClientError(Exception):
    def __init__(self, msg: str):
        self._msg = msg

    def __str__(self):
        return self._msg


@dataclasses.dataclass
class _S3UploadPart:
    data: bytearray
    meta: Mapping[str, str]


@dataclasses.dataclass
class _S3Upload:
    parts: MutableMapping[int, _S3UploadPart]
    meta: Mapping[str, str]


class _S3BucketUploadStorage:
    def __init__(self):
        self._storage: dict[str, _S3Upload] = {}

    @staticmethod
    def _generate_upload_id():
        return os.urandom(15).hex()

    @staticmethod
    def _generate_etag(data):
        return hashlib.md5(data).hexdigest()

    def create_multipart_upload(self, key: str, user_defined_meta: Mapping[str, str] | None = None):
        key_path = pathlib.Path(key)
        upload_id = _S3BucketUploadStorage._generate_upload_id()

        upload_meta = {
            'Key': str(key_path),
            'UploadId': upload_id,
        }

        if user_defined_meta:
            upload_meta.update(user_defined_meta)

        self._storage[upload_id] = _S3Upload(parts={}, meta=upload_meta)
        return upload_meta

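    # Illustrative sketch of what create_multipart_upload (above) returns;
    # the values are hypothetical, the UploadId is 30 random hex chars:
    #
    #   create_multipart_upload('a/b.txt')
    #   -> {'Key': 'a/b.txt', 'UploadId': 'deadbeef...'}
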
    def abort_multipart_upload(self, key: str, upload_id: str):
        key_path = pathlib.Path(key)
        upload = self._storage.get(upload_id)
        if not upload or upload.meta['Key'] != str(key_path):
            raise _S3NoSuchUploadError()
        return self._storage.pop(upload_id)

    def upload_part(
        self,
        key: str,
        upload_id: str,
        part_number: int,
        data: bytearray,
        last_modified: dt.datetime | str | None = None,
    ):
        if part_number < 1 or part_number > 10000:
            raise _S3ClientError('partNumber value is expected to be between 1 and 10000')

        key_path = pathlib.Path(key)
        upload = self._storage.get(upload_id)
        if not upload or upload.meta['Key'] != str(key_path):
            raise _S3NoSuchUploadError()

        if last_modified is None:
            # Timezone is needed for the RFC 3339 time format used by S3
            last_modified = dt.datetime.now().replace(tzinfo=tz.tzlocal()).isoformat()
        elif isinstance(last_modified, dt.datetime):
            last_modified = last_modified.isoformat()

        meta = {
            'ETag': self._generate_etag(data),
            'Last-Modified': last_modified,
            'Size': str(sys.getsizeof(data)),
        }

        new_part = _S3UploadPart(data, meta)
        upload.parts[part_number] = new_part
        return new_part

    def complete_multipart_upload(self, key: str, upload_id: str, parts_to_complete: list):
        key_path = pathlib.Path(key)
        upload = self._storage.get(upload_id)

        if not upload or upload.meta['Key'] != str(key_path):
            raise _S3NoSuchUploadError()

        uploaded_parts = sorted(
            ({'PartNumber': part_number, 'ETag': info.meta['ETag']} for part_number, info in upload.parts.items()),
            key=lambda item: item['PartNumber'],
        )
        if uploaded_parts != parts_to_complete:
            raise _S3InvalidPartError()

        merged_data = bytearray()
        for part in parts_to_complete:
            part_number = part['PartNumber']
            uploaded_part = upload.parts[part_number]
            merged_data += uploaded_part.data

        if not merged_data:
            raise _S3EntityTooSmallError()

        self._storage.pop(upload_id)
        return {'Data': merged_data, 'Upload': upload}

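# A minimal usage sketch of the upload storage above (key and data are
# hypothetical; the upload id comes from create_multipart_upload):
#
#   uploads = _S3BucketUploadStorage()
#   meta = uploads.create_multipart_upload('a/b.txt')
#   part = uploads.upload_part('a/b.txt', meta['UploadId'], 1, bytearray(b'hello'))
#   result = uploads.complete_multipart_upload(
#       'a/b.txt',
#       meta['UploadId'],
#       [{'PartNumber': 1, 'ETag': part.meta['ETag']}],
#   )
#   assert result['Data'] == bytearray(b'hello')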

@dataclasses.dataclass
class S3Object:
    data: bytearray
    meta: Mapping[str, str]


class S3MockBucketStorage:
    def __init__(self) -> None:
        # use Path to normalize keys (e.g. /a//file.json)
        self._storage: dict[pathlib.Path, S3Object] = {}

    @staticmethod
    def _generate_etag(data):
        return hashlib.md5(data).hexdigest()

    def put_object(
        self,
        key: str,
        data: bytearray,
        user_defined_meta: Mapping[str, str] | None = None,
        last_modified: dt.datetime | str | None = None,
    ):
        key_path = pathlib.Path(key)
        if last_modified is None:
            # Timezone is needed for the RFC 3339 time format used by S3
            last_modified = dt.datetime.now().replace(tzinfo=tz.tzlocal()).isoformat()
        elif isinstance(last_modified, dt.datetime):
            last_modified = last_modified.isoformat()

        meta = {
            'Key': str(key_path),
            'ETag': self._generate_etag(data),
            'Last-Modified': last_modified,
            'Size': str(sys.getsizeof(data)),
        }

        if user_defined_meta:
            meta.update(user_defined_meta)

        self._storage[key_path] = S3Object(data, meta)
        return meta

    def get_object(self, key: str) -> S3Object | None:
        key_path = pathlib.Path(key)
        return self._storage.get(key_path)

    def get_objects(self, parent_dir='') -> dict[str, S3Object]:
        all_objects = {str(key_path): value for key_path, value in self._storage.items()}

        if not parent_dir:
            return all_objects

        return {key: value for key, value in all_objects.items() if key.startswith(str(pathlib.Path(parent_dir)))}

    def delete_object(self, key) -> S3Object | None:
        key = pathlib.Path(key)
        if key not in self._storage:
            return None
        return self._storage.pop(key)

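# A minimal usage sketch of the bucket storage above (key and payload are
# hypothetical; note how Path normalizes the doubled slash on lookup):
#
#   bucket = S3MockBucketStorage()
#   meta = bucket.put_object('dir/file.json', bytearray(b'{}'))
#   assert bucket.get_object('dir//file.json').meta['ETag'] == meta['ETag']
#   assert 'dir/file.json' in bucket.get_objects(parent_dir='dir')
#   bucket.delete_object('dir/file.json')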

class S3HandleMock:
    _s3_xml_nss = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}

    def __init__(self, mockserver, s3_mock_storage, mock_base_url):
        self._mockserver = mockserver
        self._base_url = mock_base_url
        self._storage = s3_mock_storage
        self._uploads = collections.defaultdict(_S3BucketUploadStorage)

    def _get_bucket_name(self, request):
        return request.headers['Host'].split('.')[0]

    def _extract_key(self, request):
        return request.path[len(self._base_url) + 1 :]

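    # Illustrative sketch of the helpers above (request values hypothetical):
    # with mock_base_url '/mds-s3', a GET for
    # 'http://mybucket.s3.example.net/mds-s3/dir/file.json' yields
    # _get_bucket_name(request) == 'mybucket' (first Host label) and
    # _extract_key(request) == 'dir/file.json'.
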
    def _generate_get_objects_result(
        self,
        s3_objects_dict: dict[str, S3Object],
        max_keys: int,
        marker: str | None,
    ):
        empty_result = {'result_objects': [], 'is_truncated': False}
        # S3 lists keys in lexicographic order, so sort before comparing
        # against the marker
        keys = sorted(s3_objects_dict.keys())
        if not keys:
            return empty_result

        from_index = 0
        if marker:
            if marker >= keys[-1]:
                return empty_result
            for i, key in enumerate(keys):
                if key > marker:
                    from_index = i
                    break

        result_objects = [s3_objects_dict[key] for key in keys[from_index : from_index + max_keys]]
        # truncated iff there are keys left beyond this page
        is_truncated = from_index + max_keys < len(keys)
        return {'result_objects': result_objects, 'is_truncated': is_truncated}

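    # Pagination sketch for the method above (hypothetical keys): with keys
    # ['a', 'b', 'c'], max_keys=2 and marker=None, the first page returns
    # ['a', 'b'] with is_truncated=True; a follow-up call with marker='b'
    # returns ['c'] with is_truncated=False.
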
    def _generate_get_objects_xml(
        self,
        s3_objects: list[S3Object],
        bucket_name: str,
        prefix: str,
        max_keys: int | None,
        marker: str | None,
        is_truncated: bool,
    ):
        contents = ''
        for s3_object in s3_objects:
            contents += f"""
            <Contents>
                <ETag>{s3_object.meta['ETag']}</ETag>
                <Key>{s3_object.meta['Key']}</Key>
                <LastModified>{s3_object.meta['Last-Modified']}</LastModified>
                <Size>{s3_object.meta['Size']}</Size>
                <StorageClass>STANDARD</StorageClass>
            </Contents>
            """
        # S3 serializes booleans in lowercase ('true'/'false')
        return f"""
        <?xml version="1.0" encoding="UTF-8"?>
        <ListBucketResult>
            <Name>{bucket_name}</Name>
            <Prefix>{prefix}</Prefix>
            <Marker>{marker or ''}</Marker>
            <MaxKeys>{max_keys or ''}</MaxKeys>
            <IsTruncated>{str(is_truncated).lower()}</IsTruncated>
            {contents}
        </ListBucketResult>
        """

    @staticmethod
    def _generate_error_response_xml(code: str, message: str, resource: str):
        return (
            '<?xml version="1.0" encoding="UTF-8"?>'
            '<Error>'
            f'<Code>{code}</Code>'
            f'<Message>{message}</Message>'
            f'<Resource>{resource}</Resource>'
            f'<RequestId>{os.urandom(15).hex()}</RequestId>'
            '</Error>'
        )

    @staticmethod
    def _parse_complete_multipart_xml_body(request_body: str):
        xml_root_node = xml.etree.ElementTree.fromstring(request_body)
        if xml_root_node is None or xml_root_node.tag != f'{{{S3HandleMock._s3_xml_nss["s3"]}}}CompleteMultipartUpload':
            raise _S3ClientError('missing CompleteMultipartUpload in request body')

        parts_to_complete = []
        for xml_part in xml_root_node.findall('s3:Part', S3HandleMock._s3_xml_nss):
            xml_part_number = xml_part.find('s3:PartNumber', S3HandleMock._s3_xml_nss)
            if xml_part_number is None or not xml_part_number.text:
                raise _S3ClientError('missing CompleteMultipartUpload.Part.PartNumber')
            part_number_value = int(xml_part_number.text)

            xml_etag = xml_part.find('s3:ETag', S3HandleMock._s3_xml_nss)
            if xml_etag is None or not xml_etag.text:
                raise _S3ClientError('missing CompleteMultipartUpload.Part.ETag')

            parts_to_complete.append({'ETag': xml_etag.text, 'PartNumber': part_number_value})

        return parts_to_complete

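    # The parser above expects an S3 CompleteMultipartUpload document, e.g.
    # (hypothetical ETag):
    #
    #   <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    #     <Part><PartNumber>1</PartNumber><ETag>deadbeef</ETag></Part>
    #   </CompleteMultipartUpload>
    #
    # which parses to [{'ETag': 'deadbeef', 'PartNumber': 1}].
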
    def get_object(self, request):
        key = self._extract_key(request)

        bucket_storage = self._storage[self._get_bucket_name(request)]

        s3_object = bucket_storage.get_object(key)
        if not s3_object:
            return self._mockserver.make_response('Object not found', 404)
        return self._mockserver.make_response(
            s3_object.data,
            200,
            headers=s3_object.meta,
        )

    def put_object(self, request):
        key = self._extract_key(request)

        bucket_storage = self._storage[self._get_bucket_name(request)]

        data = request.get_data()

        user_defined_meta = {}
        for meta_key, meta_value in request.headers.items():
            # https://docs.amazonaws.cn/en_us/AmazonS3/latest/userguide/UsingMetadata.html
            if meta_key.startswith('x-amz-meta-') or meta_key in ['Content-Type', 'Content-Disposition']:
                user_defined_meta[meta_key] = meta_value

        meta = bucket_storage.put_object(key, data, user_defined_meta)
        # Some clients, like the AWS SDK for C++, parse a non-empty body as XML
        return self._mockserver.make_response('', 200, headers=meta)

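    # Metadata sketch for the handler above (hypothetical header): a PUT
    # carrying 'x-amz-meta-color: red' stores {'x-amz-meta-color': 'red'}
    # alongside the generated meta, and the same header comes back on
    # GET/HEAD responses.
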
    def copy_object(self, request):
        key = self._extract_key(request)
        dest_bucket_name = self._get_bucket_name(request)
        source_bucket_name, source_key = request.headers.get(
            'x-amz-copy-source',
        ).split('/', 2)[1:3]

        src_bucket_storage = self._storage[source_bucket_name]
        dst_bucket_storage = self._storage[dest_bucket_name]

        src_obj = src_bucket_storage.get_object(source_key)
        src_data = src_obj.data
        src_meta = src_obj.meta
        meta = dst_bucket_storage.put_object(key, src_data, src_meta)
        # Some clients, like the AWS SDK for C++, parse a non-empty body as XML
        return self._mockserver.make_response('', 200, headers=meta)

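    # Copy-source sketch for the handler above (hypothetical value): the
    # header is expected in '/<source-bucket>/<source-key>' form, so
    # 'x-amz-copy-source: /src-bucket/dir/file.json' copies 'dir/file.json'
    # from 'src-bucket' into the request's bucket and key.
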
    def get_objects(self, request):
        prefix = request.query['prefix']
        # 1000 is the default value specified by the AWS spec
        max_keys = int(request.query.get('max-keys', 1000))
        marker = request.query.get('marker')

        bucket_name = self._get_bucket_name(request)
        bucket_storage = self._storage[bucket_name]

        s3_objects_dict = bucket_storage.get_objects(parent_dir=prefix)
        result = self._generate_get_objects_result(
            s3_objects_dict=s3_objects_dict,
            max_keys=max_keys,
            marker=marker,
        )
        result_xml = self._generate_get_objects_xml(
            s3_objects=result['result_objects'],
            bucket_name=bucket_name,
            prefix=prefix,
            max_keys=max_keys,
            marker=marker,
            is_truncated=result['is_truncated'],
        )
        return self._mockserver.make_response(result_xml, 200)

    def delete_object(self, request):
        key = self._extract_key(request)

        bucket_storage = self._storage[self._get_bucket_name(request)]

        bucket_storage.delete_object(key)
        # S3 always returns 204, even if the file doesn't exist
        # Some clients, like the AWS SDK for C++, parse a non-empty body as XML
        return self._mockserver.make_response('', 204)

    def get_object_head(self, request):
        key = self._extract_key(request)

        bucket_storage = self._storage[self._get_bucket_name(request)]

        s3_object = bucket_storage.get_object(key)
        if not s3_object:
            return self._mockserver.make_response('Object not found', 404)
        # Some clients, like the AWS SDK for C++, parse a non-empty body as XML
        return self._mockserver.make_response(
            '',
            200,
            headers=s3_object.meta,
        )

    def create_multipart_upload(self, request):
        key = self._extract_key(request)
        bucket_name = self._get_bucket_name(request)
        bucket_uploads = self._uploads[bucket_name]

        user_defined_meta = {}
        for meta_key, meta_value in request.headers.items():
            # https://docs.amazonaws.cn/en_us/AmazonS3/latest/userguide/UsingMetadata.html
            if meta_key.startswith('x-amz-meta-') or meta_key in ['Content-Type', 'Content-Disposition']:
                user_defined_meta[meta_key] = meta_value

        meta = bucket_uploads.create_multipart_upload(key, user_defined_meta)
        response_body = (
            '<?xml version="1.0" encoding="UTF-8"?>'
            '<InitiateMultipartUploadResult>'
            f'<Bucket>{bucket_name}</Bucket>'
            f'<Key>{key}</Key>'
            f'<UploadId>{meta["UploadId"]}</UploadId>'
            '</InitiateMultipartUploadResult>'
        )
        # Some clients, like the AWS SDK for C++, parse a non-empty body as XML
        return self._mockserver.make_response(response_body, 200)

    def abort_multipart_upload(self, request):
        key = self._extract_key(request)
        upload_id = request.query['uploadId']
        bucket_uploads = self._uploads[self._get_bucket_name(request)]
        try:
            bucket_uploads.abort_multipart_upload(key, upload_id)
        except _S3NoSuchUploadError as exc:
            # https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
            # #API_AbortMultipartUpload_Errors
            response_body = S3HandleMock._generate_error_response_xml(
                exc.code, exc.message, f'{request.path}?uploadId={upload_id}'
            )
            return self._mockserver.make_response(response_body, 404)

        # Some clients, like the AWS SDK for C++, parse a non-empty body as XML
        return self._mockserver.make_response('', 204)

    def upload_part(self, request):
        key = self._extract_key(request)
        bucket_name = self._get_bucket_name(request)
        upload_id = request.query['uploadId']
        part_number = int(request.query['partNumber'])
        bucket_uploads = self._uploads[bucket_name]
        data = request.get_data()
        try:
            upload_part = bucket_uploads.upload_part(key, upload_id, part_number, data)
        except _S3ClientError as exc:
            return self._mockserver.make_response(str(exc), 400)
        except _S3NoSuchUploadError as exc:
            # https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html
            response_body = S3HandleMock._generate_error_response_xml(
                exc.code,
                exc.message,
                f'{request.path}?uploadId={upload_id}',
            )
            return self._mockserver.make_response(response_body, 404)

        return self._mockserver.make_response(status=200, headers={'ETag': upload_part.meta['ETag']})

    def complete_multipart_upload(self, request):
        key = self._extract_key(request)
        bucket_name = self._get_bucket_name(request)
        bucket_uploads = self._uploads[bucket_name]
        bucket_storage = self._storage[bucket_name]
        upload_id = request.query['uploadId']
        try:
            parts_to_complete = S3HandleMock._parse_complete_multipart_xml_body(request.get_data().decode())
            completed_upload = bucket_uploads.complete_multipart_upload(key, upload_id, parts_to_complete)
        except _S3NoSuchUploadError as exc:
            # https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
            response_body = S3HandleMock._generate_error_response_xml(
                exc.code,
                exc.message,
                f'{request.path}?uploadId={upload_id}',
            )
            return self._mockserver.make_response(response_body, 404)
        except (_S3InvalidPartError, _S3InvalidPartOrderError, _S3EntityTooSmallError) as exc:
            # https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
            response_body = S3HandleMock._generate_error_response_xml(
                exc.code,
                exc.message,
                f'{request.path}?uploadId={upload_id}',
            )
            return self._mockserver.make_response(response_body, 400)
        except _S3ClientError as exc:
            return self._mockserver.make_response(str(exc), 400)

        meta = bucket_storage.put_object(key, completed_upload['Data'], completed_upload['Upload'].meta)
        response_body = (
            '<?xml version="1.0" encoding="UTF-8"?>'
            '<CompleteMultipartUploadResult>'
            f'<Location>{request.path}</Location>'
            f'<Bucket>{bucket_name}</Bucket>'
            f'<Key>{key}</Key>'
            f'<ETag>{meta["ETag"]}</ETag>'
            '</CompleteMultipartUploadResult>'
        )
        return self._mockserver.make_response(
            response_body,
            status=200,
            headers=meta,
        )
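
# A minimal wiring sketch (illustrative only: the real fixtures live in the
# pytest_userver plugins, and the '/mds-s3' prefix, fixture names and routing
# shown here are assumptions, not the plugin's actual setup):
#
#   @pytest.fixture
#   def s3_mock(mockserver, s3_mock_storage):
#       mock = S3HandleMock(mockserver, s3_mock_storage, '/mds-s3')
#
#       @mockserver.handler('/mds-s3', prefix=True)
#       def _handler(request):
#           if request.method == 'GET':
#               if 'prefix' in request.query:
#                   return mock.get_objects(request)
#               return mock.get_object(request)
#           ...  # dispatch PUT/POST/DELETE/HEAD to the other handlers
#
#       return mock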