Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
294 changes: 294 additions & 0 deletions keep/providers/nagios_provider/nagios_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
"""
NagiosProvider is a class that provides methods to interact with the Nagios XI API.
"""

import dataclasses
import datetime
from typing import Optional

import pydantic
import requests

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.contextmanager.contextmanager import ContextManager
from keep.exceptions.provider_exception import ProviderException
from keep.providers.base.base_provider import BaseProvider
from keep.providers.models.provider_config import ProviderConfig, ProviderScope


@pydantic.dataclasses.dataclass
class NagiosProviderAuthConfig:
"""
NagiosProviderAuthConfig holds the authentication information for the NagiosProvider.
"""

host_url: pydantic.AnyHttpUrl = dataclasses.field(
metadata={
"required": True,
"description": "Nagios XI Host URL (e.g., https://nagios.example.com)",
"sensitive": False,
"validation": "any_http_url",
},
)

api_key: str = dataclasses.field(
metadata={
"required": True,
"description": "Nagios XI API Key",
"sensitive": True,
},
default=None,
)


class NagiosProvider(BaseProvider):
"""
NagiosProvider allows Keep to integrate with Nagios XI for monitoring alerts.

Nagios is one of the most widely used open-source monitoring systems,
providing comprehensive monitoring of hosts, services, and network infrastructure.
"""

PROVIDER_DISPLAY_NAME = "Nagios"
PROVIDER_TAGS = ["alert"]
PROVIDER_CATEGORY = ["Monitoring"]
PROVIDER_SCOPES = [
ProviderScope(name="authenticated", description="User is authenticated"),
]

# Nagios host states: 0=UP, 1=DOWN, 2=UNREACHABLE
HOST_STATUS_MAP = {
0: AlertStatus.RESOLVED,
1: AlertStatus.FIRING,
2: AlertStatus.FIRING,
}

# Nagios service states: 0=OK, 1=WARNING, 2=CRITICAL, 3=UNKNOWN
SERVICE_STATUS_MAP = {
0: AlertStatus.RESOLVED,
1: AlertStatus.FIRING,
2: AlertStatus.FIRING,
3: AlertStatus.FIRING,
}

SEVERITY_MAP = {
0: AlertSeverity.LOW, # OK
1: AlertSeverity.WARNING, # WARNING
2: AlertSeverity.CRITICAL, # CRITICAL
3: AlertSeverity.INFO, # UNKNOWN
}

def __init__(
self, context_manager: ContextManager, provider_id: str, config: ProviderConfig
):
super().__init__(context_manager, provider_id, config)

def dispose(self):
pass

def validate_config(self):
"""
Validates the configuration of the Nagios provider.
"""
self.authentication_config = NagiosProviderAuthConfig(
**self.config.authentication
)

def _get_api_url(self, endpoint: str) -> str:
"""
Constructs the full API URL for a given endpoint.
"""
base_url = str(self.authentication_config.host_url).rstrip("/")
return f"{base_url}/nagiosxi/api/v1/{endpoint}"

def _make_request(self, endpoint: str, params: Optional[dict] = None) -> dict:
"""
Makes an authenticated request to the Nagios XI API.
"""
url = self._get_api_url(endpoint)
request_params = {"apikey": self.authentication_config.api_key}
if params:
request_params.update(params)

try:
response = requests.get(url, params=request_params, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
raise ProviderException(f"Failed to connect to Nagios API: {str(e)}")

def validate_scopes(self) -> dict[str, bool | str]:
"""
Validate the scopes of the provider by testing API connectivity.
"""
try:
# Test API access by fetching system status
self._make_request("system/status")
return {"authenticated": True}
except ProviderException as e:
return {"authenticated": str(e)}

def _get_alerts(self) -> list[AlertDto]:
"""
Fetches current alerts from Nagios XI.
"""
alerts = []

# Fetch host problems
try:
host_data = self._make_request("objects/hoststatus", {"hoststatustypes": "12"})
hosts = host_data.get("hoststatus", [])

for host in hosts:
alert = self._host_to_alert(host)
if alert:
alerts.append(alert)
except ProviderException:
self.logger.warning("Failed to fetch host status from Nagios")

# Fetch service problems
try:
service_data = self._make_request("objects/servicestatus", {"servicestatustypes": "28"})
services = service_data.get("servicestatus", [])

for service in services:
alert = self._service_to_alert(service)
if alert:
alerts.append(alert)
except ProviderException:
self.logger.warning("Failed to fetch service status from Nagios")

return alerts

def _host_to_alert(self, host: dict) -> Optional[AlertDto]:
"""
Converts a Nagios host status to an AlertDto.
"""
try:
current_state = int(host.get("current_state", 0))

return AlertDto(
id=f"nagios-host-{host.get('host_id', '')}",
name=f"Host: {host.get('name', 'Unknown')}",
description=host.get("status_text", host.get("plugin_output", "")),
status=self.HOST_STATUS_MAP.get(current_state, AlertStatus.FIRING),
severity=self.SEVERITY_MAP.get(current_state, AlertSeverity.WARNING),
source=["nagios"],
lastReceived=datetime.datetime.fromisoformat(
host.get("last_check", datetime.datetime.now().isoformat())
) if host.get("last_check") else datetime.datetime.now(),
Comment on lines +170 to +179
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AlertDto.lastReceived is being passed a datetime object here, but AlertDto's lastReceived validator (see keep/api/models/alert.py:170-208) expects a string/timestamp and will fail when given a datetime, causing _host_to_alert to raise and return None so host alerts are silently dropped. To avoid this, either pass a string (e.g. using .isoformat() or the raw API value) or let AlertDto handle the original last_check value directly and remove the datetime.fromisoformat call here.

Copilot uses AI. Check for mistakes.
fingerprint=f"nagios-host-{host.get('host_id', '')}",
labels={
"host_name": host.get("name", ""),
"address": host.get("address", ""),
"host_id": str(host.get("host_id", "")),
},
)
except Exception as e:
self.logger.error(f"Failed to convert host to alert: {e}")
return None

def _service_to_alert(self, service: dict) -> Optional[AlertDto]:
"""
Converts a Nagios service status to an AlertDto.
"""
try:
current_state = int(service.get("current_state", 0))
host_name = service.get("host_name", "Unknown")
service_name = service.get("name", service.get("service_description", "Unknown"))

return AlertDto(
id=f"nagios-service-{service.get('servicestatus_id', '')}",
name=f"Service: {host_name}/{service_name}",
description=service.get("status_text", service.get("plugin_output", "")),
status=self.SERVICE_STATUS_MAP.get(current_state, AlertStatus.FIRING),
severity=self.SEVERITY_MAP.get(current_state, AlertSeverity.WARNING),
source=["nagios"],
lastReceived=datetime.datetime.fromisoformat(
service.get("last_check", datetime.datetime.now().isoformat())
) if service.get("last_check") else datetime.datetime.now(),
Comment on lines +200 to +209
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as for hosts, lastReceived is set to a datetime object here, which conflicts with AlertDto's lastReceived validator expecting a string and will cause _service_to_alert to fail and drop service alerts. Align this with other providers by passing a string value (e.g. ISO-formatted or the raw last_check from the API) instead of a datetime instance.

Copilot uses AI. Check for mistakes.
fingerprint=f"nagios-service-{service.get('servicestatus_id', '')}",
labels={
"host_name": host_name,
"service_name": service_name,
"service_id": str(service.get("servicestatus_id", "")),
},
)
except Exception as e:
self.logger.error(f"Failed to convert service to alert: {e}")
return None

@staticmethod
def _format_alert(
event: dict, provider_instance: Optional["NagiosProvider"] = None
) -> AlertDto:
"""
Formats an incoming Nagios webhook event to AlertDto.

This method handles webhooks from Nagios XI when configured to send
notifications to Keep.
"""
# Determine if this is a host or service alert
is_service = "service" in event or "servicedesc" in event

if is_service:
# Service alert
state = int(event.get("servicestate", event.get("state", 0)))
status = NagiosProvider.SERVICE_STATUS_MAP.get(state, AlertStatus.FIRING)
severity = NagiosProvider.SEVERITY_MAP.get(state, AlertSeverity.WARNING)

host_name = event.get("hostname", event.get("host", "Unknown"))
service_name = event.get("servicedesc", event.get("service", "Unknown"))

return AlertDto(
id=event.get("id", f"nagios-{host_name}-{service_name}"),
name=f"Service: {host_name}/{service_name}",
description=event.get("output", event.get("message", "")),
status=status,
severity=severity,
source=["nagios"],
fingerprint=f"nagios-service-{host_name}-{service_name}",
labels={
"host_name": host_name,
"service_name": service_name,
"notification_type": event.get("notificationtype", ""),
},
)
else:
# Host alert
state = int(event.get("hoststate", event.get("state", 0)))
status = NagiosProvider.HOST_STATUS_MAP.get(state, AlertStatus.FIRING)
severity = NagiosProvider.SEVERITY_MAP.get(state, AlertSeverity.WARNING)
Comment on lines +235 to +261
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The webhook formatter assumes servicestate/hoststate (or state) are numeric and calls int(...), but Nagios notification macros like SERVICESTATE/HOSTSTATE are typically strings such as "OK", "WARNING", "CRITICAL", "UP", "DOWN"; trying to cast those to int will raise and break webhook handling, contradicting the documented state mapping behavior. Consider supporting both textual and numeric states (e.g. by normalizing strings through a separate mapping or by trying int only when the value is digit-like) so that common Nagios webhook payloads are accepted.

Copilot uses AI. Check for mistakes.

host_name = event.get("hostname", event.get("host", "Unknown"))

return AlertDto(
id=event.get("id", f"nagios-host-{host_name}"),
name=f"Host: {host_name}",
description=event.get("output", event.get("message", "")),
status=status,
severity=severity,
source=["nagios"],
fingerprint=f"nagios-host-{host_name}",
labels={
"host_name": host_name,
"address": event.get("hostaddress", event.get("address", "")),
"notification_type": event.get("notificationtype", ""),
},
)


if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.DEBUG)

config = ProviderConfig(
authentication={
"host_url": "https://nagios.example.com",
"api_key": "your-api-key",
}
)

context_manager = ContextManager(tenant_id="test")
provider = NagiosProvider(context_manager, "nagios-test", config)
print("Nagios provider initialized successfully!")
Loading