Skip to content

Commit dcaae10

Browse files
committed
Make new main ProductRegistry when input updates
If the ProductRegistry of the source changes, we now create a new main ProductRegistry. The main Registry is then passed to the Principals which update their data structures.
1 parent e533960 commit dcaae10

File tree

6 files changed

+38
-17
lines changed

6 files changed

+38
-17
lines changed

FWCore/Framework/interface/Principal.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ namespace edm {
7474

7575
~Principal() override;
7676

77-
void adjustIndexesAfterProductRegistryAddition();
77+
//This should only be called when this Principal is not being actively used
78+
void possiblyUpdateAfterAddition(std::shared_ptr<ProductRegistry const>);
7879

7980
void fillPrincipal(DelayedReader* reader);
8081
void fillPrincipal(ProcessHistoryID const& hist, ProcessHistory const* phr, DelayedReader* reader);
@@ -213,6 +214,8 @@ namespace edm {
213214
}
214215

215216
private:
217+
void adjustIndexesAfterProductRegistryAddition();
218+
216219
//called by adjustIndexesAfterProductRegistryAddition only if an index actually changed
217220
virtual void changedIndexes_() {}
218221

FWCore/Framework/interface/PrincipalCache.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ namespace edm {
5353

5454
void adjustEventsToNewProductRegistry(std::shared_ptr<ProductRegistry const>);
5555

56-
void adjustIndexesAfterProductRegistryAddition();
56+
void adjustIndexesAfterProductRegistryAddition(std::shared_ptr<ProductRegistry const>);
5757

5858
private:
5959
std::unique_ptr<ProcessBlockPrincipal> processBlockPrincipal_;

FWCore/Framework/src/EventProcessor.cc

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,21 +1022,29 @@ namespace edm {
10221022
FDEBUG(1) << " \treadFile\n";
10231023
SendSourceTerminationSignalIfException sentry(actReg_.get());
10241024

1025+
//CDJ: THIS SHOULD NOT BE APPLIED TO THE ACTIVE INTERVALS
10251026
if (streamRunActive_ > 0) {
1027+
//deals with data structures that allows merged Run products to be split on Lumi boundaries then
1028+
// in later processes reintegrated.
10261029
streamRunStatus_[0]->runPrincipal()->preReadFile();
1027-
streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
1028-
}
1029-
1030-
if (streamLumiActive_ > 0) {
1031-
streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
10321030
}
10331031

1032+
auto sizeBefore = input_->productRegistry().size();
10341033
fb_ = input_->readFile();
10351034
//incase the input's registry changed
1036-
const size_t size = preg_->size();
1037-
preg_->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string());
1038-
if (size < preg_->size()) {
1039-
principalCache_.adjustIndexesAfterProductRegistryAddition();
1035+
if (input_->productRegistry().size() != sizeBefore) {
1036+
auto temp = std::make_shared<edm::ProductRegistry>(*preg_);
1037+
temp->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string());
1038+
preg_ = std::move(temp);
1039+
//This handles are presently unused Run/Lumis
1040+
principalCache_.adjustIndexesAfterProductRegistryAddition(edm::get_underlying_safe(preg_));
1041+
if (streamLumiActive_ > 0) {
1042+
//Can update the active ones now, even before an `end` transition is called because no OutputModule
1043+
// supports storing ProductDescriptions for Run/LuminosityBlock products which were dropped. Since only
1044+
// dropped products can change the ProductRegistry, only changes in Event can cause that.
1045+
streamRunStatus_[0]->runPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_));
1046+
streamLumiStatus_[0]->lumiPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_));
1047+
}
10401048
}
10411049
principalCache_.adjustEventsToNewProductRegistry(preg());
10421050
if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
@@ -2022,6 +2030,7 @@ namespace edm {
20222030

20232031
std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
20242032
auto rp = principalCache_.getAvailableRunPrincipalPtr();
2033+
rp->possiblyUpdateAfterAddition(preg());
20252034
assert(rp);
20262035
rp->setAux(*input_->runAuxiliary());
20272036
{
@@ -2046,6 +2055,7 @@ namespace edm {
20462055

20472056
std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
20482057
auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
2058+
lbp->possiblyUpdateAfterAddition(preg());
20492059
assert(lbp);
20502060
lbp->setAux(*input_->luminosityBlockAuxiliary());
20512061
{

FWCore/Framework/src/Principal.cc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
#include <stdexcept>
3232
#include <typeinfo>
3333
#include <atomic>
34-
34+
//DEBUG
35+
#include <iostream>
3536
namespace edm {
3637

3738
static ProcessHistory const s_emptyProcessHistory;
@@ -149,6 +150,13 @@ namespace edm {
149150
return size;
150151
}
151152

153+
void Principal::possiblyUpdateAfterAddition(std::shared_ptr<ProductRegistry const> iProd) {
154+
if (iProd.get() != preg_.get()) {
155+
preg_ = iProd;
156+
adjustIndexesAfterProductRegistryAddition();
157+
}
158+
}
159+
152160
void Principal::addDroppedProduct(ProductDescription const& bd) {
153161
addProductOrThrow(std::make_unique<DroppedDataProductResolver>(std::make_shared<ProductDescription const>(bd)));
154162
}

FWCore/Framework/src/PrincipalCache.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,22 +43,22 @@ namespace edm {
4343
void PrincipalCache::adjustEventsToNewProductRegistry(std::shared_ptr<ProductRegistry const> reg) {
4444
for (auto& eventPrincipal : eventPrincipals_) {
4545
if (eventPrincipal) {
46-
eventPrincipal->adjustIndexesAfterProductRegistryAddition();
46+
eventPrincipal->possiblyUpdateAfterAddition(reg);
4747
}
4848
}
4949
}
5050

51-
void PrincipalCache::adjustIndexesAfterProductRegistryAddition() {
51+
void PrincipalCache::adjustIndexesAfterProductRegistryAddition(std::shared_ptr<ProductRegistry const> iReg) {
5252
//Need to temporarily hold all the runs to clear out the runHolder_
5353
std::vector<std::shared_ptr<RunPrincipal>> tempRunPrincipals;
5454
while (auto p = runHolder_.tryToGet()) {
55-
p->adjustIndexesAfterProductRegistryAddition();
55+
p->possiblyUpdateAfterAddition(iReg);
5656
tempRunPrincipals.emplace_back(std::move(p));
5757
}
5858
//Need to temporarily hold all the lumis to clear out the lumiHolder_
5959
std::vector<std::shared_ptr<LuminosityBlockPrincipal>> tempLumiPrincipals;
6060
while (auto p = lumiHolder_.tryToGet()) {
61-
p->adjustIndexesAfterProductRegistryAddition();
61+
p->possiblyUpdateAfterAddition(iReg);
6262
tempLumiPrincipals.emplace_back(std::move(p));
6363
}
6464
}

FWCore/TestProcessor/src/TestSourceProcessor.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ namespace edm::test {
229229
const size_t size = preg_->size();
230230
preg_->merge(source_->productRegistry(), fb_ ? fb_->fileName() : std::string());
231231
if (size < preg_->size()) {
232-
principalCache_.adjustIndexesAfterProductRegistryAddition();
232+
principalCache_.adjustIndexesAfterProductRegistryAddition(preg_);
233233
}
234234
principalCache_.adjustEventsToNewProductRegistry(preg_);
235235

0 commit comments

Comments
 (0)