userver: /data/code/service_template/third_party/userver/testsuite/pytest_plugins/pytest_userver/utils/net.py Source File
Loading...
Searching...
No Matches
net.py
1# pylint: disable=no-member
2import asyncio
3import dataclasses
4import logging
5import typing
6
7# @cond
8
9
10logger = logging.getLogger(__name__)
11
12
13# @endcond
14
15
@dataclasses.dataclass(frozen=True)
class HostPort:
    """
    Class that holds a host and port.

    Instances are immutable (frozen dataclass), so they compare by value
    and can be used as dictionary keys or set members.

    @ingroup userver_testsuite
    """

    # Host name or address of the endpoint.
    host: str
    # Port number of the endpoint.
    port: int
26
27
@dataclasses.dataclass(frozen=False)
class HealthChecks:
    """
    Class that holds all the info for health checks.

    The dataclass is mutable: entries are appended to `tcp` after
    construction.

    @ingroup userver_testsuite
    """

    # TCP endpoints to probe; every instance gets its own fresh list.
    tcp: typing.List[HostPort] = dataclasses.field(default_factory=list)
37
38
async def _check_tcp_port_availability(tcp: HostPort) -> bool:
    """
    Probes a single TCP endpoint by opening a connection to it and
    closing the connection right away.

    @param tcp endpoint to connect to
    @returns True if the connection was opened and closed cleanly,
    False if an OSError or asyncio.TimeoutError occurred
    """
    try:
        streams = await asyncio.open_connection(tcp.host, tcp.port)
        # Only the writer side is needed: it owns the transport to close.
        stream_writer = streams[1]
        stream_writer.close()
        await stream_writer.wait_closed()
    except (OSError, asyncio.TimeoutError) as exc:
        logger.debug('TCP %s:%s is not available: %s', tcp.host, tcp.port, exc)
        return False
    else:
        return True
48
49
async def check_availability(
    checks: HealthChecks, *, timeout: float = 25.0,
) -> bool:
    """
    Checks availability for each provided entry.

    Probes every entry of `checks.tcp` concurrently and waits at most
    `timeout` seconds for all of the probes to finish.

    @param checks health checks to run; `checks.tcp` must be non-empty
    @param timeout overall time budget in seconds shared by all probes
    @returns True if every probe finished in time and succeeded,
    False otherwise (including when the time budget ran out)

    @ingroup userver_testsuite
    """
    assert checks.tcp, 'no TCP health checks provided'

    probes = [
        asyncio.create_task(_check_tcp_port_availability(entry))
        for entry in checks.tcp
    ]
    done, pending = await asyncio.wait(
        probes, timeout=timeout, return_when=asyncio.ALL_COMPLETED,
    )

    if pending:
        for task in pending:
            task.cancel()
        # Reap the cancelled probes. Awaiting a cancelled task directly
        # re-raises CancelledError in the caller; gather() with
        # return_exceptions=True collects it as a result instead, so a
        # timeout yields False rather than an exception.
        await asyncio.gather(*pending, return_exceptions=True)
        return False

    return all(task.result() for task in done)
73
74
def _configured_port(section: typing.Optional[dict]) -> int:
    """Returns the port set in a config section, or 0 if there is none."""
    if not section:
        # The section is absent or present without a value in the YAML.
        return 0
    return int(section.get('port') or 0)


def get_health_checks_info(service_config_yaml: dict) -> HealthChecks:
    """
    Returns the health checks info for server.listener-monitor,
    server.listener and grpc-server.port.

    A TCP check on 'localhost' is added for every one of those sections
    that has a non-zero port. Sections that are missing, empty or have no
    port are skipped.

    @param service_config_yaml parsed service config; must contain
    `components_manager.components`
    @returns health checks with one TCP entry per configured port

    @see pytest_userver.plugins.base.service_non_http_health_checks()

    @ingroup userver_testsuite
    """
    checks = HealthChecks()

    components = service_config_yaml['components_manager']['components']
    server = components.get('server') or {}

    # The order matters to callers that read `checks.tcp` positionally:
    # monitor listener, main listener, then gRPC.
    sections = (
        server.get('listener-monitor'),
        server.get('listener'),
        components.get('grpc-server'),
    )
    for section in sections:
        port = _configured_port(section)
        if port:
            checks.tcp.append(HostPort('localhost', port))

    return checks