# Copyright (c) 2022 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import csv
from datetime import datetime
import logging
import math
import numpy
import os
import quantities
import struct
import re
from spinn_utilities.log import FormatAdapter
from spinnman.messages.eieio.data_messages import EIEIODataHeader
from spinn_front_end_common.interface.ds import DataType
from pacman.model.graphs.common import MDSlice
from pacman.utilities.utility_calls import get_field_based_index
from spinn_front_end_common.interface.buffer_management.storage_objects \
import BufferDatabase
from spinn_front_end_common.utilities.constants import (
BYTES_PER_WORD, BITS_PER_WORD)
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spynnaker.pyNN.data import SpynnakerDataView
from spynnaker.pyNN.exceptions import SpynnakerException
from spynnaker.pyNN.utilities.buffer_data_type import BufferDataType
from spynnaker.pyNN.utilities.constants import SPIKES
from spynnaker.pyNN.utilities.neo_csv import NeoCsv
logger = FormatAdapter(logging.getLogger(__name__))
class NeoBufferDatabase(BufferDatabase, NeoCsv):
    """
    Extra support for Neo on top of the Database for SQLite 3.
    This is the same database as used by BufferManager but with
    extra tables and access methods added.
    """
    # pylint: disable=c-extension-no-member

    # Width in bytes of the timestamp stored at the start of each row
    __N_BYTES_FOR_TIMESTAMP = BYTES_PER_WORD
    # Pre-compiled parser for two little-endian unsigned 32-bit words
    __TWO_WORDS = struct.Struct("<II")
    # DDL script that adds the Neo-specific tables to the buffer database
    __NEO_DDL_FILE = os.path.join(os.path.dirname(__file__), "db.sql")
    #: rewiring: shift values to decode recorded value
    __PRE_ID_SHIFT = 9
    __POST_ID_SHIFT = 1
    __POST_ID_FACTOR = 2 ** 8
    #: mask for the formation/elimination flag in a rewiring entry
    __FIRST_BIT = 1
    #: number of words per rewiring entry
    __REWIRING_N_WORDS = 2
def __init__(self, database_file=None, read_only=None):
    """
    :param database_file:
        The name of a file that contains (or will contain) an SQLite
        database holding the data.
        If omitted the default location will be used.
    :type database_file: None or str
    :param bool read_only:
        By default the database is read-only if given a database file.
        This allows to override that (mainly for clear)
    """
    # An explicitly supplied file defaults to read-only access;
    # the default file location defaults to writable.
    if read_only is None:
        read_only = database_file is not None
    if database_file is None:
        database_file = self.default_database_file()
    super().__init__(database_file, read_only=read_only)
    # Install the extra Neo tables on top of the buffer database schema
    with open(self.__NEO_DDL_FILE, encoding="utf-8") as ddl_file:
        ddl = ddl_file.read()
    # pylint: disable=no-member
    self._SQLiteDB__db.executescript(ddl)
def write_t_stop(self):
    """
    Records the current run time as `t_stop`.

    This writes information held in :py:class:`SpynnakerDataView` so that
    the database is usable stand-alone.

    .. note::
        The database must be writable for this to work!
    """
    current_time_ms = SpynnakerDataView.get_current_run_time_ms()
    self.execute(
        """
        UPDATE segment
        SET t_stop = ?
        """, (current_time_ms, ))
def __get_segment_info(self):
    """
    Gets the metadata for the segment.

    :return: segment number, record time, last run time recorded,
        simulator timestep in ms, simulator name
    :rtype: tuple(int, ~datetime.datetime, float, float, str)
    :raises \
        ~spinn_front_end_common.utilities.exceptions.ConfigurationException:
        If the recording metadata not setup correctly
    """
    for row in self.execute(
            """
            SELECT segment_number, rec_datetime, t_stop, dt, simulator
            FROM segment
            LIMIT 1
            """):
        # Timestamps are stored as UTF-8 text; parse back to datetime
        rec_time = datetime.strptime(
            str(row[self._REC_DATETIME], "utf-8"),
            "%Y-%m-%d %H:%M:%S.%f")
        t_stop = row[self._T_STOP]
        if t_stop is None:
            # t_stop is only written by a real run
            logger.warning("Data from a virtual run will be empty")
            t_stop = 0.0
        return (row[self._SEGMENT_NUMBER], rec_time, t_stop,
                row[self._DT], str(row[self._SIMULATOR], 'utf-8'))
    raise ConfigurationException(
        "No recorded data. Did the simulation run?")
def __get_simulation_time_step_ms(self):
    """
    The simulation time step, in milliseconds.

    The value that would be/have been returned by
    SpynnakerDataView.get_simulation_time_step_ms()

    :rtype: float
    :return: The timestep
    """
    rows = self.execute(
        """
        SELECT simulation_time_step_ms
        FROM segment
        LIMIT 1
        """)
    for row in rows:
        return row["simulation_time_step_ms"]
    raise ConfigurationException("No segment data")
def __get_population_id(self, pop_label, population):
    """
    Gets an ID for this population label.
    Will create a new population if required.

    For speed does not verify the additional fields if a record already
    exists.

    :param str pop_label: The label for the population of interest

        .. note::
            This is actually the label of the Application Vertex.
            Typically the Population label, corrected for `None` or
            duplicate values

    :param ~spynnaker.pyNN.models.populations.Population population:
        the population to record for
    :return: The ID
    """
    # Fast path: the label has been seen before
    for row in self.execute(
            """
            SELECT pop_id FROM population
            WHERE label = ?
            LIMIT 1
            """, (pop_label,)):
        return row["pop_id"]
    # First time this label is seen: create the population record
    values = (pop_label, population.first_id, population.describe(),
              population.size)
    self.execute(
        """
        INSERT INTO population
        (label, first_id, description, pop_size)
        VALUES (?, ?, ?, ?)
        """, values)
    return self.lastrowid
def __get_recording_id(
        self, pop_label, variable, population,
        sampling_interval_ms, data_type, buffered_type, units,
        n_colour_bits):
    """
    Gets an ID for this population and recording label combination.
    Will create a new population/recording record if required.

    For speed does not verify the additional fields if a record already
    exists.

    :param str pop_label: The label for the population of interest

        .. note::
            This is actually the label of the Application Vertex.
            Typically the Population label, corrected for `None` or
            duplicate values

    :param str variable: name of the variable being recorded
    :param ~spynnaker.pyNN.models.populations.Population population:
        the population to record for
    :param sampling_interval_ms:
        The simulation time in milliseconds between sampling.
        Typically the sampling rate * simulation_timestep_ms
    :type sampling_interval_ms: float or None
    :type data_type: DataType or None
    :param BufferDataType buffered_type:
    :param str units:
    :param int n_colour_bits:
    :return: The ID
    """
    # Fast path: this (population, variable) pair already has a record
    for row in self.execute(
            """
            SELECT rec_id FROM recording_view
            WHERE label = ? AND variable = ?
            LIMIT 1
            """, (pop_label, variable)):
        return row["rec_id"]
    # Ensure the owning population record exists first
    pop_id = self.__get_population_id(pop_label, population)
    data_type_name = data_type.name if data_type else None
    self.execute(
        """
        INSERT INTO recording
        (pop_id, variable, data_type, buffered_type, t_start,
        sampling_interval_ms, units, n_colour_bits)
        VALUES (?, ?, ?, ?, 0, ?, ?, ?)
        """, (pop_id, variable, data_type_name, str(buffered_type),
              sampling_interval_ms, units, n_colour_bits))
    return self.lastrowid
def __get_population_metadata(self, pop_label):
    """
    Gets the metadata for the population with this label

    :param str pop_label: The label for the population of interest

        .. note::
            This is actually the label of the Application Vertex.
            Typically the Population label, corrected for `None` or
            duplicate values

    :return: population size, first ID and description
    :rtype: (int, int, str)
    :raises \
        ~spinn_front_end_common.utilities.exceptions.ConfigurationException:
        If the recording metadata not setup correctly
    """
    for row in self.execute(
            """
            SELECT pop_size, first_id, description
            FROM population
            WHERE label = ?
            LIMIT 1
            """, (pop_label,)):
        size = int(row["pop_size"])
        first_id = int(row["first_id"])
        description = str(row["description"], 'utf-8')
        return size, first_id, description
    raise ConfigurationException(f"No metadata for {pop_label}")
def get_population_metdadata(self, pop_label):
    """
    Gets the metadata for the population with this label

    .. note::
        The misspelling of "metadata" in this method's name is
        existing public API and is kept for backward compatibility.

    :param str pop_label: The label for the population of interest

        .. note::
            This is actually the label of the Application Vertex.
            Typically the Population label, corrected for `None` or
            duplicate values

    :return: population size, first id and description
    :rtype: (int, int, str)
    :raises \
        ~spinn_front_end_common.utilities.exceptions.ConfigurationException:
        If the recording metadata not setup correctly
    """
    return self.__get_population_metadata(pop_label)
def get_recording_populations(self):
    """
    Gets a list of the labels of Populations recording.
    Or to be exact the ones with metadata saved so likely to be recording.

    .. note::
        These are actually the labels of the Application Vertices.
        Typically the Population label, corrected for `None` or
        duplicate values

    :return: List of population labels
    :rtype: list(str)
    """
    # Labels are stored as UTF-8 blobs; decode each one
    return [
        str(row["label"], 'utf-8')
        for row in self.execute(
            """
            SELECT label
            FROM population
            """)]
def get_population(self, pop_label):
    """
    Gets an Object with the same data retrieval API as a Population.

    Retrieval is limited to recorded data and a little metadata needed to
    create a single Neo Segment wrapped in a Neo Block.

    .. note::
        As each database only includes data for one run (with resets
        creating another database) the structure is relatively simple.

    :param str pop_label: The label for the population of interest

        .. note::
            This is actually the label of the Application Vertex.
            Typically the Population label, corrected for `None` or
            duplicate values

    :return: An Object which acts like a Population for getting neo data
    :rtype: DataPopulation
    """
    # delayed import due to circular dependencies
    from .data_population import DataPopulation
    # DataPopulation validates the pop_label so no need to do here too
    return DataPopulation(self._database_file, pop_label)
def get_recording_variables(self, pop_label):
    """
    List of the names of variables recording.

    Or, to be exact, list of the names of variables with metadata so likely
    to be recording.

    :param str pop_label: The label for the population of interest

        .. note::
            This is actually the label of the Application Vertex.
            Typically the Population label, corrected for `None` or
            duplicate values

    :return: List of variable names
    :rtype: list(str)
    """
    return self.__get_recording_variables(pop_label)
def __get_recording_variables(self, pop_label):
    """
    :param str pop_label: The label for the population of interest

        .. note::
            This is actually the label of the Application Vertex.
            Typically the Population label, corrected for `None` or
            duplicate values

    :return: List of variable registered as recording.
        Even if there is no data
    :rtype: list(str)
    """
    rows = self.execute(
        """
        SELECT variable
        FROM recording_view
        WHERE label = ?
        GROUP BY variable
        """, (pop_label,))
    return [str(row["variable"], 'utf-8') for row in rows]
def __get_recording_metadeta(self, pop_label, variable):
    """
    Gets the metadata for this population and recording label
    combination.

    :param str pop_label: The label for the population of interest

        .. note::
            This is actually the label of the Application Vertex.
            Typically the Population label, corrected for `None` or
            duplicate values

    :param str variable:
    :return:
        rec_id, data_type, buffered_type, t_start,
        sampling_interval_ms, pop_size, units, n_colour_bits
    :rtype: tuple(int, DataType, BufferDataType, float, float, int,
        str, int)
    :raises \
        ~spinn_front_end_common.utilities.exceptions.ConfigurationException:
        If the recording metadata not setup correctly
    """
    for row in self.execute(
            """
            SELECT rec_id, data_type, buffered_type, t_start,
            sampling_interval_ms, pop_size, units, n_colour_bits
            FROM recording_view
            WHERE label = ? AND variable = ?
            LIMIT 1
            """, (pop_label, variable)):
        # data_type and units columns may be NULL
        data_type = (DataType[str(row["data_type"], 'utf-8')]
                     if row["data_type"] else None)
        units = str(row["units"], 'utf-8') if row["units"] else None
        buffered_type = BufferDataType[str(row["buffered_type"], 'utf-8')]
        return (row["rec_id"], data_type, buffered_type, row["t_start"],
                row["sampling_interval_ms"], row["pop_size"], units,
                row["n_colour_bits"])
    raise ConfigurationException(
        f"No metadata for {variable} on {pop_label}")
def __get_region_metadata(self, rec_id):
    """
    :param int rec_id:
    :return:
        region_id, neurons, vertex_slice, selective_recording, base_key,
        index
    :rtype: iterable(tuple(int, ~numpy.ndarray, Slice, bool, int, int))
    """
    rows = list(self.execute(
        """
        SELECT region_id, recording_neurons_st, vertex_slice, base_key
        FROM region_metadata
        WHERE rec_id = ?
        ORDER BY region_metadata_id
        """, [rec_id]))
    for index, row in enumerate(rows):
        vertex_slice = MDSlice.from_string(
            str(row["vertex_slice"], "utf-8"))
        neurons_st = row["recording_neurons_st"]
        if neurons_st:
            neurons = numpy.array(self.string_to_array(neurons_st))
            # Selective iff fewer neurons recorded than atoms in the slice
            selective_recording = len(neurons) != vertex_slice.n_atoms
        else:
            neurons = None
            selective_recording = None
        yield (row["region_id"], neurons, vertex_slice,
               selective_recording, row["base_key"], index)
def __get_spikes_by_region(
        self, region_id, neurons, simulation_time_step_ms,
        selective_recording, spike_times, spike_ids):
    """
    Adds spike data for this region to the lists.

    The results are appended in place to `spike_times` and `spike_ids`;
    nothing is returned.

    :param int region_id: Region data came from
    :param array(int) neurons: mapping of local ID to global ID
    :param float simulation_time_step_ms:
    :param bool selective_recording:
        flag to say if only a subset of the neurons were recorded
    :param list(float) spike_times: List to add spike times to
    :param list(int) spike_ids: List to add spike IDs to
    """
    neurons_recording = len(neurons)
    if neurons_recording == 0:
        # Fix: was "return [], []", which wrongly suggested this in-place
        # mutator has a return value; make the early exits consistent
        return
    n_words = int(math.ceil(neurons_recording / BITS_PER_WORD))
    n_bytes = n_words * BYTES_PER_WORD
    n_words_with_timestamp = n_words + 1
    record_raw = self._read_contents(region_id)
    if len(record_raw) == 0:
        return
    # One row per sample: a timestamp word followed by the spike bitfield
    raw_data = (
        numpy.asarray(record_raw, dtype="uint8").view(
            dtype="<i4")).reshape([-1, n_words_with_timestamp])
    record_time = (raw_data[:, 0] * simulation_time_step_ms)
    spikes = raw_data[:, 1:].byteswap().view("uint8")
    # Expand each 32-bit word into bits, lowest local neuron first
    bits = numpy.fliplr(numpy.unpackbits(spikes).reshape(
        (-1, 32))).reshape((-1, n_bytes * 8))
    time_indices, local_indices = numpy.where(bits == 1)
    if selective_recording:
        # Only bits that map to an actually-recorded neuron are valid
        for time_indice, local in zip(time_indices, local_indices):
            if local < neurons_recording:
                spike_ids.append(neurons[local])
                spike_times.append(record_time[time_indice])
    else:
        indices = neurons[local_indices]
        times = record_time[time_indices].reshape((-1))
        spike_ids.extend(indices)
        spike_times.extend(times)
def __get_neuron_spikes(self, rec_id):
    """
    Gets the spikes for this population/recording ID.

    :param int rec_id:
    :return: numpy array of spike IDs and spike times, all IDs recording
    :rtype: tuple(~numpy.ndarray, list(int))
    """
    spike_times = []
    spike_ids = []
    indexes = []
    step_ms = self.__get_simulation_time_step_ms()
    for region_id, neurons, _, selective_recording, _, _ in \
            self.__get_region_metadata(rec_id):
        indexes.extend(neurons)
        self.__get_spikes_by_region(
            region_id, neurons, step_ms, selective_recording,
            spike_times, spike_ids)
    # Pair ids with times, sorted by id then time
    result = numpy.column_stack((spike_ids, spike_times))
    return result[numpy.lexsort((spike_times, spike_ids))], indexes
def __get_eieio_spike_by_region(
        self, region_id, simulation_time_step_ms, base_key,
        vertex_slice, n_colour_bits, results):
    """
    Adds spike data for this region to the list.

    :param int region_id: Region data came from
    :param float simulation_time_step_ms:
    :param int base_key: base routing key for this region's vertex
    :param ~pacman.model.graphs.common.Slice vertex_slice:
    :param int n_colour_bits:
    :param list(~numpy.ndarray) results:
        list to append (neuron_id, time) blocks to
    :return: all recording indexes spikes or not
    :rtype: list(int)
    """
    spike_data = self._read_contents(region_id)
    number_of_bytes_written = len(spike_data)
    offset = 0
    # Map from routing key to local neuron index
    indices = get_field_based_index(base_key, vertex_slice, n_colour_bits)
    slice_ids = vertex_slice.get_raster_ids()
    # The colour bits are the lowest n_colour_bits of each key;
    # mask them off before looking the key up
    colour_mask = (2 ** n_colour_bits) - 1
    inv_colour_mask = ~colour_mask & 0xFFFFFFFF
    while offset < number_of_bytes_written:
        # Each block: a length word, a time word, then an EIEIO packet
        length, time = self.__TWO_WORDS.unpack_from(spike_data, offset)
        time *= simulation_time_step_ms
        data_offset = offset + 2 * BYTES_PER_WORD
        eieio_header = EIEIODataHeader.from_bytestring(
            spike_data, data_offset)
        if eieio_header.eieio_type.payload_bytes > 0:
            raise ValueError("Can only read spikes as keys")
        data_offset += eieio_header.size
        # Every key in the packet shares the same timestamp
        timestamps = numpy.repeat([time], eieio_header.count)
        key_bytes = eieio_header.eieio_type.key_bytes
        keys = numpy.frombuffer(
            spike_data, dtype=f"<u{key_bytes}",
            count=eieio_header.count, offset=data_offset)
        keys = numpy.bitwise_and(keys, inv_colour_mask)
        local_ids = numpy.array([indices[key] for key in keys])
        neuron_ids = slice_ids[local_ids]
        offset += length + 2 * BYTES_PER_WORD
        results.append(numpy.dstack((neuron_ids, timestamps))[0])
    return slice_ids
def __get_eieio_spikes(self, rec_id, n_colour_bits):
    """
    Gets the spikes for this population/recording ID.

    :param int rec_id:
    :param int n_colour_bits:
    :return: numpy array of spike IDs and spike times, all IDs recording
    :rtype: tuple(~numpy.ndarray, list(int))
    """
    step_ms = self.__get_simulation_time_step_ms()
    results = []
    indexes = []
    for region_id, _, vertex_slice, selective_recording, base_key, _ in \
            self.__get_region_metadata(rec_id):
        if selective_recording:
            raise NotImplementedError(
                "Unable to handle selective recording")
        indexes.extend(self.__get_eieio_spike_by_region(
            region_id, step_ms, base_key, vertex_slice,
            n_colour_bits, results))
    if not results:
        return numpy.empty(shape=(0, 2)), indexes
    # Merge per-region blocks, sorted by id then time
    stacked = numpy.vstack(results)
    order = numpy.lexsort((stacked[:, 1], stacked[:, 0]))
    return stacked[order], indexes
def __get_multi_spikes_by_region(
        self, region_id, neurons, simulation_time_step_ms,
        spike_times, spike_ids):
    """
    Adds spike data for this region to the lists.

    The results are appended in place; one array per decoded block.

    :param int region_id: Region data came from
    :param ~numpy.ndarray neurons: mapping of local ID to global ID
    :param float simulation_time_step_ms:
    :param list(~numpy.ndarray) spike_times: List to add spike times to
    :param list(~numpy.ndarray) spike_ids: List to add spike IDs to
    """
    raw_data = self._read_contents(region_id)
    n_words = int(math.ceil(len(neurons) / BITS_PER_WORD))
    n_bytes_per_block = n_words * BYTES_PER_WORD
    offset = 0
    while offset < len(raw_data):
        # Block header: a time word and a count of bitfield blocks
        time, n_blocks = self.__TWO_WORDS.unpack_from(raw_data, offset)
        offset += self.__TWO_WORDS.size
        spike_data = numpy.frombuffer(
            raw_data, dtype="uint8",
            count=n_bytes_per_block * n_blocks, offset=offset)
        offset += n_bytes_per_block * n_blocks
        # Reorder bytes so unpackbits produces lowest local index first
        spikes = spike_data.view("<i4").byteswap().view("uint8")
        bits = numpy.fliplr(numpy.unpackbits(spikes).reshape(
            (-1, 32))).reshape((-1, n_bytes_per_block * 8))
        local_indices = numpy.nonzero(bits)[1]
        indices = neurons[local_indices]
        # All spikes in this block share the same timestamp
        times = numpy.repeat(
            [time * simulation_time_step_ms],
            len(indices))
        spike_ids.append(indices)
        spike_times.append(times)
def __get_multi_spikes(self, rec_id):
    """
    Gets the spikes for this population/recording ID.

    :param int rec_id:
    :return: numpy array of spike IDs and spike times, all IDs recording
    :rtype: tuple(~numpy.ndarray, list(int))
    """
    spike_times = []
    spike_ids = []
    indexes = []
    step_ms = self.__get_simulation_time_step_ms()
    for region_id, neurons, _, selective_recording, _, _ in \
            self.__get_region_metadata(rec_id):
        if selective_recording:
            raise NotImplementedError(
                "Unable to handle selective recording")
        indexes.extend(neurons)
        self.__get_multi_spikes_by_region(
            region_id, neurons, step_ms, spike_times, spike_ids)
    if not spike_ids:
        return numpy.zeros((0, 2)), indexes
    # Flatten per-region arrays and sort by id then time
    all_ids = numpy.hstack(spike_ids)
    all_times = numpy.hstack(spike_times)
    result = numpy.dstack((all_ids, all_times))[0]
    return result[numpy.lexsort((all_times, all_ids))], indexes
def __combine_indexes(self, view_indexes, data_indexes, variable):
    """
    Intersects the requested view with the indexes that have data,
    warning about requested neurons with no data.

    :param view_indexes: indexes requested by the caller
    :param data_indexes: indexes for which data exists
    :param str variable:
    :return: indices
    :rtype: list
    """
    # keep just the view indexes in the data, preserving view order
    data_set = set(data_indexes)
    kept = [i for i in view_indexes if i in data_set]
    # report any requested neurons that have no data
    missing = set(view_indexes) - data_set
    if missing:
        logger.warning("No {} available for neurons {}",
                       variable, sorted(missing))
    return kept
def __get_spikes(self, rec_id, view_indexes, buffer_type,
                 n_colour_bits, variable):
    """
    Gets the data as a Numpy array for one population and variable.

    :param int rec_id:
    :param list(int) view_indexes:
    :param buffer_type:
    :param int n_colour_bits:
    :param str variable:
    :raises \
        ~spinn_front_end_common.utilities.exceptions.ConfigurationException:
        If the recording metadata not setup correctly
    :rtype: tuple(~numpy.ndarray, list(int))
    """
    # Dispatch on how the spikes were buffered on the machine
    if buffer_type == BufferDataType.NEURON_SPIKES:
        spikes, data_indexes = self.__get_neuron_spikes(rec_id)
    elif buffer_type == BufferDataType.EIEIO_SPIKES:
        spikes, data_indexes = self.__get_eieio_spikes(
            rec_id, n_colour_bits)
    elif buffer_type == BufferDataType.MULTI_SPIKES:
        spikes, data_indexes = self.__get_multi_spikes(rec_id)
    else:
        raise NotImplementedError(buffer_type)
    if view_indexes is None or list(view_indexes) == list(data_indexes):
        return spikes, numpy.array(data_indexes)
    # keep just the view indexes in the data
    indexes = self.__combine_indexes(view_indexes, data_indexes, variable)
    # keep just data rows in the view
    spikes = spikes[numpy.isin(spikes[:, 0], indexes)]
    return spikes, indexes
def __get_matrix_data_by_region(self, region_id, neurons, data_type):
    """
    Extracts data for this region.

    :param int region_id: Region data came from
    :param array(int) neurons: mapping of local ID to global ID
    :param DataType data_type: type of data to extract
    :return: times, data
    :rtype: tuple(~numpy.ndarray, ~numpy.ndarray)
    """
    # for buffering output info is taken form the buffer manager
    record_raw = self._read_contents(region_id)
    record_length = len(record_raw)

    # There is one column for time and one for each neuron recording
    data_row_length = len(neurons) * data_type.size
    full_row_length = data_row_length + self.__N_BYTES_FOR_TIMESTAMP
    n_rows = record_length // full_row_length
    row_data = numpy.asarray(record_raw, dtype="uint8").reshape(
        n_rows, full_row_length)
    # First __N_BYTES_FOR_TIMESTAMP bytes of each row are the timestamp
    time_bytes = (
        row_data[:, 0: self.__N_BYTES_FOR_TIMESTAMP].reshape(
            n_rows * self.__N_BYTES_FOR_TIMESTAMP))
    times = time_bytes.view("<i4").reshape(n_rows, 1)
    # Remaining bytes are one encoded value per recorded neuron
    var_data = (row_data[:, self.__N_BYTES_FOR_TIMESTAMP:].reshape(
        n_rows * data_row_length))
    placement_data = data_type.decode_array(var_data).reshape(
        n_rows, len(neurons))
    return times, placement_data
def __get_matrix_data(
        self, rec_id, data_type, view_indexes, pop_size, variable):
    """
    Gets the matrix data for this population/recording ID.

    :param int rec_id:
    :param DataType data_type: type of data to extract
    :param view_indexes:
        The indexes for which data should be returned. Or `None` for all
    :type view_indexes: list(int) or None
    :param int pop_size:
    :param str variable:
    :return: numpy array of the data, neurons
    :rtype: tuple(~numpy.ndarray, ~numpy.ndarray or list(int))
    """
    signal_array = None
    pop_times = None
    pop_neurons = []
    indexes = []
    for region_id, neurons, _, _, _, index in \
            self.__get_region_metadata(rec_id):
        if neurons is not None:
            pop_neurons.extend(neurons)
        else:
            # No neuron list: one data column per region, keyed by its
            # position in the metadata (see use of `indexes` below)
            indexes.append(index)
            neurons = [index]
        times, data = self.__get_matrix_data_by_region(
            region_id, neurons, data_type)
        if signal_array is None:
            # First region seen sets the reference timestamps
            signal_array = data
            pop_times = times
        elif numpy.array_equal(pop_times, times):
            # Same timestamps: append this region's columns
            signal_array = numpy.append(
                signal_array, data, axis=1)
        else:
            raise NotImplementedError("times differ")
    if signal_array is None:
        signal_array = []
    if len(indexes) > 0:
        # Regions either all have neuron lists or none do
        assert (len(pop_neurons) == 0)
        if view_indexes is not None:
            raise SpynnakerException(
                f"{variable} data can not be extracted using a view")
        return signal_array, indexes
    data_indexes = numpy.array(pop_neurons)
    if view_indexes is None:
        view_indexes = range(pop_size)
    if list(view_indexes) == list(data_indexes):
        indexes = numpy.array(data_indexes)
    else:
        # keep just the view indexes in the data
        indexes = self.__combine_indexes(
            view_indexes, data_indexes, variable)
        # keep just data columns in the view
        map_indexes = [list(data_indexes).index(i) for i in indexes]
        signal_array = signal_array[:, map_indexes]
    return signal_array, indexes
def __get_rewires_by_region(
        self, region_id, vertex_slice, rewire_values,
        rewire_postids, rewire_preids, rewire_times, sampling_interval_ms):
    """
    Extracts rewires data for this region and adds it to the lists.

    :param int region_id: Region data came from
    :param ~pacman.model.graphs.common.Slice vertex_slice:
        slice of this region
    :param list(int) rewire_values:
    :param list(int) rewire_postids:
    :param list(int) rewire_preids:
    :param list(int) rewire_times:
    :param float sampling_interval_ms:
        used to scale the recorded time words into milliseconds
    """
    record_raw = self._read_contents(region_id)

    if len(record_raw) > 0:
        # Each entry is __REWIRING_N_WORDS words: [time, packed value]
        raw_data = (
            numpy.asarray(record_raw, dtype="uint8").view(
                dtype="<i4")).reshape([-1, self.__REWIRING_N_WORDS])
    else:
        return

    record_time = (raw_data[:, 0] * sampling_interval_ms)
    rewires_raw = raw_data[:, 1:]
    rew_length = len(rewires_raw)
    # rewires is 0 (elimination) or 1 (formation) in the first bit
    rewires = [rewires_raw[i][0] & self.__FIRST_BIT
               for i in range(rew_length)]
    # the post-neuron ID is stored in the next 8 bits
    post_ids = [((int(rewires_raw[i]) >> self.__POST_ID_SHIFT) %
                 self.__POST_ID_FACTOR) + vertex_slice.lo_atom
                for i in range(rew_length)]
    # the pre-neuron ID is stored in the remaining 23 bits
    pre_ids = [int(rewires_raw[i]) >> self.__PRE_ID_SHIFT
               for i in range(rew_length)]
    rewire_values.extend(rewires)
    rewire_postids.extend(post_ids)
    rewire_preids.extend(pre_ids)
    rewire_times.extend(record_time)
def __get_rewires(self, rec_id, sampling_interval_ms):
    """
    Extracts rewires data for this recording.

    :param int rec_id:
    :param float sampling_interval_ms:
    :return: (rewire_values, rewire_postids, rewire_preids, rewire_times)
    :rtype: ~numpy.ndarray(tuple(int, int, int, int))
    """
    times = []
    values = []
    post_ids = []
    pre_ids = []
    for region_id, _, vertex_slice, _, _, _ in \
            self.__get_region_metadata(rec_id):
        # as no neurons for "rewires" selective_recording will be true
        self.__get_rewires_by_region(
            region_id, vertex_slice, values, post_ids,
            pre_ids, times, sampling_interval_ms)
    if not values:
        return numpy.zeros((0, 4), dtype="float")
    result = numpy.column_stack((times, pre_ids, post_ids, values))
    # Sort by time, then pre id, then post id, then value
    order = numpy.lexsort((values, post_ids, pre_ids, times))
    return result[order]
def __get_recorded_pynn7(
        self, rec_id, data_type, sampling_interval_ms,
        as_matrix, view_indexes, pop_size, variable):
    """
    Get recorded data in PyNN 0.7 format. Must not be spikes.

    :param int rec_id:
    :param DataType data_type: type of data to extract
    :param float sampling_interval_ms:
    :param bool as_matrix:
    :param view_indexes:
        The indexes for which data should be returned. Or `None` for all
    :type view_indexes: list(int) or None
    :param int pop_size:
    :param str variable:
    :rtype: ~numpy.ndarray
    """
    data, indexes = self.__get_matrix_data(
        rec_id, data_type, view_indexes, pop_size, variable)
    if as_matrix:
        return data

    # Flatten to (id, time, value) triples as PyNN 0.7 did
    n_steps = len(data)
    n_neurons = len(indexes)
    times = [step * sampling_interval_ms for step in range(n_steps)]
    return numpy.column_stack((
        numpy.repeat(indexes, n_steps, 0),
        numpy.tile(times, n_neurons),
        numpy.transpose(data).reshape(n_steps * n_neurons)))
def spinnaker_get_data(
        self, pop_label, variable, as_matrix=False, view_indexes=None):
    """
    Gets the recorded data in sPyNNaker's classic numpy format.

    :param str pop_label: The label for the population of interest

        .. note::
            This is actually the label of the Application Vertex.
            Typically the Population label, corrected for `None` or
            duplicate values

    :param variable: a single variable name (a one-element collection
        of names is also accepted)
    :type variable: str or list(str)
    :param bool as_matrix: return matrix data as a matrix rather than
        (id, time, value) triples; ignored (with a warning) for spikes
    :param view_indexes: which neuron IDs to return, or `None` for all
    :type view_indexes: list(int) or None
    :rtype: ~numpy.ndarray
    """
    if not isinstance(variable, str):
        if len(variable) != 1:
            raise ConfigurationException(
                "Only one type of data at a time is supported")
        variable, = variable

    # called to trigger the virtual data warning if applicable
    self.__get_segment_info()
    (rec_id, data_type, buffered_type, _, sampling_interval_ms,
     pop_size, _, n_colour_bits) = \
        self.__get_recording_metadeta(pop_label, variable)

    if buffered_type == BufferDataType.MATRIX:
        return self.__get_recorded_pynn7(
            rec_id, data_type, sampling_interval_ms,
            as_matrix, view_indexes, pop_size, variable)
    # NO BufferedDataType.REWIRES get_spike will go boom
    if as_matrix:
        logger.warning("Ignoring as matrix for {}", variable)
    return self.__get_spikes(
        rec_id, view_indexes, buffered_type,
        n_colour_bits, variable)[0]
def get_spike_counts(self, pop_label, view_indexes=None):
    """
    Gets a count of spikes per neuron for the population.

    Neurons in the view with no spikes are included with a zero count.

    :param str pop_label: The label for the population of interest
    :param view_indexes: which neuron IDs to count, or `None` for all
    :type view_indexes: list(int) or None
    :rtype: dict(int, int)
    """
    # called to trigger the virtual data warning if applicable
    self.__get_segment_info()
    (rec_id, _, buffered_type, _, _, pop_size, _, n_colour_bits) = \
        self.__get_recording_metadeta(pop_label, SPIKES)
    if view_indexes is None:
        view_indexes = range(pop_size)

    # __get_spikes raises if buffered_type is not a spikes type
    spikes = self.__get_spikes(
        rec_id, view_indexes, buffered_type, n_colour_bits, SPIKES)[0]
    counts = numpy.bincount(
        spikes[:, 0].astype(dtype=numpy.int32), minlength=pop_size)
    return {neuron: counts[neuron] for neuron in view_indexes}
def __add_data(
        self, pop_label, variable, segment, view_indexes, t_stop):
    """
    Adds the data for one variable to a Neo segment.

    :param str pop_label: The label for the population of interest

        .. note::
            This is actually the label of the Application Vertex.
            Typically the Population label, corrected for `None` or
            duplicate values

    :param str variable:
    :param ~neo.core.Segment segment: Segment to add data to
    :param view_indexes: which neuron IDs to include, or `None` for all
    :type view_indexes: list(int) or None
    :param float t_stop:
    :raises \
        ~spinn_front_end_common.utilities.exceptions.ConfigurationException:
        If the recording metadata not setup correctly
    """
    (rec_id, data_type, buffer_type, t_start, sampling_interval_ms,
     pop_size, units, n_colour_bits) = \
        self.__get_recording_metadeta(pop_label, variable)

    if buffer_type == BufferDataType.MATRIX:
        signal_array, indexes = self.__get_matrix_data(
            rec_id, data_type, view_indexes, pop_size, variable)
        sampling_rate = 1000/sampling_interval_ms * quantities.Hz
        t_start = t_start * quantities.ms
        self._insert_matrix_data(
            variable, segment, signal_array,
            indexes, t_start, sampling_rate, units)
    elif buffer_type == BufferDataType.REWIRES:
        # Rewiring events have no per-neuron structure, so views
        # cannot be applied
        if view_indexes is not None:
            raise SpynnakerException(
                f"{variable} can not be extracted using a view")
        event_array = self.__get_rewires(rec_id, sampling_interval_ms)
        self._insert_neo_rewirings(segment, event_array, variable)
    else:
        if view_indexes is None:
            view_indexes = range(pop_size)
        spikes, indexes = self.__get_spikes(
            rec_id, view_indexes, buffer_type, n_colour_bits, variable)
        sampling_rate = 1000 / sampling_interval_ms * quantities.Hz
        self._insert_spike_data(
            view_indexes, segment, spikes, t_start, t_stop,
            sampling_rate)
def __read_and_csv_data(self, pop_label, variable, csv_writer,
                        view_indexes, t_stop):
    """
    Reads the data for one variable and adds it to the CSV file.

    :param str pop_label: The label for the population of interest

        .. note::
            This is actually the label of the Application Vertex.
            Typically the Population label, corrected for `None` or
            duplicate values

    :param str variable:
    :param ~csv.writer csv_writer: Open CSV writer to write to
    :param view_indexes: which neuron IDs to include, or `None` for all
    :type view_indexes: None, ~numpy.array or list(int)
    :param float t_stop:
    """
    (rec_id, data_type, buffer_type, t_start, sampling_interval_ms,
     pop_size, units, n_colour_bits) = \
        self.__get_recording_metadeta(pop_label, variable)

    if buffer_type == BufferDataType.MATRIX:
        self._csv_variable_metdata(
            csv_writer, self._MATRIX, variable, t_start, t_stop,
            sampling_interval_ms, units)
        signal_array, indexes = self.__get_matrix_data(
            rec_id, data_type, view_indexes, pop_size, variable)
        self._csv_matrix_data(csv_writer, signal_array, indexes)
    elif buffer_type == BufferDataType.REWIRES:
        self._csv_variable_metdata(
            csv_writer, self._EVENT, variable, t_start, t_stop,
            sampling_interval_ms, units)
        # Rewiring events have no per-neuron structure, so views
        # cannot be applied
        if view_indexes is not None:
            raise SpynnakerException(
                f"{variable} can not be extracted using a view")
        event_array = self.__get_rewires(rec_id, sampling_interval_ms)
        self._csv_rewirings(csv_writer, event_array)
    else:
        self._csv_variable_metdata(
            csv_writer, self._SPIKES, variable, t_start, t_stop,
            sampling_interval_ms, units)
        spikes, indexes = self.__get_spikes(
            rec_id, view_indexes, buffer_type, n_colour_bits, variable)
        self._csv_spike_data(csv_writer, spikes, indexes)
def __get_empty_block(self, pop_label, annotations):
    """
    Builds a Neo block holding only the population metadata, no segments.

    :param str pop_label: The label for the population of interest

    .. note::
        This is actually the label of the Application Vertex.
        Typically the Population label, corrected for `None` or
        duplicate values

    :param annotations: annotations to put on the neo block
    :type annotations: None or dict(str, ...)
    :return: The Neo block
    :rtype: ~neo.core.Block
    :raises \
        ~spinn_front_end_common.utilities.exceptions.ConfigurationException:
        If the recording metadata not setup correctly
    """
    # Only dt and the simulator name are needed from the segment info
    _, _, _, dt, simulator = self.__get_segment_info()
    size, first_id, description = self.__get_population_metadata(pop_label)
    return self._insert_empty_block(
        pop_label, description, size, first_id, dt, simulator, annotations)
[docs]
def get_empty_block(self, pop_label, annotations=None):
    """
    Creates a block with just metadata but not data segments.

    :param str pop_label: The label for the population of interest

    .. note::
        This is actually the label of the Application Vertex.
        Typically the Population label, corrected for `None` or
        duplicate values

    :param annotations: annotations to put on the neo block
    :type annotations: None or dict(str, ...)
    :return: The Neo block
    :rtype: ~neo.core.Block
    :raises \
        ~spinn_front_end_common.utilities.exceptions.ConfigurationException:
        If the recording metadata not setup correctly
    """
    return self.__get_empty_block(pop_label, annotations)
[docs]
def get_full_block(self, pop_label, variables, view_indexes, annotations):
    """
    Creates a block with metadata and data for this segment.
    Any previous segments will be empty.

    :param str pop_label: The label for the population of interest

    .. note::
        This is actually the label of the Application Vertex.
        Typically the Population label, corrected for `None` or
        duplicate values

    :param variables:
        One or more variable names or `None` for all available
    :type variables: str, list(str) or None
    :param view_indexes: List of neurons IDs to include or `None` for all
    :type view_indexes: None or list(int)
    :param annotations: annotations to put on the neo block
    :type annotations: None or dict(str, ...)
    :return: The Neo block
    :rtype: ~neo.core.Block
    """
    neo_block = self.__get_empty_block(pop_label, annotations)
    self.__add_segment(neo_block, pop_label, variables, view_indexes)
    return neo_block
[docs]
def csv_segment(
        self, csv_file, pop_label, variables, view_indexes=None):
    """
    Writes the data including metadata to a CSV file.

    :param str csv_file: Path to file to write block metadata to;
        must already exist (created by ``csv_block_metadata``)
    :param str pop_label: The label for the population of interest

    .. note::
        This is actually the label of the Application Vertex.
        Typically the Population label, corrected for `None` or
        duplicate values

    :param variables:
        One or more variable names or `None` for all available
    :type variables: str, list(str) or None
    :param view_indexes: List of neurons IDs to include or `None` for all
    :type view_indexes: None or list(int)
    :raises SpynnakerException: If the CSV file does not already exist
    :raises \
        ~spinn_front_end_common.utilities.exceptions.ConfigurationException:
        If the recording metadata not setup correctly
    """
    # The metadata writer creates the file; data can only be appended
    if not os.path.isfile(csv_file):
        raise SpynnakerException("Please call csv_block_metadata first")
    with open(csv_file, 'a', newline='', encoding="utf-8") as csvfile:
        csv_writer = csv.writer(csvfile, delimiter=',', quotechar='"',
                                quoting=csv.QUOTE_MINIMAL)
        segment_number, rec_datetime, t_stop, _, _ = \
            self.__get_segment_info()
        self._csv_segment_metadata(
            csv_writer, segment_number, rec_datetime)
        variables = self.__clean_variables(variables, pop_label)
        for variable in variables:
            self.__read_and_csv_data(
                pop_label, variable, csv_writer, view_indexes, t_stop)
[docs]
def add_segment(self, block, pop_label, variables, view_indexes=None):
    """
    Adds a segment to the block.

    :param ~neo.core.Block block: Block to add the segment to
    :param str pop_label: The label for the population of interest

    .. note::
        This is actually the label of the Application Vertex.
        Typically the Population label, corrected for `None` or
        duplicate values

    :param variables:
        One or more variable names or `None` for all available
    :type variables: str, list(str) or None
    :param view_indexes: List of neurons IDs to include or `None` for all
    :type view_indexes: None or list(int)
    :raises \
        ~spinn_front_end_common.utilities.exceptions.ConfigurationException:
        If the recording metadata not setup correctly
    """
    self.__add_segment(block, pop_label, variables, view_indexes)
def __clean_variables(self, variables, pop_label):
    """
    Normalises the variables argument to a list of variable names.

    :param variables: One or more variable names, ``'all'``, or `None`
        for all variables recorded for this population
    :type variables: str, list(str) or None
    :param str pop_label: The label for the population of interest
    :rtype: list(str)
    """
    if isinstance(variables, str):
        variables = [variables]
    # 'all' is an alias for None; guard against variables being None
    # here, as `'all' in None` would raise a TypeError
    if variables is not None and 'all' in variables:
        variables = None
    if variables is None:
        variables = self.__get_recording_variables(pop_label)
    return variables
def __add_segment(self, block, pop_label, variables, view_indexes):
    """
    Adds a segment with the current data to the block.

    :param ~neo.core.Block block: Block to add the segment to
    :param str pop_label: The label for the population of interest

    .. note::
        This is actually the label of the Application Vertex.
        Typically the Population label, corrected for `None` or
        duplicate values

    :param variables:
        One or more variable names or `None` for all available
    :type variables: str, list(str) or None
    :param view_indexes: List of neurons IDs to include or `None` for all
    :type view_indexes: None or list(int)
    :raises \
        ~spinn_front_end_common.utilities.exceptions.ConfigurationException:
        If the recording metadata not setup correctly
    """
    segment_number, rec_datetime, t_stop, _, _ = self.__get_segment_info()
    # Create the (empty) segment first, then fill it variable by variable
    segment = self._insert_empty_segment(
        block, segment_number, rec_datetime)
    for variable in self.__clean_variables(variables, pop_label):
        self.__add_data(pop_label, variable, segment, view_indexes, t_stop)
[docs]
def clear_data(self, pop_label, variables):
    """
    Clears the data for one population and given variables.

    .. note::
        The database must be writable for this to work!

    :param str pop_label: The label for the population of interest

    .. note::
        This is actually the label of the Application Vertex.
        Typically the Population label, corrected for `None` or
        duplicate values

    :param list(str) variables: names of variable to get data for
    """
    t_start = SpynnakerDataView.get_current_run_time_ms()
    for variable in self.__clean_variables(variables, pop_label):
        # Move the recording start time forward to "now"
        self.execute(
            """
            UPDATE recording SET
                t_start = ?
            WHERE rec_id in
                (SELECT rec_id
                 FROM recording_view
                 WHERE label = ? AND variable = ?)
            """, (t_start, pop_label, variable))
        # Empty out the buffered content for this variable's regions
        self.execute(
            """
            UPDATE region SET
                content = CAST('' AS BLOB), content_len = 0,
                fetches = 0, append_time = NULL
            WHERE region_id in
                (SELECT region_id
                 FROM region_metadata NATURAL JOIN recording_view
                 WHERE label = ? AND variable = ?)
            """, (pop_label, variable))
        # Drop any overflow rows belonging to those regions
        self.execute(
            """
            DELETE FROM region_extra
            WHERE region_id in
                (SELECT region_id
                 FROM region_metadata NATURAL JOIN recording_view
                 WHERE label = ? AND variable = ?)
            """, (pop_label, variable))
def __write_metadata(self, population, variable):
    """
    Records the metadata needed to later read one variable's data back.

    Inserts one ``region_metadata`` row per machine vertex recording the
    variable; vertices recording zero neurons are skipped.

    :param population: the Population being recorded
    :param str variable: name of the variable being recorded
    """
    # pylint: disable=protected-access
    app_vertex = population._vertex
    buffered_data_type = app_vertex.get_buffer_data_type(variable)
    data_type = app_vertex.get_data_type(variable)
    sampling_interval_ms = app_vertex.get_sampling_interval_ms(variable)
    units = app_vertex.get_units(variable)
    n_colour_bits = app_vertex.n_colour_bits
    rec_id = self.__get_recording_id(
        app_vertex.label, variable, population, sampling_interval_ms,
        data_type, buffered_data_type, units, n_colour_bits)
    region = app_vertex.get_recording_region(variable)
    for m_vertex in app_vertex.splitter.machine_vertices_for_recording(
            variable):
        placement = SpynnakerDataView.get_placement_of_vertex(m_vertex)
        region_id = self._get_region_id(
            placement.x, placement.y, placement.p, region)
        neurons = app_vertex.get_neurons_recording(
            variable, m_vertex.vertex_slice)
        # None means "all neurons"; an empty list means nothing recorded
        # on this core, so no metadata row is needed
        if neurons is not None and len(neurons) == 0:
            continue
        recording_neurons_st = (
            None if neurons is None else self.array_to_string(neurons))
        # Only EIEIO spikes need the routing key to decode the data
        base_key = (
            m_vertex.get_virtual_key()
            if buffered_data_type == BufferDataType.EIEIO_SPIKES
            else None)
        self.execute(
            """
            INSERT INTO region_metadata
            (rec_id, region_id, recording_neurons_st,
             base_key, vertex_slice)
            VALUES (?, ?, ?, ?, ?)
            """,
            (rec_id, region_id, recording_neurons_st,
             base_key, str(m_vertex.vertex_slice)))
[docs]
@staticmethod
def array_to_string(indexes):
"""
Converts a list of integers into a compact string.
Works best if the list is sorted.
IDs are comma separated, except when a series of IDs is sequential then
the start:end is used.
:param list(int) indexes:
:rtype: str
"""
if indexes is None or len(indexes) == 0:
return ""
previous = indexes[0]
results = str(previous)
in_range = False
for index in indexes[1:]:
if index == previous + 1:
if not in_range:
results += ":"
in_range = True
else:
if in_range:
results += str(previous)
results += ","
results += str(index)
in_range = False
previous = index
if in_range:
results += str(previous)
return results
[docs]
@staticmethod
def string_to_array(string):
"""
Converts a string into a list of integers.
Assumes the string was created by :py:meth:`array_to_string`
:param str string:
:rtype: list(int)
"""
if not string:
return []
if not isinstance(string, str):
string = str(string, "utf-8")
results = []
parts = re.findall(r"\d+[,:]*", string)
start = None
for part in parts:
if part.endswith(":"):
start = int(part[:-1])
else:
if part.endswith(","):
val = int(part[:-1])
else:
val = int(part)
if start is not None:
results.extend(range(start, val+1))
start = None
else:
results.append(val)
return results