2Capture and work with logs. 
   13from testsuite.utils 
import callinfo
 
   14from testsuite.utils 
import compat
 
   15from testsuite.utils 
import net 
as net_utils
 
   17from ..utils 
import tskv
 
   19USERVER_CONFIG_HOOKS = [
'_userver_config_logs_capture']
 
   24    def __init__(self) -> None:
 
   25        self._logs: typing.List[tskv.TskvRow] = []
 
   26        self._subscribers: typing.List = []
 
   28    async def publish(self, row: tskv.TskvRow) -> 
None:
 
   29        self._logs.append(row)
 
   30        for query, callback 
in self._subscribers:
 
   31            if _match_entry(row, query):
 
   34    def select(self, **query) -> typing.List[tskv.TskvRow]:
 
   36        for row 
in self._logs:
 
   37            if _match_entry(row, query):
 
   41    def subscribe(self, **query):
 
   43            decorated = callinfo.acallqueue(func)
 
   44            self._subscribers.append((query, decorated))
 
 
   52        self.
_capture: typing.Optional[CapturedLogs] = 
None 
   55    @compat.asynccontextmanager 
   56    async def start_capture(self, *, timeout=10.0):
 
   66                _, pending = await asyncio.wait(self.
_tasks, timeout=timeout)
 
   70                        'Timedout while waiting for capture task to finish',
 
   73    @compat.asynccontextmanager 
   74    async def start_server(self, *, sock, loop=None):
 
   76        if sys.version_info < (3, 8):
 
   78        server = await asyncio.start_server(
 
   85            await server.wait_closed()
 
   87    async def _handle_client(self, reader, writer):
 
   88        async def log_reader():
 
   89            with contextlib.closing(writer):
 
   90                async for line 
in reader:
 
   92                        row = tskv.parse_line(line.decode(
'utf-8'))
 
   95        self.
_tasks.append(asyncio.create_task(log_reader()))
 
 
   98def pytest_addoption(parser):
 
   99    group = parser.getgroup(
'logs-capture')
 
  101        '--logs-capture-port',
 
  104        help=
'Port to bind logs-capture server to.',
 
  107        '--logs-capture-host',
 
  109        help=
'Host to bind logs-capture server to.',
 
  113@pytest.fixture(scope='session') 
  114def userver_log_capture(_userver_capture_control, _userver_capture_server):
 
  115    return _userver_capture_control
 
  118@pytest.fixture(scope='session') 
  119def _userver_capture_control():
 
  123@pytest.fixture(scope='session') 
  124def _userver_log_capture_socket(pytestconfig):
 
  125    host = pytestconfig.option.logs_capture_host
 
  126    port = pytestconfig.option.logs_capture_port
 
  127    if pytestconfig.option.service_wait 
or pytestconfig.option.service_disable:
 
  128        port = port 
or DEFAULT_PORT
 
  129    with net_utils.bind_socket(host, port) 
as socket:
 
  133@pytest.fixture(scope='session') 
  134async def _userver_capture_server(
 
  135        _userver_capture_control: CaptureControl,
 
  136        _userver_log_capture_socket,
 
  139    async with _userver_capture_control.start_server(
 
  140            sock=_userver_log_capture_socket, loop=loop,
 
  145@pytest.fixture(scope='session') 
  146def _userver_config_logs_capture(_userver_log_capture_socket):
 
  147    def patch_config(config, _config_vars) -> None:
 
  148        sockname = _userver_log_capture_socket.getsockname()
 
  149        logging_config = config[
'components_manager'][
'components'][
'logging']
 
  150        default_logger = logging_config[
'loggers'][
'default']
 
  152        default_logger[
'format'] = 
'tskv' 
  153        default_logger[
'testsuite-capture'] = {
 
  161def _match_entry(row: tskv.TskvRow, query) -> bool:
 
  162    for key, value 
in query.items():
 
  163        if row.get(key) != value: