From 6791fcd448444d4bf8c71f17793ed928fee07558 Mon Sep 17 00:00:00 2001 From: Anas <103462379+haddadanas@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:41:58 +0200 Subject: [PATCH 01/25] FIX: num --> sum The num here will always return an array of >= 2 since we require >=2 Taus. I think sum was intended here --- hbt/selection/lepton.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbt/selection/lepton.py b/hbt/selection/lepton.py index 8ba64679..e1c567df 100644 --- a/hbt/selection/lepton.py +++ b/hbt/selection/lepton.py @@ -500,7 +500,7 @@ def lepton_selection( # special case for cross tau vbf trigger: # to avoid overlap, with non-vbf triggers, only one tau is allowed to have pt > 40 if trigger.has_tag("cross_tau_tau_vbf"): - is_tautau = is_tautau & (ak.num(events.Tau[tau_indices].pt > 40, axis=1) <= 1) + is_tautau = is_tautau & (ak.sum(events.Tau[tau_indices].pt > 40, axis=1) <= 1) is_iso = ak.sum(tau_iso_mask, axis=1) >= 2 # tau_indices are sorted by highest isolation as cond. 1 and highest pt as cond. 2, so From 10069a7a4e7eade29f9b651e837caa38c23d63ec Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Thu, 24 Oct 2024 13:17:31 +0200 Subject: [PATCH 02/25] Add the flat-s hist hook as option for histograms Within our histogram pipeline no rebinning is currently possible. This kind of feature is useful for inference models in combine. implements a hist-hook called "flat-s" that takes a finely binned histogram and rebins the histogram for given constraints. The hist-hook is split in 2 parts: - finding edges that full-fill the constraints - apply edges on given "Hist" histograms --- hbt/config/configs_hbt.py | 4 + hbt/config/hist_hooks.py | 272 ++++++++++++++++++++++++++++++++++++++ hbt/config/variables.py | 7 + 3 files changed, 283 insertions(+) diff --git a/hbt/config/configs_hbt.py b/hbt/config/configs_hbt.py index 20aa4e15..785fc414 100644 --- a/hbt/config/configs_hbt.py +++ b/hbt/config/configs_hbt.py @@ -147,6 +147,10 @@ def if_era( if process_name.startswith(("graviton_hh_", "radion_hh_")): proc.add_tag("signal") proc.add_tag("resonant_signal") + if process_name.startswith("tt"): + proc.add_tag("is_ttbar") + if process_name.startswith("dy"): + proc.add_tag("is_dy") # add the process cfg.add_process(proc) diff --git a/hbt/config/hist_hooks.py b/hbt/config/hist_hooks.py index dad43d65..cc2d07b2 100644 --- a/hbt/config/hist_hooks.py +++ b/hbt/config/hist_hooks.py @@ -187,6 +187,278 @@ def qcd_estimation(task, hists): return hists + def flat_s(task, hists: dict[hist.Histogram]) -> dict[hist.Histogram]: + """Rebinnig of the histograms in *hists* to archieve a flat-signal distribution. + Args: + + task (TODO): task instance that contains the process informations + hists (Dict[hist.Histogram]): A dictionary of histograms using Process instances as keys + + Returns: + Dict[hist.Histogram]: A dictionary of histograms using Process instances as keys + """ + def find_edges(signal_histogram, background_histograms, variable, n_bins=10) -> tuple[np.ndarray, np.ndarray]: + """ + Determine new bin edges that result in a flat signal distribution. + The edges are determined by the signal distribution, while the background distribution + is used to ensure that the background yield in each bin is sufficient. + """ + def get_integral(cumulative_weights, stop, offset=0): + """ + Helper to calculate the integral of *cumulative_weights* between the *offset* (included) + and the *stop* index (not included) + """ + return cumulative_weights[stop - 1] - (0 if offset == 0 else cumulative_weights[offset - 1]) + + def prepare_background(histogram: hist.Histogram) -> tuple[np.ndarray, np.ndarray, np.ndarray]: # noqa + """ + Helper to extract information from background histograms. + + Returns: + tuple[np.ndarray]: A tuple containing the array that describe bin yield, + the number of equivalent bins and the cumulative bin yield. + """ + bin_yield = histogram.counts() + # y^2 / sigma^2, where y is the yield, sigma is the uncertainty + # these are the number of events with weight 1 and same statistical fluctuation + number_of_equivalent_bins = bin_yield**2 / histogram.variances() + bin_yield = np.flip(bin_yield, axis=-1) + cumulative_bin_yield = np.cumsum(bin_yield, axis=0) + return ( + bin_yield, + np.flip(number_of_equivalent_bins, axis=-1), + cumulative_bin_yield, + ) + # prepare parameters + low_edge, max_edge = 0, 1 + bin_edges = [max_edge] + indices_gathering = [0] + + # bookkeep reasons for stopping binning + stop_reason = "" + # accumulated signal yield up to the current index + y_already_binned = 0.0 + y_min = 1.0e-5 + # during binning, do not remove leading entries + # instead remember the index that denotes the start of the bin + offset = 0 + + # prepare signal + # fine binned histograms bin centers are approx equivalent to dnn output + # flip arrays to start from the right + dnn_score_signal = np.flip(signal_histogram.axes[variable].centers, axis=-1) + y_signal = np.flip(signal_histogram.counts(), axis=-1) + + # calculate cumulative of reversed signal yield and yield per bin + cumulu_y_signal = np.cumsum(y_signal, axis=0) + full_cum = cumulu_y_signal[-1] + y_per_bin = full_cum / n_bins + num_events = len(cumulu_y_signal) + + # prepare background + + for process, histogram in background_histograms.items(): + if process.name == "tt": + tt_y, tt_num_eq, cumulu_tt_y = prepare_background(histogram) + elif process.name == "dy": + dy_y, dy_num_eq, cumulu_dy_y = prepare_background(histogram) + + # start binning + while len(bin_edges) < n_bins: + # stopping condition 1: reached end of events + if offset >= num_events: + stop_reason = "no more events left" + break + # stopping condition 2: remaining signal yield too small + # this would lead to a bin complelty filled with background + y_remaining = full_cum - y_already_binned + if y_remaining < y_min: + stop_reason = "remaining signal yield insufficient" + break + # find the index of the event that would result in a signal yield increase of more + # than the expected per-bin yield; + # this index would mark the start of the next bin given all constraints are met + if y_remaining >= y_per_bin: + threshold = y_already_binned + y_per_bin + # get indices of array of values above threshold + # first entry defines the next bin edge + # shift next idx by offset + next_idx = offset + np.where(cumulu_y_signal[offset:] > threshold)[0][0] + else: + # special case: remaining signal yield smaller than the expected per-bin yield, + # so find the last event + next_idx = offset + np.where(cumulu_y_signal[offset:])[0][-1] + 1 + + # advance the index until backgrounds constraints are met + + # combine tt and dy histograms + + # Background constraints + while next_idx < num_events: + # get the number of monte carlo tt and dy events + tt_num_events = get_integral(tt_num_eq, next_idx, offset) + dy_num_events = get_integral(tt_num_eq, next_idx, offset) + tt_yield = get_integral(cumulu_tt_y, next_idx, offset) + dy_yield = get_integral(cumulu_dy_y, next_idx, offset) + + # evaluate constraints + # TODO: potentially relax constraints here, e.g when there are 3 (4?) tt events, drop the constraint + # on dy, and vice-versa + constraints_met = ( + # have atleast 1 tt, 1 dy and atleast 4 background events + # scale by lumi ratio to be more fair to the smaller dataset + tt_num_events >= 1 and + dy_num_events >= 1 and + tt_num_events + dy_num_events >= 4 and + + # yields must be positive to avoid negative sums of weights per process + tt_yield > 0 and + dy_yield > 0 + ) + if constraints_met: + # TODO: maybe also check if the background conditions are just barely met and advance next_idx + # to the middle between the current value and the next one that would change anything about the + # background predictions; this might be more stable as the current implementation can highly + # depend on the exact value of a single event (the one that tips the constraints over the edge + # to fulfillment) + # bin found, stop + break + + # constraints not met, advance index to include the next tt or dy event and try again + next_idx += 1 + else: + # stopping condition 3: no more events left, so the last bin (most left one) does not fullfill + # constraints; however, this should practically never happen + stop_reason = "no more events left while trying to fulfill constraints" + break + + # next_idx found, update values + # get next edge or set to low edge if end is reached + if next_idx == num_events: + edge_value = low_edge + else: + # calculate bin center as new edge + edge_value = float(dnn_score_signal[next_idx - 1:next_idx + 1].mean()) + # prevent out of bounds values and push them to the boundaries + bin_edges.append(max(min(edge_value, max_edge), low_edge)) + + y_already_binned += get_integral(cumulu_y_signal, next_idx, offset) + offset = next_idx + indices_gathering.append(next_idx) + + # make sure the lower dnn_output (max events) is included + if bin_edges[-1] != low_edge: + if len(bin_edges) > n_bins: + raise RuntimeError(f"number of bins reached and initial bin edge is not minimal bin edge (edges: {bin_edges})") # noqa + bin_edges.append(low_edge) + indices_gathering.append(num_events) + + # some debugging output + n_bins_actual = len(bin_edges) - 1 + if n_bins_actual > n_bins: + raise Exception("number of actual bins ended up larger than requested (implementation bug)") + if n_bins_actual < n_bins: + print( + f" started from {num_events} bins, targeted {n_bins} but ended at {n_bins_actual} bins\n" + f" -> reason: {stop_reason or 'NO REASON!?'}", + ) + n_bins = n_bins_actual + + # flip indices to the right order + indices_gathering = (np.flip(indices_gathering) - num_events) * -1 + return np.flip(np.array(bin_edges), axis=-1), indices_gathering + + def apply_edges(h: hist.Hist, edges: np.ndarray, indices: np.ndarray, variable: tuple[str]) -> hist.Hist: + """ + Rebin the content axes determined by *variables* of a given hist histogram *h* to + given *edges* and their *indices*. + The rebinned histogram is returned. + + Args: + h (hist.Hist): hist Histogram that is to be rebinned + edges (np.ndarray): a array of ascending bin edges + indices (np.ndarray): a array of indices that define the new bin edges + variables (str): variable name that is rebinned + + Returns: + hist.Hist: rebinned hist histogram + """ + # sort edges and indices, by default they are sorted + ascending_order = np.argsort(edges) + edges, indices = edges[ascending_order], indices[ascending_order] + + # create new hist and add axes with coresponding edges + # define new axes, from old histogram and rebinned variable with new axis + axes = ( + [h.axes[axis] for axis in h.axes.name if axis not in variable] + + [hist.axis.Variable(edges, name=variable, label=f"{variable}-flat-s")] + ) + + new_hist = hist.Hist(*axes, storage=hist.storage.Weight()) + + # slice the old histogram storage view with new edges + # sum over sliced bin contents to get rebinned content + slices = [slice(int(indices[index]), int(indices[index + 1])) for index in range(0, len(indices) - 1)] + slice_array = [np.sum(h.view()[..., _slice], axis=-1, keepdims=True) for _slice in slices] + # concatenate the slices to get the new bin content + # store in new histogram storage view + np.concatenate(slice_array, axis=-1, out=new_hist.view()) + + return new_hist + + import hist + n_bins = 10 + # find signal histogram for which you will optimize, only 1 signal process is allowed + background_hists = {} + for process, histogram in hists.items(): + if process.has_tag("signal"): + signal_proc = process + signal_hist = histogram + else: + background_hists[process] = histogram + + if not signal_proc: + logger.warning("could not find any signal process, return hist unchanged") + return hists + + # 1. preparation + # get the leaf categories (e.g. {etau,mutau}__os__iso) + leaf_cats = task.config_inst.get_category(task.branch_data.category).get_leaf_categories() + + # sum over different leaf categories + cat_ids_locations = [hist.loc(category.id) for category in leaf_cats] + combined_signal_hist = signal_hist[{"category": cat_ids_locations}] + combined_signal_hist = combined_signal_hist[{"category": sum}] + # remove shift axis, since its always nominal + combined_signal_hist = combined_signal_hist[{"shift": hist.loc(0)}] + + # same for background + for process, histogram in background_hists.items(): + combined_background_hist = histogram[{"category": cat_ids_locations}] + combined_background_hist = combined_background_hist[{"category": sum}] + combined_background_hist = combined_background_hist[{"shift": hist.loc(0)}] + background_hists[process] = combined_background_hist + + # 2. determine bin edges + flat_s_edges, flat_s_indices = find_edges( + signal_histogram=combined_signal_hist, + variable=task.variables[0], + background_histograms=background_hists, + n_bins=n_bins, + ) + + # 3. apply to hists + for process, histogram in hists.items(): + hists[process] = apply_edges( + histogram, + flat_s_edges, + flat_s_indices, + task.variables[0], + ) + + return hists + config.x.hist_hooks = { "qcd": qcd_estimation, + "flat_s": flat_s, } diff --git a/hbt/config/variables.py b/hbt/config/variables.py index b75626d5..1f4e83ff 100644 --- a/hbt/config/variables.py +++ b/hbt/config/variables.py @@ -264,3 +264,10 @@ def add_variables(config: od.Config) -> None: binning=(25, 0.0, 1.0), x_title=rf"{proc.upper()} output node, res. DNN", ) + + config.add_variable( + name=f"res_dnn_{proc}_fine", + expression=f"res_dnn_{proc}", + binning=(5000, 0.0, 1.0), + x_title=rf"{proc.upper()} output node, res. DNN", + ) From 5f480910941f6a222c5c5a3aa4166016d316c85e Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Thu, 24 Oct 2024 13:26:37 +0200 Subject: [PATCH 03/25] Add task to create csv file for syncronization tasks --- hbt/tasks/__init__.py | 1 + hbt/tasks/sync_csv.py | 264 ++++++++++++++++++++++++++++++++++++++++++ law.cfg | 1 + 3 files changed, 266 insertions(+) create mode 100644 hbt/tasks/sync_csv.py diff --git a/hbt/tasks/__init__.py b/hbt/tasks/__init__.py index 381a505b..87c04d8a 100644 --- a/hbt/tasks/__init__.py +++ b/hbt/tasks/__init__.py @@ -5,3 +5,4 @@ import hbt.tasks.base import hbt.tasks.stats import hbt.tasks.studies +import hbt.tasks.sync_csv diff --git a/hbt/tasks/sync_csv.py b/hbt/tasks/sync_csv.py new file mode 100644 index 00000000..c69261f4 --- /dev/null +++ b/hbt/tasks/sync_csv.py @@ -0,0 +1,264 @@ +# coding: utf-8 + +""" +Task to create a single file csv file for framework sync with other frameworks. +""" + +import luigi +import law + + +from columnflow.tasks.framework.base import Requirements, AnalysisTask, wrapper_factory +from columnflow.tasks.framework.mixins import ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin +from columnflow.tasks.framework.remote import RemoteWorkflow +from columnflow.tasks.reduction import ReducedEventsUser +from columnflow.tasks.production import ProduceColumns +from columnflow.tasks.ml import MLEvaluation +from columnflow.tasks.selection import MergeSelectionMasks +from columnflow.util import dev_sandbox, DotDict + + +class CreateSyncFile( + MLModelsMixin, + ProducersMixin, + ChunkedIOMixin, + ReducedEventsUser, + SelectorMixin, + law.LocalWorkflow, + RemoteWorkflow, +): + sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox")) + + file_type = luigi.ChoiceParameter( + default="csv", + choices=("csv",), + description="the file type to create; choices: csv; default: csv", + ) + + # upstream requirements + reqs = Requirements( + ReducedEventsUser.reqs, + RemoteWorkflow.reqs, + ProduceColumns=ProduceColumns, + MLEvaluation=MLEvaluation, + MergeSelectionMasks=MergeSelectionMasks, + ) + + def workflow_requires(self): + reqs = super().workflow_requires() + + # require the full merge forest + reqs["events"] = self.reqs.ProvideReducedEvents.req(self) + + if not self.pilot: + if self.producer_insts: + reqs["producers"] = [ + self.reqs.ProduceColumns.req(self, producer=producer_inst.cls_name) + for producer_inst in self.producer_insts + if producer_inst.produced_columns + ] + if self.ml_model_insts: + reqs["ml"] = [ + self.reqs.MLEvaluation.req(self, ml_model=m) + for m in self.ml_models + ] + + return reqs + + def requires(self): + reqs = {"events": self.reqs.ProvideReducedEvents.req(self)} + + if self.producer_insts: + reqs["producers"] = [ + self.reqs.ProduceColumns.req(self, producer=producer_inst.cls_name) + for producer_inst in self.producer_insts + if producer_inst.produced_columns + ] + if self.ml_model_insts: + reqs["ml"] = [ + self.reqs.MLEvaluation.req(self, ml_model=m) + for m in self.ml_models + ] + + return reqs + + workflow_condition = ReducedEventsUser.workflow_condition.copy() + + @workflow_condition.output + def output(self): + return self.target(f"sync_file_{self.branch}.{self.file_type}") + + @law.decorator.log + @law.decorator.localize(input=True, output=True) + @law.decorator.safe_output + def run(self): + from columnflow.columnar_util import ( + Route, RouteFilter, mandatory_coffea_columns, update_ak_array, + sort_ak_fields, EMPTY_FLOAT, + ) + import awkward as ak + import pandas as pd + def sync_columns(): + columns = { + "physics_objects": { + "Jet.{pt,eta,phi,mass}": 2, + }, + "flat_objects": [ + "event", + "run", + "channel_id", + "leptons_os", + "luminosityBlock", + "lep*", + ], + } + mapping = { + "luminosityBlock": "lumi", + "leptons_os": "is_os", + } + return columns, mapping + + def lepton_selection(events, attributes=["pt", "eta", "phi", "charge"]): + first_lepton = ak.concatenate([events["Electron"], events["Muon"]], axis=1) + second_lepton = events["Tau"] + for attribute in attributes: + events[f"lep1_{attribute}"] = first_lepton[attribute] + events[f"lep2_{attribute}"] = second_lepton[attribute] + return events + + def awkward_to_pandas(events, physics_objects, flat_objects=["event"]): + """Helper function to convert awkward arrays to pandas dataframes. + + Args: + events (ak.array): Awkward array with nested structure. + physics_objects (Dict[str]): Dict of physics objects to consider, with value representing padding. + attributes (List[str]): List of attributes to consider. + flat_objects (List[str]): List of additional columns to consider, these are not padded. + + Returns: + pd.DataFrame: Pandas dataframe with flattened structure of the awkward array. + """ + events = sort_ak_fields(events) + f = DotDict() + # add meta columns (like event number, ...) + # these columns do not need padding + + # columns that need no padding (like event number, ...) + # resolve glob patterns + for flat_object_pattern in flat_objects: + for field in events.fields: + if law.util.fnmatch.fnmatch(field, flat_object_pattern): + f[field] = events[field] + + # add columns of physics objects + # columns are padded to given pad_length + for physics_pattern, pad_length in physics_objects.items(): + physics_objects = law.util.brace_expand(physics_pattern) + for physics_object in physics_objects: + physics_route = Route(physics_object) + parts = physics_route.fields + physics_array = physics_route.apply(events) + physics_array = ak.pad_none(physics_array, pad_length) + physics_array = ak.fill_none(physics_array, EMPTY_FLOAT) + for pad_number in range(0, pad_length): + full_name = f"{parts[0].lower()}{pad_number}_{parts[1].lower()}" + f[full_name] = physics_array[:, pad_number] + return ak.to_dataframe(f) + + # prepare inputs and outputs + inputs = self.input() + + # create a temp dir for saving intermediate files + tmp_dir = law.LocalDirectoryTarget(is_tmp=True) + tmp_dir.touch() + + # define columns that will be written + write_columns: set[Route] = set() + skip_columns: set[str] = set() + for c in self.config_inst.x.keep_columns.get(self.task_family, ["*"]): + for r in self._expand_keep_column(c): + if r.has_tag("skip"): + skip_columns.add(r.column) + else: + write_columns.add(r) + write_columns = { + r for r in write_columns + if not law.util.multi_match(r.column, skip_columns, mode=any) + } + route_filter = RouteFilter(write_columns) + + # define columns that need to be read + read_columns = write_columns | set(mandatory_coffea_columns) + read_columns = {Route(c) for c in read_columns} + + # iterate over chunks of events and diffs + files = [inputs["events"]["events"].abspath] + if self.producer_insts: + files.extend([inp["columns"].abspath for inp in inputs["producers"]]) + if self.ml_model_insts: + files.extend([inp["mlcolumns"].abspath for inp in inputs["ml"]]) + + pandas_frameworks = [] + for (events, *columns), pos in self.iter_chunked_io( + files, + source_type=len(files) * ["awkward_parquet"], + read_columns=len(files) * [read_columns], + ): + # optional check for overlapping inputs + if self.check_overlapping_inputs: + self.raise_if_overlapping([events] + list(columns)) + + # add additional columns + events = update_ak_array(events, *columns) + + # remove columns + events = route_filter(events) + + # optional check for finite values + if self.check_finite_output: + self.raise_if_not_finite(events) + + # construct first, second lepton columns + events = lepton_selection( + events, + attributes=["pt", "eta", "phi", "mass", "charge"], + ) + + # convert to pandas dataframe + keep_columns, mapping = sync_columns() + + events_pd = awkward_to_pandas( + events=events, + physics_objects=keep_columns["physics_objects"], + flat_objects=keep_columns["flat_objects"], + ) + + pandas_frameworks.append(events_pd) + + # merge output files + merged_pandas_framework = pd.concat( + pandas_frameworks, + ignore_index=True, + ).rename(columns=mapping, inplace=False) + self.output().dump(merged_pandas_framework, index=False, formatter="pandas") + + +# overwrite class defaults +check_finite_tasks = law.config.get_expanded("analysis", "check_finite_output", [], split_csv=True) +CreateSyncFile.check_finite_output = ChunkedIOMixin.check_finite_output.copy( + default=CreateSyncFile.task_family in check_finite_tasks, + add_default_to_description=True, +) + +check_overlap_tasks = law.config.get_expanded("analysis", "check_overlapping_inputs", [], split_csv=True) +CreateSyncFile.check_overlapping_inputs = ChunkedIOMixin.check_overlapping_inputs.copy( + default=CreateSyncFile.task_family in check_overlap_tasks, + add_default_to_description=True, +) + + +CreateSyncFileWrapper = wrapper_factory( + base_cls=AnalysisTask, + require_cls=CreateSyncFile, + enable=["configs", "skip_configs", "datasets", "skip_datasets", "shifts", "skip_shifts"], +) diff --git a/law.cfg b/law.cfg index 81e7119d..a58dc10a 100644 --- a/law.cfg +++ b/law.cfg @@ -132,6 +132,7 @@ cf.MergeMLEvents: wlcg cf.MLTraining: wlcg cf.MLEvaluation: wlcg cf.UniteColumns: wlcg +cf.CreateSyncFile: wlcg [versions] From fbe850d80c7f782ca7b03b57eb941fb9ac6a6b3f Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Thu, 24 Oct 2024 14:49:54 +0200 Subject: [PATCH 04/25] changed tag to be more consistent with previous defined tags --- hbt/config/configs_hbt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbt/config/configs_hbt.py b/hbt/config/configs_hbt.py index 785fc414..f7a88089 100644 --- a/hbt/config/configs_hbt.py +++ b/hbt/config/configs_hbt.py @@ -148,9 +148,9 @@ def if_era( proc.add_tag("signal") proc.add_tag("resonant_signal") if process_name.startswith("tt"): - proc.add_tag("is_ttbar") + proc.add_tag("ttbar") if process_name.startswith("dy"): - proc.add_tag("is_dy") + proc.add_tag("dy") # add the process cfg.add_process(proc) From 90730ce7bfa2cdab44e9e48b38329fd8b5b523e6 Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Thu, 24 Oct 2024 14:50:35 +0200 Subject: [PATCH 05/25] adapt doc strings to cf style, fixed typos --- hbt/config/hist_hooks.py | 57 ++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/hbt/config/hist_hooks.py b/hbt/config/hist_hooks.py index cc2d07b2..1e47de92 100644 --- a/hbt/config/hist_hooks.py +++ b/hbt/config/hist_hooks.py @@ -189,24 +189,39 @@ def qcd_estimation(task, hists): def flat_s(task, hists: dict[hist.Histogram]) -> dict[hist.Histogram]: """Rebinnig of the histograms in *hists* to archieve a flat-signal distribution. - Args: - task (TODO): task instance that contains the process informations - hists (Dict[hist.Histogram]): A dictionary of histograms using Process instances as keys + :param task: task instance that contains the process informations + :param hists: A dictionary of histograms using Process instances as keys - Returns: - Dict[hist.Histogram]: A dictionary of histograms using Process instances as keys + :raises RuntimeError: If the wanted number of bins is reached and the initial + bin edge is not minimal. + :raises Exception: If the number of actual bins ended up larger than requested. + :return: A dictionary of histograms using Process instances as keys """ def find_edges(signal_histogram, background_histograms, variable, n_bins=10) -> tuple[np.ndarray, np.ndarray]: """ Determine new bin edges that result in a flat signal distribution. The edges are determined by the signal distribution, while the background distribution is used to ensure that the background yield in each bin is sufficient. + + :param signal_histogram: The histogram that describes the signal distribution. + :param background_histograms: A dictionary of histograms using the process as key + that describe the background distribution. + :param variable: The variable name that is rebinned. + :param n_bins: The number of bins that the signal distribution should be rebinned to. + + :return: A tuple containing the new bin edges and the indices that define the new bin edges. """ def get_integral(cumulative_weights, stop, offset=0): """ Helper to calculate the integral of *cumulative_weights* between the *offset* (included) and the *stop* index (not included) + + :param cumulative_weights: The cumulative weights array. + :param stop: The index up to which the integral is calculated. + :param offset: The index from which the integral is calculated. + + :return The integral of the weights between the given indices. """ return cumulative_weights[stop - 1] - (0 if offset == 0 else cumulative_weights[offset - 1]) @@ -214,8 +229,9 @@ def prepare_background(histogram: hist.Histogram) -> tuple[np.ndarray, np.ndarra """ Helper to extract information from background histograms. - Returns: - tuple[np.ndarray]: A tuple containing the array that describe bin yield, + :param histogram: A histogram that describes the background distribution. + + :return: A tuple containing the array that describe bin yield, the number of equivalent bins and the cumulative bin yield. """ bin_yield = histogram.counts() @@ -258,7 +274,7 @@ def prepare_background(histogram: hist.Histogram) -> tuple[np.ndarray, np.ndarra # prepare background for process, histogram in background_histograms.items(): - if process.name == "tt": + if process.name == "ttbar": tt_y, tt_num_eq, cumulu_tt_y = prepare_background(histogram) elif process.name == "dy": dy_y, dy_num_eq, cumulu_dy_y = prepare_background(histogram) @@ -289,10 +305,6 @@ def prepare_background(histogram: hist.Histogram) -> tuple[np.ndarray, np.ndarra # so find the last event next_idx = offset + np.where(cumulu_y_signal[offset:])[0][-1] + 1 - # advance the index until backgrounds constraints are met - - # combine tt and dy histograms - # Background constraints while next_idx < num_events: # get the number of monte carlo tt and dy events @@ -349,7 +361,9 @@ def prepare_background(histogram: hist.Histogram) -> tuple[np.ndarray, np.ndarra # make sure the lower dnn_output (max events) is included if bin_edges[-1] != low_edge: if len(bin_edges) > n_bins: - raise RuntimeError(f"number of bins reached and initial bin edge is not minimal bin edge (edges: {bin_edges})") # noqa + raise RuntimeError( + "number of bins reached and initial bin edge" + f"is not minimal bin edge (edges: {bin_edges})") bin_edges.append(low_edge) indices_gathering.append(num_events) @@ -368,20 +382,19 @@ def prepare_background(histogram: hist.Histogram) -> tuple[np.ndarray, np.ndarra indices_gathering = (np.flip(indices_gathering) - num_events) * -1 return np.flip(np.array(bin_edges), axis=-1), indices_gathering - def apply_edges(h: hist.Hist, edges: np.ndarray, indices: np.ndarray, variable: tuple[str]) -> hist.Hist: + def apply_edges(h: hist.Hist, edges: np.ndarray, indices: np.ndarray, variable: str) -> hist.Hist: """ - Rebin the content axes determined by *variables* of a given hist histogram *h* to + Rebin the content axes determined by *variable* of a given hist histogram *h* to given *edges* and their *indices*. The rebinned histogram is returned. - Args: - h (hist.Hist): hist Histogram that is to be rebinned - edges (np.ndarray): a array of ascending bin edges - indices (np.ndarray): a array of indices that define the new bin edges - variables (str): variable name that is rebinned - Returns: - hist.Hist: rebinned hist histogram + :param h: hist Histogram that is to be rebinned + :param edges: a array of ascending bin edges + :param indices: a array of indices that define the new bin edges + :param variable: variable name that is rebinned + + :return: rebinned hist histogram """ # sort edges and indices, by default they are sorted ascending_order = np.argsort(edges) From 903bfe748582cd9d9c119fe2b2e37768f1e7ec8e Mon Sep 17 00:00:00 2001 From: Bogdan-Wiederspan <79155113+Bogdan-Wiederspan@users.noreply.github.com> Date: Thu, 24 Oct 2024 15:05:29 +0200 Subject: [PATCH 06/25] Update type hint in hist_hooks.py --- hbt/config/hist_hooks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbt/config/hist_hooks.py b/hbt/config/hist_hooks.py index 1e47de92..d0ef8a86 100644 --- a/hbt/config/hist_hooks.py +++ b/hbt/config/hist_hooks.py @@ -187,7 +187,7 @@ def qcd_estimation(task, hists): return hists - def flat_s(task, hists: dict[hist.Histogram]) -> dict[hist.Histogram]: + def flat_s(task, hists: dict[od.Process, hist.Histogram]) -> dict[hist.Histogram]: """Rebinnig of the histograms in *hists* to archieve a flat-signal distribution. :param task: task instance that contains the process informations From 2682b1c07aa9d51f51b02b8195911c6d67bb66e7 Mon Sep 17 00:00:00 2001 From: Bogdan-Wiederspan <79155113+Bogdan-Wiederspan@users.noreply.github.com> Date: Thu, 24 Oct 2024 16:04:44 +0200 Subject: [PATCH 07/25] typo Co-authored-by: Marcel Rieger --- hbt/config/hist_hooks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbt/config/hist_hooks.py b/hbt/config/hist_hooks.py index d0ef8a86..982feab9 100644 --- a/hbt/config/hist_hooks.py +++ b/hbt/config/hist_hooks.py @@ -363,7 +363,8 @@ def prepare_background(histogram: hist.Histogram) -> tuple[np.ndarray, np.ndarra if len(bin_edges) > n_bins: raise RuntimeError( "number of bins reached and initial bin edge" - f"is not minimal bin edge (edges: {bin_edges})") + f" is not minimal bin edge (edges: {bin_edges})", + ) bin_edges.append(low_edge) indices_gathering.append(num_events) From d2bca72a6ad0a056d3dcbf994c0f25d5846c668f Mon Sep 17 00:00:00 2001 From: Bogdan-Wiederspan <79155113+Bogdan-Wiederspan@users.noreply.github.com> Date: Thu, 24 Oct 2024 16:07:27 +0200 Subject: [PATCH 08/25] more typos --- hbt/config/hist_hooks.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hbt/config/hist_hooks.py b/hbt/config/hist_hooks.py index 982feab9..ccc51791 100644 --- a/hbt/config/hist_hooks.py +++ b/hbt/config/hist_hooks.py @@ -187,7 +187,7 @@ def qcd_estimation(task, hists): return hists - def flat_s(task, hists: dict[od.Process, hist.Histogram]) -> dict[hist.Histogram]: + def flat_s(task, hists: dict[od.Process, hist.Histogram]) -> dict[od.Process, hist.Histogram]: """Rebinnig of the histograms in *hists* to archieve a flat-signal distribution. :param task: task instance that contains the process informations @@ -389,7 +389,6 @@ def apply_edges(h: hist.Hist, edges: np.ndarray, indices: np.ndarray, variable: given *edges* and their *indices*. The rebinned histogram is returned. - :param h: hist Histogram that is to be rebinned :param edges: a array of ascending bin edges :param indices: a array of indices that define the new bin edges From c80f4547ea6ce7937fba08028d5beb29fca142fa Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Thu, 24 Oct 2024 16:13:33 +0200 Subject: [PATCH 09/25] rename sync_csv to sync, to match naming convention of analysis and fixed some typos --- hbt/tasks/__init__.py | 2 +- hbt/tasks/{sync_csv.py => sync.py} | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) rename hbt/tasks/{sync_csv.py => sync.py} (96%) diff --git a/hbt/tasks/__init__.py b/hbt/tasks/__init__.py index 87c04d8a..2ec46037 100644 --- a/hbt/tasks/__init__.py +++ b/hbt/tasks/__init__.py @@ -5,4 +5,4 @@ import hbt.tasks.base import hbt.tasks.stats import hbt.tasks.studies -import hbt.tasks.sync_csv +import hbt.tasks.sync diff --git a/hbt/tasks/sync_csv.py b/hbt/tasks/sync.py similarity index 96% rename from hbt/tasks/sync_csv.py rename to hbt/tasks/sync.py index c69261f4..f2cfcc84 100644 --- a/hbt/tasks/sync_csv.py +++ b/hbt/tasks/sync.py @@ -7,7 +7,6 @@ import luigi import law - from columnflow.tasks.framework.base import Requirements, AnalysisTask, wrapper_factory from columnflow.tasks.framework.mixins import ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin from columnflow.tasks.framework.remote import RemoteWorkflow @@ -243,20 +242,12 @@ def awkward_to_pandas(events, physics_objects, flat_objects=["event"]): self.output().dump(merged_pandas_framework, index=False, formatter="pandas") -# overwrite class defaults -check_finite_tasks = law.config.get_expanded("analysis", "check_finite_output", [], split_csv=True) -CreateSyncFile.check_finite_output = ChunkedIOMixin.check_finite_output.copy( - default=CreateSyncFile.task_family in check_finite_tasks, - add_default_to_description=True, -) - check_overlap_tasks = law.config.get_expanded("analysis", "check_overlapping_inputs", [], split_csv=True) CreateSyncFile.check_overlapping_inputs = ChunkedIOMixin.check_overlapping_inputs.copy( default=CreateSyncFile.task_family in check_overlap_tasks, add_default_to_description=True, ) - CreateSyncFileWrapper = wrapper_factory( base_cls=AnalysisTask, require_cls=CreateSyncFile, From 5f605b94ded0c02333c3e33bd55e15b5d1ff07c5 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 25 Oct 2024 08:49:57 +0200 Subject: [PATCH 10/25] Add pandas contrib. --- hbt/__init__.py | 3 +++ modules/columnflow | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/hbt/__init__.py b/hbt/__init__.py index 78ef9c08..9e393f0c 100644 --- a/hbt/__init__.py +++ b/hbt/__init__.py @@ -1,8 +1,11 @@ # coding: utf-8 +import law from hbt.columnflow_patches import patch_all +law.contrib.load("pandas") + # apply cf patches once patch_all() diff --git a/modules/columnflow b/modules/columnflow index 11ef864d..0652ad1e 160000 --- a/modules/columnflow +++ b/modules/columnflow @@ -1 +1 @@ -Subproject commit 11ef864d4295e31ad9ff103e7236ee2b77f31aa1 +Subproject commit 0652ad1eb169deb80f32a26466df30ecbfda5230 From eadebe7477b57fb8e7ae0976552f46b2b2c149a5 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 25 Oct 2024 11:17:31 +0200 Subject: [PATCH 11/25] Slim csv writing. --- hbt/tasks/sync.py | 212 ++++++++++++++++------------------------------ 1 file changed, 71 insertions(+), 141 deletions(-) diff --git a/hbt/tasks/sync.py b/hbt/tasks/sync.py index f2cfcc84..87efeb28 100644 --- a/hbt/tasks/sync.py +++ b/hbt/tasks/sync.py @@ -1,23 +1,28 @@ # coding: utf-8 """ -Task to create a single file csv file for framework sync with other frameworks. +Tasks that create files for synchronization efforts with other frameworks. """ -import luigi +from __future__ import annotations + import law from columnflow.tasks.framework.base import Requirements, AnalysisTask, wrapper_factory -from columnflow.tasks.framework.mixins import ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin +from columnflow.tasks.framework.mixins import ( + ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin, +) from columnflow.tasks.framework.remote import RemoteWorkflow from columnflow.tasks.reduction import ReducedEventsUser from columnflow.tasks.production import ProduceColumns from columnflow.tasks.ml import MLEvaluation -from columnflow.tasks.selection import MergeSelectionMasks -from columnflow.util import dev_sandbox, DotDict +from columnflow.util import dev_sandbox + +from hbt.tasks.base import HBTTask -class CreateSyncFile( +class CreateSyncFiles( + HBTTask, MLModelsMixin, ProducersMixin, ChunkedIOMixin, @@ -28,19 +33,12 @@ class CreateSyncFile( ): sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox")) - file_type = luigi.ChoiceParameter( - default="csv", - choices=("csv",), - description="the file type to create; choices: csv; default: csv", - ) - # upstream requirements reqs = Requirements( ReducedEventsUser.reqs, RemoteWorkflow.reqs, ProduceColumns=ProduceColumns, MLEvaluation=MLEvaluation, - MergeSelectionMasks=MergeSelectionMasks, ) def workflow_requires(self): @@ -85,110 +83,17 @@ def requires(self): @workflow_condition.output def output(self): - return self.target(f"sync_file_{self.branch}.{self.file_type}") + return self.target(f"sync_{self.dataset_inst.name}_{self.branch}.csv") @law.decorator.log - @law.decorator.localize(input=True, output=True) - @law.decorator.safe_output + @law.decorator.localize def run(self): - from columnflow.columnar_util import ( - Route, RouteFilter, mandatory_coffea_columns, update_ak_array, - sort_ak_fields, EMPTY_FLOAT, - ) import awkward as ak - import pandas as pd - def sync_columns(): - columns = { - "physics_objects": { - "Jet.{pt,eta,phi,mass}": 2, - }, - "flat_objects": [ - "event", - "run", - "channel_id", - "leptons_os", - "luminosityBlock", - "lep*", - ], - } - mapping = { - "luminosityBlock": "lumi", - "leptons_os": "is_os", - } - return columns, mapping - - def lepton_selection(events, attributes=["pt", "eta", "phi", "charge"]): - first_lepton = ak.concatenate([events["Electron"], events["Muon"]], axis=1) - second_lepton = events["Tau"] - for attribute in attributes: - events[f"lep1_{attribute}"] = first_lepton[attribute] - events[f"lep2_{attribute}"] = second_lepton[attribute] - return events - - def awkward_to_pandas(events, physics_objects, flat_objects=["event"]): - """Helper function to convert awkward arrays to pandas dataframes. - - Args: - events (ak.array): Awkward array with nested structure. - physics_objects (Dict[str]): Dict of physics objects to consider, with value representing padding. - attributes (List[str]): List of attributes to consider. - flat_objects (List[str]): List of additional columns to consider, these are not padded. - - Returns: - pd.DataFrame: Pandas dataframe with flattened structure of the awkward array. - """ - events = sort_ak_fields(events) - f = DotDict() - # add meta columns (like event number, ...) - # these columns do not need padding - - # columns that need no padding (like event number, ...) - # resolve glob patterns - for flat_object_pattern in flat_objects: - for field in events.fields: - if law.util.fnmatch.fnmatch(field, flat_object_pattern): - f[field] = events[field] - - # add columns of physics objects - # columns are padded to given pad_length - for physics_pattern, pad_length in physics_objects.items(): - physics_objects = law.util.brace_expand(physics_pattern) - for physics_object in physics_objects: - physics_route = Route(physics_object) - parts = physics_route.fields - physics_array = physics_route.apply(events) - physics_array = ak.pad_none(physics_array, pad_length) - physics_array = ak.fill_none(physics_array, EMPTY_FLOAT) - for pad_number in range(0, pad_length): - full_name = f"{parts[0].lower()}{pad_number}_{parts[1].lower()}" - f[full_name] = physics_array[:, pad_number] - return ak.to_dataframe(f) + from columnflow.columnar_util import update_ak_array, EMPTY_FLOAT # prepare inputs and outputs inputs = self.input() - - # create a temp dir for saving intermediate files - tmp_dir = law.LocalDirectoryTarget(is_tmp=True) - tmp_dir.touch() - - # define columns that will be written - write_columns: set[Route] = set() - skip_columns: set[str] = set() - for c in self.config_inst.x.keep_columns.get(self.task_family, ["*"]): - for r in self._expand_keep_column(c): - if r.has_tag("skip"): - skip_columns.add(r.column) - else: - write_columns.add(r) - write_columns = { - r for r in write_columns - if not law.util.multi_match(r.column, skip_columns, mode=any) - } - route_filter = RouteFilter(write_columns) - - # define columns that need to be read - read_columns = write_columns | set(mandatory_coffea_columns) - read_columns = {Route(c) for c in read_columns} + output = self.output() # iterate over chunks of events and diffs files = [inputs["events"]["events"].abspath] @@ -197,11 +102,35 @@ def awkward_to_pandas(events, physics_objects, flat_objects=["event"]): if self.ml_model_insts: files.extend([inp["mlcolumns"].abspath for inp in inputs["ml"]]) - pandas_frameworks = [] + # helper to replace our internal empty float placeholder with a custom one + empty_float = EMPTY_FLOAT # points to same value for now, but can be changed + def replace_empty_float(arr): + if empty_float != EMPTY_FLOAT: + arr = ak.where(arr == EMPTY_FLOAT, empty_float, arr) + return arr + + # helper to pad nested fields with an empty float if missing + def pad_nested(arr, n, *, axis=1): + return ak.fill_none(ak.pad_none(arr, n, axis=axis), empty_float) + + # helper to pad and select the last element on the first inner axis + def select(arr, idx): + return replace_empty_float(pad_nested(arr, idx + 1, axis=1)[:, idx]) + + # TODO: use this helper + # def lepton_selection(events, attributes=["pt", "eta", "phi", "charge"]): + # first_lepton = ak.concatenate([events["Electron"], events["Muon"]], axis=1) + # second_lepton = events["Tau"] + # for attribute in attributes: + # events[f"lep1_{attribute}"] = first_lepton[attribute] + # events[f"lep2_{attribute}"] = second_lepton[attribute] + # return events + + # event chunk loop for (events, *columns), pos in self.iter_chunked_io( files, source_type=len(files) * ["awkward_parquet"], - read_columns=len(files) * [read_columns], + pool_size=1, ): # optional check for overlapping inputs if self.check_overlapping_inputs: @@ -210,46 +139,47 @@ def awkward_to_pandas(events, physics_objects, flat_objects=["event"]): # add additional columns events = update_ak_array(events, *columns) - # remove columns - events = route_filter(events) - # optional check for finite values if self.check_finite_output: self.raise_if_not_finite(events) - # construct first, second lepton columns - events = lepton_selection( - events, - attributes=["pt", "eta", "phi", "mass", "charge"], - ) - - # convert to pandas dataframe - keep_columns, mapping = sync_columns() - - events_pd = awkward_to_pandas( - events=events, - physics_objects=keep_columns["physics_objects"], - flat_objects=keep_columns["flat_objects"], + # project into dataframe + df = ak.to_dataframe({ + # index variables + "event": events.event, + "run": events.run, + "lumi": events.luminosityBlock, + # high-level events variables + "channel": events.channel_id, + "os": events.leptons_os * 1, + "iso": events.tau2_isolated * 1, + # jet variables + "jet1_pt": select(events.Jet.pt, 0), + "jet1_eta": select(events.Jet.eta, 0), + "jet2_pt": select(events.Jet.pt, 1), + "jet2_eta": select(events.Jet.eta, 1), + "jet8_eta": select(events.Jet.eta, 7), + # TODO: add additional variables + }) + + # save as csv in output, append if necessary + output.dump( + df, + formatter="pandas", + index=False, + header=pos.index == 0, + mode="w" if pos.index == 0 else "a", ) - pandas_frameworks.append(events_pd) - - # merge output files - merged_pandas_framework = pd.concat( - pandas_frameworks, - ignore_index=True, - ).rename(columns=mapping, inplace=False) - self.output().dump(merged_pandas_framework, index=False, formatter="pandas") - check_overlap_tasks = law.config.get_expanded("analysis", "check_overlapping_inputs", [], split_csv=True) -CreateSyncFile.check_overlapping_inputs = ChunkedIOMixin.check_overlapping_inputs.copy( - default=CreateSyncFile.task_family in check_overlap_tasks, +CreateSyncFiles.check_overlapping_inputs = ChunkedIOMixin.check_overlapping_inputs.copy( + default=CreateSyncFiles.task_family in check_overlap_tasks, add_default_to_description=True, ) -CreateSyncFileWrapper = wrapper_factory( +CreateSyncFilesWrapper = wrapper_factory( base_cls=AnalysisTask, - require_cls=CreateSyncFile, + require_cls=CreateSyncFiles, enable=["configs", "skip_configs", "datasets", "skip_datasets", "shifts", "skip_shifts"], ) From d1fb486e511b7b85172564ff1d39174de485e10d Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 25 Oct 2024 11:20:38 +0200 Subject: [PATCH 12/25] Fix jet variables. --- hbt/tasks/sync.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbt/tasks/sync.py b/hbt/tasks/sync.py index 87efeb28..b1710637 100644 --- a/hbt/tasks/sync.py +++ b/hbt/tasks/sync.py @@ -156,9 +156,10 @@ def select(arr, idx): # jet variables "jet1_pt": select(events.Jet.pt, 0), "jet1_eta": select(events.Jet.eta, 0), + "jet1_phi": select(events.Jet.phi, 0), "jet2_pt": select(events.Jet.pt, 1), "jet2_eta": select(events.Jet.eta, 1), - "jet8_eta": select(events.Jet.eta, 7), + "jet2_phi": select(events.Jet.phi, 1), # TODO: add additional variables }) From a71310c59845efc80ae8857858fe07e73405f4b6 Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Fri, 25 Oct 2024 13:51:10 +0200 Subject: [PATCH 13/25] Removed wrapper since it is not needed, added Lepton selection --- hbt/tasks/sync.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/hbt/tasks/sync.py b/hbt/tasks/sync.py index b1710637..46193b4f 100644 --- a/hbt/tasks/sync.py +++ b/hbt/tasks/sync.py @@ -8,7 +8,7 @@ import law -from columnflow.tasks.framework.base import Requirements, AnalysisTask, wrapper_factory +from columnflow.tasks.framework.base import Requirements from columnflow.tasks.framework.mixins import ( ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin, ) @@ -89,7 +89,7 @@ def output(self): @law.decorator.localize def run(self): import awkward as ak - from columnflow.columnar_util import update_ak_array, EMPTY_FLOAT + from columnflow.columnar_util import update_ak_array, EMPTY_FLOAT, set_ak_column # prepare inputs and outputs inputs = self.input() @@ -117,14 +117,11 @@ def pad_nested(arr, n, *, axis=1): def select(arr, idx): return replace_empty_float(pad_nested(arr, idx + 1, axis=1)[:, idx]) - # TODO: use this helper - # def lepton_selection(events, attributes=["pt", "eta", "phi", "charge"]): - # first_lepton = ak.concatenate([events["Electron"], events["Muon"]], axis=1) - # second_lepton = events["Tau"] - # for attribute in attributes: - # events[f"lep1_{attribute}"] = first_lepton[attribute] - # events[f"lep2_{attribute}"] = second_lepton[attribute] - # return events + # helper to select leptons + def lepton_selection(events): + # first event any lepton, second alsways tau + lepton = ak.concatenate([events["Electron"], events["Muon"], events["Tau"]], axis=1) + return set_ak_column(events, "Lepton", lepton) # event chunk loop for (events, *columns), pos in self.iter_chunked_io( @@ -138,7 +135,7 @@ def select(arr, idx): # add additional columns events = update_ak_array(events, *columns) - + events = lepton_selection(events) # optional check for finite values if self.check_finite_output: self.raise_if_not_finite(events) @@ -160,6 +157,14 @@ def select(arr, idx): "jet2_pt": select(events.Jet.pt, 1), "jet2_eta": select(events.Jet.eta, 1), "jet2_phi": select(events.Jet.phi, 1), + "lep1_pt": select(events.Lepton.pt, 1), + "lep1_phi": select(events.Lepton.phi, 1), + "lep1_eta": select(events.Lepton.eta, 1), + "lep1_charge": select(events.Lepton.charge, 1), + "lep2_pt": select(events.Lepton.pt, 2), + "lep2_phi": select(events.Lepton.phi, 2), + "lep2_eta": select(events.Lepton.eta, 2), + "lep2_charge": select(events.Lepton.charge, 2), # TODO: add additional variables }) @@ -178,9 +183,3 @@ def select(arr, idx): default=CreateSyncFiles.task_family in check_overlap_tasks, add_default_to_description=True, ) - -CreateSyncFilesWrapper = wrapper_factory( - base_cls=AnalysisTask, - require_cls=CreateSyncFiles, - enable=["configs", "skip_configs", "datasets", "skip_datasets", "shifts", "skip_shifts"], -) From a7cc592d843f142438e46c642808e92375de53d6 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 25 Oct 2024 14:19:08 +0200 Subject: [PATCH 14/25] Fix lepton handling. --- hbt/tasks/sync.py | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/hbt/tasks/sync.py b/hbt/tasks/sync.py index 46193b4f..cabf2dbc 100644 --- a/hbt/tasks/sync.py +++ b/hbt/tasks/sync.py @@ -118,10 +118,17 @@ def select(arr, idx): return replace_empty_float(pad_nested(arr, idx + 1, axis=1)[:, idx]) # helper to select leptons - def lepton_selection(events): - # first event any lepton, second alsways tau - lepton = ak.concatenate([events["Electron"], events["Muon"], events["Tau"]], axis=1) - return set_ak_column(events, "Lepton", lepton) + def select_leptons(events: ak.Array, common_fields: dict[str, int | float]) -> ak.Array: + # ensure all lepton arrays have the same common fields + leptons = [events.Electron, events.Muon, events.Tau] + for i in range(len(leptons)): + lepton = leptons[i] + for field, default in common_fields.items(): + if field not in lepton.fields: + lepton = set_ak_column(lepton, field, default) + leptons[i] = lepton + # concatenate (first event any lepton, second alsways tau) and add to events + return set_ak_column(events, "Lepton", ak.concatenate(leptons, axis=1)) # event chunk loop for (events, *columns), pos in self.iter_chunked_io( @@ -135,10 +142,9 @@ def lepton_selection(events): # add additional columns events = update_ak_array(events, *columns) - events = lepton_selection(events) - # optional check for finite values - if self.check_finite_output: - self.raise_if_not_finite(events) + + # insert leptons + events = select_leptons(events, {"rawDeepTau2018v2p5VSjet": empty_float}) # project into dataframe df = ak.to_dataframe({ @@ -157,14 +163,16 @@ def lepton_selection(events): "jet2_pt": select(events.Jet.pt, 1), "jet2_eta": select(events.Jet.eta, 1), "jet2_phi": select(events.Jet.phi, 1), - "lep1_pt": select(events.Lepton.pt, 1), - "lep1_phi": select(events.Lepton.phi, 1), - "lep1_eta": select(events.Lepton.eta, 1), - "lep1_charge": select(events.Lepton.charge, 1), - "lep2_pt": select(events.Lepton.pt, 2), - "lep2_phi": select(events.Lepton.phi, 2), - "lep2_eta": select(events.Lepton.eta, 2), - "lep2_charge": select(events.Lepton.charge, 2), + "lep1_pt": select(events.Lepton.pt, 0), + "lep1_phi": select(events.Lepton.phi, 0), + "lep1_eta": select(events.Lepton.eta, 0), + "lep1_charge": select(events.Lepton.charge, 0), + "lep1_deeptauvsjet": select(events.Lepton.rawDeepTau2018v2p5VSjet, 0), + "lep2_pt": select(events.Lepton.pt, 1), + "lep2_phi": select(events.Lepton.phi, 1), + "lep2_eta": select(events.Lepton.eta, 1), + "lep2_charge": select(events.Lepton.charge, 1), + "lep2_deeptauvsjet": select(events.Lepton.rawDeepTau2018v2p5VSjet, 1), # TODO: add additional variables }) From b5df324c49657577423fa21a24df428d7b034d09 Mon Sep 17 00:00:00 2001 From: Anas <103462379+haddadanas@users.noreply.github.com> Date: Fri, 25 Oct 2024 16:35:14 +0200 Subject: [PATCH 15/25] Small improvements in the jet selection Remove double slicing + added one comment --- hbt/selection/jet.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbt/selection/jet.py b/hbt/selection/jet.py index 8fa9ed4d..0317723a 100644 --- a/hbt/selection/jet.py +++ b/hbt/selection/jet.py @@ -82,7 +82,7 @@ def jet_selection( hhbtag_scores = self[hhbtag](events, default_mask, lepton_results.x.lepton_pair, **kwargs) score_indices = ak.argsort(hhbtag_scores, axis=1, ascending=False) # pad the indices to simplify creating the hhbjet mask - padded_hhbjet_indices = ak.pad_none(score_indices, 2, axis=1)[..., :2][..., :2] + padded_hhbjet_indices = ak.pad_none(score_indices, 2, axis=1)[..., :2] hhbjet_mask = ((li == padded_hhbjet_indices[..., [0]]) | (li == padded_hhbjet_indices[..., [1]])) # get indices for actual book keeping only for events with both lepton candidates and where at # least two jets pass the default mask (bjet candidates) @@ -116,6 +116,7 @@ def jet_selection( cross_vbf_mask = ak.full_like(1 * events.event, False, dtype=bool) else: cross_vbf_masks = [events.trigger_ids == tid for tid in cross_vbf_ids] + # This combines "at least one cross trigger is fired" and "no other triggers are fired" cross_vbf_mask = ak.all(reduce(or_, cross_vbf_masks), axis=1) vbf_pair_mask = vbf_pair_mask & ( (~cross_vbf_mask) | ( From 5fa6472c274c7e68838ecd5050307754c1329c29 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 25 Oct 2024 17:09:49 +0200 Subject: [PATCH 16/25] Update run 3 datasets. --- hbt/config/configs_hbt.py | 125 +++++++++++++++++++++++++------------- 1 file changed, 84 insertions(+), 41 deletions(-) diff --git a/hbt/config/configs_hbt.py b/hbt/config/configs_hbt.py index f7a88089..af18ddfa 100644 --- a/hbt/config/configs_hbt.py +++ b/hbt/config/configs_hbt.py @@ -56,19 +56,19 @@ def add_config( # helper to enable processes / datasets only for a specific era def if_era( *, - run: int | None = None, - year: int | None = None, - postfix: str | None = None, - tag: str | None = None, - values: list[str] | None = None, + run: int | set[int] | None = None, + year: int | set[int] | None = None, + postfix: str | set[int] | None = None, + tag: str | set[str] | None = None, + values: list[str | None] | None = None, ) -> list[str]: match = ( - (run is None or campaign.x.run == run) and - (year is None or campaign.x.year == year) and - (postfix is None or campaign.x.postfix == postfix) and - (tag is None or campaign.has_tag(tag)) + (run is None or campaign.x.run in law.util.make_set(run)) and + (year is None or campaign.x.year in law.util.make_set(year)) and + (postfix is None or campaign.x.postfix in law.util.make_set(postfix)) and + (tag is None or campaign.has_tag(tag, mode=any)) ) - return (values or []) if match else [] + return list(filter(bool, values or [])) if match else [] ################################################################################################ # processes @@ -94,7 +94,7 @@ def if_era( processes=[procs.n.ttv, procs.n.ttvv], ) - # add processes we are interested in + # processes we are interested in process_names = [ "data", "tt", @@ -147,9 +147,9 @@ def if_era( if process_name.startswith(("graviton_hh_", "radion_hh_")): proc.add_tag("signal") proc.add_tag("resonant_signal") - if process_name.startswith("tt"): - proc.add_tag("ttbar") - if process_name.startswith("dy"): + if process_name.startswith("tt_"): + proc.add_tag({"ttbar", "tt"}) + if process_name.startswith("dy_"): proc.add_tag("dy") # add the process @@ -172,6 +172,7 @@ def if_era( "hh_ggf_hbb_htt_kl0_kt1_powheg", "hh_ggf_hbb_htt_kl2p45_kt1_powheg", "hh_ggf_hbb_htt_kl5_kt1_powheg", + # vbf "hh_vbf_hbb_htt_kv1_k2v1_kl1_madgraph", "hh_vbf_hbb_htt_kv1_k2v1_kl2_madgraph", @@ -185,6 +186,7 @@ def if_era( "hh_vbf_hbb_htt_kvm1p6_k2v2p72_klm1p36_madgraph", "hh_vbf_hbb_htt_kvm1p83_k2v3p57_klm3p39_madgraph", "hh_vbf_hbb_htt_kvm2p12_k2v3p87_klm5p96_madgraph", + # some resonances "graviton_hh_ggf_hbb_htt_m450_madgraph", "graviton_hh_ggf_hbb_htt_m1200_madgraph", @@ -192,17 +194,13 @@ def if_era( ]), # backgrounds - "tt_sl_powheg", - "tt_dl_powheg", - "tt_fh_powheg", *if_era(run=3, year=2022, values=[ - "ttw_wlnu_amcatnlo", - "ttz_zqq_amcatnlo", - "ttz_zll_m4to50_amcatnlo", - "ttz_zll_m50toinf_amcatnlo", - "ttzz_madgraph", - "ttww_madgraph", - # "ttwz_madgraph", # not available yet + # ttbar + "tt_sl_powheg", + "tt_dl_powheg", + "tt_fh_powheg", + + # single top "st_tchannel_t_4f_powheg", "st_tchannel_tbar_4f_powheg", "st_twchannel_t_sl_powheg", @@ -211,8 +209,23 @@ def if_era( "st_twchannel_tbar_dl_powheg", "st_twchannel_t_fh_powheg", "st_twchannel_tbar_fh_powheg", - # "st_schannel_t_lep_4f_amcatnlo", # no cross section yet - # "st_schannel_tbar_lep_4f_amcatnlo", # no cross section yet + "st_schannel_t_lep_4f_amcatnlo", + "st_schannel_tbar_lep_4f_amcatnlo", + + # tt + v + "ttw_wlnu_amcatnlo", + "ttz_zqq_amcatnlo", + "ttz_zll_m4to50_amcatnlo", + "ttz_zll_m50toinf_amcatnlo", + + # tt + vv + "ttww_madgraph", + *if_era(run=3, year=2022, tag="postEE", values=[ + "ttwz_madgraph", # exists for post, but not for pre + ]), + "ttzz_madgraph", + + # dy "dy_m4to10_amcatnlo", "dy_m10to50_amcatnlo", "dy_m50toinf_amcatnlo", @@ -229,31 +242,58 @@ def if_era( "dy_m50toinf_2j_pt200to400_amcatnlo", "dy_m50toinf_2j_pt400to600_amcatnlo", "dy_m50toinf_2j_pt600toinf_amcatnlo", + + # w + jets "w_lnu_amcatnlo", - "z_qq_pt100to200_1j_amcatnlo", - "z_qq_pt100to200_2j_amcatnlo", - "z_qq_pt200to400_1j_amcatnlo", - "z_qq_pt200to400_2j_amcatnlo", # literally no events selected above 400 GeV + "w_lnu_0j_amcatnlo", + "w_lnu_1j_amcatnlo", + "w_lnu_2j_amcatnlo", + "w_lnu_pt40to100_1j_amcatnlo", + "w_lnu_pt40to100_2j_amcatnlo", + "w_lnu_pt100to200_1j_amcatnlo", + "w_lnu_pt100to200_2j_amcatnlo", + "w_lnu_pt200to400_1j_amcatnlo", + "w_lnu_pt200to400_2j_amcatnlo", + "w_lnu_pt400to600_1j_amcatnlo", + "w_lnu_pt400to600_2j_amcatnlo", + "w_lnu_pt600toinf_1j_amcatnlo", + "w_lnu_pt600toinf_2j_amcatnlo", + + # z + jets (not DY but qq) + # decided to drop z_qq for now as their contribution is negligible, + # but we should check that again at a much later stage + # "z_qq_pt100to200_1j_amcatnlo", + # "z_qq_pt100to200_2j_amcatnlo", + # "z_qq_pt200to400_1j_amcatnlo", + # "z_qq_pt200to400_2j_amcatnlo", + + # vv "zz_pythia", "wz_pythia", "ww_pythia", + + # vvv "zzz_amcatnlo", "wzz_amcatnlo", "wwz_4f_amcatnlo", "www_4f_amcatnlo", + + # single H "h_ggf_htt_powheg", "h_vbf_htt_powheg", "vh_hnonbb_amcatnlo", + "wmh_wlnu_hbb_powheg", + "wph_wlnu_hbb_powheg", + "wph_htt_powheg", + "wmh_htt_powheg", + "wph_wqq_hbb_powheg", + "wmh_wqq_hbb_powheg", "zh_zll_hbb_powheg", "zh_zqq_hbb_powheg", "zh_htt_powheg", - "wph_htt_powheg", - "wmh_htt_powheg", - "wph_wlnu_hbb_powheg", - "wmh_wlnu_hbb_powheg", "zh_gg_zll_hbb_powheg", - "zh_gg_znunu_hbb_powheg", "zh_gg_zqq_hbb_powheg", + "zh_gg_znunu_hbb_powheg", "tth_hbb_powheg", "tth_hnonbb_powheg", ]), @@ -262,18 +302,21 @@ def if_era( *if_era(run=3, year=2022, tag="preEE", values=[ f"data_{stream}_{period}" for stream in ["mu", "e", "tau", "met"] for period in "cd" ]), + *if_era(run=3, year=2022, tag="postEE", values=[ + f"data_{stream}_{period}" for stream in ["mu", "e", "tau", "met"] for period in "efg" + ]), ] for dataset_name in dataset_names: # add the dataset dataset = cfg.add_dataset(campaign.get_dataset(dataset_name)) # add tags to datasets - if dataset.name.startswith("tt"): - dataset.add_tag(("has_top", "is_ttbar")) - elif dataset.name.startswith("st"): - dataset.add_tag(("has_top", "is_single_top")) - if dataset.name.startswith("dy"): - dataset.add_tag("is_dy") + if dataset.name.startswith("tt_"): + dataset.add_tag({"has_top", "ttbar", "tt"}) + if dataset.name.startswith("st_"): + dataset.add_tag({"has_top", "single_top", "st"}) + if dataset.name.startswith("dy_"): + dataset.add_tag("dy") if re.match(r"^(ww|wz|zz)_.*pythia$", dataset.name): dataset.add_tag("no_lhe_weights") if dataset_name.startswith("hh_"): From 4efcc966a2d3eb4bb28743d7b388c24521ad719c Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Mon, 28 Oct 2024 08:28:10 +0100 Subject: [PATCH 17/25] Finish dataset sync, add 22 post configs. --- hbt/config/analysis_hbt.py | 77 +++++++++++++++++-------- hbt/config/configs_hbt.py | 114 ++++++++++++++++++++++++------------- hbt/tasks/stats.py | 53 +++++++++++------ law.cfg | 65 +++++++++++++++------ modules/cmsdb | 2 +- modules/columnflow | 2 +- setup.sh | 22 +++++-- 7 files changed, 229 insertions(+), 106 deletions(-) diff --git a/hbt/config/analysis_hbt.py b/hbt/config/analysis_hbt.py index 31687532..f76a429e 100644 --- a/hbt/config/analysis_hbt.py +++ b/hbt/config/analysis_hbt.py @@ -87,38 +87,67 @@ def factory(configs: od.UniqueObjectIndex): # Run 2 configs # -# 2016 HIPM (also known as APV or preVFP) -add_lazy_config( - campaign_module="cmsdb.campaigns.run2_2016_HIPM_nano_uhh_v12", - campaign_attr="campaign_run2_2016_HIPM_nano_uhh_v12", - config_name="run2_2016_HIPM_nano_uhh_v12", - config_id=1, -) +# 2016 HIPM (also known as APV or preVFP), TODO: campaign needs consistency and content check +# add_lazy_config( +# campaign_module="cmsdb.campaigns.run2_2016_HIPM_nano_uhh_v12", +# campaign_attr="campaign_run2_2016_HIPM_nano_uhh_v12", +# config_name="run2_2016_HIPM_nano_uhh_v12", +# config_id=1, +# ) + +# 2016 (also known postVFP), TODO: campaign needs consistency and content check +# add_lazy_config( +# campaign_module="cmsdb.campaigns.run2_2016_nano_uhh_v12", +# campaign_attr="campaign_run2_2016_nano_uhh_v12", +# config_name="run2_2016_nano_uhh_v12", +# config_id=2, +# ) + +# 2017, old nano version, TODO: needs re-processing +# add_lazy_config( +# campaign_module="cmsdb.campaigns.run2_2017_nano_uhh_v11", +# campaign_attr="campaign_run2_2017_nano_uhh_v11", +# config_name="run2_2017_nano_uhh_v11", +# config_id=3, +# ) + +# 2018, TODO: not processed yet +# add_lazy_config( +# campaign_module="cmsdb.campaigns.run2_2018_nano_uhh_v12", +# campaign_attr="campaign_run2_2018_nano_uhh_v12", +# config_name="run2_2018_nano_uhh_v12", +# config_id=4, +# ) -# 2016 (also known postVFP) -add_lazy_config( - campaign_module="cmsdb.campaigns.run2_2016_nano_uhh_v12", - campaign_attr="campaign_run2_2016_nano_uhh_v12", - config_name="run2_2016_nano_uhh_v12", - config_id=2, -) +# +# Run 3 configs +# -# 2017 +# 2022, preEE, TODO: disabled until re-production of missing datasets is done by Nathan +# add_lazy_config( +# campaign_module="cmsdb.campaigns.run3_2022_preEE_nano_uhh_v12", +# campaign_attr="campaign_run3_2022_preEE_nano_uhh_v12", +# config_name="run3_2022_preEE", +# config_id=5, +# ) + +# 2022, postEE add_lazy_config( - campaign_module="cmsdb.campaigns.run2_2017_nano_uhh_v11", - campaign_attr="campaign_run2_2017_nano_uhh_v11", - config_name="run2_2017_nano_uhh_v11", - config_id=3, + campaign_module="cmsdb.campaigns.run3_2022_postEE_nano_uhh_v12", + campaign_attr="campaign_run3_2022_postEE_nano_uhh_v12", + config_name="run3_2022_postEE", + config_id=6, ) # -# Run 3 configs +# sync configs # # 2022, preEE add_lazy_config( - campaign_module="cmsdb.campaigns.run3_2022_preEE_nano_uhh_v12", - campaign_attr="campaign_run3_2022_preEE_nano_uhh_v12", - config_name="run3_2022_preEE", - config_id=5, + campaign_module="cmsdb.campaigns.run3_2022_preEE_nano_v12", + campaign_attr="campaign_run3_2022_preEE_nano_v12", + config_name="run3_2022_preEE_sync", + config_id=5001, + sync=True, ) diff --git a/hbt/config/configs_hbt.py b/hbt/config/configs_hbt.py index af18ddfa..d39e010c 100644 --- a/hbt/config/configs_hbt.py +++ b/hbt/config/configs_hbt.py @@ -33,6 +33,7 @@ def add_config( config_name: str | None = None, config_id: int | None = None, limit_dataset_files: int | None = None, + sync: bool = False, ) -> od.Config: # gather campaign data run = campaign.x.run @@ -43,6 +44,10 @@ def add_config( assert run in {2, 3} assert year in {2016, 2017, 2018, 2022, 2023} + # renamings + sync_requested = sync + del sync + # get all root processes procs = get_root_processes_from_campaign(campaign) @@ -61,12 +66,14 @@ def if_era( postfix: str | set[int] | None = None, tag: str | set[str] | None = None, values: list[str | None] | None = None, + sync: bool = False, ) -> list[str]: match = ( (run is None or campaign.x.run in law.util.make_set(run)) and (year is None or campaign.x.year in law.util.make_set(year)) and (postfix is None or campaign.x.postfix in law.util.make_set(postfix)) and - (tag is None or campaign.has_tag(tag, mode=any)) + (tag is None or campaign.has_tag(tag, mode=any)) and + (sync is sync_requested) ) return list(filter(bool, values or [])) if match else [] @@ -75,24 +82,25 @@ def if_era( ################################################################################################ # add custom processes - cfg.add_process( - name="v", - id=7997, - label="W/Z", - processes=[procs.n.w, procs.n.z], - ) - cfg.add_process( - name="multiboson", - id=7998, - label="Multiboson", - processes=[procs.n.vv, procs.n.vvv], - ) - cfg.add_process( - name="tt_multiboson", - id=7999, - label=r"$t\bar{t}$ + Multiboson", - processes=[procs.n.ttv, procs.n.ttvv], - ) + if not sync_requested: + cfg.add_process( + name="v", + id=7997, + label="W/Z", + processes=[procs.n.w, procs.n.z], + ) + cfg.add_process( + name="multiboson", + id=7998, + label="Multiboson", + processes=[procs.n.vv, procs.n.vvv], + ) + cfg.add_process( + name="tt_multiboson", + id=7999, + label=r"$t\bar{t}$ + Multiboson", + processes=[procs.n.ttv, procs.n.ttvv], + ) # processes we are interested in process_names = [ @@ -125,9 +133,10 @@ def if_era( "hh_vbf_hbb_htt_kvm1p83_k2v3p57_klm3p39", "hh_vbf_hbb_htt_kvm2p12_k2v3p87_klm5p96", ]), + "radion_hh_ggf_hbb_htt_m450", + "radion_hh_ggf_hbb_htt_m1200", "graviton_hh_ggf_hbb_htt_m450", "graviton_hh_ggf_hbb_htt_m1200", - "radion_hh_ggf_hbb_htt_m700", ] for process_name in process_names: if process_name in procs: @@ -188,9 +197,10 @@ def if_era( "hh_vbf_hbb_htt_kvm2p12_k2v3p87_klm5p96_madgraph", # some resonances + "radion_hh_ggf_hbb_htt_m450_madgraph", + "radion_hh_ggf_hbb_htt_m1200_madgraph", "graviton_hh_ggf_hbb_htt_m450_madgraph", "graviton_hh_ggf_hbb_htt_m1200_madgraph", - "radion_hh_ggf_hbb_htt_m700_madgraph", ]), # backgrounds @@ -248,24 +258,28 @@ def if_era( "w_lnu_0j_amcatnlo", "w_lnu_1j_amcatnlo", "w_lnu_2j_amcatnlo", - "w_lnu_pt40to100_1j_amcatnlo", - "w_lnu_pt40to100_2j_amcatnlo", - "w_lnu_pt100to200_1j_amcatnlo", - "w_lnu_pt100to200_2j_amcatnlo", - "w_lnu_pt200to400_1j_amcatnlo", - "w_lnu_pt200to400_2j_amcatnlo", - "w_lnu_pt400to600_1j_amcatnlo", - "w_lnu_pt400to600_2j_amcatnlo", - "w_lnu_pt600toinf_1j_amcatnlo", - "w_lnu_pt600toinf_2j_amcatnlo", + "w_lnu_1j_pt40to100_amcatnlo", + "w_lnu_1j_pt100to200_amcatnlo", + "w_lnu_1j_pt200to400_amcatnlo", + "w_lnu_1j_pt400to600_amcatnlo", + "w_lnu_1j_pt600toinf_amcatnlo", + "w_lnu_2j_pt40to100_amcatnlo", + "w_lnu_2j_pt100to200_amcatnlo", + "w_lnu_2j_pt200to400_amcatnlo", + "w_lnu_2j_pt400to600_amcatnlo", + "w_lnu_2j_pt600toinf_amcatnlo", # z + jets (not DY but qq) # decided to drop z_qq for now as their contribution is negligible, # but we should check that again at a much later stage - # "z_qq_pt100to200_1j_amcatnlo", - # "z_qq_pt100to200_2j_amcatnlo", - # "z_qq_pt200to400_1j_amcatnlo", - # "z_qq_pt200to400_2j_amcatnlo", + # "z_qq_1j_pt100to200_amcatnlo", + # "z_qq_1j_pt200to400_amcatnlo", + # "z_qq_1j_pt400to600_amcatnlo", + # "z_qq_1j_pt600toinf_amcatnlo", + # "z_qq_2j_pt100to200_amcatnlo", + # "z_qq_2j_pt200to400_amcatnlo", + # "z_qq_2j_pt400to600_amcatnlo", + # "z_qq_2j_pt600toinf_amcatnlo", # vv "zz_pythia", @@ -305,6 +319,11 @@ def if_era( *if_era(run=3, year=2022, tag="postEE", values=[ f"data_{stream}_{period}" for stream in ["mu", "e", "tau", "met"] for period in "efg" ]), + + # sync + *if_era(run=3, year=2022, tag="preEE", sync=True, values=[ + "hh_ggf_hbb_htt_kl1_kt1_powheg", + ]), ] for dataset_name in dataset_names: # add the dataset @@ -322,16 +341,30 @@ def if_era( if dataset_name.startswith("hh_"): dataset.add_tag("signal") dataset.add_tag("nonresonant_signal") + if dataset_name.startswith("hh_ggf_"): + dataset.add_tag("ggf") + elif dataset_name.startswith("hh_vbf_"): + dataset.add_tag("vbf") if dataset_name.startswith(("graviton_hh_", "radion_hh_")): dataset.add_tag("signal") dataset.add_tag("resonant_signal") + if dataset_name.startswith(("graviton_hh_ggf_", "radion_hh_ggf")): + dataset.add_tag("ggf") + elif dataset_name.startswith(("graviton_hh_vbf_", "radion_hh_vbf")): + dataset.add_tag("vbf") # apply an optional limit on the number of files if limit_dataset_files: for info in dataset.info.values(): info.n_files = min(info.n_files, limit_dataset_files) - # verify that the root process of all datasets is part of any of the registered processes + # apply synchronization settings + if sync_requested: + # only first file per + for info in dataset.info.values(): + info.n_files = 1 + + # verify that the root process of each dataset is part of any of the registered processes verify_config_processes(cfg, warn=True) ################################################################################################ @@ -377,8 +410,9 @@ def if_era( "sm_data": ["data"] + sm_group, } - # define inclusive datasets for the dy process identification with corresponding leaf processes - if run == 3: + # define inclusive datasets for the stitched process identification with corresponding leaf processes + if run == 3 and not sync_requested: + # drell-yan cfg.x.dy_stitching = { "m50toinf": { "inclusive_dataset": cfg.datasets.n.dy_m50toinf_amcatnlo, @@ -394,6 +428,8 @@ def if_era( ], }, } + # w+jets + # TODO: add # dataset groups for conveniently looping over certain datasets # (used in wrapper_factory and during plotting) @@ -1149,7 +1185,7 @@ def add_external(name, value): cfg.x.get_dataset_lfns_sandbox = None # whether to validate the number of obtained LFNs in GetDatasetLFNs - cfg.x.validate_dataset_lfns = limit_dataset_files is None + cfg.x.validate_dataset_lfns = limit_dataset_files is None and not sync_requested # custom lfn retrieval method in case the underlying campaign is custom uhh if cfg.campaign.x("custom", {}).get("creator") == "uhh": diff --git a/hbt/tasks/stats.py b/hbt/tasks/stats.py index c5aa96fa..67692a66 100644 --- a/hbt/tasks/stats.py +++ b/hbt/tasks/stats.py @@ -30,48 +30,65 @@ def run(self): # color helpers green = functools.partial(law.util.colored, color="green") + green_bright = functools.partial(law.util.colored, color="green", style="bright") + yellow = functools.partial(law.util.colored, color="yellow") + yellow_bright = functools.partial(law.util.colored, color="yellow", style="bright") red = functools.partial(law.util.colored, color="red") - red_bright = functools.partial(law.util.colored, color="red", style="bright") cyan = functools.partial(law.util.colored, color="cyan") + cyan_bright = functools.partial(law.util.colored, color="cyan", style="bright") bright = functools.partial(law.util.colored, style="bright") + def get_color(dataset_inst): + if dataset_inst.is_data: + return red + if dataset_inst.has_tag("nonresonant_signal"): + return green if dataset_inst.has_tag("ggf") else green_bright + if dataset_inst.has_tag("resonant_signal"): + return cyan if dataset_inst.has_tag("ggf") else cyan_bright + return yellow + # headers headers = ["Dataset", "Files", "Events"] # content rows = [] - sum_files_nominal, sum_events_nominal = 0, 0 - sum_files_syst, sum_events_syst = 0, 0 + sum_files_s_nonres, sum_events_s_nonres = 0, 0 + sum_files_s_res, sum_events_s_res = 0, 0 + sum_files_b_nom, sum_events_b_nom = 0, 0 + sum_files_b_syst, sum_events_b_syst = 0, 0 sum_files_data, sum_events_data = 0, 0 for dataset_inst in self.config_inst.datasets: - col = ( - cyan - if dataset_inst.is_data - else (green if dataset_inst.name.startswith("hh_") else red) - ) + col = get_color(dataset_inst) # nominal info rows.append([col(dataset_inst.name), dataset_inst.n_files, dataset_inst.n_events]) # increment sums if dataset_inst.is_data: sum_files_data += dataset_inst.n_files sum_events_data += dataset_inst.n_events + elif dataset_inst.has_tag("nonresonant_signal"): + sum_files_s_nonres += dataset_inst.n_files + sum_events_s_nonres += dataset_inst.n_events + elif dataset_inst.has_tag("resonant_signal"): + sum_files_s_res += dataset_inst.n_files + sum_events_s_res += dataset_inst.n_events else: - sum_files_nominal += dataset_inst.n_files - sum_events_nominal += dataset_inst.n_events + sum_files_b_nom += dataset_inst.n_files + sum_events_b_nom += dataset_inst.n_events # potential shifts for shift_name, info in dataset_inst.info.items(): if shift_name == "nominal" or shift_name not in self.config_inst.shifts: continue - rows.append([red_bright(f" → {shift_name}"), info.n_files, info.n_events]) + rows.append([yellow_bright(f" → {shift_name}"), info.n_files, info.n_events]) # increment sums - sum_files_syst += info.n_files - sum_events_syst += info.n_events + sum_files_b_syst += info.n_files + sum_events_b_syst += info.n_events + # sums - rows.append([bright("total MC (nominal)"), sum_files_nominal, sum_events_nominal]) - if sum_files_syst or sum_events_syst: - sum_files = sum_files_nominal + sum_files_syst - sum_events = sum_events_nominal + sum_events_syst - rows.append([bright("total MC (all)"), sum_files, sum_events]) + rows.append([bright("total signal (non-res.)"), sum_files_s_nonres, sum_events_s_nonres]) + rows.append([bright("total signal (res.)"), sum_files_s_res, sum_events_s_res]) + rows.append([bright("total background (nominal)"), sum_files_b_nom, sum_events_b_nom]) + if sum_files_b_syst or sum_events_b_syst: + rows.append([bright("total background (syst.)"), sum_files_b_syst, sum_events_b_syst]) if sum_files_data or sum_events_data: rows.append([bright("total data"), sum_files_data, sum_events_data]) diff --git a/law.cfg b/law.cfg index a58dc10a..2c49d992 100644 --- a/law.cfg +++ b/law.cfg @@ -38,7 +38,7 @@ job_file_dir_mkdtemp: sub_{{task_id}}_XXX [analysis] default_analysis: hbt.config.analysis_hbt.analysis_hbt -default_config: run3_2022_preEE +default_config: run3_2022_postEE default_dataset: hh_ggf_hbb_htt_kl1_kt1_powheg calibration_modules: columnflow.calibration.cms.{jets,met,tau}, hbt.calibration.{default,fake_triggers} @@ -99,18 +99,32 @@ lfn_sources: wlcg_fs_desy_store, wlcg_fs_infn_redirector, wlcg_fs_global_redirec # the "store_parts_modifiers" can be the name of a function in the "store_parts_modifiers" aux dict # of the analysis instance, which is called with an output's store parts of an output to modify them # specific locations -run3_2022_preEE*__cf.CalibrateEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.SelectEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.ReduceEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.MergeReducedEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.MergeReductionStats: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.MergeSelectionStats: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.ProduceColumns: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.UniteColumns: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.CreateHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.MergeHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.MergeShiftedHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.CreateDatacards: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.CalibrateEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.SelectEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.ReduceEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.MergeReducedEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.MergeReductionStats: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.MergeSelectionStats: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.ProduceColumns: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.UniteColumns: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.CreateHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.MergeHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.MergeShiftedHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.CreateDatacards: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos + +run3_2022_postEE__cf.CalibrateEvents: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.SelectEvents: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.ReduceEvents: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.MergeReducedEvents: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.MergeReductionStats: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.MergeSelectionStats: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.ProduceColumns: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.UniteColumns: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.CreateHistograms: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.MergeHistograms: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.MergeShiftedHistograms: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.CreateDatacards: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan + # fallbacks cf.BundleRepo: wlcg cf.BundleSoftware: wlcg @@ -137,10 +151,10 @@ cf.CreateSyncFile: wlcg [versions] -run3_2022_preEE*__cf.CalibrateEvents: prod1 -run3_2022_preEE*__cf.MergeSelectionStats: prod1 -run3_2022_preEE*__cf.MergeReductionStats: prod1 -run3_2022_preEE*__cf.ProvideReducedEvents: prod1 +run3_2022_preEE__cf.CalibrateEvents: prod1 +run3_2022_preEE__cf.MergeSelectionStats: prod1 +run3_2022_preEE__cf.MergeReductionStats: prod1 +run3_2022_preEE__cf.ProvideReducedEvents: prod1 # @@ -353,6 +367,23 @@ cache_global_lock: True base: file:///pnfs/desy.de/cms/tier2/store/user/nprouvos/nanogen_store/MergeNano/config_22pre_v12/prod3 +[wlcg_fs_run3_2022_postEE_nano_uhh_v12] + +webdav_base: davs://dcache-cms-webdav-wan.desy.de:2880/pnfs/desy.de/cms/tier2/store/user/aalvesan/nanogen_store/MergeNano/config_22post_v12/prod1 +gsiftp_base: gsiftp://dcache-door-cms04.desy.de:2811/pnfs/desy.de/cms/tier2/store/user/aalvesan/nanogen_store/MergeNano/config_22post_v12/prod1 +xrootd_base: root://dcache-cms-xrootd.desy.de:1094/pnfs/desy.de/cms/tier2/store/user/aalvesan/nanogen_store/MergeNano/config_22post_v12/prod1 +base: &::xrootd_base +use_cache: $CF_WLCG_USE_CACHE +cache_root: $CF_WLCG_CACHE_ROOT +cache_cleanup: $CF_WLCG_CACHE_CLEANUP +cache_max_size: 15GB +cache_global_lock: True + +[local_fs_run3_2022_postEE_nano_uhh_v12] + +base: file:///pnfs/desy.de/cms/tier2/store/user/aalvesan/nanogen_store/MergeNano/config_22post_v12/prod1 + + # # file systems for central LFNs # diff --git a/modules/cmsdb b/modules/cmsdb index c88e17bf..bb9d0255 160000 --- a/modules/cmsdb +++ b/modules/cmsdb @@ -1 +1 @@ -Subproject commit c88e17bfb30ab9a39e85fe73e070a221dea02f8d +Subproject commit bb9d02555b50e773a0435223e234c167e0af4d62 diff --git a/modules/columnflow b/modules/columnflow index 0652ad1e..b3db9ec0 160000 --- a/modules/columnflow +++ b/modules/columnflow @@ -1 +1 @@ -Subproject commit 0652ad1eb169deb80f32a26466df30ecbfda5230 +Subproject commit b3db9ec0648c9ce0a8cdf39bb89853433af84e0d diff --git a/setup.sh b/setup.sh index 62cca430..7864df38 100644 --- a/setup.sh +++ b/setup.sh @@ -45,6 +45,7 @@ setup_hbt() { local orig="${PWD}" local setup_name="${1:-default}" local setup_is_default="false" + local env_is_remote="$( [ "${CF_REMOTE_ENV}" = "1" ] && echo "true" || echo "false" )" [ "${setup_name}" = "default" ] && setup_is_default="true" # zsh options @@ -72,7 +73,7 @@ setup_hbt() { CF_SKIP_SETUP="1" source "${CF_BASE}/setup.sh" "" || return "$?" # interactive setup - if [ "${CF_REMOTE_ENV}" != "1" ]; then + if ! ${env_is_remote}; then cf_setup_interactive_body() { # the flavor will be cms export CF_FLAVOR="cms" @@ -133,12 +134,21 @@ setup_hbt() { export LAW_HOME="${LAW_HOME:-${HBT_BASE}/.law}" export LAW_CONFIG_FILE="${LAW_CONFIG_FILE:-${HBT_BASE}/law.cfg}" - if which law &> /dev/null; then - # source law's bash completion scipt - source "$( law completion )" "" + # run the indexing when not remote + if ! ${env_is_remote}; then + if which law &> /dev/null; then + # source law's bash completion scipt + source "$( law completion )" "" - # silently index - law index -q + # silently index + law index -q + fi + fi + + # update the law config file to switch from mirrored to bare wlcg targets + # as local mounts are typically not available remotely + if ${env_is_remote}; then + sed -i -r 's/(.+\: ?)wlcg_mirrored, local_.+, ?(wlcg_[^\s]+)/\1wlcg, \2/g' "${LAW_CONFIG_FILE}" fi # finalize From 83531208b0a59cf090334aa9c87fdba3f6d61e20 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Mon, 28 Oct 2024 13:45:11 +0100 Subject: [PATCH 18/25] Update upstream cf. --- modules/columnflow | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/columnflow b/modules/columnflow index b3db9ec0..d67acd24 160000 --- a/modules/columnflow +++ b/modules/columnflow @@ -1 +1 @@ -Subproject commit b3db9ec0648c9ce0a8cdf39bb89853433af84e0d +Subproject commit d67acd24352d8eb5d13d32d79e3cc776ea5eb686 From d799cc4340392e41fe2e3a560e3351483b29b88f Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Mon, 28 Oct 2024 13:55:15 +0100 Subject: [PATCH 19/25] Rename sync -> sync_mode. --- hbt/config/analysis_hbt.py | 2 +- hbt/config/configs_hbt.py | 16 ++++++---------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/hbt/config/analysis_hbt.py b/hbt/config/analysis_hbt.py index f76a429e..f7d86509 100644 --- a/hbt/config/analysis_hbt.py +++ b/hbt/config/analysis_hbt.py @@ -149,5 +149,5 @@ def factory(configs: od.UniqueObjectIndex): campaign_attr="campaign_run3_2022_preEE_nano_v12", config_name="run3_2022_preEE_sync", config_id=5001, - sync=True, + sync_mode=True, ) diff --git a/hbt/config/configs_hbt.py b/hbt/config/configs_hbt.py index d39e010c..3542d20e 100644 --- a/hbt/config/configs_hbt.py +++ b/hbt/config/configs_hbt.py @@ -33,7 +33,7 @@ def add_config( config_name: str | None = None, config_id: int | None = None, limit_dataset_files: int | None = None, - sync: bool = False, + sync_mode: bool = False, ) -> od.Config: # gather campaign data run = campaign.x.run @@ -44,10 +44,6 @@ def add_config( assert run in {2, 3} assert year in {2016, 2017, 2018, 2022, 2023} - # renamings - sync_requested = sync - del sync - # get all root processes procs = get_root_processes_from_campaign(campaign) @@ -73,7 +69,7 @@ def if_era( (year is None or campaign.x.year in law.util.make_set(year)) and (postfix is None or campaign.x.postfix in law.util.make_set(postfix)) and (tag is None or campaign.has_tag(tag, mode=any)) and - (sync is sync_requested) + (sync is sync_mode) ) return list(filter(bool, values or [])) if match else [] @@ -82,7 +78,7 @@ def if_era( ################################################################################################ # add custom processes - if not sync_requested: + if not sync_mode: cfg.add_process( name="v", id=7997, @@ -359,7 +355,7 @@ def if_era( info.n_files = min(info.n_files, limit_dataset_files) # apply synchronization settings - if sync_requested: + if sync_mode: # only first file per for info in dataset.info.values(): info.n_files = 1 @@ -411,7 +407,7 @@ def if_era( } # define inclusive datasets for the stitched process identification with corresponding leaf processes - if run == 3 and not sync_requested: + if run == 3 and not sync_mode: # drell-yan cfg.x.dy_stitching = { "m50toinf": { @@ -1185,7 +1181,7 @@ def add_external(name, value): cfg.x.get_dataset_lfns_sandbox = None # whether to validate the number of obtained LFNs in GetDatasetLFNs - cfg.x.validate_dataset_lfns = limit_dataset_files is None and not sync_requested + cfg.x.validate_dataset_lfns = limit_dataset_files is None and not sync_mode # custom lfn retrieval method in case the underlying campaign is custom uhh if cfg.campaign.x("custom", {}).get("creator") == "uhh": From d9bebbd39467ebf12206bbf466800e5eb65dc9e5 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Tue, 29 Oct 2024 17:52:15 +0100 Subject: [PATCH 20/25] Add back now completed 22pre config. --- hbt/config/analysis_hbt.py | 14 +++++++------- law.cfg | 3 ++- modules/cmsdb | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/hbt/config/analysis_hbt.py b/hbt/config/analysis_hbt.py index f7d86509..e8107512 100644 --- a/hbt/config/analysis_hbt.py +++ b/hbt/config/analysis_hbt.py @@ -123,13 +123,13 @@ def factory(configs: od.UniqueObjectIndex): # Run 3 configs # -# 2022, preEE, TODO: disabled until re-production of missing datasets is done by Nathan -# add_lazy_config( -# campaign_module="cmsdb.campaigns.run3_2022_preEE_nano_uhh_v12", -# campaign_attr="campaign_run3_2022_preEE_nano_uhh_v12", -# config_name="run3_2022_preEE", -# config_id=5, -# ) +# 2022, preEE +add_lazy_config( + campaign_module="cmsdb.campaigns.run3_2022_preEE_nano_uhh_v12", + campaign_attr="campaign_run3_2022_preEE_nano_uhh_v12", + config_name="run3_2022_preEE", + config_id=5, +) # 2022, postEE add_lazy_config( diff --git a/law.cfg b/law.cfg index 2c49d992..7e80b772 100644 --- a/law.cfg +++ b/law.cfg @@ -99,6 +99,7 @@ lfn_sources: wlcg_fs_desy_store, wlcg_fs_infn_redirector, wlcg_fs_global_redirec # the "store_parts_modifiers" can be the name of a function in the "store_parts_modifiers" aux dict # of the analysis instance, which is called with an output's store parts of an output to modify them # specific locations +; 22pre run3_2022_preEE__cf.CalibrateEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos run3_2022_preEE__cf.SelectEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos run3_2022_preEE__cf.ReduceEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos @@ -111,7 +112,7 @@ run3_2022_preEE__cf.CreateHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlc run3_2022_preEE__cf.MergeHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos run3_2022_preEE__cf.MergeShiftedHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos run3_2022_preEE__cf.CreateDatacards: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos - +; 22post run3_2022_postEE__cf.CalibrateEvents: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan run3_2022_postEE__cf.SelectEvents: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan run3_2022_postEE__cf.ReduceEvents: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan diff --git a/modules/cmsdb b/modules/cmsdb index bb9d0255..2731b76d 160000 --- a/modules/cmsdb +++ b/modules/cmsdb @@ -1 +1 @@ -Subproject commit bb9d02555b50e773a0435223e234c167e0af4d62 +Subproject commit 2731b76dc166fb5cad377e38958fb3ae77caa51c From fd1063e170a3758fcde076333824a6c1b3d94d1b Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Wed, 30 Oct 2024 09:57:27 +0100 Subject: [PATCH 21/25] Store sync flag on config. --- hbt/config/configs_hbt.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/hbt/config/configs_hbt.py b/hbt/config/configs_hbt.py index 3542d20e..5d15f626 100644 --- a/hbt/config/configs_hbt.py +++ b/hbt/config/configs_hbt.py @@ -48,7 +48,14 @@ def add_config( procs = get_root_processes_from_campaign(campaign) # create a config by passing the campaign, so id and name will be identical - cfg = od.Config(name=config_name, id=config_id, campaign=campaign) + cfg = od.Config( + name=config_name, + id=config_id, + campaign=campaign, + aux={ + "sync": sync_mode, + }, + ) ################################################################################################ # helpers From 86d073496c0a540ef6b6401f285dbd443fe9d8bb Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Thu, 7 Nov 2024 18:51:53 +0100 Subject: [PATCH 22/25] Update upstream cf. --- modules/columnflow | 2 +- setup.sh | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/modules/columnflow b/modules/columnflow index d67acd24..273b0ba7 160000 --- a/modules/columnflow +++ b/modules/columnflow @@ -1 +1 @@ -Subproject commit d67acd24352d8eb5d13d32d79e3cc776ea5eb686 +Subproject commit 273b0ba7049e4999b742910ab53a67ad94526233 diff --git a/setup.sh b/setup.sh index 7864df38..2c8a0b07 100644 --- a/setup.sh +++ b/setup.sh @@ -28,7 +28,7 @@ setup_hbt() { # A flag that is set to 1 after the setup was successful. # prevent repeated setups - if [ "${HBT_SETUP}" = "1" ]; then + if [ "${HBT_SETUP}" = "1" ] && [ "${CF_ON_SLURM}" != "1" ]; then >&2 echo "the HH -> bbtautau analysis was already succesfully setup" >&2 echo "re-running the setup requires a new shell" return "1" @@ -140,6 +140,9 @@ setup_hbt() { # source law's bash completion scipt source "$( law completion )" "" + # add completion to the claw command + complete -o bashdefault -o default -F _law_complete claw + # silently index law index -q fi From bfa38a5ac41a1d8863e05ffb3b659c7531b1f3ef Mon Sep 17 00:00:00 2001 From: Nathan Prouvost Date: Wed, 13 Nov 2024 23:21:53 +0100 Subject: [PATCH 23/25] only allow specific data samples to pass the different channel selections, add test to ensure that no event ends up in two channels in mc --- hbt/selection/lepton.py | 171 +++++++++++++++++++++++----------------- 1 file changed, 97 insertions(+), 74 deletions(-) diff --git a/hbt/selection/lepton.py b/hbt/selection/lepton.py index e1c567df..493d47f8 100644 --- a/hbt/selection/lepton.py +++ b/hbt/selection/lepton.py @@ -43,6 +43,28 @@ def trigger_object_matching( return any_match +def new_channel_ids( + events: ak.Array, + previous_channel_ids: ak.Array, + correct_channel_id: int, + is_mask: ak.Array, +) -> ak.Array: + """ + Check if the events in the is_mask can be inside the given channel + or have already been sorted in another channel before. + """ + events_not_in_channel = (previous_channel_ids != 0) & (previous_channel_ids != correct_channel_id) + channel_id_overwrite = events_not_in_channel & is_mask + if ak.any(channel_id_overwrite): + raise ValueError( + "The channel_ids of some events are being set to two different values. " + "The first event of this chunk concerned has index", + ak.where(channel_id_overwrite)[0], + ) + else: + return ak.where(is_mask, correct_channel_id, previous_channel_ids) + + @selector( uses={ "Electron.pt", "Electron.eta", "Electron.phi", "Electron.dxy", "Electron.dz", @@ -439,85 +461,86 @@ def lepton_selection( ) # lepton pair selecton per trigger via lepton counting + if trigger.has_tag({"single_e", "cross_e_tau"}): - # expect 1 electron, 1 veto electron (the same one), 0 veto muons, and at least one tau - is_etau = ( - trigger_fired & - (ak.num(electron_indices, axis=1) == 1) & - (ak.num(electron_veto_indices, axis=1) == 1) & - (ak.num(muon_veto_indices, axis=1) == 0) & - (ak.num(tau_indices, axis=1) >= 1) - ) - is_iso = ak.sum(tau_iso_mask, axis=1) >= 1 - # determine the os/ss charge sign relation - e_charge = ak.firsts(events.Electron[electron_indices].charge, axis=1) - tau_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) - is_os = e_charge == -tau_charge - # store global variables - where = (channel_id == 0) & is_etau - channel_id = ak.where(where, ch_etau.id, channel_id) - tau2_isolated = ak.where(where, is_iso, tau2_isolated) - leptons_os = ak.where(where, is_os, leptons_os) - single_triggered = ak.where(where & is_single, True, single_triggered) - cross_triggered = ak.where(where & is_cross, True, cross_triggered) - sel_electron_indices = ak.where(where, electron_indices, sel_electron_indices) - sel_tau_indices = ak.where(where, tau_indices, sel_tau_indices) + if (self.dataset_inst.is_mc) or ("data_e_" in self.dataset_inst.name): + # expect 1 electron, 1 veto electron (the same one), 0 veto muons, and at least one tau + is_etau = ( + trigger_fired & + (ak.num(electron_indices, axis=1) == 1) & + (ak.num(electron_veto_indices, axis=1) == 1) & + (ak.num(muon_veto_indices, axis=1) == 0) & + (ak.num(tau_indices, axis=1) >= 1) + ) + is_iso = ak.sum(tau_iso_mask, axis=1) >= 1 + # determine the os/ss charge sign relation + e_charge = ak.firsts(events.Electron[electron_indices].charge, axis=1) + tau_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) + is_os = e_charge == -tau_charge + # store global variables + channel_id = new_channel_ids(events, channel_id, ch_etau.id, is_etau) + tau2_isolated = ak.where(is_etau, is_iso, tau2_isolated) + leptons_os = ak.where(is_etau, is_os, leptons_os) + single_triggered = ak.where(is_etau & is_single, True, single_triggered) + cross_triggered = ak.where(is_etau & is_cross, True, cross_triggered) + sel_electron_indices = ak.where(is_etau, electron_indices, sel_electron_indices) + sel_tau_indices = ak.where(is_etau, tau_indices, sel_tau_indices) elif trigger.has_tag({"single_mu", "cross_mu_tau"}): - # expect 1 muon, 1 veto muon (the same one), 0 veto electrons, and at least one tau - is_mutau = ( - trigger_fired & - (ak.num(muon_indices, axis=1) == 1) & - (ak.num(muon_veto_indices, axis=1) == 1) & - (ak.num(electron_veto_indices, axis=1) == 0) & - (ak.num(tau_indices, axis=1) >= 1) - ) - is_iso = ak.sum(tau_iso_mask, axis=1) >= 1 - # determine the os/ss charge sign relation - mu_charge = ak.firsts(events.Muon[muon_indices].charge, axis=1) - tau_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) - is_os = mu_charge == -tau_charge - # store global variables - where = (channel_id == 0) & is_mutau - channel_id = ak.where(where, ch_mutau.id, channel_id) - tau2_isolated = ak.where(where, is_iso, tau2_isolated) - leptons_os = ak.where(where, is_os, leptons_os) - single_triggered = ak.where(where & is_single, True, single_triggered) - cross_triggered = ak.where(where & is_cross, True, cross_triggered) - sel_muon_indices = ak.where(where, muon_indices, sel_muon_indices) - sel_tau_indices = ak.where(where, tau_indices, sel_tau_indices) + if (self.dataset_inst.is_mc) or ("data_mu_" in self.dataset_inst.name): + # expect 1 muon, 1 veto muon (the same one), 0 veto electrons, and at least one tau + is_mutau = ( + trigger_fired & + (ak.num(muon_indices, axis=1) == 1) & + (ak.num(muon_veto_indices, axis=1) == 1) & + (ak.num(electron_veto_indices, axis=1) == 0) & + (ak.num(tau_indices, axis=1) >= 1) + ) + is_iso = ak.sum(tau_iso_mask, axis=1) >= 1 + # determine the os/ss charge sign relation + mu_charge = ak.firsts(events.Muon[muon_indices].charge, axis=1) + tau_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) + is_os = mu_charge == -tau_charge + # store global variables + channel_id = new_channel_ids(events, channel_id, ch_mutau.id, is_mutau) + tau2_isolated = ak.where(is_mutau, is_iso, tau2_isolated) + leptons_os = ak.where(is_mutau, is_os, leptons_os) + single_triggered = ak.where(is_mutau & is_single, True, single_triggered) + cross_triggered = ak.where(is_mutau & is_cross, True, cross_triggered) + sel_muon_indices = ak.where(is_mutau, muon_indices, sel_muon_indices) + sel_tau_indices = ak.where(is_mutau, tau_indices, sel_tau_indices) elif trigger.has_tag({"cross_tau_tau", "cross_tau_tau_vbf", "cross_tau_tau_jet"}): - # expect 0 veto electrons, 0 veto muons and at least two taus of which one is isolated - is_tautau = ( - trigger_fired & - (ak.num(electron_veto_indices, axis=1) == 0) & - (ak.num(muon_veto_indices, axis=1) == 0) & - (ak.num(tau_indices, axis=1) >= 2) & - (ak.sum(tau_iso_mask, axis=1) >= 1) - ) - - # special case for cross tau vbf trigger: - # to avoid overlap, with non-vbf triggers, only one tau is allowed to have pt > 40 - if trigger.has_tag("cross_tau_tau_vbf"): - is_tautau = is_tautau & (ak.sum(events.Tau[tau_indices].pt > 40, axis=1) <= 1) - - is_iso = ak.sum(tau_iso_mask, axis=1) >= 2 - # tau_indices are sorted by highest isolation as cond. 1 and highest pt as cond. 2, so - # the first two indices are exactly those selected by the full-blown pairing algorithm - # and there is no need here to apply it again :) - # determine the os/ss charge sign relation - tau1_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) - tau2_charge = ak.firsts(events.Tau[tau_indices].charge[..., 1:], axis=1) - is_os = tau1_charge == -tau2_charge - # store global variables - where = (channel_id == 0) & is_tautau - channel_id = ak.where(where, ch_tautau.id, channel_id) - tau2_isolated = ak.where(where, is_iso, tau2_isolated) - leptons_os = ak.where(where, is_os, leptons_os) - single_triggered = ak.where(where & is_single, True, single_triggered) - cross_triggered = ak.where(where & is_cross, True, cross_triggered) - sel_tau_indices = ak.where(where, tau_indices, sel_tau_indices) + if (self.dataset_inst.is_mc) or ("data_tau_" in self.dataset_inst.name): + # expect 0 veto electrons, 0 veto muons and at least two taus of which one is isolated + is_tautau = ( + trigger_fired & + (ak.num(electron_veto_indices, axis=1) == 0) & + (ak.num(muon_veto_indices, axis=1) == 0) & + (ak.num(tau_indices, axis=1) >= 2) & + (ak.sum(tau_iso_mask, axis=1) >= 1) + ) + + # special case for cross tau vbf trigger: + # to avoid overlap, with non-vbf triggers, only one tau is allowed to have pt > 40 + if trigger.has_tag("cross_tau_tau_vbf"): + is_tautau = is_tautau & (ak.sum(events.Tau[tau_indices].pt > 40, axis=1) <= 1) + + is_iso = ak.sum(tau_iso_mask, axis=1) >= 2 + # tau_indices are sorted by highest isolation as cond. 1 and highest pt as cond. 2, so + # the first two indices are exactly those selected by the full-blown pairing algorithm + # and there is no need here to apply it again :) + # determine the os/ss charge sign relation + tau1_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) + tau2_charge = ak.firsts(events.Tau[tau_indices].charge[..., 1:], axis=1) + is_os = tau1_charge == -tau2_charge + # store global variables + channel_id = new_channel_ids(events, channel_id, ch_tautau.id, is_tautau) + tau2_isolated = ak.where(is_tautau, is_iso, tau2_isolated) + leptons_os = ak.where(is_tautau, is_os, leptons_os) + single_triggered = ak.where(is_tautau & is_single, True, single_triggered) + cross_triggered = ak.where(is_tautau & is_cross, True, cross_triggered) + sel_tau_indices = ak.where(is_tautau, tau_indices, sel_tau_indices) # some final type conversions channel_id = ak.values_astype(channel_id, np.uint8) From 2dcec63c771e94cf3169f648508f0520c65cf04a Mon Sep 17 00:00:00 2001 From: Nathan Prouvost Date: Thu, 14 Nov 2024 16:59:26 +0100 Subject: [PATCH 24/25] choose prettier names, remove changes for double counting --- hbt/selection/lepton.py | 152 +++++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 79 deletions(-) diff --git a/hbt/selection/lepton.py b/hbt/selection/lepton.py index 493d47f8..5a22a612 100644 --- a/hbt/selection/lepton.py +++ b/hbt/selection/lepton.py @@ -43,26 +43,25 @@ def trigger_object_matching( return any_match -def new_channel_ids( +def update_channel_ids( events: ak.Array, previous_channel_ids: ak.Array, correct_channel_id: int, - is_mask: ak.Array, + channel_mask: ak.Array, ) -> ak.Array: """ Check if the events in the is_mask can be inside the given channel or have already been sorted in another channel before. """ events_not_in_channel = (previous_channel_ids != 0) & (previous_channel_ids != correct_channel_id) - channel_id_overwrite = events_not_in_channel & is_mask + channel_id_overwrite = events_not_in_channel & channel_mask if ak.any(channel_id_overwrite): raise ValueError( "The channel_ids of some events are being set to two different values. " "The first event of this chunk concerned has index", ak.where(channel_id_overwrite)[0], ) - else: - return ak.where(is_mask, correct_channel_id, previous_channel_ids) + return ak.where(channel_mask, correct_channel_id, previous_channel_ids) @selector( @@ -463,84 +462,79 @@ def lepton_selection( # lepton pair selecton per trigger via lepton counting if trigger.has_tag({"single_e", "cross_e_tau"}): - if (self.dataset_inst.is_mc) or ("data_e_" in self.dataset_inst.name): - # expect 1 electron, 1 veto electron (the same one), 0 veto muons, and at least one tau - is_etau = ( - trigger_fired & - (ak.num(electron_indices, axis=1) == 1) & - (ak.num(electron_veto_indices, axis=1) == 1) & - (ak.num(muon_veto_indices, axis=1) == 0) & - (ak.num(tau_indices, axis=1) >= 1) - ) - is_iso = ak.sum(tau_iso_mask, axis=1) >= 1 - # determine the os/ss charge sign relation - e_charge = ak.firsts(events.Electron[electron_indices].charge, axis=1) - tau_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) - is_os = e_charge == -tau_charge - # store global variables - channel_id = new_channel_ids(events, channel_id, ch_etau.id, is_etau) - tau2_isolated = ak.where(is_etau, is_iso, tau2_isolated) - leptons_os = ak.where(is_etau, is_os, leptons_os) - single_triggered = ak.where(is_etau & is_single, True, single_triggered) - cross_triggered = ak.where(is_etau & is_cross, True, cross_triggered) - sel_electron_indices = ak.where(is_etau, electron_indices, sel_electron_indices) - sel_tau_indices = ak.where(is_etau, tau_indices, sel_tau_indices) + # expect 1 electron, 1 veto electron (the same one), 0 veto muons, and at least one tau + is_etau = ( + trigger_fired & + (ak.num(electron_indices, axis=1) == 1) & + (ak.num(electron_veto_indices, axis=1) == 1) & + (ak.num(muon_veto_indices, axis=1) == 0) & + (ak.num(tau_indices, axis=1) >= 1) + ) + is_iso = ak.sum(tau_iso_mask, axis=1) >= 1 + # determine the os/ss charge sign relation + e_charge = ak.firsts(events.Electron[electron_indices].charge, axis=1) + tau_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) + is_os = e_charge == -tau_charge + # store global variables + channel_id = update_channel_ids(events, channel_id, ch_etau.id, is_etau) + tau2_isolated = ak.where(is_etau, is_iso, tau2_isolated) + leptons_os = ak.where(is_etau, is_os, leptons_os) + single_triggered = ak.where(is_etau & is_single, True, single_triggered) + cross_triggered = ak.where(is_etau & is_cross, True, cross_triggered) + sel_electron_indices = ak.where(is_etau, electron_indices, sel_electron_indices) + sel_tau_indices = ak.where(is_etau, tau_indices, sel_tau_indices) elif trigger.has_tag({"single_mu", "cross_mu_tau"}): - if (self.dataset_inst.is_mc) or ("data_mu_" in self.dataset_inst.name): - # expect 1 muon, 1 veto muon (the same one), 0 veto electrons, and at least one tau - is_mutau = ( - trigger_fired & - (ak.num(muon_indices, axis=1) == 1) & - (ak.num(muon_veto_indices, axis=1) == 1) & - (ak.num(electron_veto_indices, axis=1) == 0) & - (ak.num(tau_indices, axis=1) >= 1) - ) - is_iso = ak.sum(tau_iso_mask, axis=1) >= 1 - # determine the os/ss charge sign relation - mu_charge = ak.firsts(events.Muon[muon_indices].charge, axis=1) - tau_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) - is_os = mu_charge == -tau_charge - # store global variables - channel_id = new_channel_ids(events, channel_id, ch_mutau.id, is_mutau) - tau2_isolated = ak.where(is_mutau, is_iso, tau2_isolated) - leptons_os = ak.where(is_mutau, is_os, leptons_os) - single_triggered = ak.where(is_mutau & is_single, True, single_triggered) - cross_triggered = ak.where(is_mutau & is_cross, True, cross_triggered) - sel_muon_indices = ak.where(is_mutau, muon_indices, sel_muon_indices) - sel_tau_indices = ak.where(is_mutau, tau_indices, sel_tau_indices) + # expect 1 muon, 1 veto muon (the same one), 0 veto electrons, and at least one tau + is_mutau = ( + trigger_fired & + (ak.num(muon_indices, axis=1) == 1) & + (ak.num(muon_veto_indices, axis=1) == 1) & + (ak.num(electron_veto_indices, axis=1) == 0) & + (ak.num(tau_indices, axis=1) >= 1) + ) + is_iso = ak.sum(tau_iso_mask, axis=1) >= 1 + # determine the os/ss charge sign relation + mu_charge = ak.firsts(events.Muon[muon_indices].charge, axis=1) + tau_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) + is_os = mu_charge == -tau_charge + # store global variables + channel_id = update_channel_ids(events, channel_id, ch_mutau.id, is_mutau) + tau2_isolated = ak.where(is_mutau, is_iso, tau2_isolated) + leptons_os = ak.where(is_mutau, is_os, leptons_os) + single_triggered = ak.where(is_mutau & is_single, True, single_triggered) + cross_triggered = ak.where(is_mutau & is_cross, True, cross_triggered) + sel_muon_indices = ak.where(is_mutau, muon_indices, sel_muon_indices) + sel_tau_indices = ak.where(is_mutau, tau_indices, sel_tau_indices) elif trigger.has_tag({"cross_tau_tau", "cross_tau_tau_vbf", "cross_tau_tau_jet"}): - if (self.dataset_inst.is_mc) or ("data_tau_" in self.dataset_inst.name): - # expect 0 veto electrons, 0 veto muons and at least two taus of which one is isolated - is_tautau = ( - trigger_fired & - (ak.num(electron_veto_indices, axis=1) == 0) & - (ak.num(muon_veto_indices, axis=1) == 0) & - (ak.num(tau_indices, axis=1) >= 2) & - (ak.sum(tau_iso_mask, axis=1) >= 1) - ) - - # special case for cross tau vbf trigger: - # to avoid overlap, with non-vbf triggers, only one tau is allowed to have pt > 40 - if trigger.has_tag("cross_tau_tau_vbf"): - is_tautau = is_tautau & (ak.sum(events.Tau[tau_indices].pt > 40, axis=1) <= 1) - - is_iso = ak.sum(tau_iso_mask, axis=1) >= 2 - # tau_indices are sorted by highest isolation as cond. 1 and highest pt as cond. 2, so - # the first two indices are exactly those selected by the full-blown pairing algorithm - # and there is no need here to apply it again :) - # determine the os/ss charge sign relation - tau1_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) - tau2_charge = ak.firsts(events.Tau[tau_indices].charge[..., 1:], axis=1) - is_os = tau1_charge == -tau2_charge - # store global variables - channel_id = new_channel_ids(events, channel_id, ch_tautau.id, is_tautau) - tau2_isolated = ak.where(is_tautau, is_iso, tau2_isolated) - leptons_os = ak.where(is_tautau, is_os, leptons_os) - single_triggered = ak.where(is_tautau & is_single, True, single_triggered) - cross_triggered = ak.where(is_tautau & is_cross, True, cross_triggered) - sel_tau_indices = ak.where(is_tautau, tau_indices, sel_tau_indices) + # expect 0 veto electrons, 0 veto muons and at least two taus of which one is isolated + is_tautau = ( + trigger_fired & + (ak.num(electron_veto_indices, axis=1) == 0) & + (ak.num(muon_veto_indices, axis=1) == 0) & + (ak.num(tau_indices, axis=1) >= 2) & + (ak.sum(tau_iso_mask, axis=1) >= 1) + ) + # special case for cross tau vbf trigger: + # to avoid overlap, with non-vbf triggers, only one tau is allowed to have pt > 40 + if trigger.has_tag("cross_tau_tau_vbf"): + is_tautau = is_tautau & (ak.sum(events.Tau[tau_indices].pt > 40, axis=1) <= 1) + is_iso = ak.sum(tau_iso_mask, axis=1) >= 2 + # tau_indices are sorted by highest isolation as cond. 1 and highest pt as cond. 2, so + # the first two indices are exactly those selected by the full-blown pairing algorithm + # and there is no need here to apply it again :) + # determine the os/ss charge sign relation + tau1_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) + tau2_charge = ak.firsts(events.Tau[tau_indices].charge[..., 1:], axis=1) + is_os = tau1_charge == -tau2_charge + # store global variables + channel_id = update_channel_ids(events, channel_id, ch_tautau.id, is_tautau) + tau2_isolated = ak.where(is_tautau, is_iso, tau2_isolated) + leptons_os = ak.where(is_tautau, is_os, leptons_os) + single_triggered = ak.where(is_tautau & is_single, True, single_triggered) + cross_triggered = ak.where(is_tautau & is_cross, True, cross_triggered) + sel_tau_indices = ak.where(is_tautau, tau_indices, sel_tau_indices) # some final type conversions channel_id = ak.values_astype(channel_id, np.uint8) From 5ee7b03875eb6a21e4eeb6c31f663e7641b649a4 Mon Sep 17 00:00:00 2001 From: Bogdan-Wiederspan <79155113+Bogdan-Wiederspan@users.noreply.github.com> Date: Fri, 15 Nov 2024 10:57:26 +0100 Subject: [PATCH 25/25] Add task to find overlapping events between custom and central NANOAOD (#44) * Add overlap checking task. * add a simple hash function that calculates a unique number using 'event', 'luminisityBlock' and 'run' information of a nano AOD * linting * Completed the task to find overlapping events within our custom Nano AOD and the centralized created NANOAOD. Due to different compressions between our and the central NANOAODs we need to find out which overlap both have and filter them out. This task finds the events that overlap and savin the unique event identifier as tuple in a json. This json also contains the relative number of overlap as information. * make it more clear why the value is padded to this specific value * rearrange order of fields to the one used in the hash * add an assertion to check if unique identifier column exceed a specfic value, which is given by data (and may exceed in far future * moved output of unique ovelap identifier from json to parquet file, use this file to create filter mask in sync csv task * linting * moved type cast to hash function, refactor names * move imports into hash function * linting and add maybe import for numpy * overlapping events should not be chunk depending * add new check so that people will not create an int64 overflow * previous overlapping find compared chunkwise, but chunks are not always of same size. Changed this to a global comparison. * Changed default value of file path to empty string, since None is resolved by law to `/None` * Added case padding handling for array of dim < 2 Some array variables, e.g. like MET.covXX is of type numpy and not ListNumpy, meaning slicing is not possible. * added more variables for the sync * Apply suggestions from code review --------- Co-authored-by: Marcel R. Co-authored-by: Marcel Rieger --- hbt/tasks/sync.py | 179 +++++++++++++++++++++++++++++++++++++++++++++- hbt/util.py | 35 +++++++++ 2 files changed, 211 insertions(+), 3 deletions(-) diff --git a/hbt/tasks/sync.py b/hbt/tasks/sync.py index cabf2dbc..52d008bd 100644 --- a/hbt/tasks/sync.py +++ b/hbt/tasks/sync.py @@ -6,19 +6,108 @@ from __future__ import annotations +import luigi import law -from columnflow.tasks.framework.base import Requirements + +from columnflow.tasks.framework.base import Requirements, DatasetTask from columnflow.tasks.framework.mixins import ( ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin, ) from columnflow.tasks.framework.remote import RemoteWorkflow +from columnflow.tasks.external import GetDatasetLFNs from columnflow.tasks.reduction import ReducedEventsUser from columnflow.tasks.production import ProduceColumns from columnflow.tasks.ml import MLEvaluation from columnflow.util import dev_sandbox from hbt.tasks.base import HBTTask +from hbt.util import hash_events + + +class CheckExternalLFNOverlap( + HBTTask, + DatasetTask, +): + + lfn = luigi.Parameter( + description="local path to an external LFN to check for overlap with the dataset", + # fetched via nanogen's FetchLFN + default="/pnfs/desy.de/cms/tier2/store/user/mrieger/nanogen_store/FetchLFN/store/mc/Run3Summer22NanoAODv12/GluGlutoHHto2B2Tau_kl-1p00_kt-1p00_c2-0p00_LHEweights_TuneCP5_13p6TeV_powheg-pythia8/NANOAODSIM/130X_mcRun3_2022_realistic_v5-v2/50000/992697da-4a10-4435-b63a-413f6d33517e.root", # noqa + ) + + # no versioning required + version = None + + # default sandbox, might be overwritten by calibrator function + sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox")) + + # upstream requirements + reqs = Requirements( + GetDatasetLFNs=GetDatasetLFNs, + ) + + def requires(self): + return self.reqs.GetDatasetLFNs.req(self) + + def output(self): + return { + "overlap": self.target("lfn_overlap.json"), + "unique_identifiers": self.target("unique_identifiers.parquet"), + } + + def run(self): + import awkward as ak + import numpy as np + + # load the index columns of the reference lfn + output = self.output() + + with self.publish_step("loading reference ids"): + ref_hashes = hash_events(self.load_nano_index(law.LocalFileTarget(self.lfn))) + + # loop over all lfns in the dataset + n_files = self.dataset_inst.n_files + lfns_task = self.requires() + relative_overlap = {} + overlapping_identifier = [] + + for i, lfn_target in lfns_task.iter_nano_files(self, lfn_indices=list(range(n_files))): + with self.publish_step(f"loading ids of file {i}"): + file_arr = self.load_nano_index(lfn_target) + file_hashes = hash_events(file_arr) + # find unique hashes in the reference and the file + # faster than np. + overlapping_mask = np.isin( + file_hashes, + ref_hashes, + assume_unique=True, + kind="sort", + ) + + num_overlapping = np.sum(overlapping_mask) + if num_overlapping: + # calculate the relative overlap + relative_overlap[str(i)] = np.sum(num_overlapping) / len(file_hashes) + + # apply mask and store the overlapping identifiers + overlapping_identifier.extend(file_arr[overlapping_mask]) + + output["overlap"].dump( + relative_overlap, + formatter="json", + ) + + output["unique_identifiers"].dump( + ak.Array(overlapping_identifier), + formatter="awkward", + ) + + @classmethod + def load_nano_index(cls, lfn_target: law.FileSystemFileTarget) -> set[int]: + fields = ["event", "luminosityBlock", "run"] + arr = lfn_target.load(formatter="uproot")["Events"].arrays(fields) + return arr class CreateSyncFiles( @@ -31,6 +120,11 @@ class CreateSyncFiles( law.LocalWorkflow, RemoteWorkflow, ): + filter_file = luigi.Parameter( + description="local path to a file containing event unique identifier to filter them out", + default="", + ) + sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox")) # upstream requirements @@ -89,6 +183,7 @@ def output(self): @law.decorator.localize def run(self): import awkward as ak + import numpy as np from columnflow.columnar_util import update_ak_array, EMPTY_FLOAT, set_ak_column # prepare inputs and outputs @@ -115,7 +210,8 @@ def pad_nested(arr, n, *, axis=1): # helper to pad and select the last element on the first inner axis def select(arr, idx): - return replace_empty_float(pad_nested(arr, idx + 1, axis=1)[:, idx]) + padded = pad_nested(arr, idx + 1, axis=-1) + return replace_empty_float(padded if arr.ndim == 1 else padded[:, idx]) # helper to select leptons def select_leptons(events: ak.Array, common_fields: dict[str, int | float]) -> ak.Array: @@ -131,11 +227,19 @@ def select_leptons(events: ak.Array, common_fields: dict[str, int | float]) -> a return set_ak_column(events, "Lepton", ak.concatenate(leptons, axis=1)) # event chunk loop + + # optional filter to get only the events that overlap with given external LFN + if self.filter_file: + with self.publish_step("loading reference ids"): + events_to_filter = law.LocalFileTarget(self.filter_file).load(formatter="awkward") + filter_events = hash_events(events_to_filter) + for (events, *columns), pos in self.iter_chunked_io( files, source_type=len(files) * ["awkward_parquet"], pool_size=1, ): + # optional check for overlapping inputs if self.check_overlapping_inputs: self.raise_if_overlapping([events] + list(columns)) @@ -143,6 +247,19 @@ def select_leptons(events: ak.Array, common_fields: dict[str, int | float]) -> a # add additional columns events = update_ak_array(events, *columns) + # apply mask if optional filter is given + # calculate mask by using 1D hash values + if self.filter_file: + mask = np.isin( + hash_events(events), + filter_events, + assume_unique=True, + kind="sort", + ) + + # apply mask + events = events[mask] + # insert leptons events = select_leptons(events, {"rawDeepTau2018v2p5VSjet": empty_float}) @@ -173,7 +290,63 @@ def select_leptons(events: ak.Array, common_fields: dict[str, int | float]) -> a "lep2_eta": select(events.Lepton.eta, 1), "lep2_charge": select(events.Lepton.charge, 1), "lep2_deeptauvsjet": select(events.Lepton.rawDeepTau2018v2p5VSjet, 1), - # TODO: add additional variables + "electron1_charge": select(events.Electron.charge, 0), + "electron1_eta": select(events.Electron.eta, 0), + "electron1_mass": select(events.Electron.mass, 0), + "electron1_phi": select(events.Electron.phi, 0), + "electron1_pt": select(events.Electron.pt, 0), + "electron2_charge": select(events.Electron.charge, 1), + "electron2_eta": select(events.Electron.eta, 1), + "electron2_mass": select(events.Electron.mass, 1), + "electron2_phi": select(events.Electron.phi, 1), + "electron2_pt": select(events.Electron.pt, 1), + + "muon1_charge": select(events.Muon.charge, 0), + "muon1_eta": select(events.Muon.eta, 0), + "muon1_mass": select(events.Muon.mass, 0), + "muon1_phi": select(events.Muon.phi, 0), + "muon1_pt": select(events.Muon.pt, 0), + "muon2_charge": select(events.Muon.charge, 1), + "muon2_eta": select(events.Muon.eta, 1), + "muon2_mass": select(events.Muon.mass, 1), + "muon2_phi": select(events.Muon.phi, 1), + "muon2_pt": select(events.Muon.pt, 1), + + "tau1_charge": select(events.Tau.charge, 0), + "tau1_eta": select(events.Tau.eta, 0), + "tau1_mass": select(events.Tau.mass, 0), + "tau1_phi": select(events.Tau.phi, 0), + "tau1_pt": select(events.Tau.pt, 0), + "tau2_charge": select(events.Tau.charge, 1), + "tau2_eta": select(events.Tau.eta, 1), + "tau2_mass": select(events.Tau.mass, 1), + "tau2_phi": select(events.Tau.phi, 1), + "tau2_pt": select(events.Tau.pt, 1), + + "met1_covXX": select(events.MET.covXX, 0), + "met1_covXY": select(events.MET.covXY, 0), + "met1_covYY": select(events.MET.covYY, 0), + "met1_phi": select(events.MET.phi, 0), + "met1_pt": select(events.MET.pt, 0), + "met1_significance": select(events.MET.significance, 0), + + "fatjet1_eta": select(events.FatJet.eta, 0), + "fatjet1_mass": select(events.FatJet.mass, 0), + "fatjet1_phi": select(events.FatJet.phi, 0), + "fatjet1_pt": select(events.FatJet.pt, 0), + "fatjet1_tau1": select(events.FatJet.tau1, 0), + "fatjet1_tau2": select(events.FatJet.tau2, 0), + "fatjet1_tau3": select(events.FatJet.tau3, 0), + "fatjet1_tau4": select(events.FatJet.tau4, 0), + + "fatjet2_eta": select(events.FatJet.eta, 1), + "fatjet2_mass": select(events.FatJet.mass, 1), + "fatjet2_phi": select(events.FatJet.phi, 1), + "fatjet2_pt": select(events.FatJet.pt, 1), + "fatjet2_tau1": select(events.FatJet.tau1, 1), + "fatjet2_tau2": select(events.FatJet.tau2, 1), + "fatjet2_tau3": select(events.FatJet.tau3, 1), + "fatjet2_tau4": select(events.FatJet.tau4, 1), }) # save as csv in output, append if necessary diff --git a/hbt/util.py b/hbt/util.py index fa4a710a..5b50c02a 100644 --- a/hbt/util.py +++ b/hbt/util.py @@ -10,6 +10,9 @@ from columnflow.types import Any from columnflow.columnar_util import ArrayFunction, deferred_column +from columnflow.util import maybe_import + +np = maybe_import("numpy") @deferred_column @@ -57,3 +60,35 @@ def IF_DATASET_IS_DY( return self.get() return self.get() if func.dataset_inst.has_tag("is_dy") else None + + +def hash_events(arr: np.ndarray) -> np.ndarray: + """ + Helper function to create a hash value from the event, run and luminosityBlock columns. + The values are padded to specific lengths and concatenated to a single integer. + """ + import awkward as ak + + def assert_value(arr: np.ndarray, field: str, max_value: int) -> None: + """ + Helper function to check if a column does not exceed a maximum value. + """ + digits = len(str(arr[field].to_numpy().max())) + assert digits <= max_value, f"{field} digit count is {digits} and exceed max value {max_value}" + + max_digits_run = 6 + max_digits_luminosityBlock = 5 + max_digits_event = 8 + assert_value(arr, "run", max_digits_run) + assert_value(arr, "luminosityBlock", max_digits_luminosityBlock) + assert_value(arr, "event", max_digits_event) + + max_digits_hash = max_digits_event + max_digits_luminosityBlock + max_digits_run + assert max_digits_hash <= 19, "sum of digits exceeds int64" + + # upcast to int64 to avoid overflow + return ( + ak.values_astype(arr.event, np.int64) * 10**(max_digits_luminosityBlock + max_digits_run) + + ak.values_astype(arr.luminosityBlock, np.int64) * 10**max_digits_run + + ak.values_astype(arr.run, np.int64) + )