Source code for qkeras.utils

# Copyright 2019 Google LLC
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import copy
import json
import os
import re
import tempfile
import types

import keras
import keras.ops.numpy as knp
import networkx as nx
import six
from keras import KerasTensor, Model, layers, models, optimizers
from keras import ops as Kops

from .qconv2d_batchnorm import QConv2DBatchnorm
from .qconvolutional import (
    QConv1D,
    QConv2D,
    QConv2DTranspose,
    QDepthwiseConv2D,
    QSeparableConv1D,
    QSeparableConv2D,
)
from .qdepthwiseconv2d_batchnorm import QDepthwiseConv2DBatchnorm
from .qdense_batchnorm import QDenseBatchnorm
from .qlayers import Clip, QActivation, QAdaptiveActivation, QDense, QInitializer
from .qmac import QScaleShift
from .qnormalization import QBatchNormalization
from .qpooling import QAveragePooling2D, QGlobalAveragePooling2D
from .qrecurrent import (
    QGRU,
    QLSTM,
    QBidirectional,
    QGRUCell,
    QLSTMCell,
    QSimpleRNN,
    QSimpleRNNCell,
)
from .qtools import qgraph
from .quantizers import *

# from .google_internals.experimental_quantizers import quantized_bits_learnable_scale
# from .google_internals.experimental_quantizers import parametric_quantizer_d_xmax

REGISTERED_LAYERS = [
    "QActivation",
    "QAdaptiveActivation",
    "QDense",
    "QConv1D",
    "QConv2D",
    "QSeparableConv1D",
    "QSeparableConv2D",
    "QDepthwiseConv2D",
    "QConv2DTranspose",
    "QSimpleRNN",
    "QLSTM",
    "QGRU",
    "QBidirectional",
    "QBatchNormalization",
    "QConv2DBatchnorm",
    "QDepthwiseConv2DBatchnorm",
    "QAveragePooling2D",
    "QGlobalAveragePooling2D",
    "QDenseBatchnorm",
]


[docs] def find_bn_fusing_layer_pair(model, custom_objects={}): """Finds layers that can be fused with the following batchnorm layers. Args: model: input model custom_objects: Dict of model specific objects needed for cloning. Returns: Dict that marks all the layer pairs that need to be fused. Note: supports sequential and non-sequential model """ fold_model = clone_model(model, custom_objects) (graph, _) = qgraph.GenerateGraphFromModel( fold_model, "quantized_bits(8, 0, 1)", "quantized_bits(8, 0, 1)" ) qgraph.GraphAddSingleSourceSingleSink(graph) qgraph.GraphRemoveNodeWithNodeType(graph, "InputLayer") qgraph.GraphPropagateActivationsToEdges(graph) # Finds the Batchnorm nodes and mark them. layers_followed_by_bn = {} bn_layers_to_skip = set() for node_id in nx.topological_sort(graph): node = graph.nodes[node_id] layer = node["layer"][0] if layer: successor_ids = list(graph.successors(node_id)) is_single = len(successor_ids) == 1 successor_layer = graph.nodes[successor_ids[0]]["layer"][0] followed_by_bn = successor_layer.__class__.__name__ == "QBatchNormalization" # TODO(lishanok): extend to QDense types enable_bn_fusing = ( layer.__class__.__name__ in ["QConv2D", "QDepthwiseConv2D"] and is_single and followed_by_bn ) if enable_bn_fusing: layers_followed_by_bn[layer.name] = successor_layer.name bn_layers_to_skip.add(successor_layer.name) return (layers_followed_by_bn, bn_layers_to_skip)
[docs] def add_bn_fusing_weights(prev_layer, bn_layer, saved_weights): """Adds additional fusing weights to saved_weights. In hardware inference, we need to combined fuse previous layer's output with the following batchnorm op. z[i] = bn(y[i]) = inv[i] * y'[i] * scale[i] - bias'[i] is the final output of the previous layer and bn layer, with: inv[i] = gamma[i]* rsqrt(variance[i]^2+epsilon) is computed from the bn layer weights y'[i] is the i-th channel output from the previous layer (before scale) scale[i] is the i-th channel kernel quantizer scale fused_bias[i] = inv[i] * bias[i] + beta[i] - inv[i]*mean[i] where bias is the bias term from the previous layer, beta and mean are the bn layer weights. Args: prev_layer: qkeras layer, could be QConv2D/QDepthwiseConv2D/QDense. bn_layer: The following QBatchNormalization layer that needs to be fused with the previous layer. saved_weights: Dict. The centralized weights dictionary that exports relevant weights and parameters for hardware inference. """ bn_qs = bn_layer.quantizers bn_ws = bn_layer.get_weights() if bn_qs[4] is not None: assert bn_qs[0] is None and bn_qs[3] is None, ( "If using the inverse quantizer, the gamma and variance quantizers " "should not be used in order to avoid quantizing a value twice." ) def apply_quantizer(quantizer, input_weight): if quantizer: weight = input_weight weight = quantizer(weight) else: weight = input_weight return weight # Quantize respective bn layer weights gamma = 1.0 beta = 0 idx = 0 if bn_layer.scale: gamma = apply_quantizer(bn_layer.gamma_quantizer_internal, bn_ws[idx]) idx += 1 if bn_layer.center: beta = apply_quantizer(bn_layer.beta_quantizer_internal, bn_ws[idx]) idx += 1 mean = apply_quantizer(bn_layer.mean_quantizer_internal, bn_ws[idx]) idx += 1 variance = apply_quantizer(bn_layer.variance_quantizer_internal, bn_ws[idx]) # Compute inv[i] inv = gamma * (1.0 / keras.ops.sqrt(variance + bn_layer.epsilon)) inv = keras.ops.convert_to_numpy(inv) if bn_layer.inverse_quantizer_internal is not None: quantizer = bn_layer.inverse_quantizer_internal inv = quantizer(inv) # Compute fused_bias[i] if prev_layer.use_bias: cur_weights = prev_layer.get_weights() assert len(cur_weights) == 2, ( "Weights should have length of 2. Found" f"{len(cur_weights)} instead." ) prev_bias = cur_weights[-1] else: prev_bias = 0 b_prime = inv * prev_bias + beta - inv * mean saved_weights[prev_layer.name]["enable_bn_fusing"] = True saved_weights[prev_layer.name]["fused_bn_layer_name"] = bn_layer.name saved_weights[prev_layer.name]["bn_inv"] = inv saved_weights[prev_layer.name]["fused_bias"] = b_prime
# Model utilities: before saving the weights, we want to apply the quantizers
[docs] def model_save_quantized_weights(model, filename=None, custom_objects={}): """Quantizes model for inference and save it. Takes a model with weights, apply quantization function to weights and returns a dictionary with quantized weights. User should be aware that "po2" quantization functions cannot really be quantized in meaningful way in Keras. So, in order to preserve compatibility with inference flow in Keras, we do not covert "po2" weights and biases to exponents + signs (in case of quantize_po2), but return instead (-1)**sign*(2**round(log2(x))). In the returned dictionary, we will return the pair (sign, round(log2(x))). Special care needs to be given to quantized_bits(alpha="auto_po2") as well. Since in this quantizer, hardware needs the integer weights and scale for hardware inference, this function will return the pair (scale, integer_weights) in the returned dictionary. Arguments: model: model with weights to be quantized. filename: if specified, we will save the hdf5 containing the quantized weights so that we can use them for inference later on. custom_objects: Dict of model specific objects needed to load/store. Returns: dictionary containing layer name and quantized weights that can be used by a hardware generator. """ saved_weights = {} # Find the conv/dense layers followed by Batchnorm layers (fusing_layer_pair_dict, bn_layers_to_skip) = find_bn_fusing_layer_pair( model, custom_objects ) print("... quantizing model") for layer in model.layers: if hasattr(layer, "get_quantizers"): # weights for software inference weights = [] signs = [] scales = [] # weights for hardware inference hw_weights = [] if any( isinstance(layer, t) for t in [QConv2DBatchnorm, QDenseBatchnorm, QDepthwiseConv2DBatchnorm] ): qs = layer.get_quantizers() ws = layer.get_folded_weights() elif any(isinstance(layer, t) for t in [QSimpleRNN, QLSTM, QGRU]): qs = layer.get_quantizers()[:-1] ws = layer.get_weights() else: qs = layer.get_quantizers() ws = layer.get_weights() has_sign = False has_scale = False enable_bn_fusing = False # isinstance() might fail due to inconsistent module import path. # Use __class__.__name__ instead. layer_class = layer.__class__.__name__ if layer_class == "QBatchNormalization" and layer.name in bn_layers_to_skip: # Mark current bn layer to be fused with the previous layer enable_bn_fusing = True for quantizer, weight in zip(qs, ws): if quantizer: weight = weight weight = quantizer(weight) # If quantizer is power-of-2 (quantized_po2 or quantized_relu_po2), # we would like to process it here. # # However, we cannot, because we will lose sign information as # quanized_po2 will be represented by the tuple (sign, log2(abs(w))). # # In addition, we will not be able to use the weights on the model # any longer. # # So, instead of "saving" the weights in the model, we will return # a dictionary so that the proper values can be propagated. # Weights store the weight in the format that software inference uses. weights.append(weight) q_name = "" if quantizer: if isinstance(quantizer, six.string_types): q_name = quantizer elif hasattr(quantizer, "__name__"): q_name = quantizer.__name__ elif hasattr(quantizer, "name"): q_name = quantizer.name elif hasattr(quantizer, "__class__"): q_name = quantizer.__class__.__name__ if quantizer and ("_po2" in q_name): # Quantized_relu_po2 does not have a sign. if q_name == "quantized_po2": has_sign = True sign = knp.sign(weight) # Makes sure values are -1 or +1 only sign += 1.0 - knp.abs(sign) # hw_weight store the weight in the format that hardware inference # uses. hw_weight = knp.round(knp.log2(knp.abs(weight))) signs.append(sign) scales.append([]) elif q_name == "quantized_bits" and quantizer.alpha == "auto_po2": unsigned_bits = quantizer.bits - quantizer.keep_negative m = Kops.cast(pow(2.0, unsigned_bits), float) m_i = Kops.cast( keras.ops.power(2.0, quantizer.integer), float ) assert isinstance(quantizer.scale, np.ndarray) or \ isinstance(quantizer.scale, KerasTensor) or \ keras.ops.is_tensor(quantizer.scale) ( "The auto_po2 quantizer has to be called first in order " "to know the values of scale." ) scale = ( keras.ops.convert_to_numpy(quantizer.scale) if isinstance(quantizer.scale, KerasTensor) else quantizer.scale ) scale = Kops.cast(scale, float) # Make sure scale is power of 2 values log2val = knp.log2(scale) diff = knp.round(log2val) - log2val assert knp.all(diff == 0), "scale must be power of 2 values!" # Convert fixed point weight to integer weight, just hw_weight = weight * m / m_i # Because hw_weight is integer weights, set scale = scale * m_i / m # so that when we can multiply scale with the integer weight # during hardware inference to get the fixed point weights scale = scale * m_i / m has_scale = True scales.append(scale) else: hw_weight = weight signs.append([]) scales.append([]) hw_weights.append(hw_weight) # Save the weights in the format that hardware inference uses saved_weights[layer.name] = { "weights": hw_weights, "enable_bn_fusing": enable_bn_fusing, } if isinstance(layer, QAveragePooling2D) or isinstance( layer, QGlobalAveragePooling2D ): if isinstance(layer, QAveragePooling2D): pool_area = layer.pool_size if isinstance(layer.pool_size, int): pool_area = layer.pool_size * layer.pool_size else: pool_area = knp.prod(layer.pool_size) else: pool_area = layer.compute_pooling_area( input_shape=layer.input.shape ) pool_area = keras.ops.cast(pool_area, dtype=float) if hasattr(layer, "average_quantizer_internal") and callable( layer.average_quantizer_internal ): saved_weights[layer.name]["q_mult_factor"] = ( keras.ops.convert_to_numpy(layer.average_quantizer_internal(1.0 / pool_area)) ) else: saved_weights[layer.name]["q_mult_factor"] = None saved_weights[layer.name]["mult_factor"] = 1.0 / pool_area saved_weights[layer.name]["pool_area"] = pool_area if has_sign: saved_weights[layer.name]["signs"] = signs if has_scale: saved_weights[layer.name]["scales"] = scales if not any( isinstance(layer, t) for t in [QConv2DBatchnorm, QDenseBatchnorm, QDepthwiseConv2DBatchnorm] ): # Set layer weights in the format that software inference uses layer.set_weights(weights) else: print( layer.name, " conv and batchnorm weights cannot be seperately" " quantized because they will be folded before quantization.", ) # adjust weights for bn fusing if necessary if layer.name in fusing_layer_pair_dict.keys(): print( f"Fuse {layer.name} output with " f"{fusing_layer_pair_dict[layer.name]} for hardware inference." ) add_bn_fusing_weights( prev_layer=layer, bn_layer=model.get_layer(fusing_layer_pair_dict[layer.name]), saved_weights=saved_weights, ) elif layer.get_weights(): print(" ", layer.name, "has not been quantized") if filename: model.save_weights(filename) return saved_weights
[docs] def quantize_activation(layer_config, activation_bits): """Replaces activation by quantized activation functions.""" str_act_bits = str(activation_bits) # relu -> quantized_relu(bits) # tanh -> quantized_tanh(bits) # sigmoid -> quantized_sigmoid(bits) # more to come later if layer_config.get("activation", None) is None: return if isinstance(layer_config["activation"], six.string_types): a_name = layer_config["activation"] elif isinstance(layer_config["activation"], types.FunctionType): a_name = layer_config["activation"].__name__ else: a_name = layer_config["activation"].__class__.__name__ if a_name == "linear": return if a_name == "relu": layer_config["activation"] = "quantized_relu(" + str_act_bits + ")" elif a_name == "tanh": layer_config["activation"] = "quantized_tanh(" + str_act_bits + ")" elif a_name == "sigmoid": layer_config["activation"] = "quantized_sigmoid(" + str_act_bits + ")"
[docs] def get_config(quantizer_config, layer, layer_class, parameter=None): """Returns search of quantizer on quantizer_config.""" quantizer = quantizer_config.get( layer["config"]["name"], quantizer_config.get(layer_class, None) ) if quantizer is not None and parameter is not None: quantizer = quantizer.get(parameter, None) return quantizer
[docs] def is_TFOpLambda_layer(layer): return layer.__class__.__name__ == "TFOpLambda"
[docs] def get_y_from_TFOpLambda(model_cfg, layer): """Get the value of "y" from the TFOpLambda layer's configuration. Args: model_cfg: dictionary type, model.get_config() output layer: a given layer instance Return: value of "y" for a TFOpLambda layer. 'y' here corresponds to how tensorflow stores TFOpLambda layer parameter in serialization. for example, TFOpLambda(func), where func is knp.multiply(input_tensor, 3). "y" would be the value 3. """ for layer_config in model_cfg["layers"]: op_name = layer_config["config"]["name"] class_name = layer_config["class_name"] # TODO(lishanok): Extend support for other TFOpLambda types when needed if op_name == layer.name and class_name == "TFOpLambda": assert ( "knp.multiply" in op_name ), f"TFOpLambda layer {op_name} not supported!" return layer_config["inbound_nodes"][-1][-1]["y"] return None
[docs] def convert_to_folded_model(model): """Find Conv/Dense layers followed by BN layers and fold them. Args: model: Keras model instance. Returns: new_model: Keras model with folded BN layers. layers_to_fold: list of folded layer names. """ fold_model = clone_model(model) model_cfg = model.get_config() (graph, _) = qgraph.GenerateGraphFromModel( fold_model, "quantized_bits(8, 0, 1)", "quantized_bits(8, 0, 1)" ) qgraph.GraphAddSingleSourceSingleSink(graph) qgraph.GraphRemoveNodeWithNodeType(graph, "InputLayer") qgraph.GraphPropagateActivationsToEdges(graph) bn_nodes_to_delete = [] layers_to_fold = [] for node_id in nx.topological_sort(graph): node = graph.nodes[node_id] layer = node["layer"][0] if layer: successor_ids = list(graph.successors(node_id)) is_single = len(successor_ids) == 1 successor_layer = graph.nodes[successor_ids[0]]["layer"][0] followed_by_bn = successor_layer.__class__.__name__ == "BatchNormalization" if ( layer.__class__.__name__ in ["Conv2D", "DepthwiseConv2D"] and is_single and followed_by_bn ): bn_nodes_to_delete.append(successor_ids[0]) layers_to_fold.append(layer.name) for node_id in bn_nodes_to_delete: qgraph.GraphRemoveNode(graph, node_id) model_outputs = [] x = model_inputs = fold_model.inputs for node_id in nx.topological_sort(graph): layer_input_tensors = [] node = graph.nodes[node_id] layer = node["layer"][0] if layer: for parent_node_id in graph.predecessors(node_id): edge = graph.edges[(parent_node_id, node_id)] input_tensor = edge["tensor"] layer_input_tensors.append(input_tensor) if len(layer_input_tensors) == 1: layer_input_tensors = layer_input_tensors[0] # Special handling depending on the layer type if isinstance(layer, (layers.Multiply, layers.Add, layers.Concatenate)): # Merge layers expect a list of tensors as input x = layer( layer_input_tensors if isinstance(layer_input_tensors, list) else [layer_input_tensors] ) elif is_TFOpLambda_layer(layer): # For TFOpLambda layers, pass additional input `y` derived from the model configuration y = get_y_from_TFOpLambda(model_cfg, layer) x = layer(layer_input_tensors, y) else: # Standard layer call with single input tensor x = layer(layer_input_tensors) for u, v in graph.edges(node_id): graph[u][v]["tensor"] = x if v == -2 and not any(x is y for y in model_outputs): model_outputs.append(x) keras_outputs = [t for t in model_outputs if isinstance(t, KerasTensor)] new_model = Model(inputs=model_inputs, outputs=keras_outputs) return new_model, layers_to_fold
[docs] def model_quantize( model, quantizer_config, activation_bits, custom_objects=None, transfer_weights=False, prefer_qadaptiveactivation=False, enable_bn_folding=False, ): """Creates a quantized model from non-quantized model. The quantized model translation is based on json interface of Keras, which requires a custom_objects dictionary for "string" types. Because of the way json works, we pass "string" objects for the quantization mechanisms and we perform an eval("string") which technically is not safe, but it will do the job. The quantizer_config is a dictionary with the following form. { Dense_layer_name: { "kernel_quantizer": "quantizer string", "bias_quantizer": "quantizer_string" }, Conv2D_layer_name: { "kernel_quantizer": "quantizer string", "bias_quantizer": "quantizer_string" }, Activation_layer_name: "quantizer string", "QActivation": { "relu": "quantizer_string" }, "QConv2D": { "kernel_quantizer": "quantizer string", "bias_quantizer": "quantizer_string" }, "QBatchNormalization": {} } In the case of "QBidirectional", we can follow the same form as above. The specified configuration will be used for both forward and backwards layer. { "Bidirectional" : { "kernel_quantizer" : "quantizer string", "bias_quantizer" : "quantizer string", "recurrent_quantizer" : "quantizer string" } } In the case of "QActivation", we can modify only certain types of activations, for example, a "relu". In this case we represent the activation name by a dictionary, or we can modify all activations, without representhing as a set. We right now require a default case in case we cannot find layer name. This simplifies the dictionary because the simplest case, we can just say: { "default": { "kernel": "quantized_bits(4)", "bias": "quantized_bits(4)" } } and this will quantize all layers' weights and bias to be created with 4 bits. Arguments: model: model to be quantized quantizer_config: dictionary (as above) with quantized parameters activation_bits: number of bits for quantized_relu, quantized_tanh, quantized_sigmoid custom_objects: dictionary following keras recommendations for json translation. transfer_weights: if true, weights are to be transfered from model to qmodel. prefer_qadaptiveactivation: Bool. If true, try to use QAdaptiveActivation over QActivation whenever possible enable_bn_folding: Bool. If true, fold conv/dense layers with following batch normalization layers whenever possible. use QConv2DBatchnorm for example, to replace conv2d layers Returns: qmodel with quantized operations and custom_objects. """ if enable_bn_folding: # Removes bn layers from the model and find a list of layers to fold. model, layers_to_fold = convert_to_folded_model(model) if len(layers_to_fold) == 0: # If no layers to fold, no need to perform folding. enable_bn_folding = False if not custom_objects: custom_objects = {} # Let's make a deep copy to make sure our objects are not shared elsewhere. jm = copy.deepcopy(json.loads(model.to_json())) custom_objects = copy.deepcopy(custom_objects) config = jm["config"] layers = config["layers"] def quantize_rnn(layer, quantizer_config): q_name = "Q" + layer["class_name"] # Needs to add kernel, recurrent bias quantizers. kernel_quantizer = get_config( quantizer_config, layer, q_name, "kernel_quantizer" ) recurrent_quantizer = get_config( quantizer_config, layer, q_name, "recurrent_quantizer" ) if layer["config"]["use_bias"]: bias_quantizer = get_config( quantizer_config, layer, q_name, "bias_quantizer" ) else: bias_quantizer = None state_quantizer = get_config(quantizer_config, layer, q_name, "state_quantizer") # This is to avoid unwanted transformations. if kernel_quantizer is None: return layer["config"]["kernel_quantizer"] = kernel_quantizer layer["config"]["recurrent_quantizer"] = recurrent_quantizer layer["config"]["bias_quantizer"] = bias_quantizer layer["config"]["state_quantizer"] = state_quantizer # If activation is present, add activation here. activation = get_config(quantizer_config, layer, q_name, "activation_quantizer") if activation: layer["config"]["activation"] = activation else: quantize_activation(layer["config"], activation_bits) # If recurrent activation is present, add activation here. if layer["class_name"] in ["LSTM", "GRU"]: recurrent_activation = get_config( quantizer_config, layer, q_name, "recurrent_activation_quantizer" ) if recurrent_activation: layer["config"]["recurrent_activation"] = recurrent_activation layer["class_name"] = q_name registered_name = layer.pop("registered_name", None) if registered_name: layer["registered_name"] = q_name for layer in layers: layer_config = layer["config"] # Dense becomes QDense, Conv1D becomes QConv1D etc # Activation converts activation functions. if layer["class_name"] in [ "Dense", "Conv1D", "Conv2D", "Conv2DTranspose", "SeparableConv1D", "SeparableConv2D", ]: if ( layer["class_name"] in ["Dense", "Conv2D"] and enable_bn_folding and layer["name"] in layers_to_fold ): # Only fold if current layer is followed by BN layer. q_name = "Q" + layer["class_name"] + "Batchnorm" layer_config["use_bias"] = True # Folded layers require a bias # Sets ema_freeze_delay and folding_mode specific to # QDepthwiseConv2DBatchnorm layer config. folding_mode = get_config( quantizer_config, layer, q_name, "folding_mode" ) layer_config["folding_mode"] = ( folding_mode if folding_mode else "ema_stats_folding" ) ema_freeze_delay = get_config( quantizer_config, layer, q_name, "ema_freeze_delay" ) layer_config["ema_freeze_delay"] = ( ema_freeze_delay if ema_freeze_delay else None ) else: q_name = "Q" + layer["class_name"] # Needs to add kernel/bias quantizers. kernel_quantizer = get_config( quantizer_config, layer, q_name, "kernel_quantizer" ) if layer_config["use_bias"]: bias_quantizer = get_config( quantizer_config, layer, q_name, "bias_quantizer" ) else: bias_quantizer = None if ( kernel_quantizer is None and q_name == "Q" + layer["class_name"] + "Batchnorm" ): # Tries none-folded layer quantizer as a back up. kernel_quantizer = get_config( quantizer_config, layer, "Q" + layer["class_name"], "kernel_quantizer", ) bias_quantizer = get_config( quantizer_config, layer, "Q" + layer["class_name"], "bias_quantizer" ) # This is to avoid unwanted transformations. if kernel_quantizer is None: continue layer["class_name"] = q_name layer_config["kernel_quantizer"] = kernel_quantizer layer_config["bias_quantizer"] = bias_quantizer # If activation is present, add activation here. quantizer = get_config( quantizer_config, layer, q_name, "activation_quantizer" ) if quantizer: layer_config["activation"] = quantizer else: quantize_activation(layer_config, activation_bits) elif layer["class_name"] == "DepthwiseConv2D": if enable_bn_folding and layer["name"] in layers_to_fold: q_name = "QDepthwiseConv2DBatchnorm" layer_config["use_bias"] = True # Folded layers require a bias # Sets ema_freeze_delay and folding_mode specific to # QDepthwiseConv2DBatchnorm layers. folding_mode = get_config( quantizer_config, layer, q_name, "folding_mode" ) layer_config["folding_mode"] = ( folding_mode if folding_mode else "ema_stats_folding" ) ema_freeze_delay = get_config( quantizer_config, layer, q_name, "ema_freeze_delay" ) layer_config["ema_freeze_delay"] = ( ema_freeze_delay if ema_freeze_delay else None ) else: q_name = "QDepthwiseConv2D" # Needs to add kernel/bias quantizers. depthwise_quantizer = get_config( quantizer_config, layer, q_name, "depthwise_quantizer" ) if layer_config["use_bias"]: bias_quantizer = get_config( quantizer_config, layer, q_name, "bias_quantizer" ) else: bias_quantizer = None if depthwise_quantizer is None and q_name == "QDepthwiseConv2DBatchnorm": # Tries none-folded layer quantizer as a back up. depthwise_quantizer = get_config( quantizer_config, layer, "QDepthwiseConv2D", "depthwise_quantizer" ) bias_quantizer = get_config( quantizer_config, layer, "QDepthwiseConv2D", "bias_quantizer" ) # This is to avoid unwanted transformations. if depthwise_quantizer is None: continue layer["class_name"] = q_name layer_config["depthwise_quantizer"] = depthwise_quantizer layer_config["bias_quantizer"] = bias_quantizer # If activation is present, add activation here. quantizer = get_config( quantizer_config, layer, q_name, "activation_quantizer", ) if quantizer: layer_config["activation"] = quantizer else: quantize_activation(layer_config, activation_bits) elif layer["class_name"] in ["SimpleRNN", "LSTM", "GRU"]: quantize_rnn(layer, quantizer_config) elif layer["class_name"] == "Bidirectional": forward_layer_quantizer_config = { layer_config["layer"]["config"]["name"]: get_config( quantizer_config, layer, "QBidirectional" ) } quantize_rnn(layer["config"]["layer"], forward_layer_quantizer_config) if "backward_layer" in layer_config: backward_layer_quantizer_config = { layer_config["backward_layer"]["config"]["name"]: get_config( quantizer_config, layer, "QBidirectional" ) } quantize_rnn( layer["config"]["backward_layer"], backward_layer_quantizer_config ) layer["class_name"] = "QBidirectional" elif layer["class_name"] == "Activation": if prefer_qadaptiveactivation: # Try to find QAdaptiveActivation first quantizer = get_config(quantizer_config, layer, "QAdaptiveActivation") is_qadaptiveactivation = True if quantizer is None: # Try QActivation as a backup quantizer = get_config(quantizer_config, layer, "QActivation") is_qadaptiveactivation = False else: # Tries to find QActivation first. quantizer = get_config(quantizer_config, layer, "QActivation") is_qadaptiveactivation = False if quantizer is None: # Try QAdaptiveActivation as a backup quantizer = get_config( quantizer_config, layer, "QAdaptiveActivation" ) is_qadaptiveactivation = True # This is to avoid softmax from quantizing in autoq. if quantizer is None: continue # If quantizer exists in dictionary related to this name, # use it, otherwise, use normal transformations. if not isinstance(quantizer, dict) or quantizer.get( layer_config["activation"], None ): # Only change activation layer if we will use a quantized activation. layer["class_name"] = ( "QAdaptiveActivation" if is_qadaptiveactivation else "QActivation" ) if isinstance(quantizer, dict): quantizer = quantizer[layer_config["activation"]] if quantizer: if is_qadaptiveactivation: assert ( quantizer.find(",") < 0 ), "Only integer bits should be defined for QAdaptiveActivation" layer_config["total_bits"] = int( re.sub(r"[^\d]", "", quantizer) ) quantizer = re.sub(r"\(.*", "", quantizer) # remove params layer_config["activation"] = quantizer else: quantize_activation(layer_config, activation_bits) # We have to do this because of other instances of ReLU. elif layer["class_name"] in ["ReLU", "relu", "LeakyReLU"]: quantizer = get_config(quantizer_config, layer, "QActivation") # This is to avoid unwanted transformations. if quantizer is None: continue if layer["class_name"] == "LeakyReLU": negative_slope = layer["config"]["alpha"] elif layer["class_name"] == "relu": max_value = layer["config"]["max_value"] negative_slope = layer["config"]["alpha"] threshold = layer["config"]["threshold"] else: # ReLU from mobilenet max_value = layer["config"]["max_value"] negative_slope = layer["config"]["negative_slope"] threshold = layer["config"]["threshold"] if negative_slope > 0: q_name = "leakyrelu" else: q_name = "relu" # If quantizer exists in dictionary related to this name, # use it, otherwise, use normal transformations. if not isinstance(quantizer, dict) or quantizer.get(q_name, None): # Only change activation layer if we will use a quantized activation. layer["class_name"] = "QActivation" # Remove relu specific configurations # remember that quantized relu's are always upper bounded. if layer["class_name"] == "LeakyReLU": del layer["config"]["alpha"] elif layer["class_name"] == "relu": del layer["config"]["max_value"] del layer["config"]["alpha"] del layer["config"]["threshold"] else: # ReLU from mobilenet del layer["config"]["max_value"] del layer["config"]["negative_slope"] del layer["config"]["threshold"] if isinstance(quantizer, dict): quantizer = quantizer[q_name] if quantizer: layer["config"]["activation"] = quantizer else: quantize_activation(layer["config"], activation_bits) elif layer["class_name"] == "BatchNormalization": # We will assume at least QBatchNormalization or # layer name is in dictionary to enable conversion # otherwise we will just skip it. if ( layer_config["name"] not in quantizer_config and "QBatchNormalization" not in quantizer_config ): continue layer["class_name"] = "QBatchNormalization" # Needs to add kernel/bias quantizers. gamma_quantizer = get_config( quantizer_config, layer, "QBatchNormalization", "gamma_quantizer" ) beta_quantizer = get_config( quantizer_config, layer, "QBatchNormalization", "beta_quantizer" ) mean_quantizer = get_config( quantizer_config, layer, "QBatchNormalization", "mean_quantizer" ) variance_quantizer = get_config( quantizer_config, layer, "QBatchNormalization", "variance_quantizer" ) layer_config["gamma_quantizer"] = gamma_quantizer layer_config["beta_quantizer"] = beta_quantizer layer_config["mean_quantizer"] = mean_quantizer layer_config["variance_quantizer"] = variance_quantizer elif layer["class_name"] in ["AveragePooling2D", "GlobalAveragePooling2D"]: q_name = "Q" + layer["class_name"] # Adds the average quanizer to config. average_quantizer = get_config( quantizer_config, layer, q_name, "average_quantizer" ) # This is to avoid unwanted transformations. if average_quantizer is None: continue layer["class_name"] = q_name layer_config["average_quantizer"] = average_quantizer # Adds activation to config. quantizer = get_config( quantizer_config, layer, q_name, "activation_quantizer" ) if quantizer: layer_config["activation"] = quantizer else: quantize_activation(layer_config, activation_bits) q_name = None registered_name = layer.pop("registered_name", None) if registered_name: layer["registered_name"] = q_name or registered_name # We need to keep a dictionary of custom objects as our quantized library # is not recognized by keras. qmodel = quantized_model_from_json(json.dumps(jm), custom_objects) # If transfer_weights is true, we load the weights from model to qmodel. if transfer_weights and not enable_bn_folding: for layer, qlayer in zip(model.layers, qmodel.layers): if layer.get_weights(): qlayer.set_weights(copy.deepcopy(layer.get_weights())) return qmodel
def _add_supported_quantized_objects(custom_objects): """Map all the quantized objects and quantizers.""" for name, quantizer in quantizer_registry._QUANTIZERS_REGISTRY._container.items(): custom_objects[name] = quantizer custom_objects["QInitializer"] = QInitializer custom_objects["QDense"] = QDense custom_objects["QConv1D"] = QConv1D custom_objects["QConv2D"] = QConv2D custom_objects["QConv2DTranspose"] = QConv2DTranspose custom_objects["QSimpleRNNCell"] = QSimpleRNNCell custom_objects["QSimpleRNN"] = QSimpleRNN custom_objects["QLSTMCell"] = QLSTMCell custom_objects["QLSTM"] = QLSTM custom_objects["QGRUCell"] = QGRUCell custom_objects["QGRU"] = QGRU custom_objects["QBidirectional"] = QBidirectional custom_objects["QDepthwiseConv2D"] = QDepthwiseConv2D custom_objects["QSeparableConv1D"] = QSeparableConv1D custom_objects["QSeparableConv2D"] = QSeparableConv2D custom_objects["QActivation"] = QActivation custom_objects["QAdaptiveActivation"] = QAdaptiveActivation custom_objects["QBatchNormalization"] = QBatchNormalization custom_objects["Clip"] = Clip custom_objects["QConv2DBatchnorm"] = QConv2DBatchnorm custom_objects["QDepthwiseConv2DBatchnorm"] = QDepthwiseConv2DBatchnorm custom_objects["QDenseBatchnorm"] = QDenseBatchnorm custom_objects["QAveragePooling2D"] = QAveragePooling2D custom_objects["QGlobalAveragePooling2D"] = QGlobalAveragePooling2D custom_objects["QScaleShift"] = QScaleShift
[docs] def clone_model(model, custom_objects=None): """Clone a qkeras model safely.""" if not custom_objects: custom_objects = {} custom_objects = copy.deepcopy(custom_objects) try: qmodel = keras.models.clone_model(model) qmodel.set_weights(model.get_weights()) except Exception as e: print(f"[ERROR] Model cloning failed: {e}") raise e return qmodel
[docs] def remove_mask_keys(obj): if isinstance(obj, dict): # Removes "mask" from all dictionaries (no matter how deeply nested) return {k: remove_mask_keys(v) for k, v in obj.items() if k != "mask"} elif isinstance(obj, list): # Also processes lists (e.g., inbound_nodes is a list) return [remove_mask_keys(v) for v in obj] else: return obj # Primitive values (e.g., int, str) remain unchanged
[docs] def quantized_model_from_json(json_string, custom_objects=None): if not custom_objects: custom_objects = {} # Makes a deep copy to ensure our objects are not shared elsewhere. custom_objects = copy.deepcopy(custom_objects) _add_supported_quantized_objects(custom_objects) json_dict = json.loads(json_string) # Recursively remove "mask" cleaned_json_dict = remove_mask_keys(json_dict) # Convert back into JSON string cleaned_json_string = json.dumps(cleaned_json_dict) # Load the model qmodel = models.model_from_json(cleaned_json_string, custom_objects=custom_objects) return qmodel
[docs] def load_qmodel(filepath, custom_objects=None, compile=True): """Loads quantized model from Keras's model.save() h5 file. Arguments: filepath: one of the following: - string, path to the saved model - h5py.File or h5py.Group object from which to load the model - any file-like object implementing the method `read` that returns `bytes` data (e.g. `io.BytesIO`) that represents a valid h5py file image. custom_objects: Optional dictionary mapping names (strings) to custom classes or functions to be considered during deserialization. compile: Boolean, whether to compile the model after loading. Returns: A Keras model instance. If an optimizer was found as part of the saved model, the model is already compiled. Otherwise, the model is uncompiled and a warning will be displayed. When `compile` is set to False, the compilation is omitted without any warning. """ if not custom_objects: custom_objects = {} # Makes a deep copy to make sure our objects are not shared elsewhere. custom_objects = copy.deepcopy(custom_objects) _add_supported_quantized_objects(custom_objects) qmodel = keras.models.load_model( filepath, custom_objects=custom_objects, compile=compile ) return qmodel
[docs] def get_model_sparsity(model, per_layer=False, allow_list=None): """Calculates the sparsity of the model's weights and biases. Quantizes the model weights using model_save_quantized_weights (but does not save the quantized weights) before calculating the proportion of weights and biases set to zero. Arguments: model: The model to use to calculate sparsity. Assumes that this is a qkeras model with trained weights. per_layer: If to return a per-layer breakdown of sparsity allow_list: A list of layer class names that sparsity will be calculated for. If set to None, a default list will be used. Returns: A float value representing the proportion of weights and biases set to zero in the quantized model. If per_layer is True, it also returns a per-layer breakdown of model sparsity formatted as a list of tuples in the form (<layer name>, <sparsity proportion>) """ # Checks if to use a default list of allowed layers to calculate sparsity. if allow_list is None: allow_list = [ "QDense", "Dense", "QConv1D", "Conv1D", "QConv2D", "Conv2D", "QDepthwiseConv2D", "DepthwiseConv2D", "QSeparableConv1D", "SeparableConv1D", "QSeparableConv2D", "SeparableConv2D", "QOctaveConv2D", "QSimpleRNN", "RNN", "QLSTM", "QGRU", "QConv2DTranspose", "Conv2DTranspose", "QConv2DBatchnorm", "QDepthwiseConv2DBatchnorm", ] # Quantizes the model weights for a more accurate sparsity calculation. model_save_quantized_weights(model) # Calculates the sparsity layer by layer. layer_sparsity = [] total_sparsity = 0.0 all_weights = [] for layer in model.layers: if hasattr(layer, "quantizers") and layer.__class__.__name__ in allow_list: if layer.__class__.__name__ in [ "QConv2DBatchnorm", "QDepthwiseConv2DBatchnorm", ]: weights_to_examine = layer.get_folded_weights() else: weights_to_examine = layer.get_weights() layer_weights = [] for weight in weights_to_examine: try: weight_numpy = weight.ravel() except AttributeError: # In case of EagerTensor. weight_numpy = keras.ops.convert_to_numpy(weight).ravel() layer_weights.append(weight_numpy) all_weights.append(weight_numpy) layer_weights = knp.concatenate(layer_weights) layer_sparsity.append((layer.name, knp.mean(layer_weights == 0))) if len(all_weights) > 0: # Average the sparsity for the entire model. all_weights = knp.concatenate(all_weights) total_sparsity = knp.mean(all_weights == 0) if per_layer: return (total_sparsity, layer_sparsity) else: return total_sparsity
[docs] def quantized_model_debug(model, X_test, plot=False, plt_instance=None): """Debugs and plots model weights and activations. Args: model: The qkeras model to debug X_test: The sample data to use to give to model.predict plot: Bool. If to plot the results. plt_instance: A matplotlib.pyplot instance used to plot in an IPython environment. """ assert ( plt_instance and plot ) or not plot, "plt_instance is required if plt is True" outputs = [] output_names = [] for layer in model.layers: if layer.__class__.__name__ in REGISTERED_LAYERS: output_names.append(layer.name) outputs.append(layer.output) model_debug = Model(inputs=model.inputs, outputs=outputs) y_pred = model_debug.predict(X_test) print("{:30} {: 8.4f} {: 8.4f}".format("input", knp.min(X_test), knp.max(X_test))) for n, p in zip(output_names, y_pred): layer = model.get_layer(n) if ( layer.__class__.__name__ in "QActivation" or layer.__class__.__name__ in "QAdaptiveActivation" ): alpha = get_weight_scale(layer.activation, p) else: alpha = 1.0 print( f"{n:30} {knp.min(p / alpha): 8.4f} {knp.max(p / alpha): 8.4f}", end="", ) if alpha != 1.0: print(f" a[{knp.min(alpha): 8.4f} {knp.max(alpha):8.4f}]") if plot and layer.__class__.__name__ in [ "QConv1D", "QConv2D", "QConv2DTranspose", "QDense", "QActivation", "QAdaptiveActivation", "QSimpleRNN", "QLSTM", "QGRU", "QBidirectional", "QSeparableConv1D", "QSeparableConv2D", ]: plt_instance.hist(p.flatten(), bins=25) plt_instance.title(layer.name + "(output)") plt_instance.show() alpha = None if layer.__class__.__name__ not in [ "QConv2DBatchnorm", "QDepthwiseConv2DBatchnorm", ]: weights_to_examine = layer.get_weights() else: weights_to_examine = layer.get_folded_weights() for i, weights in enumerate(weights_to_examine): if hasattr(layer, "get_quantizers") and layer.get_quantizers()[i]: # Quantize the current weight using the corresponding quantizer quantized_weights = layer.get_quantizers()[i](weights) # Evaluate the tensor to a NumPy array if necessary if isinstance(quantized_weights, KerasTensor): weights = keras.ops.convert_to_numpy(quantized_weights) else: weights = quantized_weights if i == 0 and layer.__class__.__name__ in [ "QConv1D", "QConv2D", "QConv2DTranspose", "QDense", "QSimpleRNN", "QLSTM", "QGRU", "QSeparableConv1D", "QSeparableConv2D", "QConv2DBatchnorm", "QDepthwiseConv2DBatchnorm", ]: alpha = get_weight_scale(layer.get_quantizers()[i], weights) alpha_mask = alpha == 0.0 weights = knp.where(alpha_mask, weights * alpha, weights / alpha) if plot: plt_instance.hist(weights.flatten(), bins=25) plt_instance.title(layer.name + "(weights)") plt_instance.show() print( f" ({knp.min(weights): 8.4f} {knp.max(weights): 8.4f})", end="" ) if alpha is not None and isinstance(alpha, KerasTensor): print( f" a({knp.min(alpha): 10.6f} {knp.max(alpha): 10.6f})", end="" ) print("")
[docs] def quantized_model_dump(model, x_test, output_dir=None, layers_to_dump=[]): """Dumps tensors of target layers to binary files. Arguments: model: qkeras model object. x_test: numpy type, test tensors to generate output tensors. output_dir: a string for the directory to hold binary data. layers_to_dump: a list of string, specified layers by layer customized name. """ outputs = [] y_names = [] if not output_dir: with tempfile.TemporaryDirectory() as output_dir: print("temp dir", output_dir) if not os.path.exists(output_dir): os.makedirs(output_dir) print("create dir", output_dir) for layer in model.layers: if not layers_to_dump or layer.name in layers_to_dump: y_names.append(layer.name) outputs.append(layer.output) # Gather the tensor outputs from specified layers at layers_to_dump. model_debug = Model(inputs=model.inputs, outputs=outputs) y_pred = model_debug.predict(x_test) # Dumps tensors to files. for name, tensor_data in zip(y_names, y_pred): filename = os.path.join(output_dir, name + ".bin") print("writing the layer output tensor to ", filename) with open(filename, "w") as fid: tensor_data.astype("float32").tofile(fid)
[docs] def clone_model_and_freeze_auto_po2_scale( orig_model, orig_model_path=None, quantize_model_weights=False ): """Clone model and freeze the scale value of auto_po2 type quantizers. Args: orig_model: original model which will be used to clone the new model. If set to None, the function will load the original model from orig_model_path argument. orig_model_path: The path to the original model file. If set to None, the function will load the original model from the orig_model argument. quantize_model_weights: Bool to quantize weights to HW format. If set to False, the model weights will be in float format. If set to True, the model weights will be in HW format and the function will also check if the hw weights extracted from the new model matches the original model. Returns: A tuple of the new model and the new model's hw weights. Note: + When using this function to retrain model with fixed scale value. Set quantize_model_weights to False in this case. + This function only supports a collection of common layers that will use auto_po2 quantizers. For less common layers, it will raise errors and we will add more support case by case. Example usage: model, _ = clone_model_and_freeze_auto_po2_scale( orig_model_path="path/to/model", quantize_model_weights=False) """ def _create_bn_layer(layer_cfg, bn_inv_quantizer): # Clone batch normalization layer with the new inverse quantizer. if bn_inv_quantizer is not None: layer_cfg["inverse_quantizer"]["config"] = bn_inv_quantizer.get_config() return QBatchNormalization(**layer_cfg) def _create_qconv2d_layer(layer_cfg, kernel_quantizer): # Clone QConv2D layer wiht the new kernel quantizers. if kernel_quantizer is not None: layer_cfg["kernel_quantizer"]["config"] = kernel_quantizer.get_config() return QConv2D(**layer_cfg) def _create_qdepthwise_conv2d_layer(layer_cfg, depthwise_quantizer): # Clone QDepthwiseConv2D layer with the new depthwise_quantizer quantizer. if depthwise_quantizer is not None: layer_cfg["depthwise_quantizer"]["config"] = ( depthwise_quantizer.get_config() ) return QDepthwiseConv2D(**layer_cfg) def _create_qdense_layer(layer_cfg, kernel_quantizer): # Clone QDense layer with the new kernel quantizer. if kernel_quantizer is not None: layer_cfg["kernel_quantizer"]["config"] = kernel_quantizer.get_config() return QDense(**layer_cfg) def _create_other_layer(orig_layer): # Clone other layers. config = orig_layer.get_config() return orig_layer.__class__.from_config(config) def _create_quantized_bits_with_post_training_scale(q): # Create a new quantized_bits instance with the fixed scale value. if q is not None: q_cfg = q.get_config() q_cfg["post_training_scale"] = keras.ops.convert_to_numpy(q.scale) q = quantized_bits(**q_cfg) return q def _find_auto_po2_quantizer(layer): # Find the auto_po2 quantizer in the layer. Note that we allow at # most one auto_po2 quantizer in each layer due to the limitation of # the current HW implementation. num_auto_po2_quantizers = 0 auto_po2_quantizer = None if hasattr(layer, "quantizers"): for q in layer.quantizers: if hasattr(q, "alpha") and q.alpha == "auto_po2": num_auto_po2_quantizers += 1 auto_po2_quantizer = q if num_auto_po2_quantizers > 1: raise ValueError( f"{layer.name} has more than one auto_po2 quantizer. " "Please check if this is expected." ) else: return auto_po2_quantizer def _check_hw_weights_equal(hw_weights_1, hw_weights_2): # Check if the hw weights extracted from the new model matches the # original model. for layer_name in hw_weights_2.keys(): for key in hw_weights_2[layer_name].keys(): val1 = hw_weights_2[layer_name][key] val2 = hw_weights_1[layer_name][key] if isinstance(val1, list): for v1, v2 in zip(val1, val2): if not knp.all(v1 == v2): raise ValueError( f"{layer_name}/{key}: No Match! v1={v1}, v2={v2}" ) elif not knp.all(val1 == val2): raise ValueError( f"{layer_name}/{key}: No Match! val1={val1}, val2={val2}" ) # Load the original model with float weights. # Note: weights will be quantized later in silicon flow by calling # model_save_quantized_weights. if orig_model is not None and orig_model_path is not None: raise ValueError("Only one of orig_model and orig_model_path can be set.") elif orig_model is None and orig_model_path is None: raise ValueError("One of orig_model and orig_model_path must be set.") elif orig_model_path is not None: orig_model = load_qmodel(orig_model_path, compile=False) # Quantize model weights and compute quantizer scale values. quantized_model = keras.models.clone_model(orig_model) quantized_model.set_weights(orig_model.get_weights()) # In silicon flow, weight binary files are generated from hw weights. orig_hw_weights = model_save_quantized_weights(quantized_model) # Create a new model with fixed scale quantizers. x = inputs = keras.Input( shape=orig_model.input_shape[1:], name=orig_model.layers[0].name ) for layer in quantized_model.layers[1:]: layer_class = layer.__class__.__name__ auto_po2_quantizer = _find_auto_po2_quantizer(layer) auto_po2_quantizer_with_frozen_scale = ( _create_quantized_bits_with_post_training_scale(auto_po2_quantizer) ) layer_cfg = layer.get_config() # To be compatible with different python versions, we do not use # match-case style here. if layer_class == "QConv2D": x = _create_qconv2d_layer(layer_cfg, auto_po2_quantizer_with_frozen_scale)( x ) elif layer_class == "QDepthwiseConv2D": x = _create_qdepthwise_conv2d_layer( layer_cfg, auto_po2_quantizer_with_frozen_scale )(x) elif layer_class == "QBatchNormalization": x = _create_bn_layer(layer_cfg, auto_po2_quantizer_with_frozen_scale)(x) elif layer_class == "QDense": x = _create_qdense_layer(layer_cfg, auto_po2_quantizer_with_frozen_scale)(x) else: x = _create_other_layer(layer)(x) new_model = keras.Model(inputs, x) # Set the weights of the new model to the original model (float weights). new_model.set_weights(orig_model.get_weights()) # Check if the new model still has auto_po2 quantizer. # This function only supports a colleciton of common layers that will use # auto_po2 quantizers. For less common layers, we need to add extra support # in the future. for layer in new_model.layers: q = _find_auto_po2_quantizer(layer) if q is not None and q.post_training_scale is None: raise ValueError( f"{layer.name} in the new model still has auto_po2 quantizer with " "adaptive scales. Please check if this is expected!" ) new_hw_weights = None if quantize_model_weights: new_hw_weights = model_save_quantized_weights(new_model) # Check if the hw weights extracted from the new model matches the original # nima model. _check_hw_weights_equal(orig_hw_weights, new_hw_weights) return new_model, new_hw_weights
[docs] def clone_optimizer(opt): # robust across Keras 2.x and 3.x return optimizers.deserialize(optimizers.serialize(opt))