userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/log_capture.py Source File
Loading...
Searching...
No Matches
log_capture.py
1"""
2Capture and work with logs.
3"""
4
# pylint: disable=redefined-outer-name
6import asyncio
7import contextlib
8import enum
9import logging
10import sys
11import typing
12
13import pytest
14
15from testsuite.utils import callinfo
16from testsuite.utils import compat
17from testsuite.utils import net as net_utils
18
19from ..utils import tskv
20
# Names of the config-hook fixtures contributed by this plugin; the entry
# matches the `_userver_config_logs_capture` fixture defined in this module.
USERVER_CONFIG_HOOKS = ['_userver_config_logs_capture']
# Fallback port for the capture socket, used by `_userver_log_capture_socket`
# when no explicit port is given and the service is waited for or disabled.
DEFAULT_PORT = 2211

logger = logging.getLogger(__name__)
25
26
class BaseError(Exception):
    """Base class for errors raised by the log-capture plugin.

    `__tracebackhide__` in this module hides frames for exceptions that are
    instances of this class.
    """
29
30
32 pass
33
34
class LogLevel(enum.Enum):
    """Service log levels; a larger value means a higher level."""

    TRACE = 0
    DEBUG = 1
    INFO = 2
    WARNING = 3
    ERROR = 4
    CRITICAL = 5
    NONE = 6

    @classmethod
    def from_string(cls, level: str) -> 'LogLevel':
        """Return the member named by ``level``, ignoring letter case.

        Raises:
            KeyError: if ``level.upper()`` is not a member name.
        """
        return cls[level.upper()]
51
52
class CapturedLogs:
    """Log rows collected during a single capture, with query helpers.

    NOTE(review): the ``class`` line and the ``raise ...(`` openers were
    missing from the listing this code was recovered from. The class name is
    taken from its uses elsewhere in this module; the raised type is
    ``BaseError`` because it is the only exception class visible here.
    Confirm whether a more specific ``BaseError`` subclass was intended.
    """

    def __init__(self, *, log_level: str) -> None:
        """Create an open, empty capture.

        Args:
            log_level: service log level name, parsed with
                ``LogLevel.from_string``.
        """
        self._log_level = LogLevel.from_string(log_level)
        self._logs: typing.List[tskv.TskvRow] = []
        # (query, callback) pairs registered through subscribe().
        self._subscribers: typing.List = []
        self._closed = False

    def close(self):
        """Mark the capture closed: select() is allowed, subscribe() is not."""
        self._closed = True

    async def publish(self, row: tskv.TskvRow) -> None:
        """Store ``row`` and await every subscriber whose query matches it.

        A matching callback receives the row's fields as keyword arguments.
        """
        self._logs.append(row)
        for query, callback in self._subscribers:
            if _match_entry(row, query):
                await callback(**row)

    def select(self, **query) -> typing.List[tskv.TskvRow]:
        """Return the stored rows whose fields equal every item of ``query``.

        Raises:
            BaseError: if the capture is still open, or if ``query['level']``
                names a level lower than the service log level.
            KeyError: if ``query['level']`` is not a ``LogLevel`` member name.
        """
        if not self._closed:
            raise BaseError(
                'select() is only supported for closed captures\n'
                'Please move select() after context manager body',
            )
        level = query.get('level')
        if level:
            log_level = LogLevel[level]
            # Rows below the service log level are never emitted, so such a
            # query could only ever come back empty; fail loudly instead.
            if log_level.value < self._log_level.value:
                raise BaseError(
                    f'Requested log level={log_level.name} is lower than '
                    f'service log level {self._log_level.name}',
                )
        return [row for row in self._logs if _match_entry(row, query)]

    def subscribe(self, **query):
        """Return a decorator registering a callback for matching rows.

        The decorated function is wrapped with ``callinfo.acallqueue``; the
        wrapper is what gets registered and returned.

        Raises:
            BaseError: if the capture is already closed.
        """
        if self._closed:
            raise BaseError(
                'subscribe() is not supported for closed captures\n'
                'Please move subscribe() into context manager body',
            )

        def decorator(func):
            decorated = callinfo.acallqueue(func)
            self._subscribers.append((query, decorated))
            return decorated

        return decorator
102
103
class CaptureControl:
    """Runs the capture server and feeds received rows to the active capture.

    NOTE(review): the ``class`` line and one ``raise ...(`` opener were
    missing from the listing this code was recovered from. The class name is
    taken from its uses elsewhere in this module; the raised type is
    ``BaseError`` because it is the only exception class visible here.
    Confirm whether a more specific ``BaseError`` subclass was intended.
    """

    def __init__(self, *, log_level: str):
        """Remember the default log level used by ``start_capture``."""
        self.default_log_level = log_level
        self._capture: typing.Optional[CapturedLogs] = None
        # Reader tasks, one per client connected during the active capture.
        self._tasks = []
        self._client_cond = asyncio.Condition()

    async def wait_for_client(self, timeout: float = 10.0):
        """Wait until at least one client reader task exists.

        Raises:
            BaseError: if no client connected within ``timeout`` seconds.
        """

        async def waiter():
            async with self._client_cond:
                await self._client_cond.wait_for(lambda: self._tasks)

        logger.debug('Waiting for logcapture client to connect...')
        try:
            await asyncio.wait_for(waiter(), timeout=timeout)
        # asyncio.wait_for raises asyncio.TimeoutError, which only became an
        # alias of the builtin TimeoutError in Python 3.11.
        except asyncio.TimeoutError as exc:
            raise BaseError(
                'Timedout while waiting for logcapture client to connect',
            ) from exc

    @compat.asynccontextmanager
    async def start_capture(
        self, *, log_level: typing.Optional[str] = None, timeout: float = 10.0,
    ):
        """Yield the active ``CapturedLogs``, creating one if none is active.

        A nested call yields the already-active capture and does no cleanup.
        The outermost call closes the capture on exit and waits up to
        ``timeout`` seconds for the reader tasks to finish.

        Raises:
            RuntimeError: if reader tasks are still pending after ``timeout``.
        """
        if self._capture:
            yield self._capture
            return

        if not log_level:
            log_level = self.default_log_level

        self._capture = CapturedLogs(log_level=log_level)
        try:
            yield self._capture
        finally:
            self._capture.close()
            self._capture = None
            if self._tasks:
                _, pending = await asyncio.wait(self._tasks, timeout=timeout)
                self._tasks = []
                if pending:
                    raise RuntimeError(
                        'Timedout while waiting for capture task to finish',
                    )

    @compat.asynccontextmanager
    async def start_server(self, *, sock, loop=None):
        """Serve capture clients on ``sock``; close the server on exit."""
        extra = {}
        # The explicit ``loop`` argument is only passed on Python < 3.8.
        if sys.version_info < (3, 8):
            extra['loop'] = loop
        server = await asyncio.start_server(
            self._handle_client, sock=sock, **extra,
        )
        try:
            yield server
        finally:
            server.close()
            await server.wait_closed()

    async def _handle_client(self, reader, writer):
        """Connection callback: start a reader task or drop the client.

        With no active capture the connection is closed right away.
        Otherwise a task is spawned that parses each received line as TSKV
        and publishes it, and ``wait_for_client`` waiters are notified.
        """
        logger.debug('logcapture client connected')

        async def log_reader(capture: CapturedLogs):
            with contextlib.closing(writer):
                async for line in reader:
                    row = tskv.parse_line(line.decode('utf-8'))
                    await capture.publish(row)
            await writer.wait_closed()

        if not self._capture:
            writer.close()
            await writer.wait_closed()
        else:
            self._tasks.append(asyncio.create_task(log_reader(self._capture)))
            async with self._client_cond:
                self._client_cond.notify_all()
180
181
def pytest_addoption(parser):
    """Register the ``--logs-capture-port`` and ``--logs-capture-host`` options.

    Both are added to the ``logs-capture`` option group. The port defaults to
    0 and the host to ``localhost``.
    """
    group = parser.getgroup('logs-capture')
    group.addoption(
        '--logs-capture-port',
        type=int,
        default=0,
        help='Port to bind logs-capture server to.',
    )
    group.addoption(
        '--logs-capture-host',
        default='localhost',
        help='Host to bind logs-capture server to.',
    )
195
196
@pytest.fixture(scope='session')
def userver_log_capture(_userver_capture_control, _userver_capture_server):
    """Return the session's capture control object.

    ``_userver_capture_server`` is requested only so that the capture server
    is set up before this fixture's value is used.
    """
    return _userver_capture_control
200
201
@pytest.fixture(scope='session')
def _userver_capture_control(userver_log_level):
    """Create the session-wide ``CaptureControl`` with the service log level."""
    return CaptureControl(log_level=userver_log_level)
205
206
@pytest.fixture(scope='session')
def _userver_log_capture_socket(pytestconfig):
    """Yield a socket bound to the configured logs-capture host and port.

    When the ``service_wait`` or ``service_disable`` option is set and no
    port was given, ``DEFAULT_PORT`` is used instead of port 0.
    """
    host = pytestconfig.option.logs_capture_host
    port = pytestconfig.option.logs_capture_port
    if pytestconfig.option.service_wait or pytestconfig.option.service_disable:
        # NOTE(review): presumably a service started outside the test run
        # needs a port known in advance - confirm.
        port = port or DEFAULT_PORT
    with net_utils.bind_socket(host, port) as sock:
        yield sock
215
216
@pytest.fixture(scope='session')
async def _userver_capture_server(
    _userver_capture_control: CaptureControl, _userver_log_capture_socket, loop,
):
    """Run the capture server on the bound socket for the whole session."""
    async with _userver_capture_control.start_server(
        sock=_userver_log_capture_socket, loop=loop,
    ) as server:
        yield server
225
226
@pytest.fixture(scope='session')
def _userver_config_logs_capture(_userver_log_capture_socket):
    """Return a config hook that points the default logger at the capture socket.

    The hook switches the default logger to the ``tskv`` format and adds a
    ``testsuite-capture`` section holding the socket's bound host and port.
    """

    def patch_config(config, _config_vars) -> None:
        sockname = _userver_log_capture_socket.getsockname()
        logging_config = config['components_manager']['components']['logging']
        default_logger = logging_config['loggers']['default']
        # Other formats are not yet supported by log-capture.
        default_logger['format'] = 'tskv'
        default_logger['testsuite-capture'] = {
            'host': sockname[0],
            'port': sockname[1],
        }

    return patch_config
241
242
def _match_entry(row: tskv.TskvRow, query) -> bool:
    """Return True when ``row.get(key) == value`` for every item of ``query``.

    An empty ``query`` matches any row.
    """
    return all(row.get(key) == value for key, value in query.items())
248
249
def __tracebackhide__(excinfo):
    """Tell pytest to hide this module's frames for ``BaseError`` failures.

    Returns whatever ``excinfo.errisinstance(BaseError)`` returns.
    """
    return excinfo.errisinstance(BaseError)