From b6ccf92b931883ec6e39704c894c2b279b15bc18 Mon Sep 17 00:00:00 2001 From: Enrico Lupi Date: Fri, 7 Nov 2025 11:31:35 +0100 Subject: [PATCH 1/8] ADD support for PQuantML layers in PyTorch --- hls4ml/converters/pytorch/pquant.py | 154 ++++++++++++++++++ hls4ml/converters/pytorch_to_hls.py | 43 +++-- .../passes/convert_to_channels_last.py | 39 +++-- hls4ml/utils/torch.py | 1 + 4 files changed, 211 insertions(+), 26 deletions(-) create mode 100644 hls4ml/converters/pytorch/pquant.py diff --git a/hls4ml/converters/pytorch/pquant.py b/hls4ml/converters/pytorch/pquant.py new file mode 100644 index 0000000000..14502f9955 --- /dev/null +++ b/hls4ml/converters/pytorch/pquant.py @@ -0,0 +1,154 @@ +from collections.abc import Iterable +from warnings import warn + +import numpy as np + +from hls4ml.converters.pytorch.convolution import parse_conv1d_layer, parse_conv2d_layer +from hls4ml.converters.pytorch.core import parse_batchnorm_layer, parse_linear_layer +from hls4ml.converters.pytorch.pooling import parse_pooling_layer +from hls4ml.converters.pytorch_to_hls import pytorch_handler +from hls4ml.model.types import FixedPrecisionType + + +def extract_fixed_quantizer_config(q, shape, input, name): + q_params = q._parameters + + shape = tuple(shape[1:]) # type: ignore + print(f'FixedPointQuantizer shape: {shape}') + if any([s is None for s in shape]): + raise ValueError(f'Tensor {input} has at least one dimension with no fixed size') + k, i, f = q_params['k'].data, q_params['i'].data, q_params['f'].data + k, B, I = k, k + i + f, k + i # type: ignore # noqa: E741 + k, B, I = k.detach().cpu().numpy(), B.detach().cpu().numpy(), I.detach().cpu().numpy() # noqa: E741 + I = np.where(B > 0, I, 0) # noqa: E741 # type: ignore + + k = np.broadcast_to(k.astype(np.int16), (1,) + shape) # type: ignore + B = np.broadcast_to(B.astype(np.int16), (1,) + shape) # type: ignore + I = np.broadcast_to(I.astype(np.int16), (1,) + shape) # noqa: E741 + + overflow_mode: str = q.overflow + round_mode: 
str = q.round_mode
+    if round_mode.startswith('S_'):
+        round_mode = round_mode[2:]
+    fusible = np.unique(k).size == 1 and np.unique(B).size == 1 and np.unique(I).size == 1
+
+    return {
+        'name': name,
+        'inputs': [input],
+        'class_name': 'FixedPointQuantizer',
+        'mask_kbi': (k, B, I),
+        'SAT': overflow_mode,
+        'RND': round_mode,
+        'fusible': fusible,
+        'overrides': {},
+    }
+
+
+def add_quantizer_info(class_object, input_names, input_shapes, output_shape, layer):
+    if getattr(class_object, 'quantize_input', False) and hasattr(class_object, 'input_quantizer'):
+        if isinstance(class_object.input_quantizer, Iterable):
+            iq_confs = [
+                extract_fixed_quantizer_config(q, shape, input, f'{layer["name"]}_iq_{i}')
+                for q, shape, input, i in zip(
+                    class_object.input_quantizer, input_shapes, input_names, [k for k in range(len(input_names))]
+                )
+            ]
+        else:
+            iq_confs = [
+                extract_fixed_quantizer_config(
+                    class_object.input_quantizer, input_shapes[0], input_names[0], f'{layer["name"]}_iq'
+                )
+            ]
+        layer['inputs'] = [q['name'] for q in iq_confs]
+        iq_shapes = input_shapes
+    else:
+        iq_confs = []
+        iq_shapes = []
+
+    if getattr(class_object, 'quantize_output', False) and hasattr(class_object, 'output_quantizer'):
+        if isinstance(class_object.output_quantizer, Iterable):
+            oq_confs = [
+                extract_fixed_quantizer_config(q, output_shape, layer['name'], f'{layer["name"]}_oq_{i}')
+                for q, i in zip(class_object.output_quantizer, [k for k in range(len(class_object.output_quantizer))])
+            ]
+            oq_shapes = [output_shape for _ in class_object.output_quantizer]
+        else:
+            oq_confs = [
+                extract_fixed_quantizer_config(
+                    class_object.output_quantizer, output_shape, layer['name'], f'{layer["name"]}_oq'
+                )
+            ]
+            oq_shapes = [output_shape]
+    else:
+        oq_confs = []
+        oq_shapes = []
+
+    out_shapes = []
+    if iq_shapes:
+        out_shapes.append(iq_shapes)
+    out_shapes.append(output_shape)
+    if oq_shapes:
+        out_shapes.append(oq_shapes)
+
+    return iq_confs + [layer] + oq_confs, iq_shapes + 
[output_shape] + oq_shapes + + +def make_pquant_handler(base_parse_func, op, op_check=None): + if op_check is None: + op_check = op + + @pytorch_handler(op) + def handler(operation, layer_name, input_names, input_shapes, node, class_object, data_reader, config): + assert op in operation + layer, output_shape = base_parse_func( + op_check, layer_name, input_names, input_shapes, node, class_object, data_reader, config + ) + layers, output_shapes = add_quantizer_info(class_object, input_names, input_shapes, output_shape, layer) + return layers, output_shapes + + handler.__name__ = f'parse_{op.lower()}_layer' + return handler + + +parse_pqlinear_layer = make_pquant_handler(parse_linear_layer, 'PQDense', 'PQLinear') +parse_pqbatchnorm_layer = make_pquant_handler(parse_batchnorm_layer, 'PQBatchNorm2d') +parse_pqconv1d_layer = make_pquant_handler(parse_conv1d_layer, 'PQConv1d') +parse_pqconv2d_layer = make_pquant_handler(parse_conv2d_layer, 'PQConv2d') +parse_pqpool1d_layer = make_pquant_handler(parse_pooling_layer, 'PQAvgPool1d', 'AvgPool1d') +parse_pqpool2d_layer = make_pquant_handler(parse_pooling_layer, 'PQAvgPool2d', 'AvgPool2d') + + +def parse_quant_activation_layer(operation, layer_name, input_names, input_shapes, node, class_object, data_reader, config): + layer = {} + + layer['activation'] = class_object.activation_name + + print(f'Parsing activation: {layer["activation"]}') + + layer['name'] = layer_name + layer['inputs'] = input_names + + if layer['activation'] == 'hard_tanh': + layer['class_name'] = 'HardActivation' + layer['slope'] = 0.5 + layer['shift'] = 0.5 + layer['slope_prec'] = FixedPrecisionType(width=2, integer=0, signed=False) + layer['shift_prec'] = FixedPrecisionType(width=2, integer=0, signed=False) + warn(f'Hard Tanh activation {layer_name} is currently not supported for bit-exactness.') + + elif layer['activation'] == 'relu' and class_object.use_multiplier: + raise Exception('hls4ml does not currently support activations with multiplier') + """ 
+ layer['activation'] = 'multiplier_relu' + layer['class_name'] = 'MultiplierReLU' + layer['param_data'] = class_object.multiplier.data.numpy() + """ + + else: + layer['class_name'] = 'Activation' + + output_shape = input_shapes[0] + return layer, output_shape + + +parse_pqactivation_layer = make_pquant_handler(parse_quant_activation_layer, 'PQActivation') diff --git a/hls4ml/converters/pytorch_to_hls.py b/hls4ml/converters/pytorch_to_hls.py index 5399cf37cb..9431650a1a 100644 --- a/hls4ml/converters/pytorch_to_hls.py +++ b/hls4ml/converters/pytorch_to_hls.py @@ -274,20 +274,41 @@ def resolve_getitem_source(node_name, visited=None): pytorch_class, layer_name, input_names, input_shapes, node, class_object, reader, config ) - if verbose: - print( - 'Layer name: {}, layer type: {}, input shape: {}'.format( - layer['name'], - layer['class_name'], - input_shapes, + if isinstance(layer, dict): + if verbose: + print( + 'Layer name: {}, layer type: {}, input shape: {}'.format( + layer['name'], + layer['class_name'], + input_shapes, + ) ) - ) - layer_list.append(layer) + layer_list.append(layer) - assert output_shape is not None - output_shapes[layer['name']] = output_shape + assert output_shape is not None + output_shapes[layer['name']] = output_shape - layer_counter += 1 + layer_counter += 1 + + else: + for idx, (lay, out_shape) in enumerate(zip(layer, output_shape)): + if verbose: + print( + 'Layer name: {}, layer type: {}, input shape: {}'.format( + lay['name'], + lay['class_name'], + input_shapes, + ) + ) + layer_list.append(lay) + + if idx < len(layer) - 1: + inputs_map[lay['name']] = inputs_map.get(layer[idx + 1]['name'], layer[idx + 1]['name']) + + assert out_shape is not None + output_shapes[lay['name']] = out_shape + + layer_counter += 1 if node.op == 'placeholder': # 'placeholder' indicates an input layer. 
Multiple inputs are supported diff --git a/hls4ml/model/optimizer/passes/convert_to_channels_last.py b/hls4ml/model/optimizer/passes/convert_to_channels_last.py index cc3b6d0e10..b5400140df 100644 --- a/hls4ml/model/optimizer/passes/convert_to_channels_last.py +++ b/hls4ml/model/optimizer/passes/convert_to_channels_last.py @@ -2,8 +2,11 @@ # Based on https://github.com/fastmachinelearning/qonnx/blob/ # 12c96a3ded06beacab08e0f554e4ed014476c0aa/src/qonnx/transformation/channels_last.py +import numpy as np + from hls4ml.model.layers import GRU, LSTM, Concatenate, Dense, Input, LayerNormalization, Reshape, Transpose from hls4ml.model.optimizer import OptimizerPass +from hls4ml.model.optimizer.passes.hgq_proxy_model import FixedPointQuantizer from hls4ml.model.types import WeightVariable @@ -62,21 +65,27 @@ def transform(self, model, node): elif isinstance(node, LSTM) or isinstance(node, GRU): pass else: - # Transpose weight tensors - tensors = ['weight', 'depthwise', 'pointwise', 'zero_bias', 'scale', 'recurrent_weight'] - for tensor in tensors: - try: - if len(node.get_weights(tensor).shape) == 2: - weights_channels_last = node.get_weights(tensor).data.transpose() - node.get_weights(tensor).data = weights_channels_last - elif len(node.get_weights(tensor).shape) == 3: - weights_channels_last = node.get_weights(tensor).data.transpose([2, 1, 0]) - node.get_weights(tensor).data = weights_channels_last - elif len(node.get_weights(tensor).shape) == 4: - weights_channels_last = node.get_weights(tensor).data.transpose([2, 3, 1, 0]) - node.get_weights(tensor).data = weights_channels_last - except KeyError: - pass + if isinstance(node, FixedPointQuantizer): + transpose_map = {3: (0, 2, 1), 4: (0, 3, 2, 1), 5: (0, 3, 4, 2, 1)} + node.mask_kbi = tuple( + np.transpose(t, transpose_map[t.ndim]) if t.ndim in transpose_map else t for t in node.mask_kbi + ) + else: + # Transpose weight tensors + tensors = ['weight', 'depthwise', 'pointwise', 'zero_bias', 'scale', 'recurrent_weight'] + 
for tensor in tensors: + try: + if len(node.get_weights(tensor).shape) == 2: + weights_channels_last = node.get_weights(tensor).data.transpose() + node.get_weights(tensor).data = weights_channels_last + elif len(node.get_weights(tensor).shape) == 3: + weights_channels_last = node.get_weights(tensor).data.transpose([2, 1, 0]) + node.get_weights(tensor).data = weights_channels_last + elif len(node.get_weights(tensor).shape) == 4: + weights_channels_last = node.get_weights(tensor).data.transpose([2, 3, 1, 0]) + node.get_weights(tensor).data = weights_channels_last + except KeyError: + pass try: node.set_attr('data_format', 'channels_last') except AttributeError: diff --git a/hls4ml/utils/torch.py b/hls4ml/utils/torch.py index 25d2754b1f..71d97dfaff 100644 --- a/hls4ml/utils/torch.py +++ b/hls4ml/utils/torch.py @@ -22,4 +22,5 @@ def is_leaf_module(self, m, module_qualified_name: str) -> bool: or m.__module__.startswith('torch.nn') or m.__module__.startswith('torch.ao.nn') or m.__module__.startswith('brevitas.nn') + or m.__module__.startswith('pquant.core') ) and not isinstance(m, torch.nn.Sequential) From 26f6d8a0b94256cd421de2756d4ae1bd0ab99373 Mon Sep 17 00:00:00 2001 From: Enrico Lupi Date: Thu, 27 Nov 2025 16:21:36 +0100 Subject: [PATCH 2/8] ADD support for PQuantML layers in Keras V3 --- hls4ml/converters/keras_v3/__init__.py | 1 + hls4ml/converters/keras_v3/hgq2/_base.py | 20 +- hls4ml/converters/keras_v3/pquant/__init__.py | 3 + hls4ml/converters/keras_v3/pquant/_base.py | 194 ++++++++++++++++++ hls4ml/converters/keras_v3/pquant/pooling.py | 30 +++ 5 files changed, 241 insertions(+), 7 deletions(-) create mode 100644 hls4ml/converters/keras_v3/pquant/__init__.py create mode 100644 hls4ml/converters/keras_v3/pquant/_base.py create mode 100644 hls4ml/converters/keras_v3/pquant/pooling.py diff --git a/hls4ml/converters/keras_v3/__init__.py b/hls4ml/converters/keras_v3/__init__.py index 21950aea6c..4f99225313 100644 --- a/hls4ml/converters/keras_v3/__init__.py +++ 
b/hls4ml/converters/keras_v3/__init__.py @@ -5,6 +5,7 @@ hgq2, # noqa: F401 merge, # noqa: F401 pooling, # noqa: F401 + pquant, # noqa: F401 recurrent, # noqa: F401 ) from ._base import registry as layer_handlers diff --git a/hls4ml/converters/keras_v3/hgq2/_base.py b/hls4ml/converters/keras_v3/hgq2/_base.py index 4a6d0a22c2..cc82934b53 100644 --- a/hls4ml/converters/keras_v3/hgq2/_base.py +++ b/hls4ml/converters/keras_v3/hgq2/_base.py @@ -16,16 +16,14 @@ from keras.src.layers.layer import Layer as Layer -def extract_fixed_quantizer_config(q, tensor: 'KerasTensor', is_input: bool) -> dict[str, Any]: - from hgq.quantizer.internal.fixed_point_quantizer import FixedPointQuantizerKBI, FixedPointQuantizerKIF +def extract_quantizer_config(q, extract_kif, tensor: 'KerasTensor', is_input: bool) -> dict[str, Any]: from keras import ops - internal_q: FixedPointQuantizerKIF | FixedPointQuantizerKBI = q.quantizer - shape: tuple[int, ...] = tensor.shape[1:] # type: ignore if any([s is None for s in shape]): raise ValueError(f'Tensor {tensor.name} has at least one dimension with no fixed size') - k, i, f = internal_q.kif + + k, i, f = extract_kif(q) k, B, I = k, k + i + f, k + i # type: ignore # noqa: E741 k, B, I = ops.convert_to_numpy(k), ops.convert_to_numpy(B), ops.convert_to_numpy(I) # noqa: E741 I = np.where(B > 0, I, 0) # noqa: E741 # type: ignore @@ -34,8 +32,8 @@ def extract_fixed_quantizer_config(q, tensor: 'KerasTensor', is_input: bool) -> B = np.broadcast_to(B.astype(np.int16), (1,) + shape) # type: ignore I = np.broadcast_to(I.astype(np.int16), (1,) + shape) # noqa: E741 - overflow_mode: str = internal_q.overflow_mode - round_mode: str = internal_q.round_mode + overflow_mode: str = getattr(q, 'overflow_mode', q.overflow) + round_mode: str = q.round_mode if round_mode.startswith('S_'): round_mode = round_mode[2:] fusible = np.unique(k).size == 1 and np.unique(B).size == 1 and np.unique(I).size == 1 @@ -55,6 +53,14 @@ def extract_fixed_quantizer_config(q, tensor: 
'KerasTensor', is_input: bool) -> } +def extract_fixed_quantizer_config(q, tensor: 'KerasTensor', is_input: bool) -> dict[str, Any]: + from hgq.quantizer.internal.fixed_point_quantizer import FixedPointQuantizerKBI, FixedPointQuantizerKIF + + internal_q: FixedPointQuantizerKIF | FixedPointQuantizerKBI = q.quantizer + + return extract_quantizer_config(internal_q, lambda q: q.kif, tensor, is_input) + + def override_io_tensor_confs(confs: tuple[dict[str, Any], ...], overrides: dict[str, str]): for conf in confs: inp_tensor_names = conf['input_keras_tensor_names'] diff --git a/hls4ml/converters/keras_v3/pquant/__init__.py b/hls4ml/converters/keras_v3/pquant/__init__.py new file mode 100644 index 0000000000..0a7ac5bb6d --- /dev/null +++ b/hls4ml/converters/keras_v3/pquant/__init__.py @@ -0,0 +1,3 @@ +from . import _base, pooling + +__all__ = ['_base', 'pooling'] diff --git a/hls4ml/converters/keras_v3/pquant/_base.py b/hls4ml/converters/keras_v3/pquant/_base.py new file mode 100644 index 0000000000..81f681dc6f --- /dev/null +++ b/hls4ml/converters/keras_v3/pquant/_base.py @@ -0,0 +1,194 @@ +from collections.abc import Sequence +from math import prod +from typing import TYPE_CHECKING, Any + +from hls4ml.converters.keras_v3._base import KerasV3LayerHandler, register +from hls4ml.converters.keras_v3.conv import ConvHandler +from hls4ml.converters.keras_v3.core import ActivationHandler, DenseHandler +from hls4ml.converters.keras_v3.hgq2._base import extract_quantizer_config, override_io_tensor_confs + +if TYPE_CHECKING: + import pquant + from keras import KerasTensor + from keras.src.layers.layer import Layer as Layer + + +def extract_pquant_quantizer_config(q, tensor: 'KerasTensor', is_input: bool) -> dict[str, Any]: + from pquant.quantizer import Quantizer + + if not isinstance(q, Quantizer): + raise TypeError(f'Quantizer {type(q).__name__} ({q.__module__}) is not an instance of any allowed Quantizer class.') + + if q.use_hgq: + return 
extract_quantizer_config(q.quantizer.quantizer, lambda q: q.kif, tensor, is_input) + else: + return extract_quantizer_config(q, lambda q: (q.k, q.i, q.f), tensor, is_input) + + +@register +class PQLayerHandler(KerasV3LayerHandler): + def __call__( + self, + layer: ( + 'pquant.core.keras.layers.PQWeightBiasBase | ' + 'pquant.core.keras.layers.PQBatchNormalization | ' + 'pquant.core.keras.layers.QuantizedPooling | ' + 'pquant.core.keras.layers.QuantizedActivation' + ), + in_tensors: Sequence['KerasTensor'], + out_tensors: Sequence['KerasTensor'], + ): + ret = super().__call__(layer, in_tensors, out_tensors) + + if getattr(layer, 'quantize_input', False) and hasattr(layer, 'input_quantizer'): + if len(in_tensors) > 1: + iq_confs = [ + extract_pquant_quantizer_config(q, tensor, True) for q, tensor in zip(layer.input_quantizer, in_tensors) + ] + else: + iq_confs = [extract_pquant_quantizer_config(layer.input_quantizer, in_tensors[0], True)] + else: + iq_confs = () + + if getattr(layer, 'quantize_output', False) and hasattr(layer, 'output_quantizer'): + if len(out_tensors) > 1: + oq_confs = [ + extract_pquant_quantizer_config(q, tensor, False) + for q, tensor in zip(layer.output_quantizer, out_tensors) + ] + else: + oq_confs = [extract_pquant_quantizer_config(layer.output_quantizer, out_tensors[0], False)] + else: + oq_confs = () + + if iq_confs: + _froms = [t.name for t in in_tensors] + _tos = [f'{t.name}_q' for t in in_tensors] + overrides = dict(zip(_froms, _tos)) + override_io_tensor_confs(ret, overrides) + + if oq_confs: + _froms = [t.name for t in out_tensors] + _tos = [f'{t.name}_q' for t in out_tensors] + overrides = dict(zip(_froms, _tos)) + override_io_tensor_confs(ret, overrides) + + return *iq_confs, *ret, *oq_confs + + def load_weight(self, layer: 'Layer', key: str): + from keras import ops + + if hasattr(layer, f'q{key}'): + return ops.convert_to_numpy(getattr(layer, f'q{key}')) + return super().load_weight(layer, key) + + def default_class_name(self, 
layer: 'Layer') -> str: + class_name = layer.__class__.__name__ + if class_name.startswith('PQ'): + class_name = class_name[2:] + return class_name + + +@register +class PQActivationHandler(PQLayerHandler, ActivationHandler): + handles = ('pquant.core.keras.activations.PQActivation',) + + def handle( + self, + layer: 'pquant.core.keras.activations.PQActivation', + in_tensors: Sequence['KerasTensor'], + out_tensors: Sequence['KerasTensor'], + ): + config = {} + config.update(self.default_config) + + activation = getattr(layer, 'activation_name', 'linear') + match activation: + case 'hard_tanh': + class_name = 'HardActivation' + case _: + class_name = 'Activation' + + config['activation'] = activation + config['class_name'] = class_name + config['n_in'] = prod(in_tensors[0].shape[1:]) # type: ignore + return (config,) + + +@register +class PQBatchNormalizationHandler(PQLayerHandler): + handles = ('pquant.core.keras.layers.PQBatchNormalization',) + + def handle( + self, + layer: 'pquant.core.keras.layers.PQBatchNormalization', + in_tensors: Sequence['KerasTensor'], + out_tensors: Sequence['KerasTensor'], + ): + from keras import ops + + assert layer.axis in (len(in_tensors[0].shape) - 1, -1), 'Only batch_norm with axis=-1 is supported in hls4ml' + + conf = {} + conf['class_name'] = layer.__class__.__name__[1:] + conf['n_in'] = prod(in_tensors[0].shape[1:]) + + conf['use_gamma'] = layer.scale + if conf['use_gamma']: + conf['gamma_data'] = ops.convert_to_numpy(layer.gamma) + else: + conf['gamma_data'] = 1 + + conf['use_beta'] = layer.center + if conf['use_beta']: + conf['beta_data'] = ops.convert_to_numpy(layer.beta) + else: + conf['beta_data'] = 0 + + conf['mean_data'] = ops.convert_to_numpy(layer.moving_mean) + conf['variance_data'] = ops.convert_to_numpy(layer.moving_variance) + conf['n_filt'] = conf['variance_data'].size + + return conf + + +@register +class PQConvHandler(PQLayerHandler, ConvHandler): + handles = ('pquant.core.keras.layers.PQConv1d', 
'pquant.core.keras.layers.PQConv2d') + + def handle( + self, + layer: 'pquant.core.keras.layers.PQConv1D | pquant.core.keras.layers.PQConv2D', + in_tensors: Sequence['KerasTensor'], + out_tensors: Sequence['KerasTensor'], + ): + conf = super().handle(layer, in_tensors, out_tensors) + conf['class_name'] = layer.__class__.__name__[1:-1] + 'D' + pf = layer.parallelization_factor + out_shape: tuple[int, ...] = out_tensors[0].shape[1:] # type: ignore + if pf < 0: + if layer.data_format == 'channels_last': + pf = prod(out_shape[:-1]) + else: + pf = prod(out_shape[1:]) + conf['parallelization_factor'] = pf + return conf + + +@register +class PQDenseHandler(PQLayerHandler, DenseHandler): + handles = ('pquant.core.keras.layers.PQDense',) + + def handle( + self, + layer: 'pquant.core.keras.layers.PQDense', + in_tensors: Sequence['KerasTensor'], + out_tensors: Sequence['KerasTensor'], + ): + conf = super().handle(layer, in_tensors, out_tensors) + conf['class_name'] = 'Dense' + in_shape: tuple[int, ...] 
= in_tensors[0].shape[1:] # type: ignore + if len(in_shape) > 1: + pf = layer.parallelization_factor + conf['parallelization_factor'] = pf + return conf diff --git a/hls4ml/converters/keras_v3/pquant/pooling.py b/hls4ml/converters/keras_v3/pquant/pooling.py new file mode 100644 index 0000000000..5625502cfe --- /dev/null +++ b/hls4ml/converters/keras_v3/pquant/pooling.py @@ -0,0 +1,30 @@ +from collections.abc import Sequence +from typing import TYPE_CHECKING + +from hls4ml.converters.keras_v3._base import register +from hls4ml.converters.keras_v3.pooling import PoolingHandler + +from ._base import PQLayerHandler + +if TYPE_CHECKING: + import pquant + from keras import KerasTensor + + +@register +class PQAvgPoolHandler(PQLayerHandler, PoolingHandler): + handles = ( + 'pquant.core.keras.layers.PQAvgPool1d', + 'pquant.core.keras.layers.PQAvgPool2d', + ) + + def handle( + self, + layer: 'pquant.core.keras.layers.PQAvgPool1d | pquant.core.keras.layers.PQAvgPool2d', + in_tensors: Sequence['KerasTensor'], + out_tensors: Sequence['KerasTensor'], + ): + conf = super().handle(layer, in_tensors, out_tensors) + conf['class_name'] = 'AveragePooling' + layer.__class__.__name__[-2] + 'D' + + return conf From 27c684fddd51a9fecf0326cd34b3f944e318fab8 Mon Sep 17 00:00:00 2001 From: Enrico Lupi Date: Wed, 3 Dec 2025 13:57:40 +0100 Subject: [PATCH 3/8] FIX commutation for Quantizer --- .../model/optimizer/passes/convert_to_channels_last.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/hls4ml/model/optimizer/passes/convert_to_channels_last.py b/hls4ml/model/optimizer/passes/convert_to_channels_last.py index b5400140df..ff35c95182 100644 --- a/hls4ml/model/optimizer/passes/convert_to_channels_last.py +++ b/hls4ml/model/optimizer/passes/convert_to_channels_last.py @@ -66,7 +66,7 @@ def transform(self, model, node): pass else: if isinstance(node, FixedPointQuantizer): - transpose_map = {3: (0, 2, 1), 4: (0, 3, 2, 1), 5: (0, 3, 4, 2, 1)} + transpose_map = {3: 
(0, 2, 1), 4: (0, 2, 3, 1), 5: (0, 2, 3, 4, 1)} node.mask_kbi = tuple( np.transpose(t, transpose_map[t.ndim]) if t.ndim in transpose_map else t for t in node.mask_kbi ) @@ -75,13 +75,14 @@ def transform(self, model, node): tensors = ['weight', 'depthwise', 'pointwise', 'zero_bias', 'scale', 'recurrent_weight'] for tensor in tensors: try: - if len(node.get_weights(tensor).shape) == 2: + t_shape = node.get_weights(tensor).shape + if len(t_shape) == 2: weights_channels_last = node.get_weights(tensor).data.transpose() node.get_weights(tensor).data = weights_channels_last - elif len(node.get_weights(tensor).shape) == 3: + elif len(t_shape) == 3: weights_channels_last = node.get_weights(tensor).data.transpose([2, 1, 0]) node.get_weights(tensor).data = weights_channels_last - elif len(node.get_weights(tensor).shape) == 4: + elif len(t_shape) == 4: weights_channels_last = node.get_weights(tensor).data.transpose([2, 3, 1, 0]) node.get_weights(tensor).data = weights_channels_last except KeyError: From 46473f66d2c9d7ff151f6ff66fe47d99251a1f43 Mon Sep 17 00:00:00 2001 From: Enrico Lupi Date: Fri, 5 Dec 2025 17:43:47 +0100 Subject: [PATCH 4/8] FIX extract quant info & batchnorm --- hls4ml/converters/keras_v3/hgq2/_base.py | 20 ++++----- hls4ml/converters/keras_v3/pquant/_base.py | 49 ++++++++++++++++++++-- 2 files changed, 52 insertions(+), 17 deletions(-) diff --git a/hls4ml/converters/keras_v3/hgq2/_base.py b/hls4ml/converters/keras_v3/hgq2/_base.py index cc82934b53..4a6d0a22c2 100644 --- a/hls4ml/converters/keras_v3/hgq2/_base.py +++ b/hls4ml/converters/keras_v3/hgq2/_base.py @@ -16,14 +16,16 @@ from keras.src.layers.layer import Layer as Layer -def extract_quantizer_config(q, extract_kif, tensor: 'KerasTensor', is_input: bool) -> dict[str, Any]: +def extract_fixed_quantizer_config(q, tensor: 'KerasTensor', is_input: bool) -> dict[str, Any]: + from hgq.quantizer.internal.fixed_point_quantizer import FixedPointQuantizerKBI, FixedPointQuantizerKIF from keras import ops + 
internal_q: FixedPointQuantizerKIF | FixedPointQuantizerKBI = q.quantizer + shape: tuple[int, ...] = tensor.shape[1:] # type: ignore if any([s is None for s in shape]): raise ValueError(f'Tensor {tensor.name} has at least one dimension with no fixed size') - - k, i, f = extract_kif(q) + k, i, f = internal_q.kif k, B, I = k, k + i + f, k + i # type: ignore # noqa: E741 k, B, I = ops.convert_to_numpy(k), ops.convert_to_numpy(B), ops.convert_to_numpy(I) # noqa: E741 I = np.where(B > 0, I, 0) # noqa: E741 # type: ignore @@ -32,8 +34,8 @@ def extract_quantizer_config(q, extract_kif, tensor: 'KerasTensor', is_input: bo B = np.broadcast_to(B.astype(np.int16), (1,) + shape) # type: ignore I = np.broadcast_to(I.astype(np.int16), (1,) + shape) # noqa: E741 - overflow_mode: str = getattr(q, 'overflow_mode', q.overflow) - round_mode: str = q.round_mode + overflow_mode: str = internal_q.overflow_mode + round_mode: str = internal_q.round_mode if round_mode.startswith('S_'): round_mode = round_mode[2:] fusible = np.unique(k).size == 1 and np.unique(B).size == 1 and np.unique(I).size == 1 @@ -53,14 +55,6 @@ def extract_quantizer_config(q, extract_kif, tensor: 'KerasTensor', is_input: bo } -def extract_fixed_quantizer_config(q, tensor: 'KerasTensor', is_input: bool) -> dict[str, Any]: - from hgq.quantizer.internal.fixed_point_quantizer import FixedPointQuantizerKBI, FixedPointQuantizerKIF - - internal_q: FixedPointQuantizerKIF | FixedPointQuantizerKBI = q.quantizer - - return extract_quantizer_config(internal_q, lambda q: q.kif, tensor, is_input) - - def override_io_tensor_confs(confs: tuple[dict[str, Any], ...], overrides: dict[str, str]): for conf in confs: inp_tensor_names = conf['input_keras_tensor_names'] diff --git a/hls4ml/converters/keras_v3/pquant/_base.py b/hls4ml/converters/keras_v3/pquant/_base.py index 81f681dc6f..a972d195ac 100644 --- a/hls4ml/converters/keras_v3/pquant/_base.py +++ b/hls4ml/converters/keras_v3/pquant/_base.py @@ -2,10 +2,12 @@ from math import prod 
from typing import TYPE_CHECKING, Any +import numpy as np + from hls4ml.converters.keras_v3._base import KerasV3LayerHandler, register from hls4ml.converters.keras_v3.conv import ConvHandler from hls4ml.converters.keras_v3.core import ActivationHandler, DenseHandler -from hls4ml.converters.keras_v3.hgq2._base import extract_quantizer_config, override_io_tensor_confs +from hls4ml.converters.keras_v3.hgq2._base import override_io_tensor_confs if TYPE_CHECKING: import pquant @@ -13,6 +15,45 @@ from keras.src.layers.layer import Layer as Layer +def extract_quantizer_config( + q, extract_kif, tensor: 'KerasTensor', is_input: bool, overflow_attr: str = 'overflow_mode' +) -> dict[str, Any]: + from keras import ops + + shape: tuple[int, ...] = tensor.shape[1:] # type: ignore + if any([s is None for s in shape]): + raise ValueError(f'Tensor {tensor.name} has at least one dimension with no fixed size') + + k, i, f = extract_kif(q) + k, B, I = k, k + i + f, k + i # type: ignore # noqa: E741 + k, B, I = ops.convert_to_numpy(k), ops.convert_to_numpy(B), ops.convert_to_numpy(I) # noqa: E741 + I = np.where(B > 0, I, 0) # noqa: E741 # type: ignore + + k = np.broadcast_to(k.astype(np.int16), (1,) + shape) # type: ignore + B = np.broadcast_to(B.astype(np.int16), (1,) + shape) # type: ignore + I = np.broadcast_to(I.astype(np.int16), (1,) + shape) # noqa: E741 + + overflow_mode: str = getattr(q, overflow_attr, 'SAT') + round_mode: str = q.round_mode + if round_mode.startswith('S_'): + round_mode = round_mode[2:] + fusible = np.unique(k).size == 1 and np.unique(B).size == 1 and np.unique(I).size == 1 + + input_keras_tensor_names = tensor.name if is_input else f'{tensor.name}_q' + output_keras_tensor_names = f'{tensor.name}_q' if is_input else tensor.name + return { + 'name': q.name, + 'class_name': 'FixedPointQuantizer', + 'mask_kbi': (k, B, I), + 'SAT': overflow_mode, + 'RND': round_mode, + 'fusible': fusible, + 'input_keras_tensor_names': [input_keras_tensor_names], + 
'output_keras_tensor_names': [output_keras_tensor_names], + 'overrides': {}, + } + + def extract_pquant_quantizer_config(q, tensor: 'KerasTensor', is_input: bool) -> dict[str, Any]: from pquant.quantizer import Quantizer @@ -22,7 +63,7 @@ def extract_pquant_quantizer_config(q, tensor: 'KerasTensor', is_input: bool) -> if q.use_hgq: return extract_quantizer_config(q.quantizer.quantizer, lambda q: q.kif, tensor, is_input) else: - return extract_quantizer_config(q, lambda q: (q.k, q.i, q.f), tensor, is_input) + return extract_quantizer_config(q, lambda q: (q.k, q.i, q.f), tensor, is_input, 'overflow') @register @@ -135,13 +176,13 @@ def handle( conf['use_gamma'] = layer.scale if conf['use_gamma']: - conf['gamma_data'] = ops.convert_to_numpy(layer.gamma) + conf['gamma_data'] = ops.convert_to_numpy(layer.weight_quantizer(layer.gamma)) else: conf['gamma_data'] = 1 conf['use_beta'] = layer.center if conf['use_beta']: - conf['beta_data'] = ops.convert_to_numpy(layer.beta) + conf['beta_data'] = ops.convert_to_numpy(layer.bias_quantizer(layer.beta)) else: conf['beta_data'] = 0 From 07dd12cf86ed71a5fddec2792e1a26eba75af08e Mon Sep 17 00:00:00 2001 From: Enrico Lupi Date: Fri, 5 Dec 2025 17:48:33 +0100 Subject: [PATCH 5/8] ADD testing for pytorch and keras --- test/pytest/test_pquant_keras.py | 174 +++++++++++++++++++++++++++ test/pytest/test_pquant_pytorch.py | 184 +++++++++++++++++++++++++++++ 2 files changed, 358 insertions(+) create mode 100644 test/pytest/test_pquant_keras.py create mode 100644 test/pytest/test_pquant_pytorch.py diff --git a/test/pytest/test_pquant_keras.py b/test/pytest/test_pquant_keras.py new file mode 100644 index 0000000000..7d985caa8b --- /dev/null +++ b/test/pytest/test_pquant_keras.py @@ -0,0 +1,174 @@ +import os +from pathlib import Path + +import numpy as np +import pytest +from pquant.activations import PQActivation +from pquant.core.finetuning import TuningConfig +from pquant.core.utils import get_default_config +from pquant.layers import 
PQAvgPool1d, PQAvgPool2d, PQBatchNormalization, PQConv1d, PQConv2d, PQDense + +from hls4ml.converters import convert_from_keras_model +from hls4ml.utils import config_from_keras_model + +os.environ['KERAS_BACKEND'] = 'tensorflow' +import keras # noqa: E402 + +test_path = Path(__file__).parent + + +def _run_synth_match_test(PQmodel: keras.Model, data, io_type: str, backend: str, dir: str, cond=None, strategy='latency'): + output_dir = dir + '/hls4ml_prj' + hls_config = config_from_keras_model( + PQmodel, + granularity='name', + default_precision='ap_fixed<32, 16>', + backend=backend, + ) + hls_model = convert_from_keras_model( + PQmodel, + io_type=io_type, + output_dir=output_dir, + backend=backend, + hls_config=hls_config, + ) + hls_model.compile() + + data_len = data.shape[0] if isinstance(data, np.ndarray) else data[0].shape[0] + r_pq: list[np.ndarray] = [PQmodel(data).numpy()] # type: ignore + r_hls: list[np.ndarray] = [hls_model.predict(np.ascontiguousarray(data)).reshape(r_pq[0].shape)] # type: ignore + + errors = [] + for i, (p, h) in enumerate(zip(r_pq, r_hls)): + try: + if cond is None: + mismatch_ph = p != h + assert np.sum(mismatch_ph) == 0, ( + f'Proxy-HLS4ML mismatch for out {i}: {np.sum(np.any(mismatch_ph, axis=1))} out of {data_len} samples are different. 
Sample: {p[mismatch_ph].ravel()[:5]} vs {h[mismatch_ph].ravel()[:5]}' # noqa: E501 + ) + else: + cond(p, h) + except AssertionError as e: + errors.append(e) + if len(errors) > 0: + msgs = [str(e) for e in errors] + raise AssertionError('\n'.join(msgs)) + + +def run_model_test( + PQmodel: keras.Model, + data, + io_type: str, + backend: str, + dir: str, + cond=None, + strategy='latency', +): + _run_synth_match_test(PQmodel, data, io_type, backend, dir, cond=cond, strategy=strategy) + + +def create_pqlayer_model(layer: str, use_hgq: bool): + config = get_default_config('pdp') + config['pruning_parameters']['disable_pruning_for_layers'] = [''] + config['quantization_parameters']['use_high_granularity_quantization'] = use_hgq + config = TuningConfig.load_from_config(config) + + idx = layer.find('(') + 1 + layer = ( + layer[:idx] + + 'config, ' + + layer[idx:-1] + + (', quantize_output=True, out_quant_bits=(1., 2., 7.)' if 'BatchNorm' not in layer else '') + + ')' + ) + _layer = eval(layer) + + shape = get_shape(_layer) + inp = keras.Input(shape[1:]) + out = _layer(inp) + if 'BatchNorm' in layer: + flat = keras.layers.Flatten() + _layer2 = PQDense(config, 16, in_quant_bits=(1.0, 1.0, 7.0), quantize_output=True, out_quant_bits=(1.0, 2.0, 7.0)) + out = _layer2(flat(out)) + model = keras.Model(inp, out) + + return model, shape + + +def get_data(shape: tuple[int, ...], v: float, max_scale: float): + rng = np.random.default_rng() + a1 = rng.uniform(-v, v, shape).astype(np.float32) + a2 = rng.uniform(0, max_scale, (1, *shape[1:])).astype(np.float32) + return (a1 * a2).astype(np.float32) + + +def get_shape( + layer: keras.layers.Layer, + batch_size: int = 1, + default_length: int = 32, + default_hw: tuple[int, int] = (32, 32), + default_channels: int = 2, +): + match layer: + case PQActivation(): + # (N, L) + return (batch_size, default_length) + case PQAvgPool1d(): + # (N, L, C) + return (batch_size, default_length, default_channels) + case PQAvgPool2d(): + # (N, H, W, C) + 
return (batch_size, *default_hw, default_channels) + case PQBatchNormalization(): + # (N, num_features, H, W) + return (batch_size, *default_hw, default_channels) + case PQConv1d(): + # (N, C_in, L) + return (batch_size, default_length, default_channels) + case PQConv2d(): + # (N, C_in, H, W) + return (batch_size, *default_hw, default_channels) + case PQDense(): + # (N, in_features) + return (batch_size, default_length) + case _: + raise TypeError(f'Unsupported layer type: {type(layer).__name__}') + + +@pytest.mark.parametrize( + 'layer', + [ + 'PQDense(16)', + 'PQDense(16, use_bias=False)', + "PQConv1d(3, kernel_size=3, padding='same')", + "PQConv1d(3, kernel_size=3, padding='valid')", + "PQConv1d(3, kernel_size=3, padding='valid', use_bias=False)", + "PQConv1d(3, kernel_size=3, padding='valid', strides=2)", + "PQConv1d(3, kernel_size=3, padding='same', strides=2)", + "PQConv2d(3, kernel_size=(3,3), padding='same')", + "PQConv2d(3, kernel_size=(3,3), padding='valid')", + "PQConv2d(3, kernel_size=(3,3), padding='valid', use_bias=False)", + "PQConv2d(3, kernel_size=(3,3), padding='valid', strides=2)", + "PQConv2d(3, kernel_size=(3,3), padding='same', strides=2)", + 'PQBatchNormalization()', + "PQAvgPool1d(2, padding='same')", + "PQAvgPool2d((1,2), padding='same')", + "PQAvgPool2d((2,2), padding='same')", + "PQAvgPool1d(2, padding='valid')", + "PQAvgPool2d((1,2), padding='valid')", + "PQAvgPool2d((2,2), padding='valid')PQActivation('relu')", + "PQActivation('tanh')", + ], +) +@pytest.mark.parametrize('N', [1000]) +@pytest.mark.parametrize('io_type', ['io_parallel']) +@pytest.mark.parametrize('backend', ['vivado', 'vitis']) +@pytest.mark.parametrize('use_hgq', [True, False]) +@pytest.mark.parametrize('strategy', ['latency', 'resource']) +def test_syn_hlayers(layer, N: int, io_type: str, backend: str, use_hgq: bool, strategy: str): + model, data_shape = create_pqlayer_model(layer=layer, use_hgq=use_hgq) + data = get_data(data_shape, 7, 1) + + path = test_path / 
f'hls4mlprj_pquant_keras__{layer}_{io_type}_{backend}_{use_hgq}_{strategy}' + + run_model_test(model, data, io_type, backend, str(path), None, strategy) diff --git a/test/pytest/test_pquant_pytorch.py b/test/pytest/test_pquant_pytorch.py new file mode 100644 index 0000000000..472d9cb899 --- /dev/null +++ b/test/pytest/test_pquant_pytorch.py @@ -0,0 +1,184 @@ +import os +from pathlib import Path + +import numpy as np +import pytest +from pquant.activations import PQActivation +from pquant.core.finetuning import TuningConfig +from pquant.core.utils import get_default_config +from pquant.layers import PQAvgPool1d, PQAvgPool2d, PQBatchNorm2d, PQConv1d, PQConv2d, PQDense + +from hls4ml.converters import convert_from_pytorch_model +from hls4ml.utils import config_from_pytorch_model + +os.environ['KERAS_BACKEND'] = 'torch' +import torch # noqa: E402 +import torch.nn as nn # noqa: E402 + +test_path = Path(__file__).parent + + +def _run_synth_match_test(PQmodel: nn.Module, data, io_type: str, backend: str, dir: str, cond=None, strategy='latency'): + output_dir = dir + '/hls4ml_prj' + hls_config = config_from_pytorch_model( + PQmodel, + input_shape=tuple(data.shape[1:]), + granularity='name', + default_precision='ap_fixed<32, 16>', + backend=backend, + transpose_outputs=True, + ) + hls_model = convert_from_pytorch_model( + PQmodel, + io_type=io_type, + output_dir=output_dir, + backend=backend, + hls_config=hls_config, + ) + hls_model.compile() + + data_len = data.shape[0] if isinstance(data, np.ndarray) else data[0].shape[0] + r_pq: list[np.ndarray] = [PQmodel(data).detach().cpu().numpy()] # type: ignore + r_hls: list[np.ndarray] = [hls_model.predict(np.ascontiguousarray(data)).reshape(r_pq[0].shape)] # type: ignore + + errors = [] + for i, (p, h) in enumerate(zip(r_pq, r_hls)): + try: + if cond is None: + mismatch_ph = p != h + assert np.sum(mismatch_ph) == 0, ( + f'Proxy-HLS4ML mismatch for out {i}: {np.sum(np.any(mismatch_ph, axis=1))} out of {data_len} samples are 
different. Sample: {p[mismatch_ph].ravel()[:5]} vs {h[mismatch_ph].ravel()[:5]}' # noqa: E501 + ) + else: + cond(p, h) + except AssertionError as e: + errors.append(e) + if len(errors) > 0: + msgs = [str(e) for e in errors] + raise AssertionError('\n'.join(msgs)) + + +def run_model_test( + PQmodel: nn.Module, + data, + io_type: str, + backend: str, + dir: str, + cond=None, + strategy='latency', +): + PQmodel.eval() + PQmodel(data[:1]) + _run_synth_match_test(PQmodel, data, io_type, backend, dir, cond=cond, strategy=strategy) + + +def create_pqlayer_model(layer: str, use_hgq: bool): + config = get_default_config('pdp') + config['pruning_parameters']['disable_pruning_for_layers'] = [''] + config['quantization_parameters']['use_high_granularity_quantization'] = use_hgq + config = TuningConfig.load_from_config(config) + + idx = layer.find('(') + 1 + layer = ( + layer[:idx] + + 'config, ' + + layer[idx:-1] + + (', quantize_output=True, out_quant_bits=(1, 2, 7)' if 'BatchNorm' not in layer else '') + + ')' + ) + _layer = eval(layer) + + class SingleLayerModel(nn.Module): + def __init__(self, layer): + super().__init__() + self.layer = layer + + def forward(self, x): + return self.layer(x) + + model = SingleLayerModel(_layer) + return model + + +def get_data(shape: tuple[int, ...], v: float, max_scale: float): + rng = np.random.default_rng() + a1 = rng.uniform(-v, v, shape).astype(np.float32) + a2 = rng.uniform(0, max_scale, (1, *shape[1:])).astype(np.float32) + return torch.tensor((a1 * a2), dtype=torch.float32) + + +def get_shape(model: nn.Module, batch_size: int = 1, default_length: int = 32, default_hw: tuple[int, int] = (32, 32)): + for lay in list(model.modules())[1:]: + if not isinstance(lay, (nn.Sequential, nn.ModuleList, nn.Identity)): + layer = lay + break + else: + raise ValueError('Model has no valid layers to infer shape from.') + + match layer: + case PQActivation(): + # (N, L) + return (batch_size, default_length) + case PQAvgPool1d(): + # (N, C, L) + 
return (batch_size, 1, default_length) + case PQAvgPool2d(): + # (N, C, H, W) + return (batch_size, 1, *default_hw) + # case PQBatchNorm1d(): + # # (N, num_features, L) + # return (batch_size, layer.num_features, *default_length) + case PQBatchNorm2d(): + # (N, num_features, H, W) + return (batch_size, layer.num_features, *default_hw) + case PQConv1d(): + # (N, C_in, L) + return (batch_size, layer.in_channels, default_length) + case PQConv2d(): + # (N, C_in, H, W) + return (batch_size, layer.in_channels, *default_hw) + case PQDense(): + # (N, in_features) + return (batch_size, layer.in_features) + case _: + raise TypeError(f'Unsupported layer type: {type(layer).__name__}') + + +@pytest.mark.parametrize( + 'layer', + [ + 'PQDense(16, 4)', + 'PQDense(16, 4, bias=False)', + 'PQConv1d(2, 3, kernel_size=3, padding=1)', + 'PQConv1d(2, 3, kernel_size=3, padding=0)', + 'PQConv1d(2, 3, kernel_size=3, padding=0, bias=False)', + 'PQConv1d(2, 3, kernel_size=3, padding=0, stride=2)', + 'PQConv1d(2, 3, kernel_size=3, padding=1, stride=2)', + 'PQConv2d(2, 3, kernel_size=(3,3), padding=1)', + 'PQConv2d(2, 3, kernel_size=(3,3), padding=0)', + 'PQConv2d(2, 3, kernel_size=(3,3), padding=0, bias=False)', + 'PQConv2d(2, 3, kernel_size=(3,3), padding=0, stride=2)', + 'PQConv2d(2, 3, kernel_size=(3,3), padding=1, stride=2)', + 'PQBatchNorm2d(3)', + 'PQAvgPool1d(2, padding=1)', + 'PQAvgPool1d(2, padding=0)', + 'PQAvgPool2d((2,2), padding=1)', + 'PQAvgPool2d((2,2), padding=0)', + 'PQAvgPool2d((1, 2), stride=(1, 2), padding=(0, 1))', + "PQActivation('relu')", + "PQActivation('tanh')", + ], +) +@pytest.mark.parametrize('N', [1000]) +@pytest.mark.parametrize('io_type', ['io_parallel']) +@pytest.mark.parametrize('backend', ['vivado', 'vitis']) +@pytest.mark.parametrize('use_hgq', [True, False]) +@pytest.mark.parametrize('strategy', ['latency', 'resource']) +def test_syn_hlayers(layer, N: int, io_type: str, backend: str, use_hgq: bool, strategy: str): + model = create_pqlayer_model(layer=layer, 
use_hgq=use_hgq) + data_shape = get_shape(model, batch_size=N) + data = get_data(data_shape, 7, 1) + + path = test_path / f'hls4mlprj_pquant_pytorch_{layer}_{io_type}_{backend}_{use_hgq}_{strategy}' + + run_model_test(model, data, io_type, backend, str(path), None, strategy) From 5f0155f1154a9bf76ef89a63e46483d8a6492c6b Mon Sep 17 00:00:00 2001 From: Enrico Lupi Date: Tue, 9 Dec 2025 15:11:10 +0100 Subject: [PATCH 6/8] ADD documentation --- docs/advanced/pquant.rst | 89 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 docs/advanced/pquant.rst diff --git a/docs/advanced/pquant.rst b/docs/advanced/pquant.rst new file mode 100644 index 0000000000..c4eafa96b3 --- /dev/null +++ b/docs/advanced/pquant.rst @@ -0,0 +1,89 @@ +====================================== +PQuantML +====================================== + +.. image:: https://img.shields.io/badge/License-Apache_2.0-blue.svg + :target: https://www.apache.org/licenses/LICENSE-2.0 +.. image:: https://github.com/nroope/PQuant/actions/workflows/python-publish.yml/badge.svg + :target: https://pquantml.readthedocs.io +.. image:: https://badge.fury.io/py/pquant-ml.svg + :target: https://badge.fury.io/py/pquant-ml + +PQuantML is a hardware-aware model compression framework supporting: + - Joint pruning + quantization + - Layer-wise precision configuration + - Flexible training pipelines + - PyTorch and Keras V3 implementations + - Integration with hardware-friendly toolchains (e.g., hls4ml) + +PQuantML enables efficient deployment of compact neural networks on resource-constrained hardware such as FPGAs and embedded accelerators. + + +Key Features +------------ + + - **Joint Quantization + Pruning**: Combine bit-width reduction with structured pruning. + - **Flexible Precision Control**: Per-layer and mixed-precision configuration. + - **Hardware-Aware Objective**: Include resource constraints (DSP, LUT, BRAM) in training. 
+ - **Simple API**: Configure compression through a single YAML or Python object. + - **PyTorch Integration**: Works with custom training/validation loops. + - **Export Support**: Model conversion towards hardware toolchains. + + +.. code-block:: python + :caption: Simple example + + import torch + from pquant import dst_config + from pquant.layers import PQDense + from pquant.activations import PQActivation + + # Define the compression config and model + config = dst_config() + config.training_parameters.epochs = 1000 + config.quantization_parameters.default_data_integer_bit = 3. + config.quantization_parameters.default_data_fractional_bits = 2. + config.quantization_parameters.default_weight_fractional_bits = 3. + config.quantization_parameters.use_relu_multiplier = False + + def build_model(config): + class Model(torch.nn.Module): + def __init__(self): + super().__init__() + self.dense1 = PQDense(config, 16, 64, + in_quant_bits = (1, 3, 3)) + self.relu1 = PQActivation(config, "relu") + self.dense2 = PQDense(config, 64, 32) + self.relu2 = PQActivation(config, "relu") + self.dense3 = PQDense(config, 32, 32) + self.relu3 = PQActivation(config, "relu") + self.dense4 = PQDense(config, 32, 5, + quantize_output=True, + out_quant_bits=(1, 3, 3)) + + def forward(self, x): + x = self.relu1(self.dense1(x)) + x = self.relu2(self.dense2(x)) + x = self.relu3(self.dense3(x)) + x = self.dense4(x) + return x + + return Model(config) + + PQmodel = build_model(config) + PQmodel(torch.rand((1, 16))) + + ... # Training, evaluation, and anything else you want to do with the model + + hls_config = config_from_pytorch_model( + PQmodel, + input_shape=input_shape, + ) + hls_model = convert_from_pytorch_model(PQmodel, ...) + # Model-wise precision propagation is done automatically for PQuantML models for bit-exactness + # Do NOT pass precision config if you don't know what you are doing + + hls_model.compile() + +.. 
note:: + Do not pass any precision configuration from ``hls4ml.converters.convert_from__model`` in general. PQuantML-defined models will invoke model-wise precision propagation automatically to ensure bit-exactness between the PQuantML model and the generated HLS code (See `here <./precision.html>`__ for more details). From 0805039ee5b867a87cdd8b032ad5876d781d2955 Mon Sep 17 00:00:00 2001 From: Enrico Lupi Date: Tue, 9 Dec 2025 15:11:52 +0100 Subject: [PATCH 7/8] ADD optional dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 82633a7158..a764701d61 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ optional-dependencies.optimization = [ "ortools==9.4.1874", "packaging", ] +optional-dependencies.pquant-ml = [ "pquant-ml>=0.0.1" ] optional-dependencies.profiling = [ "matplotlib", "pandas", "seaborn" ] optional-dependencies.qkeras = [ "qkeras", From a8b3c1f82fd6a9e1a06610fbba8e2e609047ab07 Mon Sep 17 00:00:00 2001 From: Enrico Lupi Date: Mon, 15 Dec 2025 16:48:58 +0100 Subject: [PATCH 8/8] ADD support for PQBatchNorm1d in pytorch --- hls4ml/converters/pytorch/pquant.py | 3 ++- test/pytest/test_pquant_pytorch.py | 15 ++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/hls4ml/converters/pytorch/pquant.py b/hls4ml/converters/pytorch/pquant.py index 14502f9955..0856882544 100644 --- a/hls4ml/converters/pytorch/pquant.py +++ b/hls4ml/converters/pytorch/pquant.py @@ -111,7 +111,8 @@ def handler(operation, layer_name, input_names, input_shapes, node, class_object parse_pqlinear_layer = make_pquant_handler(parse_linear_layer, 'PQDense', 'PQLinear') -parse_pqbatchnorm_layer = make_pquant_handler(parse_batchnorm_layer, 'PQBatchNorm2d') +parse_pqbatchnorm1d_layer = make_pquant_handler(parse_batchnorm_layer, 'PQBatchNorm1d') +parse_pqbatchnorm2d_layer = make_pquant_handler(parse_batchnorm_layer, 'PQBatchNorm2d') parse_pqconv1d_layer = 
make_pquant_handler(parse_conv1d_layer, 'PQConv1d') parse_pqconv2d_layer = make_pquant_handler(parse_conv2d_layer, 'PQConv2d') parse_pqpool1d_layer = make_pquant_handler(parse_pooling_layer, 'PQAvgPool1d', 'AvgPool1d') diff --git a/test/pytest/test_pquant_pytorch.py b/test/pytest/test_pquant_pytorch.py index 472d9cb899..c0c8ed4902 100644 --- a/test/pytest/test_pquant_pytorch.py +++ b/test/pytest/test_pquant_pytorch.py @@ -3,10 +3,6 @@ import numpy as np import pytest -from pquant.activations import PQActivation -from pquant.core.finetuning import TuningConfig -from pquant.core.utils import get_default_config -from pquant.layers import PQAvgPool1d, PQAvgPool2d, PQBatchNorm2d, PQConv1d, PQConv2d, PQDense from hls4ml.converters import convert_from_pytorch_model from hls4ml.utils import config_from_pytorch_model @@ -14,6 +10,10 @@ os.environ['KERAS_BACKEND'] = 'torch' import torch # noqa: E402 import torch.nn as nn # noqa: E402 +from pquant.activations import PQActivation # noqa: E402 +from pquant.core.finetuning import TuningConfig # noqa: E402 +from pquant.core.utils import get_default_config # noqa: E402 +from pquant.layers import PQAvgPool1d, PQAvgPool2d, PQBatchNorm1d, PQBatchNorm2d, PQConv1d, PQConv2d, PQDense # noqa: E402 test_path = Path(__file__).parent @@ -125,9 +125,9 @@ def get_shape(model: nn.Module, batch_size: int = 1, default_length: int = 32, d case PQAvgPool2d(): # (N, C, H, W) return (batch_size, 1, *default_hw) - # case PQBatchNorm1d(): - # # (N, num_features, L) - # return (batch_size, layer.num_features, *default_length) + case PQBatchNorm1d(): + # (N, num_features, L) + return (batch_size, layer.num_features, default_length) case PQBatchNorm2d(): # (N, num_features, H, W) return (batch_size, layer.num_features, *default_hw) @@ -159,6 +159,7 @@ def get_shape(model: nn.Module, batch_size: int = 1, default_length: int = 32, d 'PQConv2d(2, 3, kernel_size=(3,3), padding=0, bias=False)', 'PQConv2d(2, 3, kernel_size=(3,3), padding=0, stride=2)', 
'PQConv2d(2, 3, kernel_size=(3,3), padding=1, stride=2)', + 'PQBatchNorm1d(3)', 'PQBatchNorm2d(3)', 'PQAvgPool1d(2, padding=1)', 'PQAvgPool1d(2, padding=0)',