-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathcall.py
More file actions
97 lines (82 loc) · 3.25 KB
/
call.py
File metadata and controls
97 lines (82 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from .authentication import Authentication
from .authentication import DEFAULT_USER_AGENT
from .credentials import Profile
from .requester import Requester
from requests import Session
from urllib3.util import parse_url
from datetime import timedelta
from .limiter import RateLimiter
import json
import warnings
class Call:
    """Entry point for sending API requests on behalf of a profile.

    An instance owns one HTTP ``Session``, the active ``Profile``, an
    optional rate limiter and the retry settings that are forwarded to every
    ``Requester`` it creates.  It can be used as a context manager so the
    session is closed on exit.
    """

    def __init__(self, logger=None, limiter=None, **kwargs):
        """Configure the caller from keyword options.

        Args:
            logger: Optional object exposing ``do_log(message)``; when set,
                every request's URI and payload are logged.
            limiter: Optional rate limiter; its ``acquire()`` is called
                before each request.
            **kwargs: ``version``, ``host``, ``_ssl`` and ``user_agent`` are
                stored on the instance; limiter and retry options are
                consumed by :meth:`update_limiter` / :meth:`update_retry`;
                whatever remains is handed to :meth:`update_profile`.
        """
        self.version = kwargs.pop("version", "latest")
        self.host = kwargs.pop("host", None)
        self.ssl = kwargs.pop("_ssl", True)
        self.user_agent = kwargs.pop("user_agent", DEFAULT_USER_AGENT)
        self.logger = logger
        self.limiter: RateLimiter | None = limiter
        self.retry_kwargs = {}
        self.session = Session()
        # Ignore environment-provided settings (proxies, netrc auth, ...).
        self.session.trust_env = False
        # Each step strips the options it understands and returns the rest,
        # so only profile-related keys reach update_profile().
        kwargs = self.update_limiter(**kwargs)
        kwargs = self.update_retry(**kwargs)
        self.update_profile(**kwargs)

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session on exit; exceptions are never suppressed."""
        self.close()

    def update_credentials(self, **kwargs):
        """Deprecated alias of :meth:`update_profile`."""
        warnings.warn(
            "update_credentials is deprecated, use update_profile instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.update_profile(**kwargs)

    def update_profile(self, **kwargs):
        """Reload the profile and overlay explicitly passed settings.

        ``path`` and ``profile`` select what is loaded through
        ``Profile.from_standard_configuration``; every other keyword is used
        to build a ``Profile`` that is merged into the loaded one.

        Returns:
            The keyword dict left after ``path`` and ``profile`` were removed.
        """
        path = kwargs.pop("path", None)
        profile_name = kwargs.pop("profile", None)
        self.profile = Profile.from_standard_configuration(path, profile_name)
        self.profile.merge(Profile(**kwargs))
        return kwargs

    def update_limiter(self, **kwargs):
        """Apply ``limiter_window`` / ``limiter_max_requests`` to the limiter.

        Both options are always removed from ``kwargs``; they only take
        effect when a limiter is configured.  Values are coerced with
        ``int()`` so string values behave the same as numbers.

        Returns:
            The remaining keyword arguments.
        """
        window = kwargs.pop("limiter_window", None)
        max_requests = kwargs.pop("limiter_max_requests", None)
        if self.limiter is not None:
            if window is not None:
                self.limiter.window = timedelta(seconds=int(window))
            if max_requests is not None:
                # Coerced like the window above and like the retry options.
                self.limiter.max_requests = int(max_requests)
        return kwargs

    def update_retry(self, **kwargs):
        """Collect retry options into ``self.retry_kwargs``.

        ``max_retries`` is stored as an ``int``.  ``retry_backoff_factor``,
        ``retry_backoff_jitter`` and ``retry_backoff_max`` are stored as
        ``float`` under their name without the ``retry_`` prefix.

        Returns:
            The remaining keyword arguments.
        """
        max_retries = kwargs.pop("max_retries", None)
        if max_retries is not None:
            self.retry_kwargs["max_retries"] = int(max_retries)
        for key in ("backoff_factor", "backoff_jitter", "backoff_max"):
            value = kwargs.pop(f"retry_{key}", None)
            if value is not None:
                self.retry_kwargs[key] = float(value)
        return kwargs

    def api(self, action, service="api", **data):
        """Send ``data`` as a JSON payload to ``action`` of ``service``.

        Args:
            action: Name appended to the service endpoint after a ``/``.
            service: Key passed to ``profile.get_endpoint``.
            **data: Request payload, serialized with ``json.dumps``.

        Returns:
            Whatever ``Requester.send`` returns.

        Raises:
            Any exception raised while building or sending the request is
            propagated unchanged.
        """
        endpoint = self.profile.get_endpoint(service) + "/" + action
        parsed_url = parse_url(endpoint)
        uri = parsed_url.path
        host = parsed_url.host

        # Take a rate-limit slot before any request object is built.
        if self.limiter is not None:
            self.limiter.acquire()

        authentication = Authentication(
            self.profile,
            host,
            user_agent=self.user_agent,
        )
        requester = Requester(
            self.session,
            authentication,
            endpoint,
            **self.retry_kwargs,
        )
        if self.logger is not None:
            self.logger.do_log(
                "uri: " + uri + "\npayload:\n" + json.dumps(data, indent=2)
            )
        return requester.send(uri, json.dumps(data))

    def close(self):
        """Close the underlying HTTP session, if there is one."""
        if self.session:
            self.session.close()