Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dynamic = ["version"]
dependencies = [
"array-api-compat>=1.11.1",
"ezmsg[axisarray]>=3.7.3",
"ezmsg-baseproc>=1.5.1",
"ezmsg-baseproc>=1.6.0",
"mlx>=0.18.0; sys_platform == 'darwin' and platform_machine == 'arm64'",
"numba>=0.61.0",
"numpy>=1.26.0",
Expand Down
2 changes: 1 addition & 1 deletion src/ezmsg/sigproc/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def TSMessage(
) -> AxisArray:
dims = [f"dim_{i}" for i in range(data.ndim)]
dims[time_dim] = "time"
offset = time.time() if timestamp is None else timestamp
offset = time.monotonic() if timestamp is None else timestamp
offset_adj = data.shape[time_dim] / fs # offset corresponds to idx[0] on time_dim
axis = AxisArray.TimeAxis(fs, offset=offset - offset_adj)
return AxisArray(data, dims=dims, axes=dict(time=axis))
6 changes: 3 additions & 3 deletions src/ezmsg/sigproc/resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class ResampleState:

last_write_time: float = -np.inf
"""
Wall clock time of the last write to the signal buffer.
Monotonic time of the last write to the signal buffer.
This is used to determine if we need to extrapolate the reference axis
if we have not received an update within max_chunk_delay.
"""
Expand Down Expand Up @@ -142,7 +142,7 @@ def _process(self, message: AxisArray) -> None:
synth_ref_axis = LinearAxis(unit="s", gain=out_gain, offset=prev_t_end + out_gain)
self.state.ref_axis_buffer.write(synth_ref_axis, n_samples=n_synth)

self.state.last_write_time = time.time()
self.state.last_write_time = time.monotonic()

def __next__(self) -> AxisArray:
if self.state.src_buffer is None or self.state.ref_axis_buffer is None:
Expand Down Expand Up @@ -176,7 +176,7 @@ def __next__(self) -> AxisArray:

# If we do not rely on an external reference, and we have not received new data in a while,
# then extrapolate our reference vector out beyond the delay limit.
b_project = self.settings.resample_rate is not None and time.time() > (
b_project = self.settings.resample_rate is not None and time.monotonic() > (
self.state.last_write_time + self.settings.max_chunk_delay
)
if b_project:
Expand Down
18 changes: 9 additions & 9 deletions tests/helpers/synth.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Counter(ez.Unit):
async def initialize(self) -> None:
self._counter = 0
self._n_sent = 0
self._t0 = time.time()
self._t0 = time.monotonic()

@ez.publisher(OUTPUT_SIGNAL)
async def produce(self) -> typing.AsyncGenerator:
Expand All @@ -42,7 +42,7 @@ async def produce(self) -> typing.AsyncGenerator:
if self.SETTINGS.dispatch_rate is not None:
n_disp = 1 + self._n_sent / self.SETTINGS.n_time
t_next = self._t0 + n_disp / self.SETTINGS.dispatch_rate
sleep_time = t_next - time.time()
sleep_time = t_next - time.monotonic()
if sleep_time > 0:
await asyncio.sleep(sleep_time)

Expand Down Expand Up @@ -87,7 +87,7 @@ class WhiteNoise(ez.Unit):

async def initialize(self) -> None:
self._n_sent = 0
self._t0 = time.time()
self._t0 = time.monotonic()

@ez.publisher(OUTPUT_SIGNAL)
async def produce(self) -> typing.AsyncGenerator:
Expand All @@ -96,7 +96,7 @@ async def produce(self) -> typing.AsyncGenerator:
if self.SETTINGS.dispatch_rate is not None:
n_disp = 1 + self._n_sent / self.SETTINGS.n_time
t_next = self._t0 + n_disp / self.SETTINGS.dispatch_rate
sleep_time = t_next - time.time()
sleep_time = t_next - time.monotonic()
if sleep_time > 0:
await asyncio.sleep(sleep_time)

Expand Down Expand Up @@ -145,7 +145,7 @@ class Oscillator(ez.Unit):

async def initialize(self) -> None:
self._n_sent = 0
self._t0 = time.time()
self._t0 = time.monotonic()

# Calculate synchronized frequency if requested
self._freq = self.SETTINGS.freq
Expand All @@ -162,15 +162,15 @@ async def produce(self) -> typing.AsyncGenerator:
# Realtime mode: sleep until wall-clock time matches sample time
n_next = self._n_sent + self.SETTINGS.n_time
t_next = self._t0 + n_next / self.SETTINGS.fs
sleep_time = t_next - time.time()
sleep_time = t_next - time.monotonic()
if sleep_time > 0:
await asyncio.sleep(sleep_time)
offset = t_next - self.SETTINGS.n_time / self.SETTINGS.fs
elif self.SETTINGS.dispatch_rate is not None:
# Manual dispatch rate mode
n_disp = 1 + self._n_sent / self.SETTINGS.n_time
t_next = self._t0 + n_disp / self.SETTINGS.dispatch_rate
sleep_time = t_next - time.time()
sleep_time = t_next - time.monotonic()
if sleep_time > 0:
await asyncio.sleep(sleep_time)
offset = self._n_sent / self.SETTINGS.fs
Expand Down Expand Up @@ -218,15 +218,15 @@ class Clock(ez.Unit):
SETTINGS: ez.Settings

async def initialize(self) -> None:
self._t0 = time.time()
self._t0 = time.monotonic()
self._n_dispatch = 0

@ez.publisher(OUTPUT_SIGNAL)
async def produce(self) -> typing.AsyncGenerator:
while True:
if hasattr(self.SETTINGS, "dispatch_rate") and self.SETTINGS.dispatch_rate is not None:
target_time = self._t0 + (self._n_dispatch + 1) / self.SETTINGS.dispatch_rate
sleep_time = target_time - time.time()
sleep_time = target_time - time.monotonic()
if sleep_time > 0:
await asyncio.sleep(sleep_time)

Expand Down
15 changes: 11 additions & 4 deletions tests/integration/ezmsg/test_filter_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,16 @@ def test_filter_system(filter_type: str, coef_type: str, order: int):
inputs = AxisArray.concatenate(*[_ for _ in message_log(test_filename_raw)], dim="time")
outputs = AxisArray.concatenate(*messages, dim="time")

# Truncate to common length: the raw and filtered loggers are on separate
# pipeline paths, so a termination race can cause different message counts.
n_samples = min(inputs.data.shape[0], outputs.data.shape[0])
assert n_samples > 0
inputs_data = inputs.data[:n_samples]
outputs_data = outputs.data[:n_samples]

if order == 0:
# Passthrough
assert np.allclose(outputs.data, inputs.data)
assert np.allclose(outputs_data, inputs_data)
return

# Calculate expected
Expand Down Expand Up @@ -99,8 +106,8 @@ def test_filter_system(filter_type: str, coef_type: str, order: int):
system = scipy.signal.dlti(*coefs)
coefs = (system.num, system.den)
zi = scipy.signal.lfilter_zi(*coefs)[:, None]
expected, _ = scipy.signal.lfilter(coefs[0], coefs[1], inputs.data, axis=inputs.get_axis_idx("time"), zi=zi)
expected, _ = scipy.signal.lfilter(coefs[0], coefs[1], inputs_data, axis=inputs.get_axis_idx("time"), zi=zi)
else: # coef_type == "sos":
zi = scipy.signal.sosfilt_zi(coefs)[:, :, None] + np.zeros((1, 1, n_ch))
expected, _ = scipy.signal.sosfilt(coefs, inputs.data, axis=inputs.get_axis_idx("time"), zi=zi)
assert np.allclose(outputs.data, expected)
expected, _ = scipy.signal.sosfilt(coefs, inputs_data, axis=inputs.get_axis_idx("time"), zi=zi)
assert np.allclose(outputs_data, expected)