Issue #83: Use Asyncio (use aiohttp)
Nekmo committed Oct 21, 2020
1 parent 0c946a2 commit 4b4f049
Showing 8 changed files with 118 additions and 92 deletions.
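
This commit moves dirhunt's crawling from blocking requests calls toward asyncio with aiohttp: session.get() is awaited inside an async context manager, the response body is read with a size cap, and the helpers it touches (add_domain, add_url, print_results, start) become coroutines. Below is a minimal, standalone sketch of the request pattern the diff adopts; the ClientSession here is illustrative (dirhunt reuses a session from crawler.sessions.get_session()) and the MAX_RESPONSE_SIZE value is assumed.

import asyncio
import aiohttp

MAX_RESPONSE_SIZE = 512 * 1024  # assumed cap, mirroring the bounded read in the diff

async def fetch(url, timeout=10):
    async with aiohttp.ClientSession() as session:
        # allow_redirects=False, as in the diff: redirects are reported, not followed
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout),
                               allow_redirects=False) as resp:
            body = b''
            if resp.status < 300:
                # read at most MAX_RESPONSE_SIZE bytes instead of the whole body
                body = await resp.content.read(MAX_RESPONSE_SIZE)
            return resp.status, body.decode(resp.charset or 'utf-8', errors='ignore')

if __name__ == '__main__':
    status, text = asyncio.run(fetch('http://example.com/'))
    print(status, len(text))
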
28 changes: 16 additions & 12 deletions dirhunt/crawler.py
@@ -71,7 +71,7 @@ async def add_init_urls(self, *urls):
for crawler_url in urls:
if not isinstance(crawler_url, CrawlerUrl):
crawler_url = CrawlerUrl(self, crawler_url, depth=self.depth, timeout=self.timeout)
self.add_domain(crawler_url.url.only_domain)
await self.add_domain(crawler_url.url.only_domain)
await self.add_url(crawler_url)

def in_domains(self, domain):
@@ -89,27 +89,31 @@ def in_domains(self, domain):
return False
domain = '.'.join(parts[1:])

def add_domain(self, domain):
async def add_domain(self, domain):
if domain in self.domains:
return
self.domains.add(domain)
self.sources.add_domain(domain)
await self.sources.add_domain(domain)

async def add_url(self, crawler_url, force=False):
"""Add url to queue"""
if not isinstance(crawler_url, CrawlerUrl):
crawler_url = CrawlerUrl(self, crawler_url, depth=self.depth, timeout=self.timeout)
self.add_lock.acquire()
# self.add_lock.acquire()
url = crawler_url.url
if not url.is_valid() or not url.only_domain or not self.in_domains(url.only_domain):
self.add_lock.release()
return
if url.url in self.processing or url.url in self.processed:
self.add_lock.release()
return self.processing.get(url.url) or self.processed.get(url.url)
# if not url.is_valid() or not url.only_domain or not self.in_domains(url.only_domain):
# self.add_lock.release()
# return
# if url.url in self.processing or url.url in self.processed:
# self.add_lock.release()
# return self.processing.get(url.url) or self.processed.get(url.url)

# fn = reraise_with_stack(crawler_url.start)
await crawler_url.start()
if url.url in self.processing:
return self.processing[url.url]
future = crawler_url.start()
self.processing[url.url] = future
return await future
# if self.closing:
# self.add_lock.release()
# return
@@ -151,7 +155,7 @@ def print_progress(self, finished=False):
(humanize.naturaldelta if finished else humanize.naturaltime)(datetime.datetime.now() - self.start_dt),
))

def print_results(self, exclude=None, include=None):
async def print_results(self, exclude=None, include=None):
exclude = exclude or set()
self.echo('Starting...')
while True:
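
The add_url() change above also starts tracking in-flight work: the coroutine returned by crawler_url.start() is stored in self.processing keyed by URL, so a URL added again while it is being crawled gets the pending entry instead of being fetched twice (the thread-lock bookkeeping it replaces is left commented out). A sketch of that de-duplication idea follows, with hypothetical names; it wraps the work in asyncio.ensure_future so several callers can await the same entry, whereas the diff stores the bare coroutine.

import asyncio

class DedupScheduler:
    def __init__(self):
        self.processing = {}   # url -> asyncio.Task for work still running
        self.processed = {}    # url -> finished result

    async def add_url(self, url, worker):
        if url in self.processed:
            return self.processed[url]
        if url in self.processing:
            # someone else is already crawling this URL; await the same task
            return await self.processing[url]
        task = asyncio.ensure_future(worker(url))
        self.processing[url] = task
        try:
            result = await task
            self.processed[url] = result
            return result
        finally:
            self.processing.pop(url, None)
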
59 changes: 39 additions & 20 deletions dirhunt/crawler_url.py
@@ -42,21 +42,37 @@ def __init__(self, crawler, url, depth=3, source=None, exists=None, type=None, t
self.resp = None
self.processor_data = None

def add_self_directories(self, exists=None, type_=None):
async def add_self_directories(self, exists=None, type_=None):
for url in self.url.breadcrumb():
self.crawler.add_url(CrawlerUrl(self.crawler, url, self.depth - 1, self, exists, type_,
timeout=self.timeout))
await self.crawler.add_url(CrawlerUrl(self.crawler, url, self.depth - 1, self, exists, type_,
timeout=self.timeout))
# TODO: if it cannot be added because it has already been added, mark it as already existing if the directive is exists

async def start(self):
from dirhunt.processors import get_processor, GenericProcessor, Error, ProcessIndexOfRequest

session = self.crawler.sessions.get_session()
text = ''
soup = None
processor = None

try:
# resp = session.get(self.url.url, stream=True, verify=False, timeout=self.timeout, allow_redirects=False)
print(f'Request url {self.url.url}')
req = session.get(self.url.url, timeout=self.timeout, allow_redirects=False)
async with req as resp:
pass
if resp.status < 300 and self.must_be_downloaded(resp):
try:
text = await resp.content.read(MAX_RESPONSE_SIZE)
except (RequestException, ReadTimeoutError, socket.timeout) as e:
self.crawler.current_processed_count += 1
self.crawler.results.put(Error(self, e))
self.close()
return self
else:
text = text.decode(resp.charset, errors='ignore')
soup = BeautifulSoup(text, 'html.parser') if resp.headers.get(
'Content-Type') == 'text/html' else None
except ClientError as e:
self.crawler.current_processed_count += 1
self.crawler.results.put(Error(self, e))
@@ -66,21 +82,24 @@ async def start(self):
self.set_type(resp.headers.get('Content-Type'))
self.flags.add(str(resp.status))

text = ''
soup = None
processor = None
if resp.status < 300 and self.must_be_downloaded(resp):
try:
text = resp.raw.read(MAX_RESPONSE_SIZE, decode_content=True)
except (RequestException, ReadTimeoutError, socket.timeout) as e:
self.crawler.current_processed_count += 1
self.crawler.results.put(Error(self, e))
self.close()
return self
soup = BeautifulSoup(text, 'html.parser') if resp.headers.get('Content-Type') == 'text/html' else None
# text = ''
# soup = None
# processor = None
# if resp.status < 300 and self.must_be_downloaded(resp):
# try:
# print(self.url.url)
# text = await resp.content.read(MAX_RESPONSE_SIZE)
# except (RequestException, ReadTimeoutError, socket.timeout) as e:
# self.crawler.current_processed_count += 1
# self.crawler.results.put(Error(self, e))
# self.close()
# return self
# else:
# text = text.decode(resp.charset, errors='ignore')
# soup = BeautifulSoup(text, 'html.parser') if resp.headers.get('Content-Type') == 'text/html' else None
if self.must_be_downloaded(resp):
processor = get_processor(resp, text, self, soup) or GenericProcessor(resp, self)
processor.process(text, soup)
await processor.process(text, soup)
self.flags.update(processor.flags)
if self.maybe_directory():
self.crawler.results.put(processor)
@@ -93,8 +112,8 @@ async def start(self):
# TODO: We can look at processor.index_file. If it exists and is a 200, then it exists.
if self.exists is None and resp.status < 404:
self.exists = True
self.add_self_directories(True if (not self.maybe_rewrite() and self.exists) else None,
'directory' if not self.maybe_rewrite() else None)
await self.add_self_directories(True if (not self.maybe_rewrite() and self.exists) else None,
'directory' if not self.maybe_rewrite() else None)
self.close()
return self

@@ -130,7 +149,7 @@ def weight(self):

def close(self):
self.crawler.processed[self.url.url] = self
del self.crawler.processing[self.url.url]
# TODO: del self.crawler.processing[self.url.url]

def json(self):
return {
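
Inside the new async with block, start() only downloads the body when must_be_downloaded() allows it, reads at most MAX_RESPONSE_SIZE bytes, decodes them with the response charset, and builds a BeautifulSoup tree only for text/html responses. A standalone sketch of that response handling, assuming resp is an aiohttp ClientResponse and with an illustrative MAX_RESPONSE_SIZE:

from bs4 import BeautifulSoup

MAX_RESPONSE_SIZE = 512 * 1024  # assumed value for illustration

async def read_response(resp):
    text, soup = '', None
    if resp.status < 300:
        raw = await resp.content.read(MAX_RESPONSE_SIZE)
        # resp.charset can be None, hence the fallback; the diff decodes with it directly
        text = raw.decode(resp.charset or 'utf-8', errors='ignore')
        # the diff compares the header verbatim; real headers may also carry "; charset=..."
        if resp.headers.get('Content-Type') == 'text/html':
            soup = BeautifulSoup(text, 'html.parser')
    return text, soup
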
29 changes: 15 additions & 14 deletions dirhunt/management.py
@@ -159,20 +159,21 @@ async def hunt(urls, threads, exclude_flags, include_flags, interesting_extensio
except IncompatibleVersionError as e:
click.echo(e)
await crawler.add_init_urls(*urls)
while True:
choice = catch_keyboard_interrupt_choices(crawler.print_results, ['abort', 'continue', 'results'], 'a')\
(set(exclude_flags), set(include_flags))
if choice == 'a':
crawler.close(True)
click.echo('Created resume file "{}". Run again using the same parameters to resume.'.format(
crawler.get_resume_file())
)
return
elif choice == 'c':
crawler.restart()
continue
else:
break
# await crawler.print_results()
# while True:
# choice = catch_keyboard_interrupt_choices(crawler.print_results, ['abort', 'continue', 'results'], 'a')\
# (set(exclude_flags), set(include_flags))
# if choice == 'a':
# crawler.close(True)
# click.echo('Created resume file "{}". Run again using the same parameters to resume.'.format(
# crawler.get_resume_file())
# )
# return
# elif choice == 'c':
# crawler.restart()
# continue
# else:
# break
crawler.print_urls_info()
if not sys.stdout.isatty():
output_urls(crawler, stdout_flags)
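
With the interactive abort/continue/results menu commented out, hunt() now simply awaits the crawler and prints the collected URL info. How the coroutine is driven is not part of this hunk; below is a hedged sketch of one way to run an async hunt() from a synchronous click command, with illustrative names.

import asyncio
import click

async def hunt_async(urls):
    # build the crawler, then: await crawler.add_init_urls(*urls); await crawler.print_results()
    ...

@click.command()
@click.argument('urls', nargs=-1)
def hunt(urls):
    # the click entry point stays synchronous and hands off to the event loop
    asyncio.run(hunt_async(list(urls)))
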
25 changes: 13 additions & 12 deletions dirhunt/pool.py
@@ -4,27 +4,28 @@
from dirhunt.exceptions import reraise_with_stack


class Pool(ThreadPoolExecutor):
class Pool:
def __init__(self, max_workers=None, **kwargs):
self.threads_running = 0
max_workers = max_workers or ((multiprocessing.cpu_count() or 1) * 5)
super(Pool, self).__init__(max_workers=max_workers, **kwargs)
# max_workers = max_workers or ((multiprocessing.cpu_count() or 1) * 5)
# super(Pool, self).__init__(max_workers=max_workers, **kwargs)

def callback(self, *args, **kwargs):
raise NotImplementedError

def submit(self, *args, **kwargs):
async def submit(self, *args, **kwargs):
self.threads_running += 1
return self.callback(*args, **kwargs)

def execute(*args, **kwargs):
try:
return reraise_with_stack(self.callback)(*args, **kwargs)
except Exception:
raise
finally:
self.threads_running -= 1
# def execute(*args, **kwargs):
# try:
# return reraise_with_stack(self.callback)(*args, **kwargs)
# except Exception:
# raise
# finally:
# self.threads_running -= 1

return super(Pool, self).submit(execute, *args, **kwargs)
# return super(Pool, self).submit(execute, *args, **kwargs)

def is_running(self):
return bool(self.threads_running)
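
After this change Pool no longer subclasses ThreadPoolExecutor: submit() is a coroutine that bumps a counter and calls the callback directly, so nothing bounds concurrency yet. One possible asyncio-native follow-up, sketched here rather than taken from the commit, is to cap concurrent callbacks with an asyncio.Semaphore, assuming subclasses define callback as a coroutine.

import asyncio

class AsyncPool:
    def __init__(self, max_workers=10):
        self.semaphore = asyncio.Semaphore(max_workers)
        self.threads_running = 0  # kept for interface parity with Pool.is_running()

    def callback(self, *args, **kwargs):
        raise NotImplementedError

    async def submit(self, *args, **kwargs):
        async with self.semaphore:
            self.threads_running += 1
            try:
                # assumes callback is implemented as a coroutine in subclasses
                return await self.callback(*args, **kwargs)
            finally:
                self.threads_running -= 1

    def is_running(self):
        return bool(self.threads_running)
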
(Diffs for the remaining 4 changed files are not shown.)
