userver: /data/code/service_template/third_party/userver/testsuite/pytest_plugins/pytest_userver/plugins/log_capture.py Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
log_capture.py
1"""
2Capture and work with logs.
3"""
4
5# pylint: disable=redefined-outer-name
6import asyncio
7import contextlib
8import sys
9import typing
10
11import pytest
12
13from testsuite.utils import callinfo
14from testsuite.utils import compat
15from testsuite.utils import net as net_utils
16
17from ..utils import tskv
18
19USERVER_CONFIG_HOOKS = ['_userver_config_logs_capture']
20DEFAULT_PORT = 2211
21
22
24 def __init__(self) -> None:
25 self._logs: typing.List[tskv.TskvRow] = []
26 self._subscribers: typing.List = []
27
28 async def publish(self, row: tskv.TskvRow) -> None:
29 self._logs.append(row)
30 for query, callback in self._subscribers:
31 if _match_entry(row, query):
32 await callback(**row)
33
34 def select(self, **query) -> typing.List[tskv.TskvRow]:
35 result = []
36 for row in self._logs:
37 if _match_entry(row, query):
38 result.append(row)
39 return result
40
41 def subscribe(self, **query):
42 def decorator(func):
43 decorated = callinfo.acallqueue(func)
44 self._subscribers.append((query, decorated))
45 return decorated
46
47 return decorator
48
49
51 def __init__(self):
52 self._capture: typing.Optional[CapturedLogs] = None
53 self._tasks = []
54
55 @compat.asynccontextmanager
56 async def start_capture(self, *, timeout=10.0):
57 if self._capture:
58 yield self._capture
59 return
60 self._capture = CapturedLogs()
61 try:
62 yield self._capture
63 finally:
64 self._capture = None
65 if self._tasks:
66 _, pending = await asyncio.wait(self._tasks, timeout=timeout)
67 self._tasks = []
68 if pending:
69 raise RuntimeError(
70 'Timedout while waiting for capture task to finish',
71 )
72
73 @compat.asynccontextmanager
74 async def start_server(self, *, sock, loop=None):
75 extra = {}
76 if sys.version_info < (3, 8):
77 extra['loop'] = loop
78 server = await asyncio.start_server(
79 self._handle_client_handle_client, sock=sock, **extra,
80 )
81 try:
82 yield server
83 finally:
84 server.close()
85 await server.wait_closed()
86
87 async def _handle_client(self, reader, writer):
88 async def log_reader():
89 with contextlib.closing(writer):
90 async for line in reader:
91 if self._capture:
92 row = tskv.parse_line(line.decode('utf-8'))
93 await self._capture.publish(row)
94
95 self._tasks.append(asyncio.create_task(log_reader()))
96
97
98def pytest_addoption(parser):
99 group = parser.getgroup('logs-capture')
100 group.addoption(
101 '--logs-capture-port',
102 type=int,
103 default=0,
104 help='Port to bind logs-capture server to.',
105 )
106 group.addoption(
107 '--logs-capture-host',
108 default='localhost',
109 help='Host to bind logs-capture server to.',
110 )
111
112
113@pytest.fixture(scope='session')
114def userver_log_capture(_userver_capture_control, _userver_capture_server):
115 return _userver_capture_control
116
117
118@pytest.fixture(scope='session')
119def _userver_capture_control():
120 return CaptureControl()
121
122
123@pytest.fixture(scope='session')
124def _userver_log_capture_socket(pytestconfig):
125 host = pytestconfig.option.logs_capture_host
126 port = pytestconfig.option.logs_capture_port
127 if pytestconfig.option.service_wait or pytestconfig.option.service_disable:
128 port = port or DEFAULT_PORT
129 with net_utils.bind_socket(host, port) as socket:
130 yield socket
131
132
133@pytest.fixture(scope='session')
134async def _userver_capture_server(
135 _userver_capture_control: CaptureControl,
136 _userver_log_capture_socket,
137 loop,
138):
139 async with _userver_capture_control.start_server(
140 sock=_userver_log_capture_socket, loop=loop,
141 ) as server:
142 yield server
143
144
145@pytest.fixture(scope='session')
146def _userver_config_logs_capture(_userver_log_capture_socket):
147 def patch_config(config, _config_vars) -> None:
148 sockname = _userver_log_capture_socket.getsockname()
149 logging_config = config['components_manager']['components']['logging']
150 default_logger = logging_config['loggers']['default']
151 # Other formats are not yet supported by log-capture.
152 default_logger['format'] = 'tskv'
153 default_logger['testsuite-capture'] = {
154 'host': sockname[0],
155 'port': sockname[1],
156 }
157
158 return patch_config
159
160
161def _match_entry(row: tskv.TskvRow, query) -> bool:
162 for key, value in query.items():
163 if row.get(key) != value:
164 return False
165 return True