-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.py
executable file
·129 lines (106 loc) · 4.56 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# UTILS FILE
import keras
from keras.applications.densenet import DenseNet121
from keras.models import Model
from keras.layers import Dense, Activation, Flatten, Dropout, BatchNormalization, GlobalAveragePooling2D
from keras.callbacks import ModelCheckpoint, CSVLogger, LearningRateScheduler, ReduceLROnPlateau, EarlyStopping, TensorBoard
from keras import backend as K
from keras.preprocessing import image
from keras.preprocessing.image import ImageDataGenerator
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import time
import cv2
import pickle
# For part 2
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score
import lifelines
IMAGE_DIR = "nih_new/images-small/"
def get_mean_std_per_batch(df, H=320, W=320):
    """Estimate pixel-normalization statistics from a random sample of images.

    Draws 100 random rows from ``df``, loads each image from ``IMAGE_DIR``
    resized to (H, W), and returns the mean and standard deviation over the
    entire sample.

    Args:
        df: DataFrame with an "Image" column of filenames under IMAGE_DIR.
        H: Target image height in pixels.
        W: Target image width in pixels.

    Returns:
        Tuple ``(mean, std)`` of scalar floats over all sampled pixel values.
    """
    sample_data = []
    for img in df.sample(100)["Image"].values:
        path = IMAGE_DIR + img
        sample_data.append(np.array(image.load_img(path, target_size=(H, W))))
    # Bug fix: the original computed np.mean/np.std of sample_data[0] only,
    # i.e. statistics of a single image even though 100 were loaded.
    sample = np.stack(sample_data)
    return np.mean(sample), np.std(sample)
def load_image_normalize(path, mean, std, H=320, W=320):
    """Load an image, standardize it with given stats, and add a batch axis.

    Args:
        path: Full path to the image file.
        mean: Mean to subtract (e.g. from ``get_mean_std_per_batch``).
        std: Standard deviation to divide by.
        H: Target image height in pixels.
        W: Target image width in pixels.

    Returns:
        Float array of shape (1, H, W, C), standardized to the given stats.
    """
    # Bug fix: image.load_img returns a PIL Image, which does not support the
    # in-place arithmetic below — convert to a float array first.
    x = np.asarray(image.load_img(path, target_size=(H, W)), dtype=np.float64)
    x -= mean
    x /= std
    x = np.expand_dims(x, axis=0)
    return x
def load_image(path, df, preprocess=True, H = 320, W = 320):
    """Load and preprocess image.

    Args:
        path: Full path to the image file.
        df: DataFrame used to estimate normalization stats (see
            ``get_mean_std_per_batch``); only consulted when ``preprocess``.
        preprocess: When True, standardize the pixels and add a batch axis;
            when False, return the raw PIL image (useful for plotting).
        H: Target image height in pixels.
        W: Target image width in pixels.

    Returns:
        Standardized float array of shape (1, H, W, C) when ``preprocess``,
        otherwise the resized PIL Image.
    """
    x = image.load_img(path, target_size=(H, W))
    if preprocess:
        mean, std = get_mean_std_per_batch(df, H=H, W=W)
        # Bug fix: PIL Images do not support in-place arithmetic; convert to
        # a float array before standardizing.
        x = np.asarray(x, dtype=np.float64)
        x -= mean
        x /= std
        x = np.expand_dims(x, axis=0)
    return x
def compute_gradcam(model, img, data_dir, df, labels, selected_labels, layer_name='bn'):
    """Plot the original image plus Grad-CAM overlays for selected classes.

    Side effects only: prints the ground-truth labels and a per-class
    message, and draws a 1x5 panel figure on the current matplotlib state
    (panel 1 is the raw image; subsequent panels overlay one Grad-CAM per
    label in ``selected_labels``).
    """
    img_path = data_dir + img
    preprocessed_input = load_image(img_path, df)
    predictions = model.predict(preprocessed_input)

    truth_row = df[df["Image"] == img][labels].values[0]
    print("Ground Truth: ", ", ".join(np.take(labels, np.nonzero(truth_row))[0]))

    plt.figure(figsize=(15, 10))
    plt.subplot(151)
    plt.title("Original")
    plt.axis('off')
    plt.imshow(load_image(img_path, df, preprocess=False), cmap='gray')

    panel = 1
    for idx, label in enumerate(labels):
        if label not in selected_labels:
            continue
        score = predictions[0][idx]
        print("Generating gradcam for class %s (p=%2.2f)" % (label, round(score, 3)))
        heatmap = grad_cam(model, preprocessed_input, idx, layer_name)
        plt.subplot(151 + panel)
        plt.title(label + ": " + str(round(score, 3)))
        plt.axis('off')
        plt.imshow(load_image(img_path, df, preprocess=False), cmap='gray')
        # Cap overlay opacity at 0.5 so the underlying X-ray stays visible.
        plt.imshow(heatmap, cmap='jet', alpha=min(0.5, score))
        panel += 1
def cindex(y_true, scores):
    """Return the concordance index (c-index) of risk ``scores`` vs ``y_true``.

    Thin wrapper around ``lifelines.utils.concordance_index``.
    """
    concordance = lifelines.utils.concordance_index(y_true, scores)
    return concordance
# LOAD MODEL FROM C1M2
def load_C3M3_model():
    """Build and return the pretrained 14-class DenseNet121 chest X-ray model.

    Derives per-class loss weights from training-label frequencies, stacks a
    global-average-pooling + 14-unit sigmoid head on DenseNet121, compiles
    with a class-weighted binary cross-entropy loss, then loads weights from
    "nih_new/pretrained_model.h5".

    Returns:
        A compiled keras Model ready for inference.
    """
    # The 14 pathology labels, in model output order.
    labels = ['Cardiomegaly', 'Emphysema', 'Effusion', 'Hernia', 'Infiltration', 'Mass', 'Nodule', 'Atelectasis',
              'Pneumothorax', 'Pleural_Thickening', 'Pneumonia', 'Fibrosis', 'Edema', 'Consolidation']

    train_df = pd.read_csv("nih_new/train-small.csv")
    # NOTE(review): valid_df and test_df are read but never used below —
    # confirm whether these loads can be dropped.
    valid_df = pd.read_csv("nih_new/valid-small.csv")
    test_df = pd.read_csv("nih_new/test.csv")

    # Per-class positive/negative example counts over the training set;
    # class_total equals len(train_df) for every class.
    class_pos = train_df.loc[:, labels].sum(axis=0)
    class_neg = len(train_df) - class_pos
    class_total = class_pos + class_neg
    # Frequencies used as cross-weights in the loss: the (large) negative
    # frequency scales each rare positive term, and vice versa.
    pos_weights = class_pos / class_total
    neg_weights = class_neg / class_total
    print("Got loss weights")

    # create the base pre-trained model
    base_model = DenseNet121(weights='densenet.hdf5', include_top=False)
    print("Loaded DenseNet")

    # add a global spatial average pooling layer
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    # and a logistic layer
    predictions = Dense(len(labels), activation="sigmoid")(x)
    print("Added layers")

    model = Model(inputs=base_model.input, outputs=predictions)

    def get_weighted_loss(neg_weights, pos_weights, epsilon=1e-7):
        """Return a loss that weights each class's pos/neg terms by frequency."""
        def weighted_loss(y_true, y_pred):
            # L(X, y) = −w * y log p(Y = 1|X) − w * (1 − y) log p(Y = 0|X)
            # from https://arxiv.org/pdf/1711.05225.pdf
            loss = 0
            for i in range(len(neg_weights)):
                # neg_weights[i] scales the positive-example term and
                # pos_weights[i] the negative-example term, counteracting
                # class imbalance; epsilon guards against log(0).
                loss -= (neg_weights[i] * y_true[:, i] * K.log(y_pred[:, i] + epsilon) +
                         pos_weights[i] * (1 - y_true[:, i]) * K.log(1 - y_pred[:, i] + epsilon))
            loss = K.sum(loss)
            return loss
        return weighted_loss

    model.compile(optimizer='adam', loss=get_weighted_loss(neg_weights, pos_weights))
    print("Compiled Model")

    model.load_weights("nih_new/pretrained_model.h5")
    print("Loaded Weights")
    return model