userver: samples/testsuite-support/tests/test_testpoint.py
Loading...
Searching...
No Matches
samples/testsuite-support/tests/test_testpoint.py
1import pytest
2
4
5
6# /// [Testpoint - fixture]
async def test_basic(service_client, testpoint):
    """Check that GET /testpoint hits 'simple-testpoint' exactly once.

    The registered handler additionally verifies the payload it is given.
    """
    expected_payload = {'payload': 'Hello, world!'}

    @testpoint('simple-testpoint')
    def simple_testpoint(data):
        assert data == expected_payload

    resp = await service_client.get('/testpoint')
    assert resp.status == 200
    assert simple_testpoint.times_called == 1
    # /// [Testpoint - fixture]
16
17
18# /// [Sample TESTPOINT_CALLBACK usage python]
async def test_injection(service_client, testpoint):
    """Check that the 'injection-point' handler's value is returned by GET /testpoint.

    The handler returns a fixed dict and the test expects the response JSON
    to be equal to it.
    """

    @testpoint('injection-point')
    def injection_point(data):
        return {'value': 'injected'}

    resp = await service_client.get('/testpoint')
    assert resp.status == 200
    # Decode the body only after the status check, as the original did.
    body = resp.json()
    assert body == {'value': 'injected'}

    # testpoint supports callqueue interface
    assert injection_point.times_called == 1
    # /// [Sample TESTPOINT_CALLBACK usage python]
31
32
# Exercises a testpoint handler that is registered only AFTER the service has
# already been called once.
# NOTE(review): this listing is a scraped rendering; the leading numbers are
# listing line numbers fused into the code, not part of the program.
33async def test_disabled_testpoint(service_client, testpoint):
34 response = await service_client.get('/testpoint')
35 assert response.status == 200
36
# The handler is declared after the request above has completed.
37 @testpoint('injection-point')
38 def injection_point(data):
39 return {'value': 'injected'}
40
# Reading times_called at this point is expected to raise.
# NOTE(review): the argument of pytest.raises (listing line 43) is missing
# from this view - presumably the expected exception type; restore it from
# the upstream file before running.
41 # /// [Unregistered testpoint usage]
42 with pytest.raises(
44 ):
45 assert injection_point.times_called == 0
46 # /// [Unregistered testpoint usage]
47
# After update_server_state() the counter is read outside pytest.raises and is
# expected to be 0; no request is issued after the handler was registered.
# NOTE(review): presumably update_server_state() makes the service aware of
# the newly registered testpoint - confirm against the fixture docs.
48 await service_client.update_server_state()
49 assert injection_point.times_called == 0
50
51
async def test_manual_testpoint_registration(service_client, testpoint):
    """Register a testpoint handler, refresh server state, then call the service.

    The handler must report zero calls right after
    ``service_client.update_server_state()`` and exactly one call after a
    subsequent GET /testpoint.
    """
    # /// [Manual registration]
    @testpoint('injection-point')
    def injection_point(data):
        return {'value': 'injected'}

    await service_client.update_server_state()
    assert injection_point.times_called == 0
    # /// [Manual registration]

    resp = await service_client.get('/testpoint')
    assert resp.status == 200

    # The request above is the only one made since registration.
    assert injection_point.times_called == 1