2Capture and work with logs. 
   14from testsuite.utils 
import callinfo
 
   15from testsuite.utils 
import compat
 
   16from testsuite.utils 
import net 
as net_utils
 
   18from ..utils 
import tskv
 
   20USERVER_CONFIG_HOOKS = [
'_userver_config_logs_capture']
 
   34    def from_string(cls, level: str) -> 
'LogLevel':
 
   35        return cls[level.upper()]
 
 
   39    def __init__(self, *, log_level: str) -> 
None:
 
   40        self.
_log_level = LogLevel.from_string(log_level)
 
   41        self._logs: typing.List[tskv.TskvRow] = []
 
   42        self._subscribers: typing.List = []
 
   44    async def publish(self, row: tskv.TskvRow) -> 
None:
 
   45        self._logs.append(row)
 
   46        for query, callback 
in self._subscribers:
 
   47            if _match_entry(row, query):
 
   50    def select(self, **query) -> typing.List[tskv.TskvRow]:
 
   51        level = query.get(
'level')
 
   53            log_level = LogLevel[level]
 
   56                    f
'Requested log level={log_level.name} is lower than ' 
   57                    f
'service log level {self._log_level.name}',
 
   60        for row 
in self._logs:
 
   61            if _match_entry(row, query):
 
   65    def subscribe(self, **query):
 
   67            decorated = callinfo.acallqueue(func)
 
   68            self._subscribers.append((query, decorated))
 
 
   75    def __init__(self, *, log_level: str):
 
   77        self.
_capture: typing.Optional[CapturedLogs] = 
None 
   80    @compat.asynccontextmanager 
   81    async def start_capture(
 
   84            log_level: typing.Optional[str] = 
None,
 
   85            timeout: float = 10.0,
 
  100                _, pending = await asyncio.wait(self.
_tasks, timeout=timeout)
 
  104                        'Timedout while waiting for capture task to finish',
 
  107    @compat.asynccontextmanager 
  108    async def start_server(self, *, sock, loop=None):
 
  110        if sys.version_info < (3, 8):
 
  112        server = await asyncio.start_server(
 
  119            await server.wait_closed()
 
  121    async def _handle_client(self, reader, writer):
 
  122        async def log_reader():
 
  123            with contextlib.closing(writer):
 
  124                async for line 
in reader:
 
  126                        row = tskv.parse_line(line.decode(
'utf-8'))
 
  129        self.
_tasks.append(asyncio.create_task(log_reader()))
 
 
  132def pytest_addoption(parser):
 
  133    group = parser.getgroup(
'logs-capture')
 
  135        '--logs-capture-port',
 
  138        help=
'Port to bind logs-capture server to.',
 
  141        '--logs-capture-host',
 
  143        help=
'Host to bind logs-capture server to.',
 
  147@pytest.fixture(scope='session') 
  148def userver_log_capture(_userver_capture_control, _userver_capture_server):
 
  149    return _userver_capture_control
 
  152@pytest.fixture(scope='session') 
  153def _userver_capture_control(userver_log_level):
 
  157@pytest.fixture(scope='session') 
  158def _userver_log_capture_socket(pytestconfig):
 
  159    host = pytestconfig.option.logs_capture_host
 
  160    port = pytestconfig.option.logs_capture_port
 
  161    if pytestconfig.option.service_wait 
or pytestconfig.option.service_disable:
 
  162        port = port 
or DEFAULT_PORT
 
  163    with net_utils.bind_socket(host, port) 
as socket:
 
  167@pytest.fixture(scope='session') 
  168async def _userver_capture_server(
 
  169        _userver_capture_control: CaptureControl,
 
  170        _userver_log_capture_socket,
 
  173    async with _userver_capture_control.start_server(
 
  174            sock=_userver_log_capture_socket, loop=loop,
 
  179@pytest.fixture(scope='session') 
  180def _userver_config_logs_capture(_userver_log_capture_socket):
 
  181    def patch_config(config, _config_vars) -> None:
 
  182        sockname = _userver_log_capture_socket.getsockname()
 
  183        logging_config = config[
'components_manager'][
'components'][
'logging']
 
  184        default_logger = logging_config[
'loggers'][
'default']
 
  186        default_logger[
'format'] = 
'tskv' 
  187        default_logger[
'testsuite-capture'] = {
 
  195def _match_entry(row: tskv.TskvRow, query) -> bool:
 
  196    for key, value 
in query.items():
 
  197        if row.get(key) != value: