1import ydb
as ydb_native_client
5 """YDB Client implementation."""
7 def __init__(self, endpoint, database):
12 def execute(self, query):
13 return self.
_session.transaction().execute(query, commit_tx=
True)
16 def topic_client(self):
17 return self.
_driver.topic_client
28 def _init_driver(endpoint, database):
29 config = ydb_native_client.DriverConfig(
34 driver = ydb_native_client.Driver(config)
35 driver.wait(timeout=30)
39def _prepare_column(column, version=None):
41 if version
is None or version == 1:
42 column_type = ydb_native_client.OptionalType(
43 getattr(ydb_native_client.PrimitiveType, column[
'type']),
45 elif column[
'type'][-1] ==
'?':
46 column_type = ydb_native_client.OptionalType(
47 getattr(ydb_native_client.PrimitiveType, column[
'type'][:-1]),
50 column_type = getattr(ydb_native_client.PrimitiveType, column[
'type'])
52 return ydb_native_client.Column(column[
'name'], column_type)
55def _prepare_index(index):
56 return ydb_native_client.TableIndex(index[
'name']).with_index_columns(
57 *tuple(index[
'index_columns']),
61def create_table(client, schema):
62 version = schema.get(
'syntax_version',
None)
63 client.session.create_table(
64 '/{}/{}'.format(client.database, schema[
'path']),
65 ydb_native_client.TableDescription()
66 .with_primary_keys(*schema[
'primary_key'])
67 .with_columns(*[_prepare_column(column, version)
for column
in schema[
'schema']])
68 .with_indexes(*[_prepare_index(index)
for index
in schema.get(
'indexes', [])]),
72def drop_table(client, table):
74 client.session.drop_table(
'/{}/{}'.format(client.database, table))