Source code for spynnaker.pyNN.external_devices_models.push_bot.parameters.push_bot_retina_viewer

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from threading import Thread, RLock
from time import sleep

from matplotlib import pyplot  # type: ignore[import]
import numpy

from spinn_utilities.log import FormatAdapter

_logger = FormatAdapter(logging.getLogger(__name__))
MAX_VALUE = 33.0
ADD_VALUE = 1.0
DECAY_FACTOR = 0.5
SLEEP_TIME = 0.1


class PushBotRetinaViewer():
    """
    Viewer of retina from the PushBot.
    """
    __slots__ = (
        "__image_data", "__image_lock",
        "__without_polarity_mask", "__height",
        "__fig", "__plot",
        "__running", "__sim", "__conn")

    def __init__(self, retina_resolution, label, sim):
        pyplot.ion()
        self.__image_data = numpy.zeros(
            (retina_resolution.value.pixels, retina_resolution.value.pixels),
            dtype=numpy.float32)
        self.__fig, axes = pyplot.subplots(figsize=(8, 8))
        self.__plot = axes.imshow(
            self.__image_data, interpolation="nearest", cmap="Greens",
            vmin=0, vmax=MAX_VALUE)
        self.__fig.canvas.draw()
        self.__fig.canvas.flush_events()

        self.__without_polarity_mask = (
            2 ** (retina_resolution.value.bits_per_coordinate * 2)) - 1
        self.__height = retina_resolution.value.pixels

        self.__running = True
        self.__image_lock = RLock()

        self.__sim = sim
        self.__conn = sim.external_devices.SpynnakerLiveSpikesConnection(
            receive_labels=[label], local_port=None)
        self.__conn.add_receive_callback(label, self.__recv)

    @property
    def port(self):
        """
        The port the connection is listening on.

        :rtype: int
        """
        return self.__conn.local_port

    # pylint: disable=unused-argument
    def __recv(self, label, time, spikes):
        np_spikes = numpy.array(spikes) & self.__without_polarity_mask
        x_vals, y_vals = numpy.divmod(np_spikes, self.__height)
        self.__image_lock.acquire()
        self.__image_data[x_vals, y_vals] += 1.0
        self.__image_lock.release()

    def __run_sim_forever(self):
        try:
            self.__sim.external_devices.run_forever()
            self.__running = False
            self.__sim.end()
        except KeyboardInterrupt:
            pass
        except Exception:  # pylint: disable=broad-except
            _logger.exception("unexpected exception in simulation thread")

    def __run_sim(self, run_time):
        try:
            self.__sim.run(run_time)
            self.__running = False
            self.__sim.end()
        except KeyboardInterrupt:
            pass
        except Exception:  # pylint: disable=broad-except
            _logger.exception("unexpected exception in simulation thread")

    def __run(self, run_thread):
        try:
            while self.__running and self.__fig.get_visible():
                self.__image_lock.acquire()
                self.__plot.set_array(self.__image_data)
                self.__fig.canvas.draw()
                self.__fig.canvas.flush_events()
                self.__image_data *= DECAY_FACTOR
                self.__image_lock.release()
                sleep(0.1)
        except KeyboardInterrupt:
            pass
        except Exception:  # pylint: disable=broad-except
            _logger.exception("unexpected exception in drawing thread")

[docs] def run_until_closed(self): """ Run the viewer and simulation until the viewer is closed. """ run_thread = Thread(target=self.__run_sim_forever) run_thread.start() try: self.__run(run_thread) finally: self.__sim.external_devices.request_stop() run_thread.join()
[docs] def run(self, run_time): """ Run the viewer and simulation for a fixed time. """ run_thread = Thread(target=self.__run_sim, args=[run_time]) run_thread.start() try: self.__run(run_thread) finally: run_thread.join()