From 8a771ffd8ced1fea1bf3b8df0dacfd56c1eecf0f Mon Sep 17 00:00:00 2001 From: Rob Taylor Date: Mon, 28 Apr 2025 16:47:37 +0100 Subject: [PATCH 1/5] wip --- amaranth/sim/_base.py | 25 ++- amaranth/sim/_vcdwriter.py | 313 ++++++++++++++++++++++++++++++++++++ amaranth/sim/pysim.py | 321 +++---------------------------------- amaranth/sim/trace.py | 275 +++++++++++++++++++++++++++++++ pyproject.toml | 1 + 5 files changed, 631 insertions(+), 304 deletions(-) create mode 100644 amaranth/sim/_vcdwriter.py create mode 100644 amaranth/sim/trace.py diff --git a/amaranth/sim/_base.py b/amaranth/sim/_base.py index 7e58112a4..e2c784a02 100644 --- a/amaranth/sim/_base.py +++ b/amaranth/sim/_base.py @@ -1,4 +1,25 @@ -__all__ = ["BaseProcess", "BaseSignalState", "BaseMemoryState", "BaseEngineState", "BaseEngine"] +from abc import ABCMeta, abstractmethod + +__all__ = ["BaseProcess", "BaseSignalState", "BaseMemoryState", "BaseEngineState", "BaseEngine", "Observer"] + + +class Observer(metaclass=ABCMeta): + @property + @abstractmethod + def fs_per_delta(self) -> int: + return 0 + + @abstractmethod + def update_signal(self, timestamp, signal): + ... + + @abstractmethod + def update_memory(self, timestamp, memory, addr): + ... + + @abstractmethod + def close(self, timestamp): + assert False class BaseProcess: @@ -97,5 +118,5 @@ def step_design(self): def advance(self): raise NotImplementedError # :nocov: - def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta): + def observe(self, observer: Observer): raise NotImplementedError # :nocov: diff --git a/amaranth/sim/_vcdwriter.py b/amaranth/sim/_vcdwriter.py new file mode 100644 index 000000000..da5c4323d --- /dev/null +++ b/amaranth/sim/_vcdwriter.py @@ -0,0 +1,313 @@ +import enum as py_enum +import itertools +import os.path +import re +import warnings + +from contextlib import contextmanager +from typing import IO + +from ..hdl import * +from ..hdl._mem import MemoryInstance +from ..hdl._ast import SignalDict +from ..lib import data, wiring +from ._base import * +from ._async import * +from ._pyeval import eval_format, eval_value, eval_assign +from ._pyrtl import _FragmentCompiler +from ._pyclock import PyClockProcess + + +class _VCDWriter(Observer): + @staticmethod + def decode_to_vcd(format, value): + return format.format(value).expandtabs().replace(" ", "_") + + def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), fs_per_delta=0): + self.state = state + self._fs_per_delta = fs_per_delta + + # Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that + # the simulator is still usable if it's not installed for some reason. + import vcd, vcd.gtkw + + self.close_vcd = False + self.close_gtkw = False + if isinstance(vcd_file, str): + vcd_file = open(vcd_file, "w") + self.close_vcd = True + if isinstance(gtkw_file, str): + gtkw_file = open(gtkw_file, "w") + self.close_gtkw = True + + self.vcd_signal_vars = SignalDict() + self.vcd_memory_vars = {} + self.vcd_file = vcd_file + self.vcd_writer = vcd_file and vcd.VCDWriter(self.vcd_file, + timescale="1 fs", comment="Generated by Amaranth") + + self.gtkw_signal_names = SignalDict() + self.gtkw_memory_names = {} + self.gtkw_file = gtkw_file + self.gtkw_save = gtkw_file and vcd.gtkw.GTKWSave(self.gtkw_file) + + self.traces = traces + + signal_names = SignalDict() + memories = {} + for fragment, fragment_info in design.fragments.items(): + fragment_name = ("bench", *fragment_info.name) + for signal, signal_name in fragment_info.signal_names.items(): + if signal not in signal_names: + signal_names[signal] = set() + signal_names[signal].add((*fragment_name, signal_name)) + if isinstance(fragment, MemoryInstance): + memories[fragment._data] = fragment_name + + trace_names = SignalDict() + assigned_names = set() + def traverse_traces(traces): + if isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(trace, MemoryData._Row): + memory = trace._memory + if not memory in memories: + if memory.name not in assigned_names: + name = memory.name + else: + name = f"{memory.name}${len(assigned_names)}" + assert name not in assigned_names + memories[memory] = ("bench", name) + assigned_names.add(name) + else: + for trace_signal in trace._rhs_signals(): + if trace_signal not in signal_names: + if trace_signal.name not in assigned_names: + name = trace_signal.name + else: + name = f"{trace_signal.name}${len(assigned_names)}" + assert name not in assigned_names + trace_names[trace_signal] = {("bench", name)} + assigned_names.add(name) + elif isinstance(traces, MemoryData): + if not traces in memories: + if traces.name not in assigned_names: + name = traces.name + else: + name = f"{traces.name}${len(assigned_names)}" + assert name not in assigned_names + memories[traces] = ("bench", name) + assigned_names.add(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + for name in traces.signature.members: + traverse_traces(getattr(traces, name)) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for trace in traces.values(): + traverse_traces(trace) + else: + raise TypeError(f"{traces!r} is not a traceable object") + traverse_traces(traces) + + if self.vcd_writer is None: + return + + for signal, names in itertools.chain(signal_names.items(), trace_names.items()): + self.vcd_signal_vars[signal] = [] + self.gtkw_signal_names[signal] = [] + + def add_var(path, var_type, var_size, var_init, value): + vcd_var = None + for (*var_scope, var_name) in names: + if re.search(r"[ \t\r\n]", var_name): + raise NameError("Signal '{}.{}' contains a whitespace character" + .format(".".join(var_scope), var_name)) + + field_name = var_name + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + if path: + field_name = "\\" + field_name + + if vcd_var is None: + vcd_var = self.vcd_writer.register_var( + scope=var_scope, name=field_name, + var_type=var_type, size=var_size, init=var_init) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + self.gtkw_signal_names[signal].append( + ".".join((*var_scope, field_name)) + suffix) + else: + self.vcd_writer.register_alias( + scope=var_scope, name=field_name, + var=vcd_var) + + self.vcd_signal_vars[signal].append((vcd_var, value)) + + def add_wire_var(path, value): + add_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_format_var(path, fmt): + add_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_wire_var(path, fmt._chunks[0][0]) + else: + add_format_var(path, fmt) + + if signal._decoder is not None and not isinstance(signal._decoder, py_enum.EnumMeta): + add_var((), "string", 1, signal._decoder(signal._init), signal._decoder) + else: + add_format((), signal._format) + + for memory, memory_name in memories.items(): + self.vcd_memory_vars[memory] = vcd_vars = [] + self.gtkw_memory_names[memory] = gtkw_names = [] + + for idx, row in enumerate(memory): + row_vcd_vars = [] + row_gtkw_names = [] + var_scope = memory_name[:-1] + + def add_mem_var(path, var_type, var_size, var_init, value): + field_name = "\\" + memory_name[-1] + f"[{idx}]" + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + row_vcd_vars.append((self.vcd_writer.register_var( + scope=var_scope, name=field_name, var_type=var_type, + size=var_size, init=var_init + ), value)) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + row_gtkw_names.append(".".join((*var_scope, field_name)) + suffix) + + def add_mem_wire_var(path, value): + add_mem_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_mem_format_var(path, fmt): + add_mem_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_mem_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_mem_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_mem_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_mem_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_mem_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_mem_wire_var(path, fmt._chunks[0][0]) + else: + add_mem_format_var(path, fmt) + + if isinstance(memory._shape, ShapeCastable): + fmt = memory._shape.format(memory._shape(row), "") + add_mem_format((), fmt) + else: + add_mem_wire_var((), row) + + vcd_vars.append(row_vcd_vars) + gtkw_names.append(row_gtkw_names) + + @property + def fs_per_delta(self) -> int: + return self._fs_per_delta + + def update_signal(self, timestamp, signal): + for (vcd_var, repr) in self.vcd_signal_vars.get(signal, ()): + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + elif isinstance(repr, (Format, Format.Enum)): + var_value = eval_format(self.state, repr) + else: + # decoder + var_value = repr(eval_value(self.state, signal)) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def update_memory(self, timestamp, memory, addr): + if memory not in self.vcd_memory_vars: + return + for vcd_var, repr in self.vcd_memory_vars[memory][addr]: + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + else: + var_value = eval_format(self.state, repr) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def close(self, timestamp): + if self.vcd_writer is not None: + self.vcd_writer.close(timestamp) + + if self.gtkw_save is not None: + self.gtkw_save.dumpfile(self.vcd_file.name) + self.gtkw_save.dumpfile_size(self.vcd_file.tell()) + + self.gtkw_save.treeopen("top") + + def traverse_traces(traces): + if isinstance(traces, data.View): + with self.gtkw_save.group("view"): + traverse_traces(Value.cast(traces)) + elif isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(traces, MemoryData._Row): + for name in self.gtkw_memory_names[traces._memory][traces._index]: + self.gtkw_save.trace(name) + else: + for trace_signal in trace._rhs_signals(): + for name in self.gtkw_signal_names[trace_signal]: + self.gtkw_save.trace(name) + elif isinstance(traces, MemoryData): + for row_names in self.gtkw_memory_names[traces]: + for name in row_names: + self.gtkw_save.trace(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + with self.gtkw_save.group("interface"): + for _, _, member in traces.signature.flatten(traces): + traverse_traces(member) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for name, trace in traces.items(): + with self.gtkw_save.group(name): + traverse_traces(trace) + else: + assert False # :nocov: + + traverse_traces(self.traces) + + if self.close_vcd: + self.vcd_file.close() + if self.close_gtkw: + self.gtkw_file.close() + + diff --git a/amaranth/sim/pysim.py b/amaranth/sim/pysim.py index d98ef7703..eccef7d00 100644 --- a/amaranth/sim/pysim.py +++ b/amaranth/sim/pysim.py @@ -1,4 +1,4 @@ -from contextlib import contextmanager +from contextlib import contextmanager, closing import itertools import re import os.path @@ -13,299 +13,11 @@ from ._pyeval import eval_format, eval_value, eval_assign from ._pyrtl import _FragmentCompiler from ._pyclock import PyClockProcess - +from ._vcdwriter import _VCDWriter __all__ = ["PySimEngine"] -class _VCDWriter: - @staticmethod - def decode_to_vcd(format, value): - return format.format(value).expandtabs().replace(" ", "_") - - def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), fs_per_delta=0): - self.state = state - self.fs_per_delta = fs_per_delta - - # Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that - # the simulator is still usable if it's not installed for some reason. - import vcd, vcd.gtkw - - self.close_vcd = False - self.close_gtkw = False - if isinstance(vcd_file, str): - vcd_file = open(vcd_file, "w") - self.close_vcd = True - if isinstance(gtkw_file, str): - gtkw_file = open(gtkw_file, "w") - self.close_gtkw = True - - self.vcd_signal_vars = SignalDict() - self.vcd_memory_vars = {} - self.vcd_file = vcd_file - self.vcd_writer = vcd_file and vcd.VCDWriter(self.vcd_file, - timescale="1 fs", comment="Generated by Amaranth") - - self.gtkw_signal_names = SignalDict() - self.gtkw_memory_names = {} - self.gtkw_file = gtkw_file - self.gtkw_save = gtkw_file and vcd.gtkw.GTKWSave(self.gtkw_file) - - self.traces = traces - - signal_names = SignalDict() - memories = {} - for fragment, fragment_info in design.fragments.items(): - fragment_name = ("bench", *fragment_info.name) - for signal, signal_name in fragment_info.signal_names.items(): - if signal not in signal_names: - signal_names[signal] = set() - signal_names[signal].add((*fragment_name, signal_name)) - if isinstance(fragment, MemoryInstance): - memories[fragment._data] = fragment_name - - trace_names = SignalDict() - assigned_names = set() - def traverse_traces(traces): - if isinstance(traces, ValueLike): - trace = Value.cast(traces) - if isinstance(trace, MemoryData._Row): - memory = trace._memory - if not memory in memories: - if memory.name not in assigned_names: - name = memory.name - else: - name = f"{memory.name}${len(assigned_names)}" - assert name not in assigned_names - memories[memory] = ("bench", name) - assigned_names.add(name) - else: - for trace_signal in trace._rhs_signals(): - if trace_signal not in signal_names: - if trace_signal.name not in assigned_names: - name = trace_signal.name - else: - name = f"{trace_signal.name}${len(assigned_names)}" - assert name not in assigned_names - trace_names[trace_signal] = {("bench", name)} - assigned_names.add(name) - elif isinstance(traces, MemoryData): - if not traces in memories: - if traces.name not in assigned_names: - name = traces.name - else: - name = f"{traces.name}${len(assigned_names)}" - assert name not in assigned_names - memories[traces] = ("bench", name) - assigned_names.add(name) - elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): - for name in traces.signature.members: - traverse_traces(getattr(traces, name)) - elif isinstance(traces, list) or isinstance(traces, tuple): - for trace in traces: - traverse_traces(trace) - elif isinstance(traces, dict): - for trace in traces.values(): - traverse_traces(trace) - else: - raise TypeError(f"{traces!r} is not a traceable object") - traverse_traces(traces) - - if self.vcd_writer is None: - return - - for signal, names in itertools.chain(signal_names.items(), trace_names.items()): - self.vcd_signal_vars[signal] = [] - self.gtkw_signal_names[signal] = [] - - def add_var(path, var_type, var_size, var_init, value): - vcd_var = None - for (*var_scope, var_name) in names: - if re.search(r"[ \t\r\n]", var_name): - raise NameError("Signal '{}.{}' contains a whitespace character" - .format(".".join(var_scope), var_name)) - - field_name = var_name - for item in path: - if isinstance(item, int): - field_name += f"[{item}]" - else: - field_name += f".{item}" - if path: - field_name = "\\" + field_name - - if vcd_var is None: - vcd_var = self.vcd_writer.register_var( - scope=var_scope, name=field_name, - var_type=var_type, size=var_size, init=var_init) - if var_size > 1: - suffix = f"[{var_size - 1}:0]" - else: - suffix = "" - self.gtkw_signal_names[signal].append( - ".".join((*var_scope, field_name)) + suffix) - else: - self.vcd_writer.register_alias( - scope=var_scope, name=field_name, - var=vcd_var) - - self.vcd_signal_vars[signal].append((vcd_var, value)) - - def add_wire_var(path, value): - add_var(path, "wire", len(value), eval_value(self.state, value), value) - - def add_format_var(path, fmt): - add_var(path, "string", 1, eval_format(self.state, fmt), fmt) - - def add_format(path, fmt): - if isinstance(fmt, Format.Struct): - add_wire_var(path, fmt._value) - for name, subfmt in fmt._fields.items(): - add_format(path + (name,), subfmt) - elif isinstance(fmt, Format.Array): - add_wire_var(path, fmt._value) - for idx, subfmt in enumerate(fmt._fields): - add_format(path + (idx,), subfmt) - elif (isinstance(fmt, Format) and - len(fmt._chunks) == 1 and - isinstance(fmt._chunks[0], tuple) and - fmt._chunks[0][1] == ""): - add_wire_var(path, fmt._chunks[0][0]) - else: - add_format_var(path, fmt) - - if signal._decoder is not None and not isinstance(signal._decoder, py_enum.EnumMeta): - add_var((), "string", 1, signal._decoder(signal._init), signal._decoder) - else: - add_format((), signal._format) - - for memory, memory_name in memories.items(): - self.vcd_memory_vars[memory] = vcd_vars = [] - self.gtkw_memory_names[memory] = gtkw_names = [] - - for idx, row in enumerate(memory): - row_vcd_vars = [] - row_gtkw_names = [] - var_scope = memory_name[:-1] - - def add_mem_var(path, var_type, var_size, var_init, value): - field_name = "\\" + memory_name[-1] + f"[{idx}]" - for item in path: - if isinstance(item, int): - field_name += f"[{item}]" - else: - field_name += f".{item}" - row_vcd_vars.append((self.vcd_writer.register_var( - scope=var_scope, name=field_name, var_type=var_type, - size=var_size, init=var_init - ), value)) - if var_size > 1: - suffix = f"[{var_size - 1}:0]" - else: - suffix = "" - row_gtkw_names.append(".".join((*var_scope, field_name)) + suffix) - - def add_mem_wire_var(path, value): - add_mem_var(path, "wire", len(value), eval_value(self.state, value), value) - - def add_mem_format_var(path, fmt): - add_mem_var(path, "string", 1, eval_format(self.state, fmt), fmt) - - def add_mem_format(path, fmt): - if isinstance(fmt, Format.Struct): - add_mem_wire_var(path, fmt._value) - for name, subfmt in fmt._fields.items(): - add_mem_format(path + (name,), subfmt) - elif isinstance(fmt, Format.Array): - add_mem_wire_var(path, fmt._value) - for idx, subfmt in enumerate(fmt._fields): - add_mem_format(path + (idx,), subfmt) - elif (isinstance(fmt, Format) and - len(fmt._chunks) == 1 and - isinstance(fmt._chunks[0], tuple) and - fmt._chunks[0][1] == ""): - add_mem_wire_var(path, fmt._chunks[0][0]) - else: - add_mem_format_var(path, fmt) - - if isinstance(memory._shape, ShapeCastable): - fmt = memory._shape.format(memory._shape(row), "") - add_mem_format((), fmt) - else: - add_mem_wire_var((), row) - - vcd_vars.append(row_vcd_vars) - gtkw_names.append(row_gtkw_names) - - def update_signal(self, timestamp, signal): - for (vcd_var, repr) in self.vcd_signal_vars.get(signal, ()): - if isinstance(repr, Value): - var_value = eval_value(self.state, repr) - elif isinstance(repr, (Format, Format.Enum)): - var_value = eval_format(self.state, repr) - else: - # decoder - var_value = repr(eval_value(self.state, signal)) - self.vcd_writer.change(vcd_var, timestamp, var_value) - - def update_memory(self, timestamp, memory, addr): - if memory not in self.vcd_memory_vars: - return - for vcd_var, repr in self.vcd_memory_vars[memory][addr]: - if isinstance(repr, Value): - var_value = eval_value(self.state, repr) - else: - var_value = eval_format(self.state, repr) - self.vcd_writer.change(vcd_var, timestamp, var_value) - - def close(self, timestamp): - if self.vcd_writer is not None: - self.vcd_writer.close(timestamp) - - if self.gtkw_save is not None: - self.gtkw_save.dumpfile(self.vcd_file.name) - self.gtkw_save.dumpfile_size(self.vcd_file.tell()) - - self.gtkw_save.treeopen("top") - - def traverse_traces(traces): - if isinstance(traces, data.View): - with self.gtkw_save.group("view"): - traverse_traces(Value.cast(traces)) - elif isinstance(traces, ValueLike): - trace = Value.cast(traces) - if isinstance(traces, MemoryData._Row): - for name in self.gtkw_memory_names[traces._memory][traces._index]: - self.gtkw_save.trace(name) - else: - for trace_signal in trace._rhs_signals(): - for name in self.gtkw_signal_names[trace_signal]: - self.gtkw_save.trace(name) - elif isinstance(traces, MemoryData): - for row_names in self.gtkw_memory_names[traces]: - for name in row_names: - self.gtkw_save.trace(name) - elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): - with self.gtkw_save.group("interface"): - for _, _, member in traces.signature.flatten(traces): - traverse_traces(member) - elif isinstance(traces, list) or isinstance(traces, tuple): - for trace in traces: - traverse_traces(trace) - elif isinstance(traces, dict): - for name, trace in traces.items(): - with self.gtkw_save.group(name): - traverse_traces(trace) - else: - assert False # :nocov: - traverse_traces(self.traces) - - if self.close_vcd: - self.vcd_file.close() - if self.close_gtkw: - self.gtkw_file.close() - - class _PyTimeline: def __init__(self): self.now = 0 @@ -607,7 +319,7 @@ def __init__(self, design): self._processes = _FragmentCompiler(self._state)(self._design.fragment) self._testbenches = [] self._delta_cycles = 0 - self._vcd_writers = [] + self._observers = [] self._active_triggers = set() @property @@ -658,7 +370,7 @@ def step_design(self): # Performs the three phases of a delta cycle in a loop: converged = False while not converged: - changed = set() if self._vcd_writers else None + changed = set() if self._observers else None # 1a. trigger: run every active trigger, sampling values and waking up processes; for trigger_state in self._active_triggers: @@ -677,15 +389,15 @@ def step_design(self): # 2. commit: apply queued signal changes, activating any awaited triggers. converged = self._state.commit(changed) - for vcd_writer in self._vcd_writers: - now_plus_deltas = self._now_plus_deltas(vcd_writer.fs_per_delta) + for observer in self._observers: + now_plus_deltas = self._now_plus_deltas(observer.fs_per_delta) for change in changed: if type(change) is _PySignalState: signal_state = change - vcd_writer.update_signal(now_plus_deltas, + observer.update_signal(now_plus_deltas, signal_state.signal) elif type(change) is _PyMemoryChange: - vcd_writer.update_memory(now_plus_deltas, change.state.memory, + observer.update_memory(now_plus_deltas, change.state.memory, change.addr) else: assert False # :nocov: @@ -721,12 +433,17 @@ def advance(self): return False @contextmanager - def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta): - vcd_writer = _VCDWriter(self._state, self._design, - vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces, fs_per_delta=fs_per_delta) + def observe(self, observer: Observer): try: - self._vcd_writers.append(vcd_writer) + self._observers.append(observer) yield finally: - vcd_writer.close(self._now_plus_deltas(vcd_writer.fs_per_delta)) - self._vcd_writers.remove(vcd_writer) + observer.close(self._now_plus_deltas(observer.fs_per_delta)) + self._observers.remove(observer) + + @contextmanager + def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta): + observer = _VCDWriter(self._state, self._design, vcd_file=vcd_file, gtkw_file=gtkw_file, + traces=traces, fs_per_delta=fs_per_delta) + with self.observe(observer): + yield diff --git a/amaranth/sim/trace.py b/amaranth/sim/trace.py new file mode 100644 index 000000000..485d93a8a --- /dev/null +++ b/amaranth/sim/trace.py @@ -0,0 +1,275 @@ +import itertools +import re +import enum as py_enum + +import pystore + +from ..hdl import * +from ..hdl._mem import MemoryInstance +from ..hdl._ast import SignalDict +from ..lib import data, wiring +from ._base import * +from ._async import * + + +class TimeSeriesWriter: + def __init__(self, state, design, *, pystore_path, collection_name, traces=()): + self.state = state + + self._signal_vars = SignalDict() + self._memory_vars = {} + + self.traces = traces + + pystore.set_path(pystore_path) # ugh api fail + store = pystore.store('amaranth') + self._collection = store.collection(collection_name) + signal_names = SignalDict() + memories = {} + for fragment, fragment_info in design.fragments.items(): + fragment_name = ("bench", *fragment_info.name) + for signal, signal_name in fragment_info.signal_names.items(): + if signal not in signal_names: + signal_names[signal] = set() + signal_names[signal].add((*fragment_name, signal_name)) + if isinstance(fragment, MemoryInstance): + memories[fragment._data] = fragment_name + + trace_names = SignalDict() + assigned_names = set() + def traverse_traces(traces): + if isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(trace, MemoryData._Row): + memory = trace._memory + if not memory in memories: + if memory.name not in assigned_names: + name = memory.name + else: + name = f"{memory.name}${len(assigned_names)}" + assert name not in assigned_names + memories[memory] = ("bench", name) + assigned_names.add(name) + else: + for trace_signal in trace._rhs_signals(): + if trace_signal not in signal_names: + if trace_signal.name not in assigned_names: + name = trace_signal.name + else: + name = f"{trace_signal.name}${len(assigned_names)}" + assert name not in assigned_names + trace_names[trace_signal] = {("bench", name)} + assigned_names.add(name) + elif isinstance(traces, MemoryData): + if not traces in memories: + if traces.name not in assigned_names: + name = traces.name + else: + name = f"{traces.name}${len(assigned_names)}" + assert name not in assigned_names + memories[traces] = ("bench", name) + assigned_names.add(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + for name in traces.signature.members: + traverse_traces(getattr(traces, name)) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for trace in traces.values(): + traverse_traces(trace) + else: + raise TypeError(f"{traces!r} is not a traceable object") + traverse_traces(traces) + + for signal, names in itertools.chain(signal_names.items(), trace_names.items()): + self._signal_vars[signal] = [] + + def add_var(path, var_type, var_size, var_init, value): + vcd_var = None + for (*var_scope, var_name) in names: + if re.search(r"[ \t\r\n]", var_name): + raise NameError("Signal '{}.{}' contains a whitespace character" + .format(".".join(var_scope), var_name)) + + field_name = var_name + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + if path: + field_name = "\\" + field_name + + if vcd_var is None: + self.register_var( + scope=var_scope, name=field_name, + var_type=var_type, size=var_size, init=var_init) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + self._signal_names[signal].append( + ".".join((*var_scope, field_name)) + suffix) + else: + self.vcd_writer.register_alias( + scope=var_scope, name=field_name, + var=vcd_var) + + self.vcd_signal_vars[signal].append((vcd_var, value)) + + def add_wire_var(path, value): + add_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_format_var(path, fmt): + add_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_wire_var(path, fmt._chunks[0][0]) + else: + add_format_var(path, fmt) + + if signal._decoder is not None and not isinstance(signal._decoder, py_enum.EnumMeta): + add_var((), "string", 1, signal._decoder(signal._init), signal._decoder) + else: + add_format((), signal._format) + + for memory, memory_name in memories.items(): + self.vcd_memory_vars[memory] = vcd_vars = [] + self.gtkw_memory_names[memory] = gtkw_names = [] + + for idx, row in enumerate(memory): + row_vcd_vars = [] + row_gtkw_names = [] + var_scope = memory_name[:-1] + + def add_mem_var(path, var_type, var_size, var_init, value): + field_name = "\\" + memory_name[-1] + f"[{idx}]" + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + row_vcd_vars.append((self.vcd_writer.register_var( + scope=var_scope, name=field_name, var_type=var_type, + size=var_size, init=var_init + ), value)) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + row_gtkw_names.append(".".join((*var_scope, field_name)) + suffix) + + def add_mem_wire_var(path, value): + add_mem_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_mem_format_var(path, fmt): + add_mem_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_mem_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_mem_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_mem_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_mem_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_mem_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_mem_wire_var(path, fmt._chunks[0][0]) + else: + add_mem_format_var(path, fmt) + + if isinstance(memory._shape, ShapeCastable): + fmt = memory._shape.format(memory._shape(row), "") + add_mem_format((), fmt) + else: + add_mem_wire_var((), row) + + vcd_vars.append(row_vcd_vars) + gtkw_names.append(row_gtkw_names) + + def update_signal(self, timestamp, signal): + for (vcd_var, repr) in self.vcd_signal_vars.get(signal, ()): + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + elif isinstance(repr, (Format, Format.Enum)): + var_value = eval_format(self.state, repr) + else: + # decoder + var_value = repr(eval_value(self.state, signal)) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def update_memory(self, timestamp, memory, addr): + if memory not in self.vcd_memory_vars: + return + for vcd_var, repr in self.vcd_memory_vars[memory][addr]: + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + else: + var_value = eval_format(self.state, repr) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def close(self, timestamp): + if self.vcd_writer is not None: + self.vcd_writer.close(timestamp) + + if self.gtkw_save is not None: + self.gtkw_save.dumpfile(self.vcd_file.name) + self.gtkw_save.dumpfile_size(self.vcd_file.tell()) + + self.gtkw_save.treeopen("top") + + def traverse_traces(traces): + if isinstance(traces, data.View): + with self.gtkw_save.group("view"): + traverse_traces(Value.cast(traces)) + elif isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(traces, MemoryData._Row): + for name in self.gtkw_memory_names[traces._memory][traces._index]: + self.gtkw_save.trace(name) + else: + for trace_signal in trace._rhs_signals(): + for name in self.gtkw_signal_names[trace_signal]: + self.gtkw_save.trace(name) + elif isinstance(traces, MemoryData): + for row_names in self.gtkw_memory_names[traces]: + for name in row_names: + self.gtkw_save.trace(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + with self.gtkw_save.group("interface"): + for _, _, member in traces.signature.flatten(traces): + traverse_traces(member) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for name, trace in traces.items(): + with self.gtkw_save.group(name): + traverse_traces(trace) + else: + assert False # :nocov: + traverse_traces(self.traces) + + if self.close_vcd: + self.vcd_file.close() + if self.close_gtkw: + self.gtkw_file.close() + + diff --git a/pyproject.toml b/pyproject.toml index 60a1ea849..2764cb028 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "jschon~=0.11.1", # for amaranth.lib.meta "pyvcd>=0.2.2,<0.5", # for amaranth.sim.pysim "Jinja2~=3.0", # for amaranth.build + "pystore>=0.1.24", ] [project.optional-dependencies] From 72611e6e15cc77795766996cb4880236e8684103 Mon Sep 17 00:00:00 2001 From: Ivy Yu Date: Mon, 14 Jul 2025 23:11:50 +0800 Subject: [PATCH 2/5] started observer & toggle coverage implementation --- amaranth/sim/_base.py | 62 ++++++++++++++++++++++++++++++++++++-- amaranth/sim/_coverage.py | 54 +++++++++++++++++++++++++++++++++ amaranth/sim/_test_base.py | 57 +++++++++++++++++++++++++++++++++++ 3 files changed, 171 insertions(+), 2 deletions(-) create mode 100644 amaranth/sim/_coverage.py create mode 100644 amaranth/sim/_test_base.py diff --git a/amaranth/sim/_base.py b/amaranth/sim/_base.py index e2c784a02..e0d3d7770 100644 --- a/amaranth/sim/_base.py +++ b/amaranth/sim/_base.py @@ -1,6 +1,6 @@ from abc import ABCMeta, abstractmethod -__all__ = ["BaseProcess", "BaseSignalState", "BaseMemoryState", "BaseEngineState", "BaseEngine", "Observer"] +__all__ = ["BaseProcess", "BaseSignalState", "BaseMemoryState", "BaseEngineState", "BaseEngine", "Observer", "DummyEngine", "PrintObserver"] class Observer(metaclass=ABCMeta): @@ -65,7 +65,9 @@ def reset(self): raise NotImplementedError # :nocov: def get_signal(self, signal): - raise NotImplementedError # :nocov: + val = self._sim.read_signal(signal) + print(f"[DEBUG] Raw value read: {val} for signal: {signal}") + return int(val) def get_memory(self, memory): raise NotImplementedError # :nocov: @@ -83,6 +85,26 @@ def add_memory_waker(self, memory, waker): class BaseEngine: + # add storage for observers + def __init__(self): + self._observers = [] + + # append observer to list + def add_observer(self, observer: Observer): + self._observers.append(observer) + + def notify_signal_change(self, signal): + for observer in self._observers: + observer.update_signal(self.now, signal) + + def notify_memory_change(self, memory, addr): + for observer in self._observers: + observer.update_memory(self.now, memory, addr) + + def notify_close(self): + for observer in self._observers: + observer.close(self.now) + @property def state(self) -> BaseEngineState: raise NotImplementedError # :nocov: @@ -120,3 +142,39 @@ def advance(self): def observe(self, observer: Observer): raise NotImplementedError # :nocov: + +class DummyEngine(BaseEngine): + def __init__(self): + super().__init__() + self._now = 0 + + @property + def now(self): + return self._now + + def notify_signal_change(self, signal): + for obs in self._observers: + obs.update_signal(self.now, signal) + + def notify_memory_change(self, memory, addr): + for obs in self._observers: + obs.update_memory(self.now, memory, addr) + + def notify_close(self): + for obs in self._observers: + obs.close(self.now) + + +class PrintObserver(Observer): + @property + def fs_per_delta(self) -> int: + return 1 + + def update_signal(self, timestamp, signal): + print(f"[{timestamp}] Signal changed: {signal}") + + def update_memory(self, timestamp, memory, addr): + print(f"[{timestamp}] Memory write at {addr}") + + def close(self, timestamp): + print(f"[{timestamp}] Simulation ended") diff --git a/amaranth/sim/_coverage.py b/amaranth/sim/_coverage.py new file mode 100644 index 000000000..c8042f29f --- /dev/null +++ b/amaranth/sim/_coverage.py @@ -0,0 +1,54 @@ +from ._base import Observer + +class ToggleCoverageObserver(Observer): + def __init__(self, state): + self.state = state + self._prev_values = {} + self._toggles = {} + self._signal_names = {} + + @property + def fs_per_delta(self) -> int: + return 0 + + def update_signal(self, timestamp, signal): + if getattr(signal, "name", "") != "out": + return + + sig_id = id(signal) + curr_val = int(self.state.get_signal(signal)) #FIX??? + print(f"[DEBUG] Signal {getattr(signal, 'name', signal)} = {curr_val}") + + if sig_id not in self._prev_values: + self._prev_values[sig_id] = curr_val + self._toggles[sig_id] = {"0->1": False, "1->0": False} + self._signal_names[sig_id] = signal.name + return + + prev_val = self._prev_values[sig_id] + + if prev_val == 0 and curr_val == 1: + self._toggles[sig_id]["0->1"] = True + elif prev_val == 1 and curr_val == 0: + self._toggles[sig_id]["1->0"] = True + + self._prev_values[sig_id] = curr_val + + def update_memory(self, timestamp, memory, addr): + pass + + def get_results(self): + return { + self._signal_names[sig_id]: toggles + for sig_id, toggles in self._toggles.items() + } + + def close(self, timestamp): + results = self.get_results() + print("=== Toggle Coverage Report ===") + for signal, toggles in results.items(): + print(f"{signal}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + + + diff --git a/amaranth/sim/_test_base.py b/amaranth/sim/_test_base.py new file mode 100644 index 000000000..9487ed964 --- /dev/null +++ b/amaranth/sim/_test_base.py @@ -0,0 +1,57 @@ +from _base import DummyEngine, PrintObserver + +# def test_print_observer(): +# engine = DummyEngine() +# observer = PrintObserver() +# engine.add_observer(observer) + +# engine.notify_signal_change("CLK") +# engine.notify_memory_change("RAM", 0x10) +# engine.notify_close() + +# if __name__ == "__main__": +# test_print_observer() + +from amaranth import * +from amaranth.sim import Tick, Simulator +from amaranth.sim._coverage import ToggleCoverageObserver + +class ToggleDUT(Elaboratable): + def __init__(self): + self.out = Signal(name="out") + + def elaborate(self, platform): + m = Module() + counter = Signal(2, name="counter") + m.d.sync += counter.eq(counter + 1) + m.d.comb += self.out.eq(counter[1]) + return m + + +def run_toggle_coverage_test(): + dut = ToggleDUT() + sim = Simulator(dut) + + toggle_cov = ToggleCoverageObserver(sim._engine.state) + sim._engine.add_observer(toggle_cov) + + def process(): + for _ in range(8): # Run for 8 cycles + yield Tick() + sim._engine.notify_signal_change(dut.out) + + sim.add_clock(1e-6) + sim.add_testbench(process) + sim.run() + + results = toggle_cov.get_results() + print("Toggle coverage results:") + for signal_name, toggles in results.items(): + print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + assert results["out"]["0->1"] + assert results["out"]["1->0"] + + +if __name__ == "__main__": + run_toggle_coverage_test() From f6580d4d2204a6ba0517651ae1b37c599c7bd974 Mon Sep 17 00:00:00 2001 From: Rob Taylor Date: Thu, 17 Jul 2025 12:05:28 +0100 Subject: [PATCH 3/5] Add setting of fs_per_delta to base observer class --- amaranth/sim/_base.py | 10 ++++++++-- amaranth/sim/_coverage.py | 11 ++++------- amaranth/sim/_vcdwriter.py | 4 ++-- amaranth/sim/trace.py | 17 +++++++++-------- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/amaranth/sim/_base.py b/amaranth/sim/_base.py index e0d3d7770..7a8abc0f2 100644 --- a/amaranth/sim/_base.py +++ b/amaranth/sim/_base.py @@ -4,10 +4,12 @@ class Observer(metaclass=ABCMeta): + def __init__(self, fs_per_delta=0): + self._fs_per_delta = fs_per_delta + @property - @abstractmethod def fs_per_delta(self) -> int: - return 0 + return self._fs_per_delta @abstractmethod def update_signal(self, timestamp, signal): @@ -166,6 +168,10 @@ def notify_close(self): class PrintObserver(Observer): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + @property def fs_per_delta(self) -> int: return 1 diff --git a/amaranth/sim/_coverage.py b/amaranth/sim/_coverage.py index c8042f29f..96ad6c339 100644 --- a/amaranth/sim/_coverage.py +++ b/amaranth/sim/_coverage.py @@ -1,20 +1,17 @@ from ._base import Observer class ToggleCoverageObserver(Observer): - def __init__(self, state): + def __init__(self, state, **kwargs): self.state = state self._prev_values = {} - self._toggles = {} + self._toggles = {} self._signal_names = {} - - @property - def fs_per_delta(self) -> int: - return 0 + super().__init__(**kwargs) def update_signal(self, timestamp, signal): if getattr(signal, "name", "") != "out": return - + sig_id = id(signal) curr_val = int(self.state.get_signal(signal)) #FIX??? print(f"[DEBUG] Signal {getattr(signal, 'name', signal)} = {curr_val}") diff --git a/amaranth/sim/_vcdwriter.py b/amaranth/sim/_vcdwriter.py index da5c4323d..a93ec5a4e 100644 --- a/amaranth/sim/_vcdwriter.py +++ b/amaranth/sim/_vcdwriter.py @@ -23,9 +23,9 @@ class _VCDWriter(Observer): def decode_to_vcd(format, value): return format.format(value).expandtabs().replace(" ", "_") - def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), fs_per_delta=0): + def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), **kwargs): + super.__init__(**kwargs) self.state = state - self._fs_per_delta = fs_per_delta # Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that # the simulator is still usable if it's not installed for some reason. diff --git a/amaranth/sim/trace.py b/amaranth/sim/trace.py index 485d93a8a..2df6e77f6 100644 --- a/amaranth/sim/trace.py +++ b/amaranth/sim/trace.py @@ -2,7 +2,7 @@ import re import enum as py_enum -import pystore +import nanots from ..hdl import * from ..hdl._mem import MemoryInstance @@ -12,8 +12,10 @@ from ._async import * -class TimeSeriesWriter: - def __init__(self, state, design, *, pystore_path, collection_name, traces=()): +class TimeSeriesWriter(Observer): + def __init__(self, state, design, *, db_filename, traces=(), **kwargs): + super().__init__(**kwargs) + self.state = state self._signal_vars = SignalDict() @@ -21,9 +23,8 @@ def __init__(self, state, design, *, pystore_path, collection_name, traces=()): self.traces = traces - pystore.set_path(pystore_path) # ugh api fail - store = pystore.store('amaranth') - self._collection = store.collection(collection_name) + self.writer = nanots.Writer(db_filename, auto_reclaim=False) + signal_names = SignalDict() memories = {} for fragment, fragment_info in design.fragments.items(): @@ -52,7 +53,7 @@ def traverse_traces(traces): assigned_names.add(name) else: for trace_signal in trace._rhs_signals(): - if trace_signal not in signal_names: + if trace_signal and trace_signal not in signal_names: if trace_signal.name not in assigned_names: name = trace_signal.name else: @@ -102,7 +103,7 @@ def add_var(path, var_type, var_size, var_init, value): field_name = "\\" + field_name if vcd_var is None: - self.register_var( + context = writer.create_context(f"{var_scope}@{field_name}" scope=var_scope, name=field_name, var_type=var_type, size=var_size, init=var_init) if var_size > 1: From 1da678b21483f8500fce5bc37e7e04a8168e0623 Mon Sep 17 00:00:00 2001 From: Ivy Yu Date: Thu, 17 Jul 2025 20:33:13 +0800 Subject: [PATCH 4/5] moved test_base.py to ./tests and into a unittest --- amaranth/sim/_test_base.py | 57 --------------------------------- amaranth/sim/_vcdwriter.py | 2 +- amaranth/sim/test_vcd_writer.py | 31 ++++++++++++++++++ pdm_build.py | 14 ++++++-- pyproject.toml | 3 +- tests/test_base.py | 45 ++++++++++++++++++++++++++ 6 files changed, 91 insertions(+), 61 deletions(-) delete mode 100644 amaranth/sim/_test_base.py create mode 100644 amaranth/sim/test_vcd_writer.py create mode 100644 tests/test_base.py diff --git a/amaranth/sim/_test_base.py b/amaranth/sim/_test_base.py deleted file mode 100644 index 9487ed964..000000000 --- a/amaranth/sim/_test_base.py +++ /dev/null @@ -1,57 +0,0 @@ -from _base import DummyEngine, PrintObserver - -# def test_print_observer(): -# engine = DummyEngine() -# observer = PrintObserver() -# engine.add_observer(observer) - -# engine.notify_signal_change("CLK") -# engine.notify_memory_change("RAM", 0x10) -# engine.notify_close() - -# if __name__ == "__main__": -# test_print_observer() - -from amaranth import * -from amaranth.sim import Tick, Simulator -from amaranth.sim._coverage import ToggleCoverageObserver - -class ToggleDUT(Elaboratable): - def __init__(self): - self.out = Signal(name="out") - - def elaborate(self, platform): - m = Module() - counter = Signal(2, name="counter") - m.d.sync += counter.eq(counter + 1) - m.d.comb += self.out.eq(counter[1]) - return m - - -def run_toggle_coverage_test(): - dut = ToggleDUT() - sim = Simulator(dut) - - toggle_cov = ToggleCoverageObserver(sim._engine.state) - sim._engine.add_observer(toggle_cov) - - def process(): - for _ in range(8): # Run for 8 cycles - yield Tick() - sim._engine.notify_signal_change(dut.out) - - sim.add_clock(1e-6) - sim.add_testbench(process) - sim.run() - - results = toggle_cov.get_results() - print("Toggle coverage results:") - for signal_name, toggles in results.items(): - print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") - - assert results["out"]["0->1"] - assert results["out"]["1->0"] - - -if __name__ == "__main__": - run_toggle_coverage_test() diff --git a/amaranth/sim/_vcdwriter.py b/amaranth/sim/_vcdwriter.py index a93ec5a4e..7627e5fe2 100644 --- a/amaranth/sim/_vcdwriter.py +++ b/amaranth/sim/_vcdwriter.py @@ -24,7 +24,7 @@ def decode_to_vcd(format, value): return format.format(value).expandtabs().replace(" ", "_") def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), **kwargs): - super.__init__(**kwargs) + super().__init__(**kwargs) self.state = state # Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that diff --git a/amaranth/sim/test_vcd_writer.py b/amaranth/sim/test_vcd_writer.py new file mode 100644 index 000000000..cb83035d1 --- /dev/null +++ b/amaranth/sim/test_vcd_writer.py @@ -0,0 +1,31 @@ +from amaranth import Elaboratable, Module, Signal +from amaranth.sim import Simulator + +class Top(Elaboratable): + def __init__(self): + self.a = Signal() + + def elaborate(self, platform): + m = Module() + count = Signal(4) + m.d.sync += [ + count.eq(count + 1), + self.a.eq(count[-1]) + ] + return m + +# Create design and simulator +dut = Top() +sim = Simulator(dut) +sim.add_clock(1e-6) # 1 MHz + +def process(): + for _ in range(10): + yield + +sim.add_sync_process(process) + +# Write VCD output +with open("test_output.vcd", "w") as vcd_file: + with sim.write_vcd(vcd_file=vcd_file, gtkw_file=None, traces=[dut.a]): + sim.run() diff --git a/pdm_build.py b/pdm_build.py index 3ad2682e4..c6215ccac 100644 --- a/pdm_build.py +++ b/pdm_build.py @@ -4,15 +4,25 @@ from pdm.backend._vendor.packaging.version import Version +# def format_version(version: SCMVersion) -> str: +# major, minor, patch = (int(n) for n in str(version.version).split(".")[:3]) +# dirty = f"+{datetime.utcnow():%Y%m%d.%H%M%S}" if version.dirty else "" +# if version.distance is None: +# return f"{major}.{minor}.{patch}{dirty}" +# else: +# return f"{major}.{minor}.{patch}.dev{version.distance}{dirty}" + def format_version(version: SCMVersion) -> str: - major, minor, patch = (int(n) for n in str(version.version).split(".")[:3]) + parts = str(version.version).split(".") + major = int(parts[0]) + minor = int(parts[1]) if len(parts) > 1 else 0 + patch = int(parts[2]) if len(parts) > 2 else 0 dirty = f"+{datetime.utcnow():%Y%m%d.%H%M%S}" if version.dirty else "" if version.distance is None: return f"{major}.{minor}.{patch}{dirty}" else: return f"{major}.{minor}.{patch}.dev{version.distance}{dirty}" - def pdm_build_initialize(context): version = Version(context.config.metadata["version"]) diff --git a/pyproject.toml b/pyproject.toml index 2764cb028..8d4f4f68b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,10 +15,11 @@ license = { text = "BSD-2-clause" } requires-python = "~=3.9" dependencies = [ - "jschon~=0.11.1", # for amaranth.lib.meta + "jschon>=0.11.1", # for amaranth.lib.meta "pyvcd>=0.2.2,<0.5", # for amaranth.sim.pysim "Jinja2~=3.0", # for amaranth.build "pystore>=0.1.24", + "pytest>=8.4.1", ] [project.optional-dependencies] diff --git a/tests/test_base.py b/tests/test_base.py new file mode 100644 index 000000000..4ac7e979e --- /dev/null +++ b/tests/test_base.py @@ -0,0 +1,45 @@ +import unittest +from amaranth import * +from amaranth.sim import Tick, Simulator +from amaranth.sim._coverage import ToggleCoverageObserver + +class ToggleDUT(Elaboratable): + def __init__(self): + self.out = Signal(name="out") + + def elaborate(self, platform): + m = Module() + counter = Signal(2, name="counter") + m.d.sync += counter.eq(counter + 1) + m.d.comb += self.out.eq(counter[1]) + return m + + +class ToggleCoverageTest(unittest.TestCase): + def test_toggle_coverage(self): + dut = ToggleDUT() + sim = Simulator(dut) + + toggle_cov = ToggleCoverageObserver(sim._engine.state) + sim._engine.add_observer(toggle_cov) + + def process(): + for _ in range(8): + yield Tick() + sim._engine.notify_signal_change(dut.out) + + sim.add_clock(1e-6) + sim.add_testbench(process) + sim.run() + + results = toggle_cov.get_results() + print("Toggle coverage results:") + for signal_name, toggles in results.items(): + print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") + self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") + + +if __name__ == "__main__": + unittest.main() From 493dbbc662a5267af5da616fb788519561df886c Mon Sep 17 00:00:00 2001 From: Ivy Yu Date: Fri, 18 Jul 2025 20:00:47 +0800 Subject: [PATCH 5/5] implemented toggle coverage with regular & irregular toggleDUT, tracking num of signal transitions --- amaranth/sim/_base.py | 4 +- amaranth/sim/_coverage.py | 16 +++++-- pyproject.toml | 98 ++++++++++++++++++++------------------- tests/test_base.py | 45 ------------------ tests/test_coverage.py | 81 ++++++++++++++++++++++++++++++++ 5 files changed, 145 insertions(+), 99 deletions(-) delete mode 100644 tests/test_base.py create mode 100644 tests/test_coverage.py diff --git a/amaranth/sim/_base.py b/amaranth/sim/_base.py index 7a8abc0f2..f3a5b194c 100644 --- a/amaranth/sim/_base.py +++ b/amaranth/sim/_base.py @@ -67,9 +67,7 @@ def reset(self): raise NotImplementedError # :nocov: def get_signal(self, signal): - val = self._sim.read_signal(signal) - print(f"[DEBUG] Raw value read: {val} for signal: {signal}") - return int(val) + raise NotImplementedError # :nocov: def get_memory(self, memory): raise NotImplementedError # :nocov: diff --git a/amaranth/sim/_coverage.py b/amaranth/sim/_coverage.py index 96ad6c339..a5c2d99e3 100644 --- a/amaranth/sim/_coverage.py +++ b/amaranth/sim/_coverage.py @@ -1,4 +1,5 @@ from ._base import Observer +from amaranth.sim._vcdwriter import eval_value, eval_format class ToggleCoverageObserver(Observer): def __init__(self, state, **kwargs): @@ -13,21 +14,28 @@ def update_signal(self, timestamp, signal): return sig_id = id(signal) - curr_val = int(self.state.get_signal(signal)) #FIX??? + try: + val = eval_value(self.state, signal) + except Exception: + val = int(self.state.get_signal(signal)) + try: + curr_val = int(val) + except TypeError: + curr_val = val print(f"[DEBUG] Signal {getattr(signal, 'name', signal)} = {curr_val}") if sig_id not in self._prev_values: self._prev_values[sig_id] = curr_val - self._toggles[sig_id] = {"0->1": False, "1->0": False} + self._toggles[sig_id] = {"0->1": 0, "1->0": 0} self._signal_names[sig_id] = signal.name return prev_val = self._prev_values[sig_id] if prev_val == 0 and curr_val == 1: - self._toggles[sig_id]["0->1"] = True + self._toggles[sig_id]["0->1"] += 1 elif prev_val == 1 and curr_val == 0: - self._toggles[sig_id]["1->0"] = True + self._toggles[sig_id]["1->0"] += 1 self._prev_values[sig_id] = curr_val diff --git a/pyproject.toml b/pyproject.toml index 8d4f4f68b..546e895a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,9 +1,56 @@ # Project metadata + [tool.pdm.version] source = "scm" version_format = "pdm_build:format_version" +[tool.pdm.build] +# If amaranth 0.3 is checked out with git (e.g. as a part of a persistent editable install or +# a git worktree cached by tools like poetry), it can have an empty `nmigen` directory left over, +# which causes a hard error because setuptools cannot determine the top-level package. +# Add a workaround to improve experience for people upgrading from old checkouts. +includes = ["amaranth/"] + +source-includes = [ + ".gitignore", + ".coveragerc", + ".env.toolchain", + "CONTRIBUTING.txt", +] + +# Development workflow configuration + +[tool.pdm.dev-dependencies] +# This version requirement needs to be synchronized with the one in pyproject.toml above! +docs = [ + "sphinx~=7.1", + "sphinxcontrib-platformpicker~=1.4", + "sphinxcontrib-yowasp-wavedrom==1.8", # exact version to avoid changes in rendering + "sphinx-rtd-theme~=2.0", + "sphinx-autobuild", +] +examples = [ + "amaranth-boards @ git+https://github.com/amaranth-lang/amaranth-boards.git" +] + +[tool.pdm.scripts] +_.env_file = ".env.toolchain" + +test.composite = ["test-code", "test-docs", "coverage-xml"] +test-code.env = {PYTHONWARNINGS = "error"} +test-code.cmd = "python -m coverage run -m unittest discover -t . -s tests -v" +test-docs.cmd = "sphinx-build -b doctest docs/ docs/_build" + +document.cmd = "sphinx-build docs/ docs/_build/ -W --keep-going" +document-live.cmd = "sphinx-autobuild docs/ docs/_build/ --watch amaranth" +document-linkcheck.cmd = "sphinx-build docs/ docs/_linkcheck/ -b linkcheck" + +coverage-text.cmd = "python -m coverage report" +coverage-html.cmd = "python -m coverage html" +coverage-xml.cmd = "python -m coverage xml" + +extract-schemas.call = "amaranth.lib.meta:_extract_schemas('amaranth', base_uri='https://amaranth-lang.org/schema/amaranth')" [project] dynamic = ["version"] @@ -48,53 +95,10 @@ amaranth-rpc = "amaranth.rpc:main" requires = ["pdm-backend~=2.3.0"] build-backend = "pdm.backend" -[tool.pdm.build] -# If amaranth 0.3 is checked out with git (e.g. as a part of a persistent editable install or -# a git worktree cached by tools like poetry), it can have an empty `nmigen` directory left over, -# which causes a hard error because setuptools cannot determine the top-level package. -# Add a workaround to improve experience for people upgrading from old checkouts. -includes = ["amaranth/"] -source-includes = [ - ".gitignore", - ".coveragerc", - ".env.toolchain", - "CONTRIBUTING.txt", -] - -# Development workflow configuration - -[tool.pdm.dev-dependencies] -# This version requirement needs to be synchronized with the one in pyproject.toml above! +[dependency-groups] test = [ - "yowasp-yosys>=0.40", - "coverage", -] -docs = [ - "sphinx~=7.1", - "sphinxcontrib-platformpicker~=1.4", - "sphinxcontrib-yowasp-wavedrom==1.8", # exact version to avoid changes in rendering - "sphinx-rtd-theme~=2.0", - "sphinx-autobuild", -] -examples = [ - "amaranth-boards @ git+https://github.com/amaranth-lang/amaranth-boards.git" + "yowasp-yosys>=0.40", + "coverage", + "pytest>=8.4.1", ] - -[tool.pdm.scripts] -_.env_file = ".env.toolchain" - -test.composite = ["test-code", "test-docs", "coverage-xml"] -test-code.env = {PYTHONWARNINGS = "error"} -test-code.cmd = "python -m coverage run -m unittest discover -t . -s tests -v" -test-docs.cmd = "sphinx-build -b doctest docs/ docs/_build" - -document.cmd = "sphinx-build docs/ docs/_build/ -W --keep-going" -document-live.cmd = "sphinx-autobuild docs/ docs/_build/ --watch amaranth" -document-linkcheck.cmd = "sphinx-build docs/ docs/_linkcheck/ -b linkcheck" - -coverage-text.cmd = "python -m coverage report" -coverage-html.cmd = "python -m coverage html" -coverage-xml.cmd = "python -m coverage xml" - -extract-schemas.call = "amaranth.lib.meta:_extract_schemas('amaranth', base_uri='https://amaranth-lang.org/schema/amaranth')" diff --git a/tests/test_base.py b/tests/test_base.py deleted file mode 100644 index 4ac7e979e..000000000 --- a/tests/test_base.py +++ /dev/null @@ -1,45 +0,0 @@ -import unittest -from amaranth import * -from amaranth.sim import Tick, Simulator -from amaranth.sim._coverage import ToggleCoverageObserver - -class ToggleDUT(Elaboratable): - def __init__(self): - self.out = Signal(name="out") - - def elaborate(self, platform): - m = Module() - counter = Signal(2, name="counter") - m.d.sync += counter.eq(counter + 1) - m.d.comb += self.out.eq(counter[1]) - return m - - -class ToggleCoverageTest(unittest.TestCase): - def test_toggle_coverage(self): - dut = ToggleDUT() - sim = Simulator(dut) - - toggle_cov = ToggleCoverageObserver(sim._engine.state) - sim._engine.add_observer(toggle_cov) - - def process(): - for _ in range(8): - yield Tick() - sim._engine.notify_signal_change(dut.out) - - sim.add_clock(1e-6) - sim.add_testbench(process) - sim.run() - - results = toggle_cov.get_results() - print("Toggle coverage results:") - for signal_name, toggles in results.items(): - print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") - - self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") - self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_coverage.py b/tests/test_coverage.py new file mode 100644 index 000000000..66eb4e499 --- /dev/null +++ b/tests/test_coverage.py @@ -0,0 +1,81 @@ +import unittest +from amaranth import * +from amaranth.sim import Tick, Simulator +from amaranth.sim._coverage import ToggleCoverageObserver + +class ToggleDUT(Elaboratable): + def __init__(self): + self.out = Signal(name="out") + + def elaborate(self, platform): + m = Module() + counter = Signal(2, name="counter") + m.d.sync += counter.eq(counter + 1) + m.d.comb += self.out.eq(counter[1]) + return m + +class IrregularToggleDUT(Elaboratable): + def __init__(self): + self.out = Signal(name="out") + + def elaborate(self, platform): + m = Module() + counter = Signal(4, name="counter") + toggle = Signal() + + m.d.sync += counter.eq(counter + 1) + with m.If((counter == 1) | (counter == 3) | (counter == 6)): + m.d.sync += toggle.eq(~toggle) + m.d.comb += self.out.eq(toggle) + + return m + +class ToggleCoverageTest(unittest.TestCase): + def test_toggle_coverage_regular(self): + dut = ToggleDUT() + sim = Simulator(dut) + + toggle_cov = ToggleCoverageObserver(sim._engine.state) + sim._engine.add_observer(toggle_cov) + + def process(): + for _ in range(16): + yield Tick() + + sim.add_clock(1e-6) + sim.add_testbench(process) + sim.run() + + results = toggle_cov.get_results() + print("[Regular] Toggle coverage results:") + for signal_name, toggles in results.items(): + print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") + self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") + + def test_toggle_coverage_irregular(self): + dut = IrregularToggleDUT() + sim = Simulator(dut) + + toggle_cov = ToggleCoverageObserver(sim._engine.state) + sim._engine.add_observer(toggle_cov) + + def process(): + for _ in range(16): + yield Tick() + + sim.add_clock(1e-6) + sim.add_testbench(process) + sim.run() + + results = toggle_cov.get_results() + print("[Irregular] Toggle coverage results:") + for signal_name, toggles in results.items(): + print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") + self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") + self.assertGreaterEqual(results["out"]["0->1"], 1) + self.assertGreaterEqual(results["out"]["1->0"], 1) +