From ca0228b691c298c4b24ddb465514ecaa3b90916f Mon Sep 17 00:00:00 2001
From: kdiluca
Date: Tue, 22 Aug 2017 15:26:54 -0400
Subject: [PATCH 1/4] reporter scripts used for debugging; follow instructions
 for use in google doc

---
 py/cat_to_file.py       | 52 +++++++++++++++++++++++++++++++++++++++++
 py/get_data.sh          | 46 ++++++++++++++++++++++++++++++++++++
 py/make_day_requests.sh | 45 +++++++++++++++++++++++++++++++++++
 run_reporter.sh         | 32 +++++++++++++++++++++++++
 4 files changed, 175 insertions(+)
 create mode 100755 py/cat_to_file.py
 create mode 100755 py/get_data.sh
 create mode 100755 py/make_day_requests.sh
 create mode 100755 run_reporter.sh

diff --git a/py/cat_to_file.py b/py/cat_to_file.py
new file mode 100755
index 0000000..ce9adb2
--- /dev/null
+++ b/py/cat_to_file.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+import sys
+import os
+import argparse
+import logging
+import json
+import re
+
+#parse a couple of options
+parser = argparse.ArgumentParser(description='Generate reporter post body', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument('file', metavar='F', type=str, nargs=1, help='A file name to be read from, use - for stdin')
+parser.add_argument('--key-with', type=str, help='A lambda of the form "lambda line: line.do_something()" such that the program can extract a key from a given line of input')
+
+args = parser.parse_args()
+args.file = args.file[0]
+
+#Manila Extract BBox
+bb_ymin = 14.501
+bb_xmin = 120.9
+bb_ymax = 14.70
+bb_xmax = 121.13
+
+exec('key_with = ' + (args.key_with if args.key_with else 'None'))
+
+#collect every line that falls inside the bbox
+#reading each line from the file or stdin
+handle = open(args.file, 'r') if args.file != '-' else sys.stdin
+out_file=[]
+for line in handle:
+  #try to work on the line as normal
+  try:
+    l = line.rstrip()
+    pieces = l.split('|')
+    lat=float(pieces[9])
+    lon=float(pieces[10])
+    if lat >= bb_ymin and lat <= bb_ymax and lon >= bb_xmin and lon <= bb_xmax:
+      key = bytes(key_with(l)) if key_with else None
+      value = bytes(l.rstrip())
+      out_file.append(value+"\n")
+
+  except Exception as e:
+    sys.stderr.write(repr(e))
+    sys.stderr.write(os.linesep)
+
+print(str(len(out_file)))
+
+#done
+if args.file != '-':
+  handle.close()
+
+with open('2017-01-02', "ab") as f:
+  f.write("".join(out_file))
diff --git a/py/get_data.sh b/py/get_data.sh
new file mode 100755
index 0000000..dd6e649
--- /dev/null
+++ b/py/get_data.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+trap "kill 0" EXIT
+which parallel &> /dev/null
+if [ $? != 0 ]; then
+  echo "parallel is required, please install it"
+  echo "sudo apt-get install parallel"
+  exit 1
+fi
+set -e
+function usage {
+  echo -e "Usage:\n-s s3 bucket url\n-f regex to use with grep to get interesting files\n" 1>&2
+  echo "Example: AWS_DEFAULT_PROFILE=opentraffic $0 -s s3://heaps_of_data/2016_11/ -f 2016_11_01.*gz" 1>&2
+  echo "Note: bucket listing is not recursive" 1>&2
+  echo "Note: data is pipe delimited: date|id|x|x|x|x|x|x|x|lat|lon|x|x with date format: %Y-%m-%d %H:%M:%S" 1>&2
+  exit 1
+}
+while getopts ":s:f:" opt; do
+  case $opt in
+    s) s3_dir=$(echo ${OPTARG} | sed -e 's@/\?$@/@g')
+    ;;
+    f) file_re="${OPTARG}"
+    ;;
+    \?) echo "Invalid option -${OPTARG}" 1>&2 && usage
echo "Invalid option -${OPTARG}" 1>&2 && usage + ;; + esac +done +if [[ -z ${s3_dir} ]] || [[ -z ${file_re} ]]; then + echo "Missing required option" 1>&2 && usage +fi +#get all the list of files we'll be working with +echo "Retrieving file list from s3" +files=$(aws s3 ls ${s3_dir} | awk '{print $4}' | grep -E ${file_re} | tr '\n' ' ') +echo "Processing $(echo ${files} | tr ' ' '\n' | wc -l) files" +echo $files +rm -f 2017_02 +touch 2017_02 +for file in ${files}; do + #download in the foreground + echo "Retrieving ${file} from s3" && aws s3 cp ${s3_dir}${file} . &> /dev/null + zcat ${file} | sort | awk -F"|" '( $10 >= 14.501 ) && ( $10 <= 14.70 ) && ( $11 >= 120.9 ) && ( $11 <= 121.13 ) { print $0 }' >> 2017-01-02 + #zcat ${file} | sort | ./cat_to_file.py --key-with 'lambda line: line.split("|")[1]' - + echo "Finished POST'ing ${file}" && rm -f ${file} +done + +sort 2017-01-02 >> 2017-01-02_sorted +echo "Finished sorting!" diff --git a/py/make_day_requests.sh b/py/make_day_requests.sh new file mode 100755 index 0000000..b5f300c --- /dev/null +++ b/py/make_day_requests.sh @@ -0,0 +1,45 @@ +#!/bin/bash +trap "kill 0" EXIT +which parallel &> /dev/null +if [ $? != 0 ]; then + echo "parallel is required please install it" + echo "sudo apt-get install parallel" + exit 1 +fi +set -e + +function usage { + echo -e "Usage:\n-f day file to process\n" 1>&2 + echo "Example: AWS_DEFAULT_PROFILE=opentraffic $0 -f 2017_01_02.gz -b localhost:9092 -t raw" 1>&2 + echo "Note: bucket listing is not recursive" 1>&2 + echo "Note: data is pipe delimited: date|id|x|x|x|x|x|x|x|lat|lon|x|x with date format: %Y-%m-%d %H:%M:%S" 1>&2 + exit 1 +} + +while getopts ":s:f:b:t:" opt; do + case $opt in + f) day_file="${OPTARG}" + ;; + b) bootstrap="${OPTARG}" + ;; + t) topic="${OPTARG}" + ;; + \?) echo "Invalid option -${OPTARG}" 1>&2 && usage + ;; + esac +done + +hash=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 16 | head -n 1) +key_with='lambda line: line.split("|")[1]' +value_with="lambda line: re.sub(r'(.*:[0-5][0-9]\\|)([0-9]+)(\\|.*)', r'\\1\\2${hash}\\3', line)" + +if [[ -z ${day_file} ]] || [[ -z ${bootstrap} ]] || [[ -z ${topic} ]]; then + echo "Missing required option" 1>&2 && usage +fi + +echo "day_file=${day_file}" +echo "bootstrap=${bootstrap}" +echo "topic=${topic}" +#send to kafka producer +zcat ${day_file} | ./cat_to_kafka.py --bootstrap ${bootstrap} --topic ${topic} --key-with "${key_with}" --value-with "${value_with}" - + diff --git a/run_reporter.sh b/run_reporter.sh new file mode 100755 index 0000000..e7a6c7f --- /dev/null +++ b/run_reporter.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +#setup the dir where the container will find valhalla tiles (tiles.tar) +valhalla_data_dir=/data/valhalla + +#pick parallelism +partitions=4 + +#kill all docker containers +docker rm -f $(docker ps -qa) + +#start zookeeper +docker run -d --net opentraffic -p 2181:2181 --name zookeeper wurstmeister/zookeeper:latest + +#start kafka brokers +docker run -d --net opentraffic -p 9092:9092 -e "KAFKA_ADVERTISED_HOST_NAME=172.17.0.1" -e "KAFKA_ADVERTISED_PORT=9092" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_CREATE_TOPICS=raw:4:1,formatted:4:1,batched:4:1" -v /var/run/docker.sock:/var/run/docker.sock --name kafka wurstmeister/kafka:latest + +#wait for topics to be created +sleep 15 + +for i in {0..3}; do + target/reporter-kafka -b localhost:9092 -t raw,formatted,batched -f ',sv,\|,1,9,10,0,5,yyyy-MM-dd HH:mm:ss' -u http://localhost:8002/report? 
+done
+
+#start some traffic segment matchers
+#docker run -d --net opentraffic -p 8002 --name reporter-py -e "THREAD_POOL_COUNT=${partitions}" -v ${valhalla_data_dir}:/data/valhalla opentraffic/reporter:latest
+THREAD_POOL_COUNT=${partitions} PYTHONPATH=../../valhalla/valhalla/.libs/ py/reporter_service.py ../../conf/valhalla.json localhost:8002
+
+#now load in data with something like this
+echo 'cd py'
+echo './make_day_requests.sh -f 2017_01_02_sorted.gz -b localhost:9092 -t raw'
+

From dab1178be77c98b6cfdca8de7b49b96b3340bbe6 Mon Sep 17 00:00:00 2001
From: Duane Gearhart
Date: Thu, 24 Aug 2017 14:23:31 -0400
Subject: [PATCH 2/4] Updated example

---
 py/make_day_requests.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/py/make_day_requests.sh b/py/make_day_requests.sh
index b5f300c..c9c7d09 100755
--- a/py/make_day_requests.sh
+++ b/py/make_day_requests.sh
@@ -9,8 +9,8 @@ fi
 set -e
 
 function usage {
-  echo -e "Usage:\n-f day file to process\n" 1>&2
-  echo "Example: AWS_DEFAULT_PROFILE=opentraffic $0 -f 2017_01_02.gz -b localhost:9092 -t raw" 1>&2
+  echo -e "Usage:\n-f sorted day file to process\n" 1>&2
+  echo "Example: $0 -f 2017_01_02_12_sorted.gz -b localhost:9092 -t raw" 1>&2
   echo "Note: bucket listing is not recursive" 1>&2
   echo "Note: data is pipe delimited: date|id|x|x|x|x|x|x|x|lat|lon|x|x with date format: %Y-%m-%d %H:%M:%S" 1>&2
   exit 1

From c521f3f392407f8f20fc83a5379fef35f7b03989 Mon Sep 17 00:00:00 2001
From: Grant Heffernan
Date: Tue, 29 Aug 2017 07:30:21 -0400
Subject: [PATCH 3/4] dont call punctuate from close (#99) (#100)

* dont call punctuate from close, the kvstore will keep the data around
  for the next worker to use
* we learned to delete while iterating from the anonymising processor
* fix 200kph print
* simplify
---
 py/reporter_service.py                      |  3 ++-
 .../reporter/AnonymisingProcessor.java      |  1 -
 .../reporter/BatchingProcessor.java         | 22 +++++++---------------
 3 files changed, 9 insertions(+), 17 deletions(-)

diff --git a/py/reporter_service.py b/py/reporter_service.py
index 3bb3d7e..ef05879 100755
--- a/py/reporter_service.py
+++ b/py/reporter_service.py
@@ -172,7 +172,8 @@ def report(self, trace):
         else:
           #Excessive speed - log this as an error
           sys.stderr.write("Speed exceeds 200kph\n")
-          #sys.stderr.write(json.dumps(trace))
+          #sys.stderr.write(json.dumps(trace) + '\n')
+          sys.stderr.flush()
           invalid_speed_count += 1
       #Log prior segments on local level not being reported; lets do a count and track prior_segment_ids
       else:
diff --git a/src/main/java/io/opentraffic/reporter/AnonymisingProcessor.java b/src/main/java/io/opentraffic/reporter/AnonymisingProcessor.java
index 9ed43c6..b2076e7 100644
--- a/src/main/java/io/opentraffic/reporter/AnonymisingProcessor.java
+++ b/src/main/java/io/opentraffic/reporter/AnonymisingProcessor.java
@@ -263,7 +263,6 @@ public void punctuate(long timestamp) {
 
       @Override
      public void close() {
-        punctuate(0);
       }
     };
 
diff --git a/src/main/java/io/opentraffic/reporter/BatchingProcessor.java b/src/main/java/io/opentraffic/reporter/BatchingProcessor.java
index 00deb7d..c624c2b 100644
--- a/src/main/java/io/opentraffic/reporter/BatchingProcessor.java
+++ b/src/main/java/io/opentraffic/reporter/BatchingProcessor.java
@@ -81,26 +81,20 @@ public void process(String key, Point point) {
       @Override
       public void punctuate(long timestamp) {
         //find which ones need to go
-        HashSet<String> to_delete = new HashSet<String>();
         KeyValueIterator<String, Batch> it = store.all();
         while(it.hasNext()) {
           KeyValue<String, Batch> kv = it.next();
-          if(kv != null && (kv.value == null || timestamp - kv.value.last_update > SESSION_GAP))
-            to_delete.add(kv.key);
-        }
-        it.close();
-
-        //off to the glue factory with you guys
-        for(String key : to_delete) {
-          //TODO: dont actually report here, instead insert into a queue that a thread can drain asynchronously
-          logger.debug("Evicting " + key + " as it was stale");
-          Batch batch = store.delete(key);
-          if(batch != null) {
-            int reported = forward(batch.report(key, url, 0, 2, 0));
+          //off to the glue factory with you
+          if(kv.value == null || timestamp - kv.value.last_update > SESSION_GAP) {
+            logger.debug("Evicting " + kv.key + " as it was stale");
+            store.delete(kv.key);
+            //report what we can, skipping entries whose value was already null
+            int reported = kv.value == null ? 0 : forward(kv.value.report(kv.key, url, 0, 2, 0));
             if(reported > 0)
               logger.debug("Reported on " + reported + " segment pairs during eviction");
           }
         }
+        it.close();
       }
 
       private int forward(JsonNode result) {
@@ -140,8 +134,6 @@ else if(result != null) {
 
       @Override
       public void close() {
-        //take care of the rest of the stuff thats hanging around
-        punctuate(Long.MAX_VALUE);
       }
     };
   }

From 0704596c9bdf6ac4f60603705297f022beb0361b Mon Sep 17 00:00:00 2001
From: kdiluca
Date: Thu, 31 Aug 2017 13:14:03 -0400
Subject: [PATCH 4/4] small edits to debug scripts

---
 py/cat_to_file.py       |  3 ++-
 py/get_data.sh          | 10 +++++-----
 py/make_day_requests.sh |  2 +-
 run_reporter.sh         | 13 +++++++------
 4 files changed, 15 insertions(+), 13 deletions(-)

diff --git a/py/cat_to_file.py b/py/cat_to_file.py
index ce9adb2..1179c0a 100755
--- a/py/cat_to_file.py
+++ b/py/cat_to_file.py
@@ -15,6 +15,7 @@
 args.file = args.file[0]
 
 #Manila Extract BBox
+
 bb_ymin = 14.501
 bb_xmin = 120.9
 bb_ymax = 14.70
@@ -48,5 +49,5 @@
 if args.file != '-':
   handle.close()
 
-with open('2017-01-02', "ab") as f:
+with open('2017-01-01', "ab") as f:
   f.write("".join(out_file))
diff --git a/py/get_data.sh b/py/get_data.sh
index dd6e649..3238084 100755
--- a/py/get_data.sh
+++ b/py/get_data.sh
@@ -32,15 +32,15 @@ echo "Retrieving file list from s3"
 files=$(aws s3 ls ${s3_dir} | awk '{print $4}' | grep -E ${file_re} | tr '\n' ' ')
 echo "Processing $(echo ${files} | tr ' ' '\n' | wc -l) files"
 echo $files
-rm -f 2017-01-02
-touch 2017-01-02
+#rm -f 2017-01-01
+#touch 2017-01-01
 for file in ${files}; do
   #download in the foreground
   echo "Retrieving ${file} from s3" && aws s3 cp ${s3_dir}${file} . &> /dev/null
-  zcat ${file} | sort | awk -F"|" '( $10 >= 14.501 ) && ( $10 <= 14.70 ) && ( $11 >= 120.9 ) && ( $11 <= 121.13 ) { print $0 }' >> 2017-01-02
-  #zcat ${file} | sort | ./cat_to_file.py --key-with 'lambda line: line.split("|")[1]' -
+  #zcat ${file} | sort | awk -F"|" '( $10 >= 14.501 ) && ( $10 <= 14.70 ) && ( $11 >= 120.9 ) && ( $11 <= 121.13 ) { print $0 }' >> 2017-01-01
+  zcat ${file} | sort | ./cat_to_file.py --key-with 'lambda line: line.split("|")[1]' -
   echo "Finished processing ${file}" && rm -f ${file}
 done
 
-sort 2017-01-02 >> 2017-01-02_sorted
+sort 2017-01-01 >> 2017-01-01_sorted
 echo "Finished sorting!"
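Both filters that this hunk swaps keep the same rows: in the pipe-delimited records, field 10 is latitude and field 11 is longitude (awk's $10/$11, Python's pieces[9]/pieces[10]), tested against the Manila extract bounding box. A minimal sketch of that shared predicate, using a made-up sample row rather than real data:

    # sketch of the bounding-box filter shared by the awk one-liner and
    # cat_to_file.py; the sample record below is illustrative only
    BB_YMIN, BB_XMIN, BB_YMAX, BB_XMAX = 14.501, 120.9, 14.70, 121.13

    def in_manila_bbox(line):
        pieces = line.rstrip().split('|')
        lat, lon = float(pieces[9]), float(pieces[10])  # awk's $10 and $11
        return BB_YMIN <= lat <= BB_YMAX and BB_XMIN <= lon <= BB_XMAX

    sample = '2017-01-01 00:00:00|42|x|x|x|x|x|x|x|14.6|121.0|x|x'
    print(in_manila_bbox(sample))  # True: (14.6, 121.0) is inside the box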
diff --git a/py/make_day_requests.sh b/py/make_day_requests.sh
index b5f300c..7ba4ad0 100755
--- a/py/make_day_requests.sh
+++ b/py/make_day_requests.sh
@@ -10,7 +10,7 @@ set -e
 
 function usage {
   echo -e "Usage:\n-f day file to process\n" 1>&2
-  echo "Example: AWS_DEFAULT_PROFILE=opentraffic $0 -f 2017_01_02.gz -b localhost:9092 -t raw" 1>&2
+  echo "Example: AWS_DEFAULT_PROFILE=opentraffic $0 -f 2017_01_01_partial.gz -b localhost:9092 -t raw" 1>&2
   echo "Note: bucket listing is not recursive" 1>&2
   echo "Note: data is pipe delimited: date|id|x|x|x|x|x|x|x|lat|lon|x|x with date format: %Y-%m-%d %H:%M:%S" 1>&2
   exit 1
diff --git a/run_reporter.sh b/run_reporter.sh
index e7a6c7f..2a647ae 100755
--- a/run_reporter.sh
+++ b/run_reporter.sh
@@ -4,29 +4,30 @@
 valhalla_data_dir=/data/valhalla
 
 #pick parallelism
-partitions=4
+partitions=7
 
 #kill all docker containers
 docker rm -f $(docker ps -qa)
+docker rmi -f $(docker images -q)
 
 #start zookeeper
 docker run -d --net opentraffic -p 2181:2181 --name zookeeper wurstmeister/zookeeper:latest
 
 #start kafka brokers
-docker run -d --net opentraffic -p 9092:9092 -e "KAFKA_ADVERTISED_HOST_NAME=172.17.0.1" -e "KAFKA_ADVERTISED_PORT=9092" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_CREATE_TOPICS=raw:4:1,formatted:4:1,batched:4:1" -v /var/run/docker.sock:/var/run/docker.sock --name kafka wurstmeister/kafka:latest
+docker run -d --net opentraffic -p 9092:9092 -e "KAFKA_ADVERTISED_HOST_NAME=172.17.0.1" -e "KAFKA_ADVERTISED_PORT=9092" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_CREATE_TOPICS=raw:7:1,formatted:7:1,batched:7:1" -v /var/run/docker.sock:/var/run/docker.sock --name kafka wurstmeister/kafka:latest
 
 #wait for topics to be created
 sleep 15
 
-for i in {0..3}; do
-  target/reporter-kafka -b localhost:9092 -t raw,formatted,batched -f ',sv,\|,1,9,10,0,5,yyyy-MM-dd HH:mm:ss' -u http://localhost:8002/report? -p 2 -q 3600 -i 3600 -s DEBUG -o /home/kdiluca/sandbox/open_traffic/reporter/results &> ${i}.log &
+for i in {0..6}; do
+  target/reporter-kafka -b localhost:9092 -t raw,formatted,batched -f ',sv,\|,1,9,10,0,5,yyyy-MM-dd HH:mm:ss' -u http://localhost:8002/report? -p 2 -q 3600 -i 3600 -s DEBUG -o /data/opentraffic/reporter/results &> ${i}.log &
 done
 
 #start some traffic segment matchers
 #docker run -d --net opentraffic -p 8002 --name reporter-py -e "THREAD_POOL_COUNT=${partitions}" -v ${valhalla_data_dir}:/data/valhalla opentraffic/reporter:latest
-THREAD_POOL_COUNT=${partitions} PYTHONPATH=../../valhalla/valhalla/.libs/ py/reporter_service.py ../../conf/valhalla.json localhost:8002
+THREAD_POOL_COUNT=7 PYTHONPATH=../../valhalla/valhalla/.libs/ py/reporter_service.py ../../conf/valhalla.json localhost:8002
 
 #now load in data with something like this
 echo 'cd py'
-echo './make_day_requests.sh -f 2017_01_02_sorted.gz -b localhost:9092 -t raw'
+echo './make_day_requests.sh -f 2017-01-01.gz -b localhost:9092 -t raw'
 
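For reference, the value_with lambda that make_day_requests.sh hands to cat_to_kafka.py splices a random 16-character suffix onto the id field (the number between the timestamp's seconds and the next pipe), so replaying the same day file produces fresh vehicle ids. A standalone sketch of that substitution, with 'abcd1234efgh5678' standing in for the hash the script draws from /dev/urandom; it uses \g<2> instead of the script's \2 because a suffix that happens to start with a digit would otherwise be read as part of the group number:

    import re

    hash_suffix = 'abcd1234efgh5678'  # placeholder for the /dev/urandom hash
    value_with = lambda line: re.sub(
        r'(.*:[0-5][0-9]\|)([0-9]+)(\|.*)',      # ...HH:MM:SS| id |rest of row
        r'\g<1>\g<2>' + hash_suffix + r'\g<3>',  # splice the suffix onto the id
        line)

    row = '2017-01-01 00:00:00|42|x|x|x|x|x|x|x|14.6|121.0|x|x'
    print(value_with(row))
    # 2017-01-01 00:00:00|42abcd1234efgh5678|x|x|x|x|x|x|x|14.6|121.0|x|x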