#################################################################################
## ##
## Copyright C 2018 Juan P. Dominguez-Morales ##
## ##
## This file is part of pyNAVIS. ##
## ##
## pyNAVIS is free software: you can redistribute it and/or modify ##
## it under the terms of the GNU General Public License as published by ##
## the Free Software Foundation, either version 3 of the License, or ##
## (at your option) any later version. ##
## ##
## pyNAVIS is distributed in the hope that it will be useful, ##
## but WITHOUT ANY WARRANTY; without even the implied warranty of ##
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the ##
## GNU General Public License for more details. ##
## ##
## You should have received a copy of the GNU General Public License ##
## along with pyNAVIS. If not, see <http://www.gnu.org/licenses/>. ##
## ##
#################################################################################
import math
import struct
import csv
[docs]class SpikesFile:
"""
Class that contains all the addresses and timestamps of a file.
Attributes:
timestamps (int[]): Timestamps of the file.
addresses (int[]): Addresses of the file.
Note:
Timestamps and addresses are matched, which means that timestamps[0] is the timestamp for the spike with address addresses[0].
"""
def __init__(self, addresses = [], timestamps = []):
self.addresses = addresses
self.timestamps = timestamps
class LocalizationFile:
"""
Class that contains all the events ant timestamps from the sound source localization model of a file.
Attributes:
mso_neurons_ids (int[]): Neuron's IDs of the MSO population of the file.
mso_channels (int[]): Frequency channels associated to the MSO neuron's IDs of the file.
mso_timestamps (int[]): Timestamps of the MSO neuron's IDs of the file.
lso_neuron_ids (int[]): Neuron's IDs of the LSO population of the file.
lso_channels (int[]): Frequency channels associated to the LSO neuron's IDs of the file.
lso_timestamps (int[]): Timestamps of the LSO neuron's IDs of the file.
Note:
Timestamps, addresses, and neurons' ID are matched, which means that mso_timestamps[0] is the timestamp for the spike with address mso_neuron_ids[0].
"""
def __init__(self, mso_neuron_ids = [], mso_channels = [], mso_timestamps = [], lso_neuron_ids = [], lso_channels = [], lso_timestamps = []):
self.mso_neuron_ids = mso_neuron_ids
self.mso_channels = mso_channels
self.mso_timestamps = mso_timestamps
self.lso_neuron_ids = lso_neuron_ids
self.lso_channels = lso_channels
self.lso_timestamps = lso_timestamps
[docs]class Loaders:
"""
Functionalities for loading spiking information from different formats.
"""
[docs] @staticmethod
def loadAEDAT(path, settings):
"""
Loads an AEDAT (.aedat) file.
Parameters:
path (string): Full path of the AEDAT file to be loaded, including name and extension.
settings (MainSettings): Configuration parameters for the file to load.
Returns:
SpikesFile: SpikesFile containing all the addresses and timestamps of the file.
Raises:
SettingsError: If settings.address_size is different than 2 and 4.
"""
unpack_param = ">H"
if settings.address_size == 2:
unpack_param = ">H"
elif settings.address_size == 4:
unpack_param = ">L"
else:
print("[Loaders.loadAEDAT] > SettingsError: Only address sizes implemented are 2 and 4 bytes")
with open(path, 'rb') as f:
## Check header ##
p = 0
lt = f.readline()
while lt and lt[0] == ord("#"):
p += len(lt)
lt = f.readline()
f.seek(p)
f.seek(0, 2)
eof = f.tell()
num_events = math.floor((eof-p)/(settings.address_size + 4))
f.seek(p)
events = [0] * int(num_events)
timestamps = [0] * int(num_events)
## Read file ##
i = 0
try:
while 1:
buff = f.read(settings.address_size)
x = struct.unpack(unpack_param, buff)[0]
events[i] = x
buff = f.read(4)
x = struct.unpack('>L', buff)[0]
timestamps[i] = x
i += 1
except Exception as inst:
pass
spikes_file = SpikesFile([], [])
spikes_file.addresses = events
spikes_file.timestamps = timestamps
return spikes_file
[docs] @staticmethod
def loadAEDATLocalization(path, settings, localization_settings):
"""
Loads an AEDAT (.aedat) file which contains events from both the NAS model and the SOC model (sound source localization).
Parameters:
path (string): Full path of the AEDAT file to be loaded, including name and extension.
settings (MainSettings): Configuration parameters for the file to load.
localization_settings (LocalizationSettings): Configuration parameters of the localization module for the file to load.
Returns:
SpikesFile: SpikesFile containing all the addresses and timestamps of the file.
LocalizationFile: LocalizationFile containing all the events from both the MSO and LSO models of the file.
Raises:
SettingsError: If settings.address_size is different than 2 and 4.
"""
unpack_param = ">H"
if settings.address_size == 2:
unpack_param = ">H"
elif settings.address_size == 4:
unpack_param = ">L"
else:
print("[Loaders.loadAEDATLocalization] > SettingsError: Only address sizes implemented are 2 and 4 bytes")
# Check the localization_settings values
localization_settings_error = False
# MSO start frequency channel range
if ((localization_settings.mso_start_channel < 0) or (localization_settings.mso_start_channel >= settings.num_channels)):
print("[Loaders.loadAEDATLocalization] > LocalizationSettingsError: MSO start frequency channel range should be in the range [0, num_channels-1]")
localization_settings_error = True
# MSO start frequency channel and end frequency channel
if (localization_settings.mso_end_channel < localization_settings.mso_start_channel):
print("[Loaders.loadAEDATLocalization] > LocalizationSettingsError: MSO start frequency channel should be lower than MSO end frequency channel")
localization_settings_error = True
# MSO start frequency channel and end frequency channel
if ((localization_settings.mso_num_neurons_channel < 1) or (localization_settings.mso_num_neurons_channel > 32)):
print("[Loaders.loadAEDATLocalization] > LocalizationSettingsError: MSO number of neurons value should be in the range [1, 32]")
localization_settings_error = True
# LSO start frequency channel range
if ((localization_settings.lso_start_channel < 0) or (localization_settings.lso_start_channel >= settings.num_channels)):
print("[Loaders.loadAEDATLocalization] > LocalizationSettingsError: LSO start frequency channel range should be in the range [0, num_channels-1]")
localization_settings_error = True
# LSO start frequency channel and end frequency channel
if (localization_settings.lso_end_channel < localization_settings.lso_start_channel):
print("[Loaders.loadAEDATLocalization] > LocalizationSettingsError: LSO start frequency channel should be lower than LSO end frequency channel")
localization_settings_error = True
# LSO start frequency channel and end frequency channel
if ((localization_settings.lso_num_neurons_channel < 1) or (localization_settings.lso_num_neurons_channel > 32)):
print("[Loaders.loadAEDATLocalization] > LocalizationSettingsError: lSO number of neurons value should be in the range [1, 32]")
localization_settings_error = True
if(localization_settings_error):
return None
with open(path, 'rb') as f:
## Check header ##
p = 0
lt = f.readline()
while lt and lt[0] == ord("#"):
p += len(lt)
lt = f.readline()
f.seek(p)
f.seek(0, 2)
eof = f.tell()
num_events = math.floor((eof-p)/(settings.address_size + 4))
f.seek(p)
events_nas = []
timestamps_nas = []
neuron_ids_mso = []
channels_mso = []
timestamps_mso = []
neuron_ids_lso = []
channels_lso = []
timestamps_lso = []
## Read file ##
i = 0
try:
while 1:
# Read a word and unpack the event data
buff = f.read(settings.address_size)
ev = struct.unpack(unpack_param, buff)[0]
# Read a word and unpack the event timestamp
buff = f.read(4)
ts = struct.unpack('>L', buff)[0]
# Check if the event is a NAS event of SOC event
auditory_model = (ev & 0x8000) >> 15
if auditory_model == 0:
# NAS event
events_nas.append(ev)
timestamps_nas.append(ts)
elif auditory_model == 1:
# Localization event
# Apply a mask to obtain the correct values
neuron_id = (ev & 0x3E00) >> 9
freq_channel = (ev & 0x00FE) >> 1
xso_type = (ev & 0x4000) >> 14
if xso_type == 0:
# MSO event
neuron_ids_mso.append(neuron_id)
channels_mso.append(freq_channel)
timestamps_mso.append(ts)
elif xso_type == 1:
# LSO event
neuron_ids_lso.append(neuron_id)
channels_lso.append(freq_channel)
timestamps_lso.append(ts)
else:
# Other case
print("[Loaders.loadAEDATLocalization] > DataError: MSO/LSO type not recognized!")
else:
# Other case
print("[Loaders.loadAEDATLocalization] > DataError: Auditory model not recognized!")
i += 1
except Exception as inst:
pass
spikes_file = SpikesFile([], [])
spikes_file.addresses = events_nas
spikes_file.timestamps = timestamps_nas
localization_file = LocalizationFile([], [], [], [], [], [])
localization_file.mso_neuron_ids = neuron_ids_mso
localization_file.mso_channels = channels_mso
localization_file.mso_timestamps = timestamps_mso
localization_file.lso_neuron_ids = neuron_ids_lso
localization_file.lso_channels = channels_lso
localization_file.lso_timestamps = timestamps_lso
return spikes_file, localization_file
[docs] @staticmethod
def loadCSV(path, delimiter=','):
"""
Loads a Comma-Separated Values (.csv) file.
Parameters:
path (string): Full path of the CSV file to be loaded, including name and extension.
delimiter (char): Delimiter to use in the CSV file.
Returns:
SpikesFile: SpikesFile containing all the addresses and timestamps of the file.
Note:
The CSV file should contain one line per event, and the information in each line should be: address, timestamp
"""
addresses = []
timestamps = []
with open(path) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=delimiter)
for row in csv_reader:
addresses.append(int(row[0]))
timestamps.append(int(row[1]))
spikes_file = SpikesFile([], [])
spikes_file.addresses = addresses
spikes_file.timestamps = timestamps
return spikes_file
[docs] @staticmethod
def loadCSVLocalization(path, delimiter=','):
"""
Loads a Comma-Separated Values (.csv) file which contains events from both the NAS model and the SOC model (sound source localization).
Parameters:
path (string): Full path of the CSV file to be loaded, including name and extension.
delimiter (char): Delimiter to use in the CSV file.
Returns:
SpikesFile: SpikesFile containing all the addresses and timestamps of the file.
LocalizationFile: LocalizationFile containing all the events from both the MSO and LSO models of the file.
Note:
The CSV file should contain one line per event, and the information in each line should be: address, timestamp
The CSV format should be: address, timestamp, auditory_model, xso_type, neuron_id.
"""
addresses_nas = []
timestamps_nas = []
neuron_ids_mso = []
channels_mso = []
timestamps_mso = []
neuron_ids_lso = []
channels_lso = []
timestamps_lso = []
with open(path) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=delimiter)
for row in csv_reader:
try:
auditory_model = int(row[2])
except Exception:
auditory_model = 0
pass
address = int(row[0])
timestamp = row[1]
# Check if the timestamp contains any time reference (from simulation)
timeref = "ps"
if timeref in timestamp:
# Remove string "ps" and convert the timestamps from picoseconds to microseconds
timestamp = timestamp.replace(" ps", "")
timestamp = int(timestamp)
timestamp = timestamp * 1.0e-6
if auditory_model == 0:
# NAS event
addresses_nas.append(address)
timestamps_nas.append(int(timestamp))
elif auditory_model == 1:
# Localization event
xso_type = int(row[3])
neuron_id = int(row[4])
freq_channel = address #>> 1
if xso_type == 0:
# MSO event
neuron_ids_mso.append(neuron_id)
channels_mso.append(freq_channel)
timestamps_mso.append(timestamp)
elif xso_type == 1:
# LSO event
neuron_ids_lso.append(neuron_id)
channels_lso.append(freq_channel)
timestamps_lso.append(timestamp)
else:
# Other case
print("[Loaders.loadCSVLocalization] > DataError: MSO/LSO type not recognized!")
else:
# Other case
print("[Loaders.loadCSVLocalization] > DataError: Auditory model not recognized!")
spikes_file = SpikesFile([], [])
spikes_file.addresses = addresses_nas
spikes_file.timestamps = timestamps_nas
localization_file = LocalizationFile([], [], [], [], [], [])
localization_file.mso_neuron_ids = neuron_ids_mso
localization_file.mso_channels = channels_mso
localization_file.mso_timestamps = timestamps_mso
localization_file.lso_neuron_ids = neuron_ids_lso
localization_file.lso_channels = channels_lso
localization_file.lso_timestamps = timestamps_lso
return spikes_file, localization_file
[docs] @staticmethod
def loadZynqGrabberData(path, settings, localization_settings):
"""
Loads a text (.txt) file with EAR events collected by the zynqGrabber.
Parameters:
path (string): Full path of the CSV file to be loaded, including name and extension.
settings (MainSettings): Configuration parameters for the file to load.
localization_settings (LocalizationSettings): Configuration parameters of the localization module for the file to load.
Returns:
spikes_file: SpikesFile containing all the addresses and timestamps of the file.
localization_file: LocalizationFile containing all the events from both the MSO and LSO models of the file.
"""
addresses = []
timestamps = []
neuron_ids_mso = []
channels_mso = []
timestamps_mso = []
neuron_ids_lso = []
channels_lso = []
timestamps_lso = []
txt_file = open(path, 'r')
txt_lines = txt_file.readlines()
count = 0
for line in txt_lines:
event = line.strip().split(',')
decoded_events_timestamps = float(event[0])
decoded_events_auditory_models = int(event[1]) # 0 if the event comes from the NAS, 1 for the SOC model
decoded_events_channels = int(event[2]) # 0 left, 1 right
decoded_events_xso_types = int(event[3]) # 0 for MSO, 1 for LSO
decoded_events_neuron_ids = int(event[4]) # Between 0 and 15
decoded_events_freq_ch_addrs = int(event[5]) # Between 0 and 32
decoded_events_polarities = int(event[6]) # 0 pos, 1 neg
# It could be either NAS (auditory_models = 0) or SOC events (auditory_models = 1)
if decoded_events_auditory_models == 0:
# NAS event
timestamps.append(int(decoded_events_timestamps))
addresses.append(int(decoded_events_freq_ch_addrs*(1+settings.on_off_both) + decoded_events_polarities + settings.num_channels*decoded_events_channels*(1+settings.on_off_both)))
elif decoded_events_auditory_models == 1:
# It could be either MSO (xso_type = 0) or LSO events (xso_type = 1)
if decoded_events_xso_types == 0:
# MSO event
timestamps_mso.append(int(decoded_events_timestamps))
channels_mso.append(int(decoded_events_freq_ch_addrs))
neuron_ids_mso.append(int(decoded_events_neuron_ids))
elif decoded_events_xso_types == 1:
# LSO event
timestamps_lso.append(int(decoded_events_timestamps))
channels_lso.append(int(decoded_events_freq_ch_addrs))
neuron_ids_lso.append(int(decoded_events_neuron_ids))
else:
# Other case
print("[Loaders.loadZynqGrabberData] > DataError: MSO/LSO type not recognized!")
else:
# Other case
print("[Loaders.loadZynqGrabberData] > DataError: Auditory model not recognized!")
spikes_file = SpikesFile([], [])
spikes_file.addresses = addresses
spikes_file.timestamps = timestamps
localization_file = LocalizationFile([], [], [], [], [], [])
localization_file.mso_neuron_ids = neuron_ids_mso
localization_file.mso_channels = channels_mso
localization_file.mso_timestamps = timestamps_mso
localization_file.lso_neuron_ids = neuron_ids_lso
localization_file.lso_channels = channels_lso
localization_file.lso_timestamps = timestamps_lso
return spikes_file, localization_file