Source code for spynnaker.pyNN.models.neuron.abstract_population_vertex

# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from collections import defaultdict
from functools import reduce
import logging
import math
import operator
from typing import (
    Any, Collection, Dict, Iterable, List, Optional, Sequence, Tuple, Union,
    cast, TYPE_CHECKING)

import numpy
from numpy.typing import NDArray
from scipy import special  # @UnresolvedImport
from typing_extensions import TypeGuard

from pyNN.space import Grid2D, Grid3D, BaseStructure
from pyNN.random import RandomDistribution

from spinn_utilities.config_holder import (
    get_config_int, get_config_float, get_config_bool)
from spinn_utilities.helpful_functions import is_singleton
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides
from spinn_utilities.progress_bar import ProgressBar
from spinn_utilities.ranged import RangeDictionary
from spinn_utilities.ranged.abstract_list import Selector

from pacman.exceptions import PacmanConfigurationException
from pacman.model.graphs.common import Slice
from pacman.model.resources import AbstractSDRAM, MultiRegionSDRAM
from pacman.utilities.utility_calls import get_n_bits

from spinn_front_end_common.abstract_models import (
    AbstractCanReset)
from spinn_front_end_common.interface.buffer_management\
    .recording_utilities import (
       get_recording_header_size, get_recording_data_constant_size)
from spinn_front_end_common.interface.ds import DataType
from spinn_front_end_common.interface.profiling.profile_utils import (
    get_profile_region_size)
from spinn_front_end_common.interface.provenance import (
    ProvidesProvenanceDataFromMachineImpl)
from spinn_front_end_common.utilities.constants import (
    BYTES_PER_WORD, SYSTEM_BYTES_REQUIREMENT)

from spynnaker.pyNN.data import SpynnakerDataView
from spynnaker.pyNN.exceptions import (
    SynapticConfigurationException, SpynnakerException)

from spynnaker.pyNN.models.abstract_models import (
    AbstractAcceptsIncomingSynapses, AbstractMaxSpikes, HasSynapses,
    SupportsStructure)
from spynnaker.pyNN.models.common import (
    ParameterHolder, PopulationApplicationVertex, NeuronRecorder)
from spynnaker.pyNN.models.common.types import Names, Values
from spynnaker.pyNN.models.neuron.local_only import AbstractLocalOnly
from spynnaker.pyNN.models.common.param_generator_data import (
    MAX_PARAMS_BYTES, is_param_generatable)
from spynnaker.pyNN.models.neural_projections.connectors import (
    AbstractGenerateConnectorOnMachine)
from spynnaker.pyNN.models.neuron.population_machine_common import (
    CommonRegions)
from spynnaker.pyNN.models.neuron.population_machine_neurons import (
    NeuronRegions)
from spynnaker.pyNN.models.neuron.synapse_dynamics import (
    AbstractSynapseDynamics, AbstractSynapseDynamicsStructural,
    AbstractSDRAMSynapseDynamics, AbstractSupportsSignedWeights,
    SynapseDynamicsStatic)
from spynnaker.pyNN.models.neuron.synapse_dynamics.types import (
    NUMPY_CONNECTORS_DTYPE)
from spynnaker.pyNN.models.spike_source import SpikeSourcePoissonVertex

from spynnaker.pyNN.utilities.bit_field_utilities import get_sdram_for_keys
from spynnaker.pyNN.utilities.buffer_data_type import BufferDataType
from spynnaker.pyNN.utilities.constants import (
    POSSION_SIGMA_SUMMATION_LIMIT)
from spynnaker.pyNN.utilities.utility_calls import (
    create_mars_kiss_seeds, check_rng)
from spynnaker.pyNN.utilities.running_stats import RunningStats
from spynnaker.pyNN.utilities.struct import StructRepeat

from .generator_data import GeneratorData
from .master_pop_table import MasterPopTableAsBinarySearch
from .population_machine_neurons import PopulationMachineNeurons
from .synaptic_matrices import SYNAPSES_BASE_GENERATOR_SDRAM_USAGE_IN_BYTES
from .synapse_io import get_max_row_info

if TYPE_CHECKING:
    from spynnaker.pyNN.extra_algorithms.splitter_components import (
        SplitterAbstractPopulationVertex)
    from spynnaker.pyNN.models.current_sources import AbstractCurrentSource
    from spynnaker.pyNN.models.neural_projections import (
        SynapseInformation, ProjectionApplicationEdge)
    from spynnaker.pyNN.models.neuron import AbstractPyNNNeuronModel
    from spynnaker.pyNN.models.neuron.implementations import AbstractNeuronImpl
    from spynnaker.pyNN.models.neuron.synapse_io import MaxRowInfo
    from spynnaker.pyNN.models.neuron.synapse_dynamics.types import (
        ConnectionsArray)
    from spynnaker.pyNN.models.projection import Projection

logger = FormatAdapter(logging.getLogger(__name__))


# TODO: Make sure these values are correct (particularly CPU cycles)
_NEURON_BASE_DTCM_USAGE_IN_BYTES = 9 * BYTES_PER_WORD
_NEURON_BASE_N_CPU_CYCLES_PER_NEURON = 22
_NEURON_BASE_N_CPU_CYCLES = 10

_NEURON_GENERATOR_BASE_SDRAM = 12 * BYTES_PER_WORD
_NEURON_GENERATOR_PER_STRUCT = 4 * BYTES_PER_WORD
_NEURON_GENERATOR_PER_PARAM = 2 * BYTES_PER_WORD
_NEURON_GENERATOR_PER_ITEM = (2 * BYTES_PER_WORD) + MAX_PARAMS_BYTES

# 1 for number of neurons
# 1 for number of synapse types
# 1 for number of neuron bits
# 1 for number of synapse type bits
# 1 for number of delay bits
# 1 for drop late packets,
# 1 for incoming spike buffer size
_SYNAPSES_BASE_SDRAM_USAGE_IN_BYTES = 7 * BYTES_PER_WORD

_EXTRA_RECORDABLE_UNITS = {NeuronRecorder.SPIKES: "",
                           NeuronRecorder.PACKETS: "",
                           NeuronRecorder.REWIRING: ""}


def _prod(iterable):
    """
    Finds the product of the iterable.

    :param iterable iterable: Things to multiply together
    """
    return reduce(operator.mul, iterable, 1)


def _all_gen(rd):
    """
    Determine if all the values of a ranged dictionary can be generated.

    :rtype: bool
    """
    for key in rd.keys():
        if is_singleton(rd[key]):
            if not is_param_generatable(rd[key]):
                return False
        else:
            if not rd[key].range_based():
                return False
            for _start, _stop, val in rd[key].iter_ranges():
                if not is_param_generatable(val):
                    return False
    return True


def _check_random_dists(rd):
    """
    Check all RandomDistribution instances in a range dictionary to see if
    they have the rng value set.
    """
    for key in rd.keys():
        if is_singleton(rd[key]):
            if isinstance(rd[key], RandomDistribution):
                check_rng(rd[key].rng, f"RandomDistribtion for {key}")
        else:
            for _start, _stop, val in rd[key].iter_ranges():
                if isinstance(val, RandomDistribution):
                    check_rng(val.rng, f"RandomDistribution for {key}")


def _is_structural(dynamics) -> TypeGuard[AbstractSynapseDynamicsStructural]:
    return isinstance(dynamics, AbstractSynapseDynamicsStructural)


class AbstractPopulationVertex(
        PopulationApplicationVertex, AbstractAcceptsIncomingSynapses,
        AbstractCanReset, SupportsStructure):
    """
    Underlying vertex model for Neural Populations.
    Not actually abstract.
    """

    __slots__ = (
        "__incoming_spike_buffer_size",
        "__n_atoms",
        "__n_profile_samples",
        "__neuron_impl",
        "__neuron_recorder",
        "__synapse_recorder",
        "__parameters",
        "__pynn_model",
        "__state_variables",
        "__initial_state_variables",
        "__updated_state_variables",
        "__ring_buffer_sigma",
        "__spikes_per_second",
        "__drop_late_spikes",
        "__incoming_projections",
        "__incoming_poisson_projections",
        "__synapse_dynamics",
        "__max_row_info",
        "__self_projection",
        "__current_sources",
        "__current_source_id_list",
        "__structure",
        "__rng",
        "__pop_seed",
        "__core_seeds",
        "__connection_cache",
        "__read_initial_values",
        "__have_read_initial_values",
        "__last_parameter_read_time",
        "__n_colour_bits",
        "__extra_partitions")

    #: recording region IDs
    _SPIKE_RECORDING_REGION = 0

    #: the size of the runtime SDP port data region
    _RUNTIME_SDP_PORT_SIZE = BYTES_PER_WORD

    #: The Buffer traffic type
    _TRAFFIC_IDENTIFIER = "BufferTraffic"

    _C_MAIN_BASE_N_CPU_CYCLES = 0
    _NEURON_BASE_N_CPU_CYCLES_PER_NEURON = 22
    _NEURON_BASE_N_CPU_CYCLES = 10
    _SYNAPSE_BASE_N_CPU_CYCLES_PER_NEURON = 22
    _SYNAPSE_BASE_N_CPU_CYCLES = 10

    # Elements before the start of global parameters
    # 1. has key, 2. key, 3. n atoms, 4. n_atoms_peak 5. n_colour_bits
    CORE_PARAMS_BASE_SIZE = 5 * BYTES_PER_WORD

    def __init__(
            self, *, n_neurons: int, label: str,
            max_atoms_per_core: Union[int, Tuple[int, ...]],
            spikes_per_second: Optional[float],
            ring_buffer_sigma: Optional[float],
            max_expected_summed_weight: Optional[List[float]],
            incoming_spike_buffer_size: Optional[int],
            neuron_impl: AbstractNeuronImpl,
            pynn_model: AbstractPyNNNeuronModel, drop_late_spikes: bool,
            splitter: Optional[SplitterAbstractPopulationVertex],
            seed: Optional[int], n_colour_bits: Optional[int],
            extra_partitions: Optional[List[str]] = None):
        """
        :param int n_neurons: The number of neurons in the population
        :param str label: The label on the population
        :param int max_atoms_per_core:
            The maximum number of atoms (neurons) per SpiNNaker core.
        :param spikes_per_second: Expected spike rate
        :type spikes_per_second: float or None
        :param ring_buffer_sigma:
            How many SD above the mean to go for upper bound of ring buffer
            size; a good starting choice is 5.0. Given length of simulation
            we can set this for approximate number of saturation events.
        :type ring_buffer_sigma: float or None
        :param max_expected_summed_weight:
            The maximum expected summed weights for each synapse type.
        :param incoming_spike_buffer_size:
        :type incoming_spike_buffer_size: int or None
        :param bool drop_late_spikes: control flag for dropping late packets.
        :param AbstractNeuronImpl neuron_impl:
            The (Python side of the) implementation of the neurons themselves.
        :param AbstractPyNNNeuronModel pynn_model:
            The PyNN neuron model that this vertex is working on behalf of.
        :param splitter: splitter object
        :type splitter: SplitterAbstractPopulationVertex or None
        :param seed:
            The Population seed, used to ensure the same random generation
            on each run.
        :param int n_colour_bits: The number of colour bits to use
        :param extra_partitions:
            Extra partitions that are to be sent by the vertex
        :type extra_partitions: list(str) or None
        """
        # pylint: disable=too-many-arguments
        super().__init__(label, max_atoms_per_core, splitter)

        self.__n_atoms = self.round_n_atoms(n_neurons, "n_neurons")

        # buffer data
        if incoming_spike_buffer_size is None:
            self.__incoming_spike_buffer_size = get_config_int(
                "Simulation", "incoming_spike_buffer_size")
        else:
            self.__incoming_spike_buffer_size = incoming_spike_buffer_size

        if ring_buffer_sigma is None:
            self.__ring_buffer_sigma = get_config_float(
                "Simulation", "ring_buffer_sigma")
        else:
            self.__ring_buffer_sigma = ring_buffer_sigma

        if spikes_per_second is None:
            self.__spikes_per_second = get_config_float(
                "Simulation", "spikes_per_second")
        else:
            self.__spikes_per_second = spikes_per_second

        self.__max_expected_summed_weight = max_expected_summed_weight
        if (max_expected_summed_weight is not None and
                len(max_expected_summed_weight) !=
                neuron_impl.get_n_synapse_types()):
            raise ValueError(
                "The number of expected summed weights does not match "
                "the number of synapses in the neuron model "
                f"({neuron_impl.get_n_synapse_types()})")

        self.__drop_late_spikes = drop_late_spikes
        if self.__drop_late_spikes is None:
            self.__drop_late_spikes = get_config_bool(
                "Simulation", "drop_late_spikes")

        self.__neuron_impl = neuron_impl
        self.__pynn_model = pynn_model
        self.__parameters: RangeDictionary[float] = RangeDictionary(n_neurons)
        self.__neuron_impl.add_parameters(self.__parameters)
        self.__initial_state_variables: RangeDictionary[float] = \
            RangeDictionary(n_neurons)
        self.__neuron_impl.add_state_variables(self.__initial_state_variables)
        self.__state_variables = self.__initial_state_variables.copy()
        if n_colour_bits is None:
            self.__n_colour_bits = get_config_int(
                "Simulation", "n_colour_bits")
        else:
            self.__n_colour_bits = n_colour_bits

        # Set up for recording
        neuron_recordable_variables = list(
            self.__neuron_impl.get_recordable_variables())
        record_data_types = dict(
            self.__neuron_impl.get_recordable_data_types())
        self.__neuron_recorder = NeuronRecorder(
            neuron_recordable_variables, record_data_types,
            [NeuronRecorder.SPIKES], n_neurons, [], {}, [], {})
        self.__synapse_recorder = NeuronRecorder(
            [], {}, [],
            n_neurons, [NeuronRecorder.PACKETS],
            {NeuronRecorder.PACKETS: NeuronRecorder.PACKETS_TYPE},
            [NeuronRecorder.REWIRING],
            {NeuronRecorder.REWIRING: NeuronRecorder.REWIRING_TYPE})

        # Current sources for this vertex
        self.__current_sources: List[AbstractCurrentSource] = []
        self.__current_source_id_list: Dict[
            AbstractCurrentSource, Selector] = dict()

        # Set up for profiling
        self.__n_profile_samples = get_config_int(
            "Reports", "n_profile_samples")

        # Set up for incoming
        self.__incoming_projections: Dict[
            PopulationApplicationVertex, List[Projection]] = defaultdict(list)
        self.__incoming_poisson_projections: List[Projection] = list()
        self.__max_row_info: Dict[
            Tuple[ProjectionApplicationEdge, SynapseInformation, int],
            MaxRowInfo] = dict()
        self.__self_projection: Optional[Projection] = None

        # Keep track of the synapse dynamics for the vertex overall
        self.__synapse_dynamics: Union[
            AbstractLocalOnly, AbstractSDRAMSynapseDynamics] = \
            SynapseDynamicsStatic()

        self.__structure: Optional[BaseStructure] = None

        # An RNG for use in synaptic generation
        self.__rng = numpy.random.RandomState(seed)
        self.__pop_seed = create_mars_kiss_seeds(self.__rng)
        self.__core_seeds: Dict[Slice, Sequence[int]] = dict()

        # Store connections read from machine until asked to clear
        # Key is app_edge, synapse_info
        self.__connection_cache: Dict[Tuple[
            ProjectionApplicationEdge, SynapseInformation], NDArray] = dict()
        self.__read_initial_values = False
        self.__have_read_initial_values = False
        self.__last_parameter_read_time: Optional[float] = None
        self.__extra_partitions = extra_partitions

    @property
    def extra_partitions(self) -> List[str]:
        """ The extra partitions that are to be sent by the vertex. """
        if self.__extra_partitions is None:
            return []
        return self.__extra_partitions

    @property  # type: ignore[override]
    @overrides(PopulationApplicationVertex.splitter)
    def splitter(self) -> SplitterAbstractPopulationVertex:
        s = self._splitter
        if s is None:
            raise PacmanConfigurationException(
                f"The splitter object on {self._label} has not yet been set.")
        return cast('SplitterAbstractPopulationVertex', s)

    @splitter.setter
    def splitter(self, splitter: SplitterAbstractPopulationVertex):
        if self._splitter == splitter:
            return
        if self.has_splitter:
            raise PacmanConfigurationException(
                f"The splitter object on {self._label} has already been set, "
                "it cannot be reset. Please fix and try again.")
        # Circularity
        # pylint: disable=import-outside-toplevel
        from spynnaker.pyNN.extra_algorithms.splitter_components import (
            SplitterAbstractPopulationVertex as ValidSplitter)
        if not isinstance(splitter, ValidSplitter):
            raise PacmanConfigurationException(
                f"The splitter object on {self._label} must be set to one "
                "capable of handling an AbstractPopulationVertex.")
        self._splitter = cast(Any, splitter)
        splitter.set_governed_app_vertex(self)

[docs] @overrides(PopulationApplicationVertex.get_max_atoms_per_core) def get_max_atoms_per_core(self) -> int: max_atoms = super().get_max_atoms_per_core() # Dynamically adjust depending on the needs of the synapse dynamics return min( max_atoms, self.__synapse_dynamics.absolute_max_atoms_per_core)
[docs] @overrides( PopulationApplicationVertex.get_max_atoms_per_dimension_per_core) def get_max_atoms_per_dimension_per_core(self) -> Tuple[int, ...]: max_atoms = self.get_max_atoms_per_core() # If single dimensional, we can use the max atoms calculation if len(self.atoms_shape) == 1: return (max_atoms, ) # If not, the user has to be more specific if the total number of # atoms is not small enough to fit on one core max_per_dim = super().get_max_atoms_per_dimension_per_core() if numpy.prod(max_per_dim) > max_atoms: raise SpynnakerException( "When using a multidimensional Population, a maximum number of" " neurons per core for each dimension must be provided such" " that the total number of neurons per core is less than or" f" equal to {max_atoms}") if len(max_per_dim) != len(self.atoms_shape): raise SpynnakerException( "When using a multidimensional Population, a maximum number of" " neurons per core must be provided for each dimension (in" " this case, please set a max neurons per core with" f" {len(self.atoms_shape)} dimensions)") return max_per_dim
[docs] @overrides(PopulationApplicationVertex. set_max_atoms_per_dimension_per_core) def set_max_atoms_per_dimension_per_core( self, new_value: Union[None, int, Tuple[int, ...]]): if new_value is not None: max_atoms = self.__synapse_dynamics.absolute_max_atoms_per_core if numpy.prod(new_value) > max_atoms: raise SpynnakerException( "In the current configuration, the maximum number of" " neurons for each dimension must be such that the total" " number of neurons per core is less than or equal to" f" {max_atoms}") super().set_max_atoms_per_dimension_per_core(new_value)
[docs] @overrides(SupportsStructure.set_structure) def set_structure(self, structure: BaseStructure): self.__structure = structure
@property def combined_core_capable(self) -> bool: """ Whether the vertex can manage to operate on a combined neuron-synapse core, or if a split synapse-core is more appropriate. .. note:: This is currently based only on the ITCM available, not on the incoming synapses, but could be combined with n_synapse_cores_required to determine if, and how-many, synapse cores are needed. :rtype: bool """ return self.__synapse_dynamics.is_combined_core_capable @property def n_synapse_cores_required(self) -> int: """ The estimated number of synapse cores required, when using a split synapse-neuron core model. .. note:: This is currently hard-coded but could be updated to work this out based on the number of incoming synapses. :rtype: int """ return 1 @property def synapse_dynamics(self) -> AbstractSynapseDynamics: """ The synapse dynamics used by the synapses e.g. plastic or static. Settable. :rtype: AbstractSynapseDynamics """ return self.__synapse_dynamics @synapse_dynamics.setter def synapse_dynamics(self, synapse_dynamics: AbstractSynapseDynamics): """ Set the synapse dynamics. .. note:: After setting, the dynamics might not be the type set as it can be combined with the existing dynamics in exciting ways. :param AbstractSynapseDynamics synapse_dynamics: The synapse dynamics to set """ merged = self.__synapse_dynamics.merge(synapse_dynamics) assert isinstance(merged, ( AbstractLocalOnly, AbstractSDRAMSynapseDynamics)), \ f"unhandled type of merged synapse dynamics: {type(merged)}" self.__synapse_dynamics = merged
[docs] def add_incoming_projection(self, projection: Projection): """ Add a projection incoming to this vertex. :param ~spynnaker.pyNN.models.projection.Projection projection: The new projection to add """ # Reset the ring buffer shifts as a projection has been added SpynnakerDataView.set_requires_mapping() self.__max_row_info.clear() # pylint: disable=protected-access pre_vertex = projection._projection_edge.pre_vertex self.__incoming_projections[pre_vertex].append(projection) if pre_vertex == self: self.__self_projection = projection if isinstance(pre_vertex, SpikeSourcePoissonVertex): self.__incoming_poisson_projections.append(projection)
@property def self_projection(self) -> Optional[Projection]: """ Any projection from this vertex to itself. :rtype: ~spynnaker.pyNN.models.projection.Projection or None """ return self.__self_projection @property @overrides(PopulationApplicationVertex.n_atoms) def n_atoms(self) -> int: return self.__n_atoms @property @overrides(PopulationApplicationVertex.atoms_shape) def atoms_shape(self) -> Tuple[int, ...]: if isinstance(self.__structure, (Grid2D, Grid3D)): return self.__structure.calculate_size(self.__n_atoms) return super().atoms_shape @property def size(self) -> int: """ The number of neurons in the vertex. :rtype: int """ return self.__n_atoms @property def incoming_spike_buffer_size(self) -> int: """ The size of the incoming spike buffer to be used on the cores. :rtype: int """ return self.__incoming_spike_buffer_size @property def parameters(self) -> RangeDictionary[float]: """ The parameters of the neurons in the population. :rtype: ~spinn_utilities.ranged.RangeDictionary """ return self.__parameters @property def state_variables(self) -> RangeDictionary[float]: """ The state variables of the neuron in the population. :rtype: ~spinn_utilities.ranged.RangeDictionary """ return self.__state_variables @property def initial_state_variables(self) -> RangeDictionary[float]: """ The initial values of the state variables of the neurons. :rtype: ~spinn_utilities.ranged.RangeDictionary """ return self.__initial_state_variables @property def neuron_impl(self) -> AbstractNeuronImpl: """ The neuron implementation. :rtype: AbstractNeuronImpl """ return self.__neuron_impl @property def n_profile_samples(self) -> int: """ The maximum number of profile samples to report. :rtype: int """ return self.__n_profile_samples @property def neuron_recorder(self) -> NeuronRecorder: """ The recorder for neurons. :rtype: NeuronRecorder """ return self.__neuron_recorder @property def synapse_recorder(self) -> NeuronRecorder: """ The recorder for synapses. :rtype: NeuronRecorder """ return self.__synapse_recorder @property def drop_late_spikes(self) -> bool: """ Whether spikes should be dropped if not processed in a timestep. :rtype: bool """ return self.__drop_late_spikes
[docs] def get_sdram_usage_for_core_neuron_params(self, n_atoms: int) -> int: """ :param int n_atoms: The number of atoms per core :return: The SDRAM required for the core neuron parameters :rtype: int """ return ( self.CORE_PARAMS_BASE_SIZE + (self.__neuron_impl.get_n_synapse_types() * BYTES_PER_WORD) + # The keys per neuron n_atoms * BYTES_PER_WORD)
[docs] def get_sdram_usage_for_neuron_params(self, n_atoms: int) -> int: """ Calculate the SDRAM usage for just the neuron parameters region. :param int n_atoms: The number of atoms per core :return: The SDRAM required for the neuron region :rtype: int """ return sum(s.get_size_in_whole_words(n_atoms) if s.repeat_type == StructRepeat.PER_NEURON else s.get_size_in_whole_words() for s in self.__neuron_impl.structs) * BYTES_PER_WORD
[docs] def get_sdram_usage_for_neuron_generation(self, n_atoms: int) -> int: """ Calculate the SDRAM usage for the neuron generation region. :param int n_atoms: The number of atoms per core :return: The SDRAM required for the neuron generator region :rtype: int """ return (self.__get_sdram_usage_for_neuron_struct_generation(n_atoms) + self.__neuron_recorder.get_generator_sdram_usage_in_bytes( n_atoms))
def __get_sdram_usage_for_neuron_struct_generation( self, n_atoms: int) -> int: """ Calculate the SDRAM usage for the neuron struct generation region. :param int n_atoms: The number of atoms per core :return: The SDRAM required for the neuron generator region :rtype: int """ # Uses nothing if not generatable structs = self.__neuron_impl.structs for struct in structs: if not struct.is_generatable: return 0 # If structs are generatable, we can guess that parameters are, # and then assume each parameter is different for maximum SDRAM. n_structs = len(structs) n_params = sum(len(s.fields) for s in structs) return sum([ _NEURON_GENERATOR_BASE_SDRAM, _NEURON_GENERATOR_PER_STRUCT * n_structs, _NEURON_GENERATOR_PER_PARAM * n_params, _NEURON_GENERATOR_PER_ITEM * n_params * n_atoms ])
[docs] def get_sdram_usage_for_current_source_params(self, n_atoms: int) -> int: """ Calculate the SDRAM usage for the current source parameters region. :param int n_atoms: The number of atoms to account for :return: The SDRAM required for the current source region :rtype: int """ # If non at all, just output size of 0 declaration if not self.__current_sources: return BYTES_PER_WORD # This is a worst-case count, assuming all sources apply to all atoms # Start with the count of sources + count of sources per neuron sdram_usage = BYTES_PER_WORD + (n_atoms * BYTES_PER_WORD) # There is a number of each different type of current source sdram_usage += 4 * BYTES_PER_WORD # Add on size of neuron id list per source (remember assume all atoms) sdram_usage += ( len(self.__current_sources) * 2 * n_atoms * BYTES_PER_WORD) # Add on the size of the current source data + neuron id list per # source (remember, assume all neurons for worst case) for current_source in self.__current_sources: sdram_usage += current_source.get_sdram_usage_in_bytes() return sdram_usage
def __read_parameters_now(self) -> None: # If we already read the parameters at this time, don't do it again current_time = SpynnakerDataView().get_current_run_time_ms() if self.__last_parameter_read_time == current_time: return self.__last_parameter_read_time = current_time for m_vertex in self.machine_vertices: placement = SpynnakerDataView.get_placement_of_vertex(m_vertex) if isinstance(m_vertex, PopulationMachineNeurons): m_vertex.read_parameters_from_machine(placement) def __read_initial_parameters_now(self) -> None: # If we already read the initial parameters, don't do it again if self.__have_read_initial_values: return for m_vertex in self.machine_vertices: placement = SpynnakerDataView.get_placement_of_vertex(m_vertex) if isinstance(m_vertex, PopulationMachineNeurons): m_vertex.read_initial_parameters_from_machine(placement) def __read_parameter( self, name: str, selector: Selector = None) -> Sequence[float]: return self.__parameters[name].get_values(selector)
[docs] @overrides(PopulationApplicationVertex.get_parameter_values) def get_parameter_values( self, names: Names, selector: Selector = None) -> ParameterHolder: self._check_parameters(names, set(self.__parameters.keys())) # If we haven't yet run, or have just reset, note to read the values # when they are ready if not SpynnakerDataView.is_ran_last(): self.__read_initial_values = True elif SpynnakerDataView.has_transceiver(): self.__read_parameters_now() return ParameterHolder(names, self.__read_parameter, selector)
[docs] @overrides(PopulationApplicationVertex.set_parameter_values) def set_parameter_values( self, name: str, value: Values, selector: Selector = None): # If we have run, and not reset, we need to read the values back # so that we don't overwrite the state. Note that a reset will # then make this a waste, but we can't see the future... if SpynnakerDataView.is_ran_last(): self.__read_parameters_now() self.__tell_neuron_vertices_to_regenerate() self.__parameters[name].set_value_by_selector(selector, value)
[docs] @overrides(PopulationApplicationVertex.get_parameters) def get_parameters(self) -> List[str]: return list(self.__pynn_model.default_parameters.keys())
def __read_initial_state_variable( self, name: str, selector: Selector = None) -> Sequence[float]: return self.__initial_state_variables[name].get_values(selector)
[docs] @overrides(PopulationApplicationVertex.get_initial_state_values) def get_initial_state_values( self, names: Names, selector: Selector = None) -> ParameterHolder: self._check_variables(names, set(self.__state_variables.keys())) # If we haven't yet run, or have just reset, note to read the values # when they are ready if not SpynnakerDataView.is_ran_last(): self.__read_initial_values = True else: self.__read_initial_parameters_now() return ParameterHolder( names, self.__read_initial_state_variable, selector)
[docs] @overrides(PopulationApplicationVertex.set_initial_state_values) def set_initial_state_values( self, name: str, value: Values, selector: Selector = None): self._check_variables([name], set(self.__state_variables.keys())) if not SpynnakerDataView.is_ran_last(): self.__state_variables[name].set_value_by_selector( selector, value) self.__initial_state_variables[name].set_value_by_selector( selector, value)
def __read_current_state_variable( self, name: str, selector: Selector = None) -> Sequence[float]: return self.__state_variables[name].get_values(selector)
[docs] @overrides(PopulationApplicationVertex.get_current_state_values) def get_current_state_values( self, names: Names, selector: Selector = None) -> ParameterHolder: self._check_variables(names, set(self.__state_variables.keys())) # If we haven't yet run, or have just reset, note to read the values # when they are ready if not SpynnakerDataView.is_ran_last(): self.__read_initial_values = True else: self.__read_parameters_now() return ParameterHolder( names, self.__read_current_state_variable, selector)
[docs] @overrides(PopulationApplicationVertex.set_current_state_values) def set_current_state_values( self, name: str, value: Values, selector: Selector = None): self._check_variables([name], set(self.__state_variables.keys())) # If we have run, and not reset, we need to read the values back # so that we don't overwrite all the state. Note that a reset will # then make this a waste, but we can't see the future... if SpynnakerDataView.is_ran_last(): self.__read_parameters_now() self.__tell_neuron_vertices_to_regenerate() self.__state_variables[name].set_value_by_selector( selector, value)
[docs] @overrides(PopulationApplicationVertex.get_state_variables) def get_state_variables(self) -> List[str]: return list(self.__pynn_model.default_initial_values.keys())
[docs] @overrides(PopulationApplicationVertex.get_units) def get_units(self, name: str) -> str: if name in _EXTRA_RECORDABLE_UNITS: return _EXTRA_RECORDABLE_UNITS[name] if self.__neuron_impl.is_recordable(name): return self.__neuron_impl.get_recordable_units(name) if (name not in self.__parameters and name not in self.__state_variables): raise KeyError(f"No such parameter {name}") return self.__neuron_impl.get_units(name)
@property @overrides(PopulationApplicationVertex.conductance_based) def conductance_based(self) -> bool: return self.__neuron_impl.is_conductance_based
[docs] @overrides(PopulationApplicationVertex.get_recordable_variables) def get_recordable_variables(self) -> List[str]: return [ *self.__neuron_recorder.get_recordable_variables(), *self.__synapse_recorder.get_recordable_variables()]
[docs] @overrides(PopulationApplicationVertex.get_buffer_data_type) def get_buffer_data_type(self, name: str) -> BufferDataType: if self.__neuron_recorder.is_recordable(name): return self.__neuron_recorder.get_buffer_data_type(name) if self.__synapse_recorder.is_recordable(name): return self.__synapse_recorder.get_buffer_data_type(name) raise KeyError(f"It is not possible to record {name}")
[docs] @overrides(PopulationApplicationVertex.set_recording) def set_recording( self, name: str, sampling_interval: Optional[float] = None, indices: Optional[Collection[int]] = None): if self.__neuron_recorder.is_recordable(name): self.__neuron_recorder.set_recording( name, True, sampling_interval, indices) elif self.__synapse_recorder.is_recordable(name): self.__synapse_recorder.set_recording( name, True, sampling_interval, indices) else: raise KeyError(f"It is not possible to record {name}") SpynnakerDataView.set_requires_mapping()
[docs] @overrides(PopulationApplicationVertex.set_not_recording) def set_not_recording( self, name: str, indices: Optional[Collection[int]] = None): if self.__neuron_recorder.is_recordable(name): self.__neuron_recorder.set_recording(name, False, indexes=indices) elif self.__synapse_recorder.is_recordable(name): self.__synapse_recorder.set_recording(name, False, indexes=indices) else: raise KeyError(f"It is not possible to record {name}")
[docs] @overrides(PopulationApplicationVertex.get_recording_variables) def get_recording_variables(self) -> List[str]: return [ *self.__neuron_recorder.recording_variables, *self.__synapse_recorder.recording_variables]
[docs] @overrides(PopulationApplicationVertex.get_sampling_interval_ms) def get_sampling_interval_ms(self, name: str) -> float: if self.__neuron_recorder.is_recordable(name): return self.__neuron_recorder.get_sampling_interval_ms(name) if self.__synapse_recorder.is_recordable(name): return self.__synapse_recorder.get_sampling_interval_ms(name) raise KeyError(f"It is not possible to record {name}")
[docs] @overrides(PopulationApplicationVertex.get_data_type) def get_data_type(self, name: str) -> Optional[DataType]: if self.__neuron_recorder.is_recordable(name): return self.__neuron_recorder.get_data_type(name) if self.__synapse_recorder.is_recordable(name): return self.__synapse_recorder.get_data_type(name) raise KeyError(f"It is not possible to record {name}")
[docs] @overrides(PopulationApplicationVertex.get_recording_region) def get_recording_region(self, name: str) -> int: if self.__neuron_recorder.is_recordable(name): return self.__neuron_recorder.get_region(name) if self.__synapse_recorder.is_recordable(name): return self.__synapse_recorder.get_region(name) raise KeyError(f"It is not possible to record {name}")
[docs] @overrides(PopulationApplicationVertex.get_neurons_recording) def get_neurons_recording( self, name: str, vertex_slice: Slice) -> Optional[Collection[int]]: if self.__neuron_recorder.is_recordable(name): return self.__neuron_recorder.neurons_recording( name, vertex_slice) if self.__synapse_recorder.is_recordable(name): return self.__synapse_recorder.neurons_recording( name, vertex_slice) raise KeyError(f"It is not possible to record {name}")
@property def weight_scale(self) -> float: """ :rtype: float """ return self.__neuron_impl.get_global_weight_scale() @property def ring_buffer_sigma(self) -> float: """ :rtype: float """ return self.__ring_buffer_sigma @ring_buffer_sigma.setter def ring_buffer_sigma(self, ring_buffer_sigma: float): self.__ring_buffer_sigma = ring_buffer_sigma @property def spikes_per_second(self) -> float: """ :rtype: float """ return self.__spikes_per_second @spikes_per_second.setter def spikes_per_second(self, spikes_per_second: float): self.__spikes_per_second = spikes_per_second
[docs] def set_synapse_dynamics(self, synapse_dynamics: AbstractSynapseDynamics): """ Set the synapse dynamics of this population. :param AbstractSynapseDynamics synapse_dynamics: The synapse dynamics to set """ self.synapse_dynamics = synapse_dynamics
[docs] def clear_connection_cache(self) -> None: """ Flush the cache of connection information; needed for a second run. """ self.__connection_cache.clear()
[docs] def describe(self): """ Get a human-readable description of the cell or synapse type. The output may be customised by specifying a different template together with an associated template engine (see :py:mod:`pyNN.descriptions`). If template is `None`, then a dictionary containing the template context will be returned. :rtype: dict(str, any) """ parameters = dict(self.get_parameter_values( self.__pynn_model.default_parameters.keys())) context = { "name": self.__neuron_impl.model_name, "default_parameters": self.__pynn_model.default_parameters, "default_initial_values": self.__pynn_model.default_parameters, "parameters": parameters, } return context
[docs] def get_synapse_id_by_target(self, target: str) -> Optional[int]: """ Get the id of synapse using its target name. :param str target: The synapse to get the id of :rtype: int """ return self.__neuron_impl.get_synapse_id_by_target(target)
[docs] @overrides(PopulationApplicationVertex.inject) def inject( self, current_source: AbstractCurrentSource, selector: Selector = None): self.__current_sources.append(current_source) self.__current_source_id_list[current_source] = selector # set the associated vertex (for multi-run case) current_source.set_app_vertex(self) # set to reload for multi-run case for m_vertex in self.machine_vertices: m_vertex.set_reload_required(True)
@property def current_sources(self) -> List[AbstractCurrentSource]: """ Current sources needed to be available to machine vertex. :rtype: list(AbstractCurrentSource) """ return self.__current_sources @property def current_source_id_list(self) -> Dict[AbstractCurrentSource, Selector]: """ Current source ID list needed to be available to machine vertex. :rtype: dict(AbstractCurrentSource,any) """ return self.__current_source_id_list def __str__(self) -> str: return f"{self.label} with {self.n_atoms} atoms" def __repr__(self): return self.__str__()
[docs] @overrides(AbstractCanReset.reset_to_first_timestep) def reset_to_first_timestep(self) -> None: # Reset state variables self.__state_variables.copy_into(self.__initial_state_variables) # If synapses change during the run also regenerate these to get # back to the initial state if self.__synapse_dynamics.changes_during_run: SpynnakerDataView.set_requires_data_generation() else: # We only get neuron vertices to regenerate not redoing data # generation self.__tell_neuron_vertices_to_regenerate()
@staticmethod def _ring_buffer_expected_upper_bound( weight_mean: float, weight_std_dev: float, spikes_per_second: float, n_synapses_in: int, sigma: float) -> float: """ Provides expected upper bound on accumulated values in a ring buffer element. Requires an assessment of maximum Poisson input rate. Assumes knowledge of mean and SD of weight distribution, fan-in and timestep. All arguments should be assumed real values except n_synapses_in which will be an integer. :param float weight_mean: Mean of weight distribution (in either nA or microSiemens as required) :param float weight_std_dev: SD of weight distribution :param float spikes_per_second: Maximum expected Poisson rate in Hz :param int machine_timestep: in us :param int n_synapses_in: No of connected synapses :param float sigma: How many SD above the mean to go for upper bound; a good starting choice is 5.0. Given length of simulation we can set this for approximate number of saturation events. :rtype: float """ # E[ number of spikes ] in a timestep average_spikes_per_timestep = ( float(n_synapses_in * spikes_per_second) / SpynnakerDataView.get_simulation_time_step_per_s()) # Exact variance contribution from inherent Poisson variation poisson_variance = average_spikes_per_timestep * (weight_mean ** 2) # Upper end of range for Poisson summation required below # upper_bound needs to be an integer upper_bound = int(round(average_spikes_per_timestep + POSSION_SIGMA_SUMMATION_LIMIT * math.sqrt(average_spikes_per_timestep))) # pylint:disable=wrong-spelling-in-comment # Closed-form exact solution for summation that gives the variance # contributed by weight distribution variation when modulated by # Poisson PDF. Requires scipy.special for gamma and incomplete gamma # functions. Beware: incomplete gamma doesn't work the same as # Mathematica because (1) it's regularised and needs a further # multiplication and (2) it's actually the complement that is needed # i.e. 'gammaincc'] weight_variance = 0.0 if weight_std_dev > 0: # pylint: disable=no-member lngamma = special.gammaln(1 + upper_bound) gammai = special.gammaincc( 1 + upper_bound, average_spikes_per_timestep) big_ratio = (math.log(average_spikes_per_timestep) * upper_bound - lngamma) if -701.0 < big_ratio < 701.0 and big_ratio != 0.0: log_weight_variance = ( -average_spikes_per_timestep + math.log(average_spikes_per_timestep) + 2.0 * math.log(weight_std_dev) + math.log(math.exp(average_spikes_per_timestep) * gammai - math.exp(big_ratio))) weight_variance = math.exp(log_weight_variance) # upper bound calculation -> mean + n * SD return ((average_spikes_per_timestep * weight_mean) + (sigma * math.sqrt(poisson_variance + weight_variance)))
[docs] def get_ring_buffer_shifts(self) -> List[int]: """ Get the shift of the ring buffers for transfer of values into the input buffers for this model. :rtype: list(int) """ n_synapse_types = self.__neuron_impl.get_n_synapse_types() max_weights = numpy.zeros(n_synapse_types) if self.__max_expected_summed_weight is not None: max_weights[:] = self.__max_expected_summed_weight max_weights *= self.__neuron_impl.get_global_weight_scale() else: stats = _Stats(self.__neuron_impl, self.__spikes_per_second, self.__ring_buffer_sigma) for proj in self.incoming_projections: # pylint: disable=protected-access synapse_info = proj._synapse_information # Skip if this is a synapse dynamics synapse type if synapse_info.synapse_type_from_dynamics: continue stats.add_projection(proj) for synapse_type in range(n_synapse_types): max_weights[synapse_type] = stats.get_max_weight(synapse_type) # Convert these to powers; we could use int.bit_length() for this if # they were integers, but they aren't... max_weight_powers = ( 0 if w <= 0 else int(math.ceil(max(0, math.log2(w)))) for w in max_weights) # If 2^max_weight_power equals the max weight, we have to add another # power, as range is 0 - (just under 2^max_weight_power)! max_weight_powers = ( w + 1 if (2 ** w) <= a else w for w, a in zip(max_weight_powers, max_weights)) return list(max_weight_powers)
@staticmethod def __get_weight_scale(ring_buffer_to_input_left_shift: int) -> float: """ Return the amount to scale the weights by to convert them from floating point values to 16-bit fixed point numbers which can be shifted left by ring_buffer_to_input_left_shift to produce an s1615 fixed point number. :param int ring_buffer_to_input_left_shift: :rtype: float """ return float(math.pow(2, 16 - (ring_buffer_to_input_left_shift + 1)))
[docs] def get_weight_scales( self, ring_buffer_shifts: Iterable[int] ) -> NDArray[numpy.floating]: """ Get the weight scaling to apply to weights in synapses. :param list(int) ring_buffer_shifts: The shifts to convert to weight scales :rtype: list(int) """ weight_scale = self.__neuron_impl.get_global_weight_scale() return numpy.array([ self.__get_weight_scale(r) * weight_scale for r in ring_buffer_shifts])
[docs] @overrides(AbstractAcceptsIncomingSynapses.get_connections_from_machine) def get_connections_from_machine( self, app_edge: ProjectionApplicationEdge, synapse_info: SynapseInformation) -> ConnectionsArray: # If we already have connections cached, return them if (app_edge, synapse_info) in self.__connection_cache: return self.__connection_cache[app_edge, synapse_info] # Start with something in the list so that concatenate works connections = [numpy.zeros(0, dtype=NUMPY_CONNECTORS_DTYPE)] progress = ProgressBar( len(self.machine_vertices), f"Getting synaptic data between {app_edge.pre_vertex.label} " f"and {app_edge.post_vertex.label}") for post_vertex in progress.over(self.machine_vertices): placement = SpynnakerDataView.get_placement_of_vertex(post_vertex) if isinstance(post_vertex, HasSynapses): connections.extend(post_vertex.get_connections_from_machine( placement, app_edge, synapse_info)) all_connections = numpy.concatenate(connections) self.__connection_cache[app_edge, synapse_info] = all_connections return all_connections
[docs] def get_synapse_params_size(self) -> int: """ Get the size of the synapse parameters, in bytes. :rtype: int """ # This will only hold ring buffer scaling for the neuron synapse # types return (_SYNAPSES_BASE_SDRAM_USAGE_IN_BYTES + (BYTES_PER_WORD * self.__neuron_impl.get_n_synapse_types()))
[docs] def get_synapse_dynamics_size(self, n_atoms: int) -> int: """ Get the size of the synapse dynamics region, in bytes. :rtype: int """ if isinstance(self.__synapse_dynamics, AbstractLocalOnly): return self.__synapse_dynamics.get_parameters_usage_in_bytes( n_atoms, self.incoming_projections) return self.__synapse_dynamics.get_parameters_sdram_usage_in_bytes( n_atoms, self.__neuron_impl.get_n_synapse_types())
[docs] def get_structural_dynamics_size(self, n_atoms: int) -> int: """ Get the size of the structural dynamics region, in bytes. :param int n_atoms: The number of atoms in the slice :rtype: int """ if not _is_structural(self.__synapse_dynamics): return 0 return self.__synapse_dynamics\ .get_structural_parameters_sdram_usage_in_bytes( self.incoming_projections, n_atoms)
[docs] def get_synapses_size(self, n_post_atoms: int) -> int: """ Get the maximum SDRAM usage for the synapses on a vertex slice. :param int n_post_atoms: The number of atoms projected to :param incoming_projections: The projections to consider in the calculations :type incoming_projections: list(~spynnaker.pyNN.models.projection.Projection) :rtype: int """ if isinstance(self.__synapse_dynamics, AbstractLocalOnly): return 0 addr = 2 * BYTES_PER_WORD for proj in self.incoming_projections: addr = self.__add_matrix_size(addr, proj, n_post_atoms) return addr
def __add_matrix_size(self, address: int, projection: Projection, n_post_atoms: int) -> int: """ Add to the address the size of the matrices for the projection to the vertex slice. :param int address: The address to start from :param ~spynnaker.pyNN.models.projection.Projection: The projection to add :param int n_post_atoms: The number of atoms projected to :rtype: int """ # pylint: disable=protected-access synapse_info = projection._synapse_information app_edge = projection._projection_edge max_row_info = self.get_max_row_info( synapse_info, n_post_atoms, app_edge) vertex = app_edge.pre_vertex max_atoms = vertex.get_max_atoms_per_core() n_sub_atoms = int(min(max_atoms, vertex.n_atoms)) n_sub_edges = int(math.ceil(vertex.n_atoms / n_sub_atoms)) if max_row_info.undelayed_max_n_synapses > 0: size = n_sub_atoms * max_row_info.undelayed_max_bytes for _ in range(n_sub_edges): try: address = \ MasterPopTableAsBinarySearch.get_next_allowed_address( address) except SynapticConfigurationException as ex: values = self.__incoming_projections.values() n_projections = (sum(len(x) for x in values)) if n_projections > 100: raise SpynnakerException( f"{self} has {n_projections} incoming Projections " f"which is more than Spynnaker can handle.")\ from ex raise address += size if max_row_info.delayed_max_n_synapses > 0: size = (n_sub_atoms * max_row_info.delayed_max_bytes * app_edge.n_delay_stages) for _ in range(n_sub_edges): address = \ MasterPopTableAsBinarySearch.get_next_allowed_address( address) address += size return address
[docs] def get_max_row_info( self, synapse_info: SynapseInformation, n_post_atoms: int, app_edge: ProjectionApplicationEdge) -> MaxRowInfo: """ Get maximum row length data. :param SynapseInformation synapse_info: Information about synapses :param int n_post_atoms: The number of atoms projected to :param ProjectionApplicationEdge app_edge: The edge of the projection :rtype: MaxRowInfo """ key = (app_edge, synapse_info, n_post_atoms) if key in self.__max_row_info: return self.__max_row_info[key] max_row_info = get_max_row_info( synapse_info, n_post_atoms, app_edge.n_delay_stages, app_edge) self.__max_row_info[key] = max_row_info return max_row_info
[docs] def get_synapse_expander_size(self) -> int: """ Get the size of the synapse expander region, in bytes. :rtype: int """ size = SYNAPSES_BASE_GENERATOR_SDRAM_USAGE_IN_BYTES size += (self.__neuron_impl.get_n_synapse_types() * DataType.U3232.size) for proj in self.incoming_projections: # pylint: disable=protected-access synapse_info = proj._synapse_information app_edge = proj._projection_edge n_sub_edges = len( app_edge.pre_vertex.splitter.get_out_going_slices()) if not n_sub_edges: vertex = app_edge.pre_vertex max_atoms = float(min(vertex.get_max_atoms_per_core(), vertex.n_atoms)) n_sub_edges = int(math.ceil(vertex.n_atoms / max_atoms)) size += self.__generator_info_size(synapse_info) * n_sub_edges size += get_sdram_for_keys(self.incoming_projections) return size
@staticmethod def __generator_info_size(synapse_info: SynapseInformation) -> int: """ The number of bytes required by the generator information. :param SynapseInformation synapse_info: The synapse information to use :rtype: int """ if not synapse_info.may_generate_on_machine(): return 0 connector = cast( AbstractGenerateConnectorOnMachine, synapse_info.connector) return ( GeneratorData.BASE_SIZE + connector.gen_delay_params_size_in_bytes(synapse_info.delays) + connector.gen_weight_params_size_in_bytes(synapse_info.weights) + connector.gen_connector_params_size_in_bytes + synapse_info.synapse_dynamics.gen_matrix_params_size_in_bytes) @property def synapse_executable_suffix(self) -> str: """ The suffix of the executable name due to the type of synapses in use. :rtype: str """ return self.__synapse_dynamics.get_vertex_executable_suffix() @property def neuron_recordables(self) -> List[str]: """ The names of variables that can be recorded by the neuron. :rtype: list(str) """ return self.__neuron_recorder.get_recordable_variables() @property def synapse_recordables(self) -> List[str]: """ The names of variables that can be recorded by the synapses. :rtype: list(str) """ return self.__synapse_recorder.get_recordable_variables()
[docs] def get_common_constant_sdram( self, n_record: int, n_provenance: int, common_regions: CommonRegions) -> MultiRegionSDRAM: """ Get the amount of SDRAM used by common parts. :param int n_record: The number of recording regions :param int n_provenance: The number of provenance items :param CommonRegions common_regions: Region IDs :rtype: MultiRegionSDRAM """ sdram = MultiRegionSDRAM() sdram.add_cost(common_regions.system, SYSTEM_BYTES_REQUIREMENT) sdram.add_cost( common_regions.recording, get_recording_header_size(n_record) + get_recording_data_constant_size(n_record)) sdram.add_cost( common_regions.provenance, ProvidesProvenanceDataFromMachineImpl.get_provenance_data_size( n_provenance)) sdram.add_cost( common_regions.profile, get_profile_region_size(self.__n_profile_samples or 0)) return sdram
[docs] def get_neuron_variable_sdram(self, vertex_slice: Slice) -> AbstractSDRAM: """ Get the amount of SDRAM per timestep used by neuron parts. :param ~pacman.model.graphs.common.Slice vertex_slice: The slice of neurons to get the size of :rtype: int """ return self.__neuron_recorder.get_variable_sdram_usage(vertex_slice)
[docs] def get_max_neuron_variable_sdram(self, n_neurons: int) -> AbstractSDRAM: """ Get the amount of SDRAM per timestep used by neuron parts. :rtype: int """ return self.__neuron_recorder.get_max_variable_sdram_usage(n_neurons)
[docs] def get_synapse_variable_sdram(self, vertex_slice: Slice) -> AbstractSDRAM: """ Get the amount of SDRAM per timestep used by synapse parts. :param ~pacman.model.graphs.common.Slice vertex_slice: The slice of neurons to get the size of :rtype: int """ if _is_structural(self.__synapse_dynamics): self.__synapse_recorder.set_max_rewires_per_ts( self.__synapse_dynamics.get_max_rewires_per_ts()) return self.__synapse_recorder.get_variable_sdram_usage(vertex_slice)
[docs] def get_max_synapse_variable_sdram(self, n_neurons: int) -> AbstractSDRAM: """ Get the amount of SDRAM per timestep used by synapse parts. :rtype: int """ if _is_structural(self.__synapse_dynamics): self.__synapse_recorder.set_max_rewires_per_ts( self.__synapse_dynamics.get_max_rewires_per_ts()) return self.__synapse_recorder.get_max_variable_sdram_usage(n_neurons)
[docs] def get_neuron_constant_sdram( self, n_atoms: int, neuron_regions: NeuronRegions) -> MultiRegionSDRAM: """ Get the amount of fixed SDRAM used by neuron parts. :param NeuronRegions neuron_regions: Region IDs :rtype: int """ params_cost = self.get_sdram_usage_for_neuron_params(n_atoms) sdram = MultiRegionSDRAM() sdram.add_cost( neuron_regions.core_params, self.get_sdram_usage_for_core_neuron_params(n_atoms)) sdram.add_cost(neuron_regions.neuron_params, params_cost) sdram.add_cost( neuron_regions.current_source_params, self.get_sdram_usage_for_current_source_params(n_atoms)) sdram.add_cost( neuron_regions.neuron_recording, self.__neuron_recorder.get_metadata_sdram_usage_in_bytes( n_atoms)) sdram.add_cost( neuron_regions.neuron_builder, self.get_sdram_usage_for_neuron_generation(n_atoms)) sdram.add_cost(neuron_regions.initial_values, params_cost) return sdram
@property def incoming_projections(self) -> Iterable[Projection]: """ The projections that target this population vertex. :rtype: iterable(~spynnaker.pyNN.models.projection.Projection) """ for proj_list in self.__incoming_projections.values(): yield from proj_list
[docs] def get_incoming_projections_from( self, source_vertex: PopulationApplicationVertex ) -> Iterable[Projection]: """ The projections that target this population vertex from the given source. :rtype: iterable(~spynnaker.pyNN.models.projection.Projection) """ return self.__incoming_projections[source_vertex]
@property def incoming_poisson_projections(self) -> Sequence[Projection]: """ The projections that target this population vertex which originate from a Poisson source. :rtype: iterable(~spynnaker.pyNN.models.projection.Projection) """ return self.__incoming_poisson_projections @property def pop_seed(self) -> Sequence[int]: """ The seed to use for the population overall; a list of four integers. :rtype: list(int) """ return self.__pop_seed
[docs] def core_seed(self, vertex_slice: Slice) -> Sequence[int]: """ The seed to use for a core. :param ~pacman.model.graphs.common.Slice vertex_slice: The machine vertex that the seed is for :return: A list of 4 integers :rtype: list(int) """ if vertex_slice not in self.__core_seeds: self.__core_seeds[vertex_slice] = create_mars_kiss_seeds( self.__rng) return self.__core_seeds[vertex_slice]
[docs] def copy_initial_state_variables(self, vertex_slice: Slice): """ Copies the state variables into the initial state variables. :param ~pacman.model.graphs.common.Slice vertex_slice: The slice to copy now """ for key in self.__state_variables.keys(): value = self.__state_variables[key][vertex_slice.get_raster_ids()] self.__initial_state_variables[key].set_value_by_ids( vertex_slice.get_raster_ids(), value) # This is called during reading of initial values, so we don't # need to do it again self.__read_initial_values = False self.__have_read_initial_values = True
@property def read_initial_values(self) -> bool: """ Whether initial values need to be stored. :rtype: bool """ return self.__read_initial_values def __tell_neuron_vertices_to_regenerate(self) -> None: for vertex in self.machine_vertices: if isinstance(vertex, PopulationMachineNeurons): vertex.set_do_neuron_regeneration() @property @overrides(PopulationApplicationVertex.n_colour_bits) def n_colour_bits(self) -> int: return self.__n_colour_bits
[docs] def get_max_delay(self, max_ring_buffer_bits: int) -> Tuple[int, bool]: """ Get the maximum delay and whether a delay extension is needed for a given maximum number of ring buffer bits. :param int max_ring_buffer_bits: The maximum number of bits that can be used for the ring buffer identifier (i.e. delay, synapse type, neuron index) :return: Tuple of the maximum delay supported on the core and whether a delay extension is needed to support delays :rtype: tuple(int, bool) """ # Find the maximum delay from incoming synapses max_delay_ms = 0 for proj in self.incoming_projections: # pylint: disable=protected-access s_info = proj._synapse_information proj_max_delay = s_info.synapse_dynamics.get_delay_maximum( s_info.connector, s_info) max_delay_ms = max(max_delay_ms, proj_max_delay) max_delay_steps = math.ceil( max_delay_ms / SpynnakerDataView.get_simulation_time_step_ms()) max_delay_bits = get_n_bits(max_delay_steps) # Find the maximum possible delay n_atom_bits = self.get_n_atom_bits() n_synapse_bits = get_n_bits( self.neuron_impl.get_n_synapse_types()) n_delay_bits = max_ring_buffer_bits - (n_atom_bits + n_synapse_bits) # Pick the smallest between the two, so that not too many bits are used final_n_delay_bits = min(n_delay_bits, max_delay_bits) return 2 ** final_n_delay_bits, max_delay_bits > final_n_delay_bits
[docs] def get_n_atom_bits(self) -> int: """ :rtype: int """ return get_n_bits(min(self.n_atoms, self.get_max_atoms_per_core()))
[docs] def can_generate_on_machine(self) -> bool: """ Determine if the parameters of this vertex can be generated on the machine :rtype: bool """ # Check that all the structs can actually be generated for struct in self.__neuron_impl.structs: if not struct.is_generatable: # If this is false, we can't generate anything on machine return False if (not _all_gen(self.__parameters) or not _all_gen(self.__state_variables)): return False _check_random_dists(self.__parameters) _check_random_dists(self.__state_variables) return True
class _Stats(object): """ Object to keep hold of and process statistics for ring buffer scaling. """ __slots__ = ( "w_scale", "w_scale_sq", "n_synapse_types", "running_totals", "delay_running_totals", "total_weights", "biggest_weight", "rate_stats", "steps_per_second", "default_spikes_per_second", "ring_buffer_sigma") def __init__( self, neuron_impl: AbstractNeuronImpl, default_spikes_per_second: float, ring_buffer_sigma: float): self.w_scale = neuron_impl.get_global_weight_scale() self.w_scale_sq = self.w_scale ** 2 n_synapse_types = neuron_impl.get_n_synapse_types() self.running_totals = [ RunningStats() for _ in range(n_synapse_types)] self.delay_running_totals = [ RunningStats() for _ in range(n_synapse_types)] self.total_weights = numpy.zeros(n_synapse_types) self.biggest_weight = numpy.zeros(n_synapse_types, dtype=numpy.double) self.rate_stats = [RunningStats() for _ in range(n_synapse_types)] self.steps_per_second = ( SpynnakerDataView.get_simulation_time_step_per_s()) self.default_spikes_per_second = default_spikes_per_second self.ring_buffer_sigma = ring_buffer_sigma def add_projection(self, projection: Projection): """ Adds the projection. :param Projection projection: """ # pylint: disable=protected-access s_dynamics = projection._synapse_information.synapse_dynamics if isinstance(s_dynamics, AbstractSupportsSignedWeights): self.__add_signed_projection(projection) else: self.__add_unsigned_projection(projection) def __add_signed_projection(self, proj: Projection): # pylint: disable=protected-access s_info = proj._synapse_information connector = s_info.connector s_dynamics = s_info.synapse_dynamics n_conns = connector.get_n_connections_to_post_vertex_maximum(s_info) d_var = s_dynamics.get_delay_variance(connector, s_info.delays, s_info) s_type_pos = s_dynamics.get_positive_synapse_index(proj) w_mean_pos = s_dynamics.get_mean_positive_weight(proj) w_var_pos = s_dynamics.get_variance_positive_weight(proj) w_max_pos = s_dynamics.get_maximum_positive_weight(proj) self.__add_details( proj, s_type_pos, n_conns, w_mean_pos, w_var_pos, w_max_pos, d_var) s_type_neg = s_dynamics.get_negative_synapse_index(proj) w_mean_neg = -s_dynamics.get_mean_negative_weight(proj) w_var_neg = -s_dynamics.get_variance_negative_weight(proj) w_max_neg = -s_dynamics.get_minimum_negative_weight(proj) self.__add_details( proj, s_type_neg, n_conns, w_mean_neg, w_var_neg, w_max_neg, d_var) def __add_unsigned_projection(self, proj: Projection): # pylint: disable=protected-access s_info = proj._synapse_information s_type = s_info.synapse_type s_dynamics = s_info.synapse_dynamics connector = s_info.connector n_conns = connector.get_n_connections_to_post_vertex_maximum(s_info) w_mean = s_dynamics.get_weight_mean(connector, s_info) w_var = s_dynamics.get_weight_variance( connector, s_info.weights, s_info) w_max = s_dynamics.get_weight_maximum(connector, s_info) d_var = s_dynamics.get_delay_variance(connector, s_info.delays, s_info) self.__add_details(proj, s_type, n_conns, w_mean, w_var, w_max, d_var) def __add_details( self, proj: Projection, s_type: int, n_conns: int, w_mean: float, w_var: float, w_max: float, d_var: float): self.running_totals[s_type].add_items( w_mean * self.w_scale, w_var * self.w_scale_sq, n_conns) self.biggest_weight[s_type] = max( self.biggest_weight[s_type], w_max * self.w_scale) self.delay_running_totals[s_type].add_items(0.0, d_var, n_conns) spikes_per_tick, spikes_per_second = self.__pre_spike_stats(proj) self.rate_stats[s_type].add_items(spikes_per_second, 0, n_conns) self.total_weights[s_type] += spikes_per_tick * (w_max * n_conns) def __pre_spike_stats(self, proj: Projection) -> Tuple[float, float]: spikes_per_tick = max( 1.0, self.default_spikes_per_second / self.steps_per_second) spikes_per_second = self.default_spikes_per_second # pylint: disable=protected-access pre_vertex = proj._projection_edge.pre_vertex if isinstance(pre_vertex, AbstractMaxSpikes): rate = pre_vertex.max_spikes_per_second() if rate > 0: spikes_per_second = rate spikes_per_tick = pre_vertex.max_spikes_per_ts() return spikes_per_tick, spikes_per_second def get_max_weight(self, s_type: int) -> float: """ Get the max weight. :param int s_type: synapse_type :rtype: float """ if self.delay_running_totals[s_type].variance == 0.0: return max(self.total_weights[s_type], self.biggest_weight[s_type]) stats = self.running_totals[s_type] rates = self.rate_stats[s_type] # pylint: disable=protected-access w_max = AbstractPopulationVertex._ring_buffer_expected_upper_bound( stats.mean, stats.standard_deviation, rates.mean, stats.n_items, self.ring_buffer_sigma) w_max = min(w_max, self.total_weights[s_type]) w_max = max(w_max, self.biggest_weight[s_type]) return w_max