
Commit 51e39d8

Dan committed

Added template match check when applying template to metric path for templates without filters. Corrected tests that were wrongly expecting data from branch nodes. Updated finder to fail fast on invalid node requests. Updated finder to not query for paths without a template match.
1 parent 0350219 commit 51e39d8
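
The essence of the change: a metric path with fewer parts than a template has positions cannot match that template, so the finder now skips it instead of generating a query for it. A minimal, self-contained sketch of the idea (toy helper name and template dict, not the library's actual API; the real apply_template also handles fields, default tags and a configurable separator):

class TemplateMatchError(Exception):
    """Raised when a metric path cannot satisfy a template."""


def apply_template_sketch(path_parts, template):
    # template maps path position -> tag name,
    # e.g. {0: 'host', 1: 'measurement', 2: 'metric'}
    if len(path_parts) < len(template):
        # Too few path components to satisfy the template - fail fast
        # rather than returning a partial measurement/tag set.
        raise TemplateMatchError(path_parts)
    measurement, tags = None, {}
    for i, tag in template.items():
        if tag == 'measurement':
            measurement = path_parts[i]
        else:
            tags[tag] = path_parts[i]
    return measurement, tags


# Callers building InfluxDB queries skip paths that raise, so no query is
# generated for them - this mirrors the finder change in the diff below.
template = {0: 'host', 1: 'measurement', 2: 'metric'}
for path in ('my_host.memory.free', 'my_host.memory'):
    try:
        measurement, tags = apply_template_sketch(path.split('.'), template)
    except TemplateMatchError:
        continue
    print(measurement, tags)  # only 'my_host.memory.free' reaches here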

4 files changed, +70 -42 lines changed


influxgraph/classes/finder.py

Lines changed: 28 additions & 7 deletions
@@ -44,7 +44,8 @@
 from ..utils import NullStatsd, calculate_interval, read_influxdb_values, \
     get_aggregation_func, gen_memcache_key, gen_memcache_pattern_key, \
     Query, get_retention_policy, _compile_aggregation_patterns, parse_series
-from ..templates import parse_influxdb_graphite_templates, apply_template
+from ..templates import parse_influxdb_graphite_templates, apply_template, \
+    TemplateMatchError
 from .reader import InfluxDBReader
 from .leaf import InfluxDBLeafNode
 try:
@@ -356,8 +357,11 @@ def _get_all_template_values(self, paths):
             for path in paths:
                 if _filter and not _filter.match(path):
                     continue
-                measurement, tags, field = apply_template(
-                    path.split('.'), template, default_tags, separator)
+                try:
+                    measurement, tags, field = apply_template(
+                        path.split('.'), template, default_tags, separator)
+                except TemplateMatchError:
+                    continue
                 if measurement not in _measurements:
                     _measurements.append(measurement)
                 for tag in tags:
@@ -380,6 +384,8 @@ def _get_all_template_values(self, paths):
 
     def _gen_query_values_from_templates(self, paths, retention):
         _query_data, path_measurements = self._get_all_template_values(paths)
+        if len(_query_data) == 0:
+            return
         query_data = deque()
         for (_measurements, _tags, _fields) in _query_data:
             measurements = ', '.join(
@@ -410,7 +416,10 @@ def _gen_influxdb_query(self, start_time, end_time, paths, interval):
         memcache_key = gen_memcache_key(start_time, end_time, aggregation_func,
                                         paths)
         queries = deque()
-        query_data, path_measurements = self._gen_query_values(paths, retention)
+        try:
+            query_data, path_measurements = self._gen_query_values(paths, retention)
+        except TypeError:
+            return
         for (measurement, tags, _fields) in query_data:
             if not _fields:
                 _fields = ['value']
@@ -434,6 +443,12 @@ def _gen_influxdb_query(self, start_time, end_time, paths, interval):
         query = ';'.join(queries)
         return query, memcache_key, path_measurements
 
+    def _make_empty_multi_fetch_result(self, time_info, paths):
+        data = {}
+        for key in paths:
+            data[key] = []
+        return time_info, data
+
     def fetch_multi(self, nodes, start_time, end_time):
         """Fetch datapoints for all series between start and end times
 
@@ -446,9 +461,15 @@ def fetch_multi(self, nodes, start_time, end_time):
         time_info = start_time, end_time, interval
         if not nodes:
             return time_info, {}
-        paths = [n.path for n in nodes]
-        query, memcache_key, path_measurements = self._gen_influxdb_query(
-            start_time, end_time, paths, interval)
+        paths = [n.path for n in nodes if n.is_leaf]
+        if not len(paths) > 0:
+            return self._make_empty_multi_fetch_result(
+                time_info, [n.path for n in nodes])
+        try:
+            query, memcache_key, path_measurements = self._gen_influxdb_query(
+                start_time, end_time, paths, interval)
+        except TypeError:
+            return self._make_empty_multi_fetch_result(time_info, paths)
         data = self.memcache.get(memcache_key) if self.memcache else None
         if data:
             logger.debug("Found cached data for key %s", memcache_key)

influxgraph/templates.py

Lines changed: 10 additions & 1 deletion
@@ -49,6 +49,11 @@ class InvalidTemplateError(Exception):
     pass
 
 
+class TemplateMatchError(Exception):
+    """Raised on errors matching template with path"""
+    pass
+
+
 # Function as per Python official documentation
 def heapsort(iterable):
     h = []
@@ -118,10 +123,14 @@ def _template_sanity_check(template):
             template)
 
 def apply_template(metric_path_parts, template, default_tags, separator='.'):
-    """Apply template to metric path parts and return measurements, tags and field"""
+    """Apply template to metric path parts and return measurements, tags and field
+
+    :raises: mod:`TemplateMatchError` on error matching template"""
     measurement = []
     tags = {}
     field = ""
+    if len(metric_path_parts) < len(template.keys()):
+        raise TemplateMatchError()
     for i, tag in template.items():
         if i >= len(metric_path_parts):
             continue

tests/test_influxdb_integration.py

Lines changed: 2 additions & 1 deletion
@@ -364,7 +364,8 @@ def test_multi_fetch_non_existant_series(self):
             database=self.db_name), path1, influxgraph.utils.NullStatsd())
         reader2 = influxgraph.InfluxdbReader(InfluxDBClient(
             database=self.db_name), path2, influxgraph.utils.NullStatsd())
-        nodes = [reader1, reader2]
+        nodes = [influxgraph.classes.leaf.InfluxDBLeafNode(path1, reader1),
+                 influxgraph.classes.leaf.InfluxDBLeafNode(path2, reader2)]
         time_info, data = self.finder.fetch_multi(nodes,
                                                   int(self.start_time.strftime("%s")),
                                                   int(self.end_time.strftime("%s")))

tests/test_influxdb_templates_integration.py

Lines changed: 30 additions & 33 deletions
@@ -353,27 +353,27 @@ def test_template_multi_tag_no_field(self):
             "*.interface.* host.measurement.device.metric",
             ]
         mem_measurement = 'memory'
-        tags = {'host': 'my_host',
-                'metric': 'some_metric',
+        mem_tags = {'host': 'my_host',
+                    'metric': 'mem_metric',
                 }
         fields = {'value': 1,
                   }
-        self.write_data([mem_measurement], tags, fields)
+        self.write_data([mem_measurement], mem_tags, fields)
         int_measurement = 'interface'
-        tags = {'host': 'my_host',
-                'device': 'dev',
-                'metric': 'some_metric',
-                }
+        int_tags = {'host': 'my_host',
+                    'device': 'dev',
+                    'metric': 'int_metric',
+                    }
         fields = {'value': 1,
                   }
-        self.write_data([int_measurement], tags, fields)
+        self.write_data([int_measurement], int_tags, fields)
         self.config['influxdb']['templates'] = templates
         self.finder = influxgraph.InfluxDBFinder(self.config)
         ##
         query = Query('%s.%s.*' % (
-            tags['host'], mem_measurement,))
+            mem_tags['host'], mem_measurement,))
         nodes = list(self.finder.find_nodes(query))
-        self.assertEqual(sorted([n.name for n in nodes]), [tags['metric']])
+        self.assertEqual(sorted([n.name for n in nodes]), [mem_tags['metric']])
         time_info, data = self.finder.fetch_multi(
             nodes, int(self.start_time.strftime("%s")),
             int(self.end_time.strftime("%s")))
@@ -383,29 +383,22 @@ def test_template_multi_tag_no_field(self):
             self.num_datapoints, nodes[0].path, len(datapoints),))
         ##
         query = Query('%s.%s.*' % (
-            tags['host'], int_measurement,))
+            int_tags['host'], int_measurement,))
         nodes = list(self.finder.find_nodes(query))
-        self.assertEqual(sorted([n.name for n in nodes]), [tags['device']])
+        self.assertEqual(sorted([n.name for n in nodes]), [int_tags['device']])
         time_info, data = self.finder.fetch_multi(
             nodes, int(self.start_time.strftime("%s")),
             int(self.end_time.strftime("%s")))
         for metric in [n.path for n in nodes]:
-            datapoints = [v for v in data[metric] if v]
-            self.assertTrue(len(datapoints) == self.num_datapoints,
-                            msg="Expected %s datapoints for %s - got %s" % (
-                                self.num_datapoints, metric, len(datapoints),))
+            self.assertTrue(len(data[metric]) == 0)
         query = Query('%s.%s.*.*' % (
-            tags['host'], int_measurement,))
+            int_tags['host'], int_measurement,))
         nodes = list(self.finder.find_nodes(query))
-        self.assertEqual(sorted([n.name for n in nodes]), [tags['metric']])
+        self.assertEqual(sorted([n.name for n in nodes]), [int_tags['metric']])
         time_info, data = self.finder.fetch_multi(
             nodes, int(self.start_time.strftime("%s")),
             int(self.end_time.strftime("%s")))
-        for metric in [n.path for n in nodes]:
-            datapoints = [v for v in data[metric] if v]
-            self.assertTrue(len(datapoints) == self.num_datapoints,
-                            msg="Expected %s datapoints for %s - got %s" % (
-                                self.num_datapoints, metric, len(datapoints),))
+        self._test_data_in_nodes(nodes)
 
     def test_template_multiple_tags(self):
         self.client.drop_database(self.db_name)
@@ -504,18 +497,19 @@ def test_data_with_fields(self):
         measurements = ['cpu-0', 'cpu-1', 'cpu-2', 'cpu-3']
         fields = {'load': 1, 'idle': 1,
                   'usage': 1, 'user': 1,
-                  'user.io': 1, 'idle.io': 1,
-                  'load.io': 1, 'usage.io': 1,
-                  }
+                  'io.usr': 1, 'io.swp': 1,
+                  'io.sys': 1,
+                  }
         tags = {'host': 'my_host',
                 'env': 'my_env',
                 }
         cpu_metrics = ['.'.join(['my_host', m, f])
                        for m in measurements
-                       for f in ['load', 'usage', 'user', 'idle']]
+                       for f in ['load', 'usage', 'user', 'idle',
+                                 'io']]
         io_metrics = ['.'.join(['my_host', m, f])
                       for m in measurements
-                      for f in ['user.io', 'idle.io', 'load.io', 'usage.io']]
+                      for f in ['io.usr', 'io.swp', 'io.sys']]
         self.client.drop_database(self.db_name)
         self.client.create_database(self.db_name)
         self.write_data(measurements, tags, fields)
@@ -530,13 +524,12 @@ def test_data_with_fields(self):
         nodes = list(self.finder.find_nodes(query))
         node_paths = sorted([n.path for n in nodes])
         expected = sorted(cpu_metrics)
-        self.assertEqual(node_paths, expected,
-                         msg="Got query %s result %s - wanted %s" % (
-                             query.pattern, node_paths, expected,))
+        self.assertEqual(node_paths, expected)
         time_info, data = self.finder.fetch_multi(nodes,
                                                   int(self.start_time.strftime("%s")),
                                                   int(self.end_time.strftime("%s")))
-        for metric in cpu_metrics:
+        cpu_leaf_nodes = [m for m in cpu_metrics if not m.endswith('io')]
+        for metric in cpu_leaf_nodes:
             self.assertTrue(metric in data,
                             msg="Did not get data for requested series %s - got data for %s" % (
                                 metric, data.keys(),))
@@ -547,11 +540,15 @@ def test_data_with_fields(self):
         time_info, _data = self.finder.fetch_multi(nodes,
                                                    int(self.start_time.strftime("%s")),
                                                    int(self.end_time.strftime("%s")))
-        for metric in cpu_metrics:
+        for metric in cpu_leaf_nodes:
             datapoints = [v for v in _data[metric] if v]
             self.assertTrue(len(datapoints) == self.num_datapoints,
                             msg="Expected %s datapoints for %s - got %s" % (
                                 self.num_datapoints, metric, len(datapoints),))
+        query = Query('%s.*.*.*' % (tags['host'],))
+        nodes = list(self.finder.find_nodes(query))
+        self.assertEqual(sorted([n.path for n in nodes]), sorted(io_metrics))
+        self._test_data_in_nodes(nodes)
 
     def test_multi_tag_values_multi_measurements(self):
         measurements = ['cpu-0', 'cpu-1', 'cpu-2', 'cpu-3']
