Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lakeshore/ssm_measure_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ def set_identify_state(self, state):
def get_dark_mode_state(self):
"""Returns the dark mode state for the given pod."""

response = self.device.query(f'SENSe{self.module_number}:DMODe?', cherk_errors=False)
response = self.device.query(f'SENSe{self.module_number}:DMODe?', check_errors=False)
return response

def set_dark_mode_state(self, state):
Expand Down Expand Up @@ -1234,7 +1234,7 @@ def set_resistance_mode(self, resistance_mode):
Args:
resistance_mode (ResistanceMode): The desired resistance optimization mode.
"""
if isinstance(resistance_mode, SSMSystemEnums.ResistanceExcitationType):
if isinstance(resistance_mode, SSMSystemEnums.ResistanceMode):
mode = resistance_mode.name
else:
mode = resistance_mode
Expand Down
10 changes: 5 additions & 5 deletions lakeshore/ssm_source_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ def set_identify_state(self, state):

def get_dark_mode_state(self):
"""Returns the dark mode state for the given pod."""
response = self.device.query(f'SOURce{self.module_number}:DMODe?', cherk_errors=False)
response = self.device.query(f'SOURce{self.module_number}:DMODe?', check_errors=False)
return response

def set_dark_mode_state(self, state):
Expand All @@ -1014,7 +1014,7 @@ def set_dark_mode_state(self, state):

def get_voltage_output_limit_high(self):
"""Returns the present voltage high output limit."""
response = float(self.device.query(f'SOURce{self.module_number}:VOLTage:LIMit:HIGH?', cherk_errors=False))
response = float(self.device.query(f'SOURce{self.module_number}:VOLTage:LIMit:HIGH?', check_errors=False))
return response

def set_voltage_output_limit_high(self, limit):
Expand All @@ -1033,7 +1033,7 @@ def set_voltage_output_limit_high(self, limit):

def get_voltage_output_limit_low(self):
"""Returns the present voltage low output limit."""
response = float(self.device.query(f'SOURce{self.module_number}:VOLTage:LIMit:LOW?', cherk_errors=False))
response = float(self.device.query(f'SOURce{self.module_number}:VOLTage:LIMit:LOW?', check_errors=False))
return response

def set_voltage_output_limit_low(self, limit):
Expand All @@ -1052,7 +1052,7 @@ def set_voltage_output_limit_low(self, limit):

def get_current_output_limit_high(self):
"""Returns the present current high output limit."""
response = float(self.device.query(f'SOURce{self.module_number}:CURRent:LIMit:HIGH?', cherk_errors=False))
response = float(self.device.query(f'SOURce{self.module_number}:CURRent:LIMit:HIGH?', check_errors=False))
return response

def set_current_output_limit_high(self, limit):
Expand All @@ -1071,7 +1071,7 @@ def set_current_output_limit_high(self, limit):

def get_current_output_limit_low(self):
"""Returns the present current low output limit."""
response = float(self.device.query(f'SOURce{self.module_number}:CURRent:LIMit:LOW?', cherk_errors=False))
response = float(self.device.query(f'SOURce{self.module_number}:CURRent:LIMit:LOW?', check_errors=False))
return response

def set_disable_on_compliance(self, disable_on_compliance):
Expand Down
104 changes: 78 additions & 26 deletions lakeshore/ssm_system.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Implements functionality unique to the Lake Shore M81."""
from datetime import datetime
import logging
import struct
import time
from base64 import b64decode
from threading import Lock
from warnings import warn
Expand All @@ -12,12 +14,25 @@
from lakeshore.ssm_settings_profiles import SettingsProfiles
from lakeshore.requires_firmware_version import requires_firmware_version

logger = logging.getLogger(__name__)

try:
from wakepy import keep
except NotImplementedError:
pass # Proceed without wakepy on linux without systemd
except KeyError:
pass # Proceed without wakepy on linux without dbus
except (NotImplementedError, KeyError, ImportError):
# Provide a no-op context manager fallback when wakepy is unavailable
from contextlib import contextmanager

class _NoOpMode:
"""Mimics wakepy's Mode object returned by keep.running()."""
success = False

class _NoOpKeep:
@staticmethod
@contextmanager
def running():
yield _NoOpMode()

keep = _NoOpKeep()


class SSMSystemOperationRegister(RegisterBase):
Expand Down Expand Up @@ -128,6 +143,8 @@ def __init__(self,

self.settings_profiles = SettingsProfiles(self)

# Lock ordering: stream_lock must be acquired before dut_lock (inherited from
# GenericInstrument) to avoid deadlock. Never acquire dut_lock then stream_lock.
self.stream_lock = Lock()

# Sweeping limits
Expand Down Expand Up @@ -218,6 +235,7 @@ def _locate_module_by_name(module_name, set_of_modules):
'SRANge': float,
'SVLimit': lambda s: bool(int(s)),
'SILimit': lambda s: bool(int(s)),
'SSWeeping': lambda s: bool(int(s)),
'MDC': float,
'MRMs': float,
'MPPeak': float,
Expand Down Expand Up @@ -270,6 +288,13 @@ def get_multiple_min_max_values(self, *data_sources):
def stream_data(self, rate, num_points, *data_sources):
"""Generator object to stream data from the instrument.

A TRACe:STOp command is sent when the generator exits for any reason
(normal completion, caller break, exception, or garbage collection)
to ensure instrument-side cleanup.
Comment on lines +291 to +293

Copilot AI Feb 18, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring claim about cleanup on 'garbage collection' is stronger than Python generally guarantees (timing of generator finalization can be implementation-dependent). Consider softening this to 'when the generator is closed/finalized (e.g., via exhaustion, close(), or GC finalization) we attempt best-effort cleanup' to avoid implying a deterministic guarantee.

Suggested change
A TRACe:STOp command is sent when the generator exits for any reason
(normal completion, caller break, exception, or garbage collection)
to ensure instrument-side cleanup.
When the generator is closed or finalized (for example due to normal
completion, caller break, an exception, or garbage-collection
finalization), it attempts to send a TRACe:STOp command as a
best-effort instrument-side cleanup.

Copilot uses AI. Check for mistakes.

Note: the overflow check only runs on normal completion. If the
generator is abandoned early, overflow status is not checked.

Args:
rate (int):
Desired transfer rate in points/sec.
Expand All @@ -292,26 +317,45 @@ def stream_data(self, rate, num_points, *data_sources):
bytes_per_row = int(self.query('TRACe:FORMat:ENCOding:B64:BCOunt?'))
binary_format = '<' + self.query('TRACe:FORMat:ENCOding:B64:BFORmat?').strip('\"')

if num_points is not None:
self.command(f'TRACe:STARt {num_points}')
else:
self.command('TRACe:STARt')

num_collected = 0
while num_points is None or num_collected < num_points:
b64_string = ''
while not b64_string:
b64_string = self.query('TRACe:DATA:ALL?', check_errors=False)

new_bytes = b64decode(b64_string)
rows = [new_bytes[i:i + bytes_per_row] for i in range(0, len(new_bytes), bytes_per_row)]

for row in rows:
data = struct.unpack(binary_format, row)
num_collected += 1

yield data

try:
if num_points is not None:
self.command(f'TRACe:STARt {num_points}')
else:
self.command('TRACe:STARt')

num_collected = 0
while num_points is None or num_collected < num_points:
b64_string = ''
while not b64_string:
b64_string = self.query('TRACe:DATA:ALL?', check_errors=False)
if not b64_string:
time.sleep(0.01)

new_bytes = b64decode(b64_string)
rows = [new_bytes[i:i + bytes_per_row] for i in range(0, len(new_bytes), bytes_per_row)]

for row in rows:
if len(row) < bytes_per_row:
# Incomplete trailing row from non-evenly-divisible buffer
logger.debug('Skipping incomplete row (%d/%d bytes)', len(row), bytes_per_row)
break
Comment on lines +334 to +341

Copilot AI Feb 18, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking on an incomplete trailing row discards the remainder bytes. If TRACe:DATA:ALL? can return chunks that aren’t aligned to bytes_per_row, this will permanently lose data and corrupt subsequent struct.unpack alignment. Instead, keep a remainder buffer across iterations (prepend remainder + new_bytes, process only full rows, and carry the leftover bytes into the next loop) rather than skipping/breaking.

Copilot uses AI. Check for mistakes.
if num_points is not None and num_collected >= num_points:
# Batch contained more rows than needed to reach num_points
break
Comment on lines +327 to +344

Copilot AI Feb 18, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New behavior was added for (a) incomplete trailing rows and (b) batches that contain more rows than needed to reach num_points, but the new tests only assert TRACe:STOp is sent. Add a unit test that simulates a TRACe:DATA:ALL? response containing a partial row followed by the remainder in a subsequent poll (and another that returns more than num_points rows in one batch) to validate correct point yield counts and no data loss.

Suggested change
while num_points is None or num_collected < num_points:
b64_string = ''
while not b64_string:
b64_string = self.query('TRACe:DATA:ALL?', check_errors=False)
if not b64_string:
time.sleep(0.01)
new_bytes = b64decode(b64_string)
rows = [new_bytes[i:i + bytes_per_row] for i in range(0, len(new_bytes), bytes_per_row)]
for row in rows:
if len(row) < bytes_per_row:
# Incomplete trailing row from non-evenly-divisible buffer
logger.debug('Skipping incomplete row (%d/%d bytes)', len(row), bytes_per_row)
break
if num_points is not None and num_collected >= num_points:
# Batch contained more rows than needed to reach num_points
break
pending_bytes = b''
done = False
while not done and (num_points is None or num_collected < num_points):
b64_string = ''
while not b64_string:
b64_string = self.query('TRACe:DATA:ALL?', check_errors=False)
if not b64_string:
time.sleep(0.01)
# Combine any leftover bytes from the previous poll with the newly decoded data.
new_bytes = pending_bytes + b64decode(b64_string)
total_len = len(new_bytes)
if total_len < bytes_per_row:
# Not enough data to form a full row yet; keep buffering.
pending_bytes = new_bytes
continue
full_rows_len = (total_len // bytes_per_row) * bytes_per_row
full_rows_bytes = new_bytes[:full_rows_len]
pending_bytes = new_bytes[full_rows_len:]
for i in range(0, full_rows_len, bytes_per_row):
if num_points is not None and num_collected >= num_points:
# Batch contained more rows than needed to reach num_points
done = True
break
row = full_rows_bytes[i:i + bytes_per_row]

Copilot uses AI. Check for mistakes.
data = struct.unpack(binary_format, row)
num_collected += 1
yield data
finally:
# Best-effort cleanup: stop instrument-side streaming.
# May block briefly if dut_lock is held by another thread.
try:
self.command('TRACe:STOp', check_errors=False)
except Exception:
logger.debug('Failed to send TRACe:STOp during cleanup', exc_info=True)

# Note: this overflow check is only reached on normal generator
# completion. If the generator is abandoned (e.g. caller break),
# GeneratorExit bypasses this code via the context manager exits.
overflow_occurred = bool(int(self.query('TRACe:DATA:OVERflow?', check_errors=True)))
if overflow_occurred:
raise XIPInstrumentException('Data loss occurred during this data stream.')
Expand Down Expand Up @@ -492,7 +536,11 @@ def get_head_cal_datetime(self):
"""Returns the date and time of the head calibration."""

response = self.query('CALibration:DATE?').split(',')
return datetime(int(response[0]), int(response[1]), int(response[2]), int(response[3]), int(response[4]), int(response[5]))
if len(response) < 6:
raise XIPInstrumentException(
f"Malformed calibration date response: expected 6 values, got {len(response)}")
Comment on lines +539 to +541

Copilot AI Feb 18, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new error message doesn’t include the raw response payload, which is often the key detail when diagnosing instrument/firmware parsing issues. Consider including the original response (or the pre-split string) in the exception message, e.g. ... got {len(response)}: {response!r} (and similarly for the self-calibration date).

Copilot uses AI. Check for mistakes.
return datetime(int(response[0]), int(response[1]), int(response[2]),
int(response[3]), int(response[4]), int(response[5]))

def get_head_cal_temperature(self):
"""Returns the temperature of the head calibration."""
Expand All @@ -508,7 +556,11 @@ def get_head_self_cal_datetime(self):
"""Returns the datetime of the last head self calibration."""

response = self.query('CALibration:SCALibration:DATE?').split(',')
return datetime(int(response[0]), int(response[1]), int(response[2]), int(response[3]), int(response[4]), int(response[5]))
if len(response) < 6:
raise XIPInstrumentException(
f"Malformed self-calibration date response: expected 6 values, got {len(response)}")
return datetime(int(response[0]), int(response[1]), int(response[2]),
int(response[3]), int(response[4]), int(response[5]))

def get_head_self_cal_temperature(self):
"""Returns the temperature of the last head self calibration."""
Expand Down
1 change: 1 addition & 0 deletions lakeshore/ssm_system_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class DataSourceMnemonic(str, Enum):
MEASURE_SETTLING = 'MSETtling'
MEASURE_UNLOCK = 'MUNLock'
MEASURE_REFERENCE_FREQUENCY = 'MRFRequency'
SOURCE_IS_SETTLING = 'SRSettling'

Copilot AI Feb 18, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description says the DataSourceMnemonic enum is being reconciled with the data_source_types dict, but in the shown diffs: (1) SSWeeping is added to data_source_types without a corresponding enum member, and (2) SOURCE_IS_SETTLING (SRSettling) is added to the enum without a corresponding entry in data_source_types. This leaves the enum API and streaming/fetch conversion mapping out of sync—add the missing enum entry for SSWeeping and add a parser entry for SRSettling in data_source_types (or remove/rename to match the instrument mnemonic used by the mapping).

Suggested change
SOURCE_IS_SETTLING = 'SRSettling'

Copilot uses AI. Check for mistakes.
GENERAL_PURPOSE_INPUT_STATES = 'GPIStates'
GENERAL_PURPOSE_OUTPUT_STATES = 'GPOStates'

Expand Down
71 changes: 71 additions & 0 deletions tests/test_ssm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,77 @@ def test_stream_data(self):
self.assertIn('TRACe:FORMat:ENCOding:B64:BFORmat?', self.fake_connection.get_outgoing_message())
self.assertIn('TRACe:STARt 3', self.fake_connection.get_outgoing_message())

def test_stream_data_sends_stop_on_completion(self):
"""Test that TRACe:STOp is sent when stream_data completes normally"""

list_data = [(False, 45.6521), (True, 1.258), (False, 65.8974)]

my_data = []
for data in list_data:
for value in data:
my_data.append(value)

list_format = '<?d?d?d'
pack_data = pack(list_format, *my_data)
encoded_data = str(b64encode(pack_data))[2:-1]
Comment on lines +132 to +141

Copilot AI Feb 18, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two new tests duplicate the same data/packing setup logic. Consider extracting a small helper within the test class (or reusing the setup used by test_stream_data) to build encoded_data and enqueue the common initial responses, so future format/packing changes only need to be updated in one place.

Copilot uses AI. Check for mistakes.

self.fake_connection.setup_response('No error') # trace reset
self.fake_connection.setup_response('No error') # configure elements
self.fake_connection.setup_response('No error') # format encoding
self.fake_connection.setup_response('No error') # rate
self.fake_connection.setup_response('9;No error') # bytes per row
self.fake_connection.setup_response('"?d";No error') # binary format
self.fake_connection.setup_response('No error') # trace start
self.fake_connection.setup_response(encoded_data) # trace data
self.fake_connection.setup_response('0;No error') # overflow

# Consume all data
list(self.dut.stream_data(10, 3, ("MX", 1), ("MY", 2), ("MR", 3)))

# Drain all outgoing messages and check TRACe:STOp was sent
outgoing_messages = []
while self.fake_connection.outgoing:
outgoing_messages.append(self.fake_connection.get_outgoing_message())
self.assertTrue(
any('TRACe:STOp' in msg for msg in outgoing_messages),
f'TRACe:STOp not found in outgoing messages: {outgoing_messages}')

def test_stream_data_sends_stop_on_early_break(self):
"""Test that TRACe:STOp is sent when caller breaks out of stream_data early"""

list_data = [(False, 45.6521), (True, 1.258), (False, 65.8974)]

my_data = []
for data in list_data:
for value in data:
my_data.append(value)

list_format = '<?d?d?d'
pack_data = pack(list_format, *my_data)
encoded_data = str(b64encode(pack_data))[2:-1]

self.fake_connection.setup_response('No error') # trace reset
self.fake_connection.setup_response('No error') # configure elements
self.fake_connection.setup_response('No error') # format encoding
self.fake_connection.setup_response('No error') # rate
self.fake_connection.setup_response('9;No error') # bytes per row
self.fake_connection.setup_response('"?d";No error') # binary format
self.fake_connection.setup_response('No error') # trace start
self.fake_connection.setup_response(encoded_data) # trace data

# Only consume 1 of 3 points then abandon the generator
gen = self.dut.stream_data(10, 3, ("MX", 1), ("MY", 2), ("MR", 3))
next(gen)
gen.close()

# Drain all outgoing messages and check TRACe:STOp was sent
outgoing_messages = []
while self.fake_connection.outgoing:
outgoing_messages.append(self.fake_connection.get_outgoing_message())
self.assertTrue(
any('TRACe:STOp' in msg for msg in outgoing_messages),
f'TRACe:STOp not found in outgoing messages: {outgoing_messages}')

def test_get_data(self):
"""Test get data"""

Expand Down
Loading