import numpy as np
import scipy.stats as sps
import torch

from sklearn.decomposition import PCA
from sklearn.decomposition import TruncatedSVD
from sklearn.linear_model import SGDClassifier
from sklearn.svm import LinearSVC, SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier

import functional as F
import utils


def evaluate(eval_dir, method, train_features, train_labels, test_features, test_labels, **kwargs):
    """Evaluate features with the chosen method and save train/test accuracy."""
    if method == 'svm':
        acc_train, acc_test = svm(train_features, train_labels, test_features, test_labels)
    elif method == 'knn':
        acc_train, acc_test = knn(train_features, train_labels, test_features, test_labels, **kwargs)
    elif method == 'nearsub':
        acc_train, acc_test = nearsub(train_features, train_labels, test_features, test_labels, **kwargs)
    elif method == 'nearsub_pca':
        # nearsub_pca only scores the test set, so no train accuracy is reported.
        acc_train, acc_test = None, nearsub_pca(train_features, train_labels, test_features, test_labels, **kwargs)
    else:
        raise ValueError(f'Unknown evaluation method: {method}')
    acc_dict = {'train': acc_train, 'test': acc_test}
    utils.save_params(eval_dir, acc_dict, name=f'acc_{method}')

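# Usage sketch (illustrative names and shapes, not part of the original pipeline):
# features are (N, d) arrays/tensors, labels are length-N vectors, and
# method-specific options are forwarded through **kwargs, e.g.
#
#   evaluate('./eval_out', 'knn', train_X, train_y, test_X, test_y, k=5)
#   evaluate('./eval_out', 'nearsub', train_X, train_y, test_X, test_y,
#            num_classes=10, n_comp=10)
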
def svm(train_features, train_labels, test_features, test_labels):
    """Fit a linear SVM on the features and report train/test accuracy."""
    clf = LinearSVC(verbose=0, random_state=10)
    clf.fit(train_features, train_labels)
    acc_train = clf.score(train_features, train_labels)
    acc_test = clf.score(test_features, test_labels)
    print("SVM: {}, {}".format(acc_train, acc_test))
    return acc_train, acc_test

# def knn(train_features, train_labels, test_features, test_labels, k=5):
# sim_mat = train_features @ train_features.T
# topk = torch.from_numpy(sim_mat).topk(k=k, dim=0)
# topk_pred = train_labels[topk.indices]
# test_pred = torch.tensor(topk_pred).mode(0).values.detach()
# acc_train = compute_accuracy(test_pred.numpy(), train_labels)

# sim_mat = train_features @ test_features.T
# topk = torch.from_numpy(sim_mat).topk(k=k, dim=0)
# topk_pred = train_labels[topk.indices]
# test_pred = torch.tensor(topk_pred).mode(0).values.detach()
# acc_test = compute_accuracy(test_pred.numpy(), test_labels)
# print("kNN: {}, {}".format(acc_train, acc_test))
# return acc_train, acc_test

def knn(train_features, train_labels, test_features, test_labels, k=5):
    """k-nearest-neighbor evaluation in feature space (expects torch tensors)."""
    sim_mat = train_features @ train_features.T
    topk = sim_mat.topk(k=k, dim=0)
    topk_pred = train_labels[topk.indices]
    test_pred = topk_pred.mode(0).values.detach()
    acc_train = compute_accuracy(test_pred, train_labels)

    sim_mat = train_features @ test_features.T
    topk = sim_mat.topk(k=k, dim=0)
    topk_pred = train_labels[topk.indices]
    test_pred = topk_pred.mode(0).values.detach()
    acc_test = compute_accuracy(test_pred, test_labels)
    print("kNN: {}, {}".format(acc_train, acc_test))
    return acc_train, acc_test

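# Usage sketch (synthetic tensors, purely illustrative): knn relies on tensor
# methods (topk/mode), so inputs should be torch tensors, e.g.
#
#   train_X, train_y = torch.randn(500, 128), torch.randint(0, 10, (500,))
#   test_X, test_y = torch.randn(100, 128), torch.randint(0, 10, (100,))
#   acc_train, acc_test = knn(train_X, train_y, test_X, test_y, k=5)
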
# # TODO: 1. implement pytorch version 2. support batches
# def nearsub(train_features, train_labels, test_features, test_labels, num_classes, n_comp=10, return_pred=False):
# train_scores, test_scores = [], []
# classes = np.arange(num_classes)
# features_sort, _ = utils.sort_dataset(train_features, train_labels,
# classes=classes, stack=False)
# fd = features_sort[0].shape[1]
# if n_comp >= fd:
# n_comp = fd - 1
# for j in classes:
# svd = TruncatedSVD(n_components=n_comp).fit(features_sort[j])
# subspace_j = np.eye(fd) - svd.components_.T @ svd.components_
# train_j = subspace_j @ train_features.T
# test_j = subspace_j @ test_features.T
# train_scores_j = np.linalg.norm(train_j, ord=2, axis=0)
# test_scores_j = np.linalg.norm(test_j, ord=2, axis=0)
# train_scores.append(train_scores_j)
# test_scores.append(test_scores_j)
# train_pred = np.argmin(train_scores, axis=0)
# test_pred = np.argmin(test_scores, axis=0)
# if return_pred:
# return train_pred.tolist(), test_pred.tolist()
# train_acc = compute_accuracy(classes[train_pred], train_labels)
# test_acc = compute_accuracy(classes[test_pred], test_labels)
# print('SVD: {}, {}'.format(train_acc, test_acc))
# return train_acc, test_acc

def nearsub(train_features, train_labels, test_features, test_labels,
            num_classes, n_comp=10, return_pred=False):
    """Nearest-subspace classification: assign each sample to the class whose
    top-n_comp singular subspace leaves the smallest projection residual."""
    train_scores, test_scores = [], []
    classes = np.arange(num_classes)
    features_sort, _ = utils.sort_dataset(train_features, train_labels,
                                          classes=classes, stack=False)
    fd = features_sort[0].shape[1]
    for j in classes:
        _, _, V = torch.svd(features_sort[j])
        components = V[:, :n_comp].T
        # Projector onto the orthogonal complement of class j's subspace.
        subspace_j = torch.eye(fd) - components.T @ components
        train_j = subspace_j @ train_features.T
        test_j = subspace_j @ test_features.T
        train_scores_j = torch.linalg.norm(train_j, ord=2, dim=0)
        test_scores_j = torch.linalg.norm(test_j, ord=2, dim=0)
        train_scores.append(train_scores_j)
        test_scores.append(test_scores_j)
    train_pred = torch.stack(train_scores).argmin(0)
    test_pred = torch.stack(test_scores).argmin(0)
    if return_pred:
        return train_pred.numpy(), test_pred.numpy()
    train_acc = compute_accuracy(classes[train_pred.numpy()], train_labels.numpy())
    test_acc = compute_accuracy(classes[test_pred.numpy()], test_labels.numpy())
    print('SVD: {}, {}'.format(train_acc, test_acc))
    return train_acc, test_acc

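# Usage sketch (illustrative shapes; torch tensors expected, since the scores
# use tensor ops): the score of class j is the norm of the residual left after
# projecting a feature onto that class's n_comp-dimensional singular subspace,
# and the class with the smallest residual wins, e.g.
#
#   acc_train, acc_test = nearsub(train_X, train_y, test_X, test_y,
#                                 num_classes=10, n_comp=10)
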
def nearsub_pca(train_features, train_labels, test_features, test_labels, num_classes, n_comp=10):
    """Nearest-subspace classification with per-class PCA (mean-centered);
    scores the test set only (numpy arrays expected)."""
    scores_pca = []
    classes = np.arange(num_classes)
    features_sort, _ = utils.sort_dataset(train_features, train_labels, classes=classes, stack=False)
    fd = features_sort[0].shape[1]
    if n_comp >= fd:
        n_comp = fd - 1
    for j in classes:
        pca = PCA(n_components=n_comp).fit(features_sort[j])
        pca_subspace = pca.components_.T
        mean = np.mean(features_sort[j], axis=0)
        # Residual after projecting centered test features onto class j's PCA subspace.
        pca_j = (np.eye(fd) - pca_subspace @ pca_subspace.T) \
            @ (test_features - mean).T
        score_pca_j = np.linalg.norm(pca_j, ord=2, axis=0)
        scores_pca.append(score_pca_j)
    test_predict_pca = np.argmin(scores_pca, axis=0)
    acc_pca = compute_accuracy(classes[test_predict_pca], test_labels)
    print('PCA: {}'.format(acc_pca))
    return acc_pca

def argmax(train_features, train_labels, test_features, test_labels):
    """Predict the class as the index of the largest feature coordinate."""
    train_pred = train_features.argmax(1)
    train_acc = compute_accuracy(train_pred, train_labels)
    test_pred = test_features.argmax(1)
    test_acc = compute_accuracy(test_pred, test_labels)
    return train_acc, test_acc

def compute_accuracy(y_pred, y_true):
    """Compute classification accuracy as the fraction of correct predictions.

    Example: compute_accuracy(np.array([0, 1, 1]), np.array([0, 1, 2])) == 2 / 3.
    """
    assert y_pred.shape == y_true.shape
    if isinstance(y_pred, torch.Tensor):
        n_wrong = torch.count_nonzero(y_pred - y_true).item()
    elif isinstance(y_pred, np.ndarray):
        n_wrong = np.count_nonzero(y_pred - y_true)
    else:
        raise TypeError("y_pred must be a torch.Tensor or numpy.ndarray.")
    n_samples = len(y_pred)
    return 1 - n_wrong / n_samples

def baseline(train_features, train_labels, test_features, test_labels):
    """Fit several off-the-shelf classifiers on the features and print test accuracy."""
    test_models = {'log_l2': SGDClassifier(loss='log', max_iter=10000, random_state=42),  # 'log_loss' in newer scikit-learn
                   'SVM_linear': LinearSVC(max_iter=10000, random_state=42),
                   'SVM_RBF': SVC(kernel='rbf', random_state=42),
                   'DecisionTree': DecisionTreeClassifier(),
                   'RandomForest': RandomForestClassifier()}
    for model_name in test_models:
        test_model = test_models[model_name]
        test_model.fit(train_features, train_labels)
        score = test_model.score(test_features, test_labels)
        print(f"{model_name}: {score}")

def majority_vote(pred, true):
    """Accuracy of the per-sample majority vote over stacked predictions (axis 0)."""
    pred_majority = sps.mode(pred, axis=0)[0].squeeze()
    return compute_accuracy(pred_majority, true)
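

# Minimal smoke test (sketch): synthetic Gaussian features, purely illustrative
# of the expected input shapes; real usage would pass features extracted by a
# trained model, and evaluate() additionally needs a writable eval_dir.
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    train_X = rng.normal(size=(200, 32)).astype(np.float32)
    train_y = rng.integers(0, 4, size=200)
    test_X = rng.normal(size=(50, 32)).astype(np.float32)
    test_y = rng.integers(0, 4, size=50)
    svm(train_X, train_y, test_X, test_y)
    knn(torch.from_numpy(train_X), torch.from_numpy(train_y),
        torch.from_numpy(test_X), torch.from_numpy(test_y), k=5)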