Issue #83: Use Asyncio
Nekmo committed Aug 9, 2023
1 parent 622c45f commit c5ba130
Showing 6 changed files with 276 additions and 197 deletions.
12 changes: 6 additions & 6 deletions dirhunt/colors.py
@@ -3,14 +3,14 @@

 def status_code_colors(status_code):
     if 100 <= status_code < 200:
-        return Fore.WHITE
+        return "white"
     elif 200 == status_code:
-        return Fore.LIGHTGREEN_EX
+        return "green1"
     elif 200 < status_code < 300:
-        return Fore.GREEN
+        return "green3"
     elif 300 <= status_code < 400:
-        return Fore.LIGHTBLUE_EX
+        return "deep_sky_blue1"
     elif 500 == status_code:
-        return Fore.LIGHTMAGENTA_EX
+        return "magenta1"
     else:
-        return Fore.MAGENTA
+        return "medium_orchid1"
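Editor's note on the hunk above: the colorama Fore constants give way to rich-style color-name strings ("green1", "deep_sky_blue1", ...). A minimal sketch of how the returned names could be consumed, assuming the caller feeds them to rich as markup styles (the caller and URL are hypothetical, not part of this commit):

from rich.console import Console

from dirhunt.colors import status_code_colors

console = Console(highlight=False)
status_code = 301
color = status_code_colors(status_code)  # "deep_sky_blue1" for 3xx
console.print(f"[{color}]{status_code}[/{color}] https://example.com/admin/")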
19 changes: 12 additions & 7 deletions dirhunt/crawler.py
@@ -8,9 +8,11 @@
 from concurrent.futures.thread import _python_exit
 from threading import Lock, ThreadError
 import datetime
+from typing import Optional

 import humanize as humanize
 from click import get_terminal_size
+from rich.console import Console

 from dirhunt import processors
 from dirhunt import __version__
@@ -65,12 +67,13 @@ def __init__(self, configuration: Configuration, loop: asyncio.AbstractEventLoop
         self.configuration = configuration
         self.loop = loop
         self.tasks = set()
+        self.crawler_urls = set()
+        self.domains = set()
+        self.console = Console(highlight=False)
         self.session = Session()
         self.domain_semaphore = DomainSemaphore(configuration.concurrency)
-        self.domains = set()
         self.results = Queue()
         self.index_of_processors = []
-        self.processing = {}
         self.processed = {}
         self.add_lock = Lock()
         self.start_dt = datetime.datetime.now()
@@ -82,16 +85,18 @@ async def start(self):
             await self.add_crawler_url(
                 CrawlerUrl(self, url, depth=self.configuration.max_depth)
             )
-        await asyncio.wait(self.tasks)
+        while self.tasks:
+            await asyncio.wait(self.tasks)

-    async def add_crawler_url(self, crawler_url: CrawlerUrl):
+    async def add_crawler_url(self, crawler_url: CrawlerUrl) -> Optional[asyncio.Task]:
         """Add crawler_url to tasks"""
-        if crawler_url.url.url in self.processing:
+        if crawler_url.url.url in self.crawler_urls:
             return
         task = self.loop.create_task(crawler_url.retrieve())
         self.tasks.add(task)
-        self.processing[crawler_url.url.url] = task
-        task.add_done_callback(lambda: self.tasks.discard(task))
+        self.crawler_urls.add(crawler_url)
+        task.add_done_callback(self.tasks.discard)
+        return task

     def add_init_urls(self, *urls):
         """Add urls to queue."""
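Editor's note on the hunks above: a single await asyncio.wait(self.tasks) returns once the tasks captured at call time finish, but a crawl task can add new tasks while it runs, hence the while self.tasks: drain loop. The done callback also changes from a zero-argument lambda (which would raise TypeError, since add_done_callback passes the finished task to the callback) to the bound method self.tasks.discard. A standalone sketch of the pattern, with hypothetical names:

import asyncio

tasks = set()

def add_task(coro):
    task = asyncio.create_task(coro)
    tasks.add(task)
    task.add_done_callback(tasks.discard)  # receives the finished task
    return task

async def worker(depth: int):
    await asyncio.sleep(0)
    if depth:
        add_task(worker(depth - 1))  # new work appears mid-crawl

async def main():
    add_task(worker(3))
    while tasks:  # a single asyncio.wait() would return too early
        await asyncio.wait(tasks)

asyncio.run(main())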
212 changes: 131 additions & 81 deletions dirhunt/crawler_url.py
@@ -1,8 +1,9 @@
 # -*- coding: utf-8 -*-
 import cgi
 import socket
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any, Optional

+from aiohttp.web_response import Response
 from bs4 import BeautifulSoup
 from requests import RequestException
 from urllib3.exceptions import ReadTimeoutError
@@ -19,9 +20,86 @@

 if TYPE_CHECKING:
     from dirhunt.crawler import Crawler
+    from dirhunt.processors import ProcessBase


-class CrawlerUrl(object):
+class CrawlerUrlRequest:
+    response: Optional[Response] = None
+    content: Optional[str] = None
+    _soup: Optional[BeautifulSoup] = None
+
+    def __init__(self, crawler_url: "CrawlerUrl"):
+        self.crawler_url = crawler_url
+        self.crawler = crawler_url.crawler
+
+    async def retrieve(self) -> "ProcessBase":
+        from dirhunt.processors import (
+            get_processor,
+            Error,
+        )
+
+        text = ""
+        try:
+            await self.crawler.domain_semaphore.acquire(self.crawler_url.url.domain)
+            async with self.crawler.session.get(
+                self.crawler_url.url.url,
+                verify_ssl=False,
+                timeout=self.crawler.configuration.timeout,
+                allow_redirects=False,
+            ) as response:
+                self.crawler_url.set_type(response.headers.get("Content-Type"))
+                self.crawler_url.flags.add(str(response.status))
+                self.response = response
+                processor = get_processor(self)
+                if processor and processor.requires_content:
+                    encoding = response.get_encoding()
+                    self.content = (
+                        await response.content.read(MAX_RESPONSE_SIZE)
+                    ).decode(encoding, errors="ignore")
+                if processor.has_descendants:
+                    processor = get_processor(self)
+            # text = ""
+            # soup = None
+            # processor = None
+            # if response.status_code < 300 and self.must_be_downloaded(response):
+            #     try:
+            #         text = response.raw.read(MAX_RESPONSE_SIZE, decode_content=True)
+            #     except (RequestException, ReadTimeoutError, socket.timeout) as e:
+            #         self.crawler.current_processed_count += 1
+            #         self.crawler.results.put(Error(self, e))
+            #         self.close()
+            #         return self
+            #     content_type = cgi.parse_header(
+            #         response.headers.get("Content-Type", "")
+            #     )[0]
+            #     soup = (
+            #         BeautifulSoup(text, "html.parser")
+            #         if content_type == "text/html"
+            #         else None
+            #     )
+        except RequestException as e:
+            self.crawler.current_processed_count += 1
+            processor = Error(self, e)
+        else:
+            await processor.process(self)
+        finally:
+            self.crawler.domain_semaphore.release(self.crawler_url.url.domain)
+        return processor
+
+    @property
+    def soup(self):
+        if self._soup is None and self.content is not None:
+            self._soup = BeautifulSoup(self.content, "html.parser")
+        return self._soup
+
+    def __repr__(self):
+        return "<CrawlerUrlRequest {}>".format(self.crawler_url.url)
+
+
+class CrawlerUrl:
+    processor: Optional["ProcessBase"] = None
+
     def __init__(
         self,
         crawler: "Crawler",
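Editor's note: retrieve() above brackets each request with domain_semaphore.acquire()/release(). DomainSemaphore itself is not part of this diff; a plausible per-domain throttle, sketched under the assumption that it maps each domain to its own asyncio.Semaphore:

import asyncio
from collections import defaultdict

class DomainSemaphore:
    """Hypothetical sketch: cap concurrent requests per domain."""

    def __init__(self, concurrency: int):
        self._semaphores = defaultdict(lambda: asyncio.Semaphore(concurrency))

    async def acquire(self, domain: str):
        await self._semaphores[domain].acquire()

    def release(self, domain: str):
        self._semaphores[domain].release()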
@@ -50,13 +128,13 @@ def __init__(
         self.exists = exists
         self.url_type = url_type
         if url.is_valid() and (not url.path or url.path == "/"):
-            self.type = "directory"
+            self.url_type = "directory"
         self.resp = None
         self.processor_data = None

-    def add_self_directories(self, exists=None, url_type=None):
+    async def add_self_directories(self, exists=None, url_type=None):
         for url in self.url.breadcrumb():
-            self.crawler.add_crawler_url(
+            await self.crawler.add_crawler_url(
                 CrawlerUrl(
                     self.crawler,
                     url,
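Editor's note: the add_self_directories() body shown across the hunks above and below walks self.url.breadcrumb(), which this diff does not define. Presumably it yields each ancestor directory of the url so parents get queued as CrawlerUrls too; a hypothetical equivalent:

from urllib.parse import urlparse

def breadcrumb(url: str):
    """Hypothetical stand-in for Url.breadcrumb(): yield ancestor directories."""
    parts = urlparse(url)
    base = f"{parts.scheme}://{parts.netloc}"
    segments = [s for s in parts.path.split("/") if s]
    for i in range(len(segments)):
        yield base + "/" + "/".join(segments[:i] + [""])

print(list(breadcrumb("https://example.com/static/js/app.js")))
# ['https://example.com/', 'https://example.com/static/', 'https://example.com/static/js/']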
@@ -66,91 +144,56 @@ def add_self_directories(self, exists=None, url_type=None):
                     url_type,
                 )
             )
-        # TODO: if it cannot be added because it was already added, mark it as existing when exists is set
+        # TODO: if exists=True and the url was already processed before being added, but the
+        # already processed url has exists=False, then update exists to True

     async def retrieve(self):
-        from dirhunt.processors import (
-            get_processor,
-            GenericProcessor,
-            Error,
-            ProcessIndexOfRequest,
-        )
-
-        try:
-            await self.crawler.domain_semaphore.acquire(self.url.domain)
-            async with self.crawler.session.get(
-                self.url.url,
-                verify_ssl=False,
-                timeout=self.crawler.configuration.timeout,
-                allow_redirects=False,
-            ) as resp:
-                self.set_type(resp.headers.get("Content-Type"))
-                self.flags.add(str(resp.status))
-                text = ""
-                soup = None
-                processor = None
-                if resp.status_code < 300 and self.must_be_downloaded(resp):
-                    try:
-                        text = resp.raw.read(MAX_RESPONSE_SIZE, decode_content=True)
-                    except (RequestException, ReadTimeoutError, socket.timeout) as e:
-                        self.crawler.current_processed_count += 1
-                        self.crawler.results.put(Error(self, e))
-                        self.close()
-                        return self
-                    content_type = cgi.parse_header(
-                        resp.headers.get("Content-Type", "")
-                    )[0]
-                    soup = (
-                        BeautifulSoup(text, "html.parser")
-                        if content_type == "text/html"
-                        else None
-                    )
-        except RequestException as e:
-            self.crawler.current_processed_count += 1
-            self.crawler.results.put(Error(self, e))
-            self.close()
-            return self
-        finally:
-            self.crawler.domain_semaphore.release(self.url.domain)
-
-        if self.must_be_downloaded(resp):
-            processor = get_processor(resp, text, self, soup) or GenericProcessor(
-                resp, self
-            )
-            processor.process(text, soup)
-            self.flags.update(processor.flags)
-            if self.maybe_directory():
-                self.crawler.results.put(processor)
-            if processor is not None:
-                self.processor_data = processor.json()
-            if processor and isinstance(processor, ProcessIndexOfRequest):
-                self.crawler.index_of_processors.append(processor)
-        else:
-            self.crawler.current_processed_count += 1
-        # TODO: we can look at processor.index_file. If it exists and is a 200, then it exists.
-        if self.exists is None and resp.status_code < 404:
+        from dirhunt.processors import GenericProcessor
+
+        crawler_url_request = CrawlerUrlRequest(self)
+        processor = await crawler_url_request.retrieve()
+        if processor is not None and not isinstance(processor, GenericProcessor):
+            self.crawler.console.print(processor.get_text())
+        # if self.must_be_downloaded(response):
+        #     processor = get_processor(response, text, self, soup) or GenericProcessor(
+        #         response, self
+        #     )
+        #     processor.process(text, soup)
+        #     self.flags.update(processor.flags)
+        #     if self.maybe_directory():
+        #         self.crawler.results.put(processor)
+        #     if processor is not None:
+        #         self.processor_data = processor.json()
+        #     if processor and isinstance(processor, ProcessIndexOfRequest):
+        #         self.crawler.index_of_processors.append(processor)
+        #     else:
+        #         self.crawler.current_processed_count += 1
+        if (
+            self.exists is None
+            and crawler_url_request.response is not None
+            and crawler_url_request.response.status < 404
+        ):
             self.exists = True
-        self.add_self_directories(
-            True if (not self.maybe_rewrite() and self.exists) else None,
-            "directory" if not self.maybe_rewrite() else None,
-        )
-        self.close()
-        return self
+        # TODO: uncomment
+        # await self.add_self_directories(
+        #     True if (not self.maybe_rewrite() and self.exists) else None,
+        #     "directory" if not self.maybe_rewrite() else None,
+        # )

     def set_type(self, content_type):
         from dirhunt.processors import INDEX_FILES

-        if not self.type and not (content_type or "").startswith("text/html"):
-            self.type = "asset"
+        if not self.url_type and not (content_type or "").startswith("text/html"):
+            self.url_type = "asset"
         if (
-            not self.type
+            not self.url_type
             and (content_type or "").startswith("text/html")
             and self.url.name in INDEX_FILES
         ):
-            self.type = "document"
+            self.url_type = "document"

     def maybe_rewrite(self):
-        return self.type not in ["asset", "directory"]
+        return self.url_type not in ["asset", "directory"]

     def must_be_downloaded(self, response):
         """The file must be downloaded to obtain information."""
@@ -174,10 +217,6 @@ def weight(self):
         value -= len(list(self.url.breadcrumb())) * 1.5
         return value

-    def close(self):
-        self.crawler.processed[self.url.url] = self
-        del self.crawler.processing[self.url.url]
-
     def json(self):
         return {
             "flags": self.flags,
@@ -186,3 +225,14 @@ def json(self):
             "type": self.type,
             "exists": self.exists,
         }
+
+    def __repr__(self):
+        return f"<CrawlerUrl {self.url}>"
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, CrawlerUrl):
+            return False
+        return self.url.url == other.url.url
+
+    def __hash__(self):
+        return hash(self.url.url)
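Editor's note: __eq__ and __hash__ key a CrawlerUrl on its url string, so Crawler.crawler_urls can act as a dedup set. One caveat in this diff: add_crawler_url tests crawler_url.url.url in self.crawler_urls, a plain string against CrawlerUrl instances, and since this __eq__ rejects non-CrawlerUrl operands that test never matches; membership works object-to-object, as the sketch below (with a hypothetical, simplified class) shows:

class CrawlerUrlKey:
    """Sketch mirroring the __eq__/__hash__ above."""

    def __init__(self, url: str):
        self.url = url

    def __eq__(self, other):
        if not isinstance(other, CrawlerUrlKey):
            return False
        return self.url == other.url

    def __hash__(self):
        return hash(self.url)

seen = {CrawlerUrlKey("https://example.com/admin/")}
print(CrawlerUrlKey("https://example.com/admin/") in seen)  # True: same url, distinct objects
print("https://example.com/admin/" in seen)  # False: __eq__ rejects plain strings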