diff --git a/neo/io/nixio.py b/neo/io/nixio.py index 57c017140..42f1a6db5 100644 --- a/neo/io/nixio.py +++ b/neo/io/nixio.py @@ -23,7 +23,6 @@ from datetime import date, time, datetime from collections.abc import Iterable from collections import OrderedDict -import itertools from uuid import uuid4 import warnings from distutils.version import LooseVersion as Version @@ -39,6 +38,8 @@ from ..io.proxyobjects import BaseProxy from ..version import version as neover +neover = Version(neover) + try: import nixio as nix @@ -202,8 +203,8 @@ def __init__(self, filename, mode="rw"): else: # new file filemd = self.nix_file.create_section("neo", "neo.metadata") - filemd["version"] = neover - self._file_version = neover + filemd["version"] = str(neover) + self._file_version = str(neover) self._block_read_counter = 0 @@ -318,15 +319,25 @@ def _nix_to_neo_block(self, nix_block): parent.groups.append(newgrp) # find free floating (Groupless) signals and spiketrains - blockdas = self._group_signals(nix_block.data_arrays) - for name, das in blockdas.items(): - if name not in self._neo_map: - if das[0].type == "neo.analogsignal": - self._nix_to_neo_analogsignal(das) - elif das[0].type == "neo.irregularlysampledsignal": - self._nix_to_neo_irregularlysampledsignal(das) - elif das[0].type == "neo.imagesequence": - self._nix_to_neo_imagesequence(das) + if self._file_version < Version('0.11.0dev0'): + blockdas = self._group_signals(nix_block.data_arrays) + for name, das in blockdas.items(): + if name not in self._neo_map: + if das[0].type == "neo.analogsignal": + self._nix_to_neo_analogsignal(das) + elif das[0].type == "neo.irregularlysampledsignal": + self._nix_to_neo_irregularlysampledsignal(das) + elif das[0].type == "neo.imagesequence": + self._nix_to_neo_imagesequence(das) + else: + for da in nix_block.data_arrays: + if da.name not in self._neo_map: + if da.type == "neo.analogsignal": + self._nix_to_neo_analogsignal(da) + elif da.type == "neo.irregularlysampledsignal": + self._nix_to_neo_irregularlysampledsignal(da) + elif da.type == "neo.imagesequence": + self._nix_to_neo_imagesequence(da) for mt in nix_block.multi_tags: if mt.type == "neo.spiketrain" and mt.name not in self._neo_map: self._nix_to_neo_spiketrain(mt) @@ -356,24 +367,44 @@ def _nix_to_neo_segment(self, nix_group): "neo.irregularlysampledsignal", "neo.imagesequence",), nix_group.data_arrays)) - dataarrays = self._group_signals(dataarrays) - # descend into DataArrays - for name, das in dataarrays.items(): - if das[0].type == "neo.analogsignal": - newasig = self._nix_to_neo_analogsignal(das) - neo_segment.analogsignals.append(newasig) - # parent reference - newasig.segment = neo_segment - elif das[0].type == "neo.irregularlysampledsignal": - newisig = self._nix_to_neo_irregularlysampledsignal(das) - neo_segment.irregularlysampledsignals.append(newisig) - # parent reference - newisig.segment = neo_segment - elif das[0].type == "neo.imagesequence": - new_imgseq = self._nix_to_neo_imagesequence(das) - neo_segment.imagesequences.append(new_imgseq) - # parent reference - new_imgseq.segment = neo_segment + + if self._file_version < Version('0.11.0dev0'): + dataarrays = self._group_signals(dataarrays) + # descend into DataArrays + for name, das in dataarrays.items(): + if das[0].type == "neo.analogsignal": + newasig = self._nix_to_neo_analogsignal(das) + neo_segment.analogsignals.append(newasig) + # parent reference + newasig.segment = neo_segment + elif das[0].type == "neo.irregularlysampledsignal": + newisig = self._nix_to_neo_irregularlysampledsignal(das) + neo_segment.irregularlysampledsignals.append(newisig) + # parent reference + newisig.segment = neo_segment + elif das[0].type == "neo.imagesequence": + new_imgseq = self._nix_to_neo_imagesequence(das) + neo_segment.imagesequences.append(new_imgseq) + # parent reference + new_imgseq.segment = neo_segment + else: + # descend into DataArrays + for da in dataarrays: + if da.type == "neo.analogsignal": + newasig = self._nix_to_neo_analogsignal(da) + neo_segment.analogsignals.append(newasig) + # parent reference + newasig.segment = neo_segment + elif da.type == "neo.irregularlysampledsignal": + newisig = self._nix_to_neo_irregularlysampledsignal(da) + neo_segment.irregularlysampledsignals.append(newisig) + # parent reference + newisig.segment = neo_segment + elif da.type == "neo.imagesequence": + new_imgseq = self._nix_to_neo_imagesequence(da) + neo_segment.imagesequences.append(new_imgseq) + # parent reference + new_imgseq.segment = neo_segment # descend into MultiTags for mtag in nix_group.multi_tags: @@ -405,24 +436,36 @@ def _nix_to_neo_group(self, nix_group): "neo.irregularlysampledsignal", "neo.imagesequence",), nix_group.data_arrays)) - dataarrays = self._group_signals(dataarrays) - # descend into DataArrays - for name in dataarrays: - obj = self._neo_map[name] - neo_group.add(obj) + + if self._file_version < Version('0.11.0dev0'): + dataarrays = self._group_signals(dataarrays) + # descend into DataArrays + for name in dataarrays: + obj = self._neo_map[name] + neo_group.add(obj) + else: + # descend into DataArrays + for da in dataarrays: + obj = self._neo_map[da.name] + neo_group.add(obj) # descend into MultiTags for mtag in nix_group.multi_tags: if mtag.type == "neo.channelview" and mtag.name not in self._neo_map: self._nix_to_neo_channelview(mtag) obj = self._neo_map[mtag.name] neo_group.add(obj) + # TODO: descend into groups? return neo_group, parent_name def _nix_to_neo_channelview(self, nix_mtag): neo_attrs = self._nix_attr_to_neo(nix_mtag) index = nix_mtag.positions - nix_name, = self._group_signals(nix_mtag.references).keys() + if self._file_version < Version('0.11.0dev0'): + nix_name, = self._group_signals(nix_mtag.references).keys() + else: + assert len(nix_mtag.references) == 1 + nix_name = nix_mtag.references[0].name obj = self._neo_map[nix_name] neo_chview = ChannelView(obj, index, **neo_attrs) self._neo_map[nix_mtag.name] = neo_chview @@ -437,15 +480,25 @@ def _nix_to_neo_analogsignal(self, nix_da_group): :param nix_da_group: a list of NIX DataArray objects :return: a Neo AnalogSignal object """ - neo_attrs = self._nix_attr_to_neo(nix_da_group[0]) - metadata = nix_da_group[0].metadata - neo_attrs["nix_name"] = metadata.name # use the common base name - unit = nix_da_group[0].unit - signaldata = np.array([d[:] for d in nix_da_group]).transpose() + if self._file_version < Version('0.11.0dev0'): + da = nix_da_group[0] + signaldata = np.array([d[:] for d in nix_da_group]).transpose() + + else: + assert isinstance(nix_da_group, nix.data_array.DataArray) + da = nix_da_group + signaldata = np.array(da) + + neo_attrs = self._nix_attr_to_neo(da) + metadata = da.metadata + unit = da.unit + timedim = self._get_time_dimension(da) signaldata = create_quantity(signaldata, unit) - timedim = self._get_time_dimension(nix_da_group[0]) - sampling_period = create_quantity(timedim.sampling_interval, timedim.unit) + sampling_period = create_quantity(timedim.sampling_interval, + timedim.unit) + + neo_attrs['nix_name'] = metadata.name # t_start should have been added to neo_attrs via the NIX # object's metadata. This may not be present since in older # versions, we didn't store t_start in the metadata when it @@ -461,7 +514,7 @@ def _nix_to_neo_analogsignal(self, nix_da_group): t_start=t_start, **neo_attrs) self._neo_map[neo_attrs["nix_name"]] = neo_signal # all DAs reference the same sources - srcnames = list(src.name for src in nix_da_group[0].sources) + srcnames = list(src.name for src in da.sources) for n in srcnames: if n not in self._ref_map: self._ref_map[n] = list() @@ -478,29 +531,54 @@ def _nix_to_neo_imagesequence(self, nix_da_group): :return: a Neo ImageSequence object """ - neo_attrs = self._nix_attr_to_neo(nix_da_group[0]) - metadata = nix_da_group[0].metadata - neo_attrs["nix_name"] = metadata.name # use the common base name - unit = nix_da_group[0].unit - imgseq = np.array([d[:] for d in nix_da_group]).transpose() - - sampling_rate = neo_attrs["sampling_rate"] - del neo_attrs["sampling_rate"] - spatial_scale = neo_attrs["spatial_scale"] - del neo_attrs["spatial_scale"] - if "t_start" in neo_attrs: - t_start = neo_attrs["t_start"] - del neo_attrs["t_start"] + if self._file_version < Version('0.11.0dev0'): + da = nix_da_group[0] + imgseq = np.array([d[:] for d in nix_da_group]).transpose() else: - t_start = 0.0 * pq.ms + da = nix_da_group + imgseq = np.array(da) - neo_seq = ImageSequence(image_data=imgseq, sampling_rate=sampling_rate, + neo_attrs = self._nix_attr_to_neo(da) + metadata = da.metadata + neo_attrs["nix_name"] = metadata.name # use the common base name + unit = da.unit + + if self._file_version < Version('0.11.0dev0'): + neo_attrs = self._nix_attr_to_neo(nix_da_group[0]) + metadata = nix_da_group[0].metadata + neo_attrs["nix_name"] = metadata.name # use the common base name + unit = nix_da_group[0].unit + imgseq = np.array([d[:] for d in nix_da_group]).transpose() + + sampling_period = 1 / neo_attrs["sampling_rate"] + del neo_attrs["sampling_rate"] + spatial_scale = neo_attrs["spatial_scale"] + del neo_attrs["spatial_scale"] + if "t_start" in neo_attrs: + t_start = neo_attrs["t_start"] + del neo_attrs["t_start"] + else: + t_start = 0.0 * pq.ms + else: + sampling_period = da.dimensions[0].sampling_interval + sampling_unit = da.dimensions[0].unit + sampling_period = create_quantity(sampling_period, sampling_unit) + spatial_scale = da.dimensions[1].sampling_interval + assert spatial_scale == da.dimensions[2].sampling_interval + spatial_unit = da.dimensions[1].unit + assert spatial_unit == da.dimensions[2].unit + spatial_scale = create_quantity(spatial_scale, spatial_unit) + t_start = da.dimensions[0].offset + t_start_unit = da.dimensions[0].unit + t_start = create_quantity(t_start, t_start_unit) + + neo_seq = ImageSequence(image_data=imgseq, frame_duration=sampling_period, spatial_scale=spatial_scale, units=unit, t_start=t_start, **neo_attrs) self._neo_map[neo_attrs["nix_name"]] = neo_seq # all DAs reference the same sources - srcnames = list(src.name for src in nix_da_group[0].sources) + srcnames = list(src.name for src in da.sources) for n in srcnames: if n not in self._ref_map: self._ref_map[n] = list() @@ -513,23 +591,30 @@ def _nix_to_neo_irregularlysampledsignal(self, nix_da_group): This method expects a list of data arrays that all represent the same, multidimensional Neo IrregularlySampledSignal object. - :param nix_da_group: a list of NIX DataArray objects + :param nix_da_group: a NIX DataArray object :return: a Neo IrregularlySampledSignal object """ - neo_attrs = self._nix_attr_to_neo(nix_da_group[0]) - metadata = nix_da_group[0].metadata + + if self._file_version < Version('0.11.0dev0'): + da = nix_da_group[0] + signaldata = np.array([d[:] for d in nix_da_group]) + else: + da = nix_da_group + signaldata = np.array(da) + + neo_attrs = self._nix_attr_to_neo(da) + metadata = da.metadata neo_attrs["nix_name"] = metadata.name # use the common base name - unit = nix_da_group[0].unit - signaldata = np.array([d[:] for d in nix_da_group]).transpose() + unit = da.unit signaldata = create_quantity(signaldata, unit) - timedim = self._get_time_dimension(nix_da_group[0]) + timedim = self._get_time_dimension(da) times = create_quantity(timedim.ticks, timedim.unit) neo_signal = IrregularlySampledSignal(signal=signaldata, times=times, **neo_attrs) self._neo_map[neo_attrs["nix_name"]] = neo_signal # all DAs reference the same sources - srcnames = list(src.name for src in nix_da_group[0].sources) + srcnames = list(src.name for src in da.sources) for n in srcnames: if n not in self._ref_map: self._ref_map[n] = list() @@ -694,7 +779,7 @@ def _write_channelview(self, chview, nixblock, nixgroup): # but for a first pass this simplifies my mental model raise Exception("Need to save signals before saving views") nix_name = chview.obj.annotations["nix_name"] - nixmt.references.extend(self._signal_map[nix_name]) + nixmt.references.append(self._signal_map[nix_name]) else: nixmt = self._view_map[nix_name] @@ -737,6 +822,8 @@ def _write_segment(self, segment, nixblock): self._write_analogsignal(asig, nixblock, nixgroup) for isig in segment.irregularlysampledsignals: self._write_irregularlysampledsignal(isig, nixblock, nixgroup) + for imagesequence in segment.imagesequences: + self._write_imagesequence(imagesequence, nixblock, nixgroup) for event in segment.events: self._write_event(event, nixblock, nixgroup) for epoch in segment.epochs: @@ -744,9 +831,6 @@ def _write_segment(self, segment, nixblock): for spiketrain in segment.spiketrains: self._write_spiketrain(spiketrain, nixblock, nixgroup) - for imagesequence in segment.imagesequences: - self._write_imagesequence(imagesequence, nixblock, nixgroup) - def _write_group(self, neo_group, nixblock, parent=None): """ Convert the provided Neo Group to a NIX Group and write it to the @@ -798,8 +882,7 @@ def _write_group(self, neo_group, nixblock, parent=None): raise Exception("Orphan signals/image sequences cannot be stored, needs to belong to a Segment") objnames.append(obj.annotations["nix_name"]) for name in objnames: - for da in self._signal_map[name]: - nixgroup.data_arrays.append(da) + nixgroup.data_arrays.append(self._signal_map[name]) # link events, epochs and spiketrains objnames = [] @@ -812,7 +895,8 @@ def _write_group(self, neo_group, nixblock, parent=None): and obj.annotations["nix_name"] in nixblock.multi_tags): # the following restriction could be relaxed later # but for a first pass this simplifies my mental model - raise Exception("Orphan epochs/events/spiketrains cannot be stored, needs to belong to a Segment") + raise Exception("Orphan epochs/events/spiketrains cannot be " + "stored, needs to belong to a Segment") objnames.append(obj.annotations["nix_name"]) for name in objnames: mt = nixblock.multi_tags[name] @@ -828,10 +912,8 @@ def _write_group(self, neo_group, nixblock, parent=None): def _write_analogsignal(self, anasig, nixblock, nixgroup): """ - Convert the provided ``anasig`` (AnalogSignal) to a list of NIX - DataArray objects and write them to the NIX file. All DataArray objects - created from the same AnalogSignal have their metadata section point to - the same object. + Convert the provided ``anasig`` (AnalogSignal) to a NIX + DataArray object and write it to the NIX file. :param anasig: The Neo AnalogSignal to be written :param nixblock: NIX Block where the DataArrays will be created @@ -843,46 +925,50 @@ def _write_analogsignal(self, anasig, nixblock, nixgroup): nix_name = f"neo.analogsignal.{self._generate_nix_name()}" anasig.annotate(nix_name=nix_name) - if f"{nix_name}.0" in nixblock.data_arrays and nixgroup: + if nix_name in nixblock.data_arrays and nixgroup: # AnalogSignal is in multiple Segments. - # Append DataArrays to Group and return. - dalist = list() - for idx in itertools.count(): - daname = f"{nix_name}.{idx}" - if daname in nixblock.data_arrays: - dalist.append(nixblock.data_arrays[daname]) - else: - break - nixgroup.data_arrays.extend(dalist) + # Append DataArray to Group. + nixgroup.data_arrays.append(nixblock.data_arrays[nix_name]) return if isinstance(anasig, BaseProxy): - data = np.transpose(anasig.load()[:].magnitude) + data = anasig.load()[:].magnitude else: - data = np.transpose(anasig[:].magnitude) + data = anasig[:].magnitude parentmd = nixgroup.metadata if nixgroup else nixblock.metadata metadata = parentmd.create_section(nix_name, "neo.analogsignal.metadata") - nixdas = list() - for idx, row in enumerate(data): - daname = f"{nix_name}.{idx}" - da = nixblock.create_data_array(daname, "neo.analogsignal", data=row) - da.metadata = metadata - da.definition = anasig.description - da.unit = units_to_string(anasig.units) - - sampling_period = anasig.sampling_period.magnitude.item() - timedim = da.append_sampled_dimension(sampling_period) - timedim.unit = units_to_string(anasig.sampling_period.units) - tstart = anasig.t_start - metadata["t_start"] = tstart.magnitude.item() - metadata.props["t_start"].unit = units_to_string(tstart.units) - timedim.offset = tstart.rescale(timedim.unit).magnitude.item() - timedim.label = "time" - - nixdas.append(da) - if nixgroup: - nixgroup.data_arrays.append(da) + + # create nix data array + da = nixblock.create_data_array(nix_name, "neo.analogsignal", data=data) + da.metadata = metadata + da.definition = anasig.description + da.unit = units_to_string(anasig.units) + + # store metadata + n_dims = len(anasig.shape) + sampling_period = anasig.sampling_period.magnitude.item() + timedim = da.append_sampled_dimension(sampling_period) + timedim.unit = units_to_string(anasig.sampling_period.units) + tstart = anasig.t_start + timedim.offset = tstart.rescale(timedim.unit).magnitude.item() + timedim.label = "time" + + # add additional dimension information + # dimensions (time, , channel) + for arb_dim in range(1, n_dims - 2): + dim_idxs = range(da.shape[arb_dim]) + da.append_set_dimension(dim_idxs) + if n_dims > 1: + channel_ids = anasig.array_annotations.get('channel_ids', + np.arange(anasig.shape[-1])) + + ch_ids = da.append_set_dimension(list(channel_ids.astype('str'))) + # SetDimension does not yet support `label` + # ch_ids.label = "channel" + + if nixgroup: + nixgroup.data_arrays.append(da) neoname = anasig.name if anasig.name is not None else "" metadata["neo_name"] = neoname @@ -894,14 +980,12 @@ def _write_analogsignal(self, anasig, nixblock, nixgroup): p = self._write_property(metadata, k, v) p.type = ARRAYANNOTATION - self._signal_map[nix_name] = nixdas + self._signal_map[nix_name] = da def _write_imagesequence(self, imgseq, nixblock, nixgroup): """ - Convert the provided ``imgseq`` (ImageSequence) to a list of NIX - DataArray objects and write them to the NIX file. All DataArray objects - created from the same ImageSequence have their metadata section point to - the same object. + Convert the provided ``imgseq`` (ImageSequence) to a NIX + DataArray object and write it to the NIX file. :param anasig: The Neo ImageSequence to be written :param nixblock: NIX Block where the DataArrays will be created @@ -914,62 +998,55 @@ def _write_imagesequence(self, imgseq, nixblock, nixgroup): nix_name = f"neo.imagesequence.{self._generate_nix_name()}" imgseq.annotate(nix_name=nix_name) - if f"{nix_name}.0" in nixblock.data_arrays and nixgroup: - - dalist = list() - for idx in itertools.count(): - daname = f"{nix_name}.{idx}" - if daname in nixblock.data_arrays: - dalist.append(nixblock.data_arrays[daname]) - else: - break - nixgroup.data_arrays.extend(dalist) + if nix_name in nixblock.data_arrays and nixgroup: + # ImageSequence is in multiple Segments. + # Append DataArray to Group. + nixgroup.data_arrays.append(nixblock.data_arrays[nix_name]) return if isinstance(imgseq, BaseProxy): - data = np.transpose(imgseq.load()[:].magnitude) + data = imgseq.load()[:].magnitude else: - data = np.transpose(imgseq[:].magnitude) + data = imgseq[:].magnitude parentmd = nixgroup.metadata if nixgroup else nixblock.metadata metadata = parentmd.create_section(nix_name, "neo.imagesequence.metadata") - nixdas = list() - for idx, row in enumerate(data): - daname = f"{nix_name}.{idx}" - da = nixblock.create_data_array(daname, "neo.imagesequence", data=row) - - da.metadata = metadata - da.definition = imgseq.description - da.unit = units_to_string(imgseq.units) - - metadata["sampling_rate"] = imgseq.sampling_rate.magnitude.item() - units = imgseq.sampling_rate.units - metadata.props["sampling_rate"].unit = units_to_string(units) - metadata["spatial_scale"] = imgseq.spatial_scale.magnitude.item() - units = imgseq.spatial_scale.units - metadata.props["spatial_scale"].unit = units_to_string(units) - metadata["t_start"] = imgseq.t_start.magnitude.item() - units = imgseq.t_start.units - metadata.props["t_start"].unit = units_to_string(units) - - nixdas.append(da) - if nixgroup: - nixgroup.data_arrays.append(da) + da = nixblock.create_data_array(nix_name, "neo.imagesequence", data=data) + + da.metadata = metadata + da.definition = imgseq.description + da.unit = units_to_string(imgseq.units) + + # store dimension metadata + n_dims = len(imgseq.shape) + assert n_dims == 3 + sampling_period = imgseq.frame_duration + timedim = da.append_sampled_dimension(sampling_period.magnitude.item()) + timedim.unit = units_to_string(sampling_period.units) + tstart = imgseq.t_start + timedim.offset = tstart.rescale(timedim.unit).magnitude.item() + timedim.label = "time" + + sp_scale_mag = imgseq.spatial_scale.magnitude.item() + sp_scale_units = units_to_string(imgseq.spatial_scale) + da.append_sampled_dimension(sp_scale_mag, 'row', sp_scale_units, 0) + da.append_sampled_dimension(sp_scale_mag, 'col', sp_scale_units, 0) + + if nixgroup: + nixgroup.data_arrays.append(da) neoname = imgseq.name if imgseq.name is not None else "" metadata["neo_name"] = neoname if imgseq.annotations: for k, v in imgseq.annotations.items(): self._write_property(metadata, k, v) - self._signal_map[nix_name] = nixdas + self._signal_map[nix_name] = da def _write_irregularlysampledsignal(self, irsig, nixblock, nixgroup): """ - Convert the provided ``irsig`` (IrregularlySampledSignal) to a list of - NIX DataArray objects and write them to the NIX file at the location. - All DataArray objects created from the same IrregularlySampledSignal - have their metadata section point to the same object. + Convert the provided ``irsig`` (IrregularlySampledSignal) to a + NIX DataArray object and write it to the NIX file. :param irsig: The Neo IrregularlySampledSignal to be written :param nixblock: NIX Block where the DataArrays will be created @@ -981,41 +1058,46 @@ def _write_irregularlysampledsignal(self, irsig, nixblock, nixgroup): nix_name = f"neo.irregularlysampledsignal.{self._generate_nix_name()}" irsig.annotate(nix_name=nix_name) - if f"{nix_name}.0" in nixblock.data_arrays and nixgroup: + if nix_name in nixblock.data_arrays and nixgroup: # IrregularlySampledSignal is in multiple Segments. - # Append DataArrays to Group and return. - dalist = list() - for idx in itertools.count(): - daname = f"{nix_name}.{idx}" - if daname in nixblock.data_arrays: - dalist.append(nixblock.data_arrays[daname]) - else: - break - nixgroup.data_arrays.extend(dalist) + # Append DataArray to Group. + nixgroup.data_arrays.append(nixblock.data_arrays[nix_name]) return if isinstance(irsig, BaseProxy): - data = np.transpose(irsig.load()[:].magnitude) + data = irsig.load()[:].magnitude else: - data = np.transpose(irsig[:].magnitude) + data = irsig[:].magnitude parentmd = nixgroup.metadata if nixgroup else nixblock.metadata metadata = parentmd.create_section(nix_name, "neo.irregularlysampledsignal.metadata") - nixdas = list() - for idx, row in enumerate(data): - daname = f"{nix_name}.{idx}" - da = nixblock.create_data_array(daname, "neo.irregularlysampledsignal", data=row) - da.metadata = metadata - da.definition = irsig.description - da.unit = units_to_string(irsig.units) - - timedim = da.append_range_dimension(irsig.times.magnitude) - timedim.unit = units_to_string(irsig.times.units) - timedim.label = "time" - - nixdas.append(da) - if nixgroup: - nixgroup.data_arrays.append(da) + + da = nixblock.create_data_array(nix_name, "neo.irregularlysampledsignal", data=data) + da.metadata = metadata + da.definition = irsig.description + da.unit = units_to_string(irsig.units) + + timedim = da.append_range_dimension(irsig.times.magnitude) + timedim.unit = units_to_string(irsig.times.units) + timedim.label = "time" + + n_dims = len(da.shape) + # add additional dimension information + # dimensions (time, , channel) + for arb_dim in range(1, n_dims - 2): + dim_idxs = range(da.shape[arb_dim]) + da.append_set_dimension(dim_idxs) + if n_dims > 1: + channel_ids = irsig.array_annotations.get('channel_ids', + range(irsig.shape[-1])) + channel_ids = np.asarray(channel_ids, dtype='str') + + ch_ids = da.append_set_dimension(list(channel_ids)) + # SetDimension currently does not support `label` + # ch_ids.label = "channel" + + if nixgroup: + nixgroup.data_arrays.append(da) neoname = irsig.name if irsig.name is not None else "" metadata["neo_name"] = neoname @@ -1027,7 +1109,7 @@ def _write_irregularlysampledsignal(self, irsig, nixblock, nixgroup): p = self._write_property(metadata, k, v) p.type = ARRAYANNOTATION - self._signal_map[nix_name] = nixdas + self._signal_map[nix_name] = da def _write_event(self, event, nixblock, nixgroup): """ @@ -1079,9 +1161,11 @@ def _write_event(self, event, nixblock, nixgroup): nixgroup.multi_tags.append(nixmt) - # reference all AnalogSignals and IrregularlySampledSignals in Group + # reference all AnalogSignals and IrregularlySampledSignals and + # ImageSequencesin Group for da in nixgroup.data_arrays: - if da.type in ("neo.analogsignal", "neo.irregularlysampledsignal"): + if da.type in ("neo.analogsignal", "neo.irregularlysampledsignal", + "neo.imagesequence"): nixmt.references.append(da) def _write_epoch(self, epoch, nixblock, nixgroup): @@ -1142,7 +1226,8 @@ def _write_epoch(self, epoch, nixblock, nixgroup): # reference all AnalogSignals and IrregularlySampledSignals in Group for da in nixgroup.data_arrays: - if da.type in ("neo.analogsignal", "neo.irregularlysampledsignal"): + if da.type in ("neo.analogsignal", "neo.irregularlysampledsignal", + "neo.imagesequence"): nixmt.references.append(da) def _write_spiketrain(self, spiketrain, nixblock, nixgroup): @@ -1333,6 +1418,7 @@ def _nix_attr_to_neo(nix_obj): return neo_attrs + # TODO: This is only used for old (< Version('0.11.0dev0')) files @staticmethod def _group_signals(dataarrays): """ @@ -1347,7 +1433,10 @@ def _group_signals(dataarrays): # now start grouping groups = OrderedDict() for da in dataarrays: - basename = ".".join(da.name.split(".")[:-1]) + if '.' in da.name: + basename = ".".join(da.name.split(".")[:-1]) + else: + basename = da.name if basename not in groups: groups[basename] = list() groups[basename].append(da) diff --git a/neo/rawio/nixrawio.py b/neo/rawio/nixrawio.py index 39ddde98c..9abe1d628 100644 --- a/neo/rawio/nixrawio.py +++ b/neo/rawio/nixrawio.py @@ -7,8 +7,7 @@ Author: Chek Yin Choi, Julia Sprenger """ -import os.path - +from distutils.version import LooseVersion as Version import numpy as np from .baserawio import (BaseRawIO, _signal_channel_dtype, _signal_stream_dtype, @@ -51,69 +50,230 @@ def _source_name(self): def _parse_header(self): self.file = nix.File.open(self.filename, nix.FileMode.ReadOnly) + if 'version' in self.file.sections['neo']: + self._file_version = Version(self.file.sections['neo']['version']) + else: + self._file_version = Version('0.5.2') # default if unknown + signal_channels = [] - anasig_ids = {0: []} # ids of analogsignals by segment - stream_ids = [] + self.neo_struct = {'blocks': []} + bl_idx = 0 for bl in self.file.blocks: + seg_dict = {'segments': []} + self.neo_struct['blocks'].append(seg_dict) + seg_idx = 0 for seg in bl.groups: + if seg.type != 'neo.segment': + continue + signal_dict = {'signals': [], + 'signal_types': [], + 'signal_ids': []} + self.neo_struct['blocks'][bl_idx]['segments'].append(signal_dict) + + # assume consistent stream / signal order across segments for da_idx, da in enumerate(seg.data_arrays): - if da.type == "neo.analogsignal": - chan_id = da_idx - ch_name = da.metadata['neo_name'] - units = str(da.unit) - dtype = str(da.dtype) - sr = 1 / da.dimensions[0].sampling_interval - anasig_id = da.name.split('.')[-2] - if anasig_id not in anasig_ids[0]: - anasig_ids[0].append(anasig_id) - stream_id = anasig_ids[0].index(anasig_id) - if stream_id not in stream_ids: - stream_ids.append(stream_id) - gain = 1 - offset = 0. - signal_channels.append((ch_name, chan_id, sr, dtype, - units, gain, offset, stream_id)) - # only read structure of first segment and assume the same - # across segments - break - break + # todo: This should also cover irreg & imagseq signals once supported by rawio + if da.type in ["neo.analogsignal"]: + if self._file_version < Version('0.11.0dev0'): + anasig_id = da.name.split('.')[-2] + else: + anasig_id = da.name + + # start a new signal if analogsignal id is new or changed + # This can be simplified when dropping support for old mapping + # no object exists yet -> create new object + if len(signal_dict['signals']) == 0: + signal_idx = 0 + signal_dict['signals'].append({'data': [da]}) + signal_dict['signal_types'].append(da.type) + signal_dict['signal_ids'].append(anasig_id) + # object is different -> create new object + elif anasig_id != signal_dict['signal_ids'][signal_idx]: + signal_idx += 1 + signal_dict['signals'].append({'data': [da]}) + signal_dict['signal_types'].append(da.type) + signal_dict['signal_ids'].append(anasig_id) + # object already exists (old nix mapping version) + else: + assert signal_dict['signal_ids'][signal_idx] == anasig_id + assert signal_dict['signal_types'][signal_idx] == da.type + signal_dict['signals'][signal_idx]['data'].append(da) + seg_idx += 1 + bl_idx += 1 + + # extract metadata from collected streams (t_start, t_stop, units, dtype, sampling_rate) + for bl_idx, bl in enumerate(self.neo_struct['blocks']): + for seg_idx, seg in enumerate(bl['segments']): + for signal_idx, signal in enumerate(seg['signals']): + signal['units'] = [] + signal['channel_names'] = [] + t_start, t_stop = np.inf, -np.inf + chan_count, sample_count = 0, None + units, dtype, sampling_rate = None, None, None + for da in signal['data']: + time_dim = da.dimensions[0] # in neo convention time is always dim 0 + t_start = min(t_start, time_dim.offset) + duration = time_dim.sampling_interval * da.shape[0] + t_stop = max(t_start, time_dim.offset + duration) + + n_chan = da.shape[-1] if len(da.shape) > 1 else 1 + chan_count += n_chan + sample_count = da.shape[0] if sample_count is None else sample_count + assert sample_count == da.shape[0] + dtype = da.dtype if dtype is None else dtype + assert dtype == da.dtype + if sampling_rate is None: + sampling_rate = 1 / da.dimensions[0].sampling_interval + assert sampling_rate == 1 / da.dimensions[0].sampling_interval + # only channel_names and units are not shared by channels + signal['channel_names'].extend([da.metadata['neo_name']] * n_chan) + signal['units'].extend([da.unit] * n_chan) + signal['t_start'] = t_start + signal['t_stop'] = t_stop + signal['channel_count'] = chan_count + signal['sample_count'] = sample_count + signal['dtype'] = dtype + signal['sampling_rate'] = sampling_rate + + # calculate t_start and t_stop on segment level + t_start, t_stop = np.inf, -np.inf + for signal_idx, signal in enumerate(seg['signals']): + t_start = min(t_start, signal['t_start']) + t_stop = max(t_stop, signal['t_stop']) + seg['t_start'] = t_start + seg['t_stop'] = t_stop + + # extract streams from collected data objects + seg0 = self.neo_struct['blocks'][0]['segments'][0] + self.streams = {'signals': [], 'stream_ids': []} + + # consistency checks of data array across blocks and segments + for bl_idx in range(1, len(self.neo_struct['blocks'])): + bl_dict = self.neo_struct['blocks'][bl_idx] + for seg_idx in range(1, len(bl_dict['segments'])): + seg = bl_dict['segments'][seg_idx] + assert len(seg0['signals']) == len(seg['signals']) + for do_idx in range(len(seg0['signals'])): + assert seg0['signals'][do_idx]['channel_count'] == \ + seg['signals'][do_idx]['channel_count'] + + for signal_idx, signal in enumerate(seg0['signals']): + # using the signal id in block 0 seg 0 to identify the whole stream across blocks + stream_id = seg0['signal_ids'][signal_idx] + self.streams['stream_ids'].append(stream_id) + self.streams['signals'].append([]) + for bl_idx in range(len(self.neo_struct['blocks'])): + bl = self.neo_struct['blocks'][bl_idx] + for seg_idx in range(len(bl['segments'])): + seg = bl['segments'][seg_idx] + do = seg['signals'][signal_idx] + do['stream_id'] = stream_id + + self.streams['signals'][signal_idx].append(do) + + # generate global signal channels for rawio + chan_id = 0 + for signals_dict in seg0['signals']: + stream_id = signals_dict['stream_id'] + dtype = signals_dict['dtype'] + sr = signals_dict['sampling_rate'] + gain = 1 + offset = 0. + for inner_ch_idx in range(signals_dict['channel_count']): + ch_name = signals_dict['channel_names'][inner_ch_idx] + units = signals_dict['units'][inner_ch_idx] + signal_channels.append((ch_name, chan_id, sr, dtype, + units, gain, offset, stream_id)) + chan_id += 1 + signal_channels = np.array(signal_channels, dtype=_signal_channel_dtype) - signal_streams = np.zeros(len(stream_ids), dtype=_signal_stream_dtype) - signal_streams['id'] = stream_ids + + signal_streams = np.zeros(len(self.streams['stream_ids']), dtype=_signal_stream_dtype) + signal_streams['id'] = self.streams['stream_ids'] signal_streams['name'] = '' + # SPIKETRAINS + self.spiketrain_list = {'blocks': []} + for block_index, blk in enumerate(self.file.blocks): + seg_groups = [g for g in blk.groups if g.type == "neo.segment"] + d = {'segments': []} + self.spiketrain_list['blocks'].append(d) + for seg_index, seg in enumerate(seg_groups): + d = {'spiketrains': []} + self.spiketrain_list['blocks'][block_index]['segments'].append(d) + st_idx = 0 + for st in seg.multi_tags: + block = self.spiketrain_list['blocks'][block_index] + segment = block['segments'][seg_index] + if st.type == 'neo.spiketrain': + d = {'waveforms': None, + 'spiketrain_id': st.id, + 'unit_id': None, + 'data': st, + 'spike_count': len(st.positions), + 't_start': None, + 't_stop': None + } + segment['spiketrains'].append(d) + wftypestr = "neo.waveforms" + t_start = st.metadata['t_start'] + t_stop = st.metadata['t_stop'] + d['t_start'] = t_start + d['t_stop'] = t_stop + if (st.features and st.features[0].data.type == wftypestr): + waveforms = st.features[0].data + if waveforms: + d['waveforms'] = waveforms + # assume one spiketrain has one waveform + + # spiketrains of first segment are used for unit ids across segment + if (block_index, seg_index) == (0, 0): + d['unit_id'] = d['spiketrain_id'] + else: + seg0 = self.spiketrain_list['blocks'][0]['segments'][0] + d['unit_id'] = seg0['spiketrains'][st_idx]['unit_id'] + + st_idx += 1 + segment['t_start'] = min([s['t_start'] for s in segment['spiketrains']]) + segment['t_stop'] = max([s['t_stop'] for s in segment['spiketrains']]) + + # check for consistent spiketrain channels across blocks and segments + # For now assume that the order of spiketrain channels across segments is consistent + + seg0 = self.spiketrain_list['blocks'][0]['segments'][0] + # use spiketrain id in first segment as unit id across segments + for bl_idx, bl in enumerate(self.spiketrain_list['blocks']): + for seg_idx, seg in enumerate(bl['segments']): + assert len(seg['spiketrains']) == len(seg0['spiketrains']) + for st_idx, st in enumerate(seg['spiketrains']): + assert st['unit_id'] == seg0['spiketrains'][st_idx]['unit_id'] + + # create neo.rawio spike_channels spike_channels = [] - unit_name = "" - unit_id = "" - for bl in self.file.blocks: - seg_groups = [g for g in bl.groups if g.type == "neo.segment"] - - for seg in seg_groups: - for mt in seg.multi_tags: - if mt.type == "neo.spiketrain": - unit_name = mt.metadata['neo_name'] - unit_id = mt.id - wf_left_sweep = 0 - wf_units = None - wf_sampling_rate = 0 - if mt.features: - wf = mt.features[0].data - wf_units = wf.unit - dim = wf.dimensions[2] - interval = dim.sampling_interval - wf_sampling_rate = 1 / interval - if wf.metadata: - wf_left_sweep = wf.metadata["left_sweep"] - wf_gain = 1 - wf_offset = 0. - spike_channels.append( - (unit_name, unit_id, wf_units, wf_gain, - wf_offset, wf_left_sweep, wf_sampling_rate) - ) - break - break + seg0 = self.spiketrain_list['blocks'][0]['segments'][0] + for st in seg0['spiketrains']: + unit_name = st['data'].metadata['neo_name'] + unit_id = st['unit_id'] + wf_left_sweep = 0 + wf_units = None + wf_sampling_rate = 0 + if st['data'].features: + wf = st['data'].features[0].data + wf_units = wf.unit + dim = wf.dimensions[-1] # last wf dimension is time + interval = dim.sampling_interval + wf_sampling_rate = 1 / interval + if wf.metadata: + wf_left_sweep = wf.metadata["left_sweep"] + wf_gain = 1 + wf_offset = 0. + spike_channels.append( + (unit_name, unit_id, wf_units, wf_gain, + wf_offset, wf_left_sweep, wf_sampling_rate) + ) spike_channels = np.array(spike_channels, dtype=_spike_channel_dtype) + event_channels = [] event_count = 0 epoch_count = 0 @@ -137,58 +297,6 @@ def _parse_header(self): break event_channels = np.array(event_channels, dtype=_event_channel_dtype) - self.da_list = {'blocks': []} - for block_index, blk in enumerate(self.file.blocks): - seg_groups = [g for g in blk.groups if g.type == "neo.segment"] - d = {'segments': []} - self.da_list['blocks'].append(d) - for seg_index, seg in enumerate(seg_groups): - d = {'signals': []} - self.da_list['blocks'][block_index]['segments'].append(d) - size_list = [] - data_list = [] - da_name_list = [] - for da in seg.data_arrays: - if da.type == 'neo.analogsignal': - size_list.append(da.size) - data_list.append(da) - da_name_list.append(da.metadata['neo_name']) - block = self.da_list['blocks'][block_index] - segment = block['segments'][seg_index] - segment['data_size'] = size_list - segment['data'] = data_list - segment['ch_name'] = da_name_list - - self.unit_list = {'blocks': []} - for block_index, blk in enumerate(self.file.blocks): - seg_groups = [g for g in blk.groups if g.type == "neo.segment"] - d = {'segments': []} - self.unit_list['blocks'].append(d) - for seg_index, seg in enumerate(seg_groups): - d = {'spiketrains': [], - 'spiketrains_id': [], - 'spiketrains_unit': []} - self.unit_list['blocks'][block_index]['segments'].append(d) - st_idx = 0 - for st in seg.multi_tags: - d = {'waveforms': []} - block = self.unit_list['blocks'][block_index] - segment = block['segments'][seg_index] - segment['spiketrains_unit'].append(d) - if st.type == 'neo.spiketrain': - segment['spiketrains'].append(st.positions) - segment['spiketrains_id'].append(st.id) - wftypestr = "neo.waveforms" - if (st.features and st.features[0].data.type == wftypestr): - waveforms = st.features[0].data - stdict = segment['spiketrains_unit'][st_idx] - if waveforms: - stdict['waveforms'] = waveforms - else: - stdict['waveforms'] = None - # assume one spiketrain one waveform - st_idx += 1 - self.header = {} self.header['nb_block'] = len(self.file.blocks) self.header['nb_segment'] = [ @@ -222,10 +330,24 @@ def _parse_header(self): # if order is preserving, the annotations # should go to the right place, need test if mt.type == "neo.event" or mt.type == "neo.epoch": + neo_type = mt.type.replace('neo.', '') + + # only add annotations when events exist if seg_ann['events'] != []: event_ann = seg_ann['events'][ev_idx] - props = mt.metadata.inherited_properties() - event_ann.update(self._filter_properties(props, 'event')) + + # adding regular annotations + props = [p for p in mt.metadata.props + if p.type != 'ARRAYANNOTATION'] + props_dict = self._filter_properties(props, neo_type) + event_ann.update(props_dict) + + # adding array_annotations + props = [p for p in mt.metadata.props + if p.type == 'ARRAYANNOTATION'] + props_dict = self._filter_properties(props, neo_type) + event_ann['__array_annotations__'].update(props_dict) + ev_idx += 1 # adding array annotations to analogsignals @@ -237,12 +359,17 @@ def _parse_header(self): for da_idx, da in enumerate(group.data_arrays): if da.type != "neo.analogsignal": continue - anasig_id = da.name.split('.')[-2] - # skip already annotated signals as each channel already - # contains the complete set of annotations and - # array_annotations - if anasig_id in annotated_anasigs: - continue + + if self._file_version < Version('0.11.0dev0'): + anasig_id = da.name.split('.')[-2] + # skip already annotated signals as each channel already + # contains the complete set of annotations and + # array_annotations + if anasig_id in annotated_anasigs: + continue + else: + anasig_id = da.name + annotated_anasigs.append(anasig_id) # collect annotation properties @@ -261,80 +388,63 @@ def _parse_header(self): stream_id += 1 def _segment_t_start(self, block_index, seg_index): - t_start = 0 - for mt in self.file.blocks[block_index].groups[seg_index].multi_tags: - if mt.type == "neo.spiketrain": - t_start = mt.metadata['t_start'] - return t_start + return min(self.neo_struct['blocks'][block_index]['segments'][seg_index]['t_start'], + self.spiketrain_list['blocks'][block_index]['segments'][seg_index]['t_start']) def _segment_t_stop(self, block_index, seg_index): - t_stop = 0 - for mt in self.file.blocks[block_index].groups[seg_index].multi_tags: - if mt.type == "neo.spiketrain": - t_stop = mt.metadata['t_stop'] - return t_stop + return max(self.neo_struct['blocks'][block_index]['segments'][seg_index]['t_stop'], + self.spiketrain_list['blocks'][block_index]['segments'][seg_index]['t_stop']) def _get_signal_size(self, block_index, seg_index, stream_index): - stream_id = self.header['signal_streams'][stream_index]['id'] - keep = self.header['signal_channels']['stream_id'] == stream_id - channel_indexes, = np.nonzero(keep) - ch_idx = channel_indexes[0] - block = self.da_list['blocks'][block_index] - segment = block['segments'][seg_index] - size = segment['data_size'][ch_idx] - return size # size is per signal, not the sum of all channel_indexes + stream_id = self.streams['stream_ids'][stream_index] + for do in self.neo_struct['blocks'][block_index]['segments'][seg_index]['signals']: + if do['stream_id'] == stream_id: + return do['sample_count'] + + raise ValueError(f'Could not find data object for block {block_index}, segment ' + f'{seg_index} and stream {stream_id}.') def _get_signal_t_start(self, block_index, seg_index, stream_index): - stream_id = self.header['signal_streams'][stream_index]['id'] - keep = self.header['signal_channels']['stream_id'] == stream_id - channel_indexes, = np.nonzero(keep) - ch_idx = channel_indexes[0] - block = self.file.blocks[block_index] - das = [da for da in block.groups[seg_index].data_arrays] - da = das[ch_idx] - sig_t_start = float(da.metadata['t_start']) + seg = self.neo_struct['blocks'][block_index]['segments'][seg_index] + sig_t_start = seg['signals'][stream_index]['t_start'] return sig_t_start # assume same group_id always same t_start def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, stream_index, channel_indexes): - stream_id = self.header['signal_streams'][stream_index]['id'] - keep = self.header['signal_channels']['stream_id'] == stream_id - global_channel_indexes, = np.nonzero(keep) - if channel_indexes is not None: - global_channel_indexes = global_channel_indexes[channel_indexes] if i_start is None: i_start = 0 if i_stop is None: i_stop = self.get_signal_size(block_index, seg_index, stream_index) - raw_signals_list = [] - da_list = self.da_list['blocks'][block_index]['segments'][seg_index] - for idx in global_channel_indexes: - da = da_list['data'][idx] - raw_signals_list.append(da[i_start:i_stop]) + segment = self.neo_struct['blocks'][block_index]['segments'][seg_index] + if self._file_version < Version('0.11.0dev0'): + das = segment['signals'][stream_index]['data'] + da = np.asarray(das).transpose() + else: + da = segment['signals'][stream_index]['data'][0] - raw_signals = np.array(raw_signals_list) - raw_signals = np.transpose(raw_signals) + if channel_indexes is not None: + mask = channel_indexes + else: + mask = slice(None, None) + raw_signals = da[..., mask][i_start: i_stop] return raw_signals def _spike_count(self, block_index, seg_index, unit_index): - count = 0 - head_id = self.header['spike_channels'][unit_index][1] - for mt in self.file.blocks[block_index].groups[seg_index].multi_tags: - for src in mt.sources: - if mt.type == 'neo.spiketrain' and [src.type == "neo.unit"]: - if head_id == src.id: - return len(mt.positions) - return count + # unit index == unit id + seg = self.spiketrain_list['blocks'][block_index]['segments'][seg_index] + st = seg['spiketrains'][unit_index] + assert st['unit_id'] == self.header['spike_channels'][unit_index][1] + return st['spike_count'] def _get_spike_timestamps(self, block_index, seg_index, unit_index, t_start, t_stop): - block = self.unit_list['blocks'][block_index] + block = self.spiketrain_list['blocks'][block_index] segment = block['segments'][seg_index] spike_dict = segment['spiketrains'] - spike_timestamps = spike_dict[unit_index] - spike_timestamps = np.transpose(spike_timestamps) + spike_timestamps = np.array(spike_dict[unit_index]['data'].positions) # dtype = float + # spike_timestamps = np.transpose(spike_timestamps) if t_start is not None or t_stop is not None: lim0 = t_start @@ -350,8 +460,8 @@ def _rescale_spike_timestamp(self, spike_timestamps, dtype): def _get_spike_raw_waveforms(self, block_index, seg_index, unit_index, t_start, t_stop): # this must return a 3D numpy array (nb_spike, nb_channel, nb_sample) - seg = self.unit_list['blocks'][block_index]['segments'][seg_index] - waveforms = seg['spiketrains_unit'][unit_index]['waveforms'] + seg = self.spiketrain_list['blocks'][block_index]['segments'][seg_index] + waveforms = seg['spiketrains'][unit_index]['waveforms'] if not waveforms: return None raw_waveforms = np.array(waveforms) diff --git a/neo/test/iotest/test_nixio.py b/neo/test/iotest/test_nixio.py index 792074c49..577704922 100644 --- a/neo/test/iotest/test_nixio.py +++ b/neo/test/iotest/test_nixio.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016, German Neuroinformatics Node (G-Node) +# Copyright (c) 2016-2021, German Neuroinformatics Node (G-Node) # Achilleas Koutsou # # All rights reserved. @@ -26,15 +26,14 @@ import numpy as np import quantities as pq -from neo.core import (Block, Segment, AnalogSignal, - IrregularlySampledSignal, SpikeTrain, +from neo.version import version as neoversion +from neo.core import (Block, Segment, AnalogSignal, IrregularlySampledSignal, SpikeTrain, Event, Epoch, ImageSequence, Group, ChannelView) from neo.test.iotest.common_io_test import BaseTestIO -from neo.io.nixio import (NixIO, create_quantity, units_to_string, neover, - dt_from_nix, dt_to_nix, DATETIMEANNOTATION) +from neo.io.nixio import (NixIO, create_quantity, units_to_string, neover, dt_from_nix, dt_to_nix, + DATETIMEANNOTATION) from neo.io.nixio_fr import NixIO as NixIO_lazy -from neo.io.proxyobjects import (AnalogSignalProxy, SpikeTrainProxy, - EventProxy, EpochProxy) +from neo.io.proxyobjects import (AnalogSignalProxy, SpikeTrainProxy, EventProxy, EpochProxy) try: import nixio as nix @@ -78,13 +77,14 @@ def check_refs(self, neoblock, nixblock): :param nixblock: The corresponding NIX block """ - # Events and Epochs must reference all Signals in the Group (NIX only) + # Events and Epochs must reference all Signals in the NIX Group for nixgroup in nixblock.groups: nixevep = list(mt for mt in nixgroup.multi_tags if mt.type in ["neo.event", "neo.epoch"]) nixsigs = list(da.name for da in nixgroup.data_arrays if da.type in ["neo.analogsignal", - "neo.irregularlysampledsignal"]) + "neo.irregularlysampledsignal", + "neo.imagesequence"]) for nee in nixevep: for ns in nixsigs: self.assertIn(ns, nee.references) @@ -92,78 +92,72 @@ def check_refs(self, neoblock, nixblock): def compare_segment_group(self, neoseg, nixgroup): self.compare_attr(neoseg, nixgroup) neo_signals = neoseg.analogsignals + neoseg.irregularlysampledsignals \ - + neoseg.imagesequences + + neoseg.imagesequences self.compare_signals_das(neo_signals, nixgroup.data_arrays) neo_eests = neoseg.epochs + neoseg.events + neoseg.spiketrains self.compare_eests_mtags(neo_eests, nixgroup.multi_tags) def compare_signals_das(self, neosignals, data_arrays): - totalsignals = 0 + self.assertEqual(len(neosignals), len(data_arrays)) for sig in neosignals: - dalist = list() + data_array = None nixname = sig.annotations["nix_name"] - for da in data_arrays: - if da.metadata.name == nixname: - dalist.append(da) - nsig = np.shape(sig)[-1] - totalsignals += nsig - self.assertEqual(nsig, len(dalist)) - self.compare_signal_dalist(sig, dalist) - self.assertEqual(totalsignals, len(data_arrays)) - - def compare_signal_dalist(self, neosig, nixdalist): + # find matching data array for given neo signal + for d in data_arrays: + if d.metadata.name == nixname: + data_array = d + break + self.assertEqual(sig.shape, data_array.shape) + self.compare_signal_da(sig, data_array) + + def compare_signal_da(self, neosig, da): """ Check if a Neo Analog or IrregularlySampledSignal matches a list of NIX DataArrays. :param neosig: Neo Analog or IrregularlySampledSignal - :param nixdalist: List of DataArrays + :param da: DataArray """ - nixmd = nixdalist[0].metadata - self.assertTrue(all(nixmd == da.metadata for da in nixdalist)) neounit = neosig.units if isinstance(neosig, AnalogSignalProxy): neosig = neosig.load() - for sig, da in zip(np.transpose(neosig), nixdalist): - self.compare_attr(neosig, da) - daquant = create_quantity(da[:], da.unit) - np.testing.assert_almost_equal(sig.view(pq.Quantity), daquant) - nixunit = create_quantity(1, da.unit) - self.assertEqual(neounit, nixunit) - - if isinstance(neosig, AnalogSignal): - timedim = da.dimensions[0] - self.assertEqual(timedim.dimension_type, - nix.DimensionType.Sample) - neosp = neosig.sampling_period - nixsp = create_quantity(timedim.sampling_interval, - timedim.unit) - self.assertEqual(neosp, nixsp) - tsunit = timedim.unit - if "t_start.units" in da.metadata.props: - tsunit = da.metadata["t_start.units"] - neots = neosig.t_start - nixts = create_quantity(timedim.offset, tsunit) - self.assertEqual(neots, nixts) - elif isinstance(neosig, IrregularlySampledSignal): - timedim = da.dimensions[0] - self.assertEqual(timedim.dimension_type, - nix.DimensionType.Range) - np.testing.assert_almost_equal(neosig.times.magnitude, - timedim.ticks) - self.assertEqual(timedim.unit, - units_to_string(neosig.times.units)) - elif isinstance(neosig, ImageSequence): - rate = da.metadata["sampling_rate"] - unit = da.metadata.props["sampling_rate"].unit - sampling_rate = create_quantity(rate, unit) - neosr = neosig.sampling_rate - self.assertEqual(sampling_rate, neosr) - scale = da.metadata["spatial_scale"] - unit = da.metadata.props["spatial_scale"].unit - spatial_scale = create_quantity(scale, unit) - neosps = neosig.spatial_scale - self.assertEqual(spatial_scale, neosps) + + sig = neosig + self.compare_attr(neosig, da) + daquant = create_quantity(da[:], da.unit) + np.testing.assert_almost_equal(sig.view(pq.Quantity), daquant) + nixunit = create_quantity(1, da.unit) + self.assertEqual(neounit, nixunit) + + if isinstance(neosig, AnalogSignal): + timedim = da.dimensions[0] + self.assertEqual(timedim.dimension_type, nix.DimensionType.Sample) + neosp = neosig.sampling_period + nixsp = create_quantity(timedim.sampling_interval, timedim.unit) + self.assertEqual(neosp, nixsp) + tsunit = timedim.unit + if "t_start.units" in da.metadata.props: + tsunit = da.metadata["t_start.units"] + neots = neosig.t_start + nixts = create_quantity(timedim.offset, tsunit) + self.assertEqual(neots, nixts) + elif isinstance(neosig, IrregularlySampledSignal): + timedim = da.dimensions[0] + self.assertEqual(timedim.dimension_type, nix.DimensionType.Range) + np.testing.assert_almost_equal(neosig.times.magnitude, + timedim.ticks) + self.assertEqual(timedim.unit, units_to_string(neosig.times.units)) + elif isinstance(neosig, ImageSequence): + rate = da.dimensions[0].sampling_interval + unit = da.dimensions[0].unit + sampling_period = create_quantity(rate, unit) + neofd = neosig.frame_duration + self.assertEqual(sampling_period, neofd) + scale = da.dimensions[1].sampling_interval + unit = da.dimensions[1].unit + spatial_scale = create_quantity(scale, unit) + neosps = neosig.spatial_scale + self.assertEqual(spatial_scale, neosps) def compare_eests_mtags(self, eestlist, mtaglist): self.assertEqual(len(eestlist), len(mtaglist)) @@ -189,8 +183,7 @@ def compare_epoch_mtag(self, epoch, mtag): extquant = create_quantity(ext[:], ext.unit) np.testing.assert_almost_equal(epoch.as_quantity(), posquant) np.testing.assert_almost_equal(epoch.durations, extquant) - for neol, nixl in zip(epoch.labels, - mtag.positions.dimensions[0].labels): + for neol, nixl in zip(epoch.labels, mtag.positions.dimensions[0].labels): self.assertEqual(neol, nixl) def compare_event_mtag(self, event, mtag): @@ -199,8 +192,7 @@ def compare_event_mtag(self, event, mtag): pos = mtag.positions posquant = create_quantity(pos[:], pos.unit) np.testing.assert_almost_equal(event.as_quantity(), posquant) - for neol, nixl in zip(event.labels, - mtag.positions.dimensions[0].labels): + for neol, nixl in zip(event.labels, mtag.positions.dimensions[0].labels): self.assertEqual(neol, nixl) def compare_spiketrain_mtag(self, spiketrain, mtag): @@ -216,30 +208,20 @@ def compare_spiketrain_mtag(self, spiketrain, mtag): for nixwf, neowf in zip(nixwfs, neowfs): for nixrow, neorow in zip(nixwf, neowf): for nixv, neov in zip(nixrow, neorow): - self.assertEqual(create_quantity(nixv, nixwfs.unit), - neov) - self.assertEqual(nixwfs.dimensions[0].dimension_type, - nix.DimensionType.Set) - self.assertEqual(nixwfs.dimensions[1].dimension_type, - nix.DimensionType.Set) - self.assertEqual(nixwfs.dimensions[2].dimension_type, - nix.DimensionType.Sample) + self.assertEqual(create_quantity(nixv, nixwfs.unit), neov) + self.assertEqual(nixwfs.dimensions[0].dimension_type, nix.DimensionType.Set) + self.assertEqual(nixwfs.dimensions[1].dimension_type, nix.DimensionType.Set) + self.assertEqual(nixwfs.dimensions[2].dimension_type, nix.DimensionType.Sample) def compare_attr(self, neoobj, nixobj): - if isinstance(neoobj, (AnalogSignal, IrregularlySampledSignal, - ImageSequence)): - nix_name = ".".join(nixobj.name.split(".")[:-1]) - else: - nix_name = nixobj.name + nix_name = nixobj.name self.assertEqual(neoobj.annotations["nix_name"], nix_name) self.assertEqual(neoobj.description, nixobj.definition) if hasattr(neoobj, "rec_datetime") and neoobj.rec_datetime: - self.assertEqual(neoobj.rec_datetime, - datetime.fromtimestamp(nixobj.created_at)) + self.assertEqual(neoobj.rec_datetime, datetime.fromtimestamp(nixobj.created_at)) if hasattr(neoobj, "file_datetime") and neoobj.file_datetime: - nixdt = dt_from_nix(nixobj.metadata["file_datetime"], - DATETIMEANNOTATION) + nixdt = dt_from_nix(nixobj.metadata["file_datetime"], DATETIMEANNOTATION) assert neoobj.file_datetime == nixdt self.assertEqual(neoobj.file_datetime, nixdt) if neoobj.annotations: @@ -255,8 +237,7 @@ def compare_attr(self, neoobj, nixobj): nixvalue = np.array(nixvalue) np.testing.assert_almost_equal(nixvalue, v.magnitude) else: - self.assertEqual(nixmd[str(k)], v, - "Property value mismatch: {}".format(k)) + self.assertEqual(nixmd[str(k)], v, "Property value mismatch: {}".format(k)) if hasattr(neoobj, 'array_annotations'): if neoobj.array_annotations: nixmd = nixobj.metadata @@ -280,6 +261,9 @@ def compare_attr(self, neoobj, nixobj): def create_full_nix_file(cls, filename): nixfile = nix.File.open(filename, nix.FileMode.Overwrite) + nixfile.create_section('neo', 'neo.metadata') + nixfile.sections['neo']['version'] = neoversion + nix_block_a = nixfile.create_block(cls.rword(10), "neo.block") nix_block_a.definition = cls.rsentence(5, 10) nix_block_b = nixfile.create_block(cls.rword(10), "neo.block") @@ -302,143 +286,117 @@ def create_full_nix_file(cls, filename): group = blk.create_group(cls.rword(), "neo.segment") group.definition = cls.rsentence(10, 15) - group_md = blk.metadata.create_section( - group.name, group.name + ".metadata" - ) + group_md = blk.metadata.create_section(group.name, group.name + ".metadata") group.metadata = group_md blk = nix_blocks[0] group = blk.groups[0] allspiketrains = list() - allsignalgroups = list() + allsignals = list() # analogsignals for n in range(5): - siggroup = list() asig_name = "{}_asig{}".format(cls.rword(10), n) asig_definition = cls.rsentence(5, 5) - asig_md = group.metadata.create_section(asig_name, - asig_name + ".metadata") + asig_md = group.metadata.create_section(asig_name, asig_name + ".metadata") arr_ann_name, arr_ann_val = 'anasig_arr_ann', cls.rquant(10, pq.uV) - asig_md.create_property(arr_ann_name, - arr_ann_val.magnitude.flatten()) + asig_md.create_property(arr_ann_name, arr_ann_val.magnitude.flatten()) asig_md.props[arr_ann_name].unit = str(arr_ann_val.dimensionality) asig_md.props[arr_ann_name].type = 'ARRAYANNOTATION' - for idx in range(10): - da_asig = blk.create_data_array( - "{}.{}".format(asig_name, idx), - "neo.analogsignal", - data=cls.rquant(100, 1) - ) - da_asig.definition = asig_definition - da_asig.unit = "mV" - - da_asig.metadata = asig_md - - timedim = da_asig.append_sampled_dimension(0.01) - timedim.unit = "ms" - timedim.label = "time" - timedim.offset = 10 - da_asig.append_set_dimension() - group.data_arrays.append(da_asig) - siggroup.append(da_asig) + # signal with 10 channels each 100 samples + da_asig = blk.create_data_array("{}".format(asig_name), "neo.analogsignal", + data=cls.rquant((100, 10), 1)) + da_asig.definition = asig_definition + da_asig.unit = "mV" + + da_asig.metadata = asig_md + + timedim = da_asig.append_sampled_dimension(0.01) + timedim.unit = "ms" + timedim.label = "time" + timedim.offset = 10 + channel_ids = [f'chan {i}' for i in range(da_asig.shape[-1])] + da_asig.append_set_dimension(channel_ids) + group.data_arrays.append(da_asig) asig_md["t_start.dim"] = "ms" - allsignalgroups.append(siggroup) + + allsignals.append(da_asig) + # imagesequence for n in range(5): - imgseqgroup = list() imgseq_name = "{}_imgs{}".format(cls.rword(10), n) imgseq_definition = cls.rsentence(5, 5) - imgseq_md = group.metadata.create_section(imgseq_name, - imgseq_name + ".metadata") + imgseq_md = group.metadata.create_section(imgseq_name, imgseq_name + ".metadata") arr_ann_name, arr_ann_val = 'imgseq_arr_ann', cls.rquant(10, pq.V) - imgseq_md.create_property(arr_ann_name, - arr_ann_val.magnitude.flatten()) + imgseq_md.create_property(arr_ann_name, arr_ann_val.magnitude.flatten()) imgseq_md.props[arr_ann_name].unit = str(arr_ann_val.dimensionality) imgseq_md.props[arr_ann_name].type = 'ARRAYANNOTATION' - for idx in range(10): - da_imgseq = blk.create_data_array( - "{}.{}".format(imgseq_name, idx), - "neo.imagesequence", - data=cls.rquant((20, 10), 1) - ) - da_imgseq.definition = imgseq_definition - da_imgseq.unit = "mV" - - da_imgseq.metadata = imgseq_md - imgseq_md["sampling_rate"] = 10 - imgseq_md.props["sampling_rate"].unit = units_to_string(pq.V) - imgseq_md["spatial_scale"] = 10 - imgseq_md.props["spatial_scale"].unit = units_to_string(pq.micrometer) - - group.data_arrays.append(da_imgseq) - imgseqgroup.append(da_imgseq) - - allsignalgroups.append(imgseqgroup) + da_imgseq = blk.create_data_array(imgseq_name, "neo.imagesequence", + data=cls.rquant((20, 10, 10), 1)) + da_imgseq.definition = imgseq_definition + da_imgseq.unit = "mV" + + da_imgseq.metadata = imgseq_md + da_imgseq.append_sampled_dimension(12, 'time', 'ms', 200) + da_imgseq.append_sampled_dimension(4, 'row', 'mm', 20) + da_imgseq.append_sampled_dimension(4, 'col', 'mm', 10) + + group.data_arrays.append(da_imgseq) + + allsignals.append(da_imgseq) + # irregularlysampledsignals for n in range(2): - siggroup = list() isig_name = "{}_isig{}".format(cls.rword(10), n) isig_definition = cls.rsentence(12, 12) - isig_md = group.metadata.create_section(isig_name, - isig_name + ".metadata") + isig_md = group.metadata.create_section(isig_name, isig_name + ".metadata") isig_times = cls.rquant(200, 1, True) arr_ann_name, arr_ann_val = 'irrsig_arr_ann', cls.rquant(7, pq.uV) - isig_md.create_property(arr_ann_name, - arr_ann_val.magnitude.flatten()) + isig_md.create_property(arr_ann_name, arr_ann_val.magnitude.flatten()) isig_md.props[arr_ann_name].unit = str(arr_ann_val.dimensionality) isig_md.props[arr_ann_name].type = 'ARRAYANNOTATION' - for idx in range(7): - da_isig = blk.create_data_array( - "{}.{}".format(isig_name, idx), - "neo.irregularlysampledsignal", - data=cls.rquant(200, 1) - ) - da_isig.definition = isig_definition - da_isig.unit = "mV" - - da_isig.metadata = isig_md - - timedim = da_isig.append_range_dimension(isig_times) - timedim.unit = "s" - timedim.label = "time" - da_isig.append_set_dimension() - group.data_arrays.append(da_isig) - siggroup.append(da_isig) - allsignalgroups.append(siggroup) + + da_isig = blk.create_data_array(isig_name, "neo.irregularlysampledsignal", + data=cls.rquant((200, 7), 1)) + da_isig.definition = isig_definition + da_isig.unit = "mV" + + da_isig.metadata = isig_md + + timedim = da_isig.append_range_dimension(isig_times) + timedim.unit = "s" + timedim.label = "time" + channel_ids = [f'chan {i}' for i in range(da_isig.shape[1])] + da_isig.append_set_dimension(channel_ids) + group.data_arrays.append(da_isig) + allsignals.append(da_isig) + # SpikeTrains with Waveforms for n in range(4): stname = "{}-st{}".format(cls.rword(20), n) times = cls.rquant(40, 1, True) - times_da = blk.create_data_array( - "{}.times".format(stname), - "neo.spiketrain.times", - data=times - ) + times_da = blk.create_data_array("{}.times".format(stname), "neo.spiketrain.times", + data=times) times_da.unit = "ms" mtag_st = blk.create_multi_tag(stname, "neo.spiketrain", times_da) group.multi_tags.append(mtag_st) mtag_st.definition = cls.rsentence(20, 30) - mtag_st_md = group.metadata.create_section( - mtag_st.name, mtag_st.name + ".metadata" - ) + mtag_st_md = group.metadata.create_section(mtag_st.name, mtag_st.name + ".metadata") mtag_st.metadata = mtag_st_md mtag_st_md.create_property("t_stop", times[-1] + 1.0) arr_ann_name, arr_ann_val = 'st_arr_ann', cls.rquant(40, pq.uV) - mtag_st_md.create_property(arr_ann_name, - arr_ann_val.magnitude.flatten()) + mtag_st_md.create_property(arr_ann_name, arr_ann_val.magnitude.flatten()) mtag_st_md.props[arr_ann_name].unit = str(arr_ann_val.dimensionality) mtag_st_md.props[arr_ann_name].type = 'ARRAYANNOTATION' waveforms = cls.rquant((10, 8, 5), 1) wfname = "{}.waveforms".format(mtag_st.name) - wfda = blk.create_data_array(wfname, "neo.waveforms", - data=waveforms) + wfda = blk.create_data_array(wfname, "neo.waveforms", data=waveforms) wfda.unit = "mV" mtag_st.create_feature(wfda, nix.LinkType.Indexed) wfda.append_set_dimension() # spike dimension @@ -446,85 +404,60 @@ def create_full_nix_file(cls, filename): wftimedim = wfda.append_sampled_dimension(0.1) wftimedim.unit = "ms" wftimedim.label = "time" - wfda.metadata = mtag_st_md.create_section( - wfname, "neo.waveforms.metadata" - ) - wfda.metadata.create_property("left_sweep", - [20] * 5) + wfda.metadata = mtag_st_md.create_section(wfname, "neo.waveforms.metadata") + wfda.metadata.create_property("left_sweep", [20] * 5) allspiketrains.append(mtag_st) # Epochs for n in range(3): epname = "{}-ep{}".format(cls.rword(5), n) times = cls.rquant(5, 1, True) - times_da = blk.create_data_array( - "{}.times".format(epname), - "neo.epoch.times", - data=times - ) + times_da = blk.create_data_array("{}.times".format(epname), "neo.epoch.times", + data=times) times_da.unit = "s" extents = cls.rquant(5, 1) - extents_da = blk.create_data_array( - "{}.durations".format(epname), - "neo.epoch.durations", - data=extents - ) + extents_da = blk.create_data_array("{}.durations".format(epname), + "neo.epoch.durations", data=extents) extents_da.unit = "s" - mtag_ep = blk.create_multi_tag( - epname, "neo.epoch", times_da - ) - mtag_ep.metadata = group.metadata.create_section( - epname, epname + ".metadata" - ) + mtag_ep = blk.create_multi_tag(epname, "neo.epoch", times_da) + mtag_ep.metadata = group.metadata.create_section(epname, epname + ".metadata") group.multi_tags.append(mtag_ep) mtag_ep.definition = cls.rsentence(2) mtag_ep.extents = extents_da arr_ann_name, arr_ann_val = 'ep_arr_ann', cls.rquant(5, pq.uV) - mtag_ep.metadata.create_property(arr_ann_name, - arr_ann_val.magnitude.flatten()) + mtag_ep.metadata.create_property(arr_ann_name, arr_ann_val.magnitude.flatten()) mtag_ep.metadata.props[arr_ann_name].unit = str(arr_ann_val.dimensionality) mtag_ep.metadata.props[arr_ann_name].type = 'ARRAYANNOTATION' label_dim = mtag_ep.positions.append_set_dimension() label_dim.labels = cls.rsentence(5).split(" ") # reference all signals in the group - for siggroup in allsignalgroups: - mtag_ep.references.extend(siggroup) + mtag_ep.references.extend(allsignals) # Events for n in range(2): evname = "{}-ev{}".format(cls.rword(5), n) times = cls.rquant(5, 1, True) - times_da = blk.create_data_array( - "{}.times".format(evname), - "neo.event.times", - data=times - ) + times_da = blk.create_data_array("{}.times".format(evname), "neo.event.times", + data=times) times_da.unit = "s" - mtag_ev = blk.create_multi_tag( - evname, "neo.event", times_da - ) - mtag_ev.metadata = group.metadata.create_section( - evname, evname + ".metadata" - ) + mtag_ev = blk.create_multi_tag(evname, "neo.event", times_da) + mtag_ev.metadata = group.metadata.create_section(evname, evname + ".metadata") group.multi_tags.append(mtag_ev) mtag_ev.definition = cls.rsentence(2) - arr_ann_name, arr_ann_val = 'ev_arr_ann',\ - cls.rquant(5, pq.uV) - mtag_ev.metadata.create_property(arr_ann_name, - arr_ann_val.magnitude.flatten()) + arr_ann_name, arr_ann_val = 'ev_arr_ann', cls.rquant(5, pq.uV) + mtag_ev.metadata.create_property(arr_ann_name, arr_ann_val.magnitude.flatten()) mtag_ev.metadata.props[arr_ann_name].unit = str(arr_ann_val.dimensionality) mtag_ev.metadata.props[arr_ann_name].type = 'ARRAYANNOTATION' label_dim = mtag_ev.positions.append_set_dimension() label_dim.labels = cls.rsentence(5).split(" ") # reference all signals in the group - for siggroup in allsignalgroups: - mtag_ev.references.extend(siggroup) + mtag_ev.references.extend(allsignals) # CHX nixchx = blk.create_source(cls.rword(10), @@ -552,20 +485,17 @@ def create_full_nix_file(cls, filename): for idx in range(nunits): unitname = "{}-unit{}".format(cls.rword(5), idx) nixunit = nixchx.create_source(unitname, "neo.unit") - nixunit.metadata = nixchx.metadata.create_section( - unitname, unitname + ".metadata" - ) + nixunit.metadata = nixchx.metadata.create_section(unitname, unitname + ".metadata") nixunit.definition = cls.rsentence(4, 10) for st in stsperunit[idx]: st.sources.append(nixchx) st.sources.append(nixunit) # pick a few signal groups to reference this CHX - rand_idxs = np.random.choice(range(len(allsignalgroups)), 5, False) - randsiggroups = [allsignalgroups[idx] for idx in rand_idxs] - for siggroup in randsiggroups: - for sig in siggroup: - sig.sources.append(nixchx) + rand_idxs = np.random.choice(range(len(allsignals)), 5, False) + randsigs = [allsignals[idx] for idx in rand_idxs] + for sig in randsigs: + sig.sources.append(nixchx) return nixfile @@ -586,8 +516,7 @@ def rword(n=10): @classmethod def rsentence(cls, n=3, maxwl=10): - return " ".join(cls.rword(np.random.randint(1, maxwl)) - for _ in range(n)) + return " ".join(cls.rword(np.random.randint(1, maxwl)) for _ in range(n)) @classmethod def rdict(cls, nitems): @@ -606,9 +535,8 @@ def rquant(shape, unit, incr=False): except TypeError: dim = 1 if incr and dim > 1: - raise TypeError("Shape of quantity array may only be " - "one-dimensional when incremental values are " - "requested.") + raise TypeError("Shape of quantity array may only be one-dimensional when incremental " + "values are requested.") arr = np.random.random(shape) if incr: arr = np.array(np.cumsum(arr)) @@ -629,13 +557,11 @@ def create_all_annotated(cls): cls.populate_dates(seg) blk.segments.append(seg) - asig = AnalogSignal(signal=signal, sampling_rate=pq.Hz, - array_annotations=signal_ann) + asig = AnalogSignal(signal=signal, sampling_rate=pq.Hz, array_annotations=signal_ann) asig.annotate(**cls.rdict(2)) seg.analogsignals.append(asig) - isig = IrregularlySampledSignal(times=times, signal=signal, - time_units=pq.s, + isig = IrregularlySampledSignal(times=times, signal=signal, time_units=pq.s, array_annotations=signal_ann) isig.annotate(**cls.rdict(2)) seg.irregularlysampledsignals.append(isig) @@ -649,8 +575,8 @@ def create_all_annotated(cls): event.annotate(**cls.rdict(4)) seg.events.append(event) - spiketrain = SpikeTrain(times=times, t_stop=10 * pq.s, - units=pq.s, array_annotations=times_ann) + spiketrain = SpikeTrain(times=times, t_stop=10 * pq.s, units=pq.s, + array_annotations=times_ann) d = cls.rdict(6) d["quantity"] = pq.Quantity(10, "mV") d["qarray"] = pq.Quantity(range(10), "mA") @@ -689,8 +615,7 @@ def write_and_compare(self, blocks, use_obj_names=False): self.compare_blocks(blocks, self.reader.blocks) def test_block_write(self): - block = Block(name=self.rword(), - description=self.rsentence()) + block = Block(name=self.rword(), description=self.rsentence()) self.write_and_compare([block]) block.annotate(**self.rdict(5)) @@ -764,8 +689,7 @@ def test_signals_compound_units(self): units = pq.CompoundUnit("1/30000*V") srate = pq.Quantity(10, pq.CompoundUnit("1.0/10 * Hz")) - asig = AnalogSignal(signal=self.rquant((10, 23), units), - sampling_rate=srate) + asig = AnalogSignal(signal=self.rquant((10, 23), units), sampling_rate=srate) seg.analogsignals.append(asig) self.write_and_compare([block]) @@ -782,10 +706,8 @@ def test_signals_compound_units(self): self.write_and_compare([block, anotherblock]) block.segments[0].analogsignals.append( - AnalogSignal(signal=[10.0, 1.0, 3.0], units=pq.S, - sampling_period=pq.Quantity(3, "s"), - dtype=np.double, name="signal42", - description="this is an analogsignal", + AnalogSignal(signal=[10.0, 1.0, 3.0], units=pq.S, sampling_period=pq.Quantity(3, "s"), + dtype=np.double, name="signal42", description="this is an analogsignal", t_start=45 * pq.CompoundUnit("3.14 * s")), ) self.write_and_compare([block, anotherblock]) @@ -809,8 +731,8 @@ def test_imagesequence_compound_units(self): units = pq.CompoundUnit("1/30000*V") srate = pq.Quantity(10, pq.CompoundUnit("1.0/10 * Hz")) size = pq.Quantity(10, pq.CompoundUnit("1.0/10 * micrometer")) - imgseq = ImageSequence(image_data=self.rquant((10, 20, 10), units), - sampling_rate=srate, spatial_scale=size) + imgseq = ImageSequence(image_data=self.rquant((10, 20, 10), units), sampling_rate=srate, + spatial_scale=size) seg.imagesequences.append(imgseq) self.write_and_compare([block]) @@ -845,8 +767,8 @@ def test_spiketrain_write(self): seg = Segment() block.segments.append(seg) - spiketrain = SpikeTrain(times=[3, 4, 5] * pq.s, t_stop=10.0, - name="spikes!", description="sssssspikes") + spiketrain = SpikeTrain(times=[3, 4, 5] * pq.s, t_stop=10.0, name="spikes!", + description="sssssspikes") seg.spiketrains.append(spiketrain) self.write_and_compare([block]) @@ -867,22 +789,20 @@ def test_spiketrain_write(self): def test_group_write(self): signals = [ - AnalogSignal(np.random.random(size=(1000, 5)) * pq.mV, - sampling_period=1 * pq.ms, name="sig1"), - AnalogSignal(np.random.random(size=(1000, 3)) * pq.mV, - sampling_period=1 * pq.ms, name="sig2"), + AnalogSignal(np.random.random(size=(1000, 5)) * pq.mV, sampling_period=1 * pq.ms, + name="sig1"), + AnalogSignal(np.random.random(size=(1000, 3)) * pq.mV, sampling_period=1 * pq.ms, + name="sig2"), ] spiketrains = [ - SpikeTrain([0.1, 54.3, 76.6, 464.2], units=pq.ms, - t_stop=1000.0 * pq.ms, t_start=0.0 * pq.ms), - SpikeTrain([30.1, 154.3, 276.6, 864.2], units=pq.ms, - t_stop=1000.0 * pq.ms, t_start=0.0 * pq.ms), - SpikeTrain([120.1, 454.3, 576.6, 764.2], units=pq.ms, - t_stop=1000.0 * pq.ms, t_start=0.0 * pq.ms), - ] - epochs = [ - Epoch(times=[0, 500], durations=[100, 100], units=pq.ms, labels=["A", "B"]) + SpikeTrain([0.1, 54.3, 76.6, 464.2], units=pq.ms, t_stop=1000.0 * pq.ms, + t_start=0.0 * pq.ms), + SpikeTrain([30.1, 154.3, 276.6, 864.2], units=pq.ms, t_stop=1000.0 * pq.ms, + t_start=0.0 * pq.ms), + SpikeTrain([120.1, 454.3, 576.6, 764.2], units=pq.ms, t_stop=1000.0 * pq.ms, + t_start=0.0 * pq.ms), ] + epochs = [Epoch(times=[0, 500], durations=[100, 100], units=pq.ms, labels=["A", "B"])] seg = Segment(name="seg1") seg.analogsignals.extend(signals) @@ -907,22 +827,20 @@ def test_group_write(self): def test_group_write_nested(self): signals = [ - AnalogSignal(np.random.random(size=(1000, 5)) * pq.mV, - sampling_period=1 * pq.ms, name="sig1"), - AnalogSignal(np.random.random(size=(1000, 3)) * pq.mV, - sampling_period=1 * pq.ms, name="sig2"), + AnalogSignal(np.random.random(size=(1000, 5)) * pq.mV, sampling_period=1 * pq.ms, + name="sig1"), + AnalogSignal(np.random.random(size=(1000, 3)) * pq.mV, sampling_period=1 * pq.ms, + name="sig2"), ] spiketrains = [ - SpikeTrain([0.1, 54.3, 76.6, 464.2], units=pq.ms, - t_stop=1000.0 * pq.ms, t_start=0.0 * pq.ms), - SpikeTrain([30.1, 154.3, 276.6, 864.2], units=pq.ms, - t_stop=1000.0 * pq.ms, t_start=0.0 * pq.ms), - SpikeTrain([120.1, 454.3, 576.6, 764.2], units=pq.ms, - t_stop=1000.0 * pq.ms, t_start=0.0 * pq.ms), - ] - epochs = [ - Epoch(times=[0, 500], durations=[100, 100], units=pq.ms, labels=["A", "B"]) + SpikeTrain([0.1, 54.3, 76.6, 464.2], units=pq.ms, t_stop=1000.0 * pq.ms, + t_start=0.0 * pq.ms), + SpikeTrain([30.1, 154.3, 276.6, 864.2], units=pq.ms, t_stop=1000.0 * pq.ms, + t_start=0.0 * pq.ms), + SpikeTrain([120.1, 454.3, 576.6, 764.2], units=pq.ms, t_stop=1000.0 * pq.ms, + t_start=0.0 * pq.ms), ] + epochs = [Epoch(times=[0, 500], durations=[100, 100], units=pq.ms, labels=["A", "B"])] seg = Segment(name="seg1") seg.analogsignals.extend(signals) @@ -960,7 +878,7 @@ def test_metadata_structure_write(self): grpmd = blkmd.sections[grp.name] for da in grp.data_arrays: # signals - name = ".".join(da.name.split(".")[:-1]) + name = da.name self.assertIn(name, grpmd.sections) for mtag in grp.multi_tags: # spiketrains, events, and epochs self.assertIn(mtag.name, grpmd.sections) @@ -990,13 +908,12 @@ def test_anonymous_objects_write(self): seg = Segment() blk.segments.append(seg) for anaidx in range(nanasig): - seg.analogsignals.append(AnalogSignal(signal=signal, - sampling_rate=pq.Hz)) + seg.analogsignals.append(AnalogSignal(signal=signal, sampling_rate=pq.Hz)) for imgseqdx in range(nimgseq): seg.imagesequences.append(ImageSequence(image_data=self.rquant( - (10, 20, 10), pq.V), - sampling_rate=pq.Hz, - spatial_scale=pq.micrometer)) + (10, 20, 10), pq.V), + sampling_rate=pq.Hz, + spatial_scale=pq.micrometer)) for irridx in range(nirrseg): seg.irregularlysampledsignals.append( IrregularlySampledSignal(times=times, @@ -1012,8 +929,7 @@ def test_anonymous_objects_write(self): t_stop=times[-1] + pq.s, units=pq.s)) for chidx in range(nchx): - chx = Group(index=[1, 2], - channel_ids=[11, 22]) + chx = Group(index=[1, 2], channel_ids=[11, 22]) blk.groups.append(chx) for unidx in range(nunits): unit = Group() @@ -1085,9 +1001,7 @@ def test_name_objects_write(self): units=pq.s) ) for chidx in range(nchx): - chx = Group(name="chx{}".format(chidx), - index=[1, 2], - channel_ids=[11, 22]) + chx = Group(name="chx{}".format(chidx), index=[1, 2], channel_ids=[11, 22]) blk.groups.append(chx) for unidx in range(nunits): unit = Group(name="chx{}-unit{}".format(chidx, unidx)) @@ -1095,8 +1009,7 @@ def test_name_objects_write(self): # put guard on _generate_nix_name if not SKIPMOCK: - nixgenmock = mock.Mock(name="_generate_nix_name", - wraps=self.io._generate_nix_name) + nixgenmock = mock.Mock(name="_generate_nix_name", wraps=self.io._generate_nix_name) self.io._generate_nix_name = nixgenmock self.writer.write_block(blocks[0], use_obj_names=True) self.compare_blocks([blocks[0]], self.reader.blocks) @@ -1175,8 +1088,7 @@ def test_name_conflicts(self): seg = Segment(name="Event+SpikeTrain conflict Segment") blk.segments.append(seg) seg.events.append(Event(name="TimeyStuff", times=times)) - seg.spiketrains.append(SpikeTrain(name="TimeyStuff", times=times, - t_stop=pq.s)) + seg.spiketrains.append(SpikeTrain(name="TimeyStuff", times=times, t_stop=pq.s)) with self.assertRaises(ValueError): self.io.write_block(blk, use_obj_names=True) @@ -1206,16 +1118,12 @@ def test_multiref_write(self): signal = AnalogSignal(name="sig1", signal=[0, 1, 2], units="mV", sampling_period=pq.Quantity(1, "ms")) othersignal = IrregularlySampledSignal(name="i1", signal=[0, 0, 0], - units="mV", times=[1, 2, 3], - time_units="ms") + units="mV", times=[1, 2, 3], time_units="ms") imgseq = ImageSequence(name="img1", image_data=self.rquant((10, 20, 10), pq.mV), - frame_duration=pq.Quantity(1, "ms"), - spatial_scale=pq.meter) + frame_duration=pq.Quantity(1, "ms"), spatial_scale=pq.meter) event = Event(name="Evee", times=[0.3, 0.42], units="year") - epoch = Epoch(name="epoche", times=[0.1, 0.2] * pq.min, - durations=[0.5, 0.5] * pq.min) - st = SpikeTrain(name="the train of spikes", times=[0.1, 0.2, 10.3], - t_stop=11, units="us") + epoch = Epoch(name="epoche", times=[0.1, 0.2] * pq.min, durations=[0.5, 0.5] * pq.min) + st = SpikeTrain(name="the train of spikes", times=[0.1, 0.2, 10.3], t_stop=11, units="us") for idx in range(3): segname = "seg" + str(idx) @@ -1230,8 +1138,7 @@ def test_multiref_write(self): chidx = Group(index=[10, 20, 29]) seg = blk.segments[0] - st = SpikeTrain(name="choochoo", times=[10, 11, 80], t_stop=1000, - units="s") + st = SpikeTrain(name="choochoo", times=[10, 11, 80], t_stop=1000, units="s") seg.spiketrains.append(st) blk.groups.append(chidx) for idx in range(6): @@ -1242,44 +1149,6 @@ def test_multiref_write(self): self.writer.write_block(blk) self.compare_blocks([blk], self.reader.blocks) - # NOTE: storing data objects that are not within a segment is currently - # disallowed. Leaving this test commented out until this policy - # is properly discussed. - # def test_no_segment_write(self): - # # Tests storing AnalogSignal, IrregularlySampledSignal, and SpikeTrain - # # objects in the secondary (Group) substructure without them - # # being attached to a Segment. - # blk = Block("segmentless block") - # signal = AnalogSignal(name="sig1", signal=[0, 1, 2], units="mV", - # sampling_period=pq.Quantity(1, "ms")) - # othersignal = IrregularlySampledSignal(name="i1", signal=[0, 0, 0], - # units="mV", times=[1, 2, 3], - # time_units="ms") - # sta = SpikeTrain(name="the train of spikes", times=[0.1, 0.2, 10.3], - # t_stop=11, units="us") - # stb = SpikeTrain(name="the train of spikes b", times=[1.1, 2.2, 10.1], - # t_stop=100, units="ms") - - # chidx = Group(index=[8, 13, 21]) - # blk.groups.append(chidx) - # chidx.add(signal) - # chidx.add(othersignal) - - # unit = Group() - # chidx.add(unit) - # unit.add(sta, stb) - # self.writer.write_block(blk) - # self.writer.close() - - # self.compare_blocks([blk], self.reader.blocks) - - # reader = NixIO(self.filename, "ro") - # blk = reader.read_block(neoname="segmentless block") - # chx = blk.groups[0] - # self.assertEqual(len(chx.analogsignals), 1) - # self.assertEqual(len(chx.irregularlysampledsignals), 1) - # self.assertEqual(len(chx.units[0].spiketrains), 2) - def test_rewrite_refs(self): def checksignalcounts(fname): @@ -1302,8 +1171,7 @@ def checksignalcounts(fname): # Two signals in Group for idx in range(2): - asigchx = AnalogSignal(signal=[idx], units="mV", - sampling_rate=pq.Hz) + asigchx = AnalogSignal(signal=[idx], units="mV", sampling_rate=pq.Hz) chidx.add(asigchx) seg.analogsignals.append(asigchx) @@ -1318,8 +1186,7 @@ def checksignalcounts(fname): seg.spiketrains.append(st) # One signal in Segment but not in Group - asigseg = AnalogSignal(signal=[2], units="uA", - sampling_rate=pq.Hz) + asigseg = AnalogSignal(signal=[2], units="uA", sampling_rate=pq.Hz) seg.analogsignals.append(asigseg) # One spiketrain in Segment but not in Group @@ -1350,8 +1217,7 @@ def checksignalcounts(fname): checksignalcounts(secondwrite) def test_to_value(self): - section = self.io.nix_file.create_section("Metadata value test", - "Test") + section = self.io.nix_file.create_section("Metadata value test", "Test") writeprop = self.io._write_property # quantity @@ -1419,8 +1285,7 @@ def test_annotations_special_cases(self): # list of strings losval = ["one", "two", "one million"] - wblock = Block("block with list of strings", - los=losval) + wblock = Block("block with list of strings", los=losval) self.writer.write_block(wblock) rblock = self.writer.read_block(neoname="block with list of strings") self.assertEqual(rblock.annotations["los"], losval) @@ -1489,13 +1354,11 @@ def generate_complete_block(): seg.events.append(event) # add channel index and unit - channel = Group(index=[0], channel_names=['mychannelname'], - channel_ids=[4], - name=['testname']) + channel = Group(index=[0], channel_names=['mychannelname'], channel_ids=[4], + name=['testname']) block.groups.append(channel) - unit = Group(name='myunit', description='blablabla', - file_origin='fileA.nix', - myannotation='myannotation') + unit = Group(name='myunit', description='blablabla', file_origin='fileA.nix', + myannotation='myannotation') channel.add(unit) unit.add(spiketrain) @@ -1621,7 +1484,7 @@ def test_array_annotations_read(self): for seg in bl.segments: for anasig in seg.analogsignals: - da = nix_block.data_arrays[anasig.annotations['nix_name'] + '.0'] + da = nix_block.data_arrays[anasig.annotations['nix_name']] self.assertIn('anasig_arr_ann', da.metadata) self.assertIn('anasig_arr_ann', anasig.array_annotations) nix_ann = da.metadata['anasig_arr_ann'] @@ -1630,7 +1493,7 @@ def test_array_annotations_read(self): self.assertEqual(da.metadata.props['anasig_arr_ann'].unit, units_to_string(neo_ann.units)) for irrsig in seg.irregularlysampledsignals: - da = nix_block.data_arrays[irrsig.annotations['nix_name'] + '.0'] + da = nix_block.data_arrays[irrsig.annotations['nix_name']] self.assertIn('irrsig_arr_ann', da.metadata) self.assertIn('irrsig_arr_ann', irrsig.array_annotations) nix_ann = da.metadata['irrsig_arr_ann'] @@ -1639,7 +1502,7 @@ def test_array_annotations_read(self): self.assertEqual(da.metadata.props['irrsig_arr_ann'].unit, units_to_string(neo_ann.units)) for imgseq in seg.imagesequences: - da = nix_block.data_arrays[imgseq.annotations['nix_name'] + '.0'] + da = nix_block.data_arrays[imgseq.annotations['nix_name']] self.assertIn('imgseq_arr_ann', da.metadata) self.assertIn('imgseq_arr_ann', imgseq.array_annotations) nix_ann = da.metadata['imgseq_arr_ann']