@@ -41,7 +41,7 @@ def jobs(self):
41
41
# yield job_id, intervals, NetFlowBot.perform_job, job_params
42
42
43
43
# mock the jobs for now: (until frontend is done)
44
- job_id = 'traffic_in '
44
+ job_id = '1min '
45
45
intervals = [60 ]
46
46
job_params = {
47
47
"job_id" : job_id ,
@@ -75,7 +75,7 @@ def jobs(self):
75
75
}
76
76
yield job_id , intervals , NetFlowBot .perform_job , job_params
77
77
78
- job_id = 'daily '
78
+ job_id = '24h '
79
79
intervals = [3600 * 24 ]
80
80
job_params = {
81
81
"job_id" : job_id ,
@@ -122,35 +122,40 @@ def perform_job(*args, **job_params):
122
122
# }
123
123
# https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9001622
124
124
125
- affecting_intervals , = args
125
+ job_id = job_params [ "job_id" ]
126
126
values = []
127
127
entity_info = job_params ["entity_info" ]
128
+ minute_ago = datetime .now () - timedelta (minutes = 1 )
128
129
129
- if 60 in affecting_intervals :
130
+ if job_id == '1min' :
130
131
output_path_prefix = f'entity.{ entity_info ["entity_id" ]} .netflow.traffic_in'
131
132
132
- minute_ago = datetime .now () - timedelta (minutes = 1 )
133
133
two_minutes_ago = minute_ago - timedelta (minutes = 1 )
134
134
135
135
# Traffic in and out: (per interface)
136
136
values .extend (NetFlowBot .get_values_traffic_in (output_path_prefix , two_minutes_ago , minute_ago ))
137
137
values .extend (NetFlowBot .get_values_traffic_out (output_path_prefix , two_minutes_ago , minute_ago ))
138
+ # output_path_prefix = f'entity.{entity_info["entity_id"]}.netflow'
138
139
values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix , two_minutes_ago , minute_ago , 18 , is_direction_in = True ))
139
140
values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix , two_minutes_ago , minute_ago , 18 , is_direction_in = False ))
141
+ values .extend (NetFlowBot .get_top_N_protocols (output_path_prefix , two_minutes_ago , minute_ago , 18 , is_direction_in = True ))
142
+ values .extend (NetFlowBot .get_top_N_protocols (output_path_prefix , two_minutes_ago , minute_ago , 18 , is_direction_in = False ))
140
143
141
144
# every hour, collect stats for the whole hour:
142
- if 3600 in affecting_intervals :
145
+ elif job_id == '1h' :
143
146
output_path_prefix_1hour = f'entity.{ entity_info ["entity_id" ]} .netflow.traffic.in.1hour'
144
147
hour_ago = minute_ago - timedelta (hours = 1 )
145
148
values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix_1hour , hour_ago , minute_ago , 18 , is_direction_in = True ))
146
149
values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix_1hour , hour_ago , minute_ago , 18 , is_direction_in = False ))
147
150
148
151
# every 24h, also collect stats for the whole day:
149
- if 3600 * 24 in affecting_intervals :
152
+ elif job_id == '24h' :
150
153
output_path_prefix_1day = f'entity.{ entity_info ["entity_id" ]} .netflow.traffic.in.1day'
151
154
day_ago = minute_ago - timedelta (days = 1 )
152
155
values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix_1day , day_ago , minute_ago , 18 , is_direction_in = True ))
153
156
values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix_1day , day_ago , minute_ago , 18 , is_direction_in = False ))
157
+ values .extend (NetFlowBot .get_top_N_protocols (output_path_prefix_1day , day_ago , minute_ago , 18 , is_direction_in = True ))
158
+ values .extend (NetFlowBot .get_top_N_protocols (output_path_prefix_1day , day_ago , minute_ago , 18 , is_direction_in = False ))
154
159
155
160
if not values :
156
161
log .warning ("No values found to be sent to Grafolean" )
@@ -228,7 +233,7 @@ def get_top_N_IPs(output_path_prefix, from_time, to_time, interface_index, is_di
228
233
# TODO: missing check for IP: r.client_ip = %s AND
229
234
c .execute (f"""
230
235
SELECT
231
- f.IPV4_DST_ADDR ,
236
+ f.IPV4_ { 'SRC' if is_direction_in else 'DST' } _ADDR ,
232
237
sum(f.IN_BYTES) "traffic"
233
238
FROM
234
239
netflow_records "r",
@@ -240,14 +245,12 @@ def get_top_N_IPs(output_path_prefix, from_time, to_time, interface_index, is_di
240
245
f.{ 'INPUT_SNMP' if is_direction_in else 'OUTPUT_SNMP' } = %s AND
241
246
f.DIRECTION = { '0' if is_direction_in else '1' }
242
247
GROUP BY
243
- f.IPV4_DST_ADDR
248
+ f.IPV4_ { 'SRC' if is_direction_in else 'DST' } _ADDR
244
249
ORDER BY
245
250
traffic desc
246
251
LIMIT 10;
247
252
""" , (from_time , to_time , interface_index ,))
248
253
249
- #SELECT f.data->'IPV4_DST_ADDR', sum((f.data->'IN_BYTES')::integer) "traffic" FROM netflow_records "r", netflow_flows "f" WHERE r.ts >= now() - interval '1 minute' AND r.seq = f.record AND (f.data->'INPUT_SNMP')::integer = 18 AND (f.data->'DIRECTION')::integer = '0' GROUP BY f.data->'IPV4_DST_ADDR' ORDER BY traffic desc LIMIT 10;
250
-
251
254
values = []
252
255
for top_ip , traffic_bytes in c .fetchall ():
253
256
output_path = f"{ output_path_prefix } .topip.{ 'in' if is_direction_in else 'out' } .{ interface_index } .if{ interface_index } .{ top_ip } "
@@ -257,6 +260,39 @@ def get_top_N_IPs(output_path_prefix, from_time, to_time, interface_index, is_di
257
260
})
258
261
return values
259
262
263
+ @staticmethod
264
+ def get_top_N_protocols (output_path_prefix , from_time , to_time , interface_index , is_direction_in = True ):
265
+ with db .cursor () as c :
266
+ # TODO: missing check for IP: r.client_ip = %s AND
267
+ c .execute (f"""
268
+ SELECT
269
+ f.PROTOCOL,
270
+ sum(f.IN_BYTES) "traffic"
271
+ FROM
272
+ netflow_records "r",
273
+ netflow_flows "f"
274
+ WHERE
275
+ r.ts >= %s AND
276
+ r.ts < %s AND
277
+ r.seq = f.record AND
278
+ f.{ 'INPUT_SNMP' if is_direction_in else 'OUTPUT_SNMP' } = %s AND
279
+ f.DIRECTION = { '0' if is_direction_in else '1' }
280
+ GROUP BY
281
+ f.PROTOCOL
282
+ ORDER BY
283
+ traffic desc
284
+ LIMIT 10;
285
+ """ , (from_time , to_time , interface_index ,))
286
+
287
+ values = []
288
+ for protocol , traffic_bytes in c .fetchall ():
289
+ output_path = f"{ output_path_prefix } .topproto.{ 'in' if is_direction_in else 'out' } .{ interface_index } .if{ interface_index } .{ protocol } .{ PROTOCOLS [protocol ]} "
290
+ values .append ({
291
+ 'p' : output_path ,
292
+ 'v' : traffic_bytes / 60. , # Bps
293
+ })
294
+ return values
295
+
260
296
261
297
def wait_for_grafolean (backend_url ):
262
298
while True :
0 commit comments