为了补充标准计算模式的多样性和复杂性,产品提供了策略设置的高级计算模式。在高级计算模式中,可以更加灵活地定义查询语句和触发条件,支持更多应用场景。
支持标准的Elasticsearch(以下简称ES)查询语句
- 选择索引,选择数据所在的索引。
- 编辑框内,填写ES查询的body体,查询语句遵循标准的ES查询标准,示例如下:
{
"query": {
"bool": {
"must": [
{
"range": {
"time_local": {
"gte": "now-5m",
"lte": "now"
}
}
}
],
"must_not": []
}
},
"size": 10
}
PS: 更多查询语句可以参考如下链接文档:
https://www.elastic.co/guide/cn/elasticsearch/guide/current/search-in-depth.html
主要涉及到结构化搜索,全文搜索,多字段搜索,近似匹配...等。
- 查询结果。根据(1)选择的索引,以及(2)填写的查询语句,可在ES中检索到相应数据,这里用payload表示查询后的结果,该结果也是定义触发条件的输入。
触发条件需要定义一个触发函数,支持JavaScript语法。
该函数的输入参数是上一步数据查询的结果-payload,在函数编写过程中可参考页面展示的数据查询结果。
通过一系列的触发条件判断编写,
若策略触发,则该函数的返回值定义为策略的触发结果。
若策略没有触发,则该函数可不返回返回值,或着返回false。
一个简单的触发函数,示例如下:
function trigger(payload) {
let result = "payload.hits.total=" + payload.hits.total
if (payload.hits.total > 5) {
return result;
}
}
- 描述:监控最近5分钟5XX状态码的数量,如果5XX的请求数在所有请求数中所占的比例超过一定值(假定为5%)则认为触发该事件,记录下当前5XX的数量及占比。
- 思路:
- 选择该事件数据来源的索引 - atd-statistics-domain-*
- 编写Elasticsearch查询条件。根据描述可知,时间范围为最近五分钟,需要查询得到5XX的请求数量以及该时间段内所有请求数的数量。
- 根据查询结果编写触发函数。判断占比是否超过5%,若超过则返回需要记录的数据。
- 实现:
{ "query": { "bool": { "must": [ { "range": { "time_local": { "gte": "now-5m", "lte": "now" } } } ], "must_not": [] } }, "aggs": { "5xx_count_agg": { "sum": { "field": "5xx_count" } }, "request_count_agg": { "sum": { "field": "request_count" } } }, "size": 0 }
{ "took": 1, "timed_out": false, "_shards": { "total": 7, "successful": 7, "skipped": 0, "failed": 0 }, "hits": { "total": 13, "max_score": 0, "hits": [] }, "aggregations": { "request_count_agg": { "value": 24277 }, "5xx_count_agg": { "value": 3108 } } }
function trigger(payload) { let count_5xx = 0; let request_count = 0; let proportion, result; if (payload.aggregations && payload.aggregations["5xx_count_agg"]) { count_5xx = payload.aggregations["5xx_count_agg"].value; } if (payload.aggregations && payload.aggregations.request_count_agg) { request_count = payload.aggregations.request_count_agg.value; } if (request_count) { proportion = count_5xx / request_count; } if (proportion > 0.05) { result = "5xx数量:" + count_5xx +",当前占比:" + (proportion * 100).toFixed(2) + "%"; return result; } }
- 描述:监控最近5分钟5XX状态码的数量,比较当前时间5XX状态码数量超过一定值(假定1000),并且当前数量与5分钟前的5XX数量相比增多超过一定比例(假定20%),则认为触发事件,记录当前时间与5分钟前的5XX数量。
- 思路:
- 选择该事件数据来源的索引 - atd-statistics-domain-*
- 编写Elasticsearch查询条件。根据描述可知,时间范围为最近五分钟,需要查询得到5XX的请求数量。
- 编写触发函数。根据描述需要进行当前数量与5分钟前数量进行比较,这里需要借助CacheStorage模块进行5XX数量的临时存储,存储的信息包括(key:'5xx_count_MM-DDHH:mm',value: 5XX数量),在进行触发判断时,首先去获取5分钟前的数量,然后与给定值,以及当前时间的数量进行比较,若超过则返回需要记录的数据。
- 实现:
{ "query": { "bool": { "must": [ { "range": { "time_local": { "gte": "now-5m", "lte": "now" } } } ], "must_not": [] } }, "aggs": { "5xx_count_agg": { "sum": { "field": "5xx_count" } } }, "size": 0 }
{ "took": 3, "timed_out": false, "_shards": { "total": 8, "successful": 8, "skipped": 0, "failed": 0 }, "hits": { "total": 13, "max_score": 0, "hits": [] }, "aggregations": { "5xx_count_agg": { "value": 87 } } }
async function trigger(payload) { const normal_5xx_count = 1000; const normal_5xx_proportion = 0.2; const cache_storage = new CacheStorage('event'); let timestamp = moment().unix(); //存当前数据 let key = '5xx_count_' + moment(timestamp * 1000).format("MM-DDHH:mm"); let value; if (payload.aggregations && payload.aggregations["5xx_count_agg"]) { value = payload.aggregations["5xx_count_agg"].value; } let data = {}; data.value = value; data.name = '5XX数量'; cache_storage.set(key, data); //取5分钟前数据 let pre_key = '5xx_count_' + moment(timestamp * 1000).subtract(5, 'minutes').format("MM-DDHH:mm"); let pre_data = await cache_storage.get(pre_key); let pre_value = pre_data && pre_data.value; //比较 let rising_value = value - pre_value; if (value < normal_5xx_count || value < pre_data) { return false; } if (pre_value && rising_value / pre_value >= normal_5xx_proportion) { let rising_proportion = ((rising_value / pre_value) * 100).toFixed(2) + "%"; let result = '当前5XX数量:' + value + ',5分钟前5XX数量:' + pre_value + "增加:" + rising_proportion; return result; } }
- 描述:监控每分钟web类型的威胁事件,对请求数排行TOP5的威胁事件的IP做进一步的判断,判断其是否在已有的威胁情报库中(假定这里通过接口 http://www.baishancloud.com/threatInfo?ip=xxx 判断),通过判断条件若满足条件,则触发事件,记录下该IP及其在威胁情报库中的原因。
接口说明: http://www.baishancloud.com/threatInfo?ip=xxx
接口返回值:
data: {
hit: 0 or 1, // 0表示威胁情报库中没有该IP,1表示威胁情报库中存在该IP
reason: '爬虫' // 该IP在威胁情报库中对应的原因
}
-
思路:
- 选择该事件数据来源的索引 - atd-event-online-*
- 编写Elasticsearch查询条件。根据描述可知,时间范围为每分钟,需要查询得到类型为web,按IP维度聚合并按请求数降序排序的TOP5威胁事件。
- 编写触发函数。根据描述可知需要对TOP5的IP进行调用接口,并对接口返回进行判断,若满足条件则触发事件返回需要记录的信息。
-
实现:
{ "query": { "bool": { "must": [ { "range": { "time_local": { "gte": "now-1m", "lte": "now" } } }, { "term": { "service_category": "web" } } ], "must_not": [] } }, "aggs": { "ip_group": { "terms": { "field": "ip", "size": 5, "order": { "pv_count": "desc" } }, "aggs": { "pv_count": { "sum": { "field": "pv" } } } } }, "size": 0 }
{ "took": 7, "timed_out": false, "_shards": { "total": 71, "successful": 71, "skipped": 0, "failed": 0 }, "hits": { "total": 28, "max_score": 0, "hits": [] }, "aggregations": { "ip_group": { "doc_count_error_upper_bound": -1, "sum_other_doc_count": 20, "buckets": [ { "key": "124.65.226.82", "doc_count": 2, "pv_count": { "value": 26 } }, { "key": "36.110.108.66", "doc_count": 2, "pv_count": { "value": 21 } }, { "key": "203.76.219.218", "doc_count": 2, "pv_count": { "value": 3 } }, { "key": "36.110.72.18", "doc_count": 1, "pv_count": { "value": 3 } }, { "key": "180.118.247.2", "doc_count": 1, "pv_count": { "value": 2 } } ] } } }
async function trigger(payload) { const requestUrl = "http://www.baishancloud.com/threatInfo?ip="; const method = "get"; let topIP = []; let result = ""; if (payload.aggregations && payload.aggregations.ip_group && payload.aggregations.ip_group.buckets) { topIP = payload.aggregations.ip_group.buckets; } for (let i = 0; i < topIP.length; i++) { let url = requestUrl + topIP[i].key; let res = await fetchData(url, method); let hited = res.data && res.data.hit || 0; let reason = res.data && res.data.reason || '暂无'; if (hited) { result += "IP:" + topIP[i].key + "情报库内原因:" + reason + "<br/>"; } } if (result) { return result; } }
在触发函数编写的过程中,本产品提供了封装好的接口请求函数fetchData,该函数的返回值为请求返回值。 请配合async/await的同步操作使用。即触发函数以async开头,接口请求函数以await开始。示例如下:
fetchData(url, method, headers, body),其中:
url: 完整的请求地址,
method: 请求方法,
headers: 请求的headers,不传时默认为{ 'Content-Type': 'application/json' },
body: 请求的body体
对于一个带接口请求的触发函数,示例如下:
async function trigger(payload) {
let url = "http://xxx/test";
let method = "get";
let data = await fetchData(url, method);
let result = "payload.hits.total=" + payload.hits.total;
if (payload.hits.total > data.total) {
return result;
}
}
对于触发函数中的时间格式化处理,本产品支持moment进行时间格式化。示例如下:
var time = moment().unix();
本产品内置了临时数据存储模块CacheStorage,可供用户进行数据的临时存储,以及后续使用。
该存储模块提供了三种处理数据的方法,分别为get,set,delete。请配合async/await的同步操作使用。
CacheStorage模块的使用具体描述如下:
const cache_storage = new CacheStorage('event');
cache_storage.get(key);
cache_storage.set(key, value, expire_time);
cache_storage.delete(key);
cache_storage.get(key),其中:
参数:
key: 数据存储的key值
返回值:
若存在该key对应的value,且该数据没有过期,则返回value,否则返回null。
示例:
let value = await cache_storage.get(key);
cache_storage.set(key, value, expire_time),其中:
参数:
key: 数据存储的key;
value: 存储的数据,支持number,string,object,array等格式;
expire_time: 数据保存时间(秒),可选。若不填,默认数据保存30天;
返回值:
若数据存储成功,返回true;失败则返回false。
示例:
let key = 'total';
let value = 100;
let expire_time = 86400; //数据保存86400秒
await set(key, value, expire_time);
cache_storage.delete(key),其中:
参数:
key: 数据存储的key值
返回值:
若数据删除成功,返回true;失败则返回false。
示例:
await cache_storage.delete(key);
本产品提供内置的邮件发送函数postEmail
,示例如下:
function postEmail(mailData, SMTPConfig, callReady, callError),其中:
mailData: 发送邮件内容对象,包括to、title、content属性;
SMTPConfig: 若需要使用自己的SMTP服务器,请传入相关配置包括host、port、user、password属性,可选;
callReady: 发送成功回调,可选;
callError: 发送失败回调,可选;
其中mailData
参数,分使用本产品提供的SMTP服务和用户自身SMTP服务两种情况进行说明:
- 使用本产品提供的SMTP,示例如下:
let mailData = {}; //邮件信息
mailData.to = xxx; //收件人邮箱地址,多个用分号分割
mailData.title = xxx; //邮件主题
mailData.content = xxx; //邮件内容
postEmail(mailData);
- 用户自身SMTP服务器,示例如下:
let mailData = {}; //邮件信息
mailData.to = xxx; //收件人邮箱地址,多个用分号分割
mailData.title = xxx; //邮件主题
mailData.content = xxx; //邮件内容
let SMTPConfig = {}; //SMTP服务器信息
SMTPConfig.host = xxx; //SMPT服务器地址
SMTPConfig.port = xxx; //端口
SMTPConfig.user = xxx; //发件邮箱
SMTPConfig.password = xxx; //SMTP授权码
postEmail(mailData, SMTPConfig);
在触发函数编写的过程中,本产品提供了封装好的ES请求模块esClient,详细的API调用方式请参考官方elasticsearch.js。 文档如下:
https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/api-reference.html
请配合async/await的同步操作使用。即触发函数以async开头,接口请求函数以await开始。示例如下:
async function trigger(payload) {
let index = xxx;
let body = xxx;
let data = await esClient.search({
index: index,
body: body
});
......判断条件......
}
在触发函数编写的过程中,为了方便关联各种设备的数据,本产品提供了封装好的调用模块bscModule,可用于各种设备调用交互处理。bscModule模块中封装的各设备类型及文档说明,如下表所示。
名称 | 对应模块 | 示例 | 文档 |
---|---|---|---|
f5 | f5IControl | let f5IControl = bscModule.f5IControl |
https://github.com/thwi/node-icontrol |
radware | radWare | let radWare = bscModule.radWare |
https://webhelp.radware.com/Vision/REST/4_00_00/index.html |
具体示例:
function trigger(payload) {
var f5IControl = bscModule.f5IControl; //引入f5关联模块
var bigip = new f5IControl({
host: '192.xxx.xxx.xxx',
proto: 'https',
port: 443,
user: 'admin',
pass: 'admin',
strict: true,
debug: false
});
......其他api使用请参考文档:https://github.com/thwi/node-icontrol#usage ......
}
radware模块中,提供登录函数login,以及数据交互请求函数request。 login函数的返回值如下:
登录成功:
{
"status": "ok",
"jsessionid": "sessionid"
}
登录失败:
{
"status": "error",
"message": "Invalid Username or invalid Password. Re-enter."
}
request函数具体描述如下:
request(path, method, opts),其中:
path: 请参照radware文档中各请求path,如:'/mgmt/system/user/logout',
method: 请求方法,
opts: {
headers: {}, //已设置默认值,一般请求无需设置,特殊情况可自行设置
body: {}
}
* headers说明,若需设置,请参照
headers = {
'Content-Type': 'application/json',
'cookie': jsessionid //*必须,值为login接口返回的 jsessionid
};
具体应用实例如下:
async function trigger(payload) {
var radWare = bscModule.radWare; //引入radware关联模块
var radware = new radWare({
host: '192.xxx.xxx.xxx',
proto: 'http', //默认为http
username: 'admin',
password: 'admin'
});
//登录
var login = await radware.login();
if(login.status == 'ok') {
//进一步的数据关联交互,如获取apm设备列表
var apm_application = await radware.requst('/mgmt/system/config/apm/application','get');
......其他api使用请参考官方文档https://webhelp.radware.com/Vision/REST/4_00_00/index.html......
}
}
为了更方便,快捷地使用JavaScript语言进行高级模式的编写,本产品提供了内置封装的lodash
JS工具模块,使用时通过_
调用,简单示例如下:
对象取值
_.get(object, path, [defaultValue])
var object = { 'a': [{ 'b': { 'c': 3 } }] };
_.get(object, 'a[0].b.c');
// => 3
_.get(object, ['a', '0', 'b', 'c']);
// => 3
_.get(object, 'a.b.c', 'default');
// => 'default'
数组的交集
_.intersection([arrays])
参数
[arrays] (...Array): 待检查的数组。
返回值
(Array): 返回一个包含所有传入数组交集元素的新数组。
_.intersection([1, 2], [4, 2], [2, 1]);
// => [2]
数组的差集
_.difference(array, [values])
参数
array (Array): 要检查的数组。
[values] (...Array): 排除的值。
返回值
(Array): 返回一个过滤值后的新数组。
_.difference([3, 2, 1], [4, 2]);
// => [3, 1]
......
具体的API调用方式,可参考官方文档: https://lodash.com/docs/4.17.15