# Machine Learning Practical (INFR11119),
# Pawel Swietojanski, University of Edinburgh

import numpy
import logging

from mlp.costs import Cost


logger = logging.getLogger(__name__)


class MLP(object):
    """
    This is a container for an arbitrary sequence of other transforms.
    On top of this, the class also keeps the state of the model, i.e.
    the result of forward (activations) and backward (deltas) passes
    through the model (for a mini-batch), which is required to compute
    the gradients for the parameters.
    """
    def __init__(self, cost):

        assert isinstance(cost, Cost), (
            "Cost needs to be of type mlp.costs.Cost, got %s" % type(cost)
        )

        self.layers = []  # the actual list of network layers
        self.activations = []  # keeps forward-pass activations (h from equations)
                               # for a given minibatch (or features at 0th index)
        self.deltas = []  # keeps back-propagated error signals (deltas from equations)
                          # for a given minibatch and each layer
        self.cost = cost

    def fprop(self, x):
        """
        :param x: mini-batch of data-points
        :return: y (top layer activation) which is an estimate of y given x
        """

        if len(self.activations) != len(self.layers) + 1:
            self.activations = [None]*(len(self.layers) + 1)

        self.activations[0] = x
        for i in xrange(0, len(self.layers)):
            self.activations[i+1] = self.layers[i].fprop(self.activations[i])
        return self.activations[-1]

    def bprop(self, cost_grad):
        """
        :param cost_grad: matrix -- grad of the cost w.r.t y
        :return: None, the deltas are kept in the model
        """

        # allocate the list of deltas for each layer
        # note, we do not use all of those fields but
        # want to keep it aligned 1:1 with activations,
        # which will simplify indexing later on when
        # computing grads w.r.t parameters
        if len(self.deltas) != len(self.activations):
            self.deltas = [None]*len(self.activations)

        # treat the top layer in a special way, as it deals with the
        # cost, which may lead to some simplifications
        top_layer_idx = len(self.layers)
        self.deltas[top_layer_idx], ograds = self.layers[top_layer_idx - 1].\
            bprop_cost(self.activations[top_layer_idx], cost_grad, self.cost)

        # then back-prop through the remaining layers
        for i in xrange(top_layer_idx - 1, 0, -1):
            self.deltas[i], ograds = self.layers[i - 1].\
                bprop(self.activations[i], ograds)

    def add_layer(self, layer):
        self.layers.append(layer)

    def set_layers(self, layers):
        self.layers = layers

    def get_name(self):
        return 'mlp'
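
# A minimal usage sketch of the MLP container (illustrative only -- the
# concrete cost object comes from mlp.costs and its exact interface is not
# defined in this file, so the cost-related lines below are assumptions):
#
#   model = MLP(cost=some_cost)
#   model.add_layer(Sigmoid(idim=784, odim=100))
#   model.add_layer(Softmax(idim=100, odim=10))
#   y = model.fprop(x_batch)     # caches activations[0..L], activations[0] is x_batch
#   model.bprop(cost_grad)       # caches deltas per layer, given dcost/dy
#   grads = [layer.pgrads(inp, d)  # parameter gradients, layer by layer
#            for layer, inp, d in zip(model.layers,
#                                     model.activations[:-1],
#                                     model.deltas[1:])]
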
class Layer(object):
    """
    Abstract class defining an interface for
    other transforms.
    """
    def __init__(self, rng=None):

        if rng is None:
            seed = [2015, 10, 1]
            self.rng = numpy.random.RandomState(seed)
        else:
            self.rng = rng

    def fprop(self, inputs):
        """
        Implements a forward propagation through the i-th layer, that is
        some form of:
           a^i = xW^i + b^i
           h^i = f^i(a^i)
        with f^i, W^i, b^i denoting a non-linearity, weight matrix and
        biases at the i-th layer, respectively and x denoting inputs.

        :param inputs: matrix of features (x) or the output of the previous layer h^{i-1}
        :return: h^i, matrix of features transformed by the layer
        """
        raise NotImplementedError()

    def bprop(self, h, igrads):
        """
        Implements a backward propagation through the layer, that is, given
        h^i denotes the output of the layer and x^i the input, we compute:
           dh^i/dx^i, which by the chain rule is dh^i/da^i da^i/dx^i
        x^i could be either features (x) or the output of the lower layer h^{i-1}

        :param h: the activation produced in the forward pass
        :param igrads: error signal (or gradient) flowing to the layer, note,
               this in the general case does not correspond to the 'deltas' used
               to update the layer's parameters; to get deltas one needs to
               multiply it by the dh^i/da^i derivative
        :return: a tuple (deltas, ograds) where:
               deltas = igrads * dh^i/da^i
               ograds = deltas \times da^i/dx^i
        """
        raise NotImplementedError()

    def bprop_cost(self, h, igrads, cost=None):
        """
        Implements a backward propagation in case the layer directly
        deals with the optimised cost (i.e. the top layer).
        By default, the method should implement back-prop for the default cost,
        that is, the one that is natural to the layer's output, i.e.:
        linear -> mse, softmax -> cross-entropy, sigmoid -> binary cross-entropy

        :param h: the activation produced in the forward pass
        :param igrads: error signal (or gradient) flowing to the layer, note,
               this in the general case does not correspond to the 'deltas' used
               to update the layer's parameters; to get deltas one needs to
               multiply it by the dh^i/da^i derivative
        :return: a tuple (deltas, ograds) where:
               deltas = igrads * dh^i/da^i
               ograds = deltas \times da^i/dx^i
        """
        raise NotImplementedError()

    def pgrads(self, inputs, deltas, **kwargs):
        """
        Return gradients w.r.t parameters
        """
        raise NotImplementedError()

    def get_params(self):
        raise NotImplementedError()

    def set_params(self):
        raise NotImplementedError()

    def get_name(self):
        return 'abstract_layer'
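
# Shape conventions assumed by the Layer interface (for a mini-batch of B
# examples going through an affine layer with weight matrix W of shape
# (idim, odim)):
#   inputs : (B, idim)    h, igrads, deltas : (B, odim)    ograds : (B, idim)
# so the two quantities returned by bprop are computed as
#   deltas = igrads * f'(a)            (elementwise)
#   ograds = numpy.dot(deltas, W.T)    (chain rule through a = x W + b)
# which is what the concrete layers below implement.
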
class Linear(Layer):

    def __init__(self, idim, odim,
                 rng=None,
                 irange=0.1):

        super(Linear, self).__init__(rng=rng)

        self.idim = idim
        self.odim = odim

        self.W = self.rng.uniform(
            -irange, irange,
            (self.idim, self.odim))

        self.b = numpy.zeros((self.odim,), dtype=numpy.float32)

    def fprop(self, inputs):
        """
        Implements a forward propagation through the i-th layer, that is
        some form of:
           a^i = xW^i + b^i
           h^i = f^i(a^i)
        with f^i, W^i, b^i denoting a non-linearity, weight matrix and
        biases of this (i-th) layer, respectively and x denoting inputs.

        :param inputs: matrix of features (x) or the output of the previous layer h^{i-1}
        :return: h^i, matrix of features transformed by the layer
        """
        a = numpy.dot(inputs, self.W) + self.b
        # here f() is an identity function, so just return a linear transformation
        return a

    def bprop(self, h, igrads):
        """
        Implements a backward propagation through the layer, that is, given
        h^i denotes the output of the layer and x^i the input, we compute:
           dh^i/dx^i, which by the chain rule is dh^i/da^i da^i/dx^i
        x^i could be either features (x) or the output of the lower layer h^{i-1}

        :param h: the activation produced in the forward pass
        :param igrads: error signal (or gradient) flowing to the layer, note,
               this in the general case does not correspond to the 'deltas' used
               to update the layer's parameters; to get deltas one needs to
               multiply it by the dh^i/da^i derivative
        :return: a tuple (deltas, ograds) where:
               deltas = igrads * dh^i/da^i
               ograds = deltas \times da^i/dx^i
        """
        # since df^i/da^i = 1 (f is assumed to be the identity function),
        # deltas are in fact the same as igrads
        ograds = numpy.dot(igrads, self.W.T)
        return igrads, ograds

    def bprop_cost(self, h, igrads, cost):
        """
        Implements a backward propagation in case the layer directly
        deals with the optimised cost (i.e. the top layer).
        By default, the method should implement bprop for the default cost,
        that is, the one that is natural to the layer's output, i.e.:
        here we implement the linear -> mse scenario

        :param h: the activation produced in the forward pass
        :param igrads: error signal (or gradient) flowing to the layer, note,
               this in the general case does not correspond to the 'deltas' used
               to update the layer's parameters; to get deltas one needs to
               multiply it by the dh^i/da^i derivative
        :param cost: mlp.costs.Cost instance defining the used cost
        :return: a tuple (deltas, ograds) where:
               deltas = igrads * dh^i/da^i
               ograds = deltas \times da^i/dx^i
        """

        if cost is None or cost.get_name() == 'mse':
            # for the linear layer and mean square error cost,
            # cost back-prop is the same as standard back-prop
            return self.bprop(h, igrads)
        else:
            raise NotImplementedError('Linear.bprop_cost method not implemented '
                                      'for the %s cost' % cost.get_name())

    def pgrads(self, inputs, deltas, l1_weight=0, l2_weight=0):
        """
        Return gradients w.r.t parameters

        :param inputs: input to the i-th layer
        :param deltas: deltas computed in the bprop stage up to the i-th layer
        :param l1_weight: weight of the optional L1 penalty term
        :param l2_weight: weight of the optional L2 penalty term
        :return: list of grads w.r.t parameters dE/dW and dE/db in *exactly*
                 the same order as the params are returned by get_params()

        Note: deltas here contain the whole chain rule leading
        from the cost up to the i-th layer, i.e.
           dE/dy^L dy^L/da^L da^L/dh^{L-1} dh^{L-1}/da^{L-1} ... dh^{i}/da^{i}
        and here we are just asking about
           1) da^i/dW^i and 2) da^i/db^i
        since W and b are the layer's only parameters
        """

        l2_W_penalty, l2_b_penalty = 0, 0
        if l2_weight > 0:
            l2_W_penalty = l2_weight*self.W
            l2_b_penalty = l2_weight*self.b

        l1_W_penalty, l1_b_penalty = 0, 0
        if l1_weight > 0:
            l1_W_penalty = l1_weight*numpy.sign(self.W)
            l1_b_penalty = l1_weight*numpy.sign(self.b)

        grad_W = numpy.dot(inputs.T, deltas) + l2_W_penalty + l1_W_penalty
        grad_b = numpy.sum(deltas, axis=0) + l2_b_penalty + l1_b_penalty

        return [grad_W, grad_b]

    def get_params(self):
        return [self.W, self.b]

    def set_params(self, params):
        # we do not make checks here, but the order on the list
        # is assumed to be exactly the same as get_params() returns
        self.W = params[0]
        self.b = params[1]

    def get_name(self):
        return 'linear'
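
# Worked shape check for Linear.pgrads on a mini-batch of B examples:
#   grad_W = inputs.T . deltas  ->  (idim, B) x (B, odim) = (idim, odim), matching W,
#   grad_b = sum of deltas over the batch  ->  (odim,), matching b.
# The optional penalty terms add the derivatives of the regularisers:
#   d/dW [0.5 * l2_weight * ||W||^2] = l2_weight * W
#   d/dW [l1_weight * |W|]           = l1_weight * sign(W)
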
class Sigmoid(Linear):

    def __init__(self, idim, odim,
                 rng=None,
                 irange=0.1):

        super(Sigmoid, self).__init__(idim, odim, rng, irange)

    def fprop(self, inputs):
        # get the linear activations
        a = super(Sigmoid, self).fprop(inputs)
        # stabilise the exp() computation in case some values in
        # 'a' get very negative. We limit both tails, however only
        # negative values may lead to numerical issues -- exp(-a)
        # clip() performs the following operation faster:
        #   a[a < -30.] = -30.
        #   a[a > 30.] = 30.
        numpy.clip(a, -30.0, 30.0, out=a)
        h = 1.0/(1 + numpy.exp(-a))
        return h

    def bprop(self, h, igrads):
        dsigm = h * (1.0 - h)
        deltas = igrads * dsigm
        ___, ograds = super(Sigmoid, self).bprop(h=None, igrads=deltas)
        return deltas, ograds

    def bprop_cost(self, h, igrads, cost):
        if cost is None or cost.get_name() == 'bce':
            return super(Sigmoid, self).bprop(h=h, igrads=igrads)
        else:
            raise NotImplementedError('Sigmoid.bprop_cost method not implemented '
                                      'for the %s cost' % cost.get_name())

    def get_name(self):
        return 'sigmoid'


class Softmax(Linear):

    def __init__(self, idim, odim,
                 rng=None,
                 irange=0.1):

        super(Softmax, self).__init__(idim, odim,
                                      rng=rng,
                                      irange=irange)

    def fprop(self, inputs):

        # compute the linear outputs
        a = super(Softmax, self).fprop(inputs)
        # apply numerical stabilisation by subtracting max
        # from each row (not required for the coursework)
        # then compute the exponent
        assert a.ndim in [1, 2], (
            "Expected the linear activation in Softmax layer to be either "
            "a vector or a matrix, got a %i-dimensional tensor" % a.ndim
        )
        axis = a.ndim - 1
        exp_a = numpy.exp(a - numpy.max(a, axis=axis, keepdims=True))
        # finally, normalise by the sum within each example
        y = exp_a/numpy.sum(exp_a, axis=axis, keepdims=True)

        return y

    def bprop(self, h, igrads):
        raise NotImplementedError()

    def bprop_cost(self, h, igrads, cost):

        if cost is None or cost.get_name() == 'ce':
            return super(Softmax, self).bprop(h=h, igrads=igrads)
        else:
            raise NotImplementedError('Softmax.bprop_cost method not implemented '
                                      'for the %s cost' % cost.get_name())

    def get_name(self):
        return 'softmax'
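

if __name__ == '__main__':
    # Smoke-test sketch (not part of the coursework API, only runs when this
    # file is executed directly): fprop/bprop a random batch through a Sigmoid
    # hidden layer and a Softmax output layer and check that all shapes line up.
    # The (y - t) gradient below mimics what a cross-entropy cost would hand to
    # bprop_cost; the random one-hot targets t are purely for the shape check.
    rng = numpy.random.RandomState([2015, 10, 1])
    batch, idim, hdim, odim = 5, 20, 10, 3

    hidden = Sigmoid(idim=idim, odim=hdim, rng=rng)
    output = Softmax(idim=hdim, odim=odim, rng=rng)

    x = rng.uniform(-1., 1., (batch, idim))
    h = hidden.fprop(x)
    y = output.fprop(h)
    assert y.shape == (batch, odim)
    assert numpy.allclose(y.sum(axis=1), 1.0)  # softmax rows sum to one

    t = numpy.eye(odim)[rng.randint(0, odim, batch)]
    deltas_out, ograds = output.bprop_cost(y, y - t, cost=None)
    deltas_hid, _ = hidden.bprop(h, ograds)

    grads_out = output.pgrads(h, deltas_out)
    grads_hid = hidden.pgrads(x, deltas_hid)
    assert grads_out[0].shape == output.W.shape and grads_out[1].shape == output.b.shape
    assert grads_hid[0].shape == hidden.W.shape and grads_hid[1].shape == hidden.b.shape
    print('Smoke test passed: all shapes are consistent.')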