# coding: utf-8
"""
First implementation of DNN for HH analysis, generalized (TODO)
"""
from __future__ import annotations
from typing import Any, Sequence
import gc
from time import time
import law
import order as od
from columnflow.ml import MLModel
from columnflow.util import maybe_import, dev_sandbox
from columnflow.columnar_util import Route, set_ak_column, remove_ak_column
from columnflow.tasks.selection import MergeSelectionStatsWrapper
from hbw.config.categories import add_categories_ml
np = maybe_import("numpy")
ak = maybe_import("awkward")
tf = maybe_import("tensorflow")
pickle = maybe_import("pickle")
keras = maybe_import("tensorflow.keras")
logger = law.logger.get_logger(__name__)
class SimpleDNN(MLModel):
def __init__(
self,
*args,
folds: int | None = None,
**kwargs,
):
"""
Parameters that need to be set by derived model:
folds, layers, learningrate, batchsize, epochs, eqweight, dropout,
processes, custom_procweights, dataset_names, input_features, store_name,
"""
single_config = True # noqa
super().__init__(*args, **kwargs)
# class- to instance-level attributes
# (before being set, self.folds refers to a class-level attribute)
self.folds = folds or self.folds
        # example DNN model parameters, meant to be set via the derived model's cls_dict:
        # self.layers = [512, 512, 512]
        # self.learningrate = 0.00050
        # self.batchsize = 2048
        # self.epochs = 6  # 200
        # self.eqweight = 0.50
        # dropout: either False (disabled) or a dropout rate between 0 and 1
        # self.dropout = False
def setup(self):
# dynamically add variables for the quantities produced by this model
for proc in self.processes:
if f"{self.cls_name}.score_{proc}" not in self.config_inst.variables:
self.config_inst.add_variable(
name=f"{self.cls_name}.score_{proc}",
null_value=-1,
binning=(40, 0., 1.),
x_title=f"DNN output score {self.config_inst.get_process(proc).x.ml_label}",
)
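            # coarser, hand-picked binnings for the rebinned score variables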
hh_bins = [0.0, .4, .45, .5, .55, .6, .65, .7, .75, .8, .85, .92, 1.0]
bkg_bins = [0.0, 0.4, 0.7, 1.0]
self.config_inst.add_variable(
name=f"{self.cls_name}.score_{proc}_rebin1",
expression=f"{self.cls_name}.score_{proc}",
null_value=-1,
binning=hh_bins if "HH" in proc else bkg_bins,
x_title=f"DNN output score {self.config_inst.get_process(proc).x.ml_label}",
)
# one variable to bookkeep truth labels
# TODO: still needs implementation
if f"{self.cls_name}.ml_label" not in self.config_inst.variables:
self.config_inst.add_variable(
name=f"{self.cls_name}.ml_label",
null_value=-1,
binning=(len(self.processes) + 1, -1.5, len(self.processes) - 0.5),
x_title="DNN truth score",
)
# dynamically add ml categories (but only if production categories have been added)
if (
self.config_inst.x("add_categories_ml", True) and
not self.config_inst.x("add_categories_production", True)
):
add_categories_ml(self.config_inst, ml_model_inst=self)
self.config_inst.x.add_categories_ml = False
    def requires(self, task: law.Task) -> Any:
# add selection stats to requires; NOTE: not really used at the moment
return MergeSelectionStatsWrapper.req(
task,
shifts="nominal",
configs=self.config_inst.name,
datasets=self.dataset_names,
)
def sandbox(self, task: law.Task) -> str:
return dev_sandbox("bash::$CF_BASE/sandboxes/venv_ml_tf.sh")
def datasets(self, config_inst: od.Config) -> set[od.Dataset]:
return {config_inst.get_dataset(dataset_name) for dataset_name in self.dataset_names}
def uses(self, config_inst: od.Config) -> set[Route | str]:
return {"normalization_weight", "category_ids"} | set(self.input_features)
def produces(self, config_inst: od.Config) -> set[Route | str]:
produced = set()
for proc in self.processes:
produced.add(f"{self.cls_name}.score_{proc}")
produced.add("category_ids")
return produced
def output(self, task: law.Task) -> law.FileSystemDirectoryTarget:
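        # one model directory per fold, e.g. "mlmodel_f0of5"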
return task.target(f"mlmodel_f{task.branch}of{self.folds}", dir=True)
    def open_model(self, target: law.LocalDirectoryTarget) -> tuple[tf.keras.models.Model, dict]:
        # return target.load(formatter="keras_model")
        with open(f"{target.path}/model_history.pkl", "rb") as f:
            history = pickle.load(f)
        model = tf.keras.models.load_model(target.path)
        return model, history
    def training_configs(self, requested_configs: Sequence[str]) -> list[str]:
        # use the requested config if it is unambiguous, otherwise fall back to the default
        if len(requested_configs) == 1:
            return list(requested_configs)
        return ["c17"]
def training_calibrators(self, config_inst: od.Config, requested_calibrators: Sequence[str]) -> list[str]:
# fix MLTraining Phase Space
return ["skip_jecunc"]
def training_selector(self, config_inst: od.Config, requested_selector: str) -> str:
# fix MLTraining Phase Space
return "sl"
def training_producers(self, config_inst: od.Config, requested_producers: Sequence[str]) -> list[str]:
# fix MLTraining Phase Space
return ["ml_inputs"]
    def prepare_inputs(
        self,
        task,
        input,
    ) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray]]:
        # max_events_per_fold = int(self.max_events / (self.folds - 1))
        process_insts = [self.config_inst.get_process(proc) for proc in self.processes]
        N_events_processes = np.zeros(len(self.processes), dtype=np.int64)
        # float arrays to avoid integer truncation when accumulating (weighted) sums
        custom_procweights = np.zeros(len(self.processes))
        sum_eventweights_processes = np.zeros(len(self.processes))
        dataset_proc_idx = {}  # bookkeeping of which process each dataset belongs to
#
# determine process of each dataset and count number of events & sum of eventweights for this process
#
for dataset, files in input["events"][self.config_inst.name].items():
t0 = time()
dataset_inst = self.config_inst.get_dataset(dataset)
if len(dataset_inst.processes) != 1:
raise Exception("only 1 process inst is expected for each dataset")
# TODO: use stats here instead
N_events = sum([len(ak.from_parquet(inp["mlevents"].fn)) for inp in files])
# NOTE: this only works as long as each dataset only contains one process
sum_eventweights = sum([
ak.sum(ak.from_parquet(inp["mlevents"].fn).normalization_weight)
for inp in files],
)
            for i, proc in enumerate(process_insts):
                custom_procweights[i] = self.custom_procweights[proc.name]
                leaf_procs = [p for p, _, _ in proc.walk_processes(include_self=True)]
                if dataset_inst.processes.get_first() in leaf_procs:
                    logger.info(f"the dataset *{dataset}* is used for training the *{proc.name}* output node")
                    dataset_proc_idx[dataset] = i
                    N_events_processes[i] += N_events
                    sum_eventweights_processes[i] += sum_eventweights
                    continue
            if dataset_proc_idx.get(dataset, -1) == -1:
                raise Exception(f"dataset {dataset} is not matched to any of the given processes")
            logger.info(f"Weights done for {dataset} in {(time() - t0):.3f}s")
        # scale factor such that the largest weights are of order 1
        # (only implemented for eqweight = True)
        weights_scaler = min(N_events_processes / custom_procweights)
        #
        # set inputs, weights and targets for each dataset and fold
        #
DNN_inputs = {
"weights": None,
"inputs": None,
"target": None,
}
sum_nnweights_processes = {}
for dataset, files in input["events"][self.config_inst.name].items():
t0 = time()
this_proc_idx = dataset_proc_idx[dataset]
proc_name = self.processes[this_proc_idx]
N_events_proc = N_events_processes[this_proc_idx]
sum_eventweights_proc = sum_eventweights_processes[this_proc_idx]
logger.info(
f"dataset: {dataset}, \n #Events: {N_events_proc}, "
f"\n Sum Eventweights: {sum_eventweights_proc}",
)
sum_nnweights = 0
for inp in files:
events = ak.from_parquet(inp["mlevents"].path)
weights = events.normalization_weight
if self.eqweight:
weights = weights * weights_scaler / sum_eventweights_proc
custom_procweight = self.custom_procweights[proc_name]
weights = weights * custom_procweight
weights = ak.to_numpy(weights)
if np.any(~np.isfinite(weights)):
raise Exception(f"Infinite values found in weights from dataset {dataset}")
sum_nnweights += sum(weights)
sum_nnweights_processes.setdefault(proc_name, 0)
sum_nnweights_processes[proc_name] += sum(weights)
# remove columns not used in training
for var in events.fields:
if var not in self.input_features:
events = remove_ak_column(events, var)
# transform events into numpy ndarray
# TODO: at this point we should save the order of our input variables
# to ensure that they will be loaded in the correct order when
# doing the evaluation
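            # note: the structured array is viewed as a flat float32 buffer and reshaped
            # to (n_events, n_features), which requires all fields to be float32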
events = ak.to_numpy(events)
events = events.astype(
[(name, np.float32) for name in events.dtype.names], copy=False,
).view(np.float32).reshape((-1, len(events.dtype)))
if np.any(~np.isfinite(events)):
raise Exception(f"Infinite values found in inputs from dataset {dataset}")
# create the truth values for the output layer
target = np.zeros((len(events), len(self.processes)))
target[:, this_proc_idx] = 1
if np.any(~np.isfinite(target)):
raise Exception(f"Infinite values found in target from dataset {dataset}")
if DNN_inputs["weights"] is None:
DNN_inputs["weights"] = weights
DNN_inputs["inputs"] = events
DNN_inputs["target"] = target
else:
DNN_inputs["weights"] = np.concatenate([DNN_inputs["weights"], weights])
DNN_inputs["inputs"] = np.concatenate([DNN_inputs["inputs"], events])
DNN_inputs["target"] = np.concatenate([DNN_inputs["target"], target])
logger.debug(f" weights: {weights[:5]}")
logger.debug(f" Sum NN weights: {sum_nnweights}")
logger.info(f"Inputs done for {dataset} in {(time() - t0):.3f}s")
logger.info(f"Sum of weights per process: {sum_nnweights_processes}")
#
# shuffle events and split into train and validation fold
#
        inputs_size = sum(arr.size * arr.itemsize for arr in DNN_inputs.values())
        logger.info(f"inputs size is {inputs_size / 1024**3:.2f} GB")
        shuffle_indices = np.arange(len(DNN_inputs["weights"]))
        np.random.shuffle(shuffle_indices)
validation_fraction = 0.25
N_validation_events = int(validation_fraction * len(DNN_inputs["weights"]))
train, validation = {}, {}
for k in DNN_inputs.keys():
DNN_inputs[k] = DNN_inputs[k][shuffle_indices]
validation[k] = DNN_inputs[k][:N_validation_events]
train[k] = DNN_inputs[k][N_validation_events:]
return train, validation
    def train(
        self,
        task: law.Task,
        input: Any,
        output: law.LocalDirectoryTarget,
    ) -> None:
# np.random.seed(1337) # for reproducibility
        # if a GPU is available, let its memory allocation grow on demand
        physical_devices = tf.config.list_physical_devices("GPU")
        try:
            tf.config.experimental.set_memory_growth(physical_devices[0], True)
        except (IndexError, RuntimeError, ValueError):
            # no GPU found, or the (virtual) devices have already been initialized
            pass
#
# input preparation
#
train, validation = self.prepare_inputs(task, input)
# check for infinite values
for key in train.keys():
if np.any(~np.isfinite(train[key])):
raise Exception(f"Infinite values found in training {key}")
if np.any(~np.isfinite(validation[key])):
raise Exception(f"Infinite values found in validation {key}")
gc.collect()
logger.info("garbage collected")
#
# model preparation
#
n_inputs = len(self.input_features)
n_outputs = len(self.processes)
        from tensorflow.keras.models import Sequential
        from tensorflow.keras.layers import Dense, BatchNormalization
# define the DNN model
# TODO: do this Funcional instead of Sequential
model = Sequential()
# BatchNormalization layer with input shape
model.add(BatchNormalization(input_shape=(n_inputs,)))
activation_settings = {
"elu": ("ELU", "he_uniform", "Dropout"),
"relu": ("ReLU", "he_uniform", "Dropout"),
"prelu": ("PReLU", "he_normal", "Dropout"),
"selu": ("selu", "lecun_normal", "AlphaDropout"),
"tanh": ("tanh", "glorot_normal", "Dropout"),
"softmax": ("softmax", "glorot_normal", "Dropout"),
}
keras_act_name, init_name, dropout_layer = activation_settings[self.activation]
# following hidden layers
for n_nodes in self.layers:
model.add(Dense(
units=n_nodes,
activation=keras_act_name,
))
# Potentially add dropout layer after each hidden layer
if self.dropout:
Dropout = getattr(keras.layers, dropout_layer)
model.add(Dropout(self.dropout))
# output layer
model.add(Dense(n_outputs, activation="softmax"))
# compile the network
optimizer = keras.optimizers.Adam(
learning_rate=self.learningrate, beta_1=0.9, beta_2=0.999,
epsilon=1e-6, amsgrad=False,
)
model.compile(
loss="categorical_crossentropy",
optimizer=optimizer,
weighted_metrics=["categorical_accuracy"],
)
#
# training
#
# early stopping to determine the 'best' model
early_stopping = tf.keras.callbacks.EarlyStopping(
monitor="val_loss",
min_delta=0,
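            # stop when val_loss has not improved for a quarter of the maximum epochs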
patience=int(self.epochs / 4),
verbose=0,
mode="auto",
baseline=None,
restore_best_weights=True,
start_from_epoch=0,
)
logger.info("input to tf Dataset")
with tf.device("CPU"):
tf_train = tf.data.Dataset.from_tensor_slices(
(train["inputs"], train["target"], train["weights"]),
).batch(self.batchsize)
tf_validate = tf.data.Dataset.from_tensor_slices(
(validation["inputs"], validation["target"], validation["weights"]),
).batch(self.batchsize)
fit_kwargs = {
"epochs": self.epochs,
"callbacks": [early_stopping],
"verbose": 2,
}
# train the model
logger.info("Start training...")
model.fit(
tf_train,
validation_data=tf_validate,
**fit_kwargs,
)
# save the model and history; TODO: use formatter
# output.dump(model, formatter="tf_keras_model")
output.parent.touch()
model.save(output.path)
with open(f"{output.path}/model_history.pkl", "wb") as f:
pickle.dump(model.history.history, f)
    def evaluate(
        self,
        task: law.Task,
        events: ak.Array,
        models: list[Any],
        fold_indices: ak.Array,
        events_used_in_training: bool = True,
    ) -> ak.Array:
logger.info(f"Evaluation of dataset {task.dataset}")
models, history = zip(*models)
# TODO: use history for loss+acc plotting
# create a copy of the inputs to use for evaluation
inputs = ak.copy(events)
# remove columns not used in training
for var in inputs.fields:
if var not in self.input_features:
inputs = remove_ak_column(inputs, var)
# transform inputs into numpy ndarray
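        # (same structured-array to 2D float32 view trick as in prepare_inputs)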
inputs = ak.to_numpy(inputs)
inputs = inputs.astype(
[(name, np.float32) for name in inputs.dtype.names], copy=False,
).view(np.float32).reshape((-1, len(inputs.dtype)))
# do prediction for all models and all inputs
predictions = []
for i, model in enumerate(models):
pred = ak.from_numpy(model.predict_on_batch(inputs))
if len(pred[0]) != len(self.processes):
raise Exception("Number of output nodes should be equal to number of processes")
predictions.append(pred)
# Save predictions for each model
# TODO: create train/val/test plots (confusion, ROC, nodes) using these predictions?
"""
for j, proc in enumerate(self.processes):
events = set_ak_column(
events, f"{self.cls_name}.fold{i}_score_{proc}", pred[:, j],
)
"""
# combine all models into 1 output score, using the model that has not seen test set yet
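        # the all-true mask makes ak.where yield a -1-filled array with the layout of a prediction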
outputs = ak.where(ak.ones_like(predictions[0]), -1, -1)
for i in range(self.folds):
logger.info(f"Evaluation fold {i}")
# reshape mask from N*bool to N*k*bool (TODO: simpler way?)
idx = ak.to_regular(ak.concatenate([ak.singletons(fold_indices == i)] * len(self.processes), axis=1))
outputs = ak.where(idx, predictions[i], outputs)
if len(outputs[0]) != len(self.processes):
raise Exception("Number of output nodes should be equal to number of processes")
for i, proc in enumerate(self.processes):
events = set_ak_column(
events, f"{self.cls_name}.score_{proc}", outputs[:, i],
)
# ML categorization on top of existing categories
ml_categories = [cat for cat in self.config_inst.categories if "ml_" in cat.name]
ml_proc_to_id = {cat.name.replace("ml_", ""): cat.id for cat in ml_categories}
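        # collect the per-process scores into a single record array keyed by process name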
scores = ak.Array({
f.replace("score_", ""): events[self.cls_name, f]
for f in events[self.cls_name].fields if f.startswith("score_")
})
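        # per event, assign the category id of the highest-scoring process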
        ml_category_ids = ak.Array(np.zeros(len(events)))
        max_score = ak.Array(np.zeros(len(events)))
for proc in scores.fields:
ml_category_ids = ak.where(scores[proc] > max_score, ml_proc_to_id[proc], ml_category_ids)
max_score = ak.where(scores[proc] > max_score, scores[proc], max_score)
category_ids = ak.where(
events.category_ids != 1, # Do not split Inclusive category into DNN sub-categories
events.category_ids + ak.values_astype(ml_category_ids, np.int32),
events.category_ids,
)
events = set_ak_column(events, "category_ids", category_ids)
return events
#
# derive usable ml models
#
processes = [
"hh_ggf_hbb_hvvqqlnu_kl1_kt1",
"tt",
"st",
"w_lnu",
"dy",
]
custom_procweights = {
"hh_ggf_hbb_hvvqqlnu_kl1_kt1": 1,
"tt": 8,
"st": 8,
"w_lnu": 8,
"dy": 8,
}
dataset_names = {
"hh_ggf_hbb_hvvqqlnu_kl1_kt1_powheg",
# TTbar
"tt_sl_powheg",
"tt_dl_powheg",
"tt_fh_powheg",
# SingleTop
"st_tchannel_t_powheg",
"st_tchannel_tbar_powheg",
"st_twchannel_t_powheg",
"st_twchannel_tbar_powheg",
"st_schannel_lep_amcatnlo",
# "st_schannel_had_amcatnlo",
# WJets
"w_lnu_ht70To100_madgraph",
"w_lnu_ht100To200_madgraph",
"w_lnu_ht200To400_madgraph",
"w_lnu_ht400To600_madgraph",
"w_lnu_ht600To800_madgraph",
"w_lnu_ht800To1200_madgraph",
"w_lnu_ht1200To2500_madgraph",
"w_lnu_ht2500_madgraph",
# DY
"dy_m50toinf_ht70to100_madgraph",
"dy_m50toinf_ht100to200_madgraph",
"dy_m50toinf_ht200to400_madgraph",
"dy_m50toinf_ht400to600_madgraph",
"dy_m50toinf_ht600to800_madgraph",
"dy_m50toinf_ht800to1200_madgraph",
"dy_m50toinf_ht1200to2500_madgraph",
"dy_m50toinf_ht2500_madgraph",
}
input_features = [
# "ht",
# "m_bb",
# "deltaR_bb",
"mli_ht", "mli_n_jet", "mli_n_deepjet",
# "mli_deepjetsum", "mli_b_deepjetsum", "mli_l_deepjetsum",
"mli_dr_bb", "mli_dphi_bb", "mli_mbb", "mli_mindr_lb", "mli_dr_jj", "mli_dphi_jj", "mli_mjj", "mli_mindr_lj",
"mli_dphi_lnu", "mli_mlnu", "mli_mjjlnu", "mli_mjjl", "mli_dphi_bb_jjlnu", "mli_dr_bb_jjlnu",
"mli_dphi_bb_jjl", "mli_dr_bb_jjl", "mli_dphi_bb_nu", "mli_dphi_jj_nu", "mli_dr_bb_l", "mli_dr_jj_l",
"mli_mbbjjlnu", "mli_mbbjjl", "mli_s_min",
] + [
f"mli_{obj}_{var}"
for obj in ["b1", "b2", "j1", "j2", "lep", "met"]
for var in ["pt", "eta"]
] + [
f"mli_{obj}_{var}"
for obj in ["fj"]
for var in ["pt", "eta", "phi", "mass", "msoftdrop", "deepTagMD_HbbvsQCD"]
]
default_cls_dict = {
"folds": 5,
# "max_events": 10**6, # TODO
"layers": [512, 512, 512],
"activation": "relu", # Options: elu, relu, prelu, selu, tanh, softmax
"learningrate": 0.00050,
"batchsize": 131072,
"epochs": 200,
"eqweight": True,
"dropout": 0.50,
"processes": processes,
"custom_procweights": custom_procweights,
"dataset_names": dataset_names,
"input_features": input_features,
"store_name": "inputs1",
}
# derived model, usable on command line
default_dnn = SimpleDNN.derive("default", cls_dict=default_cls_dict)
# test model settings
cls_dict = default_cls_dict.copy()  # copy to avoid mutating the default settings
cls_dict["epochs"] = 6
cls_dict["batchsize"] = 2048
cls_dict["processes"] = [
"hh_ggf_hbb_hvvqqlnu_kl1_kt1",
"st",
"w_lnu",
]
cls_dict["dataset_names"] = {
"hh_ggf_hbb_hvvqqlnu_kl1_kt1_powheg",
"st_tchannel_t_powheg",
"w_lnu_ht400To600_madgraph",
}
test_dnn = SimpleDNN.derive("test", cls_dict=cls_dict)
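# hypothetical command line usage of the derived models (assuming the standard
# columnflow ML tasks are available in this analysis), e.g.:
# law run cf.MLTraining --version v1 --ml-model default
# law run cf.MLEvaluation --version v1 --ml-model test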