
Commit 350b3e4

Merge pull request #80 from my-dev-app/hotfix/bandwitdh
Hotfix/bandwitdh
2 parents 89ca549 + 9cc0d2c

4 files changed (+34 lines, -12 lines)

aproxyrelay/__init__.py

Lines changed: 0 additions & 5 deletions

@@ -61,11 +61,6 @@ def __init__(
         # Configure the logger
         logging.basicConfig(level=logging.INFO if not debug else logging.DEBUG)
         self.logger = logging.getLogger(__name__)
-        console_handler = logging.StreamHandler()
-        console_handler.setLevel(logging.DEBUG if debug else logging.INFO)
-        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
-        console_handler.setFormatter(formatter)
-        self.logger.addHandler(console_handler)

         # Initialize Core
         AProxyRelayCore.__init__(self)
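Aside: dropping the extra handler also avoids duplicate console output. `logging.basicConfig` already attaches a StreamHandler to the root logger, and records from `self.logger` propagate up to it, so the removed lines made every message print twice. A minimal sketch of that failure mode (the demo logger name is illustrative, not from the repository):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("aproxyrelay.demo")  # hypothetical name for the demo

# Re-adding a console handler, as the removed code did, prints each
# record twice: once via this handler, once via the root handler
# that basicConfig installed (child loggers propagate by default).
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)

logger.info("hello")  # appears twice on the console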

aproxyrelay/core.py

Lines changed: 3 additions & 1 deletion

@@ -73,12 +73,14 @@ async def get_proxies(self) -> None:
         if self.scrape:
             async with ClientSession(conn_timeout=self.timeout) as session:
                 await self._fetch_proxy_page(scrapes, session)
-            self.logger.info(f'[aProxyRelay] Scraper: Found {self._queue_filter.qsize()} competent proxy servers')
+            self.logger.info(f'[aProxyRelay] Scraper: Found ({self._queue_filter.qsize()}) competent proxy servers')
         else:
             self.logger.info('[aProxyRelay] Scraper: Skip discovery of new proxy servers ...')

         if self.filter and self.scrape:
             self.logger.info(f'[aProxyRelay] Validating: Proxies ({self._queue_filter.qsize()}), checking if proxies meet connection requirements ...')  # noqa: B950
+            self.logger.info(f'[aProxyRelay] Keep an eye on "pending task name"; once it reaches ({self._queue_filter.qsize()}), all tests have been completed')  # noqa: B950
+            self.logger.info('[aProxyRelay] Grab some coffee ... please wait ...')
             await self._test_all_proxies()
             self.logger.info(f'[aProxyRelay] Filter: Found {self._filtered_failed} incompetent and {self._filtered_available} available proxy servers in {datetime.now(UTC) - self.started}')  # noqa: B950
         else:
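A note on the unchanged `ClientSession(conn_timeout=self.timeout)` context line: aiohttp 3.x deprecates the `conn_timeout` keyword in favour of a `ClientTimeout` object. A hedged sketch of the equivalent modern call, not part of this commit:

import aiohttp

async def fetch(url: str, timeout_seconds: float) -> str:
    # `connect` bounds connection establishment, matching the old
    # conn_timeout keyword; `total` would bound the whole request.
    timeout = aiohttp.ClientTimeout(connect=timeout_seconds)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
            return await resp.text()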

aproxyrelay/process.py

Lines changed: 14 additions & 2 deletions

@@ -22,6 +22,10 @@ def __init__(self) -> None:
         """
         self._queue_result = Queue()  # Holds target results

+    def _chunk_list(self, lst, chunk_size):
+        """Chunks a list into pieces of the desired size"""
+        return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
+
     async def _process_targets_main(self) -> None:
         """
         Start the Proxy Relay Processor. Proxies in the queue are nothing less than burners.
@@ -41,8 +45,16 @@ async def _process_targets_main(self) -> None:
             # Append the coroutine object to the tasks list
             tasks.append(self._obtain_targets(proxy, target))

-        # Use asyncio.gather to concurrently execute all tasks
-        await gather(*tasks)
+        # We have to chunk our tasks, otherwise the internet bandwidth might be saturated
+        chunks = self._chunk_list(tasks, 10000)
+        i = 0
+        for chunk in chunks:
+            self.logger.info(f"[aProxyRelay] Processing ({i}/{len(tasks)}) ... please wait ...")
+            i += len(chunk)
+            # Use asyncio.gather to concurrently execute each chunk of tasks
+            await gather(*chunk)
+        # Previous approach, kept for reference:
+        # await gather(*tasks)

         self.logger.info(f'[aProxyRelay] Processing ({self._queue_target_process.qsize()}) items in Queue using ({self.proxies.qsize()}) proxies ... Please wait...')  # noqa: B950
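Chunking bounds how many coroutines are in flight, but each `gather` call waits for the slowest task in its chunk before the next chunk starts. A hedged alternative sketch (illustrative names, not part of this commit) that caps concurrency with `asyncio.Semaphore` instead, so a new task starts as soon as any slot frees up:

import asyncio

async def run_bounded(coros, limit: int = 10_000):
    # Cap in-flight coroutines; unlike fixed chunks there is no barrier
    # between batches: a slot frees as soon as any one task finishes.
    sem = asyncio.Semaphore(limit)

    async def bounded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(bounded(c) for c in coros))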

aproxyrelay/req.py

Lines changed: 17 additions & 4 deletions

@@ -31,6 +31,10 @@ def __init__(self) -> None:
         """
         self.logger.info("[aProxyRelay] Request module initialized!")

+    def _chunk_list(self, lst, chunk_size):
+        """Chunks a list into pieces of the desired size"""
+        return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
+
     async def _fetch_proxy_page(self, urls, session):
         """
         Use asyncio.gather to run multiple requests concurrently by executing `self._request_proxy_page`.
@@ -84,16 +88,25 @@ async def _test_all_proxies(self):
         Use asyncio.gather to run multiple requests concurrently by executing `self._test_proxy_link`.
         """
         # Use asyncio.gather to run multiple tests concurrently
-        to_filter = []
+        raw = []
         while not self._queue_filter.empty():
             _target = self._queue_filter.get()
             _target['proxy'] = f"{_target['protocol'].replace('https', 'http')}://{_target['ip']}:{_target['port']}"
-            to_filter.append(_target)
+            raw.append(_target)

         # Remove duplicate entries
-        to_filter = [dict(x) for x in list(set([tuple(item.items()) for item in to_filter]))]
+        to_filter = [dict(x) for x in set(tuple(item.items()) for item in raw)]
+        self.logger.info(f"[aProxyRelay] Found ({len(raw) - len(to_filter)}) duplicates which have been removed")
         tasks = [self._test_proxy_link(proxy['proxy'], proxy) for proxy in to_filter]
-        await gather(*tasks)
+        # We have to chunk our tasks, otherwise the internet bandwidth might be saturated
+        chunks = self._chunk_list(tasks, 10000)
+        i = 0
+        for chunk in chunks:
+            self.logger.info(f"[aProxyRelay] Brewing ({i}/{len(tasks)}) ... please wait ...")
+            i += len(chunk)
+            # Use asyncio.gather to concurrently execute each chunk of tasks
+            await gather(*chunk)
+        # await gather(*tasks)

     async def _test_proxy_link(self, proxy_url, data) -> None:
         """
