# Copyright 2019 Google LLC
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import copy
import json
import os
import re
import tempfile
import types
import keras
import keras.ops.numpy as knp
import networkx as nx
import six
from keras import KerasTensor, Model, layers, models, optimizers
from keras import ops as Kops
from .qconv2d_batchnorm import QConv2DBatchnorm
from .qconvolutional import (
QConv1D,
QConv2D,
QConv2DTranspose,
QDepthwiseConv2D,
QSeparableConv1D,
QSeparableConv2D,
)
from .qdepthwiseconv2d_batchnorm import QDepthwiseConv2DBatchnorm
from .qdense_batchnorm import QDenseBatchnorm
from .qlayers import Clip, QActivation, QAdaptiveActivation, QDense, QInitializer
from .qmac import QScaleShift
from .qnormalization import QBatchNormalization
from .qpooling import QAveragePooling2D, QGlobalAveragePooling2D
from .qrecurrent import (
QGRU,
QLSTM,
QBidirectional,
QGRUCell,
QLSTMCell,
QSimpleRNN,
QSimpleRNNCell,
)
from .qtools import qgraph
from .quantizers import *
# from .google_internals.experimental_quantizers import quantized_bits_learnable_scale
# from .google_internals.experimental_quantizers import parametric_quantizer_d_xmax
# Class names (as strings) of the quantized "Q*" layer types.
# NOTE(review): not referenced in the visible part of this file; presumably
# consumed by other modules -- confirm before changing the contents.
REGISTERED_LAYERS = [
    "QActivation",
    "QAdaptiveActivation",
    "QDense",
    "QConv1D",
    "QConv2D",
    "QSeparableConv1D",
    "QSeparableConv2D",
    "QDepthwiseConv2D",
    "QConv2DTranspose",
    "QSimpleRNN",
    "QLSTM",
    "QGRU",
    "QBidirectional",
    "QBatchNormalization",
    "QConv2DBatchnorm",
    "QDepthwiseConv2DBatchnorm",
    "QAveragePooling2D",
    "QGlobalAveragePooling2D",
    "QDenseBatchnorm",
]
def find_bn_fusing_layer_pair(model, custom_objects=None):
    """Finds layers that can be fused with the following batchnorm layers.

    A layer qualifies when its class name is "QConv2D" or "QDepthwiseConv2D"
    and its only successor in the model graph is a "QBatchNormalization"
    layer.

    Args:
      model: input model
      custom_objects: Optional dict of model specific objects needed for
        cloning. None is forwarded as-is to clone_model.

    Returns:
      A tuple (layers_followed_by_bn, bn_layers_to_skip) where
        layers_followed_by_bn: dict mapping the name of each fusable layer to
          the name of the batchnorm layer that follows it.
        bn_layers_to_skip: set with the names of those batchnorm layers.

    Note: supports sequential and non-sequential model
    """
    # Work on a clone so that graph generation never touches the caller's
    # model.
    fold_model = clone_model(model, custom_objects)
    (graph, _) = qgraph.GenerateGraphFromModel(
        fold_model, "quantized_bits(8, 0, 1)", "quantized_bits(8, 0, 1)"
    )
    qgraph.GraphAddSingleSourceSingleSink(graph)
    qgraph.GraphRemoveNodeWithNodeType(graph, "InputLayer")
    qgraph.GraphPropagateActivationsToEdges(graph)

    # Finds the Batchnorm nodes and mark them.
    layers_followed_by_bn = {}
    bn_layers_to_skip = set()
    for node_id in nx.topological_sort(graph):
        layer = graph.nodes[node_id]["layer"][0]
        if not layer:
            continue

        # TODO(lishanok): extend to QDense types
        if layer.__class__.__name__ not in ("QConv2D", "QDepthwiseConv2D"):
            continue

        # Fusing is only valid when the batchnorm is the single consumer of
        # this layer's output. Checking the count first also avoids indexing
        # an empty successor list.
        successor_ids = list(graph.successors(node_id))
        if len(successor_ids) != 1:
            continue

        successor_layer = graph.nodes[successor_ids[0]]["layer"][0]
        if successor_layer.__class__.__name__ == "QBatchNormalization":
            layers_followed_by_bn[layer.name] = successor_layer.name
            bn_layers_to_skip.add(successor_layer.name)

    return (layers_followed_by_bn, bn_layers_to_skip)
def add_bn_fusing_weights(prev_layer, bn_layer, saved_weights):
    """Adds additional fusing weights to saved_weights.

    In hardware inference, we need to fuse the previous layer's output with
    the following batchnorm op.

    z[i] = bn(y[i]) = inv[i] * y'[i] * scale[i] + fused_bias[i] is the final
    output of the previous layer and bn layer, with:
      inv[i] = gamma[i] / sqrt(variance[i] + epsilon) is computed from the
        bn layer weights
      y'[i] is the i-th channel output from the previous layer (before scale)
      scale[i] is the i-th channel kernel quantizer scale
      fused_bias[i] = inv[i] * bias[i] + beta[i] - inv[i] * mean[i] where bias
        is the bias term from the previous layer, beta and mean are the bn
        layer weights.

    The results are written into saved_weights[prev_layer.name] under the keys
    "enable_bn_fusing", "fused_bn_layer_name", "bn_inv" and "fused_bias"; that
    entry must already exist.

    Args:
      prev_layer: qkeras layer, could be QConv2D/QDepthwiseConv2D/QDense.
      bn_layer: The following QBatchNormalization layer that needs to be
        fused with the previous layer.
      saved_weights: Dict. The centralized weights dictionary that exports
        relevant weights and parameters for hardware inference.

    Raises:
      AssertionError: if the inverse quantizer is combined with a gamma or
        variance quantizer, or if prev_layer uses a bias but does not hold
        exactly two weight arrays.
    """
    bn_qs = bn_layer.quantizers
    bn_ws = bn_layer.get_weights()

    # Index 4 is the inverse quantizer; indices 0 and 3 are the gamma and
    # variance quantizers (as stated by the assertion message below).
    if bn_qs[4] is not None:
        assert bn_qs[0] is None and bn_qs[3] is None, (
            "If using the inverse quantizer, the gamma and variance quantizers "
            "should not be used in order to avoid quantizing a value twice."
        )

    def apply_quantizer(quantizer, input_weight):
        # A missing quantizer leaves the weight untouched.
        return quantizer(input_weight) if quantizer else input_weight

    # Quantize respective bn layer weights. get_weights() only contains gamma
    # and beta when scale / center are enabled, hence the running index.
    gamma = 1.0
    beta = 0
    idx = 0
    if bn_layer.scale:
        gamma = apply_quantizer(bn_layer.gamma_quantizer_internal, bn_ws[idx])
        idx += 1
    if bn_layer.center:
        beta = apply_quantizer(bn_layer.beta_quantizer_internal, bn_ws[idx])
        idx += 1
    mean = apply_quantizer(bn_layer.mean_quantizer_internal, bn_ws[idx])
    idx += 1
    variance = apply_quantizer(bn_layer.variance_quantizer_internal, bn_ws[idx])

    # Compute inv[i]
    inv = gamma * (1.0 / keras.ops.sqrt(variance + bn_layer.epsilon))
    inv = keras.ops.convert_to_numpy(inv)
    if bn_layer.inverse_quantizer_internal is not None:
        inv = bn_layer.inverse_quantizer_internal(inv)

    # Compute fused_bias[i]
    if prev_layer.use_bias:
        cur_weights = prev_layer.get_weights()
        assert len(cur_weights) == 2, (
            f"Weights should have length of 2. Found {len(cur_weights)} instead."
        )
        prev_bias = cur_weights[-1]
    else:
        prev_bias = 0
    b_prime = inv * prev_bias + beta - inv * mean

    fused_entry = saved_weights[prev_layer.name]
    fused_entry["enable_bn_fusing"] = True
    fused_entry["fused_bn_layer_name"] = bn_layer.name
    fused_entry["bn_inv"] = inv
    fused_entry["fused_bias"] = b_prime
# Model utilities: before saving the weights, we want to apply the quantizers.
def model_save_quantized_weights(model, filename=None, custom_objects=None):
    """Quantizes model for inference and save it.

    Takes a model with weights, apply quantization function to weights and
    returns a dictionary with quantized weights.

    User should be aware that "po2" quantization functions cannot really
    be quantized in meaningful way in Keras. So, in order to preserve
    compatibility with inference flow in Keras, we do not convert "po2"
    weights and biases to exponents + signs (in case of quantize_po2), but
    return instead (-1)**sign*(2**round(log2(x))). In the returned dictionary,
    we will return the pair (sign, round(log2(x))).

    Special care needs to be given to quantized_bits(alpha="auto_po2") as well.
    Since in this quantizer, hardware needs the integer weights and scale for
    hardware inference, this function will return the pair (scale,
    integer_weights) in the returned dictionary.

    Side effects: the weights of every quantized layer of `model` (except the
    folded batchnorm layer types) are overwritten with their quantized values.

    Arguments:
      model: model with weights to be quantized.
      filename: if specified, the quantized weights are written with
        model.save_weights(filename) so that we can use them for inference
        later on.
      custom_objects: Optional dict of model specific objects needed to
        load/store. None is forwarded as-is.

    Returns:
      dictionary containing layer name and quantized weights that can be used
      by a hardware generator.

    Raises:
      AssertionError: if an auto_po2 quantizer has not been called yet (its
        scale is unknown) or if its scale is not a power of 2.
    """
    saved_weights = {}
    # Layer types whose conv/dense and batchnorm weights are folded together.
    folded_bn_types = (QConv2DBatchnorm, QDenseBatchnorm, QDepthwiseConv2DBatchnorm)

    # Find the conv/dense layers followed by Batchnorm layers
    (fusing_layer_pair_dict, bn_layers_to_skip) = find_bn_fusing_layer_pair(
        model, custom_objects
    )

    print("... quantizing model")
    for layer in model.layers:
        if not hasattr(layer, "get_quantizers"):
            if layer.get_weights():
                print(" ", layer.name, "has not been quantized")
            continue

        # weights for software inference
        weights = []
        signs = []
        scales = []
        # weights for hardware inference
        hw_weights = []

        is_folded_bn_layer = isinstance(layer, folded_bn_types)
        if is_folded_bn_layer:
            qs = layer.get_quantizers()
            ws = layer.get_folded_weights()
        elif isinstance(layer, (QSimpleRNN, QLSTM, QGRU)):
            # The last quantizer of the recurrent layers is dropped so that
            # quantizers and weights can be zipped together below.
            qs = layer.get_quantizers()[:-1]
            ws = layer.get_weights()
        else:
            qs = layer.get_quantizers()
            ws = layer.get_weights()

        has_sign = False
        has_scale = False

        # isinstance() might fail due to inconsistent module import path.
        # Use __class__.__name__ instead.
        # A bn layer listed in bn_layers_to_skip is marked as fused with the
        # previous layer.
        enable_bn_fusing = (
            layer.__class__.__name__ == "QBatchNormalization"
            and layer.name in bn_layers_to_skip
        )

        for quantizer, weight in zip(qs, ws):
            if quantizer:
                weight = quantizer(weight)

            # If quantizer is power-of-2 (quantized_po2 or quantized_relu_po2),
            # we would like to process it here.
            #
            # However, we cannot, because we will lose sign information as
            # quantized_po2 will be represented by the tuple
            # (sign, log2(abs(w))).
            #
            # In addition, we will not be able to use the weights on the model
            # any longer.
            #
            # So, instead of "saving" the weights in the model, we will return
            # a dictionary so that the proper values can be propagated.

            # Weights store the weight in the format that software inference
            # uses.
            weights.append(weight)

            q_name = ""
            if quantizer:
                if isinstance(quantizer, str):
                    q_name = quantizer
                elif hasattr(quantizer, "__name__"):
                    q_name = quantizer.__name__
                elif hasattr(quantizer, "name"):
                    q_name = quantizer.name
                elif hasattr(quantizer, "__class__"):
                    q_name = quantizer.__class__.__name__

            if quantizer and ("_po2" in q_name):
                # Quantized_relu_po2 does not have a sign.
                if q_name == "quantized_po2":
                    has_sign = True
                sign = knp.sign(weight)
                # Makes sure values are -1 or +1 only
                sign += 1.0 - knp.abs(sign)
                # hw_weight store the weight in the format that hardware
                # inference uses.
                hw_weight = knp.round(knp.log2(knp.abs(weight)))
                signs.append(sign)
                scales.append([])
            elif q_name == "quantized_bits" and quantizer.alpha == "auto_po2":
                unsigned_bits = quantizer.bits - quantizer.keep_negative
                m = Kops.cast(pow(2.0, unsigned_bits), float)
                m_i = Kops.cast(keras.ops.power(2.0, quantizer.integer), float)
                # The message is attached to the assert itself; previously it
                # was a separate, no-op expression statement.
                assert (
                    isinstance(quantizer.scale, np.ndarray)
                    or isinstance(quantizer.scale, KerasTensor)
                    or keras.ops.is_tensor(quantizer.scale)
                ), (
                    "The auto_po2 quantizer has to be called first in order "
                    "to know the values of scale."
                )
                scale = (
                    keras.ops.convert_to_numpy(quantizer.scale)
                    if isinstance(quantizer.scale, KerasTensor)
                    else quantizer.scale
                )
                scale = Kops.cast(scale, float)
                # Make sure scale is power of 2 values
                log2val = knp.log2(scale)
                diff = knp.round(log2val) - log2val
                assert knp.all(diff == 0), "scale must be power of 2 values!"
                # Convert fixed point weight to integer weight
                hw_weight = weight * m / m_i
                # Because hw_weight is integer weights, set
                # scale = scale * m_i / m so that we can multiply scale with
                # the integer weight during hardware inference to get the
                # fixed point weights
                scale = scale * m_i / m
                has_scale = True
                scales.append(scale)
            else:
                hw_weight = weight
                signs.append([])
                scales.append([])
            hw_weights.append(hw_weight)

        # Save the weights in the format that hardware inference uses
        layer_entry = {
            "weights": hw_weights,
            "enable_bn_fusing": enable_bn_fusing,
        }
        saved_weights[layer.name] = layer_entry

        if isinstance(layer, (QAveragePooling2D, QGlobalAveragePooling2D)):
            if isinstance(layer, QAveragePooling2D):
                if isinstance(layer.pool_size, int):
                    pool_area = layer.pool_size * layer.pool_size
                else:
                    pool_area = knp.prod(layer.pool_size)
            else:
                pool_area = layer.compute_pooling_area(
                    input_shape=layer.input.shape
                )
            pool_area = keras.ops.cast(pool_area, dtype=float)
            if hasattr(layer, "average_quantizer_internal") and callable(
                layer.average_quantizer_internal
            ):
                layer_entry["q_mult_factor"] = keras.ops.convert_to_numpy(
                    layer.average_quantizer_internal(1.0 / pool_area)
                )
            else:
                layer_entry["q_mult_factor"] = None
            layer_entry["mult_factor"] = 1.0 / pool_area
            layer_entry["pool_area"] = pool_area

        if has_sign:
            layer_entry["signs"] = signs
        if has_scale:
            layer_entry["scales"] = scales

        if not is_folded_bn_layer:
            # Set layer weights in the format that software inference uses
            layer.set_weights(weights)
        else:
            print(
                layer.name,
                " conv and batchnorm weights cannot be seperately"
                " quantized because they will be folded before quantization.",
            )

        # adjust weights for bn fusing if necessary
        if layer.name in fusing_layer_pair_dict:
            print(
                f"Fuse {layer.name} output with "
                f"{fusing_layer_pair_dict[layer.name]} for hardware inference."
            )
            add_bn_fusing_weights(
                prev_layer=layer,
                bn_layer=model.get_layer(fusing_layer_pair_dict[layer.name]),
                saved_weights=saved_weights,
            )

    if filename:
        model.save_weights(filename)

    return saved_weights
def quantize_activation(layer_config, activation_bits):
    """Replaces activation by quantized activation functions.

    The "activation" entry of layer_config is rewritten in place:
      relu -> quantized_relu(bits)
      tanh -> quantized_tanh(bits)
      sigmoid -> quantized_sigmoid(bits)
    Any other activation (including "linear"), or a missing / None
    activation, leaves layer_config untouched.

    Args:
      layer_config: dict with the layer configuration; modified in place.
      activation_bits: number of bits, inserted via str() into the quantizer
        string.
    """
    activation = layer_config.get("activation", None)
    if activation is None:
        return

    # The activation may be given by name, as a plain Python function, or as
    # an arbitrary object (in which case its class name is used).
    if isinstance(activation, str):
        a_name = activation
    elif isinstance(activation, types.FunctionType):
        a_name = activation.__name__
    else:
        a_name = activation.__class__.__name__

    # more to come later
    if a_name in ("relu", "tanh", "sigmoid"):
        layer_config["activation"] = (
            "quantized_" + a_name + "(" + str(activation_bits) + ")"
        )
def get_config(quantizer_config, layer, layer_class, parameter=None):
    """Looks up the quantizer entry for a layer in quantizer_config.

    An entry keyed by the layer's own name (layer["config"]["name"]) takes
    precedence over an entry keyed by layer_class. When both an entry and a
    parameter are given, the parameter is looked up inside that entry.

    Returns:
      The matching entry (or the requested parameter of it), or None when
      nothing matches.
    """
    layer_name = layer["config"]["name"]
    if layer_name in quantizer_config:
        entry = quantizer_config[layer_name]
    else:
        entry = quantizer_config.get(layer_class, None)

    if entry is None or parameter is None:
        return entry
    return entry.get(parameter, None)
def is_TFOpLambda_layer(layer):
    """Returns True when the class of `layer` is named "TFOpLambda"."""
    class_name = layer.__class__.__name__
    return class_name == "TFOpLambda"
def get_y_from_TFOpLambda(model_cfg, layer):
"""Get the value of "y" from the TFOpLambda layer's configuration.
Args:
model_cfg: dictionary type, model.get_config() output
layer: a given layer instance
Return:
value of "y" for a TFOpLambda layer. 'y' here corresponds to how tensorflow
stores TFOpLambda layer parameter in serialization. for example,
TFOpLambda(func), where func is knp.multiply(input_tensor, 3). "y" would be
the value 3.
"""
for layer_config in model_cfg["layers"]:
op_name = layer_config["config"]["name"]
class_name = layer_config["class_name"]
# TODO(lishanok): Extend support for other TFOpLambda types when needed
if op_name == layer.name and class_name == "TFOpLambda":
assert (
"knp.multiply" in op_name
), f"TFOpLambda layer {op_name} not supported!"
return layer_config["inbound_nodes"][-1][-1]["y"]
return None
def convert_to_folded_model(model):
    """Removes the BN layers that directly follow Conv2D/DepthwiseConv2D.

    A batchnorm node is dropped from the model graph when it is the single
    successor of a "Conv2D" or "DepthwiseConv2D" layer; a new model is then
    rebuilt by calling the remaining (cloned) layers in topological order.

    Args:
      model: Keras model instance.

    Returns:
      new_model: Keras model with the folded BN layers removed.
      layers_to_fold: list with the names of the layers whose following BN
        layer was removed.
    """
    fold_model = clone_model(model)
    model_cfg = model.get_config()

    graph, _ = qgraph.GenerateGraphFromModel(
        fold_model, "quantized_bits(8, 0, 1)", "quantized_bits(8, 0, 1)"
    )
    qgraph.GraphAddSingleSourceSingleSink(graph)
    qgraph.GraphRemoveNodeWithNodeType(graph, "InputLayer")
    qgraph.GraphPropagateActivationsToEdges(graph)

    # Pass 1: collect the batchnorm nodes to drop and the layers they follow.
    foldable_class_names = ["Conv2D", "DepthwiseConv2D"]
    bn_nodes_to_delete = []
    layers_to_fold = []
    for node_id in nx.topological_sort(graph):
        layer = graph.nodes[node_id]["layer"][0]
        if not layer:
            continue
        successor_ids = list(graph.successors(node_id))
        first_successor = graph.nodes[successor_ids[0]]["layer"][0]
        if (
            layer.__class__.__name__ in foldable_class_names
            and len(successor_ids) == 1
            and first_successor.__class__.__name__ == "BatchNormalization"
        ):
            bn_nodes_to_delete.append(successor_ids[0])
            layers_to_fold.append(layer.name)

    for bn_node_id in bn_nodes_to_delete:
        qgraph.GraphRemoveNode(graph, bn_node_id)

    # Pass 2: re-wire the remaining layers following the edges of the graph.
    model_outputs = []
    model_inputs = fold_model.inputs
    x = model_inputs
    for node_id in nx.topological_sort(graph):
        layer = graph.nodes[node_id]["layer"][0]
        if not layer:
            continue

        # The inputs of a layer are the tensors stored on its incoming edges.
        layer_input_tensors = [
            graph.edges[(parent_node_id, node_id)]["tensor"]
            for parent_node_id in graph.predecessors(node_id)
        ]
        if len(layer_input_tensors) == 1:
            layer_input_tensors = layer_input_tensors[0]

        if isinstance(layer, (layers.Multiply, layers.Add, layers.Concatenate)):
            # Merge layers expect a list of tensors as input
            if isinstance(layer_input_tensors, list):
                x = layer(layer_input_tensors)
            else:
                x = layer([layer_input_tensors])
        elif is_TFOpLambda_layer(layer):
            # TFOpLambda layers take the extra operand `y` that is recorded
            # in the model configuration.
            second_operand = get_y_from_TFOpLambda(model_cfg, layer)
            x = layer(layer_input_tensors, second_operand)
        else:
            x = layer(layer_input_tensors)

        # Publish the output on every outgoing edge so that successors pick
        # it up as their input.
        for u, v in graph.edges(node_id):
            graph[u][v]["tensor"] = x
            # Node -2 is treated as the sink: tensors flowing into it are
            # model outputs (collected once each, by identity).
            if v == -2 and all(x is not seen for seen in model_outputs):
                model_outputs.append(x)

    keras_outputs = [t for t in model_outputs if isinstance(t, KerasTensor)]
    new_model = Model(inputs=model_inputs, outputs=keras_outputs)
    return new_model, layers_to_fold
def model_quantize(
    model,
    quantizer_config,
    activation_bits,
    custom_objects=None,
    transfer_weights=False,
    prefer_qadaptiveactivation=False,
    enable_bn_folding=False,
):
    """Creates a quantized model from non-quantized model.

    The quantized model translation is based on json interface of Keras,
    which requires a custom_objects dictionary for "string" types.

    Because of the way json works, we pass "string" objects for the
    quantization mechanisms and we perform an eval("string") which
    technically is not safe, but it will do the job.

    The quantizer_config is a dictionary with the following form.
    {
      Dense_layer_name: {
          "kernel_quantizer": "quantizer string",
          "bias_quantizer": "quantizer_string"
      },

      Conv2D_layer_name: {
          "kernel_quantizer": "quantizer string",
          "bias_quantizer": "quantizer_string"
      },

      Activation_layer_name: "quantizer string",

      "QActivation": { "relu": "quantizer_string" },

      "QConv2D": {
          "kernel_quantizer": "quantizer string",
          "bias_quantizer": "quantizer_string"
      },

      "QBatchNormalization": {}
    }

    In the case of "QBidirectional", we can follow the same form as above.
    The specified configuration will be used for both forward and backwards
    layer.
    {
      "QBidirectional" : {
          "kernel_quantizer" : "quantizer string",
          "bias_quantizer" : "quantizer string",
          "recurrent_quantizer" : "quantizer string"
      }
    }

    In the case of "QActivation", we can modify only certain types of
    activations, for example, a "relu". In this case we represent the
    activation name by a dictionary, or we can modify all activations,
    without representing as a set.

    Entries are looked up first by layer name and then by quantized class
    name (e.g. "QDense"). A layer without a matching entry is left
    unquantized.

    Arguments:
      model: model to be quantized
      quantizer_config: dictionary (as above) with quantized parameters
      activation_bits: number of bits for quantized_relu, quantized_tanh,
        quantized_sigmoid
      custom_objects: dictionary following keras recommendations for json
        translation.
      transfer_weights: if true, weights are to be transferred from model to
        qmodel. Ignored when batchnorm folding actually takes place.
      prefer_qadaptiveactivation: Bool. If true, try to use QAdaptiveActivation
        over QActivation whenever possible
      enable_bn_folding: Bool. If true, fold conv/dense layers with
        following batch normalization layers whenever possible. use
        QConv2DBatchnorm for example, to replace conv2d layers

    Returns:
      qmodel with quantized operations.
    """
    layers_to_fold = []
    if enable_bn_folding:
        # Removes bn layers from the model and find a list of layers to fold.
        model, layers_to_fold = convert_to_folded_model(model)
        if len(layers_to_fold) == 0:
            # If no layers to fold, no need to perform folding.
            enable_bn_folding = False

    if not custom_objects:
        custom_objects = {}

    # Let's make a deep copy to make sure our objects are not shared elsewhere.
    jm = copy.deepcopy(json.loads(model.to_json()))
    custom_objects = copy.deepcopy(custom_objects)
    config = jm["config"]
    # Named layer_entries (not `layers`) to avoid shadowing the keras.layers
    # import inside this function.
    layer_entries = config["layers"]

    def set_folding_options(layer, layer_config, q_name):
        """Adds the settings specific to folded (…Batchnorm) layers."""
        layer_config["use_bias"] = True  # Folded layers require a bias
        folding_mode = get_config(quantizer_config, layer, q_name, "folding_mode")
        layer_config["folding_mode"] = (
            folding_mode if folding_mode else "ema_stats_folding"
        )
        ema_freeze_delay = get_config(
            quantizer_config, layer, q_name, "ema_freeze_delay"
        )
        layer_config["ema_freeze_delay"] = (
            ema_freeze_delay if ema_freeze_delay else None
        )

    def set_activation(layer, layer_config, q_name):
        """Uses the configured activation quantizer, else the default one."""
        quantizer = get_config(
            quantizer_config, layer, q_name, "activation_quantizer"
        )
        if quantizer:
            layer_config["activation"] = quantizer
        else:
            quantize_activation(layer_config, activation_bits)

    def quantize_rnn(layer, quantizer_config):
        q_name = "Q" + layer["class_name"]
        # Needs to add kernel, recurrent bias quantizers.
        kernel_quantizer = get_config(
            quantizer_config, layer, q_name, "kernel_quantizer"
        )
        recurrent_quantizer = get_config(
            quantizer_config, layer, q_name, "recurrent_quantizer"
        )
        if layer["config"]["use_bias"]:
            bias_quantizer = get_config(
                quantizer_config, layer, q_name, "bias_quantizer"
            )
        else:
            bias_quantizer = None
        state_quantizer = get_config(
            quantizer_config, layer, q_name, "state_quantizer"
        )

        # This is to avoid unwanted transformations.
        if kernel_quantizer is None:
            return

        layer["config"]["kernel_quantizer"] = kernel_quantizer
        layer["config"]["recurrent_quantizer"] = recurrent_quantizer
        layer["config"]["bias_quantizer"] = bias_quantizer
        layer["config"]["state_quantizer"] = state_quantizer

        # If activation is present, add activation here.
        activation = get_config(
            quantizer_config, layer, q_name, "activation_quantizer"
        )
        if activation:
            layer["config"]["activation"] = activation
        else:
            quantize_activation(layer["config"], activation_bits)

        # If recurrent activation is present, add activation here.
        if layer["class_name"] in ["LSTM", "GRU"]:
            recurrent_activation = get_config(
                quantizer_config, layer, q_name, "recurrent_activation_quantizer"
            )
            if recurrent_activation:
                layer["config"]["recurrent_activation"] = recurrent_activation

        layer["class_name"] = q_name
        registered_name = layer.pop("registered_name", None)
        if registered_name:
            layer["registered_name"] = q_name

    for layer in layer_entries:
        layer_config = layer["config"]

        # Dense becomes QDense, Conv1D becomes QConv1D etc
        # Activation converts activation functions.
        if layer["class_name"] in [
            "Dense",
            "Conv1D",
            "Conv2D",
            "Conv2DTranspose",
            "SeparableConv1D",
            "SeparableConv2D",
        ]:
            plain_q_name = "Q" + layer["class_name"]
            # Only fold if current layer is followed by BN layer.
            is_folded = (
                layer["class_name"] in ["Dense", "Conv2D"]
                and enable_bn_folding
                and layer["name"] in layers_to_fold
            )
            if is_folded:
                q_name = plain_q_name + "Batchnorm"
                set_folding_options(layer, layer_config, q_name)
            else:
                q_name = plain_q_name

            # Needs to add kernel/bias quantizers.
            kernel_quantizer = get_config(
                quantizer_config, layer, q_name, "kernel_quantizer"
            )
            if layer_config["use_bias"]:
                bias_quantizer = get_config(
                    quantizer_config, layer, q_name, "bias_quantizer"
                )
            else:
                bias_quantizer = None

            if kernel_quantizer is None and is_folded:
                # Tries none-folded layer quantizer as a back up.
                kernel_quantizer = get_config(
                    quantizer_config, layer, plain_q_name, "kernel_quantizer"
                )
                bias_quantizer = get_config(
                    quantizer_config, layer, plain_q_name, "bias_quantizer"
                )

            # This is to avoid unwanted transformations.
            if kernel_quantizer is None:
                continue

            layer["class_name"] = q_name
            layer_config["kernel_quantizer"] = kernel_quantizer
            layer_config["bias_quantizer"] = bias_quantizer

            # If activation is present, add activation here.
            set_activation(layer, layer_config, q_name)

        elif layer["class_name"] == "DepthwiseConv2D":
            is_folded = enable_bn_folding and layer["name"] in layers_to_fold
            if is_folded:
                q_name = "QDepthwiseConv2DBatchnorm"
                set_folding_options(layer, layer_config, q_name)
            else:
                q_name = "QDepthwiseConv2D"

            # Needs to add kernel/bias quantizers.
            depthwise_quantizer = get_config(
                quantizer_config, layer, q_name, "depthwise_quantizer"
            )
            if layer_config["use_bias"]:
                bias_quantizer = get_config(
                    quantizer_config, layer, q_name, "bias_quantizer"
                )
            else:
                bias_quantizer = None

            if depthwise_quantizer is None and is_folded:
                # Tries none-folded layer quantizer as a back up.
                depthwise_quantizer = get_config(
                    quantizer_config, layer, "QDepthwiseConv2D", "depthwise_quantizer"
                )
                bias_quantizer = get_config(
                    quantizer_config, layer, "QDepthwiseConv2D", "bias_quantizer"
                )

            # This is to avoid unwanted transformations.
            if depthwise_quantizer is None:
                continue

            layer["class_name"] = q_name
            layer_config["depthwise_quantizer"] = depthwise_quantizer
            layer_config["bias_quantizer"] = bias_quantizer

            # If activation is present, add activation here.
            set_activation(layer, layer_config, q_name)

        elif layer["class_name"] in ["SimpleRNN", "LSTM", "GRU"]:
            quantize_rnn(layer, quantizer_config)

        elif layer["class_name"] == "Bidirectional":
            # The wrapped layers are quantized with the configuration found
            # for the Bidirectional wrapper, re-keyed by the wrapped layer's
            # own name so that get_config can find it.
            forward_layer_quantizer_config = {
                layer_config["layer"]["config"]["name"]: get_config(
                    quantizer_config, layer, "QBidirectional"
                )
            }
            quantize_rnn(layer["config"]["layer"], forward_layer_quantizer_config)
            if "backward_layer" in layer_config:
                backward_layer_quantizer_config = {
                    layer_config["backward_layer"]["config"]["name"]: get_config(
                        quantizer_config, layer, "QBidirectional"
                    )
                }
                quantize_rnn(
                    layer["config"]["backward_layer"],
                    backward_layer_quantizer_config,
                )
            layer["class_name"] = "QBidirectional"

        elif layer["class_name"] == "Activation":
            if prefer_qadaptiveactivation:  # Try to find QAdaptiveActivation first
                quantizer = get_config(quantizer_config, layer, "QAdaptiveActivation")
                is_qadaptiveactivation = True
                if quantizer is None:  # Try QActivation as a backup
                    quantizer = get_config(quantizer_config, layer, "QActivation")
                    is_qadaptiveactivation = False
            else:  # Tries to find QActivation first.
                quantizer = get_config(quantizer_config, layer, "QActivation")
                is_qadaptiveactivation = False
                if quantizer is None:  # Try QAdaptiveActivation as a backup
                    quantizer = get_config(
                        quantizer_config, layer, "QAdaptiveActivation"
                    )
                    is_qadaptiveactivation = True

            # This is to avoid softmax from quantizing in autoq.
            if quantizer is None:
                continue

            # If quantizer exists in dictionary related to this name,
            # use it, otherwise, use normal transformations.
            if not isinstance(quantizer, dict) or quantizer.get(
                layer_config["activation"], None
            ):
                # Only change activation layer if we will use a quantized
                # activation.
                layer["class_name"] = (
                    "QAdaptiveActivation" if is_qadaptiveactivation else "QActivation"
                )
                if isinstance(quantizer, dict):
                    quantizer = quantizer[layer_config["activation"]]
                if quantizer:
                    if is_qadaptiveactivation:
                        assert (
                            quantizer.find(",") < 0
                        ), "Only integer bits should be defined for QAdaptiveActivation"
                        layer_config["total_bits"] = int(
                            re.sub(r"[^\d]", "", quantizer)
                        )
                        quantizer = re.sub(r"\(.*", "", quantizer)  # remove params
                    layer_config["activation"] = quantizer
                else:
                    quantize_activation(layer_config, activation_bits)

        # We have to do this because of other instances of ReLU.
        elif layer["class_name"] in ["ReLU", "relu", "LeakyReLU"]:
            quantizer = get_config(quantizer_config, layer, "QActivation")
            # This is to avoid unwanted transformations.
            if quantizer is None:
                continue

            # Remember the original class: class_name is overwritten with
            # "QActivation" below, and the class-specific cleanup has to key
            # off the original value. (Testing class_name after overwriting
            # it always fell through to the ReLU case and raised KeyError
            # for LeakyReLU / relu.)
            relu_class = layer["class_name"]
            if relu_class == "LeakyReLU":
                # The slope is serialized as "alpha" by older Keras versions
                # and as "negative_slope" by Keras 3.
                slope_key = "alpha" if "alpha" in layer_config else "negative_slope"
                relu_keys = [slope_key]
            elif relu_class == "relu":
                slope_key = "alpha"
                relu_keys = ["max_value", "alpha", "threshold"]
            else:  # ReLU from mobilenet
                slope_key = "negative_slope"
                relu_keys = ["max_value", "negative_slope", "threshold"]

            negative_slope = layer_config[slope_key]
            if negative_slope > 0:
                q_name = "leakyrelu"
            else:
                q_name = "relu"

            # If quantizer exists in dictionary related to this name,
            # use it, otherwise, use normal transformations.
            if not isinstance(quantizer, dict) or quantizer.get(q_name, None):
                # Only change activation layer if we will use a quantized
                # activation.
                layer["class_name"] = "QActivation"

                # Remove relu specific configurations
                # remember that quantized relu's are always upper bounded.
                for relu_key in relu_keys:
                    del layer_config[relu_key]

                if isinstance(quantizer, dict):
                    quantizer = quantizer[q_name]
                if quantizer:
                    layer_config["activation"] = quantizer
                else:
                    quantize_activation(layer_config, activation_bits)

        elif layer["class_name"] == "BatchNormalization":
            # We will assume at least QBatchNormalization or
            # layer name is in dictionary to enable conversion
            # otherwise we will just skip it.
            if (
                layer_config["name"] not in quantizer_config
                and "QBatchNormalization" not in quantizer_config
            ):
                continue

            layer["class_name"] = "QBatchNormalization"
            # Adds the gamma/beta/mean/variance quantizers.
            for quantizer_key in (
                "gamma_quantizer",
                "beta_quantizer",
                "mean_quantizer",
                "variance_quantizer",
            ):
                layer_config[quantizer_key] = get_config(
                    quantizer_config, layer, "QBatchNormalization", quantizer_key
                )

        elif layer["class_name"] in ["AveragePooling2D", "GlobalAveragePooling2D"]:
            q_name = "Q" + layer["class_name"]
            # Adds the average quantizer to config.
            average_quantizer = get_config(
                quantizer_config, layer, q_name, "average_quantizer"
            )

            # This is to avoid unwanted transformations.
            if average_quantizer is None:
                continue

            layer["class_name"] = q_name
            layer_config["average_quantizer"] = average_quantizer

            # Adds activation to config.
            set_activation(layer, layer_config, q_name)

        # Keep a truthy "registered_name" as it is and drop an empty one.
        # (Equivalent to the previous `q_name = None` followed by
        # `q_name or registered_name`.)
        registered_name = layer.pop("registered_name", None)
        if registered_name:
            layer["registered_name"] = registered_name

    # We need to keep a dictionary of custom objects as our quantized library
    # is not recognized by keras.
    qmodel = quantized_model_from_json(json.dumps(jm), custom_objects)

    # If transfer_weights is true, we load the weights from model to qmodel.
    if transfer_weights and not enable_bn_folding:
        for layer, qlayer in zip(model.layers, qmodel.layers):
            if layer.get_weights():
                qlayer.set_weights(copy.deepcopy(layer.get_weights()))

    return qmodel
def _add_supported_quantized_objects(custom_objects):
    """Register all quantizers and quantized layer classes by name.

    `custom_objects` is updated in place. The entries of the quantizer
    registry are written first, then the quantized layers and helper classes
    listed below, so the latter take precedence on a name clash.
    """
    registered_quantizers = quantizer_registry._QUANTIZERS_REGISTRY._container
    for quantizer_name, quantizer in registered_quantizers.items():
        custom_objects[quantizer_name] = quantizer

    # Insertion order of this table matches the order of registration.
    quantized_classes = {
        "QInitializer": QInitializer,
        "QDense": QDense,
        "QConv1D": QConv1D,
        "QConv2D": QConv2D,
        "QConv2DTranspose": QConv2DTranspose,
        "QSimpleRNNCell": QSimpleRNNCell,
        "QSimpleRNN": QSimpleRNN,
        "QLSTMCell": QLSTMCell,
        "QLSTM": QLSTM,
        "QGRUCell": QGRUCell,
        "QGRU": QGRU,
        "QBidirectional": QBidirectional,
        "QDepthwiseConv2D": QDepthwiseConv2D,
        "QSeparableConv1D": QSeparableConv1D,
        "QSeparableConv2D": QSeparableConv2D,
        "QActivation": QActivation,
        "QAdaptiveActivation": QAdaptiveActivation,
        "QBatchNormalization": QBatchNormalization,
        "Clip": Clip,
        "QConv2DBatchnorm": QConv2DBatchnorm,
        "QDepthwiseConv2DBatchnorm": QDepthwiseConv2DBatchnorm,
        "QDenseBatchnorm": QDenseBatchnorm,
        "QAveragePooling2D": QAveragePooling2D,
        "QGlobalAveragePooling2D": QGlobalAveragePooling2D,
        "QScaleShift": QScaleShift,
    }
    for class_name, quantized_class in quantized_classes.items():
        custom_objects[class_name] = quantized_class
[docs]
def clone_model(model, custom_objects=None):
    """Clone a qkeras model and copy its weights into the clone.

    Arguments:
      model: The model to clone.
      custom_objects: Unused. The parameter is kept so that existing callers
        passing it keep working; `keras.models.clone_model` is called
        without it.

    Returns:
      The model produced by `keras.models.clone_model(model)`, with its
      weights set from `model.get_weights()`.

    Raises:
      Exception: Whatever the cloning or the weight transfer raised. The
        error is printed first and then re-raised unchanged.
    """
    # The dictionary used to be deep-copied here and then discarded, which
    # cost time and could itself fail on objects that cannot be copied.
    del custom_objects  # Intentionally unused.

    try:
        qmodel = keras.models.clone_model(model)
        qmodel.set_weights(model.get_weights())
    except Exception as e:
        print(f"[ERROR] Model cloning failed: {e}")
        # A bare `raise` re-raises the active exception with its traceback.
        raise
    return qmodel
[docs]
def remove_mask_keys(obj):
    """Return a copy of `obj` with every "mask" dictionary key dropped.

    Dictionaries and lists are rebuilt recursively, however deeply nested
    (e.g. `inbound_nodes` is a list). Any other value, such as an int or a
    str, is returned unchanged.
    """
    if isinstance(obj, dict):
        cleaned = {}
        for key, value in obj.items():
            if key != "mask":
                cleaned[key] = remove_mask_keys(value)
        return cleaned
    if isinstance(obj, list):
        return [remove_mask_keys(item) for item in obj]
    return obj
[docs]
def quantized_model_from_json(json_string, custom_objects=None):
    """Build a model from a JSON description, with quantized objects known.

    Arguments:
      json_string: JSON string describing the model.
      custom_objects: Optional dictionary mapping names (strings) to custom
        classes or functions. A deep copy is used, so the caller's
        dictionary is left untouched.

    Returns:
      The model returned by `models.model_from_json`.
    """
    # Work on a private deep copy so our objects are not shared elsewhere.
    known_objects = copy.deepcopy(custom_objects if custom_objects else {})
    _add_supported_quantized_objects(known_objects)

    # Decode, recursively strip every "mask" entry, and encode again.
    config_without_masks = remove_mask_keys(json.loads(json_string))
    cleaned_json_string = json.dumps(config_without_masks)

    return models.model_from_json(cleaned_json_string, custom_objects=known_objects)
[docs]
def load_qmodel(filepath, custom_objects=None, compile=True):
    """Loads a quantized model saved by Keras's model.save() (h5 file).

    Arguments:
      filepath: one of the following:
        - string, path to the saved model
        - h5py.File or h5py.Group object from which to load the model
        - any file-like object implementing the method `read` that returns
          `bytes` data (e.g. `io.BytesIO`) that represents a valid h5py file
          image.
      custom_objects: Optional dictionary mapping names (strings) to custom
        classes or functions to be considered during deserialization. A deep
        copy is extended with the quantized objects; the caller's dictionary
        is not modified.
      compile: Boolean, whether to compile the model after loading.

    Returns:
      A Keras model instance. If an optimizer was found as part of the saved
      model, the model is already compiled. Otherwise, the model is uncompiled
      and a warning will be displayed. When `compile` is set to False, the
      compilation is omitted without any warning.
    """
    # A deep copy makes sure our objects are not shared elsewhere.
    known_objects = copy.deepcopy(custom_objects) if custom_objects else {}
    _add_supported_quantized_objects(known_objects)

    return keras.models.load_model(
        filepath, custom_objects=known_objects, compile=compile
    )
[docs]
def print_model_sparsity(model):
    """Prints sparsity for the pruned layers in the model.

    A layer is reported when it, or the layer it wraps (exposed as
    `layer.layer`), has a callable `get_prunable_weights` that returns a
    non-empty collection. For each such weight the fraction of entries equal
    to zero is printed next to the weight name.

    Arguments:
      model: Object with a `name` and a `layers` attribute.
    """
    # Local import so this function does not depend on `np` being exported
    # by a star import elsewhere in the module.
    import numpy as np

    def _get_sparsity(weights):
        # Fraction of entries that are exactly zero.
        return 1.0 - np.count_nonzero(weights) / float(weights.size)

    def _get_prunable_weights(layer):
        # Duck-typed lookup. Previously this was hard-wired to None, so the
        # reporting branch below could never run.
        for candidate in (layer, getattr(layer, "layer", None)):
            getter = getattr(candidate, "get_prunable_weights", None)
            if callable(getter):
                return getter()
        return None

    print(f"Model Sparsity Summary ({model.name})")
    print("--")
    for layer in model.layers:
        prunable_weights = _get_prunable_weights(layer)
        if prunable_weights:
            summary = ", ".join(
                f"({weight.name}, {str(_get_sparsity(keras.ops.convert_to_numpy(weight)))})"
                for weight in prunable_weights
            )
            print("{}: {}".format(layer.name, summary))
    print("\n")
[docs]
def get_model_sparsity(model, per_layer=False, allow_list=None):
    """Calculates the sparsity of the model's weights and biases.

    `model_save_quantized_weights(model)` is called first, then the
    proportion of weight entries equal to zero is computed over every layer
    that has a `quantizers` attribute and whose class name is in
    `allow_list`.

    Arguments:
      model: The model to use to calculate sparsity. Assumes that this is a
        qkeras model with trained weights.
      per_layer: If to return a per-layer breakdown of sparsity.
      allow_list: A list of layer class names that sparsity will be calculated
        for. If set to None, a default list will be used.

    Returns:
      The proportion of zero entries over all examined weights, as returned
      by `knp.mean` (0.0 when no layer qualifies). If per_layer is True, a
      tuple of that value and a list of tuples in the form
      (<layer name>, <sparsity proportion>).
    """
    # Falls back to the default list of layer classes when none is given.
    if allow_list is None:
        allow_list = [
            "QDense",
            "Dense",
            "QConv1D",
            "Conv1D",
            "QConv2D",
            "Conv2D",
            "QDepthwiseConv2D",
            "DepthwiseConv2D",
            "QSeparableConv1D",
            "SeparableConv1D",
            "QSeparableConv2D",
            "SeparableConv2D",
            "QOctaveConv2D",
            "QSimpleRNN",
            "RNN",
            "QLSTM",
            "QGRU",
            "QConv2DTranspose",
            "Conv2DTranspose",
            "QConv2DBatchnorm",
            "QDepthwiseConv2DBatchnorm",
        ]

    # Quantizes the model weights for a more accurate sparsity calculation.
    model_save_quantized_weights(model)

    folded_classes = ["QConv2DBatchnorm", "QDepthwiseConv2DBatchnorm"]
    per_layer_sparsity = []
    flat_model_weights = []

    for layer in model.layers:
        if not hasattr(layer, "quantizers"):
            continue
        class_name = layer.__class__.__name__
        if class_name not in allow_list:
            continue

        # Batchnorm-folded layers are examined through their folded weights.
        if class_name in folded_classes:
            candidates = layer.get_folded_weights()
        else:
            candidates = layer.get_weights()

        flat_layer_weights = []
        for candidate in candidates:
            try:
                flattened = candidate.ravel()
            except AttributeError:
                # In case of EagerTensor.
                flattened = keras.ops.convert_to_numpy(candidate).ravel()
            flat_layer_weights.append(flattened)
        flat_model_weights.extend(flat_layer_weights)

        joined_layer_weights = knp.concatenate(flat_layer_weights)
        per_layer_sparsity.append((layer.name, knp.mean(joined_layer_weights == 0)))

    total_sparsity = 0.0
    if flat_model_weights:
        # Average the sparsity for the entire model.
        joined_model_weights = knp.concatenate(flat_model_weights)
        total_sparsity = knp.mean(joined_model_weights == 0)

    if not per_layer:
        return total_sparsity
    return (total_sparsity, per_layer_sparsity)
[docs]
def quantized_model_debug(model, X_test, plot=False, plt_instance=None):
    """Debugs and plots model weights and activations.

    For every layer whose class name is in REGISTERED_LAYERS, prints the
    minimum and maximum of its output on `X_test`, followed by the minimum and
    maximum of each of its weights (after applying the layer's quantizer,
    when the layer provides one for that weight).

    Args:
      model: The qkeras model to debug
      X_test: The sample data to use to give to model.predict
      plot: Bool. If to plot the results.
      plt_instance: A matplotlib.pyplot instance used to plot in an IPython
        environment.

    Raises:
      AssertionError: If `plot` is set without a `plt_instance`.
    """
    assert (
        plt_instance and plot
    ) or not plot, "plt_instance is required if plt is True"

    outputs = []
    output_names = []
    for layer in model.layers:
        if layer.__class__.__name__ in REGISTERED_LAYERS:
            output_names.append(layer.name)
            outputs.append(layer.output)

    # Auxiliary model exposing the output of every registered layer.
    model_debug = Model(inputs=model.inputs, outputs=outputs)
    y_pred = model_debug.predict(X_test)

    print("{:30} {: 8.4f} {: 8.4f}".format("input", knp.min(X_test), knp.max(X_test)))

    for n, p in zip(output_names, y_pred):
        layer = model.get_layer(n)
        layer_class = layer.__class__.__name__

        # Membership in a tuple of names; the previous `name in "QActivation"`
        # was a substring test on a string.
        if layer_class in ("QActivation", "QAdaptiveActivation"):
            alpha = get_weight_scale(layer.activation, p)
        else:
            alpha = 1.0
        print(
            f"{n:30} {knp.min(p / alpha): 8.4f} {knp.max(p / alpha): 8.4f}",
            end="",
        )
        if alpha != 1.0:
            print(f" a[{knp.min(alpha): 8.4f} {knp.max(alpha):8.4f}]")
        if plot and layer_class in [
            "QConv1D",
            "QConv2D",
            "QConv2DTranspose",
            "QDense",
            "QActivation",
            "QAdaptiveActivation",
            "QSimpleRNN",
            "QLSTM",
            "QGRU",
            "QBidirectional",
            "QSeparableConv1D",
            "QSeparableConv2D",
        ]:
            plt_instance.hist(p.flatten(), bins=25)
            plt_instance.title(layer.name + "(output)")
            plt_instance.show()

        # From here on `alpha` refers to the weight scale, not the output scale.
        alpha = None
        if layer_class not in [
            "QConv2DBatchnorm",
            "QDepthwiseConv2DBatchnorm",
        ]:
            weights_to_examine = layer.get_weights()
        else:
            weights_to_examine = layer.get_folded_weights()

        for i, weights in enumerate(weights_to_examine):
            if hasattr(layer, "get_quantizers") and layer.get_quantizers()[i]:
                # Quantize the current weight using the corresponding quantizer
                quantized_weights = layer.get_quantizers()[i](weights)
                # Evaluate the tensor to a NumPy array if necessary
                if isinstance(quantized_weights, KerasTensor):
                    weights = keras.ops.convert_to_numpy(quantized_weights)
                else:
                    weights = quantized_weights
                if i == 0 and layer_class in [
                    "QConv1D",
                    "QConv2D",
                    "QConv2DTranspose",
                    "QDense",
                    "QSimpleRNN",
                    "QLSTM",
                    "QGRU",
                    "QSeparableConv1D",
                    "QSeparableConv2D",
                    "QConv2DBatchnorm",
                    "QDepthwiseConv2DBatchnorm",
                ]:
                    alpha = get_weight_scale(layer.get_quantizers()[i], weights)
                    # Where alpha is 0, multiply instead of divide so those
                    # weights become 0 rather than dividing by zero.
                    alpha_mask = alpha == 0.0
                    weights = knp.where(alpha_mask, weights * alpha, weights / alpha)
                    if plot:
                        plt_instance.hist(weights.flatten(), bins=25)
                        plt_instance.title(layer.name + "(weights)")
                        plt_instance.show()
            print(
                f" ({knp.min(weights): 8.4f} {knp.max(weights): 8.4f})", end=""
            )
        if alpha is not None and isinstance(alpha, KerasTensor):
            print(
                f" a({knp.min(alpha): 10.6f} {knp.max(alpha): 10.6f})", end=""
            )
        # Terminates the line started by the `end=""` prints above.
        print("")
[docs]
def quantized_model_dump(model, x_test, output_dir=None, layers_to_dump=None):
    """Dumps tensors of target layers to binary files.

    One file named `<layer name>.bin` is written per selected layer; it
    holds that layer's output on `x_test`, cast to float32.

    Arguments:
      model: qkeras model object.
      x_test: numpy type, test tensors to generate output tensors.
      output_dir: a string for the directory to hold binary data. When empty
        or None, a new temporary directory is created and used; it is not
        removed afterwards.
      layers_to_dump: a list of string, specified layers by layer
        customized name. When empty or None, every layer is dumped.
    """
    if not output_dir:
        # mkdtemp creates the directory and leaves it in place, instead of
        # creating, deleting and re-creating a TemporaryDirectory.
        output_dir = tempfile.mkdtemp()
        print("temp dir", output_dir)
    elif not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print("create dir", output_dir)

    outputs = []
    y_names = []
    for layer in model.layers:
        if not layers_to_dump or layer.name in layers_to_dump:
            y_names.append(layer.name)
            outputs.append(layer.output)

    # Gather the tensor outputs from specified layers at layers_to_dump.
    model_debug = Model(inputs=model.inputs, outputs=outputs)
    y_pred = model_debug.predict(x_test)

    # Dumps tensors to files.
    for name, tensor_data in zip(y_names, y_pred):
        filename = os.path.join(output_dir, name + ".bin")
        print("writing the layer output tensor to ", filename)
        # Binary mode: the payload is raw float32 bytes, not text.
        with open(filename, "wb") as fid:
            tensor_data.astype("float32").tofile(fid)
[docs]
def clone_model_and_freeze_auto_po2_scale(
    orig_model=None, orig_model_path=None, quantize_model_weights=False
):
    """Clone model and freeze the scale value of auto_po2 type quantizers.

    Exactly one of `orig_model` and `orig_model_path` must be given.

    Args:
      orig_model: original model which will be used to clone the new model.
        If set to None, the function will load the original model
        from orig_model_path argument.
      orig_model_path: The path to the original model file.
        If set to None, the function will load the original model from the
        orig_model argument.
      quantize_model_weights: Bool to quantize weights to HW format.
        If set to False, the model weights will be in float format.
        If set to True, the model weights will be in HW format and the function
        will also check if the hw weights extracted from the new model matches
        the original model.

    Returns:
      A tuple of the new model and the new model's hw weights (None when
      quantize_model_weights is False).

    Raises:
      ValueError: If both or neither of orig_model and orig_model_path are
        set, if a layer has more than one auto_po2 quantizer, if the new
        model still has an auto_po2 quantizer without a post-training scale,
        or if the hw weights of the new model do not match the original ones.

    Note:
      + When using this function to retrain model with fixed scale value.
        Set quantize_model_weights to False in this case.
      + This function only supports a collection of common layers that will use
        auto_po2 quantizers. For less common layers, it will raise errors and we
        will add more support case by case.

    Example usage:
      model, _ = clone_model_and_freeze_auto_po2_scale(
          orig_model_path="path/to/model",
          quantize_model_weights=False)
    """

    def _create_bn_layer(layer_cfg, bn_inv_quantizer):
        # Clone batch normalization layer with the new inverse quantizer.
        if bn_inv_quantizer is not None:
            layer_cfg["inverse_quantizer"]["config"] = bn_inv_quantizer.get_config()
        return QBatchNormalization(**layer_cfg)

    def _create_qconv2d_layer(layer_cfg, kernel_quantizer):
        # Clone QConv2D layer with the new kernel quantizers.
        if kernel_quantizer is not None:
            layer_cfg["kernel_quantizer"]["config"] = kernel_quantizer.get_config()
        return QConv2D(**layer_cfg)

    def _create_qdepthwise_conv2d_layer(layer_cfg, depthwise_quantizer):
        # Clone QDepthwiseConv2D layer with the new depthwise_quantizer quantizer.
        if depthwise_quantizer is not None:
            layer_cfg["depthwise_quantizer"]["config"] = (
                depthwise_quantizer.get_config()
            )
        return QDepthwiseConv2D(**layer_cfg)

    def _create_qdense_layer(layer_cfg, kernel_quantizer):
        # Clone QDense layer with the new kernel quantizer.
        if kernel_quantizer is not None:
            layer_cfg["kernel_quantizer"]["config"] = kernel_quantizer.get_config()
        return QDense(**layer_cfg)

    def _create_other_layer(orig_layer):
        # Clone other layers.
        config = orig_layer.get_config()
        return orig_layer.__class__.from_config(config)

    def _create_quantized_bits_with_post_training_scale(q):
        # Create a new quantized_bits instance with the fixed scale value.
        if q is not None:
            q_cfg = q.get_config()
            q_cfg["post_training_scale"] = keras.ops.convert_to_numpy(q.scale)
            q = quantized_bits(**q_cfg)
        return q

    def _find_auto_po2_quantizer(layer):
        # Find the auto_po2 quantizer in the layer. Note that we allow at
        # most one auto_po2 quantizer in each layer due to the limitation of
        # the current HW implementation.
        num_auto_po2_quantizers = 0
        auto_po2_quantizer = None
        if hasattr(layer, "quantizers"):
            for q in layer.quantizers:
                if hasattr(q, "alpha") and q.alpha == "auto_po2":
                    num_auto_po2_quantizers += 1
                    auto_po2_quantizer = q
        if num_auto_po2_quantizers > 1:
            raise ValueError(
                f"{layer.name} has more than one auto_po2 quantizer. "
                "Please check if this is expected."
            )
        else:
            return auto_po2_quantizer

    def _check_hw_weights_equal(hw_weights_1, hw_weights_2):
        # Check if the hw weights extracted from the new model matches the
        # original model.
        for layer_name in hw_weights_2.keys():
            for key in hw_weights_2[layer_name].keys():
                val1 = hw_weights_2[layer_name][key]
                val2 = hw_weights_1[layer_name][key]
                if isinstance(val1, list):
                    for v1, v2 in zip(val1, val2):
                        if not knp.all(v1 == v2):
                            raise ValueError(
                                f"{layer_name}/{key}: No Match! v1={v1}, v2={v2}"
                            )
                elif not knp.all(val1 == val2):
                    raise ValueError(
                        f"{layer_name}/{key}: No Match! val1={val1}, val2={val2}"
                    )

    # Load the original model with float weights.
    # Note: weights will be quantized later in silicon flow by calling
    # model_save_quantized_weights.
    if orig_model is not None and orig_model_path is not None:
        raise ValueError("Only one of orig_model and orig_model_path can be set.")
    elif orig_model is None and orig_model_path is None:
        raise ValueError("One of orig_model and orig_model_path must be set.")
    elif orig_model_path is not None:
        orig_model = load_qmodel(orig_model_path, compile=False)

    # Quantize model weights and compute quantizer scale values.
    quantized_model = keras.models.clone_model(orig_model)
    quantized_model.set_weights(orig_model.get_weights())
    # In silicon flow, weight binary files are generated from hw weights.
    orig_hw_weights = model_save_quantized_weights(quantized_model)

    # Create a new model with fixed scale quantizers.
    x = inputs = keras.Input(
        shape=orig_model.input_shape[1:], name=orig_model.layers[0].name
    )
    # The first layer is skipped: the new input tensor above takes its place.
    # Layers are then chained one after another in list order.
    for layer in quantized_model.layers[1:]:
        layer_class = layer.__class__.__name__
        auto_po2_quantizer = _find_auto_po2_quantizer(layer)
        auto_po2_quantizer_with_frozen_scale = (
            _create_quantized_bits_with_post_training_scale(auto_po2_quantizer)
        )
        layer_cfg = layer.get_config()

        # To be compatible with different python versions, we do not use
        # match-case style here.
        if layer_class == "QConv2D":
            x = _create_qconv2d_layer(layer_cfg, auto_po2_quantizer_with_frozen_scale)(
                x
            )
        elif layer_class == "QDepthwiseConv2D":
            x = _create_qdepthwise_conv2d_layer(
                layer_cfg, auto_po2_quantizer_with_frozen_scale
            )(x)
        elif layer_class == "QBatchNormalization":
            x = _create_bn_layer(layer_cfg, auto_po2_quantizer_with_frozen_scale)(x)
        elif layer_class == "QDense":
            x = _create_qdense_layer(layer_cfg, auto_po2_quantizer_with_frozen_scale)(x)
        else:
            x = _create_other_layer(layer)(x)

    new_model = keras.Model(inputs, x)
    # Set the weights of the new model to the original model (float weights).
    new_model.set_weights(orig_model.get_weights())

    # Check if the new model still has auto_po2 quantizer.
    # This function only supports a collection of common layers that will use
    # auto_po2 quantizers. For less common layers, we need to add extra support
    # in the future.
    for layer in new_model.layers:
        q = _find_auto_po2_quantizer(layer)
        if q is not None and q.post_training_scale is None:
            raise ValueError(
                f"{layer.name} in the new model still has auto_po2 quantizer with "
                "adaptive scales. Please check if this is expected!"
            )

    new_hw_weights = None
    if quantize_model_weights:
        new_hw_weights = model_save_quantized_weights(new_model)
        # Check if the hw weights extracted from the new model matches the
        # original model.
        _check_hw_weights_equal(orig_hw_weights, new_hw_weights)

    return new_model, new_hw_weights
[docs]
def clone_optimizer(opt):
    """Return a new optimizer rebuilt from the serialized form of `opt`."""
    # Serialize/deserialize round trip: robust across Keras 2.x and 3.x.
    serialized_optimizer = optimizers.serialize(opt)
    return optimizers.deserialize(serialized_optimizer)