10logger = logging.getLogger(__name__)
16@dataclasses.dataclass(frozen=True)
19 Class that holds a host and port.
21 @ingroup userver_testsuite
28@dataclasses.dataclass(frozen=False)
31 Class that holds all the info for health checks.
33 @ingroup userver_testsuite
36 tcp: typing.List[HostPort] = dataclasses.field(default_factory=list)
39async def _check_tcp_port_availability(tcp: HostPort) -> bool:
41 _, writer = await asyncio.open_connection(tcp.host, tcp.port)
43 await writer.wait_closed()
44 except (OSError, asyncio.TimeoutError)
as exc:
45 logger.debug(
'TCP %s:%s is not available: %s', tcp.host, tcp.port, exc)
52 Checks availability for each provided entry.
54 @ingroup userver_testsuite
57 done, pending = await asyncio.wait(
59 asyncio.Task(_check_tcp_port_availability(val))
63 return_when=asyncio.ALL_COMPLETED,
72 return not pending
and all(task.result()
for task
in done)
77 Returns a health checks info that for server.listener, grpc-server.port
78 and server.listener-monitor.
80 @see pytest_userver.plugins.base.service_non_http_health_checks()
82 @ingroup userver_testsuite
86 components = service_config_yaml[
'components_manager'][
'components']
87 server = components.get(
'server')
89 port = int(server.get(
'listener-monitor', {}).get(
'port', 0))
91 checks.tcp.append(
HostPort(
'localhost', port))
93 port = int(server.get(
'listener', {}).get(
'port', 0))
95 checks.tcp.append(
HostPort(
'localhost', port))
97 port = int(components.get(
'grpc-server', {}).get(
'port', 0))
99 checks.tcp.append(
HostPort(
'localhost', port))