# Copyright (c) 2021 The University of Manchester
# Based on work Copyright (c) The University of Sussex,
# Garibaldi Pineda Garcia, James Turner, James Knight and Thomas Nowotny
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from collections.abc import (Iterable, Sequence)
from typing import (
List, Optional, Sequence as TSequence, Tuple, Union,
cast, overload, TYPE_CHECKING)
import numpy
from numpy import floating, float64, integer, int16, uint16, uint32
from numpy.typing import NDArray
from pyNN.random import RandomDistribution
from spinn_utilities.overrides import overrides
from pacman.model.graphs.abstract_vertex import AbstractVertex
from pacman.model.graphs.application import ApplicationVertex
from pacman.model.graphs.common import Slice
from pacman.model.graphs.machine import MachineVertex
from spinn_front_end_common.utilities.constants import (
BYTES_PER_SHORT, BYTES_PER_WORD)
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spynnaker.pyNN.exceptions import SynapticConfigurationException
from spynnaker.pyNN.models.common.local_only_2d_common import get_div_const
from .abstract_connector import AbstractConnector
if TYPE_CHECKING:
from spynnaker.pyNN.models.neural_projections import (
ProjectionApplicationEdge, SynapseInformation)
#: The size of the connector struct in bytes
CONNECTOR_CONFIG_SIZE = (10 * BYTES_PER_SHORT) + (4 * BYTES_PER_WORD)
_Weights = Union[
int, float, List[Union[int, float]], Tuple[Union[int, float], ...],
NDArray[float64], RandomDistribution]
_Shape = Union[int, Tuple[int, int], None]
_Padding = Union[bool, _Shape]
class ConvolutionConnector(AbstractConnector):
"""
A 2D connector that centres on a post neuron.
Special connector which dynamically maps a 2D kernel over a 2D plane,
so that when the kernel is centred on a post-neuron,
that post-neuron receives input from each pre-neuron
that the kernel touches,
and the weights of those pre-post connections
are then the values of the kernel.
"""
__slots__ = (
"__kernel_weights",
"__strides",
"__padding_shape",
"__pool_shape",
"__pool_stride",
"__positive_receptor_type",
"__negative_receptor_type",
"__filter_edges"
)
def __init__(self, kernel_weights: _Weights,
kernel_shape: _Shape = None,
strides: _Shape = None, padding: _Padding = None,
pool_shape: _Shape = None, pool_stride: _Shape = None,
positive_receptor_type: str = "excitatory",
negative_receptor_type: str = "inhibitory",
safe=True, verbose=False, callback=None, filter_edges=True):
"""
:param kernel_weights:
The synaptic strengths, shared by neurons in the post population.
Can be:
* single value: `kernel_shape` must be provided;
the same value will be used for all weights
* simple list: `kernel_shape` must be provided; the list must
be sized shape width * height
* 2D list: If `kernel_shape` is provided, it must match
* :py:class:`~numpy.ndarray`: As above for simple or 2D list
* :py:class:`~spynnaker.pyNN.RandomDistribution`:
`kernel_shape` must be provided; weights will be drawn from the
distribution
:type kernel_weights:
int or list or ~numpy.ndarray or ~spynnaker.pyNN.RandomDistribution
:param kernel_shape:
The shape of the kernel if it cannot be determined from
`kernel_weights`. If a single value is provided, a square kernel
will be assumed. If two values are provided, it will be assumed to
be (n_rows, n_columns)
:type kernel_shape: int or tuple(int,int)
:param strides:
Spatial sampling frequency, jumps between the post neurons.
This matches the meaning of standard ML packages. If a single
value is provided, the same stride will be used for rows and
columns. If two values are provided it will be assumed to be
(stride_rows, stride_columns)
:type strides: int or tuple(int, int)
:param padding:
How many 'extra pixels' around the pre-population will be added,
only zero-valued pixels are currently supported. If a single
value is provided, the same padding will be used for rows and
columns. If two values are provided it will be assumed to be
`(padding_rows, padding_columns)`. If True, automatic padding will
be used based on the kernel shape. If False or `None`, no padding
will be used.
:type padding: bool or int or tuple(int, int) or None
:param pool_shape:
Area of pooling, only average pooling is supported (and seems to
make sense). If a single value is provided, the pooling area will
be square. If two values are provided it will be assumed to be
`(pooling_rows, pooling_columns)`.
:type pool_shape: int or tuple(int, int) or None
:param pool_stride:
Jumps between pooling regions. If a single value is provided, the
same stride will be used for rows and columns. If two values are
provided it will be assumed to be `(stride_rows, stride_columns)`
:type pool_stride: int or tuple(int, int) or None
:param str positive_receptor_type:
The receptor type to add the positive weights to. By default this
is "``excitatory``".
:param str negative_receptor_type:
The receptor type to add the negative weights to. By default this
is "``inhibitory``".
:param bool safe: (ignored)
:param bool verbose: (ignored)
:param callable callback: (ignored)
:param bool filter_edges:
Whether to filter the edges based on connectivity or not; filtered
means that the receiving cores will receive fewer packets, whereas
non-filtered means that receiving cores will receive all packets
whether relevant or not. However non-filtered may be more
efficient in the routing tables, so may be needed if routing
compression doesn't work.
"""
super().__init__(safe=safe, callback=callback, verbose=verbose)
self.__kernel_weights = self.__decode_kernel(
kernel_weights, kernel_shape)
self.__padding_shape = self.__decode_padding(padding)
self.__filter_edges = filter_edges
if strides is None:
self.__strides = numpy.array((1, 1), dtype=integer)
else:
self.__strides = self.__to_2d_shape(strides, "strides")
self.__pool_shape = self.__to_2d_shape(pool_shape, "pool_shape")
self.__pool_stride = self.__to_2d_shape(pool_stride, "pool_stride")
if self.__pool_stride is None:
self.__pool_stride = self.__pool_shape
if self.__pool_shape is not None:
self.__kernel_weights /= numpy.prod(self.__pool_shape)
self.__positive_receptor_type = positive_receptor_type
self.__negative_receptor_type = negative_receptor_type
@property
def positive_receptor_type(self) -> str:
"""
The receptor type to add the positive weights to.
:rtype: str
"""
return self.__positive_receptor_type
@property
def negative_receptor_type(self) -> str:
"""
The receptor type to add the negative weights to.
:rtype: str
"""
return self.__negative_receptor_type
@property
def kernel_weights(self) -> NDArray[float64]:
"""
The weights for this connection.
:rtype: ndarray
"""
return self.__kernel_weights
def __get_kernel_shape(self, shape: _Shape) -> Tuple[int, int]:
if shape is None:
raise SynapticConfigurationException(
"kernel_shape must be provided")
if numpy.isscalar(shape):
assert isinstance(shape, int)
return (shape, shape)
if isinstance(shape, tuple) and len(shape) == 2:
return shape
raise SynapticConfigurationException(f"Unknown kernel_shape: {shape}")
def __decode_kernel(self, w: _Weights, shape: _Shape) -> NDArray[float64]:
if isinstance(w, (int, float)):
_shape = self.__get_kernel_shape(shape)
return numpy.full(_shape, w, dtype=float64)
elif isinstance(w, (Sequence, numpy.ndarray)):
if all(isinstance(lst, (Sequence, numpy.ndarray)) for lst in w):
ws = cast(TSequence[TSequence[float]], w)
len0 = len(ws[0])
# 2D list
if not all(len(lst) == len0 for lst in ws):
raise SynapticConfigurationException(
"kernel_weights must be a 2D array with every row the"
" same length")
return numpy.array(w, dtype=float64)
else:
# 1D list
_shape = self.__get_kernel_shape(shape)
return numpy.array(w, dtype=float64).reshape(_shape)
elif isinstance(w, RandomDistribution):
_shape = self.__get_kernel_shape(shape)
return numpy.array(
w.next(numpy.prod(_shape)), dtype=float64).reshape(_shape)
else:
raise SynapticConfigurationException(
f"Unknown combination of kernel_weights ({w}) and"
f" kernel_shape ({shape})")
@overload
@staticmethod
def __to_2d_shape(shape: Union[int, Tuple[int, int]],
param_name: str) -> NDArray[integer]:
...
@overload
@staticmethod
def __to_2d_shape(shape: None, param_name: str) -> None:
...
@staticmethod
def __to_2d_shape(shape: _Shape, param_name: str) -> Optional[
NDArray[integer]]:
if shape is None:
return None
if numpy.isscalar(shape):
return numpy.array([shape, shape], dtype=integer)
assert isinstance(shape, tuple)
if len(shape) == 1:
return numpy.array([shape[0], 1], dtype=integer)
elif len(shape) == 2:
return numpy.array(shape, dtype=integer)
raise SynapticConfigurationException(
f"{param_name} must be an int or a tuple(int, int)")
def __decode_padding(self, padding: _Padding) -> NDArray[integer]:
if isinstance(padding, (int, Iterable)):
return self.__to_2d_shape(padding, "padding")
elif padding is None or padding is False:
return numpy.zeros(2, dtype=integer)
elif padding:
return self.__kernel_weights.shape // 2
else:
raise SynapticConfigurationException(
f"Unrecognized padding {padding}")
[docs]
def get_post_shape(self, shape: Tuple[int, ...]):
"""
Get the shape of the post image given the pre-image shape.
"""
_shape = numpy.array(shape)
if self.__pool_shape is not None:
_shape = _shape // self.__pool_stride
kernel_shape = numpy.array(self.__kernel_weights.shape)
post_shape = (_shape - (kernel_shape - 1) +
(2 * self.__padding_shape))
return tuple(int(i) for i in numpy.clip(
post_shape // self.__strides, 1, numpy.inf).astype(integer))
[docs]
@overrides(AbstractConnector.validate_connection)
def validate_connection(
self, application_edge: ProjectionApplicationEdge,
synapse_info: SynapseInformation):
pre = application_edge.pre_vertex
post = application_edge.post_vertex
if len(pre.atoms_shape) != 2 or len(post.atoms_shape) != 2:
raise ConfigurationException(
"The ConvolutionConnector only works where the Populations"
" of a Projection are both 2D. Please ensure that both the"
" Populations use a Grid2D structure.")
expected_post_shape = tuple(self.get_post_shape(pre.atoms_shape))
if expected_post_shape != post.atoms_shape:
raise ConfigurationException(
f"With a source population with shape {pre.atoms_shape}, "
"for a Convolution connector with the given parameters, "
"the post-population must have a shape "
f"{expected_post_shape}")
if post.get_synapse_id_by_target(
self.__positive_receptor_type) is None:
raise ConfigurationException(
"The post population doesn't have a synaptic receptor type of"
f" {self.__positive_receptor_type}")
if post.get_synapse_id_by_target(
self.__negative_receptor_type) is None:
raise ConfigurationException(
"The post population doesn't have a synaptic receptor type of"
f" {self.__negative_receptor_type}")
if not isinstance(synapse_info.delays, float):
raise ConfigurationException(
"The ConvolutionConnector only supports simple uniform delays")
@staticmethod
def __delay(synapse_info: SynapseInformation) -> float:
# Checked by validate_connection above
return cast(float, synapse_info.delays)
[docs]
@overrides(AbstractConnector.get_delay_minimum)
def get_delay_minimum(self, synapse_info: SynapseInformation) -> float:
return self.__delay(synapse_info)
[docs]
@overrides(AbstractConnector.get_delay_maximum)
def get_delay_maximum(self, synapse_info: SynapseInformation) -> float:
return self.__delay(synapse_info)
[docs]
@overrides(AbstractConnector.get_n_connections_from_pre_vertex_maximum)
def get_n_connections_from_pre_vertex_maximum(
self, n_post_atoms: int, synapse_info: SynapseInformation,
min_delay: Optional[float] = None,
max_delay: Optional[float] = None) -> int:
if min_delay is not None and max_delay is not None:
if not (min_delay <= self.__delay(synapse_info) <= max_delay):
return 0
w, h = self.__kernel_weights.shape
return numpy.clip(w * h, 0, n_post_atoms)
[docs]
@overrides(AbstractConnector.get_n_connections_to_post_vertex_maximum)
def get_n_connections_to_post_vertex_maximum(
self, synapse_info: SynapseInformation) -> int:
w, h = self.__kernel_weights.shape
return numpy.clip(w * h, 0, synapse_info.n_pre_neurons)
[docs]
@overrides(AbstractConnector.get_weight_maximum)
def get_weight_maximum(self, synapse_info: SynapseInformation) -> float:
return float(numpy.amax(self.__kernel_weights))
[docs]
@overrides(AbstractConnector.get_connected_vertices)
def get_connected_vertices(
self, s_info: SynapseInformation,
source_vertex: ApplicationVertex,
target_vertex: ApplicationVertex) -> Sequence[
Tuple[MachineVertex, Sequence[AbstractVertex]]]:
if not self.__filter_edges:
return super(ConvolutionConnector, self).get_connected_vertices(
s_info, source_vertex, target_vertex)
pre_vertices = numpy.array(
source_vertex.splitter.get_out_going_vertices(s_info.partition_id))
post_slice_ranges = self.__pre_as_post_slice_ranges(
m_vertex.vertex_slice for m_vertex in pre_vertices)
hlf_k_w, hlf_k_h = numpy.array(self.__kernel_weights.shape) // 2
connected: List[Tuple[MachineVertex, List[MachineVertex]]] = []
for post in target_vertex.splitter.get_in_coming_vertices(
s_info.partition_id):
post_slice = post.vertex_slice
post_slice_x = post_slice.get_slice(0)
post_slice_y = post_slice.get_slice(1)
# Get ranges allowed in post
min_x = post_slice_x.start - hlf_k_w
max_x = (post_slice_x.stop + hlf_k_w) - 1
min_y = post_slice_y.start - hlf_k_h
max_y = (post_slice_y.stop + hlf_k_h) - 1
# Test that the start coordinates are in range i.e. less than max
start_in_range = numpy.logical_not(
numpy.any(post_slice_ranges[:, 0] > [max_x, max_y], axis=1))
# Test that the end coordinates are in range i.e. more than min
end_in_range = numpy.logical_not(
numpy.any(post_slice_ranges[:, 1] < [min_x, min_y], axis=1))
# When both things are true, we have a vertex in range
pre_in_range = pre_vertices[
numpy.logical_and(start_in_range, end_in_range)]
connected.append((post, list(pre_in_range)))
return connected
def __pre_as_post_slice_ranges(
self, slices: Iterable[Slice]) -> NDArray[integer]:
"""
Convert a generator of (multi-dimensional) pre-slices into an array of
post-slices.
"""
pre_slices = ((s.get_slice(0), s.get_slice(1)) for s in slices)
coords = numpy.array([
((px.start, py.start), (px.stop - 1, py.stop - 1))
for px, py in pre_slices])
if self.__pool_stride is not None:
coords //= self.__pool_stride
kernel_shape = numpy.array(self.__kernel_weights.shape)
coords = coords - kernel_shape // 2 + self.__padding_shape
coords //= self.__strides
return coords
@property
def kernel_n_bytes(self) -> int:
"""
Size of the weights in bytes
:rtype: int
"""
n_weights = self.__kernel_weights.size
return n_weights * BYTES_PER_SHORT
@property
def kernel_n_weights(self) -> int:
"""
Size of the weights.
:rtype: int
"""
return self.__kernel_weights.size
@property
def parameters_n_bytes(self) -> int:
"""
:rtype: int
"""
return CONNECTOR_CONFIG_SIZE
[docs]
def get_local_only_data(
self, app_edge: ProjectionApplicationEdge, local_delay: int,
delay_stage: int, weight_index: int) -> NDArray[uint32]:
"""
Gets the local only data
:param ProjectionApplicationEdge app_edge:
:param int local_delay:
:param int delay_stage:
:param int weight_index:
:rtype: ndarray
"""
# Get info about things
kernel_shape = self.__kernel_weights.shape
ps_x, ps_y = 1, 1
if self.__pool_stride is not None:
ps_x, ps_y = self.__pool_stride
# Do a new list for remaining connector details as uint16s
pos_synapse_type = app_edge.post_vertex.get_synapse_id_by_target(
self.__positive_receptor_type)
neg_synapse_type = app_edge.post_vertex.get_synapse_id_by_target(
self.__negative_receptor_type)
# Produce the values needed
short_values = numpy.array([
kernel_shape[1], kernel_shape[0],
self.__padding_shape[1], self.__padding_shape[0],
pos_synapse_type, neg_synapse_type, delay_stage, local_delay,
weight_index, 0], dtype=uint16)
long_values = numpy.array([
get_div_const(self.__strides[1]), get_div_const(self.__strides[0]),
get_div_const(ps_y), get_div_const(ps_x)], dtype=uint32)
return numpy.concatenate((short_values.view(uint32), long_values))
[docs]
def get_encoded_kernel_weights(
self, app_edge: ProjectionApplicationEdge,
weight_scales: NDArray[floating]) -> NDArray[int16]:
"""
Encode weights with weight scaling.
:param ProjectionApplicationEdge app_edge:
:param ndarray weight_scales:
:rtype: ndarray
"""
encoded_kernel_weights = self.__kernel_weights.flatten()
neg_weights = encoded_kernel_weights < 0
pos_weights = encoded_kernel_weights > 0
pos_synapse_type = app_edge.post_vertex.get_synapse_id_by_target(
self.__positive_receptor_type)
neg_synapse_type = app_edge.post_vertex.get_synapse_id_by_target(
self.__negative_receptor_type)
encoded_kernel_weights[neg_weights] *= weight_scales[neg_synapse_type]
encoded_kernel_weights[pos_weights] *= weight_scales[pos_synapse_type]
return numpy.round(encoded_kernel_weights).astype(int16)