From 67c9fe9feeaf76e02839c8c988b183241cd5aa1d Mon Sep 17 00:00:00 2001 From: Mika Naylor Date: Wed, 11 Mar 2026 13:52:09 +0100 Subject: [PATCH] [FLINK-39242][python] Fix error when getting columns from ResolvedSchema --- flink-python/pyflink/table/catalog.py | 6 ++--- .../table/tests/test_schema_operation.py | 22 ++++++++++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py index 888f995d68820..c9d3ff8b3c138 100644 --- a/flink-python/pyflink/table/catalog.py +++ b/flink-python/pyflink/table/catalog.py @@ -1537,11 +1537,11 @@ def _from_j_column(j_column) -> Optional["Column"]: raise TypeError("The input %s is not an instance of Column." % j_column) if get_java_class(JPhysicalColumn).isAssignableFrom(j_column.getClass()): - return PhysicalColumn(j_physical_column=j_column.getClass()) + return PhysicalColumn(j_physical_column=j_column) elif get_java_class(JComputedColumn).isAssignableFrom(j_column.getClass()): - return ComputedColumn(j_computed_column=j_column.getClass()) + return ComputedColumn(j_computed_column=j_column) elif get_java_class(JMetadataColumn).isAssignableFrom(j_column.getClass()): - return MetadataColumn(j_metadata_column=j_column.getClass()) + return MetadataColumn(j_metadata_column=j_column) else: return None diff --git a/flink-python/pyflink/table/tests/test_schema_operation.py b/flink-python/pyflink/table/tests/test_schema_operation.py index c293e783d4815..d8ce568aa2edb 100644 --- a/flink-python/pyflink/table/tests/test_schema_operation.py +++ b/flink-python/pyflink/table/tests/test_schema_operation.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -from pyflink.table.catalog import ResolvedSchema +from pyflink.table.catalog import ResolvedSchema, PhysicalColumn from pyflink.table.table_schema import TableSchema from pyflink.table.types import DataTypes from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase @@ -33,7 +33,7 @@ def test_get_schema(self): result = t.group_by(t.c).select(t.a.sum.alias('a'), t.c.alias('b')) schema = result.get_schema() - assert schema == TableSchema(["a", "b"], [DataTypes.BIGINT(), DataTypes.STRING()]) + self.assertEqual(schema, TableSchema(["a", "b"], [DataTypes.BIGINT(), DataTypes.STRING()])) def test_get_resolved_schema(self): t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']) @@ -42,7 +42,23 @@ def test_get_resolved_schema(self): ['a', 'b', 'c'], [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()], ) - assert resolved_schema == expected_schema + self.assertEqual(resolved_schema, expected_schema) + + def test_resolved_schema_get_columns(self): + physical_schema = ResolvedSchema.physical( + ['a', 'b', 'c'], + [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()], + ) + + columns = physical_schema.get_columns() + self.assertEqual(len(columns), 3) + for column in columns: + self.assertEqual(type(column), PhysicalColumn) + + for idx in range(3): + column = physical_schema.get_column(idx) + self.assertEqual(type(column), PhysicalColumn) + if __name__ == '__main__': import unittest