userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/log_capture.py Source File
Loading...
Searching...
No Matches
log_capture.py
1"""
2Capture and work with logs.
3"""
4
5# pylint: disable=redefined-outer-name
6import asyncio
7import contextlib
8import enum
9import logging
10import sys
11import typing
12
13import pytest
14
15from testsuite.utils import callinfo
16from testsuite.utils import compat
17from testsuite.utils import net as net_utils
18
19from ..utils import tskv
20
21USERVER_CONFIG_HOOKS = ['_userver_config_logs_capture']
22DEFAULT_PORT = 2211
23
24logger = logging.getLogger(__name__)
25
26
27class BaseError(Exception):
28 pass
29
30
32 pass
33
34
39class LogLevel(enum.Enum):
40 TRACE = 0
41 DEBUG = 1
42 INFO = 2
43 WARNING = 3
44 ERROR = 4
45 CRITICAL = 5
46 NONE = 6
47
48 @classmethod
49 def from_string(cls, level: str) -> 'LogLevel':
50 return cls[level.upper()]
51
52
54 def __init__(self, *, log_level: str) -> None:
55 self._log_level = LogLevel.from_string(log_level)
56 self._logs: typing.List[tskv.TskvRow] = []
57 self._subscribers: typing.List = []
58 self._closed = False
59
60 def close(self):
61 self._closed = True
62
63 async def publish(self, row: tskv.TskvRow) -> None:
64 self._logs.append(row)
65 for query, callback in self._subscribers:
66 if _match_entry(row, query):
67 await callback(**row)
68
69 def select(self, **query) -> typing.List[tskv.TskvRow]:
70 if not self._closed:
72 'select() is only supported for closed captures\n' 'Please move select() after context manager body',
73 )
74 level = query.get('level')
75 if level:
76 log_level = LogLevel[level]
77 if log_level.value < self._log_level.value:
79 f'Requested log level={log_level.name} is lower than ' f'service log level {self._log_level.name}',
80 )
81 result = []
82 for row in self._logs:
83 if _match_entry(row, query):
84 result.append(row)
85 return result
86
87 def subscribe(self, **query):
88 if self._closed:
90 'subscribe() is not supported for closed captures\n'
91 'Please move subscribe() into context manager body',
92 )
93
94 def decorator(func):
95 decorated = callinfo.acallqueue(func)
96 self._subscribers.append((query, decorated))
97 return decorated
98
99 return decorator
100
101
103 def __init__(self, *, log_level: str):
104 self.default_log_level = log_level
105 self._capture: typing.Optional[CapturedLogs] = None
106 self._tasks = []
107 self._client_cond = asyncio.Condition()
108
109 async def wait_for_client(self, timeout: float = 10.0):
110 async def waiter():
111 async with self._client_cond:
112 await self._client_cond.wait_for(lambda: self._tasks)
113
114 logger.debug('Waiting for logcapture client to connect...')
115 try:
116 await asyncio.wait_for(waiter(), timeout=timeout)
117 except TimeoutError:
119 'Timedout while waiting for logcapture client to connect',
120 )
121
122 @compat.asynccontextmanager
123 async def start_capture(
124 self,
125 *,
126 log_level: typing.Optional[str] = None,
127 timeout: float = 10.0,
128 ):
129 if self._capture:
130 yield self._capture
131 return
132
133 if not log_level:
134 log_level = self.default_log_level
135
136 self._capture = CapturedLogs(log_level=log_level)
137 try:
138 yield self._capture
139 finally:
140 self._capture.close()
141 self._capture = None
142 if self._tasks:
143 _, pending = await asyncio.wait(self._tasks, timeout=timeout)
144 self._tasks = []
145 if pending:
146 raise RuntimeError(
147 'Timedout while waiting for capture task to finish',
148 )
149
150 @compat.asynccontextmanager
151 async def start_server(self, *, sock, loop=None):
152 extra = {}
153 if sys.version_info < (3, 8):
154 extra['loop'] = loop
155 server = await asyncio.start_server(
157 sock=sock,
158 **extra,
159 )
160 try:
161 yield server
162 finally:
163 server.close()
164 await server.wait_closed()
165
166 async def _handle_client(self, reader, writer):
167 logger.debug('logcapture client connected')
168
169 async def log_reader(capture: CapturedLogs):
170 with contextlib.closing(writer):
171 async for line in reader:
172 row = tskv.parse_line(line.decode('utf-8'))
173 await capture.publish(row)
174 await writer.wait_closed()
175
176 if not self._capture:
177 writer.close()
178 await writer.wait_closed()
179 else:
180 self._tasks.append(asyncio.create_task(log_reader(self._capture)))
181 async with self._client_cond:
182 self._client_cond.notify_all()
183
184
185def pytest_addoption(parser):
186 group = parser.getgroup('logs-capture')
187 group.addoption(
188 '--logs-capture-port',
189 type=int,
190 default=0,
191 help='Port to bind logs-capture server to.',
192 )
193 group.addoption(
194 '--logs-capture-host',
195 default='localhost',
196 help='Host to bind logs-capture server to.',
197 )
198
199
200@pytest.fixture(scope='session')
201def userver_log_capture(_userver_capture_control, _userver_capture_server):
202 return _userver_capture_control
203
204
205@pytest.fixture(scope='session')
206def _userver_capture_control(userver_log_level):
207 return CaptureControl(log_level=userver_log_level)
208
209
210@pytest.fixture(scope='session')
211def _userver_log_capture_socket(pytestconfig):
212 host = pytestconfig.option.logs_capture_host
213 port = pytestconfig.option.logs_capture_port
214 if pytestconfig.option.service_wait or pytestconfig.option.service_disable:
215 port = port or DEFAULT_PORT
216 with net_utils.bind_socket(host, port) as socket:
217 yield socket
218
219
220@pytest.fixture(scope='session')
221async def _userver_capture_server(
222 _userver_capture_control: CaptureControl,
223 _userver_log_capture_socket,
224 loop,
225):
226 async with _userver_capture_control.start_server(
227 sock=_userver_log_capture_socket,
228 loop=loop,
229 ) as server:
230 yield server
231
232
233@pytest.fixture(scope='session')
234def _userver_config_logs_capture(_userver_log_capture_socket):
235 def patch_config(config, _config_vars) -> None:
236 sockname = _userver_log_capture_socket.getsockname()
237 logging_config = config['components_manager']['components']['logging']
238 default_logger = logging_config['loggers']['default']
239 # Other formats are not yet supported by log-capture.
240 default_logger['format'] = 'tskv'
241 default_logger['testsuite-capture'] = {
242 'host': sockname[0],
243 'port': sockname[1],
244 }
245
246 return patch_config
247
248
249def _match_entry(row: tskv.TskvRow, query) -> bool:
250 for key, value in query.items():
251 if row.get(key) != value:
252 return False
253 return True
254
255
256def __tracebackhide__(excinfo):
257 return excinfo.errisinstance(BaseError)