2Capture and work with logs.
15from testsuite.utils
import callinfo
16from testsuite.utils
import compat
17from testsuite.utils
import net
as net_utils
19from ..utils
import tskv
21USERVER_CONFIG_HOOKS = [
'_userver_config_logs_capture']
24logger = logging.getLogger(__name__)
49 def from_string(cls, level: str) ->
'LogLevel':
50 return cls[level.upper()]
54 def __init__(self, *, log_level: str) ->
None:
55 self.
_log_level = LogLevel.from_string(log_level)
56 self._logs: typing.List[tskv.TskvRow] = []
57 self._subscribers: typing.List = []
63 async def publish(self, row: tskv.TskvRow) ->
None:
64 self._logs.append(row)
65 for query, callback
in self._subscribers:
66 if _match_entry(row, query):
69 def select(self, **query) -> typing.List[tskv.TskvRow]:
72 'select() is only supported for closed captures\n'
73 'Please move select() after context manager body',
75 level = query.get(
'level')
77 log_level = LogLevel[level]
80 f
'Requested log level={log_level.name} is lower than '
81 f
'service log level {self._log_level.name}',
84 for row
in self._logs:
85 if _match_entry(row, query):
89 def subscribe(self, **query):
92 'subscribe() is not supported for closed captures\n'
93 'Please move subscribe() into context manager body',
97 decorated = callinfo.acallqueue(func)
98 self._subscribers.append((query, decorated))
105 def __init__(self, *, log_level: str):
107 self.
_capture: typing.Optional[CapturedLogs] =
None
111 async def wait_for_client(self, timeout: float = 10.0):
116 logger.debug(
'Waiting for logcapture client to connect...')
118 await asyncio.wait_for(waiter(), timeout=timeout)
121 'Timedout while waiting for logcapture client to connect',
124 @compat.asynccontextmanager
125 async def start_capture(
126 self, *, log_level: typing.Optional[str] =
None, timeout: float = 10.0,
142 _, pending = await asyncio.wait(self.
_tasks, timeout=timeout)
146 'Timedout while waiting for capture task to finish',
149 @compat.asynccontextmanager
150 async def start_server(self, *, sock, loop=None):
152 if sys.version_info < (3, 8):
154 server = await asyncio.start_server(
161 await server.wait_closed()
163 async def _handle_client(self, reader, writer):
164 logger.debug(
'logcapture client connected')
166 async def log_reader(capture: CapturedLogs):
167 with contextlib.closing(writer):
168 async for line
in reader:
169 row = tskv.parse_line(line.decode(
'utf-8'))
170 await capture.publish(row)
171 await writer.wait_closed()
175 await writer.wait_closed()
177 self.
_tasks.append(asyncio.create_task(log_reader(self.
_capture)))
182def pytest_addoption(parser):
183 group = parser.getgroup(
'logs-capture')
185 '--logs-capture-port',
188 help=
'Port to bind logs-capture server to.',
191 '--logs-capture-host',
193 help=
'Host to bind logs-capture server to.',
197@pytest.fixture(scope='session')
198def userver_log_capture(_userver_capture_control, _userver_capture_server):
199 return _userver_capture_control
202@pytest.fixture(scope='session')
203def _userver_capture_control(userver_log_level):
207@pytest.fixture(scope='session')
208def _userver_log_capture_socket(pytestconfig):
209 host = pytestconfig.option.logs_capture_host
210 port = pytestconfig.option.logs_capture_port
211 if pytestconfig.option.service_wait
or pytestconfig.option.service_disable:
212 port = port
or DEFAULT_PORT
213 with net_utils.bind_socket(host, port)
as socket:
217@pytest.fixture(scope='session')
218async def _userver_capture_server(
219 _userver_capture_control: CaptureControl, _userver_log_capture_socket, loop,
221 async with _userver_capture_control.start_server(
222 sock=_userver_log_capture_socket, loop=loop,
227@pytest.fixture(scope='session')
228def _userver_config_logs_capture(_userver_log_capture_socket):
229 def patch_config(config, _config_vars) -> None:
230 sockname = _userver_log_capture_socket.getsockname()
231 logging_config = config[
'components_manager'][
'components'][
'logging']
232 default_logger = logging_config[
'loggers'][
'default']
234 default_logger[
'format'] =
'tskv'
235 default_logger[
'testsuite-capture'] = {
243def _match_entry(row: tskv.TskvRow, query) -> bool:
244 for key, value
in query.items():
245 if row.get(key) != value:
250def __tracebackhide__(excinfo):
251 return excinfo.errisinstance(BaseError)