
feat: Adjust LinkContentFetcher run method, use ByteStream #5972

Merged
13 commits merged on Oct 10, 2023
84 changes: 60 additions & 24 deletions haystack/preview/components/fetchers/link_content.py
@@ -1,17 +1,17 @@
import io
import logging
from collections import defaultdict
from datetime import datetime
from typing import Optional, Dict, List, Callable, Any, IO
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional, Tuple

import requests
from requests import Response
from requests.exceptions import HTTPError
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryCallState
from haystack.preview import component, default_from_dict, default_to_dict
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from haystack import __version__
from haystack.preview import Document
from haystack.preview import component, default_from_dict, default_to_dict
from haystack.preview.dataclasses import ByteStream

logger = logging.getLogger(__name__)

@@ -26,26 +26,27 @@
}


def text_content_handler(response: Response) -> Dict[str, str]:
def text_content_handler(response: Response) -> ByteStream:
"""
:param response: Response object from the request.
:return: The extracted text.
"""
return {"text": response.text}
return ByteStream.from_string(response.text)


def binary_content_handler(response: Response) -> Dict[str, IO[bytes]]:
def binary_content_handler(response: Response) -> ByteStream:
"""
:param response: Response object from the request.
:return: The extracted binary file-like object.
"""
return {"blob": io.BytesIO(response.content)}
return ByteStream(data=response.content)


@component
class LinkContentFetcher:
"""
LinkContentFetcher fetches content from a URL link and converts it to a Document object.
LinkContentFetcher is a component for fetching and extracting content from URLs. It supports handling various
content types, retries on failures, and automatic user-agent rotation for failed web requests.
"""

def __init__(
@@ -73,7 +74,7 @@ def __init__(
self.timeout = timeout

# register default content handlers that extract data from the response
self.handlers: Dict[str, Callable[[Response], Dict[str, Any]]] = defaultdict(lambda: text_content_handler)
self.handlers: Dict[str, Callable[[Response], ByteStream]] = defaultdict(lambda: text_content_handler)
self.handlers["text/html"] = text_content_handler
self.handlers["text/plain"] = text_content_handler
self.handlers["application/pdf"] = binary_content_handler
@@ -116,37 +117,71 @@ def from_dict(cls, data: Dict[str, Any]) -> "LinkContentFetcher":
"""
return default_from_dict(cls, data)

@component.output_types(documents=Optional[Document])
def run(self, url: str):
@component.output_types(streams=Dict[str, List[io.BytesIO]])
def run(self, urls: List[str]):
"""
Fetches content from a URL and converts it to a Document objects. If no content is extracted,
an empty Document object is returned (if raise_on_failure is False).
Fetches content from a list of URLs and returns a dictionary of extracted content streams.

:param urls: A list of URLs to fetch content from.


:param url: URL to fetch content from.
:param timeout: Timeout in seconds for the request.
:return: List of Document objects or an empty list if no content is extracted.
:return: A dictionary containing content streams categorized by content type.
The keys are content types (e.g., "text/html", "application/pdf"), and the values are lists of
ByteStream objects representing the extracted content.
"""
document_data: Dict[str, Any] = {"metadata": {"url": url, "timestamp": int(datetime.utcnow().timestamp())}}
streams: Dict[str, List[ByteStream]] = defaultdict(list)
if not urls:
return {"streams": streams}

# don't use multithreading if there's only one URL
if len(urls) == 1:
content_type, stream = self.fetch(urls[0])
if content_type and stream:
streams[content_type].append(stream)
else:
with ThreadPoolExecutor() as executor:
results = executor.map(self.fetch, urls)

for content_type, stream in results:
if content_type and stream:
streams[content_type].append(stream)

return {"streams": streams}

def fetch(self, url: str) -> Tuple[str, ByteStream]:
"""
Fetches content from a URL and returns it as a ByteStream.

:param url: The URL to fetch content from.
:return: A tuple containing the content type and the corresponding ByteStream.
The content type is a string indicating the type of content fetched (e.g., "text/html", "application/pdf").
The ByteStream object contains the fetched content as binary data.

:raises: If an error occurs during content retrieval and `raise_on_failure` is set to True, this method will
raise an exception. Otherwise, errors are logged, and an empty ByteStream is returned.

"""
content_type: str = "text/html"
stream: ByteStream = ByteStream(data=b"")
try:
response = self._get_response(url)
content_type = self._get_content_type(response)
document_data["mime_type"] = content_type
handler: Callable = self.handlers[content_type]
document_data.update(handler(response))
return {"document": Document(**document_data)}

stream = handler(response)
except Exception as e:
if self.raise_on_failure:
raise e
logger.debug("Couldn't retrieve content from %s", url)
return {"document": None}

finally:
self.current_user_agent_idx = 0

return content_type, stream

def _get_content_type(self, response: Response):
"""
Get the content type of the response.

:param response: The response object.
:return: The content type of the response.
"""
@@ -157,6 +192,7 @@ def _switch_user_agent(self, retry_state: RetryCallState) -> None:
"""
Switches the User-Agent for this LinkContentRetriever to the next one in the list of user agents.
Used by tenacity to retry the requests with a different user agent.

:param retry_state: The retry state (unused, required by tenacity).
"""
self.current_user_agent_idx = (self.current_user_agent_idx + 1) % len(self.user_agents)
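
For orientation, here is a minimal sketch of how the reworked run method could be called after this change. It is an illustration based only on the signatures visible in this diff (run(urls: List[str]) returning {"streams": ...} and ByteStream exposing a data attribute, as the tests below also assume); the URLs are placeholders, not endpoints used by the project.

    from haystack.preview.components.fetchers.link_content import LinkContentFetcher

    # raise_on_failure=False makes failed fetches return an empty ByteStream instead of raising
    fetcher = LinkContentFetcher(raise_on_failure=False)

    # run() now takes a list of URLs and returns ByteStreams grouped by content type
    result = fetcher.run(urls=["https://example.com", "https://example.com/sample.pdf"])
    streams = result["streams"]

    for content_type, byte_streams in streams.items():
        for stream in byte_streams:
            # stream.data holds the raw bytes; decode text-based content types as needed
            if content_type.startswith("text/"):
                print(content_type, stream.data.decode("utf-8")[:100])
            else:
                print(content_type, len(stream.data), "bytes")
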
49 changes: 20 additions & 29 deletions test/preview/components/fetchers/test_link_content_fetcher.py
@@ -1,4 +1,3 @@
import io
from unittest.mock import patch, Mock

import pytest
@@ -99,27 +98,25 @@ def test_from_dict(self):

@pytest.mark.unit
def test_run_text(self):
correct_response = b"Example test response"
with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run:
mock_run.get.return_value = Mock(
status_code=200, text="Example test response", headers={"Content-Type": "text/plain"}
)
fetcher = LinkContentFetcher()
document = fetcher.run("https://www.example.com")["document"]
assert document.text == "Example test response"
assert document.metadata["url"] == "https://www.example.com"
assert "timestamp" in document.metadata
streams = fetcher.run(urls=["https://www.example.com"])["streams"]
assert streams["text/plain"][0].data == correct_response

@pytest.mark.unit
def test_run_html(self):
correct_response = b"<h1>Example test response</h1>"
with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run:
mock_run.get.return_value = Mock(
status_code=200, text="<h1>Example test response</h1>", headers={"Content-Type": "text/html"}
)
fetcher = LinkContentFetcher()
document = fetcher.run("https://www.example.com")["document"]
assert document.text == "<h1>Example test response</h1>"
assert document.metadata["url"] == "https://www.example.com"
assert "timestamp" in document.metadata
streams = fetcher.run(urls=["https://www.example.com"])["streams"]
assert streams["text/html"][0].data == correct_response

@pytest.mark.unit
def test_run_binary(self, test_files_path):
@@ -129,42 +126,36 @@ def test_run_binary(self, test_files_path):
status_code=200, content=file_bytes, headers={"Content-Type": "application/pdf"}
)
fetcher = LinkContentFetcher()
document = fetcher.run("https://www.example.com")["document"]
# casting to list to make the blobs comparable
assert list(document.blob) == list(io.BytesIO(file_bytes))
assert document.metadata["url"] == "https://www.example.com"
assert "timestamp" in document.metadata
streams = fetcher.run(urls=["https://www.example.com"])["streams"]
assert streams["application/pdf"][0].data == file_bytes

@pytest.mark.unit
def test_run_bad_status_code(self):
empty_byte_stream = b""
fetcher = LinkContentFetcher(raise_on_failure=False)
mock_response = Mock(status_code=403)
with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run:
mock_run.get.return_value = mock_response
document = fetcher.run("https://www.example.com")["document"]
assert document is None
streams = fetcher.run(urls=["https://www.example.com"])["streams"]

# empty byte stream is returned because raise_on_failure is False
assert len(streams) == 1
assert streams["text/html"][0].data == empty_byte_stream

@pytest.mark.integration
def test_link_content_fetcher_html(self):
fetcher = LinkContentFetcher()
document = fetcher.run(HTML_URL)["document"]
assert document.mime_type == "text/html"
assert "Introduction to Haystack" in document.text
assert document.metadata["url"] == HTML_URL
streams = fetcher.run([HTML_URL])["streams"]
assert "Haystack" in streams["text/html"][0].data.decode("utf-8")

@pytest.mark.integration
def test_link_content_fetcher_text(self):
fetcher = LinkContentFetcher()
document = fetcher.run(TEXT_URL)["document"]
assert document.mime_type == "text/plain"
assert "Haystack" in document.text
assert document.metadata["url"] == TEXT_URL
streams = fetcher.run([TEXT_URL])["streams"]
assert "Haystack" in streams["text/plain"][0].data.decode("utf-8")

@pytest.mark.integration
def test_link_content_fetcher_pdf(self):
fetcher = LinkContentFetcher()
document = fetcher.run(PDF_URL)["document"]
assert document.mime_type == "application/octet-stream" # FIXME Should be "application/pdf"?
assert document.text is None
assert document.blob is not None
assert document.metadata["url"] == PDF_URL
streams = fetcher.run([PDF_URL])["streams"]
assert len(streams["application/pdf"]) == 1 or len(streams["application/octet-stream"]) == 1