Limit and Offset #4

Open · wants to merge 2 commits into main
40 changes: 28 additions & 12 deletions akvo-top-asn
@@ -11,6 +11,7 @@ import configparser
import requests
import yaml
import logging
from typing import Tuple, List, Dict


def setup_logging(args):
@@ -74,11 +75,13 @@ group.add_argument("--save", action="store_true", help="save result to local fil
group.add_argument("--upload", action="store_true", help="upload result to remote http url")
group.add_argument("--print", action="store_true", help="print result to stdout")

parser.add_argument("--query-limit", type=int, default=50)
parser.add_argument("--value-limit", type=int, default=1000)

# logging settings
logging_group = parser.add_mutually_exclusive_group()
logging_group.add_argument("--debug", action="store_true", help="enable debug logging")
logging_group.add_argument("--quiet", action="store_true", help="supress informational logging")
logging_group.add_argument("--quiet", action="store_true", help="suppress informational logging")

args = parser.parse_args()
logger = setup_logging(args)
@@ -148,11 +151,11 @@ def get_connection(config):
return clickhouse_connect.get_client(**params)


def query_clickhouse(time_range, direction):
def query_clickhouse(time_range: Tuple[str, str], direction: str, offset: int = 0, limit: int = 50):
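"""Query ClickHouse for per-ASN traffic; `limit`/`offset` page through the top-ASN rows."""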
query = f"""
WITH
source AS (SELECT * FROM flows_5m0s SETTINGS asterisk_include_alias_columns = 1),
rows AS (SELECT SrcAS FROM source WHERE TimeReceived BETWEEN toDateTime('{time_range[0]}', 'UTC') AND toDateTime('{time_range[1]}', 'UTC') AND (InIfBoundary = 'external') GROUP BY SrcAS ORDER BY SUM(Bytes) DESC LIMIT 50)
rows AS (SELECT SrcAS FROM source WHERE TimeReceived BETWEEN toDateTime('{time_range[0]}', 'UTC') AND toDateTime('{time_range[1]}', 'UTC') AND (InIfBoundary = 'external') GROUP BY SrcAS ORDER BY SUM(Bytes) DESC LIMIT {limit} OFFSET {offset})
SELECT 1 AS axis, * FROM (
SELECT
toStartOfInterval(TimeReceived + INTERVAL 900 second, INTERVAL 900 second) - INTERVAL 900 second AS time,
@@ -182,18 +185,32 @@ except clickhouse_connect.driver.exceptions.DatabaseError as ex:
logger.info("query time range: %s - %s", *time_range)


# configparser values are strings; use getint() so the limits are usable as integers
value_limit = config["query"].getint("value_limit", 1000)
query_limit = config["query"].getint("query_limit", 100)

# build list of bandwidths per ASN
asn_xps = {}
for direction in directions:  # inbound and outbound
    i = 0
    while True:
        logger.info("query results from %d until %d ...", i * query_limit, (i + 1) * query_limit)

        # materialize the result so it can be both iterated and counted
        query_result = list(query_clickhouse(time_range, direction, limit=query_limit, offset=i * query_limit))
        for axis, ts, xps, _asn in query_result:
            asn = _asn[0]
            if asn in asn_xps:
                if direction in asn_xps[asn]:
                    asn_xps[asn][direction].append(xps)
                else:
                    asn_xps[asn][direction] = [xps]
            else:
                asn_xps[asn] = {direction: [xps]}

        # fetch the next page only while pages come back full and the total row cap is not reached
        if len(query_result) == query_limit and (i + 1) * query_limit < value_limit:
            i += 1
        else:
            break


logger.info("building statistics...")

@@ -275,7 +292,6 @@ if args.print:
for metric in ["avg", "p95", "max"]:
columns.append(f"{{{direction.lower()}_{metric}: >9}}")
print(" {metric: <9}".format(metric=f"{direction.lower()}_{metric}"), end="")
print()

# sort & print ASN stats
for asn, stats in sorted(asn_stats.items(), key=lambda i: i[1]["out_p95"] + i[1]["in_p95"], reverse=True):
11 changes: 11 additions & 0 deletions akvo-top-asn.conf.ex
@@ -7,6 +7,17 @@
## Use localhost:8123 via http (default):
# host = localhost

[query]
## How to query ClickHouse

## how many rows per query
# query_limit = 100

## how many rows will be extracted in total
# value_limit = 1000

## the maximum number of queries against ClickHouse is value_limit / query_limit
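##
## Example with the defaults above: value_limit = 1000 and query_limit = 100
## means at most 1000 / 100 = 10 paged queries per traffic direction
## (LIMIT 100 OFFSET 0, LIMIT 100 OFFSET 100, ..., LIMIT 100 OFFSET 900).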

[upload]
## upload server & auth (i.e. Nextcloud public share)
# url = https://cloud.dd-ix.net/public.php/webdav