Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions flink-python/pyflink/table/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -1537,11 +1537,11 @@ def _from_j_column(j_column) -> Optional["Column"]:
raise TypeError("The input %s is not an instance of Column." % j_column)

if get_java_class(JPhysicalColumn).isAssignableFrom(j_column.getClass()):
return PhysicalColumn(j_physical_column=j_column.getClass())
return PhysicalColumn(j_physical_column=j_column)
elif get_java_class(JComputedColumn).isAssignableFrom(j_column.getClass()):
return ComputedColumn(j_computed_column=j_column.getClass())
return ComputedColumn(j_computed_column=j_column)
elif get_java_class(JMetadataColumn).isAssignableFrom(j_column.getClass()):
return MetadataColumn(j_metadata_column=j_column.getClass())
return MetadataColumn(j_metadata_column=j_column)
else:
return None

Expand Down
22 changes: 19 additions & 3 deletions flink-python/pyflink/table/tests/test_schema_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.table.catalog import ResolvedSchema
from pyflink.table.catalog import ResolvedSchema, PhysicalColumn
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import DataTypes
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
Expand All @@ -33,7 +33,7 @@ def test_get_schema(self):
result = t.group_by(t.c).select(t.a.sum.alias('a'), t.c.alias('b'))
schema = result.get_schema()

assert schema == TableSchema(["a", "b"], [DataTypes.BIGINT(), DataTypes.STRING()])
self.assertEqual(schema, TableSchema(["a", "b"], [DataTypes.BIGINT(), DataTypes.STRING()]))

def test_get_resolved_schema(self):
t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
Expand All @@ -42,7 +42,23 @@ def test_get_resolved_schema(self):
['a', 'b', 'c'],
[DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()],
)
assert resolved_schema == expected_schema
self.assertEqual(resolved_schema, expected_schema)

def test_resolved_schema_get_columns(self):
physical_schema = ResolvedSchema.physical(
['a', 'b', 'c'],
[DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()],
)

columns = physical_schema.get_columns()
self.assertEqual(len(columns), 3)
for column in columns:
self.assertEqual(type(column), PhysicalColumn)

for idx in range(3):
column = physical_schema.get_column(idx)
self.assertEqual(type(column), PhysicalColumn)


if __name__ == '__main__':
import unittest
Expand Down