-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Add query_output_schema to ReadFromBigQuery for BEAM_ROW + query support #39160
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -777,6 +777,55 @@ def test_read_all_lineage(self): | |||||||||||
| 'bigquery:project2.dataset2.table2' | ||||||||||||
| ])) | ||||||||||||
|
|
||||||||||||
| def test_query_with_beam_row_requires_schema(self): | ||||||||||||
| with self.assertRaisesRegex(ValueError, 'query_output_schema'): | ||||||||||||
| ReadFromBigQuery( | ||||||||||||
| query='SELECT id, name FROM dataset.table', output_type='BEAM_ROW') | ||||||||||||
|
|
||||||||||||
| def test_query_with_beam_row_and_schema_accepted(self): | ||||||||||||
| schema = { | ||||||||||||
| 'fields': [ | ||||||||||||
| { | ||||||||||||
| 'name': 'id', 'type': 'INTEGER', 'mode': 'NULLABLE' | ||||||||||||
| }, | ||||||||||||
| { | ||||||||||||
| 'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE' | ||||||||||||
| }, | ||||||||||||
| ] | ||||||||||||
| } | ||||||||||||
| transform = ReadFromBigQuery( | ||||||||||||
| query='SELECT id, name FROM dataset.table', | ||||||||||||
| output_type='BEAM_ROW', | ||||||||||||
| query_output_schema=schema) | ||||||||||||
| self.assertEqual(transform.query_output_schema, schema) | ||||||||||||
|
|
||||||||||||
| def test_expand_output_type_uses_query_schema(self): | ||||||||||||
| schema = { | ||||||||||||
| 'fields': [ | ||||||||||||
| { | ||||||||||||
| 'name': 'id', 'type': 'INTEGER', 'mode': 'NULLABLE' | ||||||||||||
| }, | ||||||||||||
| { | ||||||||||||
| 'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE' | ||||||||||||
| }, | ||||||||||||
| ] | ||||||||||||
| } | ||||||||||||
| transform = ReadFromBigQuery( | ||||||||||||
| query='SELECT id, name FROM dataset.table', | ||||||||||||
| output_type='BEAM_ROW', | ||||||||||||
| query_output_schema=schema) | ||||||||||||
|
|
||||||||||||
| with mock.patch.object(bigquery_tools.BigQueryWrapper, | ||||||||||||
| 'get_table') as mock_get_table, \ | ||||||||||||
| mock.patch('apache_beam.io.gcp.bigquery.bigquery_schema_tools' | ||||||||||||
| '.convert_to_usertype') as mock_convert: | ||||||||||||
| mock_convert.return_value = beam.Map(lambda x: x) | ||||||||||||
| fake_pcoll = mock.MagicMock() | ||||||||||||
| transform._expand_output_type(fake_pcoll) | ||||||||||||
|
|
||||||||||||
| mock_get_table.assert_not_called() | ||||||||||||
| mock_convert.assert_called_once_with(schema, None) | ||||||||||||
|
Comment on lines
+826
to
+827
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since
Suggested change
|
||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') | ||||||||||||
| class TestBigQuerySink(unittest.TestCase): | ||||||||||||
|
|
||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -101,7 +101,8 @@ def read_from_bigquery( | |
| table: Optional[str] = None, | ||
| query: Optional[str] = None, | ||
| row_restriction: Optional[str] = None, | ||
| fields: Optional[Iterable[str]] = None): | ||
| fields: Optional[Iterable[str]] = None, | ||
| schema: Optional[Any] = None): | ||
| """Reads data from BigQuery. | ||
|
|
||
| Exactly one of table or query must be set. | ||
|
|
@@ -119,18 +120,27 @@ def read_from_bigquery( | |
| specified field is a nested field, all the sub-fields in the field will be | ||
| selected. The output field order is unrelated to the order of fields | ||
| given here. | ||
| schema (dict): Required when query is set. A BigQuery schema describing | ||
| the query result columns, e.g. | ||
| ``{'fields': [{'name': 'col', 'type': 'STRING', 'mode': 'NULLABLE'}]}``. | ||
| Not applicable when reading from a table (schema is auto-derived). | ||
| """ | ||
| if query is None: | ||
| assert table is not None | ||
| else: | ||
|
Comment on lines
128
to
130
Contributor
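There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If a user provides a `schema` without a `query` (i.e. for a table-based read), raise a `ValueError` rather than accepting it:
if query is None: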
assert table is not None
if schema is not None:
raise ValueError(
"The 'schema' parameter is only supported when reading with a 'query'. "
"For table-based reads, the schema is automatically derived.")
else: |
||
| assert table is None and row_restriction is None and fields is None | ||
| if schema is None: | ||
| raise ValueError( | ||
| "When using 'query' in ReadFromBigQuery YAML transform, " | ||
| "'schema' is required to define the output row structure.") | ||
| return ReadFromBigQuery( | ||
| query=query, | ||
| table=table, | ||
| row_restriction=row_restriction, | ||
| selected_fields=fields, | ||
| method='DIRECT_READ', | ||
| output_type='BEAM_ROW') | ||
| output_type='BEAM_ROW', | ||
| query_output_schema=schema) | ||
|
|
||
|
|
||
| def write_to_bigquery( | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -764,6 +764,49 @@ def expand(self, pcoll): | |||||||||||||||
| ])) | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| class ReadFromBigQueryTest(unittest.TestCase): | ||||||||||||||||
| def test_query_without_schema_raises(self): | ||||||||||||||||
| from apache_beam.yaml.yaml_io import read_from_bigquery | ||||||||||||||||
| with self.assertRaisesRegex(ValueError, 'schema'): | ||||||||||||||||
| read_from_bigquery(query='SELECT id FROM dataset.table') | ||||||||||||||||
|
|
||||||||||||||||
| def test_table_without_schema_ok(self): | ||||||||||||||||
| import unittest.mock as mock | ||||||||||||||||
|
|
||||||||||||||||
| from apache_beam.yaml.yaml_io import read_from_bigquery | ||||||||||||||||
| with mock.patch('apache_beam.yaml.yaml_io.ReadFromBigQuery') as mock_rfbq: | ||||||||||||||||
| mock_rfbq.return_value = mock.MagicMock() | ||||||||||||||||
| read_from_bigquery(table='project:dataset.table') | ||||||||||||||||
| mock_rfbq.assert_called_once() | ||||||||||||||||
| call_kwargs = mock_rfbq.call_args[1] | ||||||||||||||||
| self.assertIsNone(call_kwargs.get('query_output_schema')) | ||||||||||||||||
|
|
||||||||||||||||
|
Comment on lines
+782
to
+783
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a unit test to verify that providing a
Suggested change
|
||||||||||||||||
| def test_query_with_schema_passes_through(self): | ||||||||||||||||
| import unittest.mock as mock | ||||||||||||||||
|
|
||||||||||||||||
| from apache_beam.yaml.yaml_io import read_from_bigquery | ||||||||||||||||
| schema = { | ||||||||||||||||
| 'fields': [ | ||||||||||||||||
| { | ||||||||||||||||
| 'name': 'id', 'type': 'INTEGER', 'mode': 'NULLABLE' | ||||||||||||||||
| }, | ||||||||||||||||
| ] | ||||||||||||||||
| } | ||||||||||||||||
| with mock.patch('apache_beam.yaml.yaml_io.ReadFromBigQuery') as mock_rfbq: | ||||||||||||||||
| mock_rfbq.return_value = mock.MagicMock() | ||||||||||||||||
| read_from_bigquery(query='SELECT id FROM dataset.table', schema=schema) | ||||||||||||||||
| call_kwargs = mock_rfbq.call_args[1] | ||||||||||||||||
| self.assertEqual(call_kwargs['query_output_schema'], schema) | ||||||||||||||||
|
|
||||||||||||||||
| def test_query_and_table_both_raises(self): | ||||||||||||||||
| from apache_beam.yaml.yaml_io import read_from_bigquery | ||||||||||||||||
| with self.assertRaises(AssertionError): | ||||||||||||||||
| read_from_bigquery( | ||||||||||||||||
| table='project:dataset.table', | ||||||||||||||||
| query='SELECT id FROM dataset.table', | ||||||||||||||||
| schema={'fields': []}) | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| if __name__ == '__main__': | ||||||||||||||||
| logging.getLogger().setLevel(logging.INFO) | ||||||||||||||||
| unittest.main() | ||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `query_output_schema` parameter can be passed as a dictionary or a JSON string (as documented in the docstring and used in YAML). However, `bigquery_schema_tools.convert_to_usertype` expects a `TableSchema` object. Passing a dictionary or string directly will result in an `AttributeError` at runtime (e.g., `'dict' object has no attribute 'fields'`). Use `bigquery_tools.get_dict_table_schema` to normalize the schema before passing it to `convert_to_usertype`.