Source code for spynnaker.pyNN.models.neuron.local_only.local_only_convolution

# Copyright (c) 2021 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from math import ceil
from typing import (
    Dict, Iterable, List, Tuple, cast, TYPE_CHECKING)

import numpy
from numpy import floating, uint32
from numpy.typing import NDArray

from spinn_utilities.overrides import overrides

from pacman.model.graphs.application import ApplicationVertex

from spinn_front_end_common.interface.ds import (
    DataType, DataSpecificationGenerator)
from spinn_front_end_common.utilities.constants import (
    BYTES_PER_SHORT, BYTES_PER_WORD)

from spynnaker.pyNN.exceptions import SynapticConfigurationException
from spynnaker.pyNN.models.neural_projections.connectors import (
    ConvolutionConnector, AbstractConnector)
from spynnaker.pyNN.models.neuron.synapse_dynamics import (
    AbstractSupportsSignedWeights)
from spynnaker.pyNN.models.common import PopulationApplicationVertex
from spynnaker.pyNN.types import Weight_Delay_In_Types
from spynnaker.pyNN.models.common.local_only_2d_common import (
    get_div_const, get_rinfo_for_spike_source, get_sources_for_target,
    BITS_PER_SHORT, N_COLOUR_BITS_BITS, KEY_INFO_SIZE,
    get_first_and_last_slice, Source)
from .abstract_local_only import AbstractLocalOnly

if TYPE_CHECKING:
    from spynnaker.pyNN.models.neuron.abstract_population_vertex import (
        AbstractPopulationVertex)
    from spynnaker.pyNN.models.projection import Projection
    from spynnaker.pyNN.models.neuron import (
        PopulationMachineLocalOnlyCombinedVertex)
    from spynnaker.pyNN.models.neuron.synapse_dynamics import (
        AbstractSynapseDynamics)


#: Size of convolution config main bytes
CONV_CONFIG_SIZE = (6 * BYTES_PER_SHORT) + (4 * BYTES_PER_WORD)

#: Size of source information
SOURCE_INFO_SIZE = KEY_INFO_SIZE + (6 * BYTES_PER_WORD)


class LocalOnlyConvolution(AbstractLocalOnly, AbstractSupportsSignedWeights):
    """
    A convolution synapse dynamics that can process spikes with only DTCM.
    """

    __slots__ = (
        "__cached_sources",
    )

    def __init__(self, delay: Weight_Delay_In_Types = None):

        """
        :param float delay:
            The delay used in the connection; by default 1 time step
        """
        super().__init__(delay)

        # Store the sources to avoid recalculation
        self.__cached_sources: Dict[ApplicationVertex, Dict[
                Tuple[PopulationApplicationVertex, str],
                List[Source]]] = dict()

    @property
    def _delay(self) -> float:
        # Guaranteed by check in init
        return cast(float, self.delay)

[docs] @overrides(AbstractLocalOnly.merge) def merge(self, synapse_dynamics: AbstractSynapseDynamics ) -> LocalOnlyConvolution: if not isinstance(synapse_dynamics, LocalOnlyConvolution): raise SynapticConfigurationException( "All targets of this Population must have a synapse_type of" " Convolution") return synapse_dynamics
[docs] @overrides(AbstractLocalOnly.get_vertex_executable_suffix) def get_vertex_executable_suffix(self) -> str: return "_conv"
@property @overrides(AbstractLocalOnly.changes_during_run) def changes_during_run(self) -> bool: return False
[docs] @overrides(AbstractLocalOnly.get_parameters_usage_in_bytes) def get_parameters_usage_in_bytes( self, n_atoms: int, incoming_projections: Iterable[Projection]) -> int: # pylint: disable=protected-access n_bytes = 0 kernel_bytes = 0 connectors_seen = set() edges_seen = set() for incoming in incoming_projections: s_info = incoming._synapse_information if not isinstance(s_info.connector, ConvolutionConnector): raise SynapticConfigurationException( "Only ConvolutionConnector can be used with a synapse type" " of Convolution") app_edge = incoming._projection_edge if app_edge not in edges_seen: edges_seen.add(app_edge) n_bytes += SOURCE_INFO_SIZE if s_info.connector not in connectors_seen: connectors_seen.add(s_info.connector) kernel_bytes += s_info.connector.kernel_n_bytes n_bytes += s_info.connector.parameters_n_bytes if kernel_bytes % BYTES_PER_WORD != 0: kernel_bytes += BYTES_PER_SHORT return CONV_CONFIG_SIZE + n_bytes + kernel_bytes
[docs] @overrides(AbstractLocalOnly.write_parameters) def write_parameters( self, spec: DataSpecificationGenerator, region: int, machine_vertex: PopulationMachineLocalOnlyCombinedVertex, weight_scales: NDArray[floating]): # pylint: disable=unexpected-keyword-arg, protected-access # Get incoming sources for this vertex app_vertex = machine_vertex._pop_vertex sources = self.__get_sources_for_target(app_vertex) size = self.get_parameters_usage_in_bytes( machine_vertex.vertex_slice.n_atoms, app_vertex.incoming_projections) spec.reserve_memory_region(region, size, label="LocalOnlyConvolution") spec.switch_write_focus(region) # Get spec for each incoming source connector_weight_index: Dict[AbstractConnector, int] = dict() next_weight_index: int = 0 source_data = list() connector_data: List[NDArray[uint32]] = list() weight_data = list() for (pre_vertex, part_id), source_infos in sources.items(): # Add connectors as needed first_conn_index = len(connector_data) for source in source_infos: # pylint: disable=protected-access conn = cast( ConvolutionConnector, source.projection._synapse_information.connector) app_edge = source.projection._projection_edge # Work out whether the connector needs a new weight index if conn in connector_weight_index: weight_index = connector_weight_index[conn] else: weight_index = next_weight_index connector_weight_index[conn] = weight_index next_weight_index += conn.kernel_n_weights weight_data.append(conn.get_encoded_kernel_weights( app_edge, weight_scales)) connector_data.append(conn.get_local_only_data( app_edge, source.local_delay, source.delay_stage, weight_index)) # Get the source routing information r_info, core_mask, mask_shift = get_rinfo_for_spike_source( pre_vertex, part_id) # Get the width / height per core / last_core first_slice, last_slice = get_first_and_last_slice(pre_vertex) width_per_core = first_slice.shape[0] height_per_core = first_slice.shape[1] width_on_last_core = last_slice.shape[0] height_on_last_core = last_slice.shape[1] # Get cores per width / height pre_shape = list(pre_vertex.atoms_shape) cores_per_width = int(ceil(pre_shape[0] / width_per_core)) cores_per_height = int(ceil(pre_shape[1] / height_per_core)) # Add the key and mask... source_data.extend([r_info.key, r_info.mask]) # ... start connector index, n_colour_bits, count of connectors ... source_data.append( (len(source_infos) << BITS_PER_SHORT) + (pre_vertex.n_colour_bits << (BITS_PER_SHORT - N_COLOUR_BITS_BITS)) + first_conn_index) # ... core mask, mask shift ... source_data.append((mask_shift << BITS_PER_SHORT) + core_mask) # ... height / width per core ... source_data.append( (width_per_core << BITS_PER_SHORT) + height_per_core) # ... height / width last core ... source_data.append( (width_on_last_core << BITS_PER_SHORT) + height_on_last_core) # ... cores per height / width ... source_data.append( (cores_per_width << BITS_PER_SHORT) + cores_per_height) # ... 1 / width per core ... source_data.append(get_div_const(width_per_core)) # ... 1 / width last core ... source_data.append(get_div_const(width_on_last_core)) # ... 1 / cores_per_width source_data.append(get_div_const(cores_per_width)) if next_weight_index % 2 != 0: weight_data.append(numpy.array([0], dtype="int16")) # Write the common spec post_slice = machine_vertex.vertex_slice post_start = numpy.array(post_slice.start) post_shape = numpy.array(post_slice.shape) post_end = (post_start + post_shape) - 1 spec.write_value(post_start[1], data_type=DataType.INT16) spec.write_value(post_start[0], data_type=DataType.INT16) spec.write_value(post_end[1], data_type=DataType.INT16) spec.write_value(post_end[0], data_type=DataType.INT16) spec.write_value(post_shape[1], data_type=DataType.INT16) spec.write_value(post_shape[0], data_type=DataType.INT16) spec.write_value(len(sources)) spec.write_value(len(connector_data)) spec.write_value(next_weight_index) # Write the data # pylint: disable=unexpected-keyword-arg spec.write_array(numpy.array(source_data, dtype="uint32")) spec.write_array(numpy.concatenate(connector_data, dtype="uint32")) spec.write_array( numpy.concatenate(weight_data, dtype="int16").view("uint32"))
def __get_sources_for_target( self, app_vertex: AbstractPopulationVertex) -> Dict[ Tuple[PopulationApplicationVertex, str], List[Source]]: """ Get all the application vertex sources that will hit the given application vertex. :param AbstractPopulationVertex app_vertex: The vertex being targeted :return: A dict of source PopulationApplicationVertex to list of source information :rtype: dict(tuple(PopulationApplicationVertex, str), list(Source)) """ sources = self.__cached_sources.get(app_vertex) if sources is None: sources = get_sources_for_target(app_vertex) self.__cached_sources[app_vertex] = sources return sources @staticmethod def __connector(projection: Projection) -> ConvolutionConnector: # pylint: disable=protected-access return cast(ConvolutionConnector, projection._synapse_information.connector) @staticmethod def __get_synapse_type(proj: Projection, target: str) -> int: edge = proj._projection_edge # pylint: disable=protected-access synapse_type = edge.post_vertex.get_synapse_id_by_target(target) # Checked during connection validation, assumed constant assert synapse_type is not None return synapse_type
[docs] @overrides(AbstractSupportsSignedWeights.get_positive_synapse_index) def get_positive_synapse_index( self, incoming_projection: Projection) -> int: return self.__get_synapse_type( incoming_projection, self.__connector(incoming_projection).positive_receptor_type)
[docs] @overrides(AbstractSupportsSignedWeights.get_negative_synapse_index) def get_negative_synapse_index( self, incoming_projection: Projection) -> int: return self.__get_synapse_type( incoming_projection, self.__connector(incoming_projection).negative_receptor_type)
[docs] @overrides(AbstractSupportsSignedWeights.get_maximum_positive_weight) def get_maximum_positive_weight( self, incoming_projection: Projection) -> float: conn = self.__connector(incoming_projection) # We know the connector doesn't care about the argument # conn.kernel_weights known to be an array of floats max_weight = cast(float, numpy.amax(conn.kernel_weights)) return max_weight if max_weight > 0 else 0
[docs] @overrides(AbstractSupportsSignedWeights.get_minimum_negative_weight) def get_minimum_negative_weight( self, incoming_projection: Projection) -> float: conn = self.__connector(incoming_projection) # This is different because the connector happens to support this # conn.kernel_weights known to be an array of floats min_weight = cast(float, numpy.amin(conn.kernel_weights)) return min_weight if min_weight < 0 else 0
[docs] @overrides(AbstractSupportsSignedWeights.get_mean_positive_weight) def get_mean_positive_weight( self, incoming_projection: Projection) -> float: conn = self.__connector(incoming_projection) pos_weights = conn.kernel_weights[conn.kernel_weights > 0] if len(pos_weights) == 0: return 0 return numpy.mean(pos_weights)
[docs] @overrides(AbstractSupportsSignedWeights.get_mean_negative_weight) def get_mean_negative_weight( self, incoming_projection: Projection) -> float: conn = self.__connector(incoming_projection) neg_weights = conn.kernel_weights[conn.kernel_weights < 0] if len(neg_weights) == 0: return 0 return numpy.mean(neg_weights)
[docs] @overrides(AbstractSupportsSignedWeights.get_variance_positive_weight) def get_variance_positive_weight( self, incoming_projection: Projection) -> float: conn = self.__connector(incoming_projection) pos_weights = conn.kernel_weights[conn.kernel_weights > 0] if len(pos_weights) == 0: return 0 return numpy.var(pos_weights)
[docs] @overrides(AbstractSupportsSignedWeights.get_variance_negative_weight) def get_variance_negative_weight( self, incoming_projection: Projection) -> float: conn = self.__connector(incoming_projection) neg_weights = conn.kernel_weights[conn.kernel_weights < 0] if len(neg_weights) == 0: return 0 return numpy.var(neg_weights)