1import ydb
as ydb_native_client
5 def __init__(self, endpoint, database):
def execute(self, query):
    """Run *query* in a transaction on this client's session.

    The transaction is opened via ``self._session.transaction()`` and the
    query is executed with ``commit_tx=True``, so the commit is requested
    in the same call.

    :param query: the query object/text passed straight to ``execute``.
    :return: whatever the transaction's ``execute`` call returns.
    """
    transaction = self._session.transaction()
    return transaction.execute(query, commit_tx=True)
def _init_driver(endpoint, database):
    """Create a YDB driver for *endpoint* / *database* and wait for it.

    The driver config is built with an empty ``auth_token``; the call then
    blocks in ``driver.wait(timeout=30)`` before handing the driver back.

    :param endpoint: endpoint value forwarded to ``DriverConfig``.
    :param database: database value forwarded to ``DriverConfig``.
    :return: the driver instance after ``wait`` has returned.
    """
    config = ydb_native_client.DriverConfig(
        endpoint=endpoint,
        database=database,
        auth_token='',
    )
    driver = ydb_native_client.Driver(config)
    driver.wait(timeout=30)
    # NOTE(review): the visible source stops after wait(); returning the
    # driver is assumed to be the intent (otherwise it is discarded) - confirm.
    return driver
def _prepare_column(column, version=None):
    """Build a ``Column`` description from a schema column mapping.

    *column* must provide the keys ``'name'`` and ``'type'``; ``'type'`` is
    looked up by name on ``PrimitiveType``.

    * ``version`` is ``None`` or ``1``: the type is always wrapped in
      ``OptionalType``.
    * any other ``version``: the type is wrapped in ``OptionalType`` only
      when its name ends with ``'?'`` (the marker is stripped before the
      lookup); otherwise the primitive type is used as is.

    :param column: mapping with ``'name'`` and ``'type'`` entries.
    :param version: schema syntax version selecting the rule above.
    :return: a ``Column`` built from the name and the resolved type.
    :raises KeyError: if ``'name'`` or ``'type'`` is missing.
    :raises AttributeError: if the type name is not on ``PrimitiveType``.
    """
    type_name = column['type']
    primitive_types = ydb_native_client.PrimitiveType

    if version is None or version == 1:
        column_type = ydb_native_client.OptionalType(
            getattr(primitive_types, type_name)
        )
    elif type_name.endswith('?'):
        # Trailing '?' marks the column as optional; drop it for the lookup.
        column_type = ydb_native_client.OptionalType(
            getattr(primitive_types, type_name[:-1])
        )
    else:
        column_type = getattr(primitive_types, type_name)

    return ydb_native_client.Column(column['name'], column_type)
def _prepare_index(index):
    """Build a ``TableIndex`` description from a schema index mapping.

    :param index: mapping with ``'name'`` (the index name) and
        ``'index_columns'`` (an iterable unpacked into
        ``with_index_columns``).
    :return: the result of ``TableIndex(name).with_index_columns(...)``.
    :raises KeyError: if ``'name'`` or ``'index_columns'`` is missing.
    """
    table_index = ydb_native_client.TableIndex(index['name'])
    return table_index.with_index_columns(*index['index_columns'])
def create_table(client, schema):
    """Create the table described by *schema* through ``client.session``.

    The table path is ``'/<client.database>/<schema["path"]>'``. Columns are
    converted with ``_prepare_column`` (honouring the optional
    ``'syntax_version'`` entry) and the optional ``'indexes'`` entries with
    ``_prepare_index``.

    :param client: object exposing ``session`` and ``database``.
    :param schema: mapping with ``'path'``, ``'primary_key'`` and
        ``'schema'`` (the column list); ``'syntax_version'`` and
        ``'indexes'`` are optional.
    :raises KeyError: if a required schema entry is missing.
    """
    version = schema.get('syntax_version')
    table_path = '/{}/{}'.format(client.database, schema['path'])

    # NOTE(review): the builder calls wrapping the column and index lists
    # were not visible in the source; with_columns / with_indexes are the
    # matching TableDescription builders - confirm.
    description = (
        ydb_native_client.TableDescription()
        .with_primary_keys(*schema['primary_key'])
        .with_columns(
            *[_prepare_column(column, version) for column in schema['schema']]
        )
        .with_indexes(
            *[_prepare_index(index) for index in schema.get('indexes', [])]
        )
    )
    client.session.create_table(table_path, description)
def drop_table(client, table):
    """Drop *table* from the client's database through ``client.session``.

    :param client: object exposing ``session`` and ``database``.
    :param table: table name; the dropped path is
        ``'/<client.database>/<table>'``.
    """
    table_path = '/{}/{}'.format(client.database, table)
    client.session.drop_table(table_path)