userver: /data/code/service_template/third_party/userver/testsuite/pytest_plugins/pytest_userver/plugins/log_capture.py Source File
Loading...
Searching...
No Matches
log_capture.py
1"""
2Capture and work with logs.
3"""
4
5# pylint: disable=redefined-outer-name
6import asyncio
7import contextlib
8import enum
9import sys
10import typing
11
12import pytest
13
14from testsuite.utils import callinfo
15from testsuite.utils import compat
16from testsuite.utils import net as net_utils
17
18from ..utils import tskv
19
20USERVER_CONFIG_HOOKS = ['_userver_config_logs_capture']
21DEFAULT_PORT = 2211
22
23
24class LogLevel(enum.Enum):
25 TRACE = 0
26 DEBUG = 1
27 INFO = 2
28 WARNING = 3
29 ERROR = 4
30 CRITICAL = 5
31 NONE = 6
32
33 @classmethod
34 def from_string(cls, level: str) -> 'LogLevel':
35 return cls[level.upper()]
36
37
39 def __init__(self, *, log_level: str) -> None:
40 self._log_level = LogLevel.from_string(log_level)
41 self._logs: typing.List[tskv.TskvRow] = []
42 self._subscribers: typing.List = []
43
44 async def publish(self, row: tskv.TskvRow) -> None:
45 self._logs.append(row)
46 for query, callback in self._subscribers:
47 if _match_entry(row, query):
48 await callback(**row)
49
50 def select(self, **query) -> typing.List[tskv.TskvRow]:
51 level = query.get('level')
52 if level:
53 log_level = LogLevel[level]
54 if log_level.value < self._log_level.value:
55 raise RuntimeError(
56 f'Requested log level={log_level.name} is lower than '
57 f'service log level {self._log_level.name}',
58 )
59 result = []
60 for row in self._logs:
61 if _match_entry(row, query):
62 result.append(row)
63 return result
64
65 def subscribe(self, **query):
66 def decorator(func):
67 decorated = callinfo.acallqueue(func)
68 self._subscribers.append((query, decorated))
69 return decorated
70
71 return decorator
72
73
75 def __init__(self, *, log_level: str):
76 self.default_log_level = log_level
77 self._capture: typing.Optional[CapturedLogs] = None
78 self._tasks = []
79
80 @compat.asynccontextmanager
81 async def start_capture(
82 self,
83 *,
84 log_level: typing.Optional[str] = None,
85 timeout: float = 10.0,
86 ):
87 if self._capture:
88 yield self._capture
89 return
90
91 if not log_level:
92 log_level = self.default_log_level
93
94 self._capture = CapturedLogs(log_level=log_level)
95 try:
96 yield self._capture
97 finally:
98 self._capture = None
99 if self._tasks:
100 _, pending = await asyncio.wait(self._tasks, timeout=timeout)
101 self._tasks = []
102 if pending:
103 raise RuntimeError(
104 'Timedout while waiting for capture task to finish',
105 )
106
107 @compat.asynccontextmanager
108 async def start_server(self, *, sock, loop=None):
109 extra = {}
110 if sys.version_info < (3, 8):
111 extra['loop'] = loop
112 server = await asyncio.start_server(
113 self._handle_client_handle_client, sock=sock, **extra,
114 )
115 try:
116 yield server
117 finally:
118 server.close()
119 await server.wait_closed()
120
121 async def _handle_client(self, reader, writer):
122 async def log_reader():
123 with contextlib.closing(writer):
124 async for line in reader:
125 if self._capture:
126 row = tskv.parse_line(line.decode('utf-8'))
127 await self._capture.publish(row)
128
129 self._tasks.append(asyncio.create_task(log_reader()))
130
131
132def pytest_addoption(parser):
133 group = parser.getgroup('logs-capture')
134 group.addoption(
135 '--logs-capture-port',
136 type=int,
137 default=0,
138 help='Port to bind logs-capture server to.',
139 )
140 group.addoption(
141 '--logs-capture-host',
142 default='localhost',
143 help='Host to bind logs-capture server to.',
144 )
145
146
147@pytest.fixture(scope='session')
148def userver_log_capture(_userver_capture_control, _userver_capture_server):
149 return _userver_capture_control
150
151
152@pytest.fixture(scope='session')
153def _userver_capture_control(userver_log_level):
154 return CaptureControl(log_level=userver_log_level)
155
156
157@pytest.fixture(scope='session')
158def _userver_log_capture_socket(pytestconfig):
159 host = pytestconfig.option.logs_capture_host
160 port = pytestconfig.option.logs_capture_port
161 if pytestconfig.option.service_wait or pytestconfig.option.service_disable:
162 port = port or DEFAULT_PORT
163 with net_utils.bind_socket(host, port) as socket:
164 yield socket
165
166
167@pytest.fixture(scope='session')
168async def _userver_capture_server(
169 _userver_capture_control: CaptureControl,
170 _userver_log_capture_socket,
171 loop,
172):
173 async with _userver_capture_control.start_server(
174 sock=_userver_log_capture_socket, loop=loop,
175 ) as server:
176 yield server
177
178
179@pytest.fixture(scope='session')
180def _userver_config_logs_capture(_userver_log_capture_socket):
181 def patch_config(config, _config_vars) -> None:
182 sockname = _userver_log_capture_socket.getsockname()
183 logging_config = config['components_manager']['components']['logging']
184 default_logger = logging_config['loggers']['default']
185 # Other formats are not yet supported by log-capture.
186 default_logger['format'] = 'tskv'
187 default_logger['testsuite-capture'] = {
188 'host': sockname[0],
189 'port': sockname[1],
190 }
191
192 return patch_config
193
194
195def _match_entry(row: tskv.TskvRow, query) -> bool:
196 for key, value in query.items():
197 if row.get(key) != value:
198 return False
199 return True