userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/s3api.py Source File
Loading...
Searching...
No Matches
s3api.py
1import dataclasses
2import datetime as dt
3import hashlib
4import pathlib
5import sys
6from typing import Dict
7from typing import List
8from typing import Mapping
9from typing import Optional
10from typing import Union
11
12import dateutil.tz as tz
13
14
15@dataclasses.dataclass
17 data: bytearray
18 meta: Mapping[str, str]
19
20
22 def __init__(self):
23 # use Path to normalize keys (e.g. /a//file.json)
24 self._storage: Dict[pathlib.Path, S3Object] = {}
25
26 @staticmethod
27 def _generate_etag(data):
28 return hashlib.md5(data).hexdigest()
29
30 def put_object(
31 self,
32 key: str,
33 data: bytearray,
34 user_defined_meta: Mapping[str, str] = {},
35 last_modified: Optional[Union[dt.datetime, str]] = None,
36 ):
37 key_path = pathlib.Path(key)
38 if last_modified is None:
39 # Timezone is needed for RFC 3339 timeformat used by S3
40 last_modified = dt.datetime.now().replace(tzinfo=tz.tzlocal()).isoformat()
41 elif isinstance(last_modified, dt.datetime):
42 last_modified = last_modified.isoformat()
43
44 meta = {
45 'Key': str(key_path),
46 'ETag': self._generate_etag(data),
47 'Last-Modified': last_modified,
48 'Size': str(sys.getsizeof(data)),
49 }
50
51 meta.update(user_defined_meta)
52
53 self._storage[key_path] = S3Object(data, meta)
54 return meta
55
56 def get_object(self, key: str) -> Optional[S3Object]:
57 key_path = pathlib.Path(key)
58 return self._storage.get(key_path)
59
60 def get_objects(self, parent_dir='') -> Dict[str, S3Object]:
61 all_objects = {str(key_path): value for key_path, value in self._storage.items()}
62
63 if not parent_dir:
64 return all_objects
65
66 return {key: value for key, value in all_objects.items() if key.startswith(str(pathlib.Path(parent_dir)))}
67
68 def delete_object(self, key) -> Optional[S3Object]:
69 key = pathlib.Path(key)
70 if key not in self._storage:
71 return None
72 return self._storage.pop(key)
73
74
76 def __init__(self, mockserver, s3_mock_storage, mock_base_url):
77 self._mockserver = mockserver
78 self._base_url = mock_base_url
79 self._storage = s3_mock_storage
80
81 def _get_bucket_name(self, request):
82 return request.headers['Host'].split('.')[0]
83
84 def _extract_key(self, request):
85 return request.path[len(self._base_url) + 1 :]
86
87 def _generate_get_objects_result(
88 self,
89 s3_objects_dict: Dict[str, S3Object],
90 max_keys: int,
91 marker: Optional[str],
92 ):
93 empty_result = {'result_objects': [], 'is_truncated': False}
94 keys = list(s3_objects_dict.keys())
95 if not keys:
96 return empty_result
97
98 from_index = 0
99 if marker:
100 if marker > keys[-1]:
101 return empty_result
102 for i, key in enumerate(keys):
103 if key > marker:
104 from_index = i
105 break
106
107 result_objects = [s3_objects_dict[key] for key in keys[from_index : from_index + max_keys]]
108 is_truncated = from_index + max_keys >= len(keys)
109 return {'result_objects': result_objects, 'is_truncated': is_truncated}
110
111 def _generate_get_objects_xml(
112 self,
113 s3_objects: List[S3Object],
114 bucket_name: str,
115 prefix: str,
116 max_keys: Optional[int],
117 marker: Optional[str],
118 is_truncated: bool,
119 ):
120 contents = ''
121 for s3_object in s3_objects:
122 contents += f"""
123 <Contents>
124 <ETag>{s3_object.meta['ETag']}</ETag>
125 <Key>{s3_object.meta['Key']}</Key>
126 <LastModified>{s3_object.meta['Last-Modified']}</LastModified>
127 <Size>{s3_object.meta['Size']}</Size>
128 <StorageClass>STANDARD</StorageClass>
129 </Contents>
130 """
131 return f"""
132 <?xml version="1.0" encoding="UTF-8"?>
133 <ListBucketResult>
134 <Name>{bucket_name}</Name>
135 <Prefix>{prefix}</Prefix>
136 <Marker>{marker or ''}</Marker>
137 <MaxKeys>{max_keys or ''}</MaxKeys>
138 <IsTruncated>{is_truncated}</IsTruncated>
139 {contents}
140 </ListBucketResult>
141 """
142
143 def get_object(self, request):
144 key = self._extract_key(request)
145
146 bucket_storage = self._storage[self._get_bucket_name(request)]
147
148 s3_object = bucket_storage.get_object(key)
149 if not s3_object:
150 return self._mockserver.make_response('Object not found', 404)
151 return self._mockserver.make_response(
152 s3_object.data,
153 200,
154 headers=s3_object.meta,
155 )
156
157 def put_object(self, request):
158 key = self._extract_key(request)
159
160 bucket_storage = self._storage[self._get_bucket_name(request)]
161
162 data = request.get_data()
163
164 user_defined_meta = {}
165 for meta_key, meta_value in request.headers.items():
166 # https://docs.amazonaws.cn/en_us/AmazonS3/latest/userguide/UsingMetadata.html
167 if meta_key.startswith('x-amz-meta-'):
168 user_defined_meta[meta_key] = meta_value
169
170 meta = bucket_storage.put_object(key, data, user_defined_meta)
171 return self._mockserver.make_response('OK', 200, headers=meta)
172
173 def copy_object(self, request):
174 key = self._extract_key(request)
175 dest_bucket_name = self._get_bucket_name(request)
176 source_bucket_name, source_key = request.headers.get(
177 'x-amz-copy-source',
178 ).split('/', 2)[1:3]
179
180 src_bucket_storage = self._storage[source_bucket_name]
181 dst_bucket_storage = self._storage[dest_bucket_name]
182
183 src_obj = src_bucket_storage.get_object(source_key)
184 src_data = src_obj.data
185 src_meta = src_obj.meta
186 meta = dst_bucket_storage.put_object(key, src_data, src_meta)
187 return self._mockserver.make_response('OK', 200, headers=meta)
188
189 def get_objects(self, request):
190 prefix = request.query['prefix']
191 # 1000 is the default value specified by aws spec
192 max_keys = int(request.query.get('max-keys', 1000))
193 marker = request.query.get('marker')
194
195 bucket_name = self._get_bucket_name(request)
196 bucket_storage = self._storage[bucket_name]
197
198 s3_objects_dict = bucket_storage.get_objects(parent_dir=prefix)
199 result = self._generate_get_objects_result(
200 s3_objects_dict=s3_objects_dict,
201 max_keys=max_keys,
202 marker=marker,
203 )
204 result_xml = self._generate_get_objects_xml(
205 s3_objects=result['result_objects'],
206 bucket_name=bucket_name,
207 prefix=prefix,
208 max_keys=max_keys,
209 marker=marker,
210 is_truncated=result['is_truncated'],
211 )
212 return self._mockserver.make_response(result_xml, 200)
213
214 def delete_object(self, request):
215 key = self._extract_key(request)
216
217 bucket_storage = self._storage[self._get_bucket_name(request)]
218
219 bucket_storage.delete_object(key)
220 # S3 always return 204, even if file doesn't exist
221 return self._mockserver.make_response('OK', 204)
222
223 def get_object_head(self, request):
224 key = self._extract_key(request)
225
226 bucket_storage = self._storage[self._get_bucket_name(request)]
227
228 s3_object = bucket_storage.get_object(key)
229 if not s3_object:
230 return self._mockserver.make_response('Object not found', 404)
231 return self._mockserver.make_response(
232 'OK',
233 200,
234 headers=s3_object.meta,
235 )