7
7
from datetime import datetime , timezone
8
8
from pathlib import Path
9
9
10
- from singer_sdk .authenticators import BearerTokenAuthenticator
10
+ import requests
11
+ from singer_sdk .authenticators import BearerTokenAuthenticator , OAuthAuthenticator , SingletonMeta
11
12
from singer_sdk .streams import RESTStream
12
13
13
- if t .TYPE_CHECKING :
14
- import requests
15
-
16
14
SCHEMAS_DIR = Path (__file__ ).parent / Path ("./schemas" )
17
15
UTC = timezone .utc
18
16
17
+ _Auth = t .Callable [[requests .PreparedRequest ], requests .PreparedRequest ]
18
+
19
+
20
+ class LinkedInAdsOAuthAuthenticator (OAuthAuthenticator , metaclass = SingletonMeta ):
21
+ """Authenticator class for LinkedInAds."""
22
+
23
+ @property
24
+ def oauth_request_body (self ):
25
+ return {
26
+ "grant_type" : "refresh_token" ,
27
+ "client_id" : self .config ["oauth_credentials" ]["client_id" ],
28
+ "client_secret" : self .config ["oauth_credentials" ]["client_secret" ],
29
+ "refresh_token" : self .config ["oauth_credentials" ]["refresh_token" ],
30
+ }
31
+
19
32
20
33
class LinkedInAdsStream (RESTStream ):
21
34
"""LinkedInAds stream class."""
22
35
23
36
records_jsonpath = "$[*]" # Or override `parse_response`.
24
- next_page_token_jsonpath = (
25
- "$.paging.start" # Or override `get_next_page_token`. # noqa: S105
26
- )
37
+ next_page_token_jsonpath = "$.paging.start" # Or override `get_next_page_token`. # noqa: S105
27
38
28
39
@property
29
- def authenticator (self ) -> BearerTokenAuthenticator :
40
+ def authenticator (self ) -> _Auth :
30
41
"""Return a new authenticator object.
31
42
32
43
Returns:
33
44
An authenticator instance.
34
45
"""
46
+ if "oauth_credentials" in self .config :
47
+ return LinkedInAdsOAuthAuthenticator (
48
+ self ,
49
+ auth_endpoint = "https://www.linkedin.com/oauth/v2/accessToken" ,
50
+ )
35
51
return BearerTokenAuthenticator .create_for_stream (
36
52
self ,
37
53
token = self .config ["access_token" ],
@@ -143,11 +159,7 @@ def parse_response(
143
159
columns ["run_schedule_start" ] = datetime .fromtimestamp ( # noqa: DTZ006
144
160
int (schedule_column ) / 1000 ,
145
161
).isoformat ()
146
- yield from (
147
- resp_json ["elements" ]
148
- if resp_json .get ("elements" ) is not None
149
- else [columns ]
150
- )
162
+ yield from (resp_json ["elements" ] if resp_json .get ("elements" ) is not None else [columns ])
151
163
152
164
def _to_id_column (
153
165
self ,
@@ -161,9 +173,7 @@ def _to_id_column(
161
173
162
174
def _add_datetime_columns (self , columns ): # noqa: ANN202, ANN001
163
175
created_time = columns .get ("changeAuditStamps" ).get ("created" ).get ("time" )
164
- last_modified_time = (
165
- columns .get ("changeAuditStamps" ).get ("lastModified" ).get ("time" )
166
- )
176
+ last_modified_time = columns .get ("changeAuditStamps" ).get ("lastModified" ).get ("time" )
167
177
columns ["created_time" ] = datetime .fromtimestamp (
168
178
int (created_time ) / 1000 ,
169
179
tz = UTC ,
0 commit comments