Skip to content

Commit a8bf768

Browse files
committed
DPL: allow for duplicate devices if they have an appropriate label
1 parent 1a24064 commit a8bf768

6 files changed

Lines changed: 68 additions & 5 deletions

File tree

Framework/Core/include/Framework/CommonLabels.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ namespace o2::framework
2121
// When present on a DataProcessor, no DomainInfoHeader messages will be sent downstream.
2222
const extern DataProcessorLabel suppressDomainInfoLabel;
2323

24+
// Label to allow multiple DataProcessorSpecs with the same name in the topology.
25+
// When present, duplicate specs with matching inputs and outputs will be deduplicated
26+
// with a warning instead of causing a fatal error.
27+
const extern DataProcessorLabel allowDuplicatesLabel;
28+
2429
} // namespace o2::framework
2530

2631
#endif // O2_FRAMEWORK_COMMONLABELS_H

Framework/Core/src/CommonLabels.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@ namespace o2::framework
1515
{
1616

1717
const DataProcessorLabel suppressDomainInfoLabel = {"suppress-domain-info"};
18+
const DataProcessorLabel allowDuplicatesLabel = {"allow-duplicates"};
1819

1920
} // namespace o2::framework

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
#include "WorkflowHelpers.h"
1212
#include "Framework/AnalysisSupportHelpers.h"
1313
#include "Framework/AlgorithmSpec.h"
14+
#include "Framework/CommonLabels.h"
1415
#include "Framework/ConfigParamSpec.h"
1516
#include "Framework/ConfigParamsHelper.h"
1617
#include "Framework/CommonDataProcessors.h"
1718
#include "Framework/ConfigContext.h"
19+
#include "Framework/DataProcessorSpecHelpers.h"
1820
#include "Framework/DeviceSpec.h"
1921
#include "Framework/DataSpecUtils.h"
2022
#include "Framework/DataSpecViews.h"
@@ -32,6 +34,7 @@
3234
#include "Headers/DataHeader.h"
3335
#include <algorithm>
3436
#include <list>
37+
#include <map>
3538
#include <set>
3639
#include <utility>
3740
#include <vector>
@@ -959,7 +962,7 @@ WorkflowParsingState WorkflowHelpers::verifyWorkflow(const o2::framework::Workfl
959962
if (workflow.empty()) {
960963
return WorkflowParsingState::Empty;
961964
}
962-
std::set<std::string> validNames;
965+
std::map<std::string, size_t> validNames;
963966
// std::vector<OutputSpec> availableOutputs;
964967
// std::vector<InputSpec> requiredInputs;
965968

@@ -971,17 +974,25 @@ WorkflowParsingState WorkflowHelpers::verifyWorkflow(const o2::framework::Workfl
971974

972975
std::ostringstream ss;
973976

974-
for (auto& spec : workflow) {
977+
for (size_t si = 0; si < workflow.size(); ++si) {
978+
auto& spec = workflow[si];
975979
if (spec.name.empty()) {
976980
throw std::runtime_error("Invalid DataProcessorSpec name");
977981
}
978982
if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
979983
throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
980984
}
981-
if (validNames.find(spec.name) != validNames.end()) {
982-
throw std::runtime_error("Name " + spec.name + " is used twice.");
985+
auto it = validNames.find(spec.name);
986+
if (it != validNames.end()) {
987+
auto& firstSpec = workflow[it->second];
988+
if (!DataProcessorSpecHelpers::hasLabel(firstSpec, allowDuplicatesLabel.value.c_str()) ||
989+
!DataProcessorSpecHelpers::hasLabel(spec, allowDuplicatesLabel.value.c_str())) {
990+
throw std::runtime_error("Name " + spec.name + " is used twice.");
991+
}
992+
LOG(warning) << "Duplicate DataProcessorSpec " << spec.name << " found with allow-duplicates label. Will be deduplicated.";
993+
continue;
983994
}
984-
validNames.insert(spec.name);
995+
validNames.emplace(spec.name, si);
985996
for (auto& option : spec.options) {
986997
if (option.defaultValue.type() != VariantType::Empty &&
987998
option.type != option.defaultValue.type()) {
@@ -1005,6 +1016,22 @@ WorkflowParsingState WorkflowHelpers::verifyWorkflow(const o2::framework::Workfl
10051016
return WorkflowParsingState::Valid;
10061017
}
10071018

1019+
void WorkflowHelpers::removeDuplicates(WorkflowSpec& workflow)
1020+
{
1021+
std::set<std::string> seen;
1022+
auto it = std::remove_if(workflow.begin(), workflow.end(), [&seen](DataProcessorSpec const& spec) {
1023+
if (seen.find(spec.name) == seen.end()) {
1024+
seen.insert(spec.name);
1025+
return false;
1026+
}
1027+
if (!DataProcessorSpecHelpers::hasLabel(spec, allowDuplicatesLabel.value.c_str())) {
1028+
return false;
1029+
}
1030+
return true;
1031+
});
1032+
workflow.erase(it, workflow.end());
1033+
}
1034+
10081035
using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
10091036
struct DataMatcherId {
10101037
size_t workflowId;

Framework/Core/src/WorkflowHelpers.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ struct WorkflowHelpers {
175175
// it contains no empty labels.
176176
[[nodiscard]] static WorkflowParsingState verifyWorkflow(const WorkflowSpec& workflow);
177177

178+
// Remove duplicate DataProcessorSpecs that have the "allow-duplicates" label.
179+
// Duplicate specs must have the same inputs and outputs, otherwise an exception is thrown.
180+
static void removeDuplicates(WorkflowSpec& workflow);
181+
178182
// Depending on the workflow and the dangling inputs inside it, inject "fake"
179183
// devices to mark the fact we might need some extra action to make sure
180184
// dangling inputs are satisfied.

Framework/Core/src/runDataProcessing.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1663,6 +1663,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
16631663
/// extract and apply process switches
16641664
/// prune device inputs
16651665
auto altered_workflow = workflow;
1666+
WorkflowHelpers::removeDuplicates(altered_workflow);
16661667

16671668
auto confNameFromParam = [](std::string const& paramName) {
16681669
std::regex name_regex(R"(^control:([\w-]+)\/(\w+))");

Framework/Core/test/test_WorkflowHelpers.cxx

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
// or submit itself to any jurisdiction.
1111
#include "Mocking.h"
1212
#include "test_HelperMacros.h"
13+
#include "Framework/CommonLabels.h"
1314
#include "Framework/ConfigContext.h"
1415
#include "Framework/WorkflowSpec.h"
1516
#include "Framework/DataSpecUtils.h"
@@ -60,6 +61,30 @@ TEST_CASE("TestVerifyWorkflow")
6061
checkOk(WorkflowSpec{{"A", {InputSpec{"x", "TST", "A"}}}});
6162
// Check for duplicate DataProcessorSpecs names
6263
checkNotOk(WorkflowSpec{{"A"}, {"A"}});
64+
// Duplicates with allow-duplicates label should not throw
65+
checkOk(WorkflowSpec{
66+
{.name = "A", .labels = {allowDuplicatesLabel}},
67+
{.name = "A", .labels = {allowDuplicatesLabel}},
68+
});
69+
// Duplicates without the label should still throw
70+
checkNotOk(WorkflowSpec{
71+
{.name = "A"},
72+
{.name = "A", .labels = {allowDuplicatesLabel}},
73+
});
74+
}
75+
76+
TEST_CASE("TestRemoveDuplicates")
77+
{
78+
// removeDuplicates should keep only the first spec with a given name
79+
WorkflowSpec workflow{
80+
{.name = "A", .labels = {allowDuplicatesLabel}},
81+
{.name = "B"},
82+
{.name = "A", .labels = {allowDuplicatesLabel}},
83+
};
84+
WorkflowHelpers::removeDuplicates(workflow);
85+
REQUIRE(workflow.size() == 2);
86+
REQUIRE(workflow[0].name == "A");
87+
REQUIRE(workflow[1].name == "B");
6388
}
6489

6590
TEST_CASE("TestWorkflowHelpers")

0 commit comments

Comments
 (0)