Skip to content

Commit 8f994b0

Browse files
author
Dan
committed
Made queries for multiple unique tag values across multiple measurements run in a single statement - resolves #15
1 parent b470fcd commit 8f994b0

File tree

3 files changed

+72
-94
lines changed

3 files changed

+72
-94
lines changed

influxgraph/classes/finder.py

Lines changed: 50 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -359,108 +359,89 @@ def _get_template_values_from_paths(self, paths, _filter, template,
359359
'fields', []):
360360
path_measurements[measurement].setdefault(
361361
'fields', []).append(field)
362-
path_measurements[measurement]['tags'] = list(
363-
itertools.product(*_tags.values()))
362+
path_measurements[measurement].setdefault(
363+
'template', template)
364364
return _measurements, _tags, _fields, matched_paths
365365

366366
def _get_all_template_values(self, paths):
367367
paths = paths[:]
368-
query_data = deque()
369368
path_measurements = {}
369+
measurements, tags, fields = deque(), deque(), set()
370370
for (_filter, template, default_tags, separator) in self.graphite_templates:
371-
# One influx query statement per template
371+
# One influx measurement queried per template
372372
if not paths:
373373
break
374374
_measurements, _tags, _fields, matched_paths = \
375375
self._get_template_values_from_paths(
376376
paths, _filter, template, default_tags, separator,
377377
path_measurements)
378378
if _measurements:
379-
# Found template match for path, add query data and
379+
# Found template match for path, append query data and
380380
# remove matched paths so we do not try to match them again
381-
query_data.append((_measurements, _tags, _fields))
381+
measurements.extend(_measurements)
382+
tags.append(_tags)
383+
fields = fields.union(_fields)
382384
for path in matched_paths:
383385
del paths[paths.index(path)]
384-
return query_data, path_measurements
386+
return measurements, tags, fields, path_measurements
385387

386-
def _gen_query(self, _measurements, _tags, _fields, retention):
387-
# import ipdb; ipdb.set_trace()
388+
def _gen_query(self, measurements, tags, fields, retention):
389+
groupings = set([k for t in tags for k in t.keys()])
388390
measurements = ', '.join(
389-
('"%s"."%s"' % (retention, measure,) for measure in _measurements)) \
391+
('"%s"."%s"' % (retention, measure,) for measure in measurements)) \
390392
if retention \
391-
else ', '.join(('"%s"' % (measure,) for measure in _measurements))
392-
groupings = deque()
393-
tags = deque()
394-
for tag in _tags:
395-
if len(_tags[tag]) > 1:
396-
groupings.append(tag)
397-
continue
398-
tags.append(""""%s" = '%s'""" % (tag, _tags[tag],))
399-
# tag_sets = [[""""%s" = '%s'""" % (tag, tag_val,)
400-
# for tag_val in _tags[tag]]
401-
# for tag in _tags] \
402-
# if _tags else None
403-
####
404-
# tags = [' AND '.join(['(%s)' % ' OR '.join(
405-
# [""""%s" = '%s'""" % (tag, tag_val,) for tag_val in _tags[tag]])
406-
# for tag in _tags])] if _tags else None
407-
####
408-
# tag_pairs = itertools.product(*tag_sets) if tag_sets else None
409-
# tags = [" AND ".join(t) for t in tag_pairs] if tag_pairs else None
410-
fields = _fields if _fields else ['value']
411-
return measurements, tags, fields, groupings
393+
else ', '.join(('"%s"' % (measure,) for measure in measurements))
394+
_tags = ' OR '.join(['(%s)' % (tag_set,) for tag_set in [
395+
' AND '.join(['(%s)' % ' OR '.join([
396+
""""%s" = '%s'""" % (tag, tag_val,)
397+
for tag_val in __tags[tag]])
398+
for tag in __tags])
399+
for __tags in tags]]) if tags else None
400+
fields = fields if fields else ['value']
401+
return measurements, _tags, fields, groupings
412402

413403
def _gen_query_values_from_templates(self, paths, retention):
414-
_query_data, path_measurements = self._get_all_template_values(paths)
415-
if len(_query_data) == 0:
416-
return
417-
query_data = deque()
418-
for (_measurements, _tags, _fields) in _query_data:
419-
measurements, tags, fields, groupings = self._gen_query(
420-
_measurements, _tags, _fields, retention)
421-
query_data.append((measurements, tags, fields, groupings),)
422-
# import ipdb; ipdb.set_trace()
423-
return query_data, path_measurements
404+
measurements, tags, fields, path_measurements = \
405+
self._get_all_template_values(paths)
406+
measurements, tags, fields, groupings = self._gen_query(
407+
measurements, tags, fields, retention)
408+
return measurements, tags, fields, groupings, path_measurements
424409

425410
def _gen_query_values(self, paths, retention):
426411
if self.graphite_templates:
427412
return self._gen_query_values_from_templates(paths, retention)
428413
measurement = ', '.join(('"%s"."%s"' % (retention, path,) for path in paths)) \
429414
if retention \
430415
else ', '.join(('"%s"' % (path,) for path in paths))
431-
return ((measurement, None, ['value']),), None
432-
433-
def _gen_unique_infl_queries(self, query_data, start_time, end_time,
434-
aggregation_func, interval):
435-
queries = deque()
436-
for (measurement, tags, fields) in query_data:
437-
query_fields = ', '.join(['%s("%s") as "%s"' % (
438-
aggregation_func, field, field) for field in fields])
439-
query = 'select %s from %s where (time > %ds and time <= %ds)' % (
440-
query_fields, measurement, start_time, end_time,)
441-
group_by = 'GROUP BY time(%ss) fill(previous)' % (interval,)
442-
if tags:
443-
# Add sub queries
444-
queries.append(';'.join(
445-
["%s AND %s %s" % (query, tag, group_by,)
446-
for tag in tags]))
447-
else:
448-
queries.append(" ".join([query, group_by]))
449-
return queries
416+
return measurement, None, ['value'], None, None
417+
418+
def _gen_infl_stmt(self, measurements, tags, fields, groupings, start_time,
419+
end_time, aggregation_func, interval):
420+
time_clause = "(time > %ds and time <= %ds)" % (start_time, end_time,)
421+
query_fields = ', '.join(['%s("%s") as "%s"' % (
422+
aggregation_func, field, field) for field in fields])
423+
groupings = ['"%s"' % (grouping,) for grouping in groupings] \
424+
if groupings else []
425+
groupings.insert(0, 'time(%ss)' % (interval,))
426+
groupings = ', '.join(groupings)
427+
where_clause = "%s AND %s" % (time_clause, tags,) if tags else \
428+
time_clause
429+
group_by = '%s fill(previous)' % (groupings,)
430+
query = 'select %s from %s where %s GROUP BY %s' % (
431+
query_fields, measurements, where_clause, group_by,)
432+
return query
450433

451434
def _gen_influxdb_stmt(self, start_time, end_time, paths, interval):
452435
retention = get_retention_policy(interval, self.retention_policies) \
453436
if self.retention_policies else None
454437
aggregation_func = self._gen_aggregation_func(paths)
455438
memcache_key = gen_memcache_key(start_time, end_time, aggregation_func,
456439
paths)
457-
try:
458-
query_data, path_measurements = self._gen_query_values(paths, retention)
459-
except TypeError:
460-
return
461-
queries = self._gen_unique_infl_queries(
462-
query_data, start_time, end_time, aggregation_func, interval)
463-
query = ';'.join(queries)
440+
measurements, tags, fields, groupings, path_measurements = \
441+
self._gen_query_values(paths, retention)
442+
query = self._gen_infl_stmt(measurements, tags, fields, groupings,
443+
start_time, end_time, aggregation_func,
444+
interval)
464445
return query, memcache_key, path_measurements
465446

466447
def _make_empty_multi_fetch_result(self, time_info, paths):
@@ -488,7 +469,8 @@ def fetch_multi(self, nodes, start_time, end_time):
488469
try:
489470
query, memcache_key, path_measurements = self._gen_influxdb_stmt(
490471
start_time, end_time, paths, interval)
491-
except TypeError:
472+
except TypeError as ex:
473+
logger.error("Type error generating query statement - %s", ex)
492474
return self._make_empty_multi_fetch_result(time_info, paths)
493475
data = self.memcache.get(memcache_key) if self.memcache else None
494476
if data:
@@ -506,7 +488,6 @@ def fetch_multi(self, nodes, start_time, end_time):
506488
logger.debug("Calling influxdb multi fetch with query - %s", query)
507489
data = self.client.query(query, params=_INFLUXDB_CLIENT_PARAMS)
508490
logger.debug('fetch_multi() - Retrieved %d result set(s)', len(data))
509-
# import ipdb; ipdb.set_trace()
510491
data = read_influxdb_values(data, paths, path_measurements)
511492
timer.stop()
512493
# Graphite API requires that data contain keys for

influxgraph/utils.py

Lines changed: 19 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -25,10 +25,10 @@
2525
from .constants import INFLUXDB_AGGREGATIONS
2626
try:
2727
from .ext.classes.tree import NodeTreeIndex
28-
from .ext.templates import get_series_with_tags
28+
from .ext.templates import get_series_with_tags, heapsort, _make_path_from_template
2929
except ImportError:
3030
from .classes.tree import NodeTreeIndex
31-
from .templates import get_series_with_tags
31+
from .templates import get_series_with_tags, heapsort, _make_path_from_template
3232

3333
def calculate_interval(start_time, end_time, deltas=None):
3434
"""Calculates wanted data series interval according to start and end times
@@ -162,39 +162,34 @@ def get_aggregation_func(path, aggregation_functions):
162162
return aggregation_functions[pattern]
163163
return 'mean'
164164

165-
def _find_metric_name(measurement_paths, tag_sets, field, fields):
166-
for tag_set in tag_sets:
167-
for path in measurement_paths:
168-
if field in fields \
169-
and field in path \
170-
and len([t for t in tag_set if t in path]) == len(tag_set):
171-
del measurement_paths[measurement_paths.index(path)]
172-
return path
173-
174-
def _retrieve_named_field_data(infl_data, path_measurements, measurement, _data):
165+
def _retrieve_named_field_data(infl_data, path_measurements, measurement, tags, _data):
175166
measurement_paths = path_measurements[measurement]['paths'][:]
176-
tag_sets = path_measurements[measurement]['tags'][:]
177-
field_keys = next(infl_data.get_points(measurement)).keys()
167+
field_keys = next(infl_data.get_points(measurement, tags)).keys()
178168
point_fields = sorted([k for k in field_keys if k != 'time'])
179169
for field in point_fields:
180-
metric = _find_metric_name(
181-
measurement_paths, tag_sets, field,
182-
path_measurements[measurement]['fields'])
183-
if not metric:
170+
split_path = []
171+
_make_path_from_template(
172+
split_path, measurement,
173+
path_measurements[measurement]['template'], tags.items())
174+
split_path = [p[1] for p in heapsort(split_path)]
175+
split_path.append(field)
176+
metric = '.'.join(split_path)
177+
if not metric in measurement_paths:
184178
continue
179+
del measurement_paths[measurement_paths.index(metric)]
185180
_data[metric] = [d[field]
186-
for d in infl_data.get_points(measurement)]
181+
for d in infl_data.get_points(measurement, tags)]
187182
path_measurements[measurement]['paths'] = measurement_paths
188183

189184
def _retrieve_field_data(infl_data, path_measurements, measurement,
190-
metric, _data):
185+
metric, tags, _data):
191186
# Retrieve value field data
192187
if 'value' in path_measurements[measurement]['fields']:
193188
_data[metric] = [d['value']
194-
for d in infl_data.get_points(measurement)]
189+
for d in infl_data.get_points(measurement, tags)]
195190
# Retrieve non value named field data with fields from path_measurements
196191
_retrieve_named_field_data(infl_data, path_measurements,
197-
measurement, _data)
192+
measurement, tags, _data)
198193

199194
def read_influxdb_values(influxdb_data, paths, path_measurements):
200195
"""Return key -> values dict for values from InfluxDB data"""
@@ -206,6 +201,7 @@ def read_influxdb_values(influxdb_data, paths, path_measurements):
206201
for infl_data in influxdb_data:
207202
for infl_keys in infl_data.keys():
208203
measurement = infl_keys[0]
204+
tags = infl_keys[1]
209205
if not path_measurements:
210206
if not measurement in paths:
211207
continue
@@ -224,7 +220,7 @@ def read_influxdb_values(influxdb_data, paths, path_measurements):
224220
if metric not in paths:
225221
continue
226222
_retrieve_field_data(infl_data, path_measurements,
227-
measurement, metric, _data)
223+
measurement, metric, tags, _data)
228224
return _data
229225

230226
def gen_memcache_pattern_key(pattern):

tests/test_influxdb_templates_integration.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -373,7 +373,6 @@ def test_template_multi_tags_multi_templ_multi_nodes_no_fields(self):
373373
int(self.end_time.strftime("%s")))
374374
self.assertTrue(data[cpu_metric_nodes[0].path][-1] == idle_data['value'],
375375
msg="Got incorrect data from multi-tag query")
376-
# import ipdb; ipdb.set_trace()
377376

378377
def test_template_multi_tags_multi_templ_multi_nodes(self):
379378
self.client.drop_database(self.db_name)
@@ -767,7 +766,9 @@ def test_multi_tag_values_multi_measurements(self):
767766
elif tags_env2_h2['env'] in metric and tags_env2_h2['host'] in metric:
768767
fields = env2_h2_fields
769768
field = [f for f in list(fields.keys()) if metric.endswith(f)][0]
770-
self.assertTrue(data[metric][-1] == fields[field])
769+
self.assertTrue(data[metric][-1] == fields[field],
770+
msg="Incorrect data for metric %s. Should be %s, got %s" % (
771+
metric, fields[field], data[metric][-1]))
771772

772773
def test_field_data_part_or_no_template_match(self):
773774
del self.finder

0 commit comments

Comments
 (0)