Skip to content

Commit 77dff89

Browse files
committed
Merge branch 'main' into shuowei-fix-unpivot-empty
2 parents f03758d + 0ebc733 commit 77dff89

File tree

9 files changed

+226
-262
lines changed

9 files changed

+226
-262
lines changed

bigframes/bigquery/_operations/table.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import google.cloud.bigquery
2020
import pandas as pd
2121

22+
import bigframes.core.compile.sqlglot.sql as sg_sql
2223
import bigframes.core.logging.log_adapter as log_adapter
23-
import bigframes.core.sql.table
2424
import bigframes.session
2525

2626

@@ -80,14 +80,16 @@ def create_external_table(
8080
"""
8181
import bigframes.pandas as bpd
8282

83-
sql = bigframes.core.sql.table.create_external_table_ddl(
84-
table_name=table_name,
85-
replace=replace,
86-
if_not_exists=if_not_exists,
87-
columns=columns,
88-
partition_columns=partition_columns,
89-
connection_name=connection_name,
90-
options=options,
83+
sql = sg_sql.to_sql(
84+
sg_sql.create_external_table(
85+
table_name=table_name,
86+
replace=replace,
87+
if_not_exists=if_not_exists,
88+
columns=columns,
89+
partition_columns=partition_columns,
90+
connection_name=connection_name,
91+
options=options,
92+
)
9193
)
9294

9395
if session is None:

bigframes/core/compile/sqlglot/sql/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
table,
2323
to_sql,
2424
)
25-
from bigframes.core.compile.sqlglot.sql.ddl import load_data
25+
from bigframes.core.compile.sqlglot.sql.ddl import create_external_table, load_data
2626
from bigframes.core.compile.sqlglot.sql.dml import insert, replace
2727

2828
__all__ = [
@@ -35,6 +35,7 @@
3535
"table",
3636
"to_sql",
3737
# From ddl.py
38+
"create_external_table",
3839
"load_data",
3940
# From dml.py
4041
"insert",

bigframes/core/compile/sqlglot/sql/ddl.py

Lines changed: 145 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -22,51 +22,6 @@
2222
from bigframes.core.compile.sqlglot.sql import base
2323

2424

25-
def _loaddata_sql(self: sg.Generator, expression: sge.LoadData) -> str:
26-
out = ["LOAD DATA"]
27-
if expression.args.get("overwrite"):
28-
out.append("OVERWRITE")
29-
30-
out.append(f"INTO {self.sql(expression, 'this').strip()}")
31-
32-
# We ignore inpath as it's just a dummy to satisfy sqlglot requirements
33-
# but BigQuery uses FROM FILES instead.
34-
35-
columns = self.sql(expression, "columns").strip()
36-
if columns:
37-
out.append(columns)
38-
39-
partition_by = self.sql(expression, "partition_by").strip()
40-
if partition_by:
41-
out.append(partition_by)
42-
43-
cluster_by = self.sql(expression, "cluster_by").strip()
44-
if cluster_by:
45-
out.append(cluster_by)
46-
47-
options = self.sql(expression, "options").strip()
48-
if options:
49-
out.append(options)
50-
51-
from_files = self.sql(expression, "from_files").strip()
52-
if from_files:
53-
out.append(f"FROM FILES {from_files}")
54-
55-
with_partition_columns = self.sql(expression, "with_partition_columns").strip()
56-
if with_partition_columns:
57-
out.append(f"WITH PARTITION COLUMNS {with_partition_columns}")
58-
59-
connection = self.sql(expression, "connection").strip()
60-
if connection:
61-
out.append(f"WITH CONNECTION {connection}")
62-
63-
return " ".join(out)
64-
65-
66-
# Register the transform for BigQuery generator
67-
sg.dialects.bigquery.BigQuery.Generator.TRANSFORMS[sge.LoadData] = _loaddata_sql
68-
69-
7025
def load_data(
7126
table_name: str,
7227
*,
@@ -84,21 +39,6 @@ def load_data(
8439
# Quoting is handled by the dialect.
8540
table_expr = sge.Table(this=base.identifier(table_name))
8641

87-
sge_columns = (
88-
sge.Schema(
89-
this=None,
90-
expressions=[
91-
sge.ColumnDef(
92-
this=base.identifier(name),
93-
kind=sge.DataType.build(typ, dialect="bigquery"),
94-
)
95-
for name, typ in columns.items()
96-
],
97-
)
98-
if columns
99-
else None
100-
)
101-
10242
sge_partition_by = (
10343
sge.PartitionedByProperty(
10444
this=base.identifier(partition_by[0])
@@ -115,50 +55,166 @@ def load_data(
11555
else None
11656
)
11757

118-
sge_table_options = (
119-
sge.Properties(
120-
expressions=[
121-
sge.Property(this=base.identifier(k), value=base.literal(v))
122-
for k, v in table_options.items()
123-
]
124-
)
125-
if table_options
126-
else None
127-
)
128-
12958
sge_from_files = sge.Tuple(
13059
expressions=[
13160
sge.Property(this=base.identifier(k), value=base.literal(v))
13261
for k, v in from_files_options.items()
13362
]
13463
)
13564

136-
sge_with_partition_columns = (
137-
sge.Schema(
138-
this=None,
139-
expressions=[
140-
sge.ColumnDef(
141-
this=base.identifier(name),
142-
kind=sge.DataType.build(typ, dialect="bigquery"),
143-
)
144-
for name, typ in with_partition_columns.items()
145-
],
146-
)
147-
if with_partition_columns
148-
else None
149-
)
150-
15165
sge_connection = base.identifier(connection_name) if connection_name else None
15266

15367
return sge.LoadData(
15468
this=table_expr,
15569
overwrite=(write_disposition == "OVERWRITE"),
15670
inpath=sge.convert("fake"), # satisfy sqlglot's required inpath arg
157-
columns=sge_columns,
71+
columns=_get_sge_schema(columns),
15872
partition_by=sge_partition_by,
15973
cluster_by=sge_cluster_by,
160-
options=sge_table_options,
74+
options=_get_sge_properties(table_options),
16175
from_files=sge_from_files,
162-
with_partition_columns=sge_with_partition_columns,
76+
with_partition_columns=_get_sge_schema(with_partition_columns),
77+
connection=sge_connection,
78+
)
79+
80+
81+
def create_external_table(
    table_name: str,
    *,
    replace: bool = False,
    if_not_exists: bool = False,
    columns: Optional[Mapping[str, str]] = None,
    partition_columns: Optional[Mapping[str, str]] = None,
    connection_name: Optional[str] = None,
    options: Optional[Mapping[str, Union[str, int, float, bool, list]]] = None,
) -> sge.Create:
    """Build the expression tree for a CREATE EXTERNAL TABLE statement.

    Args:
        table_name: Name of the table; wrapped with ``base.identifier``.
        replace: Stored as the ``replace`` arg of the ``Create`` node.
        if_not_exists: Stored as the ``exists_ok`` arg of the ``Create`` node.
        columns: Optional mapping of column name to type string. When
            non-empty, the table is nested inside a ``Schema`` node.
        partition_columns: Optional mapping of partition column name to type
            string, stored under the ``partition_columns`` arg.
        connection_name: Optional connection name, stored under the
            ``connection`` arg as an identifier.
        options: Optional mapping of option name to value, stored under the
            ``properties`` arg.

    Returns:
        A ``sge.Create`` node whose ``kind`` is ``"EXTERNAL TABLE"``.
    """
    connection = None
    if connection_name:
        connection = base.identifier(connection_name)

    # The Create node's "this" is the bare Table, or a Schema wrapping that
    # Table when explicit columns were supplied.
    target: sge.Table | sge.Schema = sge.Table(this=base.identifier(table_name))
    column_schema = _get_sge_schema(columns)
    if column_schema:
        column_schema.set("this", target)
        target = column_schema

    return sge.Create(
        this=target,
        kind="EXTERNAL TABLE",
        replace=replace,
        exists_ok=if_not_exists,
        properties=_get_sge_properties(options),
        connection=connection,
        partition_columns=_get_sge_schema(partition_columns),
    )
114+
115+
116+
def _get_sge_schema(
    columns: Optional[Mapping[str, str]] = None
) -> Optional[sge.Schema]:
    """Convert a name-to-type mapping into a ``sge.Schema`` of column defs.

    Args:
        columns: Mapping of column name to type string. Each type string is
            parsed with ``sge.DataType.build`` using ``base.DIALECT``.

    Returns:
        A ``sge.Schema`` with ``this=None`` and one ``sge.ColumnDef`` per
        mapping entry (in mapping order), or ``None`` when ``columns`` is
        ``None`` or empty.
    """
    if columns:
        column_defs = []
        for column_name, type_name in columns.items():
            name_expr = base.identifier(column_name)
            type_expr = sge.DataType.build(type_name, dialect=base.DIALECT)
            column_defs.append(sge.ColumnDef(this=name_expr, kind=type_expr))
        return sge.Schema(this=None, expressions=column_defs)

    return None
132+
133+
134+
def _get_sge_properties(
    options: Optional[Mapping[str, Union[str, int, float, bool, list]]] = None
) -> Optional[sge.Properties]:
    """Convert an option mapping into a ``sge.Properties`` node.

    Args:
        options: Mapping of option name to value. Names are wrapped with
            ``base.identifier`` and values with ``base.literal``.

    Returns:
        A ``sge.Properties`` holding one ``sge.Property`` per mapping entry
        (in mapping order), or ``None`` when ``options`` is ``None`` or empty.
    """
    if not options:
        return None

    property_nodes = []
    for option_name, option_value in options.items():
        property_nodes.append(
            sge.Property(
                this=base.identifier(option_name),
                value=base.literal(option_value),
            )
        )
    return sge.Properties(expressions=property_nodes)
146+
147+
148+
def _loaddata_sql(self: sg.Generator, expression: sge.LoadData) -> str:
    """Render a ``sge.LoadData`` node as a ``LOAD DATA ...`` statement.

    The statement is assembled from space-separated pieces in a fixed order:
    ``LOAD DATA``, optional ``OVERWRITE``, ``INTO <table>``, then each
    optional clause that renders to non-empty text.

    Args:
        self: The generator used to render child expressions.
        expression: The ``LoadData`` node to render.

    Returns:
        The SQL text of the statement.
    """
    pieces = ["LOAD DATA"]
    if expression.args.get("overwrite"):
        pieces.append("OVERWRITE")

    target = self.sql(expression, "this").strip()
    pieces.append(f"INTO {target}")

    # "inpath" is intentionally never rendered: it is only a dummy that
    # satisfies sqlglot's required argument, while BigQuery uses FROM FILES.

    # (arg name, keyword prefix) pairs in the order they must be emitted.
    optional_clauses = (
        ("columns", ""),
        ("partition_by", ""),
        ("cluster_by", ""),
        ("options", ""),
        ("from_files", "FROM FILES "),
        ("with_partition_columns", "WITH PARTITION COLUMNS "),
        ("connection", "WITH CONNECTION "),
    )
    for arg_name, prefix in optional_clauses:
        rendered = self.sql(expression, arg_name).strip()
        if rendered:
            pieces.append(prefix + rendered)

    return " ".join(pieces)
187+
188+
189+
def _create_sql(self: sg.Generator, expression: sge.Create) -> str:
    """Render a ``sge.Create`` node, special-casing ``EXTERNAL TABLE``.

    Any ``Create`` whose ``kind`` is not exactly ``"EXTERNAL TABLE"`` is
    handed to the generator's own ``create_sql``. External tables are
    rendered here as space-separated pieces: ``CREATE``, optional
    ``OR REPLACE``, ``EXTERNAL TABLE``, optional ``IF NOT EXISTS``, the
    target, then each optional clause that renders to non-empty text.

    Args:
        self: The generator used to render child expressions.
        expression: The ``Create`` node to render.

    Returns:
        The SQL text of the statement.
    """
    node_args = expression.args
    if node_args.get("kind") != "EXTERNAL TABLE":
        return self.create_sql(expression)

    pieces = ["CREATE"]
    if node_args.get("replace"):
        pieces.append("OR REPLACE")
    pieces.append("EXTERNAL TABLE")
    if node_args.get("exists_ok"):
        pieces.append("IF NOT EXISTS")

    # The target (table, or schema wrapping the table) is always emitted,
    # exactly as the generator renders it.
    pieces.append(self.sql(expression, "this"))

    # (arg name, keyword prefix) pairs in the order they must be emitted.
    optional_clauses = (
        ("connection", "WITH CONNECTION "),
        ("partition_columns", "WITH PARTITION COLUMNS "),
        ("properties", ""),
    )
    for arg_name, prefix in optional_clauses:
        rendered = self.sql(expression, arg_name).strip()
        if rendered:
            pieces.append(prefix + rendered)

    return " ".join(pieces)
216+
217+
218+
# Hook the custom renderers into the BigQuery generator so that LoadData and
# Create nodes are rendered by the functions defined in this module.
base.DIALECT.Generator.TRANSFORMS.update(
    {
        sge.LoadData: _loaddata_sql,
        sge.Create: _create_sql,
    }
)

bigframes/core/sql/table.py

Lines changed: 0 additions & 68 deletions
This file was deleted.

0 commit comments

Comments
 (0)