dbxio Tutorial

Prerequisites

  1. Log in to Azure using the az CLI (installation)
az login --scope 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default --use-device-code

This will prompt you with a link and a code to enter in any external browser.

What is 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default?

This is the default scope for Azure Databricks. It is used to access the Databricks API. You don't need to change it.

Docs
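
For illustration, this is roughly what a credential provider does with that scope under the hood (a minimal sketch using azure-identity; dbxio normally handles this for you):

from azure.identity import AzureCliCredential

# Fixed Azure Databricks application ID plus '/.default'
DATABRICKS_SCOPE = '2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default'

# Requires a prior `az login`; returns a short-lived bearer token for the Databricks API
token = AzureCliCredential().get_token(DATABRICKS_SCOPE).token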

  2. Find your HTTP path and server hostname in the Databricks workspace.

    They can be found on your target cluster's page, under Advanced Options --> JDBC/ODBC. Illustrative example values are shown below.
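
The values typically look roughly like this (illustrative placeholders only; always copy the real values from the JDBC/ODBC section):

# Illustrative placeholders, not real endpoints
SERVER_HOSTNAME = 'adb-1234567890123456.7.azuredatabricks.net'
HTTP_PATH = '/sql/1.0/warehouses/abcdef0123456789'  # SQL warehouse path; all-purpose clusters use a different prefix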

Create dbxio client

There are several ways to create a dbxio client:

  • presetting http_path and server_hostname. In this case, the access token will be obtained from the environment variable DATABRICKS_ACCESS_TOKEN (if available) or from a credential provider.
from dbxio import DbxIOClient, ClusterType

client = DbxIOClient.from_cluster_settings(
    cluster_type=ClusterType.SQL_WAREHOUSE,
    http_path='<YOUR_HTTP_PATH>',
    server_hostname='<YOUR_SERVER_HOSTNAME>',
)
  • using the default clients for SQL warehouses and all-purpose clusters. This can be useful if the HTTP path and server hostname are stored in environment variables.
import os
from dbxio import DefaultSqlDbxIOClient, DefaultDbxIOClient

os.environ['DATABRICKS_HTTP_PATH'] = '<YOUR_HTTP_PATH>'
os.environ['DATABRICKS_SERVER_HOSTNAME'] = '<YOUR_SERVER_HOSTNAME>'

sql_client = DefaultSqlDbxIOClient()

all_purpose_client = DefaultDbxIOClient()
  • using a PAT (personal access token)
from dbxio import DbxIOClient, BareAuthProvider, ClusterType

client = DbxIOClient.from_auth_provider(
    auth_provider=BareAuthProvider(
        access_token='dapixxxxxx-xxxxx-xxxxxx-x',
        http_path='<YOUR_HTTP_PATH>',
        server_hostname='<YOUR_SERVER_HOSTNAME>',
        cluster_type=ClusterType.SQL_WAREHOUSE,
    )
)

Specify cloud provider

dbxio supports (or will support in the future; see available cloud providers) the cloud providers that Databricks supports.

To specify the cloud provider, pass settings to the client:

import dbxio

client = dbxio.DbxIOClient.from_cluster_settings(
    cluster_type=dbxio.ClusterType.SQL_WAREHOUSE,
    http_path='<YOUR_HTTP_PATH>',
    server_hostname='<YOUR_SERVER_HOSTNAME>',
    settings=dbxio.Settings(cloud_provider=dbxio.CloudProvider.AZURE),
)

Explicitly specify credential provider

The credential provider is resolved automatically based on the cloud provider, but you can specify it explicitly when creating a client.

import dbxio
from azure.identity import AzureCliCredential

client = dbxio.DbxIOClient.from_cluster_settings(
    # ...,
    az_cred_provider=AzureCliCredential(),
    # ...,
)

Customize retries settings

Depending on your setup, it can be vital to retry on certain exceptions. You can customize the retry behavior by passing a retry config into the settings object.

import dbxio

settings = dbxio.Settings(
    cloud_provider=dbxio.CloudProvider.AZURE,
    retry_config=dbxio.RetryConfig(
        max_attempts=20,
        exponential_backoff_multiplier=1.5,
        extra_exceptions_to_retry=(MyCustomException,),
    ),
)

client = dbxio.DbxIOClient.from_cluster_settings(
    # ...,
    settings=settings,
    # ...,
)

Basic read/write table operations

Note

In general, it's recommended to use SQL Warehouses for all operations. Using all-purpose clusters for fetching data can be extremely slow. Carefully consider the cluster type for your operations.

Read table

import pandas as pd
from dbxio import read_table

# read all records from table and convert to pandas DataFrame
table = pd.DataFrame(read_table('catalog.schema.table', client=...))

# read only 10 records
table10 = list(read_table('catalog.schema.table', client=..., limit_records=10))

# read record by record
with read_table('catalog.schema.table', client=...) as gen:
    for record in gen:
        ...  # do something with record

Write table

There are two ways to write data to a table: using SQL and using bulk operation.

  • SQL: this approach builds one SQL query for all records and performs an INSERT INTO operation. It's slow and not recommended for large amounts of data; there is a strict limit of 50 MB of data per write.

  • Bulk operation: this approach writes data to object storage in Parquet format and then performs a COPY INTO operation. It's faster and recommended for large amounts of data. The object storage must be mounted to the Databricks workspace so the data can be accessed.

import dbxio

data = [
    {'col1': 1, 'col2': 'a', 'col3': [1, 2, 3]},
    {'col1': 2, 'col2': 'b', 'col3': [4, 5, 6]},
]
schema = dbxio.TableSchema.from_obj(
    [
        {'name': 'col1', 'type': dbxio.types.IntType()},
        {'name': 'col2', 'type': dbxio.types.StringType()},
        {'name': 'col3', 'type': dbxio.types.ArrayType(dbxio.types.IntType())},
    ]
)

# write data to table using sql (slow and not recommended for big amounts of data)
dbxio.write_table(
    dbxio.Table('catalog.schema.table', schema=schema),
    data,
    client=...,
    append=True,
)

# write data to table using bulk operation
dbxio.bulk_write_table(
    dbxio.Table('catalog.schema.table', schema=schema),
    data,
    client=...,
    abs_name='blob_storage_name',
    abs_container_name='container_name',
    append=True,
)

Run SQL query and fetch results

import pandas as pd

import dbxio

client = dbxio.DbxIOClient.from_cluster_settings(
    cluster_type=dbxio.ClusterType.SQL_WAREHOUSE,
    http_path='<YOUR_HTTP_PATH>',
    server_hostname='<YOUR_SERVER_HOSTNAME>',
)

# fetch all results
data = list(client.sql('select 1+1'))

# fetch results and convert to pandas DataFrame
df: pd.DataFrame = client.sql('select 1+1').df()

# or you can use a generator
with client.sql('select 1+1') as gen:
    for record in gen:
        ...  # do something with record

Save results to files

import dbxio

client = dbxio.DbxIOClient.from_cluster_settings(
    cluster_type=dbxio.ClusterType.SQL_WAREHOUSE,
    http_path='<YOUR_HTTP_PATH>',
    server_hostname='<YOUR_SERVER_HOSTNAME>',
)
path_to_chunks = client.sql_to_files(
    query='select 1+1',
    results_path='path/to/save/files',
    max_concurrency=8,
)

Save SQL results to files

dbxio can save the result of an arbitrary SQL query to files in Parquet format. The result is chunked into several files. The returned path contains a directory named <statement_id>, where all files are saved (statement_id is a unique identifier of the query). A sketch of reading the chunks back is shown after the note below.

Note

As usual, it's recommended to use SQL warehouse.
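
The chunks are ordinary Parquet files, so they can be loaded back with pandas. A minimal sketch, assuming the chunk files carry a .parquet suffix:

import glob
import pandas as pd

# path_to_chunks comes from the sql_to_files call above and contains the <statement_id> directory
chunk_files = sorted(glob.glob(f'{path_to_chunks}/**/*.parquet', recursive=True))
df = pd.concat((pd.read_parquet(f) for f in chunk_files), ignore_index=True)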

Using all-purpose clusters

All-purpose clusters use the ODBC protocol to fetch query results, which can be extremely slow even for small tables. This path is not deprecated, but it's highly recommended to use SQL warehouses instead.

import dbxio

client = dbxio.DbxIOClient.from_cluster_settings(
    cluster_type=dbxio.ClusterType.SQL_WAREHOUSE,
    http_path='<YOUR_HTTP_PATH>',
    server_hostname='<YOUR_SERVER_HOSTNAME>',
)

path_to_files = dbxio.save_table_to_files(
    table='catalog.schema.table',
    client=client,
    results_path='path/to/save/files',
    max_concurrency=8,
)

# or save a result of an arbitrary SQL query
QUERY = 'select * from domain.schema.table where 1=1 and 2=2'
path_to_files = client.sql_to_files(
    query=QUERY,
    results_path='path/to/save/files',
    max_concurrency=8,
)

Upload large files to Databricks table

Supported formats: CSV, JSON, AVRO, ORC, PARQUET, TEXT, BINARYFILE.

Warning

dbxio does not make any transformations to the data. It is the user's responsibility to ensure that the data is in the correct format and schema.

import logging
import dbxio
import pandas as pd

logging.basicConfig(level=logging.INFO)

# This can also be a path to a directory; in that case all files matching the glob **/*.<format> will be uploaded (they must share the same schema)
LARGE_FILE_TO_UPLOAD = 'path/to/large/file.csv'  # 1GB+

client = dbxio.DbxIOClient.from_cluster_settings(
    cluster_type=dbxio.ClusterType.SQL_WAREHOUSE,
    http_path='<YOUR_HTTP_PATH>',
    server_hostname='<YOUR_SERVER_HOSTNAME>',
)
# infer the schema from the first row only; no need to load the whole file into memory
schema = dbxio.infer_schema(pd.read_csv(LARGE_FILE_TO_UPLOAD, nrows=1).iloc[0].to_dict())
table_format = dbxio.TableFormat.CSV
table = dbxio.Table('catalog.schema.table', schema=schema, table_format=table_format)

dbxio.bulk_write_local_files(
    table=table,
    path=LARGE_FILE_TO_UPLOAD,
    table_format=table_format,
    client=client,
    append=False,
    abs_name='blob_storage_name',
    abs_container_name='container_name',
    max_concurrency=8,
)
Data consistency

Under the hood, dbxio copies all files to ABS (Azure Blob Storage), tracks the success of each upload, and writes log files so the operation can be resumed without data loss or re-uploading.

After the upload is finished, dbxio runs a COPY INTO command to load the data into the table.

If something went wrong

dbxio uses a blob lease to ensure that only one process can write to the blob at a time. If another process tries to write to the blob, a LeaseAlreadyPresentError will be raised.

However, a lease can sometimes be left unreleased. To break all leases, pass force=True to the bulk_write_local_files function.
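
For example, the upload from the previous section can be re-run with force=True to break stale leases (same placeholder arguments as above):

# Same call as before, with force=True added to break any leftover blob leases
dbxio.bulk_write_local_files(
    table=table,
    path=LARGE_FILE_TO_UPLOAD,
    table_format=table_format,
    client=client,
    append=False,
    abs_name='blob_storage_name',
    abs_container_name='container_name',
    max_concurrency=8,
    force=True,
)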

Volume operations

There are two types of Volumes in Databricks: managed and external. You can read more about them in the Databricks documentation.

dbxio fully supports both types.

Data in external Volumes is accessed using your cloud provider's SDK. For managed Volumes, dbxio uses the Databricks Files API.

Note

The Databricks API allows downloading/uploading files of up to 5 GB in managed Volumes. If you need to transfer bigger files, consider using an external Volume or splitting the file into smaller parts.
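
If you do need to move a single oversized file through a managed Volume, one workaround is to split it locally and upload the parts. A hypothetical sketch in plain Python (split_file is not a dbxio API):

import os

PART_SIZE = 5 * 1024**3      # stay within the 5 GB Files API limit
BUFFER_SIZE = 64 * 1024**2   # stream in 64 MB buffers to keep memory usage bounded

def split_file(path, out_dir, part_size=PART_SIZE, buffer_size=BUFFER_SIZE):
    """Split a large file into numbered parts small enough for a managed Volume."""
    os.makedirs(out_dir, exist_ok=True)
    parts = []
    with open(path, 'rb') as src:
        index = 0
        while True:
            part_path = os.path.join(out_dir, f'{os.path.basename(path)}.part{index:04d}')
            written = 0
            with open(part_path, 'wb') as dst:
                while written < part_size:
                    chunk = src.read(min(buffer_size, part_size - written))
                    if not chunk:
                        break
                    dst.write(chunk)
                    written += len(chunk)
            if written == 0:
                os.remove(part_path)  # source exhausted, drop the empty part
                break
            parts.append(part_path)
            index += 1
    return parts

# The resulting directory can then be uploaded with dbxio.write_volume (see below).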

Upload non-tabular data to a Volume

To work with external Volumes in Databricks, you need to make sure that your target catalog has associated external storage.

Associated external storage means:

  • an external location created in the Databricks workspace
  • the desired container name stored in the catalog's properties under the key default_external_location

Note

dbxio creates Volumes automatically on write operations. If you want to disable this behavior, pass create_volume_if_not_exists=False to the write_volume function.

# dbxio will upload all found files in the directory (except "hidden" files)
PATH_TO_FILES = 'path/to/files'
dbxio.write_volume(
    path=PATH_TO_FILES,
    catalog_name='catalog_name',
    schema_name='schema_name',
    volume_name='volume_name',
    client=...,
    volume_type=dbxio.VolumeType.MANAGED,  # or EXTERNAL
    max_concurrency=8,
)

Upload a single file (or files by prefix path) to an existing Volume

If you want to add or modify files in an existing Volume, use the volume_path parameter to specify the destination path inside the Volume.

The code below adds or modifies the file file.txt at the path path/in/volume/file.txt in the Volume. If the Volume does not exist, it will be created.

dbxio.write_volume(
    path='path/to/file.txt',
    catalog_name='catalog_name',
    schema_name='schema_name',
    volume_name='volume_name',
    client=...,
    volume_path='path/in/volume',
)

Download from Volume

dbxio.download_volume(
    path='local/path/to/download',
    catalog_name='catalog_name',
    schema_name='schema_name',
    volume_name='volume_name',
    client=...,
)

Download files by prefix path from Volume

It's also possible to download files by prefix path from a Volume.

The code below downloads all files under the path path/in/volume in the Volume to the local directory local/path/to/download.

dbxio.download_volume(
    path='local/path/to/download',
    catalog_name='catalog_name',
    schema_name='schema_name',
    volume_name='volume_name',
    client=...,
    volume_path='path/in/volume',
)

Delete Volume

dbxio can delete Volumes in Databricks. If your Volume is external, dbxio first deletes all files in object storage and then deletes the Volume's metadata.

# first we need to create a Volume object (it will fetch all required information from Databricks)
volume = dbxio.Volume.from_url('/Volumes/<catalog>/<schema>/<volume_name>', client=...)

dbxio.delete_volume(volume, client=...)

Further docs