userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/log_capture.py Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
log_capture.py
1"""
2Capture and work with logs.
3"""
4
5# pylint: disable=redefined-outer-name
6import asyncio
7import contextlib
8import enum
9import logging
10import sys
11import typing
12
13import pytest
14
15from testsuite.utils import callinfo
16from testsuite.utils import compat
17from testsuite.utils import net as net_utils
18
19from ..utils import tskv
20
21USERVER_CONFIG_HOOKS = ['_userver_config_logs_capture']
22DEFAULT_PORT = 2211
23
24logger = logging.getLogger(__name__)
25
26
27class BaseError(Exception):
28 pass
29
30
32 pass
33
34
39class LogLevel(enum.Enum):
40 TRACE = 0
41 DEBUG = 1
42 INFO = 2
43 WARNING = 3
44 ERROR = 4
45 CRITICAL = 5
46 NONE = 6
47
48 @classmethod
49 def from_string(cls, level: str) -> 'LogLevel':
50 return cls[level.upper()]
51
52
54 def __init__(self, *, log_level: str) -> None:
55 self._log_level = LogLevel.from_string(log_level)
56 self._logs: typing.List[tskv.TskvRow] = []
57 self._subscribers: typing.List = []
58 self._closed = False
59
60 def close(self):
61 self._closed = True
62
63 async def publish(self, row: tskv.TskvRow) -> None:
64 self._logs.append(row)
65 for query, callback in self._subscribers:
66 if _match_entry(row, query):
67 await callback(**row)
68
69 def select(self, **query) -> typing.List[tskv.TskvRow]:
70 if not self._closed:
72 'select() is only supported for closed captures\nPlease move select() after context manager body',
73 )
74 level = query.get('level')
75 if level:
76 log_level = LogLevel[level]
77 if log_level.value < self._log_level.value:
79 f'Requested log level={log_level.name} is lower than service log level {self._log_level.name}',
80 )
81 result = []
82 for row in self._logs:
83 if _match_entry(row, query):
84 result.append(row)
85 return result
86
87 def subscribe(self, **query):
88 if self._closed:
90 'subscribe() is not supported for closed captures\nPlease move subscribe() into context manager body',
91 )
92
93 def decorator(func):
94 decorated = callinfo.acallqueue(func)
95 self._subscribers.append((query, decorated))
96 return decorated
97
98 return decorator
99
100
102 def __init__(self, *, log_level: str):
103 self.default_log_level = log_level
104 self._capture: typing.Optional[CapturedLogs] = None
105 self._tasks = []
106 self._client_cond = asyncio.Condition()
107
108 async def wait_for_client(self, timeout: float = 10.0):
109 async def waiter():
110 async with self._client_cond:
111 await self._client_cond.wait_for(lambda: self._tasks)
112
113 logger.debug('Waiting for logcapture client to connect...')
114 try:
115 await asyncio.wait_for(waiter(), timeout=timeout)
116 except TimeoutError:
118 'Timedout while waiting for logcapture client to connect',
119 )
120
121 @compat.asynccontextmanager
122 async def start_capture(
123 self,
124 *,
125 log_level: typing.Optional[str] = None,
126 timeout: float = 10.0,
127 ):
128 if self._capture:
129 yield self._capture
130 return
131
132 if not log_level:
133 log_level = self.default_log_level
134
135 self._capture = CapturedLogs(log_level=log_level)
136 try:
137 yield self._capture
138 finally:
139 self._capture.close()
140 self._capture = None
141 if self._tasks:
142 _, pending = await asyncio.wait(self._tasks, timeout=timeout)
143 self._tasks = []
144 if pending:
145 raise RuntimeError(
146 'Timedout while waiting for capture task to finish',
147 )
148
149 @compat.asynccontextmanager
150 async def start_server(self, *, sock, loop=None):
151 extra = {}
152 if sys.version_info < (3, 8):
153 extra['loop'] = loop
154 server = await asyncio.start_server(
156 sock=sock,
157 **extra,
158 )
159 try:
160 yield server
161 finally:
162 server.close()
163 await server.wait_closed()
164
165 async def _handle_client(self, reader, writer):
166 logger.debug('logcapture client connected')
167
168 async def log_reader(capture: CapturedLogs):
169 with contextlib.closing(writer):
170 async for line in reader:
171 row = tskv.parse_line(line.decode('utf-8'))
172 await capture.publish(row)
173 await writer.wait_closed()
174
175 if not self._capture:
176 writer.close()
177 await writer.wait_closed()
178 else:
179 self._tasks.append(asyncio.create_task(log_reader(self._capture)))
180 async with self._client_cond:
181 self._client_cond.notify_all()
182
183
184def pytest_addoption(parser):
185 group = parser.getgroup('logs-capture')
186 group.addoption(
187 '--logs-capture-port',
188 type=int,
189 default=0,
190 help='Port to bind logs-capture server to.',
191 )
192 group.addoption(
193 '--logs-capture-host',
194 default='localhost',
195 help='Host to bind logs-capture server to.',
196 )
197
198
199@pytest.fixture(scope='session')
200def userver_log_capture(_userver_capture_control, _userver_capture_server):
201 return _userver_capture_control
202
203
204@pytest.fixture(scope='session')
205def _userver_capture_control(userver_log_level):
206 return CaptureControl(log_level=userver_log_level)
207
208
209@pytest.fixture(scope='session')
210def _userver_log_capture_socket(pytestconfig):
211 host = pytestconfig.option.logs_capture_host
212 port = pytestconfig.option.logs_capture_port
213 if pytestconfig.option.service_wait or pytestconfig.option.service_disable:
214 port = port or DEFAULT_PORT
215 with net_utils.bind_socket(host, port) as socket:
216 yield socket
217
218
219@pytest.fixture(scope='session')
220async def _userver_capture_server(
221 _userver_capture_control: CaptureControl,
222 _userver_log_capture_socket,
223 loop,
224):
225 async with _userver_capture_control.start_server(
226 sock=_userver_log_capture_socket,
227 loop=loop,
228 ) as server:
229 yield server
230
231
232@pytest.fixture(scope='session')
233def _userver_config_logs_capture(_userver_log_capture_socket):
234 def patch_config(config, _config_vars) -> None:
235 sockname = _userver_log_capture_socket.getsockname()
236 logging_config = config['components_manager']['components']['logging']
237 default_logger = logging_config['loggers']['default']
238 # Other formats are not yet supported by log-capture.
239 default_logger['format'] = 'tskv'
240 default_logger['testsuite-capture'] = {
241 'host': sockname[0],
242 'port': sockname[1],
243 }
244
245 return patch_config
246
247
248def _match_entry(row: tskv.TskvRow, query) -> bool:
249 for key, value in query.items():
250 if row.get(key) != value:
251 return False
252 return True
253
254
255def __tracebackhide__(excinfo):
256 return excinfo.errisinstance(BaseError)