# -*- coding: utf-8 -*-
import numpy as np
from .base import Layer
from ztlearn.utils import get_pad
from ztlearn.utils import unroll_inputs
from ztlearn.utils import im2col_indices
from ztlearn.utils import col2im_indices
from ztlearn.utils import get_output_dims
from ztlearn.initializers import InitializeWeights as init
from ztlearn.optimizers import OptimizationFunction as optimizer
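
# This module provides three takes on the 2D convolution layer: Conv2D
# (vectorised via im2col/col2im), ConvLoop2D (the same convolution written
# with explicit nested loops) and ConvToeplitzMat (work in progress,
# unrolling inputs into a Toeplitz-style matrix).
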
class Conv(Layer):

    def __init__(self,
                 filters = 32,
                 kernel_size = (3, 3),
                 activation = None,
                 input_shape = (1, 8, 8),
                 strides = (1, 1),
                 padding = 'valid'):
        self.filters = filters
        self.strides = strides
        self.padding = padding
        self.activation = activation
        self.kernel_size = kernel_size
        self.input_shape = input_shape
        self.init_method = None
        self.optimizer_kwargs = None

        self.is_trainable = True

    @property
    def trainable(self):
        return self.is_trainable

    @trainable.setter
    def trainable(self, is_trainable):
        self.is_trainable = is_trainable

    @property
    def weight_initializer(self):
        return self.init_method

    @weight_initializer.setter
    def weight_initializer(self, init_method):
        self.init_method = init_method

    @property
    def weight_optimizer(self):
        return self.optimizer_kwargs

    @weight_optimizer.setter
    def weight_optimizer(self, optimizer_kwargs = {}):
        self.optimizer_kwargs = optimizer_kwargs

    @property
    def layer_activation(self):
        return self.activation

    @layer_activation.setter
    def layer_activation(self, activation):
        self.activation = activation

    @property
    def layer_parameters(self):
        # total learnable parameters: weight entries plus bias entries
        return sum([np.prod(param.shape) for param in [self.weights, self.bias]])

    @property
    def output_shape(self):
        pad_height, pad_width = get_pad(self.padding,
                                        self.input_shape[1],
                                        self.input_shape[2],
                                        self.strides[0],
                                        self.strides[1],
                                        self.kernel_size[0],
                                        self.kernel_size[1])

        output_height, output_width = get_output_dims(self.input_shape[1],
                                                      self.input_shape[2],
                                                      self.kernel_size,
                                                      self.strides,
                                                      self.padding)

        return self.filters, int(output_height), int(output_width)
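
    # For reference, the standard convolution output dimensions (which
    # get_output_dims is assumed to implement) are:
    #
    #   output_height = (input_height + total_pad_height - kernel_height) // stride_height + 1
    #   output_width  = (input_width  + total_pad_width  - kernel_width)  // stride_width  + 1
    #
    # e.g. an 8x8 input with a 3x3 kernel, stride 1 and 'valid' (zero) padding
    # yields a 6x6 feature map: (8 + 0 - 3) // 1 + 1 = 6.
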
    def prep_layer(self):
        self.kernel_shape = (self.filters, self.input_shape[0], self.kernel_size[0], self.kernel_size[1])
        self.weights = init(self.weight_initializer).initialize_weights(self.kernel_shape)
        self.bias = np.zeros((self.kernel_shape[0], 1))
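
    # The kernel tensor is laid out as (filters, input_depth, kernel_height,
    # kernel_width); the bias is kept as a (filters, 1) column so that it
    # broadcasts across the im2col output columns in the forward pass.
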
class Conv2D(Conv):

    def __init__(self,
                 filters = 32,
                 kernel_size = (3, 3),
                 activation = None,
                 input_shape = (1, 8, 8),
                 strides = (1, 1),
                 padding = 'valid'):
        super(Conv2D, self).__init__(filters, kernel_size, activation, input_shape, strides, padding)

    def pass_forward(self, inputs, train_mode = True, **kwargs):
        self.filter_num, _, _, _ = self.weights.shape
        self.input_shape = inputs.shape
        self.inputs = inputs

        input_num, input_depth, input_height, input_width = inputs.shape
        pad_height, pad_width = get_pad(self.padding,
                                        input_height,
                                        input_width,
                                        self.strides[0],
                                        self.strides[1],
                                        self.kernel_size[0],
                                        self.kernel_size[1])

        # confirm dimensions
        assert (input_height + np.sum(pad_height) - self.kernel_size[0]) % self.strides[0] == 0, 'height does not work'
        assert (input_width + np.sum(pad_width) - self.kernel_size[1]) % self.strides[1] == 0, 'width does not work'

        # compute output_height and output_width
        output_height, output_width = get_output_dims(input_height, input_width, self.kernel_size, self.strides, self.padding)

        # convert to columns; use the layer stride here so the forward pass
        # agrees with the col2im call in pass_backward
        self.input_col = im2col_indices(inputs,
                                        self.kernel_size[0],
                                        self.kernel_size[1],
                                        padding = (pad_height, pad_width),
                                        stride = self.strides[0])

        self.weight_col = self.weights.reshape(self.filter_num, -1)

        # calculate output
        output = self.weight_col @ self.input_col + self.bias
        output = output.reshape(self.filter_num, int(output_height), int(output_width), input_num)

        return output.transpose(3, 0, 1, 2)
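
    # Shapes in the im2col formulation above: weight_col is
    # (filters, input_depth * kernel_height * kernel_width), input_col is
    # (input_depth * kernel_height * kernel_width, output_height * output_width * input_num),
    # so the whole convolution collapses into one matrix product whose result
    # is reshaped back into (input_num, filters, output_height, output_width).
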
    def pass_backward(self, grad, epoch_num, batch_num, batch_size):
        input_num, input_depth, input_height, input_width = self.input_shape
        doutput_reshaped = grad.transpose(1, 2, 3, 0).reshape(self.filter_num, -1)

        if self.is_trainable:
            dbias = np.sum(grad, axis = (0, 2, 3))
            dbias = dbias.reshape(self.filter_num, -1)

            dweights = doutput_reshaped @ self.input_col.T
            dweights = dweights.reshape(self.weights.shape)

            # optimize the weights and bias
            self.weights = optimizer(self.weight_optimizer).update(self.weights, dweights, epoch_num, batch_num, batch_size)
            self.bias = optimizer(self.weight_optimizer).update(self.bias, dbias, epoch_num, batch_num, batch_size)
        # endif self.is_trainable

        weight_reshape = self.weights.reshape(self.filter_num, -1)
        dinput_col = weight_reshape.T @ doutput_reshaped

        pad_height, pad_width = get_pad(self.padding,
                                        input_height,
                                        input_width,
                                        self.strides[0],
                                        self.strides[1],
                                        self.kernel_size[0],
                                        self.kernel_size[1])

        dinputs = col2im_indices(dinput_col,
                                 self.input_shape,
                                 self.kernel_size[0],
                                 self.kernel_size[1],
                                 padding = (pad_height, pad_width),
                                 stride = self.strides[0])

        return dinputs
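
    # The backward pass mirrors the forward matrix product: with
    # Y_col = W_col @ X_col we have dW_col = dY_col @ X_col.T and
    # dX_col = W_col.T @ dY_col; col2im_indices then scatter-adds the column
    # gradient back into the input layout (overlapping patches sum up).
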
class ConvLoop2D(Conv):

    def __init__(self,
                 filters = 32,
                 kernel_size = (3, 3),
                 activation = None,
                 input_shape = (1, 8, 8),
                 strides = (1, 1),
                 padding = 'valid'):
        super(ConvLoop2D, self).__init__(filters, kernel_size, activation, input_shape, strides, padding)

    def pass_forward(self, inputs, train_mode = True, **kwargs):
        self.filter_num, _, _, _ = self.weights.shape
        self.input_shape = inputs.shape
        self.inputs = inputs

        input_num, input_depth, input_height, input_width = inputs.shape
        pad_height, pad_width = get_pad(self.padding,
                                        input_height,
                                        input_width,
                                        self.strides[0],
                                        self.strides[1],
                                        self.kernel_size[0],
                                        self.kernel_size[1])

        x_padded = np.pad(self.inputs, ((0, 0), (0, 0), pad_height, pad_width), mode = 'constant')

        # confirm dimensions
        assert (input_height + np.sum(pad_height) - self.kernel_size[0]) % self.strides[0] == 0, 'height does not work'
        assert (input_width + np.sum(pad_width) - self.kernel_size[1]) % self.strides[1] == 0, 'width does not work'

        # compute output_height and output_width (cast to int so they can be
        # used as array dimensions and loop bounds)
        output_height, output_width = get_output_dims(input_height, input_width, self.kernel_size, self.strides, self.padding)
        output_height, output_width = int(output_height), int(output_width)

        output = np.zeros((input_num, self.filter_num, output_height, output_width))

        # convolutions
        for b in np.arange(input_num): # batch number
            for f in np.arange(self.filter_num): # filter number
                for h in np.arange(output_height): # output height
                    for w in np.arange(output_width): # output width
                        h_stride, w_stride = h * self.strides[0], w * self.strides[1]
                        x_patch = x_padded[b, :, h_stride: h_stride + self.kernel_size[0],
                                                 w_stride: w_stride + self.kernel_size[1]]
                        output[b, f, h, w] = np.sum(x_patch * self.weights[f]) + self.bias[f]

        return output
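
    # Each element computed above is the direct definition of the layer's
    # cross-correlation:
    #
    #   output[b, f, h, w] = bias[f]
    #       + sum over c, i, j of
    #         x_padded[b, c, h * stride_h + i, w * stride_w + j] * weights[f, c, i, j]
    #
    # i.e. exactly what the im2col-based Conv2D evaluates as one matrix product.
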
    def pass_backward(self, grad, epoch_num, batch_num, batch_size):
        input_num, input_depth, input_height, input_width = self.inputs.shape

        # initialize the gradient(s)
        dinputs = np.zeros(self.inputs.shape)

        if self.is_trainable:
            # initialize the gradient(s)
            dweights = np.zeros(self.weights.shape)
            dbias = np.zeros(self.bias.shape)

            pad_height, pad_width = get_pad(self.padding,
                                            input_height,
                                            input_width,
                                            self.strides[0],
                                            self.strides[1],
                                            self.kernel_size[0],
                                            self.kernel_size[1])

            # crop the incoming gradient so it lines up with the unpadded
            # inputs (assumes symmetric padding)
            pad_size = (np.sum(pad_height) / 2).astype(int)
            if pad_size != 0:
                grad = grad[:, :, pad_size: -pad_size, pad_size: -pad_size]

            # dweights
            for f in np.arange(self.filter_num): # filter number
                for c in np.arange(input_depth): # input depth (channels)
                    for h in np.arange(self.kernel_size[0]): # kernel height
                        for w in np.arange(self.kernel_size[1]): # kernel width
                            input_patch = self.inputs[:,
                                                      c,
                                                      h: input_height - self.kernel_size[0] + h + 1: self.strides[0],
                                                      w: input_width - self.kernel_size[1] + w + 1: self.strides[1]]
                            grad_patch = grad[:, f]
                            dweights[f, c, h, w] = np.sum(input_patch * grad_patch) / input_num

            # dbias
            for f in np.arange(self.filter_num): # filter number
                dbias[f] = np.sum(grad[:, f]) / input_num

            # optimize the weights and bias
            self.weights = optimizer(self.weight_optimizer).update(self.weights, dweights, epoch_num, batch_num, batch_size)
            self.bias = optimizer(self.weight_optimizer).update(self.bias, dbias, epoch_num, batch_num, batch_size)
        # endif self.is_trainable

        # dinputs: accumulate over every output position recorded in grad
        _, _, output_height, output_width = grad.shape
        for b in np.arange(input_num): # batch number
            for f in np.arange(self.filter_num): # filter number
                for c in np.arange(input_depth): # input depth (channels)
                    for h in np.arange(output_height): # output height
                        for w in np.arange(output_width): # output width
                            h_stride, w_stride = h * self.strides[0], w * self.strides[1]
                            dinputs[b,
                                    c,
                                    h_stride: h_stride + self.kernel_size[0],
                                    w_stride: w_stride + self.kernel_size[1]] += self.weights[f, c] * grad[b, f, h, w]

        return dinputs
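
    # The += accumulation implements the transpose of the forward pass: each
    # output position that read an input window while convolving forward now
    # adds weights[f, c] * grad[b, f, h, w] back into that window, and
    # overlapping receptive fields sum their contributions.
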
class ConvToeplitzMat(Conv):

    def __init__(self,
                 filters = 32,
                 kernel_size = (3, 3),
                 activation = None,
                 input_shape = (1, 8, 8),
                 strides = (1, 1),
                 padding = 'valid'):
        super(ConvToeplitzMat, self).__init__(filters, kernel_size, activation, input_shape, strides, padding)

    def pass_forward(self, inputs, train_mode = True, **kwargs):
        self.filter_num, _, _, _ = self.weights.shape
        self.input_shape = inputs.shape
        self.inputs = inputs

        input_num, input_depth, input_height, input_width = inputs.shape
        pad_height, pad_width = get_pad(self.padding,
                                        input_height,
                                        input_width,
                                        self.strides[0],
                                        self.strides[1],
                                        self.kernel_size[0],
                                        self.kernel_size[1])

        x_padded = np.pad(self.inputs, ((0, 0), (0, 0), pad_height, pad_width), mode = 'constant')

        # confirm dimensions
        assert (input_height + np.sum(pad_height) - self.kernel_size[0]) % self.strides[0] == 0, 'height does not work'
        assert (input_width + np.sum(pad_width) - self.kernel_size[1]) % self.strides[1] == 0, 'width does not work'

        # compute output_height and output_width (cast to int for use as array
        # dimensions)
        output_height, output_width = get_output_dims(input_height, input_width, self.kernel_size, self.strides, self.padding)
        output_height, output_width = int(output_height), int(output_width)

        self.input_col = unroll_inputs(x_padded,
                                       x_padded.shape[0],
                                       x_padded.shape[1],
                                       output_height,
                                       output_width,
                                       self.kernel_size[0])

        # TODO: weights need to be rearranged in a way to have a matrix
        # multiplication with the generated toeplitz matrix
        self.weight_col = self.weights.reshape(self.filter_num, -1)

        # calculate output
        output = self.weight_col @ self.input_col + self.bias
        output = output.reshape(self.filter_num, output_height, output_width, input_num)

        return output.transpose(3, 0, 1, 2)
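
    # unroll_inputs is assumed to build a doubly block Toeplitz (im2col-style)
    # matrix from the padded input so that, as in Conv2D, the convolution
    # reduces to a single matrix product; per the TODO above, the weight
    # layout still has to be matched to the row ordering of that matrix.
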
    def pass_backward(self, grad, epoch_num, batch_num, batch_size):
        # backward pass for the Toeplitz formulation is not yet implemented
        pass
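
# A minimal usage sketch (hypothetical setup: the initializer name and the
# optimizer kwargs below are illustrative assumptions, not confirmed ztlearn
# identifiers):
#
#   layer = Conv2D(filters = 2, kernel_size = (3, 3), input_shape = (1, 8, 8))
#   layer.weight_initializer = 'he_normal' # assumed initializer name
#   layer.weight_optimizer = {'optimizer_name': 'sgd'} # assumed optimizer kwargs
#   layer.prep_layer()
#
#   out = layer.pass_forward(np.random.randn(4, 1, 8, 8))
#   # with 'valid' padding and stride 1: out.shape == (4, 2, 6, 6)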