In this project we develop a deep learning detector of Covid-19 in chest radiographs. For this purpose, we use images from the «Covid-chestxray-dataset» [3], generated by researchers from the Mila research group and the University of Montreal [4]. We also use radiographs of healthy patients and patients with bacterial pneumonia, extracted from Kaggle’s «Chest X-Ray Images (Pneumonia)» competition [5].
In total we have 426 images, divided into training (339 images), validation (42 images) and test (45 images) sets.
The partitions are given in «.txt» lists, in which each image is assigned a tag:
- 0) Healthy
- 1) Covid-19
- 2) Pneumonia
Note: The results obtained by the models trained on this dataset are purely for educational purposes and cannot be used for actual diagnosis without clinical validation.
References
- [1] María Climent (2020). Covid-19: La Inteligencia Artificial De La Española Quibim Puede Acelerar El Diagnóstico Del Coronavirus.
- [2] Ángel Alberich-Bayarri (2020). Imaging, AI and Radiomics to understand and fight Coronavirus Covid-19.
- [3] ieee8023/covid-chestxray-dataset.
- [4] Cohen, J.P., Morrison, P. and Dao, L. (2020). COVID-19 image data collection.
- [5] Paul Mooney (2019). Chest X-Ray Images (Pneumonia), Kaggle.
#Mount Google Drive to access the dataset.
from google.colab import drive
drive.mount('/content/gdrive')
#Import libraries
import numpy as np
import re,shutil,os,timeit,glob
import matplotlib.pyplot as plt
import random
from IPython.display import Image, display
from sklearn.dummy import DummyClassifier
from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
from keras.models import Sequential
from keras.layers import Dense, Conv2D, Flatten, Activation, Dropout, MaxPooling2D, BatchNormalization
from sklearn.metrics import accuracy_score
from keras.callbacks import ModelCheckpoint
from keras.optimizers import Adam,RMSprop,SGD,Adadelta
#Function to clean a path line read from the txt lists:
#strip the two leading characters (e.g. "./") and the trailing newline.
def remChars(string):
    return string[2:-1]
#Function to add Gaussian noise to the images (used as extra augmentation).
def add_noise(img):
    VARIABILITY = 50
    deviation = VARIABILITY * random.random()
    noise = np.random.normal(0, deviation, img.shape)
    img = img + noise
    img = np.clip(img, 0., 255.)  #np.clip returns a new array; keep the result
    return img
#Images path base
basepath="/content/gdrive/My Drive/"
#Let's create the folder structure: train, test and validation, with one folder per class inside each.
#Paths and txt files with the image names.
paths=['test','train','validation']
files=['testing.txt','training.txt','validation.txt']
#Read the image names from the txt lists and copy each file into the proper folder.
for p,f in zip(paths, files):
    with open(basepath+f, "r") as file:
        imgfiles = file.readlines()
    #Create the folder structure, one folder per class.
    os.makedirs(basepath+p+"/COVID", exist_ok=True)
    os.makedirs(basepath+p+"/HEALTHY", exist_ok=True)
    os.makedirs(basepath+p+"/PNEUMONIA", exist_ok=True)
    #Clean each path and copy the image into the matching class folder.
    for s in imgfiles:
        s = remChars(s)
        if "COVID" in s:
            shutil.copy(basepath+s, basepath+p+"/COVID")
        elif "HEALTHY" in s:
            shutil.copy(basepath+s, basepath+p+"/HEALTHY")
        else:
            shutil.copy(basepath+s, basepath+p+"/PNEUMONIA")
#Import images. Reduce size to 224x224
#Training dataset augmentation.
train_data_gen = ImageDataGenerator(
rescale=1./255,
rotation_range=25,
width_shift_range=0.3,
height_shift_range=0.3,
zoom_range=0.2,
horizontal_flip=True,
fill_mode='nearest',
preprocessing_function=add_noise)
#Only scale validation and test images.
validation_data_gen = ImageDataGenerator(rescale=1./255)
test_data_gen = ImageDataGenerator(rescale=1./255)
train_generator =train_data_gen.flow_from_directory(basepath+'train',
target_size=(224,224),
batch_size=32,
class_mode = 'categorical',
shuffle=True)
validation_generator = validation_data_gen.flow_from_directory(basepath+'validation',
target_size=(224,224),
batch_size=32,
class_mode = 'categorical',
shuffle=True)
test_generator = test_data_gen.flow_from_directory(basepath+'test',
target_size=(224,224),
batch_size=32,
class_mode = 'categorical',
shuffle=False)
#Show some examples of transformed images.
img = load_img(train_generator.filepaths[0])  # this is a PIL image
x = img_to_array(img)             # Numpy array with shape (height, width, 3)
x = x.reshape((1,) + x.shape)     # add a batch dimension: (1, height, width, 3)
#Generate and save three augmented variants of the image.
os.makedirs(basepath+'augmented', exist_ok=True)  #flow() does not create save_to_dir
i = 0
for batch in train_data_gen.flow(x, batch_size=1,
                                 save_to_dir=basepath+'augmented', save_prefix='hi', save_format='jpeg'):
    i += 1
    if i > 2:
        break
#Show the augmented images.
for filename in glob.glob(basepath+'augmented/*.jpeg'):
    display(Image(filename, width=150, height=150))
#Simple model as baseline to measure performance improvements.
num_classes=3
model = Sequential()
model.add(Conv2D(11, kernel_size=3,strides=1, activation='relu', input_shape=(224,224,3)))
model.add(Flatten())
model.add((Dense(num_classes, activation='softmax')))
#Adam optimizer
optimizer=Adam(lr=2E-4)
# Compile the model
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
#Reset batch generator
train_generator.reset()
validation_generator.reset()
test_generator.reset()
#Run 15 epochs, storing the execution time.
start_time = timeit.default_timer()
mfit = model.fit(
    train_generator,
    epochs=15,
    validation_data=validation_generator)
elapsedbaseline = timeit.default_timer() - start_time
#The elapsed time of this model is:
print('Baseline model elapsed time: '+str(elapsedbaseline) + ' s.')
#Accuracy and Loss graphs for training and validation
# summarize history for accuracy
plt.plot(mfit.history['accuracy'])
plt.plot(mfit.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(mfit.history['loss'])
plt.plot(mfit.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()
from keras.applications.vgg16 import VGG16
vgg_c = VGG16(weights='imagenet', include_top=False,classes=3,input_shape=(224,224,3))
#Transfer learning using the ImageNet weights: since our dataset is small
#and different from the one the network was originally trained on, we
#fine-tune only the last part of the network (blocks 4 and 5, i.e. the last 8 layers).
for l in vgg_c.layers[:-8]:
    l.trainable = False
for l in vgg_c.layers:
    print(l.name+" "+str(l.trainable))
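As a quick check that the freeze worked, here is a minimal sketch (an addition, assuming the standard keras.backend.count_params utility) that sums trainable vs. frozen parameters:
#Check how many parameters remain trainable after freezing blocks 1-3.
from keras import backend as K
trainable = sum(K.count_params(w) for w in vgg_c.trainable_weights)
frozen = sum(K.count_params(w) for w in vgg_c.non_trainable_weights)
print("Trainable params:", trainable, "| Frozen params:", frozen)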
#Create the output layers (classifier head).
vgg_model=Sequential()
vgg_model.add(vgg_c)
vgg_model.add(Flatten())
vgg_model.add(Dense(4096,activation="relu"))
vgg_model.add(Dense(4096,activation="relu"))
vgg_model.add(Dense(3,activation="softmax"))
#Reset batch generators
train_generator.reset()
validation_generator.reset()
test_generator.reset()
#Let's define callbacks in order to save checkpoints just in case the process stops.
filepath = basepath+"vgg_model.h5"
checkpoint = ModelCheckpoint(filepath, monitor='loss', verbose=1, save_best_only=True, mode='min')
callbacks_list = [checkpoint]
#Optimizer setup
optimizer=Adam(lr=1E-6)
#Use the same optimizer as in the baseline model, with a lower learning rate.
vgg_model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
#Let's run 15 epochs, saving the elapsed time
start_time = timeit.default_timer()
mfitVGG=vgg_model.fit(
train_generator,
epochs=15,
validation_data=validation_generator,callbacks=callbacks_list)
elapsedVGG = timeit.default_timer() - start_time
#The elapsed time of this model has been:
print('VGG model elapsed time: '+str(elapsedVGG) + ' s.')
#Train and validation accuracy and loss graphs:
# summarize history for accuracy
plt.plot(mfitVGG.history['accuracy'])
plt.plot(mfitVGG.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(mfitVGG.history['loss'])
plt.plot(mfitVGG.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()
from keras.applications import InceptionV3
inception_c = InceptionV3(weights='imagenet', include_top=False,classes=3,input_shape=(299,299,3))
#Adapt the generators to Inception's 299x299 input size.
train_generator =train_data_gen.flow_from_directory(basepath+'train',
target_size=(299,299),
batch_size=32,
class_mode = 'categorical',
shuffle=True)
validation_generator = validation_data_gen.flow_from_directory(basepath+'validation',
target_size=(299,299),
batch_size=32,
class_mode = 'categorical',
shuffle=True)
test_generator = test_data_gen.flow_from_directory(basepath+'test',
target_size=(299,299),
batch_size=32,
class_mode = 'categorical',
shuffle=False)
#Transfer learning using the ImageNet weights: as before, with a small dataset
#different from the original training data, we fine-tune only the last blocks
#of the network (the last 82 layers).
for l in inception_c.layers[:-82]:
    l.trainable = False
for l in inception_c.layers:
    print(l.name+" "+str(l.trainable))
#We create the output layers
inception_model=Sequential()
inception_model.add(inception_c)
inception_model.add(Flatten())
inception_model.add(Dense(2048,activation="relu"))
inception_model.add(Dense(3,activation="softmax"))
#Reset batch generators
train_generator.reset()
validation_generator.reset()
test_generator.reset()
#We define callback to save checkpoints in case the process stops.
filepath = basepath+"inception.h5"
checkpoint = ModelCheckpoint(filepath, monitor='loss', verbose=1, save_best_only=True, mode='min')
callbacks_list = [checkpoint]
#Setup the optimizer
optimizer=Adam(lr=1E-6)
inception_model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
#Run 15 epochs, storing the execution time.
start_time = timeit.default_timer()
mfitinception = inception_model.fit(
    train_generator,
    epochs=15,
    validation_data=validation_generator, callbacks=callbacks_list)
elapsedinception = timeit.default_timer() - start_time
print('Inception model elapsed time: '+str(elapsedinception) + ' s.')
#Accuracy and loss graphs for training and validation.
# summarize history for accuracy
plt.plot(mfitinception.history['accuracy'])
plt.plot(mfitinception.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(mfitinception.history['loss'])
plt.plot(mfitinception.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()
from keras.applications import DenseNet121
#DenseNet121 is trained from scratch (weights=None) with its own classification head.
densenet = DenseNet121(weights=None, include_top=True, classes=3)
#Recreate the generators at DenseNet's 224x224 input size; for validation and
#test we just rescale the images.
train_generator =train_data_gen.flow_from_directory(basepath+'train',
target_size=(224,224),
batch_size=32,
class_mode = 'categorical',
shuffle=True)
validation_generator = validation_data_gen.flow_from_directory(basepath+'validation',
target_size=(224,224),
batch_size=32,
class_mode = 'categorical',
shuffle=True)
test_generator = test_data_gen.flow_from_directory(basepath+'test',
target_size=(224,224),
batch_size=32,
class_mode = 'categorical',
shuffle=False)
train_generator.reset()
validation_generator.reset()
test_generator.reset()
#We define callback to save checkpoints in case the process stops.
filepath = basepath+"densenet.h5"
checkpoint = ModelCheckpoint(filepath, monitor='loss', verbose=1, save_best_only=True, mode='min')
callbacks_list = [checkpoint]
#Setup the optimizer
optimizer=SGD(lr=0.001)
densenet.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
#We run the model and save the runtime, with 15 epochs.
start_time = timeit.default_timer()
mfitdensenet=densenet.fit(
train_generator,
epochs=15,
validation_data=validation_generator,callbacks=callbacks_list)
densenetelapsed = timeit.default_timer() - start_time
print('DenseNet model elapsed time: '+str(densenetelapsed) + ' s.')
#Accuracy and loss graphs for train and validation data
# summarize history for accuracy
plt.plot(mfitdensenet.history['accuracy'])
plt.plot(mfitdensenet.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(mfitdensenet.history['loss'])
plt.plot(mfitdensenet.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()
Prediction and comparison
In this section we run the test phase of the best models developed above.
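As an optional aid for the comparisons below, here is a minimal sketch of a helper (evaluate_model is a hypothetical name, not used in the cells that follow) that also reports a confusion matrix and per-class metrics; it assumes a generator created with shuffle=False so that generator.classes aligns with the predictions:
#Sketch: report accuracy, confusion matrix and per-class metrics for a model.
from sklearn.metrics import confusion_matrix, classification_report

def evaluate_model(m, generator):
    generator.reset()
    probs = m.predict(generator)
    preds = np.argmax(probs, axis=1)
    print(confusion_matrix(generator.classes, preds))
    print(classification_report(generator.classes, preds,
                                target_names=list(generator.class_indices.keys())))
    return accuracy_score(generator.classes, preds)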
test_generator = test_data_gen.flow_from_directory(basepath+'test',
target_size=(224,224),
batch_size=32,
class_mode = 'categorical',
shuffle=False)
test_generator.reset()
#Prediction with the baseline model.
prediction = model.predict(test_generator)
#Convert the probabilities to class labels (argmax).
pclasses = []
for p in prediction:
    pclasses.append(np.argmax(p))
#Measure accuracy by comparing the prediction with the test labels.
accuracy_score(test_generator.classes, pclasses)
test_generator.reset()
#Run the prediction with the VGG model.
prediction_vgg = vgg_model.predict(test_generator)
#Convert the probabilities to class labels.
pclasses_vgg = []
for p in prediction_vgg:
    pclasses_vgg.append(np.argmax(p))
#Measure accuracy by comparing the prediction with the test labels.
accuracy_score(test_generator.classes, pclasses_vgg)
test_generator = test_data_gen.flow_from_directory(basepath+'test',
target_size=(299,299),
batch_size=32,
class_mode = 'categorical',
shuffle=False)
test_generator.reset()
#Run the prediction with the Inception model.
prediction_inception = inception_model.predict(test_generator)
#Convert the probabilities to class labels.
pclasses_inception = []
for p in prediction_inception:
    pclasses_inception.append(np.argmax(p))
#We measure accuracy by comparing prediction with test data.
accuracy_score(test_generator.classes, pclasses_inception)
#DenseNet expects 224x224 inputs, so rebuild the test generator at that size.
test_generator = test_data_gen.flow_from_directory(basepath+'test',
                                                   target_size=(224,224),
                                                   batch_size=32,
                                                   class_mode = 'categorical',
                                                   shuffle=False)
test_generator.reset()
#Run the prediction with the DenseNet model.
prediction_densenet = densenet.predict(test_generator)
#Convert the probabilities to class labels.
pclasses_densenet = []
for p in prediction_densenet:
    pclasses_densenet.append(np.argmax(p))
#Measure accuracy by comparing the prediction with the test labels.
accuracy_score(test_generator.classes, pclasses_densenet)
After evaluating the results of the four networks, my conclusion is that, in terms of accuracy, run time and learning stability, the most suitable network for this dataset is VGG. It takes good advantage of the weights previously learned on other datasets, and it appears it could still improve with more epochs. I believe that with further tuning, adjusting the number of retrained layers and adding epochs, we can get more performance out of it than out of DenseNet. VGG is a deep network that uses small kernels to capture complex features.
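To put numbers behind this comparison, here is a minimal sketch of a summary table (an addition; it assumes the four accuracy_score results above were first assigned to the hypothetical variables acc_base, acc_vgg, acc_inception and acc_densenet):
#Sketch: side-by-side summary of accuracy and training time for the four models.
#acc_base, acc_vgg, acc_inception and acc_densenet are hypothetical names for
#the accuracy_score results computed above.
results = [("Baseline", acc_base, elapsedbaseline),
           ("VGG16", acc_vgg, elapsedVGG),
           ("InceptionV3", acc_inception, elapsedinception),
           ("DenseNet121", acc_densenet, densenetelapsed)]
print("%-12s %-10s %-10s" % ("Model", "Accuracy", "Time (s)"))
for name, acc, t in results:
    print("%-12s %-10.3f %-10.1f" % (name, acc, t))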
Explainability
Let's try to visualize which parts of an image are most important to the CNN by computing CAMs (Class Activation Maps) with the visualize_cam function from the keras-vis library.
#Gradcam algorithm implementation.
from keras.preprocessing.image import load_img, img_to_array
#Load and show the image
_img = load_img(basepath+"/train/COVID/ryct.2020200034.fig2.jpeg",target_size=(224,224))
plt.imshow(_img)
plt.show()
#keras-vis needs an older SciPy version, so we downgrade it to make visualize_cam work.
!pip uninstall -y scipy
!pip install scipy==1.2.0
#Load the DenseNet model trained above from its checkpoint.
from keras.applications.densenet import DenseNet121,preprocess_input
from keras.models import load_model
d_model=load_model(basepath+"densenet.h5")
#Let's generate a prediction with the image.
img = img_to_array(_img)
img = preprocess_input(img)
y_pred = d_model.predict(img[np.newaxis,...])
y_pred
class_idxs_sorted = np.argsort(y_pred.flatten())[::-1]
class_idxs_sorted
#We have to set a linear activation in the last layer.
from vis.utils import utils
from keras.activations import linear
# Find the last (prediction) layer of the model
layer_idx = utils.find_layer_idx(d_model, 'fc1000')
# Change its activation from softmax to linear
d_model.layers[layer_idx].activation = linear
d_model = utils.apply_modifications(d_model)
layer_idx
#Find the output of the last convolutional stage (the final BatchNorm layer, named "bn").
penultima_capa = utils.find_layer_idx(d_model, "bn")
penultima_capa
#Now we can generate the heat map.
from vis.visualization import visualize_cam
class_idx = class_idxs_sorted[0]
seed_input = img
grad_top1 = visualize_cam(d_model, layer_idx, class_idx, seed_input,
                          penultimate_layer_idx = penultima_capa,
                          backprop_modifier = None,
                          grad_modifier = None)
#Show the results: the original image next to the CAM heatmap overlay.
def plot_map(grads):
    fig, axes = plt.subplots(1, 2, figsize=(14,5))
    axes[0].imshow(_img)
    axes[1].imshow(_img)
    i = axes[1].imshow(grads, cmap="jet", alpha=0.8)
    fig.colorbar(i)
    plt.suptitle("heatmap")
plot_map(grad_top1)