import numpy as np  # numerical computation
import matplotlib.pyplot as plt  # visualization
from typing import Union, Literal  # type annotations
from tensorflow import transpose as tf_transpose  # tensor transpose
from tensorflow.keras.utils import image_dataset_from_directory  # dataset loading
def relu(x: np.ndarray) -> np.ndarray:
    # np.maximum is already element-wise, so no @np.vectorize wrapper is needed
    return np.maximum(0, x)

def sigmoid(x: np.ndarray) -> np.ndarray:
    # element-wise; np.exp may overflow for large negative x in low precision
    return 1 / (1 + np.exp(-x))
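
# A quick sanity check of the two activations (illustrative values only):
# relu clips negatives to zero; sigmoid squashes into (0, 1).
assert np.allclose(relu(np.array([-2.0, 0.0, 3.0])), [0.0, 0.0, 3.0])
assert np.isclose(sigmoid(0.0), 0.5)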
def pad_to_size(array:np.ndarray, target_shape:tuple, value=0, from_front:bool=True) -> np.ndarray:
assert len(array.shape) == len(target_shape), "Array and target dimension mismatch"
return np.pad(
array,
[(target_shape[i] - array.shape[i], 0) if from_front else (0, target_shape[i] - array.shape[i]) for i in range(len(array.shape))],
"constant",
constant_values=value
)
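
# A quick check of pad_to_size, which is used below to normalize shape tuples
# to 4-D NCHW form (values here are illustrative): a 3-D image shape gains a
# leading batch dimension of 1.
assert tuple(pad_to_size(np.array((3, 256, 256)), (4,), 1, True)) == (1, 3, 256, 256)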
class Conv2DLayer:
""" Convolutional layer.
"""
def __init__(
self,
filter_shape:tuple = (2,2),
padding: Union[int, tuple] = (0,0),
stride:Union[int, tuple] = (1,1),
input_shape:tuple = None,
filter_count:int = 1,
        activation_function: Literal["relu", "sigmoid"] = "relu"
):
if input_shape is None:
self._input_shape = None
else:
self._input_shape = pad_to_size(np.array(input_shape), (4,), 1, True)
self._filter_shape = pad_to_size(np.array(filter_shape), (4,), 1, True)
self._filter_count = filter_count
if isinstance(padding, int):
            self._padding = np.array((0, 0, padding, padding))
else:
self._padding = pad_to_size(np.array(padding), (4,), 0, True)
if isinstance(stride, int):
self._stride = (stride, stride)
else:
self._stride = stride
self._activation_function = activation_function
self._is_compiled = False
def __get_param_count(self):
assert self._input_shape is not None
# number of param = (filter width * filter height * input channels + 1) * number of filters
return ((self._filter_shape[-1] * self._filter_shape[-2] * self._input_shape[1] + 1) * self._filter_count)
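    # Worked example (values assumed from the model built below): a 5x5 filter
    # over 3 input channels with 6 filters gives (5*5*3 + 1) * 6 = 456 parameters.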
    def __get_output_shape(self):
        assert self._input_shape is not None
        # standard convolution arithmetic: out = (in + 2 * padding - kernel) / stride + 1
        shape = (
            (
                self._input_shape[2:] +
                2 * self._padding[2:] -
                self._filter_shape[2:]
            ) / self._stride + 1
        ).astype(int)
        # each filter produces one output channel
        return (self._input_shape[0], self._filter_count, *shape)
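    # Worked example (values assumed from the model built below): a 256x256 input
    # with padding 2, a 5x5 filter, and stride 1 keeps the spatial size:
    # (256 + 2*2 - 5) / 1 + 1 = 256.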
    def _compile(self, input_shape: tuple = None, random=True):
        self._is_compiled = True
        if input_shape is not None:
            self._input_shape = pad_to_size(np.array(input_shape), (4,), 1, True)
        # weight shape: (filter_count, channels, kernel_height, kernel_width)
        weights_shape = (self._filter_count, self._input_shape[1], *self._filter_shape[2:])
        if not random:
            # deterministic weights, useful for testing
            self._weights = np.arange(np.prod(weights_shape)).reshape(weights_shape)
            self._bias_weights = np.arange(np.prod(weights_shape[:-3])).reshape(weights_shape[:-3])
        else:
            self._weights = np.random.randn(*weights_shape)
            self._bias_weights = np.random.randn(*weights_shape[:-3])
    def _forward(self, x: np.ndarray):
        assert self._is_compiled, "Layer is not compiled"
        assert np.all(pad_to_size(np.array(x.shape), (4,), 1, True) == self._input_shape), "Input shape mismatch"
        inp = x.copy().reshape(pad_to_size(np.array(x.shape), (4,), 1, True))
        # zero-pad the height and width dimensions only
        inp = np.pad(inp, ((0, 0), (0, 0), (self._padding[2], self._padding[2]), (self._padding[3], self._padding[3])))
        # Sliding windows over the input; each window spans all channels at once
        windows = np.lib.stride_tricks.sliding_window_view(inp, (1, *self._weights.shape[1:]))
        # apply the stride by slicing the window grid
        windows = windows[:, :, ::self._stride[-2], ::self._stride[-1]]
        # Array shape formats:
        #   input   (batch, channel, height, width)
        #   windows (batch, 1, out_height, out_width, 1, channel, kernel_height, kernel_width)
        #   weights (filter_count, channel, kernel_height, kernel_width)
        # The einsum sums over channel (l) and kernel indices (m, n), while the
        # ellipsis broadcasts (batch, 1) against (filter_count,), so every filter
        # yields one output feature map.
        out = np.einsum('...lmn,...ijklmn->...ij', self._weights, windows)
        # one bias per filter, broadcast over the spatial dimensions
        return self._activation(out + self._bias_weights.reshape(-1, 1, 1))
    # Activation function dispatch
    def _activation(self, inp: np.ndarray):
        if self._activation_function == "relu":
            return relu(inp)
        elif self._activation_function == "sigmoid":
            # cast to float64 so integer inputs do not overflow np.exp
            return sigmoid(inp.astype(np.float64))
def __call__(self, inp: np.ndarray):
self.input = inp
return self._forward(self.input)
output_shape = property(__get_output_shape)
param_count = property(__get_param_count)
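
# A minimal smoke test for Conv2DLayer (shapes are illustrative, weights
# deterministic): a 3-channel 8x8 input through two 3x3 filters, no padding,
# stride 1, should produce a (1, 2, 6, 6) output.
_conv_demo = Conv2DLayer(input_shape=(1, 3, 8, 8), filter_count=2, filter_shape=(3, 3))
_conv_demo._compile(random=False)
assert _conv_demo(np.random.randn(1, 3, 8, 8)).shape == (1, 2, 6, 6)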
class Pooling:
""" Pooling layer.
Args:
input_shape: tuple (batch, channel, height, width), shape of input received by the pooling layer
pool_size: integer or tuple of 2 integers, size of the pooling window
stride: integer or tuple of 2 integers, stride of the pooling operation
mode: string, 'max' or 'average'
"""
    def __init__(
        self,
        input_shape: tuple = None,
        pool_size: Union[int, tuple] = (2, 2),
        stride: Union[int, tuple] = None,
        mode: Literal['max', 'average'] = 'max'
    ):
# pool size is integer or tuple of 2 integers
# if tuple, it is (pool_height, pool_width) otherwise, pool_height = pool_width = pool_size
if isinstance(pool_size, int):
self.pool_h = self.pool_w = pool_size
elif isinstance(pool_size, tuple) and len(pool_size) == 2 and all(isinstance(x, int) for x in pool_size):
self.pool_h, self.pool_w = pool_size
else:
raise ValueError(
"Invalid pool_size. Must be an integer or a tuple of two integers.")
# stride is integer or tuple of 2 integers or None
# if tuple, it is (stride_height, stride_width) otherwise, stride_height = stride_width = stride
if stride is None:
stride = pool_size
if isinstance(stride, int):
self.stride_h = self.stride_w = stride
elif isinstance(stride, tuple) and len(stride) == 2 and all(isinstance(x, int) for x in stride):
self.stride_h, self.stride_w = stride
else:
raise ValueError(
"Invalid stride. Must be an integer or a tuple of two integers.")
# mode is either 'max' or 'average'
if mode not in ['max', 'average']:
raise ValueError(
"Invalid mode. Must be either 'max' or 'average'.")
self._input_shape = input_shape
self.mode = mode
self._is_compiled = False
def __call__(self, x):
self.input = x
return self._forward(self.input)
def _pool(self, x):
# Determine dimensions (batch_size, channels, height, width)
batch_size, channels, height, width = x.shape
# Calculate output dimensions
self.out_h = (height - self.pool_h) // self.stride_h + 1
self.out_w = (width - self.pool_w) // self.stride_w + 1
# Initialize output tensor based on calculated dimensions
x_pooled = np.zeros((batch_size, channels, self.out_h, self.out_w))
# Define a function to handle the pooling operation
def pool_operation(x_slice, mode):
return np.max(x_slice) if mode == 'max' else np.mean(x_slice)
# Loop through common dimensions
for i in range(batch_size):
for j in range(channels):
for k in range(self.out_h):
for l in range(self.out_w):
h_start, h_end = k * self.stride_h, k * self.stride_h + self.pool_h
w_start, w_end = l * self.stride_w, l * self.stride_w + self.pool_w
x_slice = x[i, j, h_start:h_end, w_start:w_end]
x_pooled[i, j, k, l] = pool_operation(x_slice, self.mode)
return x_pooled
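    # Worked example (values assumed): a 256x256 feature map with a 2x2 window
    # and stride 2 pools down to (256 - 2) // 2 + 1 = 128 per spatial dimension.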
    def _compile(self, input_shape: tuple = None, random=True):
        if input_shape is not None:
            self._input_shape = np.array(input_shape)
        self._is_compiled = True
    def _forward(self, x):
        assert self._is_compiled, "Layer is not compiled"
        assert np.all(pad_to_size(np.array(x.shape), (4,), 1, True) == self._input_shape), "Input shape mismatch"
        # pool the argument directly so _forward also works when called without __call__
        return self._pool(x)
def __get_output_shape(self):
assert self._is_compiled, "Layer is not compiled"
return (
*self._input_shape[:2],
*(
(
self._input_shape[2:] - \
np.array((self.pool_h, self.pool_w))
) / np.array((self.stride_h, self.stride_w)) + 1
).astype(int)
)
output_shape = property(__get_output_shape)
param_count = 0
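
# A minimal smoke test for Pooling (shapes are illustrative): 2x2 max pooling
# with the default stride halves each spatial dimension.
_pool_demo = Pooling(pool_size=(2, 2))
_pool_demo._compile((1, 2, 6, 6))
assert _pool_demo(np.random.randn(1, 2, 6, 6)).shape == (1, 2, 3, 3)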
class Flatten:
""" Flatten layer. Flattens a 4D tensor to 2D.
"""
def __init__(self, input_shape: tuple = None):
self._input_shape = input_shape
self._is_compiled = False
def __call__(self, x):
self.input = x
return self._forward(x)
def _flatten(self, x):
assert self._input_shape is not None, "Layer is not compiled"
# x.shape shape dimension is (batch, channels, height, width)
batch_size, channels, height, width = x.shape
# Flatten tensor
x_flat = x.reshape(batch_size, -1)
return x_flat
def _forward(self, x):
assert self._is_compiled, "Layer is not compiled"
# x is a 4D tensor of shape (batch_size, channels, height, width)
self.x = x
self.x_flat = self._flatten(self.x)
# x_flat shape dimension is (batch_size, units)
return self.x_flat
    def _compile(self, input_shape: tuple = None, random=True):
if input_shape is not None:
self._input_shape = np.array(input_shape)
self._is_compiled = True
def __get_output_shape(self):
assert self._is_compiled, "Layer is not compiled"
return (self._input_shape[0], 1, 1, np.prod(self._input_shape[1:]))
output_shape = property(__get_output_shape)
param_count = 0
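
# A minimal smoke test for Flatten (shapes are illustrative): a (1, 2, 3, 3)
# tensor flattens to (1, 18).
_flatten_demo = Flatten()
_flatten_demo._compile((1, 2, 3, 3))
assert _flatten_demo(np.random.randn(1, 2, 3, 3)).shape == (1, 18)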
class Dense:
""" Dense layer.
Args:
units: positive integer, dimensionality of the output space
input_shape: tuple (batch, channel, height, width), shape of input received by the dense layer
activation: string, 'relu' or 'sigmoid'
"""
    def __init__(self, units, input_shape: tuple = None, activation: Literal["relu", "sigmoid"] = "relu"):
        # units must be a positive integer
        if not (isinstance(units, int) and units > 0):
            raise ValueError("Invalid units. Must be a positive integer.")
        # activation is either 'relu' or 'sigmoid'
        if activation not in ["relu", "sigmoid"]:
            raise ValueError("Invalid activation. Must be either 'relu' or 'sigmoid'.")
self.units = units
self.activation = activation
self.weights = None
self.bias = None
self._input_shape = input_shape
self._is_compiled = False
def __call__(self, x):
self.input = x
return self._forward(x)
    def _activation(self, x):
        if self.activation == "relu":
            return relu(x)
        elif self.activation == "sigmoid":
            # np.asarray keeps array inputs intact and casts to float for np.exp
            return sigmoid(np.asarray(x, dtype=np.float64))
    def _compile(self, input_shape: tuple = None, random=True):
if input_shape is not None:
assert input_shape[-2] == 1, "Input must be 1D for dense layer"
self._input_shape = np.array(input_shape)
if random:
self.weights = np.random.randn(self._input_shape[-1], self.units)
self.bias = np.random.randn(self.units)
else:
self.weights = np.arange(self._input_shape[-1] * self.units).reshape(self._input_shape[-1], self.units)
self.bias = np.arange(self.units)
self._is_compiled = True
def _forward(self, x):
assert self._is_compiled, "Layer is not compiled"
        # x is a 2D tensor of shape (batch_size, input_dim)
        self.x = x
        # affine transform: y = x @ W + b
        self.y = np.dot(x, self.weights) + self.bias
        # output shape is (batch_size, units)
return self._activation(self.y)
def __get_output_shape(self):
return (*self._input_shape[:2], 1, self.units)
def __get_param_count(self):
return ((self._input_shape[2]*self._input_shape[3] + 1) * self.units)
output_shape = property(__get_output_shape)
param_count = property(__get_param_count)
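
# A minimal smoke test for Dense (shapes are illustrative): 18 input features
# mapped to 4 units.
_dense_demo = Dense(units=4)
_dense_demo._compile((1, 1, 1, 18))
assert _dense_demo(np.random.randn(1, 18)).shape == (1, 4)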
class Sequential:
""" Sequential class.
Sequential groups a linear stack of layers into a model
Sequential provides training and inference features on this model.
"""
    def __init__(self, *layers):
        self.layers = list(layers)
def compile(self, random=True):
assert len(self.layers) > 0, "No layers are available"
assert self.layers[0]._input_shape is not None, "First layer needs an input shape"
self.layers[0]._compile(random=random)
for i in range(len(self.layers) - 1):
self.layers[i+1]._compile(self.layers[i].output_shape, random=random)
def summary(self):
print('Model: "sequential"')
print("-"*70)
print("{:<20}{:<30}{:<20}".format("Layer (type)", "Output Shape", "Param #"))
print("="*70)
total_params = 0
# Looping per layer
for layer in self.layers:
total_params+=layer.param_count
print('{:<20}{:<30}{:<20}'.format(type(layer).__name__, str(layer.output_shape), layer.param_count))
print("="*70)
print(f"Total params: {total_params}")
print(f"Trainable params: {total_params}")
print(f"Non-trainable params: 0")
    def fit(self):
        # training (backpropagation) is not implemented; weights keep their initial values
        pass
    def evaluate(self):
        pass
def predict(self, inp: np.ndarray):
assert np.all(pad_to_size(np.array(inp.shape), (4,), 1, True) == self.layers[0]._input_shape), "Input shape mismatch"
result = inp.copy()
for layer in self.layers:
result = layer(result)
return result
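
# An end-to-end smoke test on random data (shapes are assumed, independent of
# the image dataset used below): conv -> pool -> flatten -> dense should map a
# (2, 3, 16, 16) batch to a (2, 1) sigmoid output.
_demo_model = Sequential(
    Conv2DLayer(input_shape=(2, 3, 16, 16), filter_count=4, filter_shape=(3, 3)),
    Pooling(mode='max', pool_size=(2, 2)),
    Flatten(),
    Dense(units=1, activation='sigmoid')
)
_demo_model.compile()
assert _demo_model.predict(np.random.randn(2, 3, 16, 16)).shape == (2, 1)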
Example Prediction Results Using the CNN Model Built Above
Train = image_dataset_from_directory("PandasBears/Train", shuffle=True)
Test = image_dataset_from_directory("PandasBears/Test", shuffle=True)
# Transform from default 'channels_last' to 'channels_first' data format
def to_channels_first(image, label):
image = tf_transpose(image, (0, 3, 1, 2))
return image, label
Train = Train.map(to_channels_first)
Test = Test.map(to_channels_first)
model = Sequential(
Conv2DLayer(input_shape=(32,3,256,256), filter_count=6, filter_shape=(5,5), stride=1, activation_function='relu', padding=(2,2)),
Pooling(mode='max', pool_size=(2,2)),
Flatten(),
Dense(units=8, activation='relu'),
Dense(units=1, activation='sigmoid')
)
model.compile()
model.summary()
images, labels = Train.take(1).get_single_element()
predictions = model.predict(np.array(images))
images = tf_transpose(images, perm=[0,2,3,1])
num_images = len(images)
rows = int(np.ceil(num_images / 5.0)) # Calculate number of rows needed
# Create subplots: 'rows' number of rows and 5 columns for 5 images in each row
fig, axes = plt.subplots(rows, 5, figsize=(20, 4 * rows))
# Flatten the axes array to make it easier to iterate
axes = axes.flatten()
# Loop over each image, label, and prediction
for axes_index, (image, label, prediction) in enumerate(zip(images, labels, predictions.reshape(len(predictions)))):
    # Convert tensors to plain numpy values for plotting
    # (the transpose back to channels_last was already applied above)
    image = image.numpy().astype("uint8")
    label = label.numpy()
# Plot the images
axes[axes_index].imshow(image)
axes[axes_index].axis('off')
    axes[axes_index].set_title(f'Label: {label} Predicted: {prediction:.2f}')
# Turn off remaining axes to keep a clean look
for i in range(axes_index + 1, len(axes)):
axes[i].axis('off')
# Display the plot
plt.show()