Skip to content
74 changes: 47 additions & 27 deletions sqlalchemy_bigquery/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,61 +833,81 @@ def _process_time_partitioning(
function returns:
"PARTITION BY TIMESTAMP_TRUNC(event_timestamp, DAY)".

Current inputs allowed by BQ and covered by this function include:
Current inputs allowed by BQ AND covered by this function include:
* _PARTITIONDATE
* DATETIME_TRUNC(<datetime_column>, DAY/HOUR/MONTH/YEAR)
* TIMESTAMP_TRUNC(<timestamp_column>, DAY/HOUR/MONTH/YEAR)
* DATE_TRUNC(<date_column>, MONTH/YEAR)

Additional options allowed by BQ but not explicitly covered by this
function include:
* DATE(_PARTITIONTIME)
* DATE(<timestamp_column>)
* DATE(<datetime_column>)
* DATE column
"""

sqltypes = {
"_PARTITIONDATE": ("_PARTITIONDATE", None),
"TIMESTAMP": ("TIMESTAMP_TRUNC", {"DAY", "HOUR", "MONTH", "YEAR"}),
"DATETIME": ("DATETIME_TRUNC", {"DAY", "HOUR", "MONTH", "YEAR"}),
"DATE": ("DATE_TRUNC", {"MONTH", "YEAR"}),
}

# Extract field (i.e <column_name> or _PARTITIONDATE)
# AND extract the name of the column_type (i.e. "TIMESTAMP", "DATE",
# Extract field if given (i.e <column_name>) or _PARTITIONDATE if not given
# AND extract the name of the column_type (i.e. is it a "TIMESTAMP", "DATE",
# "DATETIME", "_PARTITIONDATE")
# Also extract the time_partitioning.type_ (i.e. the truncation granularity:
# HOUR, DAY, MONTH, YEAR)
if time_partitioning.field is not None:
field = time_partitioning.field
column_type = table.columns[field].type.__visit_name__.upper()

# Extract time_partitioning.type_ (DAY, HOUR, MONTH, YEAR)
# i.e. generates one partition per type (1/DAY, 1/HOUR)
# NOTE: if time_partitioning.type_ == None, the python-bigquery library
# will eventually overwrite it with a default of DAY.
partitioning_period = time_partitioning.type_

else:
# If no field is given, default to "_PARTITIONDATE" as the
# field to partition on. In addition, to normalize the processing in
# the remainder of this function, set column_type and partitioning_period
# as shown below.
field = "_PARTITIONDATE"
column_type = "_PARTITIONDATE"
partitioning_period = None

# TODO: move this dictionary outside the function or to top of function
sqltypes_w_no_partitioning_period = {
# Keys are columns, values are functions
"_PARTITIONDATE": None,
"_PARTITIONTIME": "DATE", # <date function>
"DATE": None, # 'DATE' is a <date_column> not a function
"DATETIME": "DATE", # <date function>
"TIMESTAMP": "DATE", # <date function>
}

# Extract time_partitioning.type_ (DAY, HOUR, MONTH, YEAR)
# i.e. generates one partition per type (1/DAY, 1/HOUR)
# NOTE: if time_partitioning.type_ == None, it gets
# immediately overwritten by python-bigquery to a default of DAY.
partitioning_period = time_partitioning.type_
# TODO: move this dictionary outside the function or to top of function
sqltypes_w_partitioning_period = {
# Keys are columns, values are (functions, {allowed_partioning_periods})
# "_PARTITIONDATE": ("_PARTITIONDATE", {}),
"DATE": ("DATE_TRUNC", {"MONTH", "YEAR"}),
"DATETIME": ("DATETIME_TRUNC", {"DAY", "HOUR", "MONTH", "YEAR"}),
"TIMESTAMP": ("TIMESTAMP_TRUNC", {"DAY", "HOUR", "MONTH", "YEAR"}),
}

# Extract the truncation_function (i.e. DATE_TRUNC)
# and the set of allowable partition_periods
# Extract truncation_function (i.e. DATE_TRUNC()) or a default value if
# a truncation_function is not used (i.e. for _PARTITIONDATE)
# Also extract the set of allowable partition_periods
# that can be used in that function
trunc_fn, allowed_partitions = sqltypes[column_type]
if partitioning_period is None:
function = sqltypes_w_no_partitioning_period[column_type]
else:
function, allowed_partitions = sqltypes_w_partitioning_period[column_type]

# Create output:
# Special Case: _PARTITIONDATE does NOT use a function or partitioning_period
if trunc_fn == "_PARTITIONDATE":
# Special Case 1: _PARTITIONDATE does NOT use a function or partitioning_period
if function is None:
return f"PARTITION BY {field}"

# Special Case: BigQuery will not accept DAY as partitioning_period for
# Special Case 2: BigQuery will not accept DAY as partitioning_period for
# DATE_TRUNC.
# However, the default argument in python-bigquery for TimePartioning
# However, the default argument in python-bigquery for TimePartitioning
# is DAY. This case overwrites that to avoid making a breaking change in
# python-bigquery.
# https://github.com/googleapis/python-bigquery/blob/a4d9534a900f13ae7355904cda05097d781f27e3/google/cloud/bigquery/table.py#L2916
if trunc_fn == "DATE_TRUNC" and partitioning_period == "DAY":
if function == "DATE_TRUNC" and partitioning_period == "DAY":
raise ValueError(
"The TimePartitioning.type_ must be one of: "
f"{allowed_partitions}, received {partitioning_period}."
Expand All @@ -903,7 +923,7 @@ def _process_time_partitioning(
f"{allowed_partitions}, received {partitioning_period}."
)

return f"PARTITION BY {trunc_fn}({field}, {partitioning_period})"
return f"PARTITION BY {function}({field}, {partitioning_period})"

def _process_range_partitioning(
self, table: Table, range_partitioning: RangePartitioning
Expand Down