# Source code for spynnaker.pyNN.models.neural_projections.connectors.abstract_connector

# Copyright (c) 2014 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import math
import re
from typing import Any, Dict, Optional, Sequence, Tuple, Union, TYPE_CHECKING
from typing_extensions import Never

import numpy
from numpy import float64, uint32, uint16, uint8
from numpy.typing import NDArray

from pyNN import descriptions
from pyNN.random import NumpyRNG, RandomDistribution
from pyNN.space import Space

from spinn_utilities.log import FormatAdapter
from spinn_utilities.logger_utils import warn_once
from spinn_utilities.safe_eval import SafeEval
from spinn_utilities.abstract_base import AbstractBase, abstractmethod

from pacman.model.graphs import AbstractVertex
from pacman.model.graphs.common import Slice
from pacman.model.graphs.application import ApplicationVertex
from pacman.model.graphs.machine import MachineVertex

from spinn_front_end_common.interface.provenance import ProvenanceWriter
from spinn_front_end_common.utilities.exceptions import ConfigurationException

from spynnaker.pyNN.data import SpynnakerDataView
from spynnaker.pyNN.types import (
    Delays, is_scalar, WeightsDelays, Weights)
from spynnaker.pyNN.utilities import utility_calls
from spynnaker.pyNN.exceptions import SpynnakerException

if TYPE_CHECKING:
    from spynnaker.pyNN.models.neural_projections import (
        ProjectionApplicationEdge, SynapseInformation)
    from spynnaker.pyNN.models.projection import Projection
    from spynnaker.pyNN.models.populations import Population, PopulationView

# global objects

# Module-wide logger. It is wrapped in FormatAdapter; elsewhere in this file
# it is called with "{}"-style placeholders plus positional arguments.
logger = FormatAdapter(logging.getLogger(__name__))

# Shared evaluator for weight/delay values given as strings. The connector
# calls ``_expr_context.eval(expression, d=distances)`` on it.
# NOTE(review): presumably the modules, functions and constants passed here
# are the only names an expression may reference - confirm against SafeEval.
_expr_context = SafeEval(
    math, numpy, numpy.arccos, numpy.arcsin, numpy.arctan, numpy.arctan2,
    numpy.ceil, numpy.cos, numpy.cosh, numpy.exp, numpy.fabs, numpy.floor,
    numpy.fmod, numpy.hypot, numpy.ldexp, numpy.log, numpy.log10, numpy.modf,
    numpy.power, numpy.sin, numpy.sinh, numpy.sqrt, numpy.tan, numpy.tanh,
    numpy.maximum, numpy.minimum, e=numpy.e, pi=numpy.pi)


class AbstractConnector(object, metaclass=AbstractBase):
    """
    Abstract class that all PyNN Connectors extend.
    """

    NUMPY_SYNAPSES_DTYPE = numpy.dtype(
        [("source", uint32), ("target", uint16),
         ("weight", float64), ("delay", float64),
         ("synapse_type", uint8)])

    __slots__ = (
        "__min_delay",
        "__n_clipped_delays",
        "__safe",
        "__space",
        "__verbose",
        "__param_seeds",
        "__used")

    def __init__(self, safe: bool = True, callback: None = None,
                 verbose: bool = False):
        """
        :param safe: if True, check that weights and delays have valid
            values. If False, this check is skipped. (NB: SpiNNaker always
            checks.)
        :param callback:
            if given, a callable that display a progress bar on the terminal.

            .. note::
                Not supported by sPyNNaker.
        :param verbose:
            Whether to output extra information about the connectivity to a
            CSV file
        """
        if callback is not None:
            warn_once(logger, "sPyNNaker ignores connector callbacks.")
        self.__safe = safe
        self.__space: Optional[Space] = None
        self.__verbose = verbose

        self.__n_clipped_delays = numpy.int64(0)
        self.__min_delay = 0.0
        self.__param_seeds: Dict[Tuple[int, int], int] = dict()
        self.__used = False

    def set_space(self, space: Space) -> None:
        """
        Set the space object (allowed after instantiation).

        :param space:
        """
        self.__space = space

    def set_projection_information(
            self, synapse_info: SynapseInformation) -> None:
        """
        Sets a connectors projection info.

        :param synapse_info: the synapse info
        """
        _ = synapse_info
        self.__min_delay = SpynnakerDataView.get_simulation_time_step_ms()

    def _get_delay_minimum(
            self, delays: Delays, n_connections: int,
            synapse_info: SynapseInformation) -> float:
        """
        Get the minimum delay given a float or RandomDistribution.

        :param delays: the delays
        :param n_connections: how many connections
        :param synapse_info: Info to get distances from (if needed)
        :returns: The minimum delay
        :raises SpynnakerException: If the type of delays is not supported
        """
        if isinstance(delays, RandomDistribution):
            low_estimated_delay = utility_calls.get_minimum_probable_value(
                delays, n_connections)
            low = utility_calls.low(delays)
            if low is None:
                return low_estimated_delay

            # The minimum is the largest of the candidate lower bounds,
            # and is never allowed to drop below 1
            return max(low_estimated_delay, low, 1)
        elif isinstance(delays, str):
            d = self._get_distances(delays, synapse_info)
            return numpy.min(_expr_context.eval(delays, d=d))
        elif is_scalar(delays):
            return delays
        raise self.delay_type_exception(delays)

    def _get_delay_maximum(
            self, delays: Delays, n_connections: int,
            synapse_info: SynapseInformation) -> float:
        """
        Get the maximum delay given a float or RandomDistribution.

        :param delays: the delays
        :param n_connections: how many connections
        :param synapse_info: Info to get distances from (if needed)
        :returns: The maximum delay
        :raises SpynnakerException: If the type of delays is not supported
        """
        if isinstance(delays, RandomDistribution):
            max_estimated_delay = utility_calls.get_maximum_probable_value(
                delays, n_connections)
            high = utility_calls.high(delays)
            if high is None:
                return max_estimated_delay

            # The maximum is the minimum of the possible maximums
            return min(max_estimated_delay, high)
        elif isinstance(delays, str):
            d = self._get_distances(delays, synapse_info)
            return numpy.max(_expr_context.eval(delays, d=d))
        elif is_scalar(delays):
            return delays
        raise self.delay_type_exception(delays)

    @abstractmethod
    def get_delay_maximum(
            self, synapse_info: SynapseInformation) -> float:
        """
        :param synapse_info: the synapse info
        :returns: The maximum delay specified by the user in ms.
        """
        raise NotImplementedError

    @abstractmethod
    def get_delay_minimum(
            self, synapse_info: SynapseInformation) -> Optional[float]:
        """
        :param synapse_info:
        :returns: The minimum delay specified by the user in ms,
            or `None` if unbounded.
        """
        raise NotImplementedError

    def get_delay_variance(self, delays: Delays,
                           synapse_info: SynapseInformation) -> float:
        """
        :param delays:
        :param synapse_info: Info to get distances from (if needed)
        :returns: The variance of the delays.
        :raises SpynnakerException: If the type of delays is not supported
        """
        if isinstance(delays, RandomDistribution):
            return utility_calls.get_variance(delays)
        elif isinstance(delays, str):
            d = self._get_distances(delays, synapse_info)
            return numpy.var(_expr_context.eval(delays, d=d))
        elif is_scalar(delays):
            return 0.0
        raise self.delay_type_exception(delays)

    def _get_n_connections_from_pre_vertex_with_delay_maximum(
            self, delays: Delays, n_total_connections: int,
            n_connections: int, min_delay: float, max_delay: float,
            synapse_info: SynapseInformation) -> int:
        """
        :param delays:
        :param n_total_connections:
        :param n_connections:
        :param min_delay:
        :param max_delay:
        :param synapse_info: Info to get distances from (if needed)
        :returns: The expected number of delays that will fall within
            min_delay and max_delay given a float, RandomDistribution or
            distance expression.
        :raises SpynnakerException: If the type of delays is not supported
        """
        if isinstance(delays, RandomDistribution):
            prob_in_range = utility_calls.get_probability_within_range(
                delays, min_delay, max_delay)
            if prob_in_range > 0:
                v = int(math.ceil(utility_calls.get_probable_maximum_selected(
                    n_total_connections, n_connections, prob_in_range)))
                # If the probability is so low as to result in 0, assume
                # at least 1 if there is some probability that the delay is
                # in range
                return max(v, 1)
            else:
                return 0
        elif isinstance(delays, str):
            d = self._get_distances(delays, synapse_info)
            # Count in numpy so that the result of the expression may have
            # any shape (scalar, vector or full distance matrix); a Python
            # level chained comparison only works on flat sequences
            evaluated = numpy.asarray(_expr_context.eval(delays, d=d))
            in_range = (evaluated >= min_delay) & (evaluated <= max_delay)
            n_delayed = int(numpy.count_nonzero(in_range))
            if n_delayed == 0:
                return 0
            prob_delayed = n_delayed / evaluated.size
            return int(math.ceil(utility_calls.get_probable_maximum_selected(
                n_total_connections, n_connections, prob_delayed)))
        elif is_scalar(delays):
            if min_delay <= delays <= max_delay:
                return int(math.ceil(n_connections))
            return 0
        raise self.delay_type_exception(delays)

    @abstractmethod
    def get_n_connections_from_pre_vertex_maximum(
            self, n_post_atoms: int, synapse_info: SynapseInformation,
            min_delay: Optional[float] = None,
            max_delay: Optional[float] = None) -> int:
        """
        Get the maximum number of connections from any
        neuron in the pre vertex to the neurons in the post_vertex_slice,
        for connections with a delay between min_delay and max_delay
        (inclusive) if both specified (otherwise all connections).

        Not all concrete connectors support omitting the delay range.

        :param n_post_atoms:
        :param synapse_info:
        :param min_delay:
        :param max_delay:
        :returns: maximum number of connections
        """
        raise NotImplementedError

    @abstractmethod
    def get_n_connections_to_post_vertex_maximum(
            self, synapse_info: SynapseInformation) -> int:
        """
        :param synapse_info:
        :returns: The maximum number of connections to any neuron
            in the post vertex from neurons in the pre vertex.
        """
        raise NotImplementedError

    def get_weight_mean(self, weights: Weights,
                        synapse_info: SynapseInformation) -> float:
        """
        :param weights:
        :param synapse_info: Info to get distances from (if needed)
        :returns: The mean of the weights.
        :raises SpynnakerException: If the type of weights is not supported
        """
        if isinstance(weights, RandomDistribution):
            return abs(utility_calls.get_mean(weights))
        elif isinstance(weights, str):
            d = self._get_distances(weights, synapse_info)
            # NOTE(review): unlike the other branches no abs() is applied
            # here - confirm whether that is intended
            return numpy.mean(_expr_context.eval(weights, d=d))
        elif is_scalar(weights):
            return abs(weights)
        # Report the offending weights, not the synapse information
        raise self.weight_type_exception(weights)

    def _get_weight_maximum(
            self, weights: Weights, n_connections: int,
            synapse_info: SynapseInformation) -> float:
        """
        Get the maximum of the weights.

        :param weights:
        :param n_connections:
        :param synapse_info: Info to get distances from (if needed)
        :returns: The maximum weight
        :raises SpynnakerException: If the type of weights is not supported
        """
        if isinstance(weights, RandomDistribution):
            mean_weight = utility_calls.get_mean(weights)
            if mean_weight < 0:
                # Mostly negative weights: the largest magnitude is at the
                # low end of the distribution
                min_weight = utility_calls.get_minimum_probable_value(
                    weights, n_connections)
                low = utility_calls.low(weights)
                if low is None:
                    return abs(min_weight)
                return abs(max(min_weight, low))
            else:
                max_weight = utility_calls.get_maximum_probable_value(
                    weights, n_connections)
                high = utility_calls.high(weights)
                if high is None:
                    return abs(max_weight)
                return abs(min(max_weight, high))
        elif isinstance(weights, str):
            d = self._get_distances(weights, synapse_info)
            # NOTE(review): unlike the other branches no abs() is applied
            # here - confirm whether that is intended
            return numpy.max(_expr_context.eval(weights, d=d))
        elif is_scalar(weights):
            return abs(weights)
        raise self.weight_type_exception(weights)

    @abstractmethod
    def get_weight_maximum(self, synapse_info: SynapseInformation) -> float:
        """
        :param synapse_info:
        :returns: The maximum of the weights for this connection.
        """
        raise NotImplementedError

    def get_weight_variance(self, weights: Weights,
                            synapse_info: SynapseInformation) -> float:
        """
        :param weights:
        :param synapse_info: Info to get distances from (if needed)
        :returns: The variance of the weights.
        :raises SpynnakerException: If the type of weights is not supported
        """
        if isinstance(weights, RandomDistribution):
            return utility_calls.get_variance(weights)
        elif isinstance(weights, str):
            d = self._get_distances(weights, synapse_info)
            return numpy.var(_expr_context.eval(weights, d=d))
        elif is_scalar(weights):
            return 0.0
        raise self.weight_type_exception(weights)

    def _expand_distances(self, d_expression: str) -> bool:
        """
        Check if a distance expression contains at least one term `d[x]`.
        If yes, then the distances are expanded to distances in the
        separate coordinates rather than the overall distance over all
        coordinates, and we assume the user has specified an expression
        such as `d[0] + d[2]`.

        :param d_expression:
        :returns: True if the expression indexes into `d`
        """
        regexpr = re.compile(r'.*d\[\d*\].*')
        return bool(regexpr.match(d_expression))

    def _get_distances(self, values: str,
                       synapse_info: SynapseInformation) -> NDArray[float64]:
        """
        Get the distances between the pre and post population positions.

        :param values: The expression the distances are needed for
        :param synapse_info: Provides the pre and post populations
        :returns: The distances as computed by the space object
        :raises SpynnakerException: If no space object has been set
        """
        if self.__space is None:
            raise self._no_space_exception(values, synapse_info)
        expand_distances = self._expand_distances(values)

        return self.__space.distances(
            synapse_info.pre_population.positions,
            synapse_info.post_population.positions,
            expand_distances)

    def _generate_random_values(
            self, values: RandomDistribution, n_connections: int,
            post_vertex_slice: Slice) -> NDArray[float64]:
        """
        :param values:
        :param n_connections:
        :param post_vertex_slice:
        :returns: n_connections values drawn from a copy of the distribution
        """
        # Remember the seed per (slice, distribution) so that asking again
        # for the same pair draws from an identically seeded generator
        key = (id(post_vertex_slice), id(values))
        seed = self.__param_seeds.get(key, None)
        if seed is None:
            seed = int(values.rng.next() * 0x7FFFFFFF)
            self.__param_seeds[key] = seed
        new_rng = NumpyRNG(seed)
        copy_rd = RandomDistribution(
            values.name, parameters_pos=None, rng=new_rng,
            **values.parameters)
        if n_connections == 1:
            return numpy.array([copy_rd.next(1)], dtype=float64)
        return copy_rd.next(n_connections)

    def _no_space_exception(
            self, values: WeightsDelays,
            synapse_info: SynapseInformation) -> SpynnakerException:
        """
        :param values:
        :param synapse_info:
        :returns: A SpynnakerException about there being no space defined
        """
        return SpynnakerException(
            f"Str Weights or delays {values} are distance-dependent "
            f"but no space object was specified in projection "
            f"{synapse_info.pre_population}-"
            f"{synapse_info.post_population}")

    def weight_type_exception(
            self, weights: Weights) -> SpynnakerException:
        """
        :param weights:
        :returns: An Exception explaining incorrect weight or delay type
        """
        if weights is None:
            return SpynnakerException(
                f"The Synapse used is not is not supported with a "
                f"{(type(self))} as neither provided weights")
        elif isinstance(weights, str):
            return SpynnakerException(
                f"Str Weights {weights} not supported by a {(type(self))}")
        elif isinstance(weights, numpy.ndarray):
            # The problem is that these methods are for a MachineVertex/ core
            # while weight and delay are supplied at the application level
            # The FromList is also the one designed to handle the 2D case
            return SpynnakerException(
                f"For efficiency reason {type(self)} does not supports "
                "list or arrays for weight. "
                "Please use a FromListConnector instead")
        else:
            return SpynnakerException(f"Unrecognised weight {weights}")

    def delay_type_exception(self, delays: Delays) -> SpynnakerException:
        """
        :param delays:
        :returns: An Exception explaining incorrect delay type
        """
        if isinstance(delays, str):
            return SpynnakerException(
                f"Str delays {delays} not supported by {(type(self))}")
        elif isinstance(delays, numpy.ndarray):
            # The problem is that these methods are for a MachineVertex/ core
            # while weight and delay are supplied at the application level
            # The FromList is also the one designed to handle the 2D case
            return SpynnakerException(
                f"For efficiency reason {type(self)} does not supports "
                "list or arrays for weight or delay. "
                "Please use a FromListConnector instead")
        else:
            return SpynnakerException(f"Unrecognised delay {delays}")

    def _generate_values(
            self, values: WeightsDelays, sources: numpy.ndarray,
            targets: numpy.ndarray, n_connections: int, post_slice: Slice,
            synapse_info: SynapseInformation,
            weights: bool) -> NDArray[float64]:
        """
        Generate one value per connection.

        :param values: A RandomDistribution, a distance expression, a
            callable taking a distance, or a scalar
        :param sources: The source index of each connection
        :param targets: The target index of each connection
        :param n_connections: The number of connections
        :param post_slice: The slice of the post vertex being generated for
        :param synapse_info: Provides the populations (for positions)
        :param weights:
            True if the values are weights, False if they are delays;
            only used to select the exception raised
        :returns: An array of n_connections values
        :raises SpynnakerException:
            If the values are distance-dependent but no space was set,
            or the type of values is not supported
        """
        if isinstance(values, RandomDistribution):
            return self._generate_random_values(
                values, n_connections, post_slice)
        elif isinstance(values, str) or callable(values):
            if self.__space is None:
                raise self._no_space_exception(values, synapse_info)

            is_expression = isinstance(values, str)
            expand_distances = True
            if is_expression:
                expand_distances = self._expand_distances(values)

            # At this point we need to now get the values corresponding to
            # the distances between connections in "sources" and "targets"
            eval_values = numpy.zeros(n_connections, dtype=float64)
            for i in range(n_connections):
                # get the distance for this source and target pair
                dist = self.__space.distances(
                    synapse_info.pre_population.positions[sources[i]],
                    synapse_info.post_population.positions[targets[i]],
                    expand_distances)
                if is_expression:
                    # evaluate expression at this distance
                    eval_values[i:i+1] = _expr_context.eval(values, d=dist)
                else:
                    # call the user function with this distance; doing this
                    # per pair keeps one value per connection
                    eval_values[i:i+1] = values(dist)
            return eval_values
        elif is_scalar(values):
            return numpy.repeat([values], n_connections).astype(float64)
        if weights:
            raise self.weight_type_exception(values)
        else:
            raise self.delay_type_exception(values)

    def _generate_weights(
            self, sources: numpy.ndarray, targets: numpy.ndarray,
            n_connections: int, post_slice: Slice,
            synapse_info: SynapseInformation) -> numpy.ndarray:
        """
        Generate weight values.

        :param sources: The source index of each connection
        :param targets: The target index of each connection
        :param n_connections: The number of connections
        :param post_slice: The slice of the post vertex being generated for
        :param synapse_info: Provides the weights and populations
        :returns: The absolute values of the generated weights
        :raises SpynnakerException:
            If safe is set and the weights mix positive and negative values
        """
        weights = self._generate_values(
            synapse_info.weights, sources, targets, n_connections, post_slice,
            synapse_info, weights=True)
        if self.__safe:
            if not weights.size:
                warn_once(logger, "No connection in " + str(self))
            elif numpy.amin(weights) < 0 < numpy.amax(weights):
                raise SpynnakerException(
                    "Weights must be either all positive or all negative in "
                    f"projection {synapse_info.pre_population.label}->"
                    f"{synapse_info.post_population.label}")
        return numpy.abs(weights)

    def _clip_delays(self, delays: NDArray[float64]) -> NDArray[float64]:
        """
        Clip delay values, keeping track of how many have been clipped.

        :param delays: The delays to clip; modified in place
        :returns: The same array with values below the minimum raised to it
        """
        # count values that could be clipped
        # NOTE(review): this replaces the count on every call instead of
        # adding to it - confirm that only the last call should be reported
        self.__n_clipped_delays = numpy.sum(delays < self.__min_delay)

        # clip values
        if delays.size:
            delays[delays < self.__min_delay] = self.__min_delay
        return delays

    def _generate_delays(
            self, sources: numpy.ndarray, targets: numpy.ndarray,
            n_connections: int, post_slice: Slice,
            synapse_info: SynapseInformation) -> numpy.ndarray:
        """
        Generate valid delay values.

        :param sources: The source index of each connection
        :param targets: The target index of each connection
        :param n_connections: The number of connections
        :param post_slice: The slice of the post vertex being generated for
        :param synapse_info: Provides the delays and populations
        :returns: The generated delays, clipped to the minimum delay
        """
        delays = self._generate_values(
            synapse_info.delays, sources, targets, n_connections, post_slice,
            synapse_info, weights=False)
        return self._clip_delays(delays)

    @staticmethod
    def __pop_label(pop: Union[Population, PopulationView]) -> str:
        """
        :param pop: The population or view to get the label of
        :returns: The label of the population
        :raises ValueError: If the population has no label
        """
        lbl = pop.label
        if lbl is None:
            raise ValueError("unlabelled population")
        return lbl

    def get_provenance_data(self, synapse_info: SynapseInformation) -> None:
        """
        Adds the synapse information to provenance database.

        :param synapse_info:
        """
        # Convert to native Python integer; provenance system assumption
        ncd = self.__n_clipped_delays.item()
        with ProvenanceWriter() as db:
            db.insert_connector(
                self.__pop_label(synapse_info.pre_population),
                self.__pop_label(synapse_info.post_population),
                self.__class__.__name__,
                "Times_synaptic_delays_got_clipped", ncd)
            if ncd > 0:
                db.insert_report(
                    f"The delays in the connector {self.__class__.__name__} "
                    f"from {synapse_info.pre_population.label} "
                    f"to {synapse_info.post_population.label} "
                    f"was clipped to {self.__min_delay} a total of {ncd} "
                    f"times. This can be avoided by reducing the timestep or "
                    f"increasing the minimum delay to one timestep")

    @property
    def safe(self) -> bool:
        """
        Safe value supplied. Note ignored by sPyNNaker
        """
        return self.__safe

    @property
    def space(self) -> Optional[Space]:
        """
        The space object (may be updated after instantiation).
        """
        return self.__space

    @property
    def verbose(self) -> bool:
        """
        verbose value supplied by user
        """
        return self.__verbose

    def get_connected_vertices(
            self, s_info: SynapseInformation, source_vertex: ApplicationVertex,
            target_vertex: ApplicationVertex) -> Sequence[
                Tuple[MachineVertex, Sequence[AbstractVertex]]]:
        """
        Get the machine vertices that are connected to each other with
        this connector

        :param s_info: The synapse information of the connection
        :param source_vertex: The source of the spikes
        :param target_vertex: The target of the spikes
        :return: A list of tuples of (target machine vertex, list of sources)
        """
        # By default, just return that the whole target connects to the
        # whole source
        return [(m_vertex, [source_vertex])
                for m_vertex in target_vertex.splitter.get_in_coming_vertices(
                    s_info.partition_id)]

    def connect(self, projection: Projection) -> Never:
        """
        Apply this connector to a projection.

        .. warning::
            Do *not* call this! SpyNNaker does not work that way.

        :param projection:
        :raises SpynnakerException: Always. Method not supported.
        """
        raise SpynnakerException("Standard pyNN connect method not supported")

    @staticmethod
    def _roundsize(size: Union[int, float], label: str) -> int:
        """
        Ensures that the ``size`` is an integer. Approximate integers are
        rounded; other values cause exceptions.

        :param size: The value to be rounded
        :param label: The type-name of the connection, for messages
        :returns: The size as an int
        :raises SpynnakerException: If the size is non-integer and not close
        """
        if isinstance(size, int):
            return size
        # Allow a float which has a near int value
        temp = int(round(size))
        if abs(temp - size) < 0.001:
            logger.warning("Size of {} rounded from {} to {}. "
                           "Please use int values for size",
                           label, size, temp)
            return temp
        raise SpynnakerException(
            f"Size of {label} must be an int, received {size}")

    def validate_connection(
            self, application_edge: ProjectionApplicationEdge,
            synapse_info: SynapseInformation) -> None:
        """
        Checks that the edge supports the connector.  Returns nothing; it
        is assumed that an Exception will be raised if anything is wrong.

        By default this checks only that the views are not used on
        multi-dimensional vertices.

        :param application_edge: The edge of the connection
        :param synapse_info: The synaptic information
        :raises ConfigurationException:
            If a view is used on a multi-dimensional vertex
        """
        _ = application_edge
        if ((synapse_info.prepop_is_view and
                len(synapse_info.pre_vertex.atoms_shape) > 1) or
                (synapse_info.postpop_is_view and
                 len(synapse_info.post_vertex.atoms_shape) > 1)):
            raise ConfigurationException(
                "Using a projection where the source or target is a "
                "PopulationView on a multi-dimensional Population is not "
                "supported")

    def get_parameters(self) -> Dict[str, Any]:
        """
        A list of parameters that would recreate this connector.

        May not be exactly the same as the ones used to construct this.

        Will also not include any information or changes added by later calls.

        :return: A map of the init parameters to the values passed in.
        :raises NotImplementedError: Unless overridden by the subclass
        """
        # The default is error
        # This to avoid missing parameters in user Connectors
        raise NotImplementedError(
            f"{type(self)} does not implement "
            f"Standard pyNN get_parameters method")

    def _get_parameters(self) -> Dict[str, Any]:
        """
        :return: A map of the init parameters to the values passed in.
        """
        return {
            "safe": self.safe,
            "verbose": self.verbose,
            "callback": None
        }

    def clone(self) -> "AbstractConnector":
        """
        Create a clone of the Connector at init point

        This is a best effort attempt and its use is not recommended

        :return: A clone of the Connector. Ignoring any SynapseInformation
        """
        theType = type(self)
        params = self.get_parameters()
        # Placeholder style (as in _roundsize) renders the same text
        logger.warning(
            "Cloning type{} which may lead to incorrect results.", self)
        return theType(**params)

    def get_unused(self) -> "AbstractConnector":
        """
        Checks the Connector is unused and clones if needed

        This method should just check the connector is unused,
        mark it as used and return it.

        If the Connector is used will attempt to make a clone,
        log warning that this is not recommended and may be incorrect,
        and returns the clone.

        :return: Ideally this Connector
        """
        if self.__used:
            logger.warning(
                "Reusing Connectors in sPyNNaker is not recommended")
            clone = self.clone()
            # call get_unused to mark used
            return clone.get_unused()
        self.__used = True
        return self

    def describe(self, template: Optional[str] = None,
                 engine: str = 'default') -> str:
        """
        Returns a human-readable description of the connection method.

        The output may be customized by specifying a different template
        together with an associated template engine (see pyNN.descriptions).

        If template is None, then a dictionary containing the template context
        will be returned.

        :param template: The template to render, if any
        :param engine: The name of the template engine to use
        :return: A human-readable description of the connector.
        """
        context = {"Type": self.__class__.__name__}
        context.update(self.get_parameters())
        return descriptions.render(engine, template, context)