diff --git a/.gitignore b/.gitignore index 385dffa..a7aebcf 100644 --- a/.gitignore +++ b/.gitignore @@ -98,3 +98,5 @@ config.json .autoenv.zsh todo.org + +.idea/ diff --git a/tap_zuora/apis.py b/tap_zuora/apis.py index bc37248..542f7be 100644 --- a/tap_zuora/apis.py +++ b/tap_zuora/apis.py @@ -1,6 +1,7 @@ import pendulum import singer from singer import metadata +from tap_zuora.client import ApiException MAX_EXPORT_DAYS = 30 @@ -59,7 +60,7 @@ class Aqua: ZOQL_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S" # Specifying incrementalTime requires this format, but ZOQL requires the 'T' PARAMETER_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" - + # Zuora's documentation describes some objects which are not supported for deleted # See https://knowledgecenter.zuora.com/DC_Developers/T_Aggregate_Query_API/B_Submit_Query/a_Export_Deleted_Data # and https://github.com/singer-io/tap-zuora/pull/8 for more info. @@ -301,7 +302,12 @@ def stream_status(client, stream_name): "Query": query, "Format": "csv" } - resp = client.rest_request("POST", endpoint, json=payload).json() + + # With OAuth2, some entities that exist in Describe might not be available in the export + try: + resp = client.rest_request("POST", endpoint, json=payload).json() + except ApiException: + return "unavailable" if resp["Success"]: return "available" diff --git a/tap_zuora/client.py b/tap_zuora/client.py index 5d559ee..0e02d12 100644 --- a/tap_zuora/client.py +++ b/tap_zuora/client.py @@ -1,5 +1,5 @@ import requests - +import pendulum import singer @@ -33,13 +33,16 @@ def __init__(self, resp): class Client: - def __init__(self, username, password, partner_id, sandbox=False, european=False): + def __init__(self, username, password, partner_id, sandbox=False, european=False, use_oauth2=False): self.username = username self.password = password self.sandbox = sandbox self.european = european self.partner_id = partner_id self._session = requests.Session() + self.oauth2_token = None + self.use_oauth2 = use_oauth2 + self.token_expiration_date = None adapter = requests.adapters.HTTPAdapter(max_retries=1) # Try again in the case the TCP socket closes self._session.mount('https://', adapter) @@ -49,7 +52,8 @@ def from_config(config): sandbox = config.get('sandbox', False) == 'true' european = config.get('european', False) == 'true' partner_id = config.get('partner_id', None) - return Client(config['username'], config['password'], partner_id, sandbox, european) + use_oauth2 = config.get('use_oauth2', False) == 'true' + return Client(config['username'], config['password'], partner_id, sandbox, european, use_oauth2) def get_url(self, url, rest=False): return URLS[(rest, self.sandbox, self.european)] + url @@ -60,12 +64,18 @@ def aqua_auth(self): @property def rest_headers(self): - return { - 'apiAccessKeyId': self.username, - 'apiSecretAccessKey': self.password, - 'X-Zuora-WSDL-Version': LATEST_WSDL_VERSION, - 'Content-Type': 'application/json', + if self.use_oauth2: + return { + 'Authorization': 'Bearer ' + self.oauth2_token['access_token'], + 'Content-Type': 'application/json', } + else: + return { + 'apiAccessKeyId': self.username, + 'apiSecretAccessKey': self.password, + 'X-Zuora-WSDL-Version': LATEST_WSDL_VERSION, + 'Content-Type': 'application/json', + } def _request(self, method, url, stream=False, **kwargs): req = requests.Request(method, url, **kwargs).prepare() @@ -76,10 +86,43 @@ def _request(self, method, url, stream=False, **kwargs): return resp + def is_auth_token_valid(self): + if self.oauth2_token and self.token_expiration_date and pendulum.utcnow().diff(self.token_expiration_date).in_seconds() > 60: # Allows at least one minute of breathing room + return True + + return False + + def ensure_valid_auth_token(self): + if not self.is_auth_token_valid(): + self.oauth2_token = self.request_token() + + def request_token(self): + url = self.get_url('oauth/token', rest=True) + payload = { + 'client_id': self.username, + 'client_secret': self.password, + 'grant_type': 'client_credentials', + } + + token = self._request('POST', url, data=payload).json() + self.token_expiration_date = pendulum.utcnow().add(seconds=token['expires_in']) + + return token + def aqua_request(self, method, url, **kwargs): + if self.use_oauth2: + self.ensure_valid_auth_token() + url = self.get_url(url, rest=False) - return self._request(method, url, auth=self.aqua_auth, **kwargs) + + if self.use_oauth2: + return self._request(method, url, headers=self.rest_headers, **kwargs) + else: + return self._request(method, url, auth=self.aqua_auth, **kwargs) def rest_request(self, method, url, **kwargs): + if self.use_oauth2: + self.ensure_valid_auth_token() + url = self.get_url(url, rest=True) return self._request(method, url, headers=self.rest_headers, **kwargs)