import tensorflow as tf
from tensorflow.keras import layers, models
import tensorflow.keras.backend as K
import numpy as np
import matplotlib.pyplot as plt
import os
import glob
root_path = "lung_segmentation"
N_ROWS = 160
N_COLS = 160
N_LABELS = 1
def read_image(image_path, mask_path):
    # Read the image file from disk
    image = tf.io.read_file(image_path)
    # Decode according to the encoding used, PNG in our case
    image = tf.image.decode_png(image, channels=3)
    # Converting to float32 with this method also rescales pixel values to [0, 1]
    image = tf.image.convert_image_dtype(image, tf.float32)
    # Resize the image to the target size
    image = tf.image.resize(image, (N_ROWS, N_COLS), method='nearest')
    # Read the mask file from disk
    mask = tf.io.read_file(mask_path)
    # Decode according to the encoding used, PNG in our case
    mask = tf.image.decode_png(mask, channels=1)
    # The masks store the label value in every channel, so we keep the
    # per-pixel maximum to obtain a [ROWS, COLS, 1] tensor
    mask = tf.math.reduce_max(mask, axis=-1, keepdims=True)
    # Resize the mask to the target size
    mask = tf.image.resize(mask, (N_ROWS, N_COLS), method='nearest')
    # Masks are stored with values {0, 255}; normalize to binary {0, 1}
    mask = mask / 255
    mask = tf.cast(mask, 'uint8')
    return image, mask
def dataset_generator(image_paths, mask_paths, buffer_size, batch_size):
    image_list = tf.constant(image_paths)
    mask_list = tf.constant(mask_paths)
    dataset = tf.data.Dataset.from_tensor_slices((image_list, mask_list))
    # Apply read_image to every (image path, mask path) element of the dataset
    dataset = dataset.map(read_image, num_parallel_calls=tf.data.AUTOTUNE)
    # Cache decoded pairs, then shuffle, batch, and prefetch to overlap I/O with training
    dataset = dataset.cache().shuffle(buffer_size).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset
img_dir_train = 'lung_segmentation/train/images'
mask_dir_train = 'lung_segmentation/train/masks'
train_image_paths = sorted(glob.glob(os.path.join(img_dir_train, '*.png')))
train_mask_paths = sorted(glob.glob(os.path.join(mask_dir_train, '*.png')))
img_dir_val = 'lung_segmentation/validation/images'
mask_dir_val = 'lung_segmentation/validation/masks'
val_image_paths = sorted(glob.glob(os.path.join(img_dir_val, '*.png')))
val_mask_paths = sorted(glob.glob(os.path.join(mask_dir_val, '*.png')))
img_dir_test = 'lung_segmentation/test/images'
mask_dir_test = 'lung_segmentation/test/masks'
test_image_paths = sorted(glob.glob(os.path.join(img_dir_test, '*.png')))
test_mask_paths = sorted(glob.glob(os.path.join(mask_dir_test, '*.png')))
print('Number of training images: ', len(train_image_paths))
print('Number of validation images: ', len(val_image_paths))
print('Number of test images: ', len(test_image_paths))
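# Optional sanity check (a minimal sketch, assuming the globs found files):
# read_image runs eagerly, so shapes and dtypes can be verified directly.
sample_image, sample_mask = read_image(train_image_paths[0], train_mask_paths[0])
print(sample_image.shape, sample_image.dtype)  # expected: (160, 160, 3) float32
print(sample_mask.shape, sample_mask.dtype)    # expected: (160, 160, 1) uint8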
BATCH_SIZE = 32
BUFFER_SIZE = 500
train_dataset = dataset_generator(train_image_paths, train_mask_paths, BUFFER_SIZE, BATCH_SIZE)
validation_dataset = dataset_generator(val_image_paths, val_mask_paths, BUFFER_SIZE, BATCH_SIZE)
test_dataset = dataset_generator(test_image_paths, test_mask_paths, BUFFER_SIZE, BATCH_SIZE)
for images, masks in train_dataset.take(1):
    for i in range(3):
        image, mask = images[i], masks[i]
        plt.figure(figsize=(20, 8))
        plt.subplot(121)
        plt.imshow(image)
        plt.axis('off')
        plt.title('Image')
        plt.subplot(122)
        plt.imshow(mask[:, :, 0])
        plt.axis('off')
        plt.title('Mask')
        plt.show()
IMAGE_SHAPE = (N_ROWS, N_COLS, 3)
ACTIVATION = 'sigmoid'
def get_unet():
# Encoding phase
inputs = layers.Input(IMAGE_SHAPE)
conv1 = layers.Conv2D(32, (3, 3), padding='same')(inputs)
conv1 = layers.BatchNormalization()(conv1)
conv1 = layers.Activation('relu')(conv1)
conv1 = layers.Conv2D(32, (3, 3), padding='same')(conv1)
conv1 = layers.BatchNormalization()(conv1)
conv1 = layers.Activation('relu')(conv1)
pool1 = layers.MaxPooling2D(pool_size=(2, 2))(conv1)
conv2 = layers.Conv2D(64, (3, 3), padding='same')(pool1)
conv2 = layers.BatchNormalization()(conv2)
conv2 = layers.Activation('relu')(conv2)
conv2 = layers.Conv2D(64, (3, 3), padding='same')(conv2)
conv2 = layers.BatchNormalization()(conv2)
conv2 = layers.Activation('relu')(conv2)
pool2 = layers.MaxPooling2D(pool_size=(2, 2))(conv2)
conv3 = layers.Conv2D(128, (3, 3), padding='same')(pool2)
conv3 = layers.BatchNormalization()(conv3)
conv3 = layers.Activation('relu')(conv3)
conv3 = layers.Conv2D(128, (3, 3), padding='same')(conv3)
conv3 = layers.BatchNormalization()(conv3)
conv3 = layers.Activation('relu')(conv3)
pool3 = layers.MaxPooling2D(pool_size=(2, 2))(conv3)
conv4 = layers.Conv2D(256, (3, 3), padding='same')(pool3)
conv4 = layers.BatchNormalization()(conv4)
conv4 = layers.Activation('relu')(conv4)
conv4 = layers.Conv2D(256, (3, 3), padding='same')(conv4)
conv4 = layers.BatchNormalization()(conv4)
conv4 = layers.Activation('relu')(conv4)
pool4 = layers.MaxPooling2D(pool_size=(2, 2))(conv4)
    # Bottleneck
    conv5 = layers.Conv2D(512, (3, 3), padding='same')(pool4)
conv5 = layers.BatchNormalization()(conv5)
conv5 = layers.Activation('relu')(conv5)
conv5 = layers.Conv2D(512, (3, 3), padding='same')(conv5)
conv5 = layers.BatchNormalization()(conv5)
conv5 = layers.Activation('relu')(conv5)
    # Decoding phase
    up6 = layers.concatenate([layers.Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(conv5), conv4], axis=3)
conv6 = layers.Conv2D(256, (3, 3), padding='same')(up6)
conv6 = layers.BatchNormalization()(conv6)
conv6 = layers.Activation('relu')(conv6)
conv6 = layers.Conv2D(256, (3, 3), padding='same')(conv6)
conv6 = layers.BatchNormalization()(conv6)
conv6 = layers.Activation('relu')(conv6)
up7 = layers.concatenate([layers.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(conv6), conv3], axis=3)
conv7 = layers.Conv2D(128, (3, 3), padding='same')(up7)
conv7 = layers.BatchNormalization()(conv7)
conv7 = layers.Activation('relu')(conv7)
conv7 = layers.Conv2D(128, (3, 3), padding='same')(conv7)
conv7 = layers.BatchNormalization()(conv7)
conv7 = layers.Activation('relu')(conv7)
up8 = layers.concatenate([layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv7), conv2], axis=3)
conv8 = layers.Conv2D(64, (3, 3), padding='same')(up8)
conv8 = layers.BatchNormalization()(conv8)
conv8 = layers.Activation('relu')(conv8)
conv8 = layers.Conv2D(64, (3, 3), padding='same')(conv8)
conv8 = layers.BatchNormalization()(conv8)
conv8 = layers.Activation('relu')(conv8)
up9 = layers.concatenate([layers.Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv8), conv1], axis=3)
conv9 = layers.Conv2D(32, (3, 3), padding='same')(up9)
conv9 = layers.BatchNormalization()(conv9)
conv9 = layers.Activation('relu')(conv9)
conv9 = layers.Conv2D(32, (3, 3), padding='same')(conv9)
conv9 = layers.BatchNormalization()(conv9)
conv9 = layers.Activation('relu')(conv9)
out = layers.Conv2D(1, (1, 1))(conv9)
# Output
output = layers.Activation(ACTIVATION)(out)
    # Build the model from its inputs and outputs
    model = models.Model(inputs=[inputs], outputs=[output])
return model
model_unet = get_unet()
model_unet.summary()
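# Optional shape check (a minimal sketch): a dummy batch through the untrained
# model should produce one sigmoid channel per pixel.
dummy_batch = tf.zeros((1, N_ROWS, N_COLS, 3))
print(model_unet(dummy_batch).shape)  # expected: (1, 160, 160, 1)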
# Smoothing term to avoid division by zero
smooth = 1.
def dice_coef(targets, inputs):
    # Dice = (2*|intersection| + smooth) / (|targets| + |predictions| + smooth)
    inputs = K.flatten(inputs)
    targets = K.flatten(targets)
    intersection = K.sum(targets * inputs)
    dice = (2 * intersection + smooth) / (K.sum(targets) + K.sum(inputs) + smooth)
    return dice

def iou(targets, inputs):
    # IoU = (|intersection| + smooth) / (|union| + smooth)
    inputs = K.flatten(inputs)
    targets = K.flatten(targets)
    intersection = K.sum(targets * inputs)
    total = K.sum(targets) + K.sum(inputs)
    union = total - intersection
    IoU = (intersection + smooth) / (union + smooth)
    return IoU
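# Optional numeric check (a minimal sketch): with targets [1,1,0,0] and
# predictions [1,0,1,0], the exact Dice is 0.5 and the exact IoU is 1/3;
# the smooth=1 term biases them to 0.6 and 0.5 on tensors this small.
t = tf.constant([1., 1., 0., 0.])
p = tf.constant([1., 0., 1., 0.])
print(float(dice_coef(t, p)), float(iou(t, p)))  # -> 0.6 0.5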
model_unet.compile(optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy', dice_coef, iou])
path_models = os.path.join(root_path, 'segmentation_binaria', 'models')
path_experiment = os.path.join(path_models, 'Train')
EPOCHS = 12
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(path_experiment, 'model.h5'),
    # Monitor the validation Dice coefficient so the best checkpoint generalizes
    monitor='val_dice_coef',
    mode='max',
    save_best_only=True,
    verbose=1)
history_unet = model_unet.fit(train_dataset,
epochs=EPOCHS,
validation_data=validation_dataset,
callbacks=[model_checkpoint_callback],
verbose=1)
# Plot training and validation accuracy, Dice coefficient, and IoU
def show_learning_curves(history):
    plt.figure(figsize=(20, 5))
    # Accuracy
    plt.subplot(1, 3, 1)
    plt.plot(history.history['accuracy'], label='Training')
    plt.plot(history.history['val_accuracy'], label='Validation')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Accuracy')
    plt.ylim([0, 1])
    plt.legend()
    # Dice coefficient
    plt.subplot(1, 3, 2)
    plt.plot(history.history['dice_coef'], label='Training')
    plt.plot(history.history['val_dice_coef'], label='Validation')
    plt.xlabel('Epoch')
    plt.ylabel('DC')
    plt.title('Dice coefficient')
    plt.ylim([0, 1])
    plt.legend()
    # Intersection over union
    plt.subplot(1, 3, 3)
    plt.plot(history.history['iou'], label='Training')
    plt.plot(history.history['val_iou'], label='Validation')
    plt.xlabel('Epoch')
    plt.ylabel('IoU')
    plt.title('Intersection over union')
    plt.ylim([0, 1])
    plt.legend()
    plt.show()
show_learning_curves(history_unet)
# Visually compare predictions against the reference segmentations on a few test images
def show_prediction(model):
    for images, masks in test_dataset.take(1):
        pred_masks = model.predict(images)
        for i in range(3):
            image, mask = images[i], masks[i]
            pred_mask = pred_masks[i]
            plt.figure(figsize=(20, 10))
            plt.subplot(131)
            plt.imshow(image)
            plt.axis('off')
            plt.title('Image')
            plt.subplot(132)
            plt.imshow(mask[:, :, -1])
            plt.axis('off')
            plt.title('Ground-truth mask')
            plt.subplot(133)
            plt.imshow(pred_mask[:, :, -1])
            plt.axis('off')
            plt.title('Predicted mask')
            plt.show()
show_prediction(model_unet)
model_unet.save("model_unet.h5")
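# Note (a minimal sketch): reloading the saved HDF5 model later requires
# passing the custom metrics through custom_objects, otherwise
# deserialization fails.
reloaded_unet = tf.keras.models.load_model(
    "model_unet.h5", custom_objects={'dice_coef': dice_coef, 'iou': iou})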
! pip3 install -U segmentation-models
import segmentation_models as sm
import tensorflow as tf
sm.set_framework('tf.keras')
BACKBONE = 'mobilenetv2'
preprocess_input = sm.get_preprocessing(BACKBONE)
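# For the MobileNetV2 backbone, this preprocessing maps raw [0, 255] pixel
# values to [-1, 1] (x / 127.5 - 1), which is why read_image_transf below
# casts to float32 instead of rescaling to [0, 1].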
N_ROWS = 160
N_COLS = 160
N_LABELS = 1
def read_image_transf(image_path, mask_path):
    # Read the image file from disk
    image = tf.io.read_file(image_path)
    # Decode according to the encoding used, PNG in our case
    image = tf.image.decode_png(image, channels=3)
    # Cast to float32 so the backbone normalization does not fail
    image = tf.cast(image, tf.float32)
    # Normalize the image with the MobileNetV2 reference preprocessing
    image = preprocess_input(image)
    # Resize the image to the target size
    image = tf.image.resize(image, (N_ROWS, N_COLS), method='nearest')
    # Read the mask file from disk
    mask = tf.io.read_file(mask_path)
    # Decode according to the encoding used, PNG in our case
    mask = tf.image.decode_png(mask, channels=1)
    # The masks store the label value in every channel, so we keep the
    # per-pixel maximum to obtain a [ROWS, COLS, 1] tensor
    mask = tf.math.reduce_max(mask, axis=-1, keepdims=True)
    # Resize the mask to the target size
    mask = tf.image.resize(mask, (N_ROWS, N_COLS), method='nearest')
    # Masks are stored with values {0, 255}; normalize to a binary
    # [ROWS, COLS, 1] float mask, matching the single sigmoid output channel
    # (a one-hot encoding with depth 1 would invert the labels, since
    # tf.one_hot maps index 0 to [1] and out-of-range indices to [0])
    mask = tf.cast(mask / 255, tf.float32)
    return image, mask
def dataset_generator_transf(image_paths, mask_paths, buffer_size, batch_size):
    image_list = tf.constant(image_paths)
    mask_list = tf.constant(mask_paths)
    dataset = tf.data.Dataset.from_tensor_slices((image_list, mask_list))
    # Apply read_image_transf to every (image path, mask path) element of the dataset
    dataset = dataset.map(read_image_transf, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.cache().shuffle(buffer_size).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset
train_dataset = dataset_generator_transf(train_image_paths, train_mask_paths, BUFFER_SIZE, BATCH_SIZE)
validation_dataset = dataset_generator_transf(val_image_paths, val_mask_paths, BUFFER_SIZE, BATCH_SIZE)
test_dataset = dataset_generator_transf(test_image_paths, test_mask_paths, BUFFER_SIZE, BATCH_SIZE)
model_tl_unet = sm.Unet(BACKBONE, input_shape=(N_ROWS, N_COLS, 3), classes=N_LABELS, activation='sigmoid', encoder_weights='imagenet', encoder_freeze=True)
model_tl_unet.summary()
EPOCHS = 12
path_experiment = "lung_segmentation/segmentacion_binaria/models/Train2"
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(path_experiment, 'model_unet_transf.h5'),
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1)
early_stopping = tf.keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=5,
verbose=1,
mode='min',
restore_best_weights=False
)
model_tl_unet.compile(optimizer='adam',
loss=sm.losses.DiceLoss(),
metrics=['accuracy', dice_coef])
history_unet_transf = model_tl_unet.fit(train_dataset,
epochs=EPOCHS,
validation_data=validation_dataset,
callbacks=[model_checkpoint_callback, early_stopping],
verbose=1)
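# Optional fine-tuning sketch: once the decoder has converged with the frozen
# encoder, segmentation_models can unfreeze the encoder and recompile, after
# which training can resume (typically with a lower learning rate).
# sm.utils.set_trainable(model_tl_unet, recompile=True)
# model_tl_unet.fit(train_dataset, epochs=EPOCHS, validation_data=validation_dataset)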
# Plot training and validation accuracy, Dice coefficient, and loss
def show_learning_curves_transf(history):
    plt.figure(figsize=(20, 5))
    # Accuracy
    plt.subplot(1, 3, 1)
    plt.plot(history.history['accuracy'], label='Training')
    plt.plot(history.history['val_accuracy'], label='Validation')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Accuracy')
    plt.ylim([0, 1])
    plt.legend()
    # Dice coefficient
    plt.subplot(1, 3, 2)
    plt.plot(history.history['dice_coef'], label='Training')
    plt.plot(history.history['val_dice_coef'], label='Validation')
    plt.xlabel('Epoch')
    plt.ylabel('Dice coefficient')
    plt.title('Dice coefficient')
    plt.ylim([0, 1])
    plt.legend()
    # Loss
    plt.subplot(1, 3, 3)
    plt.plot(history.history['loss'], label='Training')
    plt.plot(history.history['val_loss'], label='Validation')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Loss')
    plt.ylim([0, 1])
    plt.legend()
    plt.show()
show_learning_curves_transf(history_unet_transf)
show_prediction(model_tl_unet)
model_tl_unet.save("model_unet_transf.h5")
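# Note: reloading this file will similarly need custom_objects entries for the
# Dice loss and the dice_coef metric used at compile time.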
model_tl_unet_sin_dice = sm.Unet(BACKBONE, input_shape=(N_ROWS, N_COLS, 3), classes=N_LABELS, activation='sigmoid', encoder_weights='imagenet', encoder_freeze=True)
model_tl_unet_sin_dice.compile(optimizer='adam',
                               loss="binary_crossentropy",
                               metrics=['accuracy'])
# Use a dedicated checkpoint file so this run does not overwrite the previous model
model_checkpoint_callback_sin_dice = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(path_experiment, 'model_unet_transf_sin_dice.h5'),
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1)
history_unet_transf_sin_dice = model_tl_unet_sin_dice.fit(train_dataset,
                                                          epochs=EPOCHS,
                                                          validation_data=validation_dataset,
                                                          callbacks=[model_checkpoint_callback_sin_dice, early_stopping],
                                                          verbose=1)
model_tl_unet_sin_dice.save("model_unet_transf_sin_dice.h5")
show_prediction(model_tl_unet_sin_dice)