Skip to content

动态计算

Yuan Yifan edited this page Apr 10, 2019 · 2 revisions

动态计算

简介

这里提供了一种思路,利用轻量化的脚本引擎来进行动态的计算,所谓动态,也就是计算的逻辑和操作可以在流计算进行的过程中更新。 这种动态更新逻辑的能力特别适合不确定场景下的业务开发,因为逻辑修改后立刻上线,上线后配合日志系统立刻能看见效果。

你可以在JS中连接Redis集群,MongoDB数据库,所有支持JDBC驱动的SQL数据库,可以调用同步或者异步的Http接口,这已经覆盖了不少场景,你还可以模仿libXXX.js(在资源文件夹)的开发模式开发自己的库(希望您可以Merge回来)。

缺点是由于又使用JS包了一层,所以性能一定会下降,对于数据量特别大的,在业务稳定后可以考虑继承IRichServic[Document]接口实现Scala/Java版本的流计算业务。

快速开始

首先准备一个文件服务器,里面找地方放置一个json文件,这个JSON文件的内容是List[String],例如http://config.shinonomelab.com/lofka-scripts/config.json中的这样:

[
    "http://config.shinonomelab.com/lofka-scripts/print-test.js"
]

这里面是一堆URL,每一个URL都是一个脚本文件。

启动Flink集群的时候带上参数:--dynamics http://config.shinonomelab.com/lofka-scripts/config.json

你需要实现三个方法:opencloseprocessing_data(str_data),其中:

  • open 方法会在程序加载的时候执行,且只执行一次。
  • close 会在程序终结的时候执行,且只执行一次
  • processing_data会在每一条日志数据过来的时候执行,str_data是JSON格式的字符串。
  • 脚本的变更会导致程序的重新加载。

一个简单的脚本文件如下所示,该脚本的目的是在日志中打印出所有henan_zj/streaming/alarms汇报的日志。

/**
 * 启动程序的时候加载
 */
function open(){
    console.log("Script loaded");
}

/**
 * 关闭程序的时候加载
 */
function close(){
    console.log("Script closed");
}

/**
 * 运行程序
 */
function processing_data(str_data){
    var data = JSON.parse(str_data);
    if(filter_data(data)){
        process_filtered_data(data);
    }
}

function process_filtered_data(data){
    console.logJSON(data);
}

function filter_data(data){
    return data["app_name"]==="henan_zj/streaming/alarms";
}

加载机制

当配置的JSON文件改变的时候,程序会自动检测到并且加载,此时会根据新的URL列表重新整理引擎,已经加载过的不再重新加载。

当JS脚本变化的时候,引擎会重新加载。

脚本语言API概览

公共部分

参见libCommon.js文件。

日志

使用consolelog,warn,err/error可以正常将字符串输出到日志中。 除此之外特别提供了consolelogJSON,warnJSON,errJSON,这些方法会将输入的对象转为JSON格式并且输出

异步执行

async_execute(需要执行的函数)

例如:

async_execute(
    function(){
        //一些耗时的操作
    }

加载其他库

load(某个JS脚本的URL)可以下载并执行指定的库。

load_library(存在conf/lib文件夹或者资源文件夹中lib下的脚本)可以加载本地的库

__global_eval(data)函数可以以全局的身份执行一些代码。

HTTP

同步和异步:同步接口和异步接口的API是一样的,下面统一使用$(异步接口),如果将$替换为asyncHttp则还是异步,如果替换为syncHttp则是同步。

使用方法:

$.get(
    "http://www.baidu.com/",
    {
        "p":1 //这里输入你要的数据
    },
    function(str_data){
        //处理获取的数据
    }
)

POST的请求和这个类似,只需要将get换成post即可,get还有一个特殊用法是getJSON,它会主动将收到的数据以JSON的方式解析。

Redis Cluster

你可以连接你的

创建连接

var redis = new RedisCluster([1, 2, 3, 4, 5].map(function (i) {
    return "10.10.10.124:700" + i
}));

操作示例

一般操作

SET set(k,v) GET get(k) 将获取的字符串返回

HASH操作

HGET hget(key,field),返回获取的String HGETALL hgetall(key),将每一个Field->Value以 JSON Object 的形式返回 HMSET hmset(key, data) 将DATA中的String->String保存到key中

EXPIRE操作

EXPIRE expire(key,expire_seconds) 某个键过一段时间过期

自定义操作

使用get_connection()方法可以获取JedisCluster对象进行操作,也可以使用RedisCluster.prototype.xxx=xxx来增加自己的方法。

关闭连接

redis.close();

SQL

创建连接

var conn = new SQLConnection({
    "uri": "jdbc:mysql://127.0.0.1:3066/xxdb",
    "user": "userxxxxxx",
    "password": "pwdxxxxx",
    "driver": "com.mysql.jdbc.Driver"
});

通过合理的配置即可连接数据库,需要注意的是,如果你有特殊的数据库驱动要提前放到POM中一起编译。

查询操作

举个例子:

var data = conn.query("SELECT F_VEHICLE_ID as id,F_GPS_TIME as gt,F_LONGITUDE AS lng, F_LATITUDE as lat FROM T_LOCATION_20181128 LIMIT 0,2")

返回结果如下所示:

{
  "result": [//这里是所有查询的结果
    {
      "id": "4942200360503908",
      "gt": "2018-11-28 00:00:00",
      "lng": "115137336",
      "lat": "31216027"
    },
    {
      "id": "4942200360503908",
      "gt": "2018-11-28 00:00:09",
      "lng": "115138544",
      "lat": "31215503"
    }
  ],
  //这里是你输入的SQL,照样返回
  "sql": "SELECT F_VEHICLE_ID as id,F_GPS_TIME as gt,F_LONGITUDE AS lng, F_LATITUDE as lat FROM T_LOCATION_20181128 LIMIT 0,2",
  "columns": [// 这里是所有的字段(有顺序的)
    "id",
    "gt",
    "lng",
    "lat"
  ],
  "cols": 4,//有多少列
  "rows": 2//有多少行
}

执行操作

conn.exec("某些INSERT之类的语句")

即可。

关闭连接

conn.close();

MongoDB

创建连接

var mongo = new MongoClient({
    "servers": "127.0.0.1",
    "auth":"lofka:lofka@logger" // 用户名密码和鉴权使用的数据,这一行可选
});

查询操作

查询,包括Aggregate的语法和MongoDB原生的基本一致,例如:

mongo.get_collection(
    "gis",
    "shanghai_poi"
).find({
    "typecode": "010101"
}).sort({
    "_id": -1
}).limit(
    2
).toArray()

这样的语法是支持的

插入操作

mongo.get_collection(
    "gis", "test"
).insert([
    {"x": 1},
    {"x": 2}
]);

这里程序会自动判断你输入的是Array还是Object来判断是使用insert还是insertMany,这一点和原生API不太一样。

扩展方法

可以修改 libMongoDB.js 来实现你要的一些骚操作

关闭连接

mongo.close();
Clone this wiki locally