diff --git a/hbt/__init__.py b/hbt/__init__.py index 78ef9c08..9e393f0c 100644 --- a/hbt/__init__.py +++ b/hbt/__init__.py @@ -1,8 +1,11 @@ # coding: utf-8 +import law from hbt.columnflow_patches import patch_all +law.contrib.load("pandas") + # apply cf patches once patch_all() diff --git a/hbt/calibration/default.py b/hbt/calibration/default.py index 98ed60f1..6f1b5678 100644 --- a/hbt/calibration/default.py +++ b/hbt/calibration/default.py @@ -20,14 +20,17 @@ # derive calibrators to add settings jec_full = jec.derive("jec_full", cls_dict={"mc_only": True, "nominal_only": True}) +# version of jer that uses the first random number from deterministic_seeds +deterministic_jer = jer.derive("deterministic_jer", cls_dict={"deterministic_seed_index": 0}) + @calibrator( uses={ - mc_weight, jec_nominal, jec_full, jer, tec_nominal, tec, deterministic_seeds, + mc_weight, deterministic_seeds, jec_nominal, jec_full, deterministic_jer, tec_nominal, tec, IF_RUN_2(met_phi), }, produces={ - mc_weight, jec_nominal, jec_full, jer, tec_nominal, tec, deterministic_seeds, + mc_weight, deterministic_seeds, jec_nominal, jec_full, deterministic_jer, tec_nominal, tec, IF_RUN_2(met_phi), }, ) @@ -40,7 +43,7 @@ def default(self: Calibrator, events: ak.Array, **kwargs) -> ak.Array: events = self[jec_nominal](events, **kwargs) else: events = self[jec_full](events, **kwargs) - events = self[jer](events, **kwargs) + events = self[deterministic_jer](events, **kwargs) if self.config_inst.campaign.x.run == 2: events = self[met_phi](events, **kwargs) diff --git a/hbt/config/analysis_hbt.py b/hbt/config/analysis_hbt.py index 31687532..e8107512 100644 --- a/hbt/config/analysis_hbt.py +++ b/hbt/config/analysis_hbt.py @@ -87,38 +87,67 @@ def factory(configs: od.UniqueObjectIndex): # Run 2 configs # -# 2016 HIPM (also known as APV or preVFP) -add_lazy_config( - campaign_module="cmsdb.campaigns.run2_2016_HIPM_nano_uhh_v12", - campaign_attr="campaign_run2_2016_HIPM_nano_uhh_v12", - config_name="run2_2016_HIPM_nano_uhh_v12", - config_id=1, -) +# 2016 HIPM (also known as APV or preVFP), TODO: campaign needs consistency and content check +# add_lazy_config( +# campaign_module="cmsdb.campaigns.run2_2016_HIPM_nano_uhh_v12", +# campaign_attr="campaign_run2_2016_HIPM_nano_uhh_v12", +# config_name="run2_2016_HIPM_nano_uhh_v12", +# config_id=1, +# ) + +# 2016 (also known postVFP), TODO: campaign needs consistency and content check +# add_lazy_config( +# campaign_module="cmsdb.campaigns.run2_2016_nano_uhh_v12", +# campaign_attr="campaign_run2_2016_nano_uhh_v12", +# config_name="run2_2016_nano_uhh_v12", +# config_id=2, +# ) + +# 2017, old nano version, TODO: needs re-processing +# add_lazy_config( +# campaign_module="cmsdb.campaigns.run2_2017_nano_uhh_v11", +# campaign_attr="campaign_run2_2017_nano_uhh_v11", +# config_name="run2_2017_nano_uhh_v11", +# config_id=3, +# ) + +# 2018, TODO: not processed yet +# add_lazy_config( +# campaign_module="cmsdb.campaigns.run2_2018_nano_uhh_v12", +# campaign_attr="campaign_run2_2018_nano_uhh_v12", +# config_name="run2_2018_nano_uhh_v12", +# config_id=4, +# ) + +# +# Run 3 configs +# -# 2016 (also known postVFP) +# 2022, preEE add_lazy_config( - campaign_module="cmsdb.campaigns.run2_2016_nano_uhh_v12", - campaign_attr="campaign_run2_2016_nano_uhh_v12", - config_name="run2_2016_nano_uhh_v12", - config_id=2, + campaign_module="cmsdb.campaigns.run3_2022_preEE_nano_uhh_v12", + campaign_attr="campaign_run3_2022_preEE_nano_uhh_v12", + config_name="run3_2022_preEE", + config_id=5, ) -# 2017 +# 2022, postEE 
add_lazy_config( - campaign_module="cmsdb.campaigns.run2_2017_nano_uhh_v11", - campaign_attr="campaign_run2_2017_nano_uhh_v11", - config_name="run2_2017_nano_uhh_v11", - config_id=3, + campaign_module="cmsdb.campaigns.run3_2022_postEE_nano_uhh_v12", + campaign_attr="campaign_run3_2022_postEE_nano_uhh_v12", + config_name="run3_2022_postEE", + config_id=6, ) # -# Run 3 configs +# sync configs # # 2022, preEE add_lazy_config( - campaign_module="cmsdb.campaigns.run3_2022_preEE_nano_uhh_v12", - campaign_attr="campaign_run3_2022_preEE_nano_uhh_v12", - config_name="run3_2022_preEE", - config_id=5, + campaign_module="cmsdb.campaigns.run3_2022_preEE_nano_v12", + campaign_attr="campaign_run3_2022_preEE_nano_v12", + config_name="run3_2022_preEE_sync", + config_id=5001, + sync_mode=True, ) diff --git a/hbt/config/configs_hbt.py b/hbt/config/configs_hbt.py index 8f8a782a..1ff39f45 100644 --- a/hbt/config/configs_hbt.py +++ b/hbt/config/configs_hbt.py @@ -33,6 +33,7 @@ def add_config( config_name: str | None = None, config_id: int | None = None, limit_dataset_files: int | None = None, + sync_mode: bool = False, ) -> od.Config: # gather campaign data run = campaign.x.run @@ -47,7 +48,14 @@ def add_config( procs = get_root_processes_from_campaign(campaign) # create a config by passing the campaign, so id and name will be identical - cfg = od.Config(name=config_name, id=config_id, campaign=campaign) + cfg = od.Config( + name=config_name, + id=config_id, + campaign=campaign, + aux={ + "sync": sync_mode, + }, + ) ################################################################################################ # helpers @@ -56,45 +64,48 @@ def add_config( # helper to enable processes / datasets only for a specific era def if_era( *, - run: int | None = None, - year: int | None = None, - postfix: str | None = None, - tag: str | None = None, - values: list[str] | None = None, + run: int | set[int] | None = None, + year: int | set[int] | None = None, + postfix: str | set[int] | None = None, + tag: str | set[str] | None = None, + values: list[str | None] | None = None, + sync: bool = False, ) -> list[str]: match = ( - (run is None or campaign.x.run == run) and - (year is None or campaign.x.year == year) and - (postfix is None or campaign.x.postfix == postfix) and - (tag is None or campaign.has_tag(tag)) + (run is None or campaign.x.run in law.util.make_set(run)) and + (year is None or campaign.x.year in law.util.make_set(year)) and + (postfix is None or campaign.x.postfix in law.util.make_set(postfix)) and + (tag is None or campaign.has_tag(tag, mode=any)) and + (sync is sync_mode) ) - return (values or []) if match else [] + return list(filter(bool, values or [])) if match else [] ################################################################################################ # processes ################################################################################################ # add custom processes - cfg.add_process( - name="v", - id=7997, - label="W/Z", - processes=[procs.n.w, procs.n.z], - ) - cfg.add_process( - name="multiboson", - id=7998, - label="Multiboson", - processes=[procs.n.vv, procs.n.vvv], - ) - cfg.add_process( - name="tt_multiboson", - id=7999, - label=r"$t\bar{t}$ + Multiboson", - processes=[procs.n.ttv, procs.n.ttvv], - ) + if not sync_mode: + cfg.add_process( + name="v", + id=7997, + label="W/Z", + processes=[procs.n.w, procs.n.z], + ) + cfg.add_process( + name="multiboson", + id=7998, + label="Multiboson", + processes=[procs.n.vv, procs.n.vvv], + ) + cfg.add_process( + 
name="tt_multiboson", + id=7999, + label=r"$t\bar{t}$ + Multiboson", + processes=[procs.n.ttv, procs.n.ttvv], + ) - # add processes we are interested in + # processes we are interested in process_names = [ "data", "tt", @@ -125,9 +136,10 @@ def if_era( "hh_vbf_hbb_htt_kvm1p83_k2v3p57_klm3p39", "hh_vbf_hbb_htt_kvm2p12_k2v3p87_klm5p96", ]), + "radion_hh_ggf_hbb_htt_m450", + "radion_hh_ggf_hbb_htt_m1200", "graviton_hh_ggf_hbb_htt_m450", "graviton_hh_ggf_hbb_htt_m1200", - "radion_hh_ggf_hbb_htt_m700", ] for process_name in process_names: if process_name in procs: @@ -147,6 +159,10 @@ def if_era( if process_name.startswith(("graviton_hh_", "radion_hh_")): proc.add_tag("signal") proc.add_tag("resonant_signal") + if process_name.startswith("tt_"): + proc.add_tag({"ttbar", "tt"}) + if process_name.startswith("dy_"): + proc.add_tag("dy") # add the process cfg.add_process(proc) @@ -168,6 +184,7 @@ def if_era( "hh_ggf_hbb_htt_kl0_kt1_powheg", "hh_ggf_hbb_htt_kl2p45_kt1_powheg", "hh_ggf_hbb_htt_kl5_kt1_powheg", + # vbf "hh_vbf_hbb_htt_kv1_k2v1_kl1_madgraph", "hh_vbf_hbb_htt_kv1_k2v1_kl2_madgraph", @@ -181,24 +198,22 @@ def if_era( "hh_vbf_hbb_htt_kvm1p6_k2v2p72_klm1p36_madgraph", "hh_vbf_hbb_htt_kvm1p83_k2v3p57_klm3p39_madgraph", "hh_vbf_hbb_htt_kvm2p12_k2v3p87_klm5p96_madgraph", + # some resonances + "radion_hh_ggf_hbb_htt_m450_madgraph", + "radion_hh_ggf_hbb_htt_m1200_madgraph", "graviton_hh_ggf_hbb_htt_m450_madgraph", "graviton_hh_ggf_hbb_htt_m1200_madgraph", - "radion_hh_ggf_hbb_htt_m700_madgraph", ]), # backgrounds - "tt_sl_powheg", - "tt_dl_powheg", - "tt_fh_powheg", *if_era(run=3, year=2022, values=[ - "ttw_wlnu_amcatnlo", - "ttz_zqq_amcatnlo", - "ttz_zll_m4to50_amcatnlo", - "ttz_zll_m50toinf_amcatnlo", - "ttzz_madgraph", - "ttww_madgraph", - # "ttwz_madgraph", # not available yet + # ttbar + "tt_sl_powheg", + "tt_dl_powheg", + "tt_fh_powheg", + + # single top "st_tchannel_t_4f_powheg", "st_tchannel_tbar_4f_powheg", "st_twchannel_t_sl_powheg", @@ -207,8 +222,23 @@ def if_era( "st_twchannel_tbar_dl_powheg", "st_twchannel_t_fh_powheg", "st_twchannel_tbar_fh_powheg", - # "st_schannel_t_lep_4f_amcatnlo", # no cross section yet - # "st_schannel_tbar_lep_4f_amcatnlo", # no cross section yet + "st_schannel_t_lep_4f_amcatnlo", + "st_schannel_tbar_lep_4f_amcatnlo", + + # tt + v + "ttw_wlnu_amcatnlo", + "ttz_zqq_amcatnlo", + "ttz_zll_m4to50_amcatnlo", + "ttz_zll_m50toinf_amcatnlo", + + # tt + vv + "ttww_madgraph", + *if_era(run=3, year=2022, tag="postEE", values=[ + "ttwz_madgraph", # exists for post, but not for pre + ]), + "ttzz_madgraph", + + # dy "dy_m4to10_amcatnlo", "dy_m10to50_amcatnlo", "dy_m50toinf_amcatnlo", @@ -225,31 +255,62 @@ def if_era( "dy_m50toinf_2j_pt200to400_amcatnlo", "dy_m50toinf_2j_pt400to600_amcatnlo", "dy_m50toinf_2j_pt600toinf_amcatnlo", + + # w + jets "w_lnu_amcatnlo", - "z_qq_pt100to200_1j_amcatnlo", - "z_qq_pt100to200_2j_amcatnlo", - "z_qq_pt200to400_1j_amcatnlo", - "z_qq_pt200to400_2j_amcatnlo", # literally no events selected above 400 GeV + "w_lnu_0j_amcatnlo", + "w_lnu_1j_amcatnlo", + "w_lnu_2j_amcatnlo", + "w_lnu_1j_pt40to100_amcatnlo", + "w_lnu_1j_pt100to200_amcatnlo", + "w_lnu_1j_pt200to400_amcatnlo", + "w_lnu_1j_pt400to600_amcatnlo", + "w_lnu_1j_pt600toinf_amcatnlo", + "w_lnu_2j_pt40to100_amcatnlo", + "w_lnu_2j_pt100to200_amcatnlo", + "w_lnu_2j_pt200to400_amcatnlo", + "w_lnu_2j_pt400to600_amcatnlo", + "w_lnu_2j_pt600toinf_amcatnlo", + + # z + jets (not DY but qq) + # decided to drop z_qq for now as their contribution is negligible, + # but we should check that again at 
a much later stage + # "z_qq_1j_pt100to200_amcatnlo", + # "z_qq_1j_pt200to400_amcatnlo", + # "z_qq_1j_pt400to600_amcatnlo", + # "z_qq_1j_pt600toinf_amcatnlo", + # "z_qq_2j_pt100to200_amcatnlo", + # "z_qq_2j_pt200to400_amcatnlo", + # "z_qq_2j_pt400to600_amcatnlo", + # "z_qq_2j_pt600toinf_amcatnlo", + + # vv "zz_pythia", "wz_pythia", "ww_pythia", + + # vvv "zzz_amcatnlo", "wzz_amcatnlo", "wwz_4f_amcatnlo", "www_4f_amcatnlo", + + # single H "h_ggf_htt_powheg", "h_vbf_htt_powheg", "vh_hnonbb_amcatnlo", + "wmh_wlnu_hbb_powheg", + "wph_wlnu_hbb_powheg", + "wph_htt_powheg", + "wmh_htt_powheg", + "wph_wqq_hbb_powheg", + "wmh_wqq_hbb_powheg", "zh_zll_hbb_powheg", "zh_zqq_hbb_powheg", "zh_htt_powheg", - "wph_htt_powheg", - "wmh_htt_powheg", - "wph_wlnu_hbb_powheg", - "wmh_wlnu_hbb_powheg", "zh_gg_zll_hbb_powheg", - "zh_gg_znunu_hbb_powheg", "zh_gg_zqq_hbb_powheg", + "zh_gg_znunu_hbb_powheg", "tth_hbb_powheg", "tth_hnonbb_powheg", ]), @@ -258,33 +319,55 @@ def if_era( *if_era(run=3, year=2022, tag="preEE", values=[ f"data_{stream}_{period}" for stream in ["mu", "e", "tau", "met"] for period in "cd" ]), + *if_era(run=3, year=2022, tag="postEE", values=[ + f"data_{stream}_{period}" for stream in ["mu", "e", "tau", "met"] for period in "efg" + ]), + + # sync + *if_era(run=3, year=2022, tag="preEE", sync=True, values=[ + "hh_ggf_hbb_htt_kl1_kt1_powheg", + ]), ] for dataset_name in dataset_names: # add the dataset dataset = cfg.add_dataset(campaign.get_dataset(dataset_name)) # add tags to datasets - if dataset.name.startswith("tt"): - dataset.add_tag(("has_top", "is_ttbar")) - elif dataset.name.startswith("st"): - dataset.add_tag(("has_top", "is_single_top")) - if dataset.name.startswith("dy"): - dataset.add_tag("is_dy") + if dataset.name.startswith("tt_"): + dataset.add_tag({"has_top", "ttbar", "tt"}) + if dataset.name.startswith("st_"): + dataset.add_tag({"has_top", "single_top", "st"}) + if dataset.name.startswith("dy_"): + dataset.add_tag("dy") if re.match(r"^(ww|wz|zz)_.*pythia$", dataset.name): dataset.add_tag("no_lhe_weights") if dataset_name.startswith("hh_"): dataset.add_tag("signal") dataset.add_tag("nonresonant_signal") + if dataset_name.startswith("hh_ggf_"): + dataset.add_tag("ggf") + elif dataset_name.startswith("hh_vbf_"): + dataset.add_tag("vbf") if dataset_name.startswith(("graviton_hh_", "radion_hh_")): dataset.add_tag("signal") dataset.add_tag("resonant_signal") + if dataset_name.startswith(("graviton_hh_ggf_", "radion_hh_ggf")): + dataset.add_tag("ggf") + elif dataset_name.startswith(("graviton_hh_vbf_", "radion_hh_vbf")): + dataset.add_tag("vbf") # apply an optional limit on the number of files if limit_dataset_files: for info in dataset.info.values(): info.n_files = min(info.n_files, limit_dataset_files) - # verify that the root process of all datasets is part of any of the registered processes + # apply synchronization settings + if sync_mode: + # only first file per + for info in dataset.info.values(): + info.n_files = 1 + + # verify that the root process of each dataset is part of any of the registered processes verify_config_processes(cfg, warn=True) ################################################################################################ @@ -344,8 +427,9 @@ def if_era( "sm_data": ["data"] + sm_group, } - # define inclusive datasets for the dy process identification with corresponding leaf processes - if run == 3: + # define inclusive datasets for the stitched process identification with corresponding leaf processes + if run == 3 and not sync_mode: + # drell-yan 
cfg.x.dy_stitching = { "m50toinf": { "inclusive_dataset": cfg.datasets.n.dy_m50toinf_amcatnlo, @@ -361,6 +445,8 @@ def if_era( ], }, } + # w+jets + # TODO: add # dataset groups for conveniently looping over certain datasets # (used in wrapper_factory and during plotting) @@ -1122,7 +1208,7 @@ def add_external(name, value): cfg.x.get_dataset_lfns_sandbox = None # whether to validate the number of obtained LFNs in GetDatasetLFNs - cfg.x.validate_dataset_lfns = limit_dataset_files is None + cfg.x.validate_dataset_lfns = limit_dataset_files is None and not sync_mode # custom lfn retrieval method in case the underlying campaign is custom uhh if cfg.campaign.x("custom", {}).get("creator") == "uhh": diff --git a/hbt/config/hist_hooks.py b/hbt/config/hist_hooks.py index 1b7cc9f6..9efccbed 100644 --- a/hbt/config/hist_hooks.py +++ b/hbt/config/hist_hooks.py @@ -188,6 +188,290 @@ def qcd_estimation(task, hists): return hists + def flat_s(task, hists: dict[od.Process, hist.Histogram]) -> dict[od.Process, hist.Histogram]: + """Rebinnig of the histograms in *hists* to archieve a flat-signal distribution. + + :param task: task instance that contains the process informations + :param hists: A dictionary of histograms using Process instances as keys + + :raises RuntimeError: If the wanted number of bins is reached and the initial + bin edge is not minimal. + :raises Exception: If the number of actual bins ended up larger than requested. + :return: A dictionary of histograms using Process instances as keys + """ + def find_edges(signal_histogram, background_histograms, variable, n_bins=10) -> tuple[np.ndarray, np.ndarray]: + """ + Determine new bin edges that result in a flat signal distribution. + The edges are determined by the signal distribution, while the background distribution + is used to ensure that the background yield in each bin is sufficient. + + :param signal_histogram: The histogram that describes the signal distribution. + :param background_histograms: A dictionary of histograms using the process as key + that describe the background distribution. + :param variable: The variable name that is rebinned. + :param n_bins: The number of bins that the signal distribution should be rebinned to. + + :return: A tuple containing the new bin edges and the indices that define the new bin edges. + """ + def get_integral(cumulative_weights, stop, offset=0): + """ + Helper to calculate the integral of *cumulative_weights* between the *offset* (included) + and the *stop* index (not included) + + :param cumulative_weights: The cumulative weights array. + :param stop: The index up to which the integral is calculated. + :param offset: The index from which the integral is calculated. + + :return The integral of the weights between the given indices. + """ + return cumulative_weights[stop - 1] - (0 if offset == 0 else cumulative_weights[offset - 1]) + + def prepare_background(histogram: hist.Histogram) -> tuple[np.ndarray, np.ndarray, np.ndarray]: # noqa + """ + Helper to extract information from background histograms. + + :param histogram: A histogram that describes the background distribution. + + :return: A tuple containing the array that describe bin yield, + the number of equivalent bins and the cumulative bin yield. 
+ """ + bin_yield = histogram.counts() + # y^2 / sigma^2, where y is the yield, sigma is the uncertainty + # these are the number of events with weight 1 and same statistical fluctuation + number_of_equivalent_bins = bin_yield**2 / histogram.variances() + bin_yield = np.flip(bin_yield, axis=-1) + cumulative_bin_yield = np.cumsum(bin_yield, axis=0) + return ( + bin_yield, + np.flip(number_of_equivalent_bins, axis=-1), + cumulative_bin_yield, + ) + # prepare parameters + low_edge, max_edge = 0, 1 + bin_edges = [max_edge] + indices_gathering = [0] + + # bookkeep reasons for stopping binning + stop_reason = "" + # accumulated signal yield up to the current index + y_already_binned = 0.0 + y_min = 1.0e-5 + # during binning, do not remove leading entries + # instead remember the index that denotes the start of the bin + offset = 0 + + # prepare signal + # fine binned histograms bin centers are approx equivalent to dnn output + # flip arrays to start from the right + dnn_score_signal = np.flip(signal_histogram.axes[variable].centers, axis=-1) + y_signal = np.flip(signal_histogram.counts(), axis=-1) + + # calculate cumulative of reversed signal yield and yield per bin + cumulu_y_signal = np.cumsum(y_signal, axis=0) + full_cum = cumulu_y_signal[-1] + y_per_bin = full_cum / n_bins + num_events = len(cumulu_y_signal) + + # prepare background + + for process, histogram in background_histograms.items(): + if process.name == "ttbar": + tt_y, tt_num_eq, cumulu_tt_y = prepare_background(histogram) + elif process.name == "dy": + dy_y, dy_num_eq, cumulu_dy_y = prepare_background(histogram) + + # start binning + while len(bin_edges) < n_bins: + # stopping condition 1: reached end of events + if offset >= num_events: + stop_reason = "no more events left" + break + # stopping condition 2: remaining signal yield too small + # this would lead to a bin complelty filled with background + y_remaining = full_cum - y_already_binned + if y_remaining < y_min: + stop_reason = "remaining signal yield insufficient" + break + # find the index of the event that would result in a signal yield increase of more + # than the expected per-bin yield; + # this index would mark the start of the next bin given all constraints are met + if y_remaining >= y_per_bin: + threshold = y_already_binned + y_per_bin + # get indices of array of values above threshold + # first entry defines the next bin edge + # shift next idx by offset + next_idx = offset + np.where(cumulu_y_signal[offset:] > threshold)[0][0] + else: + # special case: remaining signal yield smaller than the expected per-bin yield, + # so find the last event + next_idx = offset + np.where(cumulu_y_signal[offset:])[0][-1] + 1 + + # Background constraints + while next_idx < num_events: + # get the number of monte carlo tt and dy events + tt_num_events = get_integral(tt_num_eq, next_idx, offset) + dy_num_events = get_integral(tt_num_eq, next_idx, offset) + tt_yield = get_integral(cumulu_tt_y, next_idx, offset) + dy_yield = get_integral(cumulu_dy_y, next_idx, offset) + + # evaluate constraints + # TODO: potentially relax constraints here, e.g when there are 3 (4?) 
tt events, drop the constraint + # on dy, and vice-versa + constraints_met = ( + # have atleast 1 tt, 1 dy and atleast 4 background events + # scale by lumi ratio to be more fair to the smaller dataset + tt_num_events >= 1 and + dy_num_events >= 1 and + tt_num_events + dy_num_events >= 4 and + + # yields must be positive to avoid negative sums of weights per process + tt_yield > 0 and + dy_yield > 0 + ) + if constraints_met: + # TODO: maybe also check if the background conditions are just barely met and advance next_idx + # to the middle between the current value and the next one that would change anything about the + # background predictions; this might be more stable as the current implementation can highly + # depend on the exact value of a single event (the one that tips the constraints over the edge + # to fulfillment) + # bin found, stop + break + + # constraints not met, advance index to include the next tt or dy event and try again + next_idx += 1 + else: + # stopping condition 3: no more events left, so the last bin (most left one) does not fullfill + # constraints; however, this should practically never happen + stop_reason = "no more events left while trying to fulfill constraints" + break + + # next_idx found, update values + # get next edge or set to low edge if end is reached + if next_idx == num_events: + edge_value = low_edge + else: + # calculate bin center as new edge + edge_value = float(dnn_score_signal[next_idx - 1:next_idx + 1].mean()) + # prevent out of bounds values and push them to the boundaries + bin_edges.append(max(min(edge_value, max_edge), low_edge)) + + y_already_binned += get_integral(cumulu_y_signal, next_idx, offset) + offset = next_idx + indices_gathering.append(next_idx) + + # make sure the lower dnn_output (max events) is included + if bin_edges[-1] != low_edge: + if len(bin_edges) > n_bins: + raise RuntimeError( + "number of bins reached and initial bin edge" + f" is not minimal bin edge (edges: {bin_edges})", + ) + bin_edges.append(low_edge) + indices_gathering.append(num_events) + + # some debugging output + n_bins_actual = len(bin_edges) - 1 + if n_bins_actual > n_bins: + raise Exception("number of actual bins ended up larger than requested (implementation bug)") + if n_bins_actual < n_bins: + print( + f" started from {num_events} bins, targeted {n_bins} but ended at {n_bins_actual} bins\n" + f" -> reason: {stop_reason or 'NO REASON!?'}", + ) + n_bins = n_bins_actual + + # flip indices to the right order + indices_gathering = (np.flip(indices_gathering) - num_events) * -1 + return np.flip(np.array(bin_edges), axis=-1), indices_gathering + + def apply_edges(h: hist.Hist, edges: np.ndarray, indices: np.ndarray, variable: str) -> hist.Hist: + """ + Rebin the content axes determined by *variable* of a given hist histogram *h* to + given *edges* and their *indices*. + The rebinned histogram is returned. 
+ + :param h: hist Histogram that is to be rebinned + :param edges: a array of ascending bin edges + :param indices: a array of indices that define the new bin edges + :param variable: variable name that is rebinned + + :return: rebinned hist histogram + """ + # sort edges and indices, by default they are sorted + ascending_order = np.argsort(edges) + edges, indices = edges[ascending_order], indices[ascending_order] + + # create new hist and add axes with coresponding edges + # define new axes, from old histogram and rebinned variable with new axis + axes = ( + [h.axes[axis] for axis in h.axes.name if axis not in variable] + + [hist.axis.Variable(edges, name=variable, label=f"{variable}-flat-s")] + ) + + new_hist = hist.Hist(*axes, storage=hist.storage.Weight()) + + # slice the old histogram storage view with new edges + # sum over sliced bin contents to get rebinned content + slices = [slice(int(indices[index]), int(indices[index + 1])) for index in range(0, len(indices) - 1)] + slice_array = [np.sum(h.view()[..., _slice], axis=-1, keepdims=True) for _slice in slices] + # concatenate the slices to get the new bin content + # store in new histogram storage view + np.concatenate(slice_array, axis=-1, out=new_hist.view()) + + return new_hist + + import hist + n_bins = 10 + # find signal histogram for which you will optimize, only 1 signal process is allowed + background_hists = {} + for process, histogram in hists.items(): + if process.has_tag("signal"): + signal_proc = process + signal_hist = histogram + else: + background_hists[process] = histogram + + if not signal_proc: + logger.warning("could not find any signal process, return hist unchanged") + return hists + + # 1. preparation + # get the leaf categories (e.g. {etau,mutau}__os__iso) + leaf_cats = task.config_inst.get_category(task.branch_data.category).get_leaf_categories() + + # sum over different leaf categories + cat_ids_locations = [hist.loc(category.id) for category in leaf_cats] + combined_signal_hist = signal_hist[{"category": cat_ids_locations}] + combined_signal_hist = combined_signal_hist[{"category": sum}] + # remove shift axis, since its always nominal + combined_signal_hist = combined_signal_hist[{"shift": hist.loc(0)}] + + # same for background + for process, histogram in background_hists.items(): + combined_background_hist = histogram[{"category": cat_ids_locations}] + combined_background_hist = combined_background_hist[{"category": sum}] + combined_background_hist = combined_background_hist[{"shift": hist.loc(0)}] + background_hists[process] = combined_background_hist + + # 2. determine bin edges + flat_s_edges, flat_s_indices = find_edges( + signal_histogram=combined_signal_hist, + variable=task.variables[0], + background_histograms=background_hists, + n_bins=n_bins, + ) + + # 3. apply to hists + for process, histogram in hists.items(): + hists[process] = apply_edges( + histogram, + flat_s_edges, + flat_s_indices, + task.variables[0], + ) + + return hists + def general_higgs_morphing( task, hists, @@ -742,6 +1026,7 @@ def LO_primitive_funtion(values, a, b, c, d, e, f): config.x.hist_hooks = { "qcd": qcd_estimation, + "flat_s": flat_s, "hh_ggf_exact_morphing_kl0_kt1": partial( general_higgs_morphing, diff --git a/hbt/config/variables.py b/hbt/config/variables.py index b75626d5..1f4e83ff 100644 --- a/hbt/config/variables.py +++ b/hbt/config/variables.py @@ -264,3 +264,10 @@ def add_variables(config: od.Config) -> None: binning=(25, 0.0, 1.0), x_title=rf"{proc.upper()} output node, res. 
DNN", ) + + config.add_variable( + name=f"res_dnn_{proc}_fine", + expression=f"res_dnn_{proc}", + binning=(5000, 0.0, 1.0), + x_title=rf"{proc.upper()} output node, res. DNN", + ) diff --git a/hbt/selection/jet.py b/hbt/selection/jet.py index 8fa9ed4d..0317723a 100644 --- a/hbt/selection/jet.py +++ b/hbt/selection/jet.py @@ -82,7 +82,7 @@ def jet_selection( hhbtag_scores = self[hhbtag](events, default_mask, lepton_results.x.lepton_pair, **kwargs) score_indices = ak.argsort(hhbtag_scores, axis=1, ascending=False) # pad the indices to simplify creating the hhbjet mask - padded_hhbjet_indices = ak.pad_none(score_indices, 2, axis=1)[..., :2][..., :2] + padded_hhbjet_indices = ak.pad_none(score_indices, 2, axis=1)[..., :2] hhbjet_mask = ((li == padded_hhbjet_indices[..., [0]]) | (li == padded_hhbjet_indices[..., [1]])) # get indices for actual book keeping only for events with both lepton candidates and where at # least two jets pass the default mask (bjet candidates) @@ -116,6 +116,7 @@ def jet_selection( cross_vbf_mask = ak.full_like(1 * events.event, False, dtype=bool) else: cross_vbf_masks = [events.trigger_ids == tid for tid in cross_vbf_ids] + # This combines "at least one cross trigger is fired" and "no other triggers are fired" cross_vbf_mask = ak.all(reduce(or_, cross_vbf_masks), axis=1) vbf_pair_mask = vbf_pair_mask & ( (~cross_vbf_mask) | ( diff --git a/hbt/selection/lepton.py b/hbt/selection/lepton.py index 8ba64679..5a22a612 100644 --- a/hbt/selection/lepton.py +++ b/hbt/selection/lepton.py @@ -43,6 +43,27 @@ def trigger_object_matching( return any_match +def update_channel_ids( + events: ak.Array, + previous_channel_ids: ak.Array, + correct_channel_id: int, + channel_mask: ak.Array, +) -> ak.Array: + """ + Check if the events in the is_mask can be inside the given channel + or have already been sorted in another channel before. + """ + events_not_in_channel = (previous_channel_ids != 0) & (previous_channel_ids != correct_channel_id) + channel_id_overwrite = events_not_in_channel & channel_mask + if ak.any(channel_id_overwrite): + raise ValueError( + "The channel_ids of some events are being set to two different values. 
" + "The first event of this chunk concerned has index", + ak.where(channel_id_overwrite)[0], + ) + return ak.where(channel_mask, correct_channel_id, previous_channel_ids) + + @selector( uses={ "Electron.pt", "Electron.eta", "Electron.phi", "Electron.dxy", "Electron.dz", @@ -439,6 +460,7 @@ def lepton_selection( ) # lepton pair selecton per trigger via lepton counting + if trigger.has_tag({"single_e", "cross_e_tau"}): # expect 1 electron, 1 veto electron (the same one), 0 veto muons, and at least one tau is_etau = ( @@ -454,14 +476,13 @@ def lepton_selection( tau_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) is_os = e_charge == -tau_charge # store global variables - where = (channel_id == 0) & is_etau - channel_id = ak.where(where, ch_etau.id, channel_id) - tau2_isolated = ak.where(where, is_iso, tau2_isolated) - leptons_os = ak.where(where, is_os, leptons_os) - single_triggered = ak.where(where & is_single, True, single_triggered) - cross_triggered = ak.where(where & is_cross, True, cross_triggered) - sel_electron_indices = ak.where(where, electron_indices, sel_electron_indices) - sel_tau_indices = ak.where(where, tau_indices, sel_tau_indices) + channel_id = update_channel_ids(events, channel_id, ch_etau.id, is_etau) + tau2_isolated = ak.where(is_etau, is_iso, tau2_isolated) + leptons_os = ak.where(is_etau, is_os, leptons_os) + single_triggered = ak.where(is_etau & is_single, True, single_triggered) + cross_triggered = ak.where(is_etau & is_cross, True, cross_triggered) + sel_electron_indices = ak.where(is_etau, electron_indices, sel_electron_indices) + sel_tau_indices = ak.where(is_etau, tau_indices, sel_tau_indices) elif trigger.has_tag({"single_mu", "cross_mu_tau"}): # expect 1 muon, 1 veto muon (the same one), 0 veto electrons, and at least one tau @@ -478,14 +499,13 @@ def lepton_selection( tau_charge = ak.firsts(events.Tau[tau_indices].charge, axis=1) is_os = mu_charge == -tau_charge # store global variables - where = (channel_id == 0) & is_mutau - channel_id = ak.where(where, ch_mutau.id, channel_id) - tau2_isolated = ak.where(where, is_iso, tau2_isolated) - leptons_os = ak.where(where, is_os, leptons_os) - single_triggered = ak.where(where & is_single, True, single_triggered) - cross_triggered = ak.where(where & is_cross, True, cross_triggered) - sel_muon_indices = ak.where(where, muon_indices, sel_muon_indices) - sel_tau_indices = ak.where(where, tau_indices, sel_tau_indices) + channel_id = update_channel_ids(events, channel_id, ch_mutau.id, is_mutau) + tau2_isolated = ak.where(is_mutau, is_iso, tau2_isolated) + leptons_os = ak.where(is_mutau, is_os, leptons_os) + single_triggered = ak.where(is_mutau & is_single, True, single_triggered) + cross_triggered = ak.where(is_mutau & is_cross, True, cross_triggered) + sel_muon_indices = ak.where(is_mutau, muon_indices, sel_muon_indices) + sel_tau_indices = ak.where(is_mutau, tau_indices, sel_tau_indices) elif trigger.has_tag({"cross_tau_tau", "cross_tau_tau_vbf", "cross_tau_tau_jet"}): # expect 0 veto electrons, 0 veto muons and at least two taus of which one is isolated @@ -496,12 +516,10 @@ def lepton_selection( (ak.num(tau_indices, axis=1) >= 2) & (ak.sum(tau_iso_mask, axis=1) >= 1) ) - # special case for cross tau vbf trigger: # to avoid overlap, with non-vbf triggers, only one tau is allowed to have pt > 40 if trigger.has_tag("cross_tau_tau_vbf"): - is_tautau = is_tautau & (ak.num(events.Tau[tau_indices].pt > 40, axis=1) <= 1) - + is_tautau = is_tautau & (ak.sum(events.Tau[tau_indices].pt > 40, axis=1) <= 1) is_iso = 
ak.sum(tau_iso_mask, axis=1) >= 2 # tau_indices are sorted by highest isolation as cond. 1 and highest pt as cond. 2, so # the first two indices are exactly those selected by the full-blown pairing algorithm @@ -511,13 +529,12 @@ def lepton_selection( tau2_charge = ak.firsts(events.Tau[tau_indices].charge[..., 1:], axis=1) is_os = tau1_charge == -tau2_charge # store global variables - where = (channel_id == 0) & is_tautau - channel_id = ak.where(where, ch_tautau.id, channel_id) - tau2_isolated = ak.where(where, is_iso, tau2_isolated) - leptons_os = ak.where(where, is_os, leptons_os) - single_triggered = ak.where(where & is_single, True, single_triggered) - cross_triggered = ak.where(where & is_cross, True, cross_triggered) - sel_tau_indices = ak.where(where, tau_indices, sel_tau_indices) + channel_id = update_channel_ids(events, channel_id, ch_tautau.id, is_tautau) + tau2_isolated = ak.where(is_tautau, is_iso, tau2_isolated) + leptons_os = ak.where(is_tautau, is_os, leptons_os) + single_triggered = ak.where(is_tautau & is_single, True, single_triggered) + cross_triggered = ak.where(is_tautau & is_cross, True, cross_triggered) + sel_tau_indices = ak.where(is_tautau, tau_indices, sel_tau_indices) # some final type conversions channel_id = ak.values_astype(channel_id, np.uint8) diff --git a/hbt/tasks/__init__.py b/hbt/tasks/__init__.py index 381a505b..2ec46037 100644 --- a/hbt/tasks/__init__.py +++ b/hbt/tasks/__init__.py @@ -5,3 +5,4 @@ import hbt.tasks.base import hbt.tasks.stats import hbt.tasks.studies +import hbt.tasks.sync diff --git a/hbt/tasks/stats.py b/hbt/tasks/stats.py index c5aa96fa..67692a66 100644 --- a/hbt/tasks/stats.py +++ b/hbt/tasks/stats.py @@ -30,48 +30,65 @@ def run(self): # color helpers green = functools.partial(law.util.colored, color="green") + green_bright = functools.partial(law.util.colored, color="green", style="bright") + yellow = functools.partial(law.util.colored, color="yellow") + yellow_bright = functools.partial(law.util.colored, color="yellow", style="bright") red = functools.partial(law.util.colored, color="red") - red_bright = functools.partial(law.util.colored, color="red", style="bright") cyan = functools.partial(law.util.colored, color="cyan") + cyan_bright = functools.partial(law.util.colored, color="cyan", style="bright") bright = functools.partial(law.util.colored, style="bright") + def get_color(dataset_inst): + if dataset_inst.is_data: + return red + if dataset_inst.has_tag("nonresonant_signal"): + return green if dataset_inst.has_tag("ggf") else green_bright + if dataset_inst.has_tag("resonant_signal"): + return cyan if dataset_inst.has_tag("ggf") else cyan_bright + return yellow + # headers headers = ["Dataset", "Files", "Events"] # content rows = [] - sum_files_nominal, sum_events_nominal = 0, 0 - sum_files_syst, sum_events_syst = 0, 0 + sum_files_s_nonres, sum_events_s_nonres = 0, 0 + sum_files_s_res, sum_events_s_res = 0, 0 + sum_files_b_nom, sum_events_b_nom = 0, 0 + sum_files_b_syst, sum_events_b_syst = 0, 0 sum_files_data, sum_events_data = 0, 0 for dataset_inst in self.config_inst.datasets: - col = ( - cyan - if dataset_inst.is_data - else (green if dataset_inst.name.startswith("hh_") else red) - ) + col = get_color(dataset_inst) # nominal info rows.append([col(dataset_inst.name), dataset_inst.n_files, dataset_inst.n_events]) # increment sums if dataset_inst.is_data: sum_files_data += dataset_inst.n_files sum_events_data += dataset_inst.n_events + elif dataset_inst.has_tag("nonresonant_signal"): + sum_files_s_nonres += 
dataset_inst.n_files + sum_events_s_nonres += dataset_inst.n_events + elif dataset_inst.has_tag("resonant_signal"): + sum_files_s_res += dataset_inst.n_files + sum_events_s_res += dataset_inst.n_events else: - sum_files_nominal += dataset_inst.n_files - sum_events_nominal += dataset_inst.n_events + sum_files_b_nom += dataset_inst.n_files + sum_events_b_nom += dataset_inst.n_events # potential shifts for shift_name, info in dataset_inst.info.items(): if shift_name == "nominal" or shift_name not in self.config_inst.shifts: continue - rows.append([red_bright(f" → {shift_name}"), info.n_files, info.n_events]) + rows.append([yellow_bright(f" → {shift_name}"), info.n_files, info.n_events]) # increment sums - sum_files_syst += info.n_files - sum_events_syst += info.n_events + sum_files_b_syst += info.n_files + sum_events_b_syst += info.n_events + # sums - rows.append([bright("total MC (nominal)"), sum_files_nominal, sum_events_nominal]) - if sum_files_syst or sum_events_syst: - sum_files = sum_files_nominal + sum_files_syst - sum_events = sum_events_nominal + sum_events_syst - rows.append([bright("total MC (all)"), sum_files, sum_events]) + rows.append([bright("total signal (non-res.)"), sum_files_s_nonres, sum_events_s_nonres]) + rows.append([bright("total signal (res.)"), sum_files_s_res, sum_events_s_res]) + rows.append([bright("total background (nominal)"), sum_files_b_nom, sum_events_b_nom]) + if sum_files_b_syst or sum_events_b_syst: + rows.append([bright("total background (syst.)"), sum_files_b_syst, sum_events_b_syst]) if sum_files_data or sum_events_data: rows.append([bright("total data"), sum_files_data, sum_events_data]) diff --git a/hbt/tasks/sync.py b/hbt/tasks/sync.py new file mode 100644 index 00000000..52d008bd --- /dev/null +++ b/hbt/tasks/sync.py @@ -0,0 +1,366 @@ +# coding: utf-8 + +""" +Tasks that create files for synchronization efforts with other frameworks. 
+""" + +from __future__ import annotations + +import luigi +import law + + +from columnflow.tasks.framework.base import Requirements, DatasetTask +from columnflow.tasks.framework.mixins import ( + ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin, +) +from columnflow.tasks.framework.remote import RemoteWorkflow +from columnflow.tasks.external import GetDatasetLFNs +from columnflow.tasks.reduction import ReducedEventsUser +from columnflow.tasks.production import ProduceColumns +from columnflow.tasks.ml import MLEvaluation +from columnflow.util import dev_sandbox + +from hbt.tasks.base import HBTTask +from hbt.util import hash_events + + +class CheckExternalLFNOverlap( + HBTTask, + DatasetTask, +): + + lfn = luigi.Parameter( + description="local path to an external LFN to check for overlap with the dataset", + # fetched via nanogen's FetchLFN + default="/pnfs/desy.de/cms/tier2/store/user/mrieger/nanogen_store/FetchLFN/store/mc/Run3Summer22NanoAODv12/GluGlutoHHto2B2Tau_kl-1p00_kt-1p00_c2-0p00_LHEweights_TuneCP5_13p6TeV_powheg-pythia8/NANOAODSIM/130X_mcRun3_2022_realistic_v5-v2/50000/992697da-4a10-4435-b63a-413f6d33517e.root", # noqa + ) + + # no versioning required + version = None + + # default sandbox, might be overwritten by calibrator function + sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox")) + + # upstream requirements + reqs = Requirements( + GetDatasetLFNs=GetDatasetLFNs, + ) + + def requires(self): + return self.reqs.GetDatasetLFNs.req(self) + + def output(self): + return { + "overlap": self.target("lfn_overlap.json"), + "unique_identifiers": self.target("unique_identifiers.parquet"), + } + + def run(self): + import awkward as ak + import numpy as np + + # load the index columns of the reference lfn + output = self.output() + + with self.publish_step("loading reference ids"): + ref_hashes = hash_events(self.load_nano_index(law.LocalFileTarget(self.lfn))) + + # loop over all lfns in the dataset + n_files = self.dataset_inst.n_files + lfns_task = self.requires() + relative_overlap = {} + overlapping_identifier = [] + + for i, lfn_target in lfns_task.iter_nano_files(self, lfn_indices=list(range(n_files))): + with self.publish_step(f"loading ids of file {i}"): + file_arr = self.load_nano_index(lfn_target) + file_hashes = hash_events(file_arr) + # find unique hashes in the reference and the file + # faster than np. 
+ overlapping_mask = np.isin( + file_hashes, + ref_hashes, + assume_unique=True, + kind="sort", + ) + + num_overlapping = np.sum(overlapping_mask) + if num_overlapping: + # calculate the relative overlap + relative_overlap[str(i)] = np.sum(num_overlapping) / len(file_hashes) + + # apply mask and store the overlapping identifiers + overlapping_identifier.extend(file_arr[overlapping_mask]) + + output["overlap"].dump( + relative_overlap, + formatter="json", + ) + + output["unique_identifiers"].dump( + ak.Array(overlapping_identifier), + formatter="awkward", + ) + + @classmethod + def load_nano_index(cls, lfn_target: law.FileSystemFileTarget) -> set[int]: + fields = ["event", "luminosityBlock", "run"] + arr = lfn_target.load(formatter="uproot")["Events"].arrays(fields) + return arr + + +class CreateSyncFiles( + HBTTask, + MLModelsMixin, + ProducersMixin, + ChunkedIOMixin, + ReducedEventsUser, + SelectorMixin, + law.LocalWorkflow, + RemoteWorkflow, +): + filter_file = luigi.Parameter( + description="local path to a file containing event unique identifier to filter them out", + default="", + ) + + sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox")) + + # upstream requirements + reqs = Requirements( + ReducedEventsUser.reqs, + RemoteWorkflow.reqs, + ProduceColumns=ProduceColumns, + MLEvaluation=MLEvaluation, + ) + + def workflow_requires(self): + reqs = super().workflow_requires() + + # require the full merge forest + reqs["events"] = self.reqs.ProvideReducedEvents.req(self) + + if not self.pilot: + if self.producer_insts: + reqs["producers"] = [ + self.reqs.ProduceColumns.req(self, producer=producer_inst.cls_name) + for producer_inst in self.producer_insts + if producer_inst.produced_columns + ] + if self.ml_model_insts: + reqs["ml"] = [ + self.reqs.MLEvaluation.req(self, ml_model=m) + for m in self.ml_models + ] + + return reqs + + def requires(self): + reqs = {"events": self.reqs.ProvideReducedEvents.req(self)} + + if self.producer_insts: + reqs["producers"] = [ + self.reqs.ProduceColumns.req(self, producer=producer_inst.cls_name) + for producer_inst in self.producer_insts + if producer_inst.produced_columns + ] + if self.ml_model_insts: + reqs["ml"] = [ + self.reqs.MLEvaluation.req(self, ml_model=m) + for m in self.ml_models + ] + + return reqs + + workflow_condition = ReducedEventsUser.workflow_condition.copy() + + @workflow_condition.output + def output(self): + return self.target(f"sync_{self.dataset_inst.name}_{self.branch}.csv") + + @law.decorator.log + @law.decorator.localize + def run(self): + import awkward as ak + import numpy as np + from columnflow.columnar_util import update_ak_array, EMPTY_FLOAT, set_ak_column + + # prepare inputs and outputs + inputs = self.input() + output = self.output() + + # iterate over chunks of events and diffs + files = [inputs["events"]["events"].abspath] + if self.producer_insts: + files.extend([inp["columns"].abspath for inp in inputs["producers"]]) + if self.ml_model_insts: + files.extend([inp["mlcolumns"].abspath for inp in inputs["ml"]]) + + # helper to replace our internal empty float placeholder with a custom one + empty_float = EMPTY_FLOAT # points to same value for now, but can be changed + def replace_empty_float(arr): + if empty_float != EMPTY_FLOAT: + arr = ak.where(arr == EMPTY_FLOAT, empty_float, arr) + return arr + + # helper to pad nested fields with an empty float if missing + def pad_nested(arr, n, *, axis=1): + return ak.fill_none(ak.pad_none(arr, n, axis=axis), empty_float) + + # helper to pad and select the 
last element on the first inner axis + def select(arr, idx): + padded = pad_nested(arr, idx + 1, axis=-1) + return replace_empty_float(padded if arr.ndim == 1 else padded[:, idx]) + + # helper to select leptons + def select_leptons(events: ak.Array, common_fields: dict[str, int | float]) -> ak.Array: + # ensure all lepton arrays have the same common fields + leptons = [events.Electron, events.Muon, events.Tau] + for i in range(len(leptons)): + lepton = leptons[i] + for field, default in common_fields.items(): + if field not in lepton.fields: + lepton = set_ak_column(lepton, field, default) + leptons[i] = lepton + # concatenate (first event any lepton, second alsways tau) and add to events + return set_ak_column(events, "Lepton", ak.concatenate(leptons, axis=1)) + + # event chunk loop + + # optional filter to get only the events that overlap with given external LFN + if self.filter_file: + with self.publish_step("loading reference ids"): + events_to_filter = law.LocalFileTarget(self.filter_file).load(formatter="awkward") + filter_events = hash_events(events_to_filter) + + for (events, *columns), pos in self.iter_chunked_io( + files, + source_type=len(files) * ["awkward_parquet"], + pool_size=1, + ): + + # optional check for overlapping inputs + if self.check_overlapping_inputs: + self.raise_if_overlapping([events] + list(columns)) + + # add additional columns + events = update_ak_array(events, *columns) + + # apply mask if optional filter is given + # calculate mask by using 1D hash values + if self.filter_file: + mask = np.isin( + hash_events(events), + filter_events, + assume_unique=True, + kind="sort", + ) + + # apply mask + events = events[mask] + + # insert leptons + events = select_leptons(events, {"rawDeepTau2018v2p5VSjet": empty_float}) + + # project into dataframe + df = ak.to_dataframe({ + # index variables + "event": events.event, + "run": events.run, + "lumi": events.luminosityBlock, + # high-level events variables + "channel": events.channel_id, + "os": events.leptons_os * 1, + "iso": events.tau2_isolated * 1, + # jet variables + "jet1_pt": select(events.Jet.pt, 0), + "jet1_eta": select(events.Jet.eta, 0), + "jet1_phi": select(events.Jet.phi, 0), + "jet2_pt": select(events.Jet.pt, 1), + "jet2_eta": select(events.Jet.eta, 1), + "jet2_phi": select(events.Jet.phi, 1), + "lep1_pt": select(events.Lepton.pt, 0), + "lep1_phi": select(events.Lepton.phi, 0), + "lep1_eta": select(events.Lepton.eta, 0), + "lep1_charge": select(events.Lepton.charge, 0), + "lep1_deeptauvsjet": select(events.Lepton.rawDeepTau2018v2p5VSjet, 0), + "lep2_pt": select(events.Lepton.pt, 1), + "lep2_phi": select(events.Lepton.phi, 1), + "lep2_eta": select(events.Lepton.eta, 1), + "lep2_charge": select(events.Lepton.charge, 1), + "lep2_deeptauvsjet": select(events.Lepton.rawDeepTau2018v2p5VSjet, 1), + "electron1_charge": select(events.Electron.charge, 0), + "electron1_eta": select(events.Electron.eta, 0), + "electron1_mass": select(events.Electron.mass, 0), + "electron1_phi": select(events.Electron.phi, 0), + "electron1_pt": select(events.Electron.pt, 0), + "electron2_charge": select(events.Electron.charge, 1), + "electron2_eta": select(events.Electron.eta, 1), + "electron2_mass": select(events.Electron.mass, 1), + "electron2_phi": select(events.Electron.phi, 1), + "electron2_pt": select(events.Electron.pt, 1), + + "muon1_charge": select(events.Muon.charge, 0), + "muon1_eta": select(events.Muon.eta, 0), + "muon1_mass": select(events.Muon.mass, 0), + "muon1_phi": select(events.Muon.phi, 0), + "muon1_pt": 
select(events.Muon.pt, 0), + "muon2_charge": select(events.Muon.charge, 1), + "muon2_eta": select(events.Muon.eta, 1), + "muon2_mass": select(events.Muon.mass, 1), + "muon2_phi": select(events.Muon.phi, 1), + "muon2_pt": select(events.Muon.pt, 1), + + "tau1_charge": select(events.Tau.charge, 0), + "tau1_eta": select(events.Tau.eta, 0), + "tau1_mass": select(events.Tau.mass, 0), + "tau1_phi": select(events.Tau.phi, 0), + "tau1_pt": select(events.Tau.pt, 0), + "tau2_charge": select(events.Tau.charge, 1), + "tau2_eta": select(events.Tau.eta, 1), + "tau2_mass": select(events.Tau.mass, 1), + "tau2_phi": select(events.Tau.phi, 1), + "tau2_pt": select(events.Tau.pt, 1), + + "met1_covXX": select(events.MET.covXX, 0), + "met1_covXY": select(events.MET.covXY, 0), + "met1_covYY": select(events.MET.covYY, 0), + "met1_phi": select(events.MET.phi, 0), + "met1_pt": select(events.MET.pt, 0), + "met1_significance": select(events.MET.significance, 0), + + "fatjet1_eta": select(events.FatJet.eta, 0), + "fatjet1_mass": select(events.FatJet.mass, 0), + "fatjet1_phi": select(events.FatJet.phi, 0), + "fatjet1_pt": select(events.FatJet.pt, 0), + "fatjet1_tau1": select(events.FatJet.tau1, 0), + "fatjet1_tau2": select(events.FatJet.tau2, 0), + "fatjet1_tau3": select(events.FatJet.tau3, 0), + "fatjet1_tau4": select(events.FatJet.tau4, 0), + + "fatjet2_eta": select(events.FatJet.eta, 1), + "fatjet2_mass": select(events.FatJet.mass, 1), + "fatjet2_phi": select(events.FatJet.phi, 1), + "fatjet2_pt": select(events.FatJet.pt, 1), + "fatjet2_tau1": select(events.FatJet.tau1, 1), + "fatjet2_tau2": select(events.FatJet.tau2, 1), + "fatjet2_tau3": select(events.FatJet.tau3, 1), + "fatjet2_tau4": select(events.FatJet.tau4, 1), + }) + + # save as csv in output, append if necessary + output.dump( + df, + formatter="pandas", + index=False, + header=pos.index == 0, + mode="w" if pos.index == 0 else "a", + ) + + +check_overlap_tasks = law.config.get_expanded("analysis", "check_overlapping_inputs", [], split_csv=True) +CreateSyncFiles.check_overlapping_inputs = ChunkedIOMixin.check_overlapping_inputs.copy( + default=CreateSyncFiles.task_family in check_overlap_tasks, + add_default_to_description=True, +) diff --git a/hbt/util.py b/hbt/util.py index fa4a710a..5df01c05 100644 --- a/hbt/util.py +++ b/hbt/util.py @@ -10,6 +10,9 @@ from columnflow.types import Any from columnflow.columnar_util import ArrayFunction, deferred_column +from columnflow.util import maybe_import + +np = maybe_import("numpy") @deferred_column @@ -57,3 +60,35 @@ def IF_DATASET_IS_DY( return self.get() return self.get() if func.dataset_inst.has_tag("is_dy") else None + + +def hash_events(arr: np.ndarray) -> np.ndarray: + """ + Helper function to create a hash value from the event, run and luminosityBlock columns. + The values are padded to specific lengths and concatenated to a single integer. + """ + import awkward as ak + + def assert_value(arr: np.ndarray, field: str, max_value: int) -> None: + """ + Helper function to check if a column does not exceed a maximum value. 
+ """ + digits = len(str(arr[field].to_numpy().max())) + assert digits <= max_value, f"{field} digit count is {digits} and exceed max value {max_value}" + + max_digits_run = 6 + max_digits_luminosityBlock = 6 + max_digits_event = 8 + assert_value(arr, "run", max_digits_run) + assert_value(arr, "luminosityBlock", max_digits_luminosityBlock) + assert_value(arr, "event", max_digits_event) + + max_digits_hash = max_digits_event + max_digits_luminosityBlock + max_digits_run + assert max_digits_hash <= 20, "sum of digits exceeds int64" + + # upcast to int64 to avoid overflow + return ( + ak.values_astype(arr.run, np.int64) * 10**(max_digits_luminosityBlock + max_digits_event) + + ak.values_astype(arr.luminosityBlock, np.int64) * 10**max_digits_event + + ak.values_astype(arr.event, np.int64) + ) diff --git a/law.cfg b/law.cfg index 81e7119d..365058a1 100644 --- a/law.cfg +++ b/law.cfg @@ -99,18 +99,33 @@ lfn_sources: wlcg_fs_desy_store, wlcg_fs_infn_redirector, wlcg_fs_global_redirec # the "store_parts_modifiers" can be the name of a function in the "store_parts_modifiers" aux dict # of the analysis instance, which is called with an output's store parts of an output to modify them # specific locations -run3_2022_preEE*__cf.CalibrateEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.SelectEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.ReduceEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.MergeReducedEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.MergeReductionStats: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.MergeSelectionStats: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.ProduceColumns: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.UniteColumns: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.CreateHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.MergeHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.MergeShiftedHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos -run3_2022_preEE*__cf.CreateDatacards: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +; 22pre +run3_2022_preEE__cf.CalibrateEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.SelectEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.ReduceEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.MergeReducedEvents: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.MergeReductionStats: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.MergeSelectionStats: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.ProduceColumns: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.UniteColumns: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.CreateHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.MergeHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +run3_2022_preEE__cf.MergeShiftedHistograms: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos 
+run3_2022_preEE__cf.CreateDatacards: wlcg_mirrored, local_fs_desy_nprouvos, wlcg_fs_desy_nprouvos +; 22post +run3_2022_postEE__cf.CalibrateEvents: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.SelectEvents: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.ReduceEvents: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.MergeReducedEvents: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.MergeReductionStats: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.MergeSelectionStats: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.ProduceColumns: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.UniteColumns: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.CreateHistograms: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.MergeHistograms: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.MergeShiftedHistograms: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan +run3_2022_postEE__cf.CreateDatacards: wlcg_mirrored, local_fs_desy_aalvesan, wlcg_fs_desy_aalvesan + # fallbacks cf.BundleRepo: wlcg cf.BundleSoftware: wlcg @@ -132,14 +147,15 @@ cf.MergeMLEvents: wlcg cf.MLTraining: wlcg cf.MLEvaluation: wlcg cf.UniteColumns: wlcg +cf.CreateSyncFile: wlcg [versions] -run3_2022_preEE*__cf.CalibrateEvents: prod1 -run3_2022_preEE*__cf.MergeSelectionStats: prod1 -run3_2022_preEE*__cf.MergeReductionStats: prod1 -run3_2022_preEE*__cf.ProvideReducedEvents: prod1 +run3_2022_preEE__cf.CalibrateEvents: prod1 +run3_2022_preEE__cf.MergeSelectionStats: prod1 +run3_2022_preEE__cf.MergeReductionStats: prod1 +run3_2022_preEE__cf.ProvideReducedEvents: prod1 # @@ -352,6 +368,23 @@ cache_global_lock: True base: file:///pnfs/desy.de/cms/tier2/store/user/nprouvos/nanogen_store/MergeNano/config_22pre_v12/prod3 +[wlcg_fs_run3_2022_postEE_nano_uhh_v12] + +webdav_base: davs://dcache-cms-webdav-wan.desy.de:2880/pnfs/desy.de/cms/tier2/store/user/aalvesan/nanogen_store/MergeNano/config_22post_v12/prod1 +gsiftp_base: gsiftp://dcache-door-cms04.desy.de:2811/pnfs/desy.de/cms/tier2/store/user/aalvesan/nanogen_store/MergeNano/config_22post_v12/prod1 +xrootd_base: root://dcache-cms-xrootd.desy.de:1094/pnfs/desy.de/cms/tier2/store/user/aalvesan/nanogen_store/MergeNano/config_22post_v12/prod1 +base: &::xrootd_base +use_cache: $CF_WLCG_USE_CACHE +cache_root: $CF_WLCG_CACHE_ROOT +cache_cleanup: $CF_WLCG_CACHE_CLEANUP +cache_max_size: 15GB +cache_global_lock: True + +[local_fs_run3_2022_postEE_nano_uhh_v12] + +base: file:///pnfs/desy.de/cms/tier2/store/user/aalvesan/nanogen_store/MergeNano/config_22post_v12/prod1 + + # # file systems for central LFNs # @@ -417,3 +450,4 @@ naf_wiedersb: 5000 naf_prouvost: 5000 naf_haddadan: 5000 naf_nguyenth: 5000 +naf_wardrobe: 5000 diff --git a/modules/cmsdb b/modules/cmsdb index c88e17bf..2731b76d 160000 --- a/modules/cmsdb +++ b/modules/cmsdb @@ -1 +1 @@ -Subproject commit c88e17bfb30ab9a39e85fe73e070a221dea02f8d +Subproject commit 2731b76dc166fb5cad377e38958fb3ae77caa51c diff --git a/modules/columnflow b/modules/columnflow index 11ef864d..895a3ebe 160000 --- a/modules/columnflow +++ b/modules/columnflow @@ -1 +1 @@ -Subproject commit 11ef864d4295e31ad9ff103e7236ee2b77f31aa1 +Subproject commit 895a3ebe963babe1fb6dc9c74f9448828808ac0b diff --git 
a/sandboxes/columnar_tf.txt b/sandboxes/columnar_tf.txt index 869daec7..7c7aab36 100644 --- a/sandboxes/columnar_tf.txt +++ b/sandboxes/columnar_tf.txt @@ -1,4 +1,4 @@ -# version 8 +# version 9 -r ../modules/columnflow/sandboxes/columnar.txt diff --git a/sandboxes/columnar_torch.txt b/sandboxes/columnar_torch.txt new file mode 100644 index 00000000..e8838457 --- /dev/null +++ b/sandboxes/columnar_torch.txt @@ -0,0 +1,7 @@ +# version 1 + +-r ../modules/columnflow/sandboxes/columnar.txt + +torch~=2.5.1 +lightning~=2.4.0 +torchmetrics~=1.6.0 diff --git a/sandboxes/venv_columnar_torch.sh b/sandboxes/venv_columnar_torch.sh new file mode 100644 index 00000000..589742c0 --- /dev/null +++ b/sandboxes/venv_columnar_torch.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash + +# Script that sets up a virtual env in $CF_VENV_PATH. +# For more info on functionality and parameters, see the generic setup script _setup_venv.sh. + +action() { + local shell_is_zsh="$( [ -z "${ZSH_VERSION}" ] && echo "false" || echo "true" )" + local this_file="$( ${shell_is_zsh} && echo "${(%):-%x}" || echo "${BASH_SOURCE[0]}" )" + local this_dir="$( cd "$( dirname "${this_file}" )" && pwd )" + + # set variables and source the generic venv setup + export CF_SANDBOX_FILE="${CF_SANDBOX_FILE:-${this_file}}" + export CF_VENV_NAME="$( basename "${this_file%.sh}" )" + export CF_VENV_REQUIREMENTS="${this_dir}/columnar_torch.txt" + + source "${CF_BASE}/sandboxes/_setup_venv.sh" "$@" +} +action "$@" diff --git a/sandboxes/venv_columnar_torch_dev.sh b/sandboxes/venv_columnar_torch_dev.sh new file mode 100644 index 00000000..27c35e68 --- /dev/null +++ b/sandboxes/venv_columnar_torch_dev.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash + +# Script that sets up a virtual env in $CF_VENV_PATH. +# For more info on functionality and parameters, see the generic setup script _setup_venv.sh. + +action() { + local shell_is_zsh="$( [ -z "${ZSH_VERSION}" ] && echo "false" || echo "true" )" + local this_file="$( ${shell_is_zsh} && echo "${(%):-%x}" || echo "${BASH_SOURCE[0]}" )" + local this_dir="$( cd "$( dirname "${this_file}" )" && pwd )" + + # set variables and source the generic venv setup + export CF_SANDBOX_FILE="${CF_SANDBOX_FILE:-${this_file}}" + export CF_VENV_NAME="$( basename "${this_file%.sh}" )" + export CF_VENV_REQUIREMENTS="${this_dir}/columnar_torch.txt,${CF_BASE}/sandboxes/dev.txt" + + source "${CF_BASE}/sandboxes/_setup_venv.sh" "$@" +} +action "$@" diff --git a/setup.sh b/setup.sh index 62cca430..2c8a0b07 100644 --- a/setup.sh +++ b/setup.sh @@ -28,7 +28,7 @@ setup_hbt() { # A flag that is set to 1 after the setup was successful. # prevent repeated setups - if [ "${HBT_SETUP}" = "1" ]; then + if [ "${HBT_SETUP}" = "1" ] && [ "${CF_ON_SLURM}" != "1" ]; then >&2 echo "the HH -> bbtautau analysis was already succesfully setup" >&2 echo "re-running the setup requires a new shell" return "1" @@ -45,6 +45,7 @@ setup_hbt() { local orig="${PWD}" local setup_name="${1:-default}" local setup_is_default="false" + local env_is_remote="$( [ "${CF_REMOTE_ENV}" = "1" ] && echo "true" || echo "false" )" [ "${setup_name}" = "default" ] && setup_is_default="true" # zsh options @@ -72,7 +73,7 @@ setup_hbt() { CF_SKIP_SETUP="1" source "${CF_BASE}/setup.sh" "" || return "$?" # interactive setup - if [ "${CF_REMOTE_ENV}" != "1" ]; then + if ! 
${env_is_remote}; then cf_setup_interactive_body() { # the flavor will be cms export CF_FLAVOR="cms" @@ -133,12 +134,24 @@ setup_hbt() { export LAW_HOME="${LAW_HOME:-${HBT_BASE}/.law}" export LAW_CONFIG_FILE="${LAW_CONFIG_FILE:-${HBT_BASE}/law.cfg}" - if which law &> /dev/null; then - # source law's bash completion scipt - source "$( law completion )" "" + # run the indexing when not remote + if ! ${env_is_remote}; then + if which law &> /dev/null; then + # source law's bash completion scipt + source "$( law completion )" "" - # silently index - law index -q + # add completion to the claw command + complete -o bashdefault -o default -F _law_complete claw + + # silently index + law index -q + fi + fi + + # update the law config file to switch from mirrored to bare wlcg targets + # as local mounts are typically not available remotely + if ${env_is_remote}; then + sed -i -r 's/(.+\: ?)wlcg_mirrored, local_.+, ?(wlcg_[^\s]+)/\1wlcg, \2/g' "${LAW_CONFIG_FILE}" fi # finalize
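
Note (illustrative sketch, not part of the patch): the flat_s hook added in hbt/config/hist_hooks.py derives new bin edges by walking the fine-binned DNN distribution from the highest score downwards and placing an edge each time another 1/n_bins of the total signal yield has been accumulated; the background constraints described in its comments (at least one tt and one dy MC event, at least four combined, strictly positive yields) can only push an edge further towards lower scores. A minimal numpy-only sketch of the flat-signal part, with invented names and without the constraint handling, offsets and score interpolation of the actual hook:

import numpy as np

def flat_s_split_points(signal_counts: np.ndarray, n_bins: int = 10) -> np.ndarray:
    # cumulative signal yield, starting from the highest DNN score (right edge)
    cum = np.cumsum(np.flip(signal_counts))
    # one threshold per inner edge: k * (total yield / n_bins) for k = 1 .. n_bins - 1
    thresholds = cum[-1] / n_bins * np.arange(1, n_bins)
    # first fine bin (counted from the right) whose cumulative yield exceeds each threshold
    return np.searchsorted(cum, thresholds, side="right")

# toy input: 5000 fine bins with exponentially falling signal counts
split_points = flat_s_split_points(np.random.default_rng(0).exponential(size=5000))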
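
Note (illustrative sketch, not part of the patch): the change from ak.num to ak.sum in hbt/selection/jet.py matters for the cross_tau_tau_vbf overlap veto, which per the adjacent comment should allow at most one tau with pt > 40. ak.num returns the length of each inner list, so the old expression counted all selected taus regardless of pt; ak.sum over the boolean mask counts only the taus above threshold. A tiny example with invented values:

import awkward as ak

pt = ak.Array([[45.0, 30.0], [50.0, 42.0]])
ak.num(pt > 40, axis=1).tolist()  # [2, 2] -> list lengths, the pt threshold is ignored
ak.sum(pt > 40, axis=1).tolist()  # [1, 2] -> number of taus with pt > 40, as intended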
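
Note (illustrative sketch, not part of the patch): the select helper in the new CreateSyncFiles task turns per-event object collections into flat scalar columns by padding each inner list up to the requested index, replacing the padding with a sentinel, and then picking that index. A self-contained example of the pattern, using a placeholder sentinel rather than columnflow's EMPTY_FLOAT:

import awkward as ak

EMPTY_FLOAT = -99999.0  # placeholder sentinel for this example
pt = ak.Array([[40.1, 25.3], [15.0], []])  # jet pts for three events
# pad every event to at least two jets, fill the padding, then take the second jet
jet2_pt = ak.fill_none(ak.pad_none(pt, 2, axis=1), EMPTY_FLOAT)[:, 1]
# -> [25.3, -99999.0, -99999.0]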
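
Note (illustrative sketch, not part of the patch): hash_events in hbt/util.py builds one integer per event by zero-padding run, luminosityBlock and event to fixed decimal widths (6, 6 and 8 digits, as asserted there) and concatenating them, so CheckExternalLFNOverlap and the sync-file filter can compare two files with a single np.isin call over the hashed arrays. The packing arithmetic, written out with plain Python ints and a hypothetical helper name:

MAX_DIGITS_LUMI = 6
MAX_DIGITS_EVENT = 8

def pack_ids(run: int, lumi: int, event: int) -> int:
    # run | luminosityBlock | event, each in its own fixed-width decimal slot
    return (
        run * 10 ** (MAX_DIGITS_LUMI + MAX_DIGITS_EVENT)
        + lumi * 10 ** MAX_DIGITS_EVENT
        + event
    )

pack_ids(362616, 123, 4567)  # -> 36261600012300004567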