Source code for spynnaker.pyNN.models.spike_source.spike_source_poisson_vertex

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from collections.abc import Sequence as Seq
from collections.abc import Sized
import logging
import math
from typing import (
    Any, Collection, Dict, List, Optional, Sequence, Tuple, Union,
    cast, TYPE_CHECKING)
from typing_extensions import TypeGuard
import numpy
from numpy.typing import NDArray
import scipy.stats
from pyNN.space import Grid2D, Grid3D, BaseStructure
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides
from spinn_utilities.ranged import RangeDictionary, RangedList
from spinn_utilities.config_holder import get_config_int
from pacman.model.graphs.application import ApplicationEdge
from pacman.model.graphs.common import Slice
from pacman.model.resources import AbstractSDRAM, ConstantSDRAM
from pacman.model.partitioner_interfaces import LegacyPartitionerAPI
from pacman.model.partitioner_splitters import AbstractSplitterCommon
from spinn_front_end_common.interface.buffer_management import (
    recording_utilities)
from spinn_front_end_common.utilities.constants import (
    SYSTEM_BYTES_REQUIREMENT)
from spinn_front_end_common.interface.profiling import profile_utils
from spynnaker.pyNN.data import SpynnakerDataView
from spynnaker.pyNN.models.common import MultiSpikeRecorder
from spynnaker.pyNN.utilities.utility_calls import create_mars_kiss_seeds
from spynnaker.pyNN.models.abstract_models import SupportsStructure
from spynnaker.pyNN.models.common import (
    ParameterHolder, PopulationApplicationVertex)
from spynnaker.pyNN.models.common.types import Names
from spynnaker.pyNN.utilities.buffer_data_type import BufferDataType
from spynnaker.pyNN.models.neuron.synapse_dynamics.types import (
    ConnectionsArray)
from .spike_source_poisson_machine_vertex import (
    SpikeSourcePoissonMachineVertex, _flatten, get_rates_bytes,
    get_sdram_edge_params_bytes, get_expander_rates_bytes, get_params_bytes)
if TYPE_CHECKING:
    from spinn_utilities.ranged.abstract_sized import Selector
    from .spike_source_poisson import SpikeSourcePoisson
    from .spike_source_poisson_variable import SpikeSourcePoissonVariable
    from spynnaker.pyNN.models.neural_projections import SynapseInformation
    from spynnaker.pyNN.models.projection import Projection
    from spynnaker.pyNN.models.common.types import Values

logger = FormatAdapter(logging.getLogger(__name__))

# uint32_t n_rates; uint32_t index
PARAMS_WORDS_PER_NEURON = 2

# start_scaled, end_scaled, next_scaled, is_fast_source, exp_minus_lambda,
# sqrt_lambda, isi_val, time_to_spike
PARAMS_WORDS_PER_RATE = 8

SLOW_RATE_PER_TICK_CUTOFF = (
    SpikeSourcePoissonMachineVertex.SLOW_RATE_PER_TICK_CUTOFF)

OVERFLOW_TIMESTEPS_FOR_SDRAM = 5

# The microseconds per timestep will be divided by this to get the max offset
_MAX_OFFSET_DENOMINATOR = 10

# Indicates a duration that never ends
DURATION_FOREVER = 0xFFFFFFFF


def _is_list_of_lists(value: Any) -> TypeGuard[
        Sequence[Sequence[Union[int, float]]]]:
    return isinstance(value, (Seq, numpy.ndarray)) and isinstance(
        value[0], (Seq, numpy.ndarray))


def _normalize_rates(
        rate: Union[float, Sequence[float], None],
        rates: Union[Sequence[float], NDArray[numpy.floating], None]
        ) -> Union[NDArray[numpy.floating], List[NDArray[numpy.floating]]]:
    if rates is None:
        if isinstance(rate, (Sequence, numpy.ndarray)):
            # Single rate per neuron for whole simulation
            return [numpy.array([r]) for r in rate]
        else:
            # Single rate for all neurons for whole simulation
            return numpy.array([rate])
    elif _is_list_of_lists(rates):
        # Convert each list to numpy array
        return [numpy.array(r) for r in rates]
    else:
        return numpy.array(rates)


def _normalize_times(
        time: Union[int, Sequence[int], None],
        times: Union[Sequence[int], NDArray[numpy.integer], None]
        ) -> Union[NDArray[numpy.integer], List[NDArray[numpy.integer]], None]:
    if times is None:
        if time is None:
            return None
        if isinstance(time, Sequence):
            # Single time per neuron for whole simulation
            return [numpy.array([r]) for r in time]
        else:
            # Single time for all neurons for whole simulation
            return numpy.array([time])
    elif _is_list_of_lists(times):
        # Convert each list to numpy array
        return [numpy.array(r) for r in times]
    else:
        return numpy.array(times)


def is_iterable(value: Values) -> TypeGuard[
        Union[Sequence[float], NDArray[numpy.floating]]]:
    """
    Check that the Value is iterable.

    :param value: The value to check.
    :returns: True if Value is iterable, False otherwise.
    """
    return hasattr(value, "__iter__")


class SpikeSourcePoissonVertex(
        PopulationApplicationVertex,
        LegacyPartitionerAPI, SupportsStructure):
    """
    A SpiNNaker vertex that is a Poisson-distributed Spike source.
    """

    __slots__ = (
        "__last_rate_read_time",
        "__model",
        "__model_name",
        "__n_atoms",
        "__rng",
        "__seed",
        "__spike_recorder",
        "__kiss_seed",  # dict indexed by vertex slice
        "__max_rate",
        "__max_n_rates",
        "__n_profile_samples",
        "__data",
        "__is_variable_rate",
        "__outgoing_projections",
        "__incoming_control_edge",
        "__structure",
        "__allowed_parameters",
        "__n_colour_bits")

    SPIKE_RECORDING_REGION_ID = 0

    def __init__(
            self, n_neurons: int, label: str, seed: Optional[int],
            max_atoms_per_core: Optional[Union[int, Tuple[int, ...]]],
            model: Union[SpikeSourcePoisson, SpikeSourcePoissonVariable],
            rate: Union[float, Sequence[float], None] = None,
            start: Union[int, Sequence[int], None] = None,
            duration: Union[int, Sequence[int], None] = None,
            rates: Union[
                Sequence[float], NDArray[numpy.floating], None] = None,
            starts: Union[Sequence[int], NDArray[numpy.integer], None] = None,
            durations: Union[
                Sequence[int], NDArray[numpy.integer], None] = None,
            max_rate: Optional[float] = None,
            splitter: Optional[AbstractSplitterCommon] = None,
            n_colour_bits: Optional[int] = None):
        """
        :param n_neurons: The number of neurons in this vertex.
        :param label: The optional name of the vertex.
        :param max_atoms_per_core: The max number of atoms that can be
            placed on a core for each dimension, used in partitioning.
            If the vertex is n-dimensional, with n > 1, the value must be a
            tuple with a value for each dimension.  If it is single-dimensional
            the value can be a 1-tuple or an int.
        :param model: The model to get the parameters from
        :param rate: The spike rate of all neuron
        :param start: The start time of spikes on all neurons
        :param duration: The duration of spikes on all neurons
        :param rates: The spike rate of each neuron
        :param starts: The start time of spikes on each neuron
        :param durations: The duration of spikes on each neuron
        :param max_rate: The maximum number of spikes
            for any neuron at any timestamp
        :param splitter: The splitter object needed for this vertex.
            Leave as `None` to delegate the choice of splitter to the selector.
        :param n_colour_bits:
        """
        super().__init__(label, max_atoms_per_core, splitter)

        # atoms params
        self.__n_atoms = self.round_n_atoms(n_neurons, "n_neurons")
        self.__model_name = "SpikeSourcePoisson"
        self.__model = model
        self.__seed = seed
        self.__kiss_seed: Dict[Slice, Tuple[int, ...]] = dict()

        self.__spike_recorder = MultiSpikeRecorder()

        # Check for disallowed pairs of parameters
        if (rates is not None) and (rate is not None):
            raise ValueError("Exactly one of rate and rates can be specified")
        if (starts is not None) and (start is not None):
            raise ValueError(
                "Exactly one of start and starts can be specified")
        if (durations is not None) and (duration is not None):
            raise ValueError(
                "Exactly one of duration and durations can be specified")
        if rate is None and rates is None:
            raise ValueError("One of rate or rates must be specified")

        # Normalise the parameters
        self.__is_variable_rate = rates is not None
        _rates = _normalize_rates(rate, rates)
        _starts = _normalize_times(start, starts)
        _durations = _normalize_times(duration, durations)
        if _durations is None:
            if _is_list_of_lists(_rates):
                _durations = [numpy.array([DURATION_FOREVER for _r in _rate])
                              for _rate in _rates]
            else:
                _durations = numpy.array(
                    [DURATION_FOREVER for _rate in _rates])

        # Check that there is either one list for all neurons,
        # or one per neuron
        if _is_list_of_lists(_rates) and len(_rates) != n_neurons:
            raise ValueError(
                "Must specify one rate for all neurons or one per neuron")
        if _is_list_of_lists(_starts) and len(_starts) != n_neurons:
            raise ValueError(
                "Must specify one start for all neurons or one per neuron")
        if _is_list_of_lists(_durations) and len(_durations) != n_neurons:
            raise ValueError(
                "Must specify one duration for all neurons or one per neuron")

        # Check that for each rate there is a start and duration if needed
        # TODO: Could be more efficient for case where parameters are not one
        #       per neuron
        for i in range(n_neurons):
            rate_set = _rates[i] if _is_list_of_lists(_rates) else _rates
            if not isinstance(rate_set, Sized):
                raise ValueError("Multiple rates must be a list")
            if starts is None and len(rate_set) > 1:
                raise ValueError(
                    "When multiple rates are specified,"
                    " each must have a start")
            elif _starts is not None:
                start_set = (
                    _starts[i] if _is_list_of_lists(_starts) else _starts)
                if len(start_set) != len(rate_set):
                    raise ValueError("Each rate must have a start")
                if any(s is None for s in start_set):
                    raise ValueError("Start must not be None")
            if _durations is not None:
                duration_set = (
                    _durations[i] if _is_list_of_lists(_durations)
                    else _durations)
                if len(duration_set) != len(rate_set):
                    raise ValueError("Each rate must have its own duration")

        self.__data: RangeDictionary[
            Union[NDArray[numpy.floating], NDArray[numpy.integer]]
            ] = RangeDictionary(n_neurons)
        rates_list: RangedList = RangedList(
            n_neurons, _rates,
            use_list_as_value=not _is_list_of_lists(_rates))
        self.__data["rates"] = rates_list
        self.__data["starts"] = RangedList(
            n_neurons, _starts,
            use_list_as_value=not _is_list_of_lists(_starts))
        self.__data["durations"] = RangedList(
            n_neurons, _durations,
            use_list_as_value=not _is_list_of_lists(_durations))
        self.__rng = numpy.random.RandomState(seed)

        self.__n_profile_samples = get_config_int(
            "Reports", "n_profile_samples")

        # Prepare for recording, and to get spikes
        self.__spike_recorder = MultiSpikeRecorder()

        if max_rate is None:
            all_rates: List[numpy.floating] = list(
                _flatten(self.__data["rates"]))
            self.__max_rate = numpy.amax(all_rates) if all_rates else 0
        else:
            self.__max_rate = max_rate
        self.__max_n_rates = max(len(r) for r in rates_list)

        # Keep track of how many outgoing projections exist
        self.__outgoing_projections: List[Projection] = list()
        self.__incoming_control_edge: Optional[ApplicationEdge] = None

        self.__structure: Optional[BaseStructure] = None

        if self.__is_variable_rate:
            self.__allowed_parameters = frozenset(
                {"rates", "durations", "starts"})
        else:
            self.__allowed_parameters = frozenset(
                {"rate", "duration", "start"})

        self.__last_rate_read_time: Optional[float] = None

        if n_colour_bits is None:
            n_colour_bits = get_config_int("Simulation", "n_colour_bits")
        self.__n_colour_bits = n_colour_bits

[docs] @overrides(SupportsStructure.set_structure) def set_structure(self, structure: BaseStructure) -> None: self.__structure = structure
@property def rates(self) -> RangedList[NDArray[numpy.floating]]: """ Get the rates. """ # UGH! Mypy has been defeated! return cast(Any, self.__data["rates"])
[docs] def add_outgoing_projection(self, projection: Projection) -> None: """ Add an outgoing projection from this vertex. :param projection: The projection to add """ self.__outgoing_projections.append(projection)
@property def outgoing_projections(self) -> Sequence[Projection]: """ The projections outgoing from this vertex. """ return self.__outgoing_projections @property def n_profile_samples(self) -> int: """ The n_profile_samples read from the config """ return self.__n_profile_samples @property def time_to_spike(self) -> RangedList: """ The "time_to_spike range list. """ return self.__data["time_to_spike"] def __read_parameters_now(self) -> None: # If we already read the parameters at this time, don't do it again current_time = SpynnakerDataView().get_current_run_time_ms() if self.__last_rate_read_time == current_time: return self.__last_rate_read_time = current_time for m_vertex in self.machine_vertices: placement = SpynnakerDataView.get_placement_of_vertex(m_vertex) m_vertex.read_parameters_from_machine(placement) def __read_parameter(self, name: str, selector: Selector) -> Sequence: if (SpynnakerDataView.is_ran_last() and SpynnakerDataView.has_transceiver()): self.__read_parameters_now() return self.__data[self.__full_name(name)].get_values(selector) def __full_name(self, name: str) -> str: if self.__is_variable_rate: return name return f"{name}s"
[docs] @overrides(PopulationApplicationVertex.get_parameter_values) def get_parameter_values( self, names: Names, selector: Selector = None) -> ParameterHolder: self._check_parameters(names, self.__allowed_parameters) return ParameterHolder(names, self.__read_parameter, selector)
[docs] @overrides(PopulationApplicationVertex.set_parameter_values) def set_parameter_values( self, name: str, value: Values, selector: Selector = None) -> None: self._check_parameters(name, self.__allowed_parameters) if self.__is_variable_rate: raise KeyError(f"Cannot set the {name} of a variable rate Poisson") # If we have just run, we need to read parameters to avoid overwrite if SpynnakerDataView().is_ran_last(): self.__read_parameters_now() for m_vertex in self.machine_vertices: m_vertex.set_rate_changed() # Must be parameter without the s fixed_name = f"{name}s" if is_iterable(value): # Single start per neuron for whole simulation self.__data[fixed_name].set_value_by_selector( selector, [numpy.array([s]) for s in value]) else: # Single start for all neurons for whole simulation self.__data[fixed_name].set_value_by_selector( selector, numpy.array([value]), use_list_as_value=True)
[docs] @overrides(PopulationApplicationVertex.get_parameters) def get_parameters(self) -> List[str]: return list(self.__allowed_parameters)
[docs] @overrides(PopulationApplicationVertex.get_units) def get_units(self, name: str) -> str: if name == "spikes": return "" if name == "rates" or name == "rates": return "Hz" if (name == "duration" or name == "start" or name == "durations" or name == "starts"): return "ms" raise KeyError(f"Units for {name} unknown")
[docs] @overrides(PopulationApplicationVertex.get_recordable_variables) def get_recordable_variables(self) -> List[str]: return ["spikes"]
[docs] def get_buffer_data_type(self, name: str) -> BufferDataType: if name == "spikes": return BufferDataType.MULTI_SPIKES raise KeyError(f"Cannot record {name}")
[docs] def get_recording_region(self, name: str) -> int: if name != "spikes": raise KeyError(f"Cannot record {name}") return 0
[docs] @overrides(PopulationApplicationVertex.set_recording) def set_recording( self, name: str, sampling_interval: Optional[float] = None, indices: Optional[Collection[int]] = None) -> None: if name != "spikes": raise KeyError(f"Cannot record {name}") if sampling_interval is not None: logger.warning("Sampling interval currently not supported for " "SpikeSourcePoisson so being ignored") if indices is not None: logger.warning("Indices currently not supported for " "SpikeSourcePoisson so being ignored") if not self.__spike_recorder.record: SpynnakerDataView.set_requires_mapping() self.__spike_recorder.record = True
[docs] @overrides(PopulationApplicationVertex.get_recording_variables) def get_recording_variables(self) -> List[str]: if self.__spike_recorder.record: return ["spikes"] return []
[docs] @overrides(PopulationApplicationVertex.set_not_recording) def set_not_recording(self, name: str, indices: Optional[Collection[int]] = None) -> None: if name != "spikes": raise KeyError(f"Cannot record {name}") if indices is not None: logger.warning("Indices currently not supported for " "SpikeSourceArray so being ignored") self.__spike_recorder.record = False
[docs] @overrides(PopulationApplicationVertex.get_sampling_interval_ms) def get_sampling_interval_ms(self, name: str) -> int: # TODO microseconds or milliseconds? if name != "spikes": raise KeyError(f"Cannot record {name}") return SpynnakerDataView.get_simulation_time_step_us()
[docs] @overrides(PopulationApplicationVertex.get_data_type) def get_data_type(self, name: str) -> None: if name != "spikes": raise KeyError(f"Cannot record {name}") return None
[docs] @overrides(PopulationApplicationVertex.get_neurons_recording) def get_neurons_recording(self, name: str, vertex_slice: Slice) -> NDArray: if name != "spikes": raise KeyError(f"Cannot record {name}") return vertex_slice.get_raster_ids()
[docs] def max_spikes_per_ts(self) -> float: """ Compute the maximum spike rate. :return: The maximum number of spikes per simulation timestep. """ ts_per_second = SpynnakerDataView.get_simulation_time_step_per_s() if float(self.__max_rate) / ts_per_second < SLOW_RATE_PER_TICK_CUTOFF: return 1.0 # Experiments show at 1000 this result is typically higher than actual chance_ts = 1000 max_spikes_per_ts = scipy.stats.poisson.ppf( 1.0 - (1.0 / float(chance_ts)), float(self.__max_rate) / ts_per_second) return int(math.ceil(max_spikes_per_ts)) + 1.0
[docs] def get_recording_sdram_usage(self, vertex_slice: Slice) -> AbstractSDRAM: """ :param vertex_slice: Slice to get cost for :returns: SDRAm cost for recording """ variable_sdram = self.__spike_recorder.get_sdram_usage_in_bytes( vertex_slice.n_atoms, self.max_spikes_per_ts()) constant_sdram = ConstantSDRAM(math.ceil( variable_sdram.per_timestep * OVERFLOW_TIMESTEPS_FOR_SDRAM)) return variable_sdram + constant_sdram
[docs] @overrides(LegacyPartitionerAPI.get_sdram_used_by_atoms) def get_sdram_used_by_atoms(self, vertex_slice: Slice) -> AbstractSDRAM: poisson_params_sz = get_params_bytes(vertex_slice.n_atoms) poisson_rates_sz = get_rates_bytes( vertex_slice.n_atoms, vertex_slice.n_atoms * self.__max_n_rates) poisson_expander_sz = get_expander_rates_bytes( vertex_slice.n_atoms, vertex_slice.n_atoms * self.__max_n_rates) sdram_sz = get_sdram_edge_params_bytes(vertex_slice) other = ConstantSDRAM( SYSTEM_BYTES_REQUIREMENT + SpikeSourcePoissonMachineVertex.get_provenance_data_size(1) + poisson_params_sz + poisson_rates_sz + poisson_expander_sz + recording_utilities.get_recording_header_size(1) + recording_utilities.get_recording_data_constant_size(1) + profile_utils.get_profile_region_size(self.__n_profile_samples) + sdram_sz) recording = self.get_recording_sdram_usage(vertex_slice) return recording + other
@property @overrides(PopulationApplicationVertex.n_atoms) def n_atoms(self) -> int: return self.__n_atoms @property @overrides(PopulationApplicationVertex.atoms_shape) def atoms_shape(self) -> Tuple[int, ...]: if isinstance(self.__structure, (Grid2D, Grid3D)): return self.__structure.calculate_size(self.__n_atoms) return super().atoms_shape
[docs] @overrides(LegacyPartitionerAPI.create_machine_vertex) def create_machine_vertex( self, vertex_slice: Slice, sdram: AbstractSDRAM, label: Optional[str] = None) -> SpikeSourcePoissonMachineVertex: return SpikeSourcePoissonMachineVertex( sdram, self.__spike_recorder.record, label, self, vertex_slice)
@property def max_rate(self) -> float: """ The highest rate or 0 if no rate set. """ return float(self.__max_rate) @property def max_n_rates(self) -> int: """ the long length of any rates list. """ return self.__max_n_rates @property def seed(self) -> Optional[int]: """ The seed set if any. """ return self.__seed @seed.setter def seed(self, seed: int) -> None: self.__seed = seed self.__kiss_seed = dict() self.__rng = numpy.random.RandomState(seed)
[docs] def kiss_seed(self, vertex_slice: Slice) -> Tuple[int, ...]: """ The seed for this vertex slice. Generates and checks that the seed values generated by the given random number generator or seed to a random number generator are suitable for use as a mars 64 kiss seed. :param vertex_slice: :return: a list of 4 integers which are used by the mars64 kiss random number generator for seeds. """ if vertex_slice not in self.__kiss_seed: self.__kiss_seed[vertex_slice] = create_mars_kiss_seeds(self.__rng) return self.__kiss_seed[vertex_slice]
[docs] def update_kiss_seed( self, vertex_slice: Slice, seed: Sequence[int]) -> None: """ Updates a KISS seed from the machine. :param vertex_slice: the vertex slice to update seed of :param seed: the seed """ self.__kiss_seed[vertex_slice] = tuple(seed)
[docs] def clear_spike_recording(self) -> None: """ Clears the spike data from the buffer manager for this vertex. """ buffer_manager = SpynnakerDataView.get_buffer_manager() for machine_vertex in self.machine_vertices: placement = SpynnakerDataView.get_placement_of_vertex( machine_vertex) buffer_manager.clear_recorded_data( placement.x, placement.y, placement.p, SpikeSourcePoissonVertex.SPIKE_RECORDING_REGION_ID)
[docs] def describe( self) -> Dict[str, Union[str, ParameterHolder, Dict[str, Any]]]: """ Return a human-readable description of the cell or synapse type. The output may be customised by specifying a different template together with an associated template engine (see :py:mod:`pyNN.descriptions`). If template is `None`, then a dictionary containing the template context will be returned. :returns: human-readable description of the vertex """ parameters = self.get_parameter_values(self.__model.default_parameters) return { "name": self.__model_name, "default_parameters": self.__model.default_parameters, "default_initial_values": self.__model.default_parameters, "parameters": parameters, }
[docs] def set_live_poisson_control_edge(self, edge: ApplicationEdge) -> None: """ Sets the poisson generator. :param edge: :raises ValueError: if already set """ if self.__incoming_control_edge is not None: raise ValueError( "The Poisson generator can only be controlled by one source") self.__incoming_control_edge = edge
@property def incoming_control_edge(self) -> Optional[ApplicationEdge]: """ The live poisson control edge/ generator is set """ return self.__incoming_control_edge @property def data(self) -> RangeDictionary[ Union[NDArray[numpy.floating], NDArray[numpy.integer]]]: """ A dictionary holding all the data as ranges """ return self.__data @property def n_colour_bits(self) -> int: return self.__n_colour_bits
[docs] def read_connections( self, synapse_info: SynapseInformation) -> List[ConnectionsArray]: """ Read Poisson connections from the machine :param synapse_info: The synapse information of the data being read :return: The set of connections from all machine vertices """ connections = list() for m_vertex in self.machine_vertices: connections.append(m_vertex.read_connections(synapse_info)) return connections