# Source code for spynnaker.pyNN.models.neural_projections.connectors.kernel_connector

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import (Any, Dict, List, Final, Optional, Sequence, Tuple, Union,
                    TYPE_CHECKING)

import numpy
from numpy import floating, integer, ndarray, uint32
from numpy.typing import NDArray
from typing_extensions import TypeAlias

from pyNN.random import RandomDistribution
from pyNN.space import Space

from spinn_utilities.overrides import overrides

from pacman.model.graphs import AbstractVertex
from pacman.model.graphs.application import ApplicationVertex
from pacman.model.graphs.machine import MachineVertex
from pacman.model.graphs.common import Slice

from spinn_front_end_common.interface.ds import DataType
from spinn_front_end_common.utilities.constants import BYTES_PER_WORD
from spinn_front_end_common.utilities.exceptions import ConfigurationException

from spynnaker.pyNN.exceptions import SpynnakerException
from spynnaker.pyNN.types import (
    Delays, WeightsDelays, Weights)

from .abstract_connector import AbstractConnector
from .abstract_generate_connector_on_machine import (
    AbstractGenerateConnectorOnMachine, ConnectorIDs)
from .abstract_generate_connector_on_host import (
    AbstractGenerateConnectorOnHost)

if TYPE_CHECKING:
    from spynnaker.pyNN.models.neural_projections import (
        ProjectionApplicationEdge, SynapseInformation)


# A 2D shape, step or coordinate pair; its entries are read with the
# HEIGHT and WIDTH indices defined below.
_TWOD: Final['TypeAlias'] = Union[List[int], Tuple[int, int]]
# The forms in which kernel weights or delays may be supplied: a single
# value, a (nested) list, a numpy array or a random distribution to sample.
_KERNAL: Final['TypeAlias'] = Union[
    float, int, List[float], NDArray[numpy.floating], RandomDistribution]

# Positions of the height (rows) and width (columns) entries in a 2D pair
HEIGHT, WIDTH = 0, 1
# Number of fixed words written by KernelConnector.gen_connector_params
# before any kernel weight/delay values
N_KERNEL_PARAMS = 9


class ConvolutionKernel(ndarray):
    """
    Thin wrapper around a numpy array.

    Adds no state or behaviour of its own; kernel values that have been
    checked against the kernel shape are returned as a view of this type.
    """


def shape2word(
        short1: Union[int, integer], short2: Union[int, integer]) -> uint32:
    """
    Packs two 16-bit values into a single 32-bit word.

    Each value is truncated to its low 16 bits (so negative values are
    stored in two's complement form) before being combined.

    :param short1: value placed in the low 16 bits of the result
    :param short2: value placed in the high 16 bits of the result
    :returns: ``(short2 & 0xFFFF) << 16 | (short1 & 0xFFFF)`` as a uint32
    """
    # Mask as Python ints before building the uint32: converting a negative
    # Python int directly with uint32(...) raises OverflowError on NumPy 2.
    low = int(short1) & 0xFFFF
    high = int(short2) & 0xFFFF
    return uint32((high << 16) | low)


class KernelConnector(AbstractGenerateConnectorOnMachine,
                      AbstractGenerateConnectorOnHost):
    """
    Where the pre- and post-synaptic populations are considered as a 2D
    array. Connect every post(row, column) neuron to many
    pre(row, column, kernel)
    through a (kernel) set of weights and/or delays.
    """
    __slots__ = (
        "_kernel_w", "_kernel_h",
        "_hlf_k_w", "_hlf_k_h",
        "_pre_w", "_pre_h",
        "_post_w", "_post_h",
        "_pre_start_w", "_pre_start_h",
        "_post_start_w", "_post_start_h",
        "_pre_step_w", "_pre_step_h",
        "_post_step_w", "_post_step_h",
        "_krn_weights", "_krn_delays", "_shape_common",
        "_common_w", "_common_h",
        "_shape_pre", "_shape_post",
        "_post_as_pre")

    def __init__(
            self, shape_pre: _TWOD, shape_post: _TWOD, shape_kernel: _TWOD,
            weight_kernel: Optional[_KERNAL] = None,
            delay_kernel: Optional[_KERNAL] = None,
            shape_common: Optional[_TWOD] = None,
            pre_sample_steps_in_post: Optional[_TWOD] = None,
            pre_start_coords_in_post: Optional[_TWOD] = None,
            post_sample_steps_in_pre: Optional[_TWOD] = None,
            post_start_coords_in_pre: Optional[_TWOD] = None,
            safe: bool = True, space: Optional[Space] = None,
            verbose: bool = False, callback: None = None):
        """
        :param shape_pre:
            2D shape of the pre-population (rows/height, columns/width, usually
            the input image shape)
        :param shape_post:
            2D shape of the post-population (rows/height, columns/width)
        :param shape_kernel:
            2D shape of the kernel (rows/height, columns/width)
        :param weight_kernel: (optional)
            2D matrix of size shape_kernel describing the weights
        :param delay_kernel: (optional)
            2D matrix of size shape_kernel describing the delays
        :param shape_common: (optional)
            2D shape of common coordinate system (for both pre- and post-,
            usually the input image sizes); defaults to shape_pre
        :param pre_sample_steps_in_post: (optional)
            Sampling steps/jumps for pre-population, given as
            (row step, column step); defaults to (1, 1)
        :param pre_start_coords_in_post: (optional)
            Starting (row, column) for pre-population sampling;
            defaults to (0, 0)
        :param post_sample_steps_in_pre: (optional)
            Sampling steps/jumps for post-population, given as
            (row step, column step); defaults to (1, 1)
        :param post_start_coords_in_pre: (optional)
            Starting (row, column) for post-population sampling;
            defaults to (0, 0)
        :param safe:
            Whether to check that weights and delays have valid values.
            If ``False``, this check is skipped.
        :param space:
            Currently ignored; for future compatibility.
        :param verbose:
            Whether to output extra information about the connectivity to a
            CSV file
        :param callback:
            if given, a callable that display a progress bar on the terminal.

            .. note::
                Not supported by sPyNNaker.
        """
        super().__init__(safe=safe, callback=callback, verbose=verbose)
        assert space is None, "non-None space unsupported"

        # Get the kernel size
        self._kernel_w = shape_kernel[WIDTH]
        self._kernel_h = shape_kernel[HEIGHT]

        # The half-value used here indicates the half-way array position
        self._hlf_k_w = shape_kernel[WIDTH] // 2
        self._hlf_k_h = shape_kernel[HEIGHT] // 2

        # Cache values for the pre and post sizes
        self._pre_w = shape_pre[WIDTH]
        self._pre_h = shape_pre[HEIGHT]
        self._post_w = shape_post[WIDTH]
        self._post_h = shape_post[HEIGHT]

        # Get the starting coordinates and step sizes
        # (or defaults if not given)
        if pre_start_coords_in_post is None:
            self._pre_start_w = 0
            self._pre_start_h = 0
        else:
            self._pre_start_w = pre_start_coords_in_post[WIDTH]
            self._pre_start_h = pre_start_coords_in_post[HEIGHT]

        if post_start_coords_in_pre is None:
            self._post_start_w = 0
            self._post_start_h = 0
        else:
            self._post_start_w = post_start_coords_in_pre[WIDTH]
            self._post_start_h = post_start_coords_in_pre[HEIGHT]

        if pre_sample_steps_in_post is None:
            self._pre_step_w = 1
            self._pre_step_h = 1
        else:
            self._pre_step_w = pre_sample_steps_in_post[WIDTH]
            self._pre_step_h = pre_sample_steps_in_post[HEIGHT]

        if post_sample_steps_in_pre is None:
            self._post_step_w = 1
            self._post_step_h = 1
        else:
            self._post_step_w = post_sample_steps_in_pre[WIDTH]
            self._post_step_h = post_sample_steps_in_pre[HEIGHT]

        # Make sure the supplied values are in the correct format
        self._krn_weights = self.__get_kernel_vals(weight_kernel)
        self._krn_delays = self.__get_kernel_vals(delay_kernel)

        self._shape_common = \
            shape_pre if shape_common is None else shape_common
        self._common_w = self._shape_common[WIDTH]
        self._common_h = self._shape_common[HEIGHT]
        self._shape_pre = shape_pre
        self._shape_post = shape_post

        # Cache of post-slice coordinates expressed in the pre-population
        # coordinate system, keyed by the post vertex slice
        self._post_as_pre: Dict[
            Slice, Tuple[NDArray[integer], NDArray[integer]]] = {}

    @overrides(AbstractGenerateConnectorOnMachine.get_parameters)
    def get_parameters(self) -> Dict[str, Any]:
        parameters = self._get_parameters()
        parameters["shape_pre"] = [self._pre_h, self._pre_w]
        parameters["shape_post"] = [self._post_h, self._post_w]
        parameters["shape_kernel"] = (self._kernel_h, self._kernel_w)
        parameters["weight_kernel"] = self._krn_weights
        parameters["delay_kernel"] = self._krn_delays
        parameters["shape_common"] = self._shape_common
        parameters["pre_sample_steps_in_post"] = (
            self._pre_step_h, self._pre_step_w)
        parameters["pre_start_coords_in_post"] = (
            self._pre_start_h, self._pre_start_w)
        parameters["post_sample_steps_in_pre"] = (
            self._post_step_h, self._post_step_w)
        parameters["post_start_coords_in_pre"] = (
            self._post_start_h, self._post_start_w)
        parameters["space"] = None
        return parameters

    def __to_post_coords(
            self, post_vertex_slice: Slice) -> Tuple[
                NDArray[integer], NDArray[integer]]:
        """
        Get the (row, column) coordinates of every atom in a post-slice.

        :param post_vertex_slice: the slice of the post-population
        :returns: array of rows and array of columns, one entry per atom
        """
        # A signed dtype is used deliberately: these coordinates are later
        # combined with start offsets and have 1 subtracted from them, which
        # would wrap around (or raise) with an unsigned dtype on NumPy 2.
        post = numpy.arange(
            post_vertex_slice.lo_atom, post_vertex_slice.hi_atom + 1,
            dtype=numpy.int64)
        return numpy.divmod(post, self._post_w)

    def __map_to_pre_coords(
            self, post_r: NDArray[integer], post_c: NDArray[integer]) -> Tuple[
                NDArray[integer], NDArray[integer]]:
        """
        Get a map from post to pre-population coordinates.

        :param post_r: post-population rows
        :param post_c: post-population columns
        :returns: the rows and columns scaled by the post step and shifted
            by the post start offset
        """
        return (self._post_start_h + post_r * self._post_step_h,
                self._post_start_w + post_c * self._post_step_w)

    def __post_as_pre(self, post_vertex_slice: Slice) -> Tuple[
            NDArray[integer], NDArray[integer]]:
        """
        Write post-population coordinates as pre-population coordinates.

        Results are cached per slice, with the slice itself used directly
        as the cache index.

        :param post_vertex_slice: the slice of the post-population
        :returns: arrays of rows and columns, one entry per atom in the slice
        """
        if post_vertex_slice not in self._post_as_pre:
            post_r, post_c = self.__to_post_coords(post_vertex_slice)
            self._post_as_pre[post_vertex_slice] = \
                self.__map_to_pre_coords(post_r, post_c)
        return self._post_as_pre[post_vertex_slice]

    def __pre_as_post(self, pre_r: int, pre_c: int) -> Tuple[int, int]:
        """
        Write pre-population coordinates as post-population coordinates.

        :param pre_r: row to convert
        :param pre_c: column to convert
        :returns: (row, column) after removing the pre start offset and
            dividing by the pre step, rounding up
        """
        r = ((pre_r - self._pre_start_h - 1) // self._pre_step_h) + 1
        c = ((pre_c - self._pre_start_w - 1) // self._pre_step_w) + 1
        return (r, c)

    def __get_kernel_vals(self, values: Optional[Union[
            _KERNAL, WeightsDelays]]) -> Optional[ConvolutionKernel]:
        """
        Convert kernel values given into the correct format.

        :param values:
            ``None``, a scalar, a list, an array or a random distribution
        :returns:
            ``None`` if no values were given, otherwise a kernel of shape
            (kernel height, kernel width)
        :raises SpynnakerException:
            If the values cannot be made to match the kernel shape
        """
        if values is None:
            return None
        if isinstance(values, list):
            values = numpy.asarray(values)
        krn_size = self._kernel_h * self._kernel_w
        krn_shape = (self._kernel_h, self._kernel_w)
        if isinstance(values, RandomDistribution):
            return numpy.array(values.next(krn_size)).reshape(krn_shape).view(
                ConvolutionKernel)
        if numpy.isscalar(values):
            return numpy.full(krn_shape, values).view(ConvolutionKernel)
        # ConvolutionKernel is an ndarray subclass so one check covers both.
        # Checking ndim first means 0-D and 1-D arrays reach the descriptive
        # error below instead of failing with an IndexError on shape lookup.
        if (isinstance(values, numpy.ndarray) and values.ndim == 2
                and values.shape[HEIGHT] == self._kernel_h
                and values.shape[WIDTH] == self._kernel_w):
            return values.view(ConvolutionKernel)
        raise SpynnakerException(
            "Error generating KernelConnector values; if you have supplied "
            "weight and/or delay kernel then ensure they are the same size "
            "as specified by the shape kernel values (height: "
            f"{self._kernel_h} and width: {self._kernel_w}).")

    def __compute_statistics(
            self, weights: Optional[Weights], delays: Optional[Delays],
            post_vertex_slice: Slice, n_pre_neurons: int) -> Tuple[
                int, NDArray[uint32], NDArray[uint32], NDArray[floating],
                NDArray[floating]]:
        """
        Compute the relevant information required for the connections.

        :param weights: weights to use if no weight kernel was supplied
        :param delays: delays to use if no delay kernel was supplied
        :param post_vertex_slice: the slice of the post-population
        :param n_pre_neurons: number of neurons in the pre-population
        :returns:
            number of connections, post IDs, pre IDs, delays and weights
            (one entry per connection in each array)
        """
        # If __compute_statistics is called more than once, there's
        # no need to get the user-supplied weights and delays again
        if self._krn_weights is None:
            self._krn_weights = self.__get_kernel_vals(weights)
        if self._krn_delays is None:
            self._krn_delays = self.__get_kernel_vals(delays)
        assert self._krn_weights is not None
        assert self._krn_delays is not None

        # Convert each post atom to the common coordinate system and then to
        # pre coordinates.  This does not depend on the pre neuron, so it is
        # done once here rather than inside the pre loop.  Plain Python ints
        # are used so the arithmetic below cannot wrap around.
        post_as_pre_r, post_as_pre_c = self.__post_as_pre(post_vertex_slice)
        post_in_pre = [
            self.__pre_as_post(int(pac_r), int(pac_c))
            for pac_r, pac_c in zip(post_as_pre_r, post_as_pre_c)]

        hh, hw = self._hlf_k_h, self._hlf_k_w
        all_pre_ids: List[int] = []
        all_post_ids: List[int] = []
        all_delays: List[NDArray[floating]] = []
        all_weights: List[NDArray[floating]] = []
        post_lo = post_vertex_slice.lo_atom

        # Loop over pre-vertices
        for pre_idx in range(n_pre_neurons):
            pre_r, pre_c = divmod(pre_idx, self._pre_w)

            # Test whether the coordinates should be included based on the
            # step function (in the pre) and skip if not
            if ((pre_r - self._pre_start_h) % self._pre_step_h != 0 or
                    (pre_c - self._pre_start_w) % self._pre_step_w != 0):
                continue

            # Loop over post-vertices; each post atom is visited exactly
            # once per pre neuron so no duplicate tracking is required
            for post_idx, (pap_r, pap_c) in enumerate(post_in_pre, post_lo):
                # Obtain coordinates to test against kernel sizes
                kr = hh - (pap_r - pre_r)
                kc = hw - (pap_c - pre_c)
                if 0 <= kr < self._kernel_h and 0 <= kc < self._kernel_w:
                    # Store weights, delays and pre/post ids
                    all_pre_ids.append(pre_idx)
                    all_post_ids.append(post_idx)
                    all_delays.append(self._krn_delays[kr, kc])
                    all_weights.append(self._krn_weights[kr, kc])

        # Now the loop is complete, return relevant data
        return (len(all_pre_ids),
                numpy.array(all_post_ids, dtype=uint32),
                numpy.array(all_pre_ids, dtype=uint32),
                numpy.array(all_delays),
                numpy.array(all_weights))

    @overrides(AbstractConnector.get_delay_maximum)
    def get_delay_maximum(self, synapse_info: SynapseInformation) -> float:
        # Use the kernel delays if user has supplied them
        if self._krn_delays is not None:
            return numpy.max(self._krn_delays)

        # I think this is overestimated, but not by much
        n_conns = (
            self._pre_w * self._pre_h * self._kernel_w * self._kernel_h)
        # if not then use the values that came in
        return self._get_delay_maximum(
            synapse_info.delays, n_conns, synapse_info)

    @overrides(AbstractConnector.get_delay_minimum)
    def get_delay_minimum(self, synapse_info: SynapseInformation) -> float:
        # Use the kernel delays if user has supplied them
        if self._krn_delays is not None:
            return numpy.min(self._krn_delays)

        # I think this is overestimated, but not by much
        n_conns = (
            self._pre_w * self._pre_h * self._kernel_w * self._kernel_h)
        # if not then use the values that came in
        return self._get_delay_minimum(
            synapse_info.delays, n_conns, synapse_info)

    @overrides(AbstractConnector.get_delay_variance)
    def get_delay_variance(self, delays: Delays,
                           synapse_info: SynapseInformation) -> float:
        # Use the kernel delays if user has supplied them
        if self._krn_delays is not None:
            return float(numpy.var(self._krn_delays))
        return super().get_delay_variance(delays, synapse_info)

    @overrides(AbstractConnector.get_n_connections_from_pre_vertex_maximum)
    def get_n_connections_from_pre_vertex_maximum(
            self, n_post_atoms: int, synapse_info: SynapseInformation,
            min_delay: Optional[float] = None,
            max_delay: Optional[float] = None) -> int:
        # At most one connection per kernel element, and never more than
        # the number of post atoms; int() keeps the declared return type
        return int(numpy.clip(
            self._kernel_h * self._kernel_w, 0, n_post_atoms))

    @overrides(AbstractConnector.get_n_connections_to_post_vertex_maximum)
    def get_n_connections_to_post_vertex_maximum(
            self, synapse_info: SynapseInformation) -> int:
        # At most one connection per kernel element, capped at 255;
        # int() keeps the declared return type
        return int(numpy.clip(self._kernel_h * self._kernel_w, 0, 255))

    @overrides(AbstractConnector.get_weight_maximum)
    def get_weight_maximum(self, synapse_info: SynapseInformation) -> float:
        # Use the kernel weights if user has supplied them
        if self._krn_weights is not None:
            return numpy.max(self._krn_weights)

        # I think this is overestimated, but not by much
        n_conns = self._pre_w * self._pre_h * self._kernel_w * self._kernel_h
        return self._get_weight_maximum(
            synapse_info.weights, n_conns, synapse_info)

    @overrides(AbstractConnector.get_weight_mean)
    def get_weight_mean(self, weights: Weights,
                        synapse_info: SynapseInformation) -> float:
        # Use the kernel weights if user has supplied them
        if self._krn_weights is not None:
            return float(numpy.mean(self._krn_weights))
        return super().get_weight_mean(weights, synapse_info)

    @overrides(AbstractConnector.get_weight_variance)
    def get_weight_variance(self, weights: Weights,
                            synapse_info: SynapseInformation) -> float:
        # Use the kernel weights if user has supplied them
        if self._krn_weights is not None:
            return float(numpy.var(self._krn_weights))
        return super().get_weight_variance(weights, synapse_info)

    def __repr__(self) -> str:
        # Note the kernel is reported here as width then height
        return \
            f"KernelConnector(shape_kernel[{self._kernel_w},{self._kernel_h}])"

    @overrides(AbstractGenerateConnectorOnHost.create_synaptic_block)
    def create_synaptic_block(
            self, post_slices: Sequence[Slice], post_vertex_slice: Slice,
            synapse_type: int, synapse_info: SynapseInformation) -> NDArray:
        (n_connections, all_post, all_pre_in_range, all_pre_in_range_delays,
         all_pre_in_range_weights) = self.__compute_statistics(
            synapse_info.weights, synapse_info.delays, post_vertex_slice,
            synapse_info.n_pre_neurons)

        syn_dtypes = AbstractConnector.NUMPY_SYNAPSES_DTYPE

        if n_connections <= 0:
            return numpy.zeros(0, dtype=syn_dtypes)

        # 0 for excitatory, 1 for inhibitory
        syn_type = numpy.array(all_pre_in_range_weights < 0)
        block = numpy.zeros(n_connections, dtype=syn_dtypes)
        block["source"] = all_pre_in_range
        block["target"] = all_post
        block["weight"] = all_pre_in_range_weights
        block["delay"] = all_pre_in_range_delays
        block["synapse_type"] = syn_type.astype('uint8')
        return block

    @property
    @overrides(AbstractGenerateConnectorOnMachine.gen_connector_id)
    def gen_connector_id(self) -> int:
        return ConnectorIDs.KERNEL_CONNECTOR.value

    @overrides(AbstractGenerateConnectorOnMachine.gen_connector_params)
    def gen_connector_params(
            self, synapse_info: SynapseInformation) -> NDArray[uint32]:
        # Fixed header: each word packs a width (low half) and a height
        # (high half); the last word flags which kernels follow
        data = numpy.array([
            shape2word(self._common_w, self._common_h),
            shape2word(self._pre_w, self._pre_h),
            shape2word(self._post_w, self._post_h),
            shape2word(self._pre_start_w, self._pre_start_h),
            shape2word(self._post_start_w, self._post_start_h),
            shape2word(self._pre_step_w, self._pre_step_h),
            shape2word(self._post_step_w, self._post_step_h),
            shape2word(self._kernel_w, self._kernel_h),
            shape2word(int(self._krn_weights is not None),
                       int(self._krn_delays is not None))], dtype=uint32)
        extra_data = []
        if self._krn_weights is not None:
            extra_data.append(DataType.S1615.encode_as_numpy_int_array(
                self._krn_weights.flatten()))
        if self._krn_delays is not None:
            extra_data.append(DataType.S1615.encode_as_numpy_int_array(
                self._krn_delays.flatten()))
        if extra_data:
            return numpy.concatenate((data, *extra_data))
        return data

    @property
    @overrides(
        AbstractGenerateConnectorOnMachine.gen_connector_params_size_in_bytes)
    def gen_connector_params_size_in_bytes(self) -> int:
        size = N_KERNEL_PARAMS * BYTES_PER_WORD
        # One word per kernel element, matching the flattened arrays that
        # gen_connector_params appends after the fixed header
        if self._krn_weights is not None:
            size += int(self._krn_weights.size) * BYTES_PER_WORD
        if self._krn_delays is not None:
            size += int(self._krn_delays.size) * BYTES_PER_WORD
        return size

    @overrides(AbstractGenerateConnectorOnMachine.get_connected_vertices)
    def get_connected_vertices(
            self, s_info: SynapseInformation, source_vertex: ApplicationVertex,
            target_vertex: ApplicationVertex) -> Sequence[
                Tuple[MachineVertex, Sequence[AbstractVertex]]]:
        src_splitter = source_vertex.splitter
        return [
            (t_vert,
             [s_vert for s_vert in src_splitter.get_out_going_vertices(
                 s_info.partition_id)
              if self.__connects(s_vert, t_vert)])
            for t_vert in target_vertex.splitter.get_in_coming_vertices(
                s_info.partition_id)]

    def __connects(self, src_machine_vertex: MachineVertex,
                   dest_machine_vertex: MachineVertex) -> bool:
        """
        Test whether a source machine vertex may connect to a target one.

        :param src_machine_vertex: the pre-synaptic machine vertex
        :param dest_machine_vertex: the post-synaptic machine vertex
        :returns:
            ``False`` only when both slices are 2D and the source slice
            lies wholly outside the target slice grown by half a kernel
        """
        # If the pre- and post-slices are not 2-dimensional slices, we have
        # to let them pass
        pre_slice = src_machine_vertex.vertex_slice
        post_slice = dest_machine_vertex.vertex_slice
        if (pre_slice.shape is None or len(pre_slice.shape) != 2 or
                post_slice.shape is None or len(post_slice.shape) != 2):
            return True

        pre_slice_x = pre_slice.get_slice(0)
        pre_slice_y = pre_slice.get_slice(1)
        post_slice_x = post_slice.get_slice(0)
        post_slice_y = post_slice.get_slice(1)

        min_pre_x = post_slice_x.start - self._hlf_k_w
        max_pre_x = (post_slice_x.stop + self._hlf_k_w) - 1
        min_pre_y = post_slice_y.start - self._hlf_k_h
        max_pre_y = (post_slice_y.stop + self._hlf_k_h) - 1

        # No part of the pre square overlaps the post-square, don't connect
        if (pre_slice_x.stop <= min_pre_x or
                pre_slice_x.start > max_pre_x or
                pre_slice_y.stop <= min_pre_y or
                pre_slice_y.start > max_pre_y):
            return False

        # Otherwise, they do
        return True

    @overrides(AbstractConnector.validate_connection)
    def validate_connection(
            self, application_edge: ProjectionApplicationEdge,
            synapse_info: SynapseInformation) -> None:
        pre = application_edge.pre_vertex
        post = application_edge.post_vertex
        if len(pre.atoms_shape) != 1 or len(post.atoms_shape) != 1:
            raise ConfigurationException(
                "The Kernel Connector is designed to work with 1D vertices")