Skip to content

Commit

Permalink
use incremental streams and bookmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Oct 25, 2024
1 parent 639ccfe commit 38cd018
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
2 changes: 2 additions & 0 deletions tap_linkedin_ads/streams/ad_analytics/ad_analytics_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class AdAnalyticsBase(LinkedInAdsStreamBase):
"""LinkedInAds stream class for ad analytics."""

path = "/adAnalytics"
replication_key = "day"
replication_method = "INCREMENTAL"

substreams: t.ClassVar[list] = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def get_unencoded_params(self, context: Context | None) -> dict:
Returns:
A dictionary of URL query parameters.
"""
start_date = pendulum.parse(self.config["start_date"])
start_date = self.get_starting_timestamp(context)
end_date = pendulum.parse(self.config["end_date"])
return {
"pivot": "(value:CAMPAIGN)",
Expand Down
6 changes: 5 additions & 1 deletion tap_linkedin_ads/streams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@


class LinkedInAdsStream(LinkedInAdsStreamBase):
replication_key = "last_modified_time"
# Note: manually filtering in post_process since the API doesnt have filter options
replication_method = "INCREMENTAL"

def post_process(self, row: dict, context: dict | None = None) -> dict | None:
# This function extracts day, month, and year from date range column
# These values are parse with datetime function and the date is added to the day column
Expand Down Expand Up @@ -55,7 +59,7 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None:
raise Exception("No changeAuditStamps or createdAt/lastModifiedAt fields found")
# Manual date filtering
date = datetime.fromisoformat(row["last_modified_time"])
start_date = datetime.fromisoformat(self.config["start_date"]).replace(tzinfo=timezone.utc)
start_date = datetime.fromisoformat(self.get_starting_timestamp(context)).replace(tzinfo=timezone.utc)
end_date = datetime.fromisoformat(self.config["end_date"]).replace(tzinfo=timezone.utc)
if date >= start_date and date <= end_date:
return super().post_process(row, context)
Expand Down

0 comments on commit 38cd018

Please sign in to comment.