
Commit f61ee7b

Nicer DB connection loss handling
1 parent 4bae627 commit f61ee7b

3 files changed: +184 -122 lines

dbutils.py (+55 -7)
@@ -5,6 +5,7 @@
 import sys
 import copy
 import json
+import time
 
 import psycopg2
 from psycopg2.pool import ThreadedConnectionPool
@@ -20,6 +21,10 @@
 log = logging.getLogger("{}.{}".format(__name__, "dbutils"))
 
 
+class DBConnectionError(Exception):
+    pass
+
+
 db_pool = None
 DB_PREFIX = 'netflow_'
 LEAVE_N_PAST_DAYS = 5  # 5 days
@@ -31,26 +36,48 @@ def get_db_connection():
     global db_pool
     if db_pool is None:
         db_connect()
-
     try:
+        if db_pool is None:
+            # connecting to DB failed
+            raise DBConnectionError()
         conn = db_pool.getconn()
+        if conn is None:
+            # pool wasn't able to return a valid connection
+            raise DBConnectionError()
+
         conn.autocommit = True
         conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
         yield conn
+    except (DBConnectionError, psycopg2.OperationalError):
+        db_pool = None  # make sure that we reconnect next time
+        yield None
     finally:
-        db_pool.putconn(conn)
+        if db_pool is not None:
+            db_pool.putconn(conn)
 
 
 @contextmanager
-def get_db_cursor(commit=False):
+def get_db_cursor():
     with get_db_connection() as connection:
+        if connection is None:
+            yield InvalidDBCursor()
+            return
+
         cursor = connection.cursor()
         try:
             yield cursor
-            if commit:
-                connection.commit()
         finally:
-            cursor.close()
+            if not isinstance(cursor, InvalidDBCursor):
+                cursor.close()
+
+
+# In Python it is not possible to throw an exception within the __enter__ phase of a with statement:
+# https://www.python.org/dev/peps/pep-0377/
+# If we want to handle DB connection failures gracefully, we return a cursor which will throw
+# a DBConnectionError exception whenever it is accessed.
+class InvalidDBCursor(object):
+    def __getattr__(self, attr):
+        raise DBConnectionError()
 
 
 def db_connect():
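
The InvalidDBCursor trick above deserves a note: since PEP 377 was rejected, a generator-based context manager cannot refuse to run its body, so get_db_cursor() must always yield something. Yielding an object whose every attribute access raises DBConnectionError defers the failure to the first use of the cursor, where callers can catch it. A minimal self-contained sketch of the behavior (illustration only, not the committed code):

class DBConnectionError(Exception):
    pass


class InvalidDBCursor(object):
    # Any attribute lookup (execute, fetchone, ...) falls through to
    # __getattr__, which raises instead of returning a bound method.
    def __getattr__(self, attr):
        raise DBConnectionError()


cursor = InvalidDBCursor()
try:
    cursor.execute('SELECT 1;')  # raises before the call is even made
except DBConnectionError:
    print("DB is unavailable - handle it (retry, log, skip the job)")
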
@@ -73,7 +100,28 @@ def db_connect():
             connect_timeout=connect_timeout)
     except:
         db_pool = None
-        log.error("DB connection failed")
+        log.warning("DB connection failed")
+
+
+def db_disconnect():
+    global db_pool
+    if not db_pool:
+        return
+    db_pool.closeall()
+    db_pool = None
+    log.info("DB connection is closed")
+
+
+def initial_wait_for_db():
+    while True:
+        with get_db_cursor() as c:
+            try:
+                c.execute('SELECT 1;')
+                res = c.fetchone()
+                return
+            except DBConnectionError:
+                log.info("DB connection failed - waiting for DB to become available, sleeping 5s")
+                time.sleep(5)
 
 
 ###########################
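
Taken together, the new helpers give callers a simple lifecycle: wait for the database once at startup, run queries inside try/except DBConnectionError, and let the pool reconnect lazily after a failure. A sketch of how they compose (the main() entry point here is hypothetical, not part of this commit):

from dbutils import initial_wait_for_db, db_disconnect, get_db_cursor, DBConnectionError


def main():
    initial_wait_for_db()  # blocks, retrying 'SELECT 1;' every 5s until the DB answers
    try:
        with get_db_cursor() as c:
            # If the connection was lost, c is an InvalidDBCursor and this
            # line raises DBConnectionError instead of crashing the process:
            c.execute('SELECT 1;')
            print(c.fetchone())
    except DBConnectionError:
        pass  # db_pool was reset; the next get_db_cursor() call reconnects
    finally:
        db_disconnect()


if __name__ == '__main__':
    main()
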

netflowbot.py (+119 -110)
@@ -14,7 +14,7 @@
 import requests
 
 from grafoleancollector import Collector, send_results_to_grafolean
-from dbutils import get_db_cursor, DB_PREFIX, LEAVE_N_PAST_DAYS
+from dbutils import get_db_cursor, DB_PREFIX, LEAVE_N_PAST_DAYS, DBConnectionError
 from lookup import PROTOCOLS, DIRECTION_INGRESS, DIRECTION_EGRESS
 
 logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
@@ -78,57 +78,62 @@ def _save_current_max_ts(job_id, max_ts):
 
 def job_maint_remove_old_data(*args, **kwargs):
     log.info("MAINT: Maintenance started - removing old data")
-    with get_db_cursor() as c:
-        c.execute(f"SELECT drop_chunks(INTERVAL '{LEAVE_N_PAST_DAYS} days', '{DB_PREFIX}flows2');")
-    log.info("MAINT: Maintenance finished (removing old data).")
+    try:
+        with get_db_cursor() as c:
+            c.execute(f"SELECT drop_chunks(INTERVAL '{LEAVE_N_PAST_DAYS} days', '{DB_PREFIX}flows2');")
+        log.info("MAINT: Maintenance finished (removing old data).")
+    except DBConnectionError:
+        log.error("MAINT: Maintenance job (removing old data) failed due to DB connection issues.")
 
 
 def job_maint_suggest_entities(*args, **job_params):
     log.info("MAINT: Maintenance started - making suggestions for device entities")
-
-    backend_url = job_params['backend_url']
-    bot_token = job_params['bot_token']
-    requests_session = requests.Session()
-
-    # for each account, add any new netflow exporters (entities) that might not exist yet:
-    # find all the accounts we have access to:
-    r = requests_session.get(f'{backend_url}/accounts/?b={bot_token}')
-    if r.status_code != 200:
-        raise Exception("Invalid bot token or network error, got status {} while retrieving {}/accounts".format(r.status_code, backend_url))
-    j = r.json()
-    accounts_ids = [a["id"] for a in j["list"]]
-
-    # find all entities for each of the accounts:
-    for account_id in accounts_ids:
-        r = requests_session.get('{}/accounts/{}/entities/?b={}'.format(backend_url, account_id, bot_token))
+    try:
+        backend_url = job_params['backend_url']
+        bot_token = job_params['bot_token']
+        requests_session = requests.Session()
+
+        # for each account, add any new netflow exporters (entities) that might not exist yet:
+        # find all the accounts we have access to:
+        r = requests_session.get(f'{backend_url}/accounts/?b={bot_token}')
         if r.status_code != 200:
-            raise Exception("Network error, got status {} while retrieving {}/accounts/{}/entities".format(r.status_code, backend_url, account_id))
+            raise Exception("Invalid bot token or network error, got status {} while retrieving {}/accounts".format(r.status_code, backend_url))
         j = r.json()
-        entities_ips = [e["details"]["ipv4"] for e in j["list"] if e["entity_type"] == "device"]
-
-        with get_db_cursor() as c:
-            # Ideally, we would just run "select distinct(client_ip) from netflow_flows;", but unfortunately
-            # I was unable to find a performant way to run this query. So we are using netflow_exporters:
-            c.execute(f"SELECT ip FROM {DB_PREFIX}exporters;")
-            for client_ip, in c.fetchall():
-                if client_ip in entities_ips:
-                    log.info(f"MAINT: We already know exporter [{client_ip}]")
-                    continue
-
-                log.info(f"MAINT: Unknown exporter found, inserting [{client_ip}] to account [{account_id}]")
-                url = f'{backend_url}/accounts/{account_id}/entities/?b={bot_token}'
-                params = {
-                    "name": f'{client_ip} (NetFlow exporter)',
-                    "entity_type": "device",
-                    "details": {
-                        "ipv4": client_ip,
-                    },
-                }
-                r = requests_session.post(url, json=params)
-                if r.status_code > 299:
-                    raise Exception("Network error, got status {} while posting to {}/accounts/{}/entities: {}".format(r.status_code, backend_url, account_id, r.content))
-
-    log.info("MAINT: Maintenance finished (device entities suggestions).")
+        accounts_ids = [a["id"] for a in j["list"]]
+
+        # find all entities for each of the accounts:
+        for account_id in accounts_ids:
+            r = requests_session.get('{}/accounts/{}/entities/?b={}'.format(backend_url, account_id, bot_token))
+            if r.status_code != 200:
+                raise Exception("Network error, got status {} while retrieving {}/accounts/{}/entities".format(r.status_code, backend_url, account_id))
+            j = r.json()
+            entities_ips = [e["details"]["ipv4"] for e in j["list"] if e["entity_type"] == "device"]
+
+            with get_db_cursor() as c:
+                # Ideally, we would just run "select distinct(client_ip) from netflow_flows;", but unfortunately
+                # I was unable to find a performant way to run this query. So we are using netflow_exporters:
+                c.execute(f"SELECT ip FROM {DB_PREFIX}exporters;")
+                for client_ip, in c.fetchall():
+                    if client_ip in entities_ips:
+                        log.info(f"MAINT: We already know exporter [{client_ip}]")
+                        continue
+
+                    log.info(f"MAINT: Unknown exporter found, inserting [{client_ip}] to account [{account_id}]")
+                    url = f'{backend_url}/accounts/{account_id}/entities/?b={bot_token}'
+                    params = {
+                        "name": f'{client_ip} (NetFlow exporter)',
+                        "entity_type": "device",
+                        "details": {
+                            "ipv4": client_ip,
+                        },
+                    }
+                    r = requests_session.post(url, json=params)
+                    if r.status_code > 299:
+                        raise Exception("Network error, got status {} while posting to {}/accounts/{}/entities: {}".format(r.status_code, backend_url, account_id, r.content))
+
+        log.info("MAINT: Maintenance finished (device entities suggestions).")
+    except DBConnectionError:
+        log.error("MAINT: Maintenance job (device entities suggestions) failed due to DB connection issues.")
 
 
 class NetFlowBot(Collector):
@@ -163,11 +168,11 @@ def jobs(self):
                 "bot_token": self.bot_token,
             }
             start_ts = int(time.time()) + first_run_ts - interval  # start_ts must be in the past
-            yield job_id, [interval], NetFlowBot.perform_account_aggr_job, job_params, start_ts
+            yield job_id, [interval], NetFlowBot.job_perform_account_aggr, job_params, start_ts
 
 
     @staticmethod
-    def perform_account_aggr_job(*args, **job_params):
+    def job_perform_account_aggr(*args, **job_params):
         # \d netflow_flows2
         # Column        | Type          | Description
         # ---------------+---------------+------------
@@ -189,68 +194,72 @@ def perform_account_aggr_job(*args, **job_params):
         entities = [(entity_info["entity_id"], entity_info["details"]["ipv4"],) for entity_info in job_params["entities_infos"]]
         log.info(f"Starting {interval_label} aggregation job for account {account_id}...")
 
-        last_used_ts = _get_last_used_ts(job_id)
-        max_ts = _get_current_max_ts()
-        if max_ts is None or last_used_ts == max_ts:
-            log.info(f"No netflow data found for job {job_id}, skipping.")
-            return
-        _save_current_max_ts(job_id, max_ts)
-        if last_used_ts is None:
-            log.info(f"Counter was not yet initialized for job {job_id}, skipping.")
-            return
-
-        # WATCH OUT! This hack changes all of the units from Bps to B! (should be cleaned up)
-        #time_between = float(max_ts - last_used_ts)
-        time_between = 1  # we want to use bytes as unit, not bytes per second
-
-        # traffic:
-        values = []
-        sum_traffic_egress = 0
-        sum_traffic_ingress = 0
-        for entity_id, entity_ip in entities:
-            v, s = NetFlowBot.get_traffic_for_entity(interval_label, last_used_ts, max_ts, time_between, DIRECTION_EGRESS, entity_id, entity_ip)
-            values.extend(v)
-            sum_traffic_egress += s
-            v, s = NetFlowBot.get_traffic_for_entity(interval_label, last_used_ts, max_ts, time_between, DIRECTION_INGRESS, entity_id, entity_ip)
-            values.extend(v)
-            sum_traffic_ingress += s
-
-        # cumulative sum for the whole account:
-        output_path = NetFlowBot.construct_output_path_prefix(interval_label, DIRECTION_EGRESS, entity_id=None, interface=None)
-        values.append({
-            'p': output_path,
-            'v': sum_traffic_egress / time_between,
-        })
-        output_path = NetFlowBot.construct_output_path_prefix(interval_label, DIRECTION_INGRESS, entity_id=None, interface=None)
-        values.append({
-            'p': output_path,
-            'v': sum_traffic_ingress / time_between,
-        })
-
-        # top N IPs:
-        for entity_id, entity_ip in entities:
-            for direction in [DIRECTION_EGRESS, DIRECTION_INGRESS]:
-                values.extend(NetFlowBot.get_top_N_IPs_for_entity(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
-                values.extend(NetFlowBot.get_top_N_IPs_for_entity_interfaces(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
-                values.extend(NetFlowBot.get_top_N_protocols_for_entity(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
-                values.extend(NetFlowBot.get_top_N_protocols_for_entity_interfaces(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
-                values.extend(NetFlowBot.get_top_N_connections_for_entity(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
-
-        if not values:
-            log.warning("No values found to be sent to Grafolean")
-            return
-
-        # the values are Decimals because they come from BIGINT column, so we must transform
-        # them to strings before encoding to JSON:
-        values = [{'p': v['p'], 'v': str(v['v'])} for v in values]
-
-        # send the data to Grafolean:
-        send_results_to_grafolean(
-            job_params['backend_url'],
-            job_params['bot_token'],
-            account_id,
-            values,
-        )
+        try:
+            last_used_ts = _get_last_used_ts(job_id)
+            max_ts = _get_current_max_ts()
+            if max_ts is None or last_used_ts == max_ts:
+                log.info(f"No netflow data found for job {job_id}, skipping.")
+                return
+            _save_current_max_ts(job_id, max_ts)
+            if last_used_ts is None:
+                log.info(f"Counter was not yet initialized for job {job_id}, skipping.")
+                return
+
+            # WATCH OUT! This hack changes all of the units from Bps to B! (should be cleaned up)
+            #time_between = float(max_ts - last_used_ts)
+            time_between = 1  # we want to use bytes as unit, not bytes per second
+
+            # traffic:
+            values = []
+            sum_traffic_egress = 0
+            sum_traffic_ingress = 0
+            for entity_id, entity_ip in entities:
+                v, s = NetFlowBot.get_traffic_for_entity(interval_label, last_used_ts, max_ts, time_between, DIRECTION_EGRESS, entity_id, entity_ip)
+                values.extend(v)
+                sum_traffic_egress += s
+                v, s = NetFlowBot.get_traffic_for_entity(interval_label, last_used_ts, max_ts, time_between, DIRECTION_INGRESS, entity_id, entity_ip)
+                values.extend(v)
+                sum_traffic_ingress += s
+
+            # cumulative sum for the whole account:
+            output_path = NetFlowBot.construct_output_path_prefix(interval_label, DIRECTION_EGRESS, entity_id=None, interface=None)
+            values.append({
+                'p': output_path,
+                'v': sum_traffic_egress / time_between,
+            })
+            output_path = NetFlowBot.construct_output_path_prefix(interval_label, DIRECTION_INGRESS, entity_id=None, interface=None)
+            values.append({
+                'p': output_path,
+                'v': sum_traffic_ingress / time_between,
+            })
+
+            # top N IPs:
+            for entity_id, entity_ip in entities:
+                for direction in [DIRECTION_EGRESS, DIRECTION_INGRESS]:
+                    values.extend(NetFlowBot.get_top_N_IPs_for_entity(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
+                    values.extend(NetFlowBot.get_top_N_IPs_for_entity_interfaces(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
+                    values.extend(NetFlowBot.get_top_N_protocols_for_entity(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
+                    values.extend(NetFlowBot.get_top_N_protocols_for_entity_interfaces(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
+                    values.extend(NetFlowBot.get_top_N_connections_for_entity(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
+
+            if not values:
+                log.warning("No values found to be sent to Grafolean")
+                return
+
+            # the values are Decimals because they come from BIGINT column, so we must transform
+            # them to strings before encoding to JSON:
+            values = [{'p': v['p'], 'v': str(v['v'])} for v in values]
+
+            # send the data to Grafolean:
+            send_results_to_grafolean(
+                job_params['backend_url'],
+                job_params['bot_token'],
+                account_id,
+                values,
+            )
+
+        except DBConnectionError:
+            log.error(f"{interval_label} aggregation job for account {account_id} failed due to DB connection issues.")
 
 
     @staticmethod
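
All three jobs now guard their bodies with an identical try/except DBConnectionError wrapper. If more jobs appear, the wrapper could be hoisted into a decorator; a hypothetical sketch (not part of this commit):

import functools
import logging

from dbutils import DBConnectionError

log = logging.getLogger(__name__)


def db_errors_logged(job_name):
    # Hypothetical helper: log and skip the run when the DB is unreachable,
    # instead of letting the exception propagate out of the job.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except DBConnectionError:
                log.error(f"{job_name} failed due to DB connection issues.")
        return wrapper
    return decorator


@db_errors_logged("MAINT: Maintenance job (removing old data)")
def job_maint_remove_old_data(*args, **kwargs):
    ...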
