userver: samples/websocket_client/tests/test_websocket_client.py
Loading...
Searching...
No Matches
samples/websocket_client/tests/test_websocket_client.py
1import aiohttp.web
2
3
4async def test_websocket_client(service_client, mockserver):
5 """Test WebSocket client connecting to echo server"""
6
7 @mockserver.aiohttp_handler('/chat')
8 async def _mock_websocket(request):
9 ws = aiohttp.web.WebSocketResponse()
10 await ws.prepare(request)
11
12 async for msg in ws:
13 if msg.type == aiohttp.WSMsgType.TEXT:
14 await ws.send_str(msg.data)
15
16 return ws
17
18 response = await service_client.get(
19 '/ws-client',
20 params={
21 'url': mockserver.ws_url('/chat'),
22 'message': 'test message',
23 },
24 )
25 assert response.status == 200
26 assert response.text == 'test message'