Source code for spynnaker.pyNN.models.neural_projections.connectors.pool_dense_connector

# Copyright (c) 2021 The University of Manchester
# Based on work Copyright (c) The University of Sussex,
# Garibaldi Pineda Garcia, James Turner, James Knight and Thomas Nowotny
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Iterable
import numpy
from spinn_utilities.overrides import overrides
from spinn_front_end_common.utilities.constants import (
    BYTES_PER_WORD, BYTES_PER_SHORT)
from pyNN.random import RandomDistribution
from spynnaker.pyNN.exceptions import SynapticConfigurationException
from .abstract_connector import AbstractConnector
from spinn_front_end_common.interface.ds import DataType
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spynnaker.pyNN.models.abstract_models import HasShapeKeyFields
from spynnaker.pyNN.data.spynnaker_data_view import SpynnakerDataView


# Bytes used by one per-dimension record: two 32-bit fields followed by six
# 16-bit fields (see _DIM_DTYPE, which describes the same record)
_DIMENSION_SIZE = (2 * BYTES_PER_WORD) + (6 * BYTES_PER_SHORT)

# Bytes used by the source key information (key, mask, colour bits)
_KEY_INFO_SIZE = 3 * BYTES_PER_WORD

# Bytes used by the fixed part of one connector record: the key information,
# three further words and two 16-bit values
_CONN_SIZE = _KEY_INFO_SIZE + (3 * BYTES_PER_WORD) + (2 * BYTES_PER_SHORT)

# Structured dtype of one per-dimension record, in the order it is written
_DIM_DTYPE = [
    ("mask", "uint32"),
    ("shift", "uint32"),
    ("pre_start", "uint16"),
    ("pre_in_post_start", "uint16"),
    ("pre_in_post_end", "uint16"),
    ("pre_in_post_shape", "uint16"),
    ("recip_pool_stride", "uint16"),
    ("_PADDING", "uint16")]


class PoolDenseConnector(AbstractConnector):
    """
    Where the pre- and post-synaptic populations are considered as a 2D
    array. Connect every post(row, column) neuron to many
    pre(row, column, kernel)
    through a (kernel) set of weights and/or delays.
    """

    __slots__ = [
        "__weights",
        "__pool_shape",
        "__pool_stride",
        "__positive_receptor_type",
        "__negative_receptor_type"
    ]

    def __init__(self, weights, pool_shape=None, pool_stride=None,
                 positive_receptor_type="excitatory",
                 negative_receptor_type="inhibitory", safe=True,
                 verbose=False, callback=None):
        """
        :param weights:
            The synaptic strengths. Can be:

            * single value: the same value will be used for all weights
            * :py:class:`list`: the total number of elements must be
              (number after pooling * number post)
            * :py:class:`~numpy.ndarray`: As above for list
            * :py:class:`~spynnaker.pyNN.RandomDistribution`:
              weights will be drawn at random
        :type weights:
            int or float or list(int or float) or ~numpy.ndarray or
            ~spynnaker.pyNN.RandomDistribution
        :param pool_shape:
            Shape of average pooling. If a single value is provided, it will
            be used for every dimension, otherwise must be the same number of
            values as there are dimensions in the source.
        :type pool_shape: int or tuple(int) or None
        :param pool_stride:
            Jumps between pooling regions. If a single value is provided, the
            same stride will be used for all dimensions, otherwise must be
            the same number of values as there are dimensions in the source.
            If `None`, and pool_shape is provided, pool_stride will be set to
            pool_shape.
        :type pool_stride: int or tuple(int) or None
        :param str positive_receptor_type:
            The receptor type to add the positive weights to.  By default this
            is "excitatory".
        :param str negative_receptor_type:
            The receptor type to add the negative weights to.  By default this
            is "inhibitory".
        :param bool safe: (ignored)
        :param bool verbose: (ignored)
        :param callable callback: (ignored)
        """
        super().__init__(safe=safe, callback=callback, verbose=verbose)

        # Only real collections of weights are stored as arrays.  Wrapping a
        # single value or a RandomDistribution in numpy.array gives a 0-d
        # array, which is an Iterable but not an int or float, so it would
        # be mistaken for an explicit list of weights later on.
        if (isinstance(weights, RandomDistribution) or
                not isinstance(weights, Iterable)):
            self.__weights = weights
        else:
            as_array = numpy.array(weights)
            if as_array.ndim == 0:
                # e.g. a 0-d array given by the user; unwrap the one value
                self.__weights = as_array.item()
            else:
                self.__weights = as_array

        self.__pool_shape = pool_shape
        self.__pool_stride = pool_stride
        if self.__pool_stride is None:
            self.__pool_stride = self.__pool_shape

        self.__positive_receptor_type = positive_receptor_type
        self.__negative_receptor_type = negative_receptor_type

    @property
    def positive_receptor_type(self):
        """
        The receptor type that positive weights are added to.

        :rtype: str
        """
        return self.__positive_receptor_type

    @property
    def negative_receptor_type(self):
        """
        The receptor type that negative weights are added to.

        :rtype: str
        """
        return self.__negative_receptor_type

    @property
    def weights(self):
        """
        The weights given at construction, as an array (a single value or
        a random distribution is returned as a 0-dimensional array).

        :rtype: ~numpy.ndarray
        """
        return numpy.asarray(self.__weights)

    def __decode_weights(
            self, pre_shape, post_shape, pre_vertex_slice, post_vertex_slice):
        """
        Get the flat array of weights between two slices.

        :param pre_shape: The full shape of the source
        :param post_shape: The full shape of the target
        :param ~pacman.model.graphs.common.Slice pre_vertex_slice:
        :param ~pacman.model.graphs.common.Slice post_vertex_slice:
        :rtype: ~numpy.ndarray
        :raises SynapticConfigurationException:
            If the weights are of a type that is not understood
        """
        weights = self.__weights
        if isinstance(weights, RandomDistribution):
            n_weights = self.__get_n_sub_weights(
                pre_vertex_slice, post_vertex_slice.n_atoms)
            return numpy.array(weights.next(n_weights), dtype="float64")
        if isinstance(weights, (int, float, numpy.number)):
            n_weights = self.__get_n_sub_weights(
                pre_vertex_slice, post_vertex_slice.n_atoms)
            return numpy.full(n_weights, weights, dtype="float64")
        if isinstance(weights, numpy.ndarray):
            # View the weights as (pooled pre dimensions..., post
            # dimensions...) and cut out the part for these slices
            pre_in_post_shape = tuple(self.__get_pre_in_post_shape(pre_shape))
            all_weights = numpy.array(weights, dtype="float64").reshape(
                pre_in_post_shape + post_shape)
            pre_in_post_start = self.__pre_as_post(pre_vertex_slice.start)
            pre_in_post_end = self.__pre_as_post(pre_vertex_slice.end)
            # The end coordinate is inclusive, hence the + 1
            pip_slices = tuple(
                slice(pip_start, pip_end + 1) for pip_start, pip_end in zip(
                    pre_in_post_start, pre_in_post_end))
            # TODO check this is correct
            post_slices = post_vertex_slice.dimension
            return all_weights[pip_slices + post_slices].flatten()
        raise SynapticConfigurationException(
            f"Unknown weights ({self.__weights})")

    @staticmethod
    def __to_nd_shape(shape, n_dims, param_name):
        """
        Expand a shape-like parameter to one value per dimension.

        :param shape: A single value, a sequence of values, or None
        :type shape: int or tuple(int) or None
        :param int n_dims: The number of dimensions required
        :param str param_name: The parameter name to use in the error
        :return: An array of n_dims integers, or None if shape is None
        :rtype: ~numpy.ndarray or None
        :raises SynapticConfigurationException:
            If a sequence is given that does not have n_dims values
        """
        if shape is None:
            return None
        if numpy.isscalar(shape):
            return numpy.array([shape] * n_dims, dtype='int')
        elif len(shape) == n_dims:
            return numpy.array(shape, dtype='int')
        raise SynapticConfigurationException(
            f"{param_name} must be an int or a tuple(int) with {n_dims}"
            " dimensions")

    @staticmethod
    def get_post_pool_shape(pre_shape, pool_shape=None, pool_stride=None):
        """
        Get the shape of the source after pooling has been applied.

        :param pre_shape: The shape of the source before pooling
        :type pre_shape: tuple(int)
        :param pool_shape:
            The pooling shape; if None the source shape is returned unchanged
        :type pool_shape: int or tuple(int) or None
        :param pool_stride:
            The pooling stride; if None, pool_shape is used
        :type pool_stride: int or tuple(int) or None
        :rtype: ~numpy.ndarray
        """
        pool_shape = PoolDenseConnector.__to_nd_shape(
            pool_shape, len(pre_shape), "pool_shape")
        pool_stride = PoolDenseConnector.__to_nd_shape(
            pool_stride, len(pre_shape), "pool_stride")
        if pool_stride is None:
            pool_stride = pool_shape
        shape = numpy.array(pre_shape)
        if pool_shape is not None:
            post_pool_shape = shape - (pool_shape - 1)
            shape = (post_pool_shape // pool_stride) + 1
        return shape

    def __get_pre_in_post_shape(self, pre_shape):
        """
        Get the shape of the source after this connector's pooling.

        :param pre_shape: The shape of the source before pooling
        :rtype: ~numpy.ndarray
        """
        return self.get_post_pool_shape(
            pre_shape, self.__pool_shape, self.__pool_stride)

    def __get_n_weights(self, pre_shape, post_shape):
        """
        Get the expected number of weights.

        :param pre_shape: The shape of the source before pooling
        :param post_shape: The shape of the target
        """
        shape = self.__get_pre_in_post_shape(pre_shape)
        return numpy.prod(shape) * numpy.prod(post_shape)

    def __get_n_sub_weights(self, pre_vertex_slice, n_post_atoms):
        """
        Get the number of weights between a source slice and a number of
        target atoms.

        :param ~pacman.model.graphs.common.Slice pre_vertex_slice:
        :param int n_post_atoms:
        """
        pre_in_post_start = self.__pre_as_post(pre_vertex_slice.start)
        pre_in_post_end = self.__pre_as_post(pre_vertex_slice.end)
        # Both ends are inclusive, hence the + 1 in each dimension
        return (numpy.prod((pre_in_post_end - pre_in_post_start) + 1) *
                n_post_atoms)

    @overrides(AbstractConnector.validate_connection)
    def validate_connection(self, application_edge, synapse_info):
        pre = application_edge.pre_vertex
        post = application_edge.post_vertex
        if len(pre.atoms_shape) != 2:
            raise ConfigurationException(
                "The PoolDenseConnector only works where the pre-Population"
                " of a Projection is 2D.  Please ensure that the"
                " Population uses a Grid2D structure.")

        # Only an explicit collection of weights has a size to check; a
        # single value or a random distribution fits any shape
        if isinstance(self.__weights, numpy.ndarray):
            expected_n_weights = self.__get_n_weights(
                pre.atoms_shape, post.atoms_shape)
            if expected_n_weights != self.__weights.size:
                raise ConfigurationException(
                    f"With a source population with shape {pre.atoms_shape},"
                    f" and a target population with shape {post.atoms_shape},"
                    f" this connector requires {expected_n_weights} weights")

        if post.get_synapse_id_by_target(
                self.__positive_receptor_type) is None:
            raise ConfigurationException(
                "The post population doesn't have a synaptic receptor type of"
                f" {self.__positive_receptor_type}")
        if post.get_synapse_id_by_target(
                self.__negative_receptor_type) is None:
            raise ConfigurationException(
                "The post population doesn't have a synaptic receptor type of"
                f" {self.__negative_receptor_type}")

    @overrides(AbstractConnector.get_delay_minimum)
    def get_delay_minimum(self, synapse_info):
        return synapse_info.delays

    @overrides(AbstractConnector.get_delay_maximum)
    def get_delay_maximum(self, synapse_info):
        return synapse_info.delays

    @overrides(AbstractConnector.get_n_connections_from_pre_vertex_maximum)
    def get_n_connections_from_pre_vertex_maximum(
            self, n_post_atoms, synapse_info, min_delay=None, max_delay=None):
        if min_delay is not None and max_delay is not None:
            delay = synapse_info.delays
            if min_delay > delay or max_delay < delay:
                return 0

        # Every pre connects to every post
        return n_post_atoms

    @overrides(AbstractConnector.get_n_connections_to_post_vertex_maximum)
    def get_n_connections_to_post_vertex_maximum(self, synapse_info):
        # Every post connects to every pre
        return synapse_info.n_pre_neurons

    @overrides(AbstractConnector.get_weight_maximum)
    def get_weight_maximum(self, synapse_info):
        if isinstance(self.__weights, numpy.ndarray):
            return numpy.amax(numpy.abs(self.__weights))
        # Single values and random distributions are left to the base class
        n_conns = synapse_info.n_pre_neurons * synapse_info.n_post_neurons
        return super()._get_weight_maximum(
            self.__weights, n_conns, synapse_info)

    def __pre_as_post(self, pre_coords):
        """
        Write pre coordinates as post coordinates.

        :param ~collections.abc.Iterable pre_coords:
            An iterable of (x, y) coordinates
        :rtype: ~numpy.ndarray
        """
        # numpy.array copies, so the caller's coordinates are not modified
        coords = numpy.array(pre_coords)
        if self.__pool_stride is not None:
            coords //= self.__pool_stride
        return coords

    def local_only_n_bytes(self, incoming_slices, n_post_atoms):
        """
        Get the number of bytes needed for the data of this connector.

        :param iterable(~pacman.model.graphs.common.Slice) incoming_slices:
        :param int n_post_atoms:
        :rtype: int
        """
        n_weights = [self.__get_n_sub_weights(s, n_post_atoms)
                     for s in incoming_slices]
        # Weights are 16-bit but written as 32-bit words, so an odd count
        # is padded with one extra weight
        n_weights = [n + 1 if n % 2 != 0 else n for n in n_weights]
        n_dims = [len(s.shape) for s in incoming_slices]
        return ((sum(n_dims) * _DIMENSION_SIZE) +
                (sum(n_weights) * BYTES_PER_SHORT) +
                (len(incoming_slices) * _CONN_SIZE))

    def write_local_only_data(
            self, spec, app_edge, pre_vertex_slice, post_vertex_slice,
            key, mask, n_colour_bits, weight_scales):
        """
        Write the data for one pair of source and target slices.

        :param ~data_specification.DataSpecificationGenerator spec:
        :param ~pacman.model.graphs.application.ApplicationEdge app_edge:
        :param ~pacman.model.graphs.common.Slice pre_vertex_slice:
        :param ~pacman.model.graphs.common.Slice post_vertex_slice:
        :param int key:
        :param int mask:
        :param int n_colour_bits:
        :param weight_scales:
            The scaling to apply to the weights, indexed by synapse type
        """
        # Write source key info
        spec.write_value(key, data_type=DataType.UINT32)
        spec.write_value(mask, data_type=DataType.UINT32)
        spec.write_value(n_colour_bits, data_type=DataType.UINT32)

        # Write numbers of things
        n_dims = len(pre_vertex_slice.shape)
        n_weights = self.__get_n_sub_weights(
            pre_vertex_slice, post_vertex_slice.n_atoms)
        spec.write_value(n_dims, data_type=DataType.UINT32)
        spec.write_value(n_weights, data_type=DataType.UINT32)

        # Write synapse information
        pos_synapse_type = app_edge.post_vertex.get_synapse_id_by_target(
            self.__positive_receptor_type)
        neg_synapse_type = app_edge.post_vertex.get_synapse_id_by_target(
            self.__negative_receptor_type)
        spec.write_value(pos_synapse_type, data_type=DataType.UINT16)
        spec.write_value(neg_synapse_type, data_type=DataType.UINT16)

        # Write delay
        delay_step = (app_edge.post_vertex.synapse_dynamics.delay *
                      SpynnakerDataView.get_simulation_time_step_per_ms())
        local_delay = (delay_step %
                       app_edge.post_vertex.splitter.max_support_delay())
        spec.write_value(local_delay)

        # Generate the dimension information
        dim_info = numpy.zeros(n_dims, dtype=_DIM_DTYPE)
        if self.__pool_stride is not None:
            stride = self.__to_nd_shape(self.__pool_stride, n_dims, "")
            dim_info["recip_pool_stride"] = [self.__recip(p) for p in stride]
        else:
            dim_info["recip_pool_stride"] = self.__recip(1)

        if isinstance(app_edge.pre_vertex, HasShapeKeyFields):
            # The source says how its key is split between dimensions
            pre_start_size_mask_shift = numpy.array(
                app_edge.pre_vertex.get_shape_key_fields(pre_vertex_slice))
            start = pre_start_size_mask_shift[:, 0]
            size = pre_start_size_mask_shift[:, 1]
            dim_info["pre_start"] = start
            dim_info["mask"] = pre_start_size_mask_shift[:, 2]
            dim_info["shift"] = pre_start_size_mask_shift[:, 3]
        else:
            # Pack the dimensions into the key one after another, each using
            # just enough bits to hold its size
            start = numpy.array(pre_vertex_slice.start)
            size = numpy.array(pre_vertex_slice.shape)
            n_bits = numpy.ceil(numpy.log2(size)).astype("int")
            shifts = numpy.concatenate(([0], numpy.cumsum(n_bits[:-1])))
            masks = numpy.left_shift(numpy.left_shift(1, n_bits) - 1, shifts)
            dim_info["pre_start"] = start
            dim_info["mask"] = masks
            dim_info["shift"] = shifts

        # The end is the last coordinate covered (inclusive), so that the
        # shape written here agrees with the number of weights written
        # above, which is worked out from the inclusive end of the slice
        dim_info["pre_in_post_start"] = self.__pre_as_post(start)
        dim_info["pre_in_post_end"] = self.__pre_as_post(start + size - 1)
        dim_info["pre_in_post_shape"] = (
            dim_info["pre_in_post_end"] - dim_info["pre_in_post_start"] + 1)
        spec.write_array(dim_info.view(numpy.uint32))

        # Work out which weights are for this connection
        weights = self.__decode_weights(
            app_edge.pre_vertex.atoms_shape, app_edge.post_vertex.atoms_shape,
            pre_vertex_slice, post_vertex_slice)

        # Divide weights by pooling area if needed
        if self.__pool_shape is not None:
            shape = self.__to_nd_shape(self.__pool_shape, n_dims, "")
            area = numpy.prod(shape)
            weights = weights / area

        # Encode weights with weight scaling; pad to an even count so that
        # the 16-bit values can be written as whole 32-bit words
        if len(weights) % 2 != 0:
            weights = numpy.concatenate((weights, [0]))
        neg_weights = weights < 0
        pos_weights = weights > 0
        weights[neg_weights] *= weight_scales[neg_synapse_type]
        weights[pos_weights] *= weight_scales[pos_synapse_type]
        final_weights = numpy.round(weights).astype(numpy.int16)
        spec.write_array(final_weights.view(numpy.uint32))

    def __recip(self, v):
        """
        Compute the reciprocal of a number as a signed 1-bit integer,
        14-bit fractional fixed point number, encoded in an integer.

        :param v: The number to take the reciprocal of
        :rtype: int
        """
        return int(round((1 / v) * (1 << 14)))