diff --git a/plugins/modules/assemble_cluster_template.py b/plugins/modules/assemble_cluster_template.py index 2a5f507d..7710c6d5 100644 --- a/plugins/modules/assemble_cluster_template.py +++ b/plugins/modules/assemble_cluster_template.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - # Copyright 2023 Cloudera, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -44,6 +41,7 @@ src: description: - An already existing directory of cluster template files. + - TODO Local or remote type: path required: True aliases: @@ -118,16 +116,23 @@ cloudera.cluster.assemble_cluster_template: src: examples dest: /opt/cloudera/cluster-template.json - + - name: Assemble a cluster template from selected files (on the controller) cloudera.cluster.assemble_cluster_template: src: examples dest: /opt/cloudera/cluster-template.json regexp: "base|nifi" + +- name: Assemble a cluster template from files on the target host + cloudera.cluster.assemble_cluster_template: + src: /tmp/examples + dest: /opt/cloudera/cluster-template.json + remote_src: yes """ RETURN = r"""#""" +import json import os import re import tempfile @@ -137,9 +142,13 @@ class AssembleClusterTemplate(object): + MERGED = {} + IDEMPOTENT_IDS = ["refName", "name", "clusterName", "hostName", "product"] + UNIQUE_IDS = ["repositories"] + def __init__(self, module): - self.module = module - + self.module = module + # Set parameters self.src = self.module.params["src"] self.dest = self.module.params["dest"] @@ -161,15 +170,66 @@ def __init__(self, module): # Execute the logic self.process() - def process_fragment(self, fh) -> bytes: - updated = bytearray(fh.read()) - updated.extend("\n-------\n".encode()) - return updated - - def complete_assembly(self, assembled_file): - pass + def update_object(self, base, template, breadcrumbs=""): + if isinstance(base, dict) and isinstance(template, dict): + self.update_dict(base, template, breadcrumbs) + return True + elif isinstance(base, list) and isinstance(template, list): + self.update_list(base, template, breadcrumbs) + return True + return False + + def update_dict(self, base, template, breadcrumbs=""): + for key, value in template.items(): + crumb = breadcrumbs + "/" + key + + if key in self.IDEMPOTENT_IDS: + if base[key] != value: + self.module.warn( + f"Objects with distinct IDs should not be merged: {crumb}" + ) + continue - def _assemble_fragments(self, assembled_file): + if key not in base: + base[key] = value + elif not self.update_object(base[key], value, crumb) and base[key] != value: + self.module.warn( + f"Value being overwritten for key [{crumb}]; Old: [{base[key]}], New: [{value}]" + ) + base[key] = value + + if key in self.UNIQUE_IDS: + base[key] = list(set(base[key])) + + def update_list(self, base, template, breadcrumbs=""): + for item in template: + if isinstance(item, dict): + for attr in self.IDEMPOTENT_IDS: + if attr in item: + idempotent_id = attr + break + else: + idempotent_id = None + if idempotent_id: + namesake = [ + i for i in base if i[idempotent_id] == item[idempotent_id] + ] + if namesake: + self.update_dict( + namesake[0], + item, + breadcrumbs + + "/[" + + idempotent_id + + "=" + + item[idempotent_id] + + "]", + ) + continue + base.append(item) + base.sort(key=lambda x: json.dumps(x, sort_keys=True)) + + def assemble_fragments(self, assembled_file): # By file name sort order for f in sorted(os.listdir(self.src)): # Filter by regexp @@ -183,17 +243,18 @@ def _assemble_fragments(self, assembled_file): ): continue - with open(fragment, "rb") as fragment_file: - content = self.process_fragment(fragment_file) - - # Write the resulting bytes - if content is not None: - assembled_file.write(content) + with open(fragment, "r", encoding="utf-8") as fragment_file: + try: + self.update_object(self.MERGED, json.loads(fragment_file.read())) + except json.JSONDecodeError as e: + self.module.fail_json( + msg=f"JSON parsing error: {to_text(e.msg)}", error=to_native(e) + ) - # Finalize any remaining assembly - self.complete_assembly(assembled_file) + # Write out the final assembly + json.dump(self.MERGED, assembled_file, indent=2, sort_keys=False) - # Close the assembled file handle + # Close the assembled file handle; will not delete for atomic_move assembled_file.close() def process(self): @@ -212,18 +273,13 @@ def process(self): msg=f"Regular expression, {self.regexp} is invalid: {to_native(e)}" ) - # Assemble fragments + # Assemble the src files into output file + # No deletion on close; atomic_move "removes" the file with tempfile.NamedTemporaryFile( - dir=self.module.tmpdir, delete=False - ) as assembled: + mode="w", encoding="utf-8", dir=self.module.tmpdir, delete=False + ) as assembled: # Process fragments into temporary file - self._assemble_fragments(assembled) - - # Confirm the assembled file is closed - if not assembled.closed: - self.module.fail_json( - msg=f"Assembled file, {assembled.name}, not closed after fragment processing" - ) + self.assemble_fragments(assembled) # Generate hashes for assembled file assembled_sha1 = self.module.sha1(assembled.name) @@ -242,9 +298,7 @@ def process(self): if assembled_sha1 != dest_sha1: if self.backup and dest_sha1 is not None: - self.output.update( - backup_file=self.module.backup_local(self.dest) - ) + self.output.update(backup_file=self.module.backup_local(self.dest)) self.module.atomic_move( assembled.name, self.dest, unsafe_writes=self.unsafe_writes