
[Essay Contest] How DSS+Linkis is used at 知因智慧 #15

Open
dlimeng opened this issue Nov 15, 2020 · 0 comments

dlimeng commented Nov 15, 2020

1. Application Scenario

First of all, thanks to the community experts for their guidance; I have learned a lot.

知因智慧 is a B2B fintech company with a lot of ETL work. We used to chain together assorted HQL and Spark jobs with shell scripts, scheduled by XXL-Job; an entire module could end up wrapped in one big script, so the coupling was extremely tight. Scheduling was also a problem: errors in intermediate steps could not be monitored. In the second half of 2019 we discovered the community's open-source components, and we have been studying how to integrate them into the company ever since.

We hope to draw on the community's strength and, given the company's actual situation, build out an end-to-end company-level data platform. Our data work currently focuses on metadata management, data warehouse ETL pipelines, data quality, and task scheduling.

2. Problems Solved

LDAP-based service

Users are managed through LDAP, with the proxy-service module modified so that accounts are shared per group. The company's data development team is small, so this approach is sustainable.

import java.util.Hashtable

import javax.naming.{Context, NamingException}
import javax.naming.ldap.InitialLdapContext

object LDAPUtils extends Logging {
  // LDAP proxy address and base DN, read from the Linkis configuration
  val url = CommonVars("wds.linkis.ldap.proxy.url", "").getValue
  val baseDN = CommonVars("wds.linkis.ldap.proxy.baseDN", "").getValue

  // Bind against LDAP with the user's DN; a thrown exception means login failure
  def login(userID: String, password: String): Unit = {
    if (userID.isEmpty) throw new NamingException("userID is null")
    val env = new Hashtable[String, String]()
    val bindDN = "uid=" + userID + ","
    val bindPassword = password
    env.put(Context.SECURITY_AUTHENTICATION, "simple")
    env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory")
    env.put(Context.PROVIDER_URL, url)
    env.put(Context.SECURITY_PRINCIPAL, bindDN + baseDN)
    env.put(Context.SECURITY_CREDENTIALS, bindPassword)
    // Creating the context performs the bind; bad credentials raise NamingException
    new InitialLdapContext(env, null)
    info(s"user $userID login success.")
  }
}

Relaxing permissions

Because resources are limited and some of the permission management is cumbersome, we relaxed a few permissions:

filesystem.path (the permission setting for storing logs and scripts): mounted uniformly over NFS, with directory permissions opened up so that users can modify them freely.
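A minimal sketch of the related workspace settings (key names assumed from a stock Linkis linkis.properties; verify against your version):

wds.linkis.workspace.filesystem.localuserrootpath=file:///data/nfs/linkis/workspace
wds.linkis.workspace.filesystem.hdfsuserrootpath=hdfs:///tmp/linkis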

linkis-metadata (metadata management): the Hive-related permissions were relaxed (HiveMetaDao.xml):

<select id="getDbsByUser" resultType="java.lang.String" parameterType="java.lang.String" databaseId="mysql">
   select NAME from DBS GROUP BY NAME order by NAME
</select>
<select id="getTablesByDbNameAndUser" resultType="map"  parameterType="map" databaseId="mysql">
    select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME, t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
    from TBLS t2, DBS t3 where 1=1 and t2.DB_ID=t3.DB_ID and t3.NAME = #{dbName,jdbcType=VARCHAR}
    order by NAME;
</select>
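Note that as rewritten, both queries ignore the user argument entirely: every user now sees all Hive databases and tables, which is the point of relaxing these checks.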

Dependency compatibility

Our environment is CDH 5.16.2, while DSS and Linkis were compiled and deployed against the vanilla version numbers. Most services worked fine, but a few had problems, because CDH recompiles the components and some of the internals change.

We once had functions that Hive and Spark were supposed to support fail when the script was run on Scriptis; the fix was to replace the Hive and Spark jars of the two services with the CDH-suffixed builds.
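A sketch of the corresponding Maven version overrides for recompiling (artifact versions assumed from the CDH 5.16.2 parcels; confirm against the Cloudera repository):

<properties>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    <hive.version>1.1.0-cdh5.16.2</hive.version>
    <spark.version>2.2.0.cloudera2</spark.version>
</properties>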


Tez support

Adding Tez support to the Linkis Hive engine:

  • Put the Tez-related jars into linkis-ujes-hive-enginemanager/lib.
  • Point linkis.properties at the Hive configuration directory, and add the following to hive-site.xml:
<property>
    <name>tez.lib.uris</name>
    <value>hdfs:///apps/tez/tez-0.8.5.tar.gz</value>
</property>
<property>
    <name>hive.tez.container.size</name>
    <value>10240</value>
</property>
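For Hive to actually run on Tez, the execution engine also has to be switched over; this is the standard Hive setting (not shown in the original excerpt):

<property>
    <name>hive.execution.engine</name>
    <value>tez</value>
</property>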

Shell variable definitions


Supporting custom variables:

In the CustomVariableUtils utility class, the Shell-related enum entries must be added as well.

/**
 * Script compaction for Shell scripts: defines the header prefixes used to
 * embed custom variables and configuration in .sh scripts.
 * Created by limeng on 2020/8/19.
 */
class ShellScriptCompaction private extends CommonScriptCompaction {
  override def prefixConf: String = "#conf@set"
  override def prefix: String = "#@set"
  override def belongTo(suffix: String): Boolean =
    suffix match {
      case "sh" => true
      case _ => false
    }
}

object ShellScriptCompaction {
  val shellScriptCompaction: ShellScriptCompaction = new ShellScriptCompaction
  def apply(): CommonScriptCompaction = shellScriptCompaction
}
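With the compaction registered, a Shell script saved from Scriptis carries its variable definitions in header lines using the "#@set" prefix defined above. A hypothetical example (variable name made up, assuming the usual ${var} substitution at run time):

#@set run_date=20201115
echo "run date is ${run_date}"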

In ScriptFsWriter, register the Shell compaction:

def listCompactions(): Array[Compaction] = Array(PYScriptCompaction(), QLScriptCompaction(), ScalaScriptCompaction(), ShellScriptCompaction())

The regex in the WorkspaceUtil utility class was buggy and prevented renaming: the '.' character in the middle of the pattern had to be removed.

public static void fileAndDirNameSpecialCharCheck(String path) throws WorkSpaceException {
    String name = new File(path).getName();
    LOGGER.info(path);
    // '.' removed from the character class so that file names containing dots pass the check
    String specialRegEx = "[ _`~!@#$%^&*()+=|{}':;',\\[\\]<>/?~!@#¥%……&*()——+|{}【】‘;:”“’。,、?]|\n|\r|\t";
    Pattern specialPattern = Pattern.compile(specialRegEx);
    if (specialPattern.matcher(name).find()) {
        throw new WorkSpaceException("the path contains special characters");
    }
}

eventReceiver node error when used (eventchecker component) #247

EventCheckerNodeExecution.scala (excerpt):

Utils.tryFinally {
  resultSetWriter.addMetaData(null)
  resultSetWriter.addRecord(new LineRecord(action.saveKeyAndValue))
}(Utils.tryQuietly(resultSetWriter.close()))
response.setIsSucceed(true)
AppJointEntranceJob.scala (excerpt):

override def run(): Unit = {
  if (!isScheduled) return
  info(s"$getId starts to run")
  getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo(s"$getId starts to execute.")))
  startTime = System.currentTimeMillis
  getExecutor match {
    case appjointEntranceEngine: AppJointEntranceEngine =>
      appjointEntranceEngine.setJob(this)
      appjointEntranceEngine.setInstance(Sender.getThisInstance)
  }
  Utils.tryAndErrorMsg(transition(Running))(s"transition $getId from Scheduler to Running failed.")
  // ... (rest of the method unchanged)

Adding Spark Streaming

  • Add the spark-streaming dependencies to linkis-ujes-spark-enginemanager (see the sketch below).
  • Import the related dependencies in the bindSparkSession method of SparkScalaExecutor.scala.
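A minimal sketch of the dependency, assuming Scala 2.11 and a provided scope matching the cluster's Spark:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>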

Test results

[screenshot: WeChat Work capture of the test results]

Debugging services

For a problematic service, enable remote debugging in its startup script under the bin directory, as sketched below.
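A typical way to do that is to append the standard JDWP options to the JVM flags in the start script (the port number here is arbitrary):

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005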


Because of the platform's cookie handling, some services cannot be debugged by sending requests to the APIs directly:

import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
/**
 * Builds a RestTemplate backed by Apache HttpClient, which keeps a cookie
 * store so the login session is replayed on subsequent requests.
 * Created by limeng on 2020/10/30.
 */
public class HttpUtil {
    public static RestTemplate getRestClient(){
        CloseableHttpClient build =  HttpClientBuilder.create().useSystemProperties().build();
        return new RestTemplate(new HttpComponentsClientHttpRequestFactory(build));
    }
}
import com.linkis.web.utils.HttpUtil;
import net.sf.json.JSONObject;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
/**
 * Logs in to Linkis through the REST gateway; see also the test class EntranceRestfulTest.
 * Created by limeng on 2020/10/30.
 */
public class LinkisMain {
    public static void main(String[] args) {
        RestTemplate restClient = HttpUtil.getRestClient();
        JSONObject postData = new JSONObject();
        postData.put("password","hdfs");
        postData.put("userName","hdfs");
        String loginUrl = "http://192.168.200.116:8088/api/rest_j/v1/user/login";
        ResponseEntity<JSONObject> jsonResponseEntity = restClient.postForEntity(loginUrl, postData, JSONObject.class);
        System.out.println("状态码:"+jsonResponseEntity.getStatusCodeValue());
        JSONObject body = jsonResponseEntity.getBody();
        System.out.println("body :" + body.toString());
    }
}
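Because the underlying CloseableHttpClient keeps an in-memory cookie store, the session cookie returned by the login call is replayed automatically on later requests made through the same RestTemplate instance, which is what makes API-level debugging workable despite the cookie-based authentication.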

3. Best Practices

I will use the company's tag library as an example to walk through the workflow.

By mining and analyzing enterprise data, we build a tag and feature system: personalized, multi-level tags that support segmentation and precision-marketing scenarios. This helps us manage enterprises and groups in depth, tap their potential, and raise their value. A tag library built toward this goal makes it easier to understand and cultivate enterprises and groups, and better supports corporate financial services.

[figures: tag-library execution flow]

This is the execution flow diagram; the intermediate scheduling and the HQL and Spark jobs currently run on the DSS platform.

[figure: batch pipeline]

Above is one of the batch pipelines, for enterprise-operations features.

Software versions

CDH-5.16.2

Hadoop-2.6

Hive-1.1

Spark-2.2

kafka_2.11-1.0.1
