userver: samples/testsuite-support/tests/test_testpoint.py
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
samples/testsuite-support/tests/test_testpoint.py
import pytest
3
4
# /// [Testpoint - fixture]
async def test_basic(service_client, testpoint):
    """Check the 'simple-testpoint' handler payload and call count.

    A handler is registered for 'simple-testpoint', then '/testpoint' is
    requested; the handler must have been called exactly once.
    """
    expected_data = {'payload': 'Hello, world!'}

    @testpoint('simple-testpoint')
    def simple_testpoint(data):
        # Validates the data passed to the handler when it is invoked.
        assert data == expected_data

    resp = await service_client.get('/testpoint')
    assert resp.status == 200
    assert simple_testpoint.times_called == 1
    # /// [Testpoint - fixture]
15
16
# /// [Sample TESTPOINT_CALLBACK usage python]
async def test_injection(service_client, testpoint):
    """Check that the handler's return value shows up in the response.

    The 'injection-point' handler returns a dict, and the JSON body of the
    '/testpoint' response is expected to equal that dict.
    """
    expected_body = {'value': 'injected'}

    @testpoint('injection-point')
    def injection_point(data):
        # Hand back a fresh copy on every call.
        return dict(expected_body)

    resp = await service_client.get('/testpoint')
    assert resp.status == 200
    assert resp.json() == expected_body

    # testpoint supports callqueue interface
    assert injection_point.times_called == 1
    # /// [Sample TESTPOINT_CALLBACK usage python]
30
31
async def test_disabled_testpoint(service_client, testpoint):
    """Check a testpoint handler that is registered after the request.

    '/testpoint' is requested first, and only then is the 'injection-point'
    handler declared. Reading ``times_called`` at that moment is expected to
    raise; after ``service_client.update_server_state()`` it reads as 0.
    """
    # NOTE(review): the expected exception type was missing from the
    # ``pytest.raises(...)`` call; it is restored here from the userver
    # testsuite plugin -- confirm the name against the installed version.
    # Imported locally so this block does not depend on module-level imports.
    from pytest_userver.plugins.testpoint import UnregisteredTestpointError

    response = await service_client.get('/testpoint')
    assert response.status == 200

    @testpoint('injection-point')
    def injection_point(data):
        return {'value': 'injected'}

    # /// [Unregistered testpoint usage]
    with pytest.raises(UnregisteredTestpointError):
        assert injection_point.times_called == 0
    # /// [Unregistered testpoint usage]

    # Once the server state is updated the counter can be read: the handler
    # was never invoked, since the only request preceded its declaration.
    await service_client.update_server_state()
    assert injection_point.times_called == 0
49
50
async def test_manual_testpoint_registration(service_client, testpoint):
    """Check a handler declared before an explicit server state update.

    After ``update_server_state()`` the handler has not been called yet; one
    '/testpoint' request afterwards brings the call count to 1.
    """
    # /// [Manual registration]
    @testpoint('injection-point')
    def injection_point(data):
        return {'value': 'injected'}

    await service_client.update_server_state()
    assert injection_point.times_called == 0
    # /// [Manual registration]

    resp = await service_client.get('/testpoint')
    assert resp.status == 200

    # Exactly one request was made, so exactly one call is expected.
    assert injection_point.times_called == 1