import ydb as ydb_native_client
5 """YDB Client implementation."""
7 def __init__(self, endpoint, database):
def execute(self, query):
    """Run ``query`` in a new transaction on this client's session.

    The transaction is started from ``self._session`` and the query is
    executed with ``commit_tx=True``.

    Args:
        query: The query to pass to the transaction's ``execute`` call.

    Returns:
        Whatever the transaction's ``execute`` call returns.
    """
    transaction = self._session.transaction()
    return transaction.execute(query, commit_tx=True)
24 def _init_driver(endpoint, database):
25 config = ydb_native_client.DriverConfig(
30 driver = ydb_native_client.Driver(config)
31 driver.wait(timeout=30)
def _prepare_column(column, version=None):
    """Build a YDB column object from one schema column entry.

    Args:
        column: Mapping with a ``'name'`` key and a ``'type'`` key. The type
            is the name of an attribute of ``PrimitiveType``; when
            ``version`` is neither ``None`` nor ``1`` it may carry a trailing
            ``'?'``.
        version: Schema syntax version. For ``None`` or ``1`` every column is
            wrapped in ``OptionalType``. For any other value a column is
            wrapped in ``OptionalType`` only when its type name ends with
            ``'?'`` (the marker is stripped before the lookup).

    Returns:
        The ``Column`` built from the column name and the resolved type.

    Raises:
        KeyError: If ``column`` lacks ``'name'`` or ``'type'``.
        AttributeError: If the type name is not an attribute of
            ``PrimitiveType``.
    """
    primitives = ydb_native_client.PrimitiveType
    type_name = column['type']

    if version is None or version == 1:
        column_type = ydb_native_client.OptionalType(
            getattr(primitives, type_name)
        )
    elif type_name.endswith('?'):
        # endswith() is safe on an empty type name, unlike indexing [-1].
        column_type = ydb_native_client.OptionalType(
            getattr(primitives, type_name[:-1])
        )
    else:
        column_type = getattr(primitives, type_name)

    return ydb_native_client.Column(column['name'], column_type)
def _prepare_index(index):
    """Build a YDB table index object from one schema index entry.

    Args:
        index: Mapping with a ``'name'`` key and an ``'index_columns'`` key
            holding an iterable of the indexed column names.

    Returns:
        The result of ``TableIndex(name).with_index_columns(*columns)``.

    Raises:
        KeyError: If ``index`` lacks ``'name'`` or ``'index_columns'``.
    """
    table_index = ydb_native_client.TableIndex(index['name'])
    # Star-unpacking accepts any iterable, so no tuple() copy is needed.
    return table_index.with_index_columns(*index['index_columns'])
def create_table(client, schema):
    """Create a table described by ``schema`` through ``client.session``.

    The table path is ``'/<client.database>/<schema['path']>'``.

    Args:
        client: Object exposing ``database`` and a ``session`` that provides
            ``create_table(path, description)``.
        schema: Mapping with the keys ``'path'``, ``'primary_key'`` (iterable
            of key column names) and ``'schema'`` (iterable of column
            entries accepted by ``_prepare_column``). Optional keys:
            ``'syntax_version'`` (forwarded to ``_prepare_column``) and
            ``'indexes'`` (iterable of entries accepted by
            ``_prepare_index``; defaults to no indexes).

    Raises:
        KeyError: If a required schema key is missing.
    """
    version = schema.get('syntax_version')
    table_path = '/{}/{}'.format(client.database, schema['path'])

    columns = [_prepare_column(column, version) for column in schema['schema']]
    indexes = [_prepare_index(index) for index in schema.get('indexes', [])]

    description = (
        ydb_native_client.TableDescription()
        .with_primary_keys(*schema['primary_key'])
        .with_columns(*columns)
        .with_indexes(*indexes)
    )
    client.session.create_table(table_path, description)
def drop_table(client, table):
    """Drop a table through ``client.session``.

    The table path is ``'/<client.database>/<table>'``.

    Args:
        client: Object exposing ``database`` and a ``session`` that provides
            ``drop_table(path)``.
        table: Table path relative to the client's database.
    """
    table_path = '/{}/{}'.format(client.database, table)
    client.session.drop_table(table_path)