Skip to content

Commit 59ab26f

Browse files
committed
Sending periodic cummulative traffic data for all entities
1 parent 7d2f709 commit 59ab26f

File tree

6 files changed

+218
-95
lines changed

6 files changed

+218
-95
lines changed

Pipfile.lock

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dbutils.py

+4
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,7 @@ def migration_step_2():
154154
IPV4_SRC_ADDR TEXT
155155
);
156156
""")
157+
158+
def migration_step_3():
159+
with db.cursor() as c:
160+
c.execute(f'CREATE TABLE {DB_PREFIX}bot_jobs (job_id TEXT NOT NULL, last_used_seq BIGSERIAL DEFAULT NULL);')

lookup.py

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
DIRECTION_INGRESS = 0
2+
DIRECTION_EGRESS = 1
3+
14
PROTOCOLS = {
25
0: "HOPOPT",
36
1: "ICMP",

netflowbot.py

+171-92
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from grafoleancollector import Collector, send_results_to_grafolean
1616
from dbutils import db, DB_PREFIX
17-
from lookup import PROTOCOLS
17+
from lookup import PROTOCOLS, DIRECTION_INGRESS, DIRECTION_EGRESS
1818

1919
logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
2020
datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG)
@@ -25,6 +25,26 @@
2525
log = logging.getLogger("{}.{}".format(__name__, "base"))
2626

2727

28+
def _get_last_used_seq(job_id):
29+
with db.cursor() as c:
30+
c.execute(f'SELECT j.last_used_seq, r.ts FROM {DB_PREFIX}bot_jobs j, {DB_PREFIX}records r WHERE j.id = %s and j.last_used_seq == r.seq;', (job_id,))
31+
last_used_seq, ts = c.fetchone()
32+
return last_used_seq, ts
33+
34+
def _get_current_max_seq():
35+
with db.cursor() as c:
36+
c.execute(f"SELECT MAX(seq) FROM {DB_PREFIX}records;")
37+
max_seq, = c.fetchone()
38+
return max_seq
39+
40+
def _save_current_max_seq(job_id, seq):
41+
with db.cursor() as c:
42+
c.execute(f"INSERT INTO {DB_PREFIX}bot_jobs (job_id, last_used_seq) VALUES (%s, %s) ON CONFLICT (job_id) DO UPDATE SET last_used_seq = %s;", (job_id, seq, seq))
43+
44+
45+
def get_entities():
46+
requests.get()
47+
2848
class NetFlowBot(Collector):
2949

3050
def jobs(self):
@@ -45,6 +65,7 @@ def jobs(self):
4565
intervals = [60]
4666
job_params = {
4767
"job_id": job_id,
68+
"interval_slug": '1min',
4869
"entity_info": {
4970
"account_id": 129104112,
5071
"entity_id": 236477687,
@@ -58,104 +79,123 @@ def jobs(self):
5879
}
5980
yield job_id, intervals, NetFlowBot.perform_job, job_params
6081

61-
job_id = '1h'
62-
intervals = [3600]
63-
job_params = {
64-
"job_id": job_id,
65-
"entity_info": {
66-
"account_id": 129104112,
67-
"entity_id": 236477687,
68-
"entity_type": "device",
69-
"details": {
70-
"ipv4": "1.2.3.4"
71-
},
72-
},
73-
"backend_url": self.backend_url,
74-
"bot_token": self.bot_token,
75-
}
76-
yield job_id, intervals, NetFlowBot.perform_job, job_params
77-
78-
job_id = '24h'
79-
intervals = [3600 * 24]
80-
job_params = {
81-
"job_id": job_id,
82-
"entity_info": {
83-
"account_id": 129104112,
84-
"entity_id": 236477687,
85-
"entity_type": "device",
86-
"details": {
87-
"ipv4": "1.2.3.4"
88-
},
89-
},
90-
"backend_url": self.backend_url,
91-
"bot_token": self.bot_token,
92-
}
93-
yield job_id, intervals, NetFlowBot.perform_job, job_params
94-
82+
# job_id = '1h'
83+
# intervals = [3600]
84+
# job_params = {
85+
# "job_id": job_id,
86+
# "interval_slug": '1h',
87+
# "entity_info": {
88+
# "account_id": 129104112,
89+
# "entity_id": 236477687,
90+
# "entity_type": "device",
91+
# "details": {
92+
# "ipv4": "1.2.3.4"
93+
# },
94+
# },
95+
# "backend_url": self.backend_url,
96+
# "bot_token": self.bot_token,
97+
# }
98+
# yield job_id, intervals, NetFlowBot.perform_job, job_params
99+
100+
# job_id = '24h'
101+
# intervals = [3600 * 24]
102+
# job_params = {
103+
# "job_id": job_id,
104+
# "interval_slug": '24h',
105+
# "entity_info": {
106+
# "account_id": 129104112,
107+
# "entity_id": 236477687,
108+
# "entity_type": "device",
109+
# "details": {
110+
# "ipv4": "1.2.3.4"
111+
# },
112+
# },
113+
# "backend_url": self.backend_url,
114+
# "bot_token": self.bot_token,
115+
# }
116+
# yield job_id, intervals, NetFlowBot.perform_job, job_params
95117

96118

97119
# This method is called whenever the job needs to be done. It gets the parameters and performs fetching of data.
98120
@staticmethod
99121
def perform_job(*args, **job_params):
100-
# {
101-
# "DST_AS": 0,
102-
# "SRC_AS": 0,
103-
# "IN_PKTS": 1, # Incoming counter with length N x 8 bits for the number of packets associated with an IP Flow
104-
# "SRC_TOS": 0,
105-
# "DST_MASK": 0,
106-
# "IN_BYTES": 52, # Incoming counter with length N x 8 bits for number of bytes associated with an IP Flow.
107-
# "PROTOCOL": 6, # IP protocol
108-
# "SRC_MASK": 25,
109-
# "DIRECTION": 0, # Flow direction: 0 - ingress flow, 1 - egress flow
110-
# "TCP_FLAGS": 20,
111-
# "INPUT_SNMP": 17, # Input interface index
112-
# "L4_DST_PORT": 443, # TCP/UDP destination port number
113-
# "L4_SRC_PORT": 36458,
114-
# "OUTPUT_SNMP": 3, # Output interface index
115-
# "IPV4_DST_ADDR": "1.2.3.4",
116-
# "IPV4_NEXT_HOP": 1385497089,
117-
# "IPV4_SRC_ADDR": "4.3.2.1",
118-
# "LAST_SWITCHED": 2222830592,
119-
# "FIRST_SWITCHED": 2222830592,
120-
# "FLOW_SAMPLER_ID": 0,
121-
# "UNKNOWN_FIELD_TYPE": 0
122-
# }
123-
# https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9001622
122+
# \d netflow_flows
123+
# Column | Type | Description
124+
# ---------------+----------+-------------
125+
# record | integer | // FK -> netflow_records.seq (PK)
126+
# in_bytes | integer | number of bytes associated with an IP Flow
127+
# protocol | smallint | IP protocol (see lookup.py -> PROTOCOLS)
128+
# direction | smallint | flow direction: 0 - ingress flow, 1 - egress flow
129+
# l4_dst_port | integer | destination port
130+
# l4_src_port | integer | source port
131+
# input_snmp | smallint | input interface index
132+
# output_snmp | smallint | output interface index
133+
# ipv4_src_addr | text | source IP
134+
# ipv4_dst_addr | text | destination IP
135+
# ---------------+----------+-------------
124136

125137
job_id = job_params["job_id"]
138+
interval_slug = job_params["interval_slug"]
139+
140+
entity_id = entity_ip = interface_index = None
141+
entity_info = job_params.get("entity_info", None)
142+
if entity_info is not None:
143+
entity_id = entity_info["entity_id"]
144+
entity_ip = entity_info["details"]["ipv4"]
145+
interface_index = entity_info.get("interface_index", None)
146+
147+
148+
last_used_seq, last_used_ts = _get_last_used_seq(job_id)
149+
max_seq = _get_current_max_seq()
150+
_save_current_max_seq(job_id, max_seq)
151+
126152
values = []
127-
entity_info = job_params["entity_info"]
128-
minute_ago = datetime.now() - timedelta(minutes=1)
129-
130-
if job_id == '1min':
131-
output_path_prefix = f'entity.{entity_info["entity_id"]}.netflow.traffic_in'
132-
133-
two_minutes_ago = minute_ago - timedelta(minutes=1)
134-
135-
# Traffic in and out: (per interface)
136-
values.extend(NetFlowBot.get_values_traffic_in(output_path_prefix, two_minutes_ago, minute_ago))
137-
values.extend(NetFlowBot.get_values_traffic_out(output_path_prefix, two_minutes_ago, minute_ago))
138-
# output_path_prefix = f'entity.{entity_info["entity_id"]}.netflow'
139-
values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix, two_minutes_ago, minute_ago, 18, is_direction_in=True))
140-
values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix, two_minutes_ago, minute_ago, 18, is_direction_in=False))
141-
values.extend(NetFlowBot.get_top_N_protocols(output_path_prefix, two_minutes_ago, minute_ago, 18, is_direction_in=True))
142-
values.extend(NetFlowBot.get_top_N_protocols(output_path_prefix, two_minutes_ago, minute_ago, 18, is_direction_in=False))
143-
144-
# every hour, collect stats for the whole hour:
145-
elif job_id == '1h':
146-
output_path_prefix_1hour = f'entity.{entity_info["entity_id"]}.netflow.traffic.in.1hour'
147-
hour_ago = minute_ago - timedelta(hours=1)
148-
values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix_1hour, hour_ago, minute_ago, 18, is_direction_in=True))
149-
values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix_1hour, hour_ago, minute_ago, 18, is_direction_in=False))
150-
151-
# every 24h, also collect stats for the whole day:
152-
elif job_id == '24h':
153-
output_path_prefix_1day = f'entity.{entity_info["entity_id"]}.netflow.traffic.in.1day'
154-
day_ago = minute_ago - timedelta(days=1)
155-
values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix_1day, day_ago, minute_ago, 18, is_direction_in=True))
156-
values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix_1day, day_ago, minute_ago, 18, is_direction_in=False))
157-
values.extend(NetFlowBot.get_top_N_protocols(output_path_prefix_1day, day_ago, minute_ago, 18, is_direction_in=True))
158-
values.extend(NetFlowBot.get_top_N_protocols(output_path_prefix_1day, day_ago, minute_ago, 18, is_direction_in=False))
153+
for direction in [DIRECTION_EGRESS, DIRECTION_INGRESS]:
154+
values.extend(NetFlowBot.get_traffic_all_entities(interval_slug, last_used_seq, max_seq, direction=direction))
155+
156+
157+
# values.extend(NetFlowBot.get_traffic(interval_slug, last_used_seq, max_seq, direction=DIRECTION_EGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index))
158+
# values.extend(NetFlowBot.get_traffic(interval_slug, last_used_seq, max_seq, direction=DIRECTION_INGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index))
159+
160+
# values.extend(NetFlowBot.get_top_protocols(interval_slug, last_used_seq, max_seq, direction=DIRECTION_EGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index, n=15))
161+
# values.extend(NetFlowBot.get_top_protocols(interval_slug, last_used_seq, max_seq, direction=DIRECTION_INGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index, n=15))
162+
163+
# values.extend(NetFlowBot.get_top_IPs(interval_slug, last_used_seq, max_seq, direction=DIRECTION_EGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index, n=15))
164+
# values.extend(NetFlowBot.get_top_IPs(interval_slug, last_used_seq, max_seq, direction=DIRECTION_INGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index, n=15))
165+
166+
167+
# protocol_str = 'TCP'
168+
# ipv4_dst_addr = '1.2.3.4'
169+
# ipv4_src_addr = '4.3.2.1'
170+
# # traffic on all devices, all interfaces, per ingress / egress:
171+
# f'netflow.{interval_slug}.egress'
172+
# f'netflow.{interval_slug}.ingress'
173+
# # traffic on all devices, all interfaces, per ingress / egress, for top X protocols:
174+
# f'netflow.{interval_slug}.egress.protocol.{protocol_str}'
175+
# f'netflow.{interval_slug}.ingress.protocol.{protocol_str}'
176+
# # traffic on all devices, all interfaces, per ingress / egress, for top X ips:
177+
# f'netflow.{interval_slug}.egress.ip.{ipv4_dst_addr}'
178+
# f'netflow.{interval_slug}.ingress.ip.{ipv4_src_addr}'
179+
180+
# # traffic on all interfaces, per device, per ingress / egress:
181+
# f'netflow.{interval_slug}.egress.entity.{entity_id}'
182+
# f'netflow.{interval_slug}.ingress.entity.{entity_id}'
183+
# # traffic on all interfaces, per device, per ingress / egress, for top X protocols:
184+
# f'netflow.{interval_slug}.egress.entity.{entity_id}.protocol.{protocol_str}'
185+
# f'netflow.{interval_slug}.ingress.entity.{entity_id}.protocol.{protocol_str}'
186+
# # traffic on all interfaces, per device, per ingress / egress, for top X ips:
187+
# f'netflow.{interval_slug}.egress.entity.{entity_id}.ip.{ipv4_dst_addr}'
188+
# f'netflow.{interval_slug}.ingress.entity.{entity_id}.ip.{ipv4_src_addr}'
189+
190+
# # traffic per interface, per device, per ingress / egress:
191+
# f'netflow.{interval_slug}.egress.entity.{entity_id}.if.{output_snmp}'
192+
# f'netflow.{interval_slug}.ingress.entity.{entity_id}.if.{input_snmp}'
193+
# # traffic per interface, per device, per ingress / egress, for top X protocols:
194+
# f'netflow.{interval_slug}.egress.entity.{entity_id}.if.{output_snmp}.protocol.{protocol_str}'
195+
# f'netflow.{interval_slug}.ingress.entity.{entity_id}.if.{input_snmp}.protocol.{protocol_str}'
196+
# # traffic per interface, per device, per ingress / egress, for top X ips:
197+
# f'netflow.{interval_slug}.egress.entity.{entity_id}.if.{output_snmp}.ip.{ipv4_dst_addr}'
198+
# f'netflow.{interval_slug}.ingress.entity.{entity_id}.if.{input_snmp}.ip.{ipv4_src_addr}'
159199

160200
if not values:
161201
log.warning("No values found to be sent to Grafolean")
@@ -170,7 +210,45 @@ def perform_job(*args, **job_params):
170210
)
171211

172212
@staticmethod
173-
def get_values_traffic_in(output_path_prefix, from_time, to_time):
213+
def construct_output_path_prefix(interval_slug, direction, entity, interface):
214+
prefix = f"netflow.{interval_slug}.{'ingress' if direction == DIRECTION_INGRESS else 'egress'}"
215+
if entity is None:
216+
return prefix
217+
prefix = f'{prefix}.entity.{entity}'
218+
if interface is None:
219+
return prefix
220+
prefix = f'{prefix}.if.{interface}'
221+
return prefix
222+
223+
224+
@staticmethod
225+
def get_traffic_all_entities(interval_slug, last_seq, max_seq, direction):
226+
output_path = NetFlowBot.construct_output_path_prefix(interval_slug, direction, entity=None, interface=None)
227+
with db.cursor() as c:
228+
c.execute(f"""
229+
SELECT
230+
sum(f.in_bytes)
231+
FROM
232+
{DB_PREFIX}records "r",
233+
{DB_PREFIX}flows "f"
234+
WHERE
235+
r.seq > %s AND
236+
r.ts <= %s AND
237+
r.seq = f.record AND
238+
f.direction = %s
239+
""", (last_seq, max_seq, direction))
240+
values = []
241+
traffic_bytes, = c.fetchone()
242+
values.append({
243+
'p': output_path,
244+
'v': traffic_bytes, # Bps
245+
})
246+
return values
247+
248+
249+
@staticmethod
250+
def get_traffic(interval_slug, last_seq, max_seq, direction, entity=None, interface=None):
251+
output_path = NetFlowBot.construct_output_path_prefix(interval_slug, direction, entity, interface)
174252
with db.cursor() as c:
175253
# TODO: missing check for IP: r.client_ip = %s AND
176254
c.execute(f"""
@@ -197,6 +275,7 @@ def get_values_traffic_in(output_path_prefix, from_time, to_time):
197275
'v': traffic_bytes / 60., # Bps
198276
})
199277
return values
278+
return []
200279

201280
@staticmethod
202281
def get_values_traffic_out(output_path_prefix, from_time, to_time):

netflowwriter.py

+25
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,31 @@ def process_named_pipe(named_pipe_filename):
4444

4545

4646
def write_record(j):
47+
# {
48+
# "DST_AS": 0,
49+
# "SRC_AS": 0,
50+
# "IN_PKTS": 1, # Incoming counter with length N x 8 bits for the number of packets associated with an IP Flow
51+
# "SRC_TOS": 0,
52+
# "DST_MASK": 0,
53+
# "IN_BYTES": 52, # Incoming counter with length N x 8 bits for number of bytes associated with an IP Flow.
54+
# "PROTOCOL": 6, # IP protocol
55+
# "SRC_MASK": 25,
56+
# "DIRECTION": 0, # Flow direction: 0 - ingress flow, 1 - egress flow
57+
# "TCP_FLAGS": 20,
58+
# "INPUT_SNMP": 17, # Input interface index
59+
# "L4_SRC_PORT": 36458, # TCP/UDP source port number
60+
# "L4_DST_PORT": 443, # TCP/UDP destination port number
61+
# "OUTPUT_SNMP": 3, # Output interface index
62+
# "IPV4_DST_ADDR": "1.2.3.4",
63+
# "IPV4_NEXT_HOP": 1385497089,
64+
# "IPV4_SRC_ADDR": "4.3.2.1",
65+
# "LAST_SWITCHED": 2222830592,
66+
# "FIRST_SWITCHED": 2222830592,
67+
# "FLOW_SAMPLER_ID": 0,
68+
# "UNKNOWN_FIELD_TYPE": 0
69+
# }
70+
# https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9001622
71+
4772
with db.cursor() as c:
4873
# first save the flow record:
4974
ts = datetime.utcfromtimestamp(j['ts'])

test_bot.py

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from netflowbot import NetFlowBot
2+
from lookup import DIRECTION_EGRESS, DIRECTION_INGRESS
3+
4+
def test_bot_output_path():
5+
assert NetFlowBot.construct_output_path_prefix('1min', DIRECTION_EGRESS, None, None) == 'netflow.1min.egress'
6+
assert NetFlowBot.construct_output_path_prefix('1h', DIRECTION_EGRESS, None, None) == 'netflow.1h.egress'
7+
8+
assert NetFlowBot.construct_output_path_prefix('1min', DIRECTION_INGRESS, None, None) == 'netflow.1min.ingress'
9+
assert NetFlowBot.construct_output_path_prefix('1h', DIRECTION_INGRESS, None, None) == 'netflow.1h.ingress'
10+
11+
assert NetFlowBot.construct_output_path_prefix('1h', DIRECTION_INGRESS, '123', None) == 'netflow.1h.ingress.entity.123'
12+
assert NetFlowBot.construct_output_path_prefix('1h', DIRECTION_INGRESS, '123', '321') == 'netflow.1h.ingress.entity.123.if.321'

0 commit comments

Comments
 (0)