diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index 976d5e0..1640da2 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -637,6 +637,12 @@ def get_url_params( **super().get_url_params(context, next_page_token), } + def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict: + """Return a context dictionary for a child stream.""" + return { + "creative_urn": record["id"], + } + class VideoAdsStream(LinkedInAdsStream): """https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/advertising-targeting/create-and-manage-video#finders.""" @@ -977,3 +983,268 @@ def merge_dicts(self, *dict_args: dict) -> dict: for dictionary in dict_args: result.update(dictionary) return result + + +class AdAnalyticsByCreativeInit(LinkedInAdsStream): + """https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads-reporting/ads-reporting#analytics-finder.""" + + name = "AdAnalyticsByCreativeInit" + path = "/adAnalytics" + parent_stream_type = CreativesStream + + schema = PropertiesList( + Property("landingPageClicks", IntegerType), + Property("reactions", IntegerType), + Property("adUnitClicks", IntegerType), + Property("creative_id", StringType), + Property("documentCompletions", IntegerType), + Property("documentFirstQuartileCompletions", IntegerType), + Property("clicks", IntegerType), + Property("documentMidpointCompletions", IntegerType), + Property("documentThirdQuartileCompletions", IntegerType), + Property("downloadClicks", IntegerType), + Property("jobApplications", StringType), + Property("jobApplyClicks", StringType), + Property("postViewJobApplications", StringType), + Property("costInUsd", StringType), + Property("postViewRegistrations", StringType), + Property("registrations", StringType), + Property("talentLeads", IntegerType), + Property("viralDocumentCompletions", IntegerType), + Property("viralDocumentFirstQuartileCompletions", IntegerType), + Property("viralDocumentMidpointCompletions", IntegerType), + Property("viralDocumentThirdQuartileCompletions", IntegerType), + Property("viralDownloadClicks", IntegerType), + Property("viralJobApplications", StringType), + Property("viralJobApplyClicks", StringType), + Property("costInLocalCurrency", StringType), + Property("viralRegistrations", IntegerType), + Property("approximateUniqueImpressions", IntegerType), + Property("cardClicks", IntegerType), + Property("cardImpressions", IntegerType), + Property("commentLikes", IntegerType), + Property("viralCardClicks", IntegerType), + Property("viralCardImpressions", IntegerType), + Property("viralCommentLikes", IntegerType), + Property("actionClicks", IntegerType), + Property("comments", IntegerType), + Property("companyPageClicks", IntegerType), + Property("conversionValueInLocalCurrency", StringType), + Property( + "dateRange", + ObjectType( + Property( + "end", + ObjectType( + Property("day", IntegerType), + Property("month", IntegerType), + Property("year", IntegerType), + additional_properties=False, + ), + ), + Property( + "start", + ObjectType( + Property("day", IntegerType), + Property("month", IntegerType), + Property("year", IntegerType), + additional_properties=False, + ), + ), + ), + ), + Property("day", StringType), + Property("externalWebsiteConversions", IntegerType), + Property("externalWebsitePostClickConversions", IntegerType), + Property("externalWebsitePostViewConversions", IntegerType), + Property("follows", IntegerType), + Property("fullScreenPlays", IntegerType), + Property("impressions", IntegerType), + Property("landingPageClicks", IntegerType), + Property("leadGenerationMailContactInfoShares", IntegerType), + Property("leadGenerationMailInterestedClicks", IntegerType), + Property("likes", IntegerType), + Property("oneClickLeadFormOpens", IntegerType), + Property("oneClickLeads", IntegerType), + Property("opens", IntegerType), + Property("otherEngagements", IntegerType), + Property("sends", IntegerType), + Property("shares", IntegerType), + Property("textUrlClicks", IntegerType), + Property("totalEngagements", IntegerType), + Property("videoCompletions", IntegerType), + Property("videoFirstQuartileCompletions", IntegerType), + Property("videoMidpointCompletions", IntegerType), + Property("videoStarts", IntegerType), + Property("videoThirdQuartileCompletions", IntegerType), + Property("videoViews", IntegerType), + Property("viralClicks", IntegerType), + Property("viralComments", IntegerType), + Property("viralCompanyPageClicks", IntegerType), + Property("viralExternalWebsiteConversions", IntegerType), + Property("viralExternalWebsitePostClickConversions", IntegerType), + Property("viralExternalWebsitePostViewConversions", IntegerType), + Property("viralFollows", IntegerType), + Property("viralFullScreenPlays", IntegerType), + Property("viralImpressions", IntegerType), + Property("viralLandingPageClicks", IntegerType), + Property("viralLikes", IntegerType), + Property("viralOneClickLeadFormOpens", IntegerType), + Property("viralOneclickLeads", IntegerType), + Property("viralOtherEngagements", IntegerType), + Property("viralReactions", IntegerType), + Property("viralShares", IntegerType), + Property("viralTotalEngagements", IntegerType), + Property("viralVideoCompletions", IntegerType), + Property("viralVideoFirstQuartileCompletions", IntegerType), + Property("viralVideoMidpointCompletions", IntegerType), + Property("viralVideoStarts", IntegerType), + Property("viralVideoThirdQuartileCompletions", IntegerType), + Property("viralVideoViews", IntegerType), + ).to_dict() + + @property + def adanalyticscolumns(self) -> list[str]: + """List of columns for adanalytics endpoint.""" + return [ + "viralLandingPageClicks,viralExternalWebsitePostClickConversions,externalWebsiteConversions,viralVideoFirstQuartileCompletions,leadGenerationMailContactInfoShares,clicks,viralClicks,shares,viralFullScreenPlays,videoMidpointCompletions,viralCardClicks,viralExternalWebsitePostViewConversions,viralTotalEngagements,viralCompanyPageClicks,actionClicks,viralShares,videoCompletions,comments,externalWebsitePostViewConversions,dateRange", + "costInUsd,landingPageClicks,oneClickLeadFormOpens,talentLeads,sends,viralOneClickLeadFormOpens,conversionValueInLocalCurrency,viralFollows,otherEngagements,viralVideoCompletions,cardImpressions,leadGenerationMailInterestedClicks,opens,totalEngagements,videoViews,viralImpressions,viralVideoViews,commentLikes,viralDocumentThirdQuartileCompletions,viralLikes", + "adUnitClicks,videoThirdQuartileCompletions,cardClicks,likes,viralComments,viralVideoMidpointCompletions,viralVideoThirdQuartileCompletions,oneClickLeads,fullScreenPlays,viralCardImpressions,follows,videoStarts,videoFirstQuartileCompletions,textUrlClicks,reactions,viralReactions,externalWebsitePostClickConversions,viralOtherEngagements,costInLocalCurrency", + "viralVideoStarts,viralRegistrations,viralJobApplyClicks,viralJobApplications,jobApplications,jobApplyClicks,viralExternalWebsiteConversions,postViewRegistrations,companyPageClicks,documentCompletions,documentFirstQuartileCompletions,documentMidpointCompletions,documentThirdQuartileCompletions,downloadClicks,viralDocumentCompletions,viralDocumentFirstQuartileCompletions,viralDocumentMidpointCompletions,approximateUniqueImpressions,viralDownloadClicks,impressions", + ] + + def get_url_params( + self, + context: dict | None, # noqa: ARG002 + next_page_token: t.Any | None, # noqa: ANN401 + ) -> dict[str, t.Any]: + """Return a dictionary of values to be used in URL parameterization. + + Args: + context: The stream context. + next_page_token: The next page index or value. + + Returns: + A dictionary of URL query parameters. + """ + return { + "q": "analytics", + **super().get_url_params(context, next_page_token), + } + + def get_unescaped_params(self, context: Context | None) -> dict: + start_date = pendulum.parse(self.config["start_date"]) + end_date = pendulum.parse(self.config["end_date"]) + creative_urn = context["creative_urn"] + creative_id = creative_urn.split(":")[-1] + return { + "pivot": "(value:CREATIVE)", + "timeGranularity": "(value:DAILY)", + "creatives": f"List(urn%3Ali%3AsponsoredCreative%3A{creative_id})", + "dateRange": f"(start:(year:{start_date.year},month:{start_date.month},day:{start_date.day}),end:(year:{end_date.year},month:{end_date.month},day:{end_date.day}))", + "fields": self.adanalyticscolumns[0], + } + + def post_process(self, row: dict, context: dict | None = None) -> dict | None: + # This function extracts day, month, and year from date range column + # These values are parsed with datetime function and the date is added to the day column + date_range = row.get("dateRange", {}) + start_date = date_range.get("start", {}) + + if start_date: + row["day"] = datetime.strptime( + f'{start_date.get("year")}-{start_date.get("month")}-{start_date.get("day")}', + "%Y-%m-%d", + ).astimezone(UTC) + + viral_registrations = row.pop("viralRegistrations", None) + if viral_registrations: + row["viralRegistrations"] = int(viral_registrations) + + return super().post_process(row, context) + + +class AdAnalyticsByCreativeStream(AdAnalyticsByCreativeInit): + name = "ad_analytics_by_creative" + + def get_unescaped_params(self, context: Context | None) -> dict: + return { + **super().get_unescaped_params(context), + # Overwrite fields with this column subset + "fields": self.adanalyticscolumns[1], + } + + def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]: + """Return a dictionary of records from adAnalytics classes. + + Combines request columns from multiple calls to the api, which are limited to 20 columns + each. + + Uses `merge_dicts` to combine responses from each class + super().get_records calls only the records from adAnalyticsByCreative class + zip() Iterates over the records of adAnalytics classes and merges them with merge_dicts() + function list() converts each stream context into lists + + Args: + context: The stream context. + + Returns: + A dictionary of records given from adAnalytics streams + """ + adanalyticsinit_stream = AdAnalyticsByCreativeInit( + self._tap, + schema={"properties": {}}, + ) + adanalyticsecond_stream = AdAnalyticsByCreativeSecond( + self._tap, + schema={"properties": {}}, + ) + adanalyticsthird_stream = AdAnalyticsByCreativeThird( + self._tap, + schema={"properties": {}}, + ) + return [ + self.merge_dicts(x, y, z, p) + for x, y, z, p in zip( + list(adanalyticsinit_stream.get_records(context)), + list(super().get_records(context)), + list(adanalyticsecond_stream.get_records(context)), + list(adanalyticsthird_stream.get_records(context)), + ) + ] + + def merge_dicts(self, *dict_args: dict) -> dict: + """Return a merged dictionary of adAnalytics responses. + + Args: + *dict_args: dictionaries with adAnalytics response data. + + Returns: + A merged dictionary of adAnalytics responses + """ + result = {} + for dictionary in dict_args: + result.update(dictionary) + return result + + +class AdAnalyticsByCreativeSecond(AdAnalyticsByCreativeInit): + name = "adanalyticsbycreative_second" + + def get_unescaped_params(self, context: Context | None) -> dict: + return { + **super().get_unescaped_params(context), + # Overwrite fields with this column subset + "fields": self.adanalyticscolumns[2], + } + + +class AdAnalyticsByCreativeThird(AdAnalyticsByCreativeInit): + name = "adanalyticsbycreative_third" + + def get_unescaped_params(self, context: Context | None) -> dict: + return { + **super().get_unescaped_params(context), + # Overwrite fields with this column subset + "fields": self.adanalyticscolumns[3], + } diff --git a/tap_linkedin_ads/tap.py b/tap_linkedin_ads/tap.py index 716cb50..025a5c6 100644 --- a/tap_linkedin_ads/tap.py +++ b/tap_linkedin_ads/tap.py @@ -83,6 +83,7 @@ def discover_streams(self) -> list[streams.LinkedInAdsStream]: streams.CreativesStream(self), streams.VideoAdsStream(self), streams.AdAnalyticsByCampaignStream(self), + streams.AdAnalyticsByCreativeStream(self), ]