Skip to content

Commit 1487976

Browse files
committed
Send less data over named pipe (parse packets later)
1 parent 58f5122 commit 1487976

File tree

2 files changed

+115
-118
lines changed

2 files changed

+115
-118
lines changed

netflowcollector.py

+17-97
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import argparse
2+
import base64
23
import gzip
34
import json
45
import logging
@@ -12,16 +13,6 @@
1213
from colors import color
1314

1415

15-
from lookup import DIRECTION_INGRESS
16-
17-
18-
# python-netflow-v9-softflowd expects main.py to be the main entrypoint, but we only need
19-
# get_export_packets() iterator:
20-
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/pynetflow')
21-
from pynetflow.main import get_export_packets
22-
# disable DEBUG logging on NetFlow collector library:
23-
logging.getLogger('pynetflow.main').setLevel(logging.INFO)
24-
2516
IS_DEBUG = os.environ.get('DEBUG', 'false') in ['true', 'yes', '1']
2617
logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
2718
datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG if IS_DEBUG else logging.INFO)
@@ -32,99 +23,28 @@
3223
log = logging.getLogger("{}.{}".format(__name__, "collector"))
3324

3425

35-
def process_netflow(netflow_port, named_pipe_filename):
36-
# endless loop - read netflow packets, encode them to JSON and write them to named pipe:
37-
line = None
38-
last_record_seqs = {}
26+
def pass_netflow_data(netflow_port, named_pipe_filename):
    """Forward raw NetFlow datagrams from a UDP port to a named pipe.

    Runs forever. Every datagram received on ``netflow_port`` is written to
    ``named_pipe_filename`` as one newline-terminated JSON array::

        [<base64-encoded payload>, <receive timestamp>, [<sender host>, <sender port>]]

    The packet is deliberately not parsed here; decoding is left to whoever
    reads the pipe.

    Args:
        netflow_port: UDP port to listen on.
        named_pipe_filename: Path of the named pipe to write to.

    Any ``Exception`` raised while opening the pipe, receiving or writing is
    logged and the loop carries on; exceptions that do not derive from
    ``Exception`` (e.g. ``KeyboardInterrupt``) propagate to the caller.
    """
    # Size of the buffer handed to recvfrom() for a single datagram.
    MAX_BUF_SIZE = 4096
    # Binary files cannot be line-buffered (buffering=1 is only valid in text mode),
    # so write unbuffered: each line reaches the pipe as soon as it is written.
    UNBUFFERED = 0

    # Listen on all interfaces - binding to 'localhost' would only accept datagrams
    # that originate from this machine, not from the exporting devices.
    server_address = ('0.0.0.0', netflow_port,)

    # The context manager makes sure the socket is closed when we leave (e.g. on Ctrl-C).
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        log.debug('starting up on {} port {}'.format(*server_address))
        sock.bind(server_address)

        # endless loop - read netflow packets from UDP port and write them to named pipe:
        while True:
            try:
                with open(named_pipe_filename, "wb", UNBUFFERED) as fp:
                    data, address = sock.recvfrom(MAX_BUF_SIZE)
                    now = time.time()
                    # base64 keeps the binary payload JSON-safe; one JSON document per line:
                    line = json.dumps((base64.b64encode(data).decode(), now, address)).encode() + b'\n'
                    fp.write(line)
                    log.debug(f"Passing [{len(data)}] from client [{address[0]}], ts [{now}]")
            except Exception as ex:
                log.exception(f"Exception: {str(ex)}")
12546

12647

127-
12848
if __name__ == "__main__":
12949

13050
NAMED_PIPE_FILENAME = os.environ.get('NAMED_PIPE_FILENAME', None)
@@ -140,7 +60,7 @@ def process_netflow(netflow_port, named_pipe_filename):
14060
log.info(f"Listening for NetFlow traffic on UDP port {NETFLOW_PORT}")
14161

14262
try:
143-
process_netflow(NETFLOW_PORT, NAMED_PIPE_FILENAME)
63+
pass_netflow_data(NETFLOW_PORT, NAMED_PIPE_FILENAME)
14464
except KeyboardInterrupt:
14565
log.info("KeyboardInterrupt -> exit")
14666
pass

netflowwriter.py

+98-21
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import argparse
2+
import base64
23
import gzip
34
import json
45
import logging
56
import os
67
import errno
78
import sys
9+
import socket
10+
import struct
811
import time
912
from collections import defaultdict
1013
from datetime import datetime
@@ -14,6 +17,13 @@
1417

1518
from lookup import PROTOCOLS
1619
from dbutils import migrate_if_needed, get_db_cursor, DB_PREFIX
20+
from lookup import DIRECTION_INGRESS
21+
22+
23+
# python-netflow-v9-softflowd expects main.py to be the main entrypoint, but we only need
24+
# parse_packet():
25+
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/pynetflow')
26+
from pynetflow.netflow import parse_packet, UnknownNetFlowVersion, TemplateNotRecognized
1727

1828

1929
IS_DEBUG = os.environ.get('DEBUG', 'false') in ['true', 'yes', '1']
@@ -26,13 +36,17 @@
2636
log = logging.getLogger("{}.{}".format(__name__, "writer"))
2737

2838

39+
# Amount of time to wait before dropping an undecodable ExportPacket
40+
PACKET_TIMEOUT = 60 * 60
41+
2942
def process_named_pipe(named_pipe_filename):
3043
try:
3144
os.mkfifo(named_pipe_filename)
3245
except OSError as ex:
3346
if ex.errno != errno.EEXIST:
3447
raise
3548

49+
templates = {}
3650
while True:
3751
with open(named_pipe_filename, "rb") as fp:
3852
log.info(f"Opened named pipe {named_pipe_filename}")
@@ -42,12 +56,28 @@ def process_named_pipe(named_pipe_filename):
4256
break
4357

4458
try:
45-
write_record(json.loads(line))
59+
data_b64, ts, client = json.loads(line)
60+
data = base64.b64decode(data_b64)
61+
62+
try:
63+
export = parse_packet(data, templates)
64+
write_record(ts, client, export)
65+
except UnknownNetFlowVersion:
66+
log.warning("Unknown NetFlow version")
67+
continue
68+
except TemplateNotRecognized:
69+
log.warning("Failed to decode a v9 ExportPacket, template not "
70+
"recognized (if this happens at the start, it's ok)")
71+
continue
72+
4673
except Exception as ex:
4774
log.exception("Error writing line, skipping...")
4875

4976

50-
def write_record(j):
77+
last_record_seqs = {}
78+
79+
80+
def write_record(ts, client, export):
5181
# {
5282
# "DST_AS": 0,
5383
# "SRC_AS": 0,
@@ -73,30 +103,77 @@ def write_record(j):
73103
# }
74104
# https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9001622
75105

106+
client_ip, _ = client
107+
108+
# check for missing records:
109+
last_record_seq = last_record_seqs.get(client_ip)
110+
if last_record_seq is None:
111+
log.warning(f"Last record sequence number is not known, starting with {export.header.sequence}")
112+
elif export.header.sequence != last_record_seq + 1:
113+
log.error(f"Sequence number ({export.header.sequence}) does not follow ({last_record_seq}), some records might have been skipped")
114+
last_record_seqs[client_ip] = export.header.sequence
115+
76116
with get_db_cursor() as c:
77117
# first save the flow record:
78-
ts = j['ts']
79-
log.debug(f"Received record [{j['seq']}]: {datetime.utcfromtimestamp(ts)} from {j['client']}")
80-
c.execute(f"INSERT INTO {DB_PREFIX}records (ts, client_ip) VALUES (%s, %s) RETURNING seq;", (ts, j['client'],))
118+
log.debug(f"Received record [{export.header.sequence}]: {datetime.utcfromtimestamp(ts)} from {client_ip}")
119+
c.execute(f"INSERT INTO {DB_PREFIX}records (ts, client_ip) VALUES (%s, %s) RETURNING seq;", (ts, client_ip,))
81120
record_db_seq = c.fetchone()[0]
82121

83122
# then save each of the flows within the record, but use execute_values() to perform bulk insert:
84-
def _get_data(record_db_seq, flows):
85-
for flow in flows:
86-
yield (
87-
record_db_seq,
88-
*flow,
89-
# flow.get('IN_BYTES'),
90-
# flow.get('PROTOCOL'),
91-
# flow.get('DIRECTION'),
92-
# flow.get('L4_DST_PORT'),
93-
# flow.get('L4_SRC_PORT'),
94-
# flow.get('INPUT_SNMP'),
95-
# flow.get('OUTPUT_SNMP'),
96-
# flow.get('IPV4_DST_ADDR'),
97-
# flow.get('IPV4_SRC_ADDR'),
98-
)
99-
data_iterator = _get_data(record_db_seq, j['flows'])
123+
def _get_data(netflow_version, record_db_seq, flows):
    """Yield one row tuple per flow, in the column order of the flows INSERT.

    Every row has the shape::

        (record_db_seq, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT,
         INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR)

    Args:
        netflow_version: NetFlow version of the export packet; only 5 and 9
            are handled.
        record_db_seq: Value placed first in every yielded row.
        flows: Iterable of objects exposing a ``data`` mapping with the
            version-specific field names used below.

    For any other version an error is logged and nothing is yielded.
    """
    if netflow_version == 9:
        for f in flows:
            d = f.data
            yield (
                record_db_seq,
                d["IN_BYTES"],
                d["PROTOCOL"],
                d["DIRECTION"],
                d["L4_DST_PORT"],
                d["L4_SRC_PORT"],
                d["INPUT_SNMP"],
                d["OUTPUT_SNMP"],
                d["IPV4_DST_ADDR"],
                d["IPV4_SRC_ADDR"],
            )
    elif netflow_version == 5:
        for f in flows:
            d = f.data
            yield (
                record_db_seq,
                # "IN_BYTES":
                d["IN_OCTETS"],
                # "PROTOCOL":
                d["PROTO"],
                # "DIRECTION" - not read from the v5 flow data, a constant is used instead:
                DIRECTION_INGRESS,
                # "L4_DST_PORT":
                d["DST_PORT"],
                # "L4_SRC_PORT":
                d["SRC_PORT"],
                # "INPUT_SNMP":
                d["INPUT"],
                # "OUTPUT_SNMP":
                d["OUTPUT"],
                # netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
                # them back to bytes and transform them to strings:
                # "IPV4_DST_ADDR":
                socket.inet_ntoa(struct.pack('!I', d["IPV4_DST_ADDR"])),
                # "IPV4_SRC_ADDR":
                socket.inet_ntoa(struct.pack('!I', d["IPV4_SRC_ADDR"])),
            )
    else:
        # Report the version we were actually asked to handle rather than reaching
        # into the enclosing scope for it:
        log.error(f"Only Netflow v5 and v9 currently supported, ignoring record (version: [{netflow_version}])")
        return
175+
176+
data_iterator = _get_data(export.header.version, record_db_seq, export.flows)
100177
psycopg2.extras.execute_values(
101178
c,
102179
f"INSERT INTO {DB_PREFIX}flows (record, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR) VALUES %s",

0 commit comments

Comments
 (0)