Skip to content

Commit

Permalink
Add cm_cluster module and update assemble_cluster_template module (#197)
Browse files Browse the repository at this point in the history
* Add cm_cluster module
* Relocate cluster template merge logic to cm_utils
* Update logic in cm_cluster module and use shared cluster template merge functions
* Add pytest resources
* Add unit tests for assemble_cluster_template module
* Add unit tests for assemble_cluster_template action
* Add unit tests for multiple idempotent keys and elements in merge functions
* Add short-circuit merger for initial fragment
* Add 'regex' alias to assemble_cluster_template module
* Add "fix" for VSCode pytest discovery
* Add pythonpath parameter to pytest.ini
* Add wait_for_command_state

Signed-off-by: rsuplina <[email protected]>
Signed-off-by: Webster Mudge <[email protected]>
Co-authored-by: Webster Mudge <[email protected]>
  • Loading branch information
rsuplina and wmudge authored Feb 29, 2024
1 parent d5a771f commit 4ff475c
Show file tree
Hide file tree
Showing 16 changed files with 1,790 additions and 150 deletions.
76 changes: 12 additions & 64 deletions plugins/action/assemble_cluster_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,72 +29,15 @@
from ansible.plugins.action import ActionBase
from ansible.utils.hashing import checksum_s

from ansible_collections.cloudera.cluster.plugins.module_utils.cm_utils import ClusterTemplate

class ActionModule(ActionBase):
TRANSFERS_FILES = True

MERGED = {}
IDEMPOTENT_IDS = ["refName", "name", "clusterName", "hostName", "product"]
UNIQUE_IDS = ["repositories"]

def update_object(self, base, template, breadcrumbs=""):
if isinstance(base, dict) and isinstance(template, dict):
self.update_dict(base, template, breadcrumbs)
return True
elif isinstance(base, list) and isinstance(template, list):
self.update_list(base, template, breadcrumbs)
return True
return False

def update_dict(self, base, template, breadcrumbs=""):
for key, value in template.items():
crumb = breadcrumbs + "/" + key

if key in self.IDEMPOTENT_IDS:
if base[key] != value:
self._display.error(
"Objects with distinct IDs should not be merged: " + crumb
)
continue

if key not in base:
base[key] = value
elif not self.update_object(base[key], value, crumb) and base[key] != value:
self._display.warning(
f"Value being overwritten for key [{crumb}]], Old: [{base[key]}], New: [{value}]"
)
base[key] = value

if key in self.UNIQUE_IDS:
base[key] = list(set(base[key]))

def update_list(self, base, template, breadcrumbs=""):
for item in template:
if isinstance(item, dict):
for attr in self.IDEMPOTENT_IDS:
if attr in item:
idempotent_id = attr
break
else:
idempotent_id = None
if idempotent_id:
namesake = [
i for i in base if i[idempotent_id] == item[idempotent_id]
]
if namesake:
self.update_dict(
namesake[0],
item,
breadcrumbs
+ "/["
+ idempotent_id
+ "="
+ item[idempotent_id]
+ "]",
)
continue
base.append(item)
base.sort(key=lambda x: json.dumps(x, sort_keys=True))

def __init__(self, task, connection, play_context, loader, templar, shared_loader_obj):
super().__init__(task, connection, play_context, loader, templar, shared_loader_obj)
self.TEMPLATE = ClusterTemplate(warn_fn=self._display.warning, error_fn=self._display.error)
self.MERGED = {}

def assemble_fragments(
self, assembled_file, src_path, regex=None, ignore_hidden=True, decrypt=True
Expand All @@ -121,7 +64,10 @@ def assemble_fragments(
encoding="utf-8",
) as fragment_file:
try:
self.update_object(self.MERGED, json.loads(fragment_file.read()))
if not self.MERGED:
self.MERGED = json.loads(fragment_file.read())
else:
self.TEMPLATE.merge(self.MERGED, json.loads(fragment_file.read()))
except json.JSONDecodeError as e:
raise AnsibleActionFail(
message=f"JSON parsing error: {to_text(e.msg)}",
Expand Down Expand Up @@ -155,6 +101,8 @@ def run(self, tmp=None, task_vars=None):
regexp = self._task.args.get("regexp", None)
if regexp is None:
regexp = self._task.args.get("filter", None)
if regexp is None:
regexp = self._task.args.get("regex", None)

remote_src = boolean(self._task.args.get("remote_src", False))
follow = boolean(self._task.args.get("follow", False))
Expand Down
109 changes: 102 additions & 7 deletions plugins/module_utils/cm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import logging

from functools import wraps
from typing import Union
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning, MaxRetryError, HTTPError
from urllib3.util import Url
from urllib.parse import urljoin
from time import sleep
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_text

from time import sleep
from cm_client import ApiClient, Configuration
from cm_client.rest import ApiException, RESTClientObject
from cm_client.apis.cloudera_manager_resource_api import ClouderaManagerResourceApi
Expand All @@ -39,6 +40,101 @@
__maintainer__ = ["[email protected]"]


class ClusterTemplate(object):
IDEMPOTENT_IDS = frozenset(
["refName", "name", "clusterName", "hostName", "product"]
)
UNIQUE_IDS = frozenset(["repositories"])

def __init__(self, warn_fn, error_fn) -> None:
self._warn = warn_fn
self._error = error_fn

def merge(self, base: Union[dict, list], fragment: Union[dict, list]) -> bool:
if isinstance(base, dict) and isinstance(fragment, dict):
self._update_dict(base, fragment)
elif isinstance(base, list) and isinstance(fragment, list):
self._update_list(base, fragment)
else:
raise TypeError(
f"Base and fragment arguments must be the same type: base[{type(base)}], fragment[{type(fragment)}]"
)

def _update_dict(self, base, fragment, breadcrumbs="") -> None:
for key, value in fragment.items():
crumb = breadcrumbs + "/" + key

# If the key is idempotent, error that the values are different
if key in self.IDEMPOTENT_IDS:
if base[key] != value:
self._error(f"Unable to override value for distinct key [{crumb}]")
continue

# If it's a new key, add to the bae
if key not in base:
base[key] = value
# If the value is a dictionary, merge
elif isinstance(value, dict):
self._update_dict(base[key], value, crumb)
# If the value is a list, merge
elif isinstance(value, list):
self._update_list(base[key], value, crumb)
# Else the value is a scalar
else:
# If the value is different, override
if base[key] != value:
self._warn(
f"Overriding value for key [{crumb}]], Old: [{base[key]}], New: [{value}]"
)
base[key] = value

if key in self.UNIQUE_IDS:
base[key] = list(set(base[key]))
base[key].sort(key=lambda x: json.dumps(x, sort_keys=True))

def _update_list(self, base, fragment, breadcrumbs="") -> None:
for entry in fragment:
if isinstance(entry, dict):
# Discover if the incoming dict has an idempotent key
idempotent_key = next(
iter(
[
id
for id in set(entry.keys()).intersection(
self.IDEMPOTENT_IDS
)
]
),
None,
)

# Merge the idemponent key's dictionary rather than appending as a new entry
if idempotent_key:
existing_entry = next(
iter(
[
i
for i in base
if isinstance(i, dict)
and idempotent_key in i
and i[idempotent_key] == entry[idempotent_key]
]
),
None,
)
if existing_entry:
self._update_dict(
existing_entry,
entry,
f"{breadcrumbs}/[{idempotent_key}={entry[idempotent_key]}]",
)
continue
# Else, drop to appending the entry as net new
base.append(entry)

base.sort(key=lambda x: json.dumps(x, sort_keys=True))


class ClouderaManagerModule(object):
"""Base Ansible Module for API access to Cloudera Manager."""

Expand All @@ -61,13 +157,12 @@ def _add_log(err):
err = dict(
msg="API error: " + to_text(ae.reason),
status_code=ae.status,
body=ae.body.decode("utf-8"),
)
if err["body"] != "":
if ae.body:
try:
err.update(body=json.loads(err["body"]))
except Exception as te:
pass
err.update(body=json.loads(ae.body))
except Exception:
err.update(body=ae.body.decode("utf-8")),

self.module.fail_json(**_add_log(err))
except MaxRetryError as maxe:
Expand Down Expand Up @@ -285,7 +380,7 @@ def ansible_module(
mutually_exclusive=[],
required_one_of=[],
required_together=[],
**kwargs
**kwargs,
):
"""
Creates the base Ansible module argument spec and dependencies,
Expand Down
Loading

0 comments on commit 4ff475c

Please sign in to comment.