From 194c23d51a6e66d094ead36dc60c0101257ffb36 Mon Sep 17 00:00:00 2001 From: legouee Date: Thu, 19 May 2022 13:52:48 +0200 Subject: [PATCH 01/15] NWB Inspector --- neo/io/nwbio.py | 170 +++++++++++++++++++++++-------- neo/test/iotest/test_nwbio.py | 186 ++++++++++++++++++++++++---------- 2 files changed, 256 insertions(+), 100 deletions(-) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index 8b7aa5c72..df25d4396 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -45,6 +45,8 @@ from pynwb.misc import AnnotationSeries from pynwb import image from pynwb.image import ImageSeries + from pynwb.file import Subject + from pynwb.epoch import TimeIntervals from pynwb.spec import NWBAttributeSpec, NWBDatasetSpec, NWBGroupSpec, NWBNamespace, \ NWBNamespaceBuilder from pynwb.device import Device @@ -55,6 +57,9 @@ except ImportError: have_pynwb = False +import nwbinspector +from nwbinspector import inspect_nwb, check_regular_timestamps + # hdmf imports try: from hdmf.spec import (LinkSpec, GroupSpec, DatasetSpec, SpecNamespace, @@ -275,6 +280,9 @@ def read_all_blocks(self, lazy=False, **kwargs): if "file_create_date" in self.global_block_metadata: self.global_block_metadata["file_datetime"] = self.global_block_metadata[ "rec_datetime"] + if "subject" in self.global_block_metadata: + self.global_block_metadata["subject"] = self.global_block_metadata[ + "subject"] self._blocks = {} self._read_acquisition_group(lazy=lazy) @@ -352,8 +360,6 @@ def _read_timeseries_group(self, group_name, lazy): except JSONDecodeError: # For NWB files created with other applications, we put everything in a single # segment in a single block - # todo: investigate whether there is a reliable way to create multiple segments, - # e.g. using Trial information block_name = "default" segment_name = "default" else: @@ -441,8 +447,16 @@ def write_all_blocks(self, blocks, **kwargs): raise Exception("Writing to NWB requires an annotation 'session_start_time'") self.annotations = {"rec_datetime": "rec_datetime"} self.annotations["rec_datetime"] = blocks[0].rec_datetime - # todo: handle subject + self.annotations = {"subject": "subject"} nwbfile = NWBFile(**annotations) + if "subject" not in annotations: + nwbfile.subject = Subject( + subject_id="subject_id", + age="P0D", #Period x days old + description="no description", + species="Mus musculus", # by default + sex="U", # unknown + ) assert self.nwb_file_mode in ('w',) # possibly expand to 'a'ppend later if self.nwb_file_mode == "w" and os.path.exists(self.filename): os.remove(self.filename) @@ -458,15 +472,16 @@ def write_all_blocks(self, blocks, **kwargs): 'block', 'the name of the Neo Block to which the SpikeTrain belongs') if sum(statistics(block)["Epoch"]["count"] for block in blocks) > 0: - nwbfile.add_epoch_column('_name', 'the name attribute of the Epoch') - # nwbfile.add_epoch_column('_description', 'the description attribute of the Epoch') - nwbfile.add_epoch_column( - 'segment', 'the name of the Neo Segment to which the Epoch belongs') - nwbfile.add_epoch_column('block', - 'the name of the Neo Block to which the Epoch belongs') + nwbfile.add_trial_column('segment', 'the name of the Neo Segment to which the Epoch belongs') + nwbfile.add_trial_column('block', 'the name of the Neo Block to which the Epoch belongs') + arr = [[], []] # epoch array for ascending t_start and t_stop for i, block in enumerate(blocks): - self.write_block(nwbfile, block) + block_name=block.name + self.write_block(nwbfile, block, arr) + arr2 = np.sort(arr) + self._write_epoch(nwbfile, arr2, block) + io_nwb.write(nwbfile) io_nwb.close() @@ -475,7 +490,37 @@ def write_all_blocks(self, blocks, **kwargs): if errors: raise Exception(f"Errors found when validating {self.filename}") - def write_block(self, nwbfile, block, **kwargs): + # NWBInspector : Inspect NWB files for compliance with NWB Best Practices. + results_generator = inspect_nwb(nwbfile_path=self.filename) + for message in results_generator: + if message.importance._name_=="CRITICAL": + print("message.importance = ", message.importance) + print("Potentially incorrect data") + print(message.message) + print("message.check_function_name = ", message.check_function_name) + print("message.object_type = ", message.object_type) + print("message.object_name = ", message.object_name) + print("----------------------") + if message.importance._name_=="BEST_PRACTICE_VIOLATION": + print("message.importance = ", message.importance) + print("Very suboptimal data representation") + print(message.message) + print("message.check_function_name = ", message.check_function_name) + print("message.object_type = ", message.object_type) + print("message.object_name = ", message.object_name) + print("----------------------") + if message.importance._name_=="BEST_PRACTICE_SUGGESTION": + print("message.importance = ", message.importance) + print("Improvable data representation") + print(message.message) + print("message.check_function_name = ", message.check_function_name) + print("message.object_type = ", message.object_type) + print("message.object_name = ", message.object_name) + print("----------------------") + + io_nwb.close() + + def write_block(self, nwbfile, block, arr, **kwargs): """ Write a Block to the file :param block: Block to be written @@ -485,10 +530,11 @@ def write_block(self, nwbfile, block, **kwargs): if not block.name: block.name = "block%d" % self.blocks_written for i, segment in enumerate(block.segments): + segment.name = "%s : segment%d" % (block.name, i) assert segment.block is block if not segment.name: segment.name = "%s : segment%d" % (block.name, i) - self._write_segment(nwbfile, segment, electrodes) + self._write_segment(nwbfile, segment, electrodes, arr) self.blocks_written += 1 def _write_electrodes(self, nwbfile, block): @@ -512,8 +558,7 @@ def _write_electrodes(self, nwbfile, block): ) return electrodes - def _write_segment(self, nwbfile, segment, electrodes): - # maybe use NWB trials to store Segment metadata? + def _write_segment(self, nwbfile, segment, electrodes, arr): for i, signal in enumerate( chain(segment.analogsignals, segment.irregularlysampledsignals)): assert signal.segment is segment @@ -541,8 +586,8 @@ def _write_segment(self, nwbfile, segment, electrodes): for i, epoch in enumerate(segment.epochs): if not epoch.name: - epoch.name = "%s : epoch%d" % (segment.name, i) - self._write_epoch(nwbfile, epoch) + epoch_name = "%s : epoch%d" % (segment.name, i) + self._write_manage_epoch(nwbfile, segment, epoch, arr) def _write_signal(self, nwbfile, signal, electrodes): hierarchy = {'block': signal.segment.block.name, 'segment': signal.segment.name} @@ -564,22 +609,22 @@ def _write_signal(self, nwbfile, signal, electrodes): if isinstance(signal, AnalogSignal): sampling_rate = signal.sampling_rate.rescale("Hz") tS = timeseries_class( - name=signal.name, - starting_time=time_in_seconds(signal.t_start), - data=signal, - unit=units.dimensionality.string, - rate=float(sampling_rate), - comments=json.dumps(hierarchy), - **additional_metadata) + name=signal.name, + starting_time=time_in_seconds(signal.t_start), + data=signal, + unit=units.dimensionality.string, + rate=float(sampling_rate), + comments=json.dumps(hierarchy), + **additional_metadata) # todo: try to add array_annotations via "control" attribute elif isinstance(signal, IrregularlySampledSignal): - tS = timeseries_class( - name=signal.name, - data=signal, - unit=units.dimensionality.string, - timestamps=signal.times.rescale('second').magnitude, - comments=json.dumps(hierarchy), - **additional_metadata) + tS = timeseries_class( + name=signal.name, + data=signal, + unit=units.dimensionality.string, + timestamps=signal.times.rescale('second').magnitude, + comments=json.dumps(hierarchy), + **additional_metadata) else: raise TypeError( "signal has type {0}, should be AnalogSignal or IrregularlySampledSignal".format( @@ -610,24 +655,57 @@ def _write_spiketrain(self, nwbfile, spiketrain): return nwbfile.units def _write_event(self, nwbfile, event): - hierarchy = {'block': event.segment.block.name, 'segment': event.segment.name} - tS_evt = AnnotationSeries( - name=event.name, - data=event.labels, - timestamps=event.times.rescale('second').magnitude, - description=event.description or "", - comments=json.dumps(hierarchy)) + hierarchy = {'block': event.segment.block.name, 'segment': event.segment.name} + + if any(event.times.rescale('second').magnitude)==any(event.times.rescale('second').magnitude): # if constant timestamps + tS_evt = TimeSeries( + name=event.name, + data=event.labels, + starting_time=0.0, + rate=0.01, + unit=str(event.units), + description=event.description or "", + comments=json.dumps(hierarchy)) + else: + tS_evt = TimeSeries( + name=event.name, + data=event.labels, + timestamps=event.times.rescale('second').magnitude, + unit=str(event.units), + description=event.description or "", + comments=json.dumps(hierarchy)) + nwbfile.add_acquisition(tS_evt) return tS_evt - def _write_epoch(self, nwbfile, epoch): + def _write_manage_epoch(self, nwbfile, segment, epoch, arr): for t_start, duration, label in zip(epoch.rescale('s').magnitude, epoch.durations.rescale('s').magnitude, - epoch.labels): - nwbfile.add_epoch(t_start, t_start + duration, [label], [], - _name=epoch.name, - segment=epoch.segment.name, - block=epoch.segment.block.name) + epoch.labels, + ): + for j in [label]: + t_stop = t_start + duration + seg_name = "%s %s" % (epoch.segment.name, label) + bl_name = "%s %s" % (epoch.segment.block.name, label) + epoch_name = "%s %s" % (segment.name, j) + + arr[0].append(t_start) + arr[1].append(t_stop) + + def _write_epoch(self, nwbfile, arr2, block): + for i in range(len(arr2[0])): + t_start = arr2[0][i] + t_stop = arr2[1][i] + for k in block.segments: + segment_name = k.name + nwbfile.add_trial( + start_time=t_start, + stop_time=t_stop, + tags=[" "], + timeseries=[], + segment=segment_name, + block=block.name, + ) return nwbfile.epochs @@ -644,6 +722,9 @@ def __init__(self, timeseries, nwb_group): self.units = timeseries.unit if timeseries.conversion: self.units = _recompose_unit(timeseries.unit, timeseries.conversion) + check_timestamps = check_regular_timestamps(timeseries) + if check_timestamps is not None: + timeseries.starting_time=0.0 if timeseries.starting_time is not None: self.t_start = timeseries.starting_time * pq.s else: @@ -714,7 +795,8 @@ def load(self, time_slice=None, strict_slicing=True): if self.sampling_rate is None: return IrregularlySampledSignal( self._timeseries.timestamps[i_start:i_stop] * pq.s, - signal, + # time_units=pq.s, + signal=signal, units=self.units, t_start=sig_t_start, sampling_rate=self.sampling_rate, diff --git a/neo/test/iotest/test_nwbio.py b/neo/test/iotest/test_nwbio.py index fe110416d..e0a0b93d5 100644 --- a/neo/test/iotest/test_nwbio.py +++ b/neo/test/iotest/test_nwbio.py @@ -28,10 +28,14 @@ import quantities as pq import numpy as np from numpy.testing import assert_array_equal, assert_allclose +import nwbinspector +from nwbinspector import inspect_nwb +from pynwb.file import Subject @unittest.skipUnless(HAVE_PYNWB, "requires pynwb") -class TestNWBIO(BaseTestIO, unittest.TestCase): +#class TestNWBIO(BaseTestIO, unittest.TestCase): +class TestNWBIO(unittest.TestCase): ioclass = NWBIO entities_to_download = ["nwb"] entities_to_test = [ @@ -41,26 +45,46 @@ class TestNWBIO(BaseTestIO, unittest.TestCase): def test_roundtrip(self): + subject_annotations = { + "nwb:subject_id": "012", + "nwb:age": "P90D", + "nwb:description": "mouse 5", + "nwb:species": "Mus musculus", + "nwb:sex": "M", + } annotations = { - "session_start_time": datetime.now() + "session_start_time": datetime.now(), + "subject": subject_annotations, } # Define Neo blocks - bl0 = Block(name='First block', **annotations) - bl1 = Block(name='Second block', **annotations) - bl2 = Block(name='Third block', **annotations) + bl0 = Block(name='First block', + experimenter="Experimenter's name", + experiment_description="Experiment description", + institution="Institution", + **annotations) + bl1 = Block(name='Second block', + experimenter="Experimenter's name", + experiment_description="Experiment description", + institution="Institution", + **annotations) + bl2 = Block(name='Third block', + experimenter="Experimenter's name", + experiment_description="Experiment description", + institution="Institution", + **annotations) original_blocks = [bl0, bl1, bl2] num_seg = 4 # number of segments - num_chan = 3 # number of channels + num_chan = 6 # number of channels - for blk in original_blocks: + for j, blk in enumerate(original_blocks): for ind in range(num_seg): # number of Segments seg = Segment(index=ind) seg.block = blk blk.segments.append(seg) - for seg in blk.segments: # AnalogSignal objects + for i, seg in enumerate(blk.segments): # AnalogSignal objects # 3 Neo AnalogSignals a = AnalogSignal(name='Signal_a %s' % (seg.name), @@ -75,49 +99,25 @@ def test_roundtrip(self): signal=np.random.randn(33, num_chan) * pq.uA, sampling_rate=10 * pq.kHz, t_start=120 * pq.ms) - # 2 Neo IrregularlySampledSignals - d = IrregularlySampledSignal(np.arange(7.0) * pq.ms, - np.random.randn(7, num_chan) * pq.mV) - + # 1 Neo IrregularlySampledSignals + d = IrregularlySampledSignal([0.01, 0.03, 0.12]*pq.s, [[4, 5], [5, 4], [6, 3]]*pq.nA) # 2 Neo SpikeTrains train = SpikeTrain(times=[1, 2, 3] * pq.s, t_start=1.0, t_stop=10.0) train2 = SpikeTrain(times=[4, 5, 6] * pq.s, t_stop=10.0) # todo: add waveforms - # 1 Neo Event - evt = Event(name='Event', - times=np.arange(0, 30, 10) * pq.ms, - labels=np.array(['ev0', 'ev1', 'ev2'])) - - # 2 Neo Epochs - epc = Epoch(times=np.arange(0, 30, 10) * pq.s, - durations=[10, 5, 7] * pq.ms, - labels=np.array(['btn0', 'btn1', 'btn2'])) - - epc2 = Epoch(times=np.arange(10, 40, 10) * pq.s, - durations=[9, 3, 8] * pq.ms, - labels=np.array(['btn3', 'btn4', 'btn5'])) - seg.spiketrains.append(train) seg.spiketrains.append(train2) - - seg.epochs.append(epc) - seg.epochs.append(epc2) - seg.analogsignals.append(a) seg.analogsignals.append(b) seg.analogsignals.append(c) seg.irregularlysampledsignals.append(d) - seg.events.append(evt) a.segment = seg b.segment = seg c.segment = seg d.segment = seg - evt.segment = seg train.segment = seg train2.segment = seg - epc.segment = seg - epc2.segment = seg # write to file test_file_name = "test_round_trip.nwb" @@ -148,16 +148,6 @@ def test_roundtrip(self): original_issignal_22d.times.rescale('ms').magnitude) assert_array_equal(retrieved_issignal_22d.magnitude, original_issignal_22d.magnitude) - original_event_11 = original_blocks[1].segments[1].events[0] - retrieved_event_11 = retrieved_blocks[1].segments[1].events[0] - for attr_name in ("name",): - retrieved_attribute = getattr(retrieved_event_11, attr_name) - original_attribute = getattr(original_event_11, attr_name) - self.assertEqual(retrieved_attribute, original_attribute) - assert_array_equal(retrieved_event_11.rescale('ms').magnitude, - original_event_11.rescale('ms').magnitude) - assert_array_equal(retrieved_event_11.labels, original_event_11.labels) - original_spiketrain_131 = original_blocks[1].segments[1].spiketrains[1] retrieved_spiketrain_131 = retrieved_blocks[1].segments[1].spiketrains[1] for attr_name in ("name", "t_start", "t_stop"): @@ -167,23 +157,28 @@ def test_roundtrip(self): assert_array_equal(retrieved_spiketrain_131.times.rescale('ms').magnitude, original_spiketrain_131.times.rescale('ms').magnitude) - original_epoch_11 = original_blocks[1].segments[1].epochs[0] - retrieved_epoch_11 = retrieved_blocks[1].segments[1].epochs[0] - for attr_name in ("name",): - retrieved_attribute = getattr(retrieved_epoch_11, attr_name) - original_attribute = getattr(original_epoch_11, attr_name) - self.assertEqual(retrieved_attribute, original_attribute) - assert_array_equal(retrieved_epoch_11.rescale('ms').magnitude, - original_epoch_11.rescale('ms').magnitude) - assert_allclose(retrieved_epoch_11.durations.rescale('ms').magnitude, - original_epoch_11.durations.rescale('ms').magnitude) - assert_array_equal(retrieved_epoch_11.labels, original_epoch_11.labels) + # NWBInspector : Inspect NWB files for compliance with NWB Best Practices. + results_roundtrip = list(inspect_nwb(nwbfile_path=test_file_name)) + #print("results test_roundtrip NWBInspector = ", results_roundtrip) + os.remove(test_file_name) def test_roundtrip_with_annotations(self): # test with NWB-specific annotations - original_block = Block(name="experiment", session_start_time=datetime.now()) + subject_annotations = { + "nwb:subject_id": "011", + "nwb:age": "P90D", + "nwb:description": "mouse 4", + "nwb:species": "Mus musculus", + "nwb:sex": "F", + } + original_block = Block(name="experiment", session_start_time=datetime.now(), + experimenter="Experimenter's name", + experiment_description="Experiment description", + institution="Institution", + subject=subject_annotations, + ) segment = Segment(name="session 1") original_block.segments.append(segment) segment.block = original_block @@ -248,6 +243,85 @@ def test_roundtrip_with_annotations(self): self.assertEqual(retrieved_attribute, original_attribute) assert_array_equal(retrieved_response.magnitude, original_response.magnitude) + # NWBInspector : Inspect NWB files for compliance with NWB Best Practices. + results_roundtrip_with_annotations = list(inspect_nwb(nwbfile_path=test_file_name)) + #print("results test_roundtrip_with_annotations NWBInspector = ", results_roundtrip_with_annotations) + + os.remove(test_file_name) + + def test_roundtrip_with_not_constant_sampling_rate(self): + # To check NWB Inspector for Epoch and Event + # NWB Epochs = Neo Segments + # Should work for multiple segments, not for multiple blocks + # The specific test for Time Series not having a constant sample rate + # For epochs and events + + annotations = { + "session_start_time": datetime.now(), + } + # Define Neo blocks + bl0 = Block(name='First block', + experimenter="Experimenter's name", + experiment_description="Experiment description", + institution="Institution", + **annotations) + original_blocks = [bl0] + + num_seg = 2 # number of segments + num_chan = 3 # number of channels + + for j, blk in enumerate(original_blocks): + + for ind in range(num_seg): # number of Segments + seg = Segment(index=ind) + seg.block = blk + blk.segments.append(seg) + + for i, seg in enumerate(blk.segments): # AnalogSignal objects + + a = AnalogSignal(name='Signal_a %s' % (seg.name), + signal=np.random.randn(44, num_chan) * pq.nA, + sampling_rate=10 * pq.kHz, + t_start=50 * pq.ms) + + epc = Epoch(times=[0 + i*ind, 10 + i*ind, 33 + i*ind]*pq.s, + durations=[10, 5, 7]*pq.s, + labels=np.array(['btn0', 'btn1', 'btn2'], dtype='U') + ) + + epc2 = Epoch(times=[0.1 + i*ind, 30 + i*ind, 61 + i*ind]*pq.s, + durations=[10, 5, 7]*pq.s, + labels=np.array(['btn4', 'btn5', 'btn6'])) + + evt = Event(name='Event', + times=[0.01 + i*ind, 11 + i*ind, 33 + i*ind]*pq.s, + labels=np.array(['ev0', 'ev1', 'ev2']) + ) + + seg.epochs.append(epc) + seg.epochs.append(epc2) + seg.events.append(evt) + seg.analogsignals.append(a) + + a.segment = seg + epc.segment = seg + epc2.segment = seg + evt.segment = seg + + # write to file + test_file_name = "test_round_trip_with_not_constant_sampling_rate.nwb" + iow = NWBIO(filename=test_file_name, mode='w') + iow.write_all_blocks(original_blocks) + + ior = NWBIO(filename=test_file_name, mode='r') + retrieved_blocks = ior.read_all_blocks() + + self.assertEqual(len(retrieved_blocks), 1) + + # NWBInspector : Inspect NWB files for compliance with NWB Best Practices. + results_roundtrip_specific_for_epochs = list(inspect_nwb(nwbfile_path=test_file_name)) + #print("results test_roundtrip_specific_for_epochs NWBInspector = ", results_roundtrip_specific_for_epochs) + os.remove(test_file_name) From 862c5266b2db87453b649d4b2c6c0fb4c4ef1771 Mon Sep 17 00:00:00 2001 From: legouee Date: Thu, 19 May 2022 14:17:29 +0200 Subject: [PATCH 02/15] pep8 fix --- neo/io/nwbio.py | 128 ++++++++++++++++------------------ neo/test/iotest/test_nwbio.py | 74 ++++++++++---------- 2 files changed, 96 insertions(+), 106 deletions(-) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index df25d4396..a1db878d9 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -452,11 +452,11 @@ def write_all_blocks(self, blocks, **kwargs): if "subject" not in annotations: nwbfile.subject = Subject( subject_id="subject_id", - age="P0D", #Period x days old + age="P0D",#Period x days old description="no description", - species="Mus musculus", # by default - sex="U", # unknown - ) + species="Mus musculus",# by default + sex="U",# unknown + ) assert self.nwb_file_mode in ('w',) # possibly expand to 'a'ppend later if self.nwb_file_mode == "w" and os.path.exists(self.filename): os.remove(self.filename) @@ -472,14 +472,14 @@ def write_all_blocks(self, blocks, **kwargs): 'block', 'the name of the Neo Block to which the SpikeTrain belongs') if sum(statistics(block)["Epoch"]["count"] for block in blocks) > 0: - nwbfile.add_trial_column('segment', 'the name of the Neo Segment to which the Epoch belongs') - nwbfile.add_trial_column('block', 'the name of the Neo Block to which the Epoch belongs') + nwbfile.add_trial_column('segment', 'name of the Segment to which the Epoch belongs') + nwbfile.add_trial_column('block', 'name of the Block to which the Epoch belongs') - arr = [[], []] # epoch array for ascending t_start and t_stop + arr = [[], []]# epoch array for ascending t_start and t_stop for i, block in enumerate(blocks): - block_name=block.name + block_name = block.name self.write_block(nwbfile, block, arr) - arr2 = np.sort(arr) + arr2 = np.sort(arr) self._write_epoch(nwbfile, arr2, block) io_nwb.write(nwbfile) @@ -493,7 +493,7 @@ def write_all_blocks(self, blocks, **kwargs): # NWBInspector : Inspect NWB files for compliance with NWB Best Practices. results_generator = inspect_nwb(nwbfile_path=self.filename) for message in results_generator: - if message.importance._name_=="CRITICAL": + if message.importance._name_ == "CRITICAL": print("message.importance = ", message.importance) print("Potentially incorrect data") print(message.message) @@ -501,7 +501,7 @@ def write_all_blocks(self, blocks, **kwargs): print("message.object_type = ", message.object_type) print("message.object_name = ", message.object_name) print("----------------------") - if message.importance._name_=="BEST_PRACTICE_VIOLATION": + if message.importance._name_ == "BEST_PRACTICE_VIOLATION": print("message.importance = ", message.importance) print("Very suboptimal data representation") print(message.message) @@ -509,7 +509,7 @@ def write_all_blocks(self, blocks, **kwargs): print("message.object_type = ", message.object_type) print("message.object_name = ", message.object_name) print("----------------------") - if message.importance._name_=="BEST_PRACTICE_SUGGESTION": + if message.importance._name_ == "BEST_PRACTICE_SUGGESTION": print("message.importance = ", message.importance) print("Improvable data representation") print(message.message) @@ -608,23 +608,21 @@ def _write_signal(self, nwbfile, signal, electrodes): units = signal.units if isinstance(signal, AnalogSignal): sampling_rate = signal.sampling_rate.rescale("Hz") - tS = timeseries_class( - name=signal.name, - starting_time=time_in_seconds(signal.t_start), - data=signal, - unit=units.dimensionality.string, - rate=float(sampling_rate), - comments=json.dumps(hierarchy), - **additional_metadata) + tS = timeseries_class(name=signal.name, + starting_time=time_in_seconds(signal.t_start), + data=signal, + unit=units.dimensionality.string, + rate=float(sampling_rate), + comments=json.dumps(hierarchy), + **additional_metadata) # todo: try to add array_annotations via "control" attribute elif isinstance(signal, IrregularlySampledSignal): - tS = timeseries_class( - name=signal.name, - data=signal, - unit=units.dimensionality.string, - timestamps=signal.times.rescale('second').magnitude, - comments=json.dumps(hierarchy), - **additional_metadata) + tS = timeseries_class(name=signal.name, + data=signal, + unit=units.dimensionality.string, + timestamps=signal.times.rescale('second').magnitude, + comments=json.dumps(hierarchy), + **additional_metadata) else: raise TypeError( "signal has type {0}, should be AnalogSignal or IrregularlySampledSignal".format( @@ -655,25 +653,23 @@ def _write_spiketrain(self, nwbfile, spiketrain): return nwbfile.units def _write_event(self, nwbfile, event): - hierarchy = {'block': event.segment.block.name, 'segment': event.segment.name} - - if any(event.times.rescale('second').magnitude)==any(event.times.rescale('second').magnitude): # if constant timestamps - tS_evt = TimeSeries( - name=event.name, - data=event.labels, - starting_time=0.0, - rate=0.01, - unit=str(event.units), - description=event.description or "", - comments=json.dumps(hierarchy)) + hierarchy = {'block': event.segment.block.name, 'segment': event.segment.name} + # if constant timestamps + if any(event.times.rescale('second').magnitude) == any(event.times.rescale('second').magnitude): + tS_evt = TimeSeries(name=event.name, + data=event.labels, + starting_time=0.0, + rate=0.01, + unit=str(event.units), + description=event.description or "", + comments=json.dumps(hierarchy)) else: - tS_evt = TimeSeries( - name=event.name, - data=event.labels, - timestamps=event.times.rescale('second').magnitude, - unit=str(event.units), - description=event.description or "", - comments=json.dumps(hierarchy)) + tS_evt = TimeSeries(name=event.name, + data=event.labels, + timestamps=event.times.rescale('second').magnitude, + unit=str(event.units), + description=event.description or "", + comments=json.dumps(hierarchy)) nwbfile.add_acquisition(tS_evt) return tS_evt @@ -698,8 +694,7 @@ def _write_epoch(self, nwbfile, arr2, block): t_stop = arr2[1][i] for k in block.segments: segment_name = k.name - nwbfile.add_trial( - start_time=t_start, + nwbfile.add_trial(start_time=t_start, stop_time=t_stop, tags=[" "], timeseries=[], @@ -724,7 +719,7 @@ def __init__(self, timeseries, nwb_group): self.units = _recompose_unit(timeseries.unit, timeseries.conversion) check_timestamps = check_regular_timestamps(timeseries) if check_timestamps is not None: - timeseries.starting_time=0.0 + timeseries.starting_time = 0.0 if timeseries.starting_time is not None: self.t_start = timeseries.starting_time * pq.s else: @@ -793,28 +788,25 @@ def load(self, time_slice=None, strict_slicing=True): time_slice, strict_slicing=strict_slicing) signal = self._timeseries.data[i_start: i_stop] if self.sampling_rate is None: - return IrregularlySampledSignal( - self._timeseries.timestamps[i_start:i_stop] * pq.s, - # time_units=pq.s, - signal=signal, - units=self.units, - t_start=sig_t_start, - sampling_rate=self.sampling_rate, - name=self.name, - description=self.description, - array_annotations=None, - **self.annotations) # todo: timeseries.control / control_description + return IrregularlySampledSignal(self._timeseries.timestamps[i_start:i_stop] * pq.s, + signal=signal, + units=self.units, + t_start=sig_t_start, + sampling_rate=self.sampling_rate, + name=self.name, + description=self.description, + array_annotations=None, + **self.annotations) # todo: timeseries.control / control_description else: - return AnalogSignal( - signal, - units=self.units, - t_start=sig_t_start, - sampling_rate=self.sampling_rate, - name=self.name, - description=self.description, - array_annotations=None, - **self.annotations) # todo: timeseries.control / control_description + return AnalogSignal(signal, + units=self.units, + t_start=sig_t_start, + sampling_rate=self.sampling_rate, + name=self.name, + description=self.description, + array_annotations=None, + **self.annotations) # todo: timeseries.control / control_description class EventProxy(BaseEventProxy): diff --git a/neo/test/iotest/test_nwbio.py b/neo/test/iotest/test_nwbio.py index e0a0b93d5..74441ba9f 100644 --- a/neo/test/iotest/test_nwbio.py +++ b/neo/test/iotest/test_nwbio.py @@ -34,8 +34,8 @@ @unittest.skipUnless(HAVE_PYNWB, "requires pynwb") -#class TestNWBIO(BaseTestIO, unittest.TestCase): -class TestNWBIO(unittest.TestCase): +class TestNWBIO(BaseTestIO, unittest.TestCase): +#class TestNWBIO(unittest.TestCase): ioclass = NWBIO entities_to_download = ["nwb"] entities_to_test = [ @@ -45,13 +45,12 @@ class TestNWBIO(unittest.TestCase): def test_roundtrip(self): - subject_annotations = { - "nwb:subject_id": "012", - "nwb:age": "P90D", - "nwb:description": "mouse 5", - "nwb:species": "Mus musculus", - "nwb:sex": "M", - } + subject_annotations = {"nwb:subject_id": "012", + "nwb:age": "P90D", + "nwb:description": "mouse 5", + "nwb:species": "Mus musculus", + "nwb:sex": "M", + } annotations = { "session_start_time": datetime.now(), "subject": subject_annotations, @@ -84,7 +83,7 @@ def test_roundtrip(self): seg.block = blk blk.segments.append(seg) - for i, seg in enumerate(blk.segments): # AnalogSignal objects + for i, seg in enumerate(blk.segments):# AnalogSignal objects # 3 Neo AnalogSignals a = AnalogSignal(name='Signal_a %s' % (seg.name), @@ -100,7 +99,8 @@ def test_roundtrip(self): sampling_rate=10 * pq.kHz, t_start=120 * pq.ms) # 1 Neo IrregularlySampledSignals - d = IrregularlySampledSignal([0.01, 0.03, 0.12]*pq.s, [[4, 5], [5, 4], [6, 3]]*pq.nA) + d = IrregularlySampledSignal([0.01, 0.03, 0.12]*pq.s, + [[4, 5], [5, 4], [6, 3]]*pq.nA) # 2 Neo SpikeTrains train = SpikeTrain(times=[1, 2, 3] * pq.s, t_start=1.0, t_stop=10.0) train2 = SpikeTrain(times=[4, 5, 6] * pq.s, t_stop=10.0) @@ -157,28 +157,26 @@ def test_roundtrip(self): assert_array_equal(retrieved_spiketrain_131.times.rescale('ms').magnitude, original_spiketrain_131.times.rescale('ms').magnitude) - # NWBInspector : Inspect NWB files for compliance with NWB Best Practices. + #NWBInspector : Inspect NWB files for compliance with NWB Best Practices. results_roundtrip = list(inspect_nwb(nwbfile_path=test_file_name)) #print("results test_roundtrip NWBInspector = ", results_roundtrip) os.remove(test_file_name) def test_roundtrip_with_annotations(self): - # test with NWB-specific annotations - - subject_annotations = { - "nwb:subject_id": "011", - "nwb:age": "P90D", - "nwb:description": "mouse 4", - "nwb:species": "Mus musculus", - "nwb:sex": "F", - } + #Test with NWB-specific annotations + + subject_annotations = {"nwb:subject_id": "011", + "nwb:age": "P90D", + "nwb:description": "mouse 4", + "nwb:species": "Mus musculus", + "nwb:sex": "F", + } original_block = Block(name="experiment", session_start_time=datetime.now(), - experimenter="Experimenter's name", + experimenter="Experimenter's name", experiment_description="Experiment description", institution="Institution", - subject=subject_annotations, - ) + subject=subject_annotations) segment = Segment(name="session 1") original_block.segments.append(segment) segment.block = original_block @@ -243,7 +241,8 @@ def test_roundtrip_with_annotations(self): self.assertEqual(retrieved_attribute, original_attribute) assert_array_equal(retrieved_response.magnitude, original_response.magnitude) - # NWBInspector : Inspect NWB files for compliance with NWB Best Practices. + #NWBInspector : Inspect NWB files + #for compliance with NWB Best Practices. results_roundtrip_with_annotations = list(inspect_nwb(nwbfile_path=test_file_name)) #print("results test_roundtrip_with_annotations NWBInspector = ", results_roundtrip_with_annotations) @@ -272,31 +271,29 @@ def test_roundtrip_with_not_constant_sampling_rate(self): for j, blk in enumerate(original_blocks): - for ind in range(num_seg): # number of Segments + for ind in range(num_seg): # number of Segments seg = Segment(index=ind) seg.block = blk blk.segments.append(seg) - for i, seg in enumerate(blk.segments): # AnalogSignal objects + for i, seg in enumerate(blk.segments):# AnalogSignal objects a = AnalogSignal(name='Signal_a %s' % (seg.name), signal=np.random.randn(44, num_chan) * pq.nA, sampling_rate=10 * pq.kHz, t_start=50 * pq.ms) - epc = Epoch(times=[0 + i*ind, 10 + i*ind, 33 + i*ind]*pq.s, - durations=[10, 5, 7]*pq.s, - labels=np.array(['btn0', 'btn1', 'btn2'], dtype='U') - ) + epc = Epoch(times [0 + i * ind, 10 + i * ind, 33 + i * ind]*pq.s, + durations=[10, 5, 7] * pq.s, + labels=np.array(['btn0', 'btn1', 'btn2'], dtype='U')) - epc2 = Epoch(times=[0.1 + i*ind, 30 + i*ind, 61 + i*ind]*pq.s, - durations=[10, 5, 7]*pq.s, + epc2 = Epoch(times=[0.1 + i * ind, 30 + i * ind, 61 + i * ind]*pq.s, + durations=[10, 5, 7] * pq.s, labels=np.array(['btn4', 'btn5', 'btn6'])) evt = Event(name='Event', - times=[0.01 + i*ind, 11 + i*ind, 33 + i*ind]*pq.s, - labels=np.array(['ev0', 'ev1', 'ev2']) - ) + times=[0.01 + i * ind, 11 + i * ind, 33 + i * ind]*pq.s, + labels=np.array(['ev0', 'ev1', 'ev2'])) seg.epochs.append(epc) seg.epochs.append(epc2) @@ -308,7 +305,7 @@ def test_roundtrip_with_not_constant_sampling_rate(self): epc2.segment = seg evt.segment = seg - # write to file + #write to file test_file_name = "test_round_trip_with_not_constant_sampling_rate.nwb" iow = NWBIO(filename=test_file_name, mode='w') iow.write_all_blocks(original_blocks) @@ -318,7 +315,8 @@ def test_roundtrip_with_not_constant_sampling_rate(self): self.assertEqual(len(retrieved_blocks), 1) - # NWBInspector : Inspect NWB files for compliance with NWB Best Practices. + #NWBInspector : Inspect NWB files + #for compliance with NWB Best Practices. results_roundtrip_specific_for_epochs = list(inspect_nwb(nwbfile_path=test_file_name)) #print("results test_roundtrip_specific_for_epochs NWBInspector = ", results_roundtrip_specific_for_epochs) From 7bac04c21a67cf1277163f44f3e88c34a0411ef0 Mon Sep 17 00:00:00 2001 From: legouee Date: Thu, 19 May 2022 14:43:17 +0200 Subject: [PATCH 03/15] pep8 fix --- neo/io/nwbio.py | 37 +++++++++++++++++++---------------- neo/test/iotest/test_nwbio.py | 24 +++++++++-------------- 2 files changed, 29 insertions(+), 32 deletions(-) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index a1db878d9..d844e04f1 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -451,13 +451,13 @@ def write_all_blocks(self, blocks, **kwargs): nwbfile = NWBFile(**annotations) if "subject" not in annotations: nwbfile.subject = Subject( - subject_id="subject_id", - age="P0D",#Period x days old - description="no description", - species="Mus musculus",# by default - sex="U",# unknown - ) - assert self.nwb_file_mode in ('w',) # possibly expand to 'a'ppend later + subject_id="subject_id", + age="P0D",#Period x days old + description="no description", + species="Mus musculus",# by default + sex="U",# unknown + ) + assert self.nwb_file_mode in ('w',) # possibly expand to 'a'ppend later if self.nwb_file_mode == "w" and os.path.exists(self.filename): os.remove(self.filename) io_nwb = pynwb.NWBHDF5IO(self.filename, mode=self.nwb_file_mode) @@ -475,7 +475,7 @@ def write_all_blocks(self, blocks, **kwargs): nwbfile.add_trial_column('segment', 'name of the Segment to which the Epoch belongs') nwbfile.add_trial_column('block', 'name of the Block to which the Epoch belongs') - arr = [[], []]# epoch array for ascending t_start and t_stop + arr=[[], []]# epoch array for ascending t_start and t_stop for i, block in enumerate(blocks): block_name = block.name self.write_block(nwbfile, block, arr) @@ -617,12 +617,12 @@ def _write_signal(self, nwbfile, signal, electrodes): **additional_metadata) # todo: try to add array_annotations via "control" attribute elif isinstance(signal, IrregularlySampledSignal): - tS = timeseries_class(name=signal.name, - data=signal, - unit=units.dimensionality.string, - timestamps=signal.times.rescale('second').magnitude, - comments=json.dumps(hierarchy), - **additional_metadata) + tS = timeseries_class(name=signal.name, + data=signal, + unit=units.dimensionality.string, + timestamps=signal.times.rescale('second').magnitude, + comments=json.dumps(hierarchy), + **additional_metadata) else: raise TypeError( "signal has type {0}, should be AnalogSignal or IrregularlySampledSignal".format( @@ -655,7 +655,8 @@ def _write_spiketrain(self, nwbfile, spiketrain): def _write_event(self, nwbfile, event): hierarchy = {'block': event.segment.block.name, 'segment': event.segment.name} # if constant timestamps - if any(event.times.rescale('second').magnitude) == any(event.times.rescale('second').magnitude): + timestamps=event.times.rescale('second').magnitude + if any(timestamps) == any(timestamps): tS_evt = TimeSeries(name=event.name, data=event.labels, starting_time=0.0, @@ -796,7 +797,8 @@ def load(self, time_slice=None, strict_slicing=True): name=self.name, description=self.description, array_annotations=None, - **self.annotations) # todo: timeseries.control / control_description + **self.annotations) + # todo: timeseries.control / control_description else: return AnalogSignal(signal, @@ -806,7 +808,8 @@ def load(self, time_slice=None, strict_slicing=True): name=self.name, description=self.description, array_annotations=None, - **self.annotations) # todo: timeseries.control / control_description + **self.annotations) + # todo: timeseries.control / control_description class EventProxy(BaseEventProxy): diff --git a/neo/test/iotest/test_nwbio.py b/neo/test/iotest/test_nwbio.py index 74441ba9f..8a2ad41db 100644 --- a/neo/test/iotest/test_nwbio.py +++ b/neo/test/iotest/test_nwbio.py @@ -34,8 +34,8 @@ @unittest.skipUnless(HAVE_PYNWB, "requires pynwb") -class TestNWBIO(BaseTestIO, unittest.TestCase): #class TestNWBIO(unittest.TestCase): +class TestNWBIO(BaseTestIO, unittest.TestCase): ioclass = NWBIO entities_to_download = ["nwb"] entities_to_test = [ @@ -49,8 +49,7 @@ def test_roundtrip(self): "nwb:age": "P90D", "nwb:description": "mouse 5", "nwb:species": "Mus musculus", - "nwb:sex": "M", - } + "nwb:sex": "M"} annotations = { "session_start_time": datetime.now(), "subject": subject_annotations, @@ -84,7 +83,6 @@ def test_roundtrip(self): blk.segments.append(seg) for i, seg in enumerate(blk.segments):# AnalogSignal objects - # 3 Neo AnalogSignals a = AnalogSignal(name='Signal_a %s' % (seg.name), signal=np.random.randn(44, num_chan) * pq.nA, @@ -99,8 +97,8 @@ def test_roundtrip(self): sampling_rate=10 * pq.kHz, t_start=120 * pq.ms) # 1 Neo IrregularlySampledSignals - d = IrregularlySampledSignal([0.01, 0.03, 0.12]*pq.s, - [[4, 5], [5, 4], [6, 3]]*pq.nA) + d = IrregularlySampledSignal([0.01, 0.03, 0.12] * pq.s, + [[4, 5], [5, 4], [6, 3]] * pq.nA) # 2 Neo SpikeTrains train = SpikeTrain(times=[1, 2, 3] * pq.s, t_start=1.0, t_stop=10.0) train2 = SpikeTrain(times=[4, 5, 6] * pq.s, t_stop=10.0) @@ -159,7 +157,6 @@ def test_roundtrip(self): #NWBInspector : Inspect NWB files for compliance with NWB Best Practices. results_roundtrip = list(inspect_nwb(nwbfile_path=test_file_name)) - #print("results test_roundtrip NWBInspector = ", results_roundtrip) os.remove(test_file_name) @@ -170,8 +167,7 @@ def test_roundtrip_with_annotations(self): "nwb:age": "P90D", "nwb:description": "mouse 4", "nwb:species": "Mus musculus", - "nwb:sex": "F", - } + "nwb:sex": "F"} original_block = Block(name="experiment", session_start_time=datetime.now(), experimenter="Experimenter's name", experiment_description="Experiment description", @@ -244,8 +240,7 @@ def test_roundtrip_with_annotations(self): #NWBInspector : Inspect NWB files #for compliance with NWB Best Practices. results_roundtrip_with_annotations = list(inspect_nwb(nwbfile_path=test_file_name)) - #print("results test_roundtrip_with_annotations NWBInspector = ", results_roundtrip_with_annotations) - + os.remove(test_file_name) def test_roundtrip_with_not_constant_sampling_rate(self): @@ -283,16 +278,16 @@ def test_roundtrip_with_not_constant_sampling_rate(self): sampling_rate=10 * pq.kHz, t_start=50 * pq.ms) - epc = Epoch(times [0 + i * ind, 10 + i * ind, 33 + i * ind]*pq.s, + epc = Epoch(times=[0 + i * ind, 10 + i * ind, 33 + i * ind] * pq.s, durations=[10, 5, 7] * pq.s, labels=np.array(['btn0', 'btn1', 'btn2'], dtype='U')) - epc2 = Epoch(times=[0.1 + i * ind, 30 + i * ind, 61 + i * ind]*pq.s, + epc2 = Epoch(times=[0.1 + i * ind, 30 + i * ind, 61 + i * ind] * pq.s, durations=[10, 5, 7] * pq.s, labels=np.array(['btn4', 'btn5', 'btn6'])) evt = Event(name='Event', - times=[0.01 + i * ind, 11 + i * ind, 33 + i * ind]*pq.s, + times=[0.01 + i * ind, 11 + i * ind, 33 + i * ind] * pq.s, labels=np.array(['ev0', 'ev1', 'ev2'])) seg.epochs.append(epc) @@ -318,7 +313,6 @@ def test_roundtrip_with_not_constant_sampling_rate(self): #NWBInspector : Inspect NWB files #for compliance with NWB Best Practices. results_roundtrip_specific_for_epochs = list(inspect_nwb(nwbfile_path=test_file_name)) - #print("results test_roundtrip_specific_for_epochs NWBInspector = ", results_roundtrip_specific_for_epochs) os.remove(test_file_name) From 2018e31875b9beaf9c1e82faceed6e97748ba649 Mon Sep 17 00:00:00 2001 From: legouee Date: Thu, 19 May 2022 14:50:58 +0200 Subject: [PATCH 04/15] pep8 fix --- neo/io/nwbio.py | 26 ++++++++++++-------------- neo/test/iotest/test_nwbio.py | 12 ++++-------- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index d844e04f1..5fee939dc 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -450,14 +450,12 @@ def write_all_blocks(self, blocks, **kwargs): self.annotations = {"subject": "subject"} nwbfile = NWBFile(**annotations) if "subject" not in annotations: - nwbfile.subject = Subject( - subject_id="subject_id", - age="P0D",#Period x days old - description="no description", - species="Mus musculus",# by default - sex="U",# unknown - ) - assert self.nwb_file_mode in ('w',) # possibly expand to 'a'ppend later + nwbfile.subject = Subject(subject_id="subject_id", + age="P0D",#Period x days old + description="no description", + species="Mus musculus",#by default + sex="U") + assert self.nwb_file_mode in ('w',)#possibly expand to 'a'ppend later if self.nwb_file_mode == "w" and os.path.exists(self.filename): os.remove(self.filename) io_nwb = pynwb.NWBHDF5IO(self.filename, mode=self.nwb_file_mode) @@ -475,7 +473,7 @@ def write_all_blocks(self, blocks, **kwargs): nwbfile.add_trial_column('segment', 'name of the Segment to which the Epoch belongs') nwbfile.add_trial_column('block', 'name of the Block to which the Epoch belongs') - arr=[[], []]# epoch array for ascending t_start and t_stop + arr = [[], []]#epoch array for ascending t_start and t_stop for i, block in enumerate(blocks): block_name = block.name self.write_block(nwbfile, block, arr) @@ -655,7 +653,7 @@ def _write_spiketrain(self, nwbfile, spiketrain): def _write_event(self, nwbfile, event): hierarchy = {'block': event.segment.block.name, 'segment': event.segment.name} # if constant timestamps - timestamps=event.times.rescale('second').magnitude + timestamps = event.times.rescale('second').magnitude if any(timestamps) == any(timestamps): tS_evt = TimeSeries(name=event.name, data=event.labels, @@ -797,8 +795,8 @@ def load(self, time_slice=None, strict_slicing=True): name=self.name, description=self.description, array_annotations=None, - **self.annotations) - # todo: timeseries.control / control_description + **self.annotations) + # todo: timeseries.control / control_description else: return AnalogSignal(signal, @@ -808,8 +806,8 @@ def load(self, time_slice=None, strict_slicing=True): name=self.name, description=self.description, array_annotations=None, - **self.annotations) - # todo: timeseries.control / control_description + **self.annotations) + # todo: timeseries.control / control_description class EventProxy(BaseEventProxy): diff --git a/neo/test/iotest/test_nwbio.py b/neo/test/iotest/test_nwbio.py index 8a2ad41db..2f5c30975 100644 --- a/neo/test/iotest/test_nwbio.py +++ b/neo/test/iotest/test_nwbio.py @@ -83,6 +83,7 @@ def test_roundtrip(self): blk.segments.append(seg) for i, seg in enumerate(blk.segments):# AnalogSignal objects + # 3 Neo AnalogSignals a = AnalogSignal(name='Signal_a %s' % (seg.name), signal=np.random.randn(44, num_chan) * pq.nA, @@ -155,14 +156,13 @@ def test_roundtrip(self): assert_array_equal(retrieved_spiketrain_131.times.rescale('ms').magnitude, original_spiketrain_131.times.rescale('ms').magnitude) - #NWBInspector : Inspect NWB files for compliance with NWB Best Practices. results_roundtrip = list(inspect_nwb(nwbfile_path=test_file_name)) os.remove(test_file_name) def test_roundtrip_with_annotations(self): - #Test with NWB-specific annotations + #Test with NWB-specific annotations subject_annotations = {"nwb:subject_id": "011", "nwb:age": "P90D", "nwb:description": "mouse 4", @@ -237,8 +237,6 @@ def test_roundtrip_with_annotations(self): self.assertEqual(retrieved_attribute, original_attribute) assert_array_equal(retrieved_response.magnitude, original_response.magnitude) - #NWBInspector : Inspect NWB files - #for compliance with NWB Best Practices. results_roundtrip_with_annotations = list(inspect_nwb(nwbfile_path=test_file_name)) os.remove(test_file_name) @@ -267,12 +265,13 @@ def test_roundtrip_with_not_constant_sampling_rate(self): for j, blk in enumerate(original_blocks): for ind in range(num_seg): # number of Segments + seg = Segment(index=ind) seg.block = blk blk.segments.append(seg) for i, seg in enumerate(blk.segments):# AnalogSignal objects - + a = AnalogSignal(name='Signal_a %s' % (seg.name), signal=np.random.randn(44, num_chan) * pq.nA, sampling_rate=10 * pq.kHz, @@ -300,7 +299,6 @@ def test_roundtrip_with_not_constant_sampling_rate(self): epc2.segment = seg evt.segment = seg - #write to file test_file_name = "test_round_trip_with_not_constant_sampling_rate.nwb" iow = NWBIO(filename=test_file_name, mode='w') iow.write_all_blocks(original_blocks) @@ -310,8 +308,6 @@ def test_roundtrip_with_not_constant_sampling_rate(self): self.assertEqual(len(retrieved_blocks), 1) - #NWBInspector : Inspect NWB files - #for compliance with NWB Best Practices. results_roundtrip_specific_for_epochs = list(inspect_nwb(nwbfile_path=test_file_name)) os.remove(test_file_name) From 891cc4ef29e2cda8687b4075b678bfd7e7d222db Mon Sep 17 00:00:00 2001 From: legouee Date: Thu, 19 May 2022 14:56:22 +0200 Subject: [PATCH 05/15] pep8 fix --- neo/io/nwbio.py | 8 ++++---- neo/test/iotest/test_nwbio.py | 9 ++++----- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index 5fee939dc..e5c2cf42a 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -451,11 +451,11 @@ def write_all_blocks(self, blocks, **kwargs): nwbfile = NWBFile(**annotations) if "subject" not in annotations: nwbfile.subject = Subject(subject_id="subject_id", - age="P0D",#Period x days old + age="P0D", # Period x days old description="no description", - species="Mus musculus",#by default + species="Mus musculus", # by default sex="U") - assert self.nwb_file_mode in ('w',)#possibly expand to 'a'ppend later + assert self.nwb_file_mode in ('w',) # possibly expand to 'a'ppend later if self.nwb_file_mode == "w" and os.path.exists(self.filename): os.remove(self.filename) io_nwb = pynwb.NWBHDF5IO(self.filename, mode=self.nwb_file_mode) @@ -473,7 +473,7 @@ def write_all_blocks(self, blocks, **kwargs): nwbfile.add_trial_column('segment', 'name of the Segment to which the Epoch belongs') nwbfile.add_trial_column('block', 'name of the Block to which the Epoch belongs') - arr = [[], []]#epoch array for ascending t_start and t_stop + arr = [[], []] # epoch array for ascending t_start and t_stop for i, block in enumerate(blocks): block_name = block.name self.write_block(nwbfile, block, arr) diff --git a/neo/test/iotest/test_nwbio.py b/neo/test/iotest/test_nwbio.py index 2f5c30975..71661d592 100644 --- a/neo/test/iotest/test_nwbio.py +++ b/neo/test/iotest/test_nwbio.py @@ -34,7 +34,6 @@ @unittest.skipUnless(HAVE_PYNWB, "requires pynwb") -#class TestNWBIO(unittest.TestCase): class TestNWBIO(BaseTestIO, unittest.TestCase): ioclass = NWBIO entities_to_download = ["nwb"] @@ -82,7 +81,7 @@ def test_roundtrip(self): seg.block = blk blk.segments.append(seg) - for i, seg in enumerate(blk.segments):# AnalogSignal objects + for i, seg in enumerate(blk.segments): # AnalogSignal objects # 3 Neo AnalogSignals a = AnalogSignal(name='Signal_a %s' % (seg.name), @@ -162,7 +161,7 @@ def test_roundtrip(self): def test_roundtrip_with_annotations(self): - #Test with NWB-specific annotations + # Test with NWB-specific annotations subject_annotations = {"nwb:subject_id": "011", "nwb:age": "P90D", "nwb:description": "mouse 4", @@ -264,13 +263,13 @@ def test_roundtrip_with_not_constant_sampling_rate(self): for j, blk in enumerate(original_blocks): - for ind in range(num_seg): # number of Segments + for ind in range(num_seg): # number of Segments seg = Segment(index=ind) seg.block = blk blk.segments.append(seg) - for i, seg in enumerate(blk.segments):# AnalogSignal objects + for i, seg in enumerate(blk.segments): # AnalogSignal objects a = AnalogSignal(name='Signal_a %s' % (seg.name), signal=np.random.randn(44, num_chan) * pq.nA, From 928cbf139939f2b1341ed043a7b4f00a69e48f38 Mon Sep 17 00:00:00 2001 From: legouee Date: Thu, 19 May 2022 15:26:13 +0200 Subject: [PATCH 06/15] requirements testing nwbinspector --- neo/io/nwbio.py | 10 +++++++--- requirements_testing.txt | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index e5c2cf42a..d9066e80d 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -57,9 +57,13 @@ except ImportError: have_pynwb = False -import nwbinspector -from nwbinspector import inspect_nwb, check_regular_timestamps - +try: + import nwbinspector + from nwbinspector import inspect_nwb, check_regular_timestamps + have_nwbinspector = True +except ImportError: + have_nwbinspector = False + # hdmf imports try: from hdmf.spec import (LinkSpec, GroupSpec, DatasetSpec, SpecNamespace, diff --git a/requirements_testing.txt b/requirements_testing.txt index 18b14b196..f22ab9a7c 100644 --- a/requirements_testing.txt +++ b/requirements_testing.txt @@ -14,4 +14,5 @@ coveralls pillow sonpy pynwb +nwbinspector probeinterface From 5d7748cb38ebf0243c1a453731b9d494c023ac09 Mon Sep 17 00:00:00 2001 From: legouee Date: Fri, 20 May 2022 10:09:34 +0200 Subject: [PATCH 07/15] pep8 fix --- neo/io/nwbio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index d9066e80d..d668388df 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -63,7 +63,7 @@ have_nwbinspector = True except ImportError: have_nwbinspector = False - + # hdmf imports try: from hdmf.spec import (LinkSpec, GroupSpec, DatasetSpec, SpecNamespace, From 530de1fe4a38651a9f8a07687b607308f7fc8552 Mon Sep 17 00:00:00 2001 From: legouee Date: Fri, 20 May 2022 15:10:06 +0200 Subject: [PATCH 08/15] Adding a condition on nwbinspector package --- neo/io/nwbio.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index d668388df..2787f4ce2 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -253,6 +253,8 @@ def __init__(self, filename, mode='r'): raise Exception("Please install the pynwb package to use NWBIO") if not have_hdmf: raise Exception("Please install the hdmf package to use NWBIO") + if not have_nwbinspector: + raise Exception("Please install the nwbinspector package to use NWBIO") BaseIO.__init__(self, filename=filename) self.filename = filename self.blocks_written = 0 From 78998f7afd4f8719a3ace461142e1d6e158a56ea Mon Sep 17 00:00:00 2001 From: legouee Date: Wed, 29 Jun 2022 14:20:55 +0200 Subject: [PATCH 09/15] Extra installation requirement --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 7bb3cd105..cf3118814 100755 --- a/setup.py +++ b/setup.py @@ -12,7 +12,8 @@ 'neomatlabio': ['scipy>=1.0.0'], 'nixio': ['nixio>=1.5.0'], 'stimfitio': ['stfio'], - 'tiffio': ['pillow'] + 'tiffio': ['pillow'], + 'nwbio': ['pynwb', 'nwbinspector'] } with open("neo/version.py") as fp: From be7868c62be82fab592056cb388695da4de9fa8b Mon Sep 17 00:00:00 2001 From: legouee Date: Wed, 29 Jun 2022 14:24:23 +0200 Subject: [PATCH 10/15] Remove duplicate self.annotation --- neo/io/nwbio.py | 1 - 1 file changed, 1 deletion(-) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index 2787f4ce2..29a218092 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -453,7 +453,6 @@ def write_all_blocks(self, blocks, **kwargs): raise Exception("Writing to NWB requires an annotation 'session_start_time'") self.annotations = {"rec_datetime": "rec_datetime"} self.annotations["rec_datetime"] = blocks[0].rec_datetime - self.annotations = {"subject": "subject"} nwbfile = NWBFile(**annotations) if "subject" not in annotations: nwbfile.subject = Subject(subject_id="subject_id", From 9e55cd7a0b18273d45ef22007d82fc70ff77133b Mon Sep 17 00:00:00 2001 From: legouee Date: Wed, 29 Jun 2022 14:26:54 +0200 Subject: [PATCH 11/15] Harmonize arguments in self.write_block and self._write_epoch --- neo/io/nwbio.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index 29a218092..48fdb6149 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -483,7 +483,7 @@ def write_all_blocks(self, blocks, **kwargs): block_name = block.name self.write_block(nwbfile, block, arr) arr2 = np.sort(arr) - self._write_epoch(nwbfile, arr2, block) + self._write_epoch(nwbfile, block, arr2) io_nwb.write(nwbfile) io_nwb.close() @@ -692,7 +692,7 @@ def _write_manage_epoch(self, nwbfile, segment, epoch, arr): arr[0].append(t_start) arr[1].append(t_stop) - def _write_epoch(self, nwbfile, arr2, block): + def _write_epoch(self, nwbfile, block, arr2): for i in range(len(arr2[0])): t_start = arr2[0][i] t_stop = arr2[1][i] From a3e31e99663513b4462ba9ef2af6354afb737acc Mon Sep 17 00:00:00 2001 From: legouee Date: Wed, 29 Jun 2022 14:45:58 +0200 Subject: [PATCH 12/15] add_epoch concept --- neo/io/nwbio.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index 48fdb6149..3cad637bb 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -475,8 +475,12 @@ def write_all_blocks(self, blocks, **kwargs): 'block', 'the name of the Neo Block to which the SpikeTrain belongs') if sum(statistics(block)["Epoch"]["count"] for block in blocks) > 0: - nwbfile.add_trial_column('segment', 'name of the Segment to which the Epoch belongs') - nwbfile.add_trial_column('block', 'name of the Block to which the Epoch belongs') + nwbfile.add_epoch_column('_name', 'the name attribute of the Epoch') + # nwbfile.add_epoch_column('_description', 'the description attribute of the Epoch') + nwbfile.add_epoch_column( + 'segment', 'the name of the Neo Segment to which the Epoch belongs') + nwbfile.add_epoch_column('block', + 'the name of the Neo Block to which the Epoch belongs') arr = [[], []] # epoch array for ascending t_start and t_stop for i, block in enumerate(blocks): @@ -698,13 +702,13 @@ def _write_epoch(self, nwbfile, block, arr2): t_stop = arr2[1][i] for k in block.segments: segment_name = k.name - nwbfile.add_trial(start_time=t_start, - stop_time=t_stop, - tags=[" "], + nwbfile.add_epoch(start_time=t_start, + stop_time=t_stop, + tags=[" "], timeseries=[], + _name=k.name, segment=segment_name, - block=block.name, - ) + block=block.name) return nwbfile.epochs From 6f54b493157d622c57d257831a838f4979ffc47c Mon Sep 17 00:00:00 2001 From: legouee Date: Wed, 29 Jun 2022 15:20:55 +0200 Subject: [PATCH 13/15] Neo dummy subject --- neo/io/nwbio.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index 3cad637bb..6e2dbad85 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -455,6 +455,17 @@ def write_all_blocks(self, blocks, **kwargs): self.annotations["rec_datetime"] = blocks[0].rec_datetime nwbfile = NWBFile(**annotations) if "subject" not in annotations: + # create neo dummy subject + # All following arguments are decided by this IO and are free + read_neo_dummy_params = { + Subject: [ + ("subject_id", {"value": "subject_id", "label": "empty_neo_subject_id"}), + ("age", {"value": "P0D", "label": "Period x days old"}), + ("description", {"value": "no description", "label": "Description"}), + ("species", {"value": "Mus musculus", "label": "Species by default"}), + ("sex", {"value": "U", "label": "Sex unknown"}), + ], + } nwbfile.subject = Subject(subject_id="subject_id", age="P0D", # Period x days old description="no description", From 53b9cc2b194328c31e6b14ec6434dea8f75170cc Mon Sep 17 00:00:00 2001 From: legouee Date: Thu, 30 Jun 2022 14:22:23 +0200 Subject: [PATCH 14/15] Updated setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 17219ae6d..1267e57d7 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ 'nixio': ['nixio>=1.5.0'], 'stimfitio': ['stfio'], 'tiffio': ['pillow'], - 'nwbio': ['pynwb', 'nwbinspector'] + 'nwbio': ['pynwb', 'nwbinspector'], 'edf': ['pyedflib'] } From 114c31fd77c0d543bbc5b836c538810e42ae16d3 Mon Sep 17 00:00:00 2001 From: legouee Date: Fri, 1 Jul 2022 15:20:03 +0200 Subject: [PATCH 15/15] Fix write_event comment --- neo/io/nwbio.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/neo/io/nwbio.py b/neo/io/nwbio.py index 24b6fa35c..32c55bc93 100644 --- a/neo/io/nwbio.py +++ b/neo/io/nwbio.py @@ -677,9 +677,10 @@ def _write_spiketrain(self, nwbfile, spiketrain): return nwbfile.units def _write_event(self, nwbfile, event): + segment = event.segment if hasattr(event, 'proxy_for') and event.proxy_for == Event: event = event.load() - hierarchy = {'block': event.segment.block.name, 'segment': event.segment.name} + hierarchy = {'block': segment.block.name, 'segment': segment.name} # if constant timestamps timestamps = event.times.rescale('second').magnitude if any(timestamps) == any(timestamps):