Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,13 @@ public class AmoroManagementConf {
.defaultValue(1260)
.withDescription("Port that the table service thrift server is bound to.");

public static final ConfigOption<String> TABLE_SERVICE_IMPL =
ConfigOptions.key("table-service.impl")
.stringType()
.defaultValue("default")
.withDescription(
"TableService implementation provider name or FQCN. Default is 'default'.");

public static final ConfigOption<Integer> OPTIMIZING_SERVICE_THRIFT_BIND_PORT =
ConfigOptions.key("thrift-server.optimizing-service.bind-port")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
import org.apache.amoro.server.table.DefaultTableManager;
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.TableServiceLoader;
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.server.utils.ThriftServiceProxy;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -167,7 +167,7 @@ public void startRestServices() throws Exception {
}

public void startOptimizingService() throws Exception {
tableService = new DefaultTableService(serviceConfig, catalogManager);
tableService = TableServiceLoader.load(serviceConfig, catalogManager);

optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.amoro.server.persistence.TableRuntimeState;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Objects;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -217,7 +216,7 @@ public TableRuntime getRuntime(Long tableId) {
return tableRuntimeMap.get(tableId);
}

@VisibleForTesting
@Override
public void setRuntime(DefaultTableRuntime tableRuntime) {
checkStarted();
tableRuntimeMap.put(tableRuntime.getTableIdentifier().getId(), tableRuntime);
Expand Down Expand Up @@ -245,8 +244,8 @@ public void dispose() {
}
}

@VisibleForTesting
void exploreTableRuntimes() {
@Override
public void exploreTableRuntimes() {
if (!initialized.isDone()) {
throw new IllegalStateException("TableService is not initialized");
}
Expand Down Expand Up @@ -289,7 +288,7 @@ void exploreTableRuntimes() {
LOG.info("Syncing external catalogs took {} ms.", end - start);
}

@VisibleForTesting
@Override
public void exploreExternalCatalog(ExternalCatalog externalCatalog) {
final List<CompletableFuture<Set<TableIdentity>>> tableIdentifiersFutures =
Lists.newArrayList();
Expand Down Expand Up @@ -508,7 +507,7 @@ private void revertTableRuntimeAdded(
}
}

@VisibleForTesting
@Override
public void disposeTable(ServerTableIdentifier tableIdentifier) {
TableRuntime existedTableRuntime = tableRuntimeMap.get(tableIdentifier.getId());
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table;

import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.ExternalCatalog;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.PersistentBase;

public class MasterSlaveTableService extends PersistentBase implements TableService {
private final Configurations serverConfiguration;
private final CatalogManager catalogManager;

public MasterSlaveTableService(Configurations configuration, CatalogManager catalogManager) {
this.serverConfiguration = configuration;
this.catalogManager = catalogManager;
}

@Override
public void initialize() {}

@Override
public void dispose() {}

@Override
public void onTableCreated(InternalCatalog catalog, ServerTableIdentifier identifier) {}

@Override
public void onTableDropped(InternalCatalog catalog, ServerTableIdentifier identifier) {}

@Override
public TableRuntime getRuntime(Long tableId) {
return null;
}

@Override
public AmoroTable<?> loadTable(ServerTableIdentifier identifier) {
return null;
}

@Override
public void exploreTableRuntimes() {}

@Override
public void exploreExternalCatalog(ExternalCatalog externalCatalog) {}

@Override
public void setRuntime(DefaultTableRuntime tableRuntime) {}

@Override
public void disposeTable(ServerTableIdentifier tableIdentifier) {}

@Override
public void addHandlerChain(RuntimeHandlerChain handler) {}

@Override
public void handleTableChanged(
DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {}

@Override
public void handleTableChanged(
DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.server.catalog.ExternalCatalog;
import org.apache.amoro.server.catalog.InternalCatalog;

public interface TableService extends TableRuntimeHandler {
Expand All @@ -46,4 +47,16 @@ default boolean contains(Long tableId) {
* @return managed table.
*/
AmoroTable<?> loadTable(ServerTableIdentifier identifier);

/** Explore and synchronize table runtimes from catalogs. Intended for periodic sync and tests. */
void exploreTableRuntimes();

/** Explore and synchronize a specific external catalog. */
void exploreExternalCatalog(ExternalCatalog externalCatalog);

/** Set or replace a runtime for testing or recovery scenarios. */
void setRuntime(DefaultTableRuntime tableRuntime);

/** Dispose a managed table and its runtime. */
void disposeTable(ServerTableIdentifier tableIdentifier);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table;

import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.catalog.CatalogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.util.ServiceLoader;

public final class TableServiceLoader {

private static final Logger LOG = LoggerFactory.getLogger(TableServiceLoader.class);

private TableServiceLoader() {}

public static TableService load(Configurations conf, CatalogManager catalogManager) {
String impl = conf.getString(AmoroManagementConf.TABLE_SERVICE_IMPL);

// 1) Try named providers via ServiceLoader
ServiceLoader<TableServiceProvider> loader = ServiceLoader.load(TableServiceProvider.class);
for (TableServiceProvider provider : loader) {
try {
if (provider.name().equalsIgnoreCase(impl)) {
LOG.info("Loading TableService from provider name: {} -> {}", impl, provider.getClass());
return provider.create(conf, catalogManager);
}
} catch (Throwable t) {
LOG.warn("Failed to create TableService from provider {}", provider.getClass(), t);
}
}

// 2) Try FQCN
try {
Class<?> clazz = Class.forName(impl);
if (!TableService.class.isAssignableFrom(clazz)) {
LOG.warn("Configured class {} does not implement TableService, fallback to default.", impl);
} else {
try {
Constructor<?> constructor =
clazz.getConstructor(Configurations.class, CatalogManager.class);
LOG.info("Loading TableService from class: {}", impl);
return (TableService) constructor.newInstance(conf, catalogManager);
} catch (NoSuchMethodException nsme) {
LOG.warn(
"No (Configurations, CatalogManager) constructor for {}, fallback to default.", impl);
}
}
} catch (ClassNotFoundException cnfe) {
LOG.info("Configured TableService impl not found as class: {}. Will fallback.", impl);
} catch (Throwable t) {
LOG.warn("Failed to instantiate TableService impl: {}. Will fallback.", impl, t);
}

// 3) Fallback to 'default' provider
for (TableServiceProvider provider : loader) {
if ("default".equalsIgnoreCase(provider.name())) {
LOG.info("Falling back to default TableService provider: {}", provider.getClass());
return provider.create(conf, catalogManager);
}
}

// 4) Last resort: try DefaultTableService directly (avoid circular deps by FQCN)
LOG.info("Falling back to DefaultTableService directly.");
return new org.apache.amoro.server.table.DefaultTableService(conf, catalogManager);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table;

import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.catalog.CatalogManager;

/**
* SPI provider for {@link TableService}. Implementations should be registered via
* META-INF/services/org.apache.amoro.server.table.TableServiceProvider
*/
public interface TableServiceProvider {

/** Provider name to select by configuration, e.g., "default". */
String name();

/** Create a {@link TableService} instance. */
TableService create(Configurations configuration, CatalogManager catalogManager);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table.spi;

import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.TableServiceProvider;

public class DefaultTableServiceProvider implements TableServiceProvider {

@Override
public String name() {
return "default";
}

@Override
public TableService create(Configurations configuration, CatalogManager catalogManager) {
return new DefaultTableService(configuration, catalogManager);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table.spi;

import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.table.MasterSlaveTableService;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.TableServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** It can be used by configuring the item "table-service.impl=master-slave" */
public class MasterSlaveTableServiceProvider implements TableServiceProvider {

private static final Logger LOG = LoggerFactory.getLogger(MasterSlaveTableServiceProvider.class);

@Override
public String name() {
return "master-slave";
}

@Override
public TableService create(Configurations configuration, CatalogManager catalogManager) {
LOG.info("Creating master-slave TableService implementation");
return new MasterSlaveTableService(configuration, catalogManager);
}
}
Loading