2Capture and work with logs.
13from testsuite.utils
import callinfo
14from testsuite.utils
import compat
15from testsuite.utils
import net
as net_utils
17from ..utils
import tskv
# Names the `_userver_config_logs_capture` fixture defined in this module,
# whose `patch_config` callback rewrites the default logger settings.
# NOTE(review): presumably the plugin machinery collects this list to find
# config-patching fixtures -- confirm against the code that reads it.
USERVER_CONFIG_HOOKS = ['_userver_config_logs_capture']
def __init__(self) -> None:
    """Create an empty capture: no rows recorded, no subscribers."""
    # Rows appended by publish() and scanned by select().
    self._logs: typing.List[tskv.TskvRow] = []
    # (query, callback) pairs appended by subscribe(); publish() walks
    # them to find the callbacks whose query matches a new row.
    self._subscribers: typing.List = []
28 async def publish(self, row: tskv.TskvRow) ->
None:
29 self._logs.append(row)
30 for query, callback
in self._subscribers:
31 if _match_entry(row, query):
34 def select(self, **query) -> typing.List[tskv.TskvRow]:
36 for row
in self._logs:
37 if _match_entry(row, query):
41 def subscribe(self, **query):
43 decorated = callinfo.acallqueue(func)
44 self._subscribers.append((query, decorated))
52 self.
_capture: typing.Optional[CapturedLogs] =
None
55 @compat.asynccontextmanager
56 async def start_capture(self, *, timeout=10.0):
66 _, pending = await asyncio.wait(self.
_tasks, timeout=timeout)
70 'Timedout while waiting for capture task to finish',
73 @compat.asynccontextmanager
74 async def start_server(self, *, sock, loop=None):
76 if sys.version_info < (3, 8):
78 server = await asyncio.start_server(
85 await server.wait_closed()
87 async def _handle_client(self, reader, writer):
88 async def log_reader():
89 with contextlib.closing(writer):
90 async for line
in reader:
92 row = tskv.parse_line(line.decode(
'utf-8'))
95 self.
_tasks.append(asyncio.create_task(log_reader()))
98def pytest_addoption(parser):
99 group = parser.getgroup(
'logs-capture')
101 '--logs-capture-port',
104 help=
'Port to bind logs-capture server to.',
107 '--logs-capture-host',
109 help=
'Host to bind logs-capture server to.',
@pytest.fixture(scope='session')
def userver_log_capture(_userver_capture_control, _userver_capture_server):
    """Session-scoped fixture returning the capture control object.

    `_userver_capture_server` is requested only so that pytest sets that
    fixture up before this one is handed out; its value is not used here.
    """
    return _userver_capture_control
118@pytest.fixture(scope='session')
119def _userver_capture_control():
123@pytest.fixture(scope='session')
124def _userver_log_capture_socket(pytestconfig):
125 host = pytestconfig.option.logs_capture_host
126 port = pytestconfig.option.logs_capture_port
127 if pytestconfig.option.service_wait
or pytestconfig.option.service_disable:
128 port = port
or DEFAULT_PORT
129 with net_utils.bind_socket(host, port)
as socket:
133@pytest.fixture(scope='session')
134async def _userver_capture_server(
135 _userver_capture_control: CaptureControl,
136 _userver_log_capture_socket,
139 async with _userver_capture_control.start_server(
140 sock=_userver_log_capture_socket, loop=loop,
145@pytest.fixture(scope='session')
146def _userver_config_logs_capture(_userver_log_capture_socket):
147 def patch_config(config, _config_vars) -> None:
148 sockname = _userver_log_capture_socket.getsockname()
149 logging_config = config[
'components_manager'][
'components'][
'logging']
150 default_logger = logging_config[
'loggers'][
'default']
152 default_logger[
'format'] =
'tskv'
153 default_logger[
'testsuite-capture'] = {
161def _match_entry(row: tskv.TskvRow, query) -> bool:
162 for key, value
in query.items():
163 if row.get(key) != value: