From 8e97cff26100f49bd87e4cb70dd434c8e21a4dc3 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Wed, 25 Feb 2026 01:52:16 -0500 Subject: [PATCH 1/3] Replace time.time with time.monotonic --- src/ezmsg/sigproc/messages.py | 2 +- src/ezmsg/sigproc/resample.py | 6 +++--- tests/helpers/synth.py | 18 +++++++++--------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/ezmsg/sigproc/messages.py b/src/ezmsg/sigproc/messages.py index 08ec4307..6ae5f6fb 100644 --- a/src/ezmsg/sigproc/messages.py +++ b/src/ezmsg/sigproc/messages.py @@ -24,7 +24,7 @@ def TSMessage( ) -> AxisArray: dims = [f"dim_{i}" for i in range(data.ndim)] dims[time_dim] = "time" - offset = time.time() if timestamp is None else timestamp + offset = time.monotonic() if timestamp is None else timestamp offset_adj = data.shape[time_dim] / fs # offset corresponds to idx[0] on time_dim axis = AxisArray.TimeAxis(fs, offset=offset - offset_adj) return AxisArray(data, dims=dims, axes=dict(time=axis)) diff --git a/src/ezmsg/sigproc/resample.py b/src/ezmsg/sigproc/resample.py index 58445e61..651092a0 100644 --- a/src/ezmsg/sigproc/resample.py +++ b/src/ezmsg/sigproc/resample.py @@ -75,7 +75,7 @@ class ResampleState: last_write_time: float = -np.inf """ - Wall clock time of the last write to the signal buffer. + Monotonic time of the last write to the signal buffer. This is used to determine if we need to extrapolate the reference axis if we have not received an update within max_chunk_delay. """ @@ -142,7 +142,7 @@ def _process(self, message: AxisArray) -> None: synth_ref_axis = LinearAxis(unit="s", gain=out_gain, offset=prev_t_end + out_gain) self.state.ref_axis_buffer.write(synth_ref_axis, n_samples=n_synth) - self.state.last_write_time = time.time() + self.state.last_write_time = time.monotonic() def __next__(self) -> AxisArray: if self.state.src_buffer is None or self.state.ref_axis_buffer is None: @@ -176,7 +176,7 @@ def __next__(self) -> AxisArray: # If we do not rely on an external reference, and we have not received new data in a while, # then extrapolate our reference vector out beyond the delay limit. - b_project = self.settings.resample_rate is not None and time.time() > ( + b_project = self.settings.resample_rate is not None and time.monotonic() > ( self.state.last_write_time + self.settings.max_chunk_delay ) if b_project: diff --git a/tests/helpers/synth.py b/tests/helpers/synth.py index dbdb9d1d..4d5eef02 100644 --- a/tests/helpers/synth.py +++ b/tests/helpers/synth.py @@ -33,7 +33,7 @@ class Counter(ez.Unit): async def initialize(self) -> None: self._counter = 0 self._n_sent = 0 - self._t0 = time.time() + self._t0 = time.monotonic() @ez.publisher(OUTPUT_SIGNAL) async def produce(self) -> typing.AsyncGenerator: @@ -42,7 +42,7 @@ async def produce(self) -> typing.AsyncGenerator: if self.SETTINGS.dispatch_rate is not None: n_disp = 1 + self._n_sent / self.SETTINGS.n_time t_next = self._t0 + n_disp / self.SETTINGS.dispatch_rate - sleep_time = t_next - time.time() + sleep_time = t_next - time.monotonic() if sleep_time > 0: await asyncio.sleep(sleep_time) @@ -87,7 +87,7 @@ class WhiteNoise(ez.Unit): async def initialize(self) -> None: self._n_sent = 0 - self._t0 = time.time() + self._t0 = time.monotonic() @ez.publisher(OUTPUT_SIGNAL) async def produce(self) -> typing.AsyncGenerator: @@ -96,7 +96,7 @@ async def produce(self) -> typing.AsyncGenerator: if self.SETTINGS.dispatch_rate is not None: n_disp = 1 + self._n_sent / self.SETTINGS.n_time t_next = self._t0 + n_disp / self.SETTINGS.dispatch_rate - sleep_time = t_next - time.time() + sleep_time = t_next - time.monotonic() if sleep_time > 0: await asyncio.sleep(sleep_time) @@ -145,7 +145,7 @@ class Oscillator(ez.Unit): async def initialize(self) -> None: self._n_sent = 0 - self._t0 = time.time() + self._t0 = time.monotonic() # Calculate synchronized frequency if requested self._freq = self.SETTINGS.freq @@ -162,7 +162,7 @@ async def produce(self) -> typing.AsyncGenerator: # Realtime mode: sleep until wall-clock time matches sample time n_next = self._n_sent + self.SETTINGS.n_time t_next = self._t0 + n_next / self.SETTINGS.fs - sleep_time = t_next - time.time() + sleep_time = t_next - time.monotonic() if sleep_time > 0: await asyncio.sleep(sleep_time) offset = t_next - self.SETTINGS.n_time / self.SETTINGS.fs @@ -170,7 +170,7 @@ async def produce(self) -> typing.AsyncGenerator: # Manual dispatch rate mode n_disp = 1 + self._n_sent / self.SETTINGS.n_time t_next = self._t0 + n_disp / self.SETTINGS.dispatch_rate - sleep_time = t_next - time.time() + sleep_time = t_next - time.monotonic() if sleep_time > 0: await asyncio.sleep(sleep_time) offset = self._n_sent / self.SETTINGS.fs @@ -218,7 +218,7 @@ class Clock(ez.Unit): SETTINGS: ez.Settings async def initialize(self) -> None: - self._t0 = time.time() + self._t0 = time.monotonic() self._n_dispatch = 0 @ez.publisher(OUTPUT_SIGNAL) @@ -226,7 +226,7 @@ async def produce(self) -> typing.AsyncGenerator: while True: if hasattr(self.SETTINGS, "dispatch_rate") and self.SETTINGS.dispatch_rate is not None: target_time = self._t0 + (self._n_dispatch + 1) / self.SETTINGS.dispatch_rate - sleep_time = target_time - time.time() + sleep_time = target_time - time.monotonic() if sleep_time > 0: await asyncio.sleep(sleep_time) From 503366fe520b9bcd7d329916a112e75671d7a6c6 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Wed, 25 Feb 2026 02:41:29 -0500 Subject: [PATCH 2/3] Bump baseproc dependency to get SampleTrigger with time.monotonic --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5826da5a..480b6ec7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ dynamic = ["version"] dependencies = [ "array-api-compat>=1.11.1", "ezmsg[axisarray]>=3.7.3", - "ezmsg-baseproc>=1.5.1", + "ezmsg-baseproc>=1.6.0", "mlx>=0.18.0; sys_platform == 'darwin' and platform_machine == 'arm64'", "numba>=0.61.0", "numpy>=1.26.0", From 53ceee1f64064920c9086ce5a1e1c41075c5bbf5 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Wed, 25 Feb 2026 11:26:37 -0500 Subject: [PATCH 3/3] Ignore rare race condition in test that is only meant to check accuracy. --- tests/integration/ezmsg/test_filter_system.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/integration/ezmsg/test_filter_system.py b/tests/integration/ezmsg/test_filter_system.py index 2c5098b0..db3f5fe0 100644 --- a/tests/integration/ezmsg/test_filter_system.py +++ b/tests/integration/ezmsg/test_filter_system.py @@ -62,9 +62,16 @@ def test_filter_system(filter_type: str, coef_type: str, order: int): inputs = AxisArray.concatenate(*[_ for _ in message_log(test_filename_raw)], dim="time") outputs = AxisArray.concatenate(*messages, dim="time") + # Truncate to common length: the raw and filtered loggers are on separate + # pipeline paths, so a termination race can cause different message counts. + n_samples = min(inputs.data.shape[0], outputs.data.shape[0]) + assert n_samples > 0 + inputs_data = inputs.data[:n_samples] + outputs_data = outputs.data[:n_samples] + if order == 0: # Passthrough - assert np.allclose(outputs.data, inputs.data) + assert np.allclose(outputs_data, inputs_data) return # Calculate expected @@ -99,8 +106,8 @@ def test_filter_system(filter_type: str, coef_type: str, order: int): system = scipy.signal.dlti(*coefs) coefs = (system.num, system.den) zi = scipy.signal.lfilter_zi(*coefs)[:, None] - expected, _ = scipy.signal.lfilter(coefs[0], coefs[1], inputs.data, axis=inputs.get_axis_idx("time"), zi=zi) + expected, _ = scipy.signal.lfilter(coefs[0], coefs[1], inputs_data, axis=inputs.get_axis_idx("time"), zi=zi) else: # coef_type == "sos": zi = scipy.signal.sosfilt_zi(coefs)[:, :, None] + np.zeros((1, 1, n_ch)) - expected, _ = scipy.signal.sosfilt(coefs, inputs.data, axis=inputs.get_axis_idx("time"), zi=zi) - assert np.allclose(outputs.data, expected) + expected, _ = scipy.signal.sosfilt(coefs, inputs_data, axis=inputs.get_axis_idx("time"), zi=zi) + assert np.allclose(outputs_data, expected)