From c1bc03440a88c8299003effc5dee682aeed14dac Mon Sep 17 00:00:00 2001 From: Pat Nadolny Date: Mon, 28 Oct 2024 14:35:12 -0400 Subject: [PATCH] pre commit fixes --- .../streams/ad_analytics/ad_analytics_base.py | 2 - .../ad_analytics/ad_analytics_by_campaign.py | 29 +++++---- .../ad_analytics/ad_analytics_by_creative.py | 29 +++++---- tap_linkedin_ads/streams/base_stream.py | 5 +- tap_linkedin_ads/streams/streams.py | 61 ++++++++++++------- tests/test_core.py | 4 -- 6 files changed, 77 insertions(+), 53 deletions(-) diff --git a/tap_linkedin_ads/streams/ad_analytics/ad_analytics_base.py b/tap_linkedin_ads/streams/ad_analytics/ad_analytics_base.py index 03b45a4..bc736b8 100644 --- a/tap_linkedin_ads/streams/ad_analytics/ad_analytics_base.py +++ b/tap_linkedin_ads/streams/ad_analytics/ad_analytics_base.py @@ -31,8 +31,6 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None: Returns: The resulting record dict, or `None` if the record should be excluded. """ - # This function extracts day, month, and year from date range column - # These values are parsed with datetime function and the date is added to the day column start_date = row.get("dateRange", {}).get("start", {}) if start_date: diff --git a/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_campaign.py b/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_campaign.py index b0ec7bb..63161eb 100644 --- a/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_campaign.py +++ b/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_campaign.py @@ -7,7 +7,6 @@ from importlib import resources import pendulum -from singer_sdk.helpers.types import Context from singer_sdk.typing import ( IntegerType, ObjectType, @@ -19,6 +18,9 @@ from tap_linkedin_ads.streams.ad_analytics.ad_analytics_base import AdAnalyticsBase from tap_linkedin_ads.streams.streams import CampaignsStream +if t.TYPE_CHECKING: + from singer_sdk.helpers.types import Context + SCHEMAS_DIR = resources.files(__package__) / "schemas" UTC = timezone.utc @@ -167,7 +169,7 @@ def get_url_params( **super().get_url_params(context, next_page_token), } - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: """Return a dictionary of unencoded params. Args: @@ -181,8 +183,13 @@ def get_unencoded_params(self, context: Context | None) -> dict: return { "pivot": "(value:CAMPAIGN)", "timeGranularity": "(value:DAILY)", - "campaigns": f"List(urn%3Ali%3AsponsoredCampaign%3A{context['campaign_id']})", - "dateRange": f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day}),end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))", + "campaigns": ( + f"List(urn%3Ali%3AsponsoredCampaign%3A{context['campaign_id']})" + ), + "dateRange": ( + f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day})," + f"end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))" + ), "fields": self.adanalyticscolumns[0], } @@ -190,7 +197,7 @@ def get_unencoded_params(self, context: Context | None) -> dict: class _AdAnalyticsByCampaignSecond(_AdAnalyticsByCampaignInit): name = "adanalyticsbycampaign_second" - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: """Return a dictionary of unencoded params. Args: @@ -209,7 +216,7 @@ def get_unencoded_params(self, context: Context | None) -> dict: class _AdAnalyticsByCampaignThird(_AdAnalyticsByCampaignInit): name = "adanalyticsbycampaign_third" - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: """Return a dictionary of unencoded params. Args: @@ -230,7 +237,7 @@ class AdAnalyticsByCampaignStream(_AdAnalyticsByCampaignInit): name = "ad_analytics_by_campaign" - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: """Return a dictionary of unencoded params. Args: @@ -248,13 +255,13 @@ def get_unencoded_params(self, context: Context | None) -> dict: def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]: """Return a dictionary of records from adAnalytics classes. - Combines request columns from multiple calls to the api, which are limited to 20 columns - each. + Combines request columns from multiple calls to the api, which are limited to 20 + columns each. Uses `merge_dicts` to combine responses from each class super().get_records calls only the records from the adAnalyticsByCampaign class - zip() Iterates over the records of adAnalytics classes and merges them with merge_dicts() - function list() converts each stream context into lists + zip() Iterates over the records of adAnalytics classes and merges them with + merge_dicts() function list() converts each stream context into lists Args: context: The stream context. diff --git a/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_creative.py b/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_creative.py index 40e3e76..e1b82b8 100644 --- a/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_creative.py +++ b/tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_creative.py @@ -7,7 +7,6 @@ from importlib import resources import pendulum -from singer_sdk.helpers.types import Context from singer_sdk.typing import ( IntegerType, ObjectType, @@ -19,6 +18,9 @@ from tap_linkedin_ads.streams.ad_analytics.ad_analytics_base import AdAnalyticsBase from tap_linkedin_ads.streams.streams import CreativesStream +if t.TYPE_CHECKING: + from singer_sdk.helpers.types import Context + SCHEMAS_DIR = resources.files(__package__) / "schemas" UTC = timezone.utc @@ -167,7 +169,7 @@ def get_url_params( **super().get_url_params(context, next_page_token), } - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: """Return a dictionary of unencoded params. Args: @@ -181,8 +183,13 @@ def get_unencoded_params(self, context: Context | None) -> dict: return { "pivot": "(value:CREATIVE)", "timeGranularity": "(value:DAILY)", - "creatives": f"List(urn%3Ali%3AsponsoredCreative%3A{context['creative_id']})", - "dateRange": f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day}),end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))", + "creatives": ( + f"List(urn%3Ali%3AsponsoredCreative%3A{context['creative_id']})" + ), + "dateRange": ( + f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day})," + f"end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))" + ), "fields": self.adanalyticscolumns[0], } @@ -197,7 +204,7 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None: class _AdAnalyticsByCreativeSecond(_AdAnalyticsByCreativeInit): name = "adanalyticsbycreative_second" - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: """Return a dictionary of unencoded params. Args: @@ -216,7 +223,7 @@ def get_unencoded_params(self, context: Context | None) -> dict: class _AdAnalyticsByCreativeThird(_AdAnalyticsByCreativeInit): name = "adanalyticsbycreative_third" - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: """Return a dictionary of unencoded params. Args: @@ -237,7 +244,7 @@ class AdAnalyticsByCreativeStream(_AdAnalyticsByCreativeInit): name = "ad_analytics_by_creative" - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: """Return a dictionary of unencoded params. Args: @@ -255,13 +262,13 @@ def get_unencoded_params(self, context: Context | None) -> dict: def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]: """Return a dictionary of records from adAnalytics classes. - Combines request columns from multiple calls to the api, which are limited to 20 columns - each. + Combines request columns from multiple calls to the api, which are limited to 20 + columns each. Uses `merge_dicts` to combine responses from each class super().get_records calls only the records from adAnalyticsByCreative class - zip() Iterates over the records of adAnalytics classes and merges them with merge_dicts() - function list() converts each stream context into lists + zip() Iterates over the records of adAnalytics classes and merges them with + merge_dicts() function list() converts each stream context into lists Args: context: The stream context. diff --git a/tap_linkedin_ads/streams/base_stream.py b/tap_linkedin_ads/streams/base_stream.py index bdd650d..49921e8 100644 --- a/tap_linkedin_ads/streams/base_stream.py +++ b/tap_linkedin_ads/streams/base_stream.py @@ -84,9 +84,6 @@ def get_url_params( params: dict = {} if next_page_token: params["pageToken"] = next_page_token - # if self.replication_key: - # params["sort"] = "asc" - # params["order_by"] = self.replication_key return params def parse_response(self, response: requests.Response) -> t.Iterable[dict]: @@ -100,7 +97,7 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]: """ yield from extract_jsonpath(self.records_jsonpath, input=response.json()) - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: # noqa: ARG002 """Return a dictionary of unencoded params. Args: diff --git a/tap_linkedin_ads/streams/streams.py b/tap_linkedin_ads/streams/streams.py index 6e631d0..eb18d28 100644 --- a/tap_linkedin_ads/streams/streams.py +++ b/tap_linkedin_ads/streams/streams.py @@ -6,7 +6,6 @@ from datetime import datetime, timezone from importlib import resources -from singer_sdk.helpers.types import Context from singer_sdk.typing import ( ArrayType, BooleanType, @@ -20,18 +19,22 @@ from tap_linkedin_ads.streams.base_stream import LinkedInAdsStreamBase +if t.TYPE_CHECKING: + from singer_sdk.helpers.types import Context + SCHEMAS_DIR = resources.files(__package__) / "schemas" UTC = timezone.utc class LinkedInAdsStream(LinkedInAdsStreamBase): + """LinkedInAds stream class.""" + replication_key = "last_modified_time" # Note: manually filtering in post_process since the API doesnt have filter options replication_method = "INCREMENTAL" def post_process(self, row: dict, context: dict | None = None) -> dict | None: - # This function extracts day, month, and year from date range column - # These values are parse with datetime function and the date is added to the day column + """Post-process each record returned by the API.""" if "changeAuditStamps" in row: created_time = ( row.get("changeAuditStamps", {}).get("created", {}).get("time") @@ -49,17 +52,16 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None: ).isoformat() elif "createdAt" in row: row["created_time"] = datetime.fromtimestamp( - int(row.get("createdAt")) / 1000, + int(row["createdAt"]) / 1000, tz=UTC, ).isoformat() row["last_modified_time"] = datetime.fromtimestamp( - int(row.get("lastModifiedAt")) / 1000, + int(row["lastModifiedAt"]) / 1000, tz=UTC, ).isoformat() else: - raise Exception( - "No changeAuditStamps or createdAt/lastModifiedAt fields found" - ) + msg = "No changeAuditStamps or createdAt/lastModifiedAt fields found" + raise Exception(msg) # noqa: TRY002 # Manual date filtering date = datetime.fromisoformat(row["last_modified_time"]) start_date = self.get_starting_timestamp(context) @@ -129,7 +131,7 @@ class AccountsStream(LinkedInAdsStream): ), ).to_dict() - def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict: + def get_child_context(self, record: dict, context: dict | None) -> dict: # noqa: ARG002 """Return a context dictionary for a child stream.""" return { "account_id": record["id"], @@ -214,7 +216,7 @@ def get_url_params( **super().get_url_params(context, next_page_token), } - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: """Return a dictionary of unencoded params. Args: @@ -494,6 +496,9 @@ def get_url(self, context: dict | None) -> str: Returns: A URL, optionally targeted to a specific partition or context. """ + if not context: + msg = "Context is required for this stream" + raise ValueError(msg) return super().get_url(context) + f'/{context["account_id"]}/adCampaigns' def get_url_params( @@ -516,7 +521,7 @@ def get_url_params( **super().get_url_params(context, next_page_token), } - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: # noqa: ARG002 """Return a dictionary of unencoded params. Args: @@ -526,19 +531,22 @@ def get_unencoded_params(self, context: Context | None) -> dict: A dictionary of URL query parameters. """ return { - "search": "(status:(values:List(ACTIVE,PAUSED,ARCHIVED,COMPLETED,CANCELED,DRAFT,PENDING_DELETION,REMOVED)))" + "search": ( + "(status:(values:List(ACTIVE,PAUSED,ARCHIVED,COMPLETED," + "CANCELED,DRAFT,PENDING_DELETION,REMOVED)))" + ) } - def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict: + def get_child_context(self, record: dict, context: dict | None) -> dict: # noqa: ARG002 """Return a context dictionary for a child stream.""" return { "campaign_id": record["id"], } def post_process(self, row: dict, context: dict | None = None) -> dict | None: - schedule_column = row.get("runSchedule").get("start") + """Post-process each record returned by the API.""" row["run_schedule_start"] = datetime.fromtimestamp( # noqa: DTZ006 - int(schedule_column) / 1000, + int(row["runSchedule"]["start"]) / 1000, ).isoformat() row["campaign_group_id"] = int(row["campaignGroup"].split(":")[3]) return super().post_process(row, context) @@ -608,6 +616,9 @@ def get_url(self, context: dict | None) -> str: Returns: A URL, optionally targeted to a specific partition or context. """ + if not context: + msg = "Context is required for this stream" + raise ValueError(msg) return super().get_url(context) + f'/{context["account_id"]}/adCampaignGroups' def get_url_params( @@ -630,7 +641,7 @@ def get_url_params( **super().get_url_params(context, next_page_token), } - def get_unencoded_params(self, context: Context | None) -> dict: + def get_unencoded_params(self, context: Context) -> dict: # noqa: ARG002 """Return a dictionary of unencoded params. Args: @@ -640,13 +651,16 @@ def get_unencoded_params(self, context: Context | None) -> dict: A dictionary of URL query parameters. """ return { - "search": "(status:(values:List(ACTIVE,ARCHIVED,CANCELED,DRAFT,PAUSED,PENDING_DELETION,REMOVED)))" + "search": ( + "(status:(values:List(ACTIVE,ARCHIVED,CANCELED,DRAFT,PAUSED," + "PENDING_DELETION,REMOVED)))" + ) } def post_process(self, row: dict, context: dict | None = None) -> dict | None: - schedule_column = row.get("runSchedule").get("start") + """Post-process each record returned by the API.""" row["run_schedule_start"] = datetime.fromtimestamp( # noqa: DTZ006 - int(schedule_column) / 1000, + int(row["runSchedule"]["start"]) / 1000, ).isoformat() return super().post_process(row, context) @@ -703,7 +717,9 @@ def get_url(self, context: dict | None) -> str: Returns: A URL, optionally targeted to a specific partition or context. """ - # TODO: optional filter 'urn%3Ali%3AsponsoredCreative%3A{self.config["creative"]}' + if not context: + msg = "Context is required for this stream" + raise ValueError(msg) return super().get_url(context) + f'/{context["account_id"]}/creatives' def get_url_params( @@ -725,7 +741,7 @@ def get_url_params( **super().get_url_params(context, next_page_token), } - def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict: + def get_child_context(self, record: dict, context: dict | None) -> dict: # noqa: ARG002 """Return a context dictionary for a child stream.""" creative_id = record["id"].split(":")[-1] return { @@ -790,6 +806,9 @@ def get_url_params( Returns: A dictionary of URL query parameters. """ + if not context: + msg = "Context is required for this stream" + raise ValueError(msg) return { "q": "account", "account": f"urn:li:sponsoredAccount:{context['account_id']}", diff --git a/tests/test_core.py b/tests/test_core.py index b212a19..fd4b209 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -8,7 +8,6 @@ SAMPLE_CONFIG = { "start_date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d"), - # TODO: Initialize minimal tap config } @@ -17,6 +16,3 @@ tap_class=TapLinkedInAds, config=SAMPLE_CONFIG, ) - - -# TODO: Create additional tests as appropriate for your tap.