Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Import_cluster_template module #197

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 12 additions & 64 deletions plugins/action/assemble_cluster_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,72 +29,15 @@
from ansible.plugins.action import ActionBase
from ansible.utils.hashing import checksum_s

from ansible_collections.cloudera.cluster.plugins.module_utils.cm_utils import ClusterTemplate

class ActionModule(ActionBase):
TRANSFERS_FILES = True

MERGED = {}
IDEMPOTENT_IDS = ["refName", "name", "clusterName", "hostName", "product"]
UNIQUE_IDS = ["repositories"]

def update_object(self, base, template, breadcrumbs=""):
if isinstance(base, dict) and isinstance(template, dict):
self.update_dict(base, template, breadcrumbs)
return True
elif isinstance(base, list) and isinstance(template, list):
self.update_list(base, template, breadcrumbs)
return True
return False

def update_dict(self, base, template, breadcrumbs=""):
for key, value in template.items():
crumb = breadcrumbs + "/" + key

if key in self.IDEMPOTENT_IDS:
if base[key] != value:
self._display.error(
"Objects with distinct IDs should not be merged: " + crumb
)
continue

if key not in base:
base[key] = value
elif not self.update_object(base[key], value, crumb) and base[key] != value:
self._display.warning(
f"Value being overwritten for key [{crumb}]], Old: [{base[key]}], New: [{value}]"
)
base[key] = value

if key in self.UNIQUE_IDS:
base[key] = list(set(base[key]))

def update_list(self, base, template, breadcrumbs=""):
for item in template:
if isinstance(item, dict):
for attr in self.IDEMPOTENT_IDS:
if attr in item:
idempotent_id = attr
break
else:
idempotent_id = None
if idempotent_id:
namesake = [
i for i in base if i[idempotent_id] == item[idempotent_id]
]
if namesake:
self.update_dict(
namesake[0],
item,
breadcrumbs
+ "/["
+ idempotent_id
+ "="
+ item[idempotent_id]
+ "]",
)
continue
base.append(item)
base.sort(key=lambda x: json.dumps(x, sort_keys=True))

def __init__(self, task, connection, play_context, loader, templar, shared_loader_obj):
super().__init__(task, connection, play_context, loader, templar, shared_loader_obj)
self.TEMPLATE = ClusterTemplate(warn_fn=self._display.warning, error_fn=self._display.error)
self.MERGED = {}

def assemble_fragments(
self, assembled_file, src_path, regex=None, ignore_hidden=True, decrypt=True
Expand All @@ -121,7 +64,10 @@ def assemble_fragments(
encoding="utf-8",
) as fragment_file:
try:
self.update_object(self.MERGED, json.loads(fragment_file.read()))
if not self.MERGED:
self.MERGED = json.loads(fragment_file.read())
else:
self.TEMPLATE.merge(self.MERGED, json.loads(fragment_file.read()))
except json.JSONDecodeError as e:
raise AnsibleActionFail(
message=f"JSON parsing error: {to_text(e.msg)}",
Expand Down Expand Up @@ -155,6 +101,8 @@ def run(self, tmp=None, task_vars=None):
regexp = self._task.args.get("regexp", None)
if regexp is None:
regexp = self._task.args.get("filter", None)
if regexp is None:
regexp = self._task.args.get("regex", None)

remote_src = boolean(self._task.args.get("remote_src", False))
follow = boolean(self._task.args.get("follow", False))
Expand Down
120 changes: 113 additions & 7 deletions plugins/module_utils/cm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,120 @@
import logging

from functools import wraps
from typing import Union
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning, MaxRetryError, HTTPError
from urllib3.util import Url
from urllib.parse import urljoin

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_text

from time import sleep
from cm_client import ApiClient, Configuration
from cm_client.rest import ApiException, RESTClientObject
from cm_client.apis.cloudera_manager_resource_api import ClouderaManagerResourceApi
from cm_client.apis.commands_resource_api import CommandsResourceApi


__credits__ = ["[email protected]"]
__maintainer__ = ["[email protected]"]


class ClusterTemplate(object):
IDEMPOTENT_IDS = frozenset(
["refName", "name", "clusterName", "hostName", "product"]
)
UNIQUE_IDS = frozenset(["repositories"])

def __init__(self, warn_fn, error_fn) -> None:
self._warn = warn_fn
self._error = error_fn

def merge(self, base: Union[dict, list], fragment: Union[dict, list]) -> bool:
if isinstance(base, dict) and isinstance(fragment, dict):
self._update_dict(base, fragment)
elif isinstance(base, list) and isinstance(fragment, list):
self._update_list(base, fragment)
else:
raise TypeError(
f"Base and fragment arguments must be the same type: base[{type(base)}], fragment[{type(fragment)}]"
)

def _update_dict(self, base, fragment, breadcrumbs="") -> None:
for key, value in fragment.items():
crumb = breadcrumbs + "/" + key

# If the key is idempotent, error that the values are different
if key in self.IDEMPOTENT_IDS:
if base[key] != value:
self._error(f"Unable to override value for distinct key [{crumb}]")
continue

# If it's a new key, add to the bae
if key not in base:
base[key] = value
# If the value is a dictionary, merge
elif isinstance(value, dict):
self._update_dict(base[key], value, crumb)
# If the value is a list, merge
elif isinstance(value, list):
self._update_list(base[key], value, crumb)
# Else the value is a scalar
else:
# If the value is different, override
if base[key] != value:
self._warn(
f"Overriding value for key [{crumb}]], Old: [{base[key]}], New: [{value}]"
)
base[key] = value

if key in self.UNIQUE_IDS:
base[key] = list(set(base[key]))
base[key].sort(key=lambda x: json.dumps(x, sort_keys=True))

def _update_list(self, base, fragment, breadcrumbs="") -> None:
for entry in fragment:
if isinstance(entry, dict):
# Discover if the incoming dict has an idempotent key
idempotent_key = next(
iter(
[
id
for id in set(entry.keys()).intersection(
self.IDEMPOTENT_IDS
)
]
),
None,
)

# Merge the idemponent key's dictionary rather than appending as a new entry
if idempotent_key:
existing_entry = next(
iter(
[
i
for i in base
if isinstance(i, dict)
and idempotent_key in i
and i[idempotent_key] == entry[idempotent_key]
]
),
None,
)
if existing_entry:
self._update_dict(
existing_entry,
entry,
f"{breadcrumbs}/[{idempotent_key}={entry[idempotent_key]}]",
)
continue
# Else, drop to appending the entry as net new
base.append(entry)

base.sort(key=lambda x: json.dumps(x, sort_keys=True))


class ClouderaManagerModule(object):
"""Base Ansible Module for API access to Cloudera Manager."""

Expand All @@ -60,13 +157,12 @@ def _add_log(err):
err = dict(
msg="API error: " + to_text(ae.reason),
status_code=ae.status,
body=ae.body.decode("utf-8"),
)
if err["body"] != "":
if ae.body:
try:
err.update(body=json.loads(err["body"]))
except Exception as te:
pass
err.update(body=json.loads(ae.body))
except Exception:
err.update(body=ae.body.decode("utf-8")),

self.module.fail_json(**_add_log(err))
except MaxRetryError as maxe:
Expand Down Expand Up @@ -207,6 +303,16 @@ def set_session_cookie(self):
api_instance.get_version()
self.api_client.cookie = self.api_client.last_response.getheader("Set-Cookie")

def wait_for_command_state(self,command_id, polling_interval):
command_api_instance = CommandsResourceApi(self.api_client)
while True:
get_command_state = command_api_instance.read_command_with_http_info(command_id=command_id)
state = get_command_state[0].active
if not state:
break
sleep(polling_interval)
return True

def call_api(self, path, method, query=None, field="items", body=None):
"""Wrapper to call a CM API endpoint path directly."""
path_params = []
Expand Down Expand Up @@ -274,7 +380,7 @@ def ansible_module(
mutually_exclusive=[],
required_one_of=[],
required_together=[],
**kwargs
**kwargs,
):
"""
Creates the base Ansible module argument spec and dependencies,
Expand Down
Loading
Loading