-
Notifications
You must be signed in to change notification settings - Fork 19
动态计算
这里提供了一种思路,利用轻量化的脚本引擎来进行动态的计算,所谓动态,也就是计算的逻辑和操作可以在流计算进行的过程中更新。 这种动态更新逻辑的能力特别适合不确定场景下的业务开发,因为逻辑修改后立刻上线,上线后配合日志系统立刻能看见效果。
你可以在JS中连接Redis集群,MongoDB数据库,所有支持JDBC驱动的SQL数据库,可以调用同步或者异步的Http接口,这已经覆盖了不少场景,你还可以模仿libXXX.js(在资源文件夹)的开发模式开发自己的库(希望您可以Merge回来)。
缺点是由于又使用JS包了一层,所以性能一定会下降,对于数据量特别大的,在业务稳定后可以考虑继承IRichServic[Document]
接口实现Scala/Java版本的流计算业务。
首先准备一个文件服务器,里面找地方放置一个json文件,这个JSON文件的内容是List[String]
,例如http://config.shinonomelab.com/lofka-scripts/config.json中的这样:
[
"http://config.shinonomelab.com/lofka-scripts/print-test.js"
]
这里面是一堆URL,每一个URL都是一个脚本文件。
启动Flink集群的时候带上参数:--dynamics http://config.shinonomelab.com/lofka-scripts/config.json
你需要实现三个方法:open
,close
和processing_data(str_data)
,其中:
-
open
方法会在程序加载的时候执行,且只执行一次。 -
close
会在程序终结的时候执行,且只执行一次 -
processing_data
会在每一条日志数据过来的时候执行,str_data是JSON格式的字符串。 - 脚本的变更会导致程序的重新加载。
一个简单的脚本文件如下所示,该脚本的目的是在日志中打印出所有henan_zj/streaming/alarms
汇报的日志。
/**
* 启动程序的时候加载
*/
function open(){
console.log("Script loaded");
}
/**
* 关闭程序的时候加载
*/
function close(){
console.log("Script closed");
}
/**
* 运行程序
*/
function processing_data(str_data){
var data = JSON.parse(str_data);
if(filter_data(data)){
process_filtered_data(data);
}
}
function process_filtered_data(data){
console.logJSON(data);
}
function filter_data(data){
return data["app_name"]==="henan_zj/streaming/alarms";
}
当配置的JSON文件改变的时候,程序会自动检测到并且加载,此时会根据新的URL列表重新整理引擎,已经加载过的不再重新加载。
当JS脚本变化的时候,引擎会重新加载。
参见libCommon.js
文件。
使用console
的log
,warn
,err/error
可以正常将字符串输出到日志中。
除此之外特别提供了console
的logJSON
,warnJSON
,errJSON
,这些方法会将输入的对象转为JSON格式并且输出
async_execute(需要执行的函数)
例如:
async_execute(
function(){
//一些耗时的操作
}
)
load(某个JS脚本的URL)
可以下载并执行指定的库。
load_library(存在conf/lib文件夹或者资源文件夹中lib下的脚本)
可以加载本地的库
__global_eval(data)
函数可以以全局的身份执行一些代码。
同步和异步:同步接口和异步接口的API是一样的,下面统一使用$
(异步接口),如果将$
替换为asyncHttp
则还是异步,如果替换为syncHttp
则是同步。
使用方法:
$.get(
"http://www.baidu.com/",
{
"p":1 //这里输入你要的数据
},
function(str_data){
//处理获取的数据
}
)
POST的请求和这个类似,只需要将get
换成post
即可,get
还有一个特殊用法是getJSON
,它会主动将收到的数据以JSON的方式解析。
你可以连接你的
var redis = new RedisCluster([1, 2, 3, 4, 5].map(function (i) {
return "10.10.10.124:700" + i
}));
SET set(k,v)
GET get(k)
将获取的字符串返回
HGET hget(key,field)
,返回获取的String
HGETALL hgetall(key)
,将每一个Field->Value以 JSON Object 的形式返回
HMSET hmset(key, data)
将DATA中的String->String保存到key中
EXPIRE expire(key,expire_seconds)
某个键过一段时间过期
使用get_connection()
方法可以获取JedisCluster对象进行操作,也可以使用RedisCluster.prototype.xxx=xxx
来增加自己的方法。
redis.close();
var conn = new SQLConnection({
"uri": "jdbc:mysql://127.0.0.1:3066/xxdb",
"user": "userxxxxxx",
"password": "pwdxxxxx",
"driver": "com.mysql.jdbc.Driver"
});
通过合理的配置即可连接数据库,需要注意的是,如果你有特殊的数据库驱动要提前放到POM中一起编译。
举个例子:
var data = conn.query("SELECT F_VEHICLE_ID as id,F_GPS_TIME as gt,F_LONGITUDE AS lng, F_LATITUDE as lat FROM T_LOCATION_20181128 LIMIT 0,2")
返回结果如下所示:
{
"result": [//这里是所有查询的结果
{
"id": "4942200360503908",
"gt": "2018-11-28 00:00:00",
"lng": "115137336",
"lat": "31216027"
},
{
"id": "4942200360503908",
"gt": "2018-11-28 00:00:09",
"lng": "115138544",
"lat": "31215503"
}
],
//这里是你输入的SQL,照样返回
"sql": "SELECT F_VEHICLE_ID as id,F_GPS_TIME as gt,F_LONGITUDE AS lng, F_LATITUDE as lat FROM T_LOCATION_20181128 LIMIT 0,2",
"columns": [// 这里是所有的字段(有顺序的)
"id",
"gt",
"lng",
"lat"
],
"cols": 4,//有多少列
"rows": 2//有多少行
}
conn.exec("某些INSERT之类的语句")
即可。
conn.close();
var mongo = new MongoClient({
"servers": "127.0.0.1",
"auth":"lofka:lofka@logger" // 用户名密码和鉴权使用的数据,这一行可选
});
查询,包括Aggregate的语法和MongoDB原生的基本一致,例如:
mongo.get_collection(
"gis",
"shanghai_poi"
).find({
"typecode": "010101"
}).sort({
"_id": -1
}).limit(
2
).toArray()
这样的语法是支持的
mongo.get_collection(
"gis", "test"
).insert([
{"x": 1},
{"x": 2}
]);
这里程序会自动判断你输入的是Array还是Object来判断是使用insert还是insertMany,这一点和原生API不太一样。
可以修改 libMongoDB.js
来实现你要的一些骚操作
mongo.close();