Debug/Local Reporter Scripts for Testing Entire OT process #94

Open · wants to merge 13 commits into base: dev
52 changes: 52 additions & 0 deletions py/cat_to_file.py
@@ -0,0 +1,52 @@
#!/usr/bin/env python
import sys
import os
import argparse
import logging
import json
import re

#parse a couple of options
parser = argparse.ArgumentParser(description='Filter pipe delimited input to the Manila bounding box and write it to a file', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('file', metavar='F', type=str, nargs=1, help='A file name to be read from, use - for stdin')
parser.add_argument('--key-with', type=str, help='A lambda of the form "lambda line: line.do_something()" such that the program can extract a key from a given line of input')

args = parser.parse_args()
args.file = args.file[0]

#Manila Extract BBox
bb_ymin = 14.501
bb_xmin = 120.9
bb_ymax = 14.70
bb_xmax = 121.13

key_with = eval(args.key_with) if args.key_with else None

#output a single body
#for each line from stdin
handle = open(args.file, 'r') if args.file != '-' else sys.stdin
out_file = []
for line in handle:
  #try to work on the line as normal
  try:
    l = line.rstrip()
    pieces = l.split('|')
    lat = float(pieces[9])
    lon = float(pieces[10])
    if bb_ymin <= lat <= bb_ymax and bb_xmin <= lon <= bb_xmax:
      #key mirrors what cat_to_kafka.py would compute; it isn't written to the file here
      key = key_with(l) if key_with else None
      value = l
      out_file.append(value + '\n')

  except Exception as e:
    sys.stderr.write(repr(e))
    sys.stderr.write(os.linesep)

print(str(len(out_file)))

#done
if args.file != '-':
  handle.close()

#append the filtered lines to the same hardcoded file get_data.sh writes to
with open('2017-01-02', 'a') as f:
  f.writelines(out_file)
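For a quick standalone test of cat_to_file.py, one can feed it a gzipped day of raw data on stdin, mirroring the commented-out invocation in get_data.sh below; the input file name here is only a placeholder:

zcat 2017_01_02.gz | ./cat_to_file.py --key-with 'lambda line: line.split("|")[1]' -
wc -l 2017-01-02   #matching lines are appended to the hardcoded output file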
46 changes: 46 additions & 0 deletions py/get_data.sh
@@ -0,0 +1,46 @@
#!/bin/bash
trap "kill 0" EXIT
which parallel &> /dev/null
if [ $? != 0 ]; then
echo "parallel is required please install it"
echo "sudo apt-get install parallel"
exit 1
fi
set -e
function usage {
echo -e "Usage:\n-s s3 bucket url\n-f regex to use with grep to get interesting files\n" 1>&2
echo "Example: AWS_DEFAULT_PROFILE=opentraffic $0 -s s3://heaps_of_data/2016_11/ -f 2016_11_01.*gz -b 172.17.0.1:9092 -t mytopic" 1>&2
echo "Note: bucket listing is not recursive" 1>&2
echo "Note: data is pipe delimited: date|id|x|x|x|x|x|x|x|lat|lon|x|x with date format: %Y-%m-%d %H:%M:%S" 1>&2
exit 1
}
while getopts ":s:f:" opt; do
case $opt in
s) s3_dir=$(echo ${OPTARG} | sed -e 's@/\?$@/@g')
;;
f) file_re="${OPTARG}"
;;
\?) echo "Invalid option -${OPTARG}" 1>&2 && usage
;;
esac
done
if [[ -z ${s3_dir} ]] || [[ -z ${file_re} ]]; then
echo "Missing required option" 1>&2 && usage
fi
#get all the list of files we'll be working with
echo "Retrieving file list from s3"
files=$(aws s3 ls ${s3_dir} | awk '{print $4}' | grep -E ${file_re} | tr '\n' ' ')
echo "Processing $(echo ${files} | tr ' ' '\n' | wc -l) files"
echo $files
rm -f 2017-01-02
touch 2017-01-02
for file in ${files}; do
#download in the foreground
echo "Retrieving ${file} from s3" && aws s3 cp ${s3_dir}${file} . &> /dev/null
zcat ${file} | sort | awk -F"|" '( $10 >= 14.501 ) && ( $10 <= 14.70 ) && ( $11 >= 120.9 ) && ( $11 <= 121.13 ) { print $0 }' >> 2017-01-02
#zcat ${file} | sort | ./cat_to_file.py --key-with 'lambda line: line.split("|")[1]' -
echo "Finished POST'ing ${file}" && rm -f ${file}
done

sort 2017-01-02 > 2017-01-02_sorted
echo "Finished sorting!"
45 changes: 45 additions & 0 deletions py/make_day_requests.sh
@@ -0,0 +1,45 @@
#!/bin/bash
trap "kill 0" EXIT
which parallel &> /dev/null
if [ $? != 0 ]; then
echo "parallel is required please install it"
echo "sudo apt-get install parallel"
exit 1
fi
set -e

function usage {
echo -e "Usage:\n-f day file to process\n" 1>&2
echo "Example: AWS_DEFAULT_PROFILE=opentraffic $0 -f 2017_01_02.gz -b localhost:9092 -t raw" 1>&2
Contributor (review comment):
Need to update Usage to reflect actual args (e.g., AWS_DEFAULT_PROFILE not needed)
echo "Note: bucket listing is not recursive" 1>&2
echo "Note: data is pipe delimited: date|id|x|x|x|x|x|x|x|lat|lon|x|x with date format: %Y-%m-%d %H:%M:%S" 1>&2
exit 1
}

while getopts ":f:b:t:" opt; do
case $opt in
f) day_file="${OPTARG}"
;;
b) bootstrap="${OPTARG}"
;;
t) topic="${OPTARG}"
;;
\?) echo "Invalid option -${OPTARG}" 1>&2 && usage
;;
esac
done

hash=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 16 | head -n 1)
key_with='lambda line: line.split("|")[1]'
#use \g<N> style group references so a hash that happens to start with a digit
#is not misread by re.sub as part of the group number
value_with="lambda line: re.sub(r'(.*:[0-5][0-9]\\|)([0-9]+)(\\|.*)', r'\\g<1>\\g<2>${hash}\\g<3>', line)"

if [[ -z ${day_file} ]] || [[ -z ${bootstrap} ]] || [[ -z ${topic} ]]; then
echo "Missing required option" 1>&2 && usage
fi

echo "day_file=${day_file}"
echo "bootstrap=${bootstrap}"
echo "topic=${topic}"
#send to kafka producer
zcat ${day_file} | ./cat_to_kafka.py --bootstrap ${bootstrap} --topic ${topic} --key-with "${key_with}" --value-with "${value_with}" -
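To sanity check the value_with transform before producing anything to Kafka, the same re.sub can be run against a single made-up line; the line and hash below are purely illustrative, using the documented date|id|...|lat|lon layout with a lat/lon inside the Manila box. The per-run random hash gets appended to the id field:

hash=abcdEFGH12345678
line='2017-01-02 00:00:59|12345|x|x|x|x|x|x|x|14.6|121.0|x|x'
echo "${line}" | python -c "
import sys, re
for l in sys.stdin:
    sys.stdout.write(re.sub(r'(.*:[0-5][0-9]\|)([0-9]+)(\|.*)', r'\g<1>\g<2>${hash}\g<3>', l))
"
#expected output: 2017-01-02 00:00:59|12345abcdEFGH12345678|x|x|x|x|x|x|x|14.6|121.0|x|x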

32 changes: 32 additions & 0 deletions run_reporter.sh
@@ -0,0 +1,32 @@
#!/bin/bash

#setup the dir where the container will find valhalla tiles (tiles.tar)
valhalla_data_dir=/data/valhalla

#pick parallelism
partitions=4

#kill all docker containers
docker rm -f $(docker ps -qa)

#start zookeeper
docker run -d --net opentraffic -p 2181:2181 --name zookeeper wurstmeister/zookeeper:latest

#start kafka brokers
docker run -d --net opentraffic -p 9092:9092 -e "KAFKA_ADVERTISED_HOST_NAME=172.17.0.1" -e "KAFKA_ADVERTISED_PORT=9092" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_CREATE_TOPICS=raw:4:1,formatted:4:1,batched:4:1" -v /var/run/docker.sock:/var/run/docker.sock --name kafka wurstmeister/kafka:latest

#wait for topics to be created
sleep 15
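#optional sanity check that the topics were created before starting the workers;
#assumes the kafka image puts the kafka cli tools on its PATH (illustrative only, left commented out)
#docker exec kafka kafka-topics.sh --zookeeper zookeeper:2181 --list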

for i in {0..3}; do
target/reporter-kafka -b localhost:9092 -t raw,formatted,batched -f ',sv,\|,1,9,10,0,5,yyyy-MM-dd HH:mm:ss' -u http://localhost:8002/report? -p 2 -q 3600 -i 3600 -s DEBUG -o /home/kdiluca/sandbox/open_traffic/reporter/results &> ${i}.log &
done

#start some traffic segment matchers
#docker run -d --net opentraffic -p 8002 --name reporter-py -e "THREAD_POOL_COUNT=${partitions}" -v ${valhalla_data_dir}:/data/valhalla opentraffic/reporter:latest
THREAD_POOL_COUNT=${partitions} PYTHONPATH=../../valhalla/valhalla/.libs/ py/reporter_service.py ../../conf/valhalla.json localhost:8002

#now load in data with something like this
echo 'cd py'
echo './make_day_requests.sh -f 2017_01_02_sorted.gz -b localhost:9092 -t raw'