userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/log_capture.py Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
log_capture.py
1"""
2Capture and work with logs.
3"""
4
5# pylint: disable=redefined-outer-name
6import asyncio
7import contextlib
8import enum
9import sys
10import typing
11
12import pytest
13
14from testsuite.utils import callinfo
15from testsuite.utils import compat
16from testsuite.utils import net as net_utils
17
18from ..utils import tskv
19
20USERVER_CONFIG_HOOKS = ['_userver_config_logs_capture']
21DEFAULT_PORT = 2211
22
23
24class LogLevel(enum.Enum):
25 TRACE = 0
26 DEBUG = 1
27 INFO = 2
28 WARNING = 3
29 ERROR = 4
30 CRITICAL = 5
31 NONE = 6
32
33 @classmethod
34 def from_string(cls, level: str) -> 'LogLevel':
35 return cls[level.upper()]
36
37
39 def __init__(self, *, log_level: str) -> None:
40 self._log_level = LogLevel.from_string(log_level)
41 self._logs: typing.List[tskv.TskvRow] = []
42 self._subscribers: typing.List = []
43
44 async def publish(self, row: tskv.TskvRow) -> None:
45 self._logs.append(row)
46 for query, callback in self._subscribers:
47 if _match_entry(row, query):
48 await callback(**row)
49
50 def select(self, **query) -> typing.List[tskv.TskvRow]:
51 level = query.get('level')
52 if level:
53 log_level = LogLevel[level]
54 if log_level.value < self._log_level.value:
55 raise RuntimeError(
56 f'Requested log level={log_level.name} is lower than '
57 f'service log level {self._log_level.name}',
58 )
59 result = []
60 for row in self._logs:
61 if _match_entry(row, query):
62 result.append(row)
63 return result
64
65 def subscribe(self, **query):
66 def decorator(func):
67 decorated = callinfo.acallqueue(func)
68 self._subscribers.append((query, decorated))
69 return decorated
70
71 return decorator
72
73
75 def __init__(self, *, log_level: str):
76 self.default_log_level = log_level
77 self._capture: typing.Optional[CapturedLogs] = None
78 self._tasks = []
79
80 @compat.asynccontextmanager
81 async def start_capture(
82 self,
83 *,
84 log_level: typing.Optional[str] = None,
85 timeout: float = 10.0,
86 ):
87 if self._capture:
88 yield self._capture
89 return
90
91 if not log_level:
92 log_level = self.default_log_level
93
94 self._capture = CapturedLogs(log_level=log_level)
95 try:
96 yield self._capture
97 finally:
98 self._capture = None
99 if self._tasks:
100 _, pending = await asyncio.wait(self._tasks, timeout=timeout)
101 self._tasks = []
102 if pending:
103 raise RuntimeError(
104 'Timedout while waiting for capture task to finish',
105 )
106
107 @compat.asynccontextmanager
108 async def start_server(self, *, sock, loop=None):
109 extra = {}
110 if sys.version_info < (3, 8):
111 extra['loop'] = loop
112 server = await asyncio.start_server(
113 self._handle_client_handle_client, sock=sock, **extra,
114 )
115 try:
116 yield server
117 finally:
118 server.close()
119 await server.wait_closed()
120
121 async def _handle_client(self, reader, writer):
122 async def log_reader():
123 with contextlib.closing(writer):
124 async for line in reader:
125 if self._capture:
126 row = tskv.parse_line(line.decode('utf-8'))
127 await self._capture.publish(row)
128
129 self._tasks.append(asyncio.create_task(log_reader()))
130
131
132def pytest_addoption(parser):
133 group = parser.getgroup('logs-capture')
134 group.addoption(
135 '--logs-capture-port',
136 type=int,
137 default=0,
138 help='Port to bind logs-capture server to.',
139 )
140 group.addoption(
141 '--logs-capture-host',
142 default='localhost',
143 help='Host to bind logs-capture server to.',
144 )
145
146
147@pytest.fixture(scope='session')
148def userver_log_capture(_userver_capture_control, _userver_capture_server):
149 return _userver_capture_control
150
151
152@pytest.fixture(scope='session')
153def _userver_capture_control(userver_log_level):
154 return CaptureControl(log_level=userver_log_level)
155
156
157@pytest.fixture(scope='session')
158def _userver_log_capture_socket(pytestconfig):
159 host = pytestconfig.option.logs_capture_host
160 port = pytestconfig.option.logs_capture_port
161 if pytestconfig.option.service_wait or pytestconfig.option.service_disable:
162 port = port or DEFAULT_PORT
163 with net_utils.bind_socket(host, port) as socket:
164 yield socket
165
166
167@pytest.fixture(scope='session')
168async def _userver_capture_server(
169 _userver_capture_control: CaptureControl,
170 _userver_log_capture_socket,
171 loop,
172):
173 async with _userver_capture_control.start_server(
174 sock=_userver_log_capture_socket, loop=loop,
175 ) as server:
176 yield server
177
178
179@pytest.fixture(scope='session')
180def _userver_config_logs_capture(_userver_log_capture_socket):
181 def patch_config(config, _config_vars) -> None:
182 sockname = _userver_log_capture_socket.getsockname()
183 logging_config = config['components_manager']['components']['logging']
184 default_logger = logging_config['loggers']['default']
185 # Other formats are not yet supported by log-capture.
186 default_logger['format'] = 'tskv'
187 default_logger['testsuite-capture'] = {
188 'host': sockname[0],
189 'port': sockname[1],
190 }
191
192 return patch_config
193
194
195def _match_entry(row: tskv.TskvRow, query) -> bool:
196 for key, value in query.items():
197 if row.get(key) != value:
198 return False
199 return True