diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf
index 5fb4d85..0fec307 100644
--- a/nebula-algorithm/src/main/resources/application.conf
+++ b/nebula-algorithm/src/main/resources/application.conf
@@ -21,10 +21,10 @@
   # Hive related config
   hive: {
+    #[Optional] Set this when Spark and Hive are deployed on different clusters; both read and write connect to Hive through this metastore.
+    metaStoreUris: "thrift://hive-metastore-server:9083"
     # algo's data source from hive
     read: {
-      #[Optional] spark and hive require configuration on different clusters
-      metaStoreUris: "thrift://hive-metastore-server-01:9083"
       #spark sql
       sql: "select column_1,column_2,column_3 from database_01.table_01 "
       #[Optional] graph source vid mapping with column of sql result.
@@ -37,8 +37,6 @@
     # algo result sink into hive
     write: {
-      #[Optional] spark and hive require configuration on different clusters
-      metaStoreUris: "thrift://hive-metastore-server-02:9083"
       #save result to hive table
       dbTableName: "database_02.table_02"
       #[Optional] spark dataframe save mode, one of Append, Overwrite, ErrorIfExists, Ignore. Default is Overwrite
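
For reference, a minimal sketch of the consolidated hive block after this change. The host, SQL, and column/table names are placeholders; the srcId/dstId/weight and resultTableColumnMapping keys follow the existing read/write options parsed in Configs.scala below:

    hive: {
      # one metastore URI, shared by the read and the write path
      metaStoreUris: "thrift://hive-metastore-server:9083"
      read: {
        sql: "select column_1,column_2,column_3 from database_01.table_01"
        srcId: "column_1"
        dstId: "column_2"
        weight: "column_3"
      }
      write: {
        dbTableName: "database_02.table_02"
        saveMode: "Overwrite"
        autoCreateTable: true
        # map the algorithm result column _id to the hive column user_id
        resultTableColumnMapping: {
          _id: "user_id"
        }
      }
    }
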
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
index b91b35b..6222bc6 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala
@@ -133,27 +133,33 @@ object LocalConfigEntry {
 
 object HiveConfigEntry {
   def apply(config: Config): HiveConfigEntry = {
-    //SQL to execute
-    val sql: String = getOrElse(config, "hive.read.sql", "")
-    //column name of the source vertex ID
-    val srcIdCol: String = getOrElse(config, "hive.read.srcId", "")
-    //column name of the destination vertex ID
-    val dstIdCol: String = getOrElse(config, "hive.read.dstId", "")
-    //column name of the weight
-    val weightCol: String = getOrElse(config, "hive.read.weight", "")
-    //uri of hive metastore
-    val readMetaStoreUris: String = getOrElse(config, "hive.read.metaStoreUris", "")
-    val readConfigEntry = HiveReadConfigEntry(sql, srcIdCol, dstIdCol, weightCol, readMetaStoreUris)
-
-    //name of the hive table to write, in db.table form
-    val dbTableName: String = getOrElse(config, "hive.write.dbTableName", "")
-    //save mode, see spark's SaveMode
-    val saveMode: String = getOrElse(config, "hive.write.saveMode", "")
-    //whether to create the table automatically
-    val autoCreateTable: Boolean = getOrElse(config, "hive.write.autoCreateTable", true)
-    //uri of hive metastore
-    val writeMetaStoreUris: String = getOrElse(config, "hive.write.metaStoreUris", "")
-    //mapping between result columns and table columns, e.g. map _id in the algorithm result to user_id
+    //uri of hive metastore, e.g. thrift://127.0.0.1:9083
+    val hiveMetaStoreUris: String = getOrElse(config, "hive.metaStoreUris", "")
+    val readConfigEntry = buildReadConfig(config)
+    val writeConfigEntry = buildWriteConfig(config)
+    HiveConfigEntry(hiveMetaStoreUris, readConfigEntry, writeConfigEntry)
+  }
+
+  def buildReadConfig(config: Config): HiveReadConfigEntry = {
+    //source data of spark sql
+    val sql: String = getOrElse(config, "hive.read.sql", "")
+    //the source vertex ID is mapped to a column of the SQL result
+    val srcIdCol: String = getOrElse(config, "hive.read.srcId", "")
+    //the dest vertex ID is mapped to a column of the SQL result
+    val dstIdCol: String = getOrElse(config, "hive.read.dstId", "")
+    //the weight is mapped to a column of the SQL result
+    val weightCol: String = getOrElse(config, "hive.read.weight", "")
+    HiveReadConfigEntry(sql, srcIdCol, dstIdCol, weightCol)
+  }
+
+  def buildWriteConfig(config: Config): HiveWriteConfigEntry = {
+    //hive table into which the algo result is saved
+    val dbTableName: String = getOrElse(config, "hive.write.dbTableName", "")
+    //spark dataframe save mode
+    val saveMode: String = getOrElse(config, "hive.write.saveMode", "")
+    //whether to create the table automatically
+    val autoCreateTable: Boolean = getOrElse(config, "hive.write.autoCreateTable", true)
+    //mapping between the algo result dataframe columns and the hive table columns
     val resultColumnMapping = mutable.Map[String, String]()
     val mappingKey = "hive.write.resultTableColumnMapping"
     if (config.hasPath(mappingKey)) {
@@ -164,10 +170,9 @@ object HiveConfigEntry {
         resultColumnMapping += subkey -> value
       }
     }
-    val writeConfigEntry = HiveWriteConfigEntry(dbTableName, saveMode, autoCreateTable, resultColumnMapping, writeMetaStoreUris)
-
-    HiveConfigEntry(readConfigEntry, writeConfigEntry)
+    HiveWriteConfigEntry(dbTableName, saveMode, autoCreateTable, resultColumnMapping)
   }
+
 }
 
 /**
@@ -214,32 +219,31 @@ case class LocalConfigEntry(filePath: String,
   }
 }
 
-case class HiveConfigEntry(hiveReadConfigEntry: HiveReadConfigEntry,
-                           hiveWriteConfigEntry: HiveWriteConfigEntry) {
+case class HiveConfigEntry(hiveMetaStoreUris: String,
+                           hiveReadConfigEntry: HiveReadConfigEntry,
+                           hiveWriteConfigEntry: HiveWriteConfigEntry) {
   override def toString: String = {
-    s"HiveConfigEntry: {read: $hiveReadConfigEntry, write: $hiveWriteConfigEntry}"
+    s"HiveConfigEntry: {hiveMetaStoreUris:$hiveMetaStoreUris, read: $hiveReadConfigEntry, write: $hiveWriteConfigEntry}"
   }
 }
 
 case class HiveReadConfigEntry(sql: String,
                                srcIdCol: String = "srcId",
                                dstIdCol: String = "dstId",
-                               weightCol: String,
-                               metaStoreUris: String) {
+                               weightCol: String) {
   override def toString: String = {
     s"HiveReadConfigEntry: {sql: $sql, srcIdCol: $srcIdCol, dstIdCol: $dstIdCol, " +
-      s"weightCol:$weightCol, metaStoreUris:$metaStoreUris}"
+      s"weightCol:$weightCol}"
   }
 }
 
 case class HiveWriteConfigEntry(dbTableName: String,
                                 saveMode: String,
                                 autoCreateTable: Boolean,
-                                resultColumnMapping: mutable.Map[String, String],
-                                metaStoreUris: String) {
+                                resultColumnMapping: mutable.Map[String, String]) {
   override def toString: String = {
     s"HiveWriteConfigEntry: {dbTableName: $dbTableName, saveMode=$saveMode, " +
-      s"autoCreateTable=$autoCreateTable, resultColumnMapping=$resultColumnMapping, metaStoreUris=$metaStoreUris}"
+      s"autoCreateTable=$autoCreateTable, resultColumnMapping=$resultColumnMapping}"
   }
 }
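
To illustrate the refactored parsing, here is a minimal sketch that feeds an inline HOCON string through the new HiveConfigEntry.apply. It assumes the project's HiveConfigEntry (and its getOrElse helper) is on the classpath; the SQL, column names, and table name are placeholders:

    import com.typesafe.config.ConfigFactory
    import com.vesoft.nebula.algorithm.config.HiveConfigEntry

    object HiveConfigEntryExample {
      def main(args: Array[String]): Unit = {
        // inline HOCON mirroring the application.conf layout above
        val config = ConfigFactory.parseString(
          """
            |hive: {
            |  metaStoreUris: "thrift://hive-metastore-server:9083"
            |  read: { sql: "select src, dst from db.edges", srcId: "src", dstId: "dst" }
            |  write: { dbTableName: "db.result", saveMode: "Overwrite", autoCreateTable: true }
            |}
          """.stripMargin)

        // the single URI now lives at hive.metaStoreUris and is shared by
        // the read and write entries built beneath it
        val entry = HiveConfigEntry(config)
        println(entry)
      }
    }
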
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala
index f407a37..86c68b4 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala
@@ -23,11 +23,8 @@ object SparkConfig {
       session.config(key, value)
     }
 
-    val dataSource = configs.dataSourceSinkEntry
-    if (dataSource.source.equals(ReaderType.hive.stringify)
-        || dataSource.sink.equals(WriterType.hive.stringify)) {
-      session.enableHiveSupport()
-    }
+    // set hive config
+    setHiveConfig(session, configs)
 
     val partitionNum = sparkConfigs.getOrElse("spark.app.partitionNum", "0")
     val spark = session.getOrCreate()
@@ -35,6 +32,19 @@ object SparkConfig {
     SparkConfig(spark, partitionNum.toInt)
   }
 
+  private def setHiveConfig(session: org.apache.spark.sql.SparkSession.Builder, configs: Configs): Unit = {
+    val dataSource = configs.dataSourceSinkEntry
+    if (dataSource.source.equals(ReaderType.hive.stringify)
+        || dataSource.sink.equals(WriterType.hive.stringify)) {
+      session.enableHiveSupport()
+      val uris = configs.hiveConfigEntry.hiveMetaStoreUris
+      if (uris != null && uris.trim.nonEmpty) {
+        session.config("hive.metastore.schema.verification", false)
+        session.config("hive.metastore.uris", uris)
+      }
+    }
+  }
+
   private def validate(sparkVersion: String, supportedVersions: String*): Unit = {
     if (sparkVersion != "UNKNOWN" && !supportedVersions.exists(sparkVersion.matches)) {
       throw new RuntimeException(
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala
index 4f61333..c2392f5 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala
@@ -190,13 +190,6 @@ final class HiveReader extends DataReader {
     val dstIdCol = readConfig.dstIdCol
     val weightCol = readConfig.weightCol
 
-    println(s"""hiveDataReader, srcIdCol:$srcIdCol, dstIdCol:$dstIdCol, weightCol:$weightCol""")
-
-    if (readConfig.metaStoreUris != null && readConfig.metaStoreUris.trim.nonEmpty) {
-      spark.conf.set("hive.metastore.schema.verification", false)
-      spark.conf.set("hive.metastore.uris", readConfig.metaStoreUris)
-    }
-
     var data = spark.sql(sql)
 
     if (srcIdCol != null && dstIdCol != null && srcIdCol.trim.nonEmpty && dstIdCol.trim.nonEmpty) {
diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala
index 4b6e23f..31f07e0 100644
--- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala
+++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala
@@ -89,11 +89,6 @@ final class HiveWriter extends AlgoWriter {
       _data = _data.withColumnRenamed(from, to)
     }
 
-    if (config.metaStoreUris != null && config.metaStoreUris.trim.nonEmpty) {
-      spark.conf.set("hive.metastore.schema.verification", false)
-      spark.conf.set("hive.metastore.uris", config.metaStoreUris)
-    }
-
     if(config.autoCreateTable){
       val createTableStatement = generateCreateTableStatement(_data, config.dbTableName)
       println(s"execute create hive table statement:${createTableStatement}")
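
The net effect of these three file changes: the metastore settings are now applied once, on the SparkSession.Builder before getOrCreate(), instead of via spark.conf.set on an already-created session inside HiveReader and HiveWriter, where Hive settings may arrive too late to affect catalog initialization (presumably the motivation for this refactor). A minimal standalone sketch of the equivalent builder usage; the master, app name, and URI are placeholders:

    import org.apache.spark.sql.SparkSession

    object HiveSessionExample {
      def main(args: Array[String]): Unit = {
        // hive settings go on the builder, before the session exists,
        // mirroring what setHiveConfig now does in SparkConfig
        val spark = SparkSession
          .builder()
          .master("local[*]")
          .appName("hive-metastore-example")
          .enableHiveSupport()
          .config("hive.metastore.schema.verification", false)
          .config("hive.metastore.uris", "thrift://hive-metastore-server:9083")
          .getOrCreate()

        spark.sql("show databases").show()
        spark.stop()
      }
    }
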