diff --git a/src/server/api/API_ingest/shelterluv_people.py b/src/server/api/API_ingest/shelterluv_people.py
index 76932a7e..67afedd9 100644
--- a/src/server/api/API_ingest/shelterluv_people.py
+++ b/src/server/api/API_ingest/shelterluv_people.py
@@ -1,8 +1,12 @@
-import requests, os
-from models import ShelterluvPeople
-from config import engine
-from sqlalchemy.orm import sessionmaker
+import os
+import requests
 import structlog
+import time
+from sqlalchemy.orm import sessionmaker
+
+from config import engine
+from models import ShelterluvPeople
+
 logger = structlog.get_logger()
 
 try:
@@ -23,6 +27,7 @@ TEST_MODE=os.getenv("TEST_MODE") # if not present, has value None
 
 LIMIT = 100
+MAX_RETRIES = 10
 
 #################################
 # This script is used to fetch data from shelterluv API.
 # Please be mindful of your usage.
@@ -41,20 +46,37 @@ def store_shelterluv_people_all():
     offset = 0
     has_more = True
     Session = sessionmaker(engine)
+    retries = 0
 
     with Session() as session:
-        logger.debug("Truncating table shelterluvpeople")
-        session.execute("TRUNCATE TABLE shelterluvpeople")
-
         logger.debug("Start getting shelterluv contacts from people table")
 
         while has_more:
-            r = requests.get("http://shelterluv.com/api/v1/people?limit={}&offset={}".format(LIMIT, offset),
-                             headers={"x-api-key": SHELTERLUV_SECRET_TOKEN})
-            response = r.json()
+            if retries > MAX_RETRIES:
+                raise Exception("reached max retries in store_shelterluv_people_all")
+
+            try:
+                r = requests.get("http://shelterluv.com/api/v1/people?limit={}&offset={}".format(LIMIT, offset),
+                                 headers={"x-api-key": SHELTERLUV_SECRET_TOKEN})
+            except Exception as e:
+                logger.error("store_shelterluv_people_all failed with %s, retrying...", e)
+                retries += 1
+                continue
+
+            if r.status_code != 200:
+                logger.error("store_shelterluv_people_all status code %s, retrying...", r.status_code)
+                retries += 1
+                continue
+
+            try:
+                response = r.json()
+            except Exception as e:
+                logger.error("store_shelterluv_people_all JSON decode failed with %s", e)
+                retries += 1
+                continue
+
             for person in response["people"]:
-                #todo: Does this need more "null checks"?
                 session.add(ShelterluvPeople(firstname=person["Firstname"],
                                              lastname=person["Lastname"],
                                              id=person["ID"] if "ID" in person else None,
@@ -69,9 +92,11 @@ def store_shelterluv_people_all():
                                              phone=person["Phone"],
                                              animal_ids=person["Animal_ids"]))
             offset += LIMIT
+            retries = 0
             has_more = response["has_more"] if not TEST_MODE else response["has_more"] and offset < 1000
             if offset % 1000 == 0:
                 logger.debug("Reading offset %s", str(offset))
+            time.sleep(0.2)
 
         session.commit()
     logger.debug("Finished getting shelterluv contacts from people table")
diff --git a/src/server/api/API_ingest/sl_animal_events.py b/src/server/api/API_ingest/sl_animal_events.py
index 43170184..8a747792 100644
--- a/src/server/api/API_ingest/sl_animal_events.py
+++ b/src/server/api/API_ingest/sl_animal_events.py
@@ -1,6 +1,7 @@
 import json
 import os
 import posixpath as path
+import time
 
 import structlog
 
@@ -25,6 +26,7 @@ BASE_URL = "http://shelterluv.com/api/"
 
 MAX_COUNT = 100  # Max records the API will return for one call
+MAX_RETRY = 10
 
 # Get the API key
 try:
@@ -75,8 +77,9 @@ def get_event_count():
     """Test that server is operational and get total event count."""
 
-    events = "v1/events&offset=0&limit=1"
+    events = "v1/events?offset=0&limit=1"
     URL = path.join(BASE_URL, events)
+    logger.info("making call: %s", URL)
 
     try:
         response = requests.request("GET", URL, headers=headers)
@@ -85,7 +88,7 @@
         return -2
 
     if response.status_code != 200:
-        logger.error("get_event_count ", response.status_code, "code")
+        logger.error("get_event_count status code: %s", response.status_code)
         return -3
 
     try:
@@ -111,30 +114,36 @@ def get_events_bulk():
 
     event_records = []
 
-    raw_url = path.join(BASE_URL, "v1/events&offset={0}&limit={1}")
+    raw_url = path.join(BASE_URL, "v1/events?offset={0}&limit={1}")
     offset = 0
     limit = MAX_COUNT
     more_records = True
+    retries = 0
 
     while more_records:
+        if retries > MAX_RETRY:
+            raise Exception("get_events_bulk failed, max retries reached")
         url = raw_url.format(offset, limit)
         try:
             response = requests.request("GET", url, headers=headers)
         except Exception as e:
-            logger.error("get_events failed with ", e)
-            return -2
+            logger.error("get_events_bulk failed with %s, retrying...", e)
+            retries += 1
+            continue
 
         if response.status_code != 200:
-            logger.error("get_event_count ", response.status_code, "code")
-            return -3
+            logger.error("get_events_bulk status code %s, retrying...", response.status_code)
+            retries += 1
+            continue
 
         try:
             decoded = json.loads(response.text)
         except json.decoder.JSONDecodeError as e:
-            logger.error("get_event_count JSON decode failed with", e)
-            return -4
+            logger.error("get_events_bulk JSON decode failed with %s", e)
+            retries += 1
+            continue
 
         if decoded["success"]:
             for evrec in decoded["events"]:
@@ -143,13 +152,14 @@
 
             more_records = decoded["has_more"]  # if so, we'll make another pass
             offset += limit
+            retries = 0
             if offset % 1000 == 0:
                 logger.debug("Reading offset %s", str(offset))
             if TEST_MODE and offset > 1000:
-                more_records=False # Break out early
-
+                more_records = False  # Break out early
         else:
             return -5  # AFAICT, this means URL was bad
+        time.sleep(0.2)
 
     return event_records
diff --git a/src/server/api/API_ingest/updated_data.py b/src/server/api/API_ingest/updated_data.py
index 7d669a7f..b362aea8 100644
--- a/src/server/api/API_ingest/updated_data.py
+++ b/src/server/api/API_ingest/updated_data.py
@@ -13,49 +13,41 @@ def get_updated_contact_data():
     qry = """ -- Collect latest foster/volunteer dates
         select json_agg (upd) as "cd"
         from (
-            select
-                sf.source_id as "Id" , -- long salesforce string
"Id" , -- long salesforce string - array_agg(sl.source_id) filter (where sl.source_id is not null) as "Person_Id__c", -- short PAWS-local shelterluv id + select + salesforce.source_id as "contactId", + shelterluv.person_ids as "personIds", case - when - (extract(epoch from now())::bigint - max(foster_out) < 365*86400) -- foster out in last year - or (extract(epoch from now())::bigint - max(foster_return) < 365*86400) -- foster return - then 'Active' - else 'Inactive' - end as "Foster_Activity__c", - max(foster_out) as "Foster_Start_Date__c", - max(foster_return) as "Foster_End_Date__c", - min(vol.first_date) "First_volunteer_date__c", - max(vol.last_date) "Last_volunteer_date__c", - sum(vol.hours) as "Total_volunteer_hours__c", - array_agg(vc.source_id::integer) filter(where vc.source_id is not null) as "Volgistics_Id__c" + when volgistics.last_shift_date > now() - interval '1 year' then 'Active' else 'InActive' + end as "volunteerStatus", + shelterluv.foster_start as "fosterStartDate", + null as "fosterEndDate", + shelterluv.latest_foster_event as "latestFosterEvent", + volgistics.first_volunteer_date as "firstVolunteerDate", + volgistics.last_shift_date as "lastShiftDate", + volgistics.total_hours as "totalVolunteerHours", + volgistics.volg_ids as "volgisticIds" from ( - select source_id, matching_id from pdp_contacts sf - where sf.source_type = 'salesforcecontacts' - ) sf - left join pdp_contacts sl on sl.matching_id = sf.matching_id and sl.source_type = 'shelterluvpeople' + select * from pdp_contacts pc where source_type = 'salesforcecontacts' + ) salesforce left join ( - select - person_id, - max(case when event_type=1 then time else null end) * 1000 adopt, - max(case when event_type=2 then time else null end) * 1000 foster_out, - -- max(case when event_type=3 then time else null end) rto, - max(case when event_type=5 then time else null end) * 1000 foster_return - from sl_animal_events - group by person_id - ) sle on sle.person_id::text = sl.source_id - left join pdp_contacts vc on vc.matching_id = sf.matching_id and vc.source_type = 'volgistics' + select matching_id, array_agg(distinct v."number"::int) volg_ids, sum(hours) total_hours, + min(from_date) first_volunteer_date, max(from_date) last_shift_date + from volgistics v + left join volgisticsshifts v2 on v2.volg_id::varchar = v.number + inner join pdp_contacts pc on pc.source_id = v2.volg_id::varchar and pc.source_type = 'volgistics' + group by matching_id + ) volgistics on volgistics.matching_id = salesforce.matching_id left join ( select - volg_id, - sum(hours) as hours, - extract(epoch from min(from_date)) * 1000 as first_date, - extract(epoch from max(from_date)) * 1000 as last_date - from volgisticsshifts - group by volg_id - ) vol on vol.volg_id::text = vc.source_id - where sl.matching_id is not null or vc.matching_id is not null - group by sf.source_id + matching_id, array_agg(distinct p.internal_id) as person_ids, + min(case when event_type in (2,5) then to_timestamp(time) else null end) foster_start, + max(case when event_type in (2,5) then to_timestamp(time) else null end) latest_foster_event + from shelterluvpeople p + left join sl_animal_events sae on sae.person_id::varchar = p.internal_id + inner join pdp_contacts pc on pc.source_id = p.internal_id + group by matching_id + ) shelterluv on shelterluv.matching_id = salesforce.matching_id + where volgistics.matching_id is not null or shelterluv.matching_id is not null ) upd; """ @@ -63,6 +55,5 @@ def get_updated_contact_data(): result = session.execute(qry) sfdata = 
     if sfdata:
-        logger.debug(sfdata)
         logger.debug("Query for Salesforce update returned %d records", len(sfdata))
     return sfdata
\ No newline at end of file
diff --git a/src/server/api/internal_api.py b/src/server/api/internal_api.py
index adc40f8a..b62e2a9c 100644
--- a/src/server/api/internal_api.py
+++ b/src/server/api/internal_api.py
@@ -33,14 +33,9 @@ def user_test2():
 
 @internal_api.route("/api/internal/ingestRawData", methods=["GET"])
 def ingest_raw_data():
-    try:
-        ingest_sources_from_api.start()
-    except Exception as e:
-        logger.error(e)
-
+    ingest_sources_from_api.start()
     return jsonify({'outcome': 'OK'}), 200
 
-
 @internal_api.route("/api/internal/get_updated_data", methods=["GET"])
 def get_contact_data():
     logger.debug("Calling get_updated_contact_data()")
@@ -49,7 +44,11 @@ def get_contact_data():
         logger.debug("Returning %d contact records", len(contact_json))
     else:
         logger.debug("No contact records found")
-    return jsonify({'outcome': 'OK'}), 200
+    return jsonify({
+        'outcome': 'OK',
+        'data': contact_json,
+        'length': len(contact_json) if contact_json else 0
+    }), 200
 
 
 @internal_api.route("/api/internal/start_flow", methods=["GET"])
diff --git a/src/server/config.py b/src/server/config.py
index 5783158d..2af0abb0 100644
--- a/src/server/config.py
+++ b/src/server/config.py
@@ -1,13 +1,12 @@
+import logging
 import os
 import sys
-import sqlalchemy as db
-import models
-from constants import IS_LOCAL, BASE_PATH, RAW_DATA_PATH, OUTPUT_PATH, LOGS_PATH, REPORT_PATH, ZIPPED_FILES
-import logging
+import sqlalchemy as db
 import structlog
 from structlog.processors import CallsiteParameter
+from constants import IS_LOCAL, BASE_PATH, RAW_DATA_PATH, OUTPUT_PATH, LOGS_PATH, REPORT_PATH, ZIPPED_FILES
 
 # structlog setup for complete app
@@ -17,7 +16,7 @@
     structlog.processors.add_log_level,
     structlog.processors.StackInfoRenderer(),
     structlog.dev.set_exc_info,
-    structlog.processors.TimeStamper(fmt=None, utc=True ),
+    structlog.processors.TimeStamper(fmt="iso", utc=True),
     structlog.processors.CallsiteParameterAdder(
         [
             CallsiteParameter.FILENAME,
@@ -67,7 +66,7 @@
     + POSTGRES_DATABASE
 )
 
-engine = db.create_engine(DB)
+engine = db.create_engine(DB, pool_pre_ping=True)
 
 # Run Alembic to create managed tables
 # from alembic.config import Config
diff --git a/src/server/pub_sub/salesforce_message_publisher.py b/src/server/pub_sub/salesforce_message_publisher.py
index 5d239090..6b23c4b5 100644
--- a/src/server/pub_sub/salesforce_message_publisher.py
+++ b/src/server/pub_sub/salesforce_message_publisher.py
@@ -60,7 +60,8 @@ def send_pipeline_update_messages(contacts_list):
     schema_id = stub.GetTopic(pb2.TopicRequest(topic_name=UPDATE_TOPIC), metadata=auth_meta_data).schema_id
     schema = stub.GetSchema(pb2.SchemaRequest(schema_id=schema_id), metadata=auth_meta_data).schema_json
 
-    payloads = []
+
+    batches = 0
     while len(contacts_list) > 0:
         if len(contacts_list) > BATCH_SIZE:
             current_batch = contacts_list[:BATCH_SIZE]
@@ -85,9 +86,11 @@
             "schema_id": schema_id,
             "payload": buf.getvalue()
         }
-        payloads.append(payload)
-    stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=payloads), metadata=auth_meta_data)
+        stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=[payload]), metadata=auth_meta_data)
+        logger.info('Sent %s contacts in message', len(current_batch))
+        batches += 1
+
-    logger.info("%s total pipeline update messages sent", len(payloads))
+    logger.info('Completed sending platform messages, %s messages sent', batches)
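
The two ingest loops above now share one shape: cap consecutive failures at MAX_RETRIES/MAX_RETRY, reset the counter after every successful page, and sleep 0.2 s between pages. A minimal sketch of that pattern as a reusable helper, assuming the same requests/structlog stack; fetch_paginated and items_key are illustrative names, not part of this codebase:

import time

import requests
import structlog

logger = structlog.get_logger()

MAX_RETRIES = 10     # abort after this many *consecutive* failures
PAGE_SIZE = 100      # mirrors LIMIT / MAX_COUNT above
PAUSE_SECONDS = 0.2  # polite delay between successful pages


def fetch_paginated(url_template, headers, items_key):
    """Fetch every page of a paginated endpoint, retrying transient failures.

    url_template must contain {0} (offset) and {1} (limit) placeholders.
    The retry counter resets after each good page, so only MAX_RETRIES
    failures in a row raise.
    """
    records, offset, retries = [], 0, 0
    while True:
        if retries > MAX_RETRIES:
            raise Exception("reached max retries for %s" % url_template)
        try:
            # the timeout is an extra safeguard here; the diff above does not set one
            r = requests.get(url_template.format(offset, PAGE_SIZE),
                             headers=headers, timeout=30)
        except Exception as e:
            logger.error("request failed with %s, retrying...", e)
            retries += 1
            continue
        if r.status_code != 200:
            logger.error("status code %s, retrying...", r.status_code)
            retries += 1
            continue
        try:
            payload = r.json()
        except ValueError as e:  # json.JSONDecodeError subclasses ValueError
            logger.error("JSON decode failed with %s, retrying...", e)
            retries += 1
            continue
        records.extend(payload[items_key])
        retries = 0          # a successful page clears the failure streak
        offset += PAGE_SIZE
        if not payload["has_more"]:
            return records
        time.sleep(PAUSE_SECONDS)

With such a helper, store_shelterluv_people_all() would reduce to fetch_paginated(..., items_key="people") plus the session/insert logic, though the diff keeps both loops inline.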
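The publisher change is behavioral, not just cosmetic: instead of appending every payload to one list and issuing a single Publish call at the end, each batch now goes out in its own Publish call, so one oversized or failed batch no longer takes the whole run with it. The slicing logic, sketched standalone (publish_one stands in for the stub.Publish call; batch_size=200 is a placeholder for the real BATCH_SIZE constant):

def publish_in_batches(contacts_list, publish_one, batch_size=200):
    """Send contacts in batch_size chunks, one Publish call per chunk."""
    batches = 0
    while len(contacts_list) > 0:
        current_batch = contacts_list[:batch_size]
        contacts_list = contacts_list[batch_size:]
        publish_one(current_batch)  # wraps stub.Publish(..., events=[payload])
        batches += 1
    return batches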
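For consumers of the reworked /api/internal/get_updated_data endpoint, the response now carries the query payload alongside the outcome. A hypothetical example, with keys taken from the jsonify() call and the SQL aliases above, and every value purely illustrative:

example_response = {
    "outcome": "OK",
    "length": 1,
    "data": [{
        "contactId": "0035500000example",  # salesforce.source_id (illustrative)
        "personIds": ["12345"],            # shelterluv internal ids
        "volunteerStatus": "Active",       # last shift within the past year
        "fosterStartDate": "2023-01-15T00:00:00",
        "fosterEndDate": None,
        "latestFosterEvent": "2023-06-02T00:00:00",
        "firstVolunteerDate": "2019-03-01",
        "lastShiftDate": "2023-07-04",
        "totalVolunteerHours": 42.5,
        "volgisticIds": [9876],
    }],
}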