Capture and work with logs.
14from testsuite.utils
import callinfo
15from testsuite.utils
import compat
16from testsuite.utils
import net
as net_utils
18from ..utils
import tskv
# Name of the config-patching fixture contributed by this module.
# NOTE(review): presumably collected by the userver testsuite config machinery,
# which resolves each listed name as a fixture -- confirm against the consumer.
USERVER_CONFIG_HOOKS = ['_userver_config_logs_capture']
def from_string(cls, level: str) -> 'LogLevel':
    """Return the member of ``cls`` whose name is ``level`` upper-cased.

    The lookup is by name via ``cls[...]``, so ``'info'``, ``'Info'`` and
    ``'INFO'`` all resolve to the same member.

    NOTE(review): the decorator line is not visible in this view; given the
    ``cls`` parameter this is presumably bound as a ``@classmethod`` on
    ``LogLevel`` -- confirm.

    Args:
        level: Log level name in any letter case.

    Returns:
        Whatever ``cls[level.upper()]`` yields.

    Raises:
        KeyError: Presumably, when no member carries that name (this is what
            an ``enum.Enum`` subscript lookup raises).
    """
    return cls[level.upper()]
def __init__(self, *, log_level: str) -> None:
    """Create an empty log container bound to a service log level.

    Args:
        log_level: Name of the service log level; parsed with
            ``LogLevel.from_string`` (which upper-cases it before lookup).
    """
    self._log_level = LogLevel.from_string(log_level)
    # Rows accumulated by ``publish`` in arrival order.
    self._logs: typing.List[tskv.TskvRow] = []
    # ``(query, callback)`` pairs registered through ``subscribe``.
    self._subscribers: typing.List = []
44 async def publish(self, row: tskv.TskvRow) ->
None:
45 self._logs.append(row)
46 for query, callback
in self._subscribers:
47 if _match_entry(row, query):
50 def select(self, **query) -> typing.List[tskv.TskvRow]:
51 level = query.get(
'level')
53 log_level = LogLevel[level]
56 f
'Requested log level={log_level.name} is lower than '
57 f
'service log level {self._log_level.name}',
60 for row
in self._logs:
61 if _match_entry(row, query):
65 def subscribe(self, **query):
67 decorated = callinfo.acallqueue(func)
68 self._subscribers.append((query, decorated))
75 def __init__(self, *, log_level: str):
77 self.
_capture: typing.Optional[CapturedLogs] =
None
80 @compat.asynccontextmanager
81 async def start_capture(
84 log_level: typing.Optional[str] =
None,
85 timeout: float = 10.0,
100 _, pending = await asyncio.wait(self.
_tasks, timeout=timeout)
104 'Timedout while waiting for capture task to finish',
107 @compat.asynccontextmanager
108 async def start_server(self, *, sock, loop=None):
110 if sys.version_info < (3, 8):
112 server = await asyncio.start_server(
119 await server.wait_closed()
121 async def _handle_client(self, reader, writer):
122 async def log_reader():
123 with contextlib.closing(writer):
124 async for line
in reader:
126 row = tskv.parse_line(line.decode(
'utf-8'))
129 self.
_tasks.append(asyncio.create_task(log_reader()))
132def pytest_addoption(parser):
133 group = parser.getgroup(
'logs-capture')
135 '--logs-capture-port',
138 help=
'Port to bind logs-capture server to.',
141 '--logs-capture-host',
143 help=
'Host to bind logs-capture server to.',
@pytest.fixture(scope='session')
def userver_log_capture(_userver_capture_control, _userver_capture_server):
    """Session-scoped fixture that returns the log capture control object.

    ``_userver_capture_server`` is requested only as a dependency: its value
    is not used here, but listing it makes pytest set that fixture up before
    this one is handed to a test.
    """
    return _userver_capture_control
152@pytest.fixture(scope='session')
153def _userver_capture_control(userver_log_level):
157@pytest.fixture(scope='session')
158def _userver_log_capture_socket(pytestconfig):
159 host = pytestconfig.option.logs_capture_host
160 port = pytestconfig.option.logs_capture_port
161 if pytestconfig.option.service_wait
or pytestconfig.option.service_disable:
162 port = port
or DEFAULT_PORT
163 with net_utils.bind_socket(host, port)
as socket:
167@pytest.fixture(scope='session')
168async def _userver_capture_server(
169 _userver_capture_control: CaptureControl,
170 _userver_log_capture_socket,
173 async with _userver_capture_control.start_server(
174 sock=_userver_log_capture_socket, loop=loop,
179@pytest.fixture(scope='session')
180def _userver_config_logs_capture(_userver_log_capture_socket):
181 def patch_config(config, _config_vars) -> None:
182 sockname = _userver_log_capture_socket.getsockname()
183 logging_config = config[
'components_manager'][
'components'][
'logging']
184 default_logger = logging_config[
'loggers'][
'default']
186 default_logger[
'format'] =
'tskv'
187 default_logger[
'testsuite-capture'] = {
195def _match_entry(row: tskv.TskvRow, query) -> bool:
196 for key, value
in query.items():
197 if row.get(key) != value: