userver: samples/testsuite-support/tests/test_testpoint.py
Loading...
Searching...
No Matches
samples/testsuite-support/tests/test_testpoint.py
1import pytest
3
4
5# /// [Testpoint - fixture]
6async def test_basic(service_client, testpoint):
7 @testpoint('simple-testpoint')
8 def simple_testpoint(data):
9 assert data == {'payload': 'Hello, world!'}
10
11 response = await service_client.get('/testpoint')
12 assert response.status == 200
13 assert simple_testpoint.times_called == 1
14 # /// [Testpoint - fixture]
15
16
17# /// [Sample TESTPOINT_CALLBACK usage python]
18async def test_injection(service_client, testpoint):
19 @testpoint('injection-point')
20 def injection_point(data):
21 return {'value': 'injected'}
22
23 response = await service_client.get('/testpoint')
24 assert response.status == 200
25 assert response.json() == {'value': 'injected'}
26
27 # testpoint supports callqueue interface
28 assert injection_point.times_called == 1
29 # /// [Sample TESTPOINT_CALLBACK usage python]
30
31
32async def test_disabled_testpoint(service_client, testpoint):
33 response = await service_client.get('/testpoint')
34 assert response.status == 200
35
36 @testpoint('injection-point')
37 def injection_point(data):
38 return {'value': 'injected'}
39
40 # /// [Unregistered testpoint usage]
41 with pytest.raises(
43 ):
44 assert injection_point.times_called == 0
45 # /// [Unregistered testpoint usage]
46
47 await service_client.update_server_state()
48 assert injection_point.times_called == 0
49
50
51async def test_manual_testpoint_registration(service_client, testpoint):
52 # /// [Manual registration]
53 @testpoint('injection-point')
54 def injection_point(data):
55 return {'value': 'injected'}
56
57 await service_client.update_server_state()
58 assert injection_point.times_called == 0
59 # /// [Manual registration]
60
61 response = await service_client.get('/testpoint')
62 assert response.status == 200
63
64 assert injection_point.times_called == 1