Skip to content

Commit

Permalink
fixup! Add Snowflake JDBC Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Mar 5, 2024
1 parent ae553ee commit 8771370
Show file tree
Hide file tree
Showing 15 changed files with 671 additions and 776 deletions.
27 changes: 20 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ jobs:
&& ! (contains(matrix.modules, 'trino-bigquery') && contains(matrix.profile, 'cloud-tests-2'))
&& ! (contains(matrix.modules, 'trino-redshift') && contains(matrix.profile, 'cloud-tests'))
&& ! (contains(matrix.modules, 'trino-redshift') && contains(matrix.profile, 'fte-tests'))
&& ! (contains(matrix.modules, 'trino-snowflake') && contains(matrix.profile, 'cloud-tests'))
&& ! (contains(matrix.modules, 'trino-filesystem-s3') && contains(matrix.profile, 'cloud-tests'))
&& ! (contains(matrix.modules, 'trino-filesystem-azure') && contains(matrix.profile, 'cloud-tests'))
&& ! (contains(matrix.modules, 'trino-hdfs') && contains(matrix.profile, 'cloud-tests'))
Expand Down Expand Up @@ -655,17 +656,17 @@ jobs:
run: |
$MAVEN test ${MAVEN_TEST} -pl :trino-bigquery -Pcloud-tests-case-insensitive-mapping -Dbigquery.credentials-key="${BIGQUERY_CASE_INSENSITIVE_CREDENTIALS_KEY}"
- name: Cloud Snowflake Tests
id: tests-snowflake
env:
SNOWFLAKE_URL: ${{ secrets.SNOWFLAKE_URL }}
SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
SNOWFLAKE_URL: ${{ vars.SNOWFLAKE_URL }}
SNOWFLAKE_USER: ${{ vars.SNOWFLAKE_USER }}
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }}
SNOWFLAKE_ROLE: ${{ secrets.SNOWFLAKE_ROLE }}
SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }}
if: matrix.modules == 'plugin/trino-snowflake' && !contains(matrix.profile, 'cloud-tests') && (env.SNOWFLAKE_URL != '' && env.SNOWFLAKE_USER != '' && env.SNOWFLAKE_PASSWORD != '')
SNOWFLAKE_DATABASE: ${{ vars.SNOWFLAKE_DATABASE }}
SNOWFLAKE_ROLE: ${{ vars.SNOWFLAKE_ROLE }}
SNOWFLAKE_WAREHOUSE: ${{ vars.SNOWFLAKE_WAREHOUSE }}
if: matrix.modules == 'plugin/trino-snowflake' && contains(matrix.profile, 'cloud-tests') && (env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.SNOWFLAKE_URL != '')
run: |
$MAVEN test ${MAVEN_TEST} -pl :trino-snowflake -Pcloud-tests \
-Dconnector.name="snowflake" \
-Dsnowflake.test.server.url="${SNOWFLAKE_URL}" \
-Dsnowflake.test.server.user="${SNOWFLAKE_USER}" \
-Dsnowflake.test.server.password="${SNOWFLAKE_PASSWORD}" \
Expand Down Expand Up @@ -952,6 +953,12 @@ jobs:
DATABRICKS_TOKEN:
GCP_CREDENTIALS_KEY:
GCP_STORAGE_BUCKET:
SNOWFLAKE_URL:
SNOWFLAKE_USER:
SNOWFLAKE_PASSWORD:
SNOWFLAKE_DATABASE:
SNOWFLAKE_ROLE:
SNOWFLAKE_WAREHOUSE:
TESTCONTAINERS_NEVER_PULL: true
run: |
# converts filtered YAML file into JSON
Expand Down Expand Up @@ -1019,6 +1026,12 @@ jobs:
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
GCP_CREDENTIALS_KEY: ${{ secrets.GCP_CREDENTIALS_KEY }}
GCP_STORAGE_BUCKET: ${{ vars.GCP_STORAGE_BUCKET }}
SNOWFLAKE_URL: ${{ vars.SNOWFLAKE_URL }}
SNOWFLAKE_USER: ${{ vars.SNOWFLAKE_USER }}
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
SNOWFLAKE_DATABASE: ${{ vars.SNOWFLAKE_DATABASE }}
SNOWFLAKE_ROLE: ${{ vars.SNOWFLAKE_ROLE }}
SNOWFLAKE_WAREHOUSE: ${{ vars.SNOWFLAKE_WAREHOUSE }}
run: |
exec testing/trino-product-tests-launcher/target/trino-product-tests-launcher-*-executable.jar suite run \
--suite ${{ matrix.suite }} \
Expand Down
21 changes: 10 additions & 11 deletions plugin/trino-snowflake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<air.test.jvm.additional-arguments>--add-opens=java.base/java.nio=ALL-UNNAMED</air.test.jvm.additional-arguments>
</properties>

<dependencies>
Expand All @@ -34,11 +33,6 @@
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-base-jdbc</artifactId>
Expand All @@ -52,7 +46,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.32</version>
<version>3.15.0</version>
</dependency>

<!-- Trino SPI -->
Expand Down Expand Up @@ -92,6 +86,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
<scope>runtime</scope>
</dependency>

<!-- for testing -->
<dependency>
<groupId>io.airlift</groupId>
Expand Down Expand Up @@ -201,7 +201,6 @@
<configuration>
<excludes>
<exclude>**/TestSnowflakeConnectorTest.java</exclude>
<exclude>**/TestSnowflakePlugin.java</exclude>
<exclude>**/TestSnowflakeTypeMapping.java</exclude>
</excludes>
</configuration>
Expand All @@ -216,17 +215,17 @@
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<air.test.jvm.additional-arguments>${air.test.jvm.additional-arguments.default} --add-opens=java.base/java.nio=ALL-UNNAMED</air.test.jvm.additional-arguments>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/TestSnowflakeClient.java</include>
<include>**/TestSnowflakeConfig.java</include>
<include>**/TestSnowflakeConnectorTest.java</include>
<include>**/TestSnowflakePlugin.java</include>
<include>**/TestSnowflakeTypeMapping.java</include>
</includes>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.trino.plugin.base.aggregation.AggregateFunctionRewriter;
import io.trino.plugin.base.aggregation.AggregateFunctionRule;
import io.trino.plugin.base.expression.ConnectorExpressionRewriter;
Expand All @@ -30,11 +29,9 @@
import io.trino.plugin.jdbc.JdbcTableHandle;
import io.trino.plugin.jdbc.JdbcTypeHandle;
import io.trino.plugin.jdbc.LongWriteFunction;
import io.trino.plugin.jdbc.ObjectReadFunction;
import io.trino.plugin.jdbc.ObjectWriteFunction;
import io.trino.plugin.jdbc.PredicatePushdownController;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.SliceReadFunction;
import io.trino.plugin.jdbc.SliceWriteFunction;
import io.trino.plugin.jdbc.StandardColumnMappings;
import io.trino.plugin.jdbc.WriteMapping;
Expand Down Expand Up @@ -69,29 +66,20 @@
import java.math.RoundingMode;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.function.BiFunction;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
Expand All @@ -108,7 +96,7 @@ public class SnowflakeClient
All TIME values must be between 00:00:00 and 23:59:59.999999999. TIME internally stores “wallclock” time, and all operations on TIME values are performed without taking any time zone into consideration.
*/
private static final int MAX_SUPPORTED_TEMPORAL_PRECISION = 9;
private static final Logger log = Logger.get(SnowflakeClient.class);

private static final DateTimeFormatter SNOWFLAKE_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("u-MM-dd'T'HH:mm:ss.SSSSSSSSSXXX");
private static final DateTimeFormatter SNOWFLAKE_DATE_FORMATTER = DateTimeFormatter.ofPattern("uuuu-MM-dd");
private static final DateTimeFormatter SNOWFLAKE_TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("u-MM-dd'T'HH:mm:ss.SSSSSSSSS");
Expand All @@ -125,8 +113,6 @@ private interface ColumnMappingFunction
Optional<ColumnMapping> convert(JdbcTypeHandle typeHandle);
}

private static final TimeZone UTC_TZ = TimeZone.getTimeZone(ZoneId.of("UTC"));

@Inject
public SnowflakeClient(
BaseJdbcConfig config,
Expand Down Expand Up @@ -183,9 +169,9 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
}

final Map<String, ColumnMappingFunction> snowflakeColumnMappings = ImmutableMap.<String, ColumnMappingFunction>builder()
.put("time", handle -> { return Optional.of(timeColumnMapping(handle.getRequiredDecimalDigits())); })
.put("date", handle -> { return Optional.of(ColumnMapping.longMapping(DateType.DATE, (resultSet, columnIndex) -> LocalDate.ofEpochDay(resultSet.getLong(columnIndex)).toEpochDay(), snowFlakeDateWriter())); })
.put("varchar", handle -> { return Optional.of(varcharColumnMapping(handle.getRequiredColumnSize())); })
.put("time", handle -> Optional.of(timeColumnMapping(handle.getRequiredDecimalDigits())))
.put("date", handle -> Optional.of(ColumnMapping.longMapping(DateType.DATE, (resultSet, columnIndex) -> LocalDate.ofEpochDay(resultSet.getLong(columnIndex)).toEpochDay(), snowFlakeDateWriter())))
.put("varchar", handle -> Optional.of(varcharColumnMapping(handle.getRequiredColumnSize())))
.put("number", handle -> {
int decimalDigits = handle.getRequiredDecimalDigits();
int precision = handle.getRequiredColumnSize() + Math.max(-decimalDigits, 0);
Expand Down Expand Up @@ -231,9 +217,7 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
}

final Map<String, WriteMappingFunction> snowflakeWriteMappings = ImmutableMap.<String, WriteMappingFunction>builder()
.put("TimeType", writeType -> {
return WriteMapping.longMapping("time", timeWriteFunction(((TimeType) writeType).getPrecision()));
})
.put("TimeType", writeType -> WriteMapping.longMapping("time", timeWriteFunction(((TimeType) writeType).getPrecision())))
.put("ShortTimestampType", writeType -> {
WriteMapping myMap = SnowflakeClient.snowFlakeTimestampWriter(writeType);
return myMap;
Expand Down Expand Up @@ -306,11 +290,6 @@ public void setColumnType(ConnectorSession session, JdbcTableHandle handle, Jdbc
throw new TrinoException(NOT_SUPPORTED, "This connector does not support setting column types");
}

// Reads a Snowflake VARIANT column as text. Snowflake returns VARIANT values
// as JSON strings; string payloads arrive wrapped in double quotes, so the
// surrounding quotes (if any) are stripped before converting to a UTF-8 slice.
private static SliceReadFunction variantReadFunction()
{
    return (resultSet, columnIndex) -> {
        String rawJson = resultSet.getString(columnIndex);
        String unquoted = rawJson.replaceAll("^\"|\"$", "");
        return utf8Slice(unquoted);
    };
}

private static ColumnMapping columnMappingPushdown(ColumnMapping mapping)
{
if (mapping.getPredicatePushdownController() == PredicatePushdownController.DISABLE_PUSHDOWN) {
Expand Down Expand Up @@ -368,17 +347,6 @@ private static ColumnMapping varcharColumnMapping(int varcharLength)
StandardColumnMappings.varcharWriteFunction());
}

// Reads a Snowflake TIMESTAMP_TZ value with greater-than-microsecond precision.
// The value is fetched as a string (formatted by the session's
// TIMESTAMP_TZ_OUTPUT_FORMAT — assumed to match SNOWFLAKE_DATETIME_FORMATTER;
// TODO confirm against the connection properties) and converted to Trino's
// LongTimestampWithTimeZone, which carries epoch seconds, a picosecond
// fraction, and the original zone.
private static ObjectReadFunction longTimestampWithTimezoneReadFunction()
{
    return ObjectReadFunction.of(LongTimestampWithTimeZone.class, (resultSet, columnIndex) -> {
        // Parse the textual timestamp, preserving its zone offset/id
        ZonedDateTime timestamp = SNOWFLAKE_DATETIME_FORMATTER.parse(resultSet.getString(columnIndex), ZonedDateTime::from);
        return LongTimestampWithTimeZone.fromEpochSecondsAndFraction(
                timestamp.toEpochSecond(),
                // nanos -> picos; cast to long before multiplying to avoid int overflow
                (long) timestamp.getNano() * PICOSECONDS_PER_NANOSECOND,
                TimeZoneKey.getTimeZoneKey(timestamp.getZone().getId()));
    });
}

private static ObjectWriteFunction longTimestampWithTzWriteFunction()
{
return ObjectWriteFunction.of(LongTimestampWithTimeZone.class, (statement, index, value) -> {
Expand Down Expand Up @@ -476,51 +444,4 @@ private static LongWriteFunction timestampWithTimezoneWriteFunction()
statement.setString(index, SNOWFLAKE_DATETIME_FORMATTER.format(instant.atZone(zone)));
};
}

// Reads a timestamp whose precision exceeds 6 fractional digits into Trino's
// LongTimestamp (microseconds since epoch + picoseconds-of-microsecond).
// A UTC calendar is passed to ResultSet.getTimestamp so the JDBC driver does
// not apply the JVM default time zone when materializing the value.
private static ObjectReadFunction longTimestampReader()
{
    return ObjectReadFunction.of(LongTimestamp.class, (resultSet, columnIndex) -> {
        Calendar calendar = new GregorianCalendar(UTC_TZ, Locale.ENGLISH);
        // Reset the calendar to the epoch before handing it to the driver
        calendar.setTime(new Date(0));
        Timestamp ts = resultSet.getTimestamp(columnIndex, calendar);
        long epochMillis = ts.getTime();
        // java.sql.Timestamp stores sub-second detail separately as nanos;
        // isolate the nanos that fall inside the current millisecond
        int nanosInTheSecond = ts.getNanos();
        int nanosInTheMilli = nanosInTheSecond % NANOSECONDS_PER_MILLISECOND;
        // Whole microseconds since epoch
        long micro = epochMillis * Timestamps.MICROSECONDS_PER_MILLISECOND + (nanosInTheMilli / Timestamps.NANOSECONDS_PER_MICROSECOND);
        // Remaining sub-microsecond nanos, expressed in picoseconds
        int picosOfMicro = nanosInTheMilli % 1000 * 1000;
        return new LongTimestamp(micro, picosOfMicro);
    });
}

// Maps a Snowflake timestamp (no time zone) column to a Trino TIMESTAMP type
// of matching precision, choosing a long-based or object-based mapping
// depending on whether the fractional precision fits in a long.
// NOTE(review): precision is taken from getRequiredDecimalDigits() — for
// temporal JDBC types the "decimal digits" field presumably carries the
// fractional-seconds precision; verify against the driver's metadata.
private static ColumnMapping timestampColumnMapping(JdbcTypeHandle typeHandle)
{
    int precision = typeHandle.getRequiredDecimalDigits();
    String jdbcTypeName = typeHandle.getJdbcTypeName()
            .orElseThrow(() -> new TrinoException(JDBC_ERROR, "Type name is missing: " + typeHandle));
    int type = typeHandle.getJdbcType();
    log.debug("timestampColumnMapping: jdbcTypeName(%s):%s precision:%s", type, jdbcTypeName, precision);

    // <= 6 fits into a long
    if (precision <= 6) {
        return ColumnMapping.longMapping(
                TimestampType.createTimestampType(precision),
                (resultSet, columnIndex) -> StandardColumnMappings.toTrinoTimestamp(TimestampType.createTimestampType(precision), toLocalDateTime(resultSet, columnIndex)),
                timestampWriteFunction());
    }

    // Too big. Put it in an object
    return ColumnMapping.objectMapping(
            TimestampType.createTimestampType(precision),
            longTimestampReader(),
            longTimestampWriter(precision));
}

// Reads a timestamp column as a LocalDateTime interpreted in UTC. The UTC
// calendar prevents the JDBC driver from shifting the value into the JVM
// default time zone. Millisecond precision only — sub-millisecond nanos in
// the Timestamp are dropped here (higher precisions go through
// longTimestampReader instead).
private static LocalDateTime toLocalDateTime(ResultSet resultSet, int columnIndex)
        throws SQLException
{
    Calendar calendar = new GregorianCalendar(UTC_TZ, Locale.ENGLISH);
    // Reset the calendar to the epoch before handing it to the driver
    calendar.setTime(new Date(0));
    Timestamp ts = resultSet.getTimestamp(columnIndex, calendar);
    return LocalDateTime.ofInstant(Instant.ofEpochMilli(ts.getTime()), ZoneOffset.UTC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,10 @@ public ConnectionFactory getConnectionFactory(BaseJdbcConfig baseJdbcConfig, Sno
properties.setProperty("TIMESTAMP_TZ_OUTPUT_FORMAT", "YYYY-MM-DD\"T\"HH24:MI:SS.FF9TZH:TZM");
properties.setProperty("TIMESTAMP_LTZ_OUTPUT_FORMAT", "YYYY-MM-DD\"T\"HH24:MI:SS.FF9TZH:TZM");
properties.setProperty("TIME_OUTPUT_FORMAT", "HH24:MI:SS.FF9");
snowflakeConfig.getTimestampNoTimezoneAsUTC().ifPresent(as_utc -> properties.setProperty("JDBC_TREAT_TIMESTAMP_NTZ_AS_UTC", as_utc ? "true" : "false"));

// Support for Corporate proxies
if (snowflakeConfig.getHTTPProxy().isPresent()) {
String proxy = snowflakeConfig.getHTTPProxy().get();
if (snowflakeConfig.getHttpProxy().isPresent()) {
String proxy = snowflakeConfig.getHttpProxy().get();

URL url = new URL(proxy);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.snowflake;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigSecuritySensitive;

import java.util.Optional;

Expand All @@ -23,7 +24,6 @@ public class SnowflakeConfig
private String database;
private String role;
private String warehouse;
private Boolean timestampNoTimezoneAsUTC;
private String httpProxy;

public Optional<String> getAccount()
Expand Down Expand Up @@ -74,18 +74,14 @@ public SnowflakeConfig setWarehouse(String warehouse)
return this;
}

public Optional<Boolean> getTimestampNoTimezoneAsUTC()
{
return Optional.ofNullable(timestampNoTimezoneAsUTC);
}

public Optional<String> getHTTPProxy()
public Optional<String> getHttpProxy()
{
return Optional.ofNullable(httpProxy);
}

@Config("snowflake.http-proxy")
public SnowflakeConfig setHTTPProxy(String httpProxy)
@ConfigSecuritySensitive
public SnowflakeConfig setHttpProxy(String httpProxy)
{
this.httpProxy = httpProxy;
return this;
Expand Down
Loading

0 comments on commit 8771370

Please sign in to comment.