diff --git a/amaranth/sim/_base.py b/amaranth/sim/_base.py index 7e58112a4..f3a5b194c 100644 --- a/amaranth/sim/_base.py +++ b/amaranth/sim/_base.py @@ -1,4 +1,27 @@ -__all__ = ["BaseProcess", "BaseSignalState", "BaseMemoryState", "BaseEngineState", "BaseEngine"] +from abc import ABCMeta, abstractmethod + +__all__ = ["BaseProcess", "BaseSignalState", "BaseMemoryState", "BaseEngineState", "BaseEngine", "Observer", "DummyEngine", "PrintObserver"] + + +class Observer(metaclass=ABCMeta): + def __init__(self, fs_per_delta=0): + self._fs_per_delta = fs_per_delta + + @property + def fs_per_delta(self) -> int: + return self._fs_per_delta + + @abstractmethod + def update_signal(self, timestamp, signal): + ... + + @abstractmethod + def update_memory(self, timestamp, memory, addr): + ... + + @abstractmethod + def close(self, timestamp): + assert False class BaseProcess: @@ -62,6 +85,26 @@ def add_memory_waker(self, memory, waker): class BaseEngine: + # add storage for observers + def __init__(self): + self._observers = [] + + # append observer to list + def add_observer(self, observer: Observer): + self._observers.append(observer) + + def notify_signal_change(self, signal): + for observer in self._observers: + observer.update_signal(self.now, signal) + + def notify_memory_change(self, memory, addr): + for observer in self._observers: + observer.update_memory(self.now, memory, addr) + + def notify_close(self): + for observer in self._observers: + observer.close(self.now) + @property def state(self) -> BaseEngineState: raise NotImplementedError # :nocov: @@ -97,5 +140,45 @@ def step_design(self): def advance(self): raise NotImplementedError # :nocov: - def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta): + def observe(self, observer: Observer): raise NotImplementedError # :nocov: + +class DummyEngine(BaseEngine): + def __init__(self): + super().__init__() + self._now = 0 + + @property + def now(self): + return self._now + + def notify_signal_change(self, signal): + for obs in self._observers: + obs.update_signal(self.now, signal) + + def notify_memory_change(self, memory, addr): + for obs in self._observers: + obs.update_memory(self.now, memory, addr) + + def notify_close(self): + for obs in self._observers: + obs.close(self.now) + + +class PrintObserver(Observer): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @property + def fs_per_delta(self) -> int: + return 1 + + def update_signal(self, timestamp, signal): + print(f"[{timestamp}] Signal changed: {signal}") + + def update_memory(self, timestamp, memory, addr): + print(f"[{timestamp}] Memory write at {addr}") + + def close(self, timestamp): + print(f"[{timestamp}] Simulation ended") diff --git a/amaranth/sim/_coverage.py b/amaranth/sim/_coverage.py new file mode 100644 index 000000000..a5c2d99e3 --- /dev/null +++ b/amaranth/sim/_coverage.py @@ -0,0 +1,59 @@ +from ._base import Observer +from amaranth.sim._vcdwriter import eval_value, eval_format + +class ToggleCoverageObserver(Observer): + def __init__(self, state, **kwargs): + self.state = state + self._prev_values = {} + self._toggles = {} + self._signal_names = {} + super().__init__(**kwargs) + + def update_signal(self, timestamp, signal): + if getattr(signal, "name", "") != "out": + return + + sig_id = id(signal) + try: + val = eval_value(self.state, signal) + except Exception: + val = int(self.state.get_signal(signal)) + try: + curr_val = int(val) + except TypeError: + curr_val = val + print(f"[DEBUG] Signal {getattr(signal, 'name', signal)} = {curr_val}") + + if sig_id not in self._prev_values: + self._prev_values[sig_id] = curr_val + self._toggles[sig_id] = {"0->1": 0, "1->0": 0} + self._signal_names[sig_id] = signal.name + return + + prev_val = self._prev_values[sig_id] + + if prev_val == 0 and curr_val == 1: + self._toggles[sig_id]["0->1"] += 1 + elif prev_val == 1 and curr_val == 0: + self._toggles[sig_id]["1->0"] += 1 + + self._prev_values[sig_id] = curr_val + + def update_memory(self, timestamp, memory, addr): + pass + + def get_results(self): + return { + self._signal_names[sig_id]: toggles + for sig_id, toggles in self._toggles.items() + } + + def close(self, timestamp): + results = self.get_results() + print("=== Toggle Coverage Report ===") + for signal, toggles in results.items(): + print(f"{signal}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + + + diff --git a/amaranth/sim/_vcdwriter.py b/amaranth/sim/_vcdwriter.py new file mode 100644 index 000000000..7627e5fe2 --- /dev/null +++ b/amaranth/sim/_vcdwriter.py @@ -0,0 +1,313 @@ +import enum as py_enum +import itertools +import os.path +import re +import warnings + +from contextlib import contextmanager +from typing import IO + +from ..hdl import * +from ..hdl._mem import MemoryInstance +from ..hdl._ast import SignalDict +from ..lib import data, wiring +from ._base import * +from ._async import * +from ._pyeval import eval_format, eval_value, eval_assign +from ._pyrtl import _FragmentCompiler +from ._pyclock import PyClockProcess + + +class _VCDWriter(Observer): + @staticmethod + def decode_to_vcd(format, value): + return format.format(value).expandtabs().replace(" ", "_") + + def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), **kwargs): + super().__init__(**kwargs) + self.state = state + + # Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that + # the simulator is still usable if it's not installed for some reason. + import vcd, vcd.gtkw + + self.close_vcd = False + self.close_gtkw = False + if isinstance(vcd_file, str): + vcd_file = open(vcd_file, "w") + self.close_vcd = True + if isinstance(gtkw_file, str): + gtkw_file = open(gtkw_file, "w") + self.close_gtkw = True + + self.vcd_signal_vars = SignalDict() + self.vcd_memory_vars = {} + self.vcd_file = vcd_file + self.vcd_writer = vcd_file and vcd.VCDWriter(self.vcd_file, + timescale="1 fs", comment="Generated by Amaranth") + + self.gtkw_signal_names = SignalDict() + self.gtkw_memory_names = {} + self.gtkw_file = gtkw_file + self.gtkw_save = gtkw_file and vcd.gtkw.GTKWSave(self.gtkw_file) + + self.traces = traces + + signal_names = SignalDict() + memories = {} + for fragment, fragment_info in design.fragments.items(): + fragment_name = ("bench", *fragment_info.name) + for signal, signal_name in fragment_info.signal_names.items(): + if signal not in signal_names: + signal_names[signal] = set() + signal_names[signal].add((*fragment_name, signal_name)) + if isinstance(fragment, MemoryInstance): + memories[fragment._data] = fragment_name + + trace_names = SignalDict() + assigned_names = set() + def traverse_traces(traces): + if isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(trace, MemoryData._Row): + memory = trace._memory + if not memory in memories: + if memory.name not in assigned_names: + name = memory.name + else: + name = f"{memory.name}${len(assigned_names)}" + assert name not in assigned_names + memories[memory] = ("bench", name) + assigned_names.add(name) + else: + for trace_signal in trace._rhs_signals(): + if trace_signal not in signal_names: + if trace_signal.name not in assigned_names: + name = trace_signal.name + else: + name = f"{trace_signal.name}${len(assigned_names)}" + assert name not in assigned_names + trace_names[trace_signal] = {("bench", name)} + assigned_names.add(name) + elif isinstance(traces, MemoryData): + if not traces in memories: + if traces.name not in assigned_names: + name = traces.name + else: + name = f"{traces.name}${len(assigned_names)}" + assert name not in assigned_names + memories[traces] = ("bench", name) + assigned_names.add(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + for name in traces.signature.members: + traverse_traces(getattr(traces, name)) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for trace in traces.values(): + traverse_traces(trace) + else: + raise TypeError(f"{traces!r} is not a traceable object") + traverse_traces(traces) + + if self.vcd_writer is None: + return + + for signal, names in itertools.chain(signal_names.items(), trace_names.items()): + self.vcd_signal_vars[signal] = [] + self.gtkw_signal_names[signal] = [] + + def add_var(path, var_type, var_size, var_init, value): + vcd_var = None + for (*var_scope, var_name) in names: + if re.search(r"[ \t\r\n]", var_name): + raise NameError("Signal '{}.{}' contains a whitespace character" + .format(".".join(var_scope), var_name)) + + field_name = var_name + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + if path: + field_name = "\\" + field_name + + if vcd_var is None: + vcd_var = self.vcd_writer.register_var( + scope=var_scope, name=field_name, + var_type=var_type, size=var_size, init=var_init) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + self.gtkw_signal_names[signal].append( + ".".join((*var_scope, field_name)) + suffix) + else: + self.vcd_writer.register_alias( + scope=var_scope, name=field_name, + var=vcd_var) + + self.vcd_signal_vars[signal].append((vcd_var, value)) + + def add_wire_var(path, value): + add_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_format_var(path, fmt): + add_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_wire_var(path, fmt._chunks[0][0]) + else: + add_format_var(path, fmt) + + if signal._decoder is not None and not isinstance(signal._decoder, py_enum.EnumMeta): + add_var((), "string", 1, signal._decoder(signal._init), signal._decoder) + else: + add_format((), signal._format) + + for memory, memory_name in memories.items(): + self.vcd_memory_vars[memory] = vcd_vars = [] + self.gtkw_memory_names[memory] = gtkw_names = [] + + for idx, row in enumerate(memory): + row_vcd_vars = [] + row_gtkw_names = [] + var_scope = memory_name[:-1] + + def add_mem_var(path, var_type, var_size, var_init, value): + field_name = "\\" + memory_name[-1] + f"[{idx}]" + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + row_vcd_vars.append((self.vcd_writer.register_var( + scope=var_scope, name=field_name, var_type=var_type, + size=var_size, init=var_init + ), value)) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + row_gtkw_names.append(".".join((*var_scope, field_name)) + suffix) + + def add_mem_wire_var(path, value): + add_mem_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_mem_format_var(path, fmt): + add_mem_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_mem_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_mem_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_mem_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_mem_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_mem_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_mem_wire_var(path, fmt._chunks[0][0]) + else: + add_mem_format_var(path, fmt) + + if isinstance(memory._shape, ShapeCastable): + fmt = memory._shape.format(memory._shape(row), "") + add_mem_format((), fmt) + else: + add_mem_wire_var((), row) + + vcd_vars.append(row_vcd_vars) + gtkw_names.append(row_gtkw_names) + + @property + def fs_per_delta(self) -> int: + return self._fs_per_delta + + def update_signal(self, timestamp, signal): + for (vcd_var, repr) in self.vcd_signal_vars.get(signal, ()): + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + elif isinstance(repr, (Format, Format.Enum)): + var_value = eval_format(self.state, repr) + else: + # decoder + var_value = repr(eval_value(self.state, signal)) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def update_memory(self, timestamp, memory, addr): + if memory not in self.vcd_memory_vars: + return + for vcd_var, repr in self.vcd_memory_vars[memory][addr]: + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + else: + var_value = eval_format(self.state, repr) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def close(self, timestamp): + if self.vcd_writer is not None: + self.vcd_writer.close(timestamp) + + if self.gtkw_save is not None: + self.gtkw_save.dumpfile(self.vcd_file.name) + self.gtkw_save.dumpfile_size(self.vcd_file.tell()) + + self.gtkw_save.treeopen("top") + + def traverse_traces(traces): + if isinstance(traces, data.View): + with self.gtkw_save.group("view"): + traverse_traces(Value.cast(traces)) + elif isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(traces, MemoryData._Row): + for name in self.gtkw_memory_names[traces._memory][traces._index]: + self.gtkw_save.trace(name) + else: + for trace_signal in trace._rhs_signals(): + for name in self.gtkw_signal_names[trace_signal]: + self.gtkw_save.trace(name) + elif isinstance(traces, MemoryData): + for row_names in self.gtkw_memory_names[traces]: + for name in row_names: + self.gtkw_save.trace(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + with self.gtkw_save.group("interface"): + for _, _, member in traces.signature.flatten(traces): + traverse_traces(member) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for name, trace in traces.items(): + with self.gtkw_save.group(name): + traverse_traces(trace) + else: + assert False # :nocov: + + traverse_traces(self.traces) + + if self.close_vcd: + self.vcd_file.close() + if self.close_gtkw: + self.gtkw_file.close() + + diff --git a/amaranth/sim/pysim.py b/amaranth/sim/pysim.py index d98ef7703..eccef7d00 100644 --- a/amaranth/sim/pysim.py +++ b/amaranth/sim/pysim.py @@ -1,4 +1,4 @@ -from contextlib import contextmanager +from contextlib import contextmanager, closing import itertools import re import os.path @@ -13,299 +13,11 @@ from ._pyeval import eval_format, eval_value, eval_assign from ._pyrtl import _FragmentCompiler from ._pyclock import PyClockProcess - +from ._vcdwriter import _VCDWriter __all__ = ["PySimEngine"] -class _VCDWriter: - @staticmethod - def decode_to_vcd(format, value): - return format.format(value).expandtabs().replace(" ", "_") - - def __init__(self, state, design, *, vcd_file, gtkw_file=None, traces=(), fs_per_delta=0): - self.state = state - self.fs_per_delta = fs_per_delta - - # Although pyvcd is a mandatory dependency, be resilient and import it as needed, so that - # the simulator is still usable if it's not installed for some reason. - import vcd, vcd.gtkw - - self.close_vcd = False - self.close_gtkw = False - if isinstance(vcd_file, str): - vcd_file = open(vcd_file, "w") - self.close_vcd = True - if isinstance(gtkw_file, str): - gtkw_file = open(gtkw_file, "w") - self.close_gtkw = True - - self.vcd_signal_vars = SignalDict() - self.vcd_memory_vars = {} - self.vcd_file = vcd_file - self.vcd_writer = vcd_file and vcd.VCDWriter(self.vcd_file, - timescale="1 fs", comment="Generated by Amaranth") - - self.gtkw_signal_names = SignalDict() - self.gtkw_memory_names = {} - self.gtkw_file = gtkw_file - self.gtkw_save = gtkw_file and vcd.gtkw.GTKWSave(self.gtkw_file) - - self.traces = traces - - signal_names = SignalDict() - memories = {} - for fragment, fragment_info in design.fragments.items(): - fragment_name = ("bench", *fragment_info.name) - for signal, signal_name in fragment_info.signal_names.items(): - if signal not in signal_names: - signal_names[signal] = set() - signal_names[signal].add((*fragment_name, signal_name)) - if isinstance(fragment, MemoryInstance): - memories[fragment._data] = fragment_name - - trace_names = SignalDict() - assigned_names = set() - def traverse_traces(traces): - if isinstance(traces, ValueLike): - trace = Value.cast(traces) - if isinstance(trace, MemoryData._Row): - memory = trace._memory - if not memory in memories: - if memory.name not in assigned_names: - name = memory.name - else: - name = f"{memory.name}${len(assigned_names)}" - assert name not in assigned_names - memories[memory] = ("bench", name) - assigned_names.add(name) - else: - for trace_signal in trace._rhs_signals(): - if trace_signal not in signal_names: - if trace_signal.name not in assigned_names: - name = trace_signal.name - else: - name = f"{trace_signal.name}${len(assigned_names)}" - assert name not in assigned_names - trace_names[trace_signal] = {("bench", name)} - assigned_names.add(name) - elif isinstance(traces, MemoryData): - if not traces in memories: - if traces.name not in assigned_names: - name = traces.name - else: - name = f"{traces.name}${len(assigned_names)}" - assert name not in assigned_names - memories[traces] = ("bench", name) - assigned_names.add(name) - elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): - for name in traces.signature.members: - traverse_traces(getattr(traces, name)) - elif isinstance(traces, list) or isinstance(traces, tuple): - for trace in traces: - traverse_traces(trace) - elif isinstance(traces, dict): - for trace in traces.values(): - traverse_traces(trace) - else: - raise TypeError(f"{traces!r} is not a traceable object") - traverse_traces(traces) - - if self.vcd_writer is None: - return - - for signal, names in itertools.chain(signal_names.items(), trace_names.items()): - self.vcd_signal_vars[signal] = [] - self.gtkw_signal_names[signal] = [] - - def add_var(path, var_type, var_size, var_init, value): - vcd_var = None - for (*var_scope, var_name) in names: - if re.search(r"[ \t\r\n]", var_name): - raise NameError("Signal '{}.{}' contains a whitespace character" - .format(".".join(var_scope), var_name)) - - field_name = var_name - for item in path: - if isinstance(item, int): - field_name += f"[{item}]" - else: - field_name += f".{item}" - if path: - field_name = "\\" + field_name - - if vcd_var is None: - vcd_var = self.vcd_writer.register_var( - scope=var_scope, name=field_name, - var_type=var_type, size=var_size, init=var_init) - if var_size > 1: - suffix = f"[{var_size - 1}:0]" - else: - suffix = "" - self.gtkw_signal_names[signal].append( - ".".join((*var_scope, field_name)) + suffix) - else: - self.vcd_writer.register_alias( - scope=var_scope, name=field_name, - var=vcd_var) - - self.vcd_signal_vars[signal].append((vcd_var, value)) - - def add_wire_var(path, value): - add_var(path, "wire", len(value), eval_value(self.state, value), value) - - def add_format_var(path, fmt): - add_var(path, "string", 1, eval_format(self.state, fmt), fmt) - - def add_format(path, fmt): - if isinstance(fmt, Format.Struct): - add_wire_var(path, fmt._value) - for name, subfmt in fmt._fields.items(): - add_format(path + (name,), subfmt) - elif isinstance(fmt, Format.Array): - add_wire_var(path, fmt._value) - for idx, subfmt in enumerate(fmt._fields): - add_format(path + (idx,), subfmt) - elif (isinstance(fmt, Format) and - len(fmt._chunks) == 1 and - isinstance(fmt._chunks[0], tuple) and - fmt._chunks[0][1] == ""): - add_wire_var(path, fmt._chunks[0][0]) - else: - add_format_var(path, fmt) - - if signal._decoder is not None and not isinstance(signal._decoder, py_enum.EnumMeta): - add_var((), "string", 1, signal._decoder(signal._init), signal._decoder) - else: - add_format((), signal._format) - - for memory, memory_name in memories.items(): - self.vcd_memory_vars[memory] = vcd_vars = [] - self.gtkw_memory_names[memory] = gtkw_names = [] - - for idx, row in enumerate(memory): - row_vcd_vars = [] - row_gtkw_names = [] - var_scope = memory_name[:-1] - - def add_mem_var(path, var_type, var_size, var_init, value): - field_name = "\\" + memory_name[-1] + f"[{idx}]" - for item in path: - if isinstance(item, int): - field_name += f"[{item}]" - else: - field_name += f".{item}" - row_vcd_vars.append((self.vcd_writer.register_var( - scope=var_scope, name=field_name, var_type=var_type, - size=var_size, init=var_init - ), value)) - if var_size > 1: - suffix = f"[{var_size - 1}:0]" - else: - suffix = "" - row_gtkw_names.append(".".join((*var_scope, field_name)) + suffix) - - def add_mem_wire_var(path, value): - add_mem_var(path, "wire", len(value), eval_value(self.state, value), value) - - def add_mem_format_var(path, fmt): - add_mem_var(path, "string", 1, eval_format(self.state, fmt), fmt) - - def add_mem_format(path, fmt): - if isinstance(fmt, Format.Struct): - add_mem_wire_var(path, fmt._value) - for name, subfmt in fmt._fields.items(): - add_mem_format(path + (name,), subfmt) - elif isinstance(fmt, Format.Array): - add_mem_wire_var(path, fmt._value) - for idx, subfmt in enumerate(fmt._fields): - add_mem_format(path + (idx,), subfmt) - elif (isinstance(fmt, Format) and - len(fmt._chunks) == 1 and - isinstance(fmt._chunks[0], tuple) and - fmt._chunks[0][1] == ""): - add_mem_wire_var(path, fmt._chunks[0][0]) - else: - add_mem_format_var(path, fmt) - - if isinstance(memory._shape, ShapeCastable): - fmt = memory._shape.format(memory._shape(row), "") - add_mem_format((), fmt) - else: - add_mem_wire_var((), row) - - vcd_vars.append(row_vcd_vars) - gtkw_names.append(row_gtkw_names) - - def update_signal(self, timestamp, signal): - for (vcd_var, repr) in self.vcd_signal_vars.get(signal, ()): - if isinstance(repr, Value): - var_value = eval_value(self.state, repr) - elif isinstance(repr, (Format, Format.Enum)): - var_value = eval_format(self.state, repr) - else: - # decoder - var_value = repr(eval_value(self.state, signal)) - self.vcd_writer.change(vcd_var, timestamp, var_value) - - def update_memory(self, timestamp, memory, addr): - if memory not in self.vcd_memory_vars: - return - for vcd_var, repr in self.vcd_memory_vars[memory][addr]: - if isinstance(repr, Value): - var_value = eval_value(self.state, repr) - else: - var_value = eval_format(self.state, repr) - self.vcd_writer.change(vcd_var, timestamp, var_value) - - def close(self, timestamp): - if self.vcd_writer is not None: - self.vcd_writer.close(timestamp) - - if self.gtkw_save is not None: - self.gtkw_save.dumpfile(self.vcd_file.name) - self.gtkw_save.dumpfile_size(self.vcd_file.tell()) - - self.gtkw_save.treeopen("top") - - def traverse_traces(traces): - if isinstance(traces, data.View): - with self.gtkw_save.group("view"): - traverse_traces(Value.cast(traces)) - elif isinstance(traces, ValueLike): - trace = Value.cast(traces) - if isinstance(traces, MemoryData._Row): - for name in self.gtkw_memory_names[traces._memory][traces._index]: - self.gtkw_save.trace(name) - else: - for trace_signal in trace._rhs_signals(): - for name in self.gtkw_signal_names[trace_signal]: - self.gtkw_save.trace(name) - elif isinstance(traces, MemoryData): - for row_names in self.gtkw_memory_names[traces]: - for name in row_names: - self.gtkw_save.trace(name) - elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): - with self.gtkw_save.group("interface"): - for _, _, member in traces.signature.flatten(traces): - traverse_traces(member) - elif isinstance(traces, list) or isinstance(traces, tuple): - for trace in traces: - traverse_traces(trace) - elif isinstance(traces, dict): - for name, trace in traces.items(): - with self.gtkw_save.group(name): - traverse_traces(trace) - else: - assert False # :nocov: - traverse_traces(self.traces) - - if self.close_vcd: - self.vcd_file.close() - if self.close_gtkw: - self.gtkw_file.close() - - class _PyTimeline: def __init__(self): self.now = 0 @@ -607,7 +319,7 @@ def __init__(self, design): self._processes = _FragmentCompiler(self._state)(self._design.fragment) self._testbenches = [] self._delta_cycles = 0 - self._vcd_writers = [] + self._observers = [] self._active_triggers = set() @property @@ -658,7 +370,7 @@ def step_design(self): # Performs the three phases of a delta cycle in a loop: converged = False while not converged: - changed = set() if self._vcd_writers else None + changed = set() if self._observers else None # 1a. trigger: run every active trigger, sampling values and waking up processes; for trigger_state in self._active_triggers: @@ -677,15 +389,15 @@ def step_design(self): # 2. commit: apply queued signal changes, activating any awaited triggers. converged = self._state.commit(changed) - for vcd_writer in self._vcd_writers: - now_plus_deltas = self._now_plus_deltas(vcd_writer.fs_per_delta) + for observer in self._observers: + now_plus_deltas = self._now_plus_deltas(observer.fs_per_delta) for change in changed: if type(change) is _PySignalState: signal_state = change - vcd_writer.update_signal(now_plus_deltas, + observer.update_signal(now_plus_deltas, signal_state.signal) elif type(change) is _PyMemoryChange: - vcd_writer.update_memory(now_plus_deltas, change.state.memory, + observer.update_memory(now_plus_deltas, change.state.memory, change.addr) else: assert False # :nocov: @@ -721,12 +433,17 @@ def advance(self): return False @contextmanager - def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta): - vcd_writer = _VCDWriter(self._state, self._design, - vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces, fs_per_delta=fs_per_delta) + def observe(self, observer: Observer): try: - self._vcd_writers.append(vcd_writer) + self._observers.append(observer) yield finally: - vcd_writer.close(self._now_plus_deltas(vcd_writer.fs_per_delta)) - self._vcd_writers.remove(vcd_writer) + observer.close(self._now_plus_deltas(observer.fs_per_delta)) + self._observers.remove(observer) + + @contextmanager + def write_vcd(self, *, vcd_file, gtkw_file, traces, fs_per_delta): + observer = _VCDWriter(self._state, self._design, vcd_file=vcd_file, gtkw_file=gtkw_file, + traces=traces, fs_per_delta=fs_per_delta) + with self.observe(observer): + yield diff --git a/amaranth/sim/test_vcd_writer.py b/amaranth/sim/test_vcd_writer.py new file mode 100644 index 000000000..cb83035d1 --- /dev/null +++ b/amaranth/sim/test_vcd_writer.py @@ -0,0 +1,31 @@ +from amaranth import Elaboratable, Module, Signal +from amaranth.sim import Simulator + +class Top(Elaboratable): + def __init__(self): + self.a = Signal() + + def elaborate(self, platform): + m = Module() + count = Signal(4) + m.d.sync += [ + count.eq(count + 1), + self.a.eq(count[-1]) + ] + return m + +# Create design and simulator +dut = Top() +sim = Simulator(dut) +sim.add_clock(1e-6) # 1 MHz + +def process(): + for _ in range(10): + yield + +sim.add_sync_process(process) + +# Write VCD output +with open("test_output.vcd", "w") as vcd_file: + with sim.write_vcd(vcd_file=vcd_file, gtkw_file=None, traces=[dut.a]): + sim.run() diff --git a/amaranth/sim/trace.py b/amaranth/sim/trace.py new file mode 100644 index 000000000..2df6e77f6 --- /dev/null +++ b/amaranth/sim/trace.py @@ -0,0 +1,276 @@ +import itertools +import re +import enum as py_enum + +import nanots + +from ..hdl import * +from ..hdl._mem import MemoryInstance +from ..hdl._ast import SignalDict +from ..lib import data, wiring +from ._base import * +from ._async import * + + +class TimeSeriesWriter(Observer): + def __init__(self, state, design, *, db_filename, traces=(), **kwargs): + super().__init__(**kwargs) + + self.state = state + + self._signal_vars = SignalDict() + self._memory_vars = {} + + self.traces = traces + + self.writer = nanots.Writer(db_filename, auto_reclaim=False) + + signal_names = SignalDict() + memories = {} + for fragment, fragment_info in design.fragments.items(): + fragment_name = ("bench", *fragment_info.name) + for signal, signal_name in fragment_info.signal_names.items(): + if signal not in signal_names: + signal_names[signal] = set() + signal_names[signal].add((*fragment_name, signal_name)) + if isinstance(fragment, MemoryInstance): + memories[fragment._data] = fragment_name + + trace_names = SignalDict() + assigned_names = set() + def traverse_traces(traces): + if isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(trace, MemoryData._Row): + memory = trace._memory + if not memory in memories: + if memory.name not in assigned_names: + name = memory.name + else: + name = f"{memory.name}${len(assigned_names)}" + assert name not in assigned_names + memories[memory] = ("bench", name) + assigned_names.add(name) + else: + for trace_signal in trace._rhs_signals(): + if trace_signal and trace_signal not in signal_names: + if trace_signal.name not in assigned_names: + name = trace_signal.name + else: + name = f"{trace_signal.name}${len(assigned_names)}" + assert name not in assigned_names + trace_names[trace_signal] = {("bench", name)} + assigned_names.add(name) + elif isinstance(traces, MemoryData): + if not traces in memories: + if traces.name not in assigned_names: + name = traces.name + else: + name = f"{traces.name}${len(assigned_names)}" + assert name not in assigned_names + memories[traces] = ("bench", name) + assigned_names.add(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + for name in traces.signature.members: + traverse_traces(getattr(traces, name)) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for trace in traces.values(): + traverse_traces(trace) + else: + raise TypeError(f"{traces!r} is not a traceable object") + traverse_traces(traces) + + for signal, names in itertools.chain(signal_names.items(), trace_names.items()): + self._signal_vars[signal] = [] + + def add_var(path, var_type, var_size, var_init, value): + vcd_var = None + for (*var_scope, var_name) in names: + if re.search(r"[ \t\r\n]", var_name): + raise NameError("Signal '{}.{}' contains a whitespace character" + .format(".".join(var_scope), var_name)) + + field_name = var_name + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + if path: + field_name = "\\" + field_name + + if vcd_var is None: + context = writer.create_context(f"{var_scope}@{field_name}" + scope=var_scope, name=field_name, + var_type=var_type, size=var_size, init=var_init) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + self._signal_names[signal].append( + ".".join((*var_scope, field_name)) + suffix) + else: + self.vcd_writer.register_alias( + scope=var_scope, name=field_name, + var=vcd_var) + + self.vcd_signal_vars[signal].append((vcd_var, value)) + + def add_wire_var(path, value): + add_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_format_var(path, fmt): + add_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_wire_var(path, fmt._chunks[0][0]) + else: + add_format_var(path, fmt) + + if signal._decoder is not None and not isinstance(signal._decoder, py_enum.EnumMeta): + add_var((), "string", 1, signal._decoder(signal._init), signal._decoder) + else: + add_format((), signal._format) + + for memory, memory_name in memories.items(): + self.vcd_memory_vars[memory] = vcd_vars = [] + self.gtkw_memory_names[memory] = gtkw_names = [] + + for idx, row in enumerate(memory): + row_vcd_vars = [] + row_gtkw_names = [] + var_scope = memory_name[:-1] + + def add_mem_var(path, var_type, var_size, var_init, value): + field_name = "\\" + memory_name[-1] + f"[{idx}]" + for item in path: + if isinstance(item, int): + field_name += f"[{item}]" + else: + field_name += f".{item}" + row_vcd_vars.append((self.vcd_writer.register_var( + scope=var_scope, name=field_name, var_type=var_type, + size=var_size, init=var_init + ), value)) + if var_size > 1: + suffix = f"[{var_size - 1}:0]" + else: + suffix = "" + row_gtkw_names.append(".".join((*var_scope, field_name)) + suffix) + + def add_mem_wire_var(path, value): + add_mem_var(path, "wire", len(value), eval_value(self.state, value), value) + + def add_mem_format_var(path, fmt): + add_mem_var(path, "string", 1, eval_format(self.state, fmt), fmt) + + def add_mem_format(path, fmt): + if isinstance(fmt, Format.Struct): + add_mem_wire_var(path, fmt._value) + for name, subfmt in fmt._fields.items(): + add_mem_format(path + (name,), subfmt) + elif isinstance(fmt, Format.Array): + add_mem_wire_var(path, fmt._value) + for idx, subfmt in enumerate(fmt._fields): + add_mem_format(path + (idx,), subfmt) + elif (isinstance(fmt, Format) and + len(fmt._chunks) == 1 and + isinstance(fmt._chunks[0], tuple) and + fmt._chunks[0][1] == ""): + add_mem_wire_var(path, fmt._chunks[0][0]) + else: + add_mem_format_var(path, fmt) + + if isinstance(memory._shape, ShapeCastable): + fmt = memory._shape.format(memory._shape(row), "") + add_mem_format((), fmt) + else: + add_mem_wire_var((), row) + + vcd_vars.append(row_vcd_vars) + gtkw_names.append(row_gtkw_names) + + def update_signal(self, timestamp, signal): + for (vcd_var, repr) in self.vcd_signal_vars.get(signal, ()): + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + elif isinstance(repr, (Format, Format.Enum)): + var_value = eval_format(self.state, repr) + else: + # decoder + var_value = repr(eval_value(self.state, signal)) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def update_memory(self, timestamp, memory, addr): + if memory not in self.vcd_memory_vars: + return + for vcd_var, repr in self.vcd_memory_vars[memory][addr]: + if isinstance(repr, Value): + var_value = eval_value(self.state, repr) + else: + var_value = eval_format(self.state, repr) + self.vcd_writer.change(vcd_var, timestamp, var_value) + + def close(self, timestamp): + if self.vcd_writer is not None: + self.vcd_writer.close(timestamp) + + if self.gtkw_save is not None: + self.gtkw_save.dumpfile(self.vcd_file.name) + self.gtkw_save.dumpfile_size(self.vcd_file.tell()) + + self.gtkw_save.treeopen("top") + + def traverse_traces(traces): + if isinstance(traces, data.View): + with self.gtkw_save.group("view"): + traverse_traces(Value.cast(traces)) + elif isinstance(traces, ValueLike): + trace = Value.cast(traces) + if isinstance(traces, MemoryData._Row): + for name in self.gtkw_memory_names[traces._memory][traces._index]: + self.gtkw_save.trace(name) + else: + for trace_signal in trace._rhs_signals(): + for name in self.gtkw_signal_names[trace_signal]: + self.gtkw_save.trace(name) + elif isinstance(traces, MemoryData): + for row_names in self.gtkw_memory_names[traces]: + for name in row_names: + self.gtkw_save.trace(name) + elif hasattr(traces, "signature") and isinstance(traces.signature, wiring.Signature): + with self.gtkw_save.group("interface"): + for _, _, member in traces.signature.flatten(traces): + traverse_traces(member) + elif isinstance(traces, list) or isinstance(traces, tuple): + for trace in traces: + traverse_traces(trace) + elif isinstance(traces, dict): + for name, trace in traces.items(): + with self.gtkw_save.group(name): + traverse_traces(trace) + else: + assert False # :nocov: + traverse_traces(self.traces) + + if self.close_vcd: + self.vcd_file.close() + if self.close_gtkw: + self.gtkw_file.close() + + diff --git a/pdm_build.py b/pdm_build.py index 3ad2682e4..c6215ccac 100644 --- a/pdm_build.py +++ b/pdm_build.py @@ -4,15 +4,25 @@ from pdm.backend._vendor.packaging.version import Version +# def format_version(version: SCMVersion) -> str: +# major, minor, patch = (int(n) for n in str(version.version).split(".")[:3]) +# dirty = f"+{datetime.utcnow():%Y%m%d.%H%M%S}" if version.dirty else "" +# if version.distance is None: +# return f"{major}.{minor}.{patch}{dirty}" +# else: +# return f"{major}.{minor}.{patch}.dev{version.distance}{dirty}" + def format_version(version: SCMVersion) -> str: - major, minor, patch = (int(n) for n in str(version.version).split(".")[:3]) + parts = str(version.version).split(".") + major = int(parts[0]) + minor = int(parts[1]) if len(parts) > 1 else 0 + patch = int(parts[2]) if len(parts) > 2 else 0 dirty = f"+{datetime.utcnow():%Y%m%d.%H%M%S}" if version.dirty else "" if version.distance is None: return f"{major}.{minor}.{patch}{dirty}" else: return f"{major}.{minor}.{patch}.dev{version.distance}{dirty}" - def pdm_build_initialize(context): version = Version(context.config.metadata["version"]) diff --git a/pyproject.toml b/pyproject.toml index 60a1ea849..546e895a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,51 +1,10 @@ # Project metadata + [tool.pdm.version] source = "scm" version_format = "pdm_build:format_version" -[project] -dynamic = ["version"] - -name = "amaranth" -description = "Amaranth hardware definition language" -readme = "README.md" -authors = [{name = "Amaranth HDL contributors"}] -license = { text = "BSD-2-clause" } - -requires-python = "~=3.9" -dependencies = [ - "jschon~=0.11.1", # for amaranth.lib.meta - "pyvcd>=0.2.2,<0.5", # for amaranth.sim.pysim - "Jinja2~=3.0", # for amaranth.build -] - -[project.optional-dependencies] -# This version requirement needs to be synchronized with: -# - pyproject.toml: tool.pdm.dev-dependencies.test -# - amaranth/back/verilog.py: _convert_rtlil_text -# - docs/install.rst: yosys-version -builtin-yosys = ["amaranth-yosys>=0.40"] -remote-build = ["paramiko~=2.7"] - -[project.scripts] -amaranth-rpc = "amaranth.rpc:main" - -[project.entry-points."amaranth.lib.meta"] -"0.5/component.json" = "amaranth.lib.wiring:ComponentMetadata" - -[project.urls] -"Homepage" = "https://amaranth-lang.org/" -"Documentation" = "https://amaranth-lang.org/docs/amaranth/" # modified in pdm_build.py -"Source Code" = "https://github.com/amaranth-lang/amaranth" -"Bug Tracker" = "https://github.com/amaranth-lang/amaranth/issues" - -# Build system configuration - -[build-system] -requires = ["pdm-backend~=2.3.0"] -build-backend = "pdm.backend" - [tool.pdm.build] # If amaranth 0.3 is checked out with git (e.g. as a part of a persistent editable install or # a git worktree cached by tools like poetry), it can have an empty `nmigen` directory left over, @@ -64,10 +23,6 @@ source-includes = [ [tool.pdm.dev-dependencies] # This version requirement needs to be synchronized with the one in pyproject.toml above! -test = [ - "yowasp-yosys>=0.40", - "coverage", -] docs = [ "sphinx~=7.1", "sphinxcontrib-platformpicker~=1.4", @@ -96,3 +51,54 @@ coverage-html.cmd = "python -m coverage html" coverage-xml.cmd = "python -m coverage xml" extract-schemas.call = "amaranth.lib.meta:_extract_schemas('amaranth', base_uri='https://amaranth-lang.org/schema/amaranth')" +[project] +dynamic = ["version"] + +name = "amaranth" +description = "Amaranth hardware definition language" +readme = "README.md" +authors = [{name = "Amaranth HDL contributors"}] +license = { text = "BSD-2-clause" } + +requires-python = "~=3.9" +dependencies = [ + "jschon>=0.11.1", # for amaranth.lib.meta + "pyvcd>=0.2.2,<0.5", # for amaranth.sim.pysim + "Jinja2~=3.0", # for amaranth.build + "pystore>=0.1.24", + "pytest>=8.4.1", +] + +[project.optional-dependencies] +# This version requirement needs to be synchronized with: +# - pyproject.toml: tool.pdm.dev-dependencies.test +# - amaranth/back/verilog.py: _convert_rtlil_text +# - docs/install.rst: yosys-version +builtin-yosys = ["amaranth-yosys>=0.40"] +remote-build = ["paramiko~=2.7"] + +[project.scripts] +amaranth-rpc = "amaranth.rpc:main" + +[project.entry-points."amaranth.lib.meta"] +"0.5/component.json" = "amaranth.lib.wiring:ComponentMetadata" + +[project.urls] +"Homepage" = "https://amaranth-lang.org/" +"Documentation" = "https://amaranth-lang.org/docs/amaranth/" # modified in pdm_build.py +"Source Code" = "https://github.com/amaranth-lang/amaranth" +"Bug Tracker" = "https://github.com/amaranth-lang/amaranth/issues" + +# Build system configuration + +[build-system] +requires = ["pdm-backend~=2.3.0"] +build-backend = "pdm.backend" + + +[dependency-groups] +test = [ + "yowasp-yosys>=0.40", + "coverage", + "pytest>=8.4.1", +] diff --git a/tests/test_coverage.py b/tests/test_coverage.py new file mode 100644 index 000000000..66eb4e499 --- /dev/null +++ b/tests/test_coverage.py @@ -0,0 +1,81 @@ +import unittest +from amaranth import * +from amaranth.sim import Tick, Simulator +from amaranth.sim._coverage import ToggleCoverageObserver + +class ToggleDUT(Elaboratable): + def __init__(self): + self.out = Signal(name="out") + + def elaborate(self, platform): + m = Module() + counter = Signal(2, name="counter") + m.d.sync += counter.eq(counter + 1) + m.d.comb += self.out.eq(counter[1]) + return m + +class IrregularToggleDUT(Elaboratable): + def __init__(self): + self.out = Signal(name="out") + + def elaborate(self, platform): + m = Module() + counter = Signal(4, name="counter") + toggle = Signal() + + m.d.sync += counter.eq(counter + 1) + with m.If((counter == 1) | (counter == 3) | (counter == 6)): + m.d.sync += toggle.eq(~toggle) + m.d.comb += self.out.eq(toggle) + + return m + +class ToggleCoverageTest(unittest.TestCase): + def test_toggle_coverage_regular(self): + dut = ToggleDUT() + sim = Simulator(dut) + + toggle_cov = ToggleCoverageObserver(sim._engine.state) + sim._engine.add_observer(toggle_cov) + + def process(): + for _ in range(16): + yield Tick() + + sim.add_clock(1e-6) + sim.add_testbench(process) + sim.run() + + results = toggle_cov.get_results() + print("[Regular] Toggle coverage results:") + for signal_name, toggles in results.items(): + print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") + self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") + + def test_toggle_coverage_irregular(self): + dut = IrregularToggleDUT() + sim = Simulator(dut) + + toggle_cov = ToggleCoverageObserver(sim._engine.state) + sim._engine.add_observer(toggle_cov) + + def process(): + for _ in range(16): + yield Tick() + + sim.add_clock(1e-6) + sim.add_testbench(process) + sim.run() + + results = toggle_cov.get_results() + print("[Irregular] Toggle coverage results:") + for signal_name, toggles in results.items(): + print(f"{signal_name}: 0→1={toggles['0->1']}, 1→0={toggles['1->0']}") + + self.assertTrue(results["out"]["0->1"], "Expected at least one 0→1 toggle on 'out'") + self.assertTrue(results["out"]["1->0"], "Expected at least one 1→0 toggle on 'out'") + self.assertGreaterEqual(results["out"]["0->1"], 1) + self.assertGreaterEqual(results["out"]["1->0"], 1) +