        self._storage: dict[str, _S3Upload] = {}

    @staticmethod
    def _generate_upload_id():
        return os.urandom(15).hex()

    @staticmethod
    def _generate_etag(data):
        return hashlib.md5(data).hexdigest()
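    # For simple (non-multipart) uploads, real S3 reports the MD5 hex digest
    # of the payload as the ETag, so md5 here keeps the mock's ETags in line
    # with what clients compute locally.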
    def create_multipart_upload(
        self, key: str, user_defined_meta: Mapping[str, str] | None = None,
    ):
        key_path = pathlib.Path(key)
        upload_id = _S3BucketUploadStorage._generate_upload_id()
        upload_meta = {
            'Key': str(key_path),
            'UploadId': upload_id,
        }
        if user_defined_meta:
            upload_meta.update(user_defined_meta)
        # Track the pending upload; its parts dict is filled by upload_part().
        # (_S3Upload is assumed to be a dataclass with `meta` and `parts`.)
        self._storage[upload_id] = _S3Upload(meta=upload_meta, parts={})
        return upload_meta
    def abort_multipart_upload(self, key: str, upload_id: str):
        key_path = pathlib.Path(key)
        upload = self._storage.get(upload_id)
        if not upload or upload.meta['Key'] != str(key_path):
            raise _S3NoSuchUploadError()
        del self._storage[upload_id]
    def upload_part(
        self,
        key: str,
        upload_id: str,
        part_number: int,
        data: bytes,
        last_modified: dt.datetime | str | None = None,
    ):
        if part_number < 1 or part_number > 10000:
            raise _S3ClientError(
                'partNumber value is expected to be between 1 and 10000',
            )

        key_path = pathlib.Path(key)
        upload = self._storage.get(upload_id)
        if not upload or upload.meta['Key'] != str(key_path):
            raise _S3NoSuchUploadError()

        if last_modified is None:
            last_modified = dt.datetime.now().replace(tzinfo=tz.tzlocal()).isoformat()
        elif isinstance(last_modified, dt.datetime):
            last_modified = last_modified.isoformat()

        # Parts are kept as S3Object records (dataclass defined below).
        new_part = S3Object(
            data=data,
            meta={
                'ETag': _S3BucketUploadStorage._generate_etag(data),
                'Last-Modified': last_modified,
                'Size': str(len(data)),  # payload length in bytes
            },
        )
        upload.parts[part_number] = new_part
        return new_part
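    # The 1..10000 range enforced above mirrors real S3's limit on multipart
    # part numbers.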
    def complete_multipart_upload(
        self, key: str, upload_id: str, parts_to_complete: list,
    ):
        key_path = pathlib.Path(key)
        upload = self._storage.get(upload_id)
        if not upload or upload.meta['Key'] != str(key_path):
            raise _S3NoSuchUploadError()

        uploaded_parts = sorted(
            (
                {'PartNumber': part_number, 'ETag': info.meta['ETag']}
                for part_number, info in upload.parts.items()
            ),
            key=lambda item: item['PartNumber'],
        )
        if uploaded_parts != parts_to_complete:
            # The client's part list must exactly match what was uploaded.
            raise _S3InvalidPartError()

        merged_data = bytearray()
        for part in parts_to_complete:
            part_number = part['PartNumber']
            uploaded_part = upload.parts[part_number]
            merged_data += uploaded_part.data

        # A completed upload no longer exists as a pending one.
        del self._storage[upload_id]
        return {'Data': merged_data, 'Upload': upload}
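    # A minimal end-to-end sketch of this storage (illustrative values,
    # assuming the signatures reconstructed above):
    #
    #     uploads = _S3BucketUploadStorage()
    #     meta = uploads.create_multipart_upload('some/key')
    #     part1 = uploads.upload_part('some/key', meta['UploadId'], 1, b'aaa')
    #     part2 = uploads.upload_part('some/key', meta['UploadId'], 2, b'bbb')
    #     done = uploads.complete_multipart_upload(
    #         'some/key',
    #         meta['UploadId'],
    #         [
    #             {'PartNumber': 1, 'ETag': part1.meta['ETag']},
    #             {'PartNumber': 2, 'ETag': part2.meta['ETag']},
    #         ],
    #     )
    #     assert bytes(done['Data']) == b'aaabbb'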
@dataclasses.dataclass
class S3Object:
    data: bytes
    meta: dict[str, str]


class _S3BucketStorage:
    # In-memory object storage for a single bucket (class name assumed).
    def __init__(self) -> None:
        self._storage: dict[pathlib.Path, S3Object] = {}

    @staticmethod
    def _generate_etag(data):
        return hashlib.md5(data).hexdigest()
    def put_object(
        self,
        key: str,
        data: bytes,
        user_defined_meta: Mapping[str, str] | None = None,
        last_modified: dt.datetime | str | None = None,
    ):
        key_path = pathlib.Path(key)
        if last_modified is None:
            last_modified = dt.datetime.now().replace(tzinfo=tz.tzlocal()).isoformat()
        elif isinstance(last_modified, dt.datetime):
            last_modified = last_modified.isoformat()

        meta = {
            'Key': str(key_path),
            'ETag': self._generate_etag(data),
            'Last-Modified': last_modified,
            'Size': str(len(data)),  # payload length in bytes
        }
        if user_defined_meta:
            meta.update(user_defined_meta)

        self._storage[key_path] = S3Object(data=data, meta=meta)
        return meta
    def get_object(self, key: str) -> S3Object | None:
        key_path = pathlib.Path(key)
        return self._storage.get(key_path)

    def get_objects(self, parent_dir='') -> dict[str, S3Object]:
        all_objects = {
            str(key_path): value for key_path, value in self._storage.items()
        }
        if not parent_dir:
            # str(pathlib.Path('')) is '.', which would match nothing below,
            # so an empty prefix means "everything".
            return all_objects
        return {
            key: value
            for key, value in all_objects.items()
            if key.startswith(str(pathlib.Path(parent_dir)))
        }
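    # Note that the prefix match is purely string-based: parent_dir 'dir'
    # also matches keys under 'dir2/', just like a raw S3 prefix would.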
    def delete_object(self, key) -> S3Object | None:
        key = pathlib.Path(key)
        return self._storage.pop(key, None)
class S3HandleMock:
    _s3_xml_nss = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}

    def __init__(self, mockserver, s3_mock_storage, mock_base_url):
        self._mockserver = mockserver
        self._storage = s3_mock_storage
        self._base_url = mock_base_url
        self._uploads = collections.defaultdict(_S3BucketUploadStorage)

    def _get_bucket_name(self, request):
        return request.headers['Host'].split('.')[0]

    def _extract_key(self, request):
        return request.path[len(self._base_url) + 1 :]
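    # The mock uses virtual-hosted-style addressing: the bucket name is the
    # first label of the Host header (e.g. Host 'mybucket.s3.localhost' ->
    # bucket 'mybucket'), and the object key is everything in the request
    # path after the mock's base URL.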
    @staticmethod
    def _generate_get_objects_result(
        s3_objects_dict: dict[str, S3Object],
        marker: str | None,
        max_keys: int,
    ):
        empty_result = {'result_objects': [], 'is_truncated': False}
        keys = list(s3_objects_dict.keys())
        keys.sort()
        if not keys:
            return empty_result

        from_index = 0
        if marker:
            # Nothing can follow a marker that is at or past the last key.
            if marker >= keys[-1]:
                return empty_result
            for i, key in enumerate(keys):
                if key > marker:
                    from_index = i
                    break

        result_objects = [
            s3_objects_dict[key]
            for key in keys[from_index : from_index + max_keys]
        ]
        # More keys remain beyond this page iff the slice did not reach the end.
        is_truncated = from_index + max_keys < len(keys)
        return {'result_objects': result_objects, 'is_truncated': is_truncated}
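    # Pagination example (illustrative): with sorted keys ['a', 'b', 'c'],
    # marker='a' and max_keys=1 yield ['b'] with is_truncated=True; a
    # follow-up call with marker='b' yields ['c'] with is_truncated=False.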
    @staticmethod
    def _generate_get_objects_xml(
        s3_objects: list[S3Object],
        bucket_name: str,
        prefix: str,
        marker: str | None,
        max_keys: int | None,
        is_truncated: bool,
    ):
        contents = ''
        for s3_object in s3_objects:
            contents += f"""
    <Contents>
        <ETag>{s3_object.meta['ETag']}</ETag>
        <Key>{s3_object.meta['Key']}</Key>
        <LastModified>{s3_object.meta['Last-Modified']}</LastModified>
        <Size>{s3_object.meta['Size']}</Size>
        <StorageClass>STANDARD</StorageClass>
    </Contents>"""

        return f"""<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="{S3HandleMock._s3_xml_nss['s3']}">
    <Name>{bucket_name}</Name>
    <Prefix>{prefix}</Prefix>
    <Marker>{marker or ''}</Marker>
    <MaxKeys>{max_keys or ''}</MaxKeys>
    <IsTruncated>{is_truncated}</IsTruncated>{contents}
</ListBucketResult>"""
    @staticmethod
    def _generate_error_response_xml(code: str, message: str, resource: str):
        return (
            '<?xml version="1.0" encoding="UTF-8"?>'
            '<Error>'
            f'<Code>{code}</Code>'
            f'<Message>{message}</Message>'
            f'<Resource>{resource}</Resource>'
            f'<RequestId>{os.urandom(15).hex()}</RequestId>'
            '</Error>'
        )
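    # Produces a standard S3 error document, e.g.:
    # <?xml version="1.0" encoding="UTF-8"?><Error><Code>NoSuchUpload</Code>
    # <Message>...</Message><Resource>/key?uploadId=...</Resource>
    # <RequestId>...</RequestId></Error>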
    @staticmethod
    def _parse_complete_multipart_xml_body(request_body: str):
        xml_root_node = xml.etree.ElementTree.fromstring(request_body)
        expected_root_tag = (
            f'{{{S3HandleMock._s3_xml_nss["s3"]}}}CompleteMultipartUpload'
        )
        if xml_root_node is None or xml_root_node.tag != expected_root_tag:
            raise _S3ClientError('missing CompleteMultipartUpload in request body')

        parts_to_complete = []
        for xml_part in xml_root_node.findall('s3:Part', S3HandleMock._s3_xml_nss):
            xml_part_number = xml_part.find(
                's3:PartNumber', S3HandleMock._s3_xml_nss,
            )
            if xml_part_number is None or not xml_part_number.text:
                raise _S3ClientError('missing CompleteMultipartUpload.Part.PartNumber')
            part_number_value = int(xml_part_number.text)

            xml_etag = xml_part.find('s3:ETag', S3HandleMock._s3_xml_nss)
            if xml_etag is None or not xml_etag.text:
                raise _S3ClientError('missing CompleteMultipartUpload.Part.ETag')
            parts_to_complete.append(
                {'ETag': xml_etag.text, 'PartNumber': part_number_value},
            )

        return parts_to_complete
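    # The body parsed above follows the standard S3 shape, e.g.:
    #
    #     <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    #         <Part><PartNumber>1</PartNumber><ETag>...</ETag></Part>
    #         <Part><PartNumber>2</PartNumber><ETag>...</ETag></Part>
    #     </CompleteMultipartUpload>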
    def get_object(self, request):
        bucket_name = self._get_bucket_name(request)
        key = self._extract_key(request)
        bucket_storage = self._storage[bucket_name]

        s3_object = bucket_storage.get_object(key)
        if not s3_object:
            return self._mockserver.make_response('Object not found', 404)

        return self._mockserver.make_response(
            s3_object.data,
            200,
            headers=s3_object.meta,
        )
    def put_object(self, request):
        bucket_name = self._get_bucket_name(request)
        key = self._extract_key(request)
        bucket_storage = self._storage[bucket_name]
        data = request.get_data()

        user_defined_meta = {}
        for meta_key, meta_value in request.headers.items():
            if meta_key.startswith('x-amz-meta-') or meta_key in [
                'Content-Type',
                'Content-Disposition',
            ]:
                user_defined_meta[meta_key] = meta_value

        meta = bucket_storage.put_object(key, data, user_defined_meta)
        return self._mockserver.make_response('', 200, headers=meta)
    def copy_object(self, request):
        dest_bucket_name = self._get_bucket_name(request)
        key = self._extract_key(request)
        # Assumed parsing of the standard 'x-amz-copy-source' header, which
        # carries '/<source-bucket>/<source-key>'.
        source_bucket_name, source_key = request.headers.get(
            'x-amz-copy-source', '',
        ).lstrip('/').split('/', 1)

        src_bucket_storage = self._storage[source_bucket_name]
        dst_bucket_storage = self._storage[dest_bucket_name]

        src_obj = src_bucket_storage.get_object(source_key)
        src_data = src_obj.data
        src_meta = src_obj.meta
        meta = dst_bucket_storage.put_object(key, src_data, src_meta)
        return self._mockserver.make_response('', 200, headers=meta)
    def get_objects(self, request):
        prefix = request.query['prefix']
        max_keys = int(request.query.get('max-keys', 1000))
        marker = request.query.get('marker')

        bucket_name = self._get_bucket_name(request)
        bucket_storage = self._storage[bucket_name]

        s3_objects_dict = bucket_storage.get_objects(parent_dir=prefix)
        result = S3HandleMock._generate_get_objects_result(
            s3_objects_dict=s3_objects_dict,
            marker=marker,
            max_keys=max_keys,
        )
        result_xml = S3HandleMock._generate_get_objects_xml(
            s3_objects=result['result_objects'],
            bucket_name=bucket_name,
            prefix=prefix,
            marker=marker,
            max_keys=max_keys,
            is_truncated=result['is_truncated'],
        )
        return self._mockserver.make_response(result_xml, 200)
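    # Illustrative flow: a listing request with query parameters
    # prefix=some/dir, max-keys=2, marker=some/dir/a returns at most two keys
    # lexicographically after the marker, plus IsTruncated so the client
    # knows whether to page again.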
    def delete_object(self, request):
        bucket_name = self._get_bucket_name(request)
        key = self._extract_key(request)
        bucket_storage = self._storage[bucket_name]

        bucket_storage.delete_object(key)
        return self._mockserver.make_response('', 200)

    def get_object_head(self, request):
        bucket_name = self._get_bucket_name(request)
        key = self._extract_key(request)
        bucket_storage = self._storage[bucket_name]

        s3_object = bucket_storage.get_object(key)
        if not s3_object:
            return self._mockserver.make_response('Object not found', 404)

        # HEAD: metadata only, no body.
        return self._mockserver.make_response(
            '',
            200,
            headers=s3_object.meta,
        )
    def create_multipart_upload(self, request):
        bucket_name = self._get_bucket_name(request)
        key = self._extract_key(request)
        bucket_uploads = self._uploads[bucket_name]

        user_defined_meta = {}
        for meta_key, meta_value in request.headers.items():
            if meta_key.startswith('x-amz-meta-') or meta_key in [
                'Content-Type',
                'Content-Disposition',
            ]:
                user_defined_meta[meta_key] = meta_value

        meta = bucket_uploads.create_multipart_upload(key, user_defined_meta)
        response_body = (
            '<?xml version="1.0" encoding="UTF-8"?>'
            '<InitiateMultipartUploadResult>'
            f'<Bucket>{bucket_name}</Bucket>'
            f'<Key>{meta["Key"]}</Key>'
            f'<UploadId>{meta["UploadId"]}</UploadId>'
            '</InitiateMultipartUploadResult>'
        )
        return self._mockserver.make_response(response_body, 200)
    def abort_multipart_upload(self, request):
        key = self._extract_key(request)
        upload_id = request.query['uploadId']
        bucket_uploads = self._uploads[self._get_bucket_name(request)]

        try:
            bucket_uploads.abort_multipart_upload(key, upload_id)
        except _S3NoSuchUploadError as exc:
            response_body = S3HandleMock._generate_error_response_xml(
                exc.code,
                exc.message,
                f'{request.path}?uploadId={upload_id}',
            )
            return self._mockserver.make_response(response_body, 404)

        return self._mockserver.make_response('', 200)
    def upload_part(self, request):
        bucket_name = self._get_bucket_name(request)
        key = self._extract_key(request)
        upload_id = request.query['uploadId']
        part_number = int(request.query['partNumber'])
        bucket_uploads = self._uploads[bucket_name]
        data = request.get_data()

        try:
            upload_part = bucket_uploads.upload_part(key, upload_id, part_number, data)
        except _S3ClientError as exc:
            return self._mockserver.make_response(str(exc), 400)
        except _S3NoSuchUploadError as exc:
            response_body = S3HandleMock._generate_error_response_xml(
                exc.code,
                exc.message,
                f'{request.path}?uploadId={upload_id}',
            )
            return self._mockserver.make_response(response_body, 404)

        return self._mockserver.make_response(
            status=200, headers={'ETag': upload_part.meta['ETag']},
        )
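    # The per-part ETag returned here is what the client must echo back in
    # its CompleteMultipartUpload body; complete_multipart_upload() below
    # rejects any mismatch.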
    def complete_multipart_upload(self, request):
        bucket_name = self._get_bucket_name(request)
        key = self._extract_key(request)
        bucket_uploads = self._uploads[bucket_name]
        bucket_storage = self._storage[bucket_name]
        upload_id = request.query['uploadId']

        try:
            parts_to_complete = S3HandleMock._parse_complete_multipart_xml_body(
                request.get_data().decode(),
            )
            completed_upload = bucket_uploads.complete_multipart_upload(
                key, upload_id, parts_to_complete,
            )
        except _S3NoSuchUploadError as exc:
            response_body = S3HandleMock._generate_error_response_xml(
                exc.code,
                exc.message,
                f'{request.path}?uploadId={upload_id}',
            )
            return self._mockserver.make_response(response_body, 404)
        except (
            _S3InvalidPartError,
            _S3InvalidPartOrderError,
            _S3EntityTooSmallError,
        ) as exc:
            response_body = S3HandleMock._generate_error_response_xml(
                exc.code,
                exc.message,
                f'{request.path}?uploadId={upload_id}',
            )
            return self._mockserver.make_response(response_body, 400)
        except _S3ClientError as exc:
            return self._mockserver.make_response(str(exc), 400)

        meta = bucket_storage.put_object(
            key, completed_upload['Data'], completed_upload['Upload'].meta,
        )
        response_body = (
            '<?xml version="1.0" encoding="UTF-8"?>'
            '<CompleteMultipartUploadResult>'
            f'<Location>{request.path}</Location>'
            f'<Bucket>{bucket_name}</Bucket>'
            f'<Key>{key}</Key>'
            f'<ETag>{meta["ETag"]}</ETag>'
            '</CompleteMultipartUploadResult>'
        )
        return self._mockserver.make_response(response_body, 200)