diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java index 0da92ac30..68faf905e 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java @@ -383,6 +383,7 @@ protected static void addProcessGroup(Document doc, Element element, ProcessGrou addTextElement(element, "name", processGroupSchema.getName()); addPosition(element); addTextElement(element, "comment", processGroupSchema.getComment()); + addVariables(doc, element, processGroupSchema.getVariables()); for (ProcessorSchema processorConfig : processGroupSchema.getProcessors()) { addProcessor(element, processorConfig); @@ -417,6 +418,7 @@ protected static void addProcessGroup(Document doc, Element element, ProcessGrou for (ControllerServiceSchema controllerServiceSchema : processGroupSchema.getControllerServices()) { addControllerService(element, controllerServiceSchema); } + } catch (ConfigurationChangeException e) { throw e; } catch (Exception e) { @@ -424,6 +426,22 @@ protected static void addProcessGroup(Document doc, Element element, ProcessGrou } } + protected static void addVariables(Document doc, Element parentElement, Map variables) { + if (variables == null) { + return; + } + + final Element element = doc.createElement("variables"); + parentElement.appendChild(element); + + for(String key : variables.keySet()) { + Element entryElement = doc.createElement("entry"); + addTextElement(entryElement, "key", key); + addTextElement(entryElement, "value", variables.get(key)); + element.appendChild(entryElement); + } + } + protected static void addPort(Document doc, Element parentElement, PortSchema portSchema, String tag) { Element element = doc.createElement(tag); parentElement.appendChild(element); diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java index 7d2d48bec..8fac74497 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java @@ -120,6 +120,11 @@ public void testProcessGroupsTransform() throws Exception { testConfigFileTransform("config-process-groups.yml"); } + @Test + public void testProcessGroupsVariablesTransform() throws Exception { + testConfigFileTransform("config-pg-variables.yml"); + } + @Test public void testFunnelsTransform() throws Exception { testConfigFileTransform("stress-test-framework-funnel.yml"); @@ -460,6 +465,8 @@ private void testProcessGroup(Element element, ProcessGroupSchema processGroupSc assertEquals(processGroupSchema.getName(), getText(element, "name")); assertEquals(nullToEmpty(processGroupSchema.getComment()), nullToEmpty(getText(element, "comment"))); + testVariables(element, processGroupSchema.getVariables()); + checkOrderOfChildren(element, PG_ELEMENT_ORDER_MAP); NodeList processorElements = (NodeList) xPathFactory.newXPath().evaluate("processor", element, XPathConstants.NODESET); @@ -510,6 +517,16 @@ private void testProcessGroup(Element element, ProcessGroupSchema processGroupSc } } + private void testVariables(Element element, Map expected) throws XPathExpressionException { + NodeList variablesElement = (NodeList) xPathFactory.newXPath().evaluate("variables/entry", element, XPathConstants.NODESET); + Map variables = new HashMap<>(); + for (int i = 0; i < variablesElement.getLength(); i++) { + Element item = (Element) variablesElement.item(i); + variables.put(getText(item, "key"), getText(item, "value")); + } + assertEquals(expected.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> nullToEmpty(e.getValue()))), variables); + } + private void testProcessor(Element element, ProcessorSchema processorSchema) throws XPathExpressionException { assertEquals(processorSchema.getId(), getText(element, "id")); assertEquals(processorSchema.getName(), getText(element, "name")); diff --git a/minifi-bootstrap/src/test/resources/config-pg-variables.yml b/minifi-bootstrap/src/test/resources/config-pg-variables.yml new file mode 100644 index 000000000..e34e3e95c --- /dev/null +++ b/minifi-bootstrap/src/test/resources/config-pg-variables.yml @@ -0,0 +1,283 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the \"License\"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an \"AS IS\" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +MiNiFi Config Version: 3 +Flow Controller: + name: ProcessGroupsAndRemoteProcessGroups + comment: '' +Core Properties: + flow controller graceful shutdown period: 10 sec + flow service write delay interval: 500 ms + administrative yield duration: 30 sec + bored yield duration: 10 millis + max concurrent threads: 1 +FlowFile Repository: + partitions: 256 + checkpoint interval: 2 mins + always sync: false + Swap: + threshold: 20000 + in period: 5 sec + in threads: 1 + out period: 5 sec + out threads: 4 +Content Repository: + content claim max appendable size: 10 MB + content claim max flow files: 100 + always sync: false +Provenance Repository: + provenance rollover time: 1 min +Component Status Repository: + buffer size: 1440 + snapshot frequency: 1 min +Security Properties: + keystore: '' + keystore type: '' + keystore password: '' + key password: '' + truststore: '' + truststore type: '' + truststore password: '' + ssl protocol: '' + Sensitive Props: + key: + algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL + provider: BC +Processors: +- id: 207748d1-0158-1000-0000-000000000000 + name: GenerateFlowFile + class: org.apache.nifi.processors.standard.GenerateFlowFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + Batch Size: '1' + Data Format: Binary + File Size: 1 b + Unique FlowFiles: 'false' +- id: 2079e8bd-0158-1000-0000-000000000000 + name: LogAttribute + class: org.apache.nifi.processors.standard.LogAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + - success + Properties: + Attributes to Ignore: + Attributes to Log: + Log Level: info + Log Payload: 'false' + Log prefix: +- id: 2077ab1e-0158-1000-0000-000000000000 + name: UpdateAttribute + class: org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + Delete Attributes Expression: + top: top +Process Groups: +- id: 207888b1-0158-1000-0000-000000000000 + name: middle + Processors: + - id: 2078f34e-0158-1000-0000-000000000000 + name: UpdateAttribute + class: org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + Delete Attributes Expression: + middle: middle + Process Groups: + - id: 20794cd4-0158-1000-0000-000000000000 + name: bottom + Processors: + - id: 207a89ba-0158-1000-0000-000000000000 + name: UpdateAttribute + class: org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + Delete Attributes Expression: + bottom: bottom + Process Groups: [] + Input Ports: + - id: 207a5f50-0158-1000-0000-000000000000 + name: input + Output Ports: + - id: 207a6d92-0158-1000-0000-000000000000 + name: output + Connections: + - id: 21a6abb9-0158-1000-0000-000000000000 + name: UpdateAttribute/success/21a39aba-0158-1000-a1a0-1b55bcddcd72 + source id: 207a89ba-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 21a39aba-0158-1000-a1a0-1b55bcddcd72 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + - id: 207ad5e9-0158-1000-0000-000000000000 + name: UpdateAttribute/success/null + source id: 207a89ba-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 207a6d92-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + - id: 207aca0d-0158-1000-0000-000000000000 + name: null//UpdateAttribute + source id: 207a5f50-0158-1000-0000-000000000000 + source relationship names: [] + destination id: 207a89ba-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + Remote Process Groups: + - name: http://localhost:9091/nifi + url: http://localhost:9091/nifi + comment: '' + timeout: 30 sec + yield period: 10 sec + Input Ports: + - id: 21a39aba-0158-1000-a1a0-1b55bcddcd72 + name: input2 + comment: '' + max concurrent tasks: 1 + use compression: false + Variables: + key: value + Input Ports: + - id: 2078c936-0158-1000-0000-000000000000 + name: input + Output Ports: + - id: 2079b327-0158-1000-0000-000000000000 + name: output + Connections: + - id: 21a5b1f1-0158-1000-0000-000000000000 + name: UpdateAttribute/success/21a2fb5e-0158-1000-3b5e-5a7d3aaee01b + source id: 2078f34e-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 21a2fb5e-0158-1000-3b5e-5a7d3aaee01b + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + - id: 207b0eb1-0158-1000-0000-000000000000 + name: UpdateAttribute/success/null + source id: 2078f34e-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 207a5f50-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + - id: 20792ec2-0158-1000-0000-000000000000 + name: null//UpdateAttribute + source id: 2078c936-0158-1000-0000-000000000000 + source relationship names: [] + destination id: 2078f34e-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + - id: 207b1880-0158-1000-0000-000000000000 + name: null//null + source id: 207a6d92-0158-1000-0000-000000000000 + source relationship names: [] + destination id: 2079b327-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' + Remote Process Groups: + - name: http://localhost:9090/nifi + url: http://localhost:9090/nifi + comment: '' + timeout: 30 sec + yield period: 10 sec + Input Ports: + - id: 21a2fb5e-0158-1000-3b5e-5a7d3aaee01b + name: input + comment: '' + max concurrent tasks: 1 + use compression: false + Variables: + variable: foo + another: one +Input Ports: [] +Output Ports: [] +Connections: +- id: 2077bf8f-0158-1000-0000-000000000000 + name: GenerateFlowFile/success/UpdateAttribute + source id: 207748d1-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 2077ab1e-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' +- id: 2079cf6f-0158-1000-0000-000000000000 + name: UpdateAttribute/success/null + source id: 2077ab1e-0158-1000-0000-000000000000 + source relationship names: + - success + destination id: 2078c936-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' +- id: 2079faa0-0158-1000-0000-000000000000 + name: null//LogAttribute + source id: 2079b327-0158-1000-0000-000000000000 + source relationship names: [] + destination id: 2079e8bd-0158-1000-0000-000000000000 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: '' +Remote Process Groups: [] +Variables: + root: variable \ No newline at end of file diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java index 08a3acb19..385858509 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java @@ -24,10 +24,12 @@ import org.apache.nifi.minifi.commons.schema.common.StringUtil; import org.apache.nifi.minifi.commons.schema.common.WritableSchema; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.stream.Collectors; import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY; @@ -43,6 +45,7 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements WritableSchema, ConvertableSchema { public static final String PROCESS_GROUPS_KEY = "Process Groups"; + public static final String PROCESS_GROUPS_VARIABLES_KEY = "Variables"; public static final String ID_DEFAULT = "Root-Group"; private String comment; @@ -54,6 +57,7 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements Writa private List processGroupSchemas; private List inputPortSchemas; private List outputPortSchemas; + private Map variables; public ProcessGroupSchema(Map map, String wrapperName) { super(map, wrapperName); @@ -66,6 +70,7 @@ public ProcessGroupSchema(Map map, String wrapperName) { inputPortSchemas = getOptionalKeyAsList(map, INPUT_PORTS_KEY, m -> new PortSchema(m, "InputPort(id: {id}, name: {name})"), wrapperName); outputPortSchemas = getOptionalKeyAsList(map, OUTPUT_PORTS_KEY, m -> new PortSchema(m, "OutputPort(id: {id}, name: {name})"), wrapperName); processGroupSchemas = getOptionalKeyAsList(map, PROCESS_GROUPS_KEY, m -> new ProcessGroupSchema(m, "ProcessGroup(id: {id}, name: {name})"), wrapperName); + variables = getOptionalKeyAsType(map, PROCESS_GROUPS_VARIABLES_KEY, Map.class, wrapperName, Collections.emptyMap()); if (ConfigSchema.TOP_LEVEL_NAME.equals(wrapperName)) { if (inputPortSchemas.size() > 0) { @@ -93,6 +98,7 @@ public ProcessGroupSchema(Map map, String wrapperName) { addIssuesIfNotNull(connections); } + @Override public Map toMap() { Map result = mapSupplier.get(); String id = getId(); @@ -108,6 +114,11 @@ public Map toMap() { putListIfNotNull(result, FUNNELS_KEY, funnels); putListIfNotNull(result, CONNECTIONS_KEY, connections); putListIfNotNull(result, REMOTE_PROCESS_GROUPS_KEY, remoteProcessGroups); + + if(!variables.isEmpty()) { + result.put(PROCESS_GROUPS_VARIABLES_KEY, new TreeMap<>(variables)); + } + return result; } @@ -176,6 +187,10 @@ public List getInputPortSchemas() { return inputPortSchemas; } + public Map getVariables() { + return variables; + } + @Override protected boolean isValidId(String value) { if (ID_DEFAULT.equals(value)) { diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java index 221c4aa1a..6bf3e85d6 100644 --- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java +++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java @@ -85,10 +85,10 @@ public ConfigSchema apply(TemplateDTO templateDTO) { } protected void addSnippet(Map map, FlowSnippetDTO snippet) { - addSnippet(map, null, null, snippet); + addSnippet(map, null, null, null, snippet); } - protected Map addSnippet(Map map, String id, String name, FlowSnippetDTO snippet) { + protected Map addSnippet(Map map, String id, String name, Map variables, FlowSnippetDTO snippet) { if (!StringUtil.isNullOrEmpty(id)) { map.put(ID_KEY, id); } @@ -97,6 +97,10 @@ protected Map addSnippet(Map map, String id, Str map.put(NAME_KEY, name); } + if (variables != null) { + map.put(ProcessGroupSchema.PROCESS_GROUPS_VARIABLES_KEY, variables); + } + map.put(CommonPropertyKeys.PROCESSORS_KEY, nullToEmpty(snippet.getProcessors()).stream() .map(processorSchemaFunction) .sorted(Comparator.comparing(ProcessorSchema::getName)) @@ -140,7 +144,8 @@ protected Map addSnippet(Map map, String id, Str .collect(Collectors.toList())); map.put(ProcessGroupSchema.PROCESS_GROUPS_KEY, nullToEmpty(snippet.getProcessGroups()).stream() - .map(p -> addSnippet(new HashMap<>(), p.getId(), p.getName(), p.getContents())).collect(Collectors.toList())); + .map(p -> addSnippet(new HashMap<>(), p.getId(), p.getName(), p.getVariables(), p.getContents())) + .collect(Collectors.toList())); return map; } diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java index 12e44e3bd..466e473aa 100644 --- a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java +++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java @@ -171,6 +171,12 @@ public void testTransformRoundTripCsvToJson() throws IOException, JAXBException, public void testTransformRoundTrip15RPGHandling() throws IOException, JAXBException, SchemaLoaderException { transformRoundTrip("1.5_RPG_Handling"); } + + @Test + public void testTransformRoundTrip15VariablesHandling() throws IOException, JAXBException, SchemaLoaderException { + transformRoundTrip("1.5_Variables_Handling"); + } + @Test public void testTransformRoundTripDecompression() throws IOException, JAXBException, SchemaLoaderException { transformRoundTrip("DecompressionCircularFlow"); diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/1.5_Variables_Handling.xml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/1.5_Variables_Handling.xml new file mode 100644 index 000000000..defcdf2f2 --- /dev/null +++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/1.5_Variables_Handling.xml @@ -0,0 +1,291 @@ + + + + +