Skip to content

Commit 9cd7bfe

Browse files
committed
Fix: INPUT_SNMP and OUTPUT_SNPM can (and sometimes do) take more than 2 bytes with V9 netflow
1 parent e8a26b3 commit 9cd7bfe

File tree

2 files changed

+36
-13
lines changed

2 files changed

+36
-13
lines changed

dbutils.py

+25-2
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,6 @@ def migration_step_6():
217217
""" Create a new table for flows using TimescaleDB.
218218
- we use a different name to avoid converting old data; the conversion is too slow to be done on existing
219219
installations without manual work. For those we know about we can perform this operation manually.
220-
- table is *not* unlogged (it is using WAL). If insert performance becomes an issue, we can change this later
221-
using: "ALTER TABLE netflow_flows2 SET UNLOGGED;" (but at the moment we don't know that yet)
222220
"""
223221
with get_db_cursor() as c:
224222
c.execute(f"""
@@ -238,3 +236,28 @@ def migration_step_6():
238236
""")
239237
c.execute(f"SELECT create_hypertable('{DB_PREFIX}flows2', 'ts', chunk_time_interval => INTERVAL '1 hour');")
240238
c.execute(f'CREATE TABLE {DB_PREFIX}bot_jobs2 (job_id TEXT NOT NULL PRIMARY KEY, last_used_ts TIMESTAMP NOT NULL);')
239+
240+
def migration_step_7():
241+
""" Change input_snmp/output_snmp from SMALLINT to BIGINT (it is 2 bytes by default, but is sometimes more)
242+
- since this is temporary data anyway, we drop and recreate the table
243+
- table is *not* unlogged (it is using WAL). If insert performance becomes an issue, we can change this later
244+
using: "ALTER TABLE netflow_flows2 SET UNLOGGED;" (but at the moment we don't know that yet)
245+
"""
246+
with get_db_cursor() as c:
247+
c.execute(f"DROP TABLE {DB_PREFIX}flows2;")
248+
c.execute(f"""
249+
CREATE TABLE {DB_PREFIX}flows2 (
250+
ts TIMESTAMP NOT NULL,
251+
client_ip INET NOT NULL,
252+
in_bytes BIGINT NOT NULL,
253+
protocol SMALLINT NOT NULL,
254+
direction SMALLINT NOT NULL,
255+
l4_dst_port INTEGER NOT NULL,
256+
l4_src_port INTEGER NOT NULL,
257+
input_snmp BIGINT NOT NULL,
258+
output_snmp BIGINT NOT NULL,
259+
ipvX_dst_addr INET NOT NULL,
260+
ipvX_src_addr INET NOT NULL
261+
);
262+
""")
263+
c.execute(f"SELECT create_hypertable('{DB_PREFIX}flows2', 'ts', chunk_time_interval => INTERVAL '1 hour');")

netflowwriter.py

+11-11
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ def _pgwriter_init():
6363
return pg_writer
6464

6565

66-
def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, address_family, IPVx_DST_ADDR, IPVx_SRC_ADDR):
67-
buf = struct.pack('!Hiqi4s4siQiHiHiIiIiHiH',
66+
def _pgwriter_encode(ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, address_family, IPVx_DST_ADDR, IPVx_SRC_ADDR):
67+
buf = struct.pack('!Hiqi4s4siQiHiHiIiIiQiQ',
6868
11, # number of columns
6969
8, int(1000000 * (ts - PG_EPOCH_TIMESTAMP)), # https://doxygen.postgresql.org/backend_2utils_2adt_2timestamp_8c_source.html#l00228
7070
8, IPV4_ADDRESS_PREFIX, socket.inet_aton(client_ip), # 4 bytes prefix + 4 bytes IP
@@ -73,8 +73,8 @@ def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_D
7373
2, DIRECTION,
7474
4, L4_DST_PORT,
7575
4, L4_SRC_PORT,
76-
2, INPUT_SNMP,
77-
2, OUTPUT_SNMP,
76+
8, INPUT_SNMP,
77+
8, OUTPUT_SNMP,
7878
)
7979
if address_family != socket.AF_INET6:
8080
buf2 = struct.pack('!i4s4si4s4s',
@@ -86,7 +86,7 @@ def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_D
8686
4 + 16, IPV6_ADDRESS_PREFIX, IPVx_DST_ADDR,
8787
4 + 16, IPV6_ADDRESS_PREFIX, IPVx_SRC_ADDR,
8888
)
89-
pgwriter.write(buf + buf2)
89+
return buf + buf2
9090

9191

9292
def _pgwriter_finish(pgwriter):
@@ -212,7 +212,7 @@ def _get_data(buffer):
212212
dst = socket.inet_aton(f.data["IPV4_DST_ADDR"])
213213
src = socket.inet_aton(f.data["IPV4_SRC_ADDR"])
214214

215-
yield (
215+
yield _pgwriter_encode(
216216
ts,
217217
client_ip,
218218
f.data["IN_BYTES"],
@@ -226,12 +226,12 @@ def _get_data(buffer):
226226
dst,
227227
src,
228228
)
229-
except KeyError:
229+
except:
230230
log.exception(f"[{client_ip}] Error decoding v9 flow. Contents: {repr(f.data)}")
231231
elif netflow_version == 5:
232232
for f in flows:
233233
try:
234-
yield (
234+
yield _pgwriter_encode(
235235
ts,
236236
client_ip,
237237
# "IN_BYTES":
@@ -257,14 +257,14 @@ def _get_data(buffer):
257257
# "IPV4_SRC_ADDR":
258258
struct.pack('!I', f.data["IPV4_SRC_ADDR"]),
259259
)
260-
except KeyError:
260+
except:
261261
log.exception(f"[{client_ip}] Error decoding v5 flow. Contents: {repr(f.data)}")
262262
else:
263263
log.error(f"[{client_ip}] Only Netflow v5 and v9 currently supported, ignoring record (version: [{export.header.version}])")
264264

265265
pgwriter = _pgwriter_init()
266-
for data in _get_data(buffer):
267-
_pgwriter_write(pgwriter, *data)
266+
for encoded_data in _get_data(buffer):
267+
pgwriter.write(encoded_data)
268268
_pgwriter_finish(pgwriter)
269269

270270

0 commit comments

Comments
 (0)