userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/s3api.py Source File
Loading...
Searching...
No Matches
s3api.py
1import dataclasses
2import datetime as dt
3import hashlib
4import pathlib
5import sys
6from typing import Dict
7from typing import List
8from typing import Mapping
9from typing import Optional
10from typing import Union
11
12import dateutil.tz as tz
13
14
@dataclasses.dataclass
class S3Object:
    """A single object kept by the in-memory S3 mock."""

    # Raw payload of the object.
    data: bytearray
    # Metadata entries stored together with the payload.
    meta: Mapping[str, str]
19
20
22 def __init__(self):
23 # use Path to normalize keys (e.g. /a//file.json)
24 self._storage: Dict[pathlib.Path, S3Object] = {}
25
26 @staticmethod
27 def _generate_etag(data):
28 return hashlib.md5(data).hexdigest()
29
def put_object(
    self,
    key: str,
    data: bytearray,
    user_defined_meta: Optional[Mapping[str, str]] = None,
    last_modified: Optional[Union[dt.datetime, str]] = None,
):
    """Store ``data`` under ``key`` and return the metadata recorded for it.

    Args:
        key: Object key; it is normalized through ``pathlib.Path``.
        data: Object payload.
        user_defined_meta: Extra metadata entries merged over the generated
            ones; an entry with the same name replaces the generated value.
        last_modified: Modification time, either a ``datetime`` (formatted
            with ``isoformat()``) or an already formatted string. Defaults to
            the current local time.

    Returns:
        The metadata dict stored with the object: 'Key', 'ETag',
        'Last-Modified', 'Size' plus the user-defined entries.
    """
    key_path = pathlib.Path(key)
    if last_modified is None:
        # Timezone is needed for RFC 3339 timeformat used by S3
        last_modified = dt.datetime.now().replace(tzinfo=tz.tzlocal()).isoformat()
    elif isinstance(last_modified, dt.datetime):
        last_modified = last_modified.isoformat()

    meta = {
        'Key': str(key_path),
        'ETag': self._generate_etag(data),
        'Last-Modified': last_modified,
        # Size is the payload length in bytes. sys.getsizeof() would report
        # the memory footprint of the Python object, overhead included.
        'Size': str(len(data)),
    }

    if user_defined_meta:
        meta.update(user_defined_meta)

    self._storage[key_path] = S3Object(data, meta)
    return meta
55
def get_object(self, key: str) -> Optional[S3Object]:
    """Return the object stored under ``key``.

    The key is normalized through ``pathlib.Path`` before the lookup;
    ``None`` is returned when nothing is stored under it.
    """
    return self._storage.get(pathlib.Path(key))
59
def get_objects(self, parent_dir='') -> Dict[str, S3Object]:
    """Return stored objects keyed by the string form of their key.

    With a non-empty ``parent_dir`` only the keys that start with the
    normalized (``pathlib.Path``) form of ``parent_dir`` are returned.
    """
    listing = {}
    for stored_path, stored_object in self._storage.items():
        listing[str(stored_path)] = stored_object

    if parent_dir:
        return {
            name: stored_object
            for name, stored_object in listing.items()
            if name.startswith(str(pathlib.Path(parent_dir)))
        }
    return listing
67
def delete_object(self, key) -> Optional[S3Object]:
    """Remove the object stored under ``key``.

    Returns the removed object, or ``None`` when the normalized key is not
    present in the storage.
    """
    return self._storage.pop(pathlib.Path(key), None)
73
74
76 def __init__(self, mockserver, s3_mock_storage, mock_base_url):
77 self._mockserver = mockserver
78 self._base_url = mock_base_url
79 self._storage = s3_mock_storage
80
81 def _get_bucket_name(self, request):
82 return request.headers['Host'].split('.')[0]
83
84 def _extract_key(self, request):
85 return request.path[len(self._base_url) + 1 :]
86
87 def _generate_get_objects_result(
88 self,
89 s3_objects_dict: Dict[str, S3Object],
90 max_keys: int,
91 marker: Optional[str],
92 ):
93 empty_result = {'result_objects': [], 'is_truncated': False}
94 keys = list(s3_objects_dict.keys())
95 if not keys:
96 return empty_result
97
98 from_index = 0
99 if marker:
100 if marker > keys[-1]:
101 return empty_result
102 for i, key in enumerate(keys):
103 if key > marker:
104 from_index = i
105 break
106
107 result_objects = [s3_objects_dict[key] for key in keys[from_index : from_index + max_keys]]
108 is_truncated = from_index + max_keys >= len(keys)
109 return {'result_objects': result_objects, 'is_truncated': is_truncated}
110
111 def _generate_get_objects_xml(
112 self,
113 s3_objects: List[S3Object],
114 bucket_name: str,
115 prefix: str,
116 max_keys: Optional[int],
117 marker: Optional[str],
118 is_truncated: bool,
119 ):
120 contents = ''
121 for s3_object in s3_objects:
122 contents += f"""
123 <Contents>
124 <ETag>{s3_object.meta['ETag']}</ETag>
125 <Key>{s3_object.meta['Key']}</Key>
126 <LastModified>{s3_object.meta['Last-Modified']}</LastModified>
127 <Size>{s3_object.meta['Size']}</Size>
128 <StorageClass>STANDARD</StorageClass>
129 </Contents>
130 """
131 return f"""
132 <?xml version="1.0" encoding="UTF-8"?>
133 <ListBucketResult>
134 <Name>{bucket_name}</Name>
135 <Prefix>{prefix}</Prefix>
136 <Marker>{marker or ''}</Marker>
137 <MaxKeys>{max_keys or ''}</MaxKeys>
138 <IsTruncated>{is_truncated}</IsTruncated>
139 {contents}
140 </ListBucketResult>
141 """
142
def get_object(self, request):
    """Handle a request for a single object.

    Responds with the object's data and its metadata as headers, or with
    404 'Object not found' when the bucket storage has no such key.
    """
    object_key = self._extract_key(request)
    bucket_name = self._get_bucket_name(request)
    found = self._storage[bucket_name].get_object(object_key)

    if found:
        return self._mockserver.make_response(found.data, 200, headers=found.meta)
    return self._mockserver.make_response('Object not found', 404)
156
def put_object(self, request):
    """Handle an object upload.

    The request body is stored under the key taken from the request path.
    Headers whose name starts with 'x-amz-meta-' are stored as user-defined
    metadata under their lowercased name.

    Returns:
        An empty 200 response whose headers are the metadata returned by the
        bucket storage.
    """
    key = self._extract_key(request)

    bucket_storage = self._storage[self._get_bucket_name(request)]

    data = request.get_data()

    # https://docs.amazonaws.cn/en_us/AmazonS3/latest/userguide/UsingMetadata.html
    # HTTP header names are case-insensitive, so the prefix is matched on
    # the lowercased name; otherwise 'X-Amz-Meta-*' headers would be lost.
    user_defined_meta = {
        meta_key.lower(): meta_value
        for meta_key, meta_value in request.headers.items()
        if meta_key.lower().startswith('x-amz-meta-')
    }

    meta = bucket_storage.put_object(key, data, user_defined_meta)
    # Some clients like AWS SDK for C++ parse not empty body as XML
    return self._mockserver.make_response('', 200, headers=meta)
173
def copy_object(self, request):
    """Handle a copy of an existing object into the requested key.

    The source is named by the 'x-amz-copy-source' header in the form
    '<bucket>/<key>', with or without a leading slash.

    Returns:
        An empty 200 response whose headers are the metadata returned for
        the new object; 400 when the header is missing or malformed; 404
        when the source object does not exist.
    """
    key = self._extract_key(request)
    dest_bucket_name = self._get_bucket_name(request)

    copy_source = request.headers.get('x-amz-copy-source')
    if not copy_source:
        return self._mockserver.make_response('Missing x-amz-copy-source header', 400)
    if copy_source.startswith('/'):
        copy_source = copy_source[1:]
    # Split only once: the key itself may contain slashes.
    source_bucket_name, separator, source_key = copy_source.partition('/')
    if not separator or not source_key:
        return self._mockserver.make_response('Invalid x-amz-copy-source header', 400)

    src_bucket_storage = self._storage[source_bucket_name]
    dst_bucket_storage = self._storage[dest_bucket_name]

    src_obj = src_bucket_storage.get_object(source_key)
    if not src_obj:
        return self._mockserver.make_response('Object not found', 404)

    # Only carry over metadata that describes the content. 'Key', 'ETag',
    # 'Last-Modified' and 'Size' belong to the source object; passing them
    # on would overwrite the values computed for the destination.
    system_keys = ('Key', 'ETag', 'Last-Modified', 'Size')
    copied_meta = {
        meta_key: meta_value
        for meta_key, meta_value in src_obj.meta.items()
        if meta_key not in system_keys
    }

    meta = dst_bucket_storage.put_object(key, src_obj.data, copied_meta)
    # Some clients like AWS SDK for C++ parse not empty body as XML
    return self._mockserver.make_response('', 200, headers=meta)
190
def get_objects(self, request):
    """Handle a bucket listing request.

    Query parameters read: 'prefix' (optional, defaults to no prefix),
    'max-keys' (optional, defaults to 1000) and 'marker' (optional).

    Returns:
        A 200 response whose body is the listing XML produced by
        ``_generate_get_objects_xml``.
    """
    # The prefix is optional: without it the whole bucket is listed.
    prefix = request.query.get('prefix', '')
    # 1000 is the default value specified by aws spec
    max_keys = int(request.query.get('max-keys', 1000))
    marker = request.query.get('marker')

    bucket_name = self._get_bucket_name(request)
    bucket_storage = self._storage[bucket_name]

    s3_objects_dict = bucket_storage.get_objects(parent_dir=prefix)
    result = self._generate_get_objects_result(
        s3_objects_dict=s3_objects_dict,
        max_keys=max_keys,
        marker=marker,
    )
    result_xml = self._generate_get_objects_xml(
        s3_objects=result['result_objects'],
        bucket_name=bucket_name,
        prefix=prefix,
        max_keys=max_keys,
        marker=marker,
        is_truncated=result['is_truncated'],
    )
    return self._mockserver.make_response(result_xml, 200)
215
def delete_object(self, request):
    """Handle an object deletion and always answer with an empty 204."""
    object_key = self._extract_key(request)
    bucket_name = self._get_bucket_name(request)

    self._storage[bucket_name].delete_object(object_key)

    # S3 always return 204, even if file doesn't exist
    # Some clients like AWS SDK for C++ parse not empty body as XML
    return self._mockserver.make_response('', 204)
225
def get_object_head(self, request):
    """Handle a metadata-only request for a single object.

    Responds with an empty body and the object's metadata as headers, or
    with 404 'Object not found' when the bucket storage has no such key.
    """
    object_key = self._extract_key(request)
    bucket_name = self._get_bucket_name(request)
    found = self._storage[bucket_name].get_object(object_key)

    if found:
        # Some clients like AWS SDK for C++ parse not empty body as XML
        return self._mockserver.make_response('', 200, headers=found.meta)
    return self._mockserver.make_response('Object not found', 404)