-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathDataSplitter_global_timestamp.py
439 lines (275 loc) · 18.3 KB
/
DataSplitter_global_timestamp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: Maurizio Ferrari Dacrema and Sujay Khandagale
"""
from matplotlib import use
import scipy.sparse as sps
import numpy as np
import os
from Base.DataIO import DataIO
from Data_manager.DataSplitter import DataSplitter as _DataSplitter
from Data_manager.DataReader import DataReader as _DataReader
from Data_manager.DataReader_utils import compute_density, reconcile_mapper_with_removed_tokens
from Data_manager.split_functions.split_train_validation_leave_k_out import split_train_leave_k_out_user_wise
from Data_manager.split_functions.split_data_on_timestamp import split_data_on_timestamp
from Data_manager.split_functions.split_data_on_global_timestamp import split_data_on_global_timestamp
from Data_manager.data_consistency_check import assert_disjoint_matrices, assert_URM_ICM_mapper_consistency
class DataSplitter_global_timestamp(_DataSplitter):
    """
    Hold-out splitter that separates train, validation and test interactions using two
    timestamp thresholds shared by all users.

    The thresholds are the (100 - 2 * k_out_percent)-th and (100 - k_out_percent)-th percentiles
    of all interaction timestamps; the partitioning itself is delegated to
    split_data_on_global_timestamp.

    The splitter tries to load, from the specific folder related to a dataset, a split in the
    format corresponding to the splitter class. Basically each split is in a different subfolder
        - The "original" subfolder contains the whole dataset, is composed by a single URM with all
          data and may contain ICMs as well, either one or many, depending on the dataset
        - The other subfolders "warm", "cold" etc. contain the split data.

    The dataReader class involvement is limited to the following cases:
        - At first the dataSplitter tries to load from the subfolder corresponding to that split. Say "warm"
        - If the dataReader is successful in loading the files, then a split already exists and the loading is complete
        - If the dataReader raises a FileNotFoundException, then no split is available.
        - The dataSplitter then creates a new instance of dataReader using default parameters, so that the original data will be loaded
        - At this point the chosen dataSplitter takes the URM_all and selected ICM to perform the split
        - The dataSplitter saves the split data in the appropriate subfolder.
        - Finally, the dataReader is instantiated again with the correct parameters, to load the data just saved

    It exposes the following functions
        - load_data(save_folder_path = None, force_new_split = False)   loads the data or creates a new split
    """

    DATA_SPLITTER_NAME = "DataSplitter_global_timestamp"

    # Populated either by _split_data_from_original_dataset or by
    # _load_previously_built_split_and_attributes; None until then.
    SPLIT_URM_DICT = None
    SPLIT_ICM_DICT = None
    SPLIT_ICM_MAPPER_DICT = None
    SPLIT_UCM_DICT = None
    SPLIT_UCM_MAPPER_DICT = None
    SPLIT_GLOBAL_MAPPER_DICT = None

    def __init__(self, dataReader_object:_DataReader, k_out_percent = 10, forbid_new_split = False, force_new_split = False, use_validation_set = True, leave_random_out = True, folder=None, verbose=True):
        """
        :param dataReader_object: reader used to load the original dataset when a new split has to be built
        :param k_out_percent: Determines what percentile of timestamps go to test and eval.
                              E.g. when k_out_percent = 10, the 80th and 90th percentile timestamps
                              are used to create train/val/test. Must satisfy 0 < k_out_percent < 49
        :param forbid_new_split: forwarded to the base class constructor
        :param force_new_split: forwarded to the base class constructor
        :param use_validation_set: must be True, splitting without a validation set is not supported
        :param leave_random_out: in this class it only selects the "random"/"last" suffix of the split subfolder name
        :param folder: forwarded to the base class constructor
        :param verbose: forwarded to the base class constructor
        """

        # Kept as an equality test so the accepted values are exactly those of the original check
        assert use_validation_set == True, "Not using val set not supported at the moment."
        # 2 * k_out_percent timestamps are reserved for test/eval
        assert 0 < k_out_percent < 49, "k_out_percent must be in the open interval (0, 49), provided value is {}".format(k_out_percent)

        # These attributes are set before calling the base constructor, as in the original ordering
        self.k_out_percent = k_out_percent
        self.use_validation_set = use_validation_set
        self.allow_cold_users = False
        self.removed_cold_users = None
        self.leave_random_out = leave_random_out

        super().__init__(dataReader_object, forbid_new_split=forbid_new_split, force_new_split=force_new_split, folder=folder, verbose=verbose)

        self._print("Cold users not allowed")

        self.init_kwargs = {"k_out_percent": k_out_percent,
                            "forbid_new_split": forbid_new_split,
                            "force_new_split": force_new_split,
                            "use_validation_set": use_validation_set,
                            "leave_random_out": leave_random_out
                            }

    def _get_split_subfolder_name(self):
        """
        :return: "leave_{k_out_percent}_out_random/" or "leave_{k_out_percent}_out_last/" depending on leave_random_out
        """

        order_suffix = "random" if self.leave_random_out else "last"

        return "leave_{}_out_{}/".format(self.k_out_percent, order_suffix)

    def _get_file_name_suffix(self):
        """
        Suffix appended to every file name of the split, shared by the save and load functions
        :return: "_{allow_cold_users|only_warm_users}_{use_validation_set|no_validation_set}"
        """

        allow_cold_users_suffix = "allow_cold_users" if self.allow_cold_users else "only_warm_users"
        validation_set_suffix = "use_validation_set" if self.use_validation_set else "no_validation_set"

        return "_{}_{}".format(allow_cold_users_suffix, validation_set_suffix)

    @staticmethod
    def _lower_percentiles(values, percentiles):
        """
        Percentiles of values that always coincide with an observed value ("lower" rule)
        :param values: array-like of numbers, it does not need to be sorted
        :param percentiles: sequence of percentiles in [0, 100]
        :return: array with one value per requested percentile
        """

        try:
            return np.percentile(values, percentiles, method="lower")
        except TypeError:
            # NumPy versions older than 1.22 only know the legacy keyword name
            return np.percentile(values, percentiles, interpolation="lower")

    def get_statistics_URM(self):
        """
        Prints the number of users and items and, for each URM of the split, its interactions and density
        """

        self._assert_is_initialized()

        URM_train = self.SPLIT_URM_DICT["URM_train"]
        n_users, n_items = URM_train.shape

        statistics_string = "DataReader: {}\n" \
                            "\tNum items: {}\n" \
                            "\tNum users: {}\n" \
                            "\tTrain \t\tinteractions {}, \tdensity {:.2E}\n".format(
                                self.dataReader_object._get_dataset_name(),
                                n_items,
                                n_users,
                                URM_train.nnz, compute_density(URM_train))

        if self.use_validation_set:
            URM_validation = self.SPLIT_URM_DICT["URM_validation"]
            statistics_string += "\tValidation \tinteractions {}, \tdensity {:.2E}\n".format(
                URM_validation.nnz, compute_density(URM_validation))

        URM_test = self.SPLIT_URM_DICT["URM_test"]
        statistics_string += "\tTest \t\tinteractions {}, \tdensity {:.2E}\n".format(
            URM_test.nnz, compute_density(URM_test))

        self._print(statistics_string)
        self._print("\n")

    def get_ICM_from_name(self, ICM_name):
        """
        :param ICM_name: key of the requested ICM in SPLIT_ICM_DICT
        :return: a copy of the ICM, so that the caller cannot alter the stored one
        """
        return self.SPLIT_ICM_DICT[ICM_name].copy()

    def get_statistics_ICM(self):
        """
        Prints, for each ICM of the split, its number of features, feature occurrences and density.
        Nothing is printed if the dataReader reports no loaded ICM
        """

        self._assert_is_initialized()

        if len(self.dataReader_object.get_loaded_ICM_names())>0:

            for ICM_name, ICM_object in self.SPLIT_ICM_DICT.items():

                n_items, n_features = ICM_object.shape

                statistics_string = "\tICM name: {}, Num features: {}, feature occurrences: {}, density {:.2E}".format(
                    ICM_name,
                    n_features,
                    ICM_object.nnz,
                    compute_density(ICM_object)
                )

                self._print(statistics_string)

            self._print("\n")

    def _assert_is_initialized(self):
        """
        :raises AssertionError: if no split has been generated or loaded yet
        """
        assert self.SPLIT_URM_DICT is not None, "{}: Unable to load data split. The split has not been generated yet, call the load_data function to do so.".format(self.DATA_SPLITTER_NAME)

    def get_holdout_split(self):
        """
        Returns copies of the URMs of the split
        :return: URM_train, URM_validation, URM_test if use_validation_set is True, URM_train, URM_test otherwise
        """

        self._assert_is_initialized()

        if self.use_validation_set:
            return self.SPLIT_URM_DICT["URM_train"].copy(),\
                   self.SPLIT_URM_DICT["URM_validation"].copy(),\
                   self.SPLIT_URM_DICT["URM_test"].copy()

        return self.SPLIT_URM_DICT["URM_train"].copy(), self.SPLIT_URM_DICT["URM_test"].copy()

    def _split_data_from_original_dataset(self, save_folder_path):
        """
        Loads the original dataset via the dataReader, splits it on the two global timestamp
        thresholds, stores the result in the SPLIT_* attributes and saves it
        :param save_folder_path: folder in which the split is saved, passed to _save_split
        :raises AssertionError: if URM_train or URM_test do not have more than 10 interactions
        """

        self.loaded_dataset = self.dataReader_object.load_data()
        self._load_from_DataReader_ICM_and_mappers(self.loaded_dataset)

        URM = sps.csr_matrix(self.loaded_dataset.get_URM_all())
        URM_timestamp = self.loaded_dataset.get_URM_timestamp()

        # E.g. with k_out_percent = 10 the thresholds are the 80th and 90th percentile timestamps
        percentiles_to_compute = [100 - 2 * self.k_out_percent, 100 - self.k_out_percent]
        ts_val, ts_test = self._lower_percentiles(URM_timestamp.data, percentiles_to_compute)

        # NOTE(review): user_to_preserve is used below as a boolean mask over the users of the
        # original URM, confirm against split_data_on_global_timestamp
        URM_train, URM_validation, URM_test, user_to_preserve = split_data_on_global_timestamp(URM, URM_timestamp, ts_val, ts_test)

        if not self.allow_cold_users:  # always satisfied, allow_cold_users is set to False in __init__
            self.removed_cold_users = np.logical_not(user_to_preserve)

            # Builtin int replaces the np.int alias, which was removed in NumPy 1.24
            removed_user_index = np.arange(0, len(self.removed_cold_users), dtype=int)[self.removed_cold_users]

            self.SPLIT_GLOBAL_MAPPER_DICT["user_original_ID_to_index"] = reconcile_mapper_with_removed_tokens(
                self.SPLIT_GLOBAL_MAPPER_DICT["user_original_ID_to_index"],
                removed_user_index)

            # Only existing keys are reassigned, so the dictionary size does not change while iterating
            for UCM_name, UCM_object in self.SPLIT_UCM_DICT.items():
                self.SPLIT_UCM_DICT[UCM_name] = UCM_object[user_to_preserve,:]

        self.SPLIT_URM_DICT = {
            "URM_train": URM_train,
            "URM_test": URM_test,
        }

        # Require more than 10 entries in both the train and the test split
        assert URM_train.nnz > 10 and URM_test.nnz > 10, f"{URM_train.nnz} entries in train, {URM_test.nnz} entries in test splits"

        if self.use_validation_set:
            self.SPLIT_URM_DICT["URM_validation"] = URM_validation

        self._save_split(save_folder_path)

        self._print("Split complete")

    def _save_split(self, save_folder_path):
        """
        Saves split parameters, mappers, URMs and, if any, ICMs and UCMs with their mappers
        :param save_folder_path: destination folder, created if missing. Nothing is saved if it is None or empty
        """

        if not save_folder_path:
            return

        # exist_ok avoids the race between checking for the folder and creating it
        os.makedirs(save_folder_path, exist_ok=True)

        self.save_data_reader_splitter_class(save_folder_path)

        name_suffix = self._get_file_name_suffix()

        split_parameters_dict = {"k_out_percent": self.k_out_percent,
                                 "allow_cold_users": self.allow_cold_users,
                                 "removed_cold_users": self.removed_cold_users,
                                 }

        dataIO = DataIO(folder_path = save_folder_path)

        dataIO.save_data(data_dict_to_save = split_parameters_dict,
                         file_name = "split_parameters" + name_suffix)

        dataIO.save_data(data_dict_to_save = self.SPLIT_GLOBAL_MAPPER_DICT,
                         file_name = "split_mappers" + name_suffix)

        dataIO.save_data(data_dict_to_save = self.SPLIT_URM_DICT,
                         file_name = "split_URM" + name_suffix)

        # Truthiness also skips the optional files when the dictionary was never populated
        if self.SPLIT_ICM_DICT:
            dataIO.save_data(data_dict_to_save = self.SPLIT_ICM_DICT,
                             file_name = "split_ICM" + name_suffix)

            dataIO.save_data(data_dict_to_save = self.SPLIT_ICM_MAPPER_DICT,
                             file_name = "split_ICM_mappers" + name_suffix)

        if self.SPLIT_UCM_DICT:
            dataIO.save_data(data_dict_to_save = self.SPLIT_UCM_DICT,
                             file_name = "split_UCM" + name_suffix)

            dataIO.save_data(data_dict_to_save = self.SPLIT_UCM_MAPPER_DICT,
                             file_name = "split_UCM_mappers" + name_suffix)

    def _load_previously_built_split_and_attributes(self, save_folder_path):
        """
        Loads split parameters, mappers, URMs and, if the dataReader reports any, ICMs and UCMs.
        Every entry of the saved split parameters is set as an attribute of this object
        :param save_folder_path: folder containing the files written by _save_split
        :return:
        """

        name_suffix = self._get_file_name_suffix()

        dataIO = DataIO(folder_path = save_folder_path)

        split_parameters_dict = dataIO.load_data(file_name ="split_parameters" + name_suffix)

        for attrib_name, attrib_value in split_parameters_dict.items():
            setattr(self, attrib_name, attrib_value)

        self.SPLIT_GLOBAL_MAPPER_DICT = dataIO.load_data(file_name ="split_mappers" + name_suffix)

        self.SPLIT_URM_DICT = dataIO.load_data(file_name ="split_URM" + name_suffix)

        if len(self.dataReader_object.get_loaded_ICM_names())>0:
            self.SPLIT_ICM_DICT = dataIO.load_data(file_name ="split_ICM" + name_suffix)
            self.SPLIT_ICM_MAPPER_DICT = dataIO.load_data(file_name ="split_ICM_mappers" + name_suffix)

        if len(self.dataReader_object.get_loaded_UCM_names())>0:
            self.SPLIT_UCM_DICT = dataIO.load_data(file_name ="split_UCM" + name_suffix)
            self.SPLIT_UCM_MAPPER_DICT = dataIO.load_data(file_name ="split_UCM_mappers" + name_suffix)

    #########################################################################################################
    ##########                                                                                     ##########
    ##########                                DATA CONSISTENCY                                     ##########
    ##########                                                                                     ##########
    #########################################################################################################

    def _verify_data_consistency(self):
        """
        Checks that the expected URMs exist, share a non-empty shape, contain interactions,
        have no cold users in URM_train (when not allowed), are disjoint and are consistent
        with the ICM/UCM dictionaries and mappers
        :raises AssertionError: if any of the checks fails
        """

        self._assert_is_initialized()

        print_preamble = "{} consistency check: ".format(self.DATA_SPLITTER_NAME)

        URM_to_load_list = ["URM_train", "URM_test"]

        if self.use_validation_set:
            URM_to_load_list.append("URM_validation")

        assert len(self.SPLIT_URM_DICT) == len(URM_to_load_list),\
            print_preamble + "The available URM are not as many as they are supposed to be. URMs are {}, expected URMs are {}".format(len(self.SPLIT_URM_DICT), len(URM_to_load_list))

        assert all(URM_name in self.SPLIT_URM_DICT for URM_name in URM_to_load_list), print_preamble + "Not all URMs have been created"
        assert all(URM_name in URM_to_load_list for URM_name in self.SPLIT_URM_DICT.keys()), print_preamble + "The split contains URMs that should not exist"

        # The first URM fixes the reference shape, all the others must match it
        URM_shape = None

        for URM_object in self.SPLIT_URM_DICT.values():

            if URM_shape is None:
                URM_shape = URM_object.shape

                n_users, n_items = URM_shape

                assert n_users != 0, print_preamble + "Number of users in URM is 0"
                assert n_items != 0, print_preamble + "Number of items in URM is 0"

            assert URM_shape == URM_object.shape, print_preamble + "URM shape is inconsistent"

        assert self.SPLIT_URM_DICT["URM_train"].nnz != 0, print_preamble + "Number of interactions in URM Train is 0"
        assert self.SPLIT_URM_DICT["URM_test"].nnz != 0, print_preamble + "Number of interactions in URM Test is 0"

        # No per-user interaction count is enforced on the test and validation URMs

        if self.use_validation_set:
            assert self.SPLIT_URM_DICT["URM_validation"].nnz != 0, print_preamble + "Number of interactions in URM Validation is 0"

        # Number of train interactions of each user, from the CSR row pointers
        user_interactions = np.ediff1d(sps.csr_matrix(self.SPLIT_URM_DICT["URM_train"]).indptr)

        if not self.allow_cold_users:
            assert np.all(user_interactions != 0), print_preamble + "Cold users exist despite not being allowed as per DataSplitter parameters, {} users out of {}".format(
                (user_interactions == 0).sum(), n_users)

        assert assert_disjoint_matrices(list(self.SPLIT_URM_DICT.values()))

        assert_URM_ICM_mapper_consistency(URM_DICT = self.SPLIT_URM_DICT,
                                          user_original_ID_to_index=self.SPLIT_GLOBAL_MAPPER_DICT["user_original_ID_to_index"],
                                          item_original_ID_to_index=self.SPLIT_GLOBAL_MAPPER_DICT["item_original_ID_to_index"],
                                          ICM_DICT = self.SPLIT_ICM_DICT,
                                          ICM_MAPPER_DICT = self.SPLIT_ICM_MAPPER_DICT,
                                          UCM_DICT = self.SPLIT_UCM_DICT,
                                          UCM_MAPPER_DICT = self.SPLIT_UCM_MAPPER_DICT,
                                          DATA_SPLITTER_NAME = self.DATA_SPLITTER_NAME)