# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import Dict, List, Optional, Sequence, Tuple, Union, TYPE_CHECKING
import numpy
from numpy import floating, integer, ndarray, uint32
from numpy.typing import NDArray
from typing_extensions import TypeAlias
from pyNN.random import RandomDistribution
from pyNN.space import Space
from spinn_utilities.overrides import overrides
from pacman.model.graphs import AbstractVertex
from pacman.model.graphs.application import ApplicationVertex
from pacman.model.graphs.machine import MachineVertex
from pacman.model.graphs.common import Slice
from spinn_front_end_common.interface.ds import DataType
from spinn_front_end_common.utilities.constants import BYTES_PER_WORD
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spynnaker.pyNN.exceptions import SpynnakerException
from spynnaker.pyNN.types import (
Delay_Types, Weight_Delay_Types, Weight_Types)
from .abstract_connector import AbstractConnector
from .abstract_generate_connector_on_machine import (
AbstractGenerateConnectorOnMachine, ConnectorIDs)
from .abstract_generate_connector_on_host import (
AbstractGenerateConnectorOnHost)
if TYPE_CHECKING:
from spynnaker.pyNN.models.neural_projections import (
ProjectionApplicationEdge, SynapseInformation)
_TwoD: TypeAlias = Union[List[int], Tuple[int, int]]
_Kernel: TypeAlias = Union[
float, int, List[float], NDArray[numpy.floating], RandomDistribution]
HEIGHT, WIDTH = 0, 1
N_KERNEL_PARAMS = 9
class ConvolutionKernel(ndarray):
"""
Thin wrapper around a numpy array.
"""
def shape2word(
short1: Union[int, integer], short2: Union[int, integer]) -> uint32:
"""
Combines two short values into 1 int by shifting the first 16 places
:param int short1: first 2 byte value
:param int short2: second 2 bytes value
:rtype: int
"""
return uint32(((uint32(short2) & 0xFFFF) << 16)
| (uint32(short1) & 0xFFFF))
class KernelConnector(AbstractGenerateConnectorOnMachine,
AbstractGenerateConnectorOnHost):
"""
Where the pre- and post-synaptic populations are considered as a 2D
array. Connect every post(row, column) neuron to many
pre(row, column, kernel)
through a (kernel) set of weights and/or delays.
"""
__slots__ = (
"_kernel_w", "_kernel_h",
"_hlf_k_w", "_hlf_k_h",
"_pre_w", "_pre_h",
"_post_w", "_post_h",
"_pre_start_w", "_pre_start_h",
"_post_start_w", "_post_start_h",
"_pre_step_w", "_pre_step_h",
"_post_step_w", "_post_step_h",
"_krn_weights", "_krn_delays", "_shape_common",
"_common_w", "_common_h",
"_shape_pre", "_shape_post",
"_post_as_pre")
def __init__(
self, shape_pre: _TwoD, shape_post: _TwoD, shape_kernel: _TwoD,
weight_kernel: Optional[_Kernel] = None,
delay_kernel: Optional[_Kernel] = None,
shape_common: Optional[_TwoD] = None,
pre_sample_steps_in_post: Optional[_TwoD] = None,
pre_start_coords_in_post: Optional[_TwoD] = None,
post_sample_steps_in_pre: Optional[_TwoD] = None,
post_start_coords_in_pre: Optional[_TwoD] = None,
safe: bool = True, space: Optional[Space] = None,
verbose: bool = False, callback: None = None):
"""
:param shape_pre:
2D shape of the pre-population (rows/height, columns/width, usually
the input image shape)
:type shape_pre: list(int) or tuple(int,int)
:param shape_post:
2D shape of the post-population (rows/height, columns/width)
:type shape_post: list(int) or tuple(int,int)
:param shape_kernel:
2D shape of the kernel (rows/height, columns/width)
:type shape_kernel: list(int) or tuple(int,int)
:param weight_kernel: (optional)
2D matrix of size shape_kernel describing the weights
:type weight_kernel: ~numpy.ndarray or ~pyNN.random.RandomDistribution
or int or float or list(int) or list(float) or None
:param delay_kernel: (optional)
2D matrix of size shape_kernel describing the delays
:type delay_kernel: ~numpy.ndarray or ~pyNN.random.RandomDistribution
or int or float or list(int) or list(float) or None
:param shape_common: (optional)
2D shape of common coordinate system (for both pre- and post-,
usually the input image sizes)
:type shape_common: list(int) or tuple(int,int) or None
:param pre_sample_steps_in_post: (optional)
Sampling steps/jumps for pre-population <=> (stepX, stepY)
:type pre_sample_steps_in_post: None or list(int) or tuple(int,int)
:param pre_start_coords_in_post: (optional)
Starting row/column for pre-population sampling <=> (offX, offY)
:type pre_start_coords_in_post: None or list(int) or tuple(int,int)
:param post_sample_steps_in_pre: (optional)
Sampling steps/jumps for post-population <=> (stepX, stepY)
:type post_sample_steps_in_pre: None or list(int) or tuple(int,int)
:param post_start_coords_in_pre: (optional)
Starting row/column for post-population sampling <=> (offX, offY)
:type post_start_coords_in_pre: None or list(int) or tuple(int,int)
:param bool safe:
Whether to check that weights and delays have valid values.
If ``False``, this check is skipped.
:param ~pyNN.space.Space space:
Currently ignored; for future compatibility.
:param bool verbose:
Whether to output extra information about the connectivity to a
CSV file
:param callable callback: (ignored)
"""
super().__init__(safe=safe, callback=callback, verbose=verbose)
assert space is None, "non-None space unsupported"
# Get the kernel size
self._kernel_w = shape_kernel[WIDTH]
self._kernel_h = shape_kernel[HEIGHT]
# The half-value used here indicates the half-way array position
self._hlf_k_w = shape_kernel[WIDTH] // 2
self._hlf_k_h = shape_kernel[HEIGHT] // 2
# Cache values for the pre and post sizes
self._pre_w = shape_pre[WIDTH]
self._pre_h = shape_pre[HEIGHT]
self._post_w = shape_post[WIDTH]
self._post_h = shape_post[HEIGHT]
# Get the starting coordinates and step sizes
# (or defaults if not given)
if pre_start_coords_in_post is None:
self._pre_start_w = 0
self._pre_start_h = 0
else:
self._pre_start_w = pre_start_coords_in_post[WIDTH]
self._pre_start_h = pre_start_coords_in_post[HEIGHT]
if post_start_coords_in_pre is None:
self._post_start_w = 0
self._post_start_h = 0
else:
self._post_start_w = post_start_coords_in_pre[WIDTH]
self._post_start_h = post_start_coords_in_pre[HEIGHT]
if pre_sample_steps_in_post is None:
self._pre_step_w = 1
self._pre_step_h = 1
else:
self._pre_step_w = pre_sample_steps_in_post[WIDTH]
self._pre_step_h = pre_sample_steps_in_post[HEIGHT]
if post_sample_steps_in_pre is None:
self._post_step_w = 1
self._post_step_h = 1
else:
self._post_step_w = post_sample_steps_in_pre[WIDTH]
self._post_step_h = post_sample_steps_in_pre[HEIGHT]
# Make sure the supplied values are in the correct format
self._krn_weights = self.__get_kernel_vals(weight_kernel)
self._krn_delays = self.__get_kernel_vals(delay_kernel)
self._shape_common = \
shape_pre if shape_common is None else shape_common
self._common_w = self._shape_common[WIDTH]
self._common_h = self._shape_common[HEIGHT]
self._shape_pre = shape_pre
self._shape_post = shape_post
# Create storage for later
self._post_as_pre: Dict[
Slice, Tuple[NDArray[integer], NDArray[integer]]] = {}
def __to_post_coords(
self, post_vertex_slice: Slice) -> Tuple[
NDArray[integer], NDArray[integer]]:
"""
Get a list of possible post-slice coordinates.
:param ~pacman.model.graphs.common.Slice post_vertex_slice:
:rtype: tuple(~numpy.ndarray, ~numpy.ndarray)
"""
post = numpy.arange(
post_vertex_slice.lo_atom, post_vertex_slice.hi_atom + 1,
dtype=uint32)
return numpy.divmod(post, self._post_w)
def __map_to_pre_coords(
self, post_r: NDArray[integer], post_c: NDArray[integer]) -> Tuple[
NDArray[integer], NDArray[integer]]:
"""
Get a map from post to pre-population coordinates.
:param ~numpy.ndarray post_r: rows
:param ~numpy.ndarray post_c: columns
:rtype: tuple(~numpy.ndarray, ~numpy.ndarray)
"""
return (self._post_start_h + post_r * self._post_step_h,
self._post_start_w + post_c * self._post_step_w)
def __post_as_pre(self, post_vertex_slice: Slice) -> Tuple[
NDArray[integer], NDArray[integer]]:
"""
Write post-population coordinates as pre-population coordinates.
:param ~pacman.model.graphs.common.Slice post_vertex_slice:
:rtype: tuple(~numpy.ndarray, ~numpy.ndarray)
"""
# directly as the cache index
if post_vertex_slice not in self._post_as_pre:
post_r, post_c = self.__to_post_coords(post_vertex_slice)
self._post_as_pre[post_vertex_slice] = \
self.__map_to_pre_coords(post_r, post_c)
return self._post_as_pre[post_vertex_slice]
def __pre_as_post(self, pre_r: int, pre_c: int) -> Tuple[int, int]:
"""
Write pre-population coordinates as post-population coordinates.
:param int pre_r: row
:param int pre_c: column
:rtype: tuple(int,int)
"""
r = ((pre_r - self._pre_start_h - 1) // self._pre_step_h) + 1
c = ((pre_c - self._pre_start_w - 1) // self._pre_step_w) + 1
return (r, c)
def __get_kernel_vals(self, values: Optional[Union[
_Kernel, Weight_Delay_Types]]) -> Optional[ConvolutionKernel]:
"""
Convert kernel values given into the correct format.
:param values:
:type values: int or float or ~pyNN.random.RandomDistribution
or ~numpy.ndarray or ConvolutionKernel
:rtype: ~numpy.ndarray
"""
if values is None:
return None
if isinstance(values, list):
values = numpy.asarray(values)
krn_size = self._kernel_h * self._kernel_w
krn_shape = (self._kernel_h, self._kernel_w)
if isinstance(values, RandomDistribution):
return numpy.array(values.next(krn_size)).reshape(krn_shape).view(
ConvolutionKernel)
elif numpy.isscalar(values):
return numpy.full(krn_shape, values).view(ConvolutionKernel)
elif ((isinstance(values, numpy.ndarray) or
isinstance(values, ConvolutionKernel)) and
values.shape[HEIGHT] == self._kernel_h and
values.shape[WIDTH] == self._kernel_w):
return values.view(ConvolutionKernel)
raise SpynnakerException(
"Error generating KernelConnector values; if you have supplied "
"weight and/or delay kernel then ensure they are the same size "
"as specified by the shape kernel values (height: "
f"{self._kernel_h} and width: {self._kernel_w}).")
def __compute_statistics(
self, weights: Optional[Weight_Types],
delays: Optional[Delay_Types], post_vertex_slice: Slice,
n_pre_neurons: int) -> Tuple[
int, NDArray[uint32], NDArray[uint32], NDArray[floating],
NDArray[floating]]:
"""
Compute the relevant information required for the connections.
:param weights:
:type weights: int or float or ~pyNN.random.RandomDistribution or
~numpy.ndarray or ConvolutionKernel
:param delays:
:type delays: int or float or ~pyNN.random.RandomDistribution or
~numpy.ndarray or ConvolutionKernel
:param ~pacman.model.graphs.common.Slice post_vertex_slice:
"""
# If __compute_statistics is called more than once, there's
# no need to get the user-supplied weights and delays again
if self._krn_weights is None:
self._krn_weights = self.__get_kernel_vals(weights)
if self._krn_delays is None:
self._krn_delays = self.__get_kernel_vals(delays)
assert self._krn_weights is not None
assert self._krn_delays is not None
post_as_pre_r, post_as_pre_c = self.__post_as_pre(post_vertex_slice)
coords: Dict[int, List[int]] = {}
hh, hw = self._hlf_k_h, self._hlf_k_w
all_pre_ids: List[int] = []
all_post_ids: List[int] = []
all_delays: List[NDArray[floating]] = []
all_weights: List[NDArray[floating]] = []
count = 0
post_lo = post_vertex_slice.lo_atom
# Loop over pre-vertices
for pre_idx in range(n_pre_neurons):
pre_r, pre_c = divmod(pre_idx, self._pre_w)
coords[pre_idx] = []
# Test whether the coordinates should be included based on the
# step function (in the pre) and skip if not
if not (((pre_r - self._pre_start_h) % self._pre_step_h == 0) and
((pre_c - self._pre_start_w) % self._pre_step_w == 0)):
continue
# Loop over post-vertices
for post_idx in range(
post_vertex_slice.lo_atom, post_vertex_slice.hi_atom + 1):
# convert to common coordinate system
pac_r = post_as_pre_r[post_idx - post_lo]
pac_c = post_as_pre_c[post_idx - post_lo]
# now convert common to pre coordinates
pap_r, pap_c = self.__pre_as_post(pac_r, pac_c)
# Obtain coordinates to test against kernel sizes
dr = pap_r - pre_r
kr = hh - dr
dc = pap_c - pre_c
kc = hw - dc
if 0 <= kr < self._kernel_h and 0 <= kc < self._kernel_w:
if post_idx in coords[pre_idx]:
continue
coords[pre_idx].append(post_idx)
# Store weights, delays and pre/post ids
w = self._krn_weights[kr, kc]
d = self._krn_delays[kr, kc]
count += 1
all_pre_ids.append(pre_idx)
all_post_ids.append(post_idx)
all_delays.append(d)
all_weights.append(w)
# Now the loop is complete, return relevant data
return (count, numpy.array(all_post_ids, dtype=uint32),
numpy.array(all_pre_ids, dtype=uint32),
numpy.array(all_delays), numpy.array(all_weights))
[docs]
@overrides(AbstractConnector.get_delay_maximum)
def get_delay_maximum(self, synapse_info: SynapseInformation) -> float:
# Use the kernel delays if user has supplied them
if self._krn_delays is not None:
return numpy.max(self._krn_delays)
# I think this is overestimated, but not by much
n_conns = (
self._pre_w * self._pre_h * self._kernel_w * self._kernel_h)
# if not then use the values that came in
return self._get_delay_maximum(
synapse_info.delays, n_conns, synapse_info)
[docs]
@overrides(AbstractConnector.get_delay_minimum)
def get_delay_minimum(self, synapse_info: SynapseInformation) -> float:
# Use the kernel delays if user has supplied them
if self._krn_delays is not None:
return numpy.min(self._krn_delays)
# I think this is overestimated, but not by much
n_conns = (
self._pre_w * self._pre_h * self._kernel_w * self._kernel_h)
# if not then use the values that came in
return self._get_delay_minimum(
synapse_info.delays, n_conns, synapse_info)
[docs]
@overrides(AbstractConnector.get_delay_variance)
def get_delay_variance(self, delays: Delay_Types,
synapse_info: SynapseInformation) -> float:
if self._krn_delays is not None:
return float(numpy.var(self._krn_delays))
return super().get_delay_variance(delays, synapse_info)
[docs]
@overrides(AbstractConnector.get_n_connections_from_pre_vertex_maximum)
def get_n_connections_from_pre_vertex_maximum(
self, n_post_atoms: int, synapse_info: SynapseInformation,
min_delay: Optional[float] = None,
max_delay: Optional[float] = None) -> int:
return numpy.clip(self._kernel_h * self._kernel_w, 0, n_post_atoms)
[docs]
@overrides(AbstractConnector.get_n_connections_to_post_vertex_maximum)
def get_n_connections_to_post_vertex_maximum(
self, synapse_info: SynapseInformation) -> int:
return numpy.clip(self._kernel_h * self._kernel_w, 0, 255)
[docs]
@overrides(AbstractConnector.get_weight_maximum)
def get_weight_maximum(self, synapse_info: SynapseInformation) -> float:
# Use the kernel weights if user has supplied them
if self._krn_weights is not None:
return numpy.max(self._krn_weights)
# I think this is overestimated, but not by much
n_conns = self._pre_w * self._pre_h * self._kernel_w * self._kernel_h
return self._get_weight_maximum(
synapse_info.weights, n_conns, synapse_info)
[docs]
@overrides(AbstractConnector.get_weight_mean)
def get_weight_mean(self, weights: Weight_Types,
synapse_info: SynapseInformation) -> float:
# Use the kernel weights if user has supplied them
if self._krn_weights is not None:
return float(numpy.mean(self._krn_weights))
return super().get_weight_mean(weights, synapse_info)
[docs]
@overrides(AbstractConnector.get_weight_variance)
def get_weight_variance(self, weights: Weight_Types,
synapse_info: SynapseInformation) -> float:
# Use the kernel weights if user has supplied them
if self._krn_weights is not None:
return float(numpy.var(self._krn_weights))
return super().get_weight_variance(weights, synapse_info)
def __repr__(self):
return \
f"KernelConnector(shape_kernel[{self._kernel_w},{self._kernel_h}])"
[docs]
@overrides(AbstractGenerateConnectorOnHost.create_synaptic_block)
def create_synaptic_block(
self, post_slices: Sequence[Slice], post_vertex_slice: Slice,
synapse_type: int, synapse_info: SynapseInformation) -> NDArray:
(n_connections, all_post, all_pre_in_range, all_pre_in_range_delays,
all_pre_in_range_weights) = self.__compute_statistics(
synapse_info.weights, synapse_info.delays, post_vertex_slice,
synapse_info.n_pre_neurons)
syn_dtypes = AbstractConnector.NUMPY_SYNAPSES_DTYPE
if n_connections <= 0:
return numpy.zeros(0, dtype=syn_dtypes)
# 0 for excitatory, 1 for inhibitory
syn_type = numpy.array(all_pre_in_range_weights < 0)
block = numpy.zeros(n_connections, dtype=syn_dtypes)
block["source"] = all_pre_in_range
block["target"] = all_post
block["weight"] = all_pre_in_range_weights
block["delay"] = all_pre_in_range_delays
block["synapse_type"] = syn_type.astype('uint8')
return block
@property
@overrides(AbstractGenerateConnectorOnMachine.gen_connector_id)
def gen_connector_id(self) -> int:
return ConnectorIDs.KERNEL_CONNECTOR.value
[docs]
@overrides(AbstractGenerateConnectorOnMachine.gen_connector_params)
def gen_connector_params(
self, synapse_info: SynapseInformation) -> NDArray[uint32]:
data = numpy.array([
shape2word(self._common_w, self._common_h),
shape2word(self._pre_w, self._pre_h),
shape2word(self._post_w, self._post_h),
shape2word(self._pre_start_w, self._pre_start_h),
shape2word(self._post_start_w, self._post_start_h),
shape2word(self._pre_step_w, self._pre_step_h),
shape2word(self._post_step_w, self._post_step_h),
shape2word(self._kernel_w, self._kernel_h),
shape2word(int(self._krn_weights is not None),
int(self._krn_delays is not None))], dtype=uint32)
extra_data = []
if self._krn_weights is not None:
extra_data.append(DataType.S1615.encode_as_numpy_int_array(
self._krn_weights.flatten()))
if self._krn_delays is not None:
extra_data.append(DataType.S1615.encode_as_numpy_int_array(
self._krn_delays.flatten()))
if extra_data:
return numpy.concatenate((data, *extra_data))
return data
@property
@overrides(
AbstractGenerateConnectorOnMachine.gen_connector_params_size_in_bytes)
def gen_connector_params_size_in_bytes(self) -> int:
size = N_KERNEL_PARAMS * BYTES_PER_WORD
if self._krn_weights is not None:
size += sum(len(x) for x in self._krn_weights) * BYTES_PER_WORD
if self._krn_delays is not None:
size += sum(len(x) for x in self._krn_delays) * BYTES_PER_WORD
return size
[docs]
@overrides(AbstractGenerateConnectorOnMachine.get_connected_vertices)
def get_connected_vertices(
self, s_info: SynapseInformation, source_vertex: ApplicationVertex,
target_vertex: ApplicationVertex) -> Sequence[
Tuple[MachineVertex, Sequence[AbstractVertex]]]:
src_splitter = source_vertex.splitter
return [
(t_vert,
[s_vert for s_vert in src_splitter.get_out_going_vertices(
s_info.partition_id) if self.__connects(s_vert, t_vert)])
for t_vert in target_vertex.splitter.get_in_coming_vertices(
s_info.partition_id)]
def __connects(self, src_machine_vertex: MachineVertex,
dest_machine_vertex: MachineVertex) -> bool:
# If the pre- and post-slices are not 2-dimensional slices, we have
# to let them pass
pre_slice = src_machine_vertex.vertex_slice
post_slice = dest_machine_vertex.vertex_slice
if (pre_slice.shape is None or len(pre_slice.shape) != 2 or
post_slice.shape is None or len(post_slice.shape) != 2):
return True
pre_slice_x = pre_slice.get_slice(0)
pre_slice_y = pre_slice.get_slice(1)
post_slice_x = post_slice.get_slice(0)
post_slice_y = post_slice.get_slice(1)
min_pre_x = post_slice_x.start - self._hlf_k_w
max_pre_x = (post_slice_x.stop + self._hlf_k_w) - 1
min_pre_y = post_slice_y.start - self._hlf_k_h
max_pre_y = (post_slice_y.stop + self._hlf_k_h) - 1
# No part of the pre square overlaps the post-square, don't connect
if (pre_slice_x.stop <= min_pre_x or
pre_slice_x.start > max_pre_x or
pre_slice_y.stop <= min_pre_y or
pre_slice_y.start > max_pre_y):
return False
# Otherwise, they do
return True
[docs]
@overrides(AbstractConnector.validate_connection)
def validate_connection(
self, application_edge: ProjectionApplicationEdge,
synapse_info: SynapseInformation):
pre = application_edge.pre_vertex
post = application_edge.post_vertex
if len(pre.atoms_shape) != 1 or len(post.atoms_shape) != 1:
raise ConfigurationException(
"The Kernel Connector is designed to work with 1D vertices")