Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【有奖征文】linkis与SQL中间件(跨数据源混查)结合实践分享 #9

Open
libailin opened this issue Nov 4, 2020 · 0 comments

Comments

@libailin
Copy link

libailin commented Nov 4, 2020

标题:linkis与SQL中间件(跨数据源混查)结合实践分享

背景需求

-业务需求:

在公司有很多运营、数据分析的童鞋,虽然自有的BI产品功能丰富,有各式各样的定制化分析报表、各种维度的图表。
但是有时需要临时查询数据,发现数据是分布在不同数据源里的,也有可能数据来自不同业务的不同集群。
比如查询hive表的数据,但是维度映射数据在mysql里。还有要两个mysql数据库(不同服务器)关联查询等情况,
平时这些需求都需要程序猿大神们写程序实现。

现在可以通过Linkis与SQL中间件完美结合满足上述需求。难道不香吗?

-技术迭代:

之前使用的Apache Livy作为SQL执行入口,把查询的请求都提交给Livy,但是体验上不尽人意。
一直想找个替代组件, 后来发现功能强大的linkis。至于Linkis跟Apache Livy的对比,可以查看官方相关文档。

SQL中间件介绍:

SQL中间件是基于公司目前开源的XSQL和Quicksql两款SQL中间件,两种都支持跨数据源混查,两个都很优秀,
至于大家选择集成哪个可以根据自身情况决定。因为之前使用过XSQL,所以是在linkis增加了xsql查询引擎。

以下分别简单介绍下两款开源组件:

XSQL:

XSQL是一款低门槛、更稳定的分布式查询引擎。它允许你快速、近实时地查询大量数据。对于一些数据源(例如:Elasticsearch、MongoDB、Druid等),他可以大幅地降低学习曲线,并节省人力成本。除Hive外,每种数据源都支持除子查询外的下推执行优化。用户有时希望将位于不同数据源上的数据关联起来进行查询,但是由于各种数据源是异构的且一些数据源不支持SQL或者支持的SQL语法非常有限,因此传统互联网公司的做法是,将不同的数据同步到统一的存储介质中,再进行OLAP的查询。数据同步的过程中可能面临数据迁移、主从同步、网络带宽等诸多困难和挑战,而且需要浪费大量的人力、物力及时间,无法满足大数据产品当前阶段对于近实时甚至准实时的场景。通过XSQL你将可以避免数据迁移和时间浪费,更加专注于业务本身。XSQL可以通过下推、并行计算、迭代计算等底层支撑技术,对各种数据源的查询加速。

功能特性:

  • 内置8种数据源,包括:Hive、Mysql、EleasticSearch、Mongo、Kafka、Hbase、Redis、Druid等。
  • XSQL采用数据源(DataSource)、数据库(Database)、数据表(Table)的三层元信息,为异构数据源提供了统一视图,进而实现了跨数据源的数据关联
  • SQL Everything,将程序与数据源具体版本解耦,程序迁移能力得到加强
  • 对DDL、DML、可下推查询,延迟与Yarn的交互及资源申请,进而提升效率并节省资源。
  • 相比很多开源分布式查询引擎,XSQL替换了Spark SQL,因而只需要一次SQL解析,避免多次解析带来的时延。
  • XSQL允许用户将聚合、过滤、投影等操作下推至数据源计算引擎,相比DataSet API更容易实现毫秒级响应。
  • XSQL借鉴了业内优秀的开源项目,放弃元数据的中心化,因此避免了数据同步、数据不一致,数据延迟等不利因素。XSQL也因此在部署上更加轻量、简便。
  • XSQL对元数据的缓存有两种级别,既能减少对底层数据源的压力,也提升了XSQL的执行效率。
  • XSQL可以按照用户需要,设置元数据白名单来避免缓存多余的元信息,进一步提升执行效率。
  • 可适配到Spark 2.x任意版本,解压即可运行,不需要引入额外依赖。且与原生SparkSQL隔离运行,不影响现有程序运行

111

Quicksql:

Quicksql是一款跨计算引擎的统一联邦查询中间件,用户可以使用标准SQL语法对各类数据源进行联合分析查询。其目标是构建实时\离线全数据源统一的数据处理范式,屏蔽底层物理存储和计算层,最大化业务处理数据的效率。同时能够提供给开发人员可插拔接口,由开发人员自行对接新数据源。

功能特性:

  • 支持8种数据源查询:Hive, MySQL, Kylin, Elasticsearch, Oracle, MongoDB, PostgreSQL, GBase-8s;
  • 支持Spark、Flink双计算引擎;
  • 支持基础CLI命令行查询和JDBC远程连接查询;
  • JDBC类型数据源可通过YAML配置快速接入,无需修改代码;
  • 提供方言/语法对接模板,支持用户对新数据源的语法自定义;
  • 提供元数据采集功能,批量拉取预置元数据;
  • 支持落地HDFS,支持可配置的异步响应机制

p1

执行流程图

333

实践过程

参考linkis官方文档《如何快速实现新的底层引擎》、《Spark引擎介绍》,然后在uejs/definedEngines下创建xsql模块进行相关开发。

功能点:

  • 1、支持按照不同集群加载相关配置
  • 2、支持自定义结果存储路径
  • 3、支持是否开启默认limit 5000限制保护
  • 4、linkis网关上socket支持token user认证。
  • 5、适配公司内部hadoop版本
  • 6、增加XSQL执行引擎

实现过程简述:

由于公司大数据是有多集群的,为了节省客户端资源,可以复用客户端提交任务到不同集群,这时就需要能够灵活指定不同集群的配置文件。
目前是将用到的集群配置文件放到linkis-hadoop-conf文件夹下,用于在启动引擎时以及己启动的执行引擎里进入动态加载。


├── client-viewfs.xml
├── core-site-cluster1.xml
├── hbase-site-cluster1.xml
├── hdfs-site.xml
├── hive-default.xml
├── hive-exec-log4j.properties
├── hive-log4j.properties
├── hive-site-cluster1.xml
├── ivysettings.xml
├── mapred-site-cluster1.xml
├── spark-defaults-cluster1.conf
├── xsql-spark-defaults-cluster1.conf
└── yarn-site-cluster1.xml

在linkis-gateway网关模块,改造让socket支持token user认证。比如在创建socket连接时可以通过传入token相关参数来完成用户认证。

ws://gateway.linkis.net:9001/ws/api/entrance/connect?Token-User=xxxx&Token-Code=BML-AUTH

{
	//这个地址也需要增加token参数
 	"method":"/api/rest_j/v1/entrance/execute?Token-User=xxx&Token-Code=BML-AUTH",
 	"data":{
		"params": {
			"variable":{
			},
			"configuration":{
				"special":{

				},
				"runtime":{

				},
				"startup":{
				}
			}
		},
		"executeApplicationName":"xsql",
		"executionCode":"SELECT * FROM abc limit 5;",
		"runType":"sql"
	}
}

由于业务实际查询时是需要全量数据,不需要进行limit限制。
而且是想根据每次请求中参数动态来设置是否需要Limit,而不是通过全局配置统一禁用还是开启。
业务需要自定义存储结果路径,比如跨集群跨账号存储查询结果。

以上是Linkis\ujes\entrance入口模块里进行参数接受处理。

XSQL执行引擎实现:

  • 目录结构

222

由于xsql是基于spark实现的。所以xsql执行引擎基本是复用了linkis spark引擎代码。

重点是修改如下:

主要涉及到linkis-ujes-xsql-engine 模块相关改动

  • pom.xml

<!--<spark.version>2.4.3</spark.version> -->
<!--把2.4.3修改为2.4.3.xsql-0.6.0 -->
<spark.version>2.4.3.xsql-0.6.0</spark.version>,

2.4.3.xsql-0.6.0这个版本请根据从开源xsql编译时获取,由于适配了公司内部,所以版本号可能略有不同。

SparkEngineExecutorFactory 类

override def createExecutor(options: JMap[String, String]): SparkEngineExecutor = {

    val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath
    SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) =>
      k.startsWith("spark.")
    }.foreach { case (k, v) =>
      conf.set(k, v)
      sys.props.getOrElseUpdate(k, v)
    }

}

def createSparkSession(outputDir: File, conf: SparkConf, addPythonSupport: Boolean = false): SparkSession = {


	//val builder = SparkSession.builder.config(conf)
    //builder.enableHiveSupport().getOrCreate()

	//划重点:将enableHiveSupport改成enableXSQLSupport()
	val builder = SparkSession.builder.config(conf)
    builder.enableXSQLSupport().getOrCreate()
}

SparkEngineExecutor 类

override protected def executeLine(engineExecutorContext: EngineExecutorContext, code: String): ExecuteResponse = Utils.tryFinally {

	//同样要增加加载配置代码段
	val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath
    SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) =>
      k.startsWith("spark.")
    }.foreach { case (k, v) =>
      sc.getConf.set(k, v)
      sys.props.getOrElseUpdate(k, v)
    }

}

如何使用

提交参数如下:

{
    "params":{
        "variable":{
        },
        "configuration":{
            "special":{
            },
            "runtime":{
                "clusterName":"cluster1",
                "configPath":"/usr/local/dss/linkis/linkis-hadoop-conf",
                "userName":"hadoop",
                "wds.linkis.yarnqueue":"hadoop",
				//可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限
                "resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei"
                //如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true,
                //否则不需要传这个参数,linkis则默认会进行limit 5000限制
                //"allowNoLimit" : true
            },
            "startup":{
                "clusterName":"cluster1",
                "configPath":"/usr/local/dss/linkis/linkis-hadoop-conf",
                "userName":"hadoop",
                "wds.linkis.yarnqueue":"hadoop",
				//可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限
                "resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei"
                //如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true,
                //否则不需要传这个参数,linkis则默认会进行limit 5000限制
                //"allowNoLimit" : true
            }
        }
    },
    "executeApplicationName":"xsql",
    "executionCode":"
		REMOVE DATASOURCE IF EXISTS mysql_connect_name;
		ADD DATASOURCE mysql_connect_name(type='mysql',url='jdbc:mysql://10.22.22.22:3306',user='root',password='123456',pushdown='false',useSSL='false',version='5.7.28');
		REMOVE DATASOURCE IF EXISTS hive_cluster1;
		ADD DATASOURCE hive_cluster1(type='hive',metastore.url='thrift://10.222.222.222:9083',user='test',password='test',version='1.2.1');
		SELECT t1.id,t1.name,t1.title,t2.time,t2.url,t2.partner,t2.m2 FROM (SELECT id,name,title,ip FROM mysql_connect_name.database_name.mysql_tables) t1 JOIN 
		(SELECT m,time,url,partner,ip FROM hive_cluster1.database_name.hive_tablse WHERE day = 20200903) t2 
		ON t1.ip=t2.ip order by t2.time;",
    "runType":"sql"
}

XSQL语法说明:

删除数据源时请使用这种语法 REMOVE DATASOURCE IF EXISTS 数据源名称; 避免直接REMOVE DATASOURCE 数据源名称, 因为上来就执行删除数据源,会因为找不到数据源来报异常。

查询的表名需要增加数据源以及数据库进行限定,要符合三段式表名。比如:hive_cluster1.database_name.table_name

第一段数据源名称,就是添加数据源语法时自定义的名称hive_cluster1,比如ADD DATASOURCE hive_cluster1(... ...)

第二段数据库名称,这个需要是真实的数据库,比如database_name

第三段表名,表要是第二段数据库下真实的表名。

更多XSQL使用语法,可以查看官方相关文档。https://qihoo360.github.io/XSQL/tutorial/syntax/

这样就可以实现mysql与hive数据进行关联查询了。

相关版本

hive 1.2.1

spark 2.4.3

linkis 0.9.3

xsql 0.6.0

java 1.8+

hadoop 2.7.2

相关资源

https://github.com/WeBankFinTech/Linkis

https://github.com/Qihoo360/XSQL

https://github.com/Qihoo360/Quicksql

@libailin libailin changed the title linkis与SQL中间件(跨数据源混查)结合实践分享 【有奖征文】linkis与SQL中间件(跨数据源混查)结合实践分享 Nov 4, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant