Skip to content

Commit

Permalink
pre commit fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Oct 28, 2024
1 parent e3a13dc commit c1bc034
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 53 deletions.
2 changes: 0 additions & 2 deletions tap_linkedin_ads/streams/ad_analytics/ad_analytics_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None:
Returns:
The resulting record dict, or `None` if the record should be excluded.
"""
# This function extracts day, month, and year from date range column
# These values are parsed with datetime function and the date is added to the day column
start_date = row.get("dateRange", {}).get("start", {})

if start_date:
Expand Down
29 changes: 18 additions & 11 deletions tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_campaign.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from importlib import resources

import pendulum
from singer_sdk.helpers.types import Context
from singer_sdk.typing import (
IntegerType,
ObjectType,
Expand All @@ -19,6 +18,9 @@
from tap_linkedin_ads.streams.ad_analytics.ad_analytics_base import AdAnalyticsBase
from tap_linkedin_ads.streams.streams import CampaignsStream

if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Context

SCHEMAS_DIR = resources.files(__package__) / "schemas"
UTC = timezone.utc

Expand Down Expand Up @@ -167,7 +169,7 @@ def get_url_params(
**super().get_url_params(context, next_page_token),
}

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict:
"""Return a dictionary of unencoded params.
Args:
Expand All @@ -181,16 +183,21 @@ def get_unencoded_params(self, context: Context | None) -> dict:
return {
"pivot": "(value:CAMPAIGN)",
"timeGranularity": "(value:DAILY)",
"campaigns": f"List(urn%3Ali%3AsponsoredCampaign%3A{context['campaign_id']})",
"dateRange": f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day}),end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))",
"campaigns": (
f"List(urn%3Ali%3AsponsoredCampaign%3A{context['campaign_id']})"
),
"dateRange": (
f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day}),"
f"end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))"
),
"fields": self.adanalyticscolumns[0],
}


class _AdAnalyticsByCampaignSecond(_AdAnalyticsByCampaignInit):
name = "adanalyticsbycampaign_second"

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict:
"""Return a dictionary of unencoded params.
Args:
Expand All @@ -209,7 +216,7 @@ def get_unencoded_params(self, context: Context | None) -> dict:
class _AdAnalyticsByCampaignThird(_AdAnalyticsByCampaignInit):
name = "adanalyticsbycampaign_third"

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict:
"""Return a dictionary of unencoded params.
Args:
Expand All @@ -230,7 +237,7 @@ class AdAnalyticsByCampaignStream(_AdAnalyticsByCampaignInit):

name = "ad_analytics_by_campaign"

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict:
"""Return a dictionary of unencoded params.
Args:
Expand All @@ -248,13 +255,13 @@ def get_unencoded_params(self, context: Context | None) -> dict:
def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
"""Return a dictionary of records from adAnalytics classes.
Combines request columns from multiple calls to the api, which are limited to 20 columns
each.
Combines request columns from multiple calls to the api, which are limited to 20
columns each.
Uses `merge_dicts` to combine responses from each class
super().get_records calls only the records from the adAnalyticsByCampaign class
zip() Iterates over the records of adAnalytics classes and merges them with merge_dicts()
function list() converts each stream context into lists
zip() Iterates over the records of adAnalytics classes and merges them with
merge_dicts() function list() converts each stream context into lists
Args:
context: The stream context.
Expand Down
29 changes: 18 additions & 11 deletions tap_linkedin_ads/streams/ad_analytics/ad_analytics_by_creative.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from importlib import resources

import pendulum
from singer_sdk.helpers.types import Context
from singer_sdk.typing import (
IntegerType,
ObjectType,
Expand All @@ -19,6 +18,9 @@
from tap_linkedin_ads.streams.ad_analytics.ad_analytics_base import AdAnalyticsBase
from tap_linkedin_ads.streams.streams import CreativesStream

if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Context

SCHEMAS_DIR = resources.files(__package__) / "schemas"
UTC = timezone.utc

Expand Down Expand Up @@ -167,7 +169,7 @@ def get_url_params(
**super().get_url_params(context, next_page_token),
}

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict:
"""Return a dictionary of unencoded params.
Args:
Expand All @@ -181,8 +183,13 @@ def get_unencoded_params(self, context: Context | None) -> dict:
return {
"pivot": "(value:CREATIVE)",
"timeGranularity": "(value:DAILY)",
"creatives": f"List(urn%3Ali%3AsponsoredCreative%3A{context['creative_id']})",
"dateRange": f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day}),end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))",
"creatives": (
f"List(urn%3Ali%3AsponsoredCreative%3A{context['creative_id']})"
),
"dateRange": (
f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day}),"
f"end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))"
),
"fields": self.adanalyticscolumns[0],
}

Expand All @@ -197,7 +204,7 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None:
class _AdAnalyticsByCreativeSecond(_AdAnalyticsByCreativeInit):
name = "adanalyticsbycreative_second"

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict:
"""Return a dictionary of unencoded params.
Args:
Expand All @@ -216,7 +223,7 @@ def get_unencoded_params(self, context: Context | None) -> dict:
class _AdAnalyticsByCreativeThird(_AdAnalyticsByCreativeInit):
name = "adanalyticsbycreative_third"

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict:
"""Return a dictionary of unencoded params.
Args:
Expand All @@ -237,7 +244,7 @@ class AdAnalyticsByCreativeStream(_AdAnalyticsByCreativeInit):

name = "ad_analytics_by_creative"

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict:
"""Return a dictionary of unencoded params.
Args:
Expand All @@ -255,13 +262,13 @@ def get_unencoded_params(self, context: Context | None) -> dict:
def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
"""Return a dictionary of records from adAnalytics classes.
Combines request columns from multiple calls to the api, which are limited to 20 columns
each.
Combines request columns from multiple calls to the api, which are limited to 20
columns each.
Uses `merge_dicts` to combine responses from each class
super().get_records calls only the records from adAnalyticsByCreative class
zip() Iterates over the records of adAnalytics classes and merges them with merge_dicts()
function list() converts each stream context into lists
zip() Iterates over the records of adAnalytics classes and merges them with
merge_dicts() function list() converts each stream context into lists
Args:
context: The stream context.
Expand Down
5 changes: 1 addition & 4 deletions tap_linkedin_ads/streams/base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ def get_url_params(
params: dict = {}
if next_page_token:
params["pageToken"] = next_page_token
# if self.replication_key:
# params["sort"] = "asc"
# params["order_by"] = self.replication_key
return params

def parse_response(self, response: requests.Response) -> t.Iterable[dict]:
Expand All @@ -100,7 +97,7 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]:
"""
yield from extract_jsonpath(self.records_jsonpath, input=response.json())

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict: # noqa: ARG002
"""Return a dictionary of unencoded params.
Args:
Expand Down
61 changes: 40 additions & 21 deletions tap_linkedin_ads/streams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from datetime import datetime, timezone
from importlib import resources

from singer_sdk.helpers.types import Context
from singer_sdk.typing import (
ArrayType,
BooleanType,
Expand All @@ -20,18 +19,22 @@

from tap_linkedin_ads.streams.base_stream import LinkedInAdsStreamBase

if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Context

SCHEMAS_DIR = resources.files(__package__) / "schemas"
UTC = timezone.utc


class LinkedInAdsStream(LinkedInAdsStreamBase):
"""LinkedInAds stream class."""

replication_key = "last_modified_time"
# Note: manually filtering in post_process since the API doesnt have filter options
replication_method = "INCREMENTAL"

def post_process(self, row: dict, context: dict | None = None) -> dict | None:
# This function extracts day, month, and year from date range column
# These values are parse with datetime function and the date is added to the day column
"""Post-process each record returned by the API."""
if "changeAuditStamps" in row:
created_time = (
row.get("changeAuditStamps", {}).get("created", {}).get("time")
Expand All @@ -49,17 +52,16 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None:
).isoformat()
elif "createdAt" in row:
row["created_time"] = datetime.fromtimestamp(
int(row.get("createdAt")) / 1000,
int(row["createdAt"]) / 1000,
tz=UTC,
).isoformat()
row["last_modified_time"] = datetime.fromtimestamp(
int(row.get("lastModifiedAt")) / 1000,
int(row["lastModifiedAt"]) / 1000,
tz=UTC,
).isoformat()
else:
raise Exception(
"No changeAuditStamps or createdAt/lastModifiedAt fields found"
)
msg = "No changeAuditStamps or createdAt/lastModifiedAt fields found"
raise Exception(msg) # noqa: TRY002
# Manual date filtering
date = datetime.fromisoformat(row["last_modified_time"])
start_date = self.get_starting_timestamp(context)
Expand Down Expand Up @@ -129,7 +131,7 @@ class AccountsStream(LinkedInAdsStream):
),
).to_dict()

def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict:
def get_child_context(self, record: dict, context: dict | None) -> dict: # noqa: ARG002
"""Return a context dictionary for a child stream."""
return {
"account_id": record["id"],
Expand Down Expand Up @@ -214,7 +216,7 @@ def get_url_params(
**super().get_url_params(context, next_page_token),
}

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict:
"""Return a dictionary of unencoded params.
Args:
Expand Down Expand Up @@ -494,6 +496,9 @@ def get_url(self, context: dict | None) -> str:
Returns:
A URL, optionally targeted to a specific partition or context.
"""
if not context:
msg = "Context is required for this stream"
raise ValueError(msg)
return super().get_url(context) + f'/{context["account_id"]}/adCampaigns'

def get_url_params(
Expand All @@ -516,7 +521,7 @@ def get_url_params(
**super().get_url_params(context, next_page_token),
}

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict: # noqa: ARG002
"""Return a dictionary of unencoded params.
Args:
Expand All @@ -526,19 +531,22 @@ def get_unencoded_params(self, context: Context | None) -> dict:
A dictionary of URL query parameters.
"""
return {
"search": "(status:(values:List(ACTIVE,PAUSED,ARCHIVED,COMPLETED,CANCELED,DRAFT,PENDING_DELETION,REMOVED)))"
"search": (
"(status:(values:List(ACTIVE,PAUSED,ARCHIVED,COMPLETED,"
"CANCELED,DRAFT,PENDING_DELETION,REMOVED)))"
)
}

def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict:
def get_child_context(self, record: dict, context: dict | None) -> dict: # noqa: ARG002
"""Return a context dictionary for a child stream."""
return {
"campaign_id": record["id"],
}

def post_process(self, row: dict, context: dict | None = None) -> dict | None:
schedule_column = row.get("runSchedule").get("start")
"""Post-process each record returned by the API."""
row["run_schedule_start"] = datetime.fromtimestamp( # noqa: DTZ006
int(schedule_column) / 1000,
int(row["runSchedule"]["start"]) / 1000,
).isoformat()
row["campaign_group_id"] = int(row["campaignGroup"].split(":")[3])
return super().post_process(row, context)
Expand Down Expand Up @@ -608,6 +616,9 @@ def get_url(self, context: dict | None) -> str:
Returns:
A URL, optionally targeted to a specific partition or context.
"""
if not context:
msg = "Context is required for this stream"
raise ValueError(msg)
return super().get_url(context) + f'/{context["account_id"]}/adCampaignGroups'

def get_url_params(
Expand All @@ -630,7 +641,7 @@ def get_url_params(
**super().get_url_params(context, next_page_token),
}

def get_unencoded_params(self, context: Context | None) -> dict:
def get_unencoded_params(self, context: Context) -> dict: # noqa: ARG002
"""Return a dictionary of unencoded params.
Args:
Expand All @@ -640,13 +651,16 @@ def get_unencoded_params(self, context: Context | None) -> dict:
A dictionary of URL query parameters.
"""
return {
"search": "(status:(values:List(ACTIVE,ARCHIVED,CANCELED,DRAFT,PAUSED,PENDING_DELETION,REMOVED)))"
"search": (
"(status:(values:List(ACTIVE,ARCHIVED,CANCELED,DRAFT,PAUSED,"
"PENDING_DELETION,REMOVED)))"
)
}

def post_process(self, row: dict, context: dict | None = None) -> dict | None:
schedule_column = row.get("runSchedule").get("start")
"""Post-process each record returned by the API."""
row["run_schedule_start"] = datetime.fromtimestamp( # noqa: DTZ006
int(schedule_column) / 1000,
int(row["runSchedule"]["start"]) / 1000,
).isoformat()
return super().post_process(row, context)

Expand Down Expand Up @@ -703,7 +717,9 @@ def get_url(self, context: dict | None) -> str:
Returns:
A URL, optionally targeted to a specific partition or context.
"""
# TODO: optional filter 'urn%3Ali%3AsponsoredCreative%3A{self.config["creative"]}'
if not context:
msg = "Context is required for this stream"
raise ValueError(msg)
return super().get_url(context) + f'/{context["account_id"]}/creatives'

def get_url_params(
Expand All @@ -725,7 +741,7 @@ def get_url_params(
**super().get_url_params(context, next_page_token),
}

def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict:
def get_child_context(self, record: dict, context: dict | None) -> dict: # noqa: ARG002
"""Return a context dictionary for a child stream."""
creative_id = record["id"].split(":")[-1]
return {
Expand Down Expand Up @@ -790,6 +806,9 @@ def get_url_params(
Returns:
A dictionary of URL query parameters.
"""
if not context:
msg = "Context is required for this stream"
raise ValueError(msg)
return {
"q": "account",
"account": f"urn:li:sponsoredAccount:{context['account_id']}",
Expand Down
Loading

0 comments on commit c1bc034

Please sign in to comment.