# Source code for qkeras.qdepthwiseconv2d_batchnorm

# Copyright 2020 Google LLC
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Fold batchnormalization with previous QDepthwiseConv2D layers."""

import keras
import keras.ops.numpy as knp
from keras import layers
from keras import ops as Kops

from .ops_portable import bias_add_portable
from .qconvolutional import QDepthwiseConv2D
from .quantizers import *


@register_keras_serializable(package="qkeras")
class QDepthwiseConv2DBatchnorm(QDepthwiseConv2D):
    """Fold batchnormalization with a previous QDepthwiseConv2d layer.

    The layer owns a regular ``BatchNormalization`` sub-layer
    (``self.batchnorm``) and, on every call, folds its scale/shift into the
    depthwise kernel and bias before quantizing them, so that the quantizers
    see the weights that will actually be deployed.
    """

    # Folding strategies accepted by ``folding_mode``.
    _FOLDING_MODES = ("ema_stats_folding", "batch_stats_folding")

    def __init__(
        self,
        # QDepthwiseConv2d params
        kernel_size,
        strides=(1, 1),
        padding="VALID",
        depth_multiplier=1,
        data_format=None,
        activation=None,
        use_bias=True,
        depthwise_initializer="he_normal",
        bias_initializer="zeros",
        depthwise_regularizer=None,
        bias_regularizer=None,
        activity_regularizer=None,
        depthwise_constraint=None,
        bias_constraint=None,
        dilation_rate=(1, 1),
        depthwise_quantizer=None,
        bias_quantizer=None,
        depthwise_range=None,
        bias_range=None,
        # batchnorm params
        axis=-1,
        momentum=0.99,
        epsilon=0.001,
        center=True,
        scale=True,
        beta_initializer="zeros",
        gamma_initializer="ones",
        moving_mean_initializer="zeros",
        moving_variance_initializer="ones",
        beta_regularizer=None,
        gamma_regularizer=None,
        beta_constraint=None,
        gamma_constraint=None,
        trainable=True,
        # other params
        ema_freeze_delay=None,
        folding_mode="ema_stats_folding",
        **kwargs,
    ):
        """A composite layer that folds depthwiseconv2d and batch normalization.

        The first group of parameters corresponds to the initialization
        parameters of a QDepthwiseConv2d layer. Check
        qkeras.qconvolutional.QDepthwiseConv2D for details.

        The 2nd group of parameters corresponds to the initialization
        parameters of a BatchNormalization layer. Check
        keras.layers.BatchNormalization for details.

        The 3rd group of parameters corresponds to the initialization
        parameters specific to this class.

        Args:
            ema_freeze_delay: int or None. Number of steps before batch
                normalization mv_mean and mv_variance will be frozen and used
                in the folded layer. ``None`` or a negative value disables
                freezing.
            folding_mode: string.
                "ema_stats_folding": mimic tflite which uses the ema
                statistics to fold the kernel to suppress quantization
                induced jitter, then performs the correction to have a
                similar effect of using the current batch statistics.
                "batch_stats_folding": use batch mean and variance to fold
                the kernel first; after enough training steps switch to
                moving_mean and moving_variance for kernel folding.
            **kwargs: forwarded to ``QDepthwiseConv2D``. The legacy keys
                ``synchronized``, ``renorm``, ``renorm_clipping`` and
                ``renorm_momentum`` are accepted and discarded.

        Raises:
            ValueError: if ``folding_mode`` is not one of the supported modes.
        """
        # Pop legacy kwargs so they are not forwarded to the base layer.
        for legacy_key in (
            "synchronized",
            "renorm",
            "renorm_clipping",
            "renorm_momentum",
        ):
            kwargs.pop(legacy_key, None)

        # Initialization of the QDepthwiseConv2d part of the composite layer.
        super().__init__(
            kernel_size=kernel_size,
            strides=strides,
            padding=padding,
            depth_multiplier=depth_multiplier,
            data_format=data_format,
            activation=activation,
            use_bias=use_bias,
            depthwise_initializer=depthwise_initializer,
            bias_initializer=bias_initializer,
            depthwise_regularizer=depthwise_regularizer,
            bias_regularizer=bias_regularizer,
            activity_regularizer=activity_regularizer,
            depthwise_constraint=depthwise_constraint,
            bias_constraint=bias_constraint,
            dilation_rate=dilation_rate,
            depthwise_quantizer=depthwise_quantizer,
            bias_quantizer=bias_quantizer,
            depthwise_range=depthwise_range,
            bias_range=bias_range,
            **kwargs,
        )

        # Initialization of the batchnorm part of the composite layer.
        self.batchnorm = layers.BatchNormalization(
            axis=axis,
            momentum=momentum,
            epsilon=epsilon,
            center=center,
            scale=scale,
            beta_initializer=beta_initializer,
            gamma_initializer=gamma_initializer,
            moving_mean_initializer=moving_mean_initializer,
            moving_variance_initializer=moving_variance_initializer,
            beta_regularizer=beta_regularizer,
            gamma_regularizer=gamma_regularizer,
            beta_constraint=beta_constraint,
            gamma_constraint=gamma_constraint,
            trainable=trainable,
        )

        self.ema_freeze_delay = ema_freeze_delay

        # Explicit raise instead of ``assert``: asserts vanish under ``-O``.
        if folding_mode not in self._FOLDING_MODES:
            raise ValueError(f"mode {folding_mode} not supported!")
        self.folding_mode = folding_mode

    def build(self, input_shape):
        """Create the conv weights, the batchnorm weights and the step counter.

        Args:
            input_shape: shape of the layer input; the channel dimension is
                read at index -1 for ``channels_last`` and 1 otherwise.
        """
        super().build(input_shape)

        channels_last = self.data_format == "channels_last"
        channel_axis = -1 if channels_last else 1

        in_channels = int(input_shape[channel_axis])
        out_channels = in_channels * self.depth_multiplier

        # Batchnorm only needs the channel count; spatial dims are set to 1.
        if channels_last:
            bn_input_shape = (input_shape[0], 1, 1, out_channels)
        else:  # channels_first
            bn_input_shape = (input_shape[0], out_channels, 1, 1)
        self.batchnorm.build(bn_input_shape)

        # If start training from scratch, self._iteration (i.e.,
        # training_steps) is initialized with -1. When loading ckpt, it can
        # load the number of training steps that have been previously trained.
        # TODO(lishanok): develop a way to count iterations outside layer
        self._iteration = keras.Variable(
            -1, trainable=False, name="iteration", dtype="int64"
        )

    def call(self, inputs, training=False):
        """Run the folded, quantized depthwise convolution.

        Args:
            inputs: input tensor of the depthwise convolution.
            training: whether the layer runs in training mode; ``None`` is
                treated as ``False``.

        Returns:
            The output of the folded convolution plus folded bias, passed
            through ``self.activation`` when one is set.

        Raises:
            ValueError: if ``self.folding_mode`` is not a supported mode.
        """
        if training is None:
            training = False

        # Determine whether BatchNormalization should run in training mode,
        # i.e. whether its moving statistics may still be updated.
        if (self.ema_freeze_delay is None) or (self.ema_freeze_delay < 0):
            bn_training = Kops.cast(training, dtype=bool)
        else:
            bn_training = Kops.logical_and(
                training,
                knp.less_equal(self._iteration, self.ema_freeze_delay),
            )
        bn_training = Kops.cast(bn_training, bool)

        depthwise_kernel = self.depthwise_kernel

        # Unfolded depthwise convolution: only used to feed batchnorm and to
        # compute the current batch statistics.
        conv_outputs = keras.ops.depthwise_conv(
            inputs,
            depthwise_kernel,
            strides=self.strides,
            padding=self.padding,
            dilation_rate=self.dilation_rate,
            data_format=self.data_format,
        )

        if self.use_bias:
            bias = self.bias
            conv_outputs = bias_add_portable(
                conv_outputs, bias, data_format=self.data_format
            )
        else:
            bias = 0

        # TODO(makoeppel): the following code is hacky:
        # since self._iteration is counted inside the layer
        # `bn_training` will be always a tensor in graph mode,
        # which can not be passed like training=bn_training.
        # Therefore, we have to call self.batchnorm with `training`
        # first to perform a forward pass and then use `bn_training`
        # later in keras.ops.where to assign the correct values.
        bn = self.batchnorm

        # Snapshot the *values* of the moving statistics before the forward
        # pass. Holding on to the Variable objects would not work: they are
        # updated in place by the batchnorm call, so the "previous" and
        # "current" reads would be identical and freezing would be a no-op.
        mm_prev = Kops.copy(bn.moving_mean)
        mv_prev = Kops.copy(bn.moving_variance)

        _ = bn(conv_outputs, training=training)

        # Keep the freshly updated statistics only while batchnorm is still
        # training; otherwise roll back to the snapshot.
        moving_mean = Kops.where(bn_training, bn.moving_mean, mm_prev)
        moving_variance = Kops.where(
            bn_training, bn.moving_variance, mv_prev
        )
        bn.moving_mean.assign(moving_mean)
        bn.moving_variance.assign(moving_variance)

        # gamma/beta are not modified by the batchnorm forward pass, so they
        # are read directly. They do not exist when scale/center is False.
        gamma_var = getattr(bn, "gamma", None)
        beta_var = getattr(bn, "beta", None)
        gamma = (
            None if gamma_var is None else Kops.convert_to_tensor(gamma_var)
        )
        beta = 0 if beta_var is None else Kops.convert_to_tensor(beta_var)

        # Count training steps only.
        self._iteration.assign_add(
            keras.ops.where(
                Kops.cast(training, bool),
                knp.array(1, dtype="int64"),
                knp.array(0, dtype="int64"),
            )
        )

        # Current batch statistics over every axis except the batchnorm axes.
        ndims = len(conv_outputs.shape)
        axes = bn.axis
        if isinstance(axes, int):
            axes = [axes]
        reduction_axes = [i for i in range(ndims) if i not in axes]
        keep_dims = len(axes) > 1
        mean, variance = keras.ops.moments(
            conv_outputs, reduction_axes, keepdims=keep_dims
        )

        if self.folding_mode not in [
            "batch_stats_folding",
            "ema_stats_folding",
        ]:
            raise ValueError(f"mode {self.folding_mode} not supported!")

        # Inverse standard deviations (optionally scaled by gamma) computed
        # from the moving statistics and from the batch statistics.
        mv_inv = keras.ops.rsqrt(moving_variance + bn.epsilon)
        batch_inv = keras.ops.rsqrt(variance + bn.epsilon)
        if gamma is not None:
            mv_inv = mv_inv * gamma
            batch_inv = batch_inv * gamma

        folded_bias = keras.ops.where(
            bn_training,
            batch_inv * (bias - mean) + beta,
            mv_inv * (bias - moving_mean) + beta,
        )

        if self.folding_mode == "batch_stats_folding":
            inv = keras.ops.where(bn_training, batch_inv, mv_inv)
        elif self.folding_mode == "ema_stats_folding":
            inv = mv_inv

        # For DepthwiseConv2D inv needs to be broadcast to the last two
        # dimensions of the kernel.
        depthwise_weights_shape = [
            depthwise_kernel.shape[2],
            depthwise_kernel.shape[3],
        ]
        inv = knp.reshape(inv, depthwise_weights_shape)
        folded_depthwise_kernel = inv * depthwise_kernel

        # Quantize the folded weights, not the raw ones.
        if self.depthwise_quantizer is not None:
            q_folded_depthwise_kernel = self.depthwise_quantizer_internal(
                folded_depthwise_kernel
            )
        else:
            q_folded_depthwise_kernel = folded_depthwise_kernel

        if self.bias_quantizer is not None:
            q_folded_bias = self.bias_quantizer_internal(folded_bias)
        else:
            q_folded_bias = folded_bias

        folded_outputs = keras.ops.depthwise_conv(
            inputs,
            q_folded_depthwise_kernel,
            strides=self.strides,
            padding=self.padding,
            dilation_rate=self.dilation_rate,
            data_format=self.data_format,
        )

        if training and self.folding_mode == "ema_stats_folding":
            # The kernel was folded with the moving variance; rescale the
            # output by sqrt(moving_var + eps) / sqrt(batch_var + eps) while
            # batchnorm is still training.
            y_corr = keras.ops.where(
                bn_training,
                knp.sqrt(moving_variance + bn.epsilon)
                * keras.ops.rsqrt(variance + bn.epsilon),
                1,
            )
            folded_outputs = folded_outputs * y_corr

        folded_outputs = bias_add_portable(
            folded_outputs, q_folded_bias, data_format=self.data_format
        )

        if self.activation is not None:
            return self.activation(folded_outputs)
        return folded_outputs

    def get_config(self):
        """Return the merged config of the conv layer, batchnorm and this class.

        Later configs override earlier ones on key collisions, in the order
        base layer, batchnorm, class-specific; the ``name`` entry is then
        forced back to the base layer's name.
        """
        base_config = super().get_config()
        out_config = {
            **base_config,
            **self.batchnorm.get_config(),
            "ema_freeze_delay": self.ema_freeze_delay,
            "folding_mode": self.folding_mode,
        }

        # Names from different configs override each other; use the base
        # layer name as this layer's config name.
        out_config["name"] = base_config["name"]
        return out_config

    def get_quantization_config(self):
        """Return the quantizers, activation and filters as strings."""
        return {
            "depthwise_quantizer": str(self.depthwise_quantizer_internal),
            "bias_quantizer": str(self.bias_quantizer_internal),
            "activation": str(self.activation),
            # NOTE(review): depthwise layers may not define `filters`;
            # fall back to None instead of raising AttributeError.
            "filters": str(getattr(self, "filters", None)),
        }

    def get_quantizers(self):
        """Return ``self.quantizers``."""
        return self.quantizers

    def get_folded_weights(self):
        """Function to get the batchnorm folded weights.

        This function converts the weights by folding batchnorm parameters
        into the weight of QDepthwiseConv2d. The high-level equation:

        W_fold = gamma * W / sqrt(moving_variance + epsilon)
        bias_fold = gamma * (bias - moving_mean)
                    / sqrt(moving_variance + epsilon) + beta

        gamma is treated as 1 and beta as 0 when batchnorm has no such weight.

        Returns:
            A list ``[folded_depthwise_kernel, folded_bias]``.
        """
        depthwise_kernel = self.depthwise_kernel
        bias = self.bias if self.use_bias else 0

        # Get batchnorm stats; gamma/beta are absent when scale/center is off.
        gamma = getattr(self.batchnorm, "gamma", None)
        beta = getattr(self.batchnorm, "beta", None)
        moving_mean = self.batchnorm.moving_mean
        moving_variance = self.batchnorm.moving_variance

        # Get the inversion factor so that we replace division by
        # multiplication.
        inv = 1 / keras.ops.sqrt(moving_variance + self.batchnorm.epsilon)
        if gamma is not None:
            inv = inv * gamma

        # Fold bias with bn stats.
        folded_bias = inv * (bias - moving_mean)
        if beta is not None:
            folded_bias = folded_bias + beta

        # For DepthwiseConv2D inv needs to be broadcast to the last 2
        # dimensions of the kernels.
        depthwise_weights_shape = [
            depthwise_kernel.shape[2],
            depthwise_kernel.shape[3],
        ]
        inv = keras.ops.reshape(inv, depthwise_weights_shape)

        # Wrap conv kernel with bn parameters.
        folded_depthwise_kernel = inv * depthwise_kernel

        return [folded_depthwise_kernel, folded_bias]

    def save_own_variables(self, store):
        """Save the base layer's variables plus the training-step counter."""
        super().save_own_variables(store)
        # Stores the value of the variable upon saving.
        store["iteration"] = keras.ops.convert_to_numpy(self._iteration)

    def load_own_variables(self, store):
        """Load the base layer's variables plus the training-step counter."""
        super().load_own_variables(store)
        # Assigns the value of the variable upon loading.
        self._iteration.assign(store["iteration"])