# Copyright 2019 Google LLC
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Calculate energy consumption of a given quantized model."""
import keras
import keras.ops.numpy as knp
import numpy as np
from qkeras.qtools import qtools_util
from qkeras.qtools.generate_layer_data_type_map import KERAS_LAYERS, qkeras_LAYERS
from qkeras.qtools.quantized_operators.quantizer_impl import IQuantizer
from qkeras.qtools.settings import cfg
# Model based on:
# Mark Horowitz, Computing’s Energy Problem (and what we can
# do about it). IEEE ISSCC, pp. 10–14, 2014
# www.youtube.com/watch?v=eZdOkDtYMoo&feature=youtu.be&t=497
# all metrics converted to pJ/bit
def _clamped(cost_fn_name):
    """Build a cost function that evaluates ``cfg.<cost_fn_name>(x)`` floored at 0.

    The attribute is resolved on ``cfg`` each time the returned function is
    called, not when the table below is built.
    """

    def cost(x):
        return max(getattr(cfg, cost_fn_name)(x), 0)

    return cost


# Energy cost table, indexed as OP[<op type>][<operation>].
#   * "fp32" / "fp16" / "fpm": arithmetic costs. Every "fpm" gate other than
#     "mul" is charged with the same cfg.fpm_add cost function.
#   * "sram" / "dram": memory access costs. "wr" is charged with the same cfg
#     cost function as "rd"; "mul_factor" is a plain value read from cfg once,
#     when this table is built.
OP = {
    "fp32": {
        "add": _clamped("fp32_add"),
        "mul": _clamped("fp32_mul"),
    },
    "fp16": {
        "add": _clamped("fp16_add"),
        "mul": _clamped("fp16_mul"),
    },
    "fpm": {
        "add": _clamped("fpm_add"),
        "mux": _clamped("fpm_add"),
        "xor": _clamped("fpm_add"),
        "and": _clamped("fpm_add"),
        "or": _clamped("fpm_add"),
        "shifter": _clamped("fpm_add"),
        "mul": _clamped("fpm_mul"),
    },
    "sram": {
        "rd": _clamped("sram_rd"),
        "wr": _clamped("sram_rd"),
        "mul_factor": cfg.sram_mul_factor,
    },
    "dram": {
        "rd": _clamped("dram_rd"),
        "wr": _clamped("dram_rd"),
        "mul_factor": cfg.dram_mul_factor,
    },
}
[docs]
def to_scalar(x):
    """Convert a single-element value to a plain Python scalar.

    Args:
        x: A Python int/float/bool, a NumPy scalar, a one-element NumPy array,
            a one-element Keras backend tensor, or a length-1 container whose
            only item (``x[0]``) is itself convertible.

    Returns:
        ``x`` unchanged for Python int/float/bool; otherwise the value produced
        by ``.item()`` on the NumPy scalar/array (tensors are first converted
        with ``keras.ops.convert_to_numpy``).

    Raises:
        ValueError: If a NumPy array or tensor does not hold exactly one element.
        TypeError: If ``x`` is a plain string or any other unsupported type.
    """
    # Python scalar
    if isinstance(x, (int, float, bool)):
        return x

    # NumPy scalar
    if isinstance(x, np.generic):
        return x.item()

    # NumPy array
    if isinstance(x, np.ndarray):
        if x.size != 1:
            raise ValueError(f"NumPy array has {x.size} elements, not 1")
        return x.item()

    # A one-character string is its own first element, so letting it reach the
    # generic length-1 branch below would recurse until RecursionError. Reject
    # strings explicitly with the same error as any other unsupported type.
    if isinstance(x, str):
        raise TypeError(f"Unsupported type {type(x)}")

    # Keras backend tensor (immutable KerasTensor, JAX, Torch, TF eager…)
    if keras.ops.is_tensor(x):
        # Convert to NumPy via keras.ops
        arr = keras.ops.convert_to_numpy(x)
        if arr.size != 1:
            raise ValueError(f"Keras tensor has {arr.size} elements, not 1")
        return arr.item()

    # Generic sequence wrapper (list/tuple) of length 1
    if hasattr(x, "__len__") and len(x) == 1:
        return to_scalar(x[0])

    raise TypeError(f"Unsupported type {type(x)}")
[docs]
def get_op_type(quantizer):
    """Return the ``OP`` table key that matches a quantizer.

    Args:
        quantizer: An ``IQuantizer`` instance (checked with ``assert``).

    Returns:
        ``"fp"`` followed by ``quantizer.bits`` when the quantizer reports
        ``is_floating_point``; otherwise ``"fpm"``.
    """
    assert isinstance(quantizer, IQuantizer)

    if not quantizer.is_floating_point:
        return "fpm"
    return "fp" + str(quantizer.bits)
[docs]
def memory_read_energy(
    is_input_layer,
    tensor_shape,
    mode,
    min_sram_size,
    rd_wr_on_io,
    quantizer_bits,
    is_tensor=True,
):
    """Compute the energy needed to read one tensor from memory.

    Args:
        is_input_layer: When truthy, ``mode`` is overridden: ``"dram"`` if
            ``rd_wr_on_io`` is truthy, else ``"sram"``.
        tensor_shape: Shape of the data being read.
        mode: ``"dram"`` or ``"sram"``; any other value contributes no energy.
        min_sram_size: Lower bound applied to the bit count before taking its
            log2 for the SRAM cost function.
        rd_wr_on_io: In ``"dram"`` mode, also charge an SRAM write.
        quantizer_bits: Bit width of each element.
        is_tensor: When truthy, the first entry of ``tensor_shape`` is dropped
            before the element count is taken (presumably the batch
            dimension; confirm against callers).

    Returns:
        The accumulated energy; the integer ``0`` when nothing was charged.
    """
    if is_input_layer:
        mode = "dram" if rd_wr_on_io else "sram"

    shape = tensor_shape[1:] if is_tensor else tensor_shape
    total_bits = keras.ops.cast(knp.prod(shape) * quantizer_bits, float)
    total_bits_log2 = knp.log2(max(total_bits, min_sram_size))

    def sram_cost(access):
        # ceil(total_bits * mul_factor) scaled by the SRAM cost function
        # evaluated at log2 of the (clamped) bit count.
        return knp.ceil(total_bits * OP["sram"]["mul_factor"]) * OP["sram"][access](
            total_bits_log2
        )

    energy_mem = 0
    if mode == "dram":
        # load the data from dram
        energy_mem += OP["dram"]["rd"](total_bits)
        if rd_wr_on_io:
            # then write it to sram
            energy_mem += sram_cost("wr")
    elif mode == "sram":
        # read the data from sram
        energy_mem += sram_cost("rd")
    return energy_mem
[docs]
def parameter_read_energy(
    layer, layer_item, weights_on_memory, min_sram_size, rd_wr_on_io
):
    """Compute the energy needed to read a layer's parameters from memory.

    Args:
        layer: The layer; dispatch is on ``layer.__class__.__name__``.
        layer_item: Per-layer record holding the quantizers and shapes.
        weights_on_memory: Memory mode forwarded to ``memory_read_energy``.
        min_sram_size: Forwarded to ``memory_read_energy``.
        rd_wr_on_io: Forwarded to ``memory_read_energy``.

    Returns:
        The summed read energy, or the integer ``0`` for layer classes that
        match neither branch.
    """
    class_name = layer.__class__.__name__

    def read_cost(shape, bits):
        # Parameters are never treated as input-layer data, and their shape is
        # used as-is (is_tensor=False keeps the leading entry).
        return memory_read_energy(
            False,
            shape,
            weights_on_memory,
            min_sram_size,
            rd_wr_on_io,
            bits,
            is_tensor=False,
        )

    rd_energy = 0
    if class_name in ["QBatchNormalization", "BatchNormalization"]:
        # gamma, beta, mean, variance
        quantizers = [
            layer_item[key]
            for key in (
                "gamma_quantizer",
                "beta_quantizer",
                "mean_quantizer",
                "variance_quantizer",
            )
        ]
        # Every parameter is charged with the length of the first weight array.
        param_len = len(layer.get_weights()[0])
        for quantizer in quantizers:
            if quantizer:
                rd_energy += read_cost(param_len, quantizer.bits)
    elif class_name in qkeras_LAYERS or class_name in KERAS_LAYERS:
        weight_quantizer = qtools_util.get_val(layer_item, "weight_quantizer")
        w_shapes = qtools_util.get_val(layer_item, "w_shapes")
        bias_quantizer = qtools_util.get_val(layer_item, "bias_quantizer")
        b_shapes = qtools_util.get_val(layer_item, "b_shapes")

        rd_energy += read_cost(w_shapes, weight_quantizer.bits)
        if bias_quantizer:
            # a falsy bias quantizer means there is no bias to read
            rd_energy += read_cost(b_shapes, bias_quantizer.bits)
    return rd_energy
[docs]
def memory_write_energy(
    is_output_layer, tensor_shape, mode, min_sram_size, rd_wr_on_io, quantizer_bits
):
    """Compute the energy needed to write one tensor to memory.

    Args:
        is_output_layer: When truthy, ``mode`` is overridden: ``"dram"`` if
            ``rd_wr_on_io`` is truthy, else ``"sram"``.
        tensor_shape: Shape of the tensor; its first entry is always dropped
            before the element count is taken (presumably the batch
            dimension; confirm against callers).
        mode: ``"dram"`` or ``"sram"``; any other value contributes no energy.
        min_sram_size: Lower bound applied to the bit count before taking its
            log2 for the SRAM cost function.
        rd_wr_on_io: In ``"dram"`` mode, also charge an SRAM read.
        quantizer_bits: Bit width of each element.

    Returns:
        The first element of the flattened NumPy conversion of the accumulated
        energy.
    """
    if is_output_layer:
        mode = "dram" if rd_wr_on_io else "sram"

    shape = tensor_shape[1:]
    total_bits = keras.ops.cast(knp.prod(shape) * quantizer_bits, float)
    total_bits_log2 = knp.log2(max(total_bits, min_sram_size))

    def sram_cost(access):
        # ceil(total_bits * mul_factor) scaled by the SRAM cost function
        # evaluated at log2 of the (clamped) bit count.
        return knp.ceil(total_bits * OP["sram"]["mul_factor"]) * OP["sram"][access](
            total_bits_log2
        )

    energy_mem = 0
    if mode == "dram":
        if rd_wr_on_io:
            # read the data back from sram first
            energy_mem += sram_cost("rd")
        # write output to dram
        energy_mem += OP["dram"]["wr"](total_bits)
    elif mode == "sram":
        # write to sram
        energy_mem += sram_cost("wr")

    flattened = keras.ops.convert_to_numpy(energy_mem).ravel()
    return flattened[0]
[docs]
def _operation_energy(class_name, layer_item, operation_count):
    """Return the arithmetic energy of one layer, excluding memory traffic."""
    # Activation layers are not charged any operation cost.
    if class_name in ["QActivation", "QAdaptiveActivation", "Activation"]:
        return 0

    # Batch normalization: charge the internal divider and multiplier, when
    # present, then scale by the operation count.
    if class_name in ["QBatchNormalization", "BatchNormalization"]:
        energy = 0
        for key in ("internal_divide_quantizer", "internal_multiplier"):
            unit = layer_item[key]
            if unit:
                gate_factor = unit.gate_factor
                mode = unit.implemented_as()
                energy += gate_factor * OP[get_op_type(unit.output)][mode](
                    unit.gate_bits
                )
        energy *= operation_count
        return energy

    # Merge layers: one multiply/add per operation for each extra input.
    # TODO(lishanok): check energy for concatenate
    if class_name in ["Add", "Multiply", "Subtract"]:
        merge_quantizer = qtools_util.get_val(layer_item, "multiplier")
        mode = merge_quantizer.implemented_as()
        number_of_inputs = len(
            qtools_util.get_val(layer_item, "input_quantizer_list")
        )
        gate_factor = merge_quantizer.gate_factor
        op_type = get_op_type(merge_quantizer.output)
        gate_bits = merge_quantizer.gate_bits
        return (
            (number_of_inputs - 1)
            * operation_count
            * gate_factor
            * OP[op_type][mode](gate_bits)
        )

    # AveragePooling and GlobalAveragePooling: accumulation energy only.
    if class_name in [
        "AveragePooling2D",
        "AvgPool2D",
        "GlobalAvgPool2D",
        "GlobalAveragePooling2D",
    ]:
        accumulator = qtools_util.get_val(layer_item, "accumulator")
        add_energy = OP[get_op_type(accumulator.output)]["add"](
            accumulator.output.bits
        )
        return operation_count * add_energy

    # Conv/Dense layers: multiply-accumulate energy.
    if class_name in [
        "QConv2D",
        "QConv1D",
        "QDepthwiseConv2D",
        "QDense",
        "Conv2D",
        "Conv1D",
        "DepthwiseConv2D",
        "Dense",
    ]:
        multiplier = qtools_util.get_val(layer_item, "multiplier")
        accumulator = qtools_util.get_val(layer_item, "accumulator")
        # implementation mode: xor/andgate/shift etc.
        mode = multiplier.implemented_as()
        gate_factor = multiplier.gate_factor
        op_type = get_op_type(multiplier.output)
        gate_bits = multiplier.gate_bits
        mul_energy = gate_factor * OP[op_type][mode](gate_bits)
        acc_energy = OP[get_op_type(accumulator.output)]["add"](
            accumulator.output.bits
        )
        return operation_count * (mul_energy + acc_energy)

    # Any other layer class is not charged an operation cost.
    return 0


def energy_estimate(
    model,
    layer_map,
    weights_on_memory,
    activations_on_memory,
    min_sram_size,
    rd_wr_on_io,
):
    """Estimate per-layer and total energy of a model.

    Args:
        model: Model whose ``layers`` are walked in order.
        layer_map: Mapping with the keys ``"output_layers"``,
            ``"input_layers"`` and ``"layer_data_type_map"``.
        weights_on_memory: Memory mode used for parameter reads.
        activations_on_memory: Memory mode used for input reads and output
            writes.
        min_sram_size: Forwarded to the memory energy helpers.
        rd_wr_on_io: Forwarded to the memory energy helpers.

    Returns:
        A dict with one entry per processed layer, keyed by layer name and
        holding ``"class_name"`` plus an ``"energy"`` dict (``"inputs"``,
        ``"outputs"``, ``"parameters"``, ``"op_cost"``, each kept to two
        decimals), and a ``"total_cost"`` entry with the integer-truncated sum
        of the unrounded energies.
    """
    output_layers = layer_map["output_layers"]
    input_layers = layer_map["input_layers"]
    layer_data_type_map = layer_map["layer_data_type_map"]

    def two_decimals(value):
        return float(f"{value:.2f}")

    result = {}
    total_energy = 0

    # compute MAC and memory access energy for each layer that has a record
    for layer in model.layers:
        if layer not in layer_data_type_map.keys():
            continue

        layer_item = layer_data_type_map[layer]
        class_name = layer.__class__.__name__

        input_quantizer_list = qtools_util.get_val(layer_item, "input_quantizer_list")
        operation_count = qtools_util.get_val(layer_item, "operation_count")
        output_shapes = qtools_util.get_val(layer_item, "output_shapes")
        output_quantizer = qtools_util.get_val(layer_item, "output_quantizer")

        is_input_layer = layer in input_layers
        is_output_layer = layer in output_layers

        # Safely read the input shape(s): layer.input may be one tensor or a list.
        layer_input = layer.input
        if isinstance(layer_input, list):
            input_shapes = [inp.shape for inp in layer_input]
        else:
            input_shapes = [layer_input.shape]

        input_rd_energy = 0
        for input_shape, input_quantizer in zip(input_shapes, input_quantizer_list):
            input_rd_energy += memory_read_energy(
                is_input_layer,
                input_shape,
                activations_on_memory,
                min_sram_size,
                rd_wr_on_io,
                input_quantizer.bits,
            )

        parameter_rd_energy = parameter_read_energy(
            layer, layer_item, weights_on_memory, min_sram_size, rd_wr_on_io
        )

        output_wr_energy = memory_write_energy(
            is_output_layer,
            output_shapes,
            activations_on_memory,
            min_sram_size,
            rd_wr_on_io,
            output_quantizer.bits,
        )

        energy_op = _operation_energy(class_name, layer_item, operation_count)

        # Collapse tensors / NumPy values to plain Python numbers for reporting.
        input_rd_energy = to_scalar(input_rd_energy)
        output_wr_energy = to_scalar(output_wr_energy)
        parameter_rd_energy = to_scalar(parameter_rd_energy)
        energy_op = to_scalar(energy_op)

        result[layer.name] = {
            "class_name": class_name,
            "energy": {
                "inputs": two_decimals(input_rd_energy),
                "outputs": two_decimals(output_wr_energy),
                "parameters": two_decimals(parameter_rd_energy),
                "op_cost": two_decimals(energy_op),
            },
        }
        total_energy += (
            input_rd_energy + output_wr_energy + parameter_rd_energy + energy_op
        )

    result["total_cost"] = int(total_energy)
    return result