# Source code for qkeras.qtools.run_qtools
# Copyright 2019 Google LLC
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Interface for running qtools and qenergy."""
import json
import math

import keras.ops.numpy as knp

from qkeras.qtools import generate_layer_data_type_map, interface, qgraph, qtools_util
from qkeras.qtools.config_public import config_settings
from qkeras.qtools.qenergy import qenergy
from qkeras.qtools.settings import cfg
class QTools:
    """Integration of different qtools functions.

    Construction builds a quantization graph for ``model``, derives the
    per-layer data type map from it and converts that map to a JSON-style
    dictionary. The remaining methods report on that dictionary: dumping or
    printing layer stats, estimating energy, and computing roofline numbers
    (ACE, weight bytes, activation bytes).
    """

    def __init__(
        self,
        model,
        process,
        source_quantizers=None,
        is_inference=False,
        weights_path=None,
        keras_quantizer=None,
        keras_accumulator=None,
        for_reference=False,
        model_weights_already_quantized=True,
        hw_weight_dict=None,
    ):
        """Build the layer data type map for ``model``.

        Args:
            model: model to analyse; it is passed to ``qgraph.CreateGraph``
                and kept for the later size/energy computations.
            process: passed to ``cfg.update`` together with
                ``config_settings`` to select the active configuration.
            source_quantizers: quantizers for the model inputs. When None,
                ``CreateGraph`` falls back to ``cfg.default_source_quantizer``.
            is_inference: forwarded to ``generate_layer_data_type_map``.
            weights_path: when not None, weights are loaded into the model
                from this path before the graph is built.
            keras_quantizer: forwarded to ``generate_layer_data_type_map``.
            keras_accumulator: forwarded to ``generate_layer_data_type_map``.
            for_reference: forwarded to ``generate_layer_data_type_map``.
            model_weights_already_quantized: forwarded to
                ``generate_layer_data_type_map``.
            hw_weight_dict: forwarded to ``generate_layer_data_type_map``.
        """
        # Always bind the attribute (even to None) so that later methods do
        # not fail on a missing attribute when no model was supplied.
        self._model = model

        if weights_path is not None:
            self._model.load_weights(weights_path)

        cfg.update(process, config_settings)

        # if source_quantizers is None, CreateGraph will use
        # default_source_quantizers defined in cfg
        graph, source_quantizer_list = qgraph.CreateGraph(
            model, source_quantizers, cfg.default_source_quantizer
        )

        # qgraph.PrintGraph(graph)
        qgraph.GraphPropagateActivationsToEdges(graph)

        self._layer_map = generate_layer_data_type_map.generate_layer_data_type_map(
            graph,
            source_quantizer_list,
            is_inference,
            keras_quantizer,
            keras_accumulator,
            for_reference,
            model_weights_already_quantized=model_weights_already_quantized,
            hw_weight_dict=hw_weight_dict,
        )

        self._output_dict = interface.map_to_json(self._layer_map)
        self.source_quantizer_list = source_quantizer_list

    def qtools_stats_to_json(self, json_name):
        """Dump the layer stats to the json file named ``json_name``."""
        with open(json_name, "w") as outfile:
            json.dump(self._output_dict, outfile, indent=4)

    def qtools_stats_print(self):
        """Print out the layer stats as indented json."""
        print(json.dumps(self._output_dict, indent=4))

    def pe(
        self,
        weights_on_memory="dram",
        activations_on_memory="dram",
        min_sram_size=0,
        rd_wr_on_io=True,
        verbose=False,
    ):
        """Energy consumption calculation.

        Args:
            weights_on_memory: one of "dram", "sram" or "fixed".
            activations_on_memory: forwarded to ``qenergy.energy_estimate``.
            min_sram_size: forwarded to ``qenergy.energy_estimate``.
            rd_wr_on_io: forwarded to ``qenergy.energy_estimate``.
            verbose: when True, print the resulting energy dictionary.

        Returns:
            The dictionary returned by ``qenergy.energy_estimate``.

        Raises:
            AssertionError: if ``weights_on_memory`` is not a supported value.
        """
        assert weights_on_memory in ["dram", "sram", "fixed"]

        energy_dict = qenergy.energy_estimate(
            self._model,
            self._layer_map,
            weights_on_memory,
            activations_on_memory,
            min_sram_size,
            rd_wr_on_io,
        )

        if verbose:
            print("COST:")
            print(json.dumps(energy_dict, indent=4))

        return energy_dict

    def extract_energy_sum(self, cfg_setting, energy_dict):
        """Sum the selected energy entries over all layers.

        Args:
            cfg_setting: dict mapping a layer class name to the list of
                energy keys to include for that class. The "default" entry
                is used for classes without their own entry; when that is
                missing too, nothing is included for the layer.
            energy_dict: dict mapping each layer name to a dict holding
                "class_name" and "energy"; the "total_cost" entry is skipped.

        Returns:
            The summed energy, truncated to an int.
        """
        total = 0
        for layer_name, layer_stats in energy_dict.items():
            if layer_name == "total_cost":
                continue
            selected_keys = cfg_setting.get(
                layer_stats["class_name"], cfg_setting.get("default", [])
            )
            layer_energy = layer_stats["energy"]
            total += sum(layer_energy[key] for key in selected_keys)
        return int(total)

    def extract_energy_profile(self, cfg_setting, energy_dict):
        """Extract energy consumption in each layer.

        Args:
            cfg_setting: dict mapping a layer class name to the list of
                energy keys to include for that class, with "default" as
                the fallback entry.
            energy_dict: dict mapping each layer name to a dict holding
                "class_name" and "energy"; the "total_cost" entry is skipped.

        Returns:
            A dict mapping each layer name to {"energy": ..., "total": ...},
            where "energy" is the layer's energy dict from ``energy_dict``
            (the same object, not a copy) and "total" is the sum of its
            selected keys.
        """
        profile = {}
        for layer_name, layer_stats in energy_dict.items():
            if layer_name == "total_cost":
                continue
            selected_keys = cfg_setting.get(
                layer_stats["class_name"], cfg_setting.get("default", [])
            )
            layer_energy = layer_stats["energy"]
            profile[layer_name] = {
                "energy": layer_energy,
                "total": sum(layer_energy[key] for key in selected_keys),
            }
        return profile

    def calculate_ace(self, default_float_bits):
        """Compute ACE numbers from conv/dense layers.

        Args:
            default_float_bits: bit width used for both operands in the
                floating point reference number.

        Returns:
            A tuple (ace, ace_float) summed over all layers of the model.
        """

        def _get_ace(layer):
            if layer.name not in self._output_dict:
                return (0, 0)

            layer_item = self._output_dict[layer.name]
            weight_quantizer = qtools_util.get_val(layer_item, "weight_quantizer")
            if not weight_quantizer:
                # Only layers such as Conv/Dense have weight_quantizers;
                # other layers contribute nothing, so none of their other
                # fields need to be read.
                return (0, 0)

            # Here we only consider the number of multiplication as the
            # operation count. To include the number of
            # accumulators, we should multiply the value by 2, assuming
            # accumulation count ~= multiplication count.
            operation_count = layer_item["operation_count"]
            # Input bitwidth (taken from the first input quantizer).
            input_bits = layer_item["input_quantizer_list"][0]["bits"]
            # Weight bitwidth.
            w_bits = weight_quantizer["bits"]

            return (
                operation_count * input_bits * w_bits,
                operation_count * default_float_bits * default_float_bits,
            )

        print("WARNING: ACE are computed from conv/dense layers only!")

        # Evaluate each layer once and split the pairs afterwards.
        per_layer = [_get_ace(layer) for layer in self._model.layers]
        return (
            sum(ace for ace, _ in per_layer),
            sum(ace_float for _, ace_float in per_layer),
        )

    def calculate_output_bytes(self, include_model_input_size, default_float_bits):
        """Compute activation layers' output size in bytes.

        Args:
            include_model_input_size: when True, the size of the model input
                (using the bit width of the first source quantizer) is added.
            default_float_bits: bit width used for the floating point
                reference number.

        Returns:
            A tuple (output_bytes, output_bytes_float) of ints. Every
            contribution, including the model input, is truncated to whole
            bytes before being added.
        """

        def _get_activation_size(layer):
            # Since in hardware previous conv/dense layers will be fused with
            # the following activation layers, we only consider the output of
            # Activation layers when calculating output sizes.
            if layer.__class__.__name__ not in ["QActivation"]:
                return (0, 0)

            output_quantizer = self._output_dict[layer.name]["output_quantizer"]
            # The first dimension of the shape is skipped (presumably the
            # batch dimension -- confirm against the shape producer).
            num_elements = math.prod(output_quantizer["shape"][1:])
            return (
                int(num_elements * output_quantizer["bits"] / 8.0),
                int(num_elements * default_float_bits / 8.0),
            )

        # Evaluate each layer once and split the pairs afterwards.
        per_layer = [_get_activation_size(layer) for layer in self._model.layers]
        output_bytes = sum(quantized for quantized, _ in per_layer)
        output_bytes_float = sum(as_float for _, as_float in per_layer)

        if include_model_input_size:
            # Include model input size, truncated to whole bytes like the
            # per-layer sizes so the result stays a plain int.
            input_elements = math.prod(self._model.input_shape[1:])
            output_bytes += int(
                input_elements * self.source_quantizer_list[0].bits / 8.0
            )
            output_bytes_float += int(input_elements * default_float_bits / 8.0)

        return (output_bytes, output_bytes_float)

    def calculate_weight_bytes(self, default_float_bits):
        """Compute weight size in bytes from conv/dense layers.

        Args:
            default_float_bits: bit width used for the floating point
                reference number.

        Returns:
            A tuple (weight_bytes, weight_bytes_float) of ints summed over
            all layers of the model.

        Raises:
            AssertionError: if a layer with ``use_bias`` set has no
                bias_quantizer entry.
        """

        def _get_weight_size(layer):
            if layer.name not in self._output_dict:
                return (0, 0)

            layer_item = self._output_dict[layer.name]
            weight_quantizer = qtools_util.get_val(layer_item, "weight_quantizer")
            if not weight_quantizer:
                return (0, 0)

            # Calculates kernel bytes (layer.weights[0] is used as kernel).
            kernel_elements = math.prod(layer.weights[0].shape)
            weight_bytes = int(kernel_elements * weight_quantizer["bits"] / 8.0)
            weight_bytes_float = int(kernel_elements * default_float_bits / 8.0)

            # Calculates bias bytes (layer.weights[1] is used as bias).
            if getattr(layer, "use_bias", False):
                bias_quantizer = qtools_util.get_val(layer_item, "bias_quantizer")
                assert (
                    bias_quantizer is not None
                ), f"{layer.name} has no bias_quantizer!"
                bias_elements = math.prod(layer.weights[1].shape)
                weight_bytes += int(bias_elements * bias_quantizer["bits"] / 8.0)
                weight_bytes_float += int(bias_elements * default_float_bits / 8.0)

            return (weight_bytes, weight_bytes_float)

        # Evaluate each layer once and split the pairs afterwards.
        per_layer = [_get_weight_size(layer) for layer in self._model.layers]
        return (
            sum(quantized for quantized, _ in per_layer),
            sum(as_float for _, as_float in per_layer),
        )

    def get_roofline_numbers(
        self, include_model_input_size=True, default_float_bits=32
    ):
        """Extract model numbers for roofline model analysis.

        Args:
            include_model_input_size: passed to ``calculate_output_bytes``.
            default_float_bits: bit width used for the floating point
                reference numbers.

        Returns:
            A dict with the keys "ACE", "weight_in_bytes",
            "activation_in_bytes" and their "_float" counterparts.
        """
        # Each statistic is computed once; both tuple members are reused.
        ace, ace_float = self.calculate_ace(default_float_bits)
        weight_bytes, weight_bytes_float = self.calculate_weight_bytes(
            default_float_bits
        )
        activation_bytes, activation_bytes_float = self.calculate_output_bytes(
            include_model_input_size, default_float_bits
        )
        return {
            "ACE": ace,
            "weight_in_bytes": weight_bytes,
            "activation_in_bytes": activation_bytes,
            "ACE_float": ace_float,
            "weight_in_bytes_float": weight_bytes_float,
            "activation_in_bytes_float": activation_bytes_float,
        }