10logger = logging.getLogger(__name__)
 
   16@dataclasses.dataclass(frozen=True) 
   19    Class that holds a host and port. 
   21    @ingroup userver_testsuite 
   28@dataclasses.dataclass(frozen=False) 
 
   31    Class that holds all the info for health checks. 
   33    @ingroup userver_testsuite 
   36    tcp: typing.List[HostPort] = dataclasses.field(default_factory=list)
 
 
   39async def _check_tcp_port_availability(tcp: HostPort) -> bool:
 
   41        _, writer = await asyncio.open_connection(tcp.host, tcp.port)
 
   43        await writer.wait_closed()
 
   44    except (OSError, asyncio.TimeoutError) 
as exc:
 
   45        logger.debug(
'TCP %s:%s is not available: %s', tcp.host, tcp.port, exc)
 
   52    Checks availability for each provided entry. 
   54    @ingroup userver_testsuite 
   57    done, pending = await asyncio.wait(
 
   59            asyncio.Task(_check_tcp_port_availability(val))
 
   63        return_when=asyncio.ALL_COMPLETED,
 
   72    return not pending 
and all(task.result() 
for task 
in done)
 
   77    Returns a health checks info that for server.listener, grpc-server.port 
   78    and server.listener-monitor. 
   80    @see pytest_userver.plugins.base.service_non_http_health_checks() 
   82    @ingroup userver_testsuite 
   86    components = service_config_yaml[
'components_manager'][
'components']
 
   87    server = components.get(
'server')
 
   89        port = int(server.get(
'listener-monitor', {}).get(
'port', 0))
 
   91            checks.tcp.append(
HostPort(
'localhost', port))
 
   93        port = int(server.get(
'listener', {}).get(
'port', 0))
 
   95            checks.tcp.append(
HostPort(
'localhost', port))
 
   97    port = int(components.get(
'grpc-server', {}).get(
'port', 0))
 
   99        checks.tcp.append(
HostPort(
'localhost', port))