
Commit

feat: connect hive by meta store
awang12345 committed Aug 16, 2024
1 parent eaf7b90 commit f1a2708
Showing 5 changed files with 54 additions and 54 deletions.
6 changes: 2 additions & 4 deletions nebula-algorithm/src/main/resources/application.conf
@@ -21,10 +21,10 @@

# Hive related config
hive: {
#[Optional] Set this when Spark and Hive are deployed on different clusters; both read and write connect to Hive through this metastore
metaStoreUris: "thrift://hive-metastore-server:9083"
# algo's data source from hive
read: {
#[Optional] spark and hive require configuration on different clusters
metaStoreUris: "thrift://hive-metastore-server-01:9083"
#spark sql
sql: "select column_1,column_2,column_3 from database_01.table_01 "
#[Optional] maps the graph's source vid to a column of the sql result.
@@ -37,8 +37,6 @@

# algo result sink into hive
write: {
#[Optional] spark and hive require configuration on different clusters
metaStoreUris: "thrift://hive-metastore-server-02:9083"
#save result to hive table
dbTableName: "database_02.table_02"
#[Optional] spark dataframe save mode, one of Append, Overwrite, ErrorIfExists, Ignore. Default is Overwrite
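Taken together, the change hoists metaStoreUris out of the read and write blocks and into the hive block itself, so a single metastore serves both directions. A minimal hive block under the new layout might look like the following sketch (host, database, and table names are placeholders):

hive: {
  metaStoreUris: "thrift://hive-metastore-server:9083"
  read: {
    sql: "select src_id, dst_id, weight from database_01.table_01"
    srcId: "src_id"
    dstId: "dst_id"
    weight: "weight"
  }
  write: {
    dbTableName: "database_02.table_02"
    saveMode: "Overwrite"
    autoCreateTable: true
  }
}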
@@ -133,27 +133,33 @@ object LocalConfigEntry

object HiveConfigEntry {
def apply(config: Config): HiveConfigEntry = {
//the SQL to execute
val sql: String = getOrElse(config,"hive.read.sql","")
//name of the source vertex ID column
val srcIdCol: String = getOrElse(config,"hive.read.srcId","")
//name of the destination vertex ID column
val dstIdCol: String = getOrElse(config,"hive.read.dstId","")
//name of the weight column
val weightCol: String = getOrElse(config,"hive.read.weight","")
//hive metastore uri
val readMetaStoreUris: String = getOrElse(config,"hive.read.metaStoreUris","")
val readConfigEntry = HiveReadConfigEntry(sql, srcIdCol, dstIdCol, weightCol, readMetaStoreUris)

//hive table to write to, as db.table
val dbTableName: String = getOrElse(config,"hive.write.dbTableName","")
//save mode, see Spark's saveMode
val saveMode: String = getOrElse(config,"hive.write.saveMode","")
//whether to create the table automatically
val autoCreateTable: Boolean = getOrElse(config,"hive.write.autoCreateTable",true)
//hive metastore uri
val writeMetaStoreUris: String = getOrElse(config,"hive.write.metaStoreUris","")
//mapping between result columns and table columns, e.g. map _id in the algo result to user_id
//uri of the hive metastore, e.g. thrift://127.0.0.1:9083
val hiveMetaStoreUris: String = getOrElse(config, "hive.metaStoreUris", "")
val readConfigEntry = buildReadConfig(config)
val writeConfigEntry = buildWriteConfig(config)
HiveConfigEntry(hiveMetaStoreUris, readConfigEntry, writeConfigEntry)
}

def buildReadConfig(config: Config): HiveReadConfigEntry = {
//spark sql that produces the source data
val sql: String = getOrElse(config, "hive.read.sql", "")
//column of the SQL result that maps to the source vertex ID
val srcIdCol: String = getOrElse(config, "hive.read.srcId", "")
//column of the SQL result that maps to the destination vertex ID
val dstIdCol: String = getOrElse(config, "hive.read.dstId", "")
//column of the SQL result that maps to the edge weight
val weightCol: String = getOrElse(config, "hive.read.weight", "")
HiveReadConfigEntry(sql, srcIdCol, dstIdCol, weightCol)
}

def buildWriteConfig(config: Config): HiveWriteConfigEntry = {
//hive table the algo result is saved to
val dbTableName: String = getOrElse(config, "hive.write.dbTableName", "")
//spark dataframe save mode
val saveMode: String = getOrElse(config, "hive.write.saveMode", "")
//whether to create the table automatically
val autoCreateTable: Boolean = getOrElse(config, "hive.write.autoCreateTable", true)
//mapping from algo result dataframe columns to hive table columns
val resultColumnMapping = mutable.Map[String, String]()
val mappingKey = "hive.write.resultTableColumnMapping"
if (config.hasPath(mappingKey)) {
@@ -164,10 +170,9 @@
resultColumnMapping += subkey -> value
}
}
val writeConfigEntry = HiveWriteConfigEntry(dbTableName, saveMode, autoCreateTable, resultColumnMapping, writeMetaStoreUris)

HiveConfigEntry(readConfigEntry, writeConfigEntry)
HiveWriteConfigEntry(dbTableName, saveMode, autoCreateTable, resultColumnMapping)
}

}
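For a quick sanity check of the reworked apply, here is a hedged sketch of building the entry from an inline config; the ConfigFactory call and the HOCON snippet are illustrative assumptions, not code from this commit:

import com.typesafe.config.ConfigFactory

// parse a small inline config with the new session-level metaStoreUris key
val raw = ConfigFactory.parseString(
  """hive: {
    |  metaStoreUris: "thrift://hive-metastore-server:9083"
    |  read: { sql: "select src, dst from db1.edges", srcId: "src", dstId: "dst" }
    |  write: { dbTableName: "db2.result", saveMode: "Overwrite" }
    |}""".stripMargin)
val entry = HiveConfigEntry(raw)
// expected to print the single metastore uri plus the read and write settings
println(entry)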

/**
@@ -214,32 +219,31 @@ case class LocalConfigEntry(filePath: String,
}
}

case class HiveConfigEntry(hiveReadConfigEntry:HiveReadConfigEntry,
hiveWriteConfigEntry:HiveWriteConfigEntry) {
case class HiveConfigEntry(hiveMetaStoreUris: String,
hiveReadConfigEntry: HiveReadConfigEntry,
hiveWriteConfigEntry: HiveWriteConfigEntry) {
override def toString: String = {
s"HiveConfigEntry: {read: $hiveReadConfigEntry, write: $hiveWriteConfigEntry}"
s"HiveConfigEntry: {hiveMetaStoreUris:$hiveMetaStoreUris, read: $hiveReadConfigEntry, write: $hiveWriteConfigEntry}"
}
}

case class HiveReadConfigEntry(sql: String,
srcIdCol: String = "srcId",
dstIdCol: String = "dstId",
weightCol: String,
metaStoreUris: String) {
weightCol: String) {
override def toString: String = {
s"HiveReadConfigEntry: {sql: $sql, srcIdCol: $srcIdCol, dstIdCol: $dstIdCol, " +
s"weightCol:$weightCol, metaStoreUris:$metaStoreUris}"
s"weightCol:$weightCol}"
}
}

case class HiveWriteConfigEntry(dbTableName: String,
saveMode: String,
autoCreateTable: Boolean,
resultColumnMapping: mutable.Map[String, String],
metaStoreUris: String) {
resultColumnMapping: mutable.Map[String, String]) {
override def toString: String = {
s"HiveWriteConfigEntry: {dbTableName: $dbTableName, saveMode=$saveMode, " +
s"autoCreateTable=$autoCreateTable, resultColumnMapping=$resultColumnMapping, metaStoreUris=$metaStoreUris}"
s"autoCreateTable=$autoCreateTable, resultColumnMapping=$resultColumnMapping}"
}
}

@@ -23,18 +23,28 @@ object SparkConfig
session.config(key, value)
}

val dataSource = configs.dataSourceSinkEntry
if (dataSource.source.equals(ReaderType.hive.stringify)
|| dataSource.sink.equals(WriterType.hive.stringify)) {
session.enableHiveSupport()
}
// set hive config
setHiveConfig(session, configs)

val partitionNum = sparkConfigs.getOrElse("spark.app.partitionNum", "0")
val spark = session.getOrCreate()
validate(spark.version, "2.4.*")
SparkConfig(spark, partitionNum.toInt)
}

private def setHiveConfig(session: org.apache.spark.sql.SparkSession.Builder, configs: Configs): Unit = {
val dataSource = configs.dataSourceSinkEntry
if (dataSource.source.equals(ReaderType.hive.stringify)
|| dataSource.sink.equals(WriterType.hive.stringify)) {
session.enableHiveSupport()
val uris = configs.hiveConfigEntry.hiveMetaStoreUris
if (uris != null && uris.trim.nonEmpty) {
session.config("hive.metastore.schema.verification", false)
session.config("hive.metastore.uris", uris)
}
}
}
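Because the metastore settings are now applied once on the session builder, every hive read and write in the job inherits them instead of mutating spark.conf per operation. A minimal sketch of the equivalent builder wiring (app name and uri are placeholders):

import org.apache.spark.sql.SparkSession

val builder = SparkSession.builder().appName("nebula-algorithm")
builder.enableHiveSupport()
// relax schema verification so a mismatched hive client version can still connect
builder.config("hive.metastore.schema.verification", false)
builder.config("hive.metastore.uris", "thrift://hive-metastore-server:9083")
val spark = builder.getOrCreate()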

private def validate(sparkVersion: String, supportedVersions: String*): Unit = {
if (sparkVersion != "UNKNOWN" && !supportedVersions.exists(sparkVersion.matches)) {
throw new RuntimeException(
@@ -190,13 +190,6 @@ final class HiveReader extends DataReader {
val dstIdCol = readConfig.dstIdCol
val weightCol = readConfig.weightCol

println(s"""hiveDataReader, srcIdCol:$srcIdCol, dstIdCol:$dstIdCol, weightCol:$weightCol""")

if (readConfig.metaStoreUris != null && readConfig.metaStoreUris.trim.nonEmpty) {
spark.conf.set("hive.metastore.schema.verification", false)
spark.conf.set("hive.metastore.uris", readConfig.metaStoreUris)
}

var data = spark.sql(sql)

if (srcIdCol != null && dstIdCol != null && srcIdCol.trim.nonEmpty && dstIdCol.trim.nonEmpty) {
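The hunk is truncated here. As an illustration only, the column handling that follows might map the user-configured names onto internal ones along these lines (the target names "srcId" and "dstId" are assumptions, not taken from this commit):

var data = spark.sql(sql)
if (srcIdCol != null && srcIdCol.trim.nonEmpty && dstIdCol != null && dstIdCol.trim.nonEmpty) {
  // assumed internal column names for the graph's source and destination IDs
  data = data.withColumnRenamed(srcIdCol, "srcId").withColumnRenamed(dstIdCol, "dstId")
}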
@@ -89,11 +89,6 @@ final class HiveWriter extends AlgoWriter {
_data = _data.withColumnRenamed(from, to)
}

if (config.metaStoreUris != null && config.metaStoreUris.trim.nonEmpty) {
spark.conf.set("hive.metastore.schema.verification", false)
spark.conf.set("hive.metastore.uris", config.metaStoreUris)
}

if(config.autoCreateTable){
val createTableStatement = generateCreateTableStatement(_data, config.dbTableName)
println(s"execute create hive table statement:${createTableStatement}")
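For completeness, a hedged sketch of how a writer like this can finish after the optional create-table step, assuming saveMode holds one of Spark's save mode names (Append, Overwrite, ErrorIfExists, Ignore); this is illustrative, not the repository's actual writer tail:

import org.apache.spark.sql.DataFrame

// DataFrameWriter.mode accepts the save mode name case-insensitively
def sinkToHive(data: DataFrame, dbTableName: String, saveMode: String): Unit =
  data.write.mode(saveMode).saveAsTable(dbTableName)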
