From 96d3b6bf1478a49706caed70189923960be6909a Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Mon, 10 Jun 2019 14:01:07 -0700 Subject: [PATCH 01/22] Update aggregation queries to use JSON columns --- sql/country_statistics.sql | 251 +++++++++++++++++--------------- sql/hashtag_statistics.sql | 262 +++++++++++++-------------------- sql/user_statistics.sql | 291 +++++++++++++++++++------------------ 3 files changed, 393 insertions(+), 411 deletions(-) diff --git a/sql/country_statistics.sql b/sql/country_statistics.sql index e9bb00c..346e7d2 100644 --- a/sql/country_statistics.sql +++ b/sql/country_statistics.sql @@ -1,117 +1,140 @@ CREATE MATERIALIZED VIEW country_statistics AS - WITH country_counts AS ( - -- Collect country-related changesets with edit counts and associate with hashtags - SELECT cc.changeset_id, - countries.id, - countries.code, - countries.name AS country_name, - cc.edit_count, - hts.hashtag_id - FROM ((changesets_countries cc - JOIN countries ON ((cc.country_id = countries.id))) - FULL OUTER JOIN changesets_hashtags hts ON (hts.changeset_id = cc.changeset_id)) - ), user_edits AS ( - -- Associate user ids with changesets/edit count - SELECT c_chg.country_id, - c_chg.edit_count, - c.user_id - FROM (changesets_countries c_chg - JOIN changesets c ON (c.id = c_chg.changeset_id)) - ), country_edits AS ( - -- Aggregate edit counts per user (ignore UID 0) - SELECT country_id, - user_id, - sum(edit_count) AS edits - FROM user_edits - WHERE user_id <> 0 - GROUP BY country_id, user_id - ), grouped_user_edits AS ( - -- Rank user edit totals per country - SELECT *, - ROW_NUMBER() OVER (PARTITION BY country_id ORDER BY edits DESC) as rank - FROM country_edits - ), json_country_edits AS ( - -- Collapse top ten user edit totals into JSON object - SELECT country_id, - json_agg(json_build_object('user', user_id, 'count', edits)) AS edits - FROM grouped_user_edits - WHERE rank <= 10 - GROUP BY country_id - ), ht_edits AS ( - -- Associate hashtags with 
aggregate edit counts per country - SELECT cc.id as country_id, - hts.hashtag, - sum(edit_count) as edit_count - FROM (country_counts cc - JOIN hashtags hts ON cc.hashtag_id = hts.id) - GROUP BY cc.id, hts.hashtag - ), grouped_hts AS ( - -- Rank edit counts per country - SELECT *, - ROW_NUMBER() OVER (PARTITION BY country_id ORDER BY edit_count DESC) as rank - FROM ht_edits - ), ht_json AS ( - -- Collapse top ten most active hashtags per country into JSON object - SELECT country_id, - json_agg(json_build_object('hashtag', hashtag, 'count', edit_count)) as hashtag_edits - FROM grouped_hts - WHERE rank <= 10 - GROUP BY country_id - ), agg_stats AS ( - -- Aggregate statistics per country - SELECT cc.id as country_id, - sum(chg.road_km_added) AS road_km_added, - sum(chg.road_km_modified + chg.road_km_deleted) AS road_km_modified, - sum(chg.waterway_km_added) AS waterway_km_added, - sum(chg.waterway_km_modified + chg.waterway_km_deleted) AS waterway_km_modified, - sum(chg.coastline_km_added) AS coastline_km_added, - sum(chg.coastline_km_modified + chg.coastline_km_deleted) AS coastline_km_modified, - sum(chg.roads_added) AS roads_added, - sum(chg.roads_modified + chg.roads_deleted) AS roads_modified, - sum(chg.waterways_added) AS waterways_added, - sum(chg.waterways_modified + chg.waterways_deleted) AS waterways_modified, - sum(chg.coastlines_added) AS coastlines_added, - sum(chg.coastlines_modified + chg.coastlines_deleted) AS coastlines_modified, - sum(chg.buildings_added) AS buildings_added, - sum(chg.buildings_modified + chg.buildings_deleted) AS buildings_modified, - sum(chg.pois_added) AS pois_added, - sum(chg.pois_modified + chg.pois_deleted) AS pois_modified, - max(coalesce(chg.closed_at, chg.created_at)) AS last_edit, - max(COALESCE(chg.closed_at, chg.created_at, chg.updated_at)) AS updated_at, - count(*) AS changeset_count, - sum(cc.edit_count) AS edit_count - FROM (changesets chg - JOIN country_counts cc ON ((cc.changeset_id = chg.id))) - GROUP BY cc.id - ) - 
SELECT agg.country_id, - countries.name AS country_name, - countries.code AS country_code, - agg.road_km_added, - agg.road_km_modified, - agg.waterway_km_added, - agg.waterway_km_modified, - agg.coastline_km_added, - agg.coastline_km_modified, - agg.roads_added, - agg.roads_modified, - agg.waterways_added, - agg.waterways_modified, - agg.coastlines_added, - agg.coastlines_modified, - agg.buildings_added, - agg.buildings_modified, - agg.pois_added, - agg.pois_modified, - agg.last_edit, - agg.updated_at, - agg.changeset_count, - agg.edit_count, - jce.edits AS user_edit_counts, - hts.hashtag_edits - FROM (agg_stats agg - FULL OUTER JOIN json_country_edits jce ON (agg.country_id = jce.country_id) - FULL OUTER JOIN ht_json hts ON (agg.country_id = hts.country_id) - JOIN countries ON agg.country_id = countries.id); + WITH changesets AS ( + SELECT + * + FROM changesets + -- ignore users 0 and 1 + WHERE user_id > 1 + ), + general AS ( + SELECT + country_id, + max(coalesce(closed_at, created_at)) last_edit, + count(*) changeset_count, + sum(edit_count) edit_count, + max(updated_at) updated_at + FROM changesets + JOIN changesets_countries ON changesets.id = changesets_countries.changeset_id + GROUP BY country_id + ), + processed_changesets AS ( + SELECT + -- TODO include changesets_countries.edit_count as an alternative to changeset count + id, + user_id, + country_id, + measurements, + counts + FROM changesets + JOIN changesets_countries ON changesets.id = changesets_countries.changeset_id + ), + hashtag_counts AS ( + SELECT + -- TODO rank by edit count? + RANK() OVER (PARTITION BY country_id ORDER BY count(*) DESC) AS rank, + country_id, + hashtag, + -- TODO expose edit count instead? 
+ count(*) changesets + FROM processed_changesets + JOIN changesets_hashtags ON processed_changesets.id = changesets_hashtags.changeset_id + JOIN hashtags ON changesets_hashtags.hashtag_id = hashtags.id + GROUP BY country_id, hashtag + ), + hashtags AS ( + SELECT + country_id, + json_object_agg(hashtag, changesets) hashtags + FROM hashtag_counts + WHERE rank <= 10 + GROUP BY country_id + ), + user_counts AS ( + SELECT + -- TODO rank by edit count? + RANK() OVER (PARTITION BY country_id ORDER BY count(*) DESC) AS rank, + country_id, + user_id, + -- TODO expose edit count instead? + count(*) changesets + FROM processed_changesets + GROUP BY country_id, user_id + ), + users AS ( + SELECT + country_id, + json_object_agg(user_id, changesets) users + FROM user_counts + WHERE rank <= 10 + GROUP BY country_id + ), + measurements AS ( + SELECT + id, + country_id, + key, + value + FROM processed_changesets + CROSS JOIN LATERAL jsonb_each(measurements) + ), + aggregated_measurements_kv AS ( + SELECT + country_id, + key, + sum(value::numeric) AS value + FROM measurements + GROUP BY country_id, key + ), + aggregated_measurements AS ( + SELECT + country_id, + json_object_agg(key, value) measurements + FROM aggregated_measurements_kv + GROUP BY country_id + ), + counts AS ( + SELECT + id, + country_id, + key, + value + FROM processed_changesets + CROSS JOIN LATERAL jsonb_each(counts) + ), + aggregated_counts_kv AS ( + SELECT + country_id, + key, + sum(value::numeric) AS value + FROM counts + GROUP BY country_id, key + ), + aggregated_counts AS ( + SELECT + country_id, + json_object_agg(key, value) counts + FROM aggregated_counts_kv + GROUP BY country_id + ) + SELECT + general.country_id, + countries.name country_name, + countries.code country_code, + -- NOTE these are per-changeset, not per-country, so stats are double-counted + measurements, + -- NOTE these are per-changeset, not per-country, so stats are double-counted + counts, + general.changeset_count, + general.edit_count, 
+ general.last_edit, + general.updated_at, + users user_edit_counts, + hashtags hashtag_edits + FROM general + JOIN countries ON country_id = countries.id + LEFT OUTER JOIN users USING (country_id) + LEFT OUTER JOIN hashtags USING (country_id) + LEFT OUTER JOIN aggregated_measurements USING (country_id) + LEFT OUTER JOIN aggregated_counts USING (country_id); CREATE UNIQUE INDEX country_statistics_id ON country_statistics(country_code); diff --git a/sql/hashtag_statistics.sql b/sql/hashtag_statistics.sql index 6a157b9..3129437 100644 --- a/sql/hashtag_statistics.sql +++ b/sql/hashtag_statistics.sql @@ -1,159 +1,107 @@ CREATE MATERIALIZED VIEW hashtag_statistics AS - WITH hashtag_join AS ( - SELECT chg.id, - chg.road_km_added, - chg.road_km_modified, - chg.road_km_deleted, - chg.waterway_km_added, - chg.waterway_km_modified, - chg.waterway_km_deleted, - chg.coastline_km_added, - chg.coastline_km_modified, - chg.coastline_km_deleted, - chg.roads_added, - chg.roads_modified, - chg.roads_deleted, - chg.waterways_added, - chg.waterways_modified, - chg.waterways_deleted, - chg.coastlines_added, - chg.coastlines_modified, - chg.coastlines_deleted, - chg.buildings_added, - chg.buildings_modified, - chg.buildings_deleted, - chg.pois_added, - chg.pois_modified, - chg.pois_deleted, - chg.editor, - chg.user_id, - chg.created_at, - chg.closed_at, - chg.augmented_diffs, - chg.updated_at, - ch.hashtag_id - FROM (changesets chg - JOIN changesets_hashtags ch ON ((ch.changeset_id = chg.id))) - ), tag_usr_counts AS ( - SELECT hj.hashtag_id, - array_agg(DISTINCT users.name) AS names, - users.id AS uid, - count(*) AS edit_count - FROM (users - JOIN hashtag_join hj ON ((hj.user_id = users.id))) - WHERE users.id <> 0 - GROUP BY hj.hashtag_id, users.id - ), named_usr_counts AS ( - SELECT *, unnest(names) as name - FROM tag_usr_counts - ), hashtag_usr_counts AS ( - SELECT hj.hashtag_id, - users.uid AS uid, - array_agg(DISTINCT users.name) AS names, - sum(hj.road_km_added) as road_km_added, 
- sum(hj.road_km_modified) as road_km_modified, - sum(hj.road_km_deleted) as road_km_deleted, - sum(hj.waterway_km_added) as waterway_km_added, - sum(hj.waterway_km_modified) as waterway_km_modified, - sum(hj.waterway_km_deleted) as waterway_km_deleted, - sum(hj.coastline_km_added) as coastline_km_added, - sum(hj.coastline_km_modified) as coastline_km_modified, - sum(hj.coastline_km_deleted) as coastline_km_deleted, - sum(hj.roads_added) as roads_added, - sum(hj.roads_modified) as roads_modified, - sum(hj.roads_deleted) as roads_deleted, - sum(hj.waterways_added) as waterways_added, - sum(hj.waterways_modified) as waterways_modified, - sum(hj.waterways_deleted) as waterways_deleted, - sum(hj.coastlines_added) as coastlines_added, - sum(hj.coastlines_modified) as coastlines_modified, - sum(hj.coastlines_deleted) as coastlines_deleted, - sum(hj.buildings_added) as buildings_added, - sum(hj.buildings_modified) as buildings_modified, - sum(hj.buildings_deleted) as buildings_deleted, - sum(hj.pois_added) as pois_added, - sum(hj.pois_modified) as pois_modified, - sum(hj.pois_deleted) as pois_deleted, - count(*) AS edit_count - FROM (named_usr_counts users - JOIN hashtag_join hj ON ((hj.user_id = users.uid AND hj.hashtag_id = users.hashtag_id))) - GROUP BY hj.hashtag_id, users.uid - ), usr_json_agg AS ( - SELECT usr_counts.hashtag_id, - json_agg(json_build_object('name', usr_counts.names[1], - 'uid', usr_counts.uid, - 'km_roads_add', usr_counts.road_km_added, - 'km_roads_mod', usr_counts.road_km_modified, - 'km_roads_del', usr_counts.road_km_deleted, - 'km_waterways_add', usr_counts.waterway_km_added, - 'km_waterways_mod', usr_counts.waterway_km_modified, - 'km_waterways_del', usr_counts.waterway_km_deleted, - 'km_coastlines_add', usr_counts.coastline_km_added, - 'km_coastlines_mod', usr_counts.coastline_km_modified, - 'km_coastlines_del', usr_counts.coastline_km_deleted, - 'roads_add', usr_counts.roads_added, - 'roads_mod', usr_counts.roads_modified, - 'roads_del', 
usr_counts.roads_deleted, - 'waterways_add', usr_counts.waterways_added, - 'waterways_mod', usr_counts.waterways_modified, - 'waterways_del', usr_counts.waterways_deleted, - 'coastlines_add', usr_counts.coastlines_added, - 'coastlines_mod', usr_counts.coastlines_modified, - 'coastlines_del', usr_counts.coastlines_deleted, - 'buildings_add', usr_counts.buildings_added, - 'buildings_mod', usr_counts.buildings_modified, - 'buildings_del', usr_counts.buildings_deleted, - 'poi_add', usr_counts.pois_added, - 'poi_mod', usr_counts.pois_modified, - 'poi_del', usr_counts.pois_deleted, - 'edits', usr_counts.edit_count)) AS users - FROM hashtag_usr_counts usr_counts - GROUP BY usr_counts.hashtag_id - ), without_json AS ( - SELECT ht.hashtag AS tag, - ht.id AS hashtag_id, - (('hashtag/'::text || ht.hashtag) || '/{z}/{x}/{y}.mvt'::text) AS extent_uri, - sum(hashtag_join.buildings_added) AS buildings_added, - sum(hashtag_join.buildings_modified + hashtag_join.buildings_deleted) AS buildings_modified, - sum(hashtag_join.roads_added) AS roads_added, - sum(hashtag_join.road_km_added) AS road_km_added, - sum(hashtag_join.roads_modified + hashtag_join.roads_deleted) AS roads_modified, - sum(hashtag_join.road_km_modified + hashtag_join.road_km_deleted) AS road_km_modified, - sum(hashtag_join.waterways_added) AS waterways_added, - sum(hashtag_join.waterway_km_added) AS waterway_km_added, - sum(hashtag_join.waterways_modified + hashtag_join.waterways_deleted) AS waterways_modified, - sum(hashtag_join.waterway_km_modified + hashtag_join.waterway_km_deleted) AS waterway_km_modified, - sum(hashtag_join.coastlines_added) AS coastlines_added, - sum(hashtag_join.coastline_km_added) AS coastline_km_added, - sum(hashtag_join.coastlines_modified + hashtag_join.coastlines_deleted) AS coastlines_modified, - sum(hashtag_join.coastline_km_modified + hashtag_join.coastline_km_deleted) AS coastline_km_modified, - sum(hashtag_join.pois_added) AS pois_added, - sum(hashtag_join.pois_modified + 
hashtag_join.pois_deleted) AS pois_modified - FROM (hashtags ht - JOIN hashtag_join ON ((ht.id = hashtag_join.hashtag_id))) - GROUP BY ht.id, ht.hashtag - ) - SELECT without_json.tag, - without_json.hashtag_id, - without_json.extent_uri, - without_json.buildings_added, - without_json.buildings_modified, - without_json.roads_added, - without_json.road_km_added, - without_json.roads_modified, - without_json.road_km_modified, - without_json.waterways_added, - without_json.waterway_km_added, - without_json.waterways_modified, - without_json.waterway_km_modified, - without_json.coastlines_added, - without_json.coastline_km_added, - without_json.coastlines_modified, - without_json.coastline_km_modified, - without_json.pois_added, - without_json.pois_modified, - usr_json_agg.users - FROM (without_json - JOIN usr_json_agg ON ((without_json.hashtag_id = usr_json_agg.hashtag_id))); + WITH general AS ( + SELECT + hashtag_id, + max(coalesce(closed_at, created_at)) last_edit, + count(*) changeset_count, + sum(total_edits) edit_count, + max(updated_at) updated_at + FROM changesets + JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id + GROUP BY hashtag_id + ), + processed_changesets AS ( + SELECT + id, + user_id, + hashtag_id, + measurements, + counts + FROM changesets + JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id + ), + user_counts AS ( + SELECT + -- TODO rank by edit count? + RANK() OVER (PARTITION BY hashtag_id ORDER BY count(*) DESC) AS rank, + hashtag_id, + user_id, + -- TODO expose edit count instead? 
+ count(*) changesets + FROM processed_changesets + GROUP BY hashtag_id, user_id + ), + users AS ( + SELECT + hashtag_id, + json_object_agg(user_id, changesets) users + FROM user_counts + WHERE rank <= 10 + GROUP BY hashtag_id + ), + measurements AS ( + SELECT + id, + hashtag_id, + key, + value + FROM processed_changesets + CROSS JOIN LATERAL jsonb_each(measurements) + ), + aggregated_measurements_kv AS ( + SELECT + hashtag_id, + key, + sum(value::numeric) AS value + FROM measurements + GROUP BY hashtag_id, key + ), + aggregated_measurements AS ( + SELECT + hashtag_id, + json_object_agg(key, value) measurements + FROM aggregated_measurements_kv + GROUP BY hashtag_id + ), + counts AS ( + SELECT + id, + hashtag_id, + key, + value + FROM processed_changesets + CROSS JOIN LATERAL jsonb_each(counts) + ), + aggregated_counts_kv AS ( + SELECT + hashtag_id, + key, + sum(value::numeric) AS value + FROM counts + GROUP BY hashtag_id, key + ), + aggregated_counts AS ( + SELECT + hashtag_id, + json_object_agg(key, value) counts + FROM aggregated_counts_kv + GROUP BY hashtag_id + ) + SELECT + hashtags.hashtag tag, + general.hashtag_id, + 'hashtag/' || general.hashtag_id || '/{z}/{x}/{y}.mvt' AS extent_uri, + measurements, + counts, + general.changeset_count, + general.edit_count, + general.last_edit, + general.updated_at, + users + FROM general + JOIN hashtags ON hashtag_id = hashtags.id + LEFT OUTER JOIN users USING (hashtag_id) + LEFT OUTER JOIN aggregated_measurements USING (hashtag_id) + LEFT OUTER JOIN aggregated_counts USING (hashtag_id); -CREATE UNIQUE INDEX hashtag_statistics_hashtag_id ON hashtag_statistics(hashtag_id); +CREATE UNIQUE INDEX hashtag_statistics_hashtag_id ON hashtag_statistics(hashtag_id); \ No newline at end of file diff --git a/sql/user_statistics.sql b/sql/user_statistics.sql index 9b20e56..7aa44ef 100644 --- a/sql/user_statistics.sql +++ b/sql/user_statistics.sql @@ -1,143 +1,154 @@ CREATE MATERIALIZED VIEW user_statistics AS - WITH country_counts AS 
( - SELECT cc.changeset_id, - countries.name, - cc.edit_count - FROM (changesets_countries cc - JOIN countries ON ((cc.country_id = countries.id))) - ), chgset_country_counts AS ( - SELECT chg.user_id, - country_counts.name, - sum(country_counts.edit_count) AS edit_count - FROM (country_counts - JOIN changesets chg ON ((country_counts.changeset_id = chg.id))) - GROUP BY chg.user_id, country_counts.name - ), usr_country_counts AS ( - SELECT chgset_country_counts.user_id, - json_agg(json_build_object('name', chgset_country_counts.name, 'count', chgset_country_counts.edit_count)) AS country_json - FROM chgset_country_counts - GROUP BY chgset_country_counts.user_id - ), day_counts AS ( - SELECT chg.user_id, - to_char(date_trunc('day'::text, chg.created_at), 'YYYY-MM-DD'::text) AS day, - count(*) AS cnt - FROM changesets chg - WHERE (chg.created_at IS NOT NULL) - GROUP BY chg.user_id, (date_trunc('day'::text, chg.created_at)) - ), usr_day_counts AS ( - SELECT day_counts.user_id, - json_agg(json_build_object('day', day_counts.day, 'count', day_counts.cnt)) AS day_json - FROM day_counts - GROUP BY day_counts.user_id - ), editor_counts AS ( - SELECT chg.user_id, - chg.editor, - count(*) AS cnt - FROM changesets chg - WHERE (chg.editor IS NOT NULL) - GROUP BY chg.user_id, chg.editor - ), usr_editor_counts AS ( - SELECT editor_counts.user_id, - json_agg(json_build_object('editor', editor_counts.editor, 'count', editor_counts.cnt)) AS editor_json - FROM editor_counts - GROUP BY editor_counts.user_id - ), hashtag_counts AS ( - SELECT ch.changeset_id, - hashtags.hashtag, - count(*) AS edit_count - FROM (changesets_hashtags ch - JOIN hashtags ON ((ch.hashtag_id = hashtags.id))) - GROUP BY ch.changeset_id, hashtags.hashtag - ), chgset_ht_counts AS ( - SELECT chg.user_id, - hashtag_counts.hashtag, - count(*) AS cnt - FROM (changesets chg - JOIN hashtag_counts ON ((chg.id = hashtag_counts.changeset_id))) - GROUP BY chg.user_id, hashtag_counts.hashtag - ), usr_hashtag_counts AS ( - 
SELECT chgset_ht_counts.user_id, - json_agg(json_build_object('tag', chgset_ht_counts.hashtag, 'count', chgset_ht_counts.cnt)) AS hashtag_json - FROM chgset_ht_counts - GROUP BY chgset_ht_counts.user_id - ), agg_stats AS ( - SELECT usr.id, - usr.name, - (('user/'::text || usr.id) || '/{z}/{x}/{y}.mvt'::text) AS extent_uri, - array_agg(chg.id) AS changesets, - sum(chg.road_km_added) AS road_km_added, - sum(chg.road_km_modified) AS road_km_modified, - sum(chg.road_km_deleted) AS road_km_deleted, - sum(chg.waterway_km_added) AS waterway_km_added, - sum(chg.waterway_km_modified) AS waterway_km_modified, - sum(chg.waterway_km_deleted) AS waterway_km_deleted, - sum(chg.coastline_km_added) AS coastline_km_added, - sum(chg.coastline_km_modified) AS coastline_km_modified, - sum(chg.coastline_km_deleted) AS coastline_km_deleted, - sum(chg.roads_added) AS roads_added, - sum(chg.roads_modified) AS roads_modified, - sum(chg.roads_deleted) AS roads_deleted, - sum(chg.waterways_added) AS waterways_added, - sum(chg.waterways_modified) AS waterways_modified, - sum(chg.waterways_deleted) AS waterways_deleted, - sum(chg.coastlines_added) AS coastlines_added, - sum(chg.coastlines_modified) AS coastlines_modified, - sum(chg.coastlines_deleted) AS coastlines_deleted, - sum(chg.buildings_added) AS buildings_added, - sum(chg.buildings_modified) AS buildings_modified, - sum(chg.buildings_deleted) AS buildings_deleted, - sum(chg.pois_added) AS pois_added, - sum(chg.pois_modified) AS pois_modified, - sum(chg.pois_deleted) AS pois_deleted, - max(coalesce(chg.closed_at, chg.created_at)) AS last_edit, - count(*) AS changeset_count, - count(*) AS edit_count, - max(COALESCE(chg.closed_at, chg.created_at, chg.updated_at)) AS updated_at - FROM (changesets chg - JOIN users usr ON ((chg.user_id = usr.id))) - WHERE (chg.user_id IS NOT NULL) - GROUP BY usr.id, usr.name - ) - SELECT agg_stats.id, - agg_stats.name, - agg_stats.extent_uri, - agg_stats.changesets, - agg_stats.road_km_added, - 
agg_stats.road_km_modified, - agg_stats.road_km_deleted, - agg_stats.waterway_km_added, - agg_stats.waterway_km_modified, - agg_stats.waterway_km_deleted, - agg_stats.coastline_km_added, - agg_stats.coastline_km_modified, - agg_stats.coastline_km_deleted, - agg_stats.roads_added, - agg_stats.roads_modified, - agg_stats.roads_deleted, - agg_stats.waterways_added, - agg_stats.waterways_modified, - agg_stats.waterways_deleted, - agg_stats.coastlines_added, - agg_stats.coastlines_modified, - agg_stats.coastlines_deleted, - agg_stats.buildings_added, - agg_stats.buildings_modified, - agg_stats.buildings_deleted, - agg_stats.pois_added, - agg_stats.pois_modified, - agg_stats.pois_deleted, - agg_stats.last_edit, - agg_stats.changeset_count, - agg_stats.edit_count, - usr_editor_counts.editor_json AS editors, - usr_day_counts.day_json AS edit_times, - coalesce(usr_country_counts.country_json, '[]') AS country_list, - coalesce(usr_hashtag_counts.hashtag_json, '[]') AS hashtags, - agg_stats.updated_at - FROM ((((agg_stats - LEFT JOIN usr_country_counts ON ((agg_stats.id = usr_country_counts.user_id))) - LEFT JOIN usr_hashtag_counts ON ((agg_stats.id = usr_hashtag_counts.user_id))) - LEFT JOIN usr_day_counts ON ((agg_stats.id = usr_day_counts.user_id))) - LEFT JOIN usr_editor_counts ON ((agg_stats.id = usr_editor_counts.user_id))); + WITH general AS ( + SELECT + user_id, + array_agg(id) changesets, + max(coalesce(closed_at, created_at)) last_edit, + count(*) changeset_count, + sum(total_edits) edit_count, + max(updated_at) updated_at + FROM changesets + GROUP BY user_id + ), + country_counts AS ( + SELECT + user_id, + code, + count(*) changesets + FROM changesets + JOIN changesets_countries ON changesets.id = changesets_countries.changeset_id + JOIN countries ON changesets_countries.country_id = countries.id + GROUP BY user_id, code + ), + countries AS ( + SELECT + user_id, + json_object_agg(code, changesets) countries + FROM country_counts + GROUP BY user_id + ), + 
edit_time_counts AS ( + SELECT + user_id, + date_trunc('day', coalesce(closed_at, created_at))::date AS day, + count(*) changesets + FROM changesets + GROUP BY user_id, day + ), + edit_times AS ( + SELECT + user_id, + json_object_agg(day, changesets) edit_times + FROM edit_time_counts + GROUP BY user_id + ), + editor_counts AS ( + SELECT + user_id, + editor, + count(*) changesets + FROM changesets + WHERE editor IS NOT NULL + GROUP BY user_id, editor + ), + editors AS ( + SELECT + user_id, + json_object_agg(editor, changesets) editors + FROM editor_counts + GROUP BY user_id + ), + hashtag_counts AS ( + SELECT + user_id, + hashtag, + count(*) changesets + FROM changesets + JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id + JOIN hashtags ON changesets_hashtags.hashtag_id = hashtags.id + GROUP BY user_id, hashtag + ), + hashtags AS ( + SELECT + user_id, + json_object_agg(hashtag, changesets) hashtags + FROM hashtag_counts + GROUP BY user_id + ), + measurements AS ( + SELECT + id, + user_id, + key, + value + FROM changesets + CROSS JOIN LATERAL jsonb_each(measurements) + ), + aggregated_measurements_kv AS ( + SELECT + user_id, + key, + sum(value::numeric) AS value + FROM measurements + GROUP BY user_id, key + ), + aggregated_measurements AS ( + SELECT + user_id, + json_object_agg(key, value) measurements + FROM aggregated_measurements_kv + GROUP BY user_id + ), + counts AS ( + SELECT + id, + user_id, + key, + value + FROM changesets + CROSS JOIN LATERAL jsonb_each(counts) + ), + aggregated_counts_kv AS ( + SELECT + user_id, + key, + sum(value::numeric) AS value + FROM counts + GROUP BY user_id, key + ), + aggregated_counts AS ( + SELECT + user_id, + json_object_agg(key, value) counts + FROM aggregated_counts_kv + GROUP BY user_id + ) + SELECT + user_id AS id, + users.name, + 'user/' || users.id || '/{z}/{x}/{y}.mvt' AS extent_uri, + -- TODO this is unbounded; drop it? 
+ changesets, + measurements, + counts, + last_edit, + changeset_count, + edit_count, + -- TODO this is unbounded; top N? + editors, + edit_times, + -- TODO top N? + countries, + -- TODO top N? + hashtags, + updated_at + FROM general + LEFT OUTER JOIN countries USING (user_id) + LEFT OUTER JOIN editors USING (user_id) + LEFT OUTER JOIN edit_times USING (user_id) + LEFT OUTER JOIN hashtags USING (user_id) + LEFT OUTER JOIN aggregated_measurements USING (user_id) + LEFT OUTER JOIN aggregated_counts USING (user_id) + JOIN users ON user_id = users.id; CREATE UNIQUE INDEX user_statistics_id ON user_statistics(id); From 20e4b330b02f424d860b10ccc733d79add5a405e Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Mon, 17 Jun 2019 16:25:51 -0700 Subject: [PATCH 02/22] refreshments schema --- sql/refreshments.sql | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 sql/refreshments.sql diff --git a/sql/refreshments.sql b/sql/refreshments.sql new file mode 100644 index 0000000..1b933f8 --- /dev/null +++ b/sql/refreshments.sql @@ -0,0 +1,7 @@ +CREATE TABLE refreshments ( + mat_view text NOT NULL, + updated_at timestamp with time zone, + PRIMARY KEY(mat_view) +); + +INSERT INTO refreshments VALUES ('user_statistics', to_timestamp(0)), ('country_statistics', to_timestamp(0)), ('hashtag_statistics', to_timestamp(0)); \ No newline at end of file From efecee26e2cfa442ebaeb16556808e3d5ed48179 Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Mon, 17 Jun 2019 16:26:21 -0700 Subject: [PATCH 03/22] Generate JSONB tables --- sql/country_statistics.sql | 8 ++++---- sql/hashtag_statistics.sql | 6 +++--- sql/user_statistics.sql | 12 ++++++------ 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/sql/country_statistics.sql b/sql/country_statistics.sql index 346e7d2..710d0fc 100644 --- a/sql/country_statistics.sql +++ b/sql/country_statistics.sql @@ -44,7 +44,7 @@ CREATE MATERIALIZED VIEW country_statistics AS hashtags AS ( SELECT country_id, - 
json_object_agg(hashtag, changesets) hashtags + jsonb_object_agg(hashtag, changesets) hashtags FROM hashtag_counts WHERE rank <= 10 GROUP BY country_id @@ -63,7 +63,7 @@ CREATE MATERIALIZED VIEW country_statistics AS users AS ( SELECT country_id, - json_object_agg(user_id, changesets) users + jsonb_object_agg(user_id, changesets) users FROM user_counts WHERE rank <= 10 GROUP BY country_id @@ -88,7 +88,7 @@ CREATE MATERIALIZED VIEW country_statistics AS aggregated_measurements AS ( SELECT country_id, - json_object_agg(key, value) measurements + jsonb_object_agg(key, value) measurements FROM aggregated_measurements_kv GROUP BY country_id ), @@ -112,7 +112,7 @@ CREATE MATERIALIZED VIEW country_statistics AS aggregated_counts AS ( SELECT country_id, - json_object_agg(key, value) counts + jsonb_object_agg(key, value) counts FROM aggregated_counts_kv GROUP BY country_id ) diff --git a/sql/hashtag_statistics.sql b/sql/hashtag_statistics.sql index 3129437..47ab69d 100644 --- a/sql/hashtag_statistics.sql +++ b/sql/hashtag_statistics.sql @@ -34,7 +34,7 @@ CREATE MATERIALIZED VIEW hashtag_statistics AS users AS ( SELECT hashtag_id, - json_object_agg(user_id, changesets) users + jsonb_object_agg(user_id, changesets) users FROM user_counts WHERE rank <= 10 GROUP BY hashtag_id @@ -59,7 +59,7 @@ CREATE MATERIALIZED VIEW hashtag_statistics AS aggregated_measurements AS ( SELECT hashtag_id, - json_object_agg(key, value) measurements + jsonb_object_agg(key, value) measurements FROM aggregated_measurements_kv GROUP BY hashtag_id ), @@ -83,7 +83,7 @@ CREATE MATERIALIZED VIEW hashtag_statistics AS aggregated_counts AS ( SELECT hashtag_id, - json_object_agg(key, value) counts + jsonb_object_agg(key, value) counts FROM aggregated_counts_kv GROUP BY hashtag_id ) diff --git a/sql/user_statistics.sql b/sql/user_statistics.sql index 7aa44ef..f64b217 100644 --- a/sql/user_statistics.sql +++ b/sql/user_statistics.sql @@ -23,7 +23,7 @@ CREATE MATERIALIZED VIEW user_statistics AS countries AS ( 
SELECT user_id, - json_object_agg(code, changesets) countries + jsonb_object_agg(code, changesets) countries FROM country_counts GROUP BY user_id ), @@ -38,7 +38,7 @@ CREATE MATERIALIZED VIEW user_statistics AS edit_times AS ( SELECT user_id, - json_object_agg(day, changesets) edit_times + jsonb_object_agg(day, changesets) edit_times FROM edit_time_counts GROUP BY user_id ), @@ -54,7 +54,7 @@ CREATE MATERIALIZED VIEW user_statistics AS editors AS ( SELECT user_id, - json_object_agg(editor, changesets) editors + jsonb_object_agg(editor, changesets) editors FROM editor_counts GROUP BY user_id ), @@ -71,7 +71,7 @@ CREATE MATERIALIZED VIEW user_statistics AS hashtags AS ( SELECT user_id, - json_object_agg(hashtag, changesets) hashtags + jsonb_object_agg(hashtag, changesets) hashtags FROM hashtag_counts GROUP BY user_id ), @@ -95,7 +95,7 @@ CREATE MATERIALIZED VIEW user_statistics AS aggregated_measurements AS ( SELECT user_id, - json_object_agg(key, value) measurements + jsonb_object_agg(key, value) measurements FROM aggregated_measurements_kv GROUP BY user_id ), @@ -119,7 +119,7 @@ CREATE MATERIALIZED VIEW user_statistics AS aggregated_counts AS ( SELECT user_id, - json_object_agg(key, value) counts + jsonb_object_agg(key, value) counts FROM aggregated_counts_kv GROUP BY user_id ) From df239a39296861838e7b426381d509b3255049e7 Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Mon, 17 Jun 2019 16:26:48 -0700 Subject: [PATCH 04/22] Drop unnecessary tables --- sql/hashtag_statistics.sql | 1 - sql/user_statistics.sql | 2 -- 2 files changed, 3 deletions(-) diff --git a/sql/hashtag_statistics.sql b/sql/hashtag_statistics.sql index 47ab69d..12a38aa 100644 --- a/sql/hashtag_statistics.sql +++ b/sql/hashtag_statistics.sql @@ -90,7 +90,6 @@ CREATE MATERIALIZED VIEW hashtag_statistics AS SELECT hashtags.hashtag tag, general.hashtag_id, - 'hashtag/' || general.hashtag_id || '/{z}/{x}/{y}.mvt' AS extent_uri, measurements, counts, general.changeset_count, diff --git 
a/sql/user_statistics.sql b/sql/user_statistics.sql index f64b217..67f3a39 100644 --- a/sql/user_statistics.sql +++ b/sql/user_statistics.sql @@ -127,8 +127,6 @@ CREATE MATERIALIZED VIEW user_statistics AS user_id AS id, users.name, 'user/' || users.id || '/{z}/{x}/{y}.mvt' AS extent_uri, - -- TODO this is unbounded; drop it? - changesets, measurements, counts, last_edit, From 6ed90fd8050255d953ac86c86c071c3392498a58 Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Mon, 17 Jun 2019 16:27:09 -0700 Subject: [PATCH 05/22] Allow password to be omitted --- src/main/resources/application.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index a34c726..43bde71 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -9,6 +9,7 @@ database { driver = ${?DB_DRIVER} url = ${?DB_URL} user = ${?DB_USER} + password = "" password = ${?DB_PASS} } From a9cd26df64233ca69709e8887bce2f781ac0e16d Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Mon, 17 Jun 2019 16:28:31 -0700 Subject: [PATCH 06/22] Misc: see full message * Rename StatsRouter -> DefaultRouter * Enable CORS * Fix ChangesetCountry model / endpoint * Add support for JSONB columns * Make page sizes consistent * Make pagination 1-based * Make output formatting consistent (snake_case) * Output Timestamps as ISO-8601 strings (using Instants internally) * scalafmt --- src/main/scala/osmesa/server/Config.scala | 29 ++-- src/main/scala/osmesa/server/Database.scala | 7 +- .../StatsRouter.scala => DefaultRouter.scala} | 66 +++++---- src/main/scala/osmesa/server/Server.scala | 33 ++--- .../scala/osmesa/server/model/Changeset.scala | 81 +++++------ .../server/model/ChangesetCountry.scala | 41 +++--- .../scala/osmesa/server/model/Country.scala | 28 ++-- .../scala/osmesa/server/model/Hashtag.scala | 36 ++--- .../osmesa/server/model/OsmStatError.scala | 7 +- src/main/scala/osmesa/server/model/Page.scala | 12 +- 
src/main/scala/osmesa/server/model/User.scala | 27 ++-- src/main/scala/osmesa/server/package.scala | 27 +++- .../osmesa/server/stats/CountryStats.scala | 100 ++++++------- .../osmesa/server/stats/HashtagStats.scala | 106 ++++---------- .../osmesa/server/stats/RefreshStats.scala | 59 +++----- .../scala/osmesa/server/stats/UserStats.scala | 131 +++++------------- .../scala/osmesa/server/stats/package.scala | 32 ----- .../osmesa/server/tile/TileLayouts.scala | 10 +- .../scala/osmesa/server/tile/TileRouter.scala | 93 +++++++------ 19 files changed, 380 insertions(+), 545 deletions(-) rename src/main/scala/osmesa/server/{stats/StatsRouter.scala => DefaultRouter.scala} (67%) delete mode 100644 src/main/scala/osmesa/server/stats/package.scala diff --git a/src/main/scala/osmesa/server/Config.scala b/src/main/scala/osmesa/server/Config.scala index d7565d5..d5ee914 100644 --- a/src/main/scala/osmesa/server/Config.scala +++ b/src/main/scala/osmesa/server/Config.scala @@ -2,25 +2,34 @@ package osmesa.server import cats.effect.IO import com.typesafe.config.ConfigFactory +import pureconfig._ import pureconfig.error.ConfigReaderException - -case class Config(server: Config.Server, database: Config.Database, tiles: Config.Tiles) - +case class Config(server: Config.Server, + database: Config.Database, + tiles: Config.Tiles) object Config { - case class Database(driver: String, url: String, user: String, password: String) - case class Server(host: String, port: Int) - case class Tiles(s3bucket: String, s3prefix: String, s3suffix: Option[String], chunkSize: Int, gzipped: Boolean) - - import pureconfig._ - def load(configFile: String = "application.conf"): IO[Config] = { IO { loadConfig[Config](ConfigFactory.load(configFile)) }.flatMap { - case Left(e) => IO.raiseError[Config](new ConfigReaderException[Config](e)) + case Left(e) => + IO.raiseError[Config](new ConfigReaderException[Config](e)) case Right(config) => IO.pure(config) } } + + case class Database(driver: String, + url: 
String, + user: String, + password: String) + + case class Server(host: String, port: Int) + + case class Tiles(s3bucket: String, + s3prefix: String, + s3suffix: Option[String], + chunkSize: Int, + gzipped: Boolean) } diff --git a/src/main/scala/osmesa/server/Database.scala b/src/main/scala/osmesa/server/Database.scala index f4dd897..77ee04a 100644 --- a/src/main/scala/osmesa/server/Database.scala +++ b/src/main/scala/osmesa/server/Database.scala @@ -7,7 +7,12 @@ import org.flywaydb.core.Flyway object Database { def transactor(dbconfig: Config.Database): IO[HikariTransactor[IO]] = { - HikariTransactor.newHikariTransactor[IO](dbconfig.driver, dbconfig.url, dbconfig.user, dbconfig.password) + HikariTransactor.newHikariTransactor[IO]( + dbconfig.driver, + dbconfig.url, + dbconfig.user, + dbconfig.password + ) } def initialize(transactor: HikariTransactor[IO]): IO[Unit] = { diff --git a/src/main/scala/osmesa/server/stats/StatsRouter.scala b/src/main/scala/osmesa/server/DefaultRouter.scala similarity index 67% rename from src/main/scala/osmesa/server/stats/StatsRouter.scala rename to src/main/scala/osmesa/server/DefaultRouter.scala index da8ca00..98ea05e 100644 --- a/src/main/scala/osmesa/server/stats/StatsRouter.scala +++ b/src/main/scala/osmesa/server/DefaultRouter.scala @@ -1,45 +1,24 @@ -package osmesa.server.stats - -import osmesa.server.model._ +package osmesa.server import cats.effect._ import doobie.Transactor import io.circe._ import io.circe.syntax._ -import fs2._ -import fs2.StreamApp.ExitCode -import org.http4s.circe._ import org.http4s._ +import org.http4s.circe._ import org.http4s.dsl.Http4sDsl -import org.http4s.server.blaze.BlazeBuilder -import org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`} - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ - - -class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { 
- - private def eitherResult[Result: Encoder](result: Either[OsmStatError, Result]) = { - result match { - case Right(succ) => Ok(succ.asJson, `Content-Type`(MediaType.`application/json`)) - case Left(err) => NotFound(err.toString) - } - } - - implicit val xa: Transactor[IO] = trans - - object OptionalPageQueryParamMatcher extends OptionalQueryParamDecoderMatcher[Int]("page") +import org.http4s.headers.`Content-Type` +import osmesa.server.model._ +import osmesa.server.stats.{CountryStats, HashtagStats, RefreshStats, UserStats} +class DefaultRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { def routes: HttpService[IO] = HttpService[IO] { case GET -> Root => Ok("hello world") case GET -> Root / "users" :? OptionalPageQueryParamMatcher(pageNum) => - Ok(UserStats.getPage(pageNum.getOrElse(0)).map(_.asJson)) + Ok(UserStats.getPage(pageNum.getOrElse(1)).map(_.asJson)) case GET -> Root / "users" / IntVar(userId) => for { @@ -49,7 +28,7 @@ class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { // Too many results. The data will get where it needs to go (streamed, chunked response) but the client might well crash case GET -> Root / "changesets" :? OptionalPageQueryParamMatcher(pageNum) => - Ok(Changeset.getPage(pageNum.getOrElse(0)).map(_.asJson)) + Ok(Changeset.getPage(pageNum.getOrElse(1)).map(_.asJson)) case GET -> Root / "changesets" / LongVar(changesetId) => for { @@ -59,7 +38,7 @@ class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { case GET -> Root / "campaigns" :? OptionalPageQueryParamMatcher(pageNum) => for { - io <- HashtagStats.getPage(pageNum.getOrElse(0)) + io <- HashtagStats.getPage(pageNum.getOrElse(1)) res <- eitherResult(io) } yield res @@ -70,7 +49,7 @@ class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { } yield result case GET -> Root / "countries" :? 
OptionalPageQueryParamMatcher(pageNum) => - Ok(Country.getPage(pageNum.getOrElse(0)).map(_.asJson)) + Ok(Country.getPage(pageNum.getOrElse(1)).map(_.asJson)) case GET -> Root / "countries" / IntVar(countryId) => for { @@ -84,10 +63,14 @@ class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { result <- eitherResult(io) } yield result - case GET -> Root / "changesets-countries" :? OptionalPageQueryParamMatcher(pageNum) => - Ok(ChangesetCountry.getPage(pageNum.getOrElse(0)).map(_.asJson)) + case GET -> Root / "changesets-countries" :? OptionalPageQueryParamMatcher( + pageNum + ) => + Ok(ChangesetCountry.getPage(pageNum.getOrElse(1)).map(_.asJson)) - case GET -> Root / "changesets-countries" / IntVar(changesetId) / IntVar(countryId) => + case GET -> Root / "changesets-countries" / IntVar(changesetId) / IntVar( + countryId + ) => for { io <- ChangesetCountry.byId(changesetId, countryId) changesetCountry <- eitherResult(io) @@ -99,4 +82,19 @@ class StatsRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { result <- eitherResult(Right(io)) } yield result } + + implicit val xa: Transactor[IO] = trans + + private def eitherResult[Result: Encoder]( + result: Either[OsmStatError, Result] + ) = { + result match { + case Right(succ) => + Ok(succ.asJson, `Content-Type`(MediaType.`application/json`)) + case Left(err) => NotFound(err.toString) + } + } + + object OptionalPageQueryParamMatcher + extends OptionalQueryParamDecoderMatcher[Int]("page") } diff --git a/src/main/scala/osmesa/server/Server.scala b/src/main/scala/osmesa/server/Server.scala index 0184e76..5d3a404 100644 --- a/src/main/scala/osmesa/server/Server.scala +++ b/src/main/scala/osmesa/server/Server.scala @@ -1,26 +1,17 @@ package osmesa.server -import osmesa.server.model._ -import osmesa.server.stats._ -import osmesa.server.tile._ - import cats.effect._ -import doobie.Transactor -import io.circe._ -import io.circe.syntax._ -import fs2._ import fs2.StreamApp.ExitCode -import org.http4s.circe._ +import 
fs2._ import org.http4s._ -import org.http4s.server.blaze.BlazeBuilder import org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`} +import org.http4s.server.blaze.BlazeBuilder +import org.http4s.server.middleware.{CORS, CORSConfig} +import osmesa.server.tile._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ - object Server extends StreamApp[IO] { private val corsConfig = CORSConfig( @@ -31,24 +22,24 @@ object Server extends StreamApp[IO] { maxAge = 1.day.toSeconds ) - private val middleware: HttpMiddleware[IO] = { (routes: HttpService[IO]) => - CORS(routes) + private val middleware: HttpMiddleware[IO] = { routes: HttpService[IO] => + CORS(routes, corsConfig) } - def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] = { + def stream(args: List[String], + requestShutdown: IO[Unit]): Stream[IO, ExitCode] = { for { - config <- Stream.eval(Config.load()) + config <- Stream.eval(Config.load()) transactor <- Stream.eval(Database.transactor(config.database)) //_ <- Stream.eval(Database.initialize(transactor)) - stats = middleware(new StatsRouter(transactor).routes) + default = middleware(new DefaultRouter(transactor).routes) tiles = middleware(new TileRouter(config.tiles).routes) - exitCode <- BlazeBuilder[IO] + exitCode <- BlazeBuilder[IO] .enableHttp2(true) .bindHttp(config.server.port, config.server.host) - .mountService(stats, "/") + .mountService(default, "/") .mountService(tiles, "/tiles") .serve } yield exitCode } } - diff --git a/src/main/scala/osmesa/server/model/Changeset.scala b/src/main/scala/osmesa/server/model/Changeset.scala index 0235ed3..860bbd9 100644 --- a/src/main/scala/osmesa/server/model/Changeset.scala +++ b/src/main/scala/osmesa/server/model/Changeset.scala @@ -1,73 +1,62 @@ package osmesa.server.model +import java.time.Instant + +import cats.effect._ import doobie._ import 
doobie.implicits._ -import doobie.postgres._ import doobie.postgres.implicits._ -import cats.effect._ import io.circe._ -import io.circe.generic.semiauto._ +import io.circe.generic.extras.semiauto._ import io.circe.java8.time._ +import osmesa.server._ -import java.time.LocalDate - - -case class Changeset( - id: Long, - kmRoadsAdd: Option[Double], - kmRoadsMod: Option[Double], - kmWaterwaysAdd: Option[Double], - kmWaterwaysMod: Option[Double], - kmCoastlinesAdd: Option[Double], - kmCoastlinesMod: Option[Double], - roadsAdd: Option[Int], - roadsMod: Option[Int], - waterwaysAdd: Option[Int], - waterwaysMod: Option[Int], - coastlinesAdd: Option[Int], - coastlinesMod: Option[Int], - buildingsAdd: Option[Int], - buildingsMod: Option[Int], - poiAdd: Option[Int], - poiMod: Option[Int], - editor: Option[String], - userId: Option[Int], - createdAt: Option[LocalDate], - closedAt: Option[LocalDate], - augmentedDiffs: Option[Array[Int]], - updatedAt: Option[LocalDate] -) - - -object Changeset { +case class Changeset(id: Long, + measurements: Json, + counts: Json, + editor: Option[String], + userId: Option[Int], + createdAt: Option[Instant], + closedAt: Option[Instant], + augmentedDiffs: Option[Array[Int]], + updatedAt: Option[Instant]) - implicit val changesetDecoder: Decoder[Changeset] = deriveDecoder - implicit val changesetEncoder: Encoder[Changeset] = deriveEncoder +object Changeset extends Implicits { + implicit val changesetDecoder: Decoder[Changeset] = deriveDecoder[Changeset] + implicit val changesetEncoder: Encoder[Changeset] = deriveEncoder[Changeset] private val selectF = fr""" SELECT - id, road_km_added, road_km_modified, waterway_km_added, waterway_km_modified, - coastline_km_added, coastline_km_modified, roads_added, roads_modified, - waterways_added, waterways_modified, coastlines_added, coastlines_modified, - buildings_added, buildings_modified, pois_added, pois_modified, editor, user_id, - created_at, closed_at, augmented_diffs, updated_at + id, + 
coalesce(measurements, '{}'::jsonb) measurements, + coalesce(counts, '{}'::jsonb) counts, + editor, + user_id, + created_at, + closed_at, + augmented_diffs, + updated_at FROM changesets """ - def byId(id: Long)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Changeset]] = + def byId( + id: Long + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Changeset]] = (selectF ++ fr"WHERE id = $id") .query[Changeset] .option .transact(xa) .map { case Some(changeset) => Right(changeset) - case None => Left(IdNotFoundError("changeset", id)) + case None => Left(IdNotFoundError("changeset", id)) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[ResultPage[Changeset]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY id ASC LIMIT 10 OFFSET $offset;") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[Changeset]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY id ASC LIMIT $pageSize OFFSET $offset;") .query[Changeset] .to[List] .map({ ResultPage(_, pageNum) }) diff --git a/src/main/scala/osmesa/server/model/ChangesetCountry.scala b/src/main/scala/osmesa/server/model/ChangesetCountry.scala index 8bc7c02..67411e4 100644 --- a/src/main/scala/osmesa/server/model/ChangesetCountry.scala +++ b/src/main/scala/osmesa/server/model/ChangesetCountry.scala @@ -1,48 +1,51 @@ package osmesa.server.model +import cats.effect._ import doobie._ import doobie.implicits._ -import cats.effect._ import io.circe._ import io.circe.generic.semiauto._ +import osmesa.server._ +case class ChangesetCountry(changesetId: Int, countryId: Int, editCount: Int) -case class ChangesetCountry( - changesetId: Int, - countryId: Int, - editCount: Int -) - +object ChangesetCountry extends Implicits { -object ChangesetCountry { - - implicit val changesetCountryDecoder: Decoder[ChangesetCountry] = deriveDecoder - implicit val changesetCountryEncoder: Encoder[ChangesetCountry] = deriveEncoder + implicit val 
changesetCountryDecoder: Decoder[ChangesetCountry] = + deriveDecoder + implicit val changesetCountryEncoder: Encoder[ChangesetCountry] = + deriveEncoder private val selectF = fr""" SELECT - changeset_id, country_id, edit_count + changeset_id, + country_id, + edit_count FROM changesets_countries """ - def byId(changesetId: Int, countryId: Int)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, ChangesetCountry]] = - (selectF ++ fr"WHERE changeset_id = $changesetId AND country_id == $countryId") + def byId(changesetId: Int, countryId: Int)( + implicit xa: Transactor[IO] + ): IO[Either[OsmStatError, ChangesetCountry]] = + (selectF ++ fr"WHERE changeset_id = $changesetId AND country_id = $countryId") .query[ChangesetCountry] .option .transact(xa) .map { case Some(changesetCountry) => Right(changesetCountry) - case None => Left(IdNotFoundError("changesetCountry", (changesetId, countryId))) + case None => + Left(IdNotFoundError("changesetCountry", (changesetId, countryId))) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[ResultPage[ChangesetCountry]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY changesetId ASC, countryId ASC LIMIT 10 OFFSET $offset") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[ChangesetCountry]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY changeset_id ASC, country_id ASC LIMIT $pageSize OFFSET $offset") .query[ChangesetCountry] .to[List] .map({ ResultPage(_, pageNum) }) .transact(xa) } } - diff --git a/src/main/scala/osmesa/server/model/Country.scala b/src/main/scala/osmesa/server/model/Country.scala index 47eb5a2..22919c3 100644 --- a/src/main/scala/osmesa/server/model/Country.scala +++ b/src/main/scala/osmesa/server/model/Country.scala @@ -1,20 +1,15 @@ package osmesa.server.model +import cats.effect._ import doobie._ import doobie.implicits._ -import cats.effect._ import io.circe._ import io.circe.generic.semiauto._ +import 
osmesa.server._ +case class Country(id: Int, name: Option[String], code: String) -case class Country( - id: Int, - name: Option[String], - code: String -) - - -object Country { +object Country extends Implicits { implicit val countryDecoder: Decoder[Country] = deriveDecoder implicit val countryEncoder: Encoder[Country] = deriveEncoder @@ -26,19 +21,23 @@ object Country { countries """ - def byId(id: Int)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Country]] = + def byId( + id: Int + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Country]] = (selectF ++ fr"WHERE id = $id") .query[Country] .option .transact(xa) .map { case Some(country) => Right(country) - case None => Left(IdNotFoundError("country", id)) + case None => Left(IdNotFoundError("country", id)) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[ResultPage[Country]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY id ASC LIMIT 10 OFFSET $offset") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[Country]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY id ASC LIMIT $pageSize OFFSET $offset") .query[Country] .to[List] .map({ ResultPage(_, pageNum) }) @@ -46,4 +45,3 @@ object Country { } } - diff --git a/src/main/scala/osmesa/server/model/Hashtag.scala b/src/main/scala/osmesa/server/model/Hashtag.scala index 4bdbc4b..57dd533 100644 --- a/src/main/scala/osmesa/server/model/Hashtag.scala +++ b/src/main/scala/osmesa/server/model/Hashtag.scala @@ -1,49 +1,49 @@ package osmesa.server.model +import cats.effect._ import doobie._ import doobie.implicits._ -import cats.effect._ import io.circe._ import io.circe.generic.semiauto._ +import osmesa.server._ -import scala.concurrent.Future - +case class Hashtag(id: Int, hashtag: String) -case class Hashtag( - id: Int, - hashtag: String -) - - -object Hashtag { +object Hashtag extends Implicits { implicit val hashtagDecoder: Decoder[Hashtag] = deriveDecoder 
implicit val hashtagEncoder: Encoder[Hashtag] = deriveEncoder - private val selectF = fr""" + private val selectF = + fr""" SELECT id, hashtag FROM hashtags """ - def byId(id: Int)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Hashtag]] = + def byId( + id: Int + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, Hashtag]] = (selectF ++ fr"WHERE id = $id") .query[Hashtag] .option .transact(xa) .map { case Some(country) => Right(country) - case None => Left(IdNotFoundError("hashtag", id)) + case None => Left(IdNotFoundError("hashtag", id)) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[ResultPage[Hashtag]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY id LIMIT 10 OFFSET $offset") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[Hashtag]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY id LIMIT $pageSize OFFSET $offset") .query[Hashtag] .to[List] - .map({ ResultPage(_, pageNum) }) + .map({ + ResultPage(_, pageNum) + }) .transact(xa) } } - diff --git a/src/main/scala/osmesa/server/model/OsmStatError.scala b/src/main/scala/osmesa/server/model/OsmStatError.scala index 402519a..3781312 100644 --- a/src/main/scala/osmesa/server/model/OsmStatError.scala +++ b/src/main/scala/osmesa/server/model/OsmStatError.scala @@ -1,15 +1,12 @@ package osmesa.server.model -import io.circe._ - - trait OsmStatError case class UnknownError(message: String) extends OsmStatError { override def toString = s"Unknown error: $message" } -case class IdNotFoundError[ID](recordType: String, id: ID) extends OsmStatError { +case class IdNotFoundError[ID](recordType: String, id: ID) + extends OsmStatError { override def toString = s"Unable to retrieve ${recordType} record at ${id}" } - diff --git a/src/main/scala/osmesa/server/model/Page.scala b/src/main/scala/osmesa/server/model/Page.scala index 1a48dad..9f54c78 100644 --- a/src/main/scala/osmesa/server/model/Page.scala +++ 
b/src/main/scala/osmesa/server/model/Page.scala @@ -3,13 +3,11 @@ package osmesa.server.model import io.circe._ import io.circe.generic.semiauto._ - -case class ResultPage[RESULT]( - results: List[RESULT], - page: Int -) +case class ResultPage[RESULT](results: List[RESULT], page: Int) object ResultPage { - implicit def resultPageDecoder[RESULT: Decoder]: Decoder[ResultPage[RESULT]] = deriveDecoder - implicit def resultPageEncoder[RESULT: Encoder]: Encoder[ResultPage[RESULT]] = deriveEncoder + implicit def resultPageDecoder[RESULT: Decoder]: Decoder[ResultPage[RESULT]] = + deriveDecoder + implicit def resultPageEncoder[RESULT: Encoder]: Encoder[ResultPage[RESULT]] = + deriveEncoder } diff --git a/src/main/scala/osmesa/server/model/User.scala b/src/main/scala/osmesa/server/model/User.scala index 27a1880..dbcaf8a 100644 --- a/src/main/scala/osmesa/server/model/User.scala +++ b/src/main/scala/osmesa/server/model/User.scala @@ -1,19 +1,15 @@ package osmesa.server.model +import cats.effect._ import doobie._ import doobie.implicits._ -import cats.effect._ import io.circe._ import io.circe.generic.semiauto._ +import osmesa.server._ +case class User(id: Int, name: Option[String]) -case class User( - id: Int, - name: Option[String] -) - - -object User { +object User extends Implicits { implicit val userDecoder: Decoder[User] = deriveDecoder implicit val userEncoder: Encoder[User] = deriveEncoder @@ -25,23 +21,26 @@ object User { users """ - def byId(id: Int)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, User]] = + def byId( + id: Int + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, User]] = (selectF ++ fr"WHERE id = $id") .query[User] .option .transact(xa) .map { case Some(user) => Right(user) - case None => Left(IdNotFoundError("user", id)) + case None => Left(IdNotFoundError("user", id)) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[ResultPage[User]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY id ASC LIMIT 10 OFFSET 
$offset") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[User]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY id ASC LIMIT $pageSize OFFSET $offset") .query[User] .to[List] .map({ ResultPage(_, pageNum) }) .transact(xa) } } - diff --git a/src/main/scala/osmesa/server/package.scala b/src/main/scala/osmesa/server/package.scala index ed6bc6f..92a8a4a 100644 --- a/src/main/scala/osmesa/server/package.scala +++ b/src/main/scala/osmesa/server/package.scala @@ -1,4 +1,29 @@ package osmesa +import cats.implicits._ +import doobie.util.meta.Meta +import io.circe.Json +import io.circe.generic.extras.Configuration +import io.circe.parser.parse +import org.postgresql.util.PGobject -package object server {} +package object server { + trait Implicits { + implicit val CustomConfig: Configuration = + Configuration.default.withSnakeCaseMemberNames.withDefaults + + } + + implicit final val JsonMeta: Meta[Json] = + Meta + .other[PGobject]("json") + .xmap[Json]( + a => parse(a.getValue).leftMap[Json](e => throw e).merge, + a => { + val o = new PGobject + o.setType("json") + o.setValue(a.noSpaces) + o + } + ) +} diff --git a/src/main/scala/osmesa/server/stats/CountryStats.scala b/src/main/scala/osmesa/server/stats/CountryStats.scala index b923b1f..22d5ff4 100644 --- a/src/main/scala/osmesa/server/stats/CountryStats.scala +++ b/src/main/scala/osmesa/server/stats/CountryStats.scala @@ -1,90 +1,72 @@ package osmesa.server.stats -import osmesa.server.model._ +import java.time.Instant +import cats.effect._ import doobie._ import doobie.implicits._ -import doobie.postgres._ -import doobie.postgres.implicits._ -import cats._ -import cats.data._ -import cats.effect._ -import cats.implicits._ import io.circe._ -import io.circe.jawn._ -import io.circe.syntax._ -import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto._ -import fs2._ -import org.http4s.circe._ -import org.http4s._ -import 
org.http4s.dsl.Http4sDsl -import org.http4s.server.blaze.BlazeBuilder -import org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`} -import org.postgresql.util.PGobject - -import scala.concurrent.duration._ +import io.circe.java8.time._ +import osmesa.server._ +import osmesa.server.model._ -case class CountryStats( - countryId: Long, - name: Option[String], - kmRoadsAdd: Option[Double], - kmRoadsMod: Option[Double], - kmWaterwaysAdd: Option[Double], - kmWaterwaysMod: Option[Double], - kmCoastlinesAdd: Option[Double], - kmCoastlinesMod: Option[Double], - roadsAdd: Option[Int], - roadsMod: Option[Int], - waterwaysAdd: Option[Int], - waterwaysMod: Option[Int], - coastlinesAdd: Option[Int], - coastlinesMod: Option[Int], - buildingsAdd: Option[Int], - buildingsMod: Option[Int], - poiAdd: Option[Int], - poiMod: Option[Int], - lastEdit: Option[java.sql.Timestamp], - updatedAt: Option[java.sql.Timestamp], - changesetCount: Option[Int], - editCount: Option[Int], - userEdits: Json, - hashtagEdits: Json -) +case class CountryStats(countryId: Long, + name: Option[String], + code: Option[String], + measurements: Json, + counts: Json, + lastEdit: Option[Instant], + updatedAt: Option[Instant], + changesetCount: Option[Int], + editCount: Option[Int], + userEdits: Json, + hashtagEdits: Json) -object CountryStats { - implicit val customConfig: Configuration = Configuration.default.withSnakeCaseMemberNames.withDefaults +object CountryStats extends Implicits { implicit val countryStatsDecoder: Decoder[CountryStats] = deriveDecoder implicit val countryStatsEncoder: Encoder[CountryStats] = deriveEncoder - private val selectF = fr""" + private val selectF = + fr""" SELECT - country_id, country_name, road_km_added, road_km_modified, waterway_km_added, waterway_km_modified, - coastline_km_added, coastline_km_modified, roads_added, roads_modified, waterways_added, waterways_modified, - 
coastlines_added, coastlines_modified, buildings_added, buildings_modified, pois_added, pois_modified, - last_edit, updated_at, changeset_count, edit_count, user_edit_counts, hashtag_edits + country_id, + country_name, + country_code, + coalesce(measurements, '{}'::jsonb) measurements, + coalesce(counts, '{}'::jsonb) counts, + last_edit, + updated_at, + changeset_count, + edit_count, + coalesce(user_edit_counts, '{}'::jsonb) user_edit_counts, + coalesce(hashtag_edits, '{}'::jsonb) hashtag_edits FROM country_statistics """ - def byId(code: String)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, CountryStats]] = + def byId( + code: String + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, CountryStats]] = (selectF ++ fr"WHERE country_code = $code") .query[CountryStats] .option .transact(xa) .map { case Some(country) => Right(country) - case None => Left(IdNotFoundError("country", code)) + case None => Left(IdNotFoundError("country", code)) } - def getPage(pageNum: Int, pageSize: Int = 25)(implicit xa: Transactor[IO]): IO[ResultPage[CountryStats]] = { - val offset = pageNum * pageSize + 1 + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[CountryStats]] = { + val offset = (pageNum - 1) * pageSize (selectF ++ fr"ORDER BY id ASC LIMIT $pageSize OFFSET $offset") .query[CountryStats] .to[List] - .map({ ResultPage(_, pageNum) }) + .map({ + ResultPage(_, pageNum) + }) .transact(xa) } } diff --git a/src/main/scala/osmesa/server/stats/HashtagStats.scala b/src/main/scala/osmesa/server/stats/HashtagStats.scala index 8f2c646..f00a7ab 100644 --- a/src/main/scala/osmesa/server/stats/HashtagStats.scala +++ b/src/main/scala/osmesa/server/stats/HashtagStats.scala @@ -1,111 +1,63 @@ package osmesa.server.stats -import osmesa.server.model._ - -import doobie._ -import doobie.implicits._ -import doobie.postgres._ -import doobie.postgres.implicits._ -import cats._ -import cats.data._ import cats.effect._ import cats.implicits._ 
+import doobie._ +import doobie.implicits._ import io.circe._ -import io.circe.jawn._ -import io.circe.syntax._ -import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto._ -import fs2._ -import org.postgresql.util.PGobject - -import scala.concurrent.duration._ - +import osmesa.server._ +import osmesa.server.model._ -case class HashtagStats( - tag: String, - extentUri: Option[String], - buildingsAdd: Option[Int], - buildingsMod: Option[Int], - roadsAdd: Option[Int], - kmRoadsAdd: Option[Double], - roadsMod: Option[Int], - kmRoadsMod: Option[Double], - waterwaysAdd: Option[Int], - kmWaterwaysAdd: Option[Double], - waterwaysMod: Option[Int], - kmWaterwaysMod: Option[Double], - coastlinesAdd: Option[Int], - kmCoastlinesAdd: Option[Double], - coastlinesMod: Option[Int], - kmCoastlinesMod: Option[Double], - poiAdd: Option[Int], - poiMod: Option[Int], - users: Json -) +case class HashtagStats(tag: String, + measurements: Json, + counts: Json, + users: Json) -/** ------------------------+------------------+-----------+----------+--------- - tag | text | | | - users | json | | | - extent_uri | text | | | - buildings_added | bigint | | | - buildings_modified | bigint | | | - roads_added | bigint | | | - road_km_added | double precision | | | - roads_modified | bigint | | | - road_km_modified | double precision | | | - waterways_added | bigint | | | - waterway_km_added | double precision | | | - waterways_modified | bigint | | | - waterway_km_modified | double precision | | | - coastlines_added | bigint | | | - coastline_km_added | double precision | | | - coastlines_modified | bigint | | | - coastline_km_modified | double precision | | | - pois_added | bigint | | | - pois_modified | bigint | | | - **/ -object HashtagStats { - implicit val customConfig: Configuration = Configuration.default.withSnakeCaseMemberNames.withDefaults +object HashtagStats extends Implicits { implicit val userHashtagDecoder: Decoder[HashtagStats] = deriveDecoder implicit 
val userHashtagEncoder: Encoder[HashtagStats] = deriveEncoder - private val selectF = fr""" + private val selectF = + fr""" SELECT - tag, extent_uri, buildings_added, buildings_modified, - roads_added, road_km_added, roads_modified, road_km_modified, - waterways_added, waterway_km_added, waterways_modified, - waterway_km_modified, coastlines_added, coastline_km_added, - coastlines_modified, coastline_km_modified, pois_added, - pois_modified, users + tag, + coalesce(measurements, '{}'::jsonb) measurements, + coalesce(counts, '{}'::jsonb) counts, + coalesce(users, '{}'::jsonb) users FROM hashtag_statistics """ - def byTag(tag: String)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, HashtagStats]] = + def byTag( + tag: String + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, HashtagStats]] = (selectF ++ fr"WHERE tag = $tag") .query[HashtagStats] .option .attempt .transact(xa) .map { - case Right(hashtagOrNone) => hashtagOrNone match { - case Some(ht) => Right(ht) - case None => Left(IdNotFoundError("hashtag_statistics", tag)) - } + case Right(hashtagOrNone) => + hashtagOrNone match { + case Some(ht) => Right(ht) + case None => Left(IdNotFoundError("hashtag_statistics", tag)) + } case Left(err) => Left(UnknownError(err.toString)) } - def getPage(pageNum: Int)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, ResultPage[HashtagStats]]] = { - val offset = pageNum * 10 + 1 - (selectF ++ fr"ORDER BY tag ASC LIMIT 10 OFFSET $offset") + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[Either[OsmStatError, ResultPage[HashtagStats]]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"ORDER BY tag ASC LIMIT $pageSize OFFSET $offset") .query[HashtagStats] .to[List] .attempt .transact(xa) .map { case Right(results) => Right(ResultPage(results, pageNum)) - case Left(err) => Left(UnknownError(err.toString)) + case Left(err) => Left(UnknownError(err.toString)) } } } diff --git 
a/src/main/scala/osmesa/server/stats/RefreshStats.scala b/src/main/scala/osmesa/server/stats/RefreshStats.scala index 20d176b..fe49135 100644 --- a/src/main/scala/osmesa/server/stats/RefreshStats.scala +++ b/src/main/scala/osmesa/server/stats/RefreshStats.scala @@ -1,43 +1,22 @@ package osmesa.server.stats -import osmesa.server.model._ +import java.time.Instant +import cats.effect._ import doobie._ import doobie.implicits._ -import doobie.postgres._ import doobie.postgres.implicits._ -import cats._ -import cats.data._ -import cats.effect._ -import cats.implicits._ import io.circe._ -import io.circe.jawn._ -import io.circe.syntax._ -import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto._ -import fs2._ -import org.http4s.circe._ -import org.http4s._ -import org.http4s.dsl.Http4sDsl -import org.http4s.server.blaze.BlazeBuilder -import org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`} -import org.postgresql.util.PGobject +import io.circe.java8.time._ +import osmesa.server.Implicits -import scala.concurrent.duration._ +case class RefreshTime(view: Option[String], updatedAt: Option[Instant]) -case class RefreshTime( - view: Option[String], - updatedAt: Option[java.sql.Timestamp] -) - -case class RefreshStats( - userStatsRefresh: Option[java.sql.Timestamp], - countryStatsRefresh: Option[java.sql.Timestamp], - hashtagStatsRefresh: Option[java.sql.Timestamp] -) { - def +(that: RefreshStats) = { +case class RefreshStats(userStatsRefresh: Option[Instant], + countryStatsRefresh: Option[Instant], + hashtagStatsRefresh: Option[Instant]) { + def +(that: RefreshStats): RefreshStats = { RefreshStats( (userStatsRefresh.toList ++ that.userStatsRefresh).headOption, (countryStatsRefresh.toList ++ that.countryStatsRefresh).headOption, @@ -46,9 +25,7 @@ case class RefreshStats( } } -object RefreshStats { - implicit val customConfig: Configuration = 
Configuration.default.withSnakeCaseMemberNames.withDefaults - +object RefreshStats extends Implicits { implicit val refreshStatsDecoder: Decoder[RefreshStats] = deriveDecoder implicit val refreshStatsEncoder: Encoder[RefreshStats] = deriveEncoder @@ -59,17 +36,19 @@ object RefreshStats { refreshments """ - def apply(arg: RefreshTime): RefreshStats = arg.view.get match { - case "user_statistics" => RefreshStats(arg.updatedAt, None, None) - case "country_statistics" => RefreshStats(None, arg.updatedAt, None) - case "hashtag_statistics" => RefreshStats(None, None, arg.updatedAt) - } - def getCurrentStatus()(implicit xa: Transactor[IO]): IO[RefreshStats] = - (selectF) + selectF .query[RefreshTime] .to[List] .transact(xa) - .map(_.map(RefreshStats.apply(_)).reduce(_+_)) + .map(_.map(RefreshStats.apply).foldLeft(RefreshStats(None, None, None))(_ + _)) + + def apply(arg: RefreshTime): RefreshStats = arg.view match { + case Some("user_statistics") => RefreshStats(arg.updatedAt, None, None) + case Some("country_statistics") => RefreshStats(None, arg.updatedAt, None) + case Some("hashtag_statistics") => RefreshStats(None, None, arg.updatedAt) + // Rows for views without a dedicated field (or with a NULL view name) contribute nothing instead of throwing + case _ => RefreshStats(None, None, None) + } } diff --git a/src/main/scala/osmesa/server/stats/UserStats.scala b/src/main/scala/osmesa/server/stats/UserStats.scala index 15cefc5..7569485 100644 --- a/src/main/scala/osmesa/server/stats/UserStats.scala +++ b/src/main/scala/osmesa/server/stats/UserStats.scala @@ -1,131 +1,74 @@ package osmesa.server.stats -import osmesa.server.model._ +import java.time.Instant +import cats.effect._ import doobie._ import doobie.implicits._ -import doobie.postgres._ -import doobie.postgres.implicits._ -import cats._ -import cats.data._ -import cats.effect._ -import cats.implicits._ import io.circe._ -import io.circe.jawn._ -import io.circe.syntax._ -import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto._ -import fs2._ -import org.http4s.circe._ -import org.http4s._ -import org.http4s.dsl.Http4sDsl -import org.http4s.server.blaze.BlazeBuilder -import 
org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`} -import org.postgresql.util.PGobject - -import scala.concurrent.duration._ - +import io.circe.java8.time._ +import osmesa.server._ +import osmesa.server.model.{IdNotFoundError, OsmStatError, ResultPage} -case class UserStats( - uid: Long, - name: Option[String], - extentUri: Option[String], - kmRoadsAdd: Option[Double], - kmRoadsMod: Option[Double], - kmRoadsDel: Option[Double], - kmWaterwaysAdd: Option[Double], - kmWaterwaysMod: Option[Double], - kmWaterwaysDel: Option[Double], - kmCoastlinesAdd: Option[Double], - kmCoastlinesMod: Option[Double], - kmCoastlinesDel: Option[Double], - roadsAdd: Option[Int], - roadsMod: Option[Int], - roadsDel: Option[Int], - waterwaysAdd: Option[Int], - waterwaysMod: Option[Int], - waterwaysDel: Option[Int], - coastlinesAdd: Option[Int], - coastlinesMod: Option[Int], - coastlinesDel: Option[Int], - buildingsAdd: Option[Int], - buildingsMod: Option[Int], - buildingsDel: Option[Int], - poiAdd: Option[Int], - poiMod: Option[Int], - poiDel: Option[Int], - lastEdit: Option[java.sql.Timestamp], - changesetCount: Option[Int], - editCount: Option[Int], - editors: Json, - editTimes: Json, - countryList: Json, - hashtags: Json -) +case class UserStats(uid: Long, + name: Option[String], + extentUri: Option[String], + measurements: Json, + counts: Json, + lastEdit: Option[Instant], + changesetCount: Option[Int], + editCount: Option[Int], + editors: Json, + editTimes: Json, + countries: Json, + hashtags: Json) -object UserStats { - implicit val customConfig: Configuration = Configuration.default.withSnakeCaseMemberNames.withDefaults +object UserStats extends Implicits { implicit val userStatsDecoder: Decoder[UserStats] = deriveDecoder implicit val userStatsEncoder: Encoder[UserStats] = deriveEncoder - private val selectF = fr""" + private val selectF = + fr""" SELECT id, name, extent_uri, - 
road_km_added, - road_km_modified, - road_km_deleted, - waterway_km_added, - waterway_km_modified, - waterway_km_deleted, - coastline_km_added, - coastline_km_modified, - coastline_km_deleted, - roads_added, - roads_modified, - roads_deleted, - waterways_added, - waterways_modified, - waterways_deleted, - coastlines_added, - coastlines_modified, - coastlines_deleted, - buildings_added, - buildings_modified, - buildings_deleted, - pois_added, - pois_modified, - pois_deleted, + coalesce(measurements, '{}'::jsonb) measurements, + coalesce(counts, '{}'::jsonb) counts, last_edit, changeset_count, edit_count, - editors, - edit_times, - country_list, - hashtags + coalesce(editors, '{}'::jsonb) editors, + coalesce(edit_times, '{}'::jsonb) edit_times, + coalesce(countries, '{}'::jsonb) countries, + coalesce(hashtags, '{}'::jsonb) hashtags FROM user_statistics """ - def byId(id: Long)(implicit xa: Transactor[IO]): IO[Either[OsmStatError, UserStats]] = + def byId( + id: Long + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, UserStats]] = (selectF ++ fr"WHERE id = $id") .query[UserStats] .option .transact(xa) .map { case Some(user) => Right(user) - case None => Left(IdNotFoundError("user", id)) + case None => Left(IdNotFoundError("user", id)) } - def getPage(pageNum: Int, pageSize: Int = 25)(implicit xa: Transactor[IO]): IO[ResultPage[UserStats]] = { - val offset = pageNum * pageSize + 1 + def getPage(pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[ResultPage[UserStats]] = { + val offset = (pageNum - 1) * pageSize (selectF ++ fr"ORDER BY id ASC LIMIT $pageSize OFFSET $offset") .query[UserStats] .to[List] - .map({ ResultPage(_, pageNum) }) + .map({ + ResultPage(_, pageNum) + }) .transact(xa) } } diff --git a/src/main/scala/osmesa/server/stats/package.scala b/src/main/scala/osmesa/server/stats/package.scala deleted file mode 100644 index ab91351..0000000 --- a/src/main/scala/osmesa/server/stats/package.scala +++ /dev/null @@ -1,32 +0,0 @@ 
-package osmesa.server - -import osmesa.server.model._ - -import doobie._ -import doobie.implicits._ -import doobie.postgres._ -import doobie.postgres.implicits._ -import io.circe._ -import io.circe.parser.parse -import cats._ -import cats.implicits._ -import org.postgresql.util.PGobject -import java.sql.Timestamp - -package object stats { - implicit val JsonMeta: Meta[Json] = - Meta.other[PGobject]("json").xmap[Json]( - a => parse(a.getValue).leftMap[Json](e => throw e).merge, - a => { - val o = new PGobject - o.setType("json") - o.setValue(a.noSpaces) - o - } - ) - implicit val TimestampFormat : Encoder[Timestamp] with Decoder[Timestamp] = new Encoder[Timestamp] with Decoder[Timestamp] { - override def apply(a: Timestamp): Json = Encoder.encodeLong.apply(a.getTime) - - override def apply(c: HCursor): Decoder.Result[Timestamp] = Decoder.decodeLong.map(s => new Timestamp(s)).apply(c) - } -} diff --git a/src/main/scala/osmesa/server/tile/TileLayouts.scala b/src/main/scala/osmesa/server/tile/TileLayouts.scala index 6008abc..774cc34 100644 --- a/src/main/scala/osmesa/server/tile/TileLayouts.scala +++ b/src/main/scala/osmesa/server/tile/TileLayouts.scala @@ -1,14 +1,14 @@ package osmesa.server.tile import geotrellis.proj4.WebMercator -import geotrellis.raster._ import geotrellis.spark.tiling._ - object TileLayouts { - private val layouts: Array[LayoutDefinition] = (0 to 30).map({ n => - ZoomedLayoutScheme.layoutForZoom(n, WebMercator.worldExtent, 256) - }).toArray + private val layouts: Array[LayoutDefinition] = (0 to 30) + .map({ n => + ZoomedLayoutScheme.layoutForZoom(n, WebMercator.worldExtent, 256) + }) + .toArray def apply(i: Int) = layouts(i) } diff --git a/src/main/scala/osmesa/server/tile/TileRouter.scala b/src/main/scala/osmesa/server/tile/TileRouter.scala index 8c321e7..b1c830f 100644 --- a/src/main/scala/osmesa/server/tile/TileRouter.scala +++ b/src/main/scala/osmesa/server/tile/TileRouter.scala @@ -1,78 +1,79 @@ package osmesa.server.tile -import 
osmesa.server.Config -import osmesa.server.model._ - -import cats.effect._ -import doobie.Transactor -import io.circe._ -import io.circe.syntax._ -import fs2._ -import fs2.StreamApp.ExitCode -import org.http4s.circe._ -import org.http4s._ -import org.http4s.dsl.Http4sDsl -import org.http4s.server.blaze.BlazeBuilder -import org.http4s.server.HttpMiddleware -import org.http4s.server.middleware.{GZip, CORS, CORSConfig} -import org.http4s.headers.{Location, `Content-Type`, `Content-Encoding`} -import blobstore.{Path => BStorePath} -import blobstore.Store import blobstore.s3.S3Store -import geotrellis.vector.Extent -import geotrellis.vectortile.VectorTile +import blobstore.{Store, Path => BStorePath} +import cats.effect._ import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.model.AmazonS3Exception +import geotrellis.vector.Extent +import geotrellis.vectortile.VectorTile +import org.http4s._ +import org.http4s.dsl.Http4sDsl +import org.http4s.headers.{`Content-Encoding`, `Content-Type`} +import osmesa.server.Config import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ import scala.util._ - class TileRouter(tileConf: Config.Tiles) extends Http4sDsl[IO] { - private val s3client = AmazonS3ClientBuilder.standard().withRegion("us-east-1").build() + private val s3client = + AmazonS3ClientBuilder.standard().withRegion("us-east-1").build() private val store: Store[IO] = S3Store[IO](s3client) - private val vtileContentType = `Content-Type`(("application", "vnd.mapbox-vector-tile")) + private val vtileContentType = `Content-Type`( + ("application", "vnd.mapbox-vector-tile") + ) private val emptyVT = VectorTile(Map(), Extent(0, 0, 1, 1)) - def tilePath(pre: String, z: Int, x: Int, y: String) = { - BStorePath(tileConf.s3bucket, s"${pre}/${z}/${x}/${y}", None, false, None) - } - def routes: HttpService[IO] = HttpService[IO] { case GET -> Root / "user" / userId / IntVar(z) / IntVar(x) / y => val getBytes = 
store - .get(tilePath(s"${tileConf.s3prefix}/user/${userId}", z, x, y), tileConf.chunkSize) + .get( + tilePath(s"${tileConf.s3prefix}/user/${userId}", z, x, y), + tileConf.chunkSize + ) .compile .to[Array] .attempt - getBytes.flatMap { - case Right(bytes) if tileConf.gzipped => - Ok(bytes, `Content-Encoding`(ContentCoding.gzip)) - case Right(bytes) => - Ok(bytes) - case Left(s3e: AmazonS3Exception) if s3e.getStatusCode == 403 || s3e.getStatusCode == 404 => - Ok(emptyVT.toBytes) - }.map(_.withContentType(vtileContentType)) + getBytes + .flatMap { + case Right(bytes) if tileConf.gzipped => + Ok(bytes, `Content-Encoding`(ContentCoding.gzip)) + case Right(bytes) => + Ok(bytes) + case Left(s3e: AmazonS3Exception) + if s3e.getStatusCode == 403 || s3e.getStatusCode == 404 => + Ok(emptyVT.toBytes) + } + .map(_.withContentType(vtileContentType)) case GET -> Root / "hashtag" / hashtag / IntVar(z) / IntVar(x) / y => val getBytes = store - .get(tilePath(s"${tileConf.s3prefix}/hashtag/${hashtag}", z, x, y), tileConf.chunkSize) + .get( + tilePath(s"${tileConf.s3prefix}/hashtag/${hashtag}", z, x, y), + tileConf.chunkSize + ) .compile .to[Array] .attempt - getBytes.flatMap { - case Right(bytes) if tileConf.gzipped => - Ok(bytes, `Content-Encoding`(ContentCoding.gzip)) - case Right(bytes) => - Ok(bytes) - case Left(s3e: AmazonS3Exception) if s3e.getStatusCode == 403 || s3e.getStatusCode == 404 => - Ok(emptyVT.toBytes) - }.map(_.withContentType(vtileContentType)) + getBytes + .flatMap { + case Right(bytes) if tileConf.gzipped => + Ok(bytes, `Content-Encoding`(ContentCoding.gzip)) + case Right(bytes) => + Ok(bytes) + case Left(s3e: AmazonS3Exception) + if s3e.getStatusCode == 403 || s3e.getStatusCode == 404 => + Ok(emptyVT.toBytes) + } + .map(_.withContentType(vtileContentType)) + } + + def tilePath(pre: String, z: Int, x: Int, y: String) = { + BStorePath(tileConf.s3bucket, s"${pre}/${z}/${x}/${y}", None, false, None) } } From 72c18b9f541a06e25d1c6cb6a3961cffe429f5c2 Mon Sep 17 
00:00:00 2001 From: Seth Fitzsimmons Date: Mon, 17 Jun 2019 16:35:53 -0700 Subject: [PATCH 07/22] Tiles should always be gzipped --- src/main/resources/application.conf | 2 -- src/main/scala/osmesa/server/Config.scala | 3 +-- src/main/scala/osmesa/server/tile/TileRouter.scala | 8 ++------ 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 43bde71..cae588b 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -19,7 +19,5 @@ tiles { s-3suffix = ${?TILE_SUFFIX} chunk-size = 4096 chunk-size = ${?TILE_CHUNK_SIZE} - gzipped = true - gzipped = ${?GZIPPED} } diff --git a/src/main/scala/osmesa/server/Config.scala b/src/main/scala/osmesa/server/Config.scala index d5ee914..fe9b6d0 100644 --- a/src/main/scala/osmesa/server/Config.scala +++ b/src/main/scala/osmesa/server/Config.scala @@ -30,6 +30,5 @@ object Config { case class Tiles(s3bucket: String, s3prefix: String, s3suffix: Option[String], - chunkSize: Int, - gzipped: Boolean) + chunkSize: Int) } diff --git a/src/main/scala/osmesa/server/tile/TileRouter.scala b/src/main/scala/osmesa/server/tile/TileRouter.scala index b1c830f..8479b0f 100644 --- a/src/main/scala/osmesa/server/tile/TileRouter.scala +++ b/src/main/scala/osmesa/server/tile/TileRouter.scala @@ -40,10 +40,8 @@ class TileRouter(tileConf: Config.Tiles) extends Http4sDsl[IO] { getBytes .flatMap { - case Right(bytes) if tileConf.gzipped => - Ok(bytes, `Content-Encoding`(ContentCoding.gzip)) case Right(bytes) => - Ok(bytes) + Ok(bytes, `Content-Encoding`(ContentCoding.gzip)) case Left(s3e: AmazonS3Exception) if s3e.getStatusCode == 403 || s3e.getStatusCode == 404 => Ok(emptyVT.toBytes) @@ -62,10 +60,8 @@ class TileRouter(tileConf: Config.Tiles) extends Http4sDsl[IO] { getBytes .flatMap { - case Right(bytes) if tileConf.gzipped => - Ok(bytes, `Content-Encoding`(ContentCoding.gzip)) case Right(bytes) => - Ok(bytes) + Ok(bytes, 
`Content-Encoding`(ContentCoding.gzip)) case Left(s3e: AmazonS3Exception) if s3e.getStatusCode == 403 || s3e.getStatusCode == 404 => Ok(emptyVT.toBytes) From 2505e0a7333c291cc769fae25ace899d3448da54 Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Wed, 7 Aug 2019 14:23:39 -0700 Subject: [PATCH 08/22] Use edit counts for ranking country users and hashtags This will now produce the top 10 users / hashtags by number of edits. In addition, `user_edit_counts` (now `user_edits`) and `hashtag_edits` now surface number of edits (not changesets). Changeset counts are surfaced as `user_changesets` and `hashtag_changesets`. --- sql/country_statistics.sql | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/sql/country_statistics.sql b/sql/country_statistics.sql index 710d0fc..4a08867 100644 --- a/sql/country_statistics.sql +++ b/sql/country_statistics.sql @@ -1,3 +1,4 @@ +DROP MATERIALIZED VIEW IF EXISTS country_statistics; CREATE MATERIALIZED VIEW country_statistics AS WITH changesets AS ( SELECT @@ -11,7 +12,7 @@ CREATE MATERIALIZED VIEW country_statistics AS country_id, max(coalesce(closed_at, created_at)) last_edit, count(*) changeset_count, - sum(edit_count) edit_count, + sum(coalesce(edit_count, 0)) edit_count, max(updated_at) updated_at FROM changesets JOIN changesets_countries ON changesets.id = changesets_countries.changeset_id @@ -19,23 +20,22 @@ CREATE MATERIALIZED VIEW country_statistics AS ), processed_changesets AS ( SELECT - -- TODO include changesets_countries.edit_count as an alternative to changeset count id, user_id, country_id, measurements, - counts + counts, + edit_count FROM changesets JOIN changesets_countries ON changesets.id = changesets_countries.changeset_id ), hashtag_counts AS ( SELECT - -- TODO rank by edit count? 
- RANK() OVER (PARTITION BY country_id ORDER BY count(*) DESC) AS rank, + RANK() OVER (PARTITION BY country_id ORDER BY sum(coalesce(edit_count, 0)) DESC) AS rank, country_id, hashtag, - -- TODO expose edit count instead? - count(*) changesets + count(*) changesets, + sum(coalesce(edit_count, 0)) edits FROM processed_changesets JOIN changesets_hashtags ON processed_changesets.id = changesets_hashtags.changeset_id JOIN hashtags ON changesets_hashtags.hashtag_id = hashtags.id @@ -44,26 +44,27 @@ CREATE MATERIALIZED VIEW country_statistics AS hashtags AS ( SELECT country_id, - jsonb_object_agg(hashtag, changesets) hashtags + jsonb_object_agg(hashtag, changesets) hashtag_changesets, + jsonb_object_agg(hashtag, edits) hashtag_edits FROM hashtag_counts WHERE rank <= 10 GROUP BY country_id ), user_counts AS ( SELECT - -- TODO rank by edit count? - RANK() OVER (PARTITION BY country_id ORDER BY count(*) DESC) AS rank, + RANK() OVER (PARTITION BY country_id ORDER BY sum(coalesce(edit_count, 0)) DESC) AS rank, country_id, user_id, - -- TODO expose edit count instead? 
- count(*) changesets + count(*) changesets, + sum(coalesce(edit_count, 0)) edits FROM processed_changesets GROUP BY country_id, user_id ), users AS ( SELECT country_id, - jsonb_object_agg(user_id, changesets) users + jsonb_object_agg(user_id, changesets) user_changesets, + jsonb_object_agg(user_id, edits) user_edits FROM user_counts WHERE rank <= 10 GROUP BY country_id @@ -128,8 +129,10 @@ CREATE MATERIALIZED VIEW country_statistics AS general.edit_count, general.last_edit, general.updated_at, - users user_edit_counts, - hashtags hashtag_edits + user_changesets, + user_edits, + hashtag_changesets, + hashtag_edits FROM general JOIN countries ON country_id = countries.id LEFT OUTER JOIN users USING (country_id) @@ -137,4 +140,4 @@ CREATE MATERIALIZED VIEW country_statistics AS LEFT OUTER JOIN aggregated_measurements USING (country_id) LEFT OUTER JOIN aggregated_counts USING (country_id); -CREATE UNIQUE INDEX country_statistics_id ON country_statistics(country_code); +CREATE UNIQUE INDEX IF NOT EXISTS country_statistics_id ON country_statistics(country_code); From 610a6c6bc4fee10ab789577096bcf90e65ccd5a6 Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Wed, 7 Aug 2019 14:40:35 -0700 Subject: [PATCH 09/22] Use edit counts for ranking hashtag users Also, surface both changeset and edit counts per-user. 
--- sql/hashtag_statistics.sql | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/sql/hashtag_statistics.sql b/sql/hashtag_statistics.sql index 12a38aa..bf85d59 100644 --- a/sql/hashtag_statistics.sql +++ b/sql/hashtag_statistics.sql @@ -1,10 +1,11 @@ +DROP MATERIALIZED VIEW IF EXISTS hashtag_statistics; CREATE MATERIALIZED VIEW hashtag_statistics AS WITH general AS ( SELECT hashtag_id, max(coalesce(closed_at, created_at)) last_edit, count(*) changeset_count, - sum(total_edits) edit_count, + sum(coalesce(total_edits, 0)) edit_count, max(updated_at) updated_at FROM changesets JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id @@ -16,25 +17,26 @@ CREATE MATERIALIZED VIEW hashtag_statistics AS user_id, hashtag_id, measurements, - counts + counts, + total_edits FROM changesets JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id ), user_counts AS ( SELECT - -- TODO rank by edit count? - RANK() OVER (PARTITION BY hashtag_id ORDER BY count(*) DESC) AS rank, + RANK() OVER (PARTITION BY hashtag_id ORDER BY sum(coalesce(total_edits, 0)) DESC) AS rank, hashtag_id, user_id, - -- TODO expose edit count instead? 
- count(*) changesets + count(*) changesets, + sum(coalesce(total_edits, 0)) edit_count FROM processed_changesets GROUP BY hashtag_id, user_id ), users AS ( SELECT hashtag_id, - jsonb_object_agg(user_id, changesets) users + jsonb_object_agg(user_id, changesets) user_changesets, + jsonb_object_agg(user_id, edit_count) user_edits FROM user_counts WHERE rank <= 10 GROUP BY hashtag_id @@ -96,11 +98,12 @@ CREATE MATERIALIZED VIEW hashtag_statistics AS general.edit_count, general.last_edit, general.updated_at, - users + user_changesets, + user_edits FROM general JOIN hashtags ON hashtag_id = hashtags.id LEFT OUTER JOIN users USING (hashtag_id) LEFT OUTER JOIN aggregated_measurements USING (hashtag_id) LEFT OUTER JOIN aggregated_counts USING (hashtag_id); -CREATE UNIQUE INDEX hashtag_statistics_hashtag_id ON hashtag_statistics(hashtag_id); \ No newline at end of file +CREATE UNIQUE INDEX IF NOT EXISTS hashtag_statistics_hashtag_id ON hashtag_statistics(hashtag_id); \ No newline at end of file From 131a88cec40f58f81796a254d70cd5cbe09226a8 Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Wed, 7 Aug 2019 15:09:26 -0700 Subject: [PATCH 10/22] Use edit counts for ranking user countries, editors, and hashtags Also splits out changesets and edits for each dimension. 
--- sql/user_statistics.sql | 57 +++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/sql/user_statistics.sql b/sql/user_statistics.sql index 67f3a39..26f097b 100644 --- a/sql/user_statistics.sql +++ b/sql/user_statistics.sql @@ -1,3 +1,4 @@ +DROP MATERIALIZED VIEW IF EXISTS user_statistics; CREATE MATERIALIZED VIEW user_statistics AS WITH general AS ( SELECT @@ -5,7 +6,7 @@ CREATE MATERIALIZED VIEW user_statistics AS array_agg(id) changesets, max(coalesce(closed_at, created_at)) last_edit, count(*) changeset_count, - sum(total_edits) edit_count, + sum(coalesce(total_edits, 0)) edit_count, max(updated_at) updated_at FROM changesets GROUP BY user_id @@ -14,7 +15,8 @@ CREATE MATERIALIZED VIEW user_statistics AS SELECT user_id, code, - count(*) changesets + count(*) changesets, + sum(coalesce(total_edits, 0)) edits FROM changesets JOIN changesets_countries ON changesets.id = changesets_countries.changeset_id JOIN countries ON changesets_countries.country_id = countries.id @@ -23,30 +25,35 @@ CREATE MATERIALIZED VIEW user_statistics AS countries AS ( SELECT user_id, - jsonb_object_agg(code, changesets) countries + jsonb_object_agg(code, changesets) country_changesets, + jsonb_object_agg(code, edits) country_edits FROM country_counts GROUP BY user_id ), - edit_time_counts AS ( + edit_day_counts AS ( SELECT user_id, date_trunc('day', coalesce(closed_at, created_at))::date AS day, - count(*) changesets + count(*) changesets, + sum(coalesce(total_edits, 0)) edits FROM changesets GROUP BY user_id, day ), - edit_times AS ( + edit_days AS ( SELECT user_id, - jsonb_object_agg(day, changesets) edit_times - FROM edit_time_counts + jsonb_object_agg(day, changesets) day_changesets, + jsonb_object_agg(day, edits) day_edits + FROM edit_day_counts GROUP BY user_id ), editor_counts AS ( SELECT + RANK() OVER (PARTITION BY user_id ORDER BY sum(coalesce(total_edits, 0)) DESC) AS rank, user_id, editor, - count(*) changesets + count(*) 
changesets, + sum(coalesce(total_edits, 0)) edits FROM changesets WHERE editor IS NOT NULL GROUP BY user_id, editor @@ -54,15 +61,19 @@ CREATE MATERIALIZED VIEW user_statistics AS editors AS ( SELECT user_id, - jsonb_object_agg(editor, changesets) editors + jsonb_object_agg(editor, changesets) editor_changesets, + jsonb_object_agg(editor, edits) editor_edits FROM editor_counts + WHERE rank <= 10 GROUP BY user_id ), hashtag_counts AS ( SELECT + RANK() OVER (PARTITION BY user_id ORDER BY sum(coalesce(total_edits, 0)) DESC) AS rank, user_id, hashtag, - count(*) changesets + count(*) changesets, + sum(coalesce(total_edits, 0)) edits FROM changesets JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id JOIN hashtags ON changesets_hashtags.hashtag_id = hashtags.id @@ -71,8 +82,10 @@ CREATE MATERIALIZED VIEW user_statistics AS hashtags AS ( SELECT user_id, - jsonb_object_agg(hashtag, changesets) hashtags + jsonb_object_agg(hashtag, changesets) hashtag_changesets, + jsonb_object_agg(hashtag, edits) hashtag_edits FROM hashtag_counts + WHERE rank <= 50 GROUP BY user_id ), measurements AS ( @@ -126,27 +139,27 @@ CREATE MATERIALIZED VIEW user_statistics AS SELECT user_id AS id, users.name, - 'user/' || users.id || '/{z}/{x}/{y}.mvt' AS extent_uri, measurements, counts, last_edit, changeset_count, edit_count, - -- TODO this is unbounded; top N? - editors, - edit_times, - -- TODO top N? - countries, - -- TODO top N? 
- hashtags, + editor_changesets, + editor_edits, + day_changesets, + day_edits, + country_changesets, + country_edits, + hashtag_changesets, + hashtag_edits, updated_at FROM general LEFT OUTER JOIN countries USING (user_id) LEFT OUTER JOIN editors USING (user_id) - LEFT OUTER JOIN edit_times USING (user_id) + LEFT OUTER JOIN edit_days USING (user_id) LEFT OUTER JOIN hashtags USING (user_id) LEFT OUTER JOIN aggregated_measurements USING (user_id) LEFT OUTER JOIN aggregated_counts USING (user_id) JOIN users ON user_id = users.id; -CREATE UNIQUE INDEX user_statistics_id ON user_statistics(id); +CREATE UNIQUE INDEX IF NOT EXISTS user_statistics_id ON user_statistics(id); From 7cb2556484ee7735959aa872b7d7a00311e3b84f Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Sat, 7 Sep 2019 13:39:12 -0500 Subject: [PATCH 11/22] Statistics by user and hashtag --- sql/hashtag_user_statistic.sql | 88 ++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 sql/hashtag_user_statistic.sql diff --git a/sql/hashtag_user_statistic.sql b/sql/hashtag_user_statistic.sql new file mode 100644 index 0000000..60799a4 --- /dev/null +++ b/sql/hashtag_user_statistic.sql @@ -0,0 +1,88 @@ +DROP MATERIALIZED VIEW IF EXISTS hashtag_user_statistics; +CREATE MATERIALIZED VIEW hashtag_user_statistics AS + WITH general AS ( + SELECT + user_id, + hashtag_id, + array_agg(id) changesets, + max(coalesce(closed_at, created_at)) last_edit, + count(*) changeset_count, + sum(coalesce(total_edits, 0)) edit_count, + max(updated_at) updated_at + FROM changesets + JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id + GROUP BY user_id, hashtag_id + ), + measurements AS ( + SELECT + id, + user_id, + hashtag_id, + key, + value + FROM changesets + JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id + CROSS JOIN LATERAL jsonb_each(measurements) + ), + aggregated_measurements_kv AS ( + SELECT + user_id, + hashtag_id, + key, + 
sum((value->>0)::numeric) AS value + FROM measurements + GROUP BY user_id, hashtag_id, key + ), + aggregated_measurements AS ( + SELECT + user_id, + hashtag_id, + jsonb_object_agg(key, value) measurements + FROM aggregated_measurements_kv + GROUP BY user_id, hashtag_id + ), + counts AS ( + SELECT + id, + user_id, + hashtag_id, + key, + value + FROM changesets + JOIN changesets_hashtags ON changesets.id = changesets_hashtags.changeset_id + CROSS JOIN LATERAL jsonb_each(counts) + ), + aggregated_counts_kv AS ( + SELECT + user_id, + hashtag_id, + key, + sum((value->>0)::numeric) AS value + FROM counts + GROUP BY user_id, hashtag_id, key + ), + aggregated_counts AS ( + SELECT + user_id, + hashtag_id, + jsonb_object_agg(key, value) counts + FROM aggregated_counts_kv + GROUP BY user_id, hashtag_id + ) + SELECT + user_id, + users.name, + general.hashtag_id, + hashtags.hashtag, + measurements, + counts, + last_edit, + changeset_count, + updated_at + FROM general + LEFT OUTER JOIN hashtags ON general.hashtag_id = hashtags.id + LEFT OUTER JOIN aggregated_measurements USING (user_id, hashtag_id) + LEFT OUTER JOIN aggregated_counts USING (user_id, hashtag_id) + JOIN users ON user_id = users.id; + +CREATE UNIQUE INDEX IF NOT EXISTS hashtag_user_statistics_pk ON hashtag_user_statistics(hashtag_id, user_id); \ No newline at end of file From a4891e8537dbaf0494b53809613da5940bcc645d Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Sat, 7 Sep 2019 16:08:58 -0500 Subject: [PATCH 12/22] Refresh hashtag/user stats --- docker/osm-stat-server/refresh-views.sh | 9 +++++++++ sql/refreshments.sql | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/docker/osm-stat-server/refresh-views.sh b/docker/osm-stat-server/refresh-views.sh index 461136d..59f885b 100755 --- a/docker/osm-stat-server/refresh-views.sh +++ b/docker/osm-stat-server/refresh-views.sh @@ -27,5 +27,14 @@ if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'r $DATABASE_URL & fi +if 
[ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'refresh materialized view concurrently hashtag_user_statistics%' and state='active'" $DATABASE_URL 2> /dev/null)" == "0" ]; then + # refresh in the background to return immediately + echo "$(date --iso-8601=seconds): Refreshing hashtag/user statistics" + psql -Aqt \ + -c "REFRESH MATERIALIZED VIEW CONCURRENTLY hashtag_user_statistics" \ + -c "UPDATE refreshments SET updated_at=now() where mat_view='hashtag_user_statistics'" \ + $DATABASE_URL & +fi + wait echo "$(date --iso-8601=seconds): Completed" diff --git a/sql/refreshments.sql b/sql/refreshments.sql index 1b933f8..1d827e1 100644 --- a/sql/refreshments.sql +++ b/sql/refreshments.sql @@ -4,4 +4,4 @@ CREATE TABLE refreshments ( PRIMARY KEY(mat_view) ); -INSERT INTO refreshments VALUES ('user_statistics', to_timestamp(0)), ('country_statistics', to_timestamp(0)), ('hashtag_statistics', to_timestamp(0)); \ No newline at end of file +INSERT INTO refreshments VALUES ('user_statistics', to_timestamp(0)), ('country_statistics', to_timestamp(0)), ('hashtag_statistics', to_timestamp(0)), ('hashtag_user_statistics', to_timestamp(0)); \ No newline at end of file From 3c9983caa26c724d9d706ca895221fd90452fda0 Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Wed, 21 Aug 2019 13:13:33 -0400 Subject: [PATCH 13/22] Start reorganization of deployment scripts --- deployment/.gitignore | 3 +- deployment/Makefile | 102 ++++++++---------- deployment/config-deployment.mk.template | 23 ++++ deployment/docker-compose.deploy.yml.tpl | 24 ----- .../docker}/Dockerfile | 0 .../docker}/refresh-views.sh | 8 ++ deployment/ecs-params.yml | 8 -- deployment/expand.sh | 21 ---- deployment/scripts/create-log-groups.sh | 18 ++++ deployment/scripts/define-production-tasks.sh | 39 +++++++ deployment/scripts/define-staging-tasks.sh | 39 +++++++ deployment/scripts/get-tag.sh | 17 +++ 12 files changed, 190 insertions(+), 112 deletions(-) create mode 100644 
deployment/config-deployment.mk.template delete mode 100644 deployment/docker-compose.deploy.yml.tpl rename {docker/osm-stat-server => deployment/docker}/Dockerfile (100%) rename {docker/osm-stat-server => deployment/docker}/refresh-views.sh (87%) delete mode 100644 deployment/ecs-params.yml delete mode 100755 deployment/expand.sh create mode 100755 deployment/scripts/create-log-groups.sh create mode 100755 deployment/scripts/define-production-tasks.sh create mode 100755 deployment/scripts/define-staging-tasks.sh create mode 100755 deployment/scripts/get-tag.sh diff --git a/deployment/.gitignore b/deployment/.gitignore index 9ca1bff..32e2785 100644 --- a/deployment/.gitignore +++ b/deployment/.gitignore @@ -1,2 +1 @@ -config-aws.mk -docker-compose.deploy.yml +config-*.mk diff --git a/deployment/Makefile b/deployment/Makefile index 83cf95a..ec68d42 100644 --- a/deployment/Makefile +++ b/deployment/Makefile @@ -1,71 +1,59 @@ -include config-aws.mk # Variables for AWS options - -# The osmesa container -LOCAL_IMG := quay.io/geotrellis/osm-stat-server:latest - - -######### -# AWS # -######### +include config-deployment.mk + +# If the user is on master branch, see if we should deploy to production +VERSION_TAG=$(shell ./scripts/get-tag.sh) +ifeq ($(VERSION_TAG), production) + DATABASE=${PRODUCTION_DB} + ECS_CLUSTER=${CLUSTER_NAME_DEPLOYMENT} + TASK_SUFFIX= +else + DATABASE=${STAGING_DB} + ECS_CLUSTER=${CLUSTER_NAME_STAGING} + TASK_SUFFIX=-staging +endif +DB_URI=${DB_BASE_URI}/${DATABASE} + +.EXPORT_ALL_VARIABLES: + +############################# +# Docker image management # +############################# + +.PHONY: build-container login-aws-registry tag-image push-image + +build-container: + $(MAKE) -C .. 
build login-aws-registry: eval `aws ecr get-login --no-include-email --region ${AWS_REGION}` -tag-image: +tag-image: build-container docker tag ${LOCAL_IMG} ${ECR_REPO} push-image: login-aws-registry tag-image docker push ${ECR_REPO} -.PHONY: docker-compose.deploy.yml - -docker-compose.deploy.yml: - ./expand.sh docker-compose.deploy.yml.tpl > docker-compose.deploy.yml - -configure-cluster: - ecs-cli configure \ - --cluster ${CLUSTER_NAME} \ - --region ${AWS_REGION} \ - --config-name ${CONFIG_NAME} +####################### +# Streaming AWS Tasks # +####################### -cluster-up: - ecs-cli up \ - --keypair ${KEYPAIR} \ - --instance-role ${INSTANCE_ROLE} \ - --size 1 \ - --instance-type ${INSTANCE_TYPE} \ - --cluster-config ${CONFIG_NAME} \ - --subnets ${SUBNETS} \ - --vpc ${VPC} \ - --force \ - --verbose +.PHONY: create-log-groups define-production-tasks define-staging-tasks deploy-stat-server -cluster-down: - ecs-cli down --cluster-config ${CONFIG_NAME} +create-log-groups: + ./scripts/create-log-groups.sh -.PHONY: create-service - -create-service: docker-compose.deploy.yml configure-cluster - ecs-cli compose \ - --file $< create \ - --cluster ${CLUSTER_NAME} - -start-service: docker-compose.deploy.yml configure-cluster create-service - ecs-cli compose --file $< service up \ - --deployment-min-healthy-percent 0 \ - --create-log-groups \ - --cluster ${CLUSTER_NAME} - -stop-service: - ecs-cli compose down - - -######### -# ALL # -######### -build-image: - make -C .. 
build +define-staging-tasks: + ./scripts/define-staging-tasks.sh -clean: - rm -f docker-compose.deploy.yml +define-production-tasks: + ./scripts/define-production-tasks.sh +deploy-stat-server: + aws ecs create-service \ + --cluster "${ECS_CLUSTER}" \ + --service-name "osmesa-stats-server" \ + --task-definition "osmesa-stat-server${TASK_SUFFIX}" \ + --desired-count 1 \ + --launch-type FARGATE \ + --scheduling-strategy REPLICA \ + --network-configuration ${NETWORK_CONFIGURATION} diff --git a/deployment/config-deployment.mk.template b/deployment/config-deployment.mk.template new file mode 100644 index 0000000..7ca67bc --- /dev/null +++ b/deployment/config-deployment.mk.template @@ -0,0 +1,23 @@ +################################################################################ +# AWS properties +################################################################################ +export KEYPAIR := +export SUBNET := +export AWS_REGION := us-east-1 +export IAM_ACCOUNT := + +################################################################################ +# Streaming resource definitions +################################################################################ +export STREAMING_INSTANCE_TYPE := m4.xlarge +export ECR_IMAGE := +export AWS_LOG_GROUP := osmesa-stats-server +export ECS_SUBNET := ${SUBNET} +export ECS_SECURITY_GROUP := + +export CLUSTER_NAME_DEPLOYMENT := +export CLUSTER_NAME_STAGING := + +export DB_BASE_URI := +export PRODUCTION_DB := +export STAGING_DB := diff --git a/deployment/docker-compose.deploy.yml.tpl b/deployment/docker-compose.deploy.yml.tpl deleted file mode 100644 index ca0e9f9..0000000 --- a/deployment/docker-compose.deploy.yml.tpl +++ /dev/null @@ -1,24 +0,0 @@ -version: '3.0' -services: - stats-server: - image: ${ECR_REPO}:latest - command: java -jar /opt/osm-stat-server.jar - ports: - - ${PORT}:${PORT} - environment: - - HOST=${HOST} - - PORT=${PORT} - - DB_DRIVER=${DB_DRIVER} - - DB_URL=${DB_URL} - - DB_USER=${DB_USER} - - 
DB_PASS=${DB_PASS} - - TILE_BUCKET=${TILE_BUCKET} - - TILE_PREFIX=${TILE_PREFIX} - - GZIPPED=${GZIPPED} - - DATABASE_URL=${DATABASE_URL} - logging: - driver: awslogs - options: - awslogs-group: ${AWS_LOG_GROUP} - awslogs-region: ${AWS_REGION} - awslogs-stream-prefix: osmesa-stat-server diff --git a/docker/osm-stat-server/Dockerfile b/deployment/docker/Dockerfile similarity index 100% rename from docker/osm-stat-server/Dockerfile rename to deployment/docker/Dockerfile diff --git a/docker/osm-stat-server/refresh-views.sh b/deployment/docker/refresh-views.sh similarity index 87% rename from docker/osm-stat-server/refresh-views.sh rename to deployment/docker/refresh-views.sh index 59f885b..563a7cd 100755 --- a/docker/osm-stat-server/refresh-views.sh +++ b/deployment/docker/refresh-views.sh @@ -1,5 +1,7 @@ #!/usr/bin/env bash +echo "$(date --iso-8601=seconds): Starting view refreshment" + if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'refresh materialized view concurrently user_statistics%' and state='active'" $DATABASE_URL 2> /dev/null)" == "0" ]; then echo "$(date --iso-8601=seconds): Refreshing user statistics" # refresh in the background to return immediately @@ -7,6 +9,8 @@ if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'r -c "REFRESH MATERIALIZED VIEW CONCURRENTLY user_statistics" \ -c "UPDATE refreshments SET updated_at=now() where mat_view='user_statistics'" \ $DATABASE_URL & +else + echo "$(date --iso-8601=seconds): User stats table already refreshing" fi if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'refresh materialized view concurrently hashtag_statistics%' and state='active'" $DATABASE_URL 2> /dev/null)" == "0" ]; then @@ -16,6 +20,8 @@ if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'r -c "REFRESH MATERIALIZED VIEW CONCURRENTLY hashtag_statistics" \ -c "UPDATE refreshments SET updated_at=now() where mat_view='hashtag_statistics'" \ 
$DATABASE_URL & +else + echo "$(date --iso-8601=seconds): Hashtag stats table already refreshing" fi if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'refresh materialized view concurrently country_statistics%' and state='active'" $DATABASE_URL 2> /dev/null)" == "0" ]; then @@ -25,6 +31,8 @@ if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'r -c "REFRESH MATERIALIZED VIEW CONCURRENTLY country_statistics" \ -c "UPDATE refreshments SET updated_at=now() where mat_view='country_statistics'" \ $DATABASE_URL & +else + echo "$(date --iso-8601=seconds): Country stats table already refreshing" fi if [ "$(psql -Aqtc "select count(pid) from pg_stat_activity where query ilike 'refresh materialized view concurrently hashtag_user_statistics%' and state='active'" $DATABASE_URL 2> /dev/null)" == "0" ]; then diff --git a/deployment/ecs-params.yml b/deployment/ecs-params.yml deleted file mode 100644 index e00349b..0000000 --- a/deployment/ecs-params.yml +++ /dev/null @@ -1,8 +0,0 @@ -# this file should be in deployment dir (relative to Makefile path) -# https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-cpu-memory-error.html -# NOTE: comment it out for the test case -version: 1 -task_definition: - services: - stats-server: - mem_reservation: 1024m diff --git a/deployment/expand.sh b/deployment/expand.sh deleted file mode 100755 index 49ef41e..0000000 --- a/deployment/expand.sh +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/sh - -set -e - -PROG=$(basename $0) - -usage() -{ - echo "${PROG} " -} - -expand() -{ - local template="$(cat $1)" - eval "echo \"${template}\"" -} - -case $# in - 1) expand "$1";; - *) usage; exit 0;; -esac diff --git a/deployment/scripts/create-log-groups.sh b/deployment/scripts/create-log-groups.sh new file mode 100755 index 0000000..2debcd9 --- /dev/null +++ b/deployment/scripts/create-log-groups.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script 
directly. Use the Makefile in the parent directory." + exit 1 +fi + +DEFINED_GROUPS=$(aws logs describe-log-groups | jq '.logGroups[].logGroupName' | sed -e 's/"//g') + +if [[ $DEFINED_GROUPS != *"/ecs/${AWS_LOG_GROUP}"* ]]; then + aws logs create-log-group \ + --log-group-name /ecs/${AWS_LOG_GROUP} +fi + +if [[ $DEFINED_GROUPS != *"/ecs/${AWS_LOG_GROUP}-staging"* ]]; then + aws logs create-log-group \ + --log-group-name /ecs/${AWS_LOG_GROUP}-staging +fi diff --git a/deployment/scripts/define-production-tasks.sh b/deployment/scripts/define-production-tasks.sh new file mode 100755 index 0000000..ac3b966 --- /dev/null +++ b/deployment/scripts/define-production-tasks.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." + exit 1 +fi + +aws ecs register-task-definition \ + --family streaming-stats-updater-production \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "1 vCPU" \ + --memory "4 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"java\", + \"-jar\", \"/opt/osm-stat-server.jar\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${PRODUCTION_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:production\", + \"name\": \"osmesa-stat-server\" + } + ]" diff --git a/deployment/scripts/define-staging-tasks.sh b/deployment/scripts/define-staging-tasks.sh new file mode 100755 index 0000000..f5c54c6 --- /dev/null +++ b/deployment/scripts/define-staging-tasks.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script 
directly. Use the Makefile in the parent directory." + exit 1 +fi + +aws ecs register-task-definition \ + --family streaming-stats-updater-production \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "1 vCPU" \ + --memory "4 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}-staging\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"java\", + \"-jar\", \"/opt/osm-stat-server.jar\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${STAGING_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:latest\", + \"name\": \"osmesa-stat-server-staging\" + } + ]" diff --git a/deployment/scripts/get-tag.sh b/deployment/scripts/get-tag.sh new file mode 100755 index 0000000..388bc0d --- /dev/null +++ b/deployment/scripts/get-tag.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +if [ "$(git branch | grep '* master')" = "* master" ]; then + while true; do + echo "You are on the master branch. Do you wish to publish to the production tag?" 
+ select yn in "Yes" "No"; do + case $yn in + Yes ) VERSION_TAG="production"; break;; + No ) VERSION_TAG="latest"; break;; + esac + done + done +else + VERSION_TAG="latest" +fi + +echo "${VERSION_TAG}" From 38e42634963e8463ff603ab55526c7a6b24b5e21 Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Wed, 21 Aug 2019 14:11:04 -0400 Subject: [PATCH 14/22] Fix container build procedure --- deployment/Makefile | 6 +++--- deployment/build-container.sh | 22 ++++++++++++++++++++++ deployment/config-aws.mk.example | 28 ---------------------------- 3 files changed, 25 insertions(+), 31 deletions(-) create mode 100755 deployment/build-container.sh delete mode 100644 deployment/config-aws.mk.example diff --git a/deployment/Makefile b/deployment/Makefile index ec68d42..ff545b8 100644 --- a/deployment/Makefile +++ b/deployment/Makefile @@ -22,16 +22,16 @@ DB_URI=${DB_BASE_URI}/${DATABASE} .PHONY: login-aws-registry tag-image push-image build-container: - make -C .. build + ./build-container.sh login-aws-registry: eval `aws ecr get-login --no-include-email --region ${AWS_REGION}` tag-image: build-container - docker tag ${LOCAL_IMG} ${ECR_REPO} + docker tag osm_stat_server:${VERSION_TAG} ${ECR_IMAGE}:${VERSION_TAG} push-image: login-aws-registry tag-image - docker push ${ECR_REPO} + docker push ${ECR_IMAGE}:${VERSION_TAG} ####################### # Streaming AWS Tasks # diff --git a/deployment/build-container.sh b/deployment/build-container.sh new file mode 100755 index 0000000..368809f --- /dev/null +++ b/deployment/build-container.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "No version tag has been set. Do not run this script directly; instead, issue" + echo " make build-container" + echo "from the 'streaming' directory." + exit 1 +else + echo "Version tag is set to '${VERSION_TAG}'" +fi + +set -xe +SBT_DIR=$(pwd)/.. 
+JAR_DIR=${SBT_DIR}/target/scala-2.11/ +DOCKER_DIR=$(pwd)/docker + +cd ${SBT_DIR} +./sbt clean assembly +cp ${JAR_DIR}/osm-stat-server.jar ${DOCKER_DIR}/osm-stat-server.jar + +cd ${DOCKER_DIR} +docker build -f Dockerfile --tag osm_stat_server:${VERSION_TAG} . diff --git a/deployment/config-aws.mk.example b/deployment/config-aws.mk.example deleted file mode 100644 index 13de95a..0000000 --- a/deployment/config-aws.mk.example +++ /dev/null @@ -1,28 +0,0 @@ -export CONFIG_NAME := osm-stat-stream-config - -# AWS properties -export CLUSTER_NAME := osm-stat-stream-cluster -export INSTANCE_TYPE := m4.xlarge -export KEYPAIR := [AWS key pair] -export VPC := [VPC ID] -export SUBNETS := [comma-delimited list of subnets within the above VPC] -export SECURITY_GROUP := [comma-delimited list of AWS Security Group IDs] -export ECR_REPO := [AWS ECR repo URI] -export AWS_LOG_GROUP := osm-stats-server -export AWS_REGION := us-east-1 -export INSTANCE_ROLE := [IAM instance role] - -export HOST := 0.0.0.0 -export PORT := 80 - -export DB_DRIVER := org.postgresql.Driver -export DB_URL := [database URI, e.g. jdbc:postgresql://[:]/] -export DB_USER := [database username] -export DB_PASS := [database password] - -export DATABASE_URL := [standard database URL, for psql, e.g. 
postgresql://[user][:][password]@[:]/] - -export TILE_BUCKET := [S3 bucket] -export TILE_PREFIX := [S3 prefix] -export TILE_SUFFIX := [Tile suffix (typically file extension, including '.')] -export GZIPPED := [Whether to expect pre-gzipped tiles on S3 (true or false)] \ No newline at end of file From d43d6dd1f817cf7b7bcdd920ccca0ca4ce24ae2e Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Wed, 21 Aug 2019 17:30:53 -0400 Subject: [PATCH 15/22] Improvements to task management; stat server now deploys properly --- deployment/Makefile | 7 +- deployment/docker/.gitignore | 1 + deployment/docker/Dockerfile | 2 +- deployment/scripts/define-production-tasks.sh | 78 +++++++++++++++++-- deployment/scripts/define-staging-tasks.sh | 76 +++++++++++++++++- deployment/scripts/stop-stat-server.sh | 24 ++++++ 6 files changed, 176 insertions(+), 12 deletions(-) create mode 100644 deployment/docker/.gitignore create mode 100755 deployment/scripts/stop-stat-server.sh diff --git a/deployment/Makefile b/deployment/Makefile index ff545b8..5d30b2a 100644 --- a/deployment/Makefile +++ b/deployment/Makefile @@ -37,7 +37,7 @@ push-image: login-aws-registry tag-image # Streaming AWS Tasks # ####################### -.PHONY: create-log-groups define-production-tasks define-staging-tasks deploy-stat-server +.PHONY: create-log-groups define-production-tasks define-staging-tasks stop-stat-server deploy-stat-server create-log-groups: ./scripts/create-log-groups.sh @@ -48,7 +48,10 @@ define-staging-tasks: define-production-tasks: ./scripts/define-production-tasks.sh -deploy-stat-server: +stop-stat-server: + ./scripts/stop-stat-server.sh + +deploy-stat-server: stop-stat-server aws ecs create-service \ --cluster "${ECS_CLUSTER}" \ --service-name "osmesa-stats-server" \ diff --git a/deployment/docker/.gitignore b/deployment/docker/.gitignore new file mode 100644 index 0000000..d392f0e --- /dev/null +++ b/deployment/docker/.gitignore @@ -0,0 +1 @@ +*.jar diff --git a/deployment/docker/Dockerfile 
b/deployment/docker/Dockerfile index 5162723..83fa28d 100644 --- a/deployment/docker/Dockerfile +++ b/deployment/docker/Dockerfile @@ -9,4 +9,4 @@ RUN \ COPY osm-stat-server.jar /opt/osm-stat-server.jar COPY refresh-views.sh /usr/local/bin/refresh-views.sh -WORKDIR /opt \ No newline at end of file +WORKDIR /opt diff --git a/deployment/scripts/define-production-tasks.sh b/deployment/scripts/define-production-tasks.sh index ac3b966..7e6a93f 100755 --- a/deployment/scripts/define-production-tasks.sh +++ b/deployment/scripts/define-production-tasks.sh @@ -6,13 +6,13 @@ if [ -z ${VERSION_TAG+x} ]; then fi aws ecs register-task-definition \ - --family streaming-stats-updater-production \ + --family osmesa-stat-server \ --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ --network-mode awsvpc \ --requires-compatibilities EC2 FARGATE \ - --cpu "1 vCPU" \ - --memory "4 GB" \ + --cpu "0.5 vCPU" \ + --memory "1 GB" \ --container-definitions "[ { \"logConfiguration\": { @@ -31,9 +31,77 @@ aws ecs register-task-definition \ { \"name\": \"DATABASE_URL\", \"value\": \"${DB_BASE_URI}/${PRODUCTION_DB}\" - } + }, + { + \"name\": \"DB_DRIVER\", + \"value\": \"${DB_DRIVER}\" + }, + { + \"name\": \"DB_URL\", + \"value\": \"${DB_JDBC_BASE_URL}/${PRODUCTION_DB}\" + }, + { + \"name\": \"DB_USER\", + \"value\": \"${DB_USER}\" + }, + { + \"name\": \"DB_PASS\", + \"value\": \"${DB_PASS}\" + }, + { + \"name\": \"GZIPPED\", + \"value\": \"true\" + }, + { + \"name\": \"HOST\", + \"value\": \"0.0.0.0\" + }, + { + \"name\": \"PORT\", + \"value\": \"80\" + }, + { + \"name\": \"TILE_BUCKET\", + \"value\": \"${TILE_BUCKET}\" + }, + { + \"name\": \"TILE_PREFIX\", + \"value\": \"${TILE_PREFIX}\" + } ], - \"image\": \"${ECR_IMAGE}:production\", + \"image\": \"${ECR_IMAGE}:latest\", \"name\": \"osmesa-stat-server\" } ]" + +aws ecs register-task-definition \ + --family osmesa-stats-view-refresher \ + --task-role-arn 
"arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "0.25 vCPU" \ + --memory "0.5 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"refresh-views.sh\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${PRODUCTION_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:latest\", + \"name\": \"stats-view-refresher\" + } + ]" diff --git a/deployment/scripts/define-staging-tasks.sh b/deployment/scripts/define-staging-tasks.sh index f5c54c6..c2d8ff6 100755 --- a/deployment/scripts/define-staging-tasks.sh +++ b/deployment/scripts/define-staging-tasks.sh @@ -6,13 +6,13 @@ if [ -z ${VERSION_TAG+x} ]; then fi aws ecs register-task-definition \ - --family streaming-stats-updater-production \ + --family osmesa-stat-server-staging \ --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ --network-mode awsvpc \ --requires-compatibilities EC2 FARGATE \ - --cpu "1 vCPU" \ - --memory "4 GB" \ + --cpu "0.5 vCPU" \ + --memory "1 GB" \ --container-definitions "[ { \"logConfiguration\": { @@ -31,9 +31,77 @@ aws ecs register-task-definition \ { \"name\": \"DATABASE_URL\", \"value\": \"${DB_BASE_URI}/${STAGING_DB}\" - } + }, + { + \"name\": \"DB_DRIVER\", + \"value\": \"${DB_DRIVER}\" + }, + { + \"name\": \"DB_URL\", + \"value\": \"${DB_JDBC_BASE_URL}/${STAGING_DB}\" + }, + { + \"name\": \"DB_USER\", + \"value\": \"${DB_USER}\" + }, + { + \"name\": \"DB_PASS\", + \"value\": \"${DB_PASS}\" + }, + { + \"name\": \"GZIPPED\", + \"value\": \"true\" + }, + { + \"name\": \"HOST\", + \"value\": \"0.0.0.0\" + }, + { + 
\"name\": \"PORT\", + \"value\": \"80\" + }, + { + \"name\": \"TILE_BUCKET\", + \"value\": \"${TILE_BUCKET}\" + }, + { + \"name\": \"TILE_PREFIX\", + \"value\": \"${TILE_PREFIX}\" + } ], \"image\": \"${ECR_IMAGE}:latest\", \"name\": \"osmesa-stat-server-staging\" } ]" + +aws ecs register-task-definition \ + --family osmesa-stats-view-refresher-staging \ + --task-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ECSTaskS3" \ + --execution-role-arn "arn:aws:iam::${IAM_ACCOUNT}:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities EC2 FARGATE \ + --cpu "0.25 vCPU" \ + --memory "0.5 GB" \ + --container-definitions "[ + { + \"logConfiguration\": { + \"logDriver\": \"awslogs\", + \"options\": { + \"awslogs-group\": \"/ecs/${AWS_LOG_GROUP}-staging\", + \"awslogs-region\": \"${AWS_REGION}\", + \"awslogs-stream-prefix\": \"ecs\" + } + }, + \"command\": [ + \"refresh-views.sh\" + ], + \"environment\": [ + { + \"name\": \"DATABASE_URL\", + \"value\": \"${DB_BASE_URI}/${STAGING_DB}\" + } + ], + \"image\": \"${ECR_IMAGE}:latest\", + \"name\": \"stats-view-refresher-staging\" + } + ]" diff --git a/deployment/scripts/stop-stat-server.sh b/deployment/scripts/stop-stat-server.sh new file mode 100755 index 0000000..f37b9a0 --- /dev/null +++ b/deployment/scripts/stop-stat-server.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +check_status() { + STATUS=$(aws ecs describe-services --services osmesa-stats-server --cluster $ECS_CLUSTER | jq '.services[].status') +} + +check_status +if [[ $STATUS == "\"ACTIVE\"" ]]; then + aws ecs delete-service --service osmesa-stats-server --cluster $ECS_CLUSTER --force + echo "Waiting for service to shut down" + check_status + while [[ $STATUS != "\"INACTIVE\"" ]]; do + echo " current status: $STATUS, still waiting" + sleep 15s + check_status + done +else + echo "Status was $STATUS, nothing to stop" +fi From 0bec1a13a9e8203f14181cb3ddaa8cffd83982ce Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Fri, 23 Aug 2019 10:25:37 -0400 Subject: [PATCH 16/22] Complete deployment scripts (tested in staging environment) --- deployment/Makefile | 3 ++ deployment/scripts/deploy-stats-refresher.sh | 29 ++++++++++++++++++++ deployment/scripts/stop-stat-server.sh | 1 + 3 files changed, 33 insertions(+) create mode 100755 deployment/scripts/deploy-stats-refresher.sh diff --git a/deployment/Makefile b/deployment/Makefile index 5d30b2a..0034b65 100644 --- a/deployment/Makefile +++ b/deployment/Makefile @@ -60,3 +60,6 @@ deploy-stat-server: stop-stat-server --launch-type FARGATE \ --scheduling-strategy REPLICA \ --network-configuration ${NETWORK_CONFIGURATION} + +deploy-stats-refresher: + ./scripts/deploy-stats-refresher.sh diff --git a/deployment/scripts/deploy-stats-refresher.sh b/deployment/scripts/deploy-stats-refresher.sh new file mode 100755 index 0000000..571f7be --- /dev/null +++ b/deployment/scripts/deploy-stats-refresher.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +if [ -z ${VERSION_TAG+x} ]; then + echo "Do not run this script directly. Use the Makefile in the parent directory." 
+ exit 1 +fi + +aws events put-rule --schedule-expression "rate(1 minute)" --name osmesa-stats-view-refresher${TASK_SUFFIX} +aws events put-targets \ + --rule "osmesa-stats-view-refresher${TASK_SUFFIX}" \ + --targets "[ + { + \"Id\": \"osmesa-stats-view-refresher${TASK_SUFFIX}\", + \"Arn\": \"arn:aws:ecs:${AWS_REGION}:${IAM_ACCOUNT}:cluster/${ECS_CLUSTER}\", + \"RoleArn\": \"arn:aws:iam::${IAM_ACCOUNT}:role/ecsEventsRole\", + \"EcsParameters\": { + \"TaskDefinitionArn\": \"arn:aws:ecs:${AWS_REGION}:${IAM_ACCOUNT}:task-definition/osmesa-stats-view-refresher${TASK_SUFFIX}\", + \"TaskCount\": 1, + \"LaunchType\": \"FARGATE\", + \"NetworkConfiguration\": { + \"awsvpcConfiguration\": { + \"Subnets\": [\"${ECS_SUBNET}\"], + \"SecurityGroups\": [\"${ECS_SECURITY_GROUP}\"], + \"AssignPublicIp\": \"DISABLED\" + } + } + } + } + ]" diff --git a/deployment/scripts/stop-stat-server.sh b/deployment/scripts/stop-stat-server.sh index f37b9a0..23a9e41 100755 --- a/deployment/scripts/stop-stat-server.sh +++ b/deployment/scripts/stop-stat-server.sh @@ -19,6 +19,7 @@ if [[ $STATUS == "\"ACTIVE\"" ]]; then sleep 15s check_status done + echo " final status: $STATUS" else echo "Status was $STATUS, nothing to stop" fi From 5e9698567298de84f2fb69ee1910622d5c46dedc Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Fri, 23 Aug 2019 10:26:20 -0400 Subject: [PATCH 17/22] Bring SQL queries in line with deployed DB schema --- sql/user_statistics.sql | 1 + .../osmesa/server/stats/CountryStats.scala | 6 +++- .../osmesa/server/stats/HashtagStats.scala | 17 +++++++++-- .../scala/osmesa/server/stats/UserStats.scala | 28 ++++++++++++------- 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/sql/user_statistics.sql b/sql/user_statistics.sql index 58ddfe8..a14ae34 100644 --- a/sql/user_statistics.sql +++ b/sql/user_statistics.sql @@ -37,6 +37,7 @@ CREATE MATERIALIZED VIEW user_statistics AS count(*) changesets, sum(coalesce(total_edits, 0)) edits FROM changesets + WHERE coalesce(closed_at, 
created_at) IS NOT NULL GROUP BY user_id, day ), edit_days AS ( diff --git a/src/main/scala/osmesa/server/stats/CountryStats.scala b/src/main/scala/osmesa/server/stats/CountryStats.scala index 22d5ff4..c56d5d6 100644 --- a/src/main/scala/osmesa/server/stats/CountryStats.scala +++ b/src/main/scala/osmesa/server/stats/CountryStats.scala @@ -20,7 +20,9 @@ case class CountryStats(countryId: Long, updatedAt: Option[Instant], changesetCount: Option[Int], editCount: Option[Int], + userChangesets: Json, userEdits: Json, + hashtagChangesets: Json, hashtagEdits: Json) object CountryStats extends Implicits { @@ -39,7 +41,9 @@ object CountryStats extends Implicits { updated_at, changeset_count, edit_count, - coalesce(user_edit_counts, '{}'::jsonb) user_edit_counts, + coalesce(user_changesets, '{}'::jsonb) user_changesets, + coalesce(user_edits, '{}'::jsonb) user_edits, + coalesce(hashtag_changesets, '{}'::jsonb) hashtag_changesets, coalesce(hashtag_edits, '{}'::jsonb) hashtag_edits FROM country_statistics diff --git a/src/main/scala/osmesa/server/stats/HashtagStats.scala b/src/main/scala/osmesa/server/stats/HashtagStats.scala index f00a7ab..3034ed3 100644 --- a/src/main/scala/osmesa/server/stats/HashtagStats.scala +++ b/src/main/scala/osmesa/server/stats/HashtagStats.scala @@ -1,18 +1,26 @@ package osmesa.server.stats +import java.time.Instant + import cats.effect._ import cats.implicits._ import doobie._ import doobie.implicits._ import io.circe._ import io.circe.generic.extras.semiauto._ +import io.circe.java8.time._ import osmesa.server._ import osmesa.server.model._ case class HashtagStats(tag: String, measurements: Json, counts: Json, - users: Json) + changesetCount: Option[Int], + editCount: Option[Int], + lastEdit: Option[Instant], + updatedAt: Option[Instant], + userChangesets: Json, + userEdits: Json) object HashtagStats extends Implicits { implicit val userHashtagDecoder: Decoder[HashtagStats] = deriveDecoder @@ -24,7 +32,12 @@ object HashtagStats extends Implicits { 
tag, coalesce(measurements, '{}'::jsonb) measurements, coalesce(counts, '{}'::jsonb) counts, - coalesce(users, '{}'::jsonb) users + changeset_count, + edit_count, + last_edit, + updated_at, + coalesce(user_changesets, '{}'::jsonb) user_changesets, + coalesce(user_edits, '{}'::jsonb) user_edits FROM hashtag_statistics """ diff --git a/src/main/scala/osmesa/server/stats/UserStats.scala b/src/main/scala/osmesa/server/stats/UserStats.scala index 7569485..2502a4b 100644 --- a/src/main/scala/osmesa/server/stats/UserStats.scala +++ b/src/main/scala/osmesa/server/stats/UserStats.scala @@ -13,16 +13,20 @@ import osmesa.server.model.{IdNotFoundError, OsmStatError, ResultPage} case class UserStats(uid: Long, name: Option[String], - extentUri: Option[String], measurements: Json, counts: Json, lastEdit: Option[Instant], + updatedAt: Option[Instant], changesetCount: Option[Int], editCount: Option[Int], - editors: Json, - editTimes: Json, - countries: Json, - hashtags: Json) + editorChangesets: Json, + editorEdits: Json, + dayChangesets: Json, + dayEdits: Json, + countryChangesets: Json, + countryEdits: Json, + hashtagChangesets: Json, + hashtagEdits: Json) object UserStats extends Implicits { implicit val userStatsDecoder: Decoder[UserStats] = deriveDecoder @@ -33,16 +37,20 @@ object UserStats extends Implicits { SELECT id, name, - extent_uri, coalesce(measurements, '{}'::jsonb) measurements, coalesce(counts, '{}'::jsonb) counts, last_edit, + updated_at, changeset_count, edit_count, - coalesce(editors, '{}'::jsonb) editors, - coalesce(edit_times, '{}'::jsonb) edit_times, - coalesce(countries, '{}'::jsonb) countries, - coalesce(hashtags, '{}'::jsonb) hashtags + coalesce(editor_changesets, '{}'::jsonb) editor_changesets, + coalesce(editor_edits, '{}'::jsonb) editor_edits, + coalesce(day_changesets, '{}'::jsonb) day_changesets, + coalesce(day_edits, '{}'::jsonb) day_edits, + coalesce(country_changesets, '{}'::jsonb) country_changesets, + coalesce(country_edits, '{}'::jsonb) 
country_edits, + coalesce(hashtag_changesets, '{}'::jsonb) hashtag_changesets, + coalesce(hashtag_edits, '{}'::jsonb) hashtag_edits FROM user_statistics """ From f3a30a99acc3ebc4fd231c33ca560a828eb3248b Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Sat, 14 Sep 2019 09:57:18 -0700 Subject: [PATCH 18/22] Name hashtag_user_statistics correctly --- sql/{hashtag_user_statistic.sql => hashtag_user_statistics.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename sql/{hashtag_user_statistic.sql => hashtag_user_statistics.sql} (100%) diff --git a/sql/hashtag_user_statistic.sql b/sql/hashtag_user_statistics.sql similarity index 100% rename from sql/hashtag_user_statistic.sql rename to sql/hashtag_user_statistics.sql From c844a4245227225dd64ca0fda0773c72a9a09f21 Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Sat, 14 Sep 2019 10:07:43 -0700 Subject: [PATCH 19/22] Surface edit count --- sql/hashtag_user_statistics.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hashtag_user_statistics.sql b/sql/hashtag_user_statistics.sql index 60799a4..e5d309f 100644 --- a/sql/hashtag_user_statistics.sql +++ b/sql/hashtag_user_statistics.sql @@ -78,6 +78,7 @@ CREATE MATERIALIZED VIEW hashtag_user_statistics AS counts, last_edit, changeset_count, + edit_count, updated_at FROM general LEFT OUTER JOIN hashtags ON general.hashtag_id = hashtags.id From a16728d53de0a0907bb98d7fb766077ee7a4e820 Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Sat, 14 Sep 2019 10:23:55 -0700 Subject: [PATCH 20/22] Hashtag user stats endpoint --- .../scala/osmesa/server/DefaultRouter.scala | 14 +++- .../server/stats/HashtagUserStats.scala | 76 +++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/osmesa/server/stats/HashtagUserStats.scala diff --git a/src/main/scala/osmesa/server/DefaultRouter.scala b/src/main/scala/osmesa/server/DefaultRouter.scala index 98ea05e..abf8a95 100644 --- 
a/src/main/scala/osmesa/server/DefaultRouter.scala +++ b/src/main/scala/osmesa/server/DefaultRouter.scala @@ -9,7 +9,7 @@ import org.http4s.circe._ import org.http4s.dsl.Http4sDsl import org.http4s.headers.`Content-Type` import osmesa.server.model._ -import osmesa.server.stats.{CountryStats, HashtagStats, RefreshStats, UserStats} +import osmesa.server.stats._ class DefaultRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { @@ -48,6 +48,18 @@ class DefaultRouter(trans: Transactor[IO]) extends Http4sDsl[IO] { result <- eitherResult(io) } yield result + case GET -> Root / "campaigns" / hashtag / "users" :? OptionalPageQueryParamMatcher(pageNum) => + for { + io <- HashtagUserStats.getPage(hashtag, pageNum.getOrElse(1)) + result <- eitherResult(io) + } yield result + + case GET -> Root / "campaigns" / hashtag / LongVar(uid) => + for { + io <- HashtagUserStats.byTagAndUid(hashtag, uid) + result <- eitherResult(io) + } yield result + case GET -> Root / "countries" :? OptionalPageQueryParamMatcher(pageNum) => Ok(Country.getPage(pageNum.getOrElse(1)).map(_.asJson)) diff --git a/src/main/scala/osmesa/server/stats/HashtagUserStats.scala b/src/main/scala/osmesa/server/stats/HashtagUserStats.scala new file mode 100644 index 0000000..208c8eb --- /dev/null +++ b/src/main/scala/osmesa/server/stats/HashtagUserStats.scala @@ -0,0 +1,76 @@ +package osmesa.server.stats + +import java.time.Instant + +import cats.effect._ +import cats.implicits._ +import doobie._ +import doobie.implicits._ +import io.circe._ +import io.circe.generic.extras.semiauto._ +import io.circe.java8.time._ +import osmesa.server._ +import osmesa.server.model._ + +case class HashtagUserStats(tag: String, + uid: Long, + name: Option[String], + measurements: Json, + counts: Json, + lastEdit: Option[Instant], + changesetCount: Option[Int], + editCount: Option[Int] + ) + +object HashtagUserStats extends Implicits { + implicit val hashtagUserDecoder: Decoder[HashtagUserStats] = deriveDecoder + implicit val 
hashtagUserEncoder: Encoder[HashtagUserStats] = deriveEncoder + + private val selectF = + fr""" + SELECT + hashtag tag, + user_id uid, + name, + coalesce(measurements, '{}'::jsonb) measurements, + coalesce(counts, '{}'::jsonb) counts, + last_edit, + changeset_count, + edit_count + FROM + hashtag_user_statistics + """ + + def byTagAndUid( + tag: String, + uid: Long + )(implicit xa: Transactor[IO]): IO[Either[OsmStatError, HashtagUserStats]] = + (selectF ++ fr"WHERE hashtag = $tag AND user_id = $uid") + .query[HashtagUserStats] + .option + .attempt + .transact(xa) + .map { + case Right(hashtagOrNone) => + hashtagOrNone match { + case Some(ht) => Right(ht) + case None => Left(IdNotFoundError("hashtag_user_statistics", tag)) + } + case Left(err) => Left(UnknownError(err.toString)) + } + + def getPage(tag: String, pageNum: Int, pageSize: Int = 25)( + implicit xa: Transactor[IO] + ): IO[Either[OsmStatError, ResultPage[HashtagUserStats]]] = { + val offset = (pageNum - 1) * pageSize + (selectF ++ fr"WHERE hashtag = $tag ORDER BY hashtag, user_id ASC LIMIT $pageSize OFFSET $offset") + .query[HashtagUserStats] + .to[List] + .attempt + .transact(xa) + .map { + case Right(results) => Right(ResultPage(results, pageNum)) + case Left(err) => Left(UnknownError(err.toString)) + } + } +} From 5156a5be88a62e68f449e8cb8a80b9df5757d691 Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Sun, 15 Sep 2019 10:27:33 -0700 Subject: [PATCH 21/22] Provide a default JDBC driver --- src/main/resources/application.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index cae588b..99fe237 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -6,6 +6,7 @@ server { } database { + driver = "org.postgresql.Driver" driver = ${?DB_DRIVER} url = ${?DB_URL} user = ${?DB_USER} From 7d65999610a0dde301557fc2ff244079580d0267 Mon Sep 17 00:00:00 2001 From: Seth Fitzsimmons Date: Sun, 15 Sep 
2019 10:28:48 -0700 Subject: [PATCH 22/22] Improve naming --- src/main/scala/osmesa/server/stats/HashtagStats.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/osmesa/server/stats/HashtagStats.scala b/src/main/scala/osmesa/server/stats/HashtagStats.scala index 3034ed3..9d8a913 100644 --- a/src/main/scala/osmesa/server/stats/HashtagStats.scala +++ b/src/main/scala/osmesa/server/stats/HashtagStats.scala @@ -23,8 +23,8 @@ case class HashtagStats(tag: String, userEdits: Json) object HashtagStats extends Implicits { - implicit val userHashtagDecoder: Decoder[HashtagStats] = deriveDecoder - implicit val userHashtagEncoder: Encoder[HashtagStats] = deriveEncoder + implicit val hashtagDecoder: Decoder[HashtagStats] = deriveDecoder + implicit val hashtagEncoder: Encoder[HashtagStats] = deriveEncoder private val selectF = fr"""