diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..5a9f54a
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,11 @@
+/.gradle
+/.idea
+/out
+/build
+*.iml
+*.ipr
+*.iws
+
+/drivers
+/profiles.json
+/extra
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..cbf7d1f
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# Tellery Connectors - Community Supported
+
+Here come the community-supported Tellery connectors
diff --git a/hive/.gitignore b/hive/.gitignore
new file mode 100644
index 0000000..641fbfa
--- /dev/null
+++ b/hive/.gitignore
@@ -0,0 +1,9 @@
+/.gradle
+/.idea
+/out
+/build
+*.iml
+*.ipr
+*.iws
+
+/drivers
diff --git a/hive/build.gradle.kts b/hive/build.gradle.kts
new file mode 100644
index 0000000..74351f9
--- /dev/null
+++ b/hive/build.gradle.kts
@@ -0,0 +1,61 @@
+import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
+import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+
+group = "io.tellery.connectors"
+version = "0.0.1-SNAPSHOT"
+
+repositories {
+    mavenLocal()
+    jcenter()
+}
+
+plugins {
+    idea
+    kotlin("jvm") version "1.4.20"
+    id("com.github.johnrengelman.shadow") version "5.2.0"
+}
+
+dependencies {
+    implementation("io.tellery:connector-interface:0.0.1-SNAPSHOT")
+    // Drop the logging, netty and protobuf transitives of hive-jdbc (NOTE(review): presumably to avoid clashes with the host runtime - confirm).
+    implementation("org.apache.hive:hive-jdbc:2.1.0") {
+        exclude(group = "org.slf4j", module = "")
+        exclude(group = "log4j", module = "log4j")
+        exclude(group = "org.apache.logging.log4j", module = "")
+        exclude(group = "io.netty", module = "netty")
+        exclude(group = "com.google.protobuf", module = "protobuf-java")
+    }
+}
+
+// Non-standard layout: sources live directly in src/ and test/, resources in resources/ and testresources/.
+sourceSets.main {
+    java.srcDirs("src")
+    resources.srcDirs("resources")
+}
+
+sourceSets.test {
+    java.srcDirs("test")
+    resources.srcDirs("testresources")
+}
+
+idea {
+    module {
+        inheritOutputDirs = false
+        outputDir = file("$buildDir/classes/kotlin/main")
+        testOutputDir = file("$buildDir/classes/kotlin/test")
+    }
+}
+
+// Target Java 8 bytecode and enable the opt-in annotation mechanism for every Kotlin compile task.
+tasks.withType<KotlinCompile> {
+    kotlinOptions {
+        jvmTarget = "1.8"
+        freeCompilerArgs = freeCompilerArgs + "-Xopt-in=kotlin.RequiresOptIn"
+    }
+}
+
+// Name the shadow (fat) jar <project>-<version>.jar and enable the zip64 format for it.
+tasks.withType<ShadowJar> {
+    archiveFileName.set("${project.name}-${project.version}.jar")
+    isZip64 = true
+}
diff --git a/hive/gradle.properties b/hive/gradle.properties
new file mode 100644
index 0000000..7fc6f1f
--- /dev/null
+++ b/hive/gradle.properties
@@ -0,0 +1 @@
+kotlin.code.style=official
diff --git a/hive/gradle/wrapper/gradle-wrapper.jar b/hive/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 0000000..e708b1c
Binary files /dev/null and b/hive/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/hive/gradle/wrapper/gradle-wrapper.properties b/hive/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 0000000..da9702f
--- /dev/null
+++ b/hive/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,5 @@
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.8-bin.zip
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
diff --git a/hive/gradlew b/hive/gradlew
new file mode 100755
index 0000000..4f906e0
--- /dev/null
+++ b/hive/gradlew
@@ -0,0 +1,185 @@
+#!/usr/bin/env sh
+
+#
+# Copyright 2015 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+##############################################################################
+##
+##  Gradle start up script for UN*X
+##
+##############################################################################
+
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+        PRG="$link"
+    else
+        PRG=`dirname "$PRG"`"/$link"
+    fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/" >/dev/null
+APP_HOME="`pwd -P`"
+cd "$SAVED" >/dev/null
+
+APP_NAME="Gradle"
+APP_BASE_NAME=`basename "$0"`
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD="maximum"
+
+warn () {
+    echo "$*"
+}
+
+die () {
+    echo
+    echo "$*"
+    echo
+    exit 1
+}
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+nonstop=false
+case "`uname`" in
+  CYGWIN* )
+    cygwin=true
+    ;;
+  Darwin* )
+    darwin=true
+    ;;
+  MINGW* )
+    msys=true
+    ;;
+  NONSTOP* )
+    nonstop=true
+    ;;
+esac
+
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+        # IBM's JDK on AIX uses strange locations for the executables
+        JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+        JAVACMD="$JAVA_HOME/bin/java"
+    fi
+    if [ ! -x "$JAVACMD" ] ; then
+        die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+    fi
+else
+    JAVACMD="java"
+    which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+fi
+
+# Increase the maximum file descriptors if we can.
+if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
+    MAX_FD_LIMIT=`ulimit -H -n`
+    if [ $? -eq 0 ] ; then
+        if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
+            MAX_FD="$MAX_FD_LIMIT"
+        fi
+        ulimit -n $MAX_FD
+        if [ $? -ne 0 ] ; then
+            warn "Could not set maximum file descriptor limit: $MAX_FD"
+        fi
+    else
+        warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
+    fi
+fi
+
+# For Darwin, add options to specify how the application appears in the dock
+if $darwin; then
+    GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
+fi
+
+# For Cygwin or MSYS, switch paths to Windows format before running java
+if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
+    APP_HOME=`cygpath --path --mixed "$APP_HOME"`
+    CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+
+    JAVACMD=`cygpath --unix "$JAVACMD"`
+
+    # We build the pattern for arguments to be converted via cygpath
+    ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
+    SEP=""
+    for dir in $ROOTDIRSRAW ; do
+        ROOTDIRS="$ROOTDIRS$SEP$dir"
+        SEP="|"
+    done
+    OURCYGPATTERN="(^($ROOTDIRS))"
+    # Add a user-defined pattern to the cygpath arguments
+    if [ "$GRADLE_CYGPATTERN" != "" ] ; then
+        OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
+    fi
+    # Now convert the arguments - kludge to limit ourselves to /bin/sh
+    i=0
+    for arg in "$@" ; do
+        CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
+        CHECK2=`echo "$arg"|egrep -c "^-"`                                 ### Non-zero when the argument starts with "-" (it is an option)
+
+        if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then                    ### Convert only arguments that match the path pattern and are not options
+            eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
+        else
+            eval `echo args$i`="\"$arg\""
+        fi
+        i=`expr $i + 1`
+    done
+    case $i in
+        0) set -- ;;
+        1) set -- "$args0" ;;
+        2) set -- "$args0" "$args1" ;;
+        3) set -- "$args0" "$args1" "$args2" ;;
+        4) set -- "$args0" "$args1" "$args2" "$args3" ;;
+        5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
+        6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
+        7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
+        8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
+        9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
+    esac
+fi
+
+# Escape application args: wrap each argument in single quotes so it survives the eval below
+save () {
+    for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
+    echo " "
+}
+APP_ARGS=`save "$@"`
+
+# Collect all arguments for the java command, following the shell quoting and substitution rules
+eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
+
+exec "$JAVACMD" "$@"
diff --git a/hive/gradlew.bat b/hive/gradlew.bat
new file mode 100644
index 0000000..ac1b06f
--- /dev/null
+++ b/hive/gradlew.bat
@@ -0,0 +1,89 @@
+@rem
+@rem Copyright 2015 the original author or authors.
+@rem
+@rem Licensed under the Apache License, Version 2.0 (the "License");
+@rem you may not use this file except in compliance with the License.
+@rem You may obtain a copy of the License at
+@rem
+@rem      https://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+@rem
+
+@if "%DEBUG%" == "" @echo off
+@rem ##########################################################################
+@rem
+@rem  Gradle startup script for Windows
+@rem
+@rem ##########################################################################
+
+@rem Set local scope for the variables with windows NT shell
+if "%OS%"=="Windows_NT" setlocal
+
+set DIRNAME=%~dp0
+if "%DIRNAME%" == "" set DIRNAME=.
+set APP_BASE_NAME=%~n0
+set APP_HOME=%DIRNAME%
+
+@rem Resolve any "." and ".." in APP_HOME to make it shorter.
+for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
+
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
+
+@rem Find java.exe: use JAVA_HOME when it is defined, otherwise fall back to java.exe on the PATH
+if defined JAVA_HOME goto findJavaFromJavaHome
+
+set JAVA_EXE=java.exe
+%JAVA_EXE% -version >NUL 2>&1
+if "%ERRORLEVEL%" == "0" goto execute
+
+echo.
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
+
+goto fail
+
+:findJavaFromJavaHome
+set JAVA_HOME=%JAVA_HOME:"=%
+set JAVA_EXE=%JAVA_HOME%/bin/java.exe
+
+if exist "%JAVA_EXE%" goto execute
+
+echo.
+echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
+
+goto fail
+
+:execute
+@rem Setup the command line
+
+set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
+
+@rem Execute Gradle
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
+
+:end
+@rem End local scope for the variables with windows NT shell
+if "%ERRORLEVEL%"=="0" goto mainEnd
+
+:fail
+rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
+rem the _cmd.exe /c_ return code!
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
+exit /b 1
+
+:mainEnd
+if "%OS%"=="Windows_NT" endlocal
+
+:omega
diff --git a/hive/settings.gradle.kts b/hive/settings.gradle.kts
new file mode 100644
index 0000000..8cef264
--- /dev/null
+++ b/hive/settings.gradle.kts
@@ -0,0 +1 @@
+rootProject.name = "hive-connector"
\ No newline at end of file
diff --git a/hive/src/HiveConnector.kt b/hive/src/HiveConnector.kt
new file mode 100644
index 0000000..d414f38
--- /dev/null
+++ b/hive/src/HiveConnector.kt
@@ -0,0 +1,264 @@
+package io.tellery.connectors.contrib
+
+import io.tellery.annotations.Connector
+import io.tellery.annotations.HandleImport
+import io.tellery.annotations.Config
+import io.tellery.annotations.Config.ConfigType
+import io.tellery.connectors.JDBCConnector
+import io.tellery.entities.*
+import io.tellery.utils.S3Storage
+import io.tellery.utils.queryRemark
+import java.sql.Connection
+import java.sql.ResultSet
+import java.sql.SQLException
+import java.sql.Types
+
+
+/**
+ * Tellery connector for Hive / SparkSQL thrift servers, reached through the Hive JDBC driver.
+ *
+ * Metadata is read with `SHOW DATABASES`, `SHOW TABLES FROM` and `DESC`. CSV import uploads the
+ * file to S3 and creates a `USING CSV` table over it, so it requires the optional S3 settings.
+ */
+@Connector(
+    type="Hive/SparkSQL",
+    jdbcConfigs = [
+        Config(name="Endpoint", type= ConfigType.STRING, description="The endpoint of your hive / sparkSQL thrift server", hint="localhost",required=true),
+        Config(name="Port", type= ConfigType.NUMBER, description="the port number", hint="10001",required=true),
+    ],
+    optionals = [
+        Config(name="S3AccessKey", type=ConfigType.STRING, description="S3 Access Key ID(for uploading csv)"),
+        Config(name="S3SecretKey", type=ConfigType.STRING, description="S3 Secret Access Key (for uploading csv)", secret=true),
+        Config(name="S3Region", type=ConfigType.STRING, description="S3 region (the region of your S3 bucket)", hint="us-east-1"),
+        Config(name="S3Bucket", type=ConfigType.STRING, description="S3 bucket (where uploaded csv stores)", hint="tellery"),
+        Config(name="S3KeyPrefix", type=ConfigType.STRING, description="S3 key prefix prepends to uploaded csv"),
+])
+class HiveConnector : JDBCConnector() {
+
+    override val driverClassName = "org.apache.hive.jdbc.HiveDriver"
+    override val transactionIsolationLevel = Connection.TRANSACTION_READ_UNCOMMITTED
+
+    /**
+     * Builds the `jdbc:hive2://endpoint:port` URL for [profile].
+     *
+     * @throws CustomizedException if the endpoint or the port is missing from the profile configs
+     */
+    override fun buildConnectionStr(profile: Profile): String {
+        val endpoint = requiredConfig(profile, "endpoint", "Endpoint")
+        val port = requiredConfig(profile, "port", "Port")
+        return "jdbc:hive2://${endpoint}:${port}"
+    }
+
+    /**
+     * Returns the first config value found under any of [keys]. The lowercase key is tried first,
+     * then the capitalised name declared in the annotation, because the two spellings disagree.
+     */
+    private fun requiredConfig(profile: Profile, vararg keys: String) =
+        keys.mapNotNull { profile.configs[it] }.firstOrNull()
+            ?: throw CustomizedException("Missing required config ${keys.last()} in profile ${profile.name}")
+
+    // Assigned in initByProfile. NOTE(review): presumably null when the profile has no S3 optionals - confirm.
+    private var s3Client: S3Storage? = null
+
+    /** Runs the base initialisation, then builds the S3 client from the optional settings of [profile]. */
+    override fun initByProfile(profile: Profile) {
+        super.initByProfile(profile)
+        s3Client = S3Storage.buildByOptionals(profile.optionals)
+        if (s3Client != null) {
+            logger.info("{} has initialized s3 client", profile.name)
+        }
+    }
+
+    /** Wraps [name] in backticks, doubling any embedded backtick, for use as a SQL identifier. */
+    private fun quoteIdentifier(name: String): String = "`${name.replace("`", "``")}`"
+
+    /**
+     * Reads the current row of [rs] using the first label in [candidates] that the result set accepts.
+     *
+     * @throws CustomizedException if none of the candidate column labels can be read
+     */
+    private fun getResultByNames(rs: ResultSet, candidates: Array<String>): String {
+        for (name in candidates) {
+            try {
+                return rs.getString(name)
+            } catch (ignored: SQLException) {
+                // This label cannot be read from the result set; fall through to the next candidate.
+            }
+        }
+        throw CustomizedException("Failed to load databases, maybe caused by unsupported version of spark, please create an issue on Github!")
+    }
+
+    /** Lists the database names returned by `SHOW DATABASES`. */
+    override suspend fun getDatabases(): List<String> {
+        val remark = mapOf("queryType" to "getDatabase")
+        val dbNameColCandidates = arrayOf(
+            "databaseName", // SparkSQL (<=3.0.0)
+            "database_name", // Hive
+            "namespace" // SparkSQL (>=3.0.0)
+        )
+        return dbConnection.use { conn ->
+            conn.createStatement().use { stmt ->
+                stmt.executeQuery(queryRemark(remark, "SHOW DATABASES")).use { rs ->
+                    generateSequence {
+                        if (rs.next()) {
+                            getResultByNames(rs, dbNameColCandidates)
+                        } else null
+                    }.toList()
+                }
+            }
+        }
+    }
+
+    /** Lists the tables of [dbName]; Spark's NoSuchDatabaseException error yields an empty list. */
+    override suspend fun getCollections(dbName: String): List<CollectionField> {
+        val remark = mapOf("queryType" to "getCollection")
+        val sql = "SHOW TABLES FROM ${quoteIdentifier(dbName)}"
+        return dbConnection.use { conn ->
+            conn.createStatement().use { stmt ->
+                try {
+                    stmt.executeQuery(queryRemark(remark, sql)).use { rs ->
+                        generateSequence {
+                            if (rs.next()) {
+                                try {
+                                    CollectionField(rs.getString("tableName"), null)
+                                } catch (ignored: SQLException) {
+                                    // The tableName label cannot be read; fall back to table_name.
+                                    CollectionField(rs.getString("table_name"), null)
+                                }
+                            } else null
+                        }.toList()
+                    }
+                } catch (e: SQLException) {
+                    val missingDatabase = e.message
+                        ?.startsWith("Error running query: org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException") == true
+                    if (missingDatabase) {
+                        emptyList()
+                    } else {
+                        throw e
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Lists the columns of `dbName.collectionName` as reported by `DESC`; [schemaName] is not used.
+     *
+     * Reading stops at the first blank or `#`-prefixed column name, where the column list ends.
+     * Spark's "Table or view not found" error yields an empty list.
+     */
+    override suspend fun getCollectionSchema(
+        dbName: String,
+        collectionName: String,
+        schemaName: String?,
+    ): List<TypeField> {
+        val remark = mapOf("queryType" to "getCollectionSchema")
+        val sql = "DESC ${quoteIdentifier(dbName)}.${quoteIdentifier(collectionName)}"
+        return dbConnection.use { conn ->
+            conn.createStatement().use { stmt ->
+                try {
+                    stmt.executeQuery(queryRemark(remark, sql)).use { rs ->
+                        generateSequence {
+                            if (rs.next()) {
+                                val colName: String? = rs.getString(1)
+                                val dataType: String? = rs.getString(2)
+                                if (colName == null || colName.isBlank() || colName.startsWith("#")) {
+                                    null
+                                } else {
+                                    TypeField(colName, hiveTypeToSQLType(dataType.orEmpty()))
+                                }
+                            } else null
+                        }.toList()
+                    }
+                } catch (e: SQLException) {
+                    val missingTable = e.message
+                        ?.startsWith("Error running query: org.apache.spark.sql.AnalysisException: Table or view not found") == true
+                    if (missingTable) {
+                        emptyList()
+                    } else {
+                        throw e
+                    }
+                }
+            }
+        }
+    }
+
+    /** Maps a Hive type name (case and surrounding blanks ignored) to a [Types] constant; unknown names map to OTHER. */
+    private fun hiveTypeToSQLType(hiveType: String): Int {
+        val type = hiveType.trim().toLowerCase()
+        return when (type) {
+            "bigint" -> Types.BIGINT
+            "boolean" -> Types.BOOLEAN
+            "double" -> Types.DOUBLE
+            "double precision" -> Types.DOUBLE
+            "date" -> Types.DATE
+            "float" -> Types.FLOAT
+            "int" -> Types.INTEGER
+            "real" -> Types.REAL
+            "smallint" -> Types.SMALLINT
+            "timestamp" -> Types.TIMESTAMP
+            "tinyint" -> Types.TINYINT
+            else -> when {
+                type.startsWith("char") -> Types.CHAR
+                type.startsWith("varchar") -> Types.VARCHAR
+                type.startsWith("string") -> Types.VARCHAR
+                type.startsWith("binary") -> Types.BINARY
+                type.startsWith("decimal") -> Types.DECIMAL
+                type.startsWith("struct") -> Types.STRUCT
+                type.startsWith("array") -> Types.ARRAY
+                // java.sql.Types has no MAP constant; maps are reported as STRUCT.
+                type.startsWith("map") -> Types.STRUCT
+                else -> Types.OTHER
+            }
+        }
+    }
+
+    /** Runs the base checks, then rejects the import when no S3 client has been configured. */
+    override fun importSanityCheck(database: String, collection: String, schema: String?) {
+        super.importSanityCheck(database, collection, schema)
+        if (this.s3Client == null) {
+            throw ImportFailureException("Hive Connector must be initialized with s3 config to support importing")
+        }
+    }
+
+    /**
+     * Uploads [content] to S3 and creates `database.collection` as a `USING CSV` table over it.
+     *
+     * @throws ImportFailureException if the connector has no S3 client
+     */
+    @HandleImport("text/csv")
+    suspend fun importFromCSV(database: String, collection: String, schema: String?, content: ByteArray) {
+        val storage = s3Client
+            ?: throw ImportFailureException("Hive Connector must be initialized with s3 config to support importing")
+        val filename = "$database/${if (schema != null) "$schema." else ""}$collection.csv"
+        val s3Path = storage.uploadFile(filename, content, "text/csv")
+        val table = "${quoteIdentifier(database)}.${quoteIdentifier(collection)}"
+        dbConnection.use { conn ->
+            conn.createStatement().use { stmt ->
+                try {
+                    val sql = """
+                        |CREATE TABLE $table
+                        |USING CSV
+                        |OPTIONS(
+                        |    path "$s3Path",
+                        |    header "true",
+                        |    inferSchema "true"
+                        |)
+                    """.trimMargin()
+                    stmt.execute(sql)
+                } catch (e: Exception) {
+                    logger.error("Error when importing data from s3", e)
+                    // NOTE(review): if CREATE TABLE failed because the table already existed, this cleanup
+                    // drops that pre-existing table - confirm importSanityCheck rules this case out.
+                    try {
+                        stmt.execute("DROP TABLE IF EXISTS $table")
+                    } catch (cleanupError: Exception) {
+                        // Keep the import failure as the primary error instead of masking it.
+                        e.addSuppressed(cleanupError)
+                    }
+                    throw e
+                }
+            }
+        }
+    }
+}