Skip to content

Commit

Permalink
Add merge logic to assemble_cluster_template module plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Webster Mudge <[email protected]>
  • Loading branch information
wmudge committed Nov 17, 2023
1 parent acd69dd commit 4418710
Showing 1 changed file with 90 additions and 36 deletions.
126 changes: 90 additions & 36 deletions plugins/modules/assemble_cluster_template.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright 2023 Cloudera, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -44,6 +41,7 @@
src:
description:
- An already existing directory of cluster template files.
- TODO Local or remote
type: path
required: True
aliases:
Expand Down Expand Up @@ -118,16 +116,23 @@
cloudera.cluster.assemble_cluster_template:
src: examples
dest: /opt/cloudera/cluster-template.json
- name: Assemble a cluster template from selected files (on the controller)
cloudera.cluster.assemble_cluster_template:
src: examples
dest: /opt/cloudera/cluster-template.json
regexp: "base|nifi"
- name: Assemble a cluster template from files on the target host
cloudera.cluster.assemble_cluster_template:
src: /tmp/examples
dest: /opt/cloudera/cluster-template.json
remote_src: yes
"""

RETURN = r"""#"""

import json
import os
import re
import tempfile
Expand All @@ -137,9 +142,13 @@


class AssembleClusterTemplate(object):
MERGED = {}
IDEMPOTENT_IDS = ["refName", "name", "clusterName", "hostName", "product"]
UNIQUE_IDS = ["repositories"]

def __init__(self, module):
self.module = module
self.module = module

# Set parameters
self.src = self.module.params["src"]
self.dest = self.module.params["dest"]
Expand All @@ -161,15 +170,66 @@ def __init__(self, module):
# Execute the logic
self.process()

def process_fragment(self, fh) -> bytes:
updated = bytearray(fh.read())
updated.extend("\n-------\n".encode())
return updated

def complete_assembly(self, assembled_file):
pass
def update_object(self, base, template, breadcrumbs=""):
if isinstance(base, dict) and isinstance(template, dict):
self.update_dict(base, template, breadcrumbs)
return True
elif isinstance(base, list) and isinstance(template, list):
self.update_list(base, template, breadcrumbs)
return True
return False

def update_dict(self, base, template, breadcrumbs=""):
for key, value in template.items():
crumb = breadcrumbs + "/" + key

if key in self.IDEMPOTENT_IDS:
if base[key] != value:
self.module.warn(
f"Objects with distinct IDs should not be merged: {crumb}"
)
continue

def _assemble_fragments(self, assembled_file):
if key not in base:
base[key] = value
elif not self.update_object(base[key], value, crumb) and base[key] != value:
self.module.warn(
f"Value being overwritten for key [{crumb}]; Old: [{base[key]}], New: [{value}]"
)
base[key] = value

if key in self.UNIQUE_IDS:
base[key] = list(set(base[key]))

def update_list(self, base, template, breadcrumbs=""):
for item in template:
if isinstance(item, dict):
for attr in self.IDEMPOTENT_IDS:
if attr in item:
idempotent_id = attr
break
else:
idempotent_id = None
if idempotent_id:
namesake = [
i for i in base if i[idempotent_id] == item[idempotent_id]
]
if namesake:
self.update_dict(
namesake[0],
item,
breadcrumbs
+ "/["
+ idempotent_id
+ "="
+ item[idempotent_id]
+ "]",
)
continue
base.append(item)
base.sort(key=lambda x: json.dumps(x, sort_keys=True))

def assemble_fragments(self, assembled_file):
# By file name sort order
for f in sorted(os.listdir(self.src)):
# Filter by regexp
Expand All @@ -183,17 +243,18 @@ def _assemble_fragments(self, assembled_file):
):
continue

with open(fragment, "rb") as fragment_file:
content = self.process_fragment(fragment_file)

# Write the resulting bytes
if content is not None:
assembled_file.write(content)
with open(fragment, "r", encoding="utf-8") as fragment_file:
try:
self.update_object(self.MERGED, json.loads(fragment_file.read()))
except json.JSONDecodeError as e:
self.module.fail_json(
msg=f"JSON parsing error: {to_text(e.msg)}", error=to_native(e)
)

# Finalize any remaining assembly
self.complete_assembly(assembled_file)
# Write out the final assembly
json.dump(self.MERGED, assembled_file, indent=2, sort_keys=False)

# Close the assembled file handle
# Close the assembled file handle; will not delete for atomic_move
assembled_file.close()

def process(self):
Expand All @@ -212,18 +273,13 @@ def process(self):
msg=f"Regular expression, {self.regexp} is invalid: {to_native(e)}"
)

# Assemble fragments
# Assemble the src files into output file
# No deletion on close; atomic_move "removes" the file
with tempfile.NamedTemporaryFile(
dir=self.module.tmpdir, delete=False
) as assembled:
mode="w", encoding="utf-8", dir=self.module.tmpdir, delete=False
) as assembled:
# Process fragments into temporary file
self._assemble_fragments(assembled)

# Confirm the assembled file is closed
if not assembled.closed:
self.module.fail_json(
msg=f"Assembled file, {assembled.name}, not closed after fragment processing"
)
self.assemble_fragments(assembled)

# Generate hashes for assembled file
assembled_sha1 = self.module.sha1(assembled.name)
Expand All @@ -242,9 +298,7 @@ def process(self):

if assembled_sha1 != dest_sha1:
if self.backup and dest_sha1 is not None:
self.output.update(
backup_file=self.module.backup_local(self.dest)
)
self.output.update(backup_file=self.module.backup_local(self.dest))

self.module.atomic_move(
assembled.name, self.dest, unsafe_writes=self.unsafe_writes
Expand Down

0 comments on commit 4418710

Please sign in to comment.