# Copyright 2019 Google LLC
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Creates networkx graph from a model."""
import logging
import networkx as nx
from keras import backend as K
from keras import layers
from qkeras.qtools.quantized_operators import (
quantizer_factory as quantizer_factory_module,
)
from qkeras.qtools.settings import cfg
SOURCE = -1
SINK = -2
[docs]
def GraphRemoveNode(graph, v):
"""Removes node "v" from u -> v -> w, connecting u -> w."""
incoming = [u for u in graph.predecessors(v) if u != v]
outgoing = [w for w in graph.successors(v) if w != v]
# add incoming edges
for u in incoming:
for w in outgoing:
in_attr = graph[u][v]
out_attr = graph[v][w]
assert list(in_attr["shape"]) == list(out_attr["shape"])
graph.add_edges_from([(u, w, out_attr)])
graph.remove_node(v)
[docs]
def GraphRemoveNodeWithNodeType(graph, node_type):
"""Removes node with attribute node_type, reconnecting network."""
nodes_to_remove = [
v for v in graph.nodes if graph.nodes[v]["type"][-1] == node_type
]
for v in nodes_to_remove:
GraphRemoveNode(graph, v)
[docs]
def GraphAddSingleSourceSingleSink(graph):
"""Connects graph to source and sink nodes."""
edge_list = []
for u in graph.nodes:
if u == SOURCE or u == SINK:
continue
if graph.nodes[u]["type"][-1] == "InputLayer":
# If the layer has multiple nodes, you can use get_output_at(node_index)
tensor = graph.nodes[u]["layer"][-1].output
# if tf 1.0+, we can do tensor.shape with the same effect
shape = tensor.shape
shape = shape
edge_list.append(
(SOURCE, u, {"shape": shape, "tensor": tensor, "quantizer": None})
)
if graph.out_degree(u) == 0:
tensor = graph.nodes[u]["layer"][-1].output
shape = tensor.shape
edge_list.append(
(u, SINK, {"shape": shape, "tensor": tensor, "quantizer": None})
)
graph.add_edges_from(edge_list)
[docs]
def AddToNodeDict(layer_items, layer, nodes_dict):
"""Adds layer to a node_dict, indexed by layer.(input or output).ref"""
i_list = layer_items
if not isinstance(layer_items, list):
i_list = [i_list]
else:
i_list = [tmp for tmp in i_list]
for i in i_list:
# dict: tensor -> layers have this tensor as input
if i not in nodes_dict.keys():
nodes_dict[i] = [layer]
else:
nodes_dict[i].append(layer)
[docs]
def GenerateGraphFromModel(model, input_quantizers, default_source_quantizer):
"""Generates single source, single sink graph from model."""
# node represents layers with attributes [layer, type(class_name)]
# edge represents the tensor flowing between two layers,
# attributes is [tensor, output_shape, QA(activation quantizer]
# input_quantizers are tagged on the edge between input
# layer and the following layer
# generate a list of input quantizers
input_quantizer_list = GenerateInputQuantizerList(
input_quantizers, len(model.inputs), default_source_quantizer
)
# dict that map input_tensor to its quantizer
input_quantizer_map = {}
for idx, tensor in enumerate(model.inputs):
input_quantizer_map[tensor] = input_quantizer_list[idx]
graph = nx.DiGraph()
source = SOURCE
sink = SINK
node_list = [
(source, {"layer": [None], "type": [None], "out_quantizer": None}),
(sink, {"layer": [None], "type": [None], "out_quantizer": None}),
]
for i, layer in enumerate(model.layers):
node_type = layer.__class__.__name__
node = (i, {"layer": [layer], "type": [node_type], "out_quantizer": None})
node_list.append(node)
node_dict = {layer: i for i, layer in enumerate(model.layers)}
graph.add_nodes_from(node_list)
# nodes = tensors
in_nodes = {}
out_nodes = {}
for layer in model.layers:
AddToNodeDict(layer.input, layer, in_nodes)
AddToNodeDict(layer.output, layer, out_nodes)
# union of all tensors; non-redundant
attr_set = set(in_nodes.keys()) | set(out_nodes.keys())
# add edges. we want edges annotated with tensors and shapes
edge_list = []
for a in attr_set:
# for a given tensor a, find the layer u that outputs this tensor
# and the layer v that has this tensor as input
u_list = out_nodes.get(a, [None])
v_list = in_nodes.get(a, [None])
for u in u_list:
for v in v_list:
if not u or not v:
continue
o_shape = u.output.shape
# layer -> layer_id
u_id = node_dict[u]
v_id = node_dict[v]
# insert input_quantizers on the edge between
# input layer and its next layer
if a in input_quantizer_map.keys():
edge_list.append(
(
u_id,
v_id,
{
"shape": o_shape,
"tensor": a,
"quantizer": input_quantizer_map[a],
},
)
)
else:
edge_list.append(
(u_id, v_id, {"shape": o_shape, "tensor": a, "quantizer": None})
)
graph.add_edges_from(edge_list)
GraphAddHiddenInputLayer(model, graph, input_quantizer_map)
return (graph, input_quantizer_list)
[docs]
def GraphGetOutputs(graph):
"""Returns edges u->SINK that are outputs."""
predecessors = list(graph.predecessors(SINK))
output_tensors = []
for u in predecessors:
if u == SOURCE or u == SINK:
continue
output_tensors.append(graph[u][SINK])
return output_tensors
[docs]
def GraphPropagateActivationsToEdges(graph, debug=False):
"""Traverses graph and move activations to edges.
1.If current dense/conv layer is specified with QA:
outgoing edge (output data type) will be QA type
2.If current dense/conv layer has no QA:
default type (float32) is used as output
3.If current layer is QA layer:
float32 is used by default as output type on the edge
Args:
graph: graph to inject activations to.
debug: debug mode
Returns:
None
"""
scheduler = list(nx.topological_sort(graph))
for vertex in scheduler[1:-1]:
# get rid of source and sink vertex
if debug:
print("########### GraphPropagateActivationsToEdges ############")
print("vertex:", vertex)
for u, v in graph.edges(vertex):
# u=vertex, v: outgoing edge vertex
if debug:
print(" outgoing ->", v, graph.nodes[v]["layer"][0].name)
layer = graph.nodes[u]["layer"][0]
result = None
# if current layer has no QA specified
if not hasattr(layer, "activation"):
result = None
else:
activation_name = (
layer.activation.__name__
if hasattr(layer.activation, "__name__")
else None
)
q_activation_class_name = (
layer.activation.__class__.__name__
if hasattr(layer.activation, "__class__")
else None
)
if debug:
print(" layer type:", layer.__class__.__name__)
print(" activation object:", layer.activation)
print(" activation_name:", activation_name)
print(" q_activation_class_name:", q_activation_class_name)
if activation_name == "linear":
result = None
else:
result = layer.activation
if debug:
print(f" {u}->{v}: {result}")
graph[u][v]["quantizer"] = result
# all edge_quantizer is the same for all edges starting
# from current vertex to different nodes
graph.nodes[vertex]["out_quantizer"] = result
[docs]
def PrintGraph(graph, msg=""):
"""Print graph structure."""
print()
print(msg)
print()
print(
"nodes:",
[
(
u,
graph.nodes[u]["layer"][0].name
if graph.nodes[u]["layer"][0] is not None
else "",
graph.nodes[u]["type"],
)
for u in graph.nodes
],
)
print()
print(
"edges:",
[
(u, v, graph[u][v]["shape"], graph[u][v]["quantizer"])
for u, v in graph.edges
],
)
[docs]
def CreateGraph(
model,
input_quantizers=None,
default_source_quantizer=cfg.default_source_quantizer,
debug=False,
):
"""create graph."""
K.set_image_data_format("channels_last")
(graph, source_quantizer_list) = GenerateGraphFromModel(
model, input_quantizers, default_source_quantizer
)
GraphAddSingleSourceSingleSink(graph)
GraphRemoveNodeWithNodeType(graph, "Dropout")
GraphRemoveNodeWithNodeType(graph, "InputLayer")
scheduler = list(nx.topological_sort(graph))
if debug:
for vertex in scheduler[1:-1]:
for _, v in graph.edges(vertex):
if v == SINK:
continue
print(
"... calling",
graph.nodes[v]["layer"][0].name,
graph.nodes[v]["type"],
)
return (graph, source_quantizer_list)
[docs]
def GraphUpdateEdge(graph, node_id, quantizer_on_edge):
"""update the graph edges outgoing from node_id with new quantizer."""
for u, v in graph.edges(node_id):
graph[u][v]["quantizer"] = quantizer_on_edge