diff --git a/paimon-python/pypaimon/api/api_request.py b/paimon-python/pypaimon/api/api_request.py index d453250757a5..f2d062f8d005 100644 --- a/paimon-python/pypaimon/api/api_request.py +++ b/paimon-python/pypaimon/api/api_request.py @@ -23,6 +23,7 @@ from pypaimon.common.identifier import Identifier from pypaimon.common.json_util import json_field from pypaimon.schema.schema import Schema +from pypaimon.schema.schema_change import SchemaChange from pypaimon.snapshot.snapshot import Snapshot from pypaimon.snapshot.snapshot_commit import PartitionStatistics @@ -76,3 +77,10 @@ class CommitTableRequest(RESTRequest): table_uuid: Optional[str] = json_field(FIELD_TABLE_UUID) snapshot: Snapshot = json_field(FIELD_SNAPSHOT) statistics: List[PartitionStatistics] = json_field(FIELD_STATISTICS) + + +@dataclass +class AlterTableRequest(RESTRequest): + FIELD_CHANGES = "changes" + + changes: List[SchemaChange] = json_field(FIELD_CHANGES) diff --git a/paimon-python/pypaimon/api/rest_api.py b/paimon-python/pypaimon/api/rest_api.py index 25165916a6f6..806831d95489 100755 --- a/paimon-python/pypaimon/api/rest_api.py +++ b/paimon-python/pypaimon/api/rest_api.py @@ -18,7 +18,7 @@ import logging from typing import Callable, Dict, List, Optional, Union -from pypaimon.api.api_request import (AlterDatabaseRequest, CommitTableRequest, +from pypaimon.api.api_request import (AlterDatabaseRequest, AlterTableRequest, CommitTableRequest, CreateDatabaseRequest, CreateTableRequest, RenameTableRequest) from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse, @@ -289,6 +289,17 @@ def rename_table(self, source_identifier: Identifier, target_identifier: Identif request, self.rest_auth_function) + def alter_table(self, identifier: Identifier, changes: List): + database_name, table_name = self.__validate_identifier(identifier) + if not changes: + raise ValueError("Changes cannot be empty") + + request = AlterTableRequest(changes) + return self.client.post( + self.resource_paths.table(database_name, table_name), + request, + self.rest_auth_function) + def load_table_token(self, identifier: Identifier) -> GetTableTokenResponse: database_name, table_name = self.__validate_identifier(identifier) diff --git a/paimon-python/pypaimon/catalog/catalog.py b/paimon-python/pypaimon/catalog/catalog.py index a8e7dcd0657e..fa19355b0b83 100644 --- a/paimon-python/pypaimon/catalog/catalog.py +++ b/paimon-python/pypaimon/catalog/catalog.py @@ -21,6 +21,7 @@ from pypaimon.common.identifier import Identifier from pypaimon.schema.schema import Schema +from pypaimon.schema.schema_change import SchemaChange from pypaimon.snapshot.snapshot import Snapshot from pypaimon.snapshot.snapshot_commit import PartitionStatistics @@ -54,6 +55,15 @@ def get_table(self, identifier: Union[str, Identifier]) -> 'Table': def create_table(self, identifier: Union[str, Identifier], schema: Schema, ignore_if_exists: bool): """Create table with schema.""" + @abstractmethod + def alter_table( + self, + identifier: Union[str, Identifier], + changes: List[SchemaChange], + ignore_if_not_exists: bool = False + ): + """Alter table with schema changes.""" + def supports_version_management(self) -> bool: """ Whether this catalog supports version management for tables. diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py index b2eced586eff..8d1b485b7425 100644 --- a/paimon-python/pypaimon/catalog/filesystem_catalog.py +++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py @@ -20,16 +20,19 @@ from pypaimon.catalog.catalog import Catalog from pypaimon.catalog.catalog_environment import CatalogEnvironment -from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException, - DatabaseNotExistException, - TableAlreadyExistException, - TableNotExistException) +from pypaimon.catalog.catalog_exception import ( + DatabaseAlreadyExistException, + DatabaseNotExistException, + TableAlreadyExistException, + TableNotExistException +) from pypaimon.catalog.database import Database from pypaimon.common.options import Options from pypaimon.common.options.config import CatalogOptions from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.file_io import FileIO from pypaimon.common.identifier import Identifier +from pypaimon.schema.schema_change import SchemaChange from pypaimon.schema.schema_manager import SchemaManager from pypaimon.snapshot.snapshot import Snapshot from pypaimon.snapshot.snapshot_commit import PartitionStatistics @@ -115,6 +118,28 @@ def get_table_path(self, identifier: Identifier) -> str: db_path = self.get_database_path(identifier.get_database_name()) return f"{db_path}/{identifier.get_table_name()}" + def alter_table( + self, + identifier: Union[str, Identifier], + changes: List[SchemaChange], + ignore_if_not_exists: bool = False + ): + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + try: + self.get_table(identifier) + except TableNotExistException: + if not ignore_if_not_exists: + raise + return + + table_path = self.get_table_path(identifier) + schema_manager = SchemaManager(self.file_io, table_path) + try: + schema_manager.commit_changes(changes) + except Exception as e: + raise RuntimeError(f"Failed to alter table {identifier.get_full_name()}: {e}") from e + def commit_snapshot( self, identifier: Identifier, diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py index c32c5bcb3946..5a0ca01ce48e 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py +++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py @@ -34,6 +34,7 @@ from pypaimon.common.file_io import FileIO from pypaimon.common.identifier import Identifier from pypaimon.schema.schema import Schema +from pypaimon.schema.schema_change import SchemaChange from pypaimon.schema.table_schema import TableSchema from pypaimon.snapshot.snapshot import Snapshot from pypaimon.snapshot.snapshot_commit import PartitionStatistics @@ -177,6 +178,20 @@ def drop_table(self, identifier: Union[str, Identifier], ignore_if_not_exists: b if not ignore_if_not_exists: raise TableNotExistException(identifier) from e + def alter_table( + self, + identifier: Union[str, Identifier], + changes: List[SchemaChange], + ignore_if_not_exists: bool = False + ): + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + try: + self.rest_api.alter_table(identifier, changes) + except NoSuchResourceException as e: + if not ignore_if_not_exists: + raise TableNotExistException(identifier) from e + def load_table_metadata(self, identifier: Identifier) -> TableMetadata: response = self.rest_api.get_table(identifier) return self.to_table_metadata(identifier.get_database_name(), response) diff --git a/paimon-python/pypaimon/schema/schema_change.py b/paimon-python/pypaimon/schema/schema_change.py new file mode 100644 index 000000000000..8e62e7f5bbd3 --- /dev/null +++ b/paimon-python/pypaimon/schema/schema_change.py @@ -0,0 +1,289 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC +from dataclasses import dataclass +from enum import Enum +from typing import List, Optional, Union + +from pypaimon.common.json_util import json_field +from pypaimon.schema.data_types import DataType + + +class Actions: + FIELD_ACTION = "action" + SET_OPTION_ACTION = "setOption" + REMOVE_OPTION_ACTION = "removeOption" + UPDATE_COMMENT_ACTION = "updateComment" + ADD_COLUMN_ACTION = "addColumn" + RENAME_COLUMN_ACTION = "renameColumn" + DROP_COLUMN_ACTION = "dropColumn" + UPDATE_COLUMN_TYPE_ACTION = "updateColumnType" + UPDATE_COLUMN_NULLABILITY_ACTION = "updateColumnNullability" + UPDATE_COLUMN_COMMENT_ACTION = "updateColumnComment" + UPDATE_COLUMN_DEFAULT_VALUE_ACTION = "updateColumnDefaultValue" + UPDATE_COLUMN_POSITION_ACTION = "updateColumnPosition" + + +class SchemaChange(ABC): + @staticmethod + def set_option(key: str, value: str) -> "SetOption": + return SetOption(key=key, value=value) + + @staticmethod + def remove_option(key: str) -> "RemoveOption": + return RemoveOption(key) + + @staticmethod + def update_comment(comment: Optional[str]) -> "UpdateComment": + return UpdateComment(comment) + + @staticmethod + def add_column( + field_name: Union[str, List[str]], + data_type: DataType, + comment: Optional[str] = None, + move: Optional["Move"] = None + ) -> "AddColumn": + if isinstance(field_name, str): + return AddColumn(field_names=[field_name], data_type=data_type, comment=comment, move=move) + else: + return AddColumn(field_names=field_name, data_type=data_type, comment=comment, move=move) + + @staticmethod + def rename_column(field_name: Union[str, List[str]], new_name: str) -> "RenameColumn": + if isinstance(field_name, str): + return RenameColumn([field_name], new_name) + else: + return RenameColumn(field_name, new_name) + + @staticmethod + def drop_column(field_name: Union[str, List[str]]) -> "DropColumn": + if isinstance(field_name, str): + return DropColumn([field_name]) + else: + return DropColumn(field_name) + + @staticmethod + def update_column_type( + field_name: Union[str, List[str]], + new_data_type: DataType, + keep_nullability: bool = False + ) -> "UpdateColumnType": + if isinstance(field_name, str): + return UpdateColumnType([field_name], new_data_type, keep_nullability) + else: + return UpdateColumnType(field_name, new_data_type, keep_nullability) + + @staticmethod + def update_column_nullability( + field_name: Union[str, List[str]], + new_nullability: bool + ) -> "UpdateColumnNullability": + if isinstance(field_name, str): + return UpdateColumnNullability([field_name], new_nullability) + else: + return UpdateColumnNullability(field_name, new_nullability) + + @staticmethod + def update_column_comment(field_name: Union[str, List[str]], comment: str) -> "UpdateColumnComment": + if isinstance(field_name, str): + return UpdateColumnComment([field_name], comment) + else: + return UpdateColumnComment(field_name, comment) + + @staticmethod + def update_column_default_value(field_names: List[str], default_value: str) -> "UpdateColumnDefaultValue": + return UpdateColumnDefaultValue(field_names, default_value) + + @staticmethod + def update_column_position(move: "Move") -> "UpdateColumnPosition": + return UpdateColumnPosition(move) + + +@dataclass +class SetOption(SchemaChange): + FIELD_KEY = "key" + FIELD_VALUE = "value" + key: str = json_field(FIELD_KEY) + value: str = json_field(FIELD_VALUE) + action: str = json_field(Actions.FIELD_ACTION, default=Actions.SET_OPTION_ACTION) + + def __post_init__(self): + if not hasattr(self, 'action') or self.action is None: + self.action = Actions.SET_OPTION_ACTION + + +@dataclass +class RemoveOption(SchemaChange): + FIELD_KEY = "key" + key: str = json_field(FIELD_KEY) + action: str = json_field(Actions.FIELD_ACTION, default=Actions.REMOVE_OPTION_ACTION) + + def __post_init__(self): + if not hasattr(self, 'action') or self.action is None: + self.action = Actions.REMOVE_OPTION_ACTION + + +@dataclass +class UpdateComment(SchemaChange): + FIELD_COMMENT = "comment" + comment: Optional[str] = json_field(FIELD_COMMENT) + action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COMMENT_ACTION) + + def __post_init__(self): + if not hasattr(self, 'action') or self.action is None: + self.action = Actions.UPDATE_COMMENT_ACTION + + +@dataclass +class AddColumn(SchemaChange): + FIELD_FIELD_NAMES = "fieldNames" + FIELD_DATA_TYPE = "dataType" + FIELD_COMMENT = "comment" + FIELD_MOVE = "move" + field_names: List[str] = json_field(FIELD_FIELD_NAMES) + data_type: DataType = json_field(FIELD_DATA_TYPE) + comment: Optional[str] = json_field(FIELD_COMMENT) + move: Optional["Move"] = json_field(FIELD_MOVE) + action: str = json_field(Actions.FIELD_ACTION, default=Actions.ADD_COLUMN_ACTION) + + def __post_init__(self): + if not hasattr(self, 'action') or self.action is None: + self.action = Actions.ADD_COLUMN_ACTION + + +@dataclass +class RenameColumn(SchemaChange): + FIELD_FIELD_NAMES = "fieldNames" + FIELD_NEW_NAME = "newName" + field_names: List[str] = json_field(FIELD_FIELD_NAMES) + new_name: str = json_field(FIELD_NEW_NAME) + action: str = json_field(Actions.FIELD_ACTION, default=Actions.RENAME_COLUMN_ACTION) + + def __post_init__(self): + if not hasattr(self, 'action') or self.action is None: + self.action = Actions.RENAME_COLUMN_ACTION + + +@dataclass +class DropColumn(SchemaChange): + FIELD_FIELD_NAMES = "fieldNames" + field_names: List[str] = json_field(FIELD_FIELD_NAMES) + action: str = json_field(Actions.FIELD_ACTION, default=Actions.DROP_COLUMN_ACTION) + + def __post_init__(self): + if not hasattr(self, 'action') or self.action is None: + self.action = Actions.DROP_COLUMN_ACTION + + +@dataclass +class UpdateColumnType(SchemaChange): + FIELD_FIELD_NAMES = "fieldNames" + FIELD_NEW_DATA_TYPE = "newDataType" + FIELD_KEEP_NULLABILITY = "keepNullability" + field_names: List[str] = json_field(FIELD_FIELD_NAMES) + new_data_type: DataType = json_field(FIELD_NEW_DATA_TYPE) + keep_nullability: bool = json_field(FIELD_KEEP_NULLABILITY) + action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COLUMN_TYPE_ACTION) + + def __post_init__(self): + if not hasattr(self, 'action') or self.action is None: + self.action = Actions.UPDATE_COLUMN_TYPE_ACTION + + +@dataclass +class UpdateColumnNullability(SchemaChange): + FIELD_FIELD_NAMES = "fieldNames" + FIELD_NEW_NULLABILITY = "newNullability" + field_names: List[str] = json_field(FIELD_FIELD_NAMES) + new_nullability: bool = json_field(FIELD_NEW_NULLABILITY) + action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COLUMN_NULLABILITY_ACTION) + + def __post_init__(self): + if not hasattr(self, 'action') or self.action is None: + self.action = Actions.UPDATE_COLUMN_NULLABILITY_ACTION + + +@dataclass +class UpdateColumnComment(SchemaChange): + FIELD_FIELD_NAMES = "fieldNames" + FIELD_NEW_COMMENT = "newComment" + field_names: List[str] = json_field(FIELD_FIELD_NAMES) + new_comment: str = json_field(FIELD_NEW_COMMENT) + action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COLUMN_COMMENT_ACTION) + + def __post_init__(self): + if not hasattr(self, 'action') or self.action is None: + self.action = Actions.UPDATE_COLUMN_COMMENT_ACTION + + +@dataclass +class UpdateColumnDefaultValue(SchemaChange): + FIELD_FIELD_NAMES = "fieldNames" + FIELD_NEW_DEFAULT_VALUE = "newDefaultValue" + field_names: List[str] = json_field(FIELD_FIELD_NAMES) + new_default_value: str = json_field(FIELD_NEW_DEFAULT_VALUE) + action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION) + + def __post_init__(self): + if not hasattr(self, 'action') or self.action is None: + self.action = Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION + + +@dataclass +class UpdateColumnPosition(SchemaChange): + FIELD_MOVE = "move" + move: "Move" = json_field(FIELD_MOVE) + action: str = json_field(Actions.FIELD_ACTION, default=Actions.UPDATE_COLUMN_POSITION_ACTION) + + def __post_init__(self): + if not hasattr(self, 'action') or self.action is None: + self.action = Actions.UPDATE_COLUMN_POSITION_ACTION + + +class MoveType(Enum): + FIRST = "FIRST" + AFTER = "AFTER" + BEFORE = "BEFORE" + LAST = "LAST" + + +@dataclass +class Move: + FIELD_FIELD_NAME = "fieldName" + FIELD_REFERENCE_FIELD_NAME = "referenceFieldName" + FIELD_TYPE = "type" + + @staticmethod + def first(field_name: str) -> "Move": + return Move(field_name, None, MoveType.FIRST) + + @staticmethod + def after(field_name: str, reference_field_name: str) -> "Move": + return Move(field_name, reference_field_name, MoveType.AFTER) + + @staticmethod + def before(field_name: str, reference_field_name: str) -> "Move": + return Move(field_name, reference_field_name, MoveType.BEFORE) + + @staticmethod + def last(field_name: str) -> "Move": + return Move(field_name, None, MoveType.LAST) + + field_name: str = json_field(FIELD_FIELD_NAME) + reference_field_name: Optional[str] = json_field(FIELD_REFERENCE_FIELD_NAME) + type: MoveType = json_field(FIELD_TYPE) diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py index 86f0f00c35b7..c8f007ba0811 100644 --- a/paimon-python/pypaimon/schema/schema_manager.py +++ b/paimon-python/pypaimon/schema/schema_manager.py @@ -17,12 +17,187 @@ ################################################################################ from typing import Optional, List +from pypaimon.catalog.catalog_exception import ColumnAlreadyExistException, ColumnNotExistException from pypaimon.common.file_io import FileIO from pypaimon.common.json_util import JSON +from pypaimon.schema.data_types import AtomicInteger, DataField from pypaimon.schema.schema import Schema +from pypaimon.schema.schema_change import ( + AddColumn, DropColumn, RemoveOption, RenameColumn, + SchemaChange, SetOption, UpdateColumnComment, + UpdateColumnNullability, UpdateColumnPosition, + UpdateColumnType, UpdateComment +) from pypaimon.schema.table_schema import TableSchema +def _find_field_index(fields: List[DataField], field_name: str) -> Optional[int]: + for i, field in enumerate(fields): + if field.name == field_name: + return i + return None + + +def _get_rename_mappings(changes: List[SchemaChange]) -> dict: + rename_mappings = {} + for change in changes: + if isinstance(change, RenameColumn) and len(change.field_names) == 1: + rename_mappings[change.field_names[0]] = change.new_name + return rename_mappings + + +def _handle_update_column_comment( + change: UpdateColumnComment, new_fields: List[DataField] +): + field_name = change.field_names[-1] + field_index = _find_field_index(new_fields, field_name) + if field_index is None: + raise ColumnNotExistException(field_name) + field = new_fields[field_index] + new_fields[field_index] = DataField( + field.id, field.name, field.type, change.new_comment, field.default_value + ) + + +def _handle_update_column_nullability( + change: UpdateColumnNullability, new_fields: List[DataField] +): + field_name = change.field_names[-1] + field_index = _find_field_index(new_fields, field_name) + if field_index is None: + raise ColumnNotExistException(field_name) + field = new_fields[field_index] + from pypaimon.schema.data_types import DataTypeParser + field_type_dict = field.type.to_dict() + new_type = DataTypeParser.parse_data_type(field_type_dict) + new_type.nullable = change.new_nullability + new_fields[field_index] = DataField( + field.id, field.name, new_type, field.description, field.default_value + ) + + +def _handle_update_column_type( + change: UpdateColumnType, new_fields: List[DataField] +): + field_name = change.field_names[-1] + field_index = _find_field_index(new_fields, field_name) + if field_index is None: + raise ColumnNotExistException(field_name) + field = new_fields[field_index] + from pypaimon.schema.data_types import DataTypeParser + new_type_dict = change.new_data_type.to_dict() + new_type = DataTypeParser.parse_data_type(new_type_dict) + if change.keep_nullability: + new_type.nullable = field.type.nullable + new_fields[field_index] = DataField( + field.id, field.name, new_type, field.description, field.default_value + ) + + +def _drop_column_validation(schema: 'TableSchema', change: DropColumn): + if len(change.field_names) > 1: + return + column_to_drop = change.field_names[0] + if column_to_drop in schema.partition_keys or column_to_drop in schema.primary_keys: + raise ValueError( + f"Cannot drop partition key or primary key: [{column_to_drop}]" + ) + + +def _handle_drop_column(change: DropColumn, new_fields: List[DataField]): + field_name = change.field_names[-1] + field_index = _find_field_index(new_fields, field_name) + if field_index is None: + raise ColumnNotExistException(field_name) + new_fields.pop(field_index) + if not new_fields: + raise ValueError("Cannot drop all fields in table") + + +def _assert_not_updating_partition_keys( + schema: 'TableSchema', field_names: List[str], operation: str): + if len(field_names) > 1: + return + field_name = field_names[0] + if field_name in schema.partition_keys: + raise ValueError( + f"Cannot {operation} partition column: [{field_name}]" + ) + + +def _assert_not_updating_primary_keys( + schema: 'TableSchema', field_names: List[str], operation: str): + if len(field_names) > 1: + return + field_name = field_names[0] + if field_name in schema.primary_keys: + raise ValueError(f"Cannot {operation} primary key") + + +def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]): + field_name = change.field_names[-1] + new_name = change.new_name + field_index = _find_field_index(new_fields, field_name) + if field_index is None: + raise ColumnNotExistException(field_name) + if _find_field_index(new_fields, new_name) is not None: + raise ColumnAlreadyExistException(new_name) + field = new_fields[field_index] + new_fields[field_index] = DataField( + field.id, new_name, field.type, field.description, field.default_value + ) + + +def _apply_move(fields: List[DataField], new_field: Optional[DataField], move): + from pypaimon.schema.schema_change import MoveType + + if new_field: + pass + else: + field_name = move.field_name + new_field = None + for i, field in enumerate(fields): + if field.name == field_name: + new_field = fields.pop(i) + break + if new_field is None: + raise ColumnNotExistException(field_name) + + field_map = {field.name: i for i, field in enumerate(fields)} + if move.type == MoveType.FIRST: + fields.insert(0, new_field) + elif move.type == MoveType.AFTER: + if move.reference_field_name not in field_map: + raise ColumnNotExistException(move.reference_field_name) + fields.insert(field_map[move.reference_field_name] + 1, new_field) + elif move.type == MoveType.BEFORE: + if move.reference_field_name not in field_map: + raise ColumnNotExistException(move.reference_field_name) + fields.insert(field_map[move.reference_field_name], new_field) + elif move.type == MoveType.LAST: + fields.append(new_field) + else: + raise ValueError(f"Unsupported move type: {move.type}") + + +def _handle_add_column( + change: AddColumn, new_fields: List[DataField], highest_field_id: AtomicInteger +): + if not change.data_type.nullable: + raise ValueError( + f"Column {'.'.join(change.field_names)} cannot specify NOT NULL in the table." + ) + field_id = highest_field_id.increment_and_get() + field_name = change.field_names[-1] + if _find_field_index(new_fields, field_name) is not None: + raise ColumnAlreadyExistException(field_name) + new_field = DataField(field_id, field_name, change.data_type, change.comment) + if change.move: + _apply_move(new_fields, new_field, change.move) + else: + new_fields.append(new_field) + + class SchemaManager: def __init__(self, file_io: FileIO, table_path: str): @@ -94,3 +269,135 @@ def _list_versioned_files(self) -> List[int]: except ValueError: continue return versions + + def commit_changes(self, changes: List[SchemaChange]) -> TableSchema: + while True: + old_table_schema = self.latest() + if old_table_schema is None: + raise RuntimeError( + f"Table schema does not exist at path: {self.table_path}. " + "This may happen if the table was deleted concurrently." + ) + + new_table_schema = self._generate_table_schema(old_table_schema, changes) + try: + success = self.commit(new_table_schema) + if success: + return new_table_schema + except Exception as e: + raise RuntimeError(f"Failed to commit schema changes: {e}") from e + + def _generate_table_schema( + self, old_table_schema: TableSchema, changes: List[SchemaChange] + ) -> TableSchema: + new_options = dict(old_table_schema.options) + new_fields = [] + for field in old_table_schema.fields: + from pypaimon.schema.data_types import DataTypeParser + field_type_dict = field.type.to_dict() + copied_type = DataTypeParser.parse_data_type(field_type_dict) + new_fields.append(DataField( + field.id, + field.name, + copied_type, + field.description, + field.default_value + )) + highest_field_id = AtomicInteger(old_table_schema.highest_field_id) + new_comment = old_table_schema.comment + + for change in changes: + if isinstance(change, SetOption): + new_options[change.key] = change.value + elif isinstance(change, RemoveOption): + new_options.pop(change.key, None) + elif isinstance(change, UpdateComment): + new_comment = change.comment + elif isinstance(change, AddColumn): + _handle_add_column(change, new_fields, highest_field_id) + elif isinstance(change, RenameColumn): + _assert_not_updating_partition_keys( + old_table_schema, change.field_names, "rename" + ) + _handle_rename_column(change, new_fields) + elif isinstance(change, DropColumn): + _drop_column_validation(old_table_schema, change) + _handle_drop_column(change, new_fields) + elif isinstance(change, UpdateColumnType): + _assert_not_updating_partition_keys( + old_table_schema, change.field_names, "update" + ) + _assert_not_updating_primary_keys( + old_table_schema, change.field_names, "update" + ) + _handle_update_column_type(change, new_fields) + elif isinstance(change, UpdateColumnNullability): + if change.new_nullability: + _assert_not_updating_primary_keys( + old_table_schema, change.field_names, "change nullability of" + ) + _handle_update_column_nullability(change, new_fields) + elif isinstance(change, UpdateColumnComment): + _handle_update_column_comment(change, new_fields) + elif isinstance(change, UpdateColumnPosition): + _apply_move(new_fields, None, change.move) + else: + raise NotImplementedError(f"Unsupported change: {type(change)}") + + rename_mappings = _get_rename_mappings(changes) + new_primary_keys = SchemaManager._apply_not_nested_column_rename( + old_table_schema.primary_keys, rename_mappings + ) + new_options = SchemaManager._apply_rename_columns_to_options(new_options, rename_mappings) + + new_schema = Schema( + fields=new_fields, + partition_keys=old_table_schema.partition_keys, + primary_keys=new_primary_keys, + options=new_options, + comment=new_comment + ) + + return TableSchema( + version=old_table_schema.version, + id=old_table_schema.id + 1, + fields=new_schema.fields, + highest_field_id=highest_field_id.get(), + partition_keys=new_schema.partition_keys, + primary_keys=new_schema.primary_keys, + options=new_schema.options, + comment=new_schema.comment + ) + + @staticmethod + def _apply_not_nested_column_rename( + columns: List[str], rename_mappings: dict + ) -> List[str]: + return [rename_mappings.get(col, col) for col in columns] + + @staticmethod + def _apply_rename_columns_to_options( + options: dict, rename_mappings: dict + ) -> dict: + if not rename_mappings: + return options + new_options = dict(options) + from pypaimon.common.options.core_options import CoreOptions + + bucket_key = CoreOptions.BUCKET_KEY.key() + if bucket_key in new_options: + bucket_columns = new_options[bucket_key].split(",") + new_bucket_columns = SchemaManager._apply_not_nested_column_rename( + bucket_columns, rename_mappings + ) + new_options[bucket_key] = ",".join(new_bucket_columns) + + sequence_field = "sequence.field" + if sequence_field in new_options: + sequence_fields = new_options[sequence_field].split(",") + new_sequence_fields = SchemaManager._apply_not_nested_column_rename( + sequence_fields, rename_mappings + ) + new_options[sequence_field] = ",".join(new_sequence_fields) + + return new_options diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py index 530e33aa78ee..02c7102ab817 100644 --- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py +++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py @@ -25,6 +25,7 @@ TableAlreadyExistException, TableNotExistException) from pypaimon.schema.data_types import AtomicType, DataField +from pypaimon.schema.schema_change import SchemaChange from pypaimon.table.file_store_table import FileStoreTable @@ -85,3 +86,88 @@ def test_table(self): self.assertEqual(table.fields[2].name, "f2") self.assertTrue(isinstance(table.fields[2].type, AtomicType)) self.assertEqual(table.fields[2].type.type, "STRING") + + def test_alter_table(self): + catalog = CatalogFactory.create({ + "warehouse": self.warehouse + }) + catalog.create_database("test_db", False) + + identifier = "test_db.test_table" + schema = Schema( + fields=[ + DataField.from_dict({"id": 0, "name": "col1", "type": "STRING", "description": "field1"}), + DataField.from_dict({"id": 1, "name": "col2", "type": "STRING", "description": "field2"}) + ], + partition_keys=[], + primary_keys=[], + options={}, + comment="comment" + ) + catalog.create_table(identifier, schema, False) + + catalog.alter_table( + identifier, + [SchemaChange.add_column("col3", AtomicType("DATE"))], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(len(table.fields), 3) + self.assertEqual(table.fields[2].name, "col3") + self.assertEqual(table.fields[2].type.type, "DATE") + + catalog.alter_table( + identifier, + [SchemaChange.update_comment("new comment")], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(table.table_schema.comment, "new comment") + + catalog.alter_table( + identifier, + [SchemaChange.rename_column("col1", "new_col1")], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(table.fields[0].name, "new_col1") + + catalog.alter_table( + identifier, + [SchemaChange.update_column_type("col2", AtomicType("BIGINT"))], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(table.fields[1].type.type, "BIGINT") + + catalog.alter_table( + identifier, + [SchemaChange.update_column_comment("col2", "col2 field")], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(table.fields[1].description, "col2 field") + + catalog.alter_table( + identifier, + [SchemaChange.set_option("write-buffer-size", "256 MB")], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(table.table_schema.options.get("write-buffer-size"), "256 MB") + + catalog.alter_table( + identifier, + [SchemaChange.remove_option("write-buffer-size")], + False + ) + table = catalog.get_table(identifier) + self.assertNotIn("write-buffer-size", table.table_schema.options) + + catalog.alter_table( + identifier, + [SchemaChange.drop_column("col3")], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(len(table.fields), 2) diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py b/paimon-python/pypaimon/tests/rest/rest_server.py index b97d015dbeef..e0bedeac1bb9 100755 --- a/paimon-python/pypaimon/tests/rest/rest_server.py +++ b/paimon-python/pypaimon/tests/rest/rest_server.py @@ -27,7 +27,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union from urllib.parse import urlparse -from pypaimon.api.api_request import (CreateDatabaseRequest, +from pypaimon.api.api_request import (AlterTableRequest, CreateDatabaseRequest, CreateTableRequest, RenameTableRequest) from pypaimon.api.api_response import (ConfigResponse, GetDatabaseResponse, GetTableResponse, ListDatabasesResponse, @@ -44,6 +44,8 @@ from pypaimon.common.identifier import Identifier from pypaimon.common.json_util import JSON from pypaimon import Schema +from pypaimon.schema.schema_change import Actions, SchemaChange +from pypaimon.schema.schema_manager import SchemaManager from pypaimon.schema.table_schema import TableSchema @@ -91,6 +93,105 @@ class ErrorResponse(RESTResponse): OBJECT_TABLE = "OBJECT_TABLE" +def _dict_to_schema_change(change_dict: dict) -> SchemaChange: + from pypaimon.schema.schema_change import ( + SetOption, RemoveOption, UpdateComment, AddColumn, RenameColumn, + DropColumn, UpdateColumnType, UpdateColumnNullability, + UpdateColumnComment, UpdateColumnDefaultValue, UpdateColumnPosition, Move, MoveType + ) + + action = change_dict.get(Actions.FIELD_ACTION) + if action == Actions.SET_OPTION_ACTION: + return SetOption(key=change_dict["key"], value=change_dict["value"]) + elif action == Actions.REMOVE_OPTION_ACTION: + return RemoveOption(key=change_dict["key"]) + elif action == Actions.UPDATE_COMMENT_ACTION: + return UpdateComment(comment=change_dict.get("comment")) + elif action == Actions.ADD_COLUMN_ACTION: + from pypaimon.schema.data_types import DataTypeParser + data_type_value = change_dict.get("dataType") or change_dict.get(AddColumn.FIELD_DATA_TYPE) + if data_type_value is None: + raise ValueError(f"Missing dataType field in AddColumn change: {change_dict}") + data_type = DataTypeParser.parse_data_type(data_type_value) + move = None + if "move" in change_dict and change_dict["move"] is not None: + move_dict = change_dict["move"] + if isinstance(move_dict, dict): + move_type_str = move_dict.get("type") or move_dict.get(Move.FIELD_TYPE) + if move_type_str is None: + raise ValueError(f"Missing type field in Move: {move_dict}") + move_type = MoveType(move_type_str) + field_name = move_dict.get("fieldName") or move_dict.get(Move.FIELD_FIELD_NAME) + if field_name is None: + raise ValueError(f"Missing fieldName field in Move: {move_dict}") + reference_field = ( + move_dict.get("referenceFieldName") or + move_dict.get(Move.FIELD_REFERENCE_FIELD_NAME) + ) + move = Move( + field_name=field_name, + reference_field_name=reference_field, + type=move_type + ) + field_names = change_dict.get("fieldNames") or change_dict.get(AddColumn.FIELD_FIELD_NAMES) + if field_names is None: + raise ValueError(f"Missing fieldNames field in AddColumn change: {change_dict}") + return AddColumn( + field_names=field_names, + data_type=data_type, + comment=change_dict.get("comment") or change_dict.get(AddColumn.FIELD_COMMENT), + move=move + ) + elif action == Actions.RENAME_COLUMN_ACTION: + return RenameColumn(field_names=change_dict["fieldNames"], new_name=change_dict["newName"]) + elif action == Actions.DROP_COLUMN_ACTION: + return DropColumn(field_names=change_dict["fieldNames"]) + elif action == Actions.UPDATE_COLUMN_TYPE_ACTION: + from pypaimon.schema.data_types import DataTypeParser + new_type = DataTypeParser.parse_data_type(change_dict["newDataType"]) + return UpdateColumnType( + field_names=change_dict["fieldNames"], + new_data_type=new_type, + keep_nullability=change_dict.get("keepNullability", False) + ) + elif action == Actions.UPDATE_COLUMN_NULLABILITY_ACTION: + return UpdateColumnNullability( + field_names=change_dict["fieldNames"], + new_nullability=change_dict["newNullability"] + ) + elif action == Actions.UPDATE_COLUMN_COMMENT_ACTION: + return UpdateColumnComment( + field_names=change_dict["fieldNames"], + new_comment=change_dict.get("newComment") + ) + elif action == Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION: + return UpdateColumnDefaultValue( + field_names=change_dict["fieldNames"], + new_default_value=change_dict["newDefaultValue"] + ) + elif action == Actions.UPDATE_COLUMN_POSITION_ACTION: + move_dict = change_dict.get("move") or change_dict.get(UpdateColumnPosition.FIELD_MOVE) + if move_dict is None: + raise ValueError(f"Missing move field in UpdateColumnPosition change: {change_dict}") + if not isinstance(move_dict, dict): + raise ValueError(f"move field must be a dict in UpdateColumnPosition change: {change_dict}") + move_type_str = move_dict.get("type") or move_dict.get(Move.FIELD_TYPE) + if move_type_str is None: + raise ValueError(f"Missing type field in Move: {move_dict}") + move_type = MoveType(move_type_str) + field_name = move_dict.get("fieldName") or move_dict.get(Move.FIELD_FIELD_NAME) + if field_name is None: + raise ValueError(f"Missing fieldName field in Move: {move_dict}") + move = Move( + field_name=field_name, + reference_field_name=move_dict.get("referenceFieldName") or move_dict.get(Move.FIELD_REFERENCE_FIELD_NAME), + type=move_type + ) + return UpdateColumnPosition(move=move) + else: + raise ValueError(f"Unknown schema change action: {action}") + + class RESTCatalogServer: """Mock REST server for testing""" @@ -453,13 +554,11 @@ def _table_handle(self, method: str, data: str, identifier: Identifier) -> Tuple schema = table_metadata.schema.to_schema() response = self.mock_table(identifier, table_metadata, table_path, schema) return self._mock_response(response, 200) - # - # elif method == "POST": - # # Alter table - # request_body = JSON.from_json(data, AlterTableRequest) - # self._alter_table_impl(identifier, request_body.get_changes()) - # return self._mock_response("", 200) - + elif method == "POST": + # Alter table + request_body = JSON.from_json(data, AlterTableRequest) + self._alter_table_impl(identifier, request_body.changes) + return self._mock_response("", 200) elif method == "DELETE": # Drop table if identifier.get_full_name() not in self.table_metadata_store: @@ -665,6 +764,44 @@ def _sql_pattern_to_regex(self, pattern: str) -> str: return '^' + ''.join(regex) + '$' + def _alter_table_impl(self, identifier: Identifier, changes: List) -> None: + if identifier.get_full_name() not in self.table_metadata_store: + raise TableNotExistException(identifier) + + schema_changes = [] + for change in changes: + if isinstance(change, dict): + try: + schema_changes.append(_dict_to_schema_change(change)) + except (KeyError, TypeError) as e: + raise ValueError(f"Failed to convert change dict to SchemaChange: {change}, error: {e}") from e + else: + schema_changes.append(change) + + table_metadata = self.table_metadata_store[identifier.get_full_name()] + + table_path = ( + Path(self.data_path) / self.warehouse / + identifier.get_database_name() / identifier.get_object_name() + ) + schema_manager = SchemaManager(self._get_file_io(), str(table_path)) + new_schema = schema_manager.commit_changes(schema_changes) + + updated_metadata = TableMetadata( + schema=new_schema, + is_external=table_metadata.is_external, + uuid=table_metadata.uuid + ) + self.table_metadata_store[identifier.get_full_name()] = updated_metadata + + def _get_file_io(self): + """Get FileIO instance for SchemaManager""" + from pypaimon.common.file_io import FileIO + from pypaimon.common.options import Options + warehouse_path = str(Path(self.data_path) / self.warehouse) + options = Options({"warehouse": warehouse_path}) + return FileIO(warehouse_path, options) + def _create_table_metadata(self, identifier: Identifier, schema_id: int, schema: Schema, uuid_str: str, is_external: bool) -> TableMetadata: """Create table metadata""" diff --git a/paimon-python/pypaimon/tests/rest/rest_simple_test.py b/paimon-python/pypaimon/tests/rest/rest_simple_test.py index c5be4ad473be..2adce404961d 100644 --- a/paimon-python/pypaimon/tests/rest/rest_simple_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py @@ -21,6 +21,8 @@ from pypaimon import Schema from pypaimon.catalog.catalog_exception import DatabaseAlreadyExistException, TableAlreadyExistException, \ DatabaseNotExistException, TableNotExistException +from pypaimon.schema.data_types import AtomicType, DataField +from pypaimon.schema.schema_change import SchemaChange from pypaimon.tests.rest.rest_base_test import RESTBaseTest from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor, DynamicBucketRowKeyExtractor, \ UnawareBucketRowKeyExtractor @@ -710,3 +712,91 @@ def test_create_drop_database_table(self): self.rest_catalog.drop_database("db1", True) except DatabaseNotExistException: self.fail("drop_database with ignore_if_not_exists=True should not raise DatabaseNotExistException") + + def test_alter_table(self): + catalog = self.rest_catalog + catalog.create_database("test_db_alter", True) + + identifier = "test_db_alter.test_table" + schema = Schema( + fields=[ + DataField.from_dict({"id": 0, "name": "col1", "type": "STRING", "description": "field1"}), + DataField.from_dict({"id": 1, "name": "col2", "type": "STRING", "description": "field2"}) + ], + partition_keys=[], + primary_keys=[], + options={}, + comment="comment" + ) + catalog.create_table(identifier, schema, False) + + catalog.alter_table( + identifier, + [SchemaChange.add_column("col3", AtomicType("DATE"))], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(len(table.fields), 3) + self.assertEqual(table.fields[2].name, "col3") + self.assertEqual(table.fields[2].type.type, "DATE") + + catalog.alter_table( + identifier, + [SchemaChange.update_comment("new comment")], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(table.table_schema.comment, "new comment") + + catalog.alter_table( + identifier, + [SchemaChange.rename_column("col1", "new_col1")], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(table.fields[0].name, "new_col1") + + catalog.alter_table( + identifier, + [SchemaChange.update_column_type("col2", AtomicType("BIGINT"))], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(table.fields[1].type.type, "BIGINT") + + catalog.alter_table( + identifier, + [SchemaChange.update_column_comment("col2", "col2 field")], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(table.fields[1].description, "col2 field") + + catalog.alter_table( + identifier, + [SchemaChange.set_option("write-buffer-size", "256 MB")], + False + ) + table = catalog.get_table(identifier) + self.assertEqual(table.table_schema.options.get("write-buffer-size"), "256 MB") + + catalog.alter_table( + identifier, + [SchemaChange.remove_option("write-buffer-size")], + False + ) + table = catalog.get_table(identifier) + self.assertNotIn("write-buffer-size", table.table_schema.options) + + with self.assertRaises(TableNotExistException): + catalog.alter_table( + "test_db_alter.non_existing_table", + [SchemaChange.add_column("col2", AtomicType("INT"))], + False + ) + + catalog.alter_table( + "test_db_alter.non_existing_table", + [SchemaChange.add_column("col2", AtomicType("INT"))], + True + )