# Source code for qkeras.qtools.qenergy.qenergy

# Copyright 2019 Google LLC
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Calculate energy consumption of a given quantized model."""


import keras
import keras.ops.numpy as knp
import numpy as np

from qkeras.qtools import qtools_util
from qkeras.qtools.generate_layer_data_type_map import KERAS_LAYERS, qkeras_LAYERS
from qkeras.qtools.quantized_operators.quantizer_impl import IQuantizer
from qkeras.qtools.settings import cfg

# Model based on:
#   Mark Horowitz, Computing’s Energy Problem (and what we can
#   do about it). IEEE ISSCC, pp. 10–14, 2014
#   www.youtube.com/watch?v=eZdOkDtYMoo&feature=youtu.be&t=497

# all metrics converted to pJ/bit

def _clamped(cost_fn_name):
    """Return a callable evaluating ``cfg.<cost_fn_name>(x)`` floored at 0.

    The ``cfg`` attribute is resolved on every call (not when the table is
    built), so later changes to ``cfg`` are picked up.
    """
    return lambda x: max(getattr(cfg, cost_fn_name)(x), 0)


# Energy lookup table: OP[<operator family>][<operation>] is a callable taking
# a single argument and returning a cost that is never negative. The
# "mul_factor" entries are plain values read from ``cfg`` once, when this
# table is built.
OP = {
    "fp32": {
        "add": _clamped("fp32_add"),
        "mul": _clamped("fp32_mul"),
    },
    "fp16": {
        "add": _clamped("fp16_add"),
        "mul": _clamped("fp16_mul"),
    },
    # Every "fpm" operation except "mul" is costed with the adder function.
    "fpm": {
        "add": _clamped("fpm_add"),
        "mux": _clamped("fpm_add"),
        "xor": _clamped("fpm_add"),
        "and": _clamped("fpm_add"),
        "or": _clamped("fpm_add"),
        "shifter": _clamped("fpm_add"),
        "mul": _clamped("fpm_mul"),
    },
    # NOTE(review): for both memories the write cost ("wr") is computed with
    # the *read* cost function -- presumably intentional; confirm against cfg.
    "sram": {
        "rd": _clamped("sram_rd"),
        "wr": _clamped("sram_rd"),
        "mul_factor": cfg.sram_mul_factor,
    },
    "dram": {
        "rd": _clamped("dram_rd"),
        "wr": _clamped("dram_rd"),
        "mul_factor": cfg.dram_mul_factor,
    },
}


def to_scalar(x):
    """Collapse a single-valued object into a scalar.

    Args:
        x: One of
            * a Python ``int``/``float``/``bool`` (returned unchanged),
            * a NumPy scalar or a NumPy array holding exactly one element,
            * a tensor recognised by ``keras.ops.is_tensor`` holding exactly
              one element,
            * any other object with ``len(x) == 1``; its element ``x[0]`` is
              converted recursively.

    Returns:
        ``x`` itself for Python scalars, otherwise the result of ``.item()``
        on the NumPy value / converted tensor.

    Raises:
        ValueError: If a NumPy array or tensor holds more or fewer than one
            element.
        TypeError: If ``x`` is a ``str`` or any other unsupported type.
    """
    # Python scalar
    if isinstance(x, (int, float, bool)):
        return x

    # NumPy scalar
    if isinstance(x, np.generic):
        return x.item()

    # NumPy array
    if isinstance(x, np.ndarray):
        if x.size != 1:
            raise ValueError(f"NumPy array has {x.size} elements, not 1")
        return x.item()

    # A one-character str has length 1 and indexes to itself, so the generic
    # length-1 fallback below would recurse until RecursionError. Reject it
    # explicitly instead.
    if isinstance(x, str):
        raise TypeError(f"Unsupported type {type(x)}")

    # Keras backend tensor (immutable KerasTensor, JAX, Torch, TF eager...)
    if keras.ops.is_tensor(x):
        # Convert to NumPy via keras.ops
        arr = keras.ops.convert_to_numpy(x)
        if arr.size != 1:
            raise ValueError(f"Keras tensor has {arr.size} elements, not 1")
        return arr.item()

    # Generic sequence wrapper (list/tuple) of length 1
    if hasattr(x, "__len__") and len(x) == 1:
        return to_scalar(x[0])

    raise TypeError(f"Unsupported type {type(x)}")
def get_op_type(quantizer):
    """Return the ``OP`` table key matching a quantizer.

    Args:
        quantizer: An ``IQuantizer`` instance.

    Returns:
        ``"fp<bits>"`` (``"fp"`` followed by ``str(quantizer.bits)``) when the
        quantizer is floating point, otherwise ``"fpm"``.
    """
    assert isinstance(quantizer, IQuantizer)
    return f"fp{quantizer.bits!s}" if quantizer.is_floating_point else "fpm"
def memory_read_energy(
    is_input_layer,
    tensor_shape,
    mode,
    min_sram_size,
    rd_wr_on_io,
    quantizer_bits,
    is_tensor=True,
):
    """Estimate the energy needed to read one tensor from memory.

    Args:
        is_input_layer: When true, ``mode`` is overridden: ``"dram"`` if
            ``rd_wr_on_io`` is true, otherwise ``"sram"``.
        tensor_shape: Shape of the tensor being read.
        mode: ``"dram"`` or ``"sram"``; any other value yields zero energy.
        min_sram_size: Lower bound applied to the bit count before taking
            its log2 for the SRAM cost function.
        rd_wr_on_io: In ``"dram"`` mode, additionally charge an SRAM write.
        quantizer_bits: Bit width of each tensor element.
        is_tensor: When true, the first entry of ``tensor_shape`` is dropped
            before computing the element count (presumably the batch
            axis -- confirm with callers).

    Returns:
        The accumulated energy (``0`` when ``mode`` matches neither memory).
    """
    # Input layers ignore the requested mode; rd_wr_on_io alone selects it.
    if is_input_layer:
        mode = "dram" if rd_wr_on_io else "sram"

    dims = tensor_shape[1:] if is_tensor else tensor_shape
    total_bits = keras.ops.cast(knp.prod(dims) * quantizer_bits, float)
    total_bits_log2 = knp.log2(max(total_bits, min_sram_size))

    energy_mem = 0
    if mode == "sram":
        # read input from sram
        energy_mem += knp.ceil(total_bits * OP["sram"]["mul_factor"]) * OP[
            "sram"
        ]["rd"](total_bits_log2)
    elif mode == "dram":
        # load input from dram
        energy_mem += OP["dram"]["rd"](total_bits)
        if rd_wr_on_io:
            # write input to sram
            energy_mem += knp.ceil(total_bits * OP["sram"]["mul_factor"]) * OP[
                "sram"
            ]["wr"](total_bits_log2)

    return energy_mem
def parameter_read_energy(
    layer, layer_item, weights_on_memory, min_sram_size, rd_wr_on_io
):
    """Estimate the energy needed to read a layer's parameters from memory.

    Args:
        layer: The layer; its class name selects how parameters are read.
        layer_item: Per-layer data-type record holding the quantizers (and,
            for non-batch-norm layers, the weight/bias shapes).
        weights_on_memory: Memory mode forwarded to ``memory_read_energy``.
        min_sram_size: Forwarded to ``memory_read_energy``.
        rd_wr_on_io: Forwarded to ``memory_read_energy``.

    Returns:
        The summed read energy; ``0`` for layer classes that are neither
        batch normalization nor listed in ``qkeras_LAYERS``/``KERAS_LAYERS``.
    """
    layer_kind = layer.__class__.__name__

    def _read(shape, bits):
        # Parameters are never input-layer tensors and keep their full shape.
        return memory_read_energy(
            False,
            shape,
            weights_on_memory,
            min_sram_size,
            rd_wr_on_io,
            bits,
            is_tensor=False,
        )

    rd_energy = 0
    if layer_kind in ("QBatchNormalization", "BatchNormalization"):
        # gamma, beta, mean, stddev
        bn_quantizers = [
            layer_item[key]
            for key in (
                "gamma_quantizer",
                "beta_quantizer",
                "mean_quantizer",
                "variance_quantizer",
            )
        ]
        # Every parameter is charged with the length of the first weight.
        param_len = len(layer.get_weights()[0])
        for quantizer in bn_quantizers:
            if quantizer:
                rd_energy += _read(param_len, quantizer.bits)
    elif layer_kind in qkeras_LAYERS or layer_kind in KERAS_LAYERS:
        weight_quantizer = qtools_util.get_val(layer_item, "weight_quantizer")
        w_shapes = qtools_util.get_val(layer_item, "w_shapes")
        bias_quantizer = qtools_util.get_val(layer_item, "bias_quantizer")
        b_shapes = qtools_util.get_val(layer_item, "b_shapes")

        rd_energy += _read(w_shapes, weight_quantizer.bits)
        # A falsy bias quantizer means the layer has no bias to read.
        if bias_quantizer:
            rd_energy += _read(b_shapes, bias_quantizer.bits)

    return rd_energy
def memory_write_energy(
    is_output_layer, tensor_shape, mode, min_sram_size, rd_wr_on_io, quantizer_bits
):
    """Estimate the energy needed to write one tensor to memory.

    Args:
        is_output_layer: When true, ``mode`` is overridden: ``"dram"`` if
            ``rd_wr_on_io`` is true, otherwise ``"sram"``.
        tensor_shape: Shape of the tensor being written; its first entry is
            dropped before computing the element count (presumably the batch
            axis -- confirm with callers).
        mode: ``"dram"`` or ``"sram"``; any other value yields zero energy.
        min_sram_size: Lower bound applied to the bit count before taking
            its log2 for the SRAM cost function.
        rd_wr_on_io: In ``"dram"`` mode, additionally charge an SRAM read.
        quantizer_bits: Bit width of each tensor element.

    Returns:
        The first element of the accumulated energy after conversion to a
        flattened NumPy array.
    """
    # Output layers ignore the requested mode; rd_wr_on_io alone selects it.
    if is_output_layer:
        mode = "dram" if rd_wr_on_io else "sram"

    dims = tensor_shape[1:]
    total_bits = keras.ops.cast(knp.prod(dims) * quantizer_bits, float)
    total_bits_log2 = knp.log2(max(total_bits, min_sram_size))

    energy_mem = 0
    if mode == "sram":
        # write to sram
        energy_mem += knp.ceil(total_bits * OP["sram"]["mul_factor"]) * OP[
            "sram"
        ]["wr"](total_bits_log2)
    elif mode == "dram":
        if rd_wr_on_io:
            # read from sram before pushing the data out
            energy_mem += knp.ceil(total_bits * OP["sram"]["mul_factor"]) * OP[
                "sram"
            ]["rd"](total_bits_log2)
        # write output to dram
        energy_mem += OP["dram"]["wr"](total_bits)

    flat_energy = keras.ops.convert_to_numpy(energy_mem).ravel()
    return flat_energy[0]
def energy_estimate(
    model,
    layer_map,
    weights_on_memory,
    activations_on_memory,
    min_sram_size,
    rd_wr_on_io,
):
    """Estimate per-layer and total energy for a model.

    Args:
        model: Model whose ``layers`` are walked in order.
        layer_map: Mapping with the keys ``"output_layers"``,
            ``"input_layers"`` and ``"layer_data_type_map"``. Layers missing
            from the data-type map are skipped.
        weights_on_memory: Memory mode used for parameter reads.
        activations_on_memory: Memory mode used for input reads and output
            writes.
        min_sram_size: Forwarded to the memory energy helpers.
        rd_wr_on_io: Forwarded to the memory energy helpers.

    Returns:
        A dict mapping each processed layer name to
        ``{"class_name": ..., "energy": {"inputs", "outputs", "parameters",
        "op_cost"}}`` (each energy rounded to two decimals), plus a
        ``"total_cost"`` entry holding the integer-truncated sum of the
        unrounded energies.
    """
    activation_kinds = ("QActivation", "QAdaptiveActivation", "Activation")
    batchnorm_kinds = ("QBatchNormalization", "BatchNormalization")
    merge_kinds = ("Add", "Multiply", "Subtract")
    avg_pool_kinds = (
        "AveragePooling2D",
        "AvgPool2D",
        "GlobalAvgPool2D",
        "GlobalAveragePooling2D",
    )
    mac_kinds = (
        "QConv2D",
        "QConv1D",
        "QDepthwiseConv2D",
        "QDense",
        "Conv2D",
        "Conv1D",
        "DepthwiseConv2D",
        "Dense",
    )

    def _two_decimals(value):
        # Round through the formatted string, as reported in the result.
        return float(f"{value:.2f}")

    output_layers = layer_map["output_layers"]
    input_layers = layer_map["input_layers"]
    layer_data_type_map = layer_map["layer_data_type_map"]

    report = {}
    total_energy = 0

    # compute MAC and memory access energy for intermediate layers
    for layer in model.layers:
        if layer not in layer_data_type_map:
            continue

        layer_item = layer_data_type_map[layer]
        layer_kind = layer.__class__.__name__

        input_quantizer_list = qtools_util.get_val(
            layer_item, "input_quantizer_list"
        )
        operation_count = qtools_util.get_val(layer_item, "operation_count")
        output_shapes = qtools_util.get_val(layer_item, "output_shapes")
        output_quantizer = qtools_util.get_val(layer_item, "output_quantizer")

        is_input_layer = layer in input_layers
        is_output_layer = layer in output_layers

        # Safely read the input shape(s): a layer exposes either a single
        # input or a list of inputs.
        layer_inputs = layer.input
        if not isinstance(layer_inputs, list):
            layer_inputs = [layer_inputs]
        input_shapes = [tensor.shape for tensor in layer_inputs]

        input_rd_energy = 0
        for shape, quantizer in zip(input_shapes, input_quantizer_list):
            input_rd_energy += memory_read_energy(
                is_input_layer,
                shape,
                activations_on_memory,
                min_sram_size,
                rd_wr_on_io,
                quantizer.bits,
            )

        parameter_rd_energy = parameter_read_energy(
            layer, layer_item, weights_on_memory, min_sram_size, rd_wr_on_io
        )

        output_wr_energy = memory_write_energy(
            is_output_layer,
            output_shapes,
            activations_on_memory,
            min_sram_size,
            rd_wr_on_io,
            output_quantizer.bits,
        )

        energy_op = 0
        if layer_kind in activation_kinds:
            # Activation layers contribute no operation energy.
            pass
        elif layer_kind in batchnorm_kinds:
            # assume QBN is embedded with conv/dense layers
            # -> no memory read/write cost
            divider = layer_item["internal_divide_quantizer"]
            if divider:
                gate_factor = divider.gate_factor
                mode = divider.implemented_as()
                energy_op += gate_factor * OP[get_op_type(divider.output)][mode](
                    divider.gate_bits
                )

            multiplier = layer_item["internal_multiplier"]
            if multiplier:
                gate_factor = multiplier.gate_factor
                mode = multiplier.implemented_as()
                energy_op += gate_factor * OP[get_op_type(multiplier.output)][
                    mode
                ](multiplier.gate_bits)

            energy_op *= operation_count
        elif layer_kind in merge_kinds:
            # multiply or add operation energy
            # TODO(lishanok): check energy for concatenate
            merge_quantizer = qtools_util.get_val(layer_item, "multiplier")
            mode = merge_quantizer.implemented_as()
            number_of_inputs = len(
                qtools_util.get_val(layer_item, "input_quantizer_list")
            )
            gate_factor = merge_quantizer.gate_factor
            op_family = get_op_type(merge_quantizer.output)
            gate_bits = merge_quantizer.gate_bits

            # Merging k inputs is costed as k - 1 operations per output.
            energy_op = (
                (number_of_inputs - 1)
                * operation_count
                * gate_factor
                * OP[op_family][mode](gate_bits)
            )
        elif layer_kind in avg_pool_kinds:
            # accumulation operation energy
            accumulator = qtools_util.get_val(layer_item, "accumulator")
            add_energy = OP[get_op_type(accumulator.output)]["add"](
                accumulator.output.bits
            )
            energy_op = operation_count * add_energy
        elif layer_kind in mac_kinds:
            # MAC energy: one multiply plus one accumulate per operation
            multiplier = qtools_util.get_val(layer_item, "multiplier")
            accumulator = qtools_util.get_val(layer_item, "accumulator")

            # implementation mode: xor/andgate/shift etc.
            mode = multiplier.implemented_as()
            gate_factor = multiplier.gate_factor

            op_family = get_op_type(multiplier.output)
            gate_bits = multiplier.gate_bits
            multiply_energy = gate_factor * OP[op_family][mode](gate_bits)
            accumulate_energy = OP[get_op_type(accumulator.output)]["add"](
                accumulator.output.bits
            )
            energy_op = operation_count * (multiply_energy + accumulate_energy)

        # Normalise every contribution to a plain scalar before reporting.
        input_rd_energy = to_scalar(input_rd_energy)
        output_wr_energy = to_scalar(output_wr_energy)
        parameter_rd_energy = to_scalar(parameter_rd_energy)
        energy_op = to_scalar(energy_op)

        report[layer.name] = {
            "class_name": layer_kind,
            "energy": {
                "inputs": _two_decimals(input_rd_energy),
                "outputs": _two_decimals(output_wr_energy),
                "parameters": _two_decimals(parameter_rd_energy),
                "op_cost": _two_decimals(energy_op),
            },
        }
        total_energy += (
            input_rd_energy + output_wr_energy + parameter_rd_energy + energy_op
        )

    report["total_cost"] = int(total_energy)

    return report