Skip to content
This repository has been archived by the owner on May 14, 2021. It is now read-only.

MINIFI-433 - PG Variable Registry support in templates #115

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ protected static void addProcessGroup(Document doc, Element element, ProcessGrou
addTextElement(element, "name", processGroupSchema.getName());
addPosition(element);
addTextElement(element, "comment", processGroupSchema.getComment());
addVariables(doc, element, processGroupSchema.getVariables());

for (ProcessorSchema processorConfig : processGroupSchema.getProcessors()) {
addProcessor(element, processorConfig);
Expand Down Expand Up @@ -417,13 +418,30 @@ protected static void addProcessGroup(Document doc, Element element, ProcessGrou
for (ControllerServiceSchema controllerServiceSchema : processGroupSchema.getControllerServices()) {
addControllerService(element, controllerServiceSchema);
}

} catch (ConfigurationChangeException e) {
throw e;
} catch (Exception e) {
throw new ConfigurationChangeException("Failed to parse the config YAML while trying to creating the root Process Group", e);
}
}

protected static void addVariables(Document doc, Element parentElement, Map<String, String> variables) {
if (variables == null) {
return;
}

final Element element = doc.createElement("variables");
parentElement.appendChild(element);

for(String key : variables.keySet()) {
Element entryElement = doc.createElement("entry");
addTextElement(entryElement, "key", key);
addTextElement(entryElement, "value", variables.get(key));
element.appendChild(entryElement);
}
}

protected static void addPort(Document doc, Element parentElement, PortSchema portSchema, String tag) {
Element element = doc.createElement(tag);
parentElement.appendChild(element);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public void testProcessGroupsTransform() throws Exception {
testConfigFileTransform("config-process-groups.yml");
}

@Test
public void testProcessGroupsVariablesTransform() throws Exception {
testConfigFileTransform("config-pg-variables.yml");
}

@Test
public void testFunnelsTransform() throws Exception {
testConfigFileTransform("stress-test-framework-funnel.yml");
Expand Down Expand Up @@ -460,6 +465,8 @@ private void testProcessGroup(Element element, ProcessGroupSchema processGroupSc
assertEquals(processGroupSchema.getName(), getText(element, "name"));
assertEquals(nullToEmpty(processGroupSchema.getComment()), nullToEmpty(getText(element, "comment")));

testVariables(element, processGroupSchema.getVariables());

checkOrderOfChildren(element, PG_ELEMENT_ORDER_MAP);

NodeList processorElements = (NodeList) xPathFactory.newXPath().evaluate("processor", element, XPathConstants.NODESET);
Expand Down Expand Up @@ -510,6 +517,16 @@ private void testProcessGroup(Element element, ProcessGroupSchema processGroupSc
}
}

private void testVariables(Element element, Map<String, String> expected) throws XPathExpressionException {
NodeList variablesElement = (NodeList) xPathFactory.newXPath().evaluate("variables/entry", element, XPathConstants.NODESET);
Map<String, String> variables = new HashMap<>();
for (int i = 0; i < variablesElement.getLength(); i++) {
Element item = (Element) variablesElement.item(i);
variables.put(getText(item, "key"), getText(item, "value"));
}
assertEquals(expected.entrySet().stream().collect(Collectors.toMap(Map.Entry<String, String>::getKey, e -> nullToEmpty(e.getValue()))), variables);
}

private void testProcessor(Element element, ProcessorSchema processorSchema) throws XPathExpressionException {
assertEquals(processorSchema.getId(), getText(element, "id"));
assertEquals(processorSchema.getName(), getText(element, "name"));
Expand Down
283 changes: 283 additions & 0 deletions minifi-bootstrap/src/test/resources/config-pg-variables.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the \"License\"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an \"AS IS\" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

MiNiFi Config Version: 3
Flow Controller:
name: ProcessGroupsAndRemoteProcessGroups
comment: ''
Core Properties:
flow controller graceful shutdown period: 10 sec
flow service write delay interval: 500 ms
administrative yield duration: 30 sec
bored yield duration: 10 millis
max concurrent threads: 1
FlowFile Repository:
partitions: 256
checkpoint interval: 2 mins
always sync: false
Swap:
threshold: 20000
in period: 5 sec
in threads: 1
out period: 5 sec
out threads: 4
Content Repository:
content claim max appendable size: 10 MB
content claim max flow files: 100
always sync: false
Provenance Repository:
provenance rollover time: 1 min
Component Status Repository:
buffer size: 1440
snapshot frequency: 1 min
Security Properties:
keystore: ''
keystore type: ''
keystore password: ''
key password: ''
truststore: ''
truststore type: ''
truststore password: ''
ssl protocol: ''
Sensitive Props:
key:
algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
provider: BC
Processors:
- id: 207748d1-0158-1000-0000-000000000000
name: GenerateFlowFile
class: org.apache.nifi.processors.standard.GenerateFlowFile
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 0 sec
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list: []
Properties:
Batch Size: '1'
Data Format: Binary
File Size: 1 b
Unique FlowFiles: 'false'
- id: 2079e8bd-0158-1000-0000-000000000000
name: LogAttribute
class: org.apache.nifi.processors.standard.LogAttribute
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 0 sec
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list:
- success
Properties:
Attributes to Ignore:
Attributes to Log:
Log Level: info
Log Payload: 'false'
Log prefix:
- id: 2077ab1e-0158-1000-0000-000000000000
name: UpdateAttribute
class: org.apache.nifi.processors.attributes.UpdateAttribute
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 0 sec
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list: []
Properties:
Delete Attributes Expression:
top: top
Process Groups:
- id: 207888b1-0158-1000-0000-000000000000
name: middle
Processors:
- id: 2078f34e-0158-1000-0000-000000000000
name: UpdateAttribute
class: org.apache.nifi.processors.attributes.UpdateAttribute
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 0 sec
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list: []
Properties:
Delete Attributes Expression:
middle: middle
Process Groups:
- id: 20794cd4-0158-1000-0000-000000000000
name: bottom
Processors:
- id: 207a89ba-0158-1000-0000-000000000000
name: UpdateAttribute
class: org.apache.nifi.processors.attributes.UpdateAttribute
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 0 sec
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list: []
Properties:
Delete Attributes Expression:
bottom: bottom
Process Groups: []
Input Ports:
- id: 207a5f50-0158-1000-0000-000000000000
name: input
Output Ports:
- id: 207a6d92-0158-1000-0000-000000000000
name: output
Connections:
- id: 21a6abb9-0158-1000-0000-000000000000
name: UpdateAttribute/success/21a39aba-0158-1000-a1a0-1b55bcddcd72
source id: 207a89ba-0158-1000-0000-000000000000
source relationship names:
- success
destination id: 21a39aba-0158-1000-a1a0-1b55bcddcd72
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
- id: 207ad5e9-0158-1000-0000-000000000000
name: UpdateAttribute/success/null
source id: 207a89ba-0158-1000-0000-000000000000
source relationship names:
- success
destination id: 207a6d92-0158-1000-0000-000000000000
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
- id: 207aca0d-0158-1000-0000-000000000000
name: null//UpdateAttribute
source id: 207a5f50-0158-1000-0000-000000000000
source relationship names: []
destination id: 207a89ba-0158-1000-0000-000000000000
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
Remote Process Groups:
- name: http://localhost:9091/nifi
url: http://localhost:9091/nifi
comment: ''
timeout: 30 sec
yield period: 10 sec
Input Ports:
- id: 21a39aba-0158-1000-a1a0-1b55bcddcd72
name: input2
comment: ''
max concurrent tasks: 1
use compression: false
Variables:
key: value
Input Ports:
- id: 2078c936-0158-1000-0000-000000000000
name: input
Output Ports:
- id: 2079b327-0158-1000-0000-000000000000
name: output
Connections:
- id: 21a5b1f1-0158-1000-0000-000000000000
name: UpdateAttribute/success/21a2fb5e-0158-1000-3b5e-5a7d3aaee01b
source id: 2078f34e-0158-1000-0000-000000000000
source relationship names:
- success
destination id: 21a2fb5e-0158-1000-3b5e-5a7d3aaee01b
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
- id: 207b0eb1-0158-1000-0000-000000000000
name: UpdateAttribute/success/null
source id: 2078f34e-0158-1000-0000-000000000000
source relationship names:
- success
destination id: 207a5f50-0158-1000-0000-000000000000
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
- id: 20792ec2-0158-1000-0000-000000000000
name: null//UpdateAttribute
source id: 2078c936-0158-1000-0000-000000000000
source relationship names: []
destination id: 2078f34e-0158-1000-0000-000000000000
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
- id: 207b1880-0158-1000-0000-000000000000
name: null//null
source id: 207a6d92-0158-1000-0000-000000000000
source relationship names: []
destination id: 2079b327-0158-1000-0000-000000000000
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
Remote Process Groups:
- name: http://localhost:9090/nifi
url: http://localhost:9090/nifi
comment: ''
timeout: 30 sec
yield period: 10 sec
Input Ports:
- id: 21a2fb5e-0158-1000-3b5e-5a7d3aaee01b
name: input
comment: ''
max concurrent tasks: 1
use compression: false
Variables:
variable: foo
another: one
Input Ports: []
Output Ports: []
Connections:
- id: 2077bf8f-0158-1000-0000-000000000000
name: GenerateFlowFile/success/UpdateAttribute
source id: 207748d1-0158-1000-0000-000000000000
source relationship names:
- success
destination id: 2077ab1e-0158-1000-0000-000000000000
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
- id: 2079cf6f-0158-1000-0000-000000000000
name: UpdateAttribute/success/null
source id: 2077ab1e-0158-1000-0000-000000000000
source relationship names:
- success
destination id: 2078c936-0158-1000-0000-000000000000
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
- id: 2079faa0-0158-1000-0000-000000000000
name: null//LogAttribute
source id: 2079b327-0158-1000-0000-000000000000
source relationship names: []
destination id: 2079e8bd-0158-1000-0000-000000000000
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
Remote Process Groups: []
Variables:
root: variable
Loading