1import ydb
as ydb_native_client
5 def __init__(self, endpoint, database):
def execute(self, query):
    """Execute ``query`` in a fresh transaction taken from ``self._session``.

    ``commit_tx=True`` is forwarded to the transaction's ``execute`` call.

    Returns:
        Whatever the transaction's ``execute`` call returns.
    """
    transaction = self._session.transaction()
    return transaction.execute(query, commit_tx=True)
22 def _init_driver(endpoint, database):
23 config = ydb_native_client.DriverConfig(
28 driver = ydb_native_client.Driver(config)
29 driver.wait(timeout=30)
def _prepare_column(column, version=None):
    """Build a ``ydb_native_client.Column`` from a column description.

    Args:
        column: mapping with a ``'name'`` key and a ``'type'`` key; the type
            is looked up by name on ``ydb_native_client.PrimitiveType``.
        version: schema syntax version. With ``None`` or ``1`` every column
            is wrapped in ``OptionalType``. With any other value only types
            written with a trailing ``'?'`` are wrapped (the ``'?'`` is
            stripped before the lookup); all others are used as-is.

    Returns:
        The ``ydb_native_client.Column`` for ``column['name']``.
    """
    type_name = column['type']
    primitive_types = ydb_native_client.PrimitiveType

    if version is None or version == 1:
        # Legacy syntax: every column is optional.
        column_type = ydb_native_client.OptionalType(
            getattr(primitive_types, type_name),
        )
    elif type_name.endswith('?'):
        # Newer syntax: optionality is marked with a '?' suffix.
        column_type = ydb_native_client.OptionalType(
            getattr(primitive_types, type_name[:-1]),
        )
    else:
        column_type = getattr(primitive_types, type_name)

    return ydb_native_client.Column(column['name'], column_type)
def _prepare_index(index):
    """Build a ``ydb_native_client.TableIndex`` from an index description.

    Args:
        index: mapping with a ``'name'`` key and an ``'index_columns'`` key
            holding an iterable of column names.

    Returns:
        The result of ``TableIndex(name).with_index_columns(*columns)``.
    """
    table_index = ydb_native_client.TableIndex(index['name'])
    # Star-unpacking accepts any iterable, so no tuple() conversion is needed.
    return table_index.with_index_columns(*index['index_columns'])
def create_table(client, schema):
    """Create a table described by ``schema`` through ``client.session``.

    Args:
        client: object exposing ``session`` (with ``create_table``) and
            ``database`` attributes.
        schema: mapping with ``'path'``, ``'primary_key'`` and ``'schema'``
            (list of column descriptions) keys; ``'syntax_version'`` and
            ``'indexes'`` are optional.
    """
    version = schema.get('syntax_version')
    table_path = '/{}/{}'.format(client.database, schema['path'])

    columns = [
        _prepare_column(column, version) for column in schema['schema']
    ]
    indexes = [_prepare_index(index) for index in schema.get('indexes', [])]

    description = (
        ydb_native_client.TableDescription()
        .with_primary_keys(*schema['primary_key'])
        .with_columns(*columns)
        .with_indexes(*indexes)
    )
    client.session.create_table(table_path, description)
def drop_table(client, table):
    """Drop ``table`` from the client's database through ``client.session``.

    Args:
        client: object exposing ``session`` (with ``drop_table``) and
            ``database`` attributes.
        table: table path relative to the database root.
    """
    table_path = '/{}/{}'.format(client.database, table)
    client.session.drop_table(table_path)