Add task to find overlapping events between custom and central NANOAOD (#44)

* Add overlap checking task.

* add a simple hash function that calculates a unique number from the 'event', 'luminosityBlock' and 'run' information of a NanoAOD

* linting

* Completed the task to find overlapping events between our custom NanoAOD and the centrally produced NANOAOD.

Due to different compressions between our and the central NANOAODs, we need to determine which events both have in common and filter them out.
This task finds the overlapping events and saves the unique event identifiers as tuples in a JSON file.
The JSON also contains the relative overlap as additional information; a short sketch of this idea is included just before the diff below.

* make it clearer why the value is padded to this specific length

* rearrange order of fields to the one used in the hash

* add an assertion to check whether the unique identifier columns exceed a specific digit count, which is given by the data (and may be exceeded in the far future)

* moved output of unique overlap identifiers from json to a parquet file; use this file to create the filter mask in the sync csv task

* linting

* moved type cast to hash function, refactor names

* move imports into hash function

* linting and add maybe_import for numpy

* overlapping events should not be chunk dependent

* add a new check to prevent an int64 overflow

* the previous overlap finding compared chunk-wise, but chunks are not always of the same size; changed this to a global comparison

* Changed default value of file path to empty string, since None is resolved by law to `/None`

* Added padding handling for arrays of dim < 2

Some array variables, e.g. MET.covXX, are of a plain numpy type rather than ListNumpy, meaning inner-axis slicing is not possible (illustrated after the sync.py diff below).

* added more variables for the sync

* Apply suggestions from code review

---------

Co-authored-by: Marcel R. <[email protected]>
Co-authored-by: Marcel Rieger <[email protected]>
3 people authored Nov 15, 2024
1 parent f4a8a4e commit 5ee7b03
Showing 2 changed files with 211 additions and 3 deletions.
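
Before the diff, a minimal sketch of the overlap check this PR adds, assuming NumPy >= 1.24 for the `kind` argument of np.isin; the `pack` helper and all values are illustrative and merely mirror the digit budget used by `hash_events` in hbt/util.py:

import numpy as np

# pack (event, luminosityBlock, run) into one int64 per event,
# mimicking the 8 + 5 + 6 digit budget of hash_events
def pack(event, lumi, run):
    return np.int64(event) * 10**11 + np.int64(lumi) * 10**6 + np.int64(run)

ref_hashes = np.array([pack(123, 45, 362695), pack(124, 45, 362695)])
file_hashes = np.array([pack(123, 45, 362695), pack(999, 46, 362696)])

# global (not chunk-wise) comparison: True where a file event also exists in the reference
overlap_mask = np.isin(file_hashes, ref_hashes, assume_unique=True, kind="sort")
relative_overlap = overlap_mask.sum() / len(file_hashes)  # -> 0.5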
179 changes: 176 additions & 3 deletions hbt/tasks/sync.py
@@ -6,19 +6,108 @@

from __future__ import annotations

import luigi
import law

from columnflow.tasks.framework.base import Requirements

from columnflow.tasks.framework.base import Requirements, DatasetTask
from columnflow.tasks.framework.mixins import (
ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin,
)
from columnflow.tasks.framework.remote import RemoteWorkflow
from columnflow.tasks.external import GetDatasetLFNs
from columnflow.tasks.reduction import ReducedEventsUser
from columnflow.tasks.production import ProduceColumns
from columnflow.tasks.ml import MLEvaluation
from columnflow.util import dev_sandbox

from hbt.tasks.base import HBTTask
from hbt.util import hash_events


class CheckExternalLFNOverlap(
HBTTask,
DatasetTask,
):

lfn = luigi.Parameter(
description="local path to an external LFN to check for overlap with the dataset",
# fetched via nanogen's FetchLFN
default="/pnfs/desy.de/cms/tier2/store/user/mrieger/nanogen_store/FetchLFN/store/mc/Run3Summer22NanoAODv12/GluGlutoHHto2B2Tau_kl-1p00_kt-1p00_c2-0p00_LHEweights_TuneCP5_13p6TeV_powheg-pythia8/NANOAODSIM/130X_mcRun3_2022_realistic_v5-v2/50000/992697da-4a10-4435-b63a-413f6d33517e.root", # noqa
)

# no versioning required
version = None

# default sandbox, might be overwritten by calibrator function
sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox"))

# upstream requirements
reqs = Requirements(
GetDatasetLFNs=GetDatasetLFNs,
)

def requires(self):
return self.reqs.GetDatasetLFNs.req(self)

def output(self):
return {
"overlap": self.target("lfn_overlap.json"),
"unique_identifiers": self.target("unique_identifiers.parquet"),
}

def run(self):
import awkward as ak
import numpy as np

# load the index columns of the reference lfn
output = self.output()

with self.publish_step("loading reference ids"):
ref_hashes = hash_events(self.load_nano_index(law.LocalFileTarget(self.lfn)))

# loop over all lfns in the dataset
n_files = self.dataset_inst.n_files
lfns_task = self.requires()
relative_overlap = {}
overlapping_identifier = []

for i, lfn_target in lfns_task.iter_nano_files(self, lfn_indices=list(range(n_files))):
with self.publish_step(f"loading ids of file {i}"):
file_arr = self.load_nano_index(lfn_target)
file_hashes = hash_events(file_arr)
# check which hashes of the file also appear in the reference
overlapping_mask = np.isin(
file_hashes,
ref_hashes,
assume_unique=True,
kind="sort",
)

num_overlapping = np.sum(overlapping_mask)
if num_overlapping:
# calculate the relative overlap
relative_overlap[str(i)] = num_overlapping / len(file_hashes)

# apply mask and store the overlapping identifiers
overlapping_identifier.extend(file_arr[overlapping_mask])

output["overlap"].dump(
relative_overlap,
formatter="json",
)

output["unique_identifiers"].dump(
ak.Array(overlapping_identifier),
formatter="awkward",
)

@classmethod
def load_nano_index(cls, lfn_target: law.FileSystemFileTarget) -> ak.Array:
fields = ["event", "luminosityBlock", "run"]
arr = lfn_target.load(formatter="uproot")["Events"].arrays(fields)
return arr


class CreateSyncFiles(
@@ -31,6 +120,11 @@ class CreateSyncFiles(
law.LocalWorkflow,
RemoteWorkflow,
):
filter_file = luigi.Parameter(
description="local path to a file containing event unique identifier to filter them out",
default="",
)

sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox"))

# upstream requirements
@@ -89,6 +183,7 @@ def output(self):
@law.decorator.localize
def run(self):
import awkward as ak
import numpy as np
from columnflow.columnar_util import update_ak_array, EMPTY_FLOAT, set_ak_column

# prepare inputs and outputs
@@ -115,7 +210,8 @@ def pad_nested(arr, n, *, axis=1):

# helper to pad and select the last element on the first inner axis
def select(arr, idx):
return replace_empty_float(pad_nested(arr, idx + 1, axis=1)[:, idx])
padded = pad_nested(arr, idx + 1, axis=-1)
return replace_empty_float(padded if arr.ndim == 1 else padded[:, idx])

# helper to select leptons
def select_leptons(events: ak.Array, common_fields: dict[str, int | float]) -> ak.Array:
@@ -131,18 +227,39 @@ def select_leptons(events: ak.Array, common_fields: dict[str, int | float]) -> ak.Array:
return set_ak_column(events, "Lepton", ak.concatenate(leptons, axis=1))

# optional filter to get only the events that overlap with the given external LFN
if self.filter_file:
with self.publish_step("loading reference ids"):
events_to_filter = law.LocalFileTarget(self.filter_file).load(formatter="awkward")
filter_events = hash_events(events_to_filter)

# event chunk loop
for (events, *columns), pos in self.iter_chunked_io(
files,
source_type=len(files) * ["awkward_parquet"],
pool_size=1,
):

# optional check for overlapping inputs
if self.check_overlapping_inputs:
self.raise_if_overlapping([events] + list(columns))

# add additional columns
events = update_ak_array(events, *columns)

# if an optional filter file is given, build a mask from the 1D hash values
if self.filter_file:
mask = np.isin(
hash_events(events),
filter_events,
assume_unique=True,
kind="sort",
)

# apply mask
events = events[mask]

# insert leptons
events = select_leptons(events, {"rawDeepTau2018v2p5VSjet": empty_float})

@@ -173,7 +290,63 @@ def select_leptons(events: ak.Array, common_fields: dict[str, int | float]) -> ak.Array:
"lep2_eta": select(events.Lepton.eta, 1),
"lep2_charge": select(events.Lepton.charge, 1),
"lep2_deeptauvsjet": select(events.Lepton.rawDeepTau2018v2p5VSjet, 1),
# TODO: add additional variables
"electron1_charge": select(events.Electron.charge, 0),
"electron1_eta": select(events.Electron.eta, 0),
"electron1_mass": select(events.Electron.mass, 0),
"electron1_phi": select(events.Electron.phi, 0),
"electron1_pt": select(events.Electron.pt, 0),
"electron2_charge": select(events.Electron.charge, 1),
"electron2_eta": select(events.Electron.eta, 1),
"electron2_mass": select(events.Electron.mass, 1),
"electron2_phi": select(events.Electron.phi, 1),
"electron2_pt": select(events.Electron.pt, 1),

"muon1_charge": select(events.Muon.charge, 0),
"muon1_eta": select(events.Muon.eta, 0),
"muon1_mass": select(events.Muon.mass, 0),
"muon1_phi": select(events.Muon.phi, 0),
"muon1_pt": select(events.Muon.pt, 0),
"muon2_charge": select(events.Muon.charge, 1),
"muon2_eta": select(events.Muon.eta, 1),
"muon2_mass": select(events.Muon.mass, 1),
"muon2_phi": select(events.Muon.phi, 1),
"muon2_pt": select(events.Muon.pt, 1),

"tau1_charge": select(events.Tau.charge, 0),
"tau1_eta": select(events.Tau.eta, 0),
"tau1_mass": select(events.Tau.mass, 0),
"tau1_phi": select(events.Tau.phi, 0),
"tau1_pt": select(events.Tau.pt, 0),
"tau2_charge": select(events.Tau.charge, 1),
"tau2_eta": select(events.Tau.eta, 1),
"tau2_mass": select(events.Tau.mass, 1),
"tau2_phi": select(events.Tau.phi, 1),
"tau2_pt": select(events.Tau.pt, 1),

"met1_covXX": select(events.MET.covXX, 0),
"met1_covXY": select(events.MET.covXY, 0),
"met1_covYY": select(events.MET.covYY, 0),
"met1_phi": select(events.MET.phi, 0),
"met1_pt": select(events.MET.pt, 0),
"met1_significance": select(events.MET.significance, 0),

"fatjet1_eta": select(events.FatJet.eta, 0),
"fatjet1_mass": select(events.FatJet.mass, 0),
"fatjet1_phi": select(events.FatJet.phi, 0),
"fatjet1_pt": select(events.FatJet.pt, 0),
"fatjet1_tau1": select(events.FatJet.tau1, 0),
"fatjet1_tau2": select(events.FatJet.tau2, 0),
"fatjet1_tau3": select(events.FatJet.tau3, 0),
"fatjet1_tau4": select(events.FatJet.tau4, 0),

"fatjet2_eta": select(events.FatJet.eta, 1),
"fatjet2_mass": select(events.FatJet.mass, 1),
"fatjet2_phi": select(events.FatJet.phi, 1),
"fatjet2_pt": select(events.FatJet.pt, 1),
"fatjet2_tau1": select(events.FatJet.tau1, 1),
"fatjet2_tau2": select(events.FatJet.tau2, 1),
"fatjet2_tau3": select(events.FatJet.tau3, 1),
"fatjet2_tau4": select(events.FatJet.tau4, 1),
})

# save as csv in output, append if necessary
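For illustration, a minimal sketch of the dim < 2 case handled by the `select` helper above, assuming awkward v2 semantics and made-up values: jagged collections such as Jet.pt hold a list per event and can be padded and sliced on the inner axis, whereas flat per-event fields such as MET.covXX have ndim == 1, so a `[:, idx]` slice would fail and the array is used as-is:

import awkward as ak

jagged = ak.Array([[10.0, 20.0], [30.0]])  # e.g. Jet.pt: list of values per event
flat = ak.Array([0.5, 0.7])                # e.g. MET.covXX: one value per event

# jagged case: pad the inner axis to length 2, then select the second element
padded = ak.fill_none(ak.pad_none(jagged, 2, axis=-1), -999.0)
second = padded[:, 1]  # [20.0, -999.0]

# flat case: there is no inner axis to slice, so the array is used directly
assert flat.ndim == 1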
35 changes: 35 additions & 0 deletions hbt/util.py
@@ -10,6 +10,9 @@

from columnflow.types import Any
from columnflow.columnar_util import ArrayFunction, deferred_column
from columnflow.util import maybe_import

np = maybe_import("numpy")


@deferred_column
@@ -57,3 +60,35 @@ def IF_DATASET_IS_DY(
return self.get()

return self.get() if func.dataset_inst.has_tag("is_dy") else None


def hash_events(arr: np.ndarray) -> np.ndarray:
"""
Helper function to create a hash value from the event, luminosityBlock and run columns.
The values are padded to fixed digit counts and concatenated into a single integer.
"""
import awkward as ak

def assert_value(arr: np.ndarray, field: str, max_digits: int) -> None:
"""
Helper function to check that a column's values do not exceed a maximum digit count.
"""
digits = len(str(arr[field].to_numpy().max()))
assert digits <= max_digits, f"{field} digit count is {digits} and exceeds the maximum of {max_digits}"

max_digits_run = 6
max_digits_luminosityBlock = 5
max_digits_event = 8
assert_value(arr, "run", max_digits_run)
assert_value(arr, "luminosityBlock", max_digits_luminosityBlock)
assert_value(arr, "event", max_digits_event)

max_digits_hash = max_digits_event + max_digits_luminosityBlock + max_digits_run
assert max_digits_hash <= 19, "total digit count exceeds the int64 range"

# upcast to int64 to avoid overflow
return (
ak.values_astype(arr.event, np.int64) * 10**(max_digits_luminosityBlock + max_digits_run) +
ak.values_astype(arr.luminosityBlock, np.int64) * 10**max_digits_run +
ak.values_astype(arr.run, np.int64)
)
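
As a quick worked example of the packing above (with illustrative values): the 8 + 5 + 6 = 19 digit budget places the three fields in disjoint decimal ranges of a single int64:

import awkward as ak
from hbt.util import hash_events

arr = ak.Array({"event": [123], "luminosityBlock": [45], "run": [362695]})

# 123 * 10**11 + 45 * 10**6 + 362695 = 12_300_045_362_695
assert hash_events(arr).to_list() == [12300045362695]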
