From 5f480910941f6a222c5c5a3aa4166016d316c85e Mon Sep 17 00:00:00 2001
From: Bogdan Wiederspan
Date: Thu, 24 Oct 2024 13:26:37 +0200
Subject: [PATCH 1/7] Add task to create csv file for synchronization tasks

---
 hbt/tasks/__init__.py | 1 +
 hbt/tasks/sync_csv.py | 264 ++++++++++++++++++++++++++++++++++++++++++
 law.cfg | 1 +
 3 files changed, 266 insertions(+)
 create mode 100644 hbt/tasks/sync_csv.py

diff --git a/hbt/tasks/__init__.py b/hbt/tasks/__init__.py
index 381a505b..87c04d8a 100644
--- a/hbt/tasks/__init__.py
+++ b/hbt/tasks/__init__.py
@@ -5,3 +5,4 @@
 import hbt.tasks.base
 import hbt.tasks.stats
 import hbt.tasks.studies
+import hbt.tasks.sync_csv

diff --git a/hbt/tasks/sync_csv.py b/hbt/tasks/sync_csv.py
new file mode 100644
index 00000000..c69261f4
--- /dev/null
+++ b/hbt/tasks/sync_csv.py
@@ -0,0 +1,264 @@
+# coding: utf-8
+
+"""
+Task to create a single csv file for synchronization with other frameworks.
+"""
+
+import luigi
+import law
+
+
+from columnflow.tasks.framework.base import Requirements, AnalysisTask, wrapper_factory
+from columnflow.tasks.framework.mixins import ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin
+from columnflow.tasks.framework.remote import RemoteWorkflow
+from columnflow.tasks.reduction import ReducedEventsUser
+from columnflow.tasks.production import ProduceColumns
+from columnflow.tasks.ml import MLEvaluation
+from columnflow.tasks.selection import MergeSelectionMasks
+from columnflow.util import dev_sandbox, DotDict
+
+
+class CreateSyncFile(
+    MLModelsMixin,
+    ProducersMixin,
+    ChunkedIOMixin,
+    ReducedEventsUser,
+    SelectorMixin,
+    law.LocalWorkflow,
+    RemoteWorkflow,
+):
+    sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox"))
+
+    file_type = luigi.ChoiceParameter(
+        default="csv",
+        choices=("csv",),
+        description="the file type to create; choices: csv; default: csv",
+    )
+
+    # upstream requirements
+    reqs = Requirements(
+        ReducedEventsUser.reqs,
+        RemoteWorkflow.reqs,
+        ProduceColumns=ProduceColumns,
+        MLEvaluation=MLEvaluation,
+        MergeSelectionMasks=MergeSelectionMasks,
+    )
+
+    def workflow_requires(self):
+        reqs = super().workflow_requires()
+
+        # require the full merge forest
+        reqs["events"] = self.reqs.ProvideReducedEvents.req(self)
+
+        if not self.pilot:
+            if self.producer_insts:
+                reqs["producers"] = [
+                    self.reqs.ProduceColumns.req(self, producer=producer_inst.cls_name)
+                    for producer_inst in self.producer_insts
+                    if producer_inst.produced_columns
+                ]
+            if self.ml_model_insts:
+                reqs["ml"] = [
+                    self.reqs.MLEvaluation.req(self, ml_model=m)
+                    for m in self.ml_models
+                ]
+
+        return reqs
+
+    def requires(self):
+        reqs = {"events": self.reqs.ProvideReducedEvents.req(self)}
+
+        if self.producer_insts:
+            reqs["producers"] = [
+                self.reqs.ProduceColumns.req(self, producer=producer_inst.cls_name)
+                for producer_inst in self.producer_insts
+                if producer_inst.produced_columns
+            ]
+        if self.ml_model_insts:
+            reqs["ml"] = [
+                self.reqs.MLEvaluation.req(self, ml_model=m)
+                for m in self.ml_models
+            ]
+
+        return reqs
+
+    workflow_condition = ReducedEventsUser.workflow_condition.copy()
+
+    @workflow_condition.output
+    def output(self):
+        return self.target(f"sync_file_{self.branch}.{self.file_type}")
+
+    @law.decorator.log
+    @law.decorator.localize(input=True, output=True)
+    @law.decorator.safe_output
+    def run(self):
+        from columnflow.columnar_util import (
+            Route, RouteFilter, mandatory_coffea_columns, update_ak_array,
+            sort_ak_fields, EMPTY_FLOAT,
+        )
+        import awkward as ak
+        import pandas as pd
+        def sync_columns():
+            columns = {
+                "physics_objects": {
+                    "Jet.{pt,eta,phi,mass}": 2,
+                },
+                "flat_objects": [
+                    "event",
+                    "run",
+                    "channel_id",
+                    "leptons_os",
+                    "luminosityBlock",
+                    "lep*",
+                ],
+            }
+            mapping = {
+                "luminosityBlock": "lumi",
+                "leptons_os": "is_os",
+            }
+            return columns, mapping
+
+        def lepton_selection(events, attributes=["pt", "eta", "phi", "charge"]):
+            first_lepton = ak.concatenate([events["Electron"], events["Muon"]], axis=1)
+            second_lepton = events["Tau"]
+            for attribute in attributes:
+                events[f"lep1_{attribute}"] = first_lepton[attribute]
+                events[f"lep2_{attribute}"] = second_lepton[attribute]
+            return events
+
+        def awkward_to_pandas(events, physics_objects, flat_objects=["event"]):
+            """Helper function to convert awkward arrays to pandas dataframes.
+
+            Args:
+                events (ak.Array): Awkward array with nested structure.
+                physics_objects (Dict[str, int]): Dict of physics objects to consider,
+                    with values defining the padding length.
+                flat_objects (List[str]): List of additional columns to consider; these are not padded.
+
+            Returns:
+                pd.DataFrame: Pandas dataframe with flattened structure of the awkward array.
+            """
+            events = sort_ak_fields(events)
+            f = DotDict()
+            # add meta columns (like event number, ...)
+            # these columns do not need padding
+
+            # resolve glob patterns
+            for flat_object_pattern in flat_objects:
+                for field in events.fields:
+                    if law.util.fnmatch.fnmatch(field, flat_object_pattern):
+                        f[field] = events[field]
+
+            # add columns of physics objects
+            # columns are padded to the given pad_length
+            for physics_pattern, pad_length in physics_objects.items():
+                physics_object_names = law.util.brace_expand(physics_pattern)
+                for physics_object in physics_object_names:
+                    physics_route = Route(physics_object)
+                    parts = physics_route.fields
+                    physics_array = physics_route.apply(events)
+                    physics_array = ak.pad_none(physics_array, pad_length)
+                    physics_array = ak.fill_none(physics_array, EMPTY_FLOAT)
+                    for pad_number in range(0, pad_length):
+                        full_name = f"{parts[0].lower()}{pad_number}_{parts[1].lower()}"
+                        f[full_name] = physics_array[:, pad_number]
+            return ak.to_dataframe(f)
+
+        # prepare inputs and outputs
+        inputs = self.input()
+
+        # create a temp dir for saving intermediate files
+        tmp_dir = law.LocalDirectoryTarget(is_tmp=True)
+        tmp_dir.touch()
+
+        # define columns that will be written
+        write_columns: set[Route] = set()
+        skip_columns: set[str] = set()
+        for c in self.config_inst.x.keep_columns.get(self.task_family, ["*"]):
+            for r in self._expand_keep_column(c):
+                if r.has_tag("skip"):
+                    skip_columns.add(r.column)
+                else:
+                    write_columns.add(r)
+        write_columns = {
+            r for r in write_columns
+            if not law.util.multi_match(r.column, skip_columns, mode=any)
+        }
+        route_filter = RouteFilter(write_columns)
+
+        # define columns that need to be read
+        read_columns = write_columns | set(mandatory_coffea_columns)
+        read_columns = {Route(c) for c in read_columns}
+
+        # iterate over chunks of events and diffs
+        files = [inputs["events"]["events"].abspath]
+        if self.producer_insts:
+            files.extend([inp["columns"].abspath for inp in inputs["producers"]])
+        if self.ml_model_insts:
+            files.extend([inp["mlcolumns"].abspath for inp in inputs["ml"]])
+
+        pandas_frameworks = []
+        for (events, *columns), pos in self.iter_chunked_io(
+            files,
+            source_type=len(files) * ["awkward_parquet"],
+            read_columns=len(files) * [read_columns],
+        ):
+            # optional check for overlapping inputs
+            if self.check_overlapping_inputs:
+                self.raise_if_overlapping([events] + list(columns))
+
+            # add additional columns
+            events = update_ak_array(events, *columns)
+
+            # remove columns
+            events = route_filter(events)
+
+            # optional check for finite values
+            if self.check_finite_output:
+                self.raise_if_not_finite(events)
+
+            # construct first and second lepton columns
+            events = lepton_selection(
+                events,
+                attributes=["pt", "eta", "phi", "mass", "charge"],
+            )
+
+            # convert to pandas dataframe
+            keep_columns, mapping = sync_columns()
+
+            events_pd = awkward_to_pandas(
+                events=events,
+                physics_objects=keep_columns["physics_objects"],
+                flat_objects=keep_columns["flat_objects"],
+            )
+
+            pandas_frameworks.append(events_pd)
+
+        # merge output files
+        merged_pandas_framework = pd.concat(
+            pandas_frameworks,
+            ignore_index=True,
+        ).rename(columns=mapping, inplace=False)
+        self.output().dump(merged_pandas_framework, index=False, formatter="pandas")
+
+
+# overwrite class defaults
+check_finite_tasks = law.config.get_expanded("analysis", "check_finite_output", [], split_csv=True)
+CreateSyncFile.check_finite_output = ChunkedIOMixin.check_finite_output.copy(
+    default=CreateSyncFile.task_family in check_finite_tasks,
+    add_default_to_description=True,
+)
+
+check_overlap_tasks = law.config.get_expanded("analysis", "check_overlapping_inputs", [], split_csv=True)
+CreateSyncFile.check_overlapping_inputs = ChunkedIOMixin.check_overlapping_inputs.copy(
+    default=CreateSyncFile.task_family in check_overlap_tasks,
+    add_default_to_description=True,
+)
+
+
+CreateSyncFileWrapper = wrapper_factory(
+    base_cls=AnalysisTask,
+    require_cls=CreateSyncFile,
+    enable=["configs", "skip_configs", "datasets", "skip_datasets", "shifts", "skip_shifts"],
+)

diff --git a/law.cfg b/law.cfg
index 81e7119d..a58dc10a 100644
--- a/law.cfg
+++ b/law.cfg
@@ -132,6 +132,7 @@
 cf.MergeMLEvents: wlcg
 cf.MLTraining: wlcg
 cf.MLEvaluation: wlcg
 cf.UniteColumns: wlcg
+cf.CreateSyncFile: wlcg

 [versions]

From c80f4547ea6ce7937fba08028d5beb29fca142fa Mon Sep 17 00:00:00 2001
From: Bogdan Wiederspan
Date: Thu, 24 Oct 2024 16:13:33 +0200
Subject: [PATCH 2/7] Rename sync_csv to sync to match the naming convention
 of the analysis and fix some typos

---
 hbt/tasks/__init__.py | 2 +-
 hbt/tasks/{sync_csv.py => sync.py} | 9 ---------
 2 files changed, 1 insertion(+), 10 deletions(-)
 rename hbt/tasks/{sync_csv.py => sync.py} (96%)

diff --git a/hbt/tasks/__init__.py b/hbt/tasks/__init__.py
index 87c04d8a..2ec46037 100644
--- a/hbt/tasks/__init__.py
+++ b/hbt/tasks/__init__.py
@@ -5,4 +5,4 @@
 import hbt.tasks.base
 import hbt.tasks.stats
 import hbt.tasks.studies
-import hbt.tasks.sync_csv
+import hbt.tasks.sync

diff --git a/hbt/tasks/sync_csv.py b/hbt/tasks/sync.py
similarity index 96%
rename from hbt/tasks/sync_csv.py
rename to hbt/tasks/sync.py
index c69261f4..f2cfcc84 100644
--- a/hbt/tasks/sync_csv.py
+++ b/hbt/tasks/sync.py
@@ -7,7 +7,6 @@
 import luigi
 import law

-
 from columnflow.tasks.framework.base import Requirements, AnalysisTask, wrapper_factory
 from columnflow.tasks.framework.mixins import ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin
 from columnflow.tasks.framework.remote import RemoteWorkflow
@@ -243,20 +242,12 @@ def awkward_to_pandas(events, physics_objects, flat_objects=["event"]):
         self.output().dump(merged_pandas_framework, index=False, formatter="pandas")


-# overwrite class defaults
-check_finite_tasks = law.config.get_expanded("analysis", "check_finite_output", [], split_csv=True)
-CreateSyncFile.check_finite_output = ChunkedIOMixin.check_finite_output.copy(
-    default=CreateSyncFile.task_family in check_finite_tasks,
-    add_default_to_description=True,
-)
-
 check_overlap_tasks = law.config.get_expanded("analysis", "check_overlapping_inputs", [], split_csv=True)
 CreateSyncFile.check_overlapping_inputs = ChunkedIOMixin.check_overlapping_inputs.copy(
     default=CreateSyncFile.task_family in check_overlap_tasks,
     add_default_to_description=True,
 )

-
 CreateSyncFileWrapper = wrapper_factory(
     base_cls=AnalysisTask,
     require_cls=CreateSyncFile,

From 5f605b94ded0c02333c3e33bd55e15b5d1ff07c5 Mon Sep 17 00:00:00 2001
From: "Marcel R."
Date: Fri, 25 Oct 2024 08:49:57 +0200
Subject: [PATCH 3/7] Add pandas contrib.

---
 hbt/__init__.py | 3 +++
 modules/columnflow | 2 +-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/hbt/__init__.py b/hbt/__init__.py
index 78ef9c08..9e393f0c 100644
--- a/hbt/__init__.py
+++ b/hbt/__init__.py
@@ -1,8 +1,11 @@
 # coding: utf-8

+import law

 from hbt.columnflow_patches import patch_all

+law.contrib.load("pandas")
+
 # apply cf patches once
 patch_all()

diff --git a/modules/columnflow b/modules/columnflow
index 11ef864d..0652ad1e 160000
--- a/modules/columnflow
+++ b/modules/columnflow
@@ -1 +1 @@
-Subproject commit 11ef864d4295e31ad9ff103e7236ee2b77f31aa1
+Subproject commit 0652ad1eb169deb80f32a26466df30ecbfda5230
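
The pandas contrib loaded above equips law targets with a "pandas" formatter, which is
what later lets the sync task dump a DataFrame straight into its output target. A minimal
standalone sketch of that mechanism (the target path and DataFrame content here are made
up for illustration; keyword arguments after formatter= are forwarded to DataFrame.to_csv,
as the task's own output.dump(..., index=False, header=..., mode=...) call relies on):

    import law
    import pandas as pd

    law.contrib.load("pandas")

    # any local file target with an extension the formatter understands, e.g. csv
    target = law.LocalFileTarget("/tmp/sync_example.csv")  # hypothetical path

    df = pd.DataFrame({"event": [1, 2], "run": [316000, 316001]})
    target.dump(df, formatter="pandas", index=False)
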
""" -import luigi +from __future__ import annotations + import law from columnflow.tasks.framework.base import Requirements, AnalysisTask, wrapper_factory -from columnflow.tasks.framework.mixins import ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin +from columnflow.tasks.framework.mixins import ( + ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin, +) from columnflow.tasks.framework.remote import RemoteWorkflow from columnflow.tasks.reduction import ReducedEventsUser from columnflow.tasks.production import ProduceColumns from columnflow.tasks.ml import MLEvaluation -from columnflow.tasks.selection import MergeSelectionMasks -from columnflow.util import dev_sandbox, DotDict +from columnflow.util import dev_sandbox + +from hbt.tasks.base import HBTTask -class CreateSyncFile( +class CreateSyncFiles( + HBTTask, MLModelsMixin, ProducersMixin, ChunkedIOMixin, @@ -28,19 +33,12 @@ class CreateSyncFile( ): sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox")) - file_type = luigi.ChoiceParameter( - default="csv", - choices=("csv",), - description="the file type to create; choices: csv; default: csv", - ) - # upstream requirements reqs = Requirements( ReducedEventsUser.reqs, RemoteWorkflow.reqs, ProduceColumns=ProduceColumns, MLEvaluation=MLEvaluation, - MergeSelectionMasks=MergeSelectionMasks, ) def workflow_requires(self): @@ -85,110 +83,17 @@ def requires(self): @workflow_condition.output def output(self): - return self.target(f"sync_file_{self.branch}.{self.file_type}") + return self.target(f"sync_{self.dataset_inst.name}_{self.branch}.csv") @law.decorator.log - @law.decorator.localize(input=True, output=True) - @law.decorator.safe_output + @law.decorator.localize def run(self): - from columnflow.columnar_util import ( - Route, RouteFilter, mandatory_coffea_columns, update_ak_array, - sort_ak_fields, EMPTY_FLOAT, - ) import awkward as ak - import pandas as pd - def sync_columns(): - columns = { - "physics_objects": { - "Jet.{pt,eta,phi,mass}": 2, - }, - "flat_objects": [ - "event", - "run", - "channel_id", - "leptons_os", - "luminosityBlock", - "lep*", - ], - } - mapping = { - "luminosityBlock": "lumi", - "leptons_os": "is_os", - } - return columns, mapping - - def lepton_selection(events, attributes=["pt", "eta", "phi", "charge"]): - first_lepton = ak.concatenate([events["Electron"], events["Muon"]], axis=1) - second_lepton = events["Tau"] - for attribute in attributes: - events[f"lep1_{attribute}"] = first_lepton[attribute] - events[f"lep2_{attribute}"] = second_lepton[attribute] - return events - - def awkward_to_pandas(events, physics_objects, flat_objects=["event"]): - """Helper function to convert awkward arrays to pandas dataframes. - - Args: - events (ak.array): Awkward array with nested structure. - physics_objects (Dict[str]): Dict of physics objects to consider, with value representing padding. - attributes (List[str]): List of attributes to consider. - flat_objects (List[str]): List of additional columns to consider, these are not padded. - - Returns: - pd.DataFrame: Pandas dataframe with flattened structure of the awkward array. - """ - events = sort_ak_fields(events) - f = DotDict() - # add meta columns (like event number, ...) - # these columns do not need padding - - # columns that need no padding (like event number, ...) 
-            # resolve glob patterns
-            for flat_object_pattern in flat_objects:
-                for field in events.fields:
-                    if law.util.fnmatch.fnmatch(field, flat_object_pattern):
-                        f[field] = events[field]
-
-            # add columns of physics objects
-            # columns are padded to the given pad_length
-            for physics_pattern, pad_length in physics_objects.items():
-                physics_object_names = law.util.brace_expand(physics_pattern)
-                for physics_object in physics_object_names:
-                    physics_route = Route(physics_object)
-                    parts = physics_route.fields
-                    physics_array = physics_route.apply(events)
-                    physics_array = ak.pad_none(physics_array, pad_length)
-                    physics_array = ak.fill_none(physics_array, EMPTY_FLOAT)
-                    for pad_number in range(0, pad_length):
-                        full_name = f"{parts[0].lower()}{pad_number}_{parts[1].lower()}"
-                        f[full_name] = physics_array[:, pad_number]
-            return ak.to_dataframe(f)
+        from columnflow.columnar_util import update_ak_array, EMPTY_FLOAT

         # prepare inputs and outputs
         inputs = self.input()
-
-        # create a temp dir for saving intermediate files
-        tmp_dir = law.LocalDirectoryTarget(is_tmp=True)
-        tmp_dir.touch()
-
-        # define columns that will be written
-        write_columns: set[Route] = set()
-        skip_columns: set[str] = set()
-        for c in self.config_inst.x.keep_columns.get(self.task_family, ["*"]):
-            for r in self._expand_keep_column(c):
-                if r.has_tag("skip"):
-                    skip_columns.add(r.column)
-                else:
-                    write_columns.add(r)
-        write_columns = {
-            r for r in write_columns
-            if not law.util.multi_match(r.column, skip_columns, mode=any)
-        }
-        route_filter = RouteFilter(write_columns)
-
-        # define columns that need to be read
-        read_columns = write_columns | set(mandatory_coffea_columns)
-        read_columns = {Route(c) for c in read_columns}
-
-        # iterate over chunks of events and diffs
+        output = self.output()
+
+        # iterate over chunks of events and diffs
         files = [inputs["events"]["events"].abspath]
@@ -197,11 +102,35 @@ def awkward_to_pandas(events, physics_objects, flat_objects=["event"]):
         if self.producer_insts:
             files.extend([inp["columns"].abspath for inp in inputs["producers"]])
         if self.ml_model_insts:
             files.extend([inp["mlcolumns"].abspath for inp in inputs["ml"]])

-        pandas_frameworks = []
+        # helper to replace our internal empty float placeholder with a custom one
+        empty_float = EMPTY_FLOAT  # points to same value for now, but can be changed
+        def replace_empty_float(arr):
+            if empty_float != EMPTY_FLOAT:
+                arr = ak.where(arr == EMPTY_FLOAT, empty_float, arr)
+            return arr
+
+        # helper to pad nested fields with an empty float if missing
+        def pad_nested(arr, n, *, axis=1):
+            return ak.fill_none(ak.pad_none(arr, n, axis=axis), empty_float)
+
+        # helper to pad and select the element at index idx on the first inner axis
+        def select(arr, idx):
+            return replace_empty_float(pad_nested(arr, idx + 1, axis=1)[:, idx])
+
+        # TODO: use this helper
+        # def lepton_selection(events, attributes=["pt", "eta", "phi", "charge"]):
+        #     first_lepton = ak.concatenate([events["Electron"], events["Muon"]], axis=1)
+        #     second_lepton = events["Tau"]
+        #     for attribute in attributes:
+        #         events[f"lep1_{attribute}"] = first_lepton[attribute]
+        #         events[f"lep2_{attribute}"] = second_lepton[attribute]
+        #     return events
+
+        # event chunk loop
         for (events, *columns), pos in self.iter_chunked_io(
             files,
             source_type=len(files) * ["awkward_parquet"],
-            read_columns=len(files) * [read_columns],
+            pool_size=1,
         ):
             # optional check for overlapping inputs
             if self.check_overlapping_inputs:
                 self.raise_if_overlapping([events] + list(columns))

             # add additional columns
             events = update_ak_array(events, *columns)

-            # remove columns
-            events = route_filter(events)
-
             # optional check for finite values
             if self.check_finite_output:
                 self.raise_if_not_finite(events)

-            # construct first and second lepton columns
-            events = lepton_selection(
-                events,
-                attributes=["pt", "eta", "phi", "mass", "charge"],
-            )
-
-            # convert to pandas dataframe
-            keep_columns, mapping = sync_columns()
-
-            events_pd = awkward_to_pandas(
-                events=events,
-                physics_objects=keep_columns["physics_objects"],
-                flat_objects=keep_columns["flat_objects"],
+            # project into dataframe
+            df = ak.to_dataframe({
+                # index variables
+                "event": events.event,
+                "run": events.run,
+                "lumi": events.luminosityBlock,
+                # high-level events variables
+                "channel": events.channel_id,
+                "os": events.leptons_os * 1,
+                "iso": events.tau2_isolated * 1,
+                # jet variables
+                "jet1_pt": select(events.Jet.pt, 0),
+                "jet1_eta": select(events.Jet.eta, 0),
+                "jet2_pt": select(events.Jet.pt, 1),
+                "jet2_eta": select(events.Jet.eta, 1),
+                "jet8_eta": select(events.Jet.eta, 7),
+                # TODO: add additional variables
+            })
+
+            # save as csv in output, append if necessary
+            output.dump(
+                df,
+                formatter="pandas",
+                index=False,
+                header=pos.index == 0,
+                mode="w" if pos.index == 0 else "a",
             )

-            pandas_frameworks.append(events_pd)
-
-        # merge output files
-        merged_pandas_framework = pd.concat(
-            pandas_frameworks,
-            ignore_index=True,
-        ).rename(columns=mapping, inplace=False)
-        self.output().dump(merged_pandas_framework, index=False, formatter="pandas")
-
-check_overlap_tasks = law.config.get_expanded("analysis", "check_overlapping_inputs", [], split_csv=True)
-CreateSyncFile.check_overlapping_inputs = ChunkedIOMixin.check_overlapping_inputs.copy(
-    default=CreateSyncFile.task_family in check_overlap_tasks,
+check_overlap_tasks = law.config.get_expanded("analysis", "check_overlapping_inputs", [], split_csv=True)
+CreateSyncFiles.check_overlapping_inputs = ChunkedIOMixin.check_overlapping_inputs.copy(
+    default=CreateSyncFiles.task_family in check_overlap_tasks,
     add_default_to_description=True,
 )

-CreateSyncFileWrapper = wrapper_factory(
+CreateSyncFilesWrapper = wrapper_factory(
     base_cls=AnalysisTask,
-    require_cls=CreateSyncFile,
+    require_cls=CreateSyncFiles,
     enable=["configs", "skip_configs", "datasets", "skip_datasets", "shifts", "skip_shifts"],
 )
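
The select() helper introduced in this patch carries the core idea of the slimmed csv
writing: jagged per-event collections are padded with columnflow's EMPTY_FLOAT
placeholder so that a fixed element index always exists, then flattened into scalar
columns. A standalone sketch of the same pattern on toy values (EMPTY_FLOAT is
hard-coded here as a stand-in for the columnflow constant; pandas must be installed
for ak.to_dataframe):

    import awkward as ak

    EMPTY_FLOAT = -99999.0  # stand-in for columnflow.columnar_util.EMPTY_FLOAT

    def pad_nested(arr, n, *, axis=1):
        # pad inner lists to length n, then replace the padded None entries
        return ak.fill_none(ak.pad_none(arr, n, axis=axis), EMPTY_FLOAT)

    def select(arr, idx):
        # pad so that index idx is guaranteed to exist, then pick it per event
        return pad_nested(arr, idx + 1, axis=1)[:, idx]

    # toy jet pt values for three events with 2, 0 and 1 jets
    jet_pt = ak.Array([[40.2, 25.1], [], [31.7]])

    df = ak.to_dataframe({
        "jet1_pt": select(jet_pt, 0),  # 40.2, -99999.0, 31.7
        "jet2_pt": select(jet_pt, 1),  # 25.1, -99999.0, -99999.0
    })
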
From d1fb486e511b7b85172564ff1d39174de485e10d Mon Sep 17 00:00:00 2001
From: "Marcel R."
Date: Fri, 25 Oct 2024 11:20:38 +0200
Subject: [PATCH 5/7] Fix jet variables.

---
 hbt/tasks/sync.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hbt/tasks/sync.py b/hbt/tasks/sync.py
index 87efeb28..b1710637 100644
--- a/hbt/tasks/sync.py
+++ b/hbt/tasks/sync.py
@@ -156,9 +156,10 @@ def select(arr, idx):
                 # jet variables
                 "jet1_pt": select(events.Jet.pt, 0),
                 "jet1_eta": select(events.Jet.eta, 0),
+                "jet1_phi": select(events.Jet.phi, 0),
                 "jet2_pt": select(events.Jet.pt, 1),
                 "jet2_eta": select(events.Jet.eta, 1),
-                "jet8_eta": select(events.Jet.eta, 7),
+                "jet2_phi": select(events.Jet.phi, 1),
                 # TODO: add additional variables
             })

From a71310c59845efc80ae8857858fe07e73405f4b6 Mon Sep 17 00:00:00 2001
From: Bogdan Wiederspan
Date: Fri, 25 Oct 2024 13:51:10 +0200
Subject: [PATCH 6/7] Remove wrapper since it is not needed, add lepton
 selection

---
 hbt/tasks/sync.py | 33 ++++++++++++++++-----------------
 1 file changed, 16 insertions(+), 17 deletions(-)

diff --git a/hbt/tasks/sync.py b/hbt/tasks/sync.py
index b1710637..46193b4f 100644
--- a/hbt/tasks/sync.py
+++ b/hbt/tasks/sync.py
@@ -8,7 +8,7 @@

 import law

-from columnflow.tasks.framework.base import Requirements, AnalysisTask, wrapper_factory
+from columnflow.tasks.framework.base import Requirements
 from columnflow.tasks.framework.mixins import (
     ProducersMixin, MLModelsMixin, ChunkedIOMixin, SelectorMixin,
 )
@@ -89,7 +89,7 @@ def output(self):
     @law.decorator.localize
     def run(self):
         import awkward as ak
-        from columnflow.columnar_util import update_ak_array, EMPTY_FLOAT
+        from columnflow.columnar_util import update_ak_array, EMPTY_FLOAT, set_ak_column

         # prepare inputs and outputs
         inputs = self.input()
@@ -117,14 +117,11 @@ def pad_nested(arr, n, *, axis=1):
         def select(arr, idx):
             return replace_empty_float(pad_nested(arr, idx + 1, axis=1)[:, idx])

-        # TODO: use this helper
-        # def lepton_selection(events, attributes=["pt", "eta", "phi", "charge"]):
-        #     first_lepton = ak.concatenate([events["Electron"], events["Muon"]], axis=1)
-        #     second_lepton = events["Tau"]
-        #     for attribute in attributes:
-        #         events[f"lep1_{attribute}"] = first_lepton[attribute]
-        #         events[f"lep2_{attribute}"] = second_lepton[attribute]
-        #     return events
+        # helper to select leptons
+        def lepton_selection(events):
+            # first any lepton, second always tau
+            lepton = ak.concatenate([events["Electron"], events["Muon"], events["Tau"]], axis=1)
+            return set_ak_column(events, "Lepton", lepton)

         # event chunk loop
         for (events, *columns), pos in self.iter_chunked_io(
@@ -135,10 +132,10 @@ def lepton_selection(events):
             if self.check_overlapping_inputs:
                 self.raise_if_overlapping([events] + list(columns))

             # add additional columns
             events = update_ak_array(events, *columns)
-
+            events = lepton_selection(events)
             # optional check for finite values
             if self.check_finite_output:
                 self.raise_if_not_finite(events)
@@ -160,6 +157,14 @@ def lepton_selection(events):
                 "jet2_pt": select(events.Jet.pt, 1),
                 "jet2_eta": select(events.Jet.eta, 1),
                 "jet2_phi": select(events.Jet.phi, 1),
+                "lep1_pt": select(events.Lepton.pt, 1),
+                "lep1_phi": select(events.Lepton.phi, 1),
+                "lep1_eta": select(events.Lepton.eta, 1),
+                "lep1_charge": select(events.Lepton.charge, 1),
+                "lep2_pt": select(events.Lepton.pt, 2),
+                "lep2_phi": select(events.Lepton.phi, 2),
+                "lep2_eta": select(events.Lepton.eta, 2),
+                "lep2_charge": select(events.Lepton.charge, 2),
                 # TODO: add additional variables
             })

@@ -178,9 +183,3 @@ def lepton_selection(events):
     default=CreateSyncFiles.task_family in check_overlap_tasks,
     add_default_to_description=True,
 )
-
-CreateSyncFilesWrapper = wrapper_factory(
-    base_cls=AnalysisTask,
-    require_cls=CreateSyncFiles,
-    enable=["configs", "skip_configs", "datasets", "skip_datasets", "shifts", "skip_shifts"],
"datasets", "skip_datasets", "shifts", "skip_shifts"], -) From a7cc592d843f142438e46c642808e92375de53d6 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 25 Oct 2024 14:19:08 +0200 Subject: [PATCH 7/7] Fix lepton handling. --- hbt/tasks/sync.py | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/hbt/tasks/sync.py b/hbt/tasks/sync.py index 46193b4f..cabf2dbc 100644 --- a/hbt/tasks/sync.py +++ b/hbt/tasks/sync.py @@ -118,10 +118,17 @@ def select(arr, idx): return replace_empty_float(pad_nested(arr, idx + 1, axis=1)[:, idx]) # helper to select leptons - def lepton_selection(events): - # first event any lepton, second alsways tau - lepton = ak.concatenate([events["Electron"], events["Muon"], events["Tau"]], axis=1) - return set_ak_column(events, "Lepton", lepton) + def select_leptons(events: ak.Array, common_fields: dict[str, int | float]) -> ak.Array: + # ensure all lepton arrays have the same common fields + leptons = [events.Electron, events.Muon, events.Tau] + for i in range(len(leptons)): + lepton = leptons[i] + for field, default in common_fields.items(): + if field not in lepton.fields: + lepton = set_ak_column(lepton, field, default) + leptons[i] = lepton + # concatenate (first event any lepton, second alsways tau) and add to events + return set_ak_column(events, "Lepton", ak.concatenate(leptons, axis=1)) # event chunk loop for (events, *columns), pos in self.iter_chunked_io( @@ -135,10 +142,9 @@ def lepton_selection(events): # add additional columns events = update_ak_array(events, *columns) - events = lepton_selection(events) - # optional check for finite values - if self.check_finite_output: - self.raise_if_not_finite(events) + + # insert leptons + events = select_leptons(events, {"rawDeepTau2018v2p5VSjet": empty_float}) # project into dataframe df = ak.to_dataframe({ @@ -157,14 +163,16 @@ def lepton_selection(events): "jet2_pt": select(events.Jet.pt, 1), "jet2_eta": select(events.Jet.eta, 1), "jet2_phi": select(events.Jet.phi, 1), - "lep1_pt": select(events.Lepton.pt, 1), - "lep1_phi": select(events.Lepton.phi, 1), - "lep1_eta": select(events.Lepton.eta, 1), - "lep1_charge": select(events.Lepton.charge, 1), - "lep2_pt": select(events.Lepton.pt, 2), - "lep2_phi": select(events.Lepton.phi, 2), - "lep2_eta": select(events.Lepton.eta, 2), - "lep2_charge": select(events.Lepton.charge, 2), + "lep1_pt": select(events.Lepton.pt, 0), + "lep1_phi": select(events.Lepton.phi, 0), + "lep1_eta": select(events.Lepton.eta, 0), + "lep1_charge": select(events.Lepton.charge, 0), + "lep1_deeptauvsjet": select(events.Lepton.rawDeepTau2018v2p5VSjet, 0), + "lep2_pt": select(events.Lepton.pt, 1), + "lep2_phi": select(events.Lepton.phi, 1), + "lep2_eta": select(events.Lepton.eta, 1), + "lep2_charge": select(events.Lepton.charge, 1), + "lep2_deeptauvsjet": select(events.Lepton.rawDeepTau2018v2p5VSjet, 1), # TODO: add additional variables })