Skip to content

Commit 6f34f4d

Browse files
authored
Add read_files_as_table and make bulk_upload_files atomic (#20)
1 parent 3347240 commit 6f34f4d

File tree

9 files changed

+801
-611
lines changed

9 files changed

+801
-611
lines changed

dbxio/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
from dbxio.utils import * # noqa: F403
55
from dbxio.volume import * # noqa: F403
66

7-
__version__ = '0.4.6' # single source of truth
7+
__version__ = '0.5.0' # single source of truth

dbxio/core/cloud/azure/object_storage.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def download_blob_to_file(self, blob_name: str, file_path: Union[str, Path]) ->
7070

7171
def break_lease(self, blob_name: str) -> None:
    """Break any active lease currently held on *blob_name* in this container.

    Uses a throwaway ``BlobLeaseClient`` bound to the blob's client; breaking
    a lease frees the blob for subsequent writes/deletes.
    """
    client = self.blob_service_client.get_blob_client(
        container=self.container_name,
        blob=blob_name,
    )
    BlobLeaseClient(client=client).break_lease()
7474

7575
def lock_blob(self, blob_name: str, force: bool = False):
7676
blob_client = self.blob_service_client.get_blob_client(container=self.container_name, blob=blob_name)
@@ -90,7 +90,7 @@ def upload_blob(self, blob_name: str, data: Union[bytes, IOBase, BinaryIO], over
9090

9191
def try_delete_blob(self, blob_name: str) -> None:
9292
blob_client = self.blob_service_client.get_blob_client(container=self.container_name, blob=blob_name)
93-
lease_client = BlobLeaseClient(client=blob_client) # type: ignore
93+
lease_client = BlobLeaseClient(client=blob_client)
9494

9595
try:
9696
lease_client.break_lease()

dbxio/delta/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
get_comment_on_table,
1010
get_tags_on_table,
1111
merge_table,
12+
read_files_as_table,
1213
read_table,
1314
save_table_to_files,
1415
set_comment_on_table,
@@ -33,6 +34,7 @@
3334
'exists_table',
3435
'infer_schema',
3536
'merge_table',
37+
'read_files_as_table',
3638
'read_table',
3739
'write_table',
3840
'save_table_to_files',

dbxio/delta/table.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class Table:
8484
default=Materialization.Table,
8585
validator=attrs.validators.instance_of(Materialization),
8686
)
87-
schema: Optional[Union[dict[str, BaseType], list[dict[str, BaseType]], TableSchema]] = attrs.field(
87+
schema: Optional[TableSchema] = attrs.field(
8888
default=None,
8989
converter=_table_schema_converter,
9090
)

dbxio/delta/table_commands.py

Lines changed: 111 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -37,24 +37,39 @@ def exists_table(table: Union[str, Table], client: 'DbxIOClient') -> bool:
3737
return False
3838

3939

40-
def create_table(table: Union[str, Table], client: 'DbxIOClient') -> _FutureBaseResult:
41-
"""
42-
Creates a table in the catalog.
43-
If a table already exists, it does nothing.
44-
Query pattern:
45-
CREATE TABLE IF NOT EXISTS <table_identifier> (col1 type1, col2 type2, ...)
46-
[USING <table_format> LOCATION <location>]
47-
[PARTITIONED BY (col1, col2, ...)]
48-
"""
40+
def _create_table_query(table: Union[str, Table], replace: bool, if_not_exists: bool, include_schema: bool) -> str:
    """Build a CREATE TABLE statement for *table*.

    Query pattern:
        CREATE [OR REPLACE] TABLE [IF NOT EXISTS] <identifier> [(col type, ...)]
        [USING <format> LOCATION '<loc>'] [PARTITIONED BY (col, ...)]

    Args:
        table: table name or a ``Table`` object (normalized via ``Table.from_obj``).
        replace: emit ``CREATE OR REPLACE TABLE``.
        if_not_exists: emit ``IF NOT EXISTS``. Ignored when *replace* is true,
            because Databricks SQL rejects ``OR REPLACE`` combined with
            ``IF NOT EXISTS`` — the two clauses are mutually exclusive.
        include_schema: inline the column definitions when the table has a schema.

    Returns:
        The CREATE TABLE SQL string (no trailing whitespace).
    """
    dbxio_table = Table.from_obj(table)

    query = 'CREATE OR REPLACE TABLE' if replace else 'CREATE TABLE'
    if if_not_exists and not replace:
        query += ' IF NOT EXISTS'
    query += f' {dbxio_table.safe_table_identifier}'
    if include_schema and dbxio_table.schema:
        query += f' ({dbxio_table.schema.as_sql()})'
    if loc := dbxio_table.attributes.location:
        query += f" USING {dbxio_table.table_format.name} LOCATION '{loc}'"
    if part := dbxio_table.attributes.partitioned_by:
        query += f" PARTITIONED BY ({','.join(part)})"

    return query
60+
61+
62+
def create_table(table: Union[str, Table], client: 'DbxIOClient', replace: bool = False) -> _FutureBaseResult:
    """
    Creates a table in the catalog.
    With replace=False an existing table is left untouched; with replace=True
    the table is created or replaced.
    Query pattern:
        CREATE [OR REPLACE] TABLE [IF NOT EXISTS] <table_identifier> (col1 type1, col2 type2, ...)
        [USING <table_format> LOCATION <location>]
        [PARTITIONED BY (col1, col2, ...)]
    """
    return client.sql(
        _create_table_query(table, replace, if_not_exists=True, include_schema=True)
    )
5974

6075

@@ -200,6 +215,50 @@ def copy_into_table(
200215
client.sql(sql_copy_into_query).wait()
201216

202217

218+
def read_files_as_table(
    client: 'DbxIOClient',
    table: Table,
    blob_path: str,
    table_format: TableFormat,
    abs_name: str,
    abs_container_name: str,
    include_files_pattern: bool = False,
    replace: bool = False,
    force_schema: bool = True,
) -> None:
    """Create *table* directly from files in Azure blob storage via ``read_files``.

    Issues ``CREATE [OR REPLACE] TABLE ... AS SELECT ... FROM read_files(...)``
    and blocks until the statement completes.

    Args:
        client: dbxio client used to run the SQL statement.
        table: target table; its schema (if any) drives column selection.
        blob_path: path under the container holding the source files.
        table_format: format of the source files (lowercased for read_files).
        abs_name: Azure storage account name (used in the abfss:// URL).
        abs_container_name: Azure blob container name.
        include_files_pattern: restrict to files matching ``*.<format>``.
        replace: create-or-replace the table instead of plain create.
        force_schema: if True pass the schema as the ``schema`` option
            (enforced); if False pass it as ``schemaHints`` (inference guide).

    NOTE(review): the original wrapped the SELECT text in ``textwrap.dedent``,
    but the interpolated options lines carry no leading whitespace, so the
    common prefix was empty and dedent was a no-op. The SQL is now assembled
    without indentation in the first place; the statement is semantically
    identical.
    """
    create_query = _create_table_query(table, replace, if_not_exists=False, include_schema=False)

    fmt = table_format.value.lower()
    options = {'format': f"'{fmt}'"}
    if include_files_pattern:
        options['fileNamePattern'] = f"'*.{fmt}'"
    if table.schema:
        sql_schema = f"'{table.schema.as_sql()}'"
        columns_exp = ', '.join(table.schema.columns)
        # 'schema' enforces the declared types; 'schemaHints' only guides inference.
        options['schema' if force_schema else 'schemaHints'] = sql_schema
    else:
        # No declared schema: take everything and let read_files merge schemas.
        columns_exp = '*'
        options['mergeSchema'] = 'true'

    location = f'abfss://{abs_container_name}@{abs_name}.dfs.core.windows.net/{blob_path}'
    options_query = ',\n'.join(f'{k} => {v}' for k, v in options.items())
    select_query = (
        f'AS SELECT {columns_exp}\n'
        f'FROM read_files(\n'
        f"'{location}',\n"
        f'{options_query}\n'
        f')'
    )
    query = ConstDatabricksQuery(f'{create_query} {select_query}')
    client.sql(query).wait()
260+
261+
203262
def bulk_write_table(
204263
table: Union[str, Table],
205264
new_records: Union[Iterator[Dict], List[Dict]],
@@ -241,17 +300,24 @@ def bulk_write_table(
241300
retrying=client.retrying,
242301
) as tmp_path:
243302
if not append:
244-
drop_table(dbxio_table, client=client, force=True).wait()
245-
create_table(dbxio_table, client=client).wait()
246-
247-
copy_into_table(
248-
client=client,
249-
table=dbxio_table,
250-
table_format=TableFormat.PARQUET,
251-
blob_path=tmp_path,
252-
abs_name=abs_name,
253-
abs_container_name=abs_container_name,
254-
)
303+
read_files_as_table(
304+
client=client,
305+
table=dbxio_table,
306+
table_format=TableFormat.PARQUET,
307+
blob_path=tmp_path,
308+
abs_name=abs_name,
309+
abs_container_name=abs_container_name,
310+
replace=True,
311+
)
312+
else:
313+
copy_into_table(
314+
client=client,
315+
table=dbxio_table,
316+
table_format=TableFormat.PARQUET,
317+
blob_path=tmp_path,
318+
abs_name=abs_name,
319+
abs_container_name=abs_container_name,
320+
)
255321

256322

257323
def bulk_write_local_files(
@@ -270,7 +336,7 @@ def bulk_write_local_files(
270336
"""
271337
assert table.schema, 'Table schema is required for bulk_write_local_files function'
272338

273-
p = Path(path)
339+
p = Path(path).expanduser()
274340
files = p.glob(f'*.{table_format.value.lower()}') if p.is_dir() else [path]
275341

276342
operation_uuid = str(uuid.uuid4())
@@ -294,21 +360,30 @@ def bulk_write_local_files(
294360
force=force,
295361
)
296362

297-
if not append:
298-
drop_table(table, client=client, force=True).wait()
299-
create_table(table, client=client).wait()
300-
301363
common_blob_path = str(os.path.commonpath(blobs))
302364
include_files_pattern = len(blobs) > 1
303-
copy_into_table(
304-
client=client,
305-
table=table,
306-
table_format=table_format,
307-
blob_path=common_blob_path,
308-
include_files_pattern=include_files_pattern,
309-
abs_name=abs_name,
310-
abs_container_name=abs_container_name,
311-
)
365+
366+
if not append:
367+
read_files_as_table(
368+
client=client,
369+
table=table,
370+
table_format=table_format,
371+
blob_path=common_blob_path,
372+
include_files_pattern=include_files_pattern,
373+
abs_name=abs_name,
374+
abs_container_name=abs_container_name,
375+
replace=True,
376+
)
377+
else:
378+
copy_into_table(
379+
client=client,
380+
table=table,
381+
table_format=table_format,
382+
blob_path=common_blob_path,
383+
include_files_pattern=include_files_pattern,
384+
abs_name=abs_name,
385+
abs_container_name=abs_container_name,
386+
)
312387

313388

314389
def merge_table(

dbxio/delta/table_schema.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,11 @@ def columns(self):
7373
def as_dict(self) -> dict[str, BaseType]:
7474
return {col_spec.name: col_spec.type for col_spec in self._columns}
7575

76+
def as_sql(self) -> str:
    """Render the schema as a SQL column-definition list.

    Returns a string like ``"`col1` INT, `col2` STRING"`` suitable for
    embedding in ``CREATE TABLE (...)`` or as a ``read_files`` schema option.

    NOTE(review): the original decorated this with ``@cache``, which keys the
    cache on ``self`` and therefore keeps every schema instance alive for the
    life of the process (ruff B019). The join over ``as_dict()`` is cheap, so
    the cache is dropped; the returned value is unchanged.
    """
    return ', '.join(f'`{name}` {type_}' for name, type_ in self.as_dict().items())
81+
7682
def apply(self, record: dict[str, Any]) -> dict[str, Any]:
7783
return {key: self.as_dict()[key].deserialize(val) for key, val in record.items()}

0 commit comments

Comments
 (0)