diff --git a/deployment/.gitignore b/deployment/.gitignore index 9ca1bff..32e2785 100644 --- a/deployment/.gitignore +++ b/deployment/.gitignore @@ -1,2 +1 @@ -config-aws.mk -docker-compose.deploy.yml +config-*.mk diff --git a/deployment/Makefile b/deployment/Makefile index 83cf95a..0034b65 100644 --- a/deployment/Makefile +++ b/deployment/Makefile @@ -1,71 +1,65 @@ -include config-aws.mk # Variables for AWS options +include config-deployment.mk -# The osmesa container -LOCAL_IMG := quay.io/geotrellis/osm-stat-server:latest +# If the user is on the master branch, see whether we should deploy to production +VERSION_TAG=$(shell ./scripts/get-tag.sh) +ifeq ($(VERSION_TAG), production) + DATABASE=${PRODUCTION_DB} + ECS_CLUSTER=${CLUSTER_NAME_DEPLOYMENT} + TASK_SUFFIX= +else + DATABASE=${STAGING_DB} + ECS_CLUSTER=${CLUSTER_NAME_STAGING} + TASK_SUFFIX=-staging +endif +DB_URI=${DB_BASE_URI}/${DATABASE} +.EXPORT_ALL_VARIABLES: -######### -# AWS # -######### +############################# +# Docker image management # +############################# + +.PHONY: build-container login-aws-registry tag-image push-image + +build-container: + ./build-container.sh login-aws-registry: eval `aws ecr get-login --no-include-email --region ${AWS_REGION}` -tag-image: - docker tag ${LOCAL_IMG} ${ECR_REPO} +tag-image: build-container + docker tag osm_stat_server:${VERSION_TAG} ${ECR_IMAGE}:${VERSION_TAG} push-image: login-aws-registry tag-image - docker push ${ECR_REPO} - -.PHONY: docker-compose.deploy.yml - -docker-compose.deploy.yml: - ./expand.sh docker-compose.deploy.yml.tpl > docker-compose.deploy.yml - -configure-cluster: - ecs-cli configure \ - --cluster ${CLUSTER_NAME} \ - --region ${AWS_REGION} \ - --config-name ${CONFIG_NAME} - -cluster-up: - ecs-cli up \ - --keypair ${KEYPAIR} \ - --instance-role ${INSTANCE_ROLE} \ - --size 1 \ - --instance-type ${INSTANCE_TYPE} \ - --cluster-config ${CONFIG_NAME} \ - --subnets ${SUBNETS} \ - --vpc ${VPC} \ - --force \ - --verbose - -cluster-down: - ecs-cli down --cluster-config ${CONFIG_NAME} + docker push ${ECR_IMAGE}:${VERSION_TAG} -.PHONY: create-service +####################### +# Streaming AWS Tasks # +####################### -create-service: docker-compose.deploy.yml configure-cluster - ecs-cli compose \ - --file $< create \ - --cluster ${CLUSTER_NAME} +.PHONY: create-log-groups define-production-tasks define-staging-tasks stop-stat-server deploy-stat-server deploy-stats-refresher -start-service: docker-compose.deploy.yml configure-cluster create-service - ecs-cli compose --file $< service up \ - --deployment-min-healthy-percent 0 \ - --create-log-groups \ - --cluster ${CLUSTER_NAME} +create-log-groups: + ./scripts/create-log-groups.sh -stop-service: - ecs-cli compose down +define-staging-tasks: + ./scripts/define-staging-tasks.sh +define-production-tasks: + ./scripts/define-production-tasks.sh -######### -# ALL # -######### -build-image: - make -C .. 
build +stop-stat-server: + ./scripts/stop-stat-server.sh -clean: - rm -f docker-compose.deploy.yml +deploy-stat-server: stop-stat-server + aws ecs create-service \ + --cluster "${ECS_CLUSTER}" \ + --service-name "osmesa-stats-server" \ + --task-definition "osmesa-stat-server${TASK_SUFFIX}" \ + --desired-count 1 \ + --launch-type FARGATE \ + --scheduling-strategy REPLICA \ + --network-configuration ${NETWORK_CONFIGURATION} +deploy-stats-refresher: + ./scripts/deploy-stats-refresher.sh diff --git a/deployment/build-container.sh b/deployment/build-container.sh new file mode 100755 index 0000000..368809f --- /dev/null +++ b/deployment/build-container.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "No version tag has been set. Do not run this script directly; instead, issue" + echo " make build-container" + echo "from the 'deployment' directory." + exit 1 +else + echo "Version tag is set to '${VERSION_TAG}'" +fi + +set -xe +SBT_DIR=$(pwd)/.. +JAR_DIR=${SBT_DIR}/target/scala-2.11/ +DOCKER_DIR=$(pwd)/docker + +cd ${SBT_DIR} +./sbt clean assembly +cp ${JAR_DIR}/osm-stat-server.jar ${DOCKER_DIR}/osm-stat-server.jar + +cd ${DOCKER_DIR} +docker build -f Dockerfile --tag osm_stat_server:${VERSION_TAG} . diff --git a/deployment/config-aws.mk.example b/deployment/config-aws.mk.example deleted file mode 100644 index 13de95a..0000000 --- a/deployment/config-aws.mk.example +++ /dev/null @@ -1,28 +0,0 @@ -export CONFIG_NAME := osm-stat-stream-config - -# AWS properties -export CLUSTER_NAME := osm-stat-stream-cluster -export INSTANCE_TYPE := m4.xlarge -export KEYPAIR := [AWS key pair] -export VPC := [VPC ID] -export SUBNETS := [comma-delimited list of subnets within the above VPC] -export SECURITY_GROUP := [comma-delimited list of AWS Security Group IDs] -export ECR_REPO := [AWS ECR repo URI] -export AWS_LOG_GROUP := osm-stats-server -export AWS_REGION := us-east-1 -export INSTANCE_ROLE := [IAM instance role] - -export HOST := 0.0.0.0 -export PORT := 80 - -export DB_DRIVER := org.postgresql.Driver -export DB_URL := [database URI, e.g. jdbc:postgresql://[:]/] -export DB_USER := [database username] -export DB_PASS := [database password] - -export DATABASE_URL := [standard database URL, for psql, e.g. 
postgresql://[user][:][password]@[:]/] - -export TILE_BUCKET := [S3 bucket] -export TILE_PREFIX := [S3 prefix] -export TILE_SUFFIX := [Tile suffix (typically file extension, including '.')] -export GZIPPED := [Whether to expect pre-gzipped tiles on S3 (true or false)] \ No newline at end of file diff --git a/deployment/config-deployment.mk.template b/deployment/config-deployment.mk.template new file mode 100644 index 0000000..7ca67bc --- /dev/null +++ b/deployment/config-deployment.mk.template @@ -0,0 +1,30 @@ +################################################################################ +# AWS properties +################################################################################ +export KEYPAIR := +export SUBNET := +export AWS_REGION := us-east-1 +export IAM_ACCOUNT := + +################################################################################ +# Streaming resource definitions +################################################################################ +export STREAMING_INSTANCE_TYPE := m4.xlarge +export ECR_IMAGE := +export AWS_LOG_GROUP := osmesa-stats-server +export ECS_SUBNET := ${SUBNET} +export ECS_SECURITY_GROUP := +export NETWORK_CONFIGURATION := + +export CLUSTER_NAME_DEPLOYMENT := +export CLUSTER_NAME_STAGING := + +export DB_BASE_URI := +export DB_JDBC_BASE_URL := +export DB_DRIVER := org.postgresql.Driver +export DB_USER := +export DB_PASS := +export PRODUCTION_DB := +export STAGING_DB := +export TILE_BUCKET := +export TILE_PREFIX := diff --git a/deployment/docker-compose.deploy.yml.tpl b/deployment/docker-compose.deploy.yml.tpl deleted file mode 100644 index ca0e9f9..0000000 --- a/deployment/docker-compose.deploy.yml.tpl +++ /dev/null @@ -1,24 +0,0 @@ -version: '3.0' -services: - stats-server: - image: ${ECR_REPO}:latest - command: java -jar /opt/osm-stat-server.jar - ports: - - ${PORT}:${PORT} - environment: - - HOST=${HOST} - - PORT=${PORT} - - DB_DRIVER=${DB_DRIVER} - - DB_URL=${DB_URL} - - DB_USER=${DB_USER} - - DB_PASS=${DB_PASS} - - TILE_BUCKET=${TILE_BUCKET} - - TILE_PREFIX=${TILE_PREFIX} - - GZIPPED=${GZIPPED} - - DATABASE_URL=${DATABASE_URL} - logging: - driver: awslogs - options: - awslogs-group: ${AWS_LOG_GROUP} - awslogs-region: ${AWS_REGION} - awslogs-stream-prefix: osmesa-stat-server diff --git a/deployment/docker/.gitignore b/deployment/docker/.gitignore new file mode 100644 index 0000000..d392f0e --- /dev/null +++ b/deployment/docker/.gitignore @@ -0,0 +1 @@ +*.jar diff --git a/docker/osm-stat-server/Dockerfile b/deployment/docker/Dockerfile similarity index 95% rename from docker/osm-stat-server/Dockerfile rename to deployment/docker/Dockerfile index 5162723..83fa28d 100644 --- a/docker/osm-stat-server/Dockerfile +++ b/deployment/docker/Dockerfile @@ -9,4 +9,4 @@ RUN \ COPY osm-stat-server.jar /opt/osm-stat-server.jar COPY refresh-views.sh /usr/local/bin/refresh-views.sh -WORKDIR /opt \ No newline at end of file +WORKDIR /opt diff --git a/docker/osm-stat-server/refresh-views.sh b/deployment/docker/refresh-views.sh similarity index 65% rename from docker/osm-stat-server/refresh-views.sh rename to deployment/docker/refresh-views.sh index 461136d..563a7cd 100755 --- a/docker/osm-stat-server/refresh-views.sh +++ b/deployment/docker/refresh-views.sh @@ -1,5 +1,7 @@ #!/usr/bin/env bash +echo "$(date --iso-8601=seconds): Starting view refreshment" + if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'refresh materialized view concurrently user_statistics%' and state='active'" $DATABASE_URL 2> /dev/null)" == "0" ]; then echo "$(date --iso-8601=seconds): Refreshing user statistics" # refresh in the background to return immediately @@ -7,6 +9,8 @@ if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity 
where query ilike 'r -c "REFRESH MATERIALIZED VIEW CONCURRENTLY user_statistics" \ -c "UPDATE refreshments SET updated_at=now() where mat_view='user_statistics'" \ $DATABASE_URL & +else + echo "$(date --iso-8601=seconds): User stats table already refreshing" fi if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'refresh materialized view concurrently hashtag_statistics%' and state='active'" $DATABASE_URL 2> /dev/null)" == "0" ]; then @@ -16,6 +20,8 @@ if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'r -c "REFRESH MATERIALIZED VIEW CONCURRENTLY hashtag_statistics" \ -c "UPDATE refreshments SET updated_at=now() where mat_view='hashtag_statistics'" \ $DATABASE_URL & +else + echo "$(date --iso-8601=seconds): Hashtag stats table already refreshing" fi if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'refresh materialized view concurrently country_statistics%' and state='active'" $DATABASE_URL 2> /dev/null)" == "0" ]; then @@ -25,6 +31,17 @@ if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'r -c "REFRESH MATERIALIZED VIEW CONCURRENTLY country_statistics" \ -c "UPDATE refreshments SET updated_at=now() where mat_view='country_statistics'" \ $DATABASE_URL & +else + echo "$(date --iso-8601=seconds): Country stats table already refreshing" +fi + +if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'refresh materialized view concurrently hashtag_user_statistics%' and state='active'" $DATABASE_URL 2> /dev/null)" == "0" ]; then + # refresh in the background to return immediately + echo "$(date --iso-8601=seconds): Refreshing hashtag/user statistics" + psql -Aqt \ + -c "REFRESH MATERIALIZED VIEW CONCURRENTLY hashtag_user_statistics" \ + -c "UPDATE refreshments SET updated_at=now() where mat_view='hashtag_user_statistics'" \ + $DATABASE_URL & fi wait diff --git a/deployment/ecs-params.yml b/deployment/ecs-params.yml deleted file mode 100644 index e00349b..0000000 --- a/deployment/ecs-params.yml +++ /dev/null @@ -1,8 +0,0 @@ -# this file should be in deployment dir (relative to Makefile path) -# https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-cpu-memory-error.html -# NOTE: comment it out for the test case -version: 1 -task_definition: - services: - stats-server: - mem_reservation: 1024m diff --git a/deployment/expand.sh b/deployment/expand.sh deleted file mode 100755 index 49ef41e..0000000 --- a/deployment/expand.sh +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/sh - -set -e - -PROG=$(basename $0) - -usage() -{ - echo "${PROG} " -} - -expand() -{ - local template="$(cat $1)" - eval "echo \"${template}\"" -} - -case $# in - 1) expand "$1";; - *) usage; exit 0;; -esac diff --git a/deployment/scripts/create-log-groups.sh b/deployment/scripts/create-log-groups.sh new file mode 100755 index 0000000..2debcd9 --- /dev/null +++ b/deployment/scripts/create-log-groups.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
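(A note on the guard used in each block of refresh-views.sh above: it is a single probe of pg_stat_activity, sketched in isolation below with the user_statistics view name from the script.)

```sql
-- Returns 0 when no backend is currently running the concurrent refresh;
-- refresh-views.sh only launches a new background refresh in that case.
SELECT count(pid)
FROM pg_stat_activity
WHERE query ILIKE 'refresh materialized view concurrently user_statistics%'
  AND state = 'active';
```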
+ exit 1 +fi + +DEFINED_GROUPS=$(aws logs describe-log-groups | jq '.logGroups[].logGroupName' | sed -e 's/"//g') + +if [[ $DEFINED_GROUPS != *"/ecs/${AWS_LOG_GROUP}"* ]]; then + aws logs create-log-group \ + --log-group-name /ecs/${AWS_LOG_GROUP} +fi + +if [[ $DEFINED_GROUPS != *"/ecs/${AWS_LOG_GROUP}-staging"* ]]; then + aws logs create-log-group \ + --log-group-name /ecs/${AWS_LOG_GROUP}-staging +fi diff --git a/deployment/scripts/define-production-tasks.sh b/deployment/scripts/define-production-tasks.sh new file mode 100755 index 0000000..7e6a93f --- /dev/null +++ b/deployment/scripts/define-production-tasks.sh @@ -0,0 +1,107 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." + exit 1 +fi + +aws ecs register-task-definition \ + --family osmesa-stat-server \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "0.5 vCPU" \ + --memory "1 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"java\", + \"-jar\", \"/opt/osm-stat-server.jar\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${PRODUCTION_DB}\" + }, + { + \"name\": \"DB_DRIVER\", + \"value\": \"${DB_DRIVER}\" + }, + { + \"name\": \"DB_URL\", + \"value\": \"${DB_JDBC_BASE_URL}/${PRODUCTION_DB}\" + }, + { + \"name\": \"DB_USER\", + \"value\": \"${DB_USER}\" + }, + { + \"name\": \"DB_PASS\", + \"value\": \"${DB_PASS}\" + }, + { + \"name\": \"GZIPPED\", + \"value\": \"true\" + }, + { + \"name\": \"HOST\", + \"value\": \"0.0.0.0\" + }, + { + \"name\": \"PORT\", + \"value\": \"80\" + }, + { + \"name\": \"TILE_BUCKET\", + \"value\": \"${TILE_BUCKET}\" + }, + { + \"name\": \"TILE_PREFIX\", + \"value\": \"${TILE_PREFIX}\" + } + ], + \"image\": \"${ECR_IMAGE}:latest\", + \"name\": \"osmesa-stat-server\" + } + ]" + +aws ecs register-task-definition \ + --family osmesa-stats-view-refresher \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "0.25 vCPU" \ + --memory "0.5 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"refresh-views.sh\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${PRODUCTION_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:latest\", + \"name\": \"stats-view-refresher\" + } + ]" diff --git a/deployment/scripts/define-staging-tasks.sh b/deployment/scripts/define-staging-tasks.sh new file mode 100755 index 0000000..c2d8ff6 --- /dev/null +++ b/deployment/scripts/define-staging-tasks.sh @@ -0,0 +1,107 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +aws ecs register-task-definition \ + --family osmesa-stat-server-staging \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "0.5 vCPU" \ + --memory "1 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}-staging\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"java\", + \"-jar\", \"/opt/osm-stat-server.jar\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${STAGING_DB}\" + }, + { + \"name\": \"DB_DRIVER\", + \"value\": \"${DB_DRIVER}\" + }, + { + \"name\": \"DB_URL\", + \"value\": \"${DB_JDBC_BASE_URL}/${STAGING_DB}\" + }, + { + \"name\": \"DB_USER\", + \"value\": \"${DB_USER}\" + }, + { + \"name\": \"DB_PASS\", + \"value\": \"${DB_PASS}\" + }, + { + \"name\": \"GZIPPED\", + \"value\": \"true\" + }, + { + \"name\": \"HOST\", + \"value\": \"0.0.0.0\" + }, + { + \"name\": \"PORT\", + \"value\": \"80\" + }, + { + \"name\": \"TILE_BUCKET\", + \"value\": \"${TILE_BUCKET}\" + }, + { + \"name\": \"TILE_PREFIX\", + \"value\": \"${TILE_PREFIX}\" + } + ], + \"image\": \"${ECR_IMAGE}:latest\", + \"name\": \"osmesa-stat-server-staging\" + } + ]" + +aws ecs register-task-definition \ + --family osmesa-stats-view-refresher-staging \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "0.25 vCPU" \ + --memory "0.5 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}-staging\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"refresh-views.sh\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${STAGING_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:latest\", + \"name\": \"stats-view-refresher-staging\" + } + ]" diff --git a/deployment/scripts/deploy-stats-refresher.sh b/deployment/scripts/deploy-stats-refresher.sh new file mode 100755 index 0000000..571f7be --- /dev/null +++ b/deployment/scripts/deploy-stats-refresher.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +aws events put-rule --schedule-expression "rate(1 minute)" --name osmesa-stats-view-refresher${TASK_SUFFIX} +aws events put-targets \ + --rule "osmesa-stats-view-refresher${TASK_SUFFIX}" \ + --targets "[ + { + \"Id\": \"osmesa-stats-view-refresher${TASK_SUFFIX}\", + \"Arn\": \"arn:aws:ecs:${AWS_REGION}:${IAM_ACCOUNT}:cluster/${ECS_CLUSTER}\", + \"RoleArn\": \"arn:aws:iam::${IAM_ACCOUNT}:role/ecsEventsRole\", + \"EcsParameters\": { + \"TaskDefinitionArn\": \"arn:aws:ecs:${AWS_REGION}:${IAM_ACCOUNT}:task-definition/osmesa-stats-view-refresher${TASK_SUFFIX}\", + \"TaskCount\": 1, + \"LaunchType\": \"FARGATE\", + \"NetworkConfiguration\": { + \"awsvpcConfiguration\": { + \"Subnets\": [\"${ECS_SUBNET}\"], + \"SecurityGroups\": [\"${ECS_SECURITY_GROUP}\"], + \"AssignPublicIp\": \"DISABLED\" + } + } + } + } + ]" diff --git a/deployment/scripts/get-tag.sh b/deployment/scripts/get-tag.sh new file mode 100755 index 0000000..388bc0d --- /dev/null +++ b/deployment/scripts/get-tag.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +if [ "$(git branch | grep '* master')" = "* master" ]; then + echo "You are on the master branch. Do you wish to publish to the production tag?" >&2 + select yn in "Yes" "No"; do + case $yn in + Yes ) VERSION_TAG="production"; break;; + No ) VERSION_TAG="latest"; break;; + esac + done +else + VERSION_TAG="latest" +fi + +echo "${VERSION_TAG}" diff --git a/deployment/scripts/stop-stat-server.sh b/deployment/scripts/stop-stat-server.sh new file mode 100755 index 0000000..23a9e41 --- /dev/null +++ b/deployment/scripts/stop-stat-server.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." + exit 1 +fi + +check_status() { + STATUS=$(aws ecs describe-services --services osmesa-stats-server --cluster $ECS_CLUSTER | jq '.services[].status') +} + +check_status +if [[ $STATUS == "\"ACTIVE\"" ]]; then + aws ecs delete-service --service osmesa-stats-server --cluster $ECS_CLUSTER --force + echo "Waiting for service to shut down" + check_status + while [[ $STATUS != "\"INACTIVE\"" ]]; do + echo " current status: $STATUS, still waiting" + sleep 15s + check_status + done + echo " final status: $STATUS" +else + echo "Status was $STATUS, nothing to stop" +fi diff --git a/sql/country_statistics.sql b/sql/country_statistics.sql index 5b76620..cc10747 100644 --- a/sql/country_statistics.sql +++ b/sql/country_statistics.sql @@ -1,119 +1,143 @@ +DROP MATERIALIZED VIEW IF EXISTS country_statistics; CREATE MATERIALIZED VIEW country_statistics AS - WITH country_counts AS ( - -- Collect country-related changesets with edit counts and associate with hashtags - SELECT cc.changeset_id, - countries.id, - countries.code, - countries.name AS country_name, - cc.edit_count, - hts.hashtag_id - FROM ((changesets_countries cc - JOIN countries ON ((cc.country_id = countries.id))) - FULL OUTER JOIN changesets_hashtags hts ON (hts.changeset_id = cc.changeset_id)) - ), user_edits AS ( - -- Associate user ids with changesets/edit count - SELECT c_chg.country_id, - c_chg.edit_count, - c.user_id - FROM (changesets_countries c_chg - JOIN changesets c ON (c.id = c_chg.changeset_id)) - ), country_edits AS ( - -- Aggregate edit counts per user (ignore UID 0) - SELECT country_id, - user_id, - sum(edit_count) AS edits - FROM user_edits - WHERE user_id <> 0 and user_id <> 1 - GROUP BY country_id, user_id - ), grouped_user_edits AS ( - -- Rank user edit totals per country - SELECT *, - ROW_NUMBER() OVER 
(PARTITION BY country_id ORDER BY edits DESC) as rank - FROM country_edits - ), json_country_edits AS ( - -- Collapse top ten user edit totals into JSON object - SELECT country_id, - json_agg(json_build_object('user', user_id, 'count', edits)) AS edits - FROM grouped_user_edits - WHERE rank <= 10 - GROUP BY country_id - ), ht_edits AS ( - -- Associate hashtags with aggregate edit counts per country - SELECT cc.id as country_id, - hts.hashtag, - sum(edit_count) as edit_count - FROM (country_counts cc - JOIN hashtags hts ON cc.hashtag_id = hts.id) - GROUP BY cc.id, hts.hashtag - ), grouped_hts AS ( - -- Rank edit counts per country - SELECT *, - ROW_NUMBER() OVER (PARTITION BY country_id ORDER BY edit_count DESC) as rank - FROM ht_edits - ), ht_json AS ( - -- Collapse top ten most active hashtags per country into JSON object - SELECT country_id, - json_agg(json_build_object('hashtag', hashtag, 'count', edit_count)) as hashtag_edits - FROM grouped_hts - WHERE rank <= 10 - GROUP BY country_id - ), excluded_changesets AS ( - SELECT * FROM changesets where user_id <> 0 and user_id <> 1 - ), agg_stats AS ( - -- Aggregate statistics per country - SELECT cc.id as country_id, - sum(chg.road_km_added) AS road_km_added, - sum(chg.road_km_modified + chg.road_km_deleted) AS road_km_modified, - sum(chg.waterway_km_added) AS waterway_km_added, - sum(chg.waterway_km_modified + chg.waterway_km_deleted) AS waterway_km_modified, - sum(chg.coastline_km_added) AS coastline_km_added, - sum(chg.coastline_km_modified + chg.coastline_km_deleted) AS coastline_km_modified, - sum(chg.roads_added) AS roads_added, - sum(chg.roads_modified + chg.roads_deleted) AS roads_modified, - sum(chg.waterways_added) AS waterways_added, - sum(chg.waterways_modified + chg.waterways_deleted) AS waterways_modified, - sum(chg.coastlines_added) AS coastlines_added, - sum(chg.coastlines_modified + chg.coastlines_deleted) AS coastlines_modified, - sum(chg.buildings_added) AS buildings_added, - sum(chg.buildings_modified + chg.buildings_deleted) AS buildings_modified, - sum(chg.pois_added) AS pois_added, - sum(chg.pois_modified + chg.pois_deleted) AS pois_modified, - max(coalesce(chg.closed_at, chg.created_at)) AS last_edit, - max(COALESCE(chg.closed_at, chg.created_at, chg.updated_at)) AS updated_at, - count(*) AS changeset_count, - sum(cc.edit_count) AS edit_count - FROM (excluded_changesets chg - JOIN country_counts cc ON ((cc.changeset_id = chg.id))) - GROUP BY cc.id - ) - SELECT agg.country_id, - countries.name AS country_name, - countries.code AS country_code, - agg.road_km_added, - agg.road_km_modified, - agg.waterway_km_added, - agg.waterway_km_modified, - agg.coastline_km_added, - agg.coastline_km_modified, - agg.roads_added, - agg.roads_modified, - agg.waterways_added, - agg.waterways_modified, - agg.coastlines_added, - agg.coastlines_modified, - agg.buildings_added, - agg.buildings_modified, - agg.pois_added, - agg.pois_modified, - agg.last_edit, - agg.updated_at, - agg.changeset_count, - agg.edit_count, - jce.edits AS user_edit_counts, - hts.hashtag_edits - FROM (agg_stats agg - FULL OUTER JOIN json_country_edits jce ON (agg.country_id = jce.country_id) - FULL OUTER JOIN ht_json hts ON (agg.country_id = hts.country_id) - JOIN countries ON agg.country_id = countries.id); + WITH changesets AS ( + SELECT + * + FROM changesets + -- ignore users 0 and 1 + WHERE user_id > 1 + ), + general AS ( + SELECT + country_id, + max(coalesce(closed_at, created_at)) last_edit, + count(*) changeset_count, + sum(coalesce(edit_count, 0)) edit_count, 
+ max(updated_at) updated_at + FROM changesets + JOIN changesets_countries ON changesets.id = changesets_countries.changeset_id + GROUP BY country_id + ), + processed_changesets AS ( + SELECT + id, + user_id, + country_id, + measurements, + counts, + edit_count + FROM changesets + JOIN changesets_countries ON changesets.id = changesets_countries.changeset_id + ), + hashtag_counts AS ( + SELECT + RANK() OVER (PARTITION BY country_id ORDER BY sum(coalesce(edit_count, 0)) DESC) AS rank, + country_id, + hashtag, + count(*) changesets, + sum(coalesce(edit_count, 0)) edits + FROM processed_changesets + JOIN changesets_hashtags ON processed_changesets.id = changesets_hashtags.changeset_id + JOIN hashtags ON changesets_hashtags.hashtag_id = hashtags.id + GROUP BY country_id, hashtag + ), + hashtags AS ( + SELECT + country_id, + jsonb_object_agg(hashtag, changesets) hashtag_changesets, + jsonb_object_agg(hashtag, edits) hashtag_edits + FROM hashtag_counts + WHERE rank <= 10 + GROUP BY country_id + ), + user_counts AS ( + SELECT + RANK() OVER (PARTITION BY country_id ORDER BY sum(coalesce(edit_count, 0)) DESC) AS rank, + country_id, + user_id, + count(*) changesets, + sum(coalesce(edit_count, 0)) edits + FROM processed_changesets + GROUP BY country_id, user_id + ), + users AS ( + SELECT + country_id, + jsonb_object_agg(user_id, changesets) user_changesets, + jsonb_object_agg(user_id, edits) user_edits + FROM user_counts + WHERE rank <= 10 + GROUP BY country_id + ), + measurements AS ( + SELECT + id, + country_id, + key, + value + FROM processed_changesets + CROSS JOIN LATERAL jsonb_each(measurements) + ), + aggregated_measurements_kv AS ( + SELECT + country_id, + key, + sum((value->>0)::numeric) AS value + FROM measurements + GROUP BY country_id, key + ), + aggregated_measurements AS ( + SELECT + country_id, + jsonb_object_agg(key, value) measurements + FROM aggregated_measurements_kv + GROUP BY country_id + ), + counts AS ( + SELECT + id, + country_id, + key, + value + FROM processed_changesets + CROSS JOIN LATERAL jsonb_each(counts) + ), + aggregated_counts_kv AS ( + SELECT + country_id, + key, + sum((value->>0)::numeric) AS value + FROM counts + GROUP BY country_id, key + ), + aggregated_counts AS ( + SELECT + country_id, + jsonb_object_agg(key, value) counts + FROM aggregated_counts_kv + GROUP BY country_id + ) + SELECT + general.country_id, + countries.name country_name, + countries.code country_code, + -- NOTE these are per-changeset, not per-country, so stats are double-counted + measurements, + -- NOTE these are per-changeset, not per-country, so stats are double-counted + counts, + general.changeset_count, + general.edit_count, + general.last_edit, + general.updated_at, + user_changesets, + user_edits, + hashtag_changesets, + hashtag_edits + FROM general + JOIN countries ON country_id = countries.id + LEFT OUTER JOIN users USING (country_id) + LEFT OUTER JOIN hashtags USING (country_id) + LEFT OUTER JOIN aggregated_measurements USING (country_id) + LEFT OUTER JOIN aggregated_counts USING (country_id); -CREATE UNIQUE INDEX country_statistics_id ON country_statistics(country_code); +CREATE UNIQUE INDEX IF NOT EXISTS country_statistics_id ON country_statistics(country_code); diff --git a/sql/hashtag_statistics.sql b/sql/hashtag_statistics.sql index 1ae7554..fbab58e 100644 --- a/sql/hashtag_statistics.sql +++ b/sql/hashtag_statistics.sql @@ -1,161 +1,109 @@ +DROP MATERIALIZED VIEW IF EXISTS hashtag_statistics; CREATE MATERIALIZED VIEW hashtag_statistics AS - WITH hashtag_join AS ( - SELECT 
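(The measurements/counts rollup introduced above, and repeated in the other rewritten views, follows a single pattern; a minimal standalone sketch, assuming only the changesets(id, measurements jsonb) columns already used in the view:)

```sql
-- Explode each changeset's jsonb measurements into key/value rows,
-- sum per key, then fold the per-key sums back into one jsonb object.
WITH kv AS (
  SELECT key, sum((value->>0)::numeric) AS value
  FROM changesets
  CROSS JOIN LATERAL jsonb_each(measurements)
  GROUP BY key
)
SELECT jsonb_object_agg(key, value) AS measurements
FROM kv;
```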
chg.id, - chg.road_km_added, - chg.road_km_modified, - chg.road_km_deleted, - chg.waterway_km_added, - chg.waterway_km_modified, - chg.waterway_km_deleted, - chg.coastline_km_added, - chg.coastline_km_modified, - chg.coastline_km_deleted, - chg.roads_added, - chg.roads_modified, - chg.roads_deleted, - chg.waterways_added, - chg.waterways_modified, - chg.waterways_deleted, - chg.coastlines_added, - chg.coastlines_modified, - chg.coastlines_deleted, - chg.buildings_added, - chg.buildings_modified, - chg.buildings_deleted, - chg.pois_added, - chg.pois_modified, - chg.pois_deleted, - (chg.roads_added + chg.roads_modified + chg.roads_deleted + chg.waterways_added + chg.waterways_modified + chg.waterways_deleted + chg.coastlines_added + chg.coastlines_modified + chg.coastlines_deleted + chg.buildings_added + chg.buildings_modified + chg.buildings_deleted + chg.pois_added + chg.pois_modified + chg.pois_deleted) as edit_count, - chg.editor, - chg.user_id, - chg.created_at, - chg.closed_at, - chg.augmented_diffs, - chg.updated_at, - ch.hashtag_id - FROM (changesets chg - JOIN changesets_hashtags ch ON ((ch.changeset_id = chg.id))) - ), tag_usr_counts AS ( - SELECT hj.hashtag_id, - array_agg(DISTINCT users.name) AS names, - users.id AS uid - FROM (users - JOIN hashtag_join hj ON ((hj.user_id = users.id))) - WHERE users.id <> 0 and users.id <> 1 - GROUP BY hj.hashtag_id, users.id - ), named_usr_counts AS ( - SELECT *, unnest(names) as name - FROM tag_usr_counts - ), hashtag_usr_counts AS ( - SELECT hj.hashtag_id, - users.uid AS uid, - array_agg(DISTINCT users.name) AS names, - sum(hj.road_km_added) as road_km_added, - sum(hj.road_km_modified) as road_km_modified, - sum(hj.road_km_deleted) as road_km_deleted, - sum(hj.waterway_km_added) as waterway_km_added, - sum(hj.waterway_km_modified) as waterway_km_modified, - sum(hj.waterway_km_deleted) as waterway_km_deleted, - sum(hj.coastline_km_added) as coastline_km_added, - sum(hj.coastline_km_modified) as coastline_km_modified, - sum(hj.coastline_km_deleted) as coastline_km_deleted, - sum(hj.roads_added) as roads_added, - sum(hj.roads_modified) as roads_modified, - sum(hj.roads_deleted) as roads_deleted, - sum(hj.waterways_added) as waterways_added, - sum(hj.waterways_modified) as waterways_modified, - sum(hj.waterways_deleted) as waterways_deleted, - sum(hj.coastlines_added) as coastlines_added, - sum(hj.coastlines_modified) as coastlines_modified, - sum(hj.coastlines_deleted) as coastlines_deleted, - sum(hj.buildings_added) as buildings_added, - sum(hj.buildings_modified) as buildings_modified, - sum(hj.buildings_deleted) as buildings_deleted, - sum(hj.pois_added) as pois_added, - sum(hj.pois_modified) as pois_modified, - sum(hj.pois_deleted) as pois_deleted, - sum(hj.edit_count) AS edit_count, - count(*) AS changeset_count - FROM (named_usr_counts users - JOIN hashtag_join hj ON ((hj.user_id = users.uid AND hj.hashtag_id = users.hashtag_id))) - GROUP BY hj.hashtag_id, users.uid - ), usr_json_agg AS ( - SELECT usr_counts.hashtag_id, - json_agg(json_build_object('name', usr_counts.names[1], - 'uid', usr_counts.uid, - 'km_roads_add', usr_counts.road_km_added, - 'km_roads_mod', usr_counts.road_km_modified, - 'km_roads_del', usr_counts.road_km_deleted, - 'km_waterways_add', usr_counts.waterway_km_added, - 'km_waterways_mod', usr_counts.waterway_km_modified, - 'km_waterways_del', usr_counts.waterway_km_deleted, - 'km_coastlines_add', usr_counts.coastline_km_added, - 'km_coastlines_mod', usr_counts.coastline_km_modified, - 'km_coastlines_del', 
usr_counts.coastline_km_deleted, - 'roads_add', usr_counts.roads_added, - 'roads_mod', usr_counts.roads_modified, - 'roads_del', usr_counts.roads_deleted, - 'waterways_add', usr_counts.waterways_added, - 'waterways_mod', usr_counts.waterways_modified, - 'waterways_del', usr_counts.waterways_deleted, - 'coastlines_add', usr_counts.coastlines_added, - 'coastlines_mod', usr_counts.coastlines_modified, - 'coastlines_del', usr_counts.coastlines_deleted, - 'buildings_add', usr_counts.buildings_added, - 'buildings_mod', usr_counts.buildings_modified, - 'buildings_del', usr_counts.buildings_deleted, - 'poi_add', usr_counts.pois_added, - 'poi_mod', usr_counts.pois_modified, - 'poi_del', usr_counts.pois_deleted, - 'changeset_count', usr_counts.changeset_count, - 'edit_count', usr_counts.edit_count)) AS users - FROM hashtag_usr_counts usr_counts - GROUP BY usr_counts.hashtag_id - ), without_json AS ( - SELECT ht.hashtag AS tag, - ht.id AS hashtag_id, - (('hashtag/'::text || ht.hashtag) || '/{z}/{x}/{y}.mvt'::text) AS extent_uri, - sum(hashtag_join.buildings_added) AS buildings_added, - sum(hashtag_join.buildings_modified + hashtag_join.buildings_deleted) AS buildings_modified, - sum(hashtag_join.roads_added) AS roads_added, - sum(hashtag_join.road_km_added) AS road_km_added, - sum(hashtag_join.roads_modified + hashtag_join.roads_deleted) AS roads_modified, - sum(hashtag_join.road_km_modified + hashtag_join.road_km_deleted) AS road_km_modified, - sum(hashtag_join.waterways_added) AS waterways_added, - sum(hashtag_join.waterway_km_added) AS waterway_km_added, - sum(hashtag_join.waterways_modified + hashtag_join.waterways_deleted) AS waterways_modified, - sum(hashtag_join.waterway_km_modified + hashtag_join.waterway_km_deleted) AS waterway_km_modified, - sum(hashtag_join.coastlines_added) AS coastlines_added, - sum(hashtag_join.coastline_km_added) AS coastline_km_added, - sum(hashtag_join.coastlines_modified + hashtag_join.coastlines_deleted) AS coastlines_modified, - sum(hashtag_join.coastline_km_modified + hashtag_join.coastline_km_deleted) AS coastline_km_modified, - sum(hashtag_join.pois_added) AS pois_added, - sum(hashtag_join.pois_modified + hashtag_join.pois_deleted) AS pois_modified - FROM (hashtags ht - JOIN hashtag_join ON ((ht.id = hashtag_join.hashtag_id))) - GROUP BY ht.id, ht.hashtag - ) - SELECT without_json.tag, - without_json.hashtag_id, - without_json.extent_uri, - without_json.buildings_added, - without_json.buildings_modified, - without_json.roads_added, - without_json.road_km_added, - without_json.roads_modified, - without_json.road_km_modified, - without_json.waterways_added, - without_json.waterway_km_added, - without_json.waterways_modified, - without_json.waterway_km_modified, - without_json.coastlines_added, - without_json.coastline_km_added, - without_json.coastlines_modified, - without_json.coastline_km_modified, - without_json.pois_added, - without_json.pois_modified, - usr_json_agg.users - FROM (without_json - JOIN usr_json_agg ON ((without_json.hashtag_id = usr_json_agg.hashtag_id))); + WITH general AS ( + SELECT + hashtag_id, + max(coalesce(closed_at, created_at)) last_edit, + count(*) changeset_count, + sum(coalesce(total_edits, 0)) edit_count, + max(updated_at) updated_at + FROM changesets + JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id + GROUP BY hashtag_id + ), + processed_changesets AS ( + SELECT + id, + user_id, + hashtag_id, + measurements, + counts, + total_edits + FROM changesets + JOIN changesets_hashtags ON changesets.id = 
changesets_hashtags.changeset_id + ), + user_counts AS ( + SELECT + RANK() OVER (PARTITION BY hashtag_id ORDER BY sum(coalesce(total_edits, 0)) DESC) AS rank, + hashtag_id, + user_id, + count(*) changesets, + sum(coalesce(total_edits, 0)) edit_count + FROM processed_changesets + GROUP BY hashtag_id, user_id + ), + users AS ( + SELECT + hashtag_id, + jsonb_object_agg(user_id, changesets) user_changesets, + jsonb_object_agg(user_id, edit_count) user_edits + FROM user_counts + WHERE rank <= 10 + GROUP BY hashtag_id + ), + measurements AS ( + SELECT + id, + hashtag_id, + key, + value + FROM processed_changesets + CROSS JOIN LATERAL jsonb_each(measurements) + ), + aggregated_measurements_kv AS ( + SELECT + hashtag_id, + key, + sum((value->>0)::numeric) AS value + FROM measurements + GROUP BY hashtag_id, key + ), + aggregated_measurements AS ( + SELECT + hashtag_id, + jsonb_object_agg(key, value) measurements + FROM aggregated_measurements_kv + GROUP BY hashtag_id + ), + counts AS ( + SELECT + id, + hashtag_id, + key, + value + FROM processed_changesets + CROSS JOIN LATERAL jsonb_each(counts) + ), + aggregated_counts_kv AS ( + SELECT + hashtag_id, + key, + sum((value->>0)::numeric) AS value + FROM counts + GROUP BY hashtag_id, key + ), + aggregated_counts AS ( + SELECT + hashtag_id, + jsonb_object_agg(key, value) counts + FROM aggregated_counts_kv + GROUP BY hashtag_id + ) + SELECT + hashtags.hashtag tag, + general.hashtag_id, + measurements, + counts, + general.changeset_count, + general.edit_count, + general.last_edit, + general.updated_at, + user_changesets, + user_edits + FROM general + JOIN hashtags ON hashtag_id = hashtags.id + LEFT OUTER JOIN users USING (hashtag_id) + LEFT OUTER JOIN aggregated_measurements USING (hashtag_id) + LEFT OUTER JOIN aggregated_counts USING (hashtag_id); -CREATE UNIQUE INDEX hashtag_statistics_hashtag_id ON hashtag_statistics(hashtag_id); +CREATE UNIQUE INDEX IF NOT EXISTS hashtag_statistics_hashtag_id ON hashtag_statistics(hashtag_id); \ No newline at end of file diff --git a/sql/hashtag_user_statistics.sql b/sql/hashtag_user_statistics.sql new file mode 100644 index 0000000..e5d309f --- /dev/null +++ b/sql/hashtag_user_statistics.sql @@ -0,0 +1,89 @@ +DROP MATERIALIZED VIEW IF EXISTS hashtag_user_statistics; +CREATE MATERIALIZED VIEW hashtag_user_statistics AS + WITH general AS ( + SELECT + user_id, + hashtag_id, + array_agg(id) changesets, + max(coalesce(closed_at, created_at)) last_edit, + count(*) changeset_count, + sum(coalesce(total_edits, 0)) edit_count, + max(updated_at) updated_at + FROM changesets + JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id + GROUP BY user_id, hashtag_id + ), + measurements AS ( + SELECT + id, + user_id, + hashtag_id, + key, + value + FROM changesets + JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id + CROSS JOIN LATERAL jsonb_each(measurements) + ), + aggregated_measurements_kv AS ( + SELECT + user_id, + hashtag_id, + key, + sum((value->>0)::numeric) AS value + FROM measurements + GROUP BY user_id, hashtag_id, key + ), + aggregated_measurements AS ( + SELECT + user_id, + hashtag_id, + jsonb_object_agg(key, value) measurements + FROM aggregated_measurements_kv + GROUP BY user_id, hashtag_id + ), + counts AS ( + SELECT + id, + user_id, + hashtag_id, + key, + value + FROM changesets + JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id + CROSS JOIN LATERAL jsonb_each(counts) + ), + aggregated_counts_kv AS ( + SELECT + user_id, + hashtag_id, + key, + 
sum((value->>0)::numeric) AS value + FROM counts + GROUP BY user_id, hashtag_id, key + ), + aggregated_counts AS ( + SELECT + user_id, + hashtag_id, + jsonb_object_agg(key, value) counts + FROM aggregated_counts_kv + GROUP BY user_id, hashtag_id + ) + SELECT + user_id, + users.name, + general.hashtag_id, + hashtags.hashtag, + measurements, + counts, + last_edit, + changeset_count, + edit_count, + updated_at + FROM general + LEFT OUTER JOIN hashtags ON general.hashtag_id = hashtags.id + LEFT OUTER JOIN aggregated_measurements USING (user_id, hashtag_id) + LEFT OUTER JOIN aggregated_counts USING (user_id, hashtag_id) + JOIN users ON user_id = users.id; + +CREATE UNIQUE INDEX IF NOT EXISTS hashtag_user_statistics_pk ON hashtag_user_statistics(hashtag_id, user_id); \ No newline at end of file diff --git a/sql/refreshments.sql b/sql/refreshments.sql new file mode 100644 index 0000000..1d827e1 --- /dev/null +++ b/sql/refreshments.sql @@ -0,0 +1,7 @@ +CREATE TABLE refreshments ( + mat_view text NOT NULL, + updated_at timestamp with time zone, + PRIMARY KEY(mat_view) +); + +INSERT INTO refreshments VALUES ('user_statistics', to_timestamp(0)), ('country_statistics', to_timestamp(0)), ('hashtag_statistics', to_timestamp(0)), ('hashtag_user_statistics', to_timestamp(0)); \ No newline at end of file diff --git a/sql/user_statistics.sql b/sql/user_statistics.sql index 2f21017..a14ae34 100644 --- a/sql/user_statistics.sql +++ b/sql/user_statistics.sql @@ -1,146 +1,166 @@ +DROP MATERIALIZED VIEW IF EXISTS user_statistics; CREATE MATERIALIZED VIEW user_statistics AS - WITH country_counts AS ( - SELECT cc.changeset_id, - countries.name, - cc.edit_count - FROM (changesets_countries cc - JOIN countries ON ((cc.country_id = countries.id))) - ), chgset_country_counts AS ( - SELECT chg.user_id, - country_counts.name, - sum(country_counts.edit_count) AS edit_count, - count(*) AS changeset_count - FROM (country_counts - JOIN changesets chg ON ((country_counts.changeset_id = chg.id))) - GROUP BY chg.user_id, country_counts.name - ), usr_country_counts AS ( - SELECT chgset_country_counts.user_id, - json_agg(json_build_object('name', chgset_country_counts.name, - 'edit_count', chgset_country_counts.edit_count, - 'changeset_count', chgset_country_counts.changeset_count)) AS country_json - FROM chgset_country_counts - GROUP BY chgset_country_counts.user_id - ), day_counts AS ( - SELECT chg.user_id, - to_char(date_trunc('day'::text, chg.created_at), 'YYYY-MM-DD'::text) AS day, - count(*) AS cnt - FROM changesets chg - WHERE (chg.created_at IS NOT NULL) - GROUP BY chg.user_id, (date_trunc('day'::text, chg.created_at)) - ), usr_day_counts AS ( - SELECT day_counts.user_id, - json_agg(json_build_object('day', day_counts.day, 'count', day_counts.cnt)) AS day_json - FROM day_counts - GROUP BY day_counts.user_id - ), editor_counts AS ( - SELECT chg.user_id, - chg.editor, - count(*) AS cnt - FROM changesets chg - WHERE (chg.editor IS NOT NULL) - GROUP BY chg.user_id, chg.editor - ), usr_editor_counts AS ( - SELECT editor_counts.user_id, - json_agg(json_build_object('editor', editor_counts.editor, 'count', editor_counts.cnt)) AS editor_json - FROM editor_counts - GROUP BY editor_counts.user_id - ), hashtag_counts AS ( - SELECT ch.changeset_id, - hashtags.hashtag, - count(*) AS edit_count - FROM (changesets_hashtags ch - JOIN hashtags ON ((ch.hashtag_id = hashtags.id))) - GROUP BY ch.changeset_id, hashtags.hashtag - ), chgset_ht_counts AS ( - SELECT chg.user_id, - hashtag_counts.hashtag, - count(*) AS cnt - FROM (changesets chg 
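(Every refresh in refresh-views.sh also stamps refreshments.updated_at, so staleness can be monitored; a sketch of the obvious check, not part of the diff:)

```sql
-- Time since each materialized view was last refreshed; rows read as
-- the epoch (to_timestamp(0)) until the first refresh completes.
SELECT mat_view, now() - updated_at AS staleness
FROM refreshments
ORDER BY staleness DESC;
```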
- JOIN hashtag_counts ON ((chg.id = hashtag_counts.changeset_id))) - GROUP BY chg.user_id, hashtag_counts.hashtag - ), usr_hashtag_counts AS ( - SELECT chgset_ht_counts.user_id, - json_agg(json_build_object('tag', chgset_ht_counts.hashtag, 'count', chgset_ht_counts.cnt)) AS hashtag_json - FROM chgset_ht_counts - GROUP BY chgset_ht_counts.user_id - ), agg_stats AS ( - SELECT usr.id, - usr.name, - (('user/'::text || usr.id) || '/{z}/{x}/{y}.mvt'::text) AS extent_uri, - array_agg(chg.id) AS changesets, - sum(chg.road_km_added) AS road_km_added, - sum(chg.road_km_modified) AS road_km_modified, - sum(chg.road_km_deleted) AS road_km_deleted, - sum(chg.waterway_km_added) AS waterway_km_added, - sum(chg.waterway_km_modified) AS waterway_km_modified, - sum(chg.waterway_km_deleted) AS waterway_km_deleted, - sum(chg.coastline_km_added) AS coastline_km_added, - sum(chg.coastline_km_modified) AS coastline_km_modified, - sum(chg.coastline_km_deleted) AS coastline_km_deleted, - sum(chg.roads_added) AS roads_added, - sum(chg.roads_modified) AS roads_modified, - sum(chg.roads_deleted) AS roads_deleted, - sum(chg.waterways_added) AS waterways_added, - sum(chg.waterways_modified) AS waterways_modified, - sum(chg.waterways_deleted) AS waterways_deleted, - sum(chg.coastlines_added) AS coastlines_added, - sum(chg.coastlines_modified) AS coastlines_modified, - sum(chg.coastlines_deleted) AS coastlines_deleted, - sum(chg.buildings_added) AS buildings_added, - sum(chg.buildings_modified) AS buildings_modified, - sum(chg.buildings_deleted) AS buildings_deleted, - sum(chg.pois_added) AS pois_added, - sum(chg.pois_modified) AS pois_modified, - sum(chg.pois_deleted) AS pois_deleted, - max(coalesce(chg.closed_at, chg.created_at)) AS last_edit, - count(*) AS changeset_count, - sum(chg.roads_added + chg.roads_modified + chg.roads_deleted + chg.waterways_added + chg.waterways_modified + chg.waterways_deleted + chg.coastlines_added + chg.coastlines_modified + chg.coastlines_deleted + chg.buildings_added + chg.buildings_modified + chg.buildings_deleted + chg.pois_added + chg.pois_modified + chg.pois_deleted) as edit_count, - max(COALESCE(chg.closed_at, chg.created_at, chg.updated_at)) AS updated_at - FROM (changesets chg - JOIN users usr ON ((chg.user_id = usr.id))) - WHERE (chg.user_id IS NOT NULL) - GROUP BY usr.id, usr.name - ) - SELECT agg_stats.id, - agg_stats.name, - agg_stats.extent_uri, - agg_stats.changesets, - agg_stats.road_km_added, - agg_stats.road_km_modified, - agg_stats.road_km_deleted, - agg_stats.waterway_km_added, - agg_stats.waterway_km_modified, - agg_stats.waterway_km_deleted, - agg_stats.coastline_km_added, - agg_stats.coastline_km_modified, - agg_stats.coastline_km_deleted, - agg_stats.roads_added, - agg_stats.roads_modified, - agg_stats.roads_deleted, - agg_stats.waterways_added, - agg_stats.waterways_modified, - agg_stats.waterways_deleted, - agg_stats.coastlines_added, - agg_stats.coastlines_modified, - agg_stats.coastlines_deleted, - agg_stats.buildings_added, - agg_stats.buildings_modified, - agg_stats.buildings_deleted, - agg_stats.pois_added, - agg_stats.pois_modified, - agg_stats.pois_deleted, - agg_stats.last_edit, - agg_stats.changeset_count, - agg_stats.edit_count, - usr_editor_counts.editor_json AS editors, - usr_day_counts.day_json AS edit_times, - coalesce(usr_country_counts.country_json, '[]') AS country_list, - coalesce(usr_hashtag_counts.hashtag_json, '[]') AS hashtags, - agg_stats.updated_at - FROM ((((agg_stats - LEFT JOIN usr_country_counts ON ((agg_stats.id = 
usr_country_counts.user_id))) - LEFT JOIN usr_hashtag_counts ON ((agg_stats.id = usr_hashtag_counts.user_id))) - LEFT JOIN usr_day_counts ON ((agg_stats.id = usr_day_counts.user_id))) - LEFT JOIN usr_editor_counts ON ((agg_stats.id = usr_editor_counts.user_id))); + WITH general AS ( + SELECT + user_id, + array_agg(id) changesets, + max(coalesce(closed_at, created_at)) last_edit, + count(*) changeset_count, + sum(coalesce(total_edits, 0)) edit_count, + max(updated_at) updated_at + FROM changesets + GROUP BY user_id + ), + country_counts AS ( + SELECT + user_id, + code, + count(*) changesets, + sum(coalesce(total_edits, 0)) edits + FROM changesets + JOIN changesets_countries ON changesets.id = changesets_countries.changeset_id + JOIN countries ON changesets_countries.country_id = countries.id + GROUP BY user_id, code + ), + countries AS ( + SELECT + user_id, + jsonb_object_agg(code, changesets) country_changesets, + jsonb_object_agg(code, edits) country_edits + FROM country_counts + GROUP BY user_id + ), + edit_day_counts AS ( + SELECT + user_id, + date_trunc('day', coalesce(closed_at, created_at))::date AS day, + count(*) changesets, + sum(coalesce(total_edits, 0)) edits + FROM changesets + WHERE coalesce(closed_at, created_at) IS NOT NULL + GROUP BY user_id, day + ), + edit_days AS ( + SELECT + user_id, + jsonb_object_agg(day, changesets) day_changesets, + jsonb_object_agg(day, edits) day_edits + FROM edit_day_counts + GROUP BY user_id + ), + editor_counts AS ( + SELECT + RANK() OVER (PARTITION BY user_id ORDER BY sum(coalesce(total_edits, 0)) DESC) AS rank, + user_id, + editor, + count(*) changesets, + sum(coalesce(total_edits, 0)) edits + FROM changesets + WHERE editor IS NOT NULL + GROUP BY user_id, editor + ), + editors AS ( + SELECT + user_id, + jsonb_object_agg(editor, changesets) editor_changesets, + jsonb_object_agg(editor, edits) editor_edits + FROM editor_counts + WHERE rank <= 10 + GROUP BY user_id + ), + hashtag_counts AS ( + SELECT + RANK() OVER (PARTITION BY user_id ORDER BY sum(coalesce(total_edits, 0)) DESC) AS rank, + user_id, + hashtag, + count(*) changesets, + sum(coalesce(total_edits, 0)) edits + FROM changesets + JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id + JOIN hashtags ON changesets_hashtags.hashtag_id = hashtags.id + GROUP BY user_id, hashtag + ), + hashtags AS ( + SELECT + user_id, + jsonb_object_agg(hashtag, changesets) hashtag_changesets, + jsonb_object_agg(hashtag, edits) hashtag_edits + FROM hashtag_counts + WHERE rank <= 50 + GROUP BY user_id + ), + measurements AS ( + SELECT + id, + user_id, + key, + value + FROM changesets + CROSS JOIN LATERAL jsonb_each(measurements) + ), + aggregated_measurements_kv AS ( + SELECT + user_id, + key, + sum((value->>0)::numeric) AS value + FROM measurements + GROUP BY user_id, key + ), + aggregated_measurements AS ( + SELECT + user_id, + jsonb_object_agg(key, value) measurements + FROM aggregated_measurements_kv + GROUP BY user_id + ), + counts AS ( + SELECT + id, + user_id, + key, + value + FROM changesets + CROSS JOIN LATERAL jsonb_each(counts) + ), + aggregated_counts_kv AS ( + SELECT + user_id, + key, + sum((value->>0)::numeric) AS value + FROM counts + GROUP BY user_id, key + ), + aggregated_counts AS ( + SELECT + user_id, + jsonb_object_agg(key, value) counts + FROM aggregated_counts_kv + GROUP BY user_id + ) + SELECT + user_id AS id, + users.name, + measurements, + counts, + last_edit, + changeset_count, + edit_count, + editor_changesets, + editor_edits, + day_changesets, + day_edits, + 
country_changesets, + country_edits, + hashtag_changesets, + hashtag_edits, + updated_at + FROM general + LEFT OUTER JOIN countries USING (user_id) + LEFT OUTER JOIN editors USING (user_id) + LEFT OUTER JOIN edit_days USING (user_id) + LEFT OUTER JOIN hashtags USING (user_id) + LEFT OUTER JOIN aggregated_measurements USING (user_id) + LEFT OUTER JOIN aggregated_counts USING (user_id) + JOIN users ON user_id = users.id; -CREATE UNIQUE INDEX user_statistics_id ON user_statistics(id); +CREATE UNIQUE INDEX IF NOT EXISTS user_statistics_id ON user_statistics(id); diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index a34c726..99fe237 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -6,9 +6,11 @@ server { } database { + driver = "org.postgresql.Driver" driver = ${?DB_DRIVER} url = ${?DB_URL} user = ${?DB_USER} + password = "" password = ${?DB_PASS} } @@ -18,7 +20,5 @@ tiles { s-3suffix = ${?TILE_SUFFIX} chunk-size = 4096 chunk-size = ${?TILE_CHUNK_SIZE} - gzipped = true - gzipped = ${?GZIPPED} } diff --git a/src/main/scala/osmesa/server/Config.scala b/src/main/scala/osmesa/server/Config.scala index d7565d5..fe9b6d0 100644 --- a/src/main/scala/osmesa/server/Config.scala +++ b/src/main/scala/osmesa/server/Config.scala @@ -2,25 +2,33 @@ package osmesa.server import cats.effect.IO import com.typesafe.config.ConfigFactory +import pureconfig._ import pureconfig.error.ConfigReaderException - -case class Config(server: Config.Server, database: Config.Database, tiles: Config.Tiles) - +case class Config(server: Config.Server, + database: Config.Database, + tiles: Config.Tiles) object Config { - case class Database(driver: String, url: String, user: String, password: String) - case class Server(host: String, port: Int) - case class Tiles(s3bucket: String, s3prefix: String, s3suffix: Option[String], chunkSize: Int, gzipped: Boolean) - - import pureconfig._ - def load(configFile: String = "application.conf"): IO[Config] = { IO { loadConfig[Config](ConfigFactory.load(configFile)) }.flatMap { - case Left(e) => IO.raiseError[Config](new ConfigReaderException[Config](e)) + case Left(e) => + IO.raiseError[Config](new ConfigReaderException[Config](e)) case Right(config) => IO.pure(config) } } + + case class Database(driver: String, + url: String, + user: String, + password: String) + + case class Server(host: String, port: Int) + + case class Tiles(s3bucket: String, + s3prefix: String, + s3suffix: Option[String], + chunkSize: Int) } diff --git a/src/main/scala/osmesa/server/Database.scala b/src/main/scala/osmesa/server/Database.scala index f4dd897..77ee04a 100644 --- a/src/main/scala/osmesa/server/Database.scala +++ b/src/main/scala/osmesa/server/Database.scala @@ -7,7 +7,12 @@ import org.flywaydb.core.Flyway object Database { def transactor(dbconfig: Config.Database): IO[HikariTransactor[IO]] = { - HikariTransactor.newHikariTransactor[IO](dbconfig.driver, dbconfig.url, dbconfig.user, dbconfig.password) + HikariTransactor.newHikariTransactor[IO]( + dbconfig.driver, + dbconfig.url, + dbconfig.user, + dbconfig.password + ) } def initialize(transactor: HikariTransactor[IO]): IO[Unit] = { diff --git a/src/main/scala/osmesa/server/stats/StatsRouter.scala b/src/main/scala/osmesa/server/DefaultRouter.scala similarity index 63% rename from src/main/scala/osmesa/server/stats/StatsRouter.scala rename to src/main/scala/osmesa/server/DefaultRouter.scala index da8ca00..abf8a95 100644 --- a/src/main/scala/osmesa/server/stats/StatsRouter.scala +++ 
b/src/main/scala/osmesa/server/DefaultRouter.scala @@ -1,45 +1,24 @@ -package osmesa.server.stats - -import osmesa.server.model._ +package osmesa.server import cats.effect._ import doobie.Transactor import io.circe._ import io.circe.syntax._ -import fs2._ -import fs2.StreamApp.ExitCode -import org.http4s.circe._ import org.http4s._ +import org.http4s.circe._ import org.http4s.dsl.Http4sDsl -import org.http4s.server.blaze.BlazeBuilder -import org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`} - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ - - -class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { - - private def eitherResult[Result: Encoder](result: Either[OsmStatError, Result]) = { - result match { - case Right(succ) => Ok(succ.asJson, `Content-Type`(MediaType.`application/json`)) - case Left(err) => NotFound(err.toString) - } - } - - implicit val xa: Transactor[IO] = trans - - object OptionalPageQueryParamMatcher extends OptionalQueryParamDecoderMatcher[Int]("page") +import org.http4s.headers.`Content-Type` +import osmesa.server.model._ +import osmesa.server.stats._ +class DefaultRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { def routes: HttpService[IO] = HttpService[IO] { case GET -> Root => Ok("hello world") case GET -> Root / "users" :? OptionalPageQueryParamMatcher(pageNum) => - Ok(UserStats.getPage(pageNum.getOrElse(0)).map(_.asJson)) + Ok(UserStats.getPage(pageNum.getOrElse(1)).map(_.asJson)) case GET -> Root / "users" / IntVar(userId) => for { @@ -49,7 +28,7 @@ class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { // Too many results. The data will get where it needs to go (streamed, chunked response) but the client might well crash case GET -> Root / "changesets" :? OptionalPageQueryParamMatcher(pageNum) => - Ok(Changeset.getPage(pageNum.getOrElse(0)).map(_.asJson)) + Ok(Changeset.getPage(pageNum.getOrElse(1)).map(_.asJson)) case GET -> Root / "changesets" / LongVar(changesetId) => for { @@ -59,7 +38,7 @@ class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { case GET -> Root / "campaigns" :? OptionalPageQueryParamMatcher(pageNum) => for { - io <- HashtagStats.getPage(pageNum.getOrElse(0)) + io <- HashtagStats.getPage(pageNum.getOrElse(1)) res <- eitherResult(io) } yield res @@ -69,8 +48,20 @@ class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { result <- eitherResult(io) } yield result + case GET -> Root / "campaigns" / hashtag / "users" :? OptionalPageQueryParamMatcher(pageNum) => + for { + io <- HashtagUserStats.getPage(hashtag, pageNum.getOrElse(1)) + result <- eitherResult(io) + } yield result + + case GET -> Root / "campaigns" / hashtag / LongVar(uid) => + for { + io <- HashtagUserStats.byTagAndUid(hashtag, uid) + result <- eitherResult(io) + } yield result + case GET -> Root / "countries" :? OptionalPageQueryParamMatcher(pageNum) => - Ok(Country.getPage(pageNum.getOrElse(0)).map(_.asJson)) + Ok(Country.getPage(pageNum.getOrElse(1)).map(_.asJson)) case GET -> Root / "countries" / IntVar(countryId) => for { @@ -84,10 +75,14 @@ class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { result <- eitherResult(io) } yield result - case GET -> Root / "changesets-countries" :? OptionalPageQueryParamMatcher(pageNum) => - Ok(ChangesetCountry.getPage(pageNum.getOrElse(0)).map(_.asJson)) + case GET -> Root / "changesets-countries" :? 
OptionalPageQueryParamMatcher( + pageNum + ) => + Ok(ChangesetCountry.getPage(pageNum.getOrElse(1)).map(_.asJson)) - case GET -> Root / "changesets-countries" / IntVar(changesetId) / IntVar(countryId) => + case GET -> Root / "changesets-countries" / IntVar(changesetId) / IntVar( + countryId + ) => for { io <- ChangesetCountry.byId(changesetId, countryId) changesetCountry <- eitherResult(io) @@ -99,4 +94,19 @@ class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { result <- eitherResult(Right(io)) } yield result } + + implicit val xa: Transactor[IO] = trans + + private def eitherResult[Result: Encoder]( + result: Either[OsmStatError, Result] + ) = { + result match { + case Right(succ) => + Ok(succ.asJson, `Content-Type`(MediaType.`application/json`)) + case Left(err) => NotFound(err.toString) + } + } + + object OptionalPageQueryParamMatcher + extends OptionalQueryParamDecoderMatcher[Int]("page") } diff --git a/src/main/scala/osmesa/server/Server.scala b/src/main/scala/osmesa/server/Server.scala index 0184e76..5d3a404 100644 --- a/src/main/scala/osmesa/server/Server.scala +++ b/src/main/scala/osmesa/server/Server.scala @@ -1,26 +1,17 @@ package osmesa.server -import osmesa.server.model._ -import osmesa.server.stats._ -import osmesa.server.tile._ - import cats.effect._ -import doobie.Transactor -import io.circe._ -import io.circe.syntax._ -import fs2._ import fs2.StreamApp.ExitCode -import org.http4s.circe._ +import fs2._ import org.http4s._ -import org.http4s.server.blaze.BlazeBuilder import org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`} +import org.http4s.server.blaze.BlazeBuilder +import org.http4s.server.middleware.{CORS, CORSConfig} +import osmesa.server.tile._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ - object Server extends StreamApp[IO] { private val corsConfig = CORSConfig( @@ -31,24 +22,24 @@ object Server extends StreamApp[IO] { maxAge = 1.day.toSeconds ) - private val middleware: HttpMiddleware[IO] = { (routes: HttpService[IO]) => - CORS(routes) + private val middleware: HttpMiddleware[IO] = { routes: HttpService[IO] => + CORS(routes, corsConfig) } - def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] = { + def stream(args: List[String], + requestShutdown: IO[Unit]): Stream[IO, ExitCode] = { for { - config <- Stream.eval(Config.load()) + config <- Stream.eval(Config.load()) transactor <- Stream.eval(Database.transactor(config.database)) //_ <- Stream.eval(Database.initialize(transactor)) - stats = middleware(new StatsRouter(transactor).routes) + default = middleware(new DefaultRouter(transactor).routes) tiles = middleware(new TileRouter(config.tiles).routes) - exitCode <- BlazeBuilder[IO] + exitCode <- BlazeBuilder[IO] .enableHttp2(true) .bindHttp(config.server.port, config.server.host) - .mountService(stats, "/") + .mountService(default, "/") .mountService(tiles, "/tiles") .serve } yield exitCode } } - diff --git a/src/main/scala/osmesa/server/model/Changeset.scala b/src/main/scala/osmesa/server/model/Changeset.scala index 0235ed3..860bbd9 100644 --- a/src/main/scala/osmesa/server/model/Changeset.scala +++ b/src/main/scala/osmesa/server/model/Changeset.scala @@ -1,73 +1,62 @@ package osmesa.server.model +import java.time.Instant + +import cats.effect._ import doobie._ import doobie.implicits._ -import doobie.postgres._ import doobie.postgres.implicits._ -import cats.effect._ import io.circe._ 
-import io.circe.generic.semiauto._ +import io.circe.generic.extras.semiauto._ import io.circe.java8.time._ +import osmesa.server._ -import java.time.LocalDate - - -case class Changeset( - id: Long, - kmRoadsAdd: Option[Double], - kmRoadsMod: Option[Double], - kmWaterwaysAdd: Option[Double], - kmWaterwaysMod: Option[Double], - kmCoastlinesAdd: Option[Double], - kmCoastlinesMod: Option[Double], - roadsAdd: Option[Int], - roadsMod: Option[Int], - waterwaysAdd: Option[Int], - waterwaysMod: Option[Int], - coastlinesAdd: Option[Int], - coastlinesMod: Option[Int], - buildingsAdd: Option[Int], - buildingsMod: Option[Int], - poiAdd: Option[Int], - poiMod: Option[Int], - editor: Option[String], - userId: Option[Int], - createdAt: Option[LocalDate], - closedAt: Option[LocalDate], - augmentedDiffs: Option[Array[Int]], - updatedAt: Option[LocalDate] -) - - -object Changeset { +case class Changeset(id: Long, + measurements: Json, + counts: Json, + editor: Option[String], + userId: Option[Int], + createdAt: Option[Instant], + closedAt: Option[Instant], + augmentedDiffs: Option[Array[Int]], + updatedAt: Option[Instant]) - implicit val changesetDecoder: Decoder[Changeset] = deriveDecoder - implicit val changesetEncoder: Encoder[Changeset] = deriveEncoder +object Changeset extends Implicits { + implicit val changesetDecoder: Decoder[Changeset] = deriveDecoder[Changeset] + implicit val changesetEncoder: Encoder[Changeset] = deriveEncoder[Changeset] private val selectF = fr""" SELECT - id, road_km_added, road_km_modified, waterway_km_added, waterway_km_modified, - coastline_km_added, coastline_km_modified, roads_added, roads_modified, - waterways_added, waterways_modified, coastlines_added, coastlines_modified, - buildings_added, buildings_modified, pois_added, pois_modified, editor, user_id, - created_at, closed_at, augmented_diffs, updated_at + id, + coalesce(measurements, '{}'::jsonb) measurements, + coalesce(counts, '{}'::jsonb) counts, + editor, + user_id, + created_at, + closed_at, + augmented_diffs, + updated_at FROM changesets """ - def byId(id: Long)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Changeset]] = + def byId( + id: Long + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Changeset]] = (selectF ++ fr"WHERE id = $id") .query[Changeset] .option .transact(xa) .map { case Some(changeset) => Right(changeset) - case None => Left(IdNotFoundError("changeset", id)) + case None => Left(IdNotFoundError("changeset", id)) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[ResultPage[Changeset]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY id ASC LIMIT 10 OFFSET $offset;") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[Changeset]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY id ASC LIMIT $pageSize OFFSET $offset;") .query[Changeset] .to[List] .map({ ResultPage(_, pageNum) }) diff --git a/src/main/scala/osmesa/server/model/ChangesetCountry.scala b/src/main/scala/osmesa/server/model/ChangesetCountry.scala index 8bc7c02..67411e4 100644 --- a/src/main/scala/osmesa/server/model/ChangesetCountry.scala +++ b/src/main/scala/osmesa/server/model/ChangesetCountry.scala @@ -1,48 +1,51 @@ package osmesa.server.model +import cats.effect._ import doobie._ import doobie.implicits._ -import cats.effect._ import io.circe._ import io.circe.generic.semiauto._ +import osmesa.server._ +case class ChangesetCountry(changesetId: Int, countryId: Int, editCount: Int) -case class ChangesetCountry( - changesetId: 
Int, - countryId: Int, - editCount: Int -) - +object ChangesetCountry extends Implicits { -object ChangesetCountry { - - implicit val changesetCountryDecoder: Decoder[ChangesetCountry] = deriveDecoder - implicit val changesetCountryEncoder: Encoder[ChangesetCountry] = deriveEncoder + implicit val changesetCountryDecoder: Decoder[ChangesetCountry] = + deriveDecoder + implicit val changesetCountryEncoder: Encoder[ChangesetCountry] = + deriveEncoder private val selectF = fr""" SELECT - changeset_id, country_id, edit_count + changeset_id, + country_id, + edit_count FROM changesets_countries """ - def byId(changesetId: Int, countryId: Int)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, ChangesetCountry]] = - (selectF ++ fr"WHERE changeset_id = $changesetId AND country_id == $countryId") + def byId(changesetId: Int, countryId: Int)( + implicit xa: Transactor[IO] + ): IO[Either[OsmStatError, ChangesetCountry]] = + (selectF ++ fr"WHERE changeset_id = $changesetId AND country_id = $countryId") .query[ChangesetCountry] .option .transact(xa) .map { case Some(changesetCountry) => Right(changesetCountry) - case None => Left(IdNotFoundError("changesetCountry", (changesetId, countryId))) + case None => + Left(IdNotFoundError("changesetCountry", (changesetId, countryId))) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[ResultPage[ChangesetCountry]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY changesetId ASC, countryId ASC LIMIT 10 OFFSET $offset") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[ChangesetCountry]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY changeset_id ASC, country_id ASC LIMIT $pageSize OFFSET $offset") .query[ChangesetCountry] .to[List] .map({ ResultPage(_, pageNum) }) .transact(xa) } } - diff --git a/src/main/scala/osmesa/server/model/Country.scala b/src/main/scala/osmesa/server/model/Country.scala index 47eb5a2..22919c3 100644 --- a/src/main/scala/osmesa/server/model/Country.scala +++ b/src/main/scala/osmesa/server/model/Country.scala @@ -1,20 +1,15 @@ package osmesa.server.model +import cats.effect._ import doobie._ import doobie.implicits._ -import cats.effect._ import io.circe._ import io.circe.generic.semiauto._ +import osmesa.server._ +case class Country(id: Int, name: Option[String], code: String) -case class Country( - id: Int, - name: Option[String], - code: String -) - - -object Country { +object Country extends Implicits { implicit val countryDecoder: Decoder[Country] = deriveDecoder implicit val countryEncoder: Encoder[Country] = deriveEncoder @@ -26,19 +21,23 @@ object Country { countries """ - def byId(id: Int)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Country]] = + def byId( + id: Int + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Country]] = (selectF ++ fr"WHERE id = $id") .query[Country] .option .transact(xa) .map { case Some(country) => Right(country) - case None => Left(IdNotFoundError("country", id)) + case None => Left(IdNotFoundError("country", id)) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[ResultPage[Country]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY id ASC LIMIT 10 OFFSET $offset") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[Country]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY id ASC LIMIT $pageSize OFFSET $offset") .query[Country] .to[List] .map({ ResultPage(_, pageNum) }) @@ -46,4 +45,3 @@ object 
Country { } } - diff --git a/src/main/scala/osmesa/server/model/Hashtag.scala b/src/main/scala/osmesa/server/model/Hashtag.scala index 4bdbc4b..57dd533 100644 --- a/src/main/scala/osmesa/server/model/Hashtag.scala +++ b/src/main/scala/osmesa/server/model/Hashtag.scala @@ -1,49 +1,49 @@ package osmesa.server.model +import cats.effect._ import doobie._ import doobie.implicits._ -import cats.effect._ import io.circe._ import io.circe.generic.semiauto._ +import osmesa.server._ -import scala.concurrent.Future - +case class Hashtag(id: Int, hashtag: String) -case class Hashtag( - id: Int, - hashtag: String -) - - -object Hashtag { +object Hashtag extends Implicits { implicit val hashtagDecoder: Decoder[Hashtag] = deriveDecoder implicit val hashtagEncoder: Encoder[Hashtag] = deriveEncoder - private val selectF = fr""" + private val selectF = + fr""" SELECT id, hashtag FROM hashtags """ - def byId(id: Int)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Hashtag]] = + def byId( + id: Int + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Hashtag]] = (selectF ++ fr"WHERE id = $id") .query[Hashtag] .option .transact(xa) .map { case Some(country) => Right(country) - case None => Left(IdNotFoundError("hashtag", id)) + case None => Left(IdNotFoundError("hashtag", id)) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[ResultPage[Hashtag]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY id LIMIT 10 OFFSET $offset") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[Hashtag]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY id LIMIT $pageSize OFFSET $offset") .query[Hashtag] .to[List] - .map({ ResultPage(_, pageNum) }) + .map({ + ResultPage(_, pageNum) + }) .transact(xa) } } - diff --git a/src/main/scala/osmesa/server/model/OsmStatError.scala b/src/main/scala/osmesa/server/model/OsmStatError.scala index 402519a..3781312 100644 --- a/src/main/scala/osmesa/server/model/OsmStatError.scala +++ b/src/main/scala/osmesa/server/model/OsmStatError.scala @@ -1,15 +1,12 @@ package osmesa.server.model -import io.circe._ - - trait OsmStatError case class UnknownError(message: String) extends OsmStatError { override def toString = s"Unknown error: $message" } -case class IdNotFoundError[ID](recordType: String, id: ID) extends OsmStatError { +case class IdNotFoundError[ID](recordType: String, id: ID) + extends OsmStatError { override def toString = s"Unable to retrieve ${recordType} record at ${id}" } - diff --git a/src/main/scala/osmesa/server/model/Page.scala b/src/main/scala/osmesa/server/model/Page.scala index 1a48dad..9f54c78 100644 --- a/src/main/scala/osmesa/server/model/Page.scala +++ b/src/main/scala/osmesa/server/model/Page.scala @@ -3,13 +3,11 @@ package osmesa.server.model import io.circe._ import io.circe.generic.semiauto._ - -case class ResultPage[RESULT]( - results: List[RESULT], - page: Int -) +case class ResultPage[RESULT](results: List[RESULT], page: Int) object ResultPage { - implicit def resultPageDecoder[RESULT: Decoder]: Decoder[ResultPage[RESULT]] = deriveDecoder - implicit def resultPageEncoder[RESULT: Encoder]: Encoder[ResultPage[RESULT]] = deriveEncoder + implicit def resultPageDecoder[RESULT: Decoder]: Decoder[ResultPage[RESULT]] = + deriveDecoder + implicit def resultPageEncoder[RESULT: Encoder]: Encoder[ResultPage[RESULT]] = + deriveEncoder } diff --git a/src/main/scala/osmesa/server/model/User.scala b/src/main/scala/osmesa/server/model/User.scala index 27a1880..dbcaf8a 100644 --- 
a/src/main/scala/osmesa/server/model/User.scala +++ b/src/main/scala/osmesa/server/model/User.scala @@ -1,19 +1,15 @@ package osmesa.server.model +import cats.effect._ import doobie._ import doobie.implicits._ -import cats.effect._ import io.circe._ import io.circe.generic.semiauto._ +import osmesa.server._ +case class User(id: Int, name: Option[String]) -case class User( - id: Int, - name: Option[String] -) - - -object User { +object User extends Implicits { implicit val userDecoder: Decoder[User] = deriveDecoder implicit val userEncoder: Encoder[User] = deriveEncoder @@ -25,23 +21,26 @@ object User { users """ - def byId(id: Int)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, User]] = + def byId( + id: Int + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, User]] = (selectF ++ fr"WHERE id = $id") .query[User] .option .transact(xa) .map { case Some(user) => Right(user) - case None => Left(IdNotFoundError("user", id)) + case None => Left(IdNotFoundError("user", id)) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[ResultPage[User]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY id ASC LIMIT 10 OFFSET $offset") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[User]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY id ASC LIMIT $pageSize OFFSET $offset") .query[User] .to[List] .map({ ResultPage(_, pageNum) }) .transact(xa) } } - diff --git a/src/main/scala/osmesa/server/package.scala b/src/main/scala/osmesa/server/package.scala index ed6bc6f..92a8a4a 100644 --- a/src/main/scala/osmesa/server/package.scala +++ b/src/main/scala/osmesa/server/package.scala @@ -1,4 +1,29 @@ package osmesa +import cats.implicits._ +import doobie.util.meta.Meta +import io.circe.Json +import io.circe.generic.extras.Configuration +import io.circe.parser.parse +import org.postgresql.util.PGobject -package object server {} +package object server { + trait Implicits { + implicit val CustomConfig: Configuration = + Configuration.default.withSnakeCaseMemberNames.withDefaults + + } + + implicit final val JsonMeta: Meta[Json] = + Meta + .other[PGobject]("json") + .xmap[Json]( + a => parse(a.getValue).leftMap[Json](e => throw e).merge, + a => { + val o = new PGobject + o.setType("json") + o.setValue(a.noSpaces) + o + } + ) +} diff --git a/src/main/scala/osmesa/server/stats/CountryStats.scala b/src/main/scala/osmesa/server/stats/CountryStats.scala index b923b1f..c56d5d6 100644 --- a/src/main/scala/osmesa/server/stats/CountryStats.scala +++ b/src/main/scala/osmesa/server/stats/CountryStats.scala @@ -1,90 +1,76 @@ package osmesa.server.stats -import osmesa.server.model._ +import java.time.Instant +import cats.effect._ import doobie._ import doobie.implicits._ -import doobie.postgres._ -import doobie.postgres.implicits._ -import cats._ -import cats.data._ -import cats.effect._ -import cats.implicits._ import io.circe._ -import io.circe.jawn._ -import io.circe.syntax._ -import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto._ -import fs2._ -import org.http4s.circe._ -import org.http4s._ -import org.http4s.dsl.Http4sDsl -import org.http4s.server.blaze.BlazeBuilder -import org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`} -import org.postgresql.util.PGobject - -import scala.concurrent.duration._ +import io.circe.java8.time._ +import osmesa.server._ +import osmesa.server.model._ -case class 
CountryStats( - countryId: Long, - name: Option[String], - kmRoadsAdd: Option[Double], - kmRoadsMod: Option[Double], - kmWaterwaysAdd: Option[Double], - kmWaterwaysMod: Option[Double], - kmCoastlinesAdd: Option[Double], - kmCoastlinesMod: Option[Double], - roadsAdd: Option[Int], - roadsMod: Option[Int], - waterwaysAdd: Option[Int], - waterwaysMod: Option[Int], - coastlinesAdd: Option[Int], - coastlinesMod: Option[Int], - buildingsAdd: Option[Int], - buildingsMod: Option[Int], - poiAdd: Option[Int], - poiMod: Option[Int], - lastEdit: Option[java.sql.Timestamp], - updatedAt: Option[java.sql.Timestamp], - changesetCount: Option[Int], - editCount: Option[Int], - userEdits: Json, - hashtagEdits: Json -) +case class CountryStats(countryId: Long, + name: Option[String], + code: Option[String], + measurements: Json, + counts: Json, + lastEdit: Option[Instant], + updatedAt: Option[Instant], + changesetCount: Option[Int], + editCount: Option[Int], + userChangesets: Json, + userEdits: Json, + hashtagChangesets: Json, + hashtagEdits: Json) -object CountryStats { - implicit val customConfig: Configuration = Configuration.default.withSnakeCaseMemberNames.withDefaults +object CountryStats extends Implicits { implicit val countryStatsDecoder: Decoder[CountryStats] = deriveDecoder implicit val countryStatsEncoder: Encoder[CountryStats] = deriveEncoder - private val selectF = fr""" + private val selectF = + fr""" SELECT - country_id, country_name, road_km_added, road_km_modified, waterway_km_added, waterway_km_modified, - coastline_km_added, coastline_km_modified, roads_added, roads_modified, waterways_added, waterways_modified, - coastlines_added, coastlines_modified, buildings_added, buildings_modified, pois_added, pois_modified, - last_edit, updated_at, changeset_count, edit_count, user_edit_counts, hashtag_edits + country_id, + country_name, + country_code, + coalesce(measurements, '{}'::jsonb) measurements, + coalesce(counts, '{}'::jsonb) counts, + last_edit, + updated_at, + changeset_count, + edit_count, + coalesce(user_changesets, '{}'::jsonb) user_changesets, + coalesce(user_edits, '{}'::jsonb) user_edits, + coalesce(hashtag_changesets, '{}'::jsonb) hashtag_changesets, + coalesce(hashtag_edits, '{}'::jsonb) hashtag_edits FROM country_statistics """ - def byId(code: String)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, CountryStats]] = + def byId( + code: String + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, CountryStats]] = (selectF ++ fr"WHERE country_code = $code") .query[CountryStats] .option .transact(xa) .map { case Some(country) => Right(country) - case None => Left(IdNotFoundError("country", code)) + case None => Left(IdNotFoundError("country", code)) } - def getPage(pageNum: Int, pageSize: Int = 25)(implicit xa: Transactor[IO]): IO[ResultPage[CountryStats]] = { - val offset = pageNum * pageSize + 1 + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[CountryStats]] = { + val offset = (pageNum - 1) * pageSize (selectF ++ fr"ORDER BY id ASC LIMIT $pageSize OFFSET $offset") .query[CountryStats] .to[List] - .map({ ResultPage(_, pageNum) }) + .map({ + ResultPage(_, pageNum) + }) .transact(xa) } } diff --git a/src/main/scala/osmesa/server/stats/HashtagStats.scala b/src/main/scala/osmesa/server/stats/HashtagStats.scala index 8f2c646..9d8a913 100644 --- a/src/main/scala/osmesa/server/stats/HashtagStats.scala +++ b/src/main/scala/osmesa/server/stats/HashtagStats.scala @@ -1,111 +1,76 @@ package osmesa.server.stats -import 
osmesa.server.model._ +import java.time.Instant -import doobie._ -import doobie.implicits._ -import doobie.postgres._ -import doobie.postgres.implicits._ -import cats._ -import cats.data._ import cats.effect._ import cats.implicits._ +import doobie._ +import doobie.implicits._ import io.circe._ -import io.circe.jawn._ -import io.circe.syntax._ -import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto._ -import fs2._ -import org.postgresql.util.PGobject - -import scala.concurrent.duration._ - +import io.circe.java8.time._ +import osmesa.server._ +import osmesa.server.model._ -case class HashtagStats( - tag: String, - extentUri: Option[String], - buildingsAdd: Option[Int], - buildingsMod: Option[Int], - roadsAdd: Option[Int], - kmRoadsAdd: Option[Double], - roadsMod: Option[Int], - kmRoadsMod: Option[Double], - waterwaysAdd: Option[Int], - kmWaterwaysAdd: Option[Double], - waterwaysMod: Option[Int], - kmWaterwaysMod: Option[Double], - coastlinesAdd: Option[Int], - kmCoastlinesAdd: Option[Double], - coastlinesMod: Option[Int], - kmCoastlinesMod: Option[Double], - poiAdd: Option[Int], - poiMod: Option[Int], - users: Json -) +case class HashtagStats(tag: String, + measurements: Json, + counts: Json, + changesetCount: Option[Int], + editCount: Option[Int], + lastEdit: Option[Instant], + updatedAt: Option[Instant], + userChangesets: Json, + userEdits: Json) -/** ------------------------+------------------+-----------+----------+--------- - tag | text | | | - users | json | | | - extent_uri | text | | | - buildings_added | bigint | | | - buildings_modified | bigint | | | - roads_added | bigint | | | - road_km_added | double precision | | | - roads_modified | bigint | | | - road_km_modified | double precision | | | - waterways_added | bigint | | | - waterway_km_added | double precision | | | - waterways_modified | bigint | | | - waterway_km_modified | double precision | | | - coastlines_added | bigint | | | - coastline_km_added | double precision | | | - coastlines_modified | bigint | | | - coastline_km_modified | double precision | | | - pois_added | bigint | | | - pois_modified | bigint | | | - **/ -object HashtagStats { - implicit val customConfig: Configuration = Configuration.default.withSnakeCaseMemberNames.withDefaults - implicit val userHashtagDecoder: Decoder[HashtagStats] = deriveDecoder - implicit val userHashtagEncoder: Encoder[HashtagStats] = deriveEncoder +object HashtagStats extends Implicits { + implicit val hashtagDecoder: Decoder[HashtagStats] = deriveDecoder + implicit val hashtagEncoder: Encoder[HashtagStats] = deriveEncoder - private val selectF = fr""" + private val selectF = + fr""" SELECT - tag, extent_uri, buildings_added, buildings_modified, - roads_added, road_km_added, roads_modified, road_km_modified, - waterways_added, waterway_km_added, waterways_modified, - waterway_km_modified, coastlines_added, coastline_km_added, - coastlines_modified, coastline_km_modified, pois_added, - pois_modified, users + tag, + coalesce(measurements, '{}'::jsonb) measurements, + coalesce(counts, '{}'::jsonb) counts, + changeset_count, + edit_count, + last_edit, + updated_at, + coalesce(user_changesets, '{}'::jsonb) user_changesets, + coalesce(user_edits, '{}'::jsonb) user_edits FROM hashtag_statistics """ - def byTag(tag: String)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, HashtagStats]] = + def byTag( + tag: String + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, HashtagStats]] = (selectF ++ fr"WHERE tag = $tag") .query[HashtagStats] .option 
.attempt .transact(xa) .map { - case Right(hashtagOrNone) => hashtagOrNone match { - case Some(ht) => Right(ht) - case None => Left(IdNotFoundError("hashtag_statistics", tag)) - } + case Right(hashtagOrNone) => + hashtagOrNone match { + case Some(ht) => Right(ht) + case None => Left(IdNotFoundError("hashtag_statistics", tag)) + } case Left(err) => Left(UnknownError(err.toString)) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, ResultPage[HashtagStats]]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY tag ASC LIMIT 10 OFFSET $offset") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[Either[OsmStatError, ResultPage[HashtagStats]]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY tag ASC LIMIT $pageSize OFFSET $offset") .query[HashtagStats] .to[List] .attempt .transact(xa) .map { case Right(results) => Right(ResultPage(results, pageNum)) - case Left(err) => Left(UnknownError(err.toString)) + case Left(err) => Left(UnknownError(err.toString)) } } } diff --git a/src/main/scala/osmesa/server/stats/HashtagUserStats.scala b/src/main/scala/osmesa/server/stats/HashtagUserStats.scala new file mode 100644 index 0000000..208c8eb --- /dev/null +++ b/src/main/scala/osmesa/server/stats/HashtagUserStats.scala @@ -0,0 +1,76 @@ +package osmesa.server.stats + +import java.time.Instant + +import cats.effect._ +import cats.implicits._ +import doobie._ +import doobie.implicits._ +import io.circe._ +import io.circe.generic.extras.semiauto._ +import io.circe.java8.time._ +import osmesa.server._ +import osmesa.server.model._ + +case class HashtagUserStats(tag: String, + uid: Long, + name: Option[String], + measurements: Json, + counts: Json, + lastEdit: Option[Instant], + changesetCount: Option[Int], + editCount: Option[Int] + ) + +object HashtagUserStats extends Implicits { + implicit val hashtagUserDecoder: Decoder[HashtagUserStats] = deriveDecoder + implicit val hashtagUserEncoder: Encoder[HashtagUserStats] = deriveEncoder + + private val selectF = + fr""" + SELECT + hashtag tag, + user_id uid, + name, + coalesce(measurements, '{}'::jsonb) measurements, + coalesce(counts, '{}'::jsonb) counts, + last_edit, + changeset_count, + edit_count + FROM + hashtag_user_statistics + """ + + def byTagAndUid( + tag: String, + uid: Long + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, HashtagUserStats]] = + (selectF ++ fr"WHERE hashtag = $tag AND user_id = $uid") + .query[HashtagUserStats] + .option + .attempt + .transact(xa) + .map { + case Right(hashtagOrNone) => + hashtagOrNone match { + case Some(ht) => Right(ht) + case None => Left(IdNotFoundError("hashtag_user_statistics", (tag, uid))) + } + case Left(err) => Left(UnknownError(err.toString)) + } + + def getPage(tag: String, pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[Either[OsmStatError, ResultPage[HashtagUserStats]]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"WHERE hashtag = $tag ORDER BY hashtag, user_id ASC LIMIT $pageSize OFFSET $offset") + .query[HashtagUserStats] + .to[List] + .attempt + .transact(xa) + .map { + case Right(results) => Right(ResultPage(results, pageNum)) + case Left(err) => Left(UnknownError(err.toString)) + } + } +} diff --git a/src/main/scala/osmesa/server/stats/RefreshStats.scala b/src/main/scala/osmesa/server/stats/RefreshStats.scala index 20d176b..fe49135 100644 --- a/src/main/scala/osmesa/server/stats/RefreshStats.scala +++ b/src/main/scala/osmesa/server/stats/RefreshStats.scala @@ -1,43
+1,22 @@ package osmesa.server.stats -import osmesa.server.model._ +import java.time.Instant +import cats.effect._ import doobie._ import doobie.implicits._ -import doobie.postgres._ import doobie.postgres.implicits._ -import cats._ -import cats.data._ -import cats.effect._ -import cats.implicits._ import io.circe._ -import io.circe.jawn._ -import io.circe.syntax._ -import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto._ -import fs2._ -import org.http4s.circe._ -import org.http4s._ -import org.http4s.dsl.Http4sDsl -import org.http4s.server.blaze.BlazeBuilder -import org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`} -import org.postgresql.util.PGobject +import io.circe.java8.time._ +import osmesa.server.Implicits -import scala.concurrent.duration._ +case class RefreshTime(view: Option[String], updatedAt: Option[Instant]) -case class RefreshTime( - view: Option[String], - updatedAt: Option[java.sql.Timestamp] -) - -case class RefreshStats( - userStatsRefresh: Option[java.sql.Timestamp], - countryStatsRefresh: Option[java.sql.Timestamp], - hashtagStatsRefresh: Option[java.sql.Timestamp] -) { - def +(that: RefreshStats) = { +case class RefreshStats(userStatsRefresh: Option[Instant], + countryStatsRefresh: Option[Instant], + hashtagStatsRefresh: Option[Instant]) { + def +(that: RefreshStats): RefreshStats = { RefreshStats( (userStatsRefresh.toList ++ that.userStatsRefresh).headOption, (countryStatsRefresh.toList ++ that.countryStatsRefresh).headOption, @@ -46,9 +25,7 @@ case class RefreshStats( } } -object RefreshStats { - implicit val customConfig: Configuration = Configuration.default.withSnakeCaseMemberNames.withDefaults - +object RefreshStats extends Implicits { implicit val refreshStatsDecoder: Decoder[RefreshStats] = deriveDecoder implicit val refreshStatsEncoder: Encoder[RefreshStats] = deriveEncoder @@ -59,17 +36,17 @@ object RefreshStats { refreshments """ - def apply(arg: RefreshTime): RefreshStats = arg.view.get match { - case "user_statistics" => RefreshStats(arg.updatedAt, None, None) - case "country_statistics" => RefreshStats(None, arg.updatedAt, None) - case "hashtag_statistics" => RefreshStats(None, None, arg.updatedAt) - } - def getCurrentStatus()(implicit xa: Transactor[IO]): IO[RefreshStats] = - (selectF) + selectF .query[RefreshTime] .to[List] .transact(xa) - .map(_.map(RefreshStats.apply(_)).reduce(_+_)) + .map(_.map(RefreshStats.apply).reduce(_ + _)) + + def apply(arg: RefreshTime): RefreshStats = arg.view.get match { + case "user_statistics" => RefreshStats(arg.updatedAt, None, None) + case "country_statistics" => RefreshStats(None, arg.updatedAt, None) + case "hashtag_statistics" => RefreshStats(None, None, arg.updatedAt) + } } diff --git a/src/main/scala/osmesa/server/stats/UserStats.scala b/src/main/scala/osmesa/server/stats/UserStats.scala index 15cefc5..2502a4b 100644 --- a/src/main/scala/osmesa/server/stats/UserStats.scala +++ b/src/main/scala/osmesa/server/stats/UserStats.scala @@ -1,131 +1,82 @@ package osmesa.server.stats -import osmesa.server.model._ +import java.time.Instant +import cats.effect._ import doobie._ import doobie.implicits._ -import doobie.postgres._ -import doobie.postgres.implicits._ -import cats._ -import cats.data._ -import cats.effect._ -import cats.implicits._ import io.circe._ -import io.circe.jawn._ -import io.circe.syntax._ -import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto._ 
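getCurrentStatus above maps each row of the refreshments table to a RefreshStats with exactly one populated field, then folds the rows with +, which is left-biased: the first defined timestamp per field wins. A small sketch with hypothetical timestamps, assuming the RefreshTime and RefreshStats definitions from this diff:

import java.time.Instant

val rows = List(
  RefreshTime(Some("user_statistics"), Some(Instant.parse("2018-07-01T00:00:00Z"))),
  RefreshTime(Some("country_statistics"), Some(Instant.parse("2018-07-01T00:05:00Z"))),
  RefreshTime(Some("hashtag_statistics"), Some(Instant.parse("2018-07-01T00:10:00Z")))
)

// Each row becomes a RefreshStats with one populated field; the reduce
// folds them together, keeping the first defined value per field.
val merged = rows.map(RefreshStats.apply).reduce(_ + _)
// merged == RefreshStats(Some(...T00:00:00Z), Some(...T00:05:00Z), Some(...T00:10:00Z))

Since apply calls arg.view.get and matches only the three known view names, an unexpected row in refreshments would still fail at runtime; the refactor preserves that behavior.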
-import fs2._ -import org.http4s.circe._ -import org.http4s._ -import org.http4s.dsl.Http4sDsl -import org.http4s.server.blaze.BlazeBuilder -import org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`} -import org.postgresql.util.PGobject - -import scala.concurrent.duration._ - +import io.circe.java8.time._ +import osmesa.server._ +import osmesa.server.model.{IdNotFoundError, OsmStatError, ResultPage} -case class UserStats( - uid: Long, - name: Option[String], - extentUri: Option[String], - kmRoadsAdd: Option[Double], - kmRoadsMod: Option[Double], - kmRoadsDel: Option[Double], - kmWaterwaysAdd: Option[Double], - kmWaterwaysMod: Option[Double], - kmWaterwaysDel: Option[Double], - kmCoastlinesAdd: Option[Double], - kmCoastlinesMod: Option[Double], - kmCoastlinesDel: Option[Double], - roadsAdd: Option[Int], - roadsMod: Option[Int], - roadsDel: Option[Int], - waterwaysAdd: Option[Int], - waterwaysMod: Option[Int], - waterwaysDel: Option[Int], - coastlinesAdd: Option[Int], - coastlinesMod: Option[Int], - coastlinesDel: Option[Int], - buildingsAdd: Option[Int], - buildingsMod: Option[Int], - buildingsDel: Option[Int], - poiAdd: Option[Int], - poiMod: Option[Int], - poiDel: Option[Int], - lastEdit: Option[java.sql.Timestamp], - changesetCount: Option[Int], - editCount: Option[Int], - editors: Json, - editTimes: Json, - countryList: Json, - hashtags: Json -) +case class UserStats(uid: Long, + name: Option[String], + measurements: Json, + counts: Json, + lastEdit: Option[Instant], + updatedAt: Option[Instant], + changesetCount: Option[Int], + editCount: Option[Int], + editorChangesets: Json, + editorEdits: Json, + dayChangesets: Json, + dayEdits: Json, + countryChangesets: Json, + countryEdits: Json, + hashtagChangesets: Json, + hashtagEdits: Json) -object UserStats { - implicit val customConfig: Configuration = Configuration.default.withSnakeCaseMemberNames.withDefaults +object UserStats extends Implicits { implicit val userStatsDecoder: Decoder[UserStats] = deriveDecoder implicit val userStatsEncoder: Encoder[UserStats] = deriveEncoder - private val selectF = fr""" + private val selectF = + fr""" SELECT id, name, - extent_uri, - road_km_added, - road_km_modified, - road_km_deleted, - waterway_km_added, - waterway_km_modified, - waterway_km_deleted, - coastline_km_added, - coastline_km_modified, - coastline_km_deleted, - roads_added, - roads_modified, - roads_deleted, - waterways_added, - waterways_modified, - waterways_deleted, - coastlines_added, - coastlines_modified, - coastlines_deleted, - buildings_added, - buildings_modified, - buildings_deleted, - pois_added, - pois_modified, - pois_deleted, + coalesce(measurements, '{}'::jsonb) measurements, + coalesce(counts, '{}'::jsonb) counts, last_edit, + updated_at, changeset_count, edit_count, - editors, - edit_times, - country_list, - hashtags + coalesce(editor_changesets, '{}'::jsonb) editor_changesets, + coalesce(editor_edits, '{}'::jsonb) editor_edits, + coalesce(day_changesets, '{}'::jsonb) day_changesets, + coalesce(day_edits, '{}'::jsonb) day_edits, + coalesce(country_changesets, '{}'::jsonb) country_changesets, + coalesce(country_edits, '{}'::jsonb) country_edits, + coalesce(hashtag_changesets, '{}'::jsonb) hashtag_changesets, + coalesce(hashtag_edits, '{}'::jsonb) hashtag_edits FROM user_statistics """ - def byId(id: Long)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, UserStats]] = + def byId( + id: Long + )(implicit xa: Transactor[IO]): 
IO[Either[OsmStatError, UserStats]] = (selectF ++ fr"WHERE id = $id") .query[UserStats] .option .transact(xa) .map { case Some(user) => Right(user) - case None => Left(IdNotFoundError("user", id)) + case None => Left(IdNotFoundError("user", id)) } - def getPage(pageNum: Int, pageSize: Int = 25)(implicit xa: Transactor[IO]): IO[ResultPage[UserStats]] = { - val offset = pageNum * pageSize + 1 + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[UserStats]] = { + val offset = (pageNum - 1) * pageSize (selectF ++ fr"ORDER BY id ASC LIMIT $pageSize OFFSET $offset") .query[UserStats] .to[List] - .map({ ResultPage(_, pageNum) }) + .map({ + ResultPage(_, pageNum) + }) .transact(xa) } } diff --git a/src/main/scala/osmesa/server/stats/package.scala b/src/main/scala/osmesa/server/stats/package.scala deleted file mode 100644 index ab91351..0000000 --- a/src/main/scala/osmesa/server/stats/package.scala +++ /dev/null @@ -1,32 +0,0 @@ -package osmesa.server - -import osmesa.server.model._ - -import doobie._ -import doobie.implicits._ -import doobie.postgres._ -import doobie.postgres.implicits._ -import io.circe._ -import io.circe.parser.parse -import cats._ -import cats.implicits._ -import org.postgresql.util.PGobject -import java.sql.Timestamp - -package object stats { - implicit val JsonMeta: Meta[Json] = - Meta.other[PGobject]("json").xmap[Json]( - a => parse(a.getValue).leftMap[Json](e => throw e).merge, - a => { - val o = new PGobject - o.setType("json") - o.setValue(a.noSpaces) - o - } - ) - implicit val TimestampFormat : Encoder[Timestamp] with Decoder[Timestamp] = new Encoder[Timestamp] with Decoder[Timestamp] { - override def apply(a: Timestamp): Json = Encoder.encodeLong.apply(a.getTime) - - override def apply(c: HCursor): Decoder.Result[Timestamp] = Decoder.decodeLong.map(s => new Timestamp(s)).apply(c) - } -} diff --git a/src/main/scala/osmesa/server/tile/TileLayouts.scala b/src/main/scala/osmesa/server/tile/TileLayouts.scala index 6008abc..774cc34 100644 --- a/src/main/scala/osmesa/server/tile/TileLayouts.scala +++ b/src/main/scala/osmesa/server/tile/TileLayouts.scala @@ -1,14 +1,14 @@ package osmesa.server.tile import geotrellis.proj4.WebMercator -import geotrellis.raster._ import geotrellis.spark.tiling._ - object TileLayouts { - private val layouts: Array[LayoutDefinition] = (0 to 30).map({ n => - ZoomedLayoutScheme.layoutForZoom(n, WebMercator.worldExtent, 256) - }).toArray + private val layouts: Array[LayoutDefinition] = (0 to 30) + .map({ n => + ZoomedLayoutScheme.layoutForZoom(n, WebMercator.worldExtent, 256) + }) + .toArray def apply(i: Int) = layouts(i) } diff --git a/src/main/scala/osmesa/server/tile/TileRouter.scala b/src/main/scala/osmesa/server/tile/TileRouter.scala index 8c321e7..8479b0f 100644 --- a/src/main/scala/osmesa/server/tile/TileRouter.scala +++ b/src/main/scala/osmesa/server/tile/TileRouter.scala @@ -1,78 +1,75 @@ package osmesa.server.tile -import osmesa.server.Config -import osmesa.server.model._ - -import cats.effect._ -import doobie.Transactor -import io.circe._ -import io.circe.syntax._ -import fs2._ -import fs2.StreamApp.ExitCode -import org.http4s.circe._ -import org.http4s._ -import org.http4s.dsl.Http4sDsl -import org.http4s.server.blaze.BlazeBuilder -import org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`, `Content-Encoding`} -import blobstore.{Path => BStorePath} -import blobstore.Store import blobstore.s3.S3Store 
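The stats package object deleted above had its Meta[Json] instance relocated to the osmesa.server package object, so all models now share one jsonb codec. A minimal sketch of what that instance enables, assuming the doobie 0.5-era API used in this diff; countsFor is a hypothetical helper and user_statistics is the table shown earlier:

import cats.effect.IO
import doobie._
import doobie.implicits._
import io.circe.Json
import osmesa.server._ // brings the implicit Meta[Json] into scope

// Reads a jsonb column straight into io.circe.Json: doobie receives a
// PGobject and the Meta instance parses its string value.
def countsFor(id: Long)(implicit xa: Transactor[IO]): IO[Option[Json]] =
  sql"SELECT coalesce(counts, '{}'::jsonb) FROM user_statistics WHERE id = $id"
    .query[Json]
    .option
    .transact(xa)

On the write side, the xmap serializes the Json with noSpaces into a PGobject of type json, so the same instance covers jsonb parameters as well.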
-import geotrellis.vector.Extent -import geotrellis.vectortile.VectorTile +import blobstore.{Store, Path => BStorePath} +import cats.effect._ import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.model.AmazonS3Exception +import geotrellis.vector.Extent +import geotrellis.vectortile.VectorTile +import org.http4s._ +import org.http4s.dsl.Http4sDsl +import org.http4s.headers.{`Content-Encoding`, `Content-Type`} +import osmesa.server.Config import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ import scala.util._ - class TileRouter(tileConf: Config.Tiles) extends Http4sDsl[IO] { - private val s3client = AmazonS3ClientBuilder.standard().withRegion("us-east-1").build() + private val s3client = + AmazonS3ClientBuilder.standard().withRegion("us-east-1").build() private val store: Store[IO] = S3Store[IO](s3client) - private val vtileContentType = `Content-Type`(("application", "vnd.mapbox-vector-tile")) + private val vtileContentType = `Content-Type`( + ("application", "vnd.mapbox-vector-tile") + ) private val emptyVT = VectorTile(Map(), Extent(0, 0, 1, 1)) - def tilePath(pre: String, z: Int, x: Int, y: String) = { - BStorePath(tileConf.s3bucket, s"${pre}/${z}/${x}/${y}", None, false, None) - } - def routes: HttpService[IO] = HttpService[IO] { case GET -> Root / "user" / userId / IntVar(z) / IntVar(x) / y => val getBytes = store - .get(tilePath(s"${tileConf.s3prefix}/user/${userId}", z, x, y), tileConf.chunkSize) + .get( + tilePath(s"${tileConf.s3prefix}/user/${userId}", z, x, y), + tileConf.chunkSize + ) .compile .to[Array] .attempt - getBytes.flatMap { - case Right(bytes) if tileConf.gzipped => - Ok(bytes, `Content-Encoding`(ContentCoding.gzip)) - case Right(bytes) => - Ok(bytes) - case Left(s3e: AmazonS3Exception) if s3e.getStatusCode == 403 || s3e.getStatusCode == 404 => - Ok(emptyVT.toBytes) - }.map(_.withContentType(vtileContentType)) + getBytes + .flatMap { + case Right(bytes) => + Ok(bytes, `Content-Encoding`(ContentCoding.gzip)) + case Left(s3e: AmazonS3Exception) + if s3e.getStatusCode == 403 || s3e.getStatusCode == 404 => + Ok(emptyVT.toBytes) + } + .map(_.withContentType(vtileContentType)) case GET -> Root / "hashtag" / hashtag / IntVar(z) / IntVar(x) / y => val getBytes = store - .get(tilePath(s"${tileConf.s3prefix}/hashtag/${hashtag}", z, x, y), tileConf.chunkSize) + .get( + tilePath(s"${tileConf.s3prefix}/hashtag/${hashtag}", z, x, y), + tileConf.chunkSize + ) .compile .to[Array] .attempt - getBytes.flatMap { - case Right(bytes) if tileConf.gzipped => - Ok(bytes, `Content-Encoding`(ContentCoding.gzip)) - case Right(bytes) => - Ok(bytes) - case Left(s3e: AmazonS3Exception) if s3e.getStatusCode == 403 || s3e.getStatusCode == 404 => - Ok(emptyVT.toBytes) - }.map(_.withContentType(vtileContentType)) + getBytes + .flatMap { + case Right(bytes) => + Ok(bytes, `Content-Encoding`(ContentCoding.gzip)) + case Left(s3e: AmazonS3Exception) + if s3e.getStatusCode == 403 || s3e.getStatusCode == 404 => + Ok(emptyVT.toBytes) + } + .map(_.withContentType(vtileContentType)) + } + + def tilePath(pre: String, z: Int, x: Int, y: String) = { + BStorePath(tileConf.s3bucket, s"${pre}/${z}/${x}/${y}", None, false, None) } }
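TileRouter resolves each request to a flat S3 key and serves an empty vector tile when the object is absent. A sketch of the resulting key layout, with hypothetical values standing in for the tileConf settings; the final path segment is taken verbatim from the URL, so the .mvt extension shown here is an assumption:

// tilePath(pre, z, x, y) yields "<pre>/<z>/<x>/<y>" inside tileConf.s3bucket.
val bucket = "example-tile-bucket" // hypothetical stand-in for tileConf.s3bucket
val prefix = "stats-tiles"         // hypothetical stand-in for tileConf.s3prefix
val userKey    = s"$prefix/user/12345/12/1207/1539.mvt"
val hashtagKey = s"$prefix/hashtag/missingmaps/12/1207/1539.mvt"
// e.g. s3://example-tile-bucket/stats-tiles/user/12345/12/1207/1539.mvt

Treating 403 like 404 matters because S3 reports a missing key as AccessDenied when the caller lacks s3:ListBucket; with the empty-tile fallback, map clients get a blank tile instead of an error. Note that the refactored handlers always set Content-Encoding: gzip, so the tiles are assumed to be stored gzip-compressed on S3.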