Source code for spynnaker.pyNN.models.common.local_only_2d_common

# Copyright (c) 2023 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from math import ceil, log2, floor
from collections import namedtuple, defaultdict
from pacman.model.graphs.application import ApplicationVirtualVertex
from pacman.model.graphs.common.slice import Slice
from pacman.model.graphs.common.mdslice import MDSlice
from spinn_front_end_common.utilities.constants import BYTES_PER_WORD
from spynnaker.pyNN.data.spynnaker_data_view import SpynnakerDataView
from spynnaker.pyNN.utilities.constants import SPIKE_PARTITION_ID
from spynnaker.pyNN.utilities.utility_calls import get_n_bits

#: The number of bits in a short value; get_div_const uses this both to
#: scale the multiplier and as the bit position of the first shift field
BITS_PER_SHORT = 16

#: The number of bits in a byte value; get_div_const adds this to
#: BITS_PER_SHORT to get the bit position of the second shift field
BITS_PER_BYTE = 8

#: The number of bits used to represent the n_colour_bits value
N_COLOUR_BITS_BITS = 3

#: Key info size in bytes (four words)
KEY_INFO_SIZE = 4 * BYTES_PER_WORD

#: A source record: the incoming projection together with the delay that
#: is handled locally and the delay stage the data arrives from
Source = namedtuple("Source", "projection local_delay delay_stage")


def get_div_const(value):
    """
    Build the packed constants used to perform fast division by an
    integer constant.

    The result is the sum of three fields: a multiplier starting at bit 0,
    a first shift placed at bit ``BITS_PER_SHORT`` and a second shift
    placed at bit ``BITS_PER_SHORT + BITS_PER_BYTE``.

    :param int value: The value to be divided by
    :return: The values required encoded as fields of a 32-bit integer
    :rtype: int
    """
    # Exponent of the smallest power of two that is >= value
    n_bits = int(ceil(log2(value)))
    power = 2 ** n_bits

    # Multiplier: the relative gap between that power of two and value,
    # scaled up by 2 ** BITS_PER_SHORT, rounded down and incremented
    fraction = (power - value) / value
    multiplier = int(floor((2 ** BITS_PER_SHORT) * fraction) + 1)

    # The first shift is at most 1; the second shift takes the remainder
    shift_1 = min(n_bits, 1)
    shift_2 = max(n_bits - 1, 0)

    # Pack the three values into a single integer
    shift_1_field = shift_1 << BITS_PER_SHORT
    shift_2_field = shift_2 << (BITS_PER_SHORT + BITS_PER_BYTE)
    return shift_2_field + shift_1_field + multiplier
def get_delay_for_source(incoming):
    """
    Work out which vertex sends the data for a source projection, along
    with the delay stage and the locally-handled delay value.

    When the delay in time steps is no more than the maximum delay the
    post vertex's splitter supports, the projection's own pre-vertex is
    returned with stage 0; otherwise the pre-vertex of the delay edge is
    returned with the computed stage.

    :param Projection incoming:
        The incoming projection to get the delay from
    :return: The vertex, the local delay, the delay stage
    :rtype: ApplicationVertex, int, int
    """
    # pylint: disable=protected-access
    edge = incoming._projection_edge
    delay_value = incoming._synapse_information.synapse_dynamics.delay
    n_steps = (
        delay_value * SpynnakerDataView.get_simulation_time_step_per_ms())
    max_supported = edge.post_vertex.splitter.max_support_delay()

    # NOTE(review): a delay that is an exact multiple of max_supported
    # gives a local delay of 0 - confirm this is intended
    remainder = n_steps % max_supported

    if n_steps > max_supported:
        # Too long to handle locally; the data comes via the delay edge
        stage = (n_steps // max_supported) - 1
        sender = edge.delay_edge.pre_vertex
        return sender, remainder, stage

    return edge.pre_vertex, remainder, 0
def get_rinfo_for_spike_source(pre_vertex):
    """
    Look up the routing information for the source of a projection in
    the SPIKE_PARTITION_ID partition, plus the core mask and its shift.

    The mask and shift are both 0 when the source has a single outgoing
    vertex in that partition, or when the source is a virtual vertex.

    :param ApplicationVertex pre_vertex: The source of incoming data
    :return: Routing information, core mask, core mask shift
    :rtype: AppVertexRoutingInfo, int, int
    """
    # Find the routing information
    r_info = SpynnakerDataView.get_routing_infos(
        ).get_routing_info_from_pre_vertex(pre_vertex, SPIKE_PARTITION_ID)

    # Count the vertices that send on the spike partition
    senders = r_info.vertex.splitter.get_out_going_vertices(
        SPIKE_PARTITION_ID)
    n_cores = len(senders)

    # A core mask is only used with more than one core, and never for a
    # virtual vertex
    is_virtual = isinstance(pre_vertex, ApplicationVirtualVertex)
    if n_cores != 1 and not is_virtual:
        shift = r_info.n_bits_atoms
        mask = (2 ** get_n_bits(n_cores)) - 1
        return r_info, mask, shift

    return r_info, 0, 0
def get_sources_for_target(app_vertex):
    """
    Group the incoming projections of an application vertex by the
    application vertex that will actually send their data.

    :param AbstractPopulationVertex app_vertex: The vertex being targeted
    :return:
        A dict of source ApplicationVertex to list of source information
    :rtype: dict(ApplicationVertex, list(Source))
    """
    by_sender = defaultdict(list)
    for projection in app_vertex.incoming_projections:
        # The sender may differ from the projection's own pre-vertex;
        # get_delay_for_source decides which vertex is used
        sender, local, stage = get_delay_for_source(projection)
        by_sender[sender].append(Source(
            projection=projection, local_delay=local, delay_stage=stage))
    return by_sender
def get_first_and_last_slice(pre_vertex):
    """
    Get the first and last slice of an application vertex.

    A virtual vertex is treated as one slice covering all of its atoms,
    so the same slice is returned twice; otherwise the slices of the
    first and last machine vertices are returned.

    :param ApplicationVertex pre_vertex: The source vertex
    :rtype: tuple(Slice, Slice) or tuple(MDSlice, MDSlice)
    """
    if not isinstance(pre_vertex, ApplicationVirtualVertex):
        machine_vertices = list(pre_vertex.machine_vertices)
        first = machine_vertices[0]
        last = machine_vertices[-1]
        return first.vertex_slice, last.vertex_slice

    shape = pre_vertex.atoms_shape
    last_atom = pre_vertex.n_atoms - 1
    if len(shape) == 1:
        # One dimension: a plain slice over every atom
        whole = Slice(0, last_atom)
    else:
        # Several dimensions: the full shape, starting at zero in each
        starts = (0,) * len(shape)
        whole = MDSlice(0, last_atom, shape, starts, shape)
    return whole, whole