# Source code for spynnaker.pyNN.models.neural_projections.connectors.one_to_one_connector

# Copyright (c) 2014 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import math
from typing import Optional, Sequence, Tuple, TYPE_CHECKING

import numpy
from numpy import integer, floating, uint32
from numpy.typing import NDArray

from pyNN.random import RandomDistribution

from spinn_utilities.overrides import overrides
from spinn_utilities.safe_eval import SafeEval

from pacman.model.graphs.application import ApplicationVertex
from pacman.model.graphs.machine import MachineVertex
from pacman.model.graphs.common import Slice

from spinn_front_end_common.utilities.exceptions import ConfigurationException

from .abstract_connector import AbstractConnector
from .abstract_generate_connector_on_machine import (
    AbstractGenerateConnectorOnMachine, ConnectorIDs)
from .abstract_generate_connector_on_host import (
    AbstractGenerateConnectorOnHost)

if TYPE_CHECKING:
    from spynnaker.pyNN.models.neural_projections import SynapseInformation

# Evaluation context used when the delays of a projection are given as a
# string expression (see get_n_connections_from_pre_vertex_maximum, which
# calls ``_expr_context.eval(delays, d=d)``).  It is built from the math and
# numpy modules, a selection of numpy functions and the constants ``e`` and
# ``pi``.
# NOTE(review): presumably these are the only names an expression may
# reference besides the keyword arguments passed to ``eval`` -- confirm
# against the SafeEval documentation.
_expr_context = SafeEval(
    math, numpy, numpy.arccos, numpy.arcsin, numpy.arctan, numpy.arctan2,
    numpy.ceil, numpy.cos, numpy.cosh, numpy.exp, numpy.fabs, numpy.floor,
    numpy.fmod, numpy.hypot, numpy.ldexp, numpy.log, numpy.log10, numpy.modf,
    numpy.power, numpy.sin, numpy.sinh, numpy.sqrt, numpy.tan, numpy.tanh,
    numpy.maximum, numpy.minimum, e=numpy.e, pi=numpy.pi)


class OneToOneConnector(AbstractGenerateConnectorOnMachine,
                        AbstractGenerateConnectorOnHost):
    """
    Where the pre- and postsynaptic populations have the same size,
    connect cell *i* in the presynaptic population to cell *i* in
    the postsynaptic population, for all *i*.
    """
    __slots__ = ()

    def __init__(self, safe: bool = True, callback: None = None,
                 verbose: bool = False) -> None:
        """
        :param bool safe:
            If ``True``, check that weights and delays have valid values.
            If ``False``, this check is skipped.
        :param callable callback:
            if given, a callable that displays a progress bar on the
            terminal.

            .. note::
                Not supported by sPyNNaker.
        :param bool verbose:
            Whether to output extra information about the connectivity to a
            CSV file
        """
        # pylint: disable=useless-super-delegation
        super().__init__(safe, callback, verbose)

    @overrides(AbstractConnector.get_delay_maximum)
    def get_delay_maximum(self, synapse_info: SynapseInformation) -> float:
        # At most one connection per neuron, so the number of connections is
        # bounded by the larger of the two population sizes.
        return self._get_delay_maximum(
            synapse_info.delays,
            max(synapse_info.n_pre_neurons, synapse_info.n_post_neurons),
            synapse_info)

    @overrides(AbstractConnector.get_delay_minimum)
    def get_delay_minimum(self, synapse_info: SynapseInformation) -> float:
        return self._get_delay_minimum(
            synapse_info.delays,
            max(synapse_info.n_pre_neurons, synapse_info.n_post_neurons),
            synapse_info)

    @overrides(AbstractConnector.get_n_connections_from_pre_vertex_maximum)
    def get_n_connections_from_pre_vertex_maximum(
            self, n_post_atoms: int, synapse_info: SynapseInformation,
            min_delay: Optional[float] = None,
            max_delay: Optional[float] = None) -> int:
        delays = synapse_info.delays

        # Without a delay window (or without delays) every pre-neuron has
        # exactly one outgoing connection.
        if min_delay is None or max_delay is None or delays is None:
            return 1

        if isinstance(delays, str):
            # Delay given as an expression of the distance ``d``: evaluate
            # it and require both extremes to lie inside the window.
            d = self._get_distances(delays, synapse_info)
            delay_values = _expr_context.eval(delays, d=d)
            return int(
                (min_delay <= min(delay_values) <= max_delay) and
                (min_delay <= max(delay_values) <= max_delay))

        if isinstance(delays, (int, float, integer, floating)):
            return int(min_delay <= delays <= max_delay)

        if isinstance(delays, RandomDistribution):
            return 1

        # Otherwise treat delays as a sequence of values; a connection may
        # exist if either extreme falls inside the window.
        slice_min_delay = min(delays)
        slice_max_delay = max(delays)
        return int((min_delay <= slice_max_delay <= max_delay) or
                   (min_delay <= slice_min_delay <= max_delay))

    @overrides(AbstractConnector.get_n_connections_to_post_vertex_maximum)
    def get_n_connections_to_post_vertex_maximum(
            self, synapse_info: SynapseInformation) -> int:
        return 1

    @overrides(AbstractConnector.get_weight_maximum)
    def get_weight_maximum(self, synapse_info: SynapseInformation) -> float:
        return self._get_weight_maximum(
            synapse_info.weights,
            max(synapse_info.n_pre_neurons, synapse_info.n_post_neurons),
            synapse_info)

    @overrides(AbstractGenerateConnectorOnHost.create_synaptic_block)
    def create_synaptic_block(
            self, post_slices: Sequence[Slice], post_vertex_slice: Slice,
            synapse_type: int, synapse_info: SynapseInformation) -> NDArray:
        # Get each pre_vertex id for each post_vertex id
        post_atoms = post_vertex_slice.get_raster_ids()
        pre_atoms = numpy.array(post_atoms)

        # Filter out things where there isn't a cross over
        atom_filter = numpy.ones(len(post_atoms), dtype=numpy.bool_)
        if synapse_info.prepop_is_view or synapse_info.postpop_is_view:
            # If a view, we only keep things that are in the view.
            # Each comparison is parenthesised because ``&`` binds tighter
            # than ``<=`` / ``>=``; without the brackets the expression is a
            # chained comparison on arrays and fails or gives a wrong mask.
            if synapse_info.prepop_is_view:
                # pylint: disable=protected-access
                pre_lo, pre_hi = synapse_info.pre_population._view_range
                atom_filter &= (pre_atoms <= pre_hi) & (pre_atoms >= pre_lo)
            if synapse_info.postpop_is_view:
                # pylint: disable=protected-access
                post_lo, post_hi = synapse_info.post_population._view_range
                atom_filter &= (
                    (post_atoms <= post_hi) & (post_atoms >= post_lo))
        else:
            # If not a view we only keep things that are in the
            # pre-population; ids are 0-based so the last valid id is
            # size - 1.
            atom_filter &= (pre_atoms < synapse_info.pre_population.size)
        post_atoms = post_atoms[atom_filter]
        pre_atoms = pre_atoms[atom_filter]

        # Convert to the correct coordinates
        post_atoms = post_vertex_slice.get_relative_indices(post_atoms)
        pre_atoms = synapse_info.pre_vertex.get_key_ordered_indices(pre_atoms)

        n_connections = len(post_atoms)
        block = numpy.zeros(n_connections, dtype=self.NUMPY_SYNAPSES_DTYPE)
        block["source"] = pre_atoms
        block["target"] = post_atoms
        block["weight"] = self._generate_weights(
            block["source"], block["target"], n_connections,
            post_vertex_slice, synapse_info)
        block["delay"] = self._generate_delays(
            block["source"], block["target"], n_connections,
            post_vertex_slice, synapse_info)
        block["synapse_type"] = synapse_type
        return block

    def __repr__(self) -> str:
        return "OneToOneConnector()"

    @property
    @overrides(AbstractGenerateConnectorOnMachine.gen_connector_id)
    def gen_connector_id(self) -> int:
        return ConnectorIDs.ONE_TO_ONE_CONNECTOR.value

    @overrides(AbstractGenerateConnectorOnMachine.gen_connector_params)
    def gen_connector_params(
            self, synapse_info: SynapseInformation) -> NDArray[uint32]:
        # This connector needs no parameters
        return numpy.array([], dtype="uint32")

    @property
    @overrides(
        AbstractGenerateConnectorOnMachine.gen_connector_params_size_in_bytes)
    def gen_connector_params_size_in_bytes(self) -> int:
        return 0

    @overrides(AbstractGenerateConnectorOnMachine.get_connected_vertices)
    def get_connected_vertices(
            self, s_info: SynapseInformation, source_vertex: ApplicationVertex,
            target_vertex: ApplicationVertex) -> Sequence[
                Tuple[MachineVertex, Sequence[MachineVertex]]]:
        src_vtxs = source_vertex.splitter.get_out_going_vertices(
            s_info.partition_id)
        tgt_vtxs = target_vertex.splitter.get_in_coming_vertices(
            s_info.partition_id)

        # If doing a view, we must be single dimensional, so use old method
        if (len(s_info.pre_vertex.atoms_shape) == 1 and
                len(s_info.post_vertex.atoms_shape) == 1):
            # Default to the whole population (inclusive bounds) and narrow
            # to the view range where a view is in use.
            pre_lo = 0
            pre_hi = source_vertex.n_atoms - 1
            post_lo = 0
            post_hi = target_vertex.n_atoms - 1
            if s_info.prepop_is_view:
                # pylint: disable=protected-access
                pre_lo, pre_hi = s_info.pre_population._view_range
            if s_info.postpop_is_view:
                # pylint: disable=protected-access
                post_lo, post_hi = s_info.post_population._view_range

            return [(t_vert, [s_vert for s_vert in src_vtxs
                              if self.__connects(
                                  s_vert, pre_lo, pre_hi,
                                  t_vert, post_lo, post_hi)])
                    for t_vert in tgt_vtxs]

        if s_info.prepop_is_view or s_info.postpop_is_view:
            # Check again here in case the rules change elsewhere
            raise ConfigurationException(
                "The OneToOneConnector does not support PopulationView "
                "connections between vertices with more than 1 dimension")

        # Check for cross over of pre- and post- rasters, as that is how the
        # connector works
        return [(t_vert, [s_vert for s_vert in src_vtxs
                          if any(numpy.isin(
                              s_vert.vertex_slice.get_raster_ids(),
                              t_vert.vertex_slice.get_raster_ids()))])
                for t_vert in tgt_vtxs]

    def __connects(
            self, s_vert: MachineVertex, pre_lo: int, pre_hi: int,
            t_vert: MachineVertex, post_lo: int, post_hi: int) -> bool:
        """
        Determine whether the slice of a source machine vertex can connect
        to the slice of a target machine vertex.

        :param s_vert: The source machine vertex
        :param pre_lo: The lowest pre-atom in range (inclusive)
        :param pre_hi: The highest pre-atom in range (inclusive)
        :param t_vert: The target machine vertex
        :param post_lo: The lowest post-atom in range (inclusive)
        :param post_hi: The highest post-atom in range (inclusive)
        :return: ``False`` if either slice lies wholly outside its range or
            the slices do not overlap once made relative to the start of
            their ranges; ``True`` otherwise.
        """
        pre_slice = s_vert.vertex_slice
        post_slice = t_vert.vertex_slice

        # Check range of slices
        if pre_slice.hi_atom < pre_lo:
            return False
        if post_slice.hi_atom < post_lo:
            return False
        if pre_slice.lo_atom > pre_hi:
            return False
        if post_slice.lo_atom > post_hi:
            return False

        # Get slice range relative to view
        pre_s_hi = pre_slice.hi_atom - pre_lo
        post_s_hi = post_slice.hi_atom - post_lo
        pre_s_lo = pre_slice.lo_atom - pre_lo
        post_s_lo = post_slice.lo_atom - post_lo

        # The relative ranges must overlap for any connection to exist
        if pre_s_hi < post_s_lo:
            return False
        if pre_s_lo > post_s_hi:
            return False
        return True

    @overrides(AbstractGenerateConnectorOnMachine.generate_on_machine)
    def generate_on_machine(self, synapse_info: SynapseInformation) -> bool:
        # If we are doing a 1:1 connector and the pre or post vertex is
        # multi-dimensional and have different dimensions
        pre = synapse_info.pre_vertex
        post = synapse_info.post_vertex
        if len(pre.atoms_shape) > 1 or len(post.atoms_shape) > 1:
            if pre.atoms_shape != post.atoms_shape:
                print("Not generating on core!")
                return False
            if (pre.get_max_atoms_per_dimension_per_core() !=
                    post.get_max_atoms_per_dimension_per_core()):
                print("Not generating on core!")
                return False
        return super().generate_on_machine(synapse_info)