diff --git a/qupulse/program/measurement.py b/qupulse/program/measurement.py new file mode 100644 index 00000000..4de94b01 --- /dev/null +++ b/qupulse/program/measurement.py @@ -0,0 +1,259 @@ +import contextlib +from typing import Sequence, Mapping, Iterable, Optional, Union, ContextManager, Callable +from dataclasses import dataclass +from functools import cached_property + +import numpy + +from qupulse.utils.types import TimeType, MeasurementWindow +from qupulse.program.protocol import ProgramBuilder, Program +from qupulse.program.values import RepetitionCount, HardwareTime, HardwareVoltage, DynamicLinearValue, TimeType +from qupulse.program.waveforms import Waveform +from qupulse.parameter_scope import Scope + + +MeasurementID = str | int + + +@dataclass +class LoopLabel: + idx: int + runtime_name: str | None + count: RepetitionCount + + +@dataclass +class Measure: + meas_id: MeasurementID + delay: HardwareTime + length: HardwareTime + + +@dataclass +class Wait: + duration: HardwareTime + + +@dataclass +class LoopJmp: + idx: int + + +Command = Union[LoopLabel, LoopJmp, Wait, Measure] + + +@dataclass +class MeasurementInstructions(Program): + commands: Sequence[Command] + + @cached_property + def duration(self) -> float: + latest = 0. + + def process(_, begin, length): + nonlocal latest + end = begin + length + latest = max(latest, end) + + vm = MeasurementVM(process) + vm.execute(commands=self.commands) + return latest + + +@dataclass +class MeasurementFrame: + commands: list['Command'] + keep: bool + + +class MeasurementBuilder(ProgramBuilder): + def __init__(self): + super().__init__() + + self._frames = [MeasurementFrame([], False)] + self._ranges: list[tuple[str, range]] = [] + self._repetitions = [] + self._measurements = [] + self._label_counter = 0 + + def _with_new_frame(self, measurements): + self._frames.append(MeasurementFrame([], False)) + yield self + frame = self._frames.pop() + if not frame.keep: + return + self.measure(measurements) + # measure does not keep if there are no measurements + self._frames[-1].keep = True + return frame.commands + + def _with_loop_scope(self, measurements, loop_name, loop_count): + new_commands = yield from self._with_new_frame(measurements) + if new_commands is None: + return + parent = self._frames[-1] + + self._label_counter += 1 + label_idx = self._label_counter + parent.commands.append(LoopLabel(idx=label_idx, runtime_name=loop_name, count=loop_count)) + parent.commands.extend(new_commands) + parent.commands.append(LoopJmp(idx=label_idx)) + + def inner_scope(self, scope: Scope) -> Scope: + """This function is necessary to inject program builder specific parameter implementations into the build + process.""" + if self._ranges: + name, rng = self._ranges[-1] + return scope.overwrite({name: DynamicLinearValue(base=rng.start, factors={name: rng.step})}) + else: + return scope + + def hold_voltage(self, duration: HardwareTime, voltages: Mapping[str, HardwareVoltage]): + """Supports dynamic i.e. for loop generated offsets and duration""" + self._frames[-1].commands.append(Wait(duration)) + self._frames[-1].keep = True + + def play_arbitrary_waveform(self, waveform: Waveform): + """""" + self._frames[-1].commands.append(Wait(waveform.duration)) + self._frames[-1].keep = True + + def measure(self, measurements: Optional[Sequence[MeasurementWindow]]): + """Unconditionally add given measurements relative to the current position.""" + if measurements: + commands = self._frames[-1].commands + commands.extend(Measure(*meas) for meas in measurements) + self._frames[-1].keep = True + + def with_repetition(self, repetition_count: RepetitionCount, + measurements: Optional[Sequence[MeasurementWindow]] = None) -> Iterable['ProgramBuilder']: + """Measurements that are added to the new builder are dropped if the builder is empty upon exit""" + yield from self._with_loop_scope(measurements, loop_name=None, loop_count=repetition_count) + + @contextlib.contextmanager + def with_sequence(self, + measurements: Optional[Sequence[MeasurementWindow]] = None) -> ContextManager['ProgramBuilder']: + """ + + Measurements that are added in to the returned program builder are discarded if the sequence is empty on exit. + + Args: + measurements: Measurements to attach to the potential child. + Returns: + """ + new_commands = yield from self._with_new_frame(measurements) + if new_commands is None: + return + parent = self._frames[-1] + parent.commands.extend(new_commands) + + @contextlib.contextmanager + def new_subprogram(self, global_transformation: 'Transformation' = None) -> ContextManager['ProgramBuilder']: + """Create a context managed program builder whose contents are translated into a single waveform upon exit if + it is not empty.""" + yield self + + def with_iteration(self, index_name: str, rng: range, + measurements: Optional[Sequence[MeasurementWindow]] = None) -> Iterable['ProgramBuilder']: + self._ranges.append((index_name, rng)) + yield from self._with_loop_scope(measurements, loop_name=index_name, loop_count=len(rng)) + self._ranges.pop() + + def time_reversed(self) -> ContextManager['ProgramBuilder']: + self._frames.append(MeasurementFrame([], False)) + yield self + frame = self._frames.pop() + if not frame.keep: + return + + self._frames[-1].keep = True + self._frames[-1].commands.extend(_reversed_commands(frame.commands)) + + def to_program(self, channels = None) -> Optional[Program]: + """Further addition of new elements might fail after finalizing the program.""" + if self._frames[0].keep: + return MeasurementInstructions(self._frames[0].commands) + + +def _reversed_commands(cmds: Sequence[Command]) -> Sequence[Command]: + reversed_cmds = [] + jumps = {} + for cmd in reversed(cmds): + if isinstance(cmd, LoopJmp): + jumps[cmd.idx] = len(reversed_cmds) + reversed_cmds.append(cmd) + elif isinstance(cmd, LoopLabel): + jump_idx = jumps[cmd.idx] + jump = reversed_cmds[jump_idx] + reversed_cmds[jump_idx] = cmd + reversed_cmds.append(jump) + + elif isinstance(cmd, Measure): + if isinstance(cmd.delay, DynamicLinearValue) or isinstance(cmd.delay, DynamicLinearValue): + raise NotImplementedError("TODO") + reversed_cmds.append(Measure(meas_id=cmd.meas_id, + delay=-(cmd.delay + cmd.length), + length=cmd.length,)) + elif isinstance(cmd, Wait): + reversed_cmds.append(cmd) + else: + raise ValueError("Not a command", cmd) + + return reversed_cmds + + +class MeasurementVM: + """A VM that is capable of executing the measurement commands""" + + def __init__(self, callback: Callable[[str, float, float], None]): + self._time = TimeType(0) + self._memory = {} + self._counts = {} + self._callback = callback + + def _eval_hardware_time(self, t: HardwareTime): + if isinstance(t, DynamicLinearValue): + value = t.base + for (factor_name, factor_val) in t.factors.items(): + count = self._counts[self._memory[factor_name]] + value += factor_val * count + return value + else: + return t + + def _execute_after_label(self, sequence: Sequence[Command]) -> int: + skip = 0 + for idx, cmd in enumerate(sequence): + if idx < skip: + continue + if isinstance(cmd, LoopJmp): + return idx + elif isinstance(cmd, LoopLabel): + if cmd.runtime_name: + self._memory[cmd.runtime_name] = cmd.idx + + for iter_val in range(cmd.count): + self._counts[cmd.idx] = iter_val + pos = self._execute_after_label(sequence[idx + 1:]) + skip = idx + pos + 2 + + elif isinstance(cmd, Measure): + meas_time = float(self._eval_hardware_time(cmd.delay) + self._time) + meas_len = float(self._eval_hardware_time(cmd.length)) + self._callback(cmd.meas_id, meas_time, meas_len) + + elif isinstance(cmd, Wait): + self._time += self._eval_hardware_time(cmd.duration) + + def execute(self, commands: Sequence[Command]): + self._execute_after_label(commands) + + +def to_table(commands: Sequence[Command]) -> dict[str, numpy.ndarray]: + tables = {} + + vm = MeasurementVM(lambda name, begin, length: tables.setdefault(name, []).append((begin, length))) + vm.execute(commands) + return { + name: numpy.array(values) for name, values in tables.items() + } diff --git a/qupulse/pulses/measurement.py b/qupulse/pulses/measurement.py index 76145e35..76998c24 100644 --- a/qupulse/pulses/measurement.py +++ b/qupulse/pulses/measurement.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: LGPL-3.0-or-later +import warnings from typing import Optional, List, Tuple, Union, Dict, Set, Mapping, AbstractSet from numbers import Real import itertools @@ -62,8 +63,11 @@ def get_measurement_windows(self, begin_val = begin.evaluate_in_scope(parameters) length_val = length.evaluate_in_scope(parameters) - if begin_val < 0 or length_val < 0: - raise ValueError('Measurement window with negative begin or length: {}, {}'.format(begin, length)) + try: + if begin_val < 0 or length_val < 0: + warnings.warn('Measurement window with negative begin or length: {}, {}'.format(begin, length)) + except TypeError: + pass resulting_windows.append( (name, diff --git a/tests/program/measurement_tests.py b/tests/program/measurement_tests.py new file mode 100644 index 00000000..c0aacf60 --- /dev/null +++ b/tests/program/measurement_tests.py @@ -0,0 +1,61 @@ +import copy +import unittest +from unittest import TestCase + +import numpy as np + +from qupulse.pulses import * +from qupulse.program.measurement import * +from qupulse.program import DynamicLinearValue + + +class SingleRampTest(TestCase): + def setUp(self): + hold = ConstantPT(10 ** 6, {'a': '-1. + idx * 0.01'}, measurements=[('A', 10, 100), ('B', '1 + idx * 2', 200)]) + self.pulse_template = hold.with_iteration('idx', 200) + + self.commands = [ + LoopLabel(1, 'idx', 200), + Measure('A', 10, 100), + Measure('B', DynamicLinearValue(base=1, factors={'idx': 2}), 200), + Wait(TimeType(10 ** 6)), + LoopJmp(1) + ] + + self.table_a = np.array([(10 + 10**6 * idx, 100) for idx in range(200)]) + self.table_b = np.array([(1 + idx * 2 + 10**6 * idx, 200) for idx in range(200)]) + + def test_commands(self): + builder = MeasurementBuilder() + instructions = self.pulse_template.create_program(program_builder=builder) + self.assertEqual(self.commands, instructions.commands) + + def test_table(self): + table = to_table(self.commands) + tab_a = table['A'] + tab_b = table['B'] + np.testing.assert_array_equal(self.table_a, tab_a) + np.testing.assert_array_equal(self.table_b, tab_b) + + +class ComplexPulse(TestCase): + def setUp(self): + hold = ConstantPT(10 ** 6, {'a': 1}, measurements=[('A', 10, 100), ('B', '1 + ii * 2 + jj', '3 + ii + jj')]) + dyn_hold = ConstantPT('10 ** 6 - 4 * ii', {'a': 1}, measurements=[('A', 10, 100), ('B', '1 + ii * 2 + jj', '3 + ii + jj')]) + + self.pulse_template = SequencePT( + hold.with_repetition(2).with_iteration('ii', 100).with_repetition(2).with_iteration('jj', 200), + measurements=[('A', 1, 100)] + ).with_repetition(2) + + self.commands = [] + + def test_commands(self): + builder = MeasurementBuilder() + commands = self.pulse_template.create_program(program_builder=builder) + to_table(commands.commands) + raise NotImplementedError("TODO") + + def test_table(self): + + raise NotImplementedError("TODO") \ No newline at end of file diff --git a/tests/pulses/measurement_tests.py b/tests/pulses/measurement_tests.py index f010e409..8dd40e21 100644 --- a/tests/pulses/measurement_tests.py +++ b/tests/pulses/measurement_tests.py @@ -47,10 +47,10 @@ def test_measurement_windows_invalid(self) -> None: pulse = self.to_test_constructor(measurements=[('mw', 'a', 'd')]) measurement_mapping = {'mw': 'mw'} - with self.assertRaises(ValueError): + with self.assertWarnsRegex(UserWarning, "negative begin or length"): pulse.get_measurement_windows(measurement_mapping=measurement_mapping, parameters=dict(length=10, a=-1, d=3)) - with self.assertRaises(ValueError): + with self.assertWarnsRegex(UserWarning, "negative begin or length"): pulse.get_measurement_windows(measurement_mapping=measurement_mapping, parameters=dict(length=10, a=3, d=-1))