userver: /home/antonyzhilin/arcadia/taxi/uservices/userver/testsuite/pytest_plugins/pytest_userver/plugins/ydb/client.py Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
client.py
1import ydb as ydb_native_client
2
3
5 """YDB Client implementation."""
6
7 def __init__(self, endpoint, database):
8 self._driver = self._init_driver(endpoint, database)
9 self._database = database
10 self._session = self._driver.table_client.session().create()
11
12 def execute(self, query):
13 return self._session.transaction().execute(query, commit_tx=True)
14
15 @property
16 def session(self):
17 return self._session
18
19 @property
20 def database(self):
21 return self._database
22
23 @staticmethod
24 def _init_driver(endpoint, database):
25 config = ydb_native_client.DriverConfig(
26 endpoint=endpoint,
27 database=database,
28 auth_token='',
29 )
30 driver = ydb_native_client.Driver(config)
31 driver.wait(timeout=30)
32 return driver
33
34
35def _prepare_column(column, version=None):
36 column_type = None
37 if version is None or version == 1:
38 column_type = ydb_native_client.OptionalType(
39 getattr(ydb_native_client.PrimitiveType, column['type']),
40 )
41 elif column['type'][-1] == '?':
42 column_type = ydb_native_client.OptionalType(
43 getattr(ydb_native_client.PrimitiveType, column['type'][:-1]),
44 )
45 else:
46 column_type = getattr(ydb_native_client.PrimitiveType, column['type'])
47
48 return ydb_native_client.Column(column['name'], column_type)
49
50
51def _prepare_index(index):
52 return ydb_native_client.TableIndex(index['name']).with_index_columns(
53 *tuple(index['index_columns']),
54 )
55
56
57def create_table(client, schema):
58 version = schema.get('syntax_version', None)
59 client.session.create_table(
60 '/{}/{}'.format(client.database, schema['path']),
61 ydb_native_client.TableDescription()
62 .with_primary_keys(*schema['primary_key'])
63 .with_columns(*[_prepare_column(column, version) for column in schema['schema']])
64 .with_indexes(*[_prepare_index(index) for index in schema.get('indexes', [])]),
65 )
66
67
68def drop_table(client, table):
69 try:
70 client.session.drop_table('/{}/{}'.format(client.database, table))
71 except: # noqa pylint: disable=bare-except
72 pass