diff --git a/py/cat_to_file.py b/py/cat_to_file.py
new file mode 100755
index 0000000..1179c0a
--- /dev/null
+++ b/py/cat_to_file.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+import sys
+import os
+import argparse
+import logging
+import json
+import re
+
+#parse a couple of options
+parser = argparse.ArgumentParser(description='Generate reporter post body', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument('file', metavar='F', type=str, nargs=1, help='A file name to be read from, use - for stdin')
+parser.add_argument('--key-with', type=str, help='A lambda of the form "lambda line: line.do_something()" such that the program can extract a key from a given line of input')
+
+args = parser.parse_args()
+args.file = args.file[0]
+
+#Manila Extract BBox
+
+bb_ymin = 14.501
+bb_xmin = 120.9
+bb_ymax = 14.70
+bb_xmax = 121.13
+
+exec('key_with = ' + (args.key_with if args.key_with else 'None'))
+
+#output a single body
+#for each line from stdin
+handle = open(args.file, 'r') if args.file != '-' else sys.stdin
+out_file=[]
+for line in handle:
+  #try to work on the line as normal
+  try:
+    l = line.rstrip()
+    pieces = l.split('|')
+    lat=float(pieces[9])
+    lon=float(pieces[10])
+    if lat >= bb_ymin and lat <= bb_ymax and lon >= bb_xmin and lon <= bb_xmax:
+      key = bytes(key_with(l)) if key_with else None
+      value = bytes(l.rstrip())
+      out_file.append(value+"\n")
+
+  except Exception as e:
+    sys.stderr.write(repr(e))
+    sys.stderr.write(os.linesep)
+
+print(str(len(out_file)))
+
+#done
+if args.file != '-':
+  handle.close()
+
+with open('2017-01-01', "ab") as f:
+  f.writelines(out_file)
diff --git a/py/get_data.sh b/py/get_data.sh
new file mode 100755
index 0000000..3238084
--- /dev/null
+++ b/py/get_data.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+trap "kill 0" EXIT
+which parallel &> /dev/null
+if [ $? != 0 ]; then
+  echo "parallel is required please install it"
+  echo "sudo apt-get install parallel"
+  exit 1
+fi
+set -e
+function usage {
+  echo -e "Usage:\n-s s3 bucket url\n-f regex to use with grep to get interesting files\n" 1>&2
+  echo "Example: AWS_DEFAULT_PROFILE=opentraffic $0 -s s3://heaps_of_data/2016_11/ -f 2016_11_01.*gz -b 172.17.0.1:9092 -t mytopic" 1>&2
+  echo "Note: bucket listing is not recursive" 1>&2
+  echo "Note: data is pipe delimited: date|id|x|x|x|x|x|x|x|lat|lon|x|x with date format: %Y-%m-%d %H:%M:%S" 1>&2
+  exit 1
+}
+while getopts ":s:f:" opt; do
+  case $opt in
+    s) s3_dir=$(echo ${OPTARG} | sed -e 's@/\?$@/@g')
+    ;;
+    f) file_re="${OPTARG}"
+    ;;
+    \?) echo "Invalid option -${OPTARG}" 1>&2 && usage
+    ;;
+  esac
+done
+if [[ -z ${s3_dir} ]] || [[ -z ${file_re} ]]; then
+  echo "Missing required option" 1>&2 && usage
+fi
+#get all the list of files we'll be working with
+echo "Retrieving file list from s3"
+files=$(aws s3 ls ${s3_dir} | awk '{print $4}' | grep -E ${file_re} | tr '\n' ' ')
+echo "Processing $(echo ${files} | tr ' ' '\n' | wc -l) files"
+echo $files
+#rm -f 2017__01_01
+#touch 2017_01_01
+for file in ${files}; do
+  #download in the foreground
+  echo "Retrieving ${file} from s3" && aws s3 cp ${s3_dir}${file} . &> /dev/null
+  #zcat ${file} | sort | awk -F"|" '( $10 >= 14.501 ) && ( $10 <= 14.70 ) && ( $11 >= 120.9 ) && ( $11 <= 121.13 ) { print $0 }' >> 2017-01-01
+  zcat ${file} | sort | ./cat_to_file.py --key-with 'lambda line: line.split("|")[1]' -
+  echo "Finished POST'ing ${file}" && rm -f ${file}
+done
+
+sort 2017-01-01 >> 2017-01-01_sorted
+echo "Finished sorting!"
diff --git a/py/make_day_requests.sh b/py/make_day_requests.sh
new file mode 100755
index 0000000..c9c7d09
--- /dev/null
+++ b/py/make_day_requests.sh
@@ -0,0 +1,45 @@
+#!/bin/bash
+trap "kill 0" EXIT
+which parallel &> /dev/null
+if [ $? != 0 ]; then
+  echo "parallel is required please install it"
+  echo "sudo apt-get install parallel"
+  exit 1
+fi
+set -e
+
+function usage {
+  echo -e "Usage:\n-f sorted day file to process\n" 1>&2
+  echo "Example: $0 -f 2017_01_02_12_sorted.gz -b localhost:9092 -t raw" 1>&2
+  echo "Note: bucket listing is not recursive" 1>&2
+  echo "Note: data is pipe delimited: date|id|x|x|x|x|x|x|x|lat|lon|x|x with date format: %Y-%m-%d %H:%M:%S" 1>&2
+  exit 1
+}
+
+while getopts ":s:f:b:t:" opt; do
+  case $opt in
+    f) day_file="${OPTARG}"
+    ;;
+    b) bootstrap="${OPTARG}"
+    ;;
+    t) topic="${OPTARG}"
+    ;;
+    \?) echo "Invalid option -${OPTARG}" 1>&2 && usage
+    ;;
+  esac
+done
+
+hash=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 16 | head -n 1)
+key_with='lambda line: line.split("|")[1]'
+value_with="lambda line: re.sub(r'(.*:[0-5][0-9]\\|)([0-9]+)(\\|.*)', r'\\1\\2${hash}\\3', line)"
+
+if [[ -z ${day_file} ]] || [[ -z ${bootstrap} ]] || [[ -z ${topic} ]]; then
+  echo "Missing required option" 1>&2 && usage
+fi
+
+echo "day_file=${day_file}"
+echo "bootstrap=${bootstrap}"
+echo "topic=${topic}"
+#send to kafka producer
+zcat ${day_file} | ./cat_to_kafka.py --bootstrap ${bootstrap} --topic ${topic} --key-with "${key_with}" --value-with "${value_with}" -
+
diff --git a/run_reporter.sh b/run_reporter.sh
new file mode 100755
index 0000000..2a647ae
--- /dev/null
+++ b/run_reporter.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+
+#setup the dir where the container will find valhalla tiles (tiles.tar)
+valhalla_data_dir=/data/valhalla
+
+#pick parallelism
+partitions=7
+
+#kill all docker containers
+docker rm -f $(docker ps -qa)
+docker rmi -f $(docker images -q)
+
+#start zookeeper
+docker run -d --net opentraffic -p 2181:2181 --name zookeeper wurstmeister/zookeeper:latest
+
+#start kafka brokers
+docker run -d --net opentraffic -p 9092:9092 -e "KAFKA_ADVERTISED_HOST_NAME=172.17.0.1" -e "KAFKA_ADVERTISED_PORT=9092" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_CREATE_TOPICS=raw:7:1,formatted:7:1,batched:7:1" -v /var/run/docker.sock:/var/run/docker.sock --name kafka wurstmeister/kafka:latest
+
+#wait for topics to be created
+sleep 15
+
+for i in {0..6}; do
+  target/reporter-kafka -b localhost:9092 -t raw,formatted,batched -f ',sv,\|,1,9,10,0,5,yyyy-MM-dd HH:mm:ss' -u http://localhost:8002/report? -p 2 -q 3600 -i 3600 -s DEBUG -o /data/opentraffic/reporter/results &> ${i}.log &
+done
+
+#start some traffic segment matchers
+#docker run -d --net opentraffic -p 8002 --name reporter-py -e "THREAD_POOL_COUNT=${partitions}" -v ${valhalla_data_dir}:/data/valhalla opentraffic/reporter:latest
+THREAD_POOL_COUNT=7 PYTHONPATH=../../valhalla/valhalla/.libs/ py/reporter_service.py ../../conf/valhalla.json localhost:8002
+
+#now load in data with something like this
+echo 'cd py'
+echo './make_day_requests.sh -f 2017-01-01.gz -b localhost:9092 -t raw' '