# Copyright 2019 Google LLC
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""utility functions."""
import copy
import sys
import keras
import keras.ops.numpy as knp
import numpy as np
from keras import KerasTensor
from keras import backend as K
from qkeras.qtools import quantized_operators
[docs]
def get_val(feature, key, default_val=None):
    """Look up ``key`` on ``feature``, falling back to ``default_val``.

    Args:
      feature: a dict, or any object exposing the value as an attribute.
      key: dictionary key (for dicts) or attribute name (for other objects).
      default_val: value returned when ``key`` is not present.

    Returns:
      ``feature[key]`` for dicts and ``feature.key`` otherwise; ``default_val``
      when the key/attribute does not exist.
    """
    # Anything that is not a dict is read through attribute access.
    if not isinstance(feature, dict):
        return getattr(feature, key, default_val)
    return feature.get(key, default_val)
[docs]
def is_shape_alternation_layers(layer):
    """Tell whether ``layer`` is a max-pool, reshape or flatten type layer.

    The decision is based only on the layer's class name: it must contain
    one of "MaxPool", "Reshape" or "Flatten".

    Args:
      layer: object whose class name is inspected.

    Returns:
      True when the class name contains one of the fragments, else False.
    """
    class_name = layer.__class__.__name__
    if not class_name:
        return False
    return any(
        fragment in class_name for fragment in ("MaxPool", "Reshape", "Flatten")
    )
[docs]
def is_merge_layers(layer):
    """Tell whether ``layer`` is one of the known merge layers.

    The class name has to match one of the listed names exactly.

    Args:
      layer: object whose class name is inspected.

    Returns:
      True for Add, Multiply, Subtract, Average, Maximum, Minimum,
      Concatenate and Dot layers, False for everything else.
    """
    merge_layer_names = (
        "Add",
        "Multiply",
        "Subtract",
        "Average",
        "Maximum",
        "Minimum",
        "Concatenate",
        "Dot",
    )
    return layer.__class__.__name__ in merge_layer_names
[docs]
def get_operation_count(layer, input_shape):
    """Determines number of multiplier operations in a qkeras layer.

    The count is derived from the layer's class name, ``input_shape`` and,
    where needed, the layer's computed output shape and kernel shape. Layer
    types matched by none of the branches below count as 0 operations, and a
    message saying so is printed.

    Args:
      layer: keras/qkeras layer to analyze.
      input_shape: input shape of the layer, or a list of shapes, in which
        case only the first entry is used.

    Returns:
      The operation count as a Python int.
    """
    layer_type = layer.__class__.__name__

    # Check if the inputs are a list of Dimensions; keep the first one only.
    if isinstance(input_shape, list):
        input_shape = input_shape[0]

    average_pooling_types = (
        "AveragePooling2D",
        "AvgPool2D",
        "GlobalAvgPool2D",
        "GlobalAveragePooling2D",
        "QGlobalAveragePooling2D",
    )
    conv2d_types = (
        "QConv2D",
        "Conv2D",
        "QConv2DBatchnorm",
        "QConv2DTranspose",
        "Conv2DTranspose",
    )
    conv1d_types = ("QConv1D", "Conv1D")
    depthwise_conv2d_types = ("QDepthwiseConv2D", "DepthwiseConv2D")
    dense_types = ("QDense", "Dense")

    operation_count = 0
    if is_merge_layers(layer) or is_shape_alternation_layers(layer):
        # Product of every input dimension after the first one.
        operation_count = knp.prod(input_shape[1:])
    elif layer_type in average_pooling_types:
        # Without a pool_size attribute, the window is taken from the input
        # dimensions between the first and the last one.
        window = (
            layer.pool_size
            if hasattr(layer, "pool_size")
            else input_shape[1:-1]
        )
        adds_per_channel = knp.prod(window)
        out_channels = layer.compute_output_shape(input_shape)[-1]
        # total number of add ops
        operation_count = out_channels * adds_per_channel
    elif "UpSampling" in layer_type:
        # UpSampling1D/2D/3D
        upsampled_shape = layer.compute_output_shape(input_shape)
        operation_count = knp.prod(upsampled_shape[1:])
    elif "Activation" in layer_type or "BatchNormalization" in layer_type:
        operation_count = knp.prod(input_shape[1:])
    elif layer_type in conv2d_types:
        out_shape = layer.compute_output_shape(input_shape)
        _, _, _, in_channels = input_shape
        _, out_height, out_width, out_channels = out_shape
        k_height, k_width, _, _ = layer.get_weights()[0].shape
        operation_count = (
            out_height * out_width * out_channels * k_height * k_width * in_channels
        )
    elif layer_type in conv1d_types:
        out_shape = layer.compute_output_shape(input_shape)
        _, _, in_channels = input_shape
        _, out_steps, out_channels = out_shape
        k_length, _, _ = layer.get_weights()[0].shape
        operation_count = out_steps * out_channels * k_length * in_channels
    elif layer_type in depthwise_conv2d_types:
        out_shape = layer.compute_output_shape(input_shape)
        _, _, _, in_channels = input_shape
        _, out_height, out_width, _ = out_shape
        k_height, k_width, _, _ = layer.get_weights()[0].shape
        operation_count = k_height * k_width * out_height * out_width * in_channels
    elif layer_type in dense_types:
        out_shape = layer.compute_output_shape(input_shape)
        # Find the input and output sizes out of all possible dimensions.
        # Usually, the first shape dimension will be the batch size, and the
        # second shape dimension will be the number of channels. However, if
        # the Dense layer is in Squeeze-and-Excite, the first shape dimension
        # will be the batch size, the second and third shape dimension will be
        # the spatial sizes (should both be 1), and the fourth shape dimension
        # will be the number of channels.
        #
        # Note: the asserts check sum(shape > 1) <= 1 so that a dense layer
        # whose output has shape (None, 1), i.e. sum(oshape > 1) = 0, is
        # still accepted.
        known_in_dims = knp.array([d for d in input_shape if d is not None])
        assert knp.sum(known_in_dims > 1) <= 1, (
            "Input Tensor shape in %s has " "multiple >1 size dims"
        ) % layer.name
        size_i = knp.max(known_in_dims)
        known_out_dims = knp.array([d for d in out_shape if d is not None])
        assert knp.sum(known_out_dims > 1) <= 1, (
            "Output Tensor shape in %s has " + "multiple >1 size dims"
        ) % layer.name
        size_o = knp.max(known_out_dims)
        operation_count = size_i * size_o
    else:
        print(f"operation count for {layer} is defaulted to 0")
    return int(operation_count)
[docs]
def get_weights(layer, model_weights_already_quantized=True):
    """Get layer weights.

    Args:
      layer: given qkeras/keras layer
      model_weights_already_quantized: bool. whether the given layer's weights
        are already quantized. This is necessary because with certain
        quantizers, eg., quantized_bits(alpha="auto_po2"), we cannot quantize
        the same weights more than once, as it will lead to different results.

    Returns:
      Quantized layer weights: a deep copy of ``layer.get_weights()`` in
      which, when the weights are not already quantized, every weight that
      has a truthy quantizer at the same index of ``layer.get_quantizers()``
      is replaced by its quantized value as a numpy array.
    """
    weights = layer.get_weights()
    out = copy.deepcopy(weights)
    if model_weights_already_quantized or not hasattr(layer, "get_quantizers"):
        return out

    # Fetch the quantizer list once instead of (up to) twice per weight.
    quantizers = layer.get_quantizers()
    # zip() stops at the shorter list, so weights without a matching
    # quantizer entry are returned unquantized instead of raising IndexError.
    for j, (weight, quantizer) in enumerate(zip(weights, quantizers)):
        if quantizer:
            # keras.ops is used here because the legacy K.eval / K.constant
            # helpers are not part of the Keras 3 `keras.backend` namespace.
            quantized = quantizer(keras.ops.convert_to_tensor(weight))
            out[j] = keras.ops.convert_to_numpy(quantized)
    return out
[docs]
def get_scale_from_quantized_bits_with_auto_po2(quantizer):
    """Get scale from quantized_bits with alpha=auto_po2.

    Args:
      quantizer: object exposing a ``scale`` attribute.

    Returns:
      ``quantizer.scale`` unchanged when it is a numpy array or a
      KerasTensor, its numpy conversion when it is any other tensor, and
      None for every other kind of value.
    """
    scale = quantizer.scale
    # numpy arrays and KerasTensors are handed back untouched.
    if isinstance(scale, np.ndarray) or isinstance(scale, KerasTensor):
        return scale
    return keras.ops.convert_to_numpy(scale) if keras.ops.is_tensor(scale) else None
[docs]
def adjust_multiplier_for_auto_po2(multiplier, qkeras_weight_quantizer):
    """Adjust multiplier when weight quantizer is auto_po2 type.

    Multiplier_bits = bits_x + bits_w
    Multiplier_intbits = log2(scale) + intbits_x + intbits_w

    Because we might have different scale for auto_po2 quantizer at different
    output channels, multiplier will have different integer bits at different
    output channel accordingly, which is not desirable in hardware
    implementation. Therefore we set a general multiplier quantizer so that it
    provides enough fractional bits and integer bits for all output channels.

    Args:
      multiplier: object whose ``output`` quantizer exposes ``bits`` and
        ``int_bits``; both are updated in place when an adjustment is made.
      qkeras_weight_quantizer: weight quantizer. An adjustment is only made
        when its string form contains "quantized_bits" and its ``alpha`` is
        "auto_po2".

    Raises:
      ValueError: if the scale obtained from the quantizer is neither a
        KerasTensor, a numpy array nor a float.
    """
    print("adjust multiplier for auto_po2 ...")
    output_quantizer = multiplier.output

    if (
        "quantized_bits" in str(qkeras_weight_quantizer)
        and qkeras_weight_quantizer.alpha == "auto_po2"
    ):
        bits = output_quantizer.bits
        int_bits = output_quantizer.int_bits
        scale = get_scale_from_quantized_bits_with_auto_po2(
            qkeras_weight_quantizer
        )
        if scale is None:
            # If scale is None, it means the quantizer has never been called.
            # Therefore we skip the bitwidth adjustment steps.
            print(
                "[WARNING] The weight quantizer is never called even though it has "
                "alpha=auto_po2. In this case we do not adjust the multiplier and "
                "accumulator bit width since we don't know the exact values of "
                "scale",
                file=sys.stderr,
            )
            return

        # The shifts are rounded to the nearest integer rather than truncated:
        # a floating point log2 may land slightly off the exact exponent
        # (e.g. 2.9999998), and truncation would then give the wrong shift.
        if isinstance(scale, (KerasTensor, np.ndarray)):
            scale = knp.squeeze(scale)
            max_shift = round(float(knp.log2(knp.max(scale))))
            min_shift = round(float(knp.log2(knp.min(scale))))
        elif isinstance(scale, float):
            max_shift = round(float(knp.log2(scale)))
            min_shift = max_shift
        else:
            raise ValueError(
                "Scale should be either numpy array or float, "
                f"{type(scale)} is found instead!"
            )

        # In order to set a general quantizer for different output channels,
        # we need to set both fractional bits and integer bits as the max
        # required bits for different output channels.
        max_fractional_bits = bits - int_bits - min_shift
        max_int_bits = int_bits + max_shift
        output_quantizer.bits = max_int_bits + max_fractional_bits
        output_quantizer.int_bits = max_int_bits
    elif getattr(qkeras_weight_quantizer, "alpha", None) == "auto_po2":
        # Sent to stderr, like the other warning emitted by this function.
        print(
            "[WARNING] auto_po2 is detected on a non-quantized_bits quantizer. "
            "Currently in QTools we do not yet support the auto_po2 with the "
            f"given quantizer type: {type(qkeras_weight_quantizer)}. "
            "Therefore we do not adjust the multiplier and accumulator bit width",
            file=sys.stderr,
        )
[docs]
def adjust_accumulator_for_auto_po2(
    layer, multiplier, qkeras_weight_quantizer, bias_quantizer
):
    """Adjust accumulator when weight quantizer is auto_po2 type.

    Args:
      layer: layer providing the kernel (first entry of ``get_weights()``)
        and the ``use_bias`` flag.
      multiplier: multiplier of the layer; it is deep-copied before being
        adjusted, so the passed object is not modified here.
      qkeras_weight_quantizer: weight quantizer forwarded to
        ``adjust_multiplier_for_auto_po2``.
      bias_quantizer: bias quantizer, only used when ``layer.use_bias`` is
        true.

    Returns:
      The kernel accumulator when the layer has no bias, otherwise the
      quantizer made by combining the kernel accumulator output with the bias
      quantizer.
    """
    fused_multiplier = copy.deepcopy(multiplier)
    adjust_multiplier_for_auto_po2(fused_multiplier, qkeras_weight_quantizer)

    kernel = layer.get_weights()[0]
    kernel_shape = kernel.shape
    # depthwise_kernel_shape = kernel_size + (input_dim, depth_multiplier)
    # When computing accumulator bitwidth for dw conv2d layer, we do not
    # need to count the last two dimensions.
    is_depthwise = layer.__class__.__name__ in (
        "QDepthwiseConv2D",
        "DepthwiseConv2D",
    )
    if is_depthwise:
        assert kernel_shape[-1] == 1, (
            "depth_multiplier must be 1, " f"{kernel_shape[-1]} found instead!"
        )
        kernel_shape = kernel.shape[:-2] + (1, 1)

    # use_bias=False so that the accumulator doesn't account for the bias
    # bitwidth at this stage.
    accumulator_factory = quantized_operators.AccumulatorFactory()
    kernel_accumulator = accumulator_factory.make_accumulator(
        kernel_shape, fused_multiplier, use_bias=False
    )
    if not layer.use_bias:
        return kernel_accumulator

    # Add bias quantizer bitwidth to the overall accumulator.
    bias_adder = quantized_operators.adder_factory.IAdder()
    return bias_adder.make_quantizer(kernel_accumulator.output, bias_quantizer)
[docs]
def find_divisors(num):
    """List the positive divisors of ``num`` in increasing order.

    Args:
      num: integer to factor.

    Returns:
      Every integer in ``1..num`` that divides ``num`` evenly; an empty list
      when ``num`` is smaller than 1.
    """
    candidates = range(1, num + 1)
    return [divisor for divisor in candidates if not num % divisor]
[docs]
def get_layer_info(layer: keras.layers.Layer, attr_name: str):
    """Return one piece of information about a supported layer.

    Args:
      layer: layer to inspect; its class name must be one of the supported
        layer types listed below.
      attr_name: one of "layer_type", "input_channel", "output_channel",
        "kernel_height" or "kernel_width".

    Returns:
      The requested value, or None when ``attr_name`` is not one of the
      names above.

    Raises:
      AssertionError: if the layer type is not supported.
    """
    layer_type = layer.__class__.__name__
    supported_layer_types = [
        "QDense",
        "QConv2D",
        "QDepthwiseConv2D",
        "MaxPooling2D",
        "GlobalMaxPooling2D",
        "QAveragePooling2D",
        "QGlobalAveragePooling2D",
        "UpSampling2D",
        "Concatenate",
        "QBatchNormalization",
        "QActivation",
        "Activation",
        "Dropout",
        "Reshape",
        "ZeroPadding2D",
    ]
    assert layer_type in supported_layer_types, (
        f"For now only {supported_layer_types} layers are supported. "
        f"Found {layer_type} instead."
    )

    # Channels are read from the last dimension of the input/output shapes.
    input_channel = layer.input.shape[-1]
    output_channel = layer.output.shape[-1]

    # Layers without a kernel_size get a 1x1 kernel, which lets a Dense layer
    # be represented like a Conv layer.
    if hasattr(layer, "kernel_size"):
        kernel_height, kernel_width = layer.kernel_size
    else:
        kernel_height, kernel_width = 1, 1

    info = {
        "layer_type": layer_type,
        "input_channel": input_channel,
        "output_channel": output_channel,
        "kernel_height": kernel_height,
        "kernel_width": kernel_width,
    }
    return info.get(attr_name)
[docs]
def is_upsampled(layer: keras.layers.Layer):
    """Report whether ``layer`` is doing upsampling, judged by its class name.

    Args:
      layer: layer to inspect.

    Returns:
      True when the class name contains "UpSampling", else False.
    """
    class_name = layer.__class__.__name__
    return class_name.find("UpSampling") >= 0