diff --git a/pycbsdk/examples/analog_latency_test.py b/pycbsdk/examples/analog_latency_test.py new file mode 100644 index 0000000..392e060 --- /dev/null +++ b/pycbsdk/examples/analog_latency_test.py @@ -0,0 +1,351 @@ +#!/usr/bin/env python3 +"""Analog Input Latency Test — measures the end-to-end timing from host +audio output to NSP analog input, demonstrating pycbsdk's clock +synchronisation and timestamp conversion. + +Hardware setup +-------------- +1. Take a **3.5 mm (headphone) → BNC** cable. +2. Plug the 3.5 mm end into your computer's **headphone jack**. +3. Connect the BNC end to **Analog Input 1** on the NSP front panel. +4. Set your computer's volume to approximately 50 %. The tone only needs + to be large enough to cross a simple amplitude threshold — if the + detected signal is too weak, increase the volume and re-run. + +What the test does +------------------ +1. Connects to the NSP and configures Analog Input channel 1 for raw + (30 kHz) sampling with DC coupling. +2. Collects ~1 second of silence to establish a baseline (mean and + standard deviation of the noise floor). +3. Plays a 400 Hz sine tone (300 ms) through the computer speakers + ten times, recording ``time.monotonic()`` at the instant each tone + begins. +4. In the streaming callback, converts each sample's device timestamp + to ``time.monotonic()`` and watches for the first sample whose + absolute value exceeds ``baseline_mean + 5 * baseline_std``. + That crossing time is the **detected onset** of each tone. +5. After all tones have been played, reports the latency + (``detected_time − play_time``) for each trial plus summary + statistics (mean, median, range, std). + +Expected results +~~~~~~~~~~~~~~~~ +Latency includes the computer's audio output buffer, the DAC, the +cable, the NSP's ADC, and the UDP network path. Typical values are +**5–30 ms** depending on the OS audio stack. The spread (std) should +be < 5 ms if clock synchronisation is working correctly. + +Requirements +~~~~~~~~~~~~ +* ``pycbsdk`` (installed from this repository) +* ``numpy`` +* ``sounddevice`` — ``pip install sounddevice`` + +Usage:: + + python analog_latency_test.py [--device-type NSP] +""" + +from __future__ import annotations + +import argparse +import math +import sys +import threading +import time + +import numpy as np + +try: + import sounddevice as sd +except ImportError: + sys.exit( + "This example requires the 'sounddevice' package.\n" + "Install it with: pip install sounddevice" + ) + +from pycbsdk import ChannelType, DeviceType, SampleRate, Session + + +# --------------------------------------------------------------------------- +# Tone parameters +# --------------------------------------------------------------------------- +TONE_FREQ_HZ = 400 # Frequency of the test tone +TONE_DURATION_S = 0.300 # Duration of each tone burst +TONE_AMPLITUDE = 0.5 # Peak amplitude (0–1, relative to full scale) +AUDIO_SAMPLE_RATE = 44100 # Sample rate for audio output + +# --------------------------------------------------------------------------- +# Test parameters +# --------------------------------------------------------------------------- +CHANNEL_ID = 1 # 1-based analog input channel +BASELINE_DURATION_S = 1.0 # Silence duration for baseline estimation +N_WARMUP = 3 # Warmup tones (not analysed) +N_TRIALS = 10 # Number of measured tone presentations +POST_TONE_WAIT_S = 0.200 # Extra wait after each tone for ringdown +THRESHOLD_SIGMAS = 5 # Onset threshold = mean + N * std + + +def generate_tone() -> np.ndarray: + """Create a mono 400 Hz sine wave at the audio sample rate.""" + t = np.arange(int(AUDIO_SAMPLE_RATE * TONE_DURATION_S)) / AUDIO_SAMPLE_RATE + # Apply a 5 ms raised-cosine ramp to avoid clicks + ramp_samples = int(0.005 * AUDIO_SAMPLE_RATE) + ramp = np.ones_like(t) + ramp[:ramp_samples] = 0.5 * (1 - np.cos(np.pi * np.arange(ramp_samples) / ramp_samples)) + ramp[-ramp_samples:] = ramp[:ramp_samples][::-1] + return (TONE_AMPLITUDE * np.sin(2 * np.pi * TONE_FREQ_HZ * t) * ramp).astype(np.float32) + + +def main(): + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--device-type", default="NSP", + help="pycbsdk DeviceType name (default: NSP)") + args = parser.parse_args() + + dev_type = DeviceType[args.device_type.upper()] + + # ----------------------------------------------------------------------- + # 1. Connect and configure + # ----------------------------------------------------------------------- + print(f"Connecting to {dev_type.name}...") + with Session(device_type=dev_type) as sess: + deadline = time.monotonic() + 10 + while not sess.running: + if time.monotonic() > deadline: + sys.exit("Session did not start within 10 s — is the device on?") + time.sleep(0.1) + + print(f" Processor: {sess.proc_ident}") + print(f" Clock uncertainty: {(sess.clock_uncertainty_ns or 0) / 1e6:.2f} ms") + print() + + # Configure: 1 channel, raw (30 kHz), DC coupling + sess.set_channel_smpgroup(CHANNEL_ID, SampleRate.SR_RAW) + sess.set_ac_input_coupling(1, ChannelType.ANALOG_IN, False) + sess.sync() + print(f" Channel {CHANNEL_ID} configured: SR_RAW, DC coupling") + + raw_channels = sess.get_group_channels(int(SampleRate.SR_RAW)) + if CHANNEL_ID not in raw_channels: + sys.exit(f"Channel {CHANNEL_ID} not in RAW group — check device type " + f"(got group: {raw_channels})") + + # Work out which index our channel occupies in the group data array. + # The on_group_batch callback delivers an (n_samples, n_channels) + # array where columns correspond to get_group_channels() order. + chan_index = raw_channels.index(CHANNEL_ID) + + # ------------------------------------------------------------------- + # 2. Prepare tone + # ------------------------------------------------------------------- + tone = generate_tone() + print(f" Tone: {TONE_FREQ_HZ} Hz, {TONE_DURATION_S*1000:.0f} ms, " + f"{AUDIO_SAMPLE_RATE} Hz audio sample rate") + print() + + # ------------------------------------------------------------------- + # 3. Register streaming callback + # ------------------------------------------------------------------- + # Shared state between callback and main thread. + # The callback runs on the SDK's internal thread, so we use a lock. + lock = threading.Lock() + baseline_samples: list[np.ndarray] = [] + baseline_done = False + baseline_mean = 0.0 + baseline_std = 1.0 + threshold = math.inf # will be set after baseline + + # Each entry: (trial_index, detected_monotonic_time) + detections: list[tuple[int, float]] = [] + current_trial: int | None = None # set by main thread when tone plays + trial_detected = False + + @sess.on_group_batch(SampleRate.SR_RAW) + def on_raw_batch(samples: np.ndarray, timestamps: np.ndarray): + """Process a batch of raw samples. + + Parameters + ---------- + samples : ndarray, shape (n_samples, n_channels), dtype int16 + Raw ADC values for all channels in the RAW group. + timestamps : ndarray, shape (n_samples,), dtype uint64 + Device timestamps in nanoseconds for each sample. + """ + nonlocal baseline_done, baseline_mean, baseline_std, threshold + nonlocal current_trial, trial_detected + + # Extract our channel's column + ch_data = samples[:, chan_index].astype(np.float64) + + with lock: + if not baseline_done: + # Accumulate baseline samples + baseline_samples.append(ch_data.copy()) + return + + if current_trial is None or trial_detected: + return # Not in an active trial, or already detected this one + + # Look for the first sample exceeding the threshold. + abs_data = np.abs(ch_data - baseline_mean) + above = np.where(abs_data > threshold)[0] + if len(above) == 0: + return + + # Convert the device timestamp of the onset sample to + # time.monotonic() — this is the key demonstration. + onset_device_ns = int(timestamps[above[0]]) + onset_monotonic = sess.device_to_monotonic(onset_device_ns) + + detections.append((current_trial, onset_monotonic)) + trial_detected = True + + # ------------------------------------------------------------------- + # 4. Collect baseline + # ------------------------------------------------------------------- + print(f"Collecting {BASELINE_DURATION_S} s of baseline (silence)...") + time.sleep(BASELINE_DURATION_S) + + with lock: + all_baseline = np.concatenate(baseline_samples) + baseline_mean = float(np.mean(all_baseline)) + baseline_std = float(np.std(all_baseline)) + threshold = THRESHOLD_SIGMAS * baseline_std + baseline_done = True + + print(f" Baseline: mean={baseline_mean:.1f} std={baseline_std:.1f} " + f"threshold=±{threshold:.1f} ({len(all_baseline)} samples)") + if baseline_std < 1.0: + print(" WARNING: baseline std is very low — is the cable connected?") + print() + + # ------------------------------------------------------------------- + # 5–7. Play tones and record play times + # ------------------------------------------------------------------- + # Use a callback-based output stream for precise timing. + # The callback fires every time the audio driver needs a new + # buffer. When we want to play a tone, we set ``tone_pending`` + # and record ``time.monotonic()`` inside the FIRST callback that + # copies tone data — that is the closest we can get to the + # actual moment audio exits the OS mixer. The remaining + # latency after that point (driver buffer → DAC → cable → ADC) + # is hardware-fixed and consistent. + tone_frames = len(tone) + tone_pos = 0 # current read position in tone array + tone_pending = False # main thread sets True to trigger a tone + play_times: list[float] = [] + play_lock = threading.Lock() + + def audio_callback(outdata, frames, time_info, status): + nonlocal tone_pos, tone_pending + with play_lock: + if tone_pending and tone_pos == 0: + # First callback after tone was armed — record the + # moment we start filling the driver's buffer. + play_times.append(time.monotonic()) + tone_pending = False + + if tone_pos < tone_frames: + end = min(tone_pos + frames, tone_frames) + n = end - tone_pos + outdata[:n, 0] = tone[tone_pos:end] + outdata[n:, 0] = 0.0 + tone_pos = end + else: + outdata[:, 0] = 0.0 + + stream = sd.OutputStream( + samplerate=AUDIO_SAMPLE_RATE, + channels=1, + dtype="float32", + latency="low", + callback=audio_callback, + ) + stream.start() + print(f" Audio output latency: {stream.latency * 1000:.1f} ms") + + # Warmup: play a few tones to prime the audio pipeline. + # CoreAudio (macOS) buffers the first 1-2 callbacks before the + # DAC actually starts, adding ~250 ms to the first tones. + print(f" Warming up ({N_WARMUP} tones)...") + for _ in range(N_WARMUP): + with play_lock: + tone_pos = 0 + tone_pending = True + time.sleep(TONE_DURATION_S + POST_TONE_WAIT_S) + # Discard any play_times recorded during warmup + play_times.clear() + print() + + for i in range(N_TRIALS): + with lock: + current_trial = i + trial_detected = False + + # Arm the tone: the audio callback will record the play time + # on the very first buffer fill. + with play_lock: + tone_pos = 0 + tone_pending = True + + # Wait for the tone to finish playing out plus margin. + time.sleep(TONE_DURATION_S + POST_TONE_WAIT_S) + + with lock: + status = "detected" if trial_detected else "MISSED" + print(f" Trial {i+1:2d}/{N_TRIALS}: {status}") + + stream.stop() + stream.close() + + # Stop looking for onsets + with lock: + current_trial = None + + # ------------------------------------------------------------------- + # 8–9. Report results + # ------------------------------------------------------------------- + print() + print("=" * 60) + print("Results") + print("=" * 60) + + if not detections: + print("No tones were detected! Check:") + print(" • Is the cable connected to Analog Input 1?") + print(" • Is the computer volume turned up?") + print(" • Try increasing TONE_AMPLITUDE or decreasing THRESHOLD_SIGMAS.") + sys.exit(1) + + latencies_ms: list[float] = [] + for trial_idx, detected_t in detections: + latency_ms = (detected_t - play_times[trial_idx]) * 1000 + latencies_ms.append(latency_ms) + print(f" Trial {trial_idx+1:2d}: " + f"play={play_times[trial_idx]:.6f} " + f"detect={detected_t:.6f} " + f"latency={latency_ms:+7.2f} ms") + + lat = np.array(latencies_ms) + print() + print(f" Detected: {len(detections)}/{N_TRIALS} trials") + print(f" Mean: {np.mean(lat):+.2f} ms") + print(f" Median: {np.median(lat):+.2f} ms") + print(f" Std: {np.std(lat):.2f} ms") + print(f" Range: [{np.min(lat):+.2f}, {np.max(lat):+.2f}] ms") + print() + + uncert = sess.clock_uncertainty_ns + if uncert is not None: + print(f" Clock sync uncertainty: {uncert / 1e6:.2f} ms") + print(f" (Latency std should be comparable to this value.)") + print() + print("The latency includes: OS audio buffer → DAC → cable → " + "NSP ADC → UDP → clock conversion.") + + +if __name__ == "__main__": + main() diff --git a/pycbsdk/pyproject.toml b/pycbsdk/pyproject.toml index 25e0942..b57a8bf 100644 --- a/pycbsdk/pyproject.toml +++ b/pycbsdk/pyproject.toml @@ -34,6 +34,7 @@ Issues = "https://github.com/CerebusOSS/CereLink/issues" [project.optional-dependencies] numpy = ["numpy"] +examples = ["sounddevice"] [dependency-groups] dev = [ diff --git a/src/cbdev/include/cbdev/clock_sync.h b/src/cbdev/include/cbdev/clock_sync.h index 6f61134..6896173 100644 --- a/src/cbdev/include/cbdev/clock_sync.h +++ b/src/cbdev/include/cbdev/clock_sync.h @@ -65,6 +65,16 @@ class ClockSync { /// Uncertainty (RTT/2) from the best probe. [[nodiscard]] std::optional getUncertaintyNs() const; + /// True if probes are producing consistent offsets (spread < threshold). + /// When false, the caller should feed data-packet timestamps as a fallback. + [[nodiscard]] bool probesAreReliable() const; + + /// Add a data-packet observation for fallback clock sync. + /// @param device_time_ns Device timestamp from a data packet header + /// @param recv_local Host time when the datagram was received + /// Used only when probesAreReliable() returns false. + void addDataPacketSample(uint64_t device_time_ns, time_point recv_local); + private: mutable std::mutex m_mutex; Config m_config; @@ -77,11 +87,18 @@ class ClockSync { std::deque m_probe_samples; + struct DataSample { + int64_t offset_ns; // device_ns - recv_host_ns + time_point when; + }; + std::deque m_data_samples; + std::optional m_current_offset_ns; std::optional m_current_uncertainty_ns; - void recomputeEstimate(); // called with lock held + void recomputeEstimate(); // called with lock held void pruneExpired(time_point now); // called with lock held + bool probeSpreadOk() const; // called with lock held }; } // namespace cbdev diff --git a/src/cbdev/src/clock_sync.cpp b/src/cbdev/src/clock_sync.cpp index 11347d7..42d9d9a 100644 --- a/src/cbdev/src/clock_sync.cpp +++ b/src/cbdev/src/clock_sync.cpp @@ -105,7 +105,63 @@ std::optional ClockSync::getUncertaintyNs() const { return m_current_uncertainty_ns; } +bool ClockSync::probesAreReliable() const { + std::lock_guard lock(m_mutex); + return probeSpreadOk(); +} + +void ClockSync::addDataPacketSample(const uint64_t device_time_ns, const time_point recv_local) { + std::lock_guard lock(m_mutex); + + const int64_t recv_ns = to_ns(recv_local); + // offset = device_time - recv_time. + // This overestimates the true offset by the one-way network delay + // (host→device path isn't measured). Over many samples the minimum + // offset will approach the true value because the minimum corresponds + // to the least-queued packet. + const int64_t offset_ns = static_cast(device_time_ns) - recv_ns; + + // Detect clock jumps (same logic as addProbeSample) + if (m_current_offset_ns && std::abs(offset_ns - *m_current_offset_ns) > 1'000'000'000LL) { + m_data_samples.clear(); + } + + DataSample ds; + ds.offset_ns = offset_ns; + ds.when = recv_local; + m_data_samples.push_back(ds); + + while (m_data_samples.size() > m_config.max_probe_samples * 4) { + m_data_samples.pop_front(); + } + + // Prune old entries + const auto cutoff = recv_local - m_config.max_probe_age; + while (!m_data_samples.empty() && m_data_samples.front().when < cutoff) { + m_data_samples.pop_front(); + } + + // Pick minimum offset (closest to true offset = least network delay). + // Add a constant to compensate for the one-way acquisition-to- + // transmission delay that this method can't measure. Without + // this correction the converted timestamps appear in the future. + // 0.7 ms is the observed lower bound on the device's capture-to- + // send latency (ADC sample → UDP datagram on the wire). + if (!m_data_samples.empty()) { + constexpr int64_t ONE_WAY_DELAY_ESTIMATE_NS = 700'000; // 0.7 ms + int64_t best = m_data_samples.front().offset_ns; + for (const auto& s : m_data_samples) + best = std::min(best, s.offset_ns); + m_current_offset_ns = best + ONE_WAY_DELAY_ESTIMATE_NS; + m_current_uncertainty_ns = ONE_WAY_DELAY_ESTIMATE_NS; + } +} + void ClockSync::recomputeEstimate() { + // If data-packet fallback is active, don't overwrite it with bad probe data. + if (!m_data_samples.empty() && !probeSpreadOk()) + return; + // Find probe with minimum RTT — least queuing/jitter gives most reliable estimate const ProbeSample* best_probe = nullptr; for (const auto& p : m_probe_samples) { @@ -117,12 +173,27 @@ void ClockSync::recomputeEstimate() { if (best_probe) { m_current_offset_ns = best_probe->offset_ns; m_current_uncertainty_ns = best_probe->rtt_ns / 2; - } else { + } else if (m_data_samples.empty()) { + // Only clear if no data-packet fallback either m_current_offset_ns = std::nullopt; m_current_uncertainty_ns = std::nullopt; } } +bool ClockSync::probeSpreadOk() const { + // Internal version — called with m_mutex already held. + if (m_probe_samples.size() < 3) + return true; + int64_t lo = m_probe_samples.front().offset_ns; + int64_t hi = lo; + for (const auto& p : m_probe_samples) { + lo = std::min(lo, p.offset_ns); + hi = std::max(hi, p.offset_ns); + } + constexpr int64_t RELIABLE_THRESHOLD_NS = 10'000'000; // 10 ms + return (hi - lo) < RELIABLE_THRESHOLD_NS; +} + void ClockSync::pruneExpired(time_point now) { const auto probe_cutoff = now - m_config.max_probe_age; while (!m_probe_samples.empty() && m_probe_samples.front().when < probe_cutoff) { diff --git a/src/cbdev/src/device_session.cpp b/src/cbdev/src/device_session.cpp index 28a3f4a..9bde2d9 100644 --- a/src/cbdev/src/device_session.cpp +++ b/src/cbdev/src/device_session.cpp @@ -1333,6 +1333,14 @@ void DeviceSession::updateConfigFromBuffer(const void* buffer, const size_t byte return; // Nothing to process } + // If probe-based clock sync is unreliable (e.g., NSP header->time + // running at the wrong rate), fall back to data-packet timestamps. + // Track the LAST data packet's timestamp in the datagram — it is + // closest to the actual send time (earlier packets may be ~1 ms + // stale from the start of the device's transmit batch). + const bool need_data_clock_sample = !m_impl->clock_sync.probesAreReliable(); + uint64_t last_data_time = 0; + // Parse packets in buffer and update device_config for configuration packets const auto* buff_bytes = static_cast(buffer); size_t offset = 0; @@ -1350,6 +1358,15 @@ void DeviceSession::updateConfigFromBuffer(const void* buffer, const size_t byte // Count every packet for protocol monitor drop detection m_impl->pkts_since_monitor++; + // Track the last data packet timestamp for fallback clock sync. + // Data timestamps come from the ADC/PTP clock and are reliable + // even when probe header->time is broken. + if (need_data_clock_sample && + !(header->chid & cbPKTCHAN_CONFIGURATION) && + header->time != 0) { + last_data_time = header->time; + } + if ((header->chid & cbPKTCHAN_CONFIGURATION) == cbPKTCHAN_CONFIGURATION) { // Configuration packet - process based on type if (header->type == cbPKTTYPE_SYSHEARTBEAT) { @@ -1562,7 +1579,6 @@ void DeviceSession::updateConfigFromBuffer(const void* buffer, const size_t byte m_impl->pending_clock_probe.t1_local, device_time_ns, m_impl->last_recv_timestamp); - m_impl->pending_clock_probe.active = false; } } @@ -1581,6 +1597,12 @@ void DeviceSession::updateConfigFromBuffer(const void* buffer, const size_t byte offset += packet_size; } + + // Feed the last data packet's timestamp for fallback clock sync. + if (last_data_time != 0) { + m_impl->clock_sync.addDataPacketSample( + last_data_time, m_impl->last_recv_timestamp); + } } ///////////////////////////////////////////////////////////////////////////////////////////////////