userver: /data/code/service_template/third_party/userver/testsuite/pytest_plugins/pytest_userver/plugins/log_capture.py Source File
Loading...
Searching...
No Matches
log_capture.py
1"""
2Capture and work with logs.
3"""
4
5# pylint: disable=redefined-outer-name
6import asyncio
7import contextlib
8import sys
9import typing
10
11import pytest
12
13from testsuite.utils import callinfo
14from testsuite.utils import compat
15from testsuite.utils import net as net_utils
16
17from ..utils import tskv
18
19USERVER_CONFIG_HOOKS = ['_userver_config_logs_capture']
20DEFAULT_PORT = 2211
21
22
24 def __init__(self) -> None:
25 self._logs: typing.List[tskv.TskvRow] = []
26 self._subscribers: typing.List = []
27
28 async def publish(self, row: tskv.TskvRow) -> None:
29 self._logs.append(row)
30 for query, callback in self._subscribers:
31 if _match_entry(row, query):
32 await callback(**row)
33
34 def select(self, **query) -> typing.List[tskv.TskvRow]:
35 result = []
36 for row in self._logs:
37 if _match_entry(row, query):
38 result.append(row)
39 return result
40
41 def subscribe(self, **query):
42 def decorator(func):
43 decorated = callinfo.acallqueue(func)
44 self._subscribers.append((query, decorated))
45 return decorated
46
47 return decorator
48
49
51 def __init__(self):
52 self._capture: typing.Optional[CapturedLogs] = None
53 self._tasks = []
54
55 @compat.asynccontextmanager
56 async def start_capture(self, *, timeout=10.0):
57 if self._capture:
58 yield self._capture
59 return
60 self._capture = CapturedLogs()
61 try:
62 yield self._capture
63 finally:
64 self._capture = None
65 if self._tasks:
66 _, pending = await asyncio.wait(self._tasks, timeout=timeout)
67 self._tasks = []
68 if pending:
69 raise RuntimeError(
70 'Timedout while waiting for capture task to finish',
71 )
72
73 @compat.asynccontextmanager
74 async def start_server(self, *, sock, loop=None):
75 extra = {}
76 if sys.version_info < (3, 8):
77 extra['loop'] = loop
78 server = await asyncio.start_server(
79 self._handle_client_handle_client, sock=sock, **extra,
80 )
81 try:
82 yield server
83 finally:
84 server.close()
85 await server.wait_closed()
86
87 async def _handle_client(self, reader, writer):
88 async def log_reader():
89 with contextlib.closing(writer):
90 async for line in reader:
91 if self._capture:
92 row = tskv.parse_line(line.decode('utf-8'))
93 await self._capture.publish(row)
94
95 self._tasks.append(asyncio.create_task(log_reader()))
96
97
98def pytest_addoption(parser):
99 group = parser.getgroup('logs-capture')
100 group.addoption(
101 '--logs-capture-port',
102 type=int,
103 default=0,
104 help='Port to bind logs-capture server to.',
105 )
106 group.addoption(
107 '--logs-capture-host',
108 default='localhost',
109 help='Host to bind logs-capture server to.',
110 )
111
112
113@pytest.fixture(scope='session')
114def userver_log_capture(_userver_capture_control, _userver_capture_server):
115 return _userver_capture_control
116
117
118@pytest.fixture(scope='session')
119def _userver_capture_control():
120 return CaptureControl()
121
122
123@pytest.fixture(scope='session')
124def _userver_log_capture_socket(pytestconfig):
125 host = pytestconfig.option.logs_capture_host
126 port = pytestconfig.option.logs_capture_port
127 if pytestconfig.option.service_wait or pytestconfig.option.service_disable:
128 port = port or DEFAULT_PORT
129 with net_utils.bind_socket(host, port) as socket:
130 yield socket
131
132
133@pytest.fixture(scope='session')
134async def _userver_capture_server(
135 _userver_capture_control: CaptureControl,
136 _userver_log_capture_socket,
137 loop,
138):
139 async with _userver_capture_control.start_server(
140 sock=_userver_log_capture_socket, loop=loop,
141 ) as server:
142 yield server
143
144
145@pytest.fixture(scope='session')
146def _userver_config_logs_capture(_userver_log_capture_socket):
147 def patch_config(config, _config_vars) -> None:
148 sockname = _userver_log_capture_socket.getsockname()
149 logging_config = config['components_manager']['components']['logging']
150 default_logger = logging_config['loggers']['default']
151 # Other formats are not yet supported by log-capture.
152 default_logger['format'] = 'tskv'
153 default_logger['testsuite-capture'] = {
154 'host': sockname[0],
155 'port': sockname[1],
156 }
157
158 return patch_config
159
160
161def _match_entry(row: tskv.TskvRow, query) -> bool:
162 for key, value in query.items():
163 if row.get(key) != value:
164 return False
165 return True