diff --git a/pyproject.toml b/pyproject.toml index 5826da5..480b6ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ dynamic = ["version"] dependencies = [ "array-api-compat>=1.11.1", "ezmsg[axisarray]>=3.7.3", - "ezmsg-baseproc>=1.5.1", + "ezmsg-baseproc>=1.6.0", "mlx>=0.18.0; sys_platform == 'darwin' and platform_machine == 'arm64'", "numba>=0.61.0", "numpy>=1.26.0", diff --git a/src/ezmsg/sigproc/messages.py b/src/ezmsg/sigproc/messages.py index 08ec430..6ae5f6f 100644 --- a/src/ezmsg/sigproc/messages.py +++ b/src/ezmsg/sigproc/messages.py @@ -24,7 +24,7 @@ def TSMessage( ) -> AxisArray: dims = [f"dim_{i}" for i in range(data.ndim)] dims[time_dim] = "time" - offset = time.time() if timestamp is None else timestamp + offset = time.monotonic() if timestamp is None else timestamp offset_adj = data.shape[time_dim] / fs # offset corresponds to idx[0] on time_dim axis = AxisArray.TimeAxis(fs, offset=offset - offset_adj) return AxisArray(data, dims=dims, axes=dict(time=axis)) diff --git a/src/ezmsg/sigproc/resample.py b/src/ezmsg/sigproc/resample.py index 58445e6..651092a 100644 --- a/src/ezmsg/sigproc/resample.py +++ b/src/ezmsg/sigproc/resample.py @@ -75,7 +75,7 @@ class ResampleState: last_write_time: float = -np.inf """ - Wall clock time of the last write to the signal buffer. + Monotonic time of the last write to the signal buffer. This is used to determine if we need to extrapolate the reference axis if we have not received an update within max_chunk_delay. """ @@ -142,7 +142,7 @@ def _process(self, message: AxisArray) -> None: synth_ref_axis = LinearAxis(unit="s", gain=out_gain, offset=prev_t_end + out_gain) self.state.ref_axis_buffer.write(synth_ref_axis, n_samples=n_synth) - self.state.last_write_time = time.time() + self.state.last_write_time = time.monotonic() def __next__(self) -> AxisArray: if self.state.src_buffer is None or self.state.ref_axis_buffer is None: @@ -176,7 +176,7 @@ def __next__(self) -> AxisArray: # If we do not rely on an external reference, and we have not received new data in a while, # then extrapolate our reference vector out beyond the delay limit. - b_project = self.settings.resample_rate is not None and time.time() > ( + b_project = self.settings.resample_rate is not None and time.monotonic() > ( self.state.last_write_time + self.settings.max_chunk_delay ) if b_project: diff --git a/tests/helpers/synth.py b/tests/helpers/synth.py index dbdb9d1..4d5eef0 100644 --- a/tests/helpers/synth.py +++ b/tests/helpers/synth.py @@ -33,7 +33,7 @@ class Counter(ez.Unit): async def initialize(self) -> None: self._counter = 0 self._n_sent = 0 - self._t0 = time.time() + self._t0 = time.monotonic() @ez.publisher(OUTPUT_SIGNAL) async def produce(self) -> typing.AsyncGenerator: @@ -42,7 +42,7 @@ async def produce(self) -> typing.AsyncGenerator: if self.SETTINGS.dispatch_rate is not None: n_disp = 1 + self._n_sent / self.SETTINGS.n_time t_next = self._t0 + n_disp / self.SETTINGS.dispatch_rate - sleep_time = t_next - time.time() + sleep_time = t_next - time.monotonic() if sleep_time > 0: await asyncio.sleep(sleep_time) @@ -87,7 +87,7 @@ class WhiteNoise(ez.Unit): async def initialize(self) -> None: self._n_sent = 0 - self._t0 = time.time() + self._t0 = time.monotonic() @ez.publisher(OUTPUT_SIGNAL) async def produce(self) -> typing.AsyncGenerator: @@ -96,7 +96,7 @@ async def produce(self) -> typing.AsyncGenerator: if self.SETTINGS.dispatch_rate is not None: n_disp = 1 + self._n_sent / self.SETTINGS.n_time t_next = self._t0 + n_disp / self.SETTINGS.dispatch_rate - sleep_time = t_next - time.time() + sleep_time = t_next - time.monotonic() if sleep_time > 0: await asyncio.sleep(sleep_time) @@ -145,7 +145,7 @@ class Oscillator(ez.Unit): async def initialize(self) -> None: self._n_sent = 0 - self._t0 = time.time() + self._t0 = time.monotonic() # Calculate synchronized frequency if requested self._freq = self.SETTINGS.freq @@ -162,7 +162,7 @@ async def produce(self) -> typing.AsyncGenerator: # Realtime mode: sleep until wall-clock time matches sample time n_next = self._n_sent + self.SETTINGS.n_time t_next = self._t0 + n_next / self.SETTINGS.fs - sleep_time = t_next - time.time() + sleep_time = t_next - time.monotonic() if sleep_time > 0: await asyncio.sleep(sleep_time) offset = t_next - self.SETTINGS.n_time / self.SETTINGS.fs @@ -170,7 +170,7 @@ async def produce(self) -> typing.AsyncGenerator: # Manual dispatch rate mode n_disp = 1 + self._n_sent / self.SETTINGS.n_time t_next = self._t0 + n_disp / self.SETTINGS.dispatch_rate - sleep_time = t_next - time.time() + sleep_time = t_next - time.monotonic() if sleep_time > 0: await asyncio.sleep(sleep_time) offset = self._n_sent / self.SETTINGS.fs @@ -218,7 +218,7 @@ class Clock(ez.Unit): SETTINGS: ez.Settings async def initialize(self) -> None: - self._t0 = time.time() + self._t0 = time.monotonic() self._n_dispatch = 0 @ez.publisher(OUTPUT_SIGNAL) @@ -226,7 +226,7 @@ async def produce(self) -> typing.AsyncGenerator: while True: if hasattr(self.SETTINGS, "dispatch_rate") and self.SETTINGS.dispatch_rate is not None: target_time = self._t0 + (self._n_dispatch + 1) / self.SETTINGS.dispatch_rate - sleep_time = target_time - time.time() + sleep_time = target_time - time.monotonic() if sleep_time > 0: await asyncio.sleep(sleep_time) diff --git a/tests/integration/ezmsg/test_filter_system.py b/tests/integration/ezmsg/test_filter_system.py index 2c5098b..db3f5fe 100644 --- a/tests/integration/ezmsg/test_filter_system.py +++ b/tests/integration/ezmsg/test_filter_system.py @@ -62,9 +62,16 @@ def test_filter_system(filter_type: str, coef_type: str, order: int): inputs = AxisArray.concatenate(*[_ for _ in message_log(test_filename_raw)], dim="time") outputs = AxisArray.concatenate(*messages, dim="time") + # Truncate to common length: the raw and filtered loggers are on separate + # pipeline paths, so a termination race can cause different message counts. + n_samples = min(inputs.data.shape[0], outputs.data.shape[0]) + assert n_samples > 0 + inputs_data = inputs.data[:n_samples] + outputs_data = outputs.data[:n_samples] + if order == 0: # Passthrough - assert np.allclose(outputs.data, inputs.data) + assert np.allclose(outputs_data, inputs_data) return # Calculate expected @@ -99,8 +106,8 @@ def test_filter_system(filter_type: str, coef_type: str, order: int): system = scipy.signal.dlti(*coefs) coefs = (system.num, system.den) zi = scipy.signal.lfilter_zi(*coefs)[:, None] - expected, _ = scipy.signal.lfilter(coefs[0], coefs[1], inputs.data, axis=inputs.get_axis_idx("time"), zi=zi) + expected, _ = scipy.signal.lfilter(coefs[0], coefs[1], inputs_data, axis=inputs.get_axis_idx("time"), zi=zi) else: # coef_type == "sos": zi = scipy.signal.sosfilt_zi(coefs)[:, :, None] + np.zeros((1, 1, n_ch)) - expected, _ = scipy.signal.sosfilt(coefs, inputs.data, axis=inputs.get_axis_idx("time"), zi=zi) - assert np.allclose(outputs.data, expected) + expected, _ = scipy.signal.sosfilt(coefs, inputs_data, axis=inputs.get_axis_idx("time"), zi=zi) + assert np.allclose(outputs_data, expected)