Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 259 additions & 0 deletions qupulse/program/measurement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
import contextlib
from typing import Sequence, Mapping, Iterable, Optional, Union, ContextManager, Callable
from dataclasses import dataclass
from functools import cached_property

import numpy

from qupulse.utils.types import TimeType, MeasurementWindow
from qupulse.program.protocol import ProgramBuilder, Program
from qupulse.program.values import RepetitionCount, HardwareTime, HardwareVoltage, DynamicLinearValue, TimeType
from qupulse.program.waveforms import Waveform
from qupulse.parameter_scope import Scope


MeasurementID = str | int


@dataclass
class LoopLabel:
idx: int
runtime_name: str | None
count: RepetitionCount


@dataclass
class Measure:
meas_id: MeasurementID
delay: HardwareTime
length: HardwareTime


@dataclass
class Wait:
duration: HardwareTime


@dataclass
class LoopJmp:
idx: int


Command = Union[LoopLabel, LoopJmp, Wait, Measure]


@dataclass
class MeasurementInstructions(Program):
commands: Sequence[Command]

@cached_property
def duration(self) -> float:
latest = 0.

def process(_, begin, length):
nonlocal latest
end = begin + length
latest = max(latest, end)

vm = MeasurementVM(process)
vm.execute(commands=self.commands)
return latest


@dataclass
class MeasurementFrame:
commands: list['Command']
keep: bool


class MeasurementBuilder(ProgramBuilder):
def __init__(self):
super().__init__()

self._frames = [MeasurementFrame([], False)]
self._ranges: list[tuple[str, range]] = []
self._repetitions = []
self._measurements = []
self._label_counter = 0

def _with_new_frame(self, measurements):
self._frames.append(MeasurementFrame([], False))
yield self
frame = self._frames.pop()
if not frame.keep:
return
self.measure(measurements)
# measure does not keep if there are no measurements
self._frames[-1].keep = True
return frame.commands

def _with_loop_scope(self, measurements, loop_name, loop_count):
new_commands = yield from self._with_new_frame(measurements)
if new_commands is None:
return
parent = self._frames[-1]

self._label_counter += 1
label_idx = self._label_counter
parent.commands.append(LoopLabel(idx=label_idx, runtime_name=loop_name, count=loop_count))
parent.commands.extend(new_commands)
parent.commands.append(LoopJmp(idx=label_idx))

def inner_scope(self, scope: Scope) -> Scope:
"""This function is necessary to inject program builder specific parameter implementations into the build
process."""
if self._ranges:
name, rng = self._ranges[-1]
return scope.overwrite({name: DynamicLinearValue(base=rng.start, factors={name: rng.step})})
else:
return scope

def hold_voltage(self, duration: HardwareTime, voltages: Mapping[str, HardwareVoltage]):
"""Supports dynamic i.e. for loop generated offsets and duration"""
self._frames[-1].commands.append(Wait(duration))
self._frames[-1].keep = True

def play_arbitrary_waveform(self, waveform: Waveform):
""""""
self._frames[-1].commands.append(Wait(waveform.duration))
self._frames[-1].keep = True

def measure(self, measurements: Optional[Sequence[MeasurementWindow]]):
"""Unconditionally add given measurements relative to the current position."""
if measurements:
commands = self._frames[-1].commands
commands.extend(Measure(*meas) for meas in measurements)
self._frames[-1].keep = True

def with_repetition(self, repetition_count: RepetitionCount,
measurements: Optional[Sequence[MeasurementWindow]] = None) -> Iterable['ProgramBuilder']:
"""Measurements that are added to the new builder are dropped if the builder is empty upon exit"""
yield from self._with_loop_scope(measurements, loop_name=None, loop_count=repetition_count)

@contextlib.contextmanager
def with_sequence(self,
measurements: Optional[Sequence[MeasurementWindow]] = None) -> ContextManager['ProgramBuilder']:
"""

Measurements that are added in to the returned program builder are discarded if the sequence is empty on exit.

Args:
measurements: Measurements to attach to the potential child.
Returns:
"""
new_commands = yield from self._with_new_frame(measurements)
if new_commands is None:
return
parent = self._frames[-1]
parent.commands.extend(new_commands)

@contextlib.contextmanager
def new_subprogram(self, global_transformation: 'Transformation' = None) -> ContextManager['ProgramBuilder']:
"""Create a context managed program builder whose contents are translated into a single waveform upon exit if
it is not empty."""
yield self

def with_iteration(self, index_name: str, rng: range,
measurements: Optional[Sequence[MeasurementWindow]] = None) -> Iterable['ProgramBuilder']:
self._ranges.append((index_name, rng))
yield from self._with_loop_scope(measurements, loop_name=index_name, loop_count=len(rng))
self._ranges.pop()

def time_reversed(self) -> ContextManager['ProgramBuilder']:
self._frames.append(MeasurementFrame([], False))
yield self
frame = self._frames.pop()
if not frame.keep:
return

self._frames[-1].keep = True
self._frames[-1].commands.extend(_reversed_commands(frame.commands))

def to_program(self, channels = None) -> Optional[Program]:
"""Further addition of new elements might fail after finalizing the program."""
if self._frames[0].keep:
return MeasurementInstructions(self._frames[0].commands)


def _reversed_commands(cmds: Sequence[Command]) -> Sequence[Command]:
reversed_cmds = []
jumps = {}
for cmd in reversed(cmds):
if isinstance(cmd, LoopJmp):
jumps[cmd.idx] = len(reversed_cmds)
reversed_cmds.append(cmd)
elif isinstance(cmd, LoopLabel):
jump_idx = jumps[cmd.idx]
jump = reversed_cmds[jump_idx]
reversed_cmds[jump_idx] = cmd
reversed_cmds.append(jump)

elif isinstance(cmd, Measure):
if isinstance(cmd.delay, DynamicLinearValue) or isinstance(cmd.delay, DynamicLinearValue):
raise NotImplementedError("TODO")
reversed_cmds.append(Measure(meas_id=cmd.meas_id,
delay=-(cmd.delay + cmd.length),
length=cmd.length,))
elif isinstance(cmd, Wait):
reversed_cmds.append(cmd)
else:
raise ValueError("Not a command", cmd)

return reversed_cmds


class MeasurementVM:
"""A VM that is capable of executing the measurement commands"""

def __init__(self, callback: Callable[[str, float, float], None]):
self._time = TimeType(0)
self._memory = {}
self._counts = {}
self._callback = callback

def _eval_hardware_time(self, t: HardwareTime):
if isinstance(t, DynamicLinearValue):
value = t.base
for (factor_name, factor_val) in t.factors.items():
count = self._counts[self._memory[factor_name]]
value += factor_val * count
return value
else:
return t

def _execute_after_label(self, sequence: Sequence[Command]) -> int:
skip = 0
for idx, cmd in enumerate(sequence):
if idx < skip:
continue
if isinstance(cmd, LoopJmp):
return idx
elif isinstance(cmd, LoopLabel):
if cmd.runtime_name:
self._memory[cmd.runtime_name] = cmd.idx

for iter_val in range(cmd.count):
self._counts[cmd.idx] = iter_val
pos = self._execute_after_label(sequence[idx + 1:])
skip = idx + pos + 2

elif isinstance(cmd, Measure):
meas_time = float(self._eval_hardware_time(cmd.delay) + self._time)
meas_len = float(self._eval_hardware_time(cmd.length))
self._callback(cmd.meas_id, meas_time, meas_len)

elif isinstance(cmd, Wait):
self._time += self._eval_hardware_time(cmd.duration)

def execute(self, commands: Sequence[Command]):
self._execute_after_label(commands)


def to_table(commands: Sequence[Command]) -> dict[str, numpy.ndarray]:
tables = {}

vm = MeasurementVM(lambda name, begin, length: tables.setdefault(name, []).append((begin, length)))
vm.execute(commands)
return {
name: numpy.array(values) for name, values in tables.items()
}
8 changes: 6 additions & 2 deletions qupulse/pulses/measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: LGPL-3.0-or-later

import warnings
from typing import Optional, List, Tuple, Union, Dict, Set, Mapping, AbstractSet
from numbers import Real
import itertools
Expand Down Expand Up @@ -62,8 +63,11 @@ def get_measurement_windows(self,

begin_val = begin.evaluate_in_scope(parameters)
length_val = length.evaluate_in_scope(parameters)
if begin_val < 0 or length_val < 0:
raise ValueError('Measurement window with negative begin or length: {}, {}'.format(begin, length))
try:
if begin_val < 0 or length_val < 0:
warnings.warn('Measurement window with negative begin or length: {}, {}'.format(begin, length))
except TypeError:
pass

resulting_windows.append(
(name,
Expand Down
61 changes: 61 additions & 0 deletions tests/program/measurement_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import copy
import unittest
from unittest import TestCase

import numpy as np

from qupulse.pulses import *
from qupulse.program.measurement import *
from qupulse.program import DynamicLinearValue


class SingleRampTest(TestCase):
def setUp(self):
hold = ConstantPT(10 ** 6, {'a': '-1. + idx * 0.01'}, measurements=[('A', 10, 100), ('B', '1 + idx * 2', 200)])
self.pulse_template = hold.with_iteration('idx', 200)

self.commands = [
LoopLabel(1, 'idx', 200),
Measure('A', 10, 100),
Measure('B', DynamicLinearValue(base=1, factors={'idx': 2}), 200),
Wait(TimeType(10 ** 6)),
LoopJmp(1)
]

self.table_a = np.array([(10 + 10**6 * idx, 100) for idx in range(200)])
self.table_b = np.array([(1 + idx * 2 + 10**6 * idx, 200) for idx in range(200)])

def test_commands(self):
builder = MeasurementBuilder()
instructions = self.pulse_template.create_program(program_builder=builder)
self.assertEqual(self.commands, instructions.commands)

def test_table(self):
table = to_table(self.commands)
tab_a = table['A']
tab_b = table['B']
np.testing.assert_array_equal(self.table_a, tab_a)
np.testing.assert_array_equal(self.table_b, tab_b)


class ComplexPulse(TestCase):
def setUp(self):
hold = ConstantPT(10 ** 6, {'a': 1}, measurements=[('A', 10, 100), ('B', '1 + ii * 2 + jj', '3 + ii + jj')])
dyn_hold = ConstantPT('10 ** 6 - 4 * ii', {'a': 1}, measurements=[('A', 10, 100), ('B', '1 + ii * 2 + jj', '3 + ii + jj')])

self.pulse_template = SequencePT(
hold.with_repetition(2).with_iteration('ii', 100).with_repetition(2).with_iteration('jj', 200),
measurements=[('A', 1, 100)]
).with_repetition(2)

self.commands = []

def test_commands(self):
builder = MeasurementBuilder()
commands = self.pulse_template.create_program(program_builder=builder)
to_table(commands.commands)
raise NotImplementedError("TODO")

def test_table(self):

raise NotImplementedError("TODO")
4 changes: 2 additions & 2 deletions tests/pulses/measurement_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ def test_measurement_windows_invalid(self) -> None:
pulse = self.to_test_constructor(measurements=[('mw', 'a', 'd')])
measurement_mapping = {'mw': 'mw'}

with self.assertRaises(ValueError):
with self.assertWarnsRegex(UserWarning, "negative begin or length"):
pulse.get_measurement_windows(measurement_mapping=measurement_mapping,
parameters=dict(length=10, a=-1, d=3))
with self.assertRaises(ValueError):
with self.assertWarnsRegex(UserWarning, "negative begin or length"):
pulse.get_measurement_windows(measurement_mapping=measurement_mapping,
parameters=dict(length=10, a=3, d=-1))

Expand Down
Loading