Source code for spynnaker.pyNN.models.neural_projections.connectors.kernel_connector

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy
from spinn_utilities.overrides import overrides
from spinn_front_end_common.interface.ds import DataType
from spinn_front_end_common.utilities.constants import BYTES_PER_WORD
from pyNN.random import RandomDistribution
from .abstract_connector import AbstractConnector
from spynnaker.pyNN.exceptions import SpynnakerException
from .abstract_generate_connector_on_machine import (
    AbstractGenerateConnectorOnMachine, ConnectorIDs)
from .abstract_generate_connector_on_host import (
    AbstractGenerateConnectorOnHost)
from spynnaker.pyNN.utilities.constants import SPIKE_PARTITION_ID

HEIGHT, WIDTH = 0, 1
N_KERNEL_PARAMS = 8


class ConvolutionKernel(numpy.ndarray):
    """
    Marker subclass of :py:class:`numpy.ndarray` used to identify arrays
    holding kernel weight or delay values.
    """


def shape2word(sw, sh):
    """
    Pack a (width, height) pair into a single 32-bit word, with the height
    in the upper 16 bits and the width in the lower 16 bits.
    """
    return (((numpy.uint32(sh) & 0xFFFF) << 16) |
            (numpy.uint32(sw) & 0xFFFF))
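# For example, shape2word(3, 5) yields 0x00050003: height 5 in the upper
# half-word, width 3 in the lower half-word.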


class KernelConnector(AbstractGenerateConnectorOnMachine,
                      AbstractGenerateConnectorOnHost):
    """
    A connector that treats both the pre- and post-synaptic populations as
    2D arrays. Every post(row, column) neuron is connected to many
    pre(row, column) neurons through a kernel of weights and/or delays.

    .. admonition:: TODO

        Should these include `allow_self_connections` and `with_replacement`?
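
    Example (a minimal sketch; the shapes and weight value below are
    illustrative assumptions only)::

        connector = KernelConnector(
            shape_pre=(8, 8), shape_post=(8, 8), shape_kernel=(3, 3),
            weight_kernel=numpy.full((3, 3), 0.5), delay_kernel=None)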
    """

    def __init__(
            self, shape_pre, shape_post, shape_kernel, weight_kernel=None,
            delay_kernel=None, shape_common=None,
            pre_sample_steps_in_post=None, pre_start_coords_in_post=None,
            post_sample_steps_in_pre=None, post_start_coords_in_pre=None,
            safe=True, space=None, verbose=False, callback=None):
        """
        :param shape_pre:
            2D shape of the pre-population (rows/height, columns/width, usually
            the input image shape)
        :type shape_pre: list(int) or tuple(int,int)
        :param shape_post:
            2D shape of the post-population (rows/height, columns/width)
        :type shape_post: list(int) or tuple(int,int)
        :param shape_kernel:
            2D shape of the kernel (rows/height, columns/width)
        :type shape_kernel: list(int) or tuple(int,int)
        :param weight_kernel: (optional)
            2D matrix of size shape_kernel describing the weights
        :type weight_kernel: ~numpy.ndarray or ~pyNN.random.RandomDistribution
            or int or float or list(int) or list(float) or None
        :param delay_kernel: (optional)
            2D matrix of size shape_kernel describing the delays
        :type delay_kernel: ~numpy.ndarray or ~pyNN.random.RandomDistribution
            or int or float or list(int) or list(float) or None
        :param shape_common: (optional)
            2D shape of common coordinate system (for both pre- and post-,
            usually the input image sizes)
        :type shape_common: list(int) or tuple(int,int) or None
        :param pre_sample_steps_in_post: (optional)
            Sampling steps/jumps for pre-population <=> (stepX, stepY)
        :type pre_sample_steps_in_post: None or list(int) or tuple(int,int)
        :param pre_start_coords_in_post: (optional)
            Starting row/column for pre-population sampling <=> (offX, offY)
        :type pre_start_coords_in_post: None or list(int) or tuple(int,int)
        :param post_sample_steps_in_pre: (optional)
            Sampling steps/jumps for post-population <=> (stepX, stepY)
        :type post_sample_steps_in_pre: None or list(int) or tuple(int,int)
        :param post_start_coords_in_pre: (optional)
            Starting row/column for post-population sampling <=> (offX, offY)
        :type post_start_coords_in_pre: None or list(int) or tuple(int,int)
        :param bool safe:
            Whether to check that weights and delays have valid values.
            If ``False``, this check is skipped.
        :param ~pyNN.space.Space space:
            Currently ignored; for future compatibility.
        :param bool verbose:
            Whether to output extra information about the connectivity to a
            CSV file
        :param callable callback: (ignored)
        """
        super().__init__(safe=safe, callback=callback, verbose=verbose)
        assert space is None, "non-None space unsupported"

        # Get the kernel size
        self._kernel_w = shape_kernel[WIDTH]
        self._kernel_h = shape_kernel[HEIGHT]

        # The half-values give the index of the kernel's centre element
        self._hlf_k_w = shape_kernel[WIDTH] // 2
        self._hlf_k_h = shape_kernel[HEIGHT] // 2

        # Cache values for the pre and post sizes
        self._pre_w = shape_pre[WIDTH]
        self._pre_h = shape_pre[HEIGHT]
        self._post_w = shape_post[WIDTH]
        self._post_h = shape_post[HEIGHT]

        # Get the starting coords and step sizes (or defaults if not given)
        if pre_start_coords_in_post is None:
            self._pre_start_w = 0
            self._pre_start_h = 0
        else:
            self._pre_start_w = pre_start_coords_in_post[WIDTH]
            self._pre_start_h = pre_start_coords_in_post[HEIGHT]

        if post_start_coords_in_pre is None:
            self._post_start_w = 0
            self._post_start_h = 0
        else:
            self._post_start_w = post_start_coords_in_pre[WIDTH]
            self._post_start_h = post_start_coords_in_pre[HEIGHT]

        if pre_sample_steps_in_post is None:
            self._pre_step_w = 1
            self._pre_step_h = 1
        else:
            self._pre_step_w = pre_sample_steps_in_post[WIDTH]
            self._pre_step_h = pre_sample_steps_in_post[HEIGHT]

        if post_sample_steps_in_pre is None:
            self._post_step_w = 1
            self._post_step_h = 1
        else:
            self._post_step_w = post_sample_steps_in_pre[WIDTH]
            self._post_step_h = post_sample_steps_in_pre[HEIGHT]

        # Make sure the supplied values are in the correct format
        self._krn_weights = self.__get_kernel_vals(weight_kernel)
        self._krn_delays = self.__get_kernel_vals(delay_kernel)

        self._shape_common = \
            shape_pre if shape_common is None else shape_common
        self._common_w = self._shape_common[WIDTH]
        self._common_h = self._shape_common[HEIGHT]
        self._shape_pre = shape_pre
        self._shape_post = shape_post

        # Create storage for later
        self._post_as_pre = {}

    def __to_post_coords(self, post_vertex_slice):
        """
        Get a list of possible post-slice coordinates.

        :param ~pacman.model.graphs.common.Slice post_vertex_slice:
        :rtype: tuple(~numpy.ndarray, ~numpy.ndarray)
        """
        post = numpy.arange(
            post_vertex_slice.lo_atom, post_vertex_slice.hi_atom + 1)
        return numpy.divmod(post, self._post_w)

    def __map_to_pre_coords(self, post_r, post_c):
        """
        Get a map from post to pre-population coordinates.

        :param ~numpy.ndarray post_r: rows
        :param ~numpy.ndarray post_c: columns
        :rtype: tuple(~numpy.ndarray, ~numpy.ndarray)
        """
        return (self._post_start_h + post_r * self._post_step_h,
                self._post_start_w + post_c * self._post_step_w)

    def __post_as_pre(self, post_vertex_slice):
        """
        Write post-population coordinates as pre-population coordinates.

        :param ~pacman.model.graphs.common.Slice post_vertex_slice:
        :rtype: tuple(~numpy.ndarray, ~numpy.ndarray)
        """
        # TODO: When slices become hashable, update this code to use them
        # directly as the cache index
        if str(post_vertex_slice) not in self._post_as_pre:
            post_r, post_c = self.__to_post_coords(post_vertex_slice)
            self._post_as_pre[str(post_vertex_slice)] = \
                self.__map_to_pre_coords(post_r, post_c)
        return self._post_as_pre[str(post_vertex_slice)]

    def __pre_as_post(self, pre_r, pre_c):
        """
        Write pre-population coordinates as post-population coordinates.

        :param int pre_r: row
        :param int pre_c: column
        :rtype: tuple(int,int)
        """
        r = ((pre_r - self._pre_start_h - 1) // self._pre_step_h) + 1
        c = ((pre_c - self._pre_start_w - 1) // self._pre_step_w) + 1
        return (r, c)
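
    # For example, with self._pre_start_h == 0 and self._pre_step_h == 2,
    # __pre_as_post maps pre rows 0, 1, 2, 3 to post rows 0, 1, 1, 2
    # (a ceiling division of the offset by the step).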

    def __get_kernel_vals(self, vals):
        """
        Convert kernel values given into the correct format.

        :param vals:
        :type vals: int or float or ~pyNN.random.RandomDistribution
            or ~numpy.ndarray or ConvolutionKernel
        :rtype: ~numpy.ndarray
        """
        if vals is None:
            return None
        if isinstance(vals, list):
            vals = numpy.asarray(vals)
        krn_size = self._kernel_h * self._kernel_w
        krn_shape = (self._kernel_h, self._kernel_w)
        if isinstance(vals, RandomDistribution):
            return numpy.array(vals.next(krn_size)).reshape(krn_shape)
        elif numpy.isscalar(vals):
            return vals * numpy.ones(krn_shape)
        elif (isinstance(vals, (numpy.ndarray, ConvolutionKernel)) and
                vals.shape[HEIGHT] == self._kernel_h and
                vals.shape[WIDTH] == self._kernel_w):
            return vals.view(ConvolutionKernel)
        raise SpynnakerException(
            "Error generating KernelConnector values; if you have supplied "
            "weight and/or delay kernel then ensure they are the same size "
            "as specified by the shape kernel values (height: "
            f"{self._kernel_h} and width: {self._kernel_w}).")

    def __compute_statistics(
            self, weights, delays, post_vertex_slice, n_pre_neurons):
        """
        Compute the relevant information required for the connections.

        :param weights:
        :type weights: int or float or ~pyNN.random.RandomDistribution or
            ~numpy.ndarray or ConvolutionKernel
        :param delays:
        :type delays: int or float or ~pyNN.random.RandomDistribution or
            ~numpy.ndarray or ConvolutionKernel
        :param ~pacman.model.graphs.common.Slice post_vertex_slice:
        """
        # If __compute_statistics is called more than once, there's
        # no need to get the user-supplied weights and delays again
        if self._krn_weights is None:
            self._krn_weights = self.__get_kernel_vals(weights)
        if self._krn_delays is None:
            self._krn_delays = self.__get_kernel_vals(delays)

        post_as_pre_r, post_as_pre_c = self.__post_as_pre(post_vertex_slice)
        coords = {}
        hh, hw = self._hlf_k_h, self._hlf_k_w
        all_pre_ids = []
        all_post_ids = []
        all_delays = []
        all_weights = []
        count = 0
        post_lo = post_vertex_slice.lo_atom

        # Loop over pre-vertices
        for pre_idx in range(n_pre_neurons):
            pre_r, pre_c = divmod(pre_idx, self._pre_w)
            coords[pre_idx] = []

            # Test whether the coordinates should be included based on the
            # step function (in the pre) and skip if not
            if not (((pre_r - self._pre_start_h) % self._pre_step_h == 0) and
                    ((pre_c - self._pre_start_w) % self._pre_step_w == 0)):
                continue

            # Loop over post-vertices
            for post_idx in range(
                    post_vertex_slice.lo_atom, post_vertex_slice.hi_atom + 1):

                # convert to common coord system
                pac_r = post_as_pre_r[post_idx - post_lo]
                pac_c = post_as_pre_c[post_idx - post_lo]

                # now convert common to pre coords
                pap_r, pap_c = self.__pre_as_post(pac_r, pac_c)

                # Obtain coordinates to test against kernel sizes
                dr = pap_r - pre_r
                kr = hh - dr
                dc = pap_c - pre_c
                kc = hw - dc

                if 0 <= kr < self._kernel_h and 0 <= kc < self._kernel_w:
                    if post_idx in coords[pre_idx]:
                        continue
                    coords[pre_idx].append(post_idx)

                    # Store weights, delays and pre/post ids
                    w = self._krn_weights[kr, kc]
                    d = self._krn_delays[kr, kc]

                    count += 1

                    all_pre_ids.append(pre_idx)
                    all_post_ids.append(post_idx)
                    all_delays.append(d)
                    all_weights.append(w)

        # Now the loop is complete, return relevant data
        return (count, numpy.array(all_post_ids, dtype='uint32'),
                numpy.array(all_pre_ids, dtype='uint32'),
                numpy.array(all_delays), numpy.array(all_weights))

    @overrides(AbstractConnector.get_delay_maximum)
    def get_delay_maximum(self, synapse_info):
        # Use the kernel delays if user has supplied them
        if self._krn_delays is not None:
            return numpy.max(self._krn_delays)

        # I think this is overestimated, but not by much
        n_conns = (
            self._pre_w * self._pre_h * self._kernel_w * self._kernel_h)

        # if not then use the values that came in
        return self._get_delay_maximum(
            synapse_info.delays, n_conns, synapse_info)

    @overrides(AbstractConnector.get_delay_minimum)
    def get_delay_minimum(self, synapse_info):
        # Use the kernel delays if user has supplied them
        if self._krn_delays is not None:
            return numpy.min(self._krn_delays)

        # I think this is overestimated, but not by much
        n_conns = (
            self._pre_w * self._pre_h * self._kernel_w * self._kernel_h)

        # if not then use the values that came in
        return self._get_delay_minimum(
            synapse_info.delays, n_conns, synapse_info)

    @overrides(AbstractConnector.get_delay_variance)
    def get_delay_variance(self, delays, synapse_info):
        if self._krn_delays is not None:
            return numpy.var(self._krn_delays)
        return super().get_delay_variance(delays, synapse_info)

    @overrides(AbstractConnector.get_n_connections_from_pre_vertex_maximum)
    def get_n_connections_from_pre_vertex_maximum(
            self, n_post_atoms, synapse_info, min_delay=None, max_delay=None):
        return numpy.clip(self._kernel_h * self._kernel_w, 0, n_post_atoms)

    @overrides(AbstractConnector.get_n_connections_to_post_vertex_maximum)
    def get_n_connections_to_post_vertex_maximum(self, synapse_info):
        return numpy.clip(self._kernel_h * self._kernel_w, 0, 255)

    @overrides(AbstractConnector.get_weight_maximum)
    def get_weight_maximum(self, synapse_info):
        # Use the kernel weights if user has supplied them
        if self._krn_weights is not None:
            return numpy.max(self._krn_weights)

        # I think this is overestimated, but not by much
        n_conns = (
            self._pre_w * self._pre_h * self._kernel_w * self._kernel_h)
        return self._get_weight_maximum(
            synapse_info.weights, n_conns, synapse_info)

    @overrides(AbstractConnector.get_weight_mean)
    def get_weight_mean(self, weights, synapse_info):
        # Use the kernel weights if user has supplied them
        if self._krn_weights is not None:
            return numpy.mean(self._krn_weights)
        return super().get_weight_mean(weights, synapse_info)

    @overrides(AbstractConnector.get_weight_variance)
    def get_weight_variance(self, weights, synapse_info):
        # Use the kernel weights if user has supplied them
        if self._krn_weights is not None:
            return numpy.var(self._krn_weights)
        return super().get_weight_variance(weights, synapse_info)

    def __repr__(self):
        return (
            f"KernelConnector(shape_kernel"
            f"[{self._kernel_w},{self._kernel_h}])")

    @overrides(AbstractGenerateConnectorOnHost.create_synaptic_block)
    def create_synaptic_block(
            self, post_slices, post_vertex_slice, synapse_type, synapse_info):
        (n_connections, all_post, all_pre_in_range, all_pre_in_range_delays,
         all_pre_in_range_weights) = self.__compute_statistics(
            synapse_info.weights, synapse_info.delays, post_vertex_slice,
            synapse_info.n_pre_neurons)

        syn_dtypes = AbstractConnector.NUMPY_SYNAPSES_DTYPE

        if n_connections <= 0:
            return numpy.zeros(0, dtype=syn_dtypes)

        # 0 for exc, 1 for inh
        syn_type = numpy.array(all_pre_in_range_weights < 0)
        block = numpy.zeros(n_connections, dtype=syn_dtypes)
        block["source"] = all_pre_in_range
        block["target"] = all_post
        block["weight"] = all_pre_in_range_weights
        block["delay"] = all_pre_in_range_delays
        block["synapse_type"] = syn_type.astype('uint8')
        return block

    @property
    @overrides(AbstractGenerateConnectorOnMachine.gen_connector_id)
    def gen_connector_id(self):
        return ConnectorIDs.KERNEL_CONNECTOR.value

    @overrides(AbstractGenerateConnectorOnMachine.gen_connector_params)
    def gen_connector_params(self):
        data = numpy.array([
            shape2word(self._common_w, self._common_h),
            shape2word(self._pre_w, self._pre_h),
            shape2word(self._post_w, self._post_h),
            shape2word(self._pre_start_w, self._pre_start_h),
            shape2word(self._post_start_w, self._post_start_h),
            shape2word(self._pre_step_w, self._pre_step_h),
            shape2word(self._post_step_w, self._post_step_h),
            shape2word(self._kernel_w, self._kernel_h),
            shape2word(int(self._krn_weights is not None),
                       int(self._krn_delays is not None))], dtype="uint32")
        extra_data = []
        if self._krn_weights is not None:
            extra_data.append(DataType.S1615.encode_as_numpy_int_array(
                self._krn_weights.flatten()))
        if self._krn_delays is not None:
            extra_data.append(DataType.S1615.encode_as_numpy_int_array(
                self._krn_delays.flatten()))
        if extra_data:
            return numpy.concatenate((data, *extra_data))
        return data

    @property
    @overrides(
        AbstractGenerateConnectorOnMachine.gen_connector_params_size_in_bytes)
    def gen_connector_params_size_in_bytes(self):
        return N_KERNEL_PARAMS * BYTES_PER_WORD

    @overrides(AbstractGenerateConnectorOnMachine.get_connected_vertices)
    def get_connected_vertices(self, s_info, source_vertex, target_vertex):
        src_splitter = source_vertex.splitter
        return [
            (t_vert,
             [s_vert for s_vert in src_splitter.get_out_going_vertices(
                  SPIKE_PARTITION_ID)
              if self.__connects(s_vert, t_vert)])
            for t_vert in target_vertex.splitter.get_in_coming_vertices(
                SPIKE_PARTITION_ID)]

    def __connects(self, src_machine_vertex, dest_machine_vertex):
        # If the pre- and post-slices are not 2-dimensional slices, we have
        # to let them pass
        pre_slice = src_machine_vertex.vertex_slice
        post_slice = dest_machine_vertex.vertex_slice
        if (pre_slice.shape is None or len(pre_slice.shape) != 2 or
                post_slice.shape is None or len(post_slice.shape) != 2):
            return True

        pre_slice_x = pre_slice.get_slice(0)
        pre_slice_y = pre_slice.get_slice(1)
        post_slice_x = post_slice.get_slice(0)
        post_slice_y = post_slice.get_slice(1)
        min_pre_x = post_slice_x.start - self._hlf_k_w
        max_pre_x = (post_slice_x.stop + self._hlf_k_w) - 1
        min_pre_y = post_slice_y.start - self._hlf_k_h
        max_pre_y = (post_slice_y.stop + self._hlf_k_h) - 1

        # No part of the pre square overlaps the post-square, don't connect
        if (pre_slice_x.stop <= min_pre_x or
                pre_slice_x.start > max_pre_x or
                pre_slice_y.stop <= min_pre_y or
                pre_slice_y.start > max_pre_y):
            return False

        # Otherwise, they do
        return True