# Source code for spynnaker.pyNN.models.neuron.synapse_io

# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union, TYPE_CHECKING

import numpy
from numpy import floating, integer, uint32
from numpy.typing import NDArray

from pacman.model.graphs.application import ApplicationVertex
from pacman.model.graphs.common import Slice

from spinn_front_end_common.utilities.constants import BYTES_PER_WORD

from spynnaker.pyNN.data import SpynnakerDataView
from spynnaker.pyNN.models.neural_projections.connectors import (
    AbstractConnector)
from spynnaker.pyNN.exceptions import SynapseRowTooBigException
from spynnaker.pyNN.models.neuron.synapse_dynamics import (
    AbstractStaticSynapseDynamics, AbstractSDRAMSynapseDynamics,
    AbstractPlasticSynapseDynamics)
from spynnaker.pyNN.models.neuron.synapse_dynamics.types import (
    NUMPY_CONNECTORS_DTYPE, ConnectionsArray)

from .master_pop_table import MasterPopTableAsBinarySearch

if TYPE_CHECKING:
    from typing_extensions import TypeAlias
    from spynnaker.pyNN.models.neural_projections import (
        ProjectionApplicationEdge, SynapseInformation)
    from spynnaker.pyNN.models.neuron.synapse_dynamics import (
        AbstractSynapseDynamics)
    _RowData: TypeAlias = numpy.ndarray  # 2D

_N_HEADER_WORDS = 3
# There are 16 slots, one per time step
_STD_DELAY_SLOTS = 16


@dataclass(frozen=True)
class MaxRowInfo(object):
    """
    Immutable record of the per-row maxima of a synaptic matrix,
    covering both the undelayed matrix and the delayed (delay-extension)
    matrix: synapse counts, byte sizes (with headers) and word sizes
    (without headers).
    """

    #: Largest number of synapses in any row of the undelayed matrix.
    undelayed_max_n_synapses: int

    #: Largest number of synapses in any row of the delayed matrix.
    delayed_max_n_synapses: int

    #: Largest row size in bytes (headers included) in the undelayed
    #: matrix; 0 when there are no synapses.
    undelayed_max_bytes: int

    #: Largest row size in bytes (headers included) in the delayed
    #: matrix; 0 when there are no synapses.
    delayed_max_bytes: int

    #: Largest row size in words (headers excluded) in the undelayed
    #: matrix.
    undelayed_max_words: int

    #: Largest row size in words (headers excluded) in the delayed
    #: matrix.
    delayed_max_words: int
def get_maximum_delay_supported_in_ms(
        post_vertex_max_delay_ticks: int) -> float:
    """
    Get the maximum delay supported by the synapse representation
    before extensions are required.

    :param int post_vertex_max_delay_ticks: post vertex max delay
    :return: Maximum delay, in milliseconds.
    :rtype: float
    """
    # Scale the tick count by the simulation time step to get ms
    step_ms = SpynnakerDataView.get_simulation_time_step_ms()
    return post_vertex_max_delay_ticks * step_ms
def get_max_row_info(
        synapse_info: SynapseInformation, n_post_atoms: int,
        n_delay_stages: int,
        in_edge: ProjectionApplicationEdge) -> MaxRowInfo:
    """
    Get the information about the maximum lengths of delayed and
    undelayed rows in bytes (including header), words (without header)
    and number of synapses.

    :param SynapseInformation synapse_info:
        The synapse information to get the row data for
    :param int n_post_atoms: The number of post atoms to get the maximum for
    :param int n_delay_stages: The number of delay stages on the edge
    :param ProjectionApplicationEdge in_edge:
        The incoming edge on which the synapse information is held
    :rtype: MaxRowInfo
    :raises SynapseRowTooBigException:
        If the synapse information can't be represented
    """
    max_delay_supported = get_maximum_delay_supported_in_ms(
        in_edge.post_vertex.splitter.max_support_delay())
    # Each delay stage adds another "max_delay_supported" worth of delay
    max_delay = max_delay_supported * (n_delay_stages + 1)

    # Only SDRAM-based dynamics can require rows padded to a fixed length
    pad_to_length: Optional[int] = None
    if isinstance(synapse_info.synapse_dynamics,
                  AbstractSDRAMSynapseDynamics):
        pad_to_length = synapse_info.synapse_dynamics.pad_to_length

    # delay point where delay extensions start; the smallest double
    # strictly greater than max_delay_supported
    min_delay_for_delay_extension = (
        max_delay_supported + numpy.finfo(numpy.double).tiny).item()

    # row length for the non-delayed synaptic matrix
    max_undelayed_n_synapses = synapse_info.connector \
        .get_n_connections_from_pre_vertex_maximum(
            n_post_atoms, synapse_info, 0, max_delay_supported)
    if pad_to_length is not None:
        max_undelayed_n_synapses = max(
            pad_to_length, max_undelayed_n_synapses)

    # determine the max row length in the delay extension
    max_delayed_n_synapses = 0
    if n_delay_stages > 0:
        max_delayed_n_synapses = synapse_info.connector \
            .get_n_connections_from_pre_vertex_maximum(
                n_post_atoms, synapse_info,
                min_delay_for_delay_extension, max_delay)
        if pad_to_length is not None:
            max_delayed_n_synapses = max(
                pad_to_length, max_delayed_n_synapses)

    # Get the row sizes, in words, from the dynamics; static and plastic
    # dynamics encode synapses differently
    dynamics = synapse_info.synapse_dynamics
    if isinstance(dynamics, AbstractStaticSynapseDynamics):
        undelayed_n_words = dynamics.get_n_words_for_static_connections(
            max_undelayed_n_synapses)
        delayed_n_words = dynamics.get_n_words_for_static_connections(
            max_delayed_n_synapses)
    elif isinstance(dynamics, AbstractPlasticSynapseDynamics):
        undelayed_n_words = dynamics.get_n_words_for_plastic_connections(
            max_undelayed_n_synapses)
        delayed_n_words = dynamics.get_n_words_for_plastic_connections(
            max_delayed_n_synapses)
    else:
        raise TypeError(f"{dynamics=} has an unexpected type {type(dynamics)}")

    # Adjust for the allowed row lengths from the population table
    undelayed_max_n_words = _get_allowed_row_length(
        undelayed_n_words, dynamics, in_edge, max_undelayed_n_synapses)
    delayed_max_n_words = _get_allowed_row_length(
        delayed_n_words, dynamics, in_edge, max_delayed_n_synapses)

    # Byte sizes include the header words; an empty row takes no space
    undelayed_max_bytes = 0
    if undelayed_max_n_words > 0:
        undelayed_max_bytes = (
            undelayed_max_n_words + _N_HEADER_WORDS) * BYTES_PER_WORD
    delayed_max_bytes = 0
    if delayed_max_n_words > 0:
        delayed_max_bytes = (
            delayed_max_n_words + _N_HEADER_WORDS) * BYTES_PER_WORD

    return MaxRowInfo(
        max_undelayed_n_synapses, max_delayed_n_synapses,
        undelayed_max_bytes, delayed_max_bytes,
        undelayed_max_n_words, delayed_max_n_words)
def _get_allowed_row_length( n_words: int, dynamics: AbstractSynapseDynamics, in_edge: ProjectionApplicationEdge, n_synapses: int) -> int: """ Get the allowed row length in words in the population table for a desired row length in words. :param int n_words: The number of words in the row :param AbstractSynapseDynamics dynamics: The synapse dynamics used :param ProjectionApplicationEdge in_edge: The incoming edge :param int n_synapses: The number of synapses for the number of words :raises SynapseRowTooBigException: If the given row is too big; the exception will detail the maximum number of synapses that are supported. """ if n_words == 0: return 0 try: return MasterPopTableAsBinarySearch.get_allowed_row_length(n_words) except SynapseRowTooBigException as e: # Find the number of synapses available for the maximum population # table size, as extracted from the exception if isinstance(dynamics, AbstractSDRAMSynapseDynamics): max_synapses = dynamics.get_max_synapses(e.max_size) raise SynapseRowTooBigException( max_synapses, f"The connection between {in_edge.pre_vertex} and " f"{in_edge.post_vertex} has more synapses ({n_synapses}) than " "can currently be supported on this implementation of PyNN " f"({max_synapses} for this connection type). " "Please reduce the size of the target population, or reduce " "the number of neurons per core.") from e # Ugh! raise
def get_synapses(
        connections: ConnectionsArray, synapse_info: SynapseInformation,
        n_delay_stages: int, n_synapse_types: int,
        weight_scales: NDArray[floating],
        app_edge: ProjectionApplicationEdge, max_row_info: MaxRowInfo,
        gen_undelayed: bool, gen_delayed: bool,
        max_atoms_per_core: int) -> Tuple[_RowData, _RowData]:
    """
    Get the synapses as an array of words for non-delayed synapses and
    an array of words for delayed synapses. This is used to prepare
    information for *deployment to SpiNNaker*.

    :param ~numpy.ndarray connections:
        The connections to get the synapses from
    :param SynapseInformation synapse_info:
        The synapse information to convert to synapses
    :param int n_delay_stages:
        The number of delay stages in total to be represented
    :param int n_synapse_types:
        The number of synapse types in total to be represented
    :param list(float) weight_scales:
        The scaling of the weights for each synapse type
    :param ~pacman.model.graphs.application.ApplicationEdge app_edge:
        The incoming machine edge that the synapses are on
    :param MaxRowInfo max_row_info:
        The maximum row information for the synapses
    :param bool gen_undelayed:
        Whether to generate undelayed data
    :param bool gen_delayed:
        Whether to generate delayed data
    :param int max_atoms_per_core:
        The maximum number of atoms on a core
    :return:
        (``row_data``, ``delayed_row_data``) where:

        * ``row_data`` is the undelayed connectivity data arranged into a
            row per source, each row the same length
        * ``delayed_row_data`` is the delayed connectivity data arranged
            into a row per source per delay stage, each row the same length
    :rtype: tuple(~numpy.ndarray, ~numpy.ndarray)
    """
    # pylint: disable=too-many-arguments
    # Get delays in timesteps
    max_delay = app_edge.post_vertex.splitter.max_support_delay()

    # Convert delays to timesteps; note this mutates the connections
    # array in place
    connections["delay"] = numpy.rint(
        connections["delay"] *
        SpynnakerDataView.get_simulation_time_step_per_ms())

    # Scale weights (skipped when the dynamics itself defines the type)
    if not synapse_info.synapse_type_from_dynamics:
        connections["weight"] = (connections["weight"] * weight_scales[
            synapse_info.synapse_type])

    # Split the connections up based on the delays: anything over
    # max_delay timesteps goes through the delay extension
    if max_delay is not None:
        plastic_delay_mask = (connections["delay"] <= max_delay)
        undelayed_connections = connections[numpy.where(plastic_delay_mask)]
        delayed_connections = connections[numpy.where(~plastic_delay_mask)]
    else:
        undelayed_connections = connections
        delayed_connections = numpy.zeros(
            0, dtype=AbstractConnector.NUMPY_SYNAPSES_DTYPE)

    # Get the data for the connections
    row_data = numpy.zeros(0, dtype=uint32)
    if gen_undelayed and max_row_info.undelayed_max_n_synapses:
        # Get which row each connection will go into; undelayed rows are
        # simply indexed by source neuron
        undelayed_row_indices = undelayed_connections["source"]
        row_data = _get_row_data(
            undelayed_connections, undelayed_row_indices,
            app_edge.pre_vertex.n_atoms, n_synapse_types,
            synapse_info.synapse_dynamics,
            max_row_info.undelayed_max_n_synapses,
            max_row_info.undelayed_max_words, max_atoms_per_core)
        del undelayed_row_indices
    # Free the (possibly large) intermediate array early
    del undelayed_connections

    # Get the data for the delayed connections
    delayed_row_data = numpy.zeros(0, dtype=uint32)
    if gen_delayed and max_row_info.delayed_max_n_synapses:
        # Get the delay stages and which row each delayed connection will
        # go into; stage k covers delays in (k*max_delay, (k+1)*max_delay]
        stages = numpy.floor((numpy.round(
            delayed_connections["delay"] - 1.0)) / max_delay).astype(uint32)
        # Delayed rows are grouped per core, then per stage, then per
        # local (within-core) source neuron
        delayed_cores = delayed_connections["source"] // max_atoms_per_core
        local_sources = delayed_connections["source"] - (
            delayed_cores * max_atoms_per_core)
        delay_atoms_per_core = max_atoms_per_core * n_delay_stages
        delayed_pre_cores = delayed_cores * delay_atoms_per_core
        local_index = ((stages - 1) * max_atoms_per_core) + local_sources
        delayed_row_indices = delayed_pre_cores + local_index
        # Store only the residual delay within the stage
        delayed_connections["delay"] -= max_delay * stages

        # Get the data
        delayed_row_data = _get_row_data(
            delayed_connections, delayed_row_indices,
            app_edge.pre_vertex.n_atoms * n_delay_stages, n_synapse_types,
            synapse_info.synapse_dynamics,
            max_row_info.delayed_max_n_synapses,
            max_row_info.delayed_max_words, max_atoms_per_core)
        del delayed_row_indices
    del delayed_connections

    return row_data, delayed_row_data
def _get_row_data(
        connections: ConnectionsArray, row_indices: NDArray[numpy.integer],
        n_rows: int, n_synapse_types: int,
        synapse_dynamics: AbstractSynapseDynamics,
        max_row_n_synapses: int, max_row_n_words: int,
        max_atoms_per_core: int) -> _RowData:
    """
    Convert connections into a flat array of fixed-length rows, each row
    being (pp_size, pp_data, ff_size, fp_size, ff_data, fp_data, padding).

    :param ~numpy.ndarray connections:
        The connections to convert; the dtype is
        AbstractConnector.NUMPY_SYNAPSES_DTYPE
    :param ~numpy.ndarray row_indices:
        The row into which each connection should go; same length as
        connections
    :param int n_rows: The total number of rows
    :param int n_synapse_types: The number of synapse types allowed
    :param AbstractSynapseDynamics synapse_dynamics:
        The synapse dynamics of the synapses
    :param int max_row_n_synapses: The maximum number of synapses in a row
    :param int max_row_n_words: The maximum number of words in a row
    :param int max_atoms_per_core: The maximum number of atoms per core
    :rtype: ~numpy.ndarray
    """
    # pylint: disable=too-many-arguments
    # Static dynamics only fill ff_*; plastic dynamics only fill pp_*/fp_*;
    # the unused parts are blanked so all rows share one layout
    fp_data: Union[NDArray[uint32], List[NDArray[uint32]]]
    pp_data: Union[NDArray[uint32], List[NDArray[uint32]]]
    if isinstance(synapse_dynamics, AbstractStaticSynapseDynamics):
        # Get the static data
        ff_data, ff_size = synapse_dynamics.get_static_synaptic_data(
            connections, row_indices, n_rows, n_synapse_types,
            max_row_n_synapses, max_atoms_per_core)

        # Blank the plastic data
        fp_data = numpy.zeros((n_rows, 0), dtype=uint32)
        pp_data = numpy.zeros((n_rows, 0), dtype=uint32)
        fp_size: NDArray[uint32]
        fp_size = numpy.zeros((n_rows, 1), dtype=uint32)
        pp_size: NDArray[uint32]
        pp_size = numpy.zeros((n_rows, 1), dtype=uint32)
    else:
        assert isinstance(synapse_dynamics, AbstractPlasticSynapseDynamics)
        # Blank the static data
        ff_data = [numpy.zeros(0, dtype=uint32) for _ in range(n_rows)]
        ff_size = numpy.zeros((n_rows, 1), dtype=uint32)

        # Get the plastic data
        fp_data, pp_data, fp_size, pp_size = \
            synapse_dynamics.get_plastic_synaptic_data(
                connections, row_indices, n_rows, n_synapse_types,
                max_row_n_synapses, max_atoms_per_core)

    # Add some padding so every row is exactly max_row_n_words long
    row_lengths = [
        pp_data[i].size + fp_data[i].size + ff_data[i].size
        for i in range(n_rows)]
    padding = [
        numpy.zeros(max_row_n_words - row_length, dtype=uint32)
        for row_length in row_lengths]

    # Join the bits into rows
    rows = [numpy.concatenate(items) for items in zip(
        pp_size, pp_data, ff_size, fp_size, ff_data, fp_data, padding)]
    row_data = numpy.concatenate(rows)

    # Return the data
    return row_data
def convert_to_connections(
        synapse_info: SynapseInformation, post_vertex_slice: Slice,
        n_pre_atoms: int, max_row_length: int, n_synapse_types: int,
        weight_scales: NDArray[floating],
        data: Union[bytes, NDArray, None], delayed: bool,
        post_vertex_max_delay_ticks: int,
        max_atoms_per_core: int) -> ConnectionsArray:
    """
    Read the synapses for a given projection synapse information
    object out of the given data and convert to connection data

    :param SynapseInformation synapse_info:
        The synapse information of the synapses
    :param int n_pre_atoms: The number of atoms in the pre-vertex
    :param ~pacman.model.graphs.common.Slice post_vertex_slice:
        The slice of the target neurons of the synapses in the data
    :param int max_row_length:
        The length of each row in the data
    :param int n_synapse_types:
        The number of synapse types in total
    :param list(float) weight_scales:
        The weight scaling of each synapse type
    :param bytearray data:
        The raw data containing the synapses
    :param bool delayed:
        True if the data should be considered delayed
    :param int post_vertex_max_delay_ticks:
        The maximum delayed ticks supported from post vertex
    :param int max_atoms_per_core:
        The maximum number of atoms on a core
    :return: The connections read from the data; the dtype is
        :py:const:`~.NUMPY_CONNECTORS_DTYPE`
    :rtype: ~numpy.ndarray
    """
    # If there is no data, return nothing
    if data is None or len(data) == 0:
        return numpy.zeros(0, dtype=NUMPY_CONNECTORS_DTYPE)

    # Translate the data into rows: little-endian 32-bit words, with
    # the header words included in each row
    row_data = numpy.frombuffer(data, dtype="<u4").reshape(
        -1, (max_row_length + _N_HEADER_WORDS))

    dynamics = synapse_info.synapse_dynamics
    if isinstance(dynamics, AbstractStaticSynapseDynamics):
        # Read static data
        connections = _read_static_data(
            dynamics, n_pre_atoms, n_synapse_types, row_data, delayed,
            post_vertex_max_delay_ticks, max_atoms_per_core)
    elif isinstance(dynamics, AbstractPlasticSynapseDynamics):
        # Read plastic data
        connections = _read_plastic_data(
            dynamics, n_pre_atoms, n_synapse_types, row_data, delayed,
            post_vertex_max_delay_ticks, max_atoms_per_core)
    else:
        raise TypeError(f"{dynamics=} has unexpected type {type(dynamics)}")

    # There might still be no connections if the row was all padding
    if not connections.size:
        return numpy.zeros(0, dtype=NUMPY_CONNECTORS_DTYPE)

    # Convert 0 delays to max delays (a delay of 0 wraps around)
    connections["delay"][connections["delay"] == 0] = (
        post_vertex_max_delay_ticks)

    # Map machine-level source/target indices back to raster order
    connections = __convert_sources_and_targets(
        connections, synapse_info.pre_vertex, post_vertex_slice)

    # Return the connections after appropriate scaling
    return _rescale_connections(connections, weight_scales, synapse_info)
def read_all_synapses(
        data: NDArray[uint32], delayed_data: NDArray[uint32],
        synapse_info: SynapseInformation, n_synapse_types: int,
        weight_scales: NDArray[floating], post_vertex_slice: Slice,
        n_pre_atoms: int, post_vertex_max_delay_ticks: int,
        max_row_info: MaxRowInfo, max_atoms_per_core: int
        ) -> ConnectionsArray:
    """
    Read the synapses for a given projection synapse information object
    out of the given delayed and undelayed data.

    :param bytearray data: The raw data containing the undelayed synapses
    :param bytearray delayed_data:
        The raw data containing the delayed synapses
    :param SynapseInformation synapse_info:
        The synapse info that generated the synapses
    :param int n_synapse_types: The total number of synapse types available
    :param list(float) weight_scales: A weight scale for each synapse type
    :param int n_pre_atoms: The number of atoms in the pre-vertex
    :param ~pacman.model.graphs.common.Slice post_vertex_slice:
        The slice of the post-vertex to read the synapses for
    :param int post_vertex_max_delay_ticks:
        max delayed ticks supported from post vertex
    :param MaxRowInfo max_row_info:
        The maximum information for each of the rows
    :param int max_atoms_per_core: The maximum number of atoms on a core
    :return: The connections read from the data; the dtype is
        :py:const:`~.NUMPY_CONNECTORS_DTYPE`
    :rtype: ~numpy.ndarray
    """
    # Decode the undelayed matrix, then the delayed one; each uses its
    # own maximum row length from the row information
    undelayed = convert_to_connections(
        synapse_info, post_vertex_slice, n_pre_atoms,
        max_row_info.undelayed_max_words, n_synapse_types, weight_scales,
        data, False, post_vertex_max_delay_ticks, max_atoms_per_core)
    delayed = convert_to_connections(
        synapse_info, post_vertex_slice, n_pre_atoms,
        max_row_info.delayed_max_words, n_synapse_types, weight_scales,
        delayed_data, True, post_vertex_max_delay_ticks,
        max_atoms_per_core)

    # Join the two sets of connections into a single array and return it
    return numpy.concatenate((undelayed, delayed))
def _parse_static_data(
        row_data: _RowData,
        dynamics: AbstractStaticSynapseDynamics) -> Tuple[
            NDArray[numpy.integer], List[_RowData]]:
    """
    Parse static synaptic data.

    :param ~numpy.ndarray row_data: The raw row data
    :param AbstractStaticSynapseDynamics dynamics:
        The synapse dynamics that can decode the rows
    :return: A tuple of the recorded length of each row and the row data
        organised into rows
    :rtype: tuple(~numpy.ndarray, list(~numpy.ndarray))
    """
    n_rows = row_data.shape[0]
    # The fixed-fixed size is the second header word of each row
    ff_size = row_data[:, 1]
    ff_words = dynamics.get_n_static_words_per_row(ff_size)
    # Data starts immediately after the header words
    ff_start = _N_HEADER_WORDS
    ff_end = ff_start + ff_words
    return (
        ff_size,
        [row_data[row, ff_start:ff_end[row]] for row in range(n_rows)])


def _read_static_data(
        dynamics: AbstractStaticSynapseDynamics, n_pre_atoms: int,
        n_synapse_types: int, row_data: _RowData, delayed: bool,
        post_vertex_max_delay_ticks: int,
        max_atoms_per_core: int) -> ConnectionsArray:
    """
    Read static data from row data.

    :param AbstractStaticSynapseDynamics dynamics:
        The synapse dynamics that generated the data
    :param int n_pre_atoms: The number of atoms on the pre-vertex
    :param int n_synapse_types: The number of synapse types available
    :param ~numpy.ndarray row_data: The raw row data to read
    :param bool delayed: True if data should be considered delayed
    :param int post_vertex_max_delay_ticks: post vertex delay maximum
    :param int max_atoms_per_core: The maximum number of atoms on a core
    :return:
        the connections read with dtype
        :py:const:`~.NUMPY_CONNECTORS_DTYPE`
    :rtype: list(~numpy.ndarray)
    """
    if row_data is None or not row_data.size:
        return numpy.zeros(0, dtype=NUMPY_CONNECTORS_DTYPE)
    ff_size, ff_data = _parse_static_data(row_data, dynamics)
    connections = dynamics.read_static_synaptic_data(
        n_synapse_types, ff_size, ff_data, max_atoms_per_core)
    if delayed:
        # Delayed rows encode source/delay relative to a delay stage;
        # convert them back to global values
        n_synapses = dynamics.get_n_synapses_in_rows(ff_size)
        connections = _convert_delayed_data(
            n_synapses, n_pre_atoms, connections,
            post_vertex_max_delay_ticks)
    return connections


def _parse_plastic_data(
        row_data: _RowData,
        dynamics: AbstractPlasticSynapseDynamics) -> Tuple[
            NDArray[uint32], List[_RowData], NDArray[uint32],
            List[_RowData]]:
    """
    Parse plastic synapses from raw row data.

    :param ~numpy.ndarray row_data: The raw data to parse
    :param AbstractPlasticSynapseDynamics dynamics:
        The dynamics that generated the data
    :return: A tuple of the recorded length of the plastic-plastic data in
        each row; the plastic-plastic data organised into rows; the
        recorded length of the static-plastic data in each row; and the
        static-plastic data organised into rows
    :rtype:
        tuple(~numpy.ndarray, list(~numpy.ndarray), ~numpy.ndarray,
        list(~numpy.ndarray))
    """
    n_rows = row_data.shape[0]
    # The plastic-plastic size is the first word of each row
    pp_size = row_data[:, 0]
    pp_words = dynamics.get_n_plastic_plastic_words_per_row(pp_size)
    # The fixed-plastic size word follows the pp data and the ff size word
    fp_size = row_data[numpy.arange(n_rows), pp_words + 2]
    fp_words = dynamics.get_n_fixed_plastic_words_per_row(fp_size)
    # NOTE(review): fp_start is computed from pp_size rather than
    # pp_words; this assumes pp_size is recorded in words — confirm
    # against the row format used by the dynamics
    fp_start = pp_size + _N_HEADER_WORDS
    fp_end = fp_start + fp_words
    row_ids = range(n_rows)
    return (
        pp_size,
        [row_data[row, 1:pp_words[row] + 1] for row in row_ids],
        fp_size,
        [row_data[row, fp_start[row]:fp_end[row]] for row in row_ids])


def _read_plastic_data(
        dynamics: AbstractPlasticSynapseDynamics, n_pre_atoms: int,
        n_synapse_types: int, row_data: Optional[_RowData], delayed: bool,
        post_vertex_max_delay_ticks: int,
        max_atoms_per_core: int) -> ConnectionsArray:
    """
    Read plastic data from raw data.

    :param AbstractStaticSynapseDynamics dynamics:
        The synapse dynamics that generated the data
    :param int n_pre_atoms: The number of atoms in the pre-vertex
    :param int n_synapse_types: The number of synapse types available
    :param ~numpy.ndarray row_data: The raw row data to read
    :param bool delayed: True if data should be considered delayed
    :param int post_vertex_max_delay_ticks: post vertex delay maximum
    :param int max_atoms_per_core: The maximum number of atoms on a core
    :return:
        the connections read with dtype
        :py:const:`~.NUMPY_CONNECTORS_DTYPE`
    :rtype: list(~numpy.ndarray)
    """
    if row_data is None or not row_data.size:
        return numpy.zeros(0, dtype=NUMPY_CONNECTORS_DTYPE)
    pp_size, pp_data, fp_size, fp_data = _parse_plastic_data(
        row_data, dynamics)
    connections = dynamics.read_plastic_synaptic_data(
        n_synapse_types, pp_size, pp_data, fp_size, fp_data,
        max_atoms_per_core)

    if delayed:
        # Delayed rows encode source/delay relative to a delay stage;
        # convert them back to global values
        n_synapses = dynamics.get_n_synapses_in_rows(pp_size, fp_size)
        connections = _convert_delayed_data(
            n_synapses, n_pre_atoms, connections,
            post_vertex_max_delay_ticks)
    return connections


def _rescale_connections(
        connections: ConnectionsArray, weight_scales: NDArray[floating],
        synapse_info: SynapseInformation) -> ConnectionsArray:
    """
    Scale the connection data into machine values.

    :param ~numpy.ndarray connections: The connections to be rescaled
    :param list(float) weight_scales: The weight scale of each synapse type
    :param SynapseInformation synapse_info:
        The synapse information of the connections
    """
    # Return the delays values to milliseconds
    connections["delay"] /= SpynnakerDataView.get_simulation_time_step_per_ms()
    # Undo the weight scaling
    connections["weight"] /= weight_scales[synapse_info.synapse_type]
    return connections


def _convert_delayed_data(
        n_synapses: NDArray[integer], n_pre_atoms: int,
        delayed_connections: ConnectionsArray,
        post_vertex_max_delay_ticks: int) -> ConnectionsArray:
    """
    Take the delayed_connections and convert the source ids and delay
    values back to global values.

    :param ~numpy.ndarray n_synapses: The number of synapses in each row
    :param int n_pre_atoms: number of atoms in the pre-vertex
    :param ~numpy.ndarray delayed_connections:
        The connections to convert of dtype
        :py:const:`~.NUMPY_CONNECTORS_DTYPE`
    :param int post_vertex_max_delay_ticks: post vertex delay maximum
    :return: The converted connection with the same dtype
    :rtype: ~numpy.ndarray
    """
    # Work out the delay stage of each row; rows are the all the rows
    # from the first delay stage, then all from the second stage and so on
    synapse_ids = range(len(n_synapses))
    row_stage = numpy.array([
        i // n_pre_atoms
        for i in synapse_ids], dtype=uint32)
    # Work out the delay for each stage
    row_min_delay = (row_stage + 1) * post_vertex_max_delay_ticks
    # Repeat the delay for all connections in the same row
    connection_min_delay = numpy.concatenate([
        numpy.repeat(row_min_delay[i], n_synapses[i])
        for i in synapse_ids])
    # Repeat the "extra" source id for all connections in the same row;
    # this converts the row id back to a source neuron id
    connection_source_extra = numpy.concatenate([
        numpy.repeat(row_stage[i] * uint32(n_pre_atoms), n_synapses[i])
        for i in synapse_ids])
    # Do the conversions
    delayed_connections["source"] -= connection_source_extra
    delayed_connections["delay"] += connection_min_delay
    return delayed_connections


def __convert_sources_and_targets(
        connections: ConnectionsArray, pre_vertex: ApplicationVertex,
        post_vertex_slice: Slice) -> ConnectionsArray:
    # Map machine indices back to the application-level raster ordering
    # for both ends of each connection (in-place on the connections array)
    connections["source"] = pre_vertex.get_raster_ordered_indices(
        connections["source"])
    connections["target"] = post_vertex_slice.get_raster_indices(
        connections["target"])
    return connections