Skip to content

Latest commit

 

History

History
653 lines (616 loc) · 22.2 KB

高级计算模式的策略设置.md

File metadata and controls

653 lines (616 loc) · 22.2 KB

概述

为了补充标准计算模式的多样性和复杂性,产品提供了策略设置的高级计算模式。在高级计算模式中,可以更加灵活地定义查询语句和触发条件,支持更多应用场景。

设置查询语句

支持标准的Elasticsearch(以下简称ES)查询语句

  1. 选择索引,选择数据所在的索引。
  2. 编辑框内,填写ES查询的body体,查询语句遵循标准的ES查询标准,示例如下:
  {
    "query": {
      "bool": {
        "must": [
          {
            "range": {
              "time_local": {
                "gte": "now-5m",
                "lte": "now"
              }
            }
          }
        ],
        "must_not": []
      }
    },
    "size": 10
  }

PS: 更多查询语句可以参考如下链接文档:

https://www.elastic.co/guide/cn/elasticsearch/guide/current/search-in-depth.html

主要涉及到结构化搜索,全文搜索,多字段搜索,近似匹配...等。

  1. 查询结果。根据(1)选择的索引,以及(2)填写的查询语句,可在ES中检索到相应数据,这里用payload表示查询后的结果,该结果也是定义触发条件的输入。

定义触发条件

触发条件需要定义一个触发函数,支持JavaScript语法。

触发函数参数:

该函数的输入参数是上一步数据查询的结果-payload,在函数编写过程中可参考页面展示的数据查询结果。

触发函数返回值:

通过一系列的触发条件判断编写,

若策略触发,则该函数的返回值定义为策略的触发结果。

若策略没有触发,则该函数可不返回返回值,或着返回false。

一个简单的触发函数,示例如下:

function trigger(payload) {
    let result = "payload.hits.total=" + payload.hits.total
    if (payload.hits.total > 5) {
        return result;
    }
}

示例

场景一:5XX状态码占比异常事件

  • 描述:监控最近5分钟5XX状态码的数量,如果5XX的请求数在所有请求数中所占的比例超过一定值(假定为5%)则认为触发该事件,记录下当前5XX的数量及占比。
  • 思路:
    • 选择该事件数据来源的索引 - atd-statistics-domain-*
    • 编写Elasticsearch查询条件。根据描述可知,时间范围为最近五分钟,需要查询得到5XX的请求数量以及该时间段内所有请求数的数量。
    • 根据查询结果编写触发函数。判断占比是否超过5%,若超过则返回需要记录的数据。
  • 实现:
    设置查询条件:
    {
      "query": {
        "bool": {
          "must": [
            {
              "range": {
                "time_local": {
                  "gte": "now-5m",
                  "lte": "now"
                }
              }
            }
          ],
          "must_not": []
        }
      },
      "aggs": {
          "5xx_count_agg": {
              "sum": {
                  "field": "5xx_count"
              }
          },
          "request_count_agg": {
              "sum": {
                  "field": "request_count"
              }
          }
      },
      "size": 0
    }
    
    查询结果如下:
    {
      "took": 1,
      "timed_out": false,
      "_shards": {
        "total": 7,
        "successful": 7,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 13,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "request_count_agg": {
          "value": 24277
        },
        "5xx_count_agg": {
          "value": 3108
        }
      }
    }
    
    定义触发条件:
    function trigger(payload) {
        let count_5xx = 0;
        let request_count = 0;
        let proportion, result;
        if (payload.aggregations && payload.aggregations["5xx_count_agg"]) {
            count_5xx = payload.aggregations["5xx_count_agg"].value;
        }
        if (payload.aggregations && payload.aggregations.request_count_agg) {
            request_count = payload.aggregations.request_count_agg.value;
        }
        if (request_count) {
            proportion = count_5xx / request_count;
        }
        if (proportion > 0.05) {
            result = "5xx数量:" + count_5xx +",当前占比:" + (proportion * 100).toFixed(2) + "%";
            return result;
        }
    }
    

场景二:5XX状态码数量环比事件

  • 描述:监控最近5分钟5XX状态码的数量,比较当前时间5XX状态码数量超过一定值(假定1000),并且当前数量与5分钟前的5XX数量相比增多超过一定比例(假定20%),则认为触发事件,记录当前时间与5分钟前的5XX数量。
  • 思路:
    • 选择该事件数据来源的索引 - atd-statistics-domain-*
    • 编写Elasticsearch查询条件。根据描述可知,时间范围为最近五分钟,需要查询得到5XX的请求数量。
    • 编写触发函数。根据描述需要进行当前数量与5分钟前数量进行比较,这里需要借助CacheStorage模块进行5XX数量的临时存储,存储的信息包括(key:'5xx_count_MM-DDHH:mm',value: 5XX数量),在进行触发判断时,首先去获取5分钟前的数量,然后与给定值,以及当前时间的数量进行比较,若超过则返回需要记录的数据。
  • 实现:
    设置查询条件:
    {
      "query": {
        "bool": {
          "must": [
            {
              "range": {
                "time_local": {
                  "gte": "now-5m",
                  "lte": "now"
                }
              }
            }
          ],
          "must_not": []
        }
      },
      "aggs": {
          "5xx_count_agg": {
              "sum": {
                  "field": "5xx_count"
              }
          }
      },
      "size": 0
    }
    
    查询结果如下:
    {
      "took": 3,
      "timed_out": false,
      "_shards": {
        "total": 8,
        "successful": 8,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 13,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "5xx_count_agg": {
          "value": 87
        }
      }
    }
    
    定义触发条件:
    async function trigger(payload) {
        const normal_5xx_count = 1000;
        const normal_5xx_proportion = 0.2;
        const cache_storage = new CacheStorage('event');
        let timestamp = moment().unix();
        //存当前数据
        let key = '5xx_count_' + moment(timestamp * 1000).format("MM-DDHH:mm");
        let value;
        if (payload.aggregations && payload.aggregations["5xx_count_agg"]) {
            value = payload.aggregations["5xx_count_agg"].value;
        }
        let data = {};
        data.value = value;
        data.name = '5XX数量';
        cache_storage.set(key, data);
        //取5分钟前数据
        let pre_key = '5xx_count_' + moment(timestamp * 1000).subtract(5, 'minutes').format("MM-DDHH:mm");
        let pre_data = await cache_storage.get(pre_key);
        let pre_value = pre_data && pre_data.value;
        //比较
        let rising_value = value - pre_value;
        if (value < normal_5xx_count || value < pre_data) {
            return false;
        }
        if (pre_value && rising_value / pre_value >= normal_5xx_proportion) {
            let rising_proportion = ((rising_value / pre_value) * 100).toFixed(2) + "%";
            let result = '当前5XX数量:' + value + ',5分钟前5XX数量:' + pre_value + "增加:" +  rising_proportion;
            return result;
        }
    }
    

场景三:每分钟请求数TOP5的威胁IP判断是否在威胁情报库中的事件

  • 描述:监控每分钟web类型的威胁事件,对请求数排行TOP5的威胁事件的IP做进一步的判断,判断其是否在已有的威胁情报库中(假定这里通过接口 http://www.baishancloud.com/threatInfo?ip=xxx 判断),通过判断条件若满足条件,则触发事件,记录下该IP及其在威胁情报库中的原因。
接口说明: http://www.baishancloud.com/threatInfo?ip=xxx
接口返回值:
data: {
    hit: 0 or 1, // 0表示威胁情报库中没有该IP,1表示威胁情报库中存在该IP
    reason: '爬虫' // 该IP在威胁情报库中对应的原因
}

  • 思路:

    • 选择该事件数据来源的索引 - atd-event-online-*
    • 编写Elasticsearch查询条件。根据描述可知,时间范围为每分钟,需要查询得到类型为web,按IP维度聚合并按请求数降序排序的TOP5威胁事件。
    • 编写触发函数。根据描述可知需要对TOP5的IP进行调用接口,并对接口返回进行判断,若满足条件则触发事件返回需要记录的信息。
  • 实现:

    设置查询条件:
    {
        "query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "time_local": {
                                "gte": "now-1m",
                                "lte": "now"
                            }
                        }
                    }, {
                        "term": {
                            "service_category": "web"
                        }
                    }
                ],
                "must_not": []
            }
        },
        "aggs": {
            "ip_group": {
                "terms": {
                    "field": "ip",
                    "size": 5,
                    "order": {
                        "pv_count": "desc"
                    }
                },
                "aggs": {
                    "pv_count": {
                        "sum": {
                            "field": "pv"
                        }
                    }
                }
            }
        },
        "size": 0
    }
    
    查询结果如下:
    {
     "took": 7,
     "timed_out": false,
     "_shards": {
       "total": 71,
       "successful": 71,
       "skipped": 0,
       "failed": 0
     },
     "hits": {
       "total": 28,
       "max_score": 0,
       "hits": []
     },
     "aggregations": {
       "ip_group": {
         "doc_count_error_upper_bound": -1,
         "sum_other_doc_count": 20,
         "buckets": [
           {
             "key": "124.65.226.82",
             "doc_count": 2,
             "pv_count": {
               "value": 26
             }
           },
           {
             "key": "36.110.108.66",
             "doc_count": 2,
             "pv_count": {
               "value": 21
             }
           },
           {
             "key": "203.76.219.218",
             "doc_count": 2,
             "pv_count": {
               "value": 3
             }
           },
           {
             "key": "36.110.72.18",
             "doc_count": 1,
             "pv_count": {
               "value": 3
             }
           },
           {
             "key": "180.118.247.2",
             "doc_count": 1,
             "pv_count": {
               "value": 2
             }
           }
         ]
       }
     }
    }
    
    定义触发条件:
    async function trigger(payload) {
        const requestUrl = "http://www.baishancloud.com/threatInfo?ip=";
        const method = "get";
        let topIP = [];
        let result = "";
        if (payload.aggregations && payload.aggregations.ip_group && payload.aggregations.ip_group.buckets) {
            topIP = payload.aggregations.ip_group.buckets;
        }
        for (let i = 0; i < topIP.length; i++) {
           let url = requestUrl + topIP[i].key;
           let res = await fetchData(url, method);
           let hited = res.data && res.data.hit || 0;
           let reason = res.data && res.data.reason || '暂无';
           if (hited) {
               result += "IP:" + topIP[i].key + "情报库内原因:" + reason + "<br/>";
           }
        }
        if (result) {
            return result;
        }
    }
    

Q&A

如何进行接口请求?

在触发函数编写的过程中,本产品提供了封装好的接口请求函数fetchData,该函数的返回值为请求返回值。 请配合async/await的同步操作使用。即触发函数以async开头,接口请求函数以await开始。示例如下:

  fetchData(url, method, headers, body),其中:
  url: 完整的请求地址,
  method: 请求方法,
  headers: 请求的headers,不传时默认为{ 'Content-Type': 'application/json' },
  body: 请求的body体需进行stringify,JSON.stringify(body)

对于一个带接口请求的触发函数,示例如下:

async function trigger(payload) {
    let url = "http://xxx/test";
    let method = "get";
    let data = await fetchData(url, method);
    let result = "payload.hits.total=" + payload.hits.total;
    if (payload.hits.total > data.total) {
        return result;
    }
}
async function trigger(payload) {
    let url = "http://xxx/test";
    let method = "post";
    let headers = { 'Content-Type': 'application/json' };
    let body = JSON.stringify({a:1});
    let data = await fetchData(url, method, headers, body);

如何进行时间格式化?

对于触发函数中的时间格式化处理,本产品支持moment进行时间格式化。示例如下:

var time = moment().unix();

如何进行临时数据存储?

本产品内置了临时数据存储模块CacheStorage,可供用户进行数据的临时存储,以及后续使用。

该存储模块提供了三种处理数据的方法,分别为get,set,delete。请配合async/await的同步操作使用。

CacheStorage模块的使用具体描述如下:

const cache_storage = new CacheStorage('event');
cache_storage.get(key);
cache_storage.set(key, value, expire_time);
cache_storage.delete(key);
cache_storage.get(key),其中:
参数:
  key: 数据存储的key值
返回值:
  若存在该key对应的value,且该数据没有过期,则返回value,否则返回null。
示例:
let value = await cache_storage.get(key);
cache_storage.set(key, value, expire_time),其中:
参数:
  key: 数据存储的key;
  value: 存储的数据,支持number,string,object,array等格式;
  expire_time: 数据保存时间(秒),可选。若不填,默认数据保存30天;
返回值:
  若数据存储成功,返回true;失败则返回false。
示例:
  let key = 'total';
  let value = 100;
  let expire_time = 86400; //数据保存86400秒
  await set(key, value, expire_time);
cache_storage.delete(key),其中:
参数:
  key: 数据存储的key值
返回值:
  若数据删除成功,返回true;失败则返回false。
示例:
  await cache_storage.delete(key);

如何发送邮件?

本产品提供内置的邮件发送函数postEmail,示例如下:

  function postEmail(mailData, SMTPConfig, callReady, callError),其中:
  mailData: 发送邮件内容对象,包括to、title、content属性;
  SMTPConfig: 若需要使用自己的SMTP服务器,请传入相关配置包括host、port、user、password属性,可选;
  callReady: 发送成功回调,可选;
  callError: 发送失败回调,可选;

其中mailData参数,分使用本产品提供的SMTP服务和用户自身SMTP服务两种情况进行说明:

  1. 使用本产品提供的SMTP,示例如下:
  let mailData = {}; //邮件信息
  mailData.to = xxx; //收件人邮箱地址,多个用分号分割
  mailData.title = xxx; //邮件主题
  mailData.content = xxx; //邮件内容
  postEmail(mailData);
  1. 用户自身SMTP服务器,示例如下:
  let mailData = {}; //邮件信息
  mailData.to = xxx; //收件人邮箱地址,多个用分号分割
  mailData.title = xxx; //邮件主题
  mailData.content = xxx; //邮件内容
  let SMTPConfig = {}; //SMTP服务器信息
  SMTPConfig.host = xxx; //SMPT服务器地址
  SMTPConfig.port = xxx; //端口
  SMTPConfig.user = xxx; //发件邮箱
  SMTPConfig.password = xxx; //SMTP授权码
  postEmail(mailData, SMTPConfig);

如何进行ES数据查询?

在触发函数编写的过程中,本产品提供了封装好的ES请求模块esClient,详细的API调用方式请参考官方elasticsearch.js。 文档如下:

https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/api-reference.html

请配合async/await的同步操作使用。即触发函数以async开头,接口请求函数以await开始。示例如下:

async function trigger(payload) {
    let index = xxx;
    let body = xxx;
    let data = await esClient.search({
      index: index,
      body: body
    });
    ......判断条件......
}

如何进行设备调用?

在触发函数编写的过程中,为了方便关联各种设备的数据,本产品提供了封装好的调用模块bscModule,可用于各种设备调用交互处理。bscModule模块中封装的各设备类型及文档说明,如下表所示。

名称 对应模块 示例 文档
f5 f5IControl let f5IControl = bscModule.f5IControl https://github.com/thwi/node-icontrol
radware radWare let radWare = bscModule.radWare https://webhelp.radware.com/Vision/REST/4_00_00/index.html

具体示例:

f5模块使用

function trigger(payload) {
    var f5IControl = bscModule.f5IControl; //引入f5关联模块
    var bigip = new f5IControl({
      host: '192.xxx.xxx.xxx',
      proto: 'https',
      port: 443,
      user: 'admin',
      pass: 'admin',
      strict: true,
      debug: false
    });
    ......其他api使用请参考文档:https://github.com/thwi/node-icontrol#usage ......
}

radware模块使用,目前以v4为例

radware模块中,提供登录函数login,以及数据交互请求函数request。 login函数的返回值如下:

登录成功:
{
  "status": "ok",
  "jsessionid": "sessionid"
}
登录失败:
{
  "status": "error",
  "message": "Invalid Username or invalid Password. Re-enter."
}

request函数具体描述如下:

request(path, method, opts),其中:
  path: 请参照radware文档中各请求path,如:'/mgmt/system/user/logout',
  method: 请求方法,
  opts: {
    headers: {}, //已设置默认值,一般请求无需设置,特殊情况可自行设置
    body: {}
  }
  * headers说明,若需设置,请参照
  headers = {
    'Content-Type': 'application/json',
    'cookie': jsessionid //*必须,值为login接口返回的 jsessionid
  };

具体应用实例如下:

async function trigger(payload) {
    var radWare = bscModule.radWare; //引入radware关联模块
    var radware = new radWare({
      host: '192.xxx.xxx.xxx',
      proto: 'http', //默认为http
      username: 'admin',
      password: 'admin'
    });
    //登录
    var login = await radware.login();
    if(login.status == 'ok') {
      //进一步的数据关联交互,如获取apm设备列表
      var apm_application = await radware.requst('/mgmt/system/config/apm/application','get');
      ......其他api使用请参考官方文档https://webhelp.radware.com/Vision/REST/4_00_00/index.html......
    }
}

如何使用JavaScript工具库?

为了更方便,快捷地使用JavaScript语言进行高级模式的编写,本产品提供了内置封装的lodashJS工具模块,使用时通过_调用,简单示例如下:

对象取值
_.get(object, path, [defaultValue])

var object = { 'a': [{ 'b': { 'c': 3 } }] };

_.get(object, 'a[0].b.c');
// => 3

_.get(object, ['a', '0', 'b', 'c']);
// => 3

_.get(object, 'a.b.c', 'default');
// => 'default'

数组的交集
_.intersection([arrays])

参数
[arrays] (...Array): 待检查的数组。
返回值
(Array): 返回一个包含所有传入数组交集元素的新数组。

_.intersection([1, 2], [4, 2], [2, 1]);
// => [2]

数组的差集
_.difference(array, [values])
参数
array (Array): 要检查的数组。
[values] (...Array): 排除的值。
返回值
(Array): 返回一个过滤值后的新数组。

_.difference([3, 2, 1], [4, 2]);
// => [3, 1]

......

具体的API调用方式,可参考官方文档: https://lodash.com/docs/4.17.15