Initial refactor of framework.

This commit is contained in:
Matt Graham 2016-09-19 07:31:31 +01:00
parent 58452a0de0
commit 89beb62fce
6 changed files with 630 additions and 560 deletions

mlp/costs.py

@ -1,64 +1,105 @@
# Machine Learning Practical (INFR11119),
# Pawel Swietojanski, University of Edinburgh
"""Model costs."""
import numpy
import numpy as np
class Cost(object):
class MeanSquaredErrorCost(object):
"""
Defines an interface for the cost object
"""
def cost(self, y, t, **kwargs):
"""
Implements a cost for monitoring purposes
:param y: matrix -- an output of the model
:param t: matrix -- an expected output the model should produce
:param kwargs: -- some optional parameters required by the cost
:return: the scalar value representing the cost given y and t
"""
raise NotImplementedError()
def grad(self, y, t, **kwargs):
"""
Implements a gradient of the cost w.r.t y
:param y: matrix -- an output of the model
:param t: matrix -- an expected output the model should produce
:param kwargs: -- some optional parameters required by the cost
:return: matrix - the gradient of the cost w.r.t y
"""
raise NotImplementedError()
def __call__(self, outputs, targets):
return 0.5 * np.mean(np.sum((outputs - targets)**2, axis=1))
def get_name(self):
return 'cost'
def grad(self, outputs, targets):
return outputs - targets
def __repr__(self):
return 'MeanSquaredErrorCost'
class MSECost(Cost):
def cost(self, y, t, **kwargs):
se = 0.5*numpy.sum((y - t)**2, axis=1)
return numpy.mean(se)
def grad(self, y, t, **kwargs):
return y - t
def get_name(self):
return 'mse'
class CECost(Cost):
class BinaryCrossEntropyCost(object):
"""
Cross Entropy (Negative log-likelihood) cost for multiple classes
"""
def cost(self, y, t, **kwargs):
#assumes t is 1-of-K coded and y is a softmax
#transformed estimate at the output layer
nll = t * numpy.log(y)
return -numpy.mean(numpy.sum(nll, axis=1))
def grad(self, y, t, **kwargs):
#assumes t is 1-of-K coded and y is a softmax
#transformed estimate at the output layer
return y - t
def __call__(self, outputs, targets):
return -np.mean(
targets * np.log(outputs) + (1. - targets) * np.log(1. - outputs))
def get_name(self):
return 'ce'
def grad(self, outputs, targets):
return (1. - targets) / (1. - outputs) - (targets / outputs)
def __repr__(self):
return 'BinaryCrossEntropyCost'
class BinaryCrossEntropySigmoidCost(object):
"""
"""
def __call__(self, outputs, targets):
probs = 1. / (1. + np.exp(-outputs))
return -np.mean(
targets * np.log(probs) + (1. - targets) * np.log(1. - probs))
def grad(self, outputs, targets):
probs = 1. / (1. + np.exp(-outputs))
return probs - targets
def __repr__(self):
return 'BinaryCrossEntropySigmoidCost'
class BinaryAccuracySigmoidCost(object):
"""
"""
def __call__(self, outputs, targets):
return ((outputs > 0) == targets).mean()
def __repr__(self):
return 'BinaryAccuracySigmoidCost'
class CrossEntropyCost(object):
"""
"""
def __call__(self, outputs, targets):
return -np.mean(np.sum(targets * np.log(outputs), axis=1))
def grad(self, outputs, targets):
return -targets / outputs
def __repr__(self):
return 'CrossEntropyCost'
class CrossEntropySoftmaxCost(object):
"""
"""
def __call__(self, outputs, targets):
probs = np.exp(outputs)
probs /= probs.sum(-1)[:, None]
return -np.mean(np.sum(targets * np.log(probs), axis=1))
def grad(self, outputs, targets):
probs = np.exp(outputs)
probs /= probs.sum(-1)[:, None]
return probs - targets
def __repr__(self):
return 'CrossEntropySoftmaxCost'
class MulticlassAccuracySoftmaxCost(object):
"""
"""
def __call__(self, outputs, targets):
probs = np.exp(outputs)
return np.mean(np.argmax(probs, -1) == np.argmax(targets, -1))
def __repr__(self):
return 'MulticlassAccuracySoftmaxCost'
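To make the new cost interface concrete, here is a minimal usage sketch (the toy arrays below are made up for illustration). With a batch of a single example the mean over the batch is trivial, so grad is exactly the derivative of the scalar returned by __call__:

import numpy as np
from mlp.costs import MeanSquaredErrorCost

outputs = np.array([[0.1, 0.7, 0.2]])  # hypothetical model outputs, batch size 1
targets = np.array([[0.0, 1.0, 0.0]])  # matching targets

mse = MeanSquaredErrorCost()
print(mse(outputs, targets))           # 0.5 * sum((outputs - targets)**2) = 0.07
print(mse.grad(outputs, targets))      # outputs - targets = [[ 0.1 -0.3  0.2]]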

mlp/initialisers.py (new file)

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
"""Parameter initialisers.
This module defines classes to initialise the parameters in a layer.
"""
import numpy as np
DEFAULT_SEED = 123456 # Default random number generator seed if none provided.
class ConstantInit(object):
"""Constant parameter initialiser."""
def __init__(self, value):
"""Construct a constant parameter initialiser.
Args:
value: Value to initialise parameter to.
"""
self.value = value
def __call__(self, shape):
return np.ones(shape=shape) * self.value
class UniformInit(object):
"""Random uniform parameter initialiser."""
def __init__(self, low, high, rng=None):
"""Construct a random uniform parameter initialiser.
Args:
low: Lower bound of interval to sample from.
high: Upper bound of interval to sample from.
rng (RandomState): Seeded random number generator.
"""
self.low = low
self.high = high
if rng is None:
rng = np.random.RandomState(DEFAULT_SEED)
self.rng = rng
def __call__(self, shape):
return self.rng.uniform(low=self.low, high=self.high, size=shape)
class NormalInit(object):
"""Random normal parameter initialiser."""
def __init__(self, mean, std, rng=None):
"""Construct a random uniform parameter initialiser.
Args:
mean: Mean of distribution to sample from.
std: Standard deviation of distribution to sample from.
rng (RandomState): Seeded random number generator.
"""
self.mean = mean
self.std = std
if rng is None:
rng = np.random.RandomState(DEFAULT_SEED)
self.rng = rng
def __call__(self, shape):
return self.rng.normal(loc=self.mean, scale=self.std, size=shape)
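A short usage sketch of the initialisers above (the layer dimensions are an arbitrary illustration). Each initialiser is a callable mapping a shape to an array, which is how the AffineLayer constructor in mlp/layers.py consumes them:

import numpy as np
from mlp.initialisers import ConstantInit, UniformInit, NormalInit

rng = np.random.RandomState(123456)
weights_init = UniformInit(-0.1, 0.1, rng=rng)   # uniform on [-0.1, 0.1]
biases_init = ConstantInit(0.)                   # all zeros

weights = weights_init((100, 784))               # e.g. (output_dim, input_dim)
biases = biases_init(100)
noise = NormalInit(mean=0., std=0.01, rng=rng)((100, 784))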

mlp/layers.py

@ -1,561 +1,325 @@
# -*- coding: utf-8 -*-
"""Layer definitions.
# Machine Learning Practical (INFR11119),
# Pawel Swietojanski, University of Edinburgh
This module defines classes which encapsulate a single layer.
import numpy
import logging
from mlp.costs import Cost
These layers map input activations to output activation with the `fprop`
method and map gradients with respect to outputs to gradients with respect to
their inputs with the `bprop` method.
Some layers will have learnable parameters and so will additionally define
methods for getting and setting parameter and calculating gradients with
respect to the layer parameters.
"""
logger = logging.getLogger(__name__)
def max_and_argmax(x, axes=None, keepdims_max=False, keepdims_argmax=False):
"""
Return both max and argmax for the given multi-dimensional array, possibly
preserve the original shapes
:param x: input tensor
:param axes: tuple of ints denoting axes across which
one should perform reduction
:param keepdims_max: boolean, if true, shape of x is preserved in result
:param keepdims_argmax: boolean, if true, shape of x is preserved in result
:return: max (number) and argmax (indices) of max element along certain axes
in multi-dimensional tensor
"""
if axes is None:
rval_argmax = numpy.argmax(x)
if keepdims_argmax:
rval_argmax = numpy.unravel_index(rval_argmax, x.shape)
else:
if isinstance(axes, int):
axes = (axes,)
axes = tuple(axes)
keep_axes = numpy.array([i for i in range(x.ndim) if i not in axes])
transposed_x = numpy.transpose(x, numpy.concatenate((keep_axes, axes)))
reshaped_x = transposed_x.reshape(transposed_x.shape[:len(keep_axes)] + (-1,))
rval_argmax = numpy.asarray(numpy.argmax(reshaped_x, axis=-1), dtype=numpy.int64)
# rval_max_arg keeps the arg index referencing to the axis along which reduction was performed (axis=-1)
# when keepdims_argmax is True we need to map it back to the original shape of tensor x
# print 'rval maxaarg', rval_argmax.ndim, rval_argmax.shape, rval_argmax
if keepdims_argmax:
dim = tuple([x.shape[a] for a in axes])
rval_argmax = numpy.array([idx + numpy.unravel_index(val, dim)
for idx, val in numpy.ndenumerate(rval_argmax)])
# convert to numpy indexing convention (row indices first, then columns)
rval_argmax = zip(*rval_argmax)
if keepdims_max is False and keepdims_argmax is True:
# this could potentially save O(N) steps by not traversing array once more
# to get max value, haven't benchmark it though
rval_max = x[rval_argmax]
else:
rval_max = numpy.asarray(numpy.amax(x, axis=axes, keepdims=keepdims_max))
return rval_max, rval_argmax
class MLP(object):
"""
This is a container for an arbitrary sequence of other transforms
On top of this, the class also keeps the state of the model, i.e.
the result of forward (activations) and backward (deltas) passes
through the model (for a mini-batch), which is required to compute
the gradients for the parameters
"""
def __init__(self, cost, rng=None):
assert isinstance(cost, Cost), (
"Cost needs to be of type mlp.costs.Cost, got %s" % type(cost)
)
self.layers = [] #the actual list of network layers
self.activations = [] #keeps forward-pass activations (h from equations)
# for a given minibatch (or features at 0th index)
self.deltas = [] #keeps back-propagated error signals (deltas from equations)
# for a given minibatch and each layer
self.cost = cost
if rng is None:
self.rng = numpy.random.RandomState([2015,11,11])
else:
self.rng = rng
def fprop(self, x):
"""
:param inputs: mini-batch of data-points x
:return: y (top layer activation) which is an estimate of y given x
"""
if len(self.activations) != len(self.layers) + 1:
self.activations = [None]*(len(self.layers) + 1)
self.activations[0] = x
for i in xrange(0, len(self.layers)):
self.activations[i+1] = self.layers[i].fprop(self.activations[i])
return self.activations[-1]
def fprop_dropout(self, x, dp_scheduler):
"""
:param inputs: mini-batch of data-points x
:param dp_scheduler: dropout scheduler
:return: y (top layer activation) which is an estimate of y given x
"""
if len(self.activations) != len(self.layers) + 1:
self.activations = [None]*(len(self.layers) + 1)
p_inp, p_hid = dp_scheduler.get_rate()
d_inp = 1
p_inp_scaler, p_hid_scaler = 1.0/p_inp, 1.0/p_hid
if p_inp < 1:
d_inp = self.rng.binomial(1, p_inp, size=x.shape)
self.activations[0] = p_inp_scaler*d_inp*x #it's OK to scale the inputs by p_inp_scaler here
self.activations[1] = self.layers[0].fprop(self.activations[0])
for i in xrange(1, len(self.layers)):
d_hid = 1
if p_hid < 1:
d_hid = self.rng.binomial(1, p_hid, size=self.activations[i].shape)
self.activations[i] *= d_hid #but not the hidden activations, since the non-linearity grad *may* explicitly depend on them
self.activations[i+1] = self.layers[i].fprop(p_hid_scaler*self.activations[i])
return self.activations[-1]
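The fprop_dropout pass above implements 'inverted' dropout: units that are kept are rescaled by 1/p so the expected activation matches the dropout-free forward pass. A standalone sketch of that masking step, with a made-up retention probability and toy activations:

import numpy as np

rng = np.random.RandomState(0)
p_hid = 0.5                                    # assumed retention probability
h = rng.randn(4, 3)                            # toy hidden-layer activations
d_hid = rng.binomial(1, p_hid, size=h.shape)   # 1 = keep unit, 0 = drop unit
h_dropped = (1.0 / p_hid) * d_hid * h          # rescale kept units by 1/p_hid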
def bprop(self, cost_grad, dp_scheduler=None):
"""
:param cost_grad: matrix -- grad of the cost w.r.t y
:return: None, the deltas are kept in the model
"""
# allocate the list of deltas for each layer
# note, we do not use all of those fields but
# want to keep it aligned 1:1 with activations,
# which will simplify indexing later on when
# computing grads w.r.t parameters
if len(self.deltas) != len(self.activations):
self.deltas = [None]*len(self.activations)
# treat the top layer in special way, as it deals with the
# cost, which may lead to some simplifications
top_layer_idx = len(self.layers)
self.deltas[top_layer_idx], ograds = self.layers[top_layer_idx - 1].\
bprop_cost(self.activations[top_layer_idx], cost_grad, self.cost)
p_hid_scaler = 1.0
if dp_scheduler is not None:
p_inp, p_hid = dp_scheduler.get_rate()
p_hid_scaler /= p_hid
# then back-prop through remaining layers
for i in xrange(top_layer_idx - 1, 0, -1):
self.deltas[i], ograds = self.layers[i - 1].\
bprop(self.activations[i], ograds*p_hid_scaler)
def add_layer(self, layer):
self.layers.append(layer)
def set_layers(self, layers):
self.layers = layers
def get_name(self):
return 'mlp'
import numpy as np
import mlp.initialisers as init
class Layer(object):
"""Abstract class defining the interface for a layer."""
def fprop(self, inputs):
"""Forward propagates activations through the layer transformation.
Args:
inputs: Array of layer inputs of shape (batch_size, input_dim).
Returns:
outputs: Array of layer outputs of shape (batch_size, output_dim).
"""
raise NotImplementedError()
def bprop(self, inputs, outputs, grads_wrt_outputs):
"""Back propagates gradients through a layer.
Given gradients with respect to the outputs of the layer calculates the
gradients with respect to the layer inputs.
Args:
inputs: Array of layer inputs of shape (batch_size, input_dim).
outputs: Array of layer outputs calculated in forward pass of
shape (batch_size, output_dim).
grads_wrt_outputs: Array of gradients with respect to the layer
outputs of shape (batch_size, output_dim).
Returns:
Array of gradients with respect to the layer inputs of shape
(batch_size, input_dim).
"""
raise NotImplementedError()
class LayerWithParameters(Layer):
"""Abstract class defining the interface for a layer with parameters."""
def grads_wrt_params(self, inputs, grads_wrt_outputs):
"""Calculates gradients with respect to layer parameters.
Args:
inputs: Array of inputs to layer of shape (batch_size, input_dim).
grads_wrt_outputs: Array of gradients with respect to the layer
outputs of shape (batch_size, output_dim).
Returns:
List of arrays of gradients with respect to the layer parameters
with parameter gradients appearing in same order in tuple as
returned from `get_params` method.
"""
raise NotImplementedError()
def params_cost(self):
"""Returns the parameter dependent cost term for this layer.
If no parameter-dependent cost terms are set this returns zero.
"""
raise NotImplementedError()
@property
def params(self):
"""Returns a list of parameters of layer.
Returns:
List of current parameter values. This list should be in the
corresponding order to the `values` argument to `set_params`.
"""
raise NotImplementedError()
@params.setter
def params(self, values):
"""Sets layer parameters from a list of values.
Args:
values: List of values to set parameters to. This list should be
in the corresponding order to what is returned by `get_params`.
"""
raise NotImplementedError()
class AffineLayer(LayerWithParameters):
"""Layer implementing an affine tranformation of its inputs.
This layer is parameterised by a weight matrix and bias vector.
"""
Abstract class defining an interface for
other transforms.
"""
def __init__(self, rng=None):
if rng is None:
seed=[2015, 10, 1]
self.rng = numpy.random.RandomState(seed)
else:
self.rng = rng
def __init__(self, input_dim, output_dim,
weights_initialiser=init.UniformInit(-0.1, 0.1),
biases_initialiser=init.ConstantInit(0.),
weights_cost=None, biases_cost=None):
"""Initialises a parameterised affine layer.
Args:
input_dim (int): Dimension of inputs to the layer.
output_dim (int): Dimension of the layer outputs.
weights_initialiser: Initialiser for the weight parameters.
biases_initialiser: Initialiser for the bias parameters.
weights_cost: Weights-dependent cost term.
biases_cost: Biases-dependent cost term.
"""
self.input_dim = input_dim
self.output_dim = output_dim
self.weights = weights_initialiser((self.output_dim, self.input_dim))
self.biases = biases_initialiser(self.output_dim)
self.weights_cost = weights_cost
self.biases_cost = biases_cost
def fprop(self, inputs):
"""
Implements a forward propagation through the i-th layer, that is
some form of:
a^i = xW^i + b^i
h^i = f^i(a^i)
with f^i, W^i, b^i denoting a non-linearity, weight matrix and
biases at the i-th layer, respectively and x denoting inputs.
"""Forward propagates activations through the layer transformation.
:param inputs: matrix of features (x) or the output of the previous layer h^{i-1}
:return: h^i, matrix of features transformed by the layer
"""
raise NotImplementedError()
For inputs `x`, outputs `y`, weights `W` and biases `b` the layer
corresponds to `y = W.dot(x) + b`.
def bprop(self, h, igrads):
"""
Implements a backward propagation through the layer, that is, given
h^i denotes the output of the layer and x^i the input, we compute:
dh^i/dx^i which by chain rule is dh^i/da^i da^i/dx^i
x^i could be either features (x) or the output of the lower layer h^{i-1}
:param h: the activation produced in the forward pass
:param igrads: error signal (or gradient) flowing into the layer; note that
in the general case this does not correspond to the 'deltas' used to update
the layer's parameters -- to get deltas one needs to multiply it by
the dh^i/da^i derivative
:return: a tuple (deltas, ograds) where:
deltas = igrads * dh^i/da^i
ograds = deltas \times da^i/dx^i
"""
raise NotImplementedError()
Args:
inputs: Array of layer inputs of shape (batch_size, input_dim).
def bprop_cost(self, h, igrads, cost=None):
Returns:
outputs: Array of layer outputs of shape (batch_size, output_dim).
"""
Implements a backward propagation in case the layer directly
deals with the optimised cost (i.e. the top layer).
By default, the method should implement back-prop for the default cost, that is
the one that is natural to the layer's output, i.e.:
linear -> mse, softmax -> cross-entropy, sigmoid -> binary cross-entropy
:param h: the activation produced in the forward pass
:param igrads: error signal (or gradient) flowing into the layer; note that
in the general case this does not correspond to the 'deltas' used to update
the layer's parameters -- to get deltas one needs to multiply it by
the dh^i/da^i derivative
:return: a tuple (deltas, ograds) where:
deltas = igrads * dh^i/da^i
ograds = deltas \times da^i/dx^i
return self.weights.dot(inputs.T).T + self.biases
def bprop(self, inputs, outputs, grads_wrt_outputs):
"""Back propagates gradients through a layer.
Given gradients with respect to the outputs of the layer calculates the
gradients with respect to the layer inputs.
Args:
inputs: Array of layer inputs of shape (batch_size, input_dim).
outputs: Array of layer outputs calculated in forward pass of
shape (batch_size, output_dim).
grads_wrt_outputs: Array of gradients with respect to the layer
outputs of shape (batch_size, output_dim).
Returns:
Array of gradients with respect to the layer inputs of shape
(batch_size, input_dim).
"""
return grads_wrt_outputs.dot(self.weights)
def grads_wrt_params(self, inputs, grads_wrt_outputs):
"""Calculates gradients with respect to layer parameters.
Args:
inputs: array of inputs to layer of shape (batch_size, input_dim)
grads_wrt_outputs: array of gradients with respect to the layer
outputs of shape (batch_size, output_dim)
Returns:
list of arrays of gradients with respect to the layer parameters
`[grads_wrt_weights, grads_wrt_biases]`.
"""
raise NotImplementedError()
grads_wrt_weights = np.dot(grads_wrt_outputs.T, inputs)
grads_wrt_biases = np.sum(grads_wrt_outputs, axis=0)
def pgrads(self, inputs, deltas, **kwargs):
if self.weights_cost is not None:
grads_wrt_weights += self.weights_cost.grad(self.weights)
if self.biases_cost is not None:
grads_wrt_biases += self.biases_cost.grad(self.biases)
return [grads_wrt_weights, grads_wrt_biases]
def params_cost(self):
"""Returns the parameter dependent cost term for this layer.
If no parameter-dependent cost terms are set this returns zero.
"""
Return gradients w.r.t parameters
"""
raise NotImplementedError()
params_cost = 0
if self.weights_cost is not None:
params_cost += self.weights_cost(self.weights)
if self.biases_cost is not None:
params_cost += self.biases_cost(self.biases)
return params_cost
def get_params(self):
raise NotImplementedError()
@property
def params(self):
"""A list of layer parameter values: `[weights, biases]`."""
return [self.weights, self.biases]
def set_params(self):
raise NotImplementedError()
@params.setter
def params(self, values):
self.weights = values[0]
self.biases = values[1]
def get_name(self):
return 'abstract_layer'
def __repr__(self):
return 'AffineLayer(input_dim={0}, output_dim={1})'.format(
self.input_dim, self.output_dim)
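A quick numerical consistency check of the new AffineLayer (a sketch; the dimensions, seed and tolerance are arbitrary choices and the check is not part of the framework). Taking the scalar function f = sum(fprop(inputs)), the input gradient returned by bprop with grads_wrt_outputs set to ones should agree with a central finite-difference estimate:

import numpy as np
from mlp.layers import AffineLayer

rng = np.random.RandomState(123456)
layer = AffineLayer(input_dim=5, output_dim=3)
inputs = rng.randn(2, 5)

outputs = layer.fprop(inputs)
grads_wrt_outputs = np.ones_like(outputs)          # d sum(outputs) / d outputs
grads_wrt_inputs = layer.bprop(inputs, outputs, grads_wrt_outputs)

eps = 1e-6
fd_grads = np.empty_like(inputs)
for idx in np.ndindex(*inputs.shape):
    inputs_plus, inputs_minus = inputs.copy(), inputs.copy()
    inputs_plus[idx] += eps
    inputs_minus[idx] -= eps
    fd_grads[idx] = (layer.fprop(inputs_plus).sum() -
                     layer.fprop(inputs_minus).sum()) / (2 * eps)

assert np.allclose(grads_wrt_inputs, fd_grads, atol=1e-5)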
class Linear(Layer):
def __init__(self, idim, odim,
rng=None,
irange=0.1):
super(Linear, self).__init__(rng=rng)
self.idim = idim
self.odim = odim
self.W = self.rng.uniform(
-irange, irange,
(self.idim, self.odim))
self.b = numpy.zeros((self.odim,), dtype=numpy.float32)
class SigmoidLayer(Layer):
"""Layer implementing an element-wise logistic sigmoid transformation."""
def fprop(self, inputs):
"""Forward propagates activations through the layer transformation.
For inputs `x` and outputs `y` this corresponds to
`y = 1 / (1 + exp(-x))`.
Args:
inputs: Array of layer inputs of shape (batch_size, input_dim).
Returns:
outputs: Array of layer outputs of shape (batch_size, output_dim).
"""
Implements a forward propagation through the i-th layer, that is
some form of:
a^i = xW^i + b^i
h^i = f^i(a^i)
with f^i, W^i, b^i denoting a non-linearity, weight matrix and
biases of this (i-th) layer, respectively and x denoting inputs.
return 1. / (1. + np.exp(-inputs))
:param inputs: matrix of features (x) or the output of the previous layer h^{i-1}
:return: h^i, matrix of features transformed by the layer
def bprop(self, inputs, outputs, grads_wrt_outputs):
"""Back propagates gradients through a layer.
Given gradients with respect to the outputs of the layer calculates the
gradients with respect to the layer inputs.
Args:
inputs: Array of layer inputs of shape (batch_size, input_dim).
outputs: Array of layer outputs calculated in forward pass of
shape (batch_size, output_dim).
grads_wrt_outputs: Array of gradients with respect to the layer
outputs of shape (batch_size, output_dim).
Returns:
Array of gradients with respect to the layer inputs of shape
(batch_size, input_dim).
"""
return grads_wrt_outputs * outputs * (1. - outputs)
#input comes from 4D convolutional tensor, reshape to expected shape
if inputs.ndim == 4:
inputs = inputs.reshape(inputs.shape[0], -1)
a = numpy.dot(inputs, self.W) + self.b
# here f() is an identity function, so just return a linear transformation
return a
def bprop(self, h, igrads):
"""
Implements a backward propagation through the layer, that is, given
h^i denotes the output of the layer and x^i the input, we compute:
dh^i/dx^i which by chain rule is dh^i/da^i da^i/dx^i
x^i could be either features (x) or the output of the lower layer h^{i-1}
:param h: the activation produced in the forward pass
:param igrads: error signal (or gradient) flowing into the layer; note that
in the general case this does not correspond to the 'deltas' used to update
the layer's parameters -- to get deltas one needs to multiply it by
the dh^i/da^i derivative
:return: a tuple (deltas, ograds) where:
deltas = igrads * dh^i/da^i
ograds = deltas \times da^i/dx^i
"""
# since df^i/da^i = 1 (f is assumed identity function),
# deltas are in fact the same as igrads
ograds = numpy.dot(igrads, self.W.T)
return igrads, ograds
def bprop_cost(self, h, igrads, cost):
"""
Implements a backward propagation in case the layer directly
deals with the optimised cost (i.e. the top layer)
By default, the method should implement bprop for the default cost, that is
the one that is natural to the layer's output, i.e.
here we implement the linear -> mse scenario
:param h: the activation produced in the forward pass
:param igrads: error signal (or gradient) flowing into the layer; note that
in the general case this does not correspond to the 'deltas' used to update
the layer's parameters -- to get deltas one needs to multiply it by
the dh^i/da^i derivative
:param cost: mlp.costs.Cost instance defining the cost used
:return: a tuple (deltas, ograds) where:
deltas = igrads * dh^i/da^i
ograds = deltas \times da^i/dx^i
"""
if cost is None or cost.get_name() == 'mse':
# for linear layer and mean square error cost,
# cost back-prop is the same as standard back-prop
return self.bprop(h, igrads)
else:
raise NotImplementedError('Linear.bprop_cost method not implemented '
'for the %s cost' % cost.get_name())
def pgrads(self, inputs, deltas, l1_weight=0, l2_weight=0):
"""
Return gradients w.r.t parameters
:param inputs: input to the i-th layer
:param deltas: deltas computed in the bprop stage up to the i-th layer
:param kwargs: key-value optional arguments
:return: list of grads w.r.t parameters dE/dW and dE/db in *exactly*
the same order as the params are returned by get_params()
Note: deltas here contain the whole chain rule leading
from the cost up to the i-th layer, i.e.
dE/dy^L dy^L/da^L da^L/dh^{L-1} dh^{L-1}/da^{L-1} ... dh^{i}/da^{i}
and here we are just asking about
1) da^i/dW^i and 2) da^i/db^i
since W and b are the only parameters of this layer
"""
#input comes from 4D convolutional tensor, reshape to expected shape
if inputs.ndim == 4:
inputs = inputs.reshape(inputs.shape[0], -1)
#you could basically use different scalers for biases
#and weights, but it is not implemented here like this
l2_W_penalty, l2_b_penalty = 0, 0
if l2_weight > 0:
l2_W_penalty = l2_weight*self.W
l2_b_penalty = l2_weight*self.b
l1_W_penalty, l1_b_penalty = 0, 0
if l1_weight > 0:
l1_W_penalty = l1_weight*numpy.sign(self.W)
l1_b_penalty = l1_weight*numpy.sign(self.b)
grad_W = numpy.dot(inputs.T, deltas) + l2_W_penalty + l1_W_penalty
grad_b = numpy.sum(deltas, axis=0) + l2_b_penalty + l1_b_penalty
return [grad_W, grad_b]
def get_params(self):
return [self.W, self.b]
def set_params(self, params):
#we do not make checks here, but the order on the list
#is assumed to be exactly the same as get_params() returns
self.W = params[0]
self.b = params[1]
def get_name(self):
return 'linear'
def __repr__(self):
return 'SigmoidLayer'
class Sigmoid(Linear):
def __init__(self, idim, odim,
rng=None,
irange=0.1):
super(Sigmoid, self).__init__(idim, odim, rng, irange)
class ReluLayer(Layer):
"""Layer implementing an element-wise rectified linear transformation."""
def fprop(self, inputs):
#get the linear activations
a = super(Sigmoid, self).fprop(inputs)
#stabilise the exp() computation in case some values in
#'a' get very negative. We limit both tails, however only
#negative values may lead to numerical issues -- exp(-a)
#clip() function does the following operation faster:
# a[a < -30.] = -30,
# a[a > 30.] = 30.
numpy.clip(a, -30.0, 30.0, out=a)
h = 1.0/(1 + numpy.exp(-a))
return h
"""Forward propagates activations through the layer transformation.
def bprop(self, h, igrads):
dsigm = h * (1.0 - h)
deltas = igrads * dsigm
___, ograds = super(Sigmoid, self).bprop(h=None, igrads=deltas)
return deltas, ograds
For inputs `x` and outputs `y` this corresponds to `y = max(0, x)`.
def bprop_cost(self, h, igrads, cost):
if cost is None or cost.get_name() == 'bce':
return super(Sigmoid, self).bprop(h=h, igrads=igrads)
else:
raise NotImplementedError('Sigmoid.bprop_cost method not implemented '
'for the %s cost' % cost.get_name())
Args:
inputs: Array of layer inputs of shape (batch_size, input_dim).
def get_name(self):
return 'sigmoid'
Returns:
outputs: Array of layer outputs of shape (batch_size, output_dim).
"""
return np.maximum(inputs, 0.)
def bprop(self, inputs, outputs, grads_wrt_outputs):
"""Back propagates gradients through a layer.
Given gradients with respect to the outputs of the layer calculates the
gradients with respect to the layer inputs.
Args:
inputs: Array of layer inputs of shape (batch_size, input_dim).
outputs: Array of layer outputs calculated in forward pass of
shape (batch_size, output_dim).
grads_wrt_outputs: Array of gradients with respect to the layer
outputs of shape (batch_size, output_dim).
Returns:
Array of gradients with respect to the layer inputs of shape
(batch_size, input_dim).
"""
return (outputs > 0) * grads_wrt_outputs
def __repr__(self):
return 'ReluLayer'
class Softmax(Linear):
def __init__(self,idim, odim,
rng=None,
irange=0.1):
super(Softmax, self).__init__(idim,
odim,
rng=rng,
irange=irange)
class TanhLayer(Layer):
"""Layer implementing an element-wise hyperbolic tangent transformation."""
def fprop(self, inputs):
"""Forward propagates activations through the layer transformation.
# compute the linear outputs
a = super(Softmax, self).fprop(inputs)
# apply numerical stabilisation by subtracting max
# from each row (not required for the coursework)
# then compute exponent
assert a.ndim in [1, 2], (
"Expected the linear activation in Softmax layer to be either "
"vector or matrix, got %ith dimensional tensor" % a.ndim
)
axis = a.ndim - 1
exp_a = numpy.exp(a - numpy.max(a, axis=axis, keepdims=True))
# finally, normalise by the sum within each example
y = exp_a/numpy.sum(exp_a, axis=axis, keepdims=True)
For inputs `x` and outputs `y` this corresponds to `y = tanh(x)`.
return y
Args:
inputs: Array of layer inputs of shape (batch_size, input_dim).
def bprop(self, h, igrads):
raise NotImplementedError('Softmax.bprop not implemented for hidden layer.')
Returns:
outputs: Array of layer outputs of shape (batch_size, output_dim).
"""
return np.tanh(inputs)
def bprop_cost(self, h, igrads, cost):
def bprop(self, inputs, outputs, grads_wrt_outputs):
"""Back propagates gradients through a layer.
if cost is None or cost.get_name() == 'ce':
return super(Softmax, self).bprop(h=h, igrads=igrads)
else:
raise NotImplementedError('Softmax.bprop_cost method not implemented '
'for %s cost' % cost.get_name())
Given gradients with respect to the outputs of the layer calculates the
gradients with respect to the layer inputs.
def get_name(self):
return 'softmax'
Args:
inputs: Array of layer inputs of shape (batch_size, input_dim).
outputs: Array of layer outputs calculated in forward pass of
shape (batch_size, output_dim).
grads_wrt_outputs: Array of gradients with respect to the layer
outputs of shape (batch_size, output_dim).
Returns:
Array of gradients with respect to the layer inputs of shape
(batch_size, input_dim).
"""
return (1. - outputs**2) * grads_wrt_outputs
class Relu(Linear):
def __init__(self, idim, odim,
rng=None,
irange=0.1):
super(Relu, self).__init__(idim, odim, rng, irange)
def fprop(self, inputs):
#get the linear activations
a = super(Relu, self).fprop(inputs)
h = numpy.clip(a, 0, 20.0)
#h = numpy.maximum(a, 0)
return h
def bprop(self, h, igrads):
deltas = (h > 0)*igrads
___, ograds = super(Relu, self).bprop(h=None, igrads=deltas)
return deltas, ograds
def bprop_cost(self, h, igrads, cost):
raise NotImplementedError('Relu.bprop_cost method not implemented '
'for the %s cost' % cost.get_name())
def get_name(self):
return 'relu'
class Tanh(Linear):
def __init__(self, idim, odim,
rng=None,
irange=0.1):
super(Tanh, self).__init__(idim, odim, rng, irange)
def fprop(self, inputs):
#get the linear activations
a = super(Tanh, self).fprop(inputs)
numpy.clip(a, -30.0, 30.0, out=a)
h = numpy.tanh(a)
return h
def bprop(self, h, igrads):
deltas = (1.0 - h**2) * igrads
___, ograds = super(Tanh, self).bprop(h=None, igrads=deltas)
return deltas, ograds
def bprop_cost(self, h, igrads, cost):
raise NotImplementedError('Tanh.bprop_cost method not implemented '
'for the %s cost' % cost.get_name())
def get_name(self):
return 'tanh'
class Maxout(Linear):
def __init__(self, idim, odim, k,
rng=None,
irange=0.05):
super(Maxout, self).__init__(idim, odim*k, rng, irange)
self.max_odim = odim
self.k = k
def fprop(self, inputs):
#get the linear activations
a = super(Maxout, self).fprop(inputs)
ar = a.reshape(a.shape[0], self.max_odim, self.k)
h, h_argmax = max_and_argmax(ar, axes=2, keepdims_max=True, keepdims_argmax=True)
self.h_argmax = h_argmax
return h[:, :, 0] #get rid of the last reduced dimension (of size 1)
def bprop(self, h, igrads):
#hack for dropout backprop (ignore dropped neurons). Note, this is not
#entirely correct when h fires at exactly 0 (but is not dropped, in which case
#the derivative should be 1). However, this is rather unlikely to happen (that h fires at exactly 0)
#and probably can be ignored for now. Otherwise, one would have to keep the dropped unit
#indexes and zero grads according to them.
igrads = (h != 0)*igrads
#convert into the shape where upsampling is easier
igrads_up = igrads.reshape(igrads.shape[0], self.max_odim, 1)
#upsample to the linear dimension (but reshaped to (batch_size, maxed_num (1), pool_size)
igrads_up = numpy.tile(igrads_up, (1, 1, self.k))
#generate mask matrix and set to 1 maxed elements
mask = numpy.zeros_like(igrads_up)
mask[self.h_argmax] = 1.0
#do bprop through max operator and then reshape into 2D
deltas = (igrads_up * mask).reshape(igrads_up.shape[0], -1)
#and then do bprop through the linear part
___, ograds = super(Maxout, self).bprop(h=None, igrads=deltas)
return deltas, ograds
def bprop_cost(self, h, igrads, cost):
raise NotImplementedError('Maxout.bprop_cost method not implemented '
'for the %s cost' % cost.get_name())
def get_name(self):
return 'maxout'
def __repr__(self):
return 'TanhLayer'
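To see how the stateless activation layers compose with AffineLayer under the new interface, a short forward/backward sketch on a toy batch (all sizes are illustrative):

import numpy as np
from mlp.layers import AffineLayer, SigmoidLayer

rng = np.random.RandomState(123456)
affine = AffineLayer(input_dim=4, output_dim=2)
sigmoid = SigmoidLayer()

inputs = rng.randn(3, 4)                   # toy batch of 3 examples
hidden = affine.fprop(inputs)              # shape (3, 2)
outputs = sigmoid.fprop(hidden)            # shape (3, 2), values in (0, 1)

grads_wrt_outputs = np.ones_like(outputs)  # stand-in upstream gradient
grads_wrt_hidden = sigmoid.bprop(hidden, outputs, grads_wrt_outputs)
grads_wrt_inputs = affine.bprop(inputs, hidden, grads_wrt_hidden)
grads_wrt_weights, grads_wrt_biases = affine.grads_wrt_params(
    inputs, grads_wrt_hidden)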

mlp/learning_rules.py (new file)

@ -0,0 +1,42 @@
"""Learning rules."""
import numpy as np
class GradientDescentLearningRule(object):
def __init__(self, learning_rate=1e-3):
self.learning_rate = learning_rate
def initialise(self, params):
self.params = params
def reset(self):
pass
def update_params(self, grads_wrt_params):
for param, grad in zip(self.params, grads_wrt_params):
param -= self.learning_rate * grad
class MomentumLearningRule(object):
def __init__(self, learning_rate=1e-3, mom_coeff=0.9):
self.learning_rate = learning_rate
self.mom_coeff = mom_coeff
def initialise(self, params):
self.params = params
self.moms = []
for param in self.params:
self.moms.append(np.zeros_like(param))
def reset(self):
for mom in self.moms:
mom *= 0.
def update_params(self, grads_wrt_params):
for param, mom, grad in zip(self.params, self.moms, grads_wrt_params):
mom *= self.mom_coeff
mom -= self.learning_rate * grad
param += mom
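A sketch of driving a learning rule by hand (the parameter and gradient arrays are stand-ins): initialise is handed references to the parameter arrays and update_params then modifies them in place, which is why the Trainer below never reassigns the model's parameters:

import numpy as np
from mlp.learning_rules import MomentumLearningRule

params = [np.zeros((3, 2)), np.zeros(2)]              # e.g. [weights, biases]
rule = MomentumLearningRule(learning_rate=0.1, mom_coeff=0.9)
rule.initialise(params)                               # keeps references to params

for _ in range(5):
    grads_wrt_params = [np.ones((3, 2)), np.ones(2)]  # stand-in gradients
    rule.update_params(grads_wrt_params)              # updates params in place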

mlp/models.py (new file)

@ -0,0 +1,92 @@
"""
Model definitions.
"""
from mlp.layers import LayerWithParameters
class SingleLayerModel(object):
"""
"""
def __init__(self, layer):
self.layer = layer
@property
def params(self):
"""
"""
return self.layer.params
def fprop(self, inputs):
"""
"""
activations = [inputs, self.layer.fprop(inputs)]
return activations
def grads_wrt_params(self, activations, grads_wrt_outputs):
"""
"""
return self.layer.grads_wrt_params(activations[0], grads_wrt_outputs)
def params_cost(self):
"""
"""
return self.layer.params_cost()
def __repr__(self):
return 'SingleLayerModel(' + str(self.layer) + ')'
class MultipleLayerModel(object):
"""
"""
def __init__(self, layers):
self.layers = layers
@property
def params(self):
"""
"""
params = []
for layer in self.layers:
if isinstance(layer, LayerWithParameters):
params += layer.params
return params
def fprop(self, inputs):
"""
"""
activations = [inputs]
for i, layer in enumerate(self.layers):
activations.append(self.layers[i].fprop(activations[i]))
return activations
def grads_wrt_params(self, activations, grads_wrt_outputs):
"""
"""
grads_wrt_params = []
for i, layer in enumerate(self.layers[::-1]):
inputs = activations[-i - 2]
outputs = activations[-i - 1]
grads_wrt_inputs = layer.bprop(inputs, outputs, grads_wrt_outputs)
if isinstance(layer, LayerWithParameters):
grads_wrt_params += layer.grads_wrt_params(
inputs, grads_wrt_outputs)[::-1]
grads_wrt_outputs = grads_wrt_inputs
return grads_wrt_params[::-1]
def params_cost(self):
"""
"""
params_cost = 0.
for layer in self.layers:
if isinstance(layer, LayerWithParameters):
params_cost += layer.params_cost()
return params_cost
def __repr__(self):
return (
'MultipleLayerModel(\n ' +
'\n '.join([str(layer) for layer in self.layers]) +
'\n)'
)
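A sketch of assembling a two-layer network from the pieces above (layer sizes are illustrative). fprop returns the full list of activations, inputs first and final outputs last, which is exactly the list grads_wrt_params expects back:

import numpy as np
from mlp.layers import AffineLayer, SigmoidLayer
from mlp.models import MultipleLayerModel

model = MultipleLayerModel([
    AffineLayer(input_dim=784, output_dim=100),
    SigmoidLayer(),
    AffineLayer(input_dim=100, output_dim=10),
])

inputs = np.random.RandomState(123456).randn(5, 784)  # toy batch of 5 examples
activations = model.fprop(inputs)
outputs = activations[-1]                              # shape (5, 10)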

mlp/trainers.py (new file)

@ -0,0 +1,65 @@
"""Model trainers."""
import time
import logging
from collections import OrderedDict
logger = logging.getLogger(__name__)
class Trainer(object):
def __init__(self, model, cost, learning_rule, train_dataset,
valid_dataset=None):
self.model = model
self.cost = cost
self.learning_rule = learning_rule
self.learning_rule.initialise(self.model.params)
self.train_dataset = train_dataset
self.valid_dataset = valid_dataset
def do_training_epoch(self):
for inputs_batch, targets_batch in self.train_dataset:
activations = self.model.fprop(inputs_batch)
grads_wrt_outputs = self.cost.grad(activations[-1], targets_batch)
grads_wrt_params = self.model.grads_wrt_params(
activations, grads_wrt_outputs)
self.learning_rule.update_params(grads_wrt_params)
self.train_dataset.reset()
def data_cost(self, dataset):
cost = 0.
for inputs_batch, targets_batch in dataset:
activations = self.model.fprop(inputs_batch)
cost += self.cost(activations[-1], targets_batch)
dataset.reset()
return cost
def get_epoch_stats(self):
epoch_stats = OrderedDict()
epoch_stats['cost(train)'] = self.data_cost(
self.train_dataset)
epoch_stats['cost(valid)'] = self.data_cost(
self.valid_dataset)
epoch_stats['cost(param)'] = self.model.params_cost()
return epoch_stats
def log_stats(self, epoch, stats):
logger.info('Epoch {0}: {1}'.format(
epoch,
', '.join(['{0}={1:.3f}'.format(k, v) for (k, v) in stats.items()])
))
def train(self, n_epochs, stats_interval=5):
run_stats = []
for epoch in range(n_epochs):
start_time = time.clock()
self.do_training_epoch()
epoch_time = time.clock() - start_time
if epoch % stats_interval == 0:
stats = self.get_epoch_stats()
stats['time'] = epoch_time
self.log_stats(epoch, stats)
run_stats.append(stats.items())
return np.array(run_stats), stats.keys()
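Finally, a sketch of wiring everything together. The RandomDataProvider below is a made-up stand-in written only for this example (it is not part of the framework); the Trainer just needs something iterable over (inputs, targets) batches that also exposes reset():

import numpy as np
from mlp.costs import CrossEntropySoftmaxCost
from mlp.layers import AffineLayer, SigmoidLayer
from mlp.learning_rules import GradientDescentLearningRule
from mlp.models import MultipleLayerModel
from mlp.trainers import Trainer

class RandomDataProvider(object):
    """Stand-in provider: fixed random batches with 1-of-K coded targets."""
    def __init__(self, n_batches=10, batch_size=20, input_dim=4, n_classes=3):
        rng = np.random.RandomState(0)
        self.batches = [
            (rng.randn(batch_size, input_dim),
             np.eye(n_classes)[rng.randint(n_classes, size=batch_size)])
            for _ in range(n_batches)]
    def __iter__(self):
        return iter(self.batches)
    def reset(self):
        pass  # batches are fixed, nothing to re-shuffle

train_data = RandomDataProvider()
model = MultipleLayerModel([
    AffineLayer(input_dim=4, output_dim=8),
    SigmoidLayer(),
    AffineLayer(input_dim=8, output_dim=3),
])
trainer = Trainer(model, CrossEntropySoftmaxCost(),
                  GradientDescentLearningRule(learning_rate=0.1),
                  train_data, valid_dataset=train_data)
run_stats, stat_keys = trainer.train(n_epochs=5, stats_interval=1)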