# Source code for spynnaker.pyNN.models.neuron.synaptic_matrix_app

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import List, Optional, Tuple, TYPE_CHECKING

import numpy
from numpy import floating, uint32
from numpy.typing import NDArray

from pacman.model.graphs.common import Slice
from pacman.model.placements import Placement

from spinn_front_end_common.utilities.helpful_functions import (
    locate_memory_region_for_placement)
from spinn_front_end_common.utilities.constants import BYTES_PER_WORD
from spinn_front_end_common.interface.buffer_management import BufferManager

from spynnaker.pyNN.data import SpynnakerDataView
from spynnaker.pyNN.models.neuron.synapse_dynamics import (
    AbstractSynapseDynamicsStructural)
from spynnaker.pyNN.models.neural_projections.connectors import (
    AbstractGenerateConnectorOnHost)

from .generator_data import GeneratorData
from .synapse_io import read_all_synapses, convert_to_connections, get_synapses

if TYPE_CHECKING:
    from spynnaker.pyNN.models.neural_projections import (
        ProjectionApplicationEdge, SynapseInformation)
    from spynnaker.pyNN.models.neuron.synaptic_matrices import AppKeyInfo
    from .master_pop_table import MasterPopTableAsBinarySearch


class SynapticMatrixApp:
    """
    The synaptic matrix (and delay matrix if applicable) for an incoming
    application edge.
    """

    # pylint: disable=unused-private-member
    # https://github.com/SpiNNakerManchester/sPyNNaker/issues/1201

    __slots__ = (
        # The synaptic info that these matrices are for
        "__synapse_info",
        # The application edge that these matrices are for
        "__app_edge",
        # The number of synapse types incoming
        "__n_synapse_types",
        # The ID of the synaptic matrix region
        "__synaptic_matrix_region",
        # The maximum row length of delayed and undelayed matrices
        "__max_row_info",
        # The maximum summed size of the synaptic matrices
        "__all_syn_block_sz",
        # The application-level key information for the incoming edge
        "__app_key_info",
        # The application-level key information for the incoming delay edge
        "__delay_app_key_info",
        # The weight scaling used by each synapse type
        "__weight_scales",
        # The expected size in bytes of a synaptic matrix
        "__matrix_size",
        # The expected size in bytes of a delayed synaptic matrix
        "__delay_matrix_size",
        # The offset of the undelayed synaptic matrix in the region
        "__syn_mat_offset",
        # The offset of the delayed synaptic matrix in the region
        "__delay_syn_mat_offset",
        # The index of the synaptic matrix within the master population table
        "__index",
        # The index of the delayed synaptic matrix within the master population
        # table
        "__delay_index",
        # The max_atoms_per_core value given at construction; it is passed on
        # to the row-info calculation, the synapse I/O helpers and the
        # generator data
        "__max_atoms_per_core",
        # The download index for the undelayed synaptic matrix
        "__download_index",
        # The download index for the delayed synaptic matrix
        "__download_delay_index")

    def __init__(
            self, synapse_info: SynapseInformation,
            app_edge: ProjectionApplicationEdge, n_synapse_types: int,
            synaptic_matrix_region: int, max_atoms_per_core: int,
            all_syn_block_sz: int, app_key_info: Optional[AppKeyInfo],
            delay_app_key_info: Optional[AppKeyInfo],
            weight_scales: NDArray[floating]):
        """
        :param SynapseInformation synapse_info:
            The projection synapse information
        :param ProjectionApplicationEdge app_edge:
            The projection application edge
        :param int n_synapse_types: The number of synapse types accepted
        :param int synaptic_matrix_region:
            The region where synaptic matrices are stored
        :param int max_atoms_per_core:
            The maximum atoms per core value used when asking the post-vertex
            for the maximum row information of this edge
        :param int all_syn_block_sz:
            The space available for all synaptic matrices on the core
        :param app_key_info:
            Application-level routing key information for undelayed vertices
            (`None` if there is no routing information)
        :type app_key_info: AppKeyInfo or None
        :param delay_app_key_info:
            Application-level routing key information for delayed vertices
            (`None` if there is no routing information)
        :type delay_app_key_info: AppKeyInfo or None
        :param ~numpy.ndarray weight_scales:
            Weight scale for each synapse type
        """
        self.__synapse_info = synapse_info
        self.__app_edge = app_edge
        self.__n_synapse_types = n_synapse_types
        self.__synaptic_matrix_region = synaptic_matrix_region
        self.__max_atoms_per_core = max_atoms_per_core

        # Calculate the max row info for this edge
        self.__max_row_info = self.__app_edge.post_vertex.get_max_row_info(
            synapse_info, max_atoms_per_core, app_edge)

        self.__all_syn_block_sz = all_syn_block_sz
        self.__app_key_info = app_key_info
        self.__delay_app_key_info = delay_app_key_info
        self.__weight_scales = weight_scales

        # One row per pre-vertex atom for the undelayed matrix
        self.__matrix_size = (
            self.__app_edge.pre_vertex.n_atoms *
            self.__max_row_info.undelayed_max_bytes)
        # One row per pre-vertex atom per delay stage for the delayed matrix
        self.__delay_matrix_size = (
            self.__app_edge.pre_vertex.n_atoms *
            self.__app_edge.n_delay_stages *
            self.__max_row_info.delayed_max_bytes)

        # These are computed during synaptic generation
        self.__syn_mat_offset: Optional[int] = None
        self.__delay_syn_mat_offset: Optional[int] = None
        self.__index: Optional[int] = None
        self.__delay_index: Optional[int] = None
        self.__download_index: Optional[int] = None
        self.__download_delay_index: Optional[int] = None

    @property
    def gen_size(self) -> int:
        """
        Size of a block.

        Computed as the larger of the undelayed and delayed maximum row
        sizes in bytes, multiplied by the number of pre-vertex atoms and by
        the number of delay stages plus one.

        :rtype: int
        """
        max_row_length = max(
            self.__max_row_info.undelayed_max_bytes,
            self.__max_row_info.delayed_max_bytes)
        return (max_row_length * self.__app_edge.pre_vertex.n_atoms *
                (self.__app_edge.n_delay_stages + 1))

    def reserve_matrices(
            self, block_addr: int,
            pop_table: MasterPopTableAsBinarySearch) -> int:
        """
        Allocate the master pop table entries for the blocks.

        :param int block_addr: Where the allocation can start from
        :param MasterPopTableAsBinarySearch pop_table:
            The master population table
        :return: Where the next allocation can start from
        :rtype: int
        :raises ValueError:
            If the matrices do not fit in the space available
        """
        block_addr = self.__reserve_app_matrix(block_addr, pop_table)
        block_addr = self.__reserve_delay_app_matrix(block_addr, pop_table)
        return block_addr

    def __reserve_app_matrix(
            self, block_addr: int,
            pop_table: MasterPopTableAsBinarySearch) -> int:
        """
        Reserve space for the matrix in the master pop table.

        :param int block_addr:
            The address in the synaptic matrix region to start writing at
        :param MasterPopTableAsBinarySearch pop_table:
            The master population table
        :return: The updated block address
        :rtype: int
        :raises ValueError:
            If the matrix does not fit in the space available
        """
        # If there is no routing info, don't write anything
        if self.__app_key_info is None:
            return block_addr

        # If we have routing info but no synapses, write an invalid entry
        if self.__max_row_info.undelayed_max_n_synapses == 0:
            self.__index = pop_table.add_invalid_application_entry(
                self.__app_key_info.key_and_mask,
                self.__app_key_info.core_mask,
                self.__app_key_info.core_shift,
                self.__app_key_info.n_neurons,
                self.__app_key_info.n_colour_bits)
            return block_addr

        # Write a matrix for the whole application vertex
        block_addr = pop_table.get_next_allowed_address(block_addr)
        self.__index = pop_table.add_application_entry(
            block_addr, self.__max_row_info.undelayed_max_words,
            self.__app_key_info.key_and_mask,
            self.__app_key_info.core_mask,
            self.__app_key_info.core_shift,
            self.__app_key_info.n_neurons,
            self.__app_key_info.n_colour_bits)
        self.__syn_mat_offset = block_addr
        block_addr = self.__next_addr(block_addr, self.__matrix_size)
        return block_addr

    def __reserve_delay_app_matrix(
            self, block_addr: int,
            pop_table: MasterPopTableAsBinarySearch) -> int:
        """
        Reserve space in the master pop table for a delayed matrix.

        :param int block_addr:
            The address in the synaptic matrix region to start writing at
        :param MasterPopTableAsBinarySearch pop_table:
            The master population table
        :return: The updated block address
        :rtype: int
        :raises ValueError:
            If the matrix does not fit in the space available
        """
        # If there is no routing info, don't write anything
        if self.__delay_app_key_info is None:
            return block_addr

        # If we have routing info but no synapses, write an invalid entry
        if self.__max_row_info.delayed_max_n_synapses == 0:
            self.__delay_index = pop_table.add_invalid_application_entry(
                self.__delay_app_key_info.key_and_mask,
                self.__delay_app_key_info.core_mask,
                self.__delay_app_key_info.core_shift,
                self.__delay_app_key_info.n_neurons,
                self.__delay_app_key_info.n_colour_bits)
            return block_addr

        # Write a matrix for the whole application vertex
        block_addr = pop_table.get_next_allowed_address(block_addr)
        self.__delay_index = pop_table.add_application_entry(
            block_addr, self.__max_row_info.delayed_max_words,
            self.__delay_app_key_info.key_and_mask,
            self.__delay_app_key_info.core_mask,
            self.__delay_app_key_info.core_shift,
            self.__delay_app_key_info.n_neurons,
            self.__delay_app_key_info.n_colour_bits)
        self.__delay_syn_mat_offset = block_addr
        block_addr = self.__next_addr(block_addr, self.__delay_matrix_size)
        return block_addr

    def __next_addr(self, block_addr: int, size: int) -> int:
        """
        Get the next address after a block, checking it is in range.

        The limit checked against is the total synaptic block size given at
        construction.

        :param int block_addr: The address of the start of the block
        :param int size: The size of the block in bytes
        :return: The updated address
        :rtype: int
        :raises ValueError: If the updated address is out of range
        """
        next_addr = block_addr + size
        if next_addr > self.__all_syn_block_sz:
            raise ValueError(
                "Too much synaptic memory has been written: "
                f"{next_addr} of {self.__all_syn_block_sz} ")
        return next_addr

    def append_matrix(
            self, post_vertex_slice: Slice,
            data_to_write: List[NDArray[uint32]], block_addr: int) -> int:
        """
        Append a synaptic matrix to be written from host.

        :param ~pacman.model.graphs.common.Slice post_vertex_slice:
            The slice of the post-vertex the matrix is for
        :param list data_to_write: List to append the data to write to
        :param int block_addr: The amount of data written so far
        :return: The amount of data written after this data has been written
        :rtype: int
        :raises ValueError:
            If the data written so far is already beyond a reserved offset,
            or if delayed rows are generated for an edge with no delay edge
        """
        row_data, delay_row_data = self.__get_row_data(post_vertex_slice)
        self.__update_connection_holders(
            row_data, delay_row_data, post_vertex_slice)

        # Only matrices that had space reserved are written; each is padded
        # so that it starts at its reserved offset
        if self.__syn_mat_offset is not None:
            block_addr = self.__get_padding(
                data_to_write, self.__syn_mat_offset, block_addr)
            data_to_write.append(row_data)
            block_addr += self.__matrix_size
        if self.__delay_syn_mat_offset is not None:
            block_addr = self.__get_padding(
                data_to_write, self.__delay_syn_mat_offset, block_addr)
            data_to_write.append(delay_row_data)
            block_addr += self.__delay_matrix_size
        return block_addr

    def __get_padding(
            self, data_to_write: List[NDArray[uint32]],
            expected_offset: int, block_addr: int) -> int:
        """
        Append zero padding so that the next data starts at an expected
        offset.

        :param list data_to_write: List to append any padding words to
        :param int expected_offset: The offset the next data should start at
        :param int block_addr: The amount of data written so far
        :return: The amount of data written once padding has been added
        :rtype: int
        :raises ValueError:
            If the data written so far is already beyond the expected offset
        """
        if expected_offset < block_addr:
            raise ValueError(
                "The block address is already beyond where is expected!:"
                f" {expected_offset} expected, {block_addr} found.")
        if expected_offset > block_addr:
            # Padding is written as whole words
            padding = (expected_offset - block_addr) // BYTES_PER_WORD
            data_to_write.append(numpy.zeros(padding, dtype=uint32))
            return block_addr + (padding * BYTES_PER_WORD)
        return block_addr

    def __get_row_data(
            self, post_vertex_slice: Slice) -> Tuple[NDArray, NDArray]:
        """
        Generate the row data for a synaptic matrix from the description.

        :param ~pacman.model.graphs.common.Slice post_vertex_slice:
            The slice of the post-vertex to generate the data for
        :return: The data and the delayed data
        :rtype: tuple(~numpy.ndarray, ~numpy.ndarray)
        :raises ValueError:
            If delayed row data is generated but the edge has no delay edge
        """
        # Get the actual connections
        post_slices = (
            self.__app_edge.post_vertex.splitter.get_in_coming_slices())
        connector = self.__synapse_info.connector
        assert isinstance(connector, AbstractGenerateConnectorOnHost)
        connections = connector.create_synaptic_block(
            post_slices, post_vertex_slice, self.__synapse_info.synapse_type,
            self.__synapse_info)

        # Get the row data; note that we use the availability of the routing
        # keys to decide if we should actually generate any data; this is
        # because a single edge might have been filtered
        (row_data, delayed_row_data) = get_synapses(
            connections, self.__synapse_info, self.__app_edge.n_delay_stages,
            self.__n_synapse_types, self.__weight_scales, self.__app_edge,
            self.__max_row_info, self.__app_key_info is not None,
            self.__delay_app_key_info is not None, self.__max_atoms_per_core)

        # Set connections for structural plasticity
        if isinstance(self.__synapse_info.synapse_dynamics,
                      AbstractSynapseDynamicsStructural):
            self.__synapse_info.synapse_dynamics.set_connections(
                connections, post_vertex_slice, self.__app_edge,
                self.__synapse_info)

        if self.__app_edge.delay_edge is None and len(delayed_row_data) != 0:
            raise ValueError(
                "Found delayed source IDs but no delay "
                f"edge for {self.__app_edge.label}")

        return row_data, delayed_row_data

    def __update_connection_holders(
            self, data: NDArray[uint32], delayed_data: NDArray[uint32],
            post_vertex_slice: Slice) -> None:
        """
        Fill in connections in the connection holders as they are created.

        :param ~numpy.ndarray data: The row data created
        :param ~numpy.ndarray delayed_data: The delayed row data created
        :param ~pacman.model.graphs.common.Slice post_vertex_slice:
            The slice of the post-vertex the connections are for
        """
        post_splitter = self.__app_edge.post_vertex.splitter
        post_vertex_max_delay_ticks = post_splitter.max_support_delay()
        for conn_holder in self.__synapse_info.pre_run_connection_holders:
            conn_holder.add_connections(read_all_synapses(
                data, delayed_data, self.__synapse_info,
                self.__n_synapse_types, self.__weight_scales,
                post_vertex_slice, self.__app_edge.pre_vertex.n_atoms,
                post_vertex_max_delay_ticks, self.__max_row_info,
                self.__max_atoms_per_core))

    def get_generator_data(self) -> GeneratorData:
        """
        Prepare to write a matrix using an on-chip generator.

        :return: The data to generate with
        :rtype: GeneratorData
        """
        # Never report more atoms per core than the pre-vertex has in total
        max_pre_atoms_per_core = min(
            self.__app_edge.pre_vertex.n_atoms,
            self.__app_edge.pre_vertex.get_max_atoms_per_core())
        return GeneratorData(
            self.__syn_mat_offset, self.__delay_syn_mat_offset,
            self.__app_edge, self.__synapse_info, self.__max_row_info,
            max_pre_atoms_per_core, self.__max_atoms_per_core)

    def read_generated_connection_holders(self, placement: Placement) -> None:
        """
        Read any pre-run connection holders after data has been generated.

        :param ~pacman.model.placements.Placement placement:
            Where the matrix is on the machine
        """
        # Only read back from the machine if something wants the result
        if self.__synapse_info.pre_run_connection_holders:
            connections = self.get_connections(placement)
            if connections:
                conns = numpy.concatenate(connections)
                for holder in self.__synapse_info.pre_run_connection_holders:
                    holder.add_connections(conns)

    def get_connections(self, placement: Placement) -> List[NDArray]:
        """
        Read connections from an address on the machine.

        :param ~pacman.model.placements.Placement placement:
            Where the matrix is on the machine
        :return: A list of arrays of connections, each with dtype
            :py:const:`~.NUMPY_CONNECTORS_DTYPE`
        :rtype: list(~numpy.ndarray)
        """
        connections: List[NDArray] = []
        synapses_address: Optional[int] = None
        buffers: Optional[BufferManager] = None

        # If no download indices have been recorded, read directly from the
        # synaptic matrix region; otherwise use the buffer manager downloads
        if (self.__download_index is None and
                self.__download_delay_index is None):
            synapses_address = locate_memory_region_for_placement(
                placement, self.__synaptic_matrix_region)
        else:
            buffers = SpynnakerDataView().get_buffer_manager()

        splitter = self.__app_edge.post_vertex.splitter
        vertex_slice = placement.vertex.vertex_slice

        if self.__syn_mat_offset is not None:
            if self.__download_index is not None:
                assert buffers is not None
                block, _ = buffers.get_download(
                    placement, self.__download_index)
            else:
                assert synapses_address is not None
                block = self.__get_block(placement, synapses_address)
            connections.append(convert_to_connections(
                self.__synapse_info, vertex_slice,
                self.__app_edge.pre_vertex.n_atoms,
                self.__max_row_info.undelayed_max_words,
                self.__n_synapse_types, self.__weight_scales, block,
                False, splitter.max_support_delay(),
                self.__max_atoms_per_core))

        if self.__delay_syn_mat_offset is not None:
            if self.__download_delay_index is not None:
                assert buffers is not None
                block, _ = buffers.get_download(
                    placement, self.__download_delay_index)
            else:
                assert synapses_address is not None
                block = self.__get_delayed_block(placement, synapses_address)
            connections.append(convert_to_connections(
                self.__synapse_info, vertex_slice,
                self.__app_edge.pre_vertex.n_atoms,
                self.__max_row_info.delayed_max_words,
                self.__n_synapse_types, self.__weight_scales, block,
                True, splitter.max_support_delay(),
                self.__max_atoms_per_core))

        return connections

    def __get_block(
            self, placement: Placement, synapses_address: int) -> bytes:
        """
        Get a block of data for undelayed synapses.

        :param ~pacman.model.placements.Placement placement:
            Where the matrix is on the machine
        :param int synapses_address:
            The base address of the synaptic matrix region
        :return: The raw data from the synaptic matrix
        :rtype: bytes
        """
        assert self.__syn_mat_offset is not None
        address = self.__syn_mat_offset + synapses_address
        return SpynnakerDataView.read_memory(
            placement.x, placement.y, address, self.__matrix_size)

    def __get_delayed_block(
            self, placement: Placement, synapses_address: int) -> bytes:
        """
        Get a block of data for delayed synapses.

        :param ~pacman.model.placements.Placement placement:
            Where the matrix is on the machine
        :param int synapses_address:
            The base address of the synaptic matrix region
        :return: The raw data from the delayed synaptic matrix
        :rtype: bytes
        """
        assert self.__delay_syn_mat_offset is not None
        address = self.__delay_syn_mat_offset + synapses_address
        return SpynnakerDataView.read_memory(
            placement.x, placement.y, address, self.__delay_matrix_size)

    def get_index(self) -> int:
        """
        Get the index in the master population table of the matrix.

        :rtype: int
        :raises RuntimeError:
            If no master population table entry has been reserved yet
        """
        if self.__index is None:
            raise RuntimeError("master pop table space not yet reserved")
        return self.__index

    def get_download_regions(
            self, placement: Placement,
            start_index: int) -> List[Tuple[int, int, int]]:
        """
        Get the data regions that should be downloaded when the simulation
        pauses.

        The download index assigned to each region is also recorded on this
        object, so that later reads of the connections use the downloaded
        data.

        :param ~pacman.model.placements.Placement placement:
            The placement of the vertex
        :param int start_index:
            The index to use for the first region to download
        :return: A list of tuples of (index, address, size) to download
        :rtype: list(tuple(int, int, int))
        """
        if not self.__synapse_info.download_on_pause:
            return []
        synapses_address = locate_memory_region_for_placement(
            placement, self.__synaptic_matrix_region)
        regions: List[Tuple[int, int, int]] = []
        if self.__syn_mat_offset is not None:
            regions.append((start_index,
                            synapses_address + self.__syn_mat_offset,
                            self.__matrix_size))
            self.__download_index = start_index
            start_index += 1
        if self.__delay_syn_mat_offset is not None:
            regions.append((start_index,
                            synapses_address + self.__delay_syn_mat_offset,
                            self.__delay_matrix_size))
            self.__download_delay_index = start_index
        return regions