forked from cloudera-labs/cloudera.cluster
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update logging and error handling for CM API modules (cloudera-labs#168)
* Update logging and error handling for consistent debugging experience * Add ssl_ca_cert parameter for custom SSL certificate validation * Update return field for cm_version_info * Update to normalize redirects during endpoint discovery * Update logging to use the root logger and add a specific logger for cloudera.cluster Signed-off-by: Webster Mudge <[email protected]>
- Loading branch information
Showing
7 changed files
with
251 additions
and
212 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,3 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
|
||
# Copyright 2023 Cloudera, Inc. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
|
@@ -30,6 +27,7 @@ | |
from urllib.parse import urljoin | ||
|
||
from ansible.module_utils.basic import AnsibleModule | ||
from ansible.module_utils.common.text.converters import to_text | ||
|
||
from cm_client import ApiClient, Configuration | ||
from cm_client.rest import ApiException, RESTClientObject | ||
|
@@ -39,57 +37,65 @@ | |
__credits__ = ["[email protected]"] | ||
__maintainer__ = ["[email protected]"] | ||
|
||
""" | ||
A common Ansible Module for API access to Cloudera Manager. | ||
""" | ||
|
||
class ClouderaManagerModule(object): | ||
"""Base Ansible Module for API access to Cloudera Manager.""" | ||
|
||
@classmethod | ||
def handle_process(cls, f): | ||
"""Wrapper to handle log capture and common HTTP errors""" | ||
"""Wrapper to handle API, retry, and HTTP errors.""" | ||
|
||
@wraps(f) | ||
def _impl(self, *args, **kwargs): | ||
try: | ||
self._initialize_client() | ||
result = f(self, *args, **kwargs) | ||
def _add_log(err): | ||
if self.debug: | ||
self.log_out = self._get_log() | ||
self.log_lines.append(self.log_out.splitlines()) | ||
return result | ||
log = self.log_capture.getvalue() | ||
err.update(debug=log, debug_lines=log.split("\n")) | ||
return err | ||
|
||
try: | ||
self.initialize_client() | ||
return f(self, *args, **kwargs) | ||
except ApiException as ae: | ||
body = ae.body.decode("utf-8") | ||
if body != "": | ||
body = json.loads(body) | ||
self.module.fail_json( | ||
msg="API error: " + str(ae.reason), status_code=ae.status, body=body | ||
err = dict( | ||
msg="API error: " + to_text(ae.reason), | ||
status_code=ae.status, | ||
body=ae.body.decode("utf-8"), | ||
) | ||
if err["body"] != "": | ||
try: | ||
err.update(body=json.loads(err["body"])) | ||
except Exception as te: | ||
pass | ||
|
||
self.module.fail_json(**_add_log(err)) | ||
except MaxRetryError as maxe: | ||
self.module.fail_json(msg="Request error: " + str(maxe.reason)) | ||
err = dict( | ||
msg="Request error: " + to_text(maxe.reason), url=to_text(maxe.url) | ||
) | ||
self.module.fail_json(**_add_log(err)) | ||
except HTTPError as he: | ||
self.module.fail_json(msg="HTTP request: " + str(he)) | ||
err = dict(msg="HTTP request: " + str(he)) | ||
self.module.fail_json(**_add_log(err)) | ||
|
||
return _impl | ||
|
||
"""A base Cloudera Manager (CM) module class""" | ||
|
||
def __init__(self, module): | ||
# Set common parameters | ||
self.module = module | ||
self.url = self._get_param("url", None) | ||
self.force_tls = self._get_param("force_tls") | ||
self.host = self._get_param("host") | ||
self.port = self._get_param("port") | ||
self.version = self._get_param("version") | ||
self.username = self._get_param("username") | ||
self.password = self._get_param("password") | ||
self.verify_tls = self._get_param("verify_tls") | ||
self.debug = self._get_param("debug") | ||
self.agent_header = self._get_param("agent_header") | ||
self.url = self.get_param("url", None) | ||
self.force_tls = self.get_param("force_tls") | ||
self.host = self.get_param("host") | ||
self.port = self.get_param("port") | ||
self.version = self.get_param("version") | ||
self.username = self.get_param("username") | ||
self.password = self.get_param("password") | ||
self.verify_tls = self.get_param("verify_tls") | ||
self.ssl_ca_cert = self.get_param("ssl_ca_cert") | ||
self.debug = self.get_param("debug") | ||
self.agent_header = self.get_param("agent_header") | ||
|
||
# Initialize common return values | ||
self.log_out = None | ||
self.log_lines = [] | ||
self.changed = False | ||
|
||
# Configure the core CM API client parameters | ||
|
@@ -99,69 +105,71 @@ def __init__(self, module): | |
config.verify_ssl = self.verify_tls | ||
config.debug = self.debug | ||
|
||
# Configure logging | ||
_log_format = ( | ||
# Configure custom validation certificate | ||
if self.ssl_ca_cert: | ||
config.ssl_ca_cert = self.ssl_ca_cert | ||
|
||
# Create a common logging format | ||
log_format = ( | ||
"%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s" | ||
) | ||
|
||
# Configure the urllib3 logger | ||
self.logger = logging.getLogger("cloudera.cluster") | ||
|
||
if self.debug: | ||
self._setup_logger(logging.DEBUG, _log_format) | ||
self.logger.debug("CM API agent: %s", self.agent_header) | ||
else: | ||
self._setup_logger(logging.ERROR, _log_format) | ||
root_logger = logging.getLogger() | ||
root_logger.setLevel(logging.DEBUG) | ||
root_logger.propagate = True | ||
|
||
self.log_capture = io.StringIO() | ||
handler = logging.StreamHandler(self.log_capture) | ||
|
||
formatter = logging.Formatter(log_format) | ||
handler.setFormatter(formatter) | ||
|
||
root_logger.addHandler(handler) | ||
|
||
self.logger.debug("CM API agent: %s", self.agent_header) | ||
|
||
if self.verify_tls is False: | ||
disable_warnings(InsecureRequestWarning) | ||
|
||
def _get_param(self, param, default=None): | ||
"""Fetches an Ansible input parameter if it exists, else returns optional default or None""" | ||
def get_param(self, param, default=None): | ||
""" | ||
Fetches an Ansible input parameter if it exists, else returns optional | ||
default or None. | ||
""" | ||
if self.module is not None: | ||
return self.module.params[param] if param in self.module.params else default | ||
return default | ||
|
||
def _setup_logger(self, log_level, log_format): | ||
"""Configures the logging of the HTTP activity""" | ||
self.logger = logging.getLogger("urllib3") | ||
self.logger.setLevel(log_level) | ||
|
||
self.__log_capture = io.StringIO() | ||
handler = logging.StreamHandler(self.__log_capture) | ||
handler.setLevel(log_level) | ||
|
||
formatter = logging.Formatter(log_format) | ||
handler.setFormatter(formatter) | ||
|
||
self.logger.addHandler(handler) | ||
|
||
def _get_log(self): | ||
"""Retrieves the contents of the captured log""" | ||
contents = self.__log_capture.getvalue() | ||
self.__log_capture.truncate(0) | ||
return contents | ||
|
||
def _initialize_client(self): | ||
"""Configures and creates the API client""" | ||
def initialize_client(self): | ||
"""Creates the CM API client""" | ||
config = Configuration() | ||
|
||
# If provided a CML endpoint URL, use it directly | ||
if self.url: | ||
config.host = self.url | ||
# Otherwise, run discovery on missing parts | ||
else: | ||
config.host = self._discover_endpoint(config) | ||
config.host = self.discover_endpoint(config) | ||
|
||
# Create and set the API Client | ||
self.api_client = ApiClient() | ||
|
||
def get_auth_headers(self, config): | ||
"""Constructs a Basic Auth header dictionary from the Configuration. | ||
This dictionary can be used directly with the API client's REST client.""" | ||
""" | ||
Constructs a Basic Auth header dictionary from the Configuration. This | ||
dictionary can be used directly with the API client's REST client. | ||
""" | ||
headers = dict() | ||
auth = config.auth_settings().get("basic") | ||
headers[auth["key"]] = auth["value"] | ||
return headers | ||
|
||
def _discover_endpoint(self, config): | ||
"""Discovers the scheme and version of a potential Cloudara Manager host""" | ||
def discover_endpoint(self, config): | ||
"""Discovers the scheme and version of a potential Cloudara Manager host.""" | ||
# Get the authentication headers and REST client | ||
headers = self.get_auth_headers(config) | ||
rest = RESTClientObject() | ||
|
@@ -173,7 +181,15 @@ def _discover_endpoint(self, config): | |
rendered = rest.pool_manager.request( | ||
"GET", pre_rendered.url, headers=headers.copy() | ||
) | ||
rendered_url = rendered.geturl() | ||
|
||
# Normalize to handle redirects | ||
try: | ||
rendered_url = rendered.url | ||
except Exception: | ||
rendered_url = rendered.geturl() | ||
|
||
if rendered_url == "/": | ||
rendered_url = pre_rendered.url | ||
|
||
# Discover API version if not set | ||
if not self.version: | ||
|
@@ -213,20 +229,17 @@ def call_api(self, path, method, query=None, field="items", body=None): | |
_preload_content=False, | ||
) | ||
|
||
if 200 >= results[1] <= 299: | ||
data = json.loads(results[0].data.decode("utf-8")) | ||
if field in data: | ||
data = data[field] | ||
return data if type(data) is list else [data] | ||
else: | ||
self.module.fail_json( | ||
msg="Error interacting with CM resource", status_code=results[1] | ||
) | ||
data = json.loads(results[0].data.decode("utf-8")) | ||
if field in data: | ||
data = data[field] | ||
return data if type(data) is list else [data] | ||
|
||
@staticmethod | ||
def ansible_module_discovery(argument_spec={}, required_together=[], **kwargs): | ||
"""INTERNAL: Creates the Ansible module argument spec and dependencies for CM API endpoint discovery. | ||
Typically, modules will use the ansible_module method to include direct API endpoint URL support. | ||
def ansible_module_internal(argument_spec={}, required_together=[], **kwargs): | ||
""" | ||
INTERNAL: Creates the Ansible module argument spec and dependencies for | ||
CM API endpoint discovery. Typically, modules will use the | ||
ansible_module method to include direct API endpoint URL support. | ||
""" | ||
return AnsibleModule( | ||
argument_spec=dict( | ||
|
@@ -238,6 +251,7 @@ def ansible_module_discovery(argument_spec={}, required_together=[], **kwargs): | |
verify_tls=dict( | ||
required=False, type="bool", default=True, aliases=["tls"] | ||
), | ||
ssl_ca_cert=dict(type="path", aliases=["tls_cert", "ssl_cert"]), | ||
username=dict(required=True, type="str"), | ||
password=dict(required=True, type="str", no_log=True), | ||
debug=dict( | ||
|
@@ -262,8 +276,11 @@ def ansible_module( | |
required_together=[], | ||
**kwargs | ||
): | ||
"""Creates the base Ansible module argument spec and dependencies, including discovery and direct endpoint URL support.""" | ||
return ClouderaManagerModule.ansible_module_discovery( | ||
""" | ||
Creates the base Ansible module argument spec and dependencies, | ||
including discovery and direct endpoint URL support. | ||
""" | ||
return ClouderaManagerModule.ansible_module_internal( | ||
argument_spec=dict( | ||
**argument_spec, | ||
url=dict(type="str", aliases=["endpoint", "cm_endpoint_url"]), | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.