-
Notifications
You must be signed in to change notification settings - Fork 120
Expand file tree
/
Copy pathhandler_async_test.py
More file actions
289 lines (240 loc) · 9.59 KB
/
handler_async_test.py
File metadata and controls
289 lines (240 loc) · 9.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
import json
import os
import sys
from ipinfo.cache.default import DefaultCache
from ipinfo.details import Details
from ipinfo.handler_async import AsyncHandler
from ipinfo import handler_utils
from ipinfo.error import APIError
from ipinfo.exceptions import RequestQuotaExceededError
import ipinfo
import pytest
import aiohttp
# True when the interpreter is Python 3.11 or newer. Used as the ``skipif``
# condition on the batch tests in this file whose skip reason is
# "Requires Python 3.10 or earlier".
skip_if_python_3_11_or_later = sys.version_info >= (3, 11)
class MockResponse:
    """Minimal stand-in for an HTTP response object, used by the error tests.

    It stores a raw body string, a status code and a headers mapping, and can
    be used as an async context manager that yields itself.
    """

    def __init__(self, text, status, headers):
        """Store the raw body ``text``, the ``status`` code and ``headers``."""
        self.headers = headers
        self.status = status
        self._text = text

    async def __aenter__(self):
        """Enter ``async with``; the response itself is the managed value."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave ``async with``; nothing to clean up, exceptions propagate."""
        return None

    async def release(self):
        """No-op coroutine; there is no real connection to release."""
        return None

    def text(self):
        """Return the raw body string.

        NOTE(review): this is a plain method while ``json`` is a coroutine;
        presumably the code under test calls it without ``await`` -- confirm
        before turning it into a coroutine.
        """
        return self._text

    async def json(self):
        """Decode the raw body as JSON and return the resulting object."""
        body = self._text
        return json.loads(body)
@pytest.mark.asyncio
async def test_init():
    """A freshly built handler exposes its token, a default cache and countries."""
    expected_token = "mytesttoken"
    client = AsyncHandler(expected_token)

    assert client.access_token == expected_token
    assert isinstance(client.cache, DefaultCache)
    # "PK" is used as a sample key that must be present in the countries data.
    assert "PK" in client.countries

    await client.deinit()
@pytest.mark.asyncio
async def test_headers():
    """Built request headers contain the standard fields plus the custom one."""
    api_token = "mytesttoken"
    client = AsyncHandler(api_token, headers={"custom_field": "yes"})
    built = handler_utils.get_headers(api_token, client.headers)
    # The handler is closed before the assertions run.
    await client.deinit()

    for field in ("user-agent", "accept", "authorization", "custom_field"):
        assert field in built
@pytest.mark.asyncio
async def test_get_details():
    """Look up 8.8.8.8 and check the fields of the returned ``Details``.

    The token is read from the ``IPINFO_TOKEN`` environment variable (empty
    string when unset). The asn/company/privacy/abuse/domains checks only run
    when a token is available. Nothing is mocked in this test, so it
    presumably talks to the real service.
    """
    token = os.environ.get("IPINFO_TOKEN", "")
    handler = AsyncHandler(token)
    try:
        details = await handler.getDetails("8.8.8.8")
        assert isinstance(details, Details)
        assert details.ip == "8.8.8.8"
        assert details.hostname == "dns.google"
        assert details.city == "Mountain View"
        assert details.region == "California"
        assert details.country == "US"
        assert details.country_name == "United States"
        assert details.isEU is False

        country_flag = details.country_flag
        assert country_flag["emoji"] == "🇺🇸"
        assert country_flag["unicode"] == "U+1F1FA U+1F1F8"

        country_flag_url = details.country_flag_url
        assert (
            country_flag_url
            == "https://cdn.ipinfo.io/static/images/countries-flags/US.svg"
        )

        country_currency = details.country_currency
        assert country_currency["code"] == "USD"
        assert country_currency["symbol"] == "$"

        continent = details.continent
        assert continent["code"] == "NA"
        assert continent["name"] == "North America"

        assert details.loc is not None
        assert details.latitude is not None
        assert details.longitude is not None
        assert details.postal == "94043"
        assert details.timezone == "America/Los_Angeles"

        if token:
            asn = details.asn
            assert asn["asn"] == "AS15169"
            assert asn["name"] == "Google LLC"
            assert asn["domain"] == "google.com"
            assert asn["route"] == "8.8.8.0/24"
            assert asn["type"] == "hosting"

            company = details.company
            assert company["name"] == "Google LLC"
            assert company["domain"] == "google.com"
            assert company["type"] == "hosting"

            privacy = details.privacy
            assert privacy["vpn"] is False
            assert privacy["proxy"] is False
            assert privacy["tor"] is False
            assert privacy["relay"] is False
            assert privacy["hosting"] is True
            assert privacy["service"] == ""

            abuse = details.abuse
            assert (
                abuse["address"]
                == "US, CA, Mountain View, 1600 Amphitheatre Parkway, 94043"
            )
            assert abuse["country"] == "US"
            assert abuse["email"] == "network-abuse@google.com"
            assert abuse["name"] == "Abuse"
            assert abuse["network"] == "8.8.8.0/24"
            assert abuse["phone"] == "+1-650-253-0000"

            domains = details.domains
            assert domains["ip"] == "8.8.8.8"
            # NOTE: actual number changes too much
            assert "total" in domains
            assert len(domains["domains"]) == 5
    finally:
        # Close the handler even when an assertion above fails.
        await handler.deinit()
@pytest.mark.parametrize(
    (
        "mock_resp_status_code",
        "mock_resp_headers",
        "mock_resp_error_msg",
        "expected_error_json",
    ),
    [
        pytest.param(
            503,
            {"Content-Type": "text/plain"},
            "Service Unavailable",
            {"error": "Service Unavailable"},
            id="5xx_not_json",
        ),
        pytest.param(
            403,
            {"Content-Type": "application/json"},
            '{"message": "missing token"}',
            {"message": "missing token"},
            id="4xx_json",
        ),
        pytest.param(
            400,
            {"Content-Type": "application/json"},
            '{"message": "missing field"}',
            {"message": "missing field"},
            id="400",
        ),
    ],
)
@pytest.mark.asyncio
async def test_get_details_error(
    monkeypatch,
    mock_resp_status_code,
    mock_resp_headers,
    mock_resp_error_msg,
    expected_error_json,
):
    """An error response from the mocked session surfaces as ``APIError``.

    ``aiohttp.ClientSession.get`` is patched to return a ``MockResponse``
    built from the parametrized status, headers and body. The raised
    ``APIError`` must carry that status code and the expected error payload.
    """

    async def mock_get(*args, **kwargs):
        response = MockResponse(
            status=mock_resp_status_code,
            text=mock_resp_error_msg,
            headers=mock_resp_headers,
        )
        return response

    monkeypatch.setattr(
        aiohttp.ClientSession,
        'get',
        lambda *args, **kwargs: aiohttp.client._RequestContextManager(mock_get()),
    )
    token = os.environ.get("IPINFO_TOKEN", "")
    handler = AsyncHandler(token)
    try:
        with pytest.raises(APIError) as exc_info:
            await handler.getDetails("8.8.8.8")
    finally:
        # Close the handler, as the other tests in this file do via deinit().
        await handler.deinit()
    assert exc_info.value.error_code == mock_resp_status_code
    assert exc_info.value.error_json == expected_error_json
@pytest.mark.asyncio
async def test_get_details_quota_error(monkeypatch):
    """A mocked 429 response makes ``getDetails`` raise ``RequestQuotaExceededError``."""

    async def mock_get(*args, **kwargs):
        response = MockResponse(status=429, text="Quota exceeded", headers={})
        return response

    monkeypatch.setattr(
        aiohttp.ClientSession,
        'get',
        lambda *args, **kwargs: aiohttp.client._RequestContextManager(mock_get()),
    )
    token = os.environ.get("IPINFO_TOKEN", "")
    handler = AsyncHandler(token)
    try:
        with pytest.raises(RequestQuotaExceededError):
            await handler.getDetails("8.8.8.8")
    finally:
        # Close the handler, as the other tests in this file do via deinit().
        await handler.deinit()
#############
# BATCH TESTS
#############
# IP addresses looked up by the batch tests; _prepare_batch_test() returns
# this list to its caller.
_batch_ip_addrs = ["1.1.1.1", "8.8.8.8", "9.9.9.9"]
def _prepare_batch_test():
    """Helper for preparing batch test cases.

    Reads the token from the ``IPINFO_TOKEN`` environment variable, skips the
    calling test when it is missing or empty, and otherwise returns a
    ``(handler, token, ip_addresses)`` tuple.
    """
    api_token = os.getenv("IPINFO_TOKEN", "")
    if not api_token:
        pytest.skip("token required for batch tests")
    return AsyncHandler(api_token), api_token, _batch_ip_addrs
def _check_batch_details(ips, details, token):
"""Helper for batch tests."""
for ip in ips:
assert ip in details
d = details[ip]
assert d["ip"] == ip
assert "country" in d
assert "country_name" in d
if token:
assert "asn" in d
assert "company" in d
assert "privacy" in d
assert "abuse" in d
assert "domains" in d
@pytest.mark.skipif(skip_if_python_3_11_or_later, reason="Requires Python 3.10 or earlier")
@pytest.mark.parametrize("batch_size", [None, 1, 2, 3])
@pytest.mark.asyncio
async def test_get_batch_details(batch_size):
    """Batch lookup returns a checked entry per address for each batch size."""
    client, api_token, addresses = _prepare_batch_test()
    results = await client.getBatchDetails(addresses, batch_size=batch_size)
    _check_batch_details(addresses, results, api_token)
    await client.deinit()
def _check_iterative_batch_details(ip, details, token):
"""Helper for iterative batch tests."""
assert ip == details.get("ip")
assert "country" in details
assert "city" in details
if token:
assert "asn" in details or "anycast" in details
assert "company" in details or "org" in details
assert "privacy" in details or "anycast" in details
assert "abuse" in details or "anycast" in details
assert "domains" in details or "anycast" in details
@pytest.mark.parametrize("batch_size", [None, 1, 2, 3])
@pytest.mark.asyncio
async def test_get_iterative_batch_details(batch_size):
    """Each (ip, details) pair yielded by the batch iterator passes the checks."""
    handler, token, ips = _prepare_batch_test()
    try:
        # Use a distinct loop variable so the input list ``ips`` is not
        # rebound while it is being iterated.
        async for ip, details in handler.getBatchDetailsIter(ips, batch_size):
            _check_iterative_batch_details(ip, details, token)
    finally:
        # Close the handler, as the other batch tests do via deinit().
        await handler.deinit()
@pytest.mark.skipif(skip_if_python_3_11_or_later, reason="Requires Python 3.10 or earlier")
@pytest.mark.parametrize("batch_size", [None, 1, 2, 3])
@pytest.mark.asyncio
async def test_get_batch_details_total_timeout(batch_size):
    """A 0.001 total timeout makes the batch lookup raise ``TimeoutExceededError``."""
    client, _token, addresses = _prepare_batch_test()
    expected_error = ipinfo.exceptions.TimeoutExceededError
    with pytest.raises(expected_error):
        await client.getBatchDetails(addresses, batch_size=batch_size, timeout_total=0.001)
    await client.deinit()
#############
# BOGON TESTS
#############
@pytest.mark.asyncio
async def test_bogon_details():
    """Looking up 127.0.0.1 yields only the bogon marker and the IP itself."""
    token = os.environ.get("IPINFO_TOKEN", "")
    handler = AsyncHandler(token)
    try:
        details = await handler.getDetails("127.0.0.1")
        assert details.all == {"bogon": True, "ip": "127.0.0.1"}
    finally:
        # Close the handler, as the other tests in this file do via deinit().
        await handler.deinit()
#################
# RESPROXY TESTS
#################
@pytest.mark.asyncio
async def test_get_resproxy():
    """Resproxy lookup returns ``Details`` with the resproxy fields populated.

    Skipped when the ``IPINFO_TOKEN`` environment variable is unset or empty.
    """
    api_token = os.getenv("IPINFO_TOKEN", "")
    if not api_token:
        pytest.skip("token required for resproxy tests")

    # Use an IP known to be a residential proxy (from API documentation)
    proxy_ip = "175.107.211.204"
    client = AsyncHandler(api_token)
    result = await client.getResproxy(proxy_ip)

    assert isinstance(result, Details)
    assert result.ip == proxy_ip
    assert result.last_seen is not None
    assert result.percent_days_seen is not None
    assert result.service is not None

    await client.deinit()
@pytest.mark.asyncio
async def test_get_resproxy_caching():
    """Two consecutive resproxy lookups of the same IP report the same ``ip``.

    Skipped when the ``IPINFO_TOKEN`` environment variable is unset or empty.
    """
    api_token = os.getenv("IPINFO_TOKEN", "")
    if not api_token:
        pytest.skip("token required for resproxy tests")

    proxy_ip = "175.107.211.204"
    client = AsyncHandler(api_token)
    # First call should hit the API
    first = await client.getResproxy(proxy_ip)
    # Second call should hit the cache
    second = await client.getResproxy(proxy_ip)
    assert first.ip == second.ip

    await client.deinit()