Skip to content

Commit

Permalink
Fix connection pass through preventing functions from working (#112)
Browse files Browse the repository at this point in the history
* Fix connection pass through preventing functions from working

* Add comment
  • Loading branch information
jogrogan authored Feb 24, 2025
1 parent 9b89fbb commit 5c51af5
Show file tree
Hide file tree
Showing 13 changed files with 255 additions and 32 deletions.
4 changes: 2 additions & 2 deletions deploy/samples/demodb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: ads-database
spec:
schema: ADS
url: jdbc:demodb://ads
url: jdbc:demodb://names=ads
dialect: Calcite

---
Expand All @@ -15,7 +15,7 @@ metadata:
name: profile-database
spec:
schema: PROFILE
url: jdbc:demodb://profile
url: jdbc:demodb://names=profile
dialect: Calcite

---
Expand Down
2 changes: 1 addition & 1 deletion hoptimator
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ $BASEDIR/hoptimator-cli/build/install/hoptimator-cli/bin/hoptimator-cli \
-Dorg.slf4j.simpleLogger.showLogName=false \
sqlline.SqlLine \
-ac sqlline.HoptimatorAppConfig \
-u jdbc:hoptimator:// -n "" -p "" -nn "Hoptimator" $@
-u jdbc:hoptimator://fun=mysql -n "" -p "" -nn "Hoptimator" $@

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.calcite.avatica.ConnectStringParser;
import org.apache.calcite.avatica.DriverVersion;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.Driver;
Expand Down Expand Up @@ -37,15 +38,18 @@ public Connection connect(String url, Properties props) throws SQLException {
if (!url.startsWith(getConnectStringPrefix())) {
return null;
}
String params = url.substring(getConnectStringPrefix().length());
Set<String> schemas = Arrays.asList(params.split(","))
Properties properties = new Properties();
properties.putAll(props);
properties.putAll(ConnectStringParser.parse(url.substring(getConnectStringPrefix().length())));

Set<String> schemas = Arrays.asList(properties.getProperty("names").split(","))
.stream()
.map(x -> x.trim())
.filter(x -> !x.isEmpty())
.map(x -> x.toUpperCase(Locale.ROOT))
.collect(Collectors.toSet());
try {
Connection connection = super.connect(url, props);
Connection connection = super.connect(url, properties);
if (connection == null) {
throw new IOException("Could not connect to " + url);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.schema.impl.ViewTableMacro;
import org.apache.calcite.server.DdlExecutor;
import org.apache.calcite.server.ServerDdlExecutor;
import org.apache.calcite.sql.SqlCall;
Expand All @@ -65,6 +64,7 @@
import com.linkedin.hoptimator.Database;
import com.linkedin.hoptimator.MaterializedView;
import com.linkedin.hoptimator.Pipeline;
import com.linkedin.hoptimator.jdbc.schema.HoptimatorViewTableMacro;
import com.linkedin.hoptimator.util.DeploymentService;
import com.linkedin.hoptimator.util.planner.PipelineRel;

Expand Down Expand Up @@ -117,8 +117,9 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
List<String> viewPath = new ArrayList<>();
viewPath.addAll(schemaPath);
viewPath.add(viewName);
ViewTableMacro viewTableMacro = ViewTable.viewMacro(schemaPlus, sql, schemaPath, viewPath, false);
ViewTable viewTable = (ViewTable) viewTableMacro.apply(Collections.emptyList());
HoptimatorViewTableMacro viewTableMacro = new HoptimatorViewTableMacro(CalciteSchema.from(schemaPlus),
sql, schemaPath, viewPath, false);
ViewTable viewTable = (ViewTable) viewTableMacro.apply(connectionProperties);
try {
ValidationService.validateOrThrow(viewTable, TranslatableTable.class);
if (create.getReplace()) {
Expand Down Expand Up @@ -174,8 +175,9 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con

// Table does not exist. Create it.
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
ViewTableMacro viewTableMacro = ViewTable.viewMacro(schemaPlus, sql, schemaPath, viewPath, false);
MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTableMacro);
HoptimatorViewTableMacro viewTableMacro = new HoptimatorViewTableMacro(CalciteSchema.from(schemaPlus),
sql, schemaPath, viewPath, false);
MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTableMacro, connectionProperties);
RelDataType viewRowType = materializedViewTable.getRowType(typeFactory);

// Support "partial views", i.e. CREATE VIEW FOO$BAR, where the view name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.linkedin.hoptimator.jdbc;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
Expand All @@ -11,7 +11,8 @@
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.schema.impl.ViewTableMacro;

import com.linkedin.hoptimator.jdbc.schema.HoptimatorViewTableMacro;


public class MaterializedViewTable extends AbstractTable implements TranslatableTable {
Expand All @@ -22,8 +23,8 @@ public MaterializedViewTable(ViewTable viewTable) {
this.viewTable = viewTable;
}

public MaterializedViewTable(ViewTableMacro viewTableMacro) {
this((ViewTable) viewTableMacro.apply(Collections.emptyList()));
public MaterializedViewTable(HoptimatorViewTableMacro viewTableMacro, Properties connectionProperties) {
this((ViewTable) viewTableMacro.apply(connectionProperties));
}

public ViewTable viewTable() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.linkedin.hoptimator.jdbc.schema;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;

import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.ViewTableMacro;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* This file is copy-pasted from {@link ViewTableMacro} with the only modification being
* how the connection is instantiated.
*/
public class HoptimatorViewTableMacro extends ViewTableMacro {

private final Boolean modifiable;
public HoptimatorViewTableMacro(CalciteSchema schema, String viewSql,
@Nullable List<String> schemaPath, @Nullable List<String> viewPath,
@Nullable Boolean modifiable) {
super(schema, viewSql, schemaPath, viewPath, modifiable);
this.modifiable = modifiable;
}

public TranslatableTable apply(Properties properties) {
CalciteConnection connection;
try {
connection = DriverManager.getConnection("jdbc:calcite:", properties)
.unwrap(CalciteConnection.class);
} catch (SQLException e) {
throw new RuntimeException(e);
}
CalcitePrepare.AnalyzeViewResult parsed =
Schemas.analyzeView(connection, schema, schemaPath, viewSql, viewPath,
modifiable != null && modifiable);
final List<String> schemaPath1 =
schemaPath != null ? schemaPath : schema.path(null);
if ((modifiable == null || modifiable)
&& parsed.modifiable
&& parsed.table != null) {
return modifiableViewTable(parsed, viewSql, schemaPath1, viewPath, schema);
} else {
return viewTable(parsed, viewSql, schemaPath1, viewPath);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,21 @@
public abstract class QuidemTestBase {

protected void run(String resourceName) throws IOException, URISyntaxException {
run(Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource(resourceName)).toURI());
run(Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource(resourceName)).toURI(), "");
}

protected void run(URI resource) throws IOException {
protected void run(String resourceName, String jdbcProperties) throws IOException, URISyntaxException {
run(Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource(resourceName)).toURI(), jdbcProperties);
}

protected void run(URI resource, String jdbcProperties) throws IOException {
File in = new File(resource);
File out = File.createTempFile(in.getName(), ".out");
try (Reader r = new FileReader(in); Writer w = new PrintWriter(out)) {
Quidem.Config config = Quidem.configBuilder()
.withReader(r)
.withWriter(w)
.withConnectionFactory((x, y) -> DriverManager.getConnection("jdbc:hoptimator://catalogs=" + x))
.withConnectionFactory((x, y) -> DriverManager.getConnection("jdbc:hoptimator://catalogs=" + x + jdbcProperties))
.withCommandHandler(new CustomCommandHandler())
.build();
new Quidem(config).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ public void register(Wrapper parentSchema, Properties connectionProperties) thro
K8sMetadata metadata = new K8sMetadata(context);
schemaPlus.add("k8s", metadata);
metadata.databaseTable().addDatabases(schemaPlus, connectionProperties);
metadata.viewTable().addViews(schemaPlus);
metadata.viewTable().addViews(schemaPlus, connectionProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.StringJoiner;
import javax.sql.DataSource;

import org.apache.calcite.adapter.jdbc.JdbcSchema;
Expand Down Expand Up @@ -51,7 +52,7 @@ public K8sDatabaseTable(K8sContext context, K8sEngineTable engines) {
public void addDatabases(SchemaPlus parentSchema, Properties connectionProperties) {
for (Row row : rows()) {
parentSchema.add(schemaName(row),
HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row), parentSchema, dialect(row), engines.forDatabase(row.NAME), connectionProperties));
HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row, connectionProperties), parentSchema, dialect(row), engines.forDatabase(row.NAME), connectionProperties));
}
}

Expand Down Expand Up @@ -82,9 +83,28 @@ private static String schemaName(Row row) {
}
}

private static DataSource dataSource(Row row) {
// TODO fetch username/password from Secret
return JdbcSchema.dataSource(row.URL, row.DRIVER, "nouser", "nopass");
private static DataSource dataSource(Row row, Properties connectionProperties) {
String user = "nouser";
String pass = "nopass";
StringJoiner joiner = new StringJoiner(";");
for (String key : connectionProperties.stringPropertyNames()) {
if ("user".equals(key)) {
user = connectionProperties.getProperty(key);
} else if ("password".equals(key)) {
pass = connectionProperties.getProperty(key);
} else {
String value = connectionProperties.getProperty(key);
joiner.add(key + "=" + value);
}
}
String joinedUrl = row.URL;
// Handles case where there are no properties already in the URL
if (row.URL.endsWith("//")) {
joinedUrl = joinedUrl + joiner;
} else {
joinedUrl = joinedUrl + ";" + joiner;
}
return JdbcSchema.dataSource(joinedUrl, row.DRIVER, user, pass);
}

private static SqlDialect dialect(Row row) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.schema.impl.ViewTableMacro;

import io.kubernetes.client.openapi.models.V1ObjectMeta;

import com.linkedin.hoptimator.Validated;
import com.linkedin.hoptimator.Validator;
import com.linkedin.hoptimator.jdbc.MaterializedViewTable;
import com.linkedin.hoptimator.jdbc.schema.HoptimatorViewTableMacro;
import com.linkedin.hoptimator.k8s.models.V1alpha1View;
import com.linkedin.hoptimator.k8s.models.V1alpha1ViewList;
import com.linkedin.hoptimator.k8s.models.V1alpha1ViewSpec;
Expand Down Expand Up @@ -75,7 +76,7 @@ public K8sViewTable(K8sContext context) {
super(context, K8sApiEndpoints.VIEWS, Row.class);
}

public void addViews(SchemaPlus parentSchema) {
public void addViews(SchemaPlus parentSchema, Properties connectionProperties) {
for (Row row : rows()) {

// build schema path, filling in any missing schemas
Expand All @@ -88,7 +89,7 @@ public void addViews(SchemaPlus parentSchema) {
}
schema = next;
}
schema.add(row.viewName(), makeView(schema, row));
schema.add(row.viewName(), makeView(schema, row, connectionProperties));
}
}

Expand All @@ -107,12 +108,13 @@ public void remove(String name) {
rows().remove(find(name));
}

private Table makeView(SchemaPlus parentSchema, Row row) {
ViewTableMacro viewTableMacro = ViewTable.viewMacro(parentSchema, row.SQL, row.schemaPath(), row.viewPath(), false);
private Table makeView(SchemaPlus parentSchema, Row row, Properties connectionProperties) {
HoptimatorViewTableMacro viewTableMacro = new HoptimatorViewTableMacro(CalciteSchema.from(parentSchema), row.SQL,
row.schemaPath(), row.viewPath(), false);
if (row.MATERIALIZED) {
return new MaterializedViewTable(viewTableMacro);
return new MaterializedViewTable(viewTableMacro, connectionProperties);
} else {
return viewTableMacro.apply(Collections.emptyList());
return viewTableMacro.apply(connectionProperties);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@ public class TestSqlScripts extends QuidemTestBase {
public void k8sDdlScript() throws Exception {
run("k8s-ddl.id");
}

@Test
@Tag("integration")
public void k8sDdlScriptFunction() throws Exception {
run("k8s-ddl-function.id", ";fun=mysql");
}
}
Loading

0 comments on commit 5c51af5

Please sign in to comment.