Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datadog_checks_base/changelog.d/22704.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add HTTPXWrapper and HTTPXResponseAdapter as the httpx-backed HTTP client implementation.
13 changes: 11 additions & 2 deletions datadog_checks_base/datadog_checks/base/checks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,21 @@ def http(self) -> HTTPClientProtocol:
Provides logic to yield consistent network behavior based on user configuration.

Only new checks or checks on Agent 6.13+ can and should use this for HTTP requests.

Set ``use_httpx: true`` in the instance config to use an httpx-backed client.
"""
if not hasattr(self, '_http'):
# See Performance Optimizations in this package's README.md.
from datadog_checks.base.utils.http import RequestsWrapper
if is_affirmative((self.instance or {}).get('use_httpx', False)):
import httpx

from datadog_checks.base.utils.http_httpx import HTTPXWrapper

self._http = HTTPXWrapper(httpx.Client())
else:
from datadog_checks.base.utils.http import RequestsWrapper

self._http = RequestsWrapper(self.instance or {}, self.init_config, self.HTTP_CONFIG_REMAPPER, self.log)
self._http = RequestsWrapper(self.instance or {}, self.init_config, self.HTTP_CONFIG_REMAPPER, self.log)

return self._http

Expand Down
105 changes: 105 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/http_httpx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from typing import Any, Iterator

import httpx

from .http_exceptions import (
HTTPConnectionError,
HTTPError,
HTTPRequestError,
HTTPStatusError,
HTTPTimeoutError,
)


def _translate_httpx_error(e: httpx.HTTPError) -> HTTPError:
if isinstance(e, httpx.HTTPStatusError):
return HTTPStatusError(str(e), response=e.response, request=e.request)
if isinstance(e, httpx.TimeoutException):
return HTTPTimeoutError(str(e), request=e.request)
if isinstance(e, httpx.ConnectError):
return HTTPConnectionError(str(e), request=e.request)
if isinstance(e, httpx.RequestError):
return HTTPRequestError(str(e), request=e.request)
return HTTPError(str(e))


class HTTPXResponseAdapter:
def __init__(self, response: httpx.Response) -> None:
self._response = response

def __getattr__(self, name: str) -> Any:
return getattr(self._response, name)

def iter_content(self, chunk_size: int | None = None, decode_unicode: bool = False) -> Iterator[bytes | str]:
if decode_unicode:
return self._response.iter_text(chunk_size=chunk_size)
return self._response.iter_bytes(chunk_size=chunk_size)

def iter_lines(
self,
chunk_size: int | None = None,
decode_unicode: bool = False,
delimiter: bytes | str | None = None,
) -> Iterator[bytes | str]:
# httpx.iter_lines() yields str; encode to bytes unless decode_unicode is requested.
# Note: httpx normalizes \r\n to \n, which differs from requests behavior.
for line in self._response.iter_lines():
yield line if decode_unicode else line.encode()

def raise_for_status(self) -> None:
try:
self._response.raise_for_status()
except httpx.HTTPStatusError as e:
raise HTTPStatusError(str(e), response=e.response, request=e.request) from e

def __enter__(self) -> HTTPXResponseAdapter:
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self._response.close()


class HTTPXWrapper:
def __init__(self, client: httpx.Client) -> None:
self._client = client

def __del__(self) -> None: # no cov
try:
self._client.close()
except AttributeError:
pass

def _request(self, method: str, url: str, **options: Any) -> HTTPXResponseAdapter:
try:
return HTTPXResponseAdapter(self._client.request(method, url, **options))
except httpx.HTTPError as e:
raise _translate_httpx_error(e) from e
except httpx.InvalidURL as e:
# InvalidURL is not a subclass of httpx.HTTPError; catch it separately.
raise HTTPRequestError(str(e)) from e

def get(self, url: str, **options: Any) -> HTTPXResponseAdapter:
return self._request("GET", url, **options)

def post(self, url: str, **options: Any) -> HTTPXResponseAdapter:
return self._request("POST", url, **options)

def head(self, url: str, **options: Any) -> HTTPXResponseAdapter:
return self._request("HEAD", url, **options)

def put(self, url: str, **options: Any) -> HTTPXResponseAdapter:
return self._request("PUT", url, **options)

def patch(self, url: str, **options: Any) -> HTTPXResponseAdapter:
return self._request("PATCH", url, **options)

def delete(self, url: str, **options: Any) -> HTTPXResponseAdapter:
return self._request("DELETE", url, **options)

def options_method(self, url: str, **options: Any) -> HTTPXResponseAdapter:
return self._request("OPTIONS", url, **options)
1 change: 1 addition & 0 deletions datadog_checks_base/hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ python = ["3.13"]
features = ["db", "deps", "http", "json", "kube"]
dependencies = [
"datadog_checks_tests_helper @ {root:uri}/../datadog_checks_tests_helper",
"httpx==0.28.1",
]
e2e-env = false

Expand Down
16 changes: 16 additions & 0 deletions datadog_checks_base/tests/base/checks/test_agent_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -1381,3 +1381,19 @@ def test_profile_memory_when_enabled(should_profile_value, expected_calls):

assert check.should_profile_memory.call_count == 1
assert check.profile_memory.call_count == expected_calls


def test_http_uses_requests_by_default():
from datadog_checks.base.utils.http import RequestsWrapper

check = AgentCheck('test', {}, [{}])

assert isinstance(check.http, RequestsWrapper)


def test_http_uses_httpx_when_flag_set():
from datadog_checks.base.utils.http_httpx import HTTPXWrapper

check = AgentCheck('test', {}, [{'use_httpx': True}])

assert isinstance(check.http, HTTPXWrapper)
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
#
# Behavior equivalence: verify that RequestsWrapper and HTTPXWrapper produce
# identical results for the same HTTP interactions. Any test that fails for only
# one backend reveals a behavioral gap to fix before promoting that backend.
from unittest.mock import patch

import httpx
import pytest
import requests

from datadog_checks.base.utils.http import RequestsWrapper
from datadog_checks.base.utils.http_httpx import HTTPXWrapper

_BODY = b"line one\nline two\nline three"
_URL = "http://test.example"


def _requests_response(body: bytes, status: int = 200) -> requests.Response:
r = requests.Response()
r._content = body
r._content_consumed = True
r.status_code = status
r.encoding = "utf-8"
return r


def _httpx_transport(body: bytes, status: int = 200) -> httpx.MockTransport:
def handler(request):
return httpx.Response(status, content=body)

return httpx.MockTransport(handler=handler)


@pytest.fixture(params=["requests_backend", "httpx_backend"])
def http_client(request):
if request.param == "requests_backend":
with patch.object(requests.Session, "get", return_value=_requests_response(_BODY)):
yield RequestsWrapper({}, {})
else:
yield HTTPXWrapper(httpx.Client(transport=_httpx_transport(_BODY)))


def test_status_code(http_client):
assert http_client.get(_URL).status_code == 200


def test_body_content(http_client):
assert http_client.get(_URL).content == _BODY


def test_iter_lines_decodes_to_str(http_client):
response = http_client.get(_URL)
assert list(response.iter_lines(decode_unicode=True)) == ["line one", "line two", "line three"]


def test_iter_content_yields_all_bytes(http_client):
response = http_client.get(_URL)
assert b"".join(response.iter_content()) == _BODY


def test_context_manager(http_client):
with http_client.get(_URL) as response:
assert response.status_code == 200
125 changes: 125 additions & 0 deletions datadog_checks_base/tests/base/utils/http/test_http_httpx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from unittest.mock import MagicMock

import httpx
import pytest

from datadog_checks.base.utils.http_exceptions import (
HTTPConnectionError,
HTTPRequestError,
HTTPStatusError,
HTTPTimeoutError,
)
from datadog_checks.base.utils.http_httpx import HTTPXResponseAdapter, HTTPXWrapper


class TestHTTPXResponseAdapter:
def test_iter_content_yields_bytes_by_default(self):
response = MagicMock(spec=httpx.Response)
response.iter_bytes.return_value = iter([b"chunk1", b"chunk2"])
adapter = HTTPXResponseAdapter(response)

assert list(adapter.iter_content()) == [b"chunk1", b"chunk2"]

def test_iter_content_yields_str_when_decode_unicode(self):
response = MagicMock(spec=httpx.Response)
response.iter_text.return_value = iter(["chunk1", "chunk2"])
adapter = HTTPXResponseAdapter(response)

assert list(adapter.iter_content(decode_unicode=True)) == ["chunk1", "chunk2"]

def test_iter_lines_yields_bytes_by_default(self):
response = MagicMock(spec=httpx.Response)
response.iter_lines.return_value = iter(["line1", "line2"])
adapter = HTTPXResponseAdapter(response)

assert list(adapter.iter_lines()) == [b"line1", b"line2"]

def test_iter_lines_yields_str_when_decode_unicode(self):
response = MagicMock(spec=httpx.Response)
response.iter_lines.return_value = iter(["line1", "line2"])
adapter = HTTPXResponseAdapter(response)

assert list(adapter.iter_lines(decode_unicode=True)) == ["line1", "line2"]

def test_raise_for_status_translates_http_status_error(self):
response = MagicMock(spec=httpx.Response)
response.raise_for_status.side_effect = httpx.HTTPStatusError(
"404 Not Found", request=MagicMock(), response=MagicMock()
)
adapter = HTTPXResponseAdapter(response)

with pytest.raises(HTTPStatusError):
adapter.raise_for_status()

def test_context_manager_closes_response_on_exit(self):
response = MagicMock(spec=httpx.Response)
adapter = HTTPXResponseAdapter(response)

with adapter:
pass

response.close.assert_called_once()

def test_response_attributes_accessible(self):
response = MagicMock(spec=httpx.Response)
response.status_code = 200
adapter = HTTPXResponseAdapter(response)

assert adapter.status_code == 200


class TestHTTPXWrapper:
def test_successful_request_returns_response_adapter(self):
client = MagicMock(spec=httpx.Client)
client.request.return_value = MagicMock(spec=httpx.Response)
wrapper = HTTPXWrapper(client)

result = wrapper.get("http://example.com")

assert isinstance(result, HTTPXResponseAdapter)

def test_timeout_raises_http_timeout_error(self):
client = MagicMock(spec=httpx.Client)
request = httpx.Request("GET", "http://example.com")
client.request.side_effect = httpx.TimeoutException("timed out", request=request)
wrapper = HTTPXWrapper(client)

with pytest.raises(HTTPTimeoutError):
wrapper.get("http://example.com")

def test_connect_error_raises_http_connection_error(self):
client = MagicMock(spec=httpx.Client)
request = httpx.Request("GET", "http://example.com")
client.request.side_effect = httpx.ConnectError("connection refused", request=request)
wrapper = HTTPXWrapper(client)

with pytest.raises(HTTPConnectionError):
wrapper.get("http://example.com")

def test_invalid_url_raises_http_request_error(self):
client = MagicMock(spec=httpx.Client)
client.request.side_effect = httpx.InvalidURL("Invalid URL")
wrapper = HTTPXWrapper(client)

with pytest.raises(HTTPRequestError):
wrapper.get("not a url")

def test_all_http_methods_delegate_to_client(self):
client = MagicMock(spec=httpx.Client)
client.request.return_value = MagicMock(spec=httpx.Response)
wrapper = HTTPXWrapper(client)

url = "http://example.com"
wrapper.get(url)
wrapper.post(url)
wrapper.head(url)
wrapper.put(url)
wrapper.patch(url)
wrapper.delete(url)
wrapper.options_method(url)

methods = [call.args[0] for call in client.request.call_args_list]
assert methods == ["GET", "POST", "HEAD", "PUT", "PATCH", "DELETE", "OPTIONS"]
12 changes: 12 additions & 0 deletions nginx/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ def instance_plus_v7():
return base_instance


@pytest.fixture(params=['requests', 'httpx'])
def instance_plus_v7_backends(request):
base_instance = copy.deepcopy(INSTANCE)
base_instance['nginx_status_url'] = 'http://localhost:8080/api'
base_instance['use_plus_api'] = True
base_instance['use_plus_api_stream'] = True
base_instance['plus_api_version'] = 7
if request.param == 'httpx':
base_instance['use_httpx'] = True
return base_instance


@pytest.fixture(scope='session')
def instance_plus_v7_no_stream():
base_instance = copy.deepcopy(INSTANCE)
Expand Down
8 changes: 4 additions & 4 deletions nginx/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ def test_no_version(check, instance, caplog):
),
],
)
def test_get_enabled_endpoints(check, instance_plus_v7, version, use_stream, expected_endpoints, caplog):
def test_get_enabled_endpoints(check, instance_plus_v7_backends, version, use_stream, expected_endpoints, caplog):
caplog.clear()
caplog.set_level(logging.DEBUG)
instance = deepcopy(instance_plus_v7)
instance = deepcopy(instance_plus_v7_backends)
instance['use_plus_api_stream'] = use_stream
instance['plus_api_version'] = version
check = check(instance)
Expand All @@ -193,8 +193,8 @@ def test_get_enabled_endpoints(check, instance_plus_v7, version, use_stream, exp


@pytest.mark.parametrize("only_query_enabled_endpoints", [(True), (False)])
def test_only_query_enabled_endpoints(check, dd_run_check, instance_plus_v7, only_query_enabled_endpoints):
instance = deepcopy(instance_plus_v7)
def test_only_query_enabled_endpoints(check, dd_run_check, instance_plus_v7_backends, only_query_enabled_endpoints):
instance = deepcopy(instance_plus_v7_backends)
instance['only_query_enabled_endpoints'] = only_query_enabled_endpoints
check = check(instance)
check._perform_request = mock.MagicMock(side_effect=mocked_perform_request)
Expand Down
Loading