userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/ydb/client.py Source File
Loading...
Searching...
No Matches
client.py
1import ydb as ydb_native_client
2
3
5 """YDB Client implementation."""
6
7 def __init__(self, endpoint, database):
8 self._driver = self._init_driver(endpoint, database)
9 self._database = database
10 self._session = self._driver.table_client.session().create()
11
12 def execute(self, query):
13 return self._session.transaction().execute(query, commit_tx=True)
14
15 @property
16 def topic_client(self):
17 return self._driver.topic_client
18
19 @property
20 def session(self):
21 return self._session
22
23 @property
24 def database(self):
25 return self._database
26
27 @staticmethod
28 def _init_driver(endpoint, database):
29 config = ydb_native_client.DriverConfig(
30 endpoint=endpoint,
31 database=database,
32 auth_token='',
33 )
34 driver = ydb_native_client.Driver(config)
35 driver.wait(timeout=30)
36 return driver
37
38
39def _prepare_column(column, version=None):
40 column_type = None
41 if version is None or version == 1:
42 column_type = ydb_native_client.OptionalType(
43 getattr(ydb_native_client.PrimitiveType, column['type']),
44 )
45 elif column['type'][-1] == '?':
46 column_type = ydb_native_client.OptionalType(
47 getattr(ydb_native_client.PrimitiveType, column['type'][:-1]),
48 )
49 else:
50 column_type = getattr(ydb_native_client.PrimitiveType, column['type'])
51
52 return ydb_native_client.Column(column['name'], column_type)
53
54
55def _prepare_index(index):
56 return ydb_native_client.TableIndex(index['name']).with_index_columns(
57 *tuple(index['index_columns']),
58 )
59
60
61def create_table(client, schema):
62 version = schema.get('syntax_version', None)
63 client.session.create_table(
64 '/{}/{}'.format(client.database, schema['path']),
65 ydb_native_client.TableDescription()
66 .with_primary_keys(*schema['primary_key'])
67 .with_columns(*[_prepare_column(column, version) for column in schema['schema']])
68 .with_indexes(*[_prepare_index(index) for index in schema.get('indexes', [])]),
69 )
70
71
72def drop_table(client, table):
73 try:
74 client.session.drop_table('/{}/{}'.format(client.database, table))
75 except: # noqa pylint: disable=bare-except
76 pass