Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions paimon-python/pypaimon/api/api_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from pypaimon.common.identifier import Identifier
from pypaimon.common.json_util import json_field
from pypaimon.schema.schema import Schema
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics

Expand Down Expand Up @@ -76,3 +77,10 @@ class CommitTableRequest(RESTRequest):
table_uuid: Optional[str] = json_field(FIELD_TABLE_UUID)
snapshot: Snapshot = json_field(FIELD_SNAPSHOT)
statistics: List[PartitionStatistics] = json_field(FIELD_STATISTICS)


@dataclass
class AlterTableRequest(RESTRequest):
FIELD_CHANGES = "changes"

changes: List[SchemaChange] = json_field(FIELD_CHANGES)
13 changes: 12 additions & 1 deletion paimon-python/pypaimon/api/rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import logging
from typing import Callable, Dict, List, Optional, Union

from pypaimon.api.api_request import (AlterDatabaseRequest, CommitTableRequest,
from pypaimon.api.api_request import (AlterDatabaseRequest, AlterTableRequest, CommitTableRequest,
CreateDatabaseRequest,
CreateTableRequest, RenameTableRequest)
from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse,
Expand Down Expand Up @@ -289,6 +289,17 @@ def rename_table(self, source_identifier: Identifier, target_identifier: Identif
request,
self.rest_auth_function)

def alter_table(self, identifier: Identifier, changes: List):
database_name, table_name = self.__validate_identifier(identifier)
if not changes:
raise ValueError("Changes cannot be empty")

request = AlterTableRequest(changes)
return self.client.post(
self.resource_paths.table(database_name, table_name),
request,
self.rest_auth_function)

def load_table_token(self, identifier: Identifier) -> GetTableTokenResponse:
database_name, table_name = self.__validate_identifier(identifier)

Expand Down
10 changes: 10 additions & 0 deletions paimon-python/pypaimon/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics

Expand Down Expand Up @@ -54,6 +55,15 @@ def get_table(self, identifier: Union[str, Identifier]) -> 'Table':
def create_table(self, identifier: Union[str, Identifier], schema: Schema, ignore_if_exists: bool):
"""Create table with schema."""

@abstractmethod
def alter_table(
self,
identifier: Union[str, Identifier],
changes: List[SchemaChange],
ignore_if_not_exists: bool = False
):
"""Alter table with schema changes."""

def supports_version_management(self) -> bool:
"""
Whether this catalog supports version management for tables.
Expand Down
33 changes: 29 additions & 4 deletions paimon-python/pypaimon/catalog/filesystem_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@

from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_environment import CatalogEnvironment
from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException,
DatabaseNotExistException,
TableAlreadyExistException,
TableNotExistException)
from pypaimon.catalog.catalog_exception import (
DatabaseAlreadyExistException,
DatabaseNotExistException,
TableAlreadyExistException,
TableNotExistException
)
from pypaimon.catalog.database import Database
from pypaimon.common.options import Options
from pypaimon.common.options.config import CatalogOptions
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
Expand Down Expand Up @@ -115,6 +118,28 @@ def get_table_path(self, identifier: Identifier) -> str:
db_path = self.get_database_path(identifier.get_database_name())
return f"{db_path}/{identifier.get_table_name()}"

def alter_table(
self,
identifier: Union[str, Identifier],
changes: List[SchemaChange],
ignore_if_not_exists: bool = False
):
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
try:
self.get_table(identifier)
except TableNotExistException:
if not ignore_if_not_exists:
raise
return

table_path = self.get_table_path(identifier)
schema_manager = SchemaManager(self.file_io, table_path)
try:
schema_manager.commit_changes(changes)
except Exception as e:
raise RuntimeError(f"Failed to alter table {identifier.get_full_name()}: {e}") from e

def commit_snapshot(
self,
identifier: Identifier,
Expand Down
15 changes: 15 additions & 0 deletions paimon-python/pypaimon/catalog/rest/rest_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.schema.table_schema import TableSchema
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
Expand Down Expand Up @@ -177,6 +178,20 @@ def drop_table(self, identifier: Union[str, Identifier], ignore_if_not_exists: b
if not ignore_if_not_exists:
raise TableNotExistException(identifier) from e

def alter_table(
self,
identifier: Union[str, Identifier],
changes: List[SchemaChange],
ignore_if_not_exists: bool = False
):
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
try:
self.rest_api.alter_table(identifier, changes)
except NoSuchResourceException as e:
if not ignore_if_not_exists:
raise TableNotExistException(identifier) from e

def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
response = self.rest_api.get_table(identifier)
return self.to_table_metadata(identifier.get_database_name(), response)
Expand Down
Loading