From 38cd018d543e96b4f7c62fb6eb03b83182f10879 Mon Sep 17 00:00:00 2001 From: Pat Nadolny Date: Fri, 25 Oct 2024 16:12:49 -0400 Subject: [PATCH] use incremental streams and bookmarks --- tap_linkedin_ads/streams/ad_analytics/ad_analytics_base.py | 2 ++ .../streams/ad_analytics/ad_analytics_by_campaign.py | 2 +- tap_linkedin_ads/streams/streams.py | 6 +++++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tap_linkedin_ads/streams/ad_analytics/ad_analytics_base.py b/tap_linkedin_ads/streams/ad_analytics/ad_analytics_base.py index ecd761f..8e2cbf9 100644 --- a/tap_linkedin_ads/streams/ad_analytics/ad_analytics_base.py +++ b/tap_linkedin_ads/streams/ad_analytics/ad_analytics_base.py @@ -16,6 +16,8 @@ class AdAnalyticsBase(LinkedInAdsStreamBase): """LinkedInAds stream class for ad analytics.""" path = "/adAnalytics" + replication_key = "day" + replication_method = "INCREMENTAL" substreams: t.ClassVar[list] = [] diff --git a/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_campaign.py b/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_campaign.py index a95c03a..b0ec7bb 100644 --- a/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_campaign.py +++ b/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_campaign.py @@ -176,7 +176,7 @@ def get_unencoded_params(self, context: Context | None) -> dict: Returns: A dictionary of URL query parameters. """ - start_date = pendulum.parse(self.config["start_date"]) + start_date = self.get_starting_timestamp(context) end_date = pendulum.parse(self.config["end_date"]) return { "pivot": "(value:CAMPAIGN)", diff --git a/tap_linkedin_ads/streams/streams.py b/tap_linkedin_ads/streams/streams.py index 5d768b9..e8f0548 100644 --- a/tap_linkedin_ads/streams/streams.py +++ b/tap_linkedin_ads/streams/streams.py @@ -26,6 +26,10 @@ class LinkedInAdsStream(LinkedInAdsStreamBase): + replication_key = "last_modified_time" + # Note: manually filtering in post_process since the API doesnt have filter options + replication_method = "INCREMENTAL" + def post_process(self, row: dict, context: dict | None = None) -> dict | None: # This function extracts day, month, and year from date range column # These values are parse with datetime function and the date is added to the day column @@ -55,7 +59,7 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None: raise Exception("No changeAuditStamps or createdAt/lastModifiedAt fields found") # Manual date filtering date = datetime.fromisoformat(row["last_modified_time"]) - start_date = datetime.fromisoformat(self.config["start_date"]).replace(tzinfo=timezone.utc) + start_date = datetime.fromisoformat(self.get_starting_timestamp(context)).replace(tzinfo=timezone.utc) end_date = datetime.fromisoformat(self.config["end_date"]).replace(tzinfo=timezone.utc) if date >= start_date and date <= end_date: return super().post_process(row, context)