# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import math
import numpy
import scipy.stats
from pyNN.space import Grid2D, Grid3D
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides
from spinn_utilities.ranged import RangeDictionary, RangedList
from spinn_utilities.config_holder import get_config_int
from pacman.model.partitioner_interfaces import LegacyPartitionerAPI
from pacman.model.resources import ConstantSDRAM
from spinn_front_end_common.interface.buffer_management import (
recording_utilities)
from spinn_front_end_common.utilities.constants import (
SYSTEM_BYTES_REQUIREMENT)
from spinn_front_end_common.interface.profiling import profile_utils
from spynnaker.pyNN.data import SpynnakerDataView
from spynnaker.pyNN.models.common import MultiSpikeRecorder
from spynnaker.pyNN.utilities.utility_calls import create_mars_kiss_seeds
from spynnaker.pyNN.models.abstract_models import SupportsStructure
from spynnaker.pyNN.models.common import PopulationApplicationVertex
from spynnaker.pyNN.models.common import ParameterHolder
from spynnaker.pyNN.utilities.buffer_data_type import BufferDataType
from .spike_source_poisson_machine_vertex import (
SpikeSourcePoissonMachineVertex, _flatten, get_rates_bytes,
get_sdram_edge_params_bytes, get_expander_rates_bytes, get_params_bytes)
logger = FormatAdapter(logging.getLogger(__name__))
# uint32_t n_rates; uint32_t index
PARAMS_WORDS_PER_NEURON = 2
# start_scaled, end_scaled, next_scaled, is_fast_source, exp_minus_lambda,
# sqrt_lambda, isi_val, time_to_spike
PARAMS_WORDS_PER_RATE = 8
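# (On the 32-bit SpiNNaker cores a word is 4 bytes, so the per-neuron header
# above is 8 bytes and each rate entry is 32 bytes.)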
SLOW_RATE_PER_TICK_CUTOFF = (
SpikeSourcePoissonMachineVertex.SLOW_RATE_PER_TICK_CUTOFF)
OVERFLOW_TIMESTEPS_FOR_SDRAM = 5
# The microseconds per timestep will be divided by this to get the max offset
_MAX_OFFSET_DENOMINATOR = 10
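# (For example, at a 1 ms time step of 1000 microseconds, the maximum offset
# would be 100 microseconds.)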
# Indicates a duration that never ends
DURATION_FOREVER = 0xFFFFFFFF
class SpikeSourcePoissonVertex(
PopulationApplicationVertex,
LegacyPartitionerAPI, SupportsStructure):
"""
    A Poisson spike source object.
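
    A minimal usage sketch through the PyNN front end (illustrative values;
    assumes a standard sPyNNaker installation providing ``pyNN.spiNNaker``)::

        import pyNN.spiNNaker as sim

        sim.setup(timestep=1.0)
        # 10 independent Poisson sources firing at 50 Hz from time 0 ms
        pop = sim.Population(
            10, sim.SpikeSourcePoisson(rate=50.0, start=0.0),
            label="poisson_input")
        pop.record("spikes")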
"""
__slots__ = [
"__last_rate_read_time",
"__model",
"__model_name",
"__n_atoms",
"__rng",
"__seed",
"__spike_recorder",
"__kiss_seed", # dict indexed by vertex slice
"__max_rate",
"__max_n_rates",
"__n_profile_samples",
"__data",
"__is_variable_rate",
"__outgoing_projections",
"__incoming_control_edge",
"__structure",
"__allowed_parameters",
"__n_colour_bits"]
SPIKE_RECORDING_REGION_ID = 0
def __init__(
self, n_neurons, label, seed, max_atoms_per_core, model,
rate=None, start=None, duration=None, rates=None, starts=None,
durations=None, max_rate=None, splitter=None, n_colour_bits=None):
"""
:param int n_neurons:
:param str label:
:param float seed:
:param int max_atoms_per_core:
:param ~spynnaker.pyNN.models.spike_source.SpikeSourcePoisson model:
:param float rate:
:param int start:
:param int duration:
:param iterable(float) rates:
:param iterable(int) starts:
:param iterable(int) durations:
:param float max_rate:
:param splitter:
:type splitter:
~pacman.model.partitioner_splitters.AbstractSplitterCommon or None
:param int n_colour_bits:
"""
# pylint: disable=too-many-arguments
super().__init__(label, max_atoms_per_core, splitter)
# atoms params
self.__n_atoms = self.round_n_atoms(n_neurons, "n_neurons")
self.__model_name = "SpikeSourcePoisson"
self.__model = model
self.__seed = seed
self.__kiss_seed = dict()
self.__spike_recorder = MultiSpikeRecorder()
# Check for disallowed pairs of parameters
if (rates is not None) and (rate is not None):
raise ValueError("Exactly one of rate and rates can be specified")
if (starts is not None) and (start is not None):
raise ValueError(
"Exactly one of start and starts can be specified")
if (durations is not None) and (duration is not None):
raise ValueError(
"Exactly one of duration and durations can be specified")
if rate is None and rates is None:
raise ValueError("One of rate or rates must be specified")
# Normalise the parameters
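        # Accepted shapes (illustrative): rate=10.0 (one rate for all
        # neurons), rate=[10.0, 20.0] (one rate per neuron),
        # rates=[1.0, 50.0] (one sequence of rate changes shared by all
        # neurons), or rates=[[1.0, 50.0], ...] (one sequence per neuron);
        # starts and durations, if given, must match the shape of the rates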
self.__is_variable_rate = rates is not None
if rates is None:
if hasattr(rate, "__len__"):
# Single rate per neuron for whole simulation
rates = [numpy.array([r]) for r in rate]
else:
# Single rate for all neurons for whole simulation
rates = numpy.array([rate])
elif hasattr(rates[0], "__len__"):
# Convert each list to numpy array
rates = [numpy.array(r) for r in rates]
else:
rates = numpy.array(rates)
        if starts is None:
if hasattr(start, "__len__"):
starts = [numpy.array([s]) for s in start]
elif start is None:
starts = numpy.array([0])
else:
starts = numpy.array([start])
elif starts is not None and hasattr(starts[0], "__len__"):
starts = [numpy.array(s) for s in starts]
elif starts is not None:
starts = numpy.array(starts)
if durations is None and duration is not None:
if hasattr(duration, "__len__"):
durations = [numpy.array([d]) for d in duration]
else:
durations = numpy.array([duration])
elif durations is not None and hasattr(durations[0], "__len__"):
durations = [numpy.array(d) for d in durations]
elif durations is not None:
durations = numpy.array(durations)
else:
if hasattr(rates[0], "__len__"):
durations = [numpy.array([DURATION_FOREVER for r in _rate])
for _rate in rates]
else:
durations = numpy.array([DURATION_FOREVER for _rate in rates])
# Check that there is either one list for all neurons,
# or one per neuron
if hasattr(rates[0], "__len__") and len(rates) != n_neurons:
raise ValueError(
"Must specify one rate for all neurons or one per neuron")
if (starts is not None and hasattr(starts[0], "__len__") and
len(starts) != n_neurons):
raise ValueError(
"Must specify one start for all neurons or one per neuron")
if (durations is not None and hasattr(durations[0], "__len__") and
len(durations) != n_neurons):
raise ValueError(
"Must specify one duration for all neurons or one per neuron")
# Check that for each rate there is a start and duration if needed
# TODO: Could be more efficient for case where parameters are not one
# per neuron
for i in range(n_neurons):
rate_set = rates
if hasattr(rates[0], "__len__"):
rate_set = rates[i]
if not hasattr(rate_set, "__len__"):
raise ValueError("Multiple rates must be a list")
if starts is None and len(rate_set) > 1:
raise ValueError(
"When multiple rates are specified,"
" each must have a start")
elif starts is not None:
start_set = starts
if hasattr(starts[0], "__len__"):
start_set = starts[i]
if len(start_set) != len(rate_set):
raise ValueError("Each rate must have a start")
if any(s is None for s in start_set):
raise ValueError("Start must not be None")
if durations is not None:
duration_set = durations
if hasattr(durations[0], "__len__"):
duration_set = durations[i]
if len(duration_set) != len(rate_set):
raise ValueError("Each rate must have its own duration")
self.__data = RangeDictionary(n_neurons)
rates_list = RangedList(
n_neurons, rates,
use_list_as_value=not hasattr(rates[0], "__len__"))
self.__data["rates"] = rates_list
self.__data["starts"] = RangedList(
n_neurons, starts,
use_list_as_value=not hasattr(starts[0], "__len__"))
self.__data["durations"] = RangedList(
n_neurons, durations,
use_list_as_value=not hasattr(durations[0], "__len__"))
self.__rng = numpy.random.RandomState(seed)
self.__n_profile_samples = get_config_int(
"Reports", "n_profile_samples")
# Prepare for recording, and to get spikes
self.__spike_recorder = MultiSpikeRecorder()
all_rates = list(_flatten(self.__data["rates"]))
self.__max_rate = max_rate
if max_rate is None and len(all_rates):
self.__max_rate = numpy.amax(all_rates)
elif max_rate is None:
self.__max_rate = 0
self.__max_n_rates = max(len(r) for r in rates_list)
# Keep track of how many outgoing projections exist
self.__outgoing_projections = list()
self.__incoming_control_edge = None
self.__structure = None
if self.__is_variable_rate:
self.__allowed_parameters = {"rates", "durations", "starts"}
else:
self.__allowed_parameters = {"rate", "duration", "start"}
self.__last_rate_read_time = None
self.__n_colour_bits = n_colour_bits
if self.__n_colour_bits is None:
self.__n_colour_bits = get_config_int(
"Simulation", "n_colour_bits")
@overrides(SupportsStructure.set_structure)
def set_structure(self, structure):
self.__structure = structure
@property
def rates(self):
"""
Get the rates.
:rtype: spinn_utilities.ranged.RangedList
"""
return self.__data["rates"]
def add_outgoing_projection(self, projection):
"""
Add an outgoing projection from this vertex.
:param Projection projection: The projection to add
"""
self.__outgoing_projections.append(projection)
@property
def outgoing_projections(self):
"""
The projections outgoing from this vertex.
:rtype: list(Projection)
"""
return self.__outgoing_projections
@property
def n_profile_samples(self):
return self.__n_profile_samples
@property
def time_to_spike(self):
return self.__data["time_to_spike"]
def __read_parameters_now(self):
# If we already read the parameters at this time, don't do it again
        current_time = SpynnakerDataView.get_current_run_time_ms()
if self.__last_rate_read_time == current_time:
return
self.__last_rate_read_time = current_time
for m_vertex in self.machine_vertices:
placement = SpynnakerDataView.get_placement_of_vertex(m_vertex)
m_vertex.read_parameters_from_machine(placement)
def __read_parameter(self, name, selector):
if (SpynnakerDataView.is_ran_last() and
SpynnakerDataView.has_transceiver()):
self.__read_parameters_now()
return self.__data[self.__full_name(name)].get_values(selector)
def __full_name(self, name):
if self.__is_variable_rate:
return name
return f"{name}s"
@overrides(PopulationApplicationVertex.get_parameter_values)
def get_parameter_values(self, names, selector=None):
self._check_parameters(names, self.__allowed_parameters)
return ParameterHolder(names, self.__read_parameter, selector)
@overrides(PopulationApplicationVertex.set_parameter_values)
def set_parameter_values(self, name, value, selector=None):
self._check_parameters(name, self.__allowed_parameters)
if self.__is_variable_rate:
raise KeyError(f"Cannot set the {name} of a variable rate Poisson")
# If we have just run, we need to read parameters to avoid overwrite
        if SpynnakerDataView.is_ran_last():
self.__read_parameters_now()
for m_vertex in self.machine_vertices:
m_vertex.set_rate_changed()
        # The public parameter name is singular; the data is stored under the
        # plural key
fixed_name = f"{name}s"
if hasattr(value, "__len__"):
            # One value per neuron for the whole simulation
self.__data[fixed_name].set_value_by_selector(
selector, [numpy.array([s]) for s in value])
else:
            # A single value for all neurons for the whole simulation
self.__data[fixed_name].set_value_by_selector(
selector, numpy.array([value]), use_list_as_value=True)
@overrides(PopulationApplicationVertex.get_parameters)
def get_parameters(self):
return self.__allowed_parameters
@overrides(PopulationApplicationVertex.get_units)
def get_units(self, name):
if name == "spikes":
return ""
if name == "rates" or name == "rates":
return "Hz"
if (name == "duration" or name == "start" or name == "durations" or
name == "starts"):
return "ms"
raise KeyError(f"Units for {name} unknown")
@overrides(PopulationApplicationVertex.get_recordable_variables)
def get_recordable_variables(self):
return ["spikes"]
def get_buffer_data_type(self, name):
if name == "spikes":
return BufferDataType.MULTI_SPIKES
raise KeyError(f"Cannot record {name}")
def get_recording_region(self, name):
if name != "spikes":
raise KeyError(f"Cannot record {name}")
return 0
@overrides(PopulationApplicationVertex.set_recording)
def set_recording(self, name, sampling_interval=None, indices=None):
if name != "spikes":
raise KeyError(f"Cannot record {name}")
        if sampling_interval is not None:
            logger.warning(
                "Sampling intervals are not currently supported for "
                "SpikeSourcePoisson, so the value will be ignored")
        if indices is not None:
            logger.warning(
                "Indices are not currently supported for "
                "SpikeSourcePoisson, so they will be ignored")
if not self.__spike_recorder.record:
SpynnakerDataView.set_requires_mapping()
self.__spike_recorder.record = True
@overrides(PopulationApplicationVertex.get_recording_variables)
def get_recording_variables(self):
if self.__spike_recorder.record:
return ["spikes"]
return []
@overrides(PopulationApplicationVertex.set_not_recording)
def set_not_recording(self, name, indices=None):
if name != "spikes":
raise KeyError(f"Cannot record {name}")
        if indices is not None:
            logger.warning(
                "Indices are not currently supported for "
                "SpikeSourcePoisson, so they will be ignored")
self.__spike_recorder.record = False
@overrides(PopulationApplicationVertex.get_sampling_interval_ms)
def get_sampling_interval_ms(self, name):
if name != "spikes":
raise KeyError(f"Cannot record {name}")
return SpynnakerDataView.get_simulation_time_step_us()
@overrides(PopulationApplicationVertex.get_data_type)
def get_data_type(self, name):
if name != "spikes":
raise KeyError(f"Cannot record {name}")
return None
def get_neurons_recording(self, name, vertex_slice):
if name != "spikes":
raise KeyError(f"Cannot record {name}")
return vertex_slice.get_raster_ids()
def max_spikes_per_ts(self):
ts_per_second = SpynnakerDataView.get_simulation_time_step_per_s()
        if (float(self.__max_rate) / ts_per_second <
                SLOW_RATE_PER_TICK_CUTOFF):
return 1
        # Experiments show that allowing a 1-in-1000 chance of exceeding the
        # estimate in any time step gives a result that is typically higher
        # than the actual maximum observed
chance_ts = 1000
max_spikes_per_ts = scipy.stats.poisson.ppf(
1.0 - (1.0 / float(chance_ts)),
float(self.__max_rate) / ts_per_second)
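        # For example, with max_rate = 100 Hz and a 1 ms time step,
        # lambda = 0.1 and poisson.ppf(0.999, 0.1) == 2.0, so 3 is returned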
        return int(math.ceil(max_spikes_per_ts)) + 1
def get_recording_sdram_usage(self, vertex_slice):
"""
:param ~pacman.model.graphs.common.Slice vertex_slice:
"""
variable_sdram = self.__spike_recorder.get_sdram_usage_in_bytes(
vertex_slice.n_atoms, self.max_spikes_per_ts())
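        # Reserve a further OVERFLOW_TIMESTEPS_FOR_SDRAM time steps' worth of
        # recording space as fixed headroom (a reading of intent; the
        # constant is defined above as 5)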
constant_sdram = ConstantSDRAM(
variable_sdram.per_timestep * OVERFLOW_TIMESTEPS_FOR_SDRAM)
return variable_sdram + constant_sdram
@overrides(LegacyPartitionerAPI.get_sdram_used_by_atoms)
def get_sdram_used_by_atoms(self, vertex_slice):
"""
:param ~pacman.model.graphs.common.Slice vertex_slice:
"""
poisson_params_sz = get_params_bytes(vertex_slice.n_atoms)
poisson_rates_sz = get_rates_bytes(
vertex_slice.n_atoms, vertex_slice.n_atoms * self.__max_n_rates)
poisson_expander_sz = get_expander_rates_bytes(
vertex_slice.n_atoms, vertex_slice.n_atoms * self.__max_n_rates)
sdram_sz = get_sdram_edge_params_bytes(vertex_slice)
other = ConstantSDRAM(
SYSTEM_BYTES_REQUIREMENT +
SpikeSourcePoissonMachineVertex.get_provenance_data_size(0) +
poisson_params_sz + poisson_rates_sz + poisson_expander_sz +
recording_utilities.get_recording_header_size(1) +
recording_utilities.get_recording_data_constant_size(1) +
profile_utils.get_profile_region_size(self.__n_profile_samples) +
sdram_sz)
recording = self.get_recording_sdram_usage(vertex_slice)
return recording + other
@property
def n_atoms(self):
return self.__n_atoms
@property
@overrides(PopulationApplicationVertex.atoms_shape)
def atoms_shape(self):
if isinstance(self.__structure, (Grid2D, Grid3D)):
return self.__structure.calculate_size(self.__n_atoms)
        return super().atoms_shape
@overrides(LegacyPartitionerAPI.create_machine_vertex)
def create_machine_vertex(self, vertex_slice, sdram, label=None):
return SpikeSourcePoissonMachineVertex(
sdram, self.__spike_recorder.record,
label, self, vertex_slice)
@property
def max_rate(self):
return self.__max_rate
@property
def max_n_rates(self):
return self.__max_n_rates
@property
def seed(self):
return self.__seed
@seed.setter
def seed(self, seed):
self.__seed = seed
self.__kiss_seed = dict()
self.__rng = numpy.random.RandomState(seed)
def kiss_seed(self, vertex_slice):
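        # Seeds are created lazily and cached per vertex slice, so the same
        # slice gets the same seed for the lifetime of this vertex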
if vertex_slice not in self.__kiss_seed:
self.__kiss_seed[vertex_slice] = create_mars_kiss_seeds(
self.__rng)
return self.__kiss_seed[vertex_slice]
def update_kiss_seed(self, vertex_slice, seed):
"""
Updates a KISS seed from the machine.
:param ~pacman.model.graphs.common.Slice vertex_slice:
the vertex slice to update seed of
:param list(int) seed: the seed
"""
self.__kiss_seed[vertex_slice] = seed
def clear_spike_recording(self):
buffer_manager = SpynnakerDataView.get_buffer_manager()
for machine_vertex in self.machine_vertices:
placement = SpynnakerDataView.get_placement_of_vertex(
machine_vertex)
buffer_manager.clear_recorded_data(
placement.x, placement.y, placement.p,
SpikeSourcePoissonVertex.SPIKE_RECORDING_REGION_ID)
def describe(self):
"""
Return a human-readable description of the cell or synapse type.
The output may be customised by specifying a different template
together with an associated template engine
(see :py:mod:`pyNN.descriptions`).
If template is `None`, then a dictionary containing the template
context will be returned.
:rtype: dict(str, ...)
"""
parameters = self.get_parameter_values(self.__model.default_parameters)
context = {
"name": self.__model_name,
"default_parameters": self.__model.default_parameters,
"default_initial_values": self.__model.default_parameters,
"parameters": parameters,
}
return context
def set_live_poisson_control_edge(self, edge):
if self.__incoming_control_edge is not None:
raise ValueError(
"The Poisson generator can only be controlled by one source")
self.__incoming_control_edge = edge
@property
def incoming_control_edge(self):
return self.__incoming_control_edge
@property
def data(self):
return self.__data
@property
def n_colour_bits(self):
return self.__n_colour_bits