1
2import socket
3
4
async def test_basic(service_client, loop, tcp_service_port):
    """Check the greeting exchange and the reset on an unexpected message.

    The test sends ``b'hi'`` and expects ``b'hello'`` back, then sends
    ``b'whats up?'`` and expects the next read to fail with
    ``ConnectionResetError``.

    Args:
        service_client: Not used in the body; presumably requested so the
            service under test is running -- TODO confirm against the
            fixture definition.
        loop: Event loop providing ``sock_sendall`` / ``sock_recv``.
        tcp_service_port: Port on ``localhost`` to connect to.
    """
    # The context manager closes the socket even when an assertion fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect(('localhost', tcp_service_port))
        # The loop.sock_* coroutines are documented to require a
        # non-blocking socket; switch modes only after the blocking connect.
        sock.setblocking(False)

        await loop.sock_sendall(sock, b'hi')
        hello = await loop.sock_recv(sock, 5)
        assert hello == b'hello'

        await loop.sock_sendall(sock, b'whats up?')
        try:
            await loop.sock_recv(sock, 1)
        except ConnectionResetError:
            pass
        else:
            raise AssertionError(
                'expected ConnectionResetError after an unexpected message',
            )
19