From f116eea09a8ff951045b14e2c85eb4b81073e6a2 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Thu, 28 Sep 2023 15:20:20 +0200 Subject: [PATCH 01/13] Adjust LinkContentFetcher run method, use ByteStream --- .../components/fetchers/link_content.py | 56 ++++++++++++------- .../fetchers/test_link_content_fetcher.py | 49 +++++++--------- 2 files changed, 57 insertions(+), 48 deletions(-) diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index 4412da0f86..74e427d251 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -1,17 +1,17 @@ import io import logging from collections import defaultdict -from datetime import datetime -from typing import Optional, Dict, List, Callable, Any, IO +from concurrent.futures import ThreadPoolExecutor +from typing import Optional, Dict, List, Callable, Any, Tuple import requests from requests import Response from requests.exceptions import HTTPError from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryCallState -from haystack.preview import component, default_from_dict, default_to_dict from haystack import __version__ -from haystack.preview import Document +from haystack.preview import component, default_from_dict, default_to_dict +from haystack.preview.dataclasses import ByteStream logger = logging.getLogger(__name__) @@ -26,20 +26,20 @@ } -def text_content_handler(response: Response) -> Dict[str, str]: +def text_content_handler(response: Response) -> ByteStream: """ :param response: Response object from the request. :return: The extracted text. """ - return {"text": response.text} + return ByteStream.from_string(response.text) -def binary_content_handler(response: Response) -> Dict[str, IO[bytes]]: +def binary_content_handler(response: Response) -> ByteStream: """ :param response: Response object from the request. :return: The extracted binary file-like object. """ - return {"blob": io.BytesIO(response.content)} + return ByteStream(data=response.content) @component @@ -73,7 +73,7 @@ def __init__( self.timeout = timeout # register default content handlers that extract data from the response - self.handlers: Dict[str, Callable[[Response], Dict[str, Any]]] = defaultdict(lambda: text_content_handler) + self.handlers: Dict[str, Callable[[Response], ByteStream]] = defaultdict(lambda: text_content_handler) self.handlers["text/html"] = text_content_handler self.handlers["text/plain"] = text_content_handler self.handlers["application/pdf"] = binary_content_handler @@ -116,34 +116,52 @@ def from_dict(cls, data: Dict[str, Any]) -> "LinkContentFetcher": """ return default_from_dict(cls, data) - @component.output_types(documents=Optional[Document]) - def run(self, url: str): + @component.output_types(streams=Dict[str, List[io.BytesIO]]) + def run(self, urls: List[str]): + streams: Dict[str, List[ByteStream]] = defaultdict(list) + if not urls: + return {"streams": streams} + + # don't use multithreading if there's only one URL + if len(urls) == 1: + content_type, stream = self.fetch(urls[0]) + if content_type and stream: + streams[content_type].append(stream) + else: + with ThreadPoolExecutor() as executor: + results = executor.map(self.fetch, urls) + + for content_type, stream in results: + if content_type and stream: + streams[content_type].append(stream) + + return {"streams": streams} + + def fetch(self, url: str) -> Tuple[str, ByteStream]: """ Fetches content from a URL and converts it to a Document objects. If no content is extracted, an empty Document object is returned (if raise_on_failure is False). :param url: URL to fetch content from. - :param timeout: Timeout in seconds for the request. - :return: List of Document objects or an empty list if no content is extracted. + :return: A tuple containing the content type and the corresponding ByteStream """ - document_data: Dict[str, Any] = {"metadata": {"url": url, "timestamp": int(datetime.utcnow().timestamp())}} + content_type: str = "text/html" + stream: ByteStream = ByteStream(data=b"") try: response = self._get_response(url) content_type = self._get_content_type(response) - document_data["mime_type"] = content_type handler: Callable = self.handlers[content_type] - document_data.update(handler(response)) - return {"document": Document(**document_data)} - + stream = handler(response) except Exception as e: if self.raise_on_failure: raise e logger.debug("Couldn't retrieve content from %s", url) - return {"document": None} finally: self.current_user_agent_idx = 0 + return content_type, stream + def _get_content_type(self, response: Response): """ Get the content type of the response. diff --git a/test/preview/components/fetchers/test_link_content_fetcher.py b/test/preview/components/fetchers/test_link_content_fetcher.py index a8be562cd1..76d5c0e09d 100644 --- a/test/preview/components/fetchers/test_link_content_fetcher.py +++ b/test/preview/components/fetchers/test_link_content_fetcher.py @@ -1,4 +1,3 @@ -import io from unittest.mock import patch, Mock import pytest @@ -99,27 +98,25 @@ def test_from_dict(self): @pytest.mark.unit def test_run_text(self): + correct_response = b"Example test response" with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run: mock_run.get.return_value = Mock( status_code=200, text="Example test response", headers={"Content-Type": "text/plain"} ) fetcher = LinkContentFetcher() - document = fetcher.run("https://www.example.com")["document"] - assert document.text == "Example test response" - assert document.metadata["url"] == "https://www.example.com" - assert "timestamp" in document.metadata + streams = fetcher.run(urls=["https://www.example.com"])["streams"] + assert streams["text/plain"][0].data == correct_response @pytest.mark.unit def test_run_html(self): + correct_response = b"

Example test response

" with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run: mock_run.get.return_value = Mock( status_code=200, text="

Example test response

", headers={"Content-Type": "text/html"} ) fetcher = LinkContentFetcher() - document = fetcher.run("https://www.example.com")["document"] - assert document.text == "

Example test response

" - assert document.metadata["url"] == "https://www.example.com" - assert "timestamp" in document.metadata + streams = fetcher.run(urls=["https://www.example.com"])["streams"] + assert streams["text/html"][0].data == correct_response @pytest.mark.unit def test_run_binary(self, test_files_path): @@ -129,42 +126,36 @@ def test_run_binary(self, test_files_path): status_code=200, content=file_bytes, headers={"Content-Type": "application/pdf"} ) fetcher = LinkContentFetcher() - document = fetcher.run("https://www.example.com")["document"] - # casting to list to make the blobs comparable - assert list(document.blob) == list(io.BytesIO(file_bytes)) - assert document.metadata["url"] == "https://www.example.com" - assert "timestamp" in document.metadata + streams = fetcher.run(urls=["https://www.example.com"])["streams"] + assert streams["application/pdf"][0].data == file_bytes @pytest.mark.unit def test_run_bad_status_code(self): + empty_byte_stream = b"" fetcher = LinkContentFetcher(raise_on_failure=False) mock_response = Mock(status_code=403) with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run: mock_run.get.return_value = mock_response - document = fetcher.run("https://www.example.com")["document"] - assert document is None + streams = fetcher.run(urls=["https://www.example.com"])["streams"] + + # empty byte stream is returned because raise_on_failure is False + assert len(streams) == 1 + assert streams["text/html"][0].data == empty_byte_stream @pytest.mark.integration def test_link_content_fetcher_html(self): fetcher = LinkContentFetcher() - document = fetcher.run(HTML_URL)["document"] - assert document.mime_type == "text/html" - assert "Introduction to Haystack" in document.text - assert document.metadata["url"] == HTML_URL + streams = fetcher.run([HTML_URL])["streams"] + assert "Haystack" in streams["text/html"][0].data.decode("utf-8") @pytest.mark.integration def test_link_content_fetcher_text(self): fetcher = LinkContentFetcher() - document = fetcher.run(TEXT_URL)["document"] - assert document.mime_type == "text/plain" - assert "Haystack" in document.text - assert document.metadata["url"] == TEXT_URL + streams = fetcher.run([TEXT_URL])["streams"] + assert "Haystack" in streams["text/plain"][0].data.decode("utf-8") @pytest.mark.integration def test_link_content_fetcher_pdf(self): fetcher = LinkContentFetcher() - document = fetcher.run(PDF_URL)["document"] - assert document.mime_type == "application/octet-stream" # FIXME Should be "application/pdf"? - assert document.text is None - assert document.blob is not None - assert document.metadata["url"] == PDF_URL + streams = fetcher.run([PDF_URL])["streams"] + assert len(streams["application/pdf"]) == 1 or len(streams["application/octet-stream"]) == 1 From 58e87f7cbc28db376821df632e87e81d115e54ff Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Thu, 5 Oct 2023 13:27:43 +0200 Subject: [PATCH 02/13] Update pydocs --- .../components/fetchers/link_content.py | 38 +++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index 74e427d251..02f228c042 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -2,12 +2,18 @@ import logging from collections import defaultdict from concurrent.futures import ThreadPoolExecutor -from typing import Optional, Dict, List, Callable, Any, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple import requests from requests import Response from requests.exceptions import HTTPError -from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryCallState +from tenacity import ( + RetryCallState, + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, +) from haystack import __version__ from haystack.preview import component, default_from_dict, default_to_dict @@ -45,7 +51,8 @@ def binary_content_handler(response: Response) -> ByteStream: @component class LinkContentFetcher: """ - LinkContentFetcher fetches content from a URL link and converts it to a Document object. + LinkContentFetcher is a component for fetching and extracting content from URLs. It supports handling various + content types, retries on failures, and automatic user-agent rotation for failed web requests. """ def __init__( @@ -118,6 +125,16 @@ def from_dict(cls, data: Dict[str, Any]) -> "LinkContentFetcher": @component.output_types(streams=Dict[str, List[io.BytesIO]]) def run(self, urls: List[str]): + """ + Fetches content from a list of URLs and returns a dictionary of extracted content streams. + + :param urls: A list of URLs to fetch content from. + + + :return: A dictionary containing content streams categorized by content type. + The keys are content types (e.g., "text/html", "application/pdf"), and the values are lists of + ByteStream objects representing the extracted content. + """ streams: Dict[str, List[ByteStream]] = defaultdict(list) if not urls: return {"streams": streams} @@ -139,11 +156,16 @@ def run(self, urls: List[str]): def fetch(self, url: str) -> Tuple[str, ByteStream]: """ - Fetches content from a URL and converts it to a Document objects. If no content is extracted, - an empty Document object is returned (if raise_on_failure is False). + Fetches content from a URL and returns it as a ByteStream. + + :param url: The URL to fetch content from. + :return: A tuple containing the content type and the corresponding ByteStream. + The content type is a string indicating the type of content fetched (e.g., "text/html", "application/pdf"). + The ByteStream object contains the fetched content as binary data. + + :raises: If an error occurs during content retrieval and `raise_on_failure` is set to True, this method will + raise an exception. Otherwise, errors are logged, and an empty ByteStream is returned. - :param url: URL to fetch content from. - :return: A tuple containing the content type and the corresponding ByteStream """ content_type: str = "text/html" stream: ByteStream = ByteStream(data=b"") @@ -165,6 +187,7 @@ def fetch(self, url: str) -> Tuple[str, ByteStream]: def _get_content_type(self, response: Response): """ Get the content type of the response. + :param response: The response object. :return: The content type of the response. """ @@ -175,6 +198,7 @@ def _switch_user_agent(self, retry_state: RetryCallState) -> None: """ Switches the User-Agent for this LinkContentRetriever to the next one in the list of user agents. Used by tenacity to retry the requests with a different user agent. + :param retry_state: The retry state (unused, required by tenacity). """ self.current_user_agent_idx = (self.current_user_agent_idx + 1) % len(self.user_agents) From de23db6f32820edeb91b0355b0360d0153096f10 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Thu, 5 Oct 2023 17:47:59 +0200 Subject: [PATCH 03/13] Make black happy --- haystack/preview/components/fetchers/link_content.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index 02f228c042..c5ae6f74b2 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -7,13 +7,7 @@ import requests from requests import Response from requests.exceptions import HTTPError -from tenacity import ( - RetryCallState, - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential, -) +from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential from haystack import __version__ from haystack.preview import component, default_from_dict, default_to_dict From afa1972a5da68dd35581b9a83e7308e8fb8322fd Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Fri, 6 Oct 2023 14:37:50 +0200 Subject: [PATCH 04/13] Julian's PR review --- .../components/fetchers/link_content.py | 11 ++++--- ...link-content-fetcher-145915976f38e1e0.yaml | 4 +-- .../fetchers/test_link_content_fetcher.py | 31 +++++++++++++++++++ 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index c5ae6f74b2..d27f5655ef 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -1,4 +1,3 @@ -import io import logging from collections import defaultdict from concurrent.futures import ThreadPoolExecutor @@ -117,17 +116,18 @@ def from_dict(cls, data: Dict[str, Any]) -> "LinkContentFetcher": """ return default_from_dict(cls, data) - @component.output_types(streams=Dict[str, List[io.BytesIO]]) + @component.output_types(streams=Dict[str, List[ByteStream]]) def run(self, urls: List[str]): """ Fetches content from a list of URLs and returns a dictionary of extracted content streams. + For each content type there will be one outgoing edge created with value of List[ByteStream]. :param urls: A list of URLs to fetch content from. :return: A dictionary containing content streams categorized by content type. - The keys are content types (e.g., "text/html", "application/pdf"), and the values are lists of - ByteStream objects representing the extracted content. + The keys are content types (e.g., "text/html", "text/plain", "application/pdf"), + and the values are lists of ByteStream objects representing the extracted content. """ streams: Dict[str, List[ByteStream]] = defaultdict(list) if not urls: @@ -171,7 +171,8 @@ def fetch(self, url: str) -> Tuple[str, ByteStream]: except Exception as e: if self.raise_on_failure: raise e - logger.debug("Couldn't retrieve content from %s", url) + # less verbose log as this is expected to happen often (requests failing, blocked, etc.) + logger.debug("Couldn't retrieve content from %s due to %s", url, str(e)) finally: self.current_user_agent_idx = 0 diff --git a/releasenotes/notes/add-link-content-fetcher-145915976f38e1e0.yaml b/releasenotes/notes/add-link-content-fetcher-145915976f38e1e0.yaml index bd4c8610d1..f2699d4051 100644 --- a/releasenotes/notes/add-link-content-fetcher-145915976f38e1e0.yaml +++ b/releasenotes/notes/add-link-content-fetcher-145915976f38e1e0.yaml @@ -1,5 +1,5 @@ --- preview: - | - Adds LinkContentFetcher component to Haystack 2.0. LinkContentFetcher fetches content from a given URL and - converts it into a Document object, which can then be used within the Haystack 2.0 pipeline. + Introduced the LinkContentFetcher in Haystack 2.0. This component fetches content from specified + URLs and converts them into ByteStream objects for further processing in Haystack pipelines. diff --git a/test/preview/components/fetchers/test_link_content_fetcher.py b/test/preview/components/fetchers/test_link_content_fetcher.py index 76d5c0e09d..a0220bbd18 100644 --- a/test/preview/components/fetchers/test_link_content_fetcher.py +++ b/test/preview/components/fetchers/test_link_content_fetcher.py @@ -159,3 +159,34 @@ def test_link_content_fetcher_pdf(self): fetcher = LinkContentFetcher() streams = fetcher.run([PDF_URL])["streams"] assert len(streams["application/pdf"]) == 1 or len(streams["application/octet-stream"]) == 1 + + @pytest.mark.integration + def test_link_content_fetcher_multiple_different_content_types(self): + """ + This test is to ensure that the fetcher can handle a list of URLs that contain different content types. + """ + fetcher = LinkContentFetcher() + streams = fetcher.run([PDF_URL, HTML_URL])["streams"] + assert len(streams) == 2 + assert len(streams["application/pdf"]) == 1 or len(streams["application/octet-stream"]) == 1 + assert "Haystack" in streams["text/html"][0].data.decode("utf-8") + + @pytest.mark.integration + def test_link_content_fetcher_multiple_different_content_types_v2(self): + """ + This test is to ensure that the fetcher can handle a list of URLs that contain different content types, + and that we have two html streams. + """ + + fetcher = LinkContentFetcher() + streams = fetcher.run([PDF_URL, HTML_URL, "https://google.com"])["streams"] + assert len(streams) == 2 + assert len(streams["text/html"]) == 2 + assert len(streams["application/pdf"]) == 1 or len(streams["application/octet-stream"]) == 1 + assert "Haystack" in streams["text/html"][0].data.decode("utf-8") or "Haystack" in streams["text/html"][ + 1 + ].data.decode("utf-8") + + assert "Search" in streams["text/html"][1].data.decode("utf-8") or "Search" in streams["text/html"][ + 0 + ].data.decode("utf-8") From 859308e3684e2c81087f18bd6b601098fd2011c7 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Sun, 8 Oct 2023 10:48:48 +0200 Subject: [PATCH 05/13] Link content update --- haystack/preview/components/fetchers/link_content.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index d27f5655ef..c8918e1c5a 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -116,7 +116,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "LinkContentFetcher": """ return default_from_dict(cls, data) - @component.output_types(streams=Dict[str, List[ByteStream]]) + @component.output_types(streams=List[ByteStream]) def run(self, urls: List[str]): """ Fetches content from a list of URLs and returns a dictionary of extracted content streams. @@ -129,7 +129,7 @@ def run(self, urls: List[str]): The keys are content types (e.g., "text/html", "text/plain", "application/pdf"), and the values are lists of ByteStream objects representing the extracted content. """ - streams: Dict[str, List[ByteStream]] = defaultdict(list) + streams = [] if not urls: return {"streams": streams} @@ -137,14 +137,16 @@ def run(self, urls: List[str]): if len(urls) == 1: content_type, stream = self.fetch(urls[0]) if content_type and stream: - streams[content_type].append(stream) + stream.metadata["content_type"] = content_type + streams.append(stream) else: with ThreadPoolExecutor() as executor: results = executor.map(self.fetch, urls) for content_type, stream in results: if content_type and stream: - streams[content_type].append(stream) + stream.metadata["content_type"] = content_type + streams.append(stream) return {"streams": streams} From 65ef6794504a420b373b656585fe58772b5080ec Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Sun, 8 Oct 2023 12:06:54 +0200 Subject: [PATCH 06/13] Update LinkContentFetcher, streams will be routed by FileTypeRouter --- .../components/fetchers/link_content.py | 12 +++--- .../fetchers/test_link_content_fetcher.py | 37 +++++++++---------- 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index c8918e1c5a..caaf6a101a 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -119,15 +119,13 @@ def from_dict(cls, data: Dict[str, Any]) -> "LinkContentFetcher": @component.output_types(streams=List[ByteStream]) def run(self, urls: List[str]): """ - Fetches content from a list of URLs and returns a dictionary of extracted content streams. - For each content type there will be one outgoing edge created with value of List[ByteStream]. + Fetches content from a list of URLs and returns a list of extracted content streams. + Each content stream is a ByteStream object containing the extracted content as binary data. + The content type of each stream is stored in the metadata of the ByteStream object under + the key "content_type". :param urls: A list of URLs to fetch content from. - - - :return: A dictionary containing content streams categorized by content type. - The keys are content types (e.g., "text/html", "text/plain", "application/pdf"), - and the values are lists of ByteStream objects representing the extracted content. + :return: A lists of ByteStream objects representing the extracted content. """ streams = [] if not urls: diff --git a/test/preview/components/fetchers/test_link_content_fetcher.py b/test/preview/components/fetchers/test_link_content_fetcher.py index a0220bbd18..886f8fdc02 100644 --- a/test/preview/components/fetchers/test_link_content_fetcher.py +++ b/test/preview/components/fetchers/test_link_content_fetcher.py @@ -105,7 +105,7 @@ def test_run_text(self): ) fetcher = LinkContentFetcher() streams = fetcher.run(urls=["https://www.example.com"])["streams"] - assert streams["text/plain"][0].data == correct_response + assert streams[0].data == correct_response @pytest.mark.unit def test_run_html(self): @@ -116,7 +116,7 @@ def test_run_html(self): ) fetcher = LinkContentFetcher() streams = fetcher.run(urls=["https://www.example.com"])["streams"] - assert streams["text/html"][0].data == correct_response + assert streams[0].data == correct_response @pytest.mark.unit def test_run_binary(self, test_files_path): @@ -127,7 +127,7 @@ def test_run_binary(self, test_files_path): ) fetcher = LinkContentFetcher() streams = fetcher.run(urls=["https://www.example.com"])["streams"] - assert streams["application/pdf"][0].data == file_bytes + assert streams[0].data == file_bytes @pytest.mark.unit def test_run_bad_status_code(self): @@ -140,25 +140,25 @@ def test_run_bad_status_code(self): # empty byte stream is returned because raise_on_failure is False assert len(streams) == 1 - assert streams["text/html"][0].data == empty_byte_stream + assert streams[0].data == empty_byte_stream @pytest.mark.integration def test_link_content_fetcher_html(self): fetcher = LinkContentFetcher() streams = fetcher.run([HTML_URL])["streams"] - assert "Haystack" in streams["text/html"][0].data.decode("utf-8") + assert "Haystack" in streams[0].data.decode("utf-8") @pytest.mark.integration def test_link_content_fetcher_text(self): fetcher = LinkContentFetcher() streams = fetcher.run([TEXT_URL])["streams"] - assert "Haystack" in streams["text/plain"][0].data.decode("utf-8") + assert "Haystack" in streams[0].data.decode("utf-8") @pytest.mark.integration def test_link_content_fetcher_pdf(self): fetcher = LinkContentFetcher() streams = fetcher.run([PDF_URL])["streams"] - assert len(streams["application/pdf"]) == 1 or len(streams["application/octet-stream"]) == 1 + assert len(streams) == 1 @pytest.mark.integration def test_link_content_fetcher_multiple_different_content_types(self): @@ -168,8 +168,11 @@ def test_link_content_fetcher_multiple_different_content_types(self): fetcher = LinkContentFetcher() streams = fetcher.run([PDF_URL, HTML_URL])["streams"] assert len(streams) == 2 - assert len(streams["application/pdf"]) == 1 or len(streams["application/octet-stream"]) == 1 - assert "Haystack" in streams["text/html"][0].data.decode("utf-8") + for stream in streams: + if stream.metadata["content_type"] == "text/html": + assert "Haystack" in stream.data.decode("utf-8") + elif stream.metadata["content_type"] == "application/pdf": + assert len(stream.data) > 0 @pytest.mark.integration def test_link_content_fetcher_multiple_different_content_types_v2(self): @@ -180,13 +183,9 @@ def test_link_content_fetcher_multiple_different_content_types_v2(self): fetcher = LinkContentFetcher() streams = fetcher.run([PDF_URL, HTML_URL, "https://google.com"])["streams"] - assert len(streams) == 2 - assert len(streams["text/html"]) == 2 - assert len(streams["application/pdf"]) == 1 or len(streams["application/octet-stream"]) == 1 - assert "Haystack" in streams["text/html"][0].data.decode("utf-8") or "Haystack" in streams["text/html"][ - 1 - ].data.decode("utf-8") - - assert "Search" in streams["text/html"][1].data.decode("utf-8") or "Search" in streams["text/html"][ - 0 - ].data.decode("utf-8") + assert len(streams) == 3 + for stream in streams: + if stream.metadata["content_type"] == "text/html": + assert "Haystack" in stream.data.decode("utf-8") or "Google" in stream.data.decode("utf-8") + elif stream.metadata["content_type"] == "application/pdf": + assert len(stream.data) > 0 From f444cea1b8f7d16b6acb72a94533b42232c1ac9e Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Sun, 8 Oct 2023 12:29:31 +0200 Subject: [PATCH 07/13] Fix mypy --- haystack/preview/components/fetchers/link_content.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index caaf6a101a..f98a130281 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -127,7 +127,7 @@ def run(self, urls: List[str]): :param urls: A list of URLs to fetch content from. :return: A lists of ByteStream objects representing the extracted content. """ - streams = [] + streams: List[ByteStream] = [] if not urls: return {"streams": streams} From 0cc9236e5bdeadb11ce83892a115fcdf037ca63e Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Mon, 9 Oct 2023 16:10:21 +0200 Subject: [PATCH 08/13] Additional unit test checks --- .../components/fetchers/test_link_content_fetcher.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/test/preview/components/fetchers/test_link_content_fetcher.py b/test/preview/components/fetchers/test_link_content_fetcher.py index 886f8fdc02..53a77f611a 100644 --- a/test/preview/components/fetchers/test_link_content_fetcher.py +++ b/test/preview/components/fetchers/test_link_content_fetcher.py @@ -146,19 +146,25 @@ def test_run_bad_status_code(self): def test_link_content_fetcher_html(self): fetcher = LinkContentFetcher() streams = fetcher.run([HTML_URL])["streams"] - assert "Haystack" in streams[0].data.decode("utf-8") + first_stream = streams[0] + assert "Haystack" in first_stream.data.decode("utf-8") + assert first_stream.metadata["content_type"] == "text/html" @pytest.mark.integration def test_link_content_fetcher_text(self): fetcher = LinkContentFetcher() streams = fetcher.run([TEXT_URL])["streams"] - assert "Haystack" in streams[0].data.decode("utf-8") + first_stream = streams[0] + assert "Haystack" in first_stream.data.decode("utf-8") + assert first_stream.metadata["content_type"] == "text/plain" @pytest.mark.integration def test_link_content_fetcher_pdf(self): fetcher = LinkContentFetcher() streams = fetcher.run([PDF_URL])["streams"] assert len(streams) == 1 + first_stream = streams[0] + assert first_stream.metadata["content_type"] in ("application/octet-stream", "application/pdf") @pytest.mark.integration def test_link_content_fetcher_multiple_different_content_types(self): @@ -175,7 +181,7 @@ def test_link_content_fetcher_multiple_different_content_types(self): assert len(stream.data) > 0 @pytest.mark.integration - def test_link_content_fetcher_multiple_different_content_types_v2(self): + def test_link_content_fetcher_multiple_html_streams(self): """ This test is to ensure that the fetcher can handle a list of URLs that contain different content types, and that we have two html streams. From ebafcec82c5f32a614e79f02482cdefcc5a9c2bc Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Mon, 9 Oct 2023 16:39:34 +0200 Subject: [PATCH 09/13] Julian PR: more test assers --- .../fetchers/test_link_content_fetcher.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/test/preview/components/fetchers/test_link_content_fetcher.py b/test/preview/components/fetchers/test_link_content_fetcher.py index 53a77f611a..d16099d7b7 100644 --- a/test/preview/components/fetchers/test_link_content_fetcher.py +++ b/test/preview/components/fetchers/test_link_content_fetcher.py @@ -105,7 +105,9 @@ def test_run_text(self): ) fetcher = LinkContentFetcher() streams = fetcher.run(urls=["https://www.example.com"])["streams"] - assert streams[0].data == correct_response + first_stream = streams[0] + assert first_stream.data == correct_response + assert first_stream.metadata["content_type"] == "text/plain" @pytest.mark.unit def test_run_html(self): @@ -116,7 +118,9 @@ def test_run_html(self): ) fetcher = LinkContentFetcher() streams = fetcher.run(urls=["https://www.example.com"])["streams"] - assert streams[0].data == correct_response + first_stream = streams[0] + assert first_stream.data == correct_response + assert first_stream.metadata["content_type"] == "text/html" @pytest.mark.unit def test_run_binary(self, test_files_path): @@ -127,7 +131,9 @@ def test_run_binary(self, test_files_path): ) fetcher = LinkContentFetcher() streams = fetcher.run(urls=["https://www.example.com"])["streams"] - assert streams[0].data == file_bytes + first_stream = streams[0] + assert first_stream.data == file_bytes + assert first_stream.metadata["content_type"] == "application/pdf" @pytest.mark.unit def test_run_bad_status_code(self): @@ -140,7 +146,9 @@ def test_run_bad_status_code(self): # empty byte stream is returned because raise_on_failure is False assert len(streams) == 1 - assert streams[0].data == empty_byte_stream + first_stream = streams[0] + assert first_stream.data == empty_byte_stream + assert first_stream.metadata["content_type"] == "text/html" @pytest.mark.integration def test_link_content_fetcher_html(self): @@ -175,6 +183,7 @@ def test_link_content_fetcher_multiple_different_content_types(self): streams = fetcher.run([PDF_URL, HTML_URL])["streams"] assert len(streams) == 2 for stream in streams: + assert stream.metadata["content_type"] in ("text/html", "application/pdf", "application/octet-stream") if stream.metadata["content_type"] == "text/html": assert "Haystack" in stream.data.decode("utf-8") elif stream.metadata["content_type"] == "application/pdf": @@ -191,6 +200,7 @@ def test_link_content_fetcher_multiple_html_streams(self): streams = fetcher.run([PDF_URL, HTML_URL, "https://google.com"])["streams"] assert len(streams) == 3 for stream in streams: + assert stream.metadata["content_type"] in ("text/html", "application/pdf", "application/octet-stream") if stream.metadata["content_type"] == "text/html": assert "Haystack" in stream.data.decode("utf-8") or "Google" in stream.data.decode("utf-8") elif stream.metadata["content_type"] == "application/pdf": From dd56a6404d986b67c6fc4f678756d88b675dd684 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Mon, 9 Oct 2023 16:57:03 +0200 Subject: [PATCH 10/13] Add url key/value to ByteStream metadata --- .../components/fetchers/link_content.py | 21 ++++++++++--------- .../fetchers/test_link_content_fetcher.py | 3 +++ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index f98a130281..ffa0981e5d 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -122,7 +122,7 @@ def run(self, urls: List[str]): Fetches content from a list of URLs and returns a list of extracted content streams. Each content stream is a ByteStream object containing the extracted content as binary data. The content type of each stream is stored in the metadata of the ByteStream object under - the key "content_type". + the key "content_type". The URL of the fetched content is stored under the key "url". :param urls: A list of URLs to fetch content from. :return: A lists of ByteStream objects representing the extracted content. @@ -133,27 +133,28 @@ def run(self, urls: List[str]): # don't use multithreading if there's only one URL if len(urls) == 1: - content_type, stream = self.fetch(urls[0]) - if content_type and stream: - stream.metadata["content_type"] = content_type + stream_metadata, stream = self.fetch(urls[0]) + if stream_metadata and stream: + stream.metadata.update(stream_metadata) streams.append(stream) else: with ThreadPoolExecutor() as executor: results = executor.map(self.fetch, urls) - for content_type, stream in results: - if content_type and stream: - stream.metadata["content_type"] = content_type + for stream_metadata, stream in results: + if stream_metadata and stream: + stream.metadata.update(stream_metadata) streams.append(stream) return {"streams": streams} - def fetch(self, url: str) -> Tuple[str, ByteStream]: + def fetch(self, url: str) -> Tuple[Dict[str, str], ByteStream]: """ Fetches content from a URL and returns it as a ByteStream. :param url: The URL to fetch content from. - :return: A tuple containing the content type and the corresponding ByteStream. + :return: A tuple containing the ByteStream metadata dict and the corresponding ByteStream. + ByteStream metadata contains the URL and the content type of the fetched content. The content type is a string indicating the type of content fetched (e.g., "text/html", "application/pdf"). The ByteStream object contains the fetched content as binary data. @@ -177,7 +178,7 @@ def fetch(self, url: str) -> Tuple[str, ByteStream]: finally: self.current_user_agent_idx = 0 - return content_type, stream + return {"content_type": content_type, "url": url}, stream def _get_content_type(self, response: Response): """ diff --git a/test/preview/components/fetchers/test_link_content_fetcher.py b/test/preview/components/fetchers/test_link_content_fetcher.py index d16099d7b7..70c5dc314e 100644 --- a/test/preview/components/fetchers/test_link_content_fetcher.py +++ b/test/preview/components/fetchers/test_link_content_fetcher.py @@ -157,6 +157,7 @@ def test_link_content_fetcher_html(self): first_stream = streams[0] assert "Haystack" in first_stream.data.decode("utf-8") assert first_stream.metadata["content_type"] == "text/html" + assert "url" in first_stream.metadata and first_stream.metadata["url"] == HTML_URL @pytest.mark.integration def test_link_content_fetcher_text(self): @@ -165,6 +166,7 @@ def test_link_content_fetcher_text(self): first_stream = streams[0] assert "Haystack" in first_stream.data.decode("utf-8") assert first_stream.metadata["content_type"] == "text/plain" + assert "url" in first_stream.metadata and first_stream.metadata["url"] == TEXT_URL @pytest.mark.integration def test_link_content_fetcher_pdf(self): @@ -173,6 +175,7 @@ def test_link_content_fetcher_pdf(self): assert len(streams) == 1 first_stream = streams[0] assert first_stream.metadata["content_type"] in ("application/octet-stream", "application/pdf") + assert "url" in first_stream.metadata and first_stream.metadata["url"] == PDF_URL @pytest.mark.integration def test_link_content_fetcher_multiple_different_content_types(self): From 9a69cc863b450d165d52db41cd86470577dc5dfe Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Tue, 10 Oct 2023 10:36:17 +0200 Subject: [PATCH 11/13] Properly handle a mix of failed and successful requests --- .../components/fetchers/link_content.py | 32 ++++++++++++++++--- .../fetchers/test_link_content_fetcher.py | 24 ++++++++++++++ 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index ffa0981e5d..c430a2acaf 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -126,6 +126,11 @@ def run(self, urls: List[str]): :param urls: A list of URLs to fetch content from. :return: A lists of ByteStream objects representing the extracted content. + + :raises: If a list of URLs has a single URL and an error occurs during content retrieval + while `raise_on_failure` is set to True, this method will raise an exception. Otherwise, in all other + cases, retrieval errors are simply logged, and a list of successfully retrieved ByteStream objects is + returned. """ streams: List[ByteStream] = [] if not urls: @@ -134,15 +139,15 @@ def run(self, urls: List[str]): # don't use multithreading if there's only one URL if len(urls) == 1: stream_metadata, stream = self.fetch(urls[0]) - if stream_metadata and stream: + if stream_metadata is not None and stream is not None: stream.metadata.update(stream_metadata) streams.append(stream) else: with ThreadPoolExecutor() as executor: - results = executor.map(self.fetch, urls) + results = executor.map(self._fetch_wrapper, urls) - for stream_metadata, stream in results: - if stream_metadata and stream: + for stream_metadata, stream in results: # type: ignore + if stream_metadata is not None and stream is not None: stream.metadata.update(stream_metadata) streams.append(stream) @@ -159,7 +164,7 @@ def fetch(self, url: str) -> Tuple[Dict[str, str], ByteStream]: The ByteStream object contains the fetched content as binary data. :raises: If an error occurs during content retrieval and `raise_on_failure` is set to True, this method will - raise an exception. Otherwise, errors are logged, and an empty ByteStream is returned. + raise an exception. Otherwise, all fetching errors are logged, and an empty ByteStream is returned. """ content_type: str = "text/html" @@ -180,6 +185,23 @@ def fetch(self, url: str) -> Tuple[Dict[str, str], ByteStream]: return {"content_type": content_type, "url": url}, stream + def _fetch_wrapper(self, url: str) -> Tuple[Optional[Dict[str, str]], Optional[ByteStream]]: + """ + If `raise_on_failure` is set to True, this method will wrap the fetch method and catch any exceptions. + Otherwise, it will simply call the fetch method. + :param url: The URL to fetch content from. + :return: A tuple containing the ByteStream metadata dict and the corresponding ByteStream. + + """ + if self.raise_on_failure: + try: + return self.fetch(url) + except Exception as e: + logger.warning("Error fetching %s: %s", url, str(e)) + return None, None + else: + return self.fetch(url) + def _get_content_type(self, response: Response): """ Get the content type of the response. diff --git a/test/preview/components/fetchers/test_link_content_fetcher.py b/test/preview/components/fetchers/test_link_content_fetcher.py index 70c5dc314e..91b88dee69 100644 --- a/test/preview/components/fetchers/test_link_content_fetcher.py +++ b/test/preview/components/fetchers/test_link_content_fetcher.py @@ -1,6 +1,7 @@ from unittest.mock import patch, Mock import pytest +import requests from haystack.preview.components.fetchers.link_content import ( LinkContentFetcher, @@ -208,3 +209,26 @@ def test_link_content_fetcher_multiple_html_streams(self): assert "Haystack" in stream.data.decode("utf-8") or "Google" in stream.data.decode("utf-8") elif stream.metadata["content_type"] == "application/pdf": assert len(stream.data) > 0 + + @pytest.mark.integration + def test_mix_of_good_and_failed_requests(self): + """ + This test is to ensure that the fetcher can handle a list of URLs that contain URLs that fail to be fetched. + In such a case, the fetcher should return the content of the URLs that were successfully fetched and not raise + an exception. + """ + fetcher = LinkContentFetcher() + result = fetcher.run(["https://non_existent_website_dot.com/", "https://www.google.com/"]) + assert len(result["streams"]) == 1 + first_stream = result["streams"][0] + assert first_stream.metadata["content_type"] == "text/html" + + @pytest.mark.integration + def test_bad_request_exception_raised(self): + """ + This test is to ensure that the fetcher raises an exception when a single bad request is made and it is configured to + do so. + """ + fetcher = LinkContentFetcher() + with pytest.raises(requests.exceptions.ConnectionError): + fetcher.run(["https://non_existent_website_dot.com/"]) From 5bbdc920eef81854b5d0226122fab57066b3042d Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Tue, 10 Oct 2023 16:48:29 +0200 Subject: [PATCH 12/13] Update wrapper name, pydoc messaging --- .../components/fetchers/link_content.py | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index c430a2acaf..11b40dbdb5 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -56,15 +56,13 @@ def __init__( timeout: int = 3, ): """ - Creates a LinkContentFetcher instance. + Initializes a LinkContentFetcher instance. - :param raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs - during content extraction. If False, the error is simply logged and the program continues. - Defaults to False. - :param user_agents: A list of user agents to use when fetching content. Defaults to None, in which case a - default user agent is used. - :param retry_attempts: The number of times to retry fetching content. Defaults to 2. - :param timeout: The timeout in seconds for the request. Defaults to 3. + :param raise_on_failure: If True, raises an exception on failure when fetching a single URL. + For multiple URLs, errors are logged and successful fetches are returned. Default is True. + :param user_agents: A list of user agents for fetching content. If None, a default user agent is used. + :param retry_attempts: Number of retry attempts for fetching content. Default is 2. + :param timeout: Timeout in seconds for the request. Default is 3. """ self.raise_on_failure = raise_on_failure self.user_agents = user_agents or [DEFAULT_USER_AGENT] @@ -127,10 +125,9 @@ def run(self, urls: List[str]): :param urls: A list of URLs to fetch content from. :return: A lists of ByteStream objects representing the extracted content. - :raises: If a list of URLs has a single URL and an error occurs during content retrieval - while `raise_on_failure` is set to True, this method will raise an exception. Otherwise, in all other - cases, retrieval errors are simply logged, and a list of successfully retrieved ByteStream objects is - returned. + :raises: If the provided list of URLs contains only a single URL, and `raise_on_failure` is set to True, + an exception will be raised in case of an error during content retrieval. In all other scenarios, any + retrieval errors are logged, and a list of successfully retrieved ByteStream objects is returned. """ streams: List[ByteStream] = [] if not urls: @@ -139,12 +136,11 @@ def run(self, urls: List[str]): # don't use multithreading if there's only one URL if len(urls) == 1: stream_metadata, stream = self.fetch(urls[0]) - if stream_metadata is not None and stream is not None: - stream.metadata.update(stream_metadata) - streams.append(stream) + stream.metadata.update(stream_metadata) + streams.append(stream) else: with ThreadPoolExecutor() as executor: - results = executor.map(self._fetch_wrapper, urls) + results = executor.map(self._fetch_with_exception_suppression, urls) for stream_metadata, stream in results: # type: ignore if stream_metadata is not None and stream is not None: @@ -185,8 +181,10 @@ def fetch(self, url: str) -> Tuple[Dict[str, str], ByteStream]: return {"content_type": content_type, "url": url}, stream - def _fetch_wrapper(self, url: str) -> Tuple[Optional[Dict[str, str]], Optional[ByteStream]]: + def _fetch_with_exception_suppression(self, url: str) -> Tuple[Optional[Dict[str, str]], Optional[ByteStream]]: """ + Fetches content from a URL and returns it as a ByteStream. + If `raise_on_failure` is set to True, this method will wrap the fetch method and catch any exceptions. Otherwise, it will simply call the fetch method. :param url: The URL to fetch content from. @@ -198,7 +196,7 @@ def _fetch_wrapper(self, url: str) -> Tuple[Optional[Dict[str, str]], Optional[B return self.fetch(url) except Exception as e: logger.warning("Error fetching %s: %s", url, str(e)) - return None, None + return {"content_type": None, "url": url}, None else: return self.fetch(url) From ac5ec4412160430dc622225996cfd0f4a07183fc Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Tue, 10 Oct 2023 17:02:58 +0200 Subject: [PATCH 13/13] Fix mypy --- haystack/preview/components/fetchers/link_content.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index 11b40dbdb5..62368aff11 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -196,7 +196,7 @@ def _fetch_with_exception_suppression(self, url: str) -> Tuple[Optional[Dict[str return self.fetch(url) except Exception as e: logger.warning("Error fetching %s: %s", url, str(e)) - return {"content_type": None, "url": url}, None + return {"content_type": "Unknown", "url": url}, None else: return self.fetch(url)