Skip to content

Commit

Permalink
Working account level analytics
Browse files Browse the repository at this point in the history
  • Loading branch information
NiallRees committed Dec 1, 2021
1 parent 38ee5f0 commit 74ca38d
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 56 deletions.
26 changes: 20 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ license = "Apache 2.0"
[tool.poetry.dependencies]
python = "<3.10,>=3.6.2"
requests = "^2.25.1"
singer-sdk = "^0.3.10"
singer-sdk = "^0.3.14"

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
Expand Down
Empty file modified tap-pinterest-ads.sh
100644 → 100755
Empty file.
41 changes: 40 additions & 1 deletion tap_pinterest_ads/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
"""REST client handling, including PinterestStream base class."""
from typing import Any, Dict, Optional, Iterable, Callable

import backoff
import requests
from typing import Any, Dict, Optional, Iterable

from memoization import cached
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import RESTStream
from singer_sdk.exceptions import FatalAPIError, RetriableAPIError

from tap_pinterest_ads.auth import PinterestAuthenticator

Expand Down Expand Up @@ -61,3 +64,39 @@ def get_url_params(
def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
yield from extract_jsonpath(self.records_jsonpath, input=response.json())

def validate_response(self, response: requests.Response) -> None:
if response.status_code == 429 or 500 <= response.status_code < 600:
msg = (
f"{response.status_code} Server Error: "
f"{response.reason} for path: {self.path}"
f"\n{response.text}"
)
raise RetriableAPIError(msg)
elif 400 <= response.status_code < 500:
msg = (
f"{response.status_code} Client Error: "
f"{response.reason} for path: {self.path}"
f"\n{response.text}"
)
raise FatalAPIError(msg)

def request_decorator(self, func: Callable) -> Callable:
"""Instantiate a decorator for handling request failures.
Developers may override this method to provide custom backoff or retry
handling.
Args:
func: Function to decorate.
Returns:
A decorated method.
"""
decorator: Callable = backoff.on_exception(
backoff.expo,
(RetriableAPIError,),
max_tries=5,
factor=5,
)(func)
return decorator
114 changes: 69 additions & 45 deletions tap_pinterest_ads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
import copy
import datetime

import backoff
import requests
from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
from typing import Any, Callable, Optional

from tap_pinterest_ads.client import PinterestStream
Expand Down Expand Up @@ -183,7 +181,7 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
}


ANALYTICS = [
AD_ANALYTICS_COLUMNS = [
"SPEND_IN_DOLLAR", "ECPC_IN_DOLLAR", "CTR", "ECTR", "ECPE_IN_DOLLAR",
"ENGAGEMENT_RATE", "EENGAGEMENT_RATE", "REPIN_RATE", "CTR_2", "CAMPAIGN_ID",
"AD_ACCOUNT_ID", "AD_GROUP_ID", "CAMPAIGN_ENTITY_STATUS",
Expand Down Expand Up @@ -216,8 +214,7 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"TOTAL_WEB_VIEW_CHECKOUT", "TOTAL_WEB_VIEW_CHECKOUT_VALUE_IN_MICRO_DOLLAR"
]


class AdAnalyticStream(PinterestStream):
class AdAnalyticsStream(PinterestStream):
name = 'ad_analytics'
parent_stream_type = AdStream
path = "ad_accounts/{ad_account_id}/ads/analytics?ad_ids={ad_id}"
Expand All @@ -229,7 +226,7 @@ class AdAnalyticStream(PinterestStream):
Property("AD_ID", StringType),
Property("DATE", DateTimeType),
]
properties += [Property(a, NumberType) for a in ANALYTICS]
properties += [Property(a, NumberType) for a in AD_ANALYTICS_COLUMNS]
schema = PropertiesList(*properties).to_dict()

def get_url_params(
Expand All @@ -243,51 +240,78 @@ def get_url_params(
min(start_date + datetime.timedelta(days=100), yesterday)
).strftime('%Y-%m-%d'),
'granularity': 'DAY',
'columns': ','.join(ANALYTICS),
'columns': ','.join(AD_ANALYTICS_COLUMNS),
'page_size': 100,
}
if next_page_token:
params['bookmark'] = next_page_token
self.logger.debug(params)
return params

def validate_response(self, response: requests.Response) -> None:
if response.status_code == 429:
msg = (
f"{response.status_code} Server Error: "
f"{response.reason} for path: {self.path}"
)
raise RetriableAPIError(msg)
elif 400 <= response.status_code < 500:
msg = (
f"{response.status_code} Client Error: "
f"{response.reason} for path: {self.path}"
)
raise FatalAPIError(msg)

elif 500 <= response.status_code < 600:
msg = (
f"{response.status_code} Server Error: "
f"{response.reason} for path: {self.path}"
)
raise RetriableAPIError(msg)

def request_decorator(self, func: Callable) -> Callable:
"""Instantiate a decorator for handling request failures.
Developers may override this method to provide custom backoff or retry
handling.
ACCOUNT_ANALYTICS_COLUMNS = [
"SPEND_IN_DOLLAR", "ECPC_IN_DOLLAR", "CTR", "ECTR", "ECPE_IN_DOLLAR",
"ENGAGEMENT_RATE", "EENGAGEMENT_RATE", "REPIN_RATE", "CTR_2", "CAMPAIGN_ID",
"AD_GROUP_ID", "CAMPAIGN_ENTITY_STATUS",
"CPM_IN_DOLLAR", "AD_GROUP_ENTITY_STATUS", "TOTAL_CLICKTHROUGH",
"TOTAL_IMPRESSION_FREQUENCY", "TOTAL_ENGAGEMENT_SIGNUP",
"TOTAL_ENGAGEMENT_CHECKOUT", "TOTAL_CLICK_SIGNUP", "TOTAL_CLICK_CHECKOUT",
"TOTAL_VIEW_SIGNUP", "TOTAL_VIEW_CHECKOUT", "TOTAL_CONVERSIONS",
"TOTAL_ENGAGEMENT_SIGNUP_VALUE_IN_MICRO_DOLLAR",
"TOTAL_ENGAGEMENT_CHECKOUT_VALUE_IN_MICRO_DOLLAR",
"TOTAL_CLICK_SIGNUP_VALUE_IN_MICRO_DOLLAR",
"TOTAL_CLICK_CHECKOUT_VALUE_IN_MICRO_DOLLAR",
"TOTAL_VIEW_SIGNUP_VALUE_IN_MICRO_DOLLAR",
"TOTAL_VIEW_CHECKOUT_VALUE_IN_MICRO_DOLLAR", "TOTAL_PAGE_VISIT",
"TOTAL_SIGNUP", "TOTAL_CHECKOUT", "TOTAL_SIGNUP_VALUE_IN_MICRO_DOLLAR",
"TOTAL_CHECKOUT_VALUE_IN_MICRO_DOLLAR", "PAGE_VISIT_COST_PER_ACTION",
"PAGE_VISIT_ROAS", "CHECKOUT_ROAS", "VIDEO_3SEC_VIEWS_2",
"VIDEO_P100_COMPLETE_2", "VIDEO_P0_COMBINED_2", "VIDEO_P25_COMBINED_2",
"VIDEO_P50_COMBINED_2", "VIDEO_P75_COMBINED_2", "VIDEO_P95_COMBINED_2",
"VIDEO_MRC_VIEWS_2", "ECPV_IN_DOLLAR", "ECPCV_IN_DOLLAR",
"ECPCV_P95_IN_DOLLAR", "TOTAL_VIDEO_3SEC_VIEWS", "TOTAL_VIDEO_P100_COMPLETE",
"TOTAL_VIDEO_P0_COMBINED", "TOTAL_VIDEO_P25_COMBINED",
"TOTAL_VIDEO_P50_COMBINED", "TOTAL_VIDEO_P75_COMBINED",
"TOTAL_VIDEO_P95_COMBINED", "TOTAL_VIDEO_MRC_VIEWS",
"TOTAL_VIDEO_AVG_WATCHTIME_IN_SECOND",
"TOTAL_REPIN_RATE", "WEB_CHECKOUT_COST_PER_ACTION", "WEB_CHECKOUT_ROAS",
"TOTAL_WEB_CHECKOUT", "TOTAL_WEB_CHECKOUT_VALUE_IN_MICRO_DOLLAR",
"TOTAL_WEB_CLICK_CHECKOUT", "TOTAL_WEB_CLICK_CHECKOUT_VALUE_IN_MICRO_DOLLAR",
"TOTAL_WEB_ENGAGEMENT_CHECKOUT",
"TOTAL_WEB_ENGAGEMENT_CHECKOUT_VALUE_IN_MICRO_DOLLAR",
"TOTAL_WEB_VIEW_CHECKOUT", "TOTAL_WEB_VIEW_CHECKOUT_VALUE_IN_MICRO_DOLLAR"
]

Args:
func: Function to decorate.
class AccountAnalyticsStream(PinterestStream):
name = 'account_analytics'
parent_stream_type = AdAccountStream
path = "ad_accounts/{ad_account_id}/analytics"
records_jsonpath = "$[*]"
ignore_parent_replication_keys = True
primary_keys = ["DATE"]
replication_key = "DATE"
properties = [
Property("AD_ACCOUNT_ID", StringType),
Property("AD_ID", StringType),
Property("DATE", DateTimeType),
]
properties += [Property(a, NumberType) for a in ACCOUNT_ANALYTICS_COLUMNS]
schema = PropertiesList(*properties).to_dict()

Returns:
A decorated method.
"""
decorator: Callable = backoff.on_exception(
backoff.expo,
(RetriableAPIError,),
max_tries=5,
factor=5,
)(func)
return decorator
def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Optional[dict]:
start_date = self.get_starting_timestamp(context)
yesterday = datetime.datetime.now(tz=start_date.tzinfo) - datetime.timedelta(days=1)
params = {
'start_date': start_date.strftime('%Y-%m-%d'),
'end_date': (
min(start_date + datetime.timedelta(days=100), yesterday)
).strftime('%Y-%m-%d'),
'granularity': 'DAY',
'columns': ','.join(ACCOUNT_ANALYTICS_COLUMNS),
'page_size': 100,
}
if next_page_token:
params['bookmark'] = next_page_token
self.logger.debug(params)
return params
8 changes: 5 additions & 3 deletions tap_pinterest_ads/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@
CampaignStream,
AdGroupStream,
AdStream,
AdAnalyticStream
AdAnalyticsStream,
AccountAnalyticsStream
)
STREAM_TYPES = [
AdAccountStream,
CampaignStream,
AdGroupStream,
AdStream,
AdAnalyticStream
AdAnalyticsStream,
AccountAnalyticsStream
]

CONFIG = th.PropertiesList(
Expand All @@ -42,7 +44,7 @@
th.Property(
"start_date",
th.DateTimeType,
required=True,
default="2019-10-17T00:00:00Z",
description="Date to start collection analytics from"
),
).to_dict()
Expand Down

0 comments on commit 74ca38d

Please sign in to comment.