Skip to content

Commit af6456c

Browse files
committed
Make new main ProductRegistry when input updates
If the ProductRegistry of the source changes, we now create a new main ProductRegistry. The main Registry is then passed to the Principals which update their data structures.
1 parent e533960 commit af6456c

File tree

6 files changed

+35
-17
lines changed

6 files changed

+35
-17
lines changed

FWCore/Framework/interface/Principal.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ namespace edm {
7474

7575
~Principal() override;
7676

77-
void adjustIndexesAfterProductRegistryAddition();
77+
//This should only be called when this Principal is not being actively used
78+
void possiblyUpdateAfterAddition(std::shared_ptr<ProductRegistry const>);
7879

7980
void fillPrincipal(DelayedReader* reader);
8081
void fillPrincipal(ProcessHistoryID const& hist, ProcessHistory const* phr, DelayedReader* reader);
@@ -213,6 +214,8 @@ namespace edm {
213214
}
214215

215216
private:
217+
void adjustIndexesAfterProductRegistryAddition();
218+
216219
//called by adjustIndexesAfterProductRegistryAddition only if an index actually changed
217220
virtual void changedIndexes_() {}
218221

FWCore/Framework/interface/PrincipalCache.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ namespace edm {
5353

5454
void adjustEventsToNewProductRegistry(std::shared_ptr<ProductRegistry const>);
5555

56-
void adjustIndexesAfterProductRegistryAddition();
56+
void adjustIndexesAfterProductRegistryAddition(std::shared_ptr<ProductRegistry const>);
5757

5858
private:
5959
std::unique_ptr<ProcessBlockPrincipal> processBlockPrincipal_;

FWCore/Framework/src/EventProcessor.cc

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,20 +1023,27 @@ namespace edm {
10231023
SendSourceTerminationSignalIfException sentry(actReg_.get());
10241024

10251025
if (streamRunActive_ > 0) {
1026+
//deals with data structures that allows merged Run products to be split on Lumi boundaries then
1027+
// in later processes reintegrated.
10261028
streamRunStatus_[0]->runPrincipal()->preReadFile();
1027-
streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
1028-
}
1029-
1030-
if (streamLumiActive_ > 0) {
1031-
streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
10321029
}
10331030

1031+
auto sizeBefore = input_->productRegistry().size();
10341032
fb_ = input_->readFile();
10351033
//incase the input's registry changed
1036-
const size_t size = preg_->size();
1037-
preg_->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string());
1038-
if (size < preg_->size()) {
1039-
principalCache_.adjustIndexesAfterProductRegistryAddition();
1034+
if (input_->productRegistry().size() != sizeBefore) {
1035+
auto temp = std::make_shared<edm::ProductRegistry>(*preg_);
1036+
temp->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string());
1037+
preg_ = std::move(temp);
1038+
//This handles are presently unused Run/Lumis
1039+
principalCache_.adjustIndexesAfterProductRegistryAddition(edm::get_underlying_safe(preg_));
1040+
if (streamLumiActive_ > 0) {
1041+
//Can update the active ones now, even before an `end` transition is called because no OutputModule
1042+
// supports storing ProductDescriptions for Run/LuminosityBlock products which were dropped. Since only
1043+
// dropped products can change the ProductRegistry, only changes in Event can cause that.
1044+
streamRunStatus_[0]->runPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_));
1045+
streamLumiStatus_[0]->lumiPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_));
1046+
}
10401047
}
10411048
principalCache_.adjustEventsToNewProductRegistry(preg());
10421049
if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
@@ -2022,6 +2029,7 @@ namespace edm {
20222029

20232030
std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
20242031
auto rp = principalCache_.getAvailableRunPrincipalPtr();
2032+
rp->possiblyUpdateAfterAddition(preg());
20252033
assert(rp);
20262034
rp->setAux(*input_->runAuxiliary());
20272035
{
@@ -2046,6 +2054,7 @@ namespace edm {
20462054

20472055
std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
20482056
auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
2057+
lbp->possiblyUpdateAfterAddition(preg());
20492058
assert(lbp);
20502059
lbp->setAux(*input_->luminosityBlockAuxiliary());
20512060
{

FWCore/Framework/src/Principal.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
#include <stdexcept>
3232
#include <typeinfo>
3333
#include <atomic>
34-
3534
namespace edm {
3635

3736
static ProcessHistory const s_emptyProcessHistory;
@@ -149,6 +148,13 @@ namespace edm {
149148
return size;
150149
}
151150

151+
void Principal::possiblyUpdateAfterAddition(std::shared_ptr<ProductRegistry const> iProd) {
152+
if (iProd.get() != preg_.get()) {
153+
preg_ = iProd;
154+
adjustIndexesAfterProductRegistryAddition();
155+
}
156+
}
157+
152158
void Principal::addDroppedProduct(ProductDescription const& bd) {
153159
addProductOrThrow(std::make_unique<DroppedDataProductResolver>(std::make_shared<ProductDescription const>(bd)));
154160
}

FWCore/Framework/src/PrincipalCache.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,22 +43,22 @@ namespace edm {
4343
void PrincipalCache::adjustEventsToNewProductRegistry(std::shared_ptr<ProductRegistry const> reg) {
4444
for (auto& eventPrincipal : eventPrincipals_) {
4545
if (eventPrincipal) {
46-
eventPrincipal->adjustIndexesAfterProductRegistryAddition();
46+
eventPrincipal->possiblyUpdateAfterAddition(reg);
4747
}
4848
}
4949
}
5050

51-
void PrincipalCache::adjustIndexesAfterProductRegistryAddition() {
51+
void PrincipalCache::adjustIndexesAfterProductRegistryAddition(std::shared_ptr<ProductRegistry const> iReg) {
5252
//Need to temporarily hold all the runs to clear out the runHolder_
5353
std::vector<std::shared_ptr<RunPrincipal>> tempRunPrincipals;
5454
while (auto p = runHolder_.tryToGet()) {
55-
p->adjustIndexesAfterProductRegistryAddition();
55+
p->possiblyUpdateAfterAddition(iReg);
5656
tempRunPrincipals.emplace_back(std::move(p));
5757
}
5858
//Need to temporarily hold all the lumis to clear out the lumiHolder_
5959
std::vector<std::shared_ptr<LuminosityBlockPrincipal>> tempLumiPrincipals;
6060
while (auto p = lumiHolder_.tryToGet()) {
61-
p->adjustIndexesAfterProductRegistryAddition();
61+
p->possiblyUpdateAfterAddition(iReg);
6262
tempLumiPrincipals.emplace_back(std::move(p));
6363
}
6464
}

FWCore/TestProcessor/src/TestSourceProcessor.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ namespace edm::test {
229229
const size_t size = preg_->size();
230230
preg_->merge(source_->productRegistry(), fb_ ? fb_->fileName() : std::string());
231231
if (size < preg_->size()) {
232-
principalCache_.adjustIndexesAfterProductRegistryAddition();
232+
principalCache_.adjustIndexesAfterProductRegistryAddition(preg_);
233233
}
234234
principalCache_.adjustEventsToNewProductRegistry(preg_);
235235

0 commit comments

Comments
 (0)