You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1077 lines
44 KiB
1077 lines
44 KiB
# -------------------------------------------------------------------------
|
|
# Copyright (c) Microsoft Corporation. All rights reserved.
|
|
# Licensed under the MIT License. See License.txt in the project root for
|
|
# license information.
|
|
# --------------------------------------------------------------------------
|
|
import logging
|
|
|
|
import numpy as np
|
|
import onnx
|
|
import onnx.numpy_helper
|
|
from onnx import onnx_pb as onnx_proto
|
|
|
|
from .onnx_model import ONNXModel
|
|
from .quant_utils import (
|
|
TENSOR_NAME_QUANT_SUFFIX,
|
|
QuantizationMode,
|
|
QuantizedValue,
|
|
QuantizedValueType,
|
|
QuantType,
|
|
__producer__,
|
|
__version__,
|
|
add_infer_metadata,
|
|
attribute_to_kwarg,
|
|
compute_scale_zp,
|
|
find_by_name,
|
|
get_qmin_qmax_for_qType,
|
|
get_qrange_for_qType,
|
|
model_has_infer_metadata,
|
|
quantize_data,
|
|
save_and_reload_model,
|
|
tensor_proto_to_array,
|
|
)
|
|
from .registry import CreateOpQuantizer
|
|
|
|
|
|
class ONNXQuantizer:
|
|
def __init__(
|
|
self,
|
|
model,
|
|
per_channel,
|
|
reduce_range,
|
|
mode,
|
|
static,
|
|
weight_qType,
|
|
activation_qType,
|
|
tensors_range,
|
|
nodes_to_quantize,
|
|
nodes_to_exclude,
|
|
op_types_to_quantize,
|
|
extra_options=None,
|
|
):
|
|
|
|
if not model_has_infer_metadata(model):
|
|
model = save_and_reload_model(model)
|
|
self.value_infos = {vi.name: vi for vi in model.graph.value_info}
|
|
self.value_infos.update({ot.name: ot for ot in model.graph.output})
|
|
self.value_infos.update({it.name: it for it in model.graph.input})
|
|
|
|
self.model = ONNXModel(model)
|
|
if not static:
|
|
self.model.replace_gemm_with_matmul()
|
|
|
|
self.per_channel = per_channel # weight-pack per channel
|
|
self.reduce_range = reduce_range
|
|
self.mode = mode # QuantizationMode.Value
|
|
self.static = static # use static quantization for inputs.
|
|
self.fuse_dynamic_quant = False
|
|
|
|
self.extra_options = extra_options if extra_options else {}
|
|
self.enable_subgraph_quantization = (
|
|
"EnableSubgraph" in self.extra_options and self.extra_options["EnableSubgraph"]
|
|
)
|
|
self.force_quantize_no_input_check = (
|
|
"ForceQuantizeNoInputCheck" in self.extra_options and self.extra_options["ForceQuantizeNoInputCheck"]
|
|
)
|
|
self.q_matmul_const_b_only = "MatMulConstBOnly" in self.extra_options and self.extra_options["MatMulConstBOnly"]
|
|
is_weight_int8 = weight_qType == QuantType.QInt8
|
|
self.is_weight_symmetric = (
|
|
is_weight_int8 if "WeightSymmetric" not in self.extra_options else self.extra_options["WeightSymmetric"]
|
|
)
|
|
self.is_activation_symmetric = (
|
|
False if "ActivationSymmetric" not in self.extra_options else self.extra_options["ActivationSymmetric"]
|
|
)
|
|
|
|
self.activation_qType = (
|
|
onnx_proto.TensorProto.INT8 if activation_qType == QuantType.QInt8 else onnx_proto.TensorProto.UINT8
|
|
)
|
|
self.weight_qType = (
|
|
onnx_proto.TensorProto.INT8 if weight_qType == QuantType.QInt8 else onnx_proto.TensorProto.UINT8
|
|
)
|
|
"""
|
|
Dictionary specifying the min and max values for tensors. It has following format:
|
|
{
|
|
"param_name": [min, max]
|
|
}
|
|
example:
|
|
{
|
|
'Conv_3:0': [np.float32(0), np.float32(0.5)],
|
|
'Conv_4:0': [np.float32(1), np.float32(3.5)]
|
|
}
|
|
"""
|
|
self.tensors_range = tensors_range
|
|
self.nodes_to_quantize = nodes_to_quantize # specific nodes to quantize
|
|
self.nodes_to_exclude = nodes_to_exclude # specific nodes to exclude
|
|
self.op_types_to_quantize = op_types_to_quantize
|
|
self.new_nodes = []
|
|
self.parent = None
|
|
self.graph_scope = "/" # for human readable debug information
|
|
self.tensor_names = {} # in case the shape inference not totally working
|
|
self.tensor_names.update({ot.name: 1 for ot in model.graph.output})
|
|
self.tensor_names.update({it.name: 1 for it in model.graph.input})
|
|
for node in self.model.model.graph.node:
|
|
self.tensor_names.update({output_name: 1 for output_name in node.output})
|
|
|
|
self.opset_version = self.check_opset_version()
|
|
|
|
if not self.mode in QuantizationMode:
|
|
raise ValueError("unsupported quantization mode {}".format(self.mode))
|
|
|
|
self.quantization_params = self.calculate_quantization_params()
|
|
|
|
# QuantizeRange tensor name and zero tensor name for scale and zero point calculation.
|
|
# Used when static is False
|
|
self.fixed_qrange_uint8_name = "fixed_quantization_range_uint8"
|
|
self.fixed_qrange_int8_name = "fixed_quantization_range_int8"
|
|
# For uint8 data-type, to compute zero point, we subtract rmin from 0 (represented by fixed_zero_name tensor)
|
|
self.fixed_zero_name = "fixed_zero"
|
|
# For int8 data-type, zero point is always zero (respresented by fixed_zero_point_name tensor)
|
|
self.fixed_zero_zp_name = "fixed_zero_zp"
|
|
|
|
# Map of all original value names to quantized value names
|
|
self.quantized_value_map = {}
|
|
# some output from nodes will be quantized, yet itself should be treat as existing so
|
|
# no dequantized will be applied when needed later
|
|
self.generated_value_names = self.model.get_non_initializer_inputs()
|
|
# to store specified scale and zeropoint instead of calculated value, tensor_name->(scale, zeropoint)
|
|
self.used_scale_zp_map = {}
|
|
|
|
# routines for subgraph support
|
|
def quantize_subgraph(self, subgraph, graph_key):
|
|
"""
|
|
generate submodel for the subgraph, so that we re-utilize current quantization implementation.
|
|
quantize the submodel
|
|
update subgraph and set it back to node
|
|
"""
|
|
warped_model = onnx.helper.make_model(
|
|
subgraph,
|
|
producer_name="onnx-quantizer",
|
|
opset_imports=self.model.model.opset_import,
|
|
)
|
|
add_infer_metadata(warped_model)
|
|
sub_quanitzer = ONNXQuantizer(
|
|
warped_model,
|
|
self.per_channel,
|
|
self.reduce_range,
|
|
self.mode,
|
|
self.static,
|
|
self.weight_qType,
|
|
self.activation_qType,
|
|
self.tensors_range,
|
|
self.nodes_to_quantize,
|
|
self.nodes_to_exclude,
|
|
self.op_types_to_quantize,
|
|
self.extra_options,
|
|
)
|
|
sub_quanitzer.parent = self
|
|
sub_quanitzer.graph_scope = "{}{}/".format(self.graph_scope, graph_key)
|
|
sub_quanitzer.quantize_model()
|
|
return sub_quanitzer.model.model.graph
|
|
|
|
def quantize_node_with_sub_graph(self, node):
|
|
"""
|
|
Check subgraph, if any, quantize it and replace it.
|
|
return new_nodes added for quantizing subgraph
|
|
"""
|
|
graph_attrs = [
|
|
attr
|
|
for attr in node.attribute
|
|
if attr.type == onnx.AttributeProto.GRAPH or attr.type == onnx.AttributeProto.GRAPHS
|
|
]
|
|
if len(graph_attrs) == 0:
|
|
return node
|
|
node_name = node.name if node.name != "" else "{}_node_count_{}".format(node.op_type, len(self.new_nodes))
|
|
kwargs = {}
|
|
for attr in node.attribute:
|
|
if attr.type == onnx.AttributeProto.GRAPH:
|
|
kv = {attr.name: self.quantize_subgraph(attr.g, "{}:{}".format(node_name, attr.name))}
|
|
elif attr.type == onnx.AttributeProto.GRAPHS:
|
|
value = []
|
|
for subgraph in attr.graphs:
|
|
value.extend(
|
|
[
|
|
self.quantize_subgraph(
|
|
subgraph,
|
|
"{}:{}:{}".format(node_name, attr.name, len(value)),
|
|
)
|
|
]
|
|
)
|
|
kv = {attr.name: value}
|
|
else:
|
|
kv = attribute_to_kwarg(attr)
|
|
kwargs.update(kv)
|
|
return onnx.helper.make_node(node.op_type, node.input, node.output, name=node.name, **kwargs)
|
|
|
|
def check_opset_version(self):
|
|
ai_onnx_domain = [
|
|
opset for opset in self.model.model.opset_import if not opset.domain or opset.domain == "ai.onnx"
|
|
]
|
|
if 1 != len(ai_onnx_domain):
|
|
raise ValueError("Failed to find proper ai.onnx domain")
|
|
opset_version = ai_onnx_domain[0].version
|
|
|
|
if opset_version == 10:
|
|
logging.warning(
|
|
"The original model opset version is {}, which does not support node fusions. Please update the model to opset >= 11 for better performance.".format(
|
|
opset_version
|
|
)
|
|
)
|
|
return 10
|
|
|
|
if opset_version < 10:
|
|
logging.warning(
|
|
"The original model opset version is {}, which does not support quantization. Please update the model to opset >= 11. Updating the model automatically to opset 11. Please verify the quantized model.".format(
|
|
opset_version
|
|
)
|
|
)
|
|
self.model.model.opset_import.remove(ai_onnx_domain[0])
|
|
self.model.model.opset_import.extend([onnx.helper.make_opsetid("", 11)])
|
|
opset_version = 11
|
|
|
|
self.fuse_dynamic_quant = True
|
|
return opset_version
|
|
|
|
def has_QDQ_nodes(self):
|
|
"""
|
|
Detect if model already has QuantizeLinear or DequantizeLinear.
|
|
"""
|
|
return any(
|
|
node.op_type == "QuantizeLinear" or node.op_type == "DequantizeLinear" for node in self.model.nodes()
|
|
)
|
|
|
|
def find_initializer_in_path(self, initializer_name):
|
|
if find_by_name(initializer_name, self.model.initializer()) is not None:
|
|
return True
|
|
if self.parent is not None:
|
|
return self.parent.find_initializer_in_path(initializer_name)
|
|
return False
|
|
|
|
def add_new_nodes(self, nodes):
|
|
self.new_nodes.extend(nodes)
|
|
for node in nodes:
|
|
for output_name in node.output:
|
|
self.generated_value_names.add(output_name)
|
|
|
|
def quantize_model(self):
|
|
if self.has_QDQ_nodes():
|
|
logging.warning(
|
|
"Please check if the model is already quantized."
|
|
"Note you don't need to quantize a QAT model. OnnxRuntime support to run QAT model directly."
|
|
)
|
|
|
|
for node in self.model.nodes():
|
|
# quantize subgraphes if have
|
|
if self.enable_subgraph_quantization:
|
|
node = self.quantize_node_with_sub_graph(node)
|
|
|
|
number_of_existing_new_nodes = len(self.new_nodes)
|
|
op_quantizer = CreateOpQuantizer(self, node)
|
|
op_quantizer.quantize()
|
|
for i in range(number_of_existing_new_nodes, len(self.new_nodes)):
|
|
for output_name in self.new_nodes[i].output:
|
|
self.generated_value_names.add(output_name)
|
|
|
|
self._dequantize_outputs()
|
|
|
|
# extend is used to append to the list for a protobuf fields
|
|
# https://developers.google.com/protocol-buffers/docs/reference/python-generated?csw=1#fields
|
|
self.model.graph().ClearField("node")
|
|
self.model.graph().node.extend(self.new_nodes)
|
|
|
|
# Remove ununsed initializers from graph, starting from the top level graph.
|
|
if self.parent is None:
|
|
_, initializers_not_found = self.model.clean_initializers()
|
|
if len(initializers_not_found) > 0:
|
|
raise RuntimeError("Invalid model with unknown initializers/tensors." + str(initializers_not_found))
|
|
|
|
self.model.model.producer_name = __producer__
|
|
self.model.model.producer_version = __version__
|
|
|
|
return self.model.model
|
|
|
|
def is_input_a_initializer(self, input_name):
|
|
initializer = find_by_name(input_name, self.model.initializer())
|
|
return initializer is not None
|
|
|
|
def is_per_channel(self):
|
|
return self.per_channel
|
|
|
|
def is_valid_quantize_weight(self, weight_name):
|
|
weight = find_by_name(weight_name, self.model.initializer())
|
|
if weight is not None:
|
|
return weight.data_type == onnx_proto.TensorProto.FLOAT
|
|
if (not self.enable_subgraph_quantization) or (self.parent is None):
|
|
return False
|
|
return self.parent.is_valid_quantize_weight(weight_name)
|
|
|
|
def is_float_tensor(self, tensor_name):
|
|
if self.is_input_a_initializer(tensor_name):
|
|
return self.is_valid_quantize_weight(tensor_name)
|
|
|
|
if tensor_name in self.value_infos.keys():
|
|
vi = self.value_infos[tensor_name]
|
|
if vi.type.HasField("tensor_type") and vi.type.tensor_type.elem_type == onnx_proto.TensorProto.FLOAT:
|
|
return True
|
|
elif self.enable_subgraph_quantization and self.parent:
|
|
return self.parent.is_float_tensor(tensor_name)
|
|
else:
|
|
logging.warning(
|
|
"Failed to infer data type of tensor: {}. Please add data type info for this tensor "
|
|
"if your model has customized operators.".format(tensor_name)
|
|
)
|
|
|
|
return False
|
|
|
|
def should_quantize_node(self, node):
|
|
if (
|
|
self.nodes_to_quantize is not None
|
|
and len(self.nodes_to_quantize) != 0
|
|
and node.name not in self.nodes_to_quantize
|
|
):
|
|
return False
|
|
|
|
if node.op_type not in self.op_types_to_quantize:
|
|
return False
|
|
|
|
if self.nodes_to_exclude is not None and node.name in self.nodes_to_exclude:
|
|
return False
|
|
|
|
return True
|
|
|
|
def _get_dynamic_input_quantization_params(self, input_name, nodes_list, qType):
|
|
"""
|
|
Create nodes for dynamic quantization of input and add them to nodes_list.
|
|
parameter input_name: Name of the input.
|
|
parameter nodes_list: new nodes are appended to this list.
|
|
parameter qType: type to quantize to.
|
|
return: scale_name, zero_point_name, scale_shape, zero_point_shape.
|
|
"""
|
|
if qType == onnx_proto.TensorProto.INT8:
|
|
return self._get_dynamic_input_quantization_params_int8(input_name, nodes_list)
|
|
|
|
return self._get_dynamic_input_quantization_params_uint8(input_name, nodes_list)
|
|
|
|
def _get_dynamic_input_quantization_params_int8(self, input_name, nodes_list):
|
|
"""
|
|
Create nodes for dynamic quantization of input to int8 and add them to nodes_list
|
|
parameter input_name: Name of the input.
|
|
parameter nodes_list: new nodes are appended to this list.
|
|
return: scale_name, zero_point_name, scale_shape, zero_point_shape.
|
|
"""
|
|
qType = onnx_proto.TensorProto.INT8
|
|
|
|
# Reduce min and Reduce max
|
|
input_scale_name = input_name + "_scale"
|
|
|
|
reduce_min_name = input_name + "_ReduceMin"
|
|
reduce_min_node = onnx.helper.make_node(
|
|
"ReduceMin",
|
|
[input_name],
|
|
[reduce_min_name + ":0"],
|
|
reduce_min_name,
|
|
keepdims=0,
|
|
)
|
|
nodes_list.append(reduce_min_node)
|
|
|
|
reduce_max_name = input_name + "_ReduceMax"
|
|
reduce_max_node = onnx.helper.make_node(
|
|
"ReduceMax",
|
|
[input_name],
|
|
[reduce_max_name + ":0"],
|
|
reduce_max_name,
|
|
keepdims=0,
|
|
)
|
|
nodes_list.append(reduce_max_node)
|
|
|
|
# Compute scale
|
|
# Find abs(rmin)
|
|
reduce_min_abs_name = reduce_min_name + "_Abs"
|
|
reduce_min_abs_node = onnx.helper.make_node(
|
|
"Abs",
|
|
[reduce_min_node.output[0]],
|
|
[reduce_min_abs_name + ":0"],
|
|
reduce_min_abs_name,
|
|
)
|
|
nodes_list.append(reduce_min_abs_node)
|
|
# Find abs(rmax)
|
|
reduce_max_abs_name = reduce_max_name + "_Abs"
|
|
reduce_max_abs_node = onnx.helper.make_node(
|
|
"Abs",
|
|
[reduce_max_node.output[0]],
|
|
[reduce_max_abs_name + ":0"],
|
|
reduce_max_abs_name,
|
|
)
|
|
nodes_list.append(reduce_max_abs_node)
|
|
# Compute max of abs(rmin) and abs(rmax)
|
|
abs_max_name = input_name + "_Abs_Max"
|
|
abs_max_node = onnx.helper.make_node(
|
|
"Max",
|
|
[reduce_min_abs_node.output[0], reduce_max_abs_node.output[0]],
|
|
[abs_max_name + ":0"],
|
|
abs_max_name,
|
|
)
|
|
nodes_list.append(abs_max_node)
|
|
# and divide by (quantize_range/2.0) which will be equal to max(...)*2.0/quantize_range
|
|
initializer_div = onnx.helper.make_tensor(
|
|
self.fixed_qrange_int8_name,
|
|
onnx_proto.TensorProto.FLOAT,
|
|
[],
|
|
[get_qrange_for_qType(qType) / 2.0],
|
|
)
|
|
self.model.add_initializer(initializer_div)
|
|
scale_div_name = input_name + "scale_Div"
|
|
scale_div_node = onnx.helper.make_node(
|
|
"Div",
|
|
[abs_max_node.output[0], self.fixed_qrange_int8_name],
|
|
[input_scale_name],
|
|
scale_div_name,
|
|
)
|
|
nodes_list.append(scale_div_node)
|
|
|
|
# Zero point
|
|
initializer_zp = onnx.helper.make_tensor(self.fixed_zero_zp_name, qType, [], [0])
|
|
self.model.add_initializer(initializer_zp)
|
|
|
|
return input_scale_name, self.fixed_zero_zp_name, [], []
|
|
|
|
def _get_dynamic_input_quantization_params_uint8(self, input_name, nodes_list):
|
|
"""
|
|
Create nodes for dynamic quantization of input to uint8 and add them to nodes_list
|
|
parameter input_name: Name of the input.
|
|
parameter nodes_list: new nodes are appended to this list.
|
|
return: scale_name, zero_point_name, scale_shape, zero_point_shape.
|
|
"""
|
|
qType = onnx_proto.TensorProto.UINT8
|
|
# Reduce min and Reduce max
|
|
input_scale_name = input_name + "_scale"
|
|
input_zp_name = input_name + "_zero_point"
|
|
|
|
reduce_min_name = input_name + "_ReduceMin"
|
|
reduce_min_node = onnx.helper.make_node(
|
|
"ReduceMin",
|
|
[input_name],
|
|
[reduce_min_name + ":0"],
|
|
reduce_min_name,
|
|
keepdims=0,
|
|
)
|
|
nodes_list.append(reduce_min_node)
|
|
|
|
reduce_max_name = input_name + "_ReduceMax"
|
|
reduce_max_node = onnx.helper.make_node(
|
|
"ReduceMax",
|
|
[input_name],
|
|
[reduce_max_name + ":0"],
|
|
reduce_max_name,
|
|
keepdims=0,
|
|
)
|
|
nodes_list.append(reduce_max_node)
|
|
|
|
# Add tensors for quantize range and zero value.
|
|
initializer_qrange = onnx.helper.make_tensor(
|
|
self.fixed_qrange_uint8_name,
|
|
onnx_proto.TensorProto.FLOAT,
|
|
[],
|
|
[get_qrange_for_qType(qType)],
|
|
)
|
|
self.model.add_initializer(initializer_qrange)
|
|
initializer_qvalue = onnx.helper.make_tensor(self.fixed_zero_name, onnx_proto.TensorProto.FLOAT, [], [0.0])
|
|
self.model.add_initializer(initializer_qvalue)
|
|
|
|
# Compute Scale
|
|
# Subtract rmax and rmin
|
|
scale_sub_name = input_name + "_scale_Sub"
|
|
scale_sub_node = onnx.helper.make_node(
|
|
"Sub",
|
|
[reduce_max_node.output[0], reduce_min_node.output[0]],
|
|
[scale_sub_name + ":0"],
|
|
scale_sub_name,
|
|
)
|
|
nodes_list.append(scale_sub_node)
|
|
# and divide by quantize range
|
|
scale_div_name = input_name + "_scale_Div"
|
|
scale_div_node = onnx.helper.make_node(
|
|
"Div",
|
|
[scale_sub_node.output[0], self.fixed_qrange_uint8_name],
|
|
[input_scale_name],
|
|
scale_div_name,
|
|
)
|
|
nodes_list.append(scale_div_node)
|
|
|
|
# Compute zero point
|
|
# Subtract zero and rmin
|
|
zp_sub_name = input_name + "_zero_point_Sub"
|
|
zp_sub_node = onnx.helper.make_node(
|
|
"Sub",
|
|
[self.fixed_zero_name, reduce_min_node.output[0]],
|
|
[zp_sub_name + ":0"],
|
|
zp_sub_name,
|
|
)
|
|
nodes_list.append(zp_sub_node)
|
|
# Divide by scale
|
|
zp_div_name = input_name + "_zero_point_Div"
|
|
zp_div_node = onnx.helper.make_node(
|
|
"Div",
|
|
[zp_sub_node.output[0], input_scale_name],
|
|
[zp_div_name + ":0"],
|
|
zp_div_name,
|
|
)
|
|
nodes_list.append(zp_div_node)
|
|
# Compute floor
|
|
zp_floor_name = input_name + "_zero_point_Floor"
|
|
zp_floor_node = onnx.helper.make_node("Floor", zp_div_node.output, [zp_floor_name + ":0"], zp_floor_name)
|
|
nodes_list.append(zp_floor_node)
|
|
# Cast to integer
|
|
zp_cast_name = input_name + "_zero_point_Cast"
|
|
zp_cast_node = onnx.helper.make_node("Cast", zp_floor_node.output, [input_zp_name], zp_cast_name, to=qType)
|
|
nodes_list.append(zp_cast_node)
|
|
|
|
return input_scale_name, input_zp_name, [], []
|
|
|
|
def _get_quantization_params(self, param_name, use_scale=None, use_zeropoint=None):
|
|
"""
|
|
Create initializers and inputs in the graph for zero point and scale of output.
|
|
Zero point and scale values are obtained from self.quantization_params if specified.
|
|
parameter param_name: Name of the quantization parameter.
|
|
return: result, scale_name, zero_point_name, scale_shape, zero_point_shape.
|
|
"""
|
|
if use_scale is None or use_zeropoint is None:
|
|
if self.quantization_params is None or param_name not in self.quantization_params:
|
|
logging.info('Quantization parameters for tensor:"{}" not specified'.format(param_name))
|
|
return False, "", "", "", ""
|
|
|
|
params = self.quantization_params[param_name]
|
|
if params is None or len(params) != 2:
|
|
raise ValueError(
|
|
"Quantization parameters should contain zero point and scale. "
|
|
"Specified values for output {}: {}".format(param_name, params)
|
|
)
|
|
|
|
zero_point_values = [params[0]]
|
|
scale_values = [params[1]]
|
|
else:
|
|
zero_point_values = [use_zeropoint]
|
|
scale_values = [use_scale]
|
|
|
|
zero_point_shape = []
|
|
zero_point_name = param_name + "_zero_point"
|
|
zero_point_type = self.activation_qType
|
|
scale_shape = []
|
|
scale_name = param_name + "_scale"
|
|
|
|
# Add initializers
|
|
init_zp = onnx.helper.make_tensor(zero_point_name, zero_point_type, zero_point_shape, zero_point_values)
|
|
self.model.add_initializer(init_zp)
|
|
init_scale = onnx.helper.make_tensor(scale_name, onnx_proto.TensorProto.FLOAT, scale_shape, scale_values)
|
|
self.model.add_initializer(init_scale)
|
|
|
|
return True, scale_name, zero_point_name, scale_shape, zero_point_shape
|
|
|
|
def _get_quantize_input_nodes(self, node, input_index, qType, given_scale_name=None, given_zp_name=None):
|
|
"""
|
|
Given an input for a node (which is not a initializer), this function
|
|
|
|
- add nodes to compute zero point and scale for this input if they don't exist.
|
|
- add new QuantizeLinear node to quantize the input.
|
|
|
|
:param node: node being quantized in NodeProto format.
|
|
:param input_index: index of input in node.input.
|
|
:param qType: type to quantize to.
|
|
:param given_scale_name: if those inputs need to be quanitzed using this scale tensor.
|
|
:param given_zp_name: if those inputs to be quantized using this zeropoint tensor.
|
|
:return: List of newly created nodes in NodeProto format.
|
|
"""
|
|
input_name = node.input[input_index]
|
|
output_name = input_name + TENSOR_NAME_QUANT_SUFFIX
|
|
ql_node_name = input_name + "_QuantizeLinear"
|
|
|
|
if (given_scale_name is not None) and (given_zp_name is not None):
|
|
data_found, scale_name, zp_name = (True, given_scale_name, given_zp_name)
|
|
else:
|
|
data_found, scale_name, zp_name, _, _ = self._get_quantization_params(input_name)
|
|
|
|
nodes = []
|
|
if data_found:
|
|
qlinear_node = onnx.helper.make_node(
|
|
"QuantizeLinear",
|
|
[input_name, scale_name, zp_name],
|
|
[output_name],
|
|
ql_node_name,
|
|
)
|
|
else:
|
|
if self.static:
|
|
return None
|
|
# dynamic mode
|
|
# Scale and Zero Points not available for this input. Add nodes to dynamically compute it
|
|
if self.fuse_dynamic_quant and qType == onnx_proto.TensorProto.UINT8:
|
|
scale_name = input_name + "_scale"
|
|
zp_name = input_name + "_zero_point"
|
|
qlinear_node = onnx.helper.make_node(
|
|
"DynamicQuantizeLinear",
|
|
[input_name],
|
|
[output_name, scale_name, zp_name],
|
|
ql_node_name,
|
|
)
|
|
else:
|
|
(
|
|
scale_name,
|
|
zp_name,
|
|
scale_shape,
|
|
zp_shape,
|
|
) = self._get_dynamic_input_quantization_params(input_name, nodes, qType)
|
|
qlinear_node = onnx.helper.make_node(
|
|
"QuantizeLinear",
|
|
[input_name, scale_name, zp_name],
|
|
[output_name],
|
|
ql_node_name,
|
|
)
|
|
|
|
self.quantized_value_map[input_name] = QuantizedValue(input_name, output_name, scale_name, zp_name, qType)
|
|
return nodes + [qlinear_node]
|
|
|
|
def set_quant_scale_zp(self, tensor_name, value):
|
|
assert isinstance(value, tuple) and len(value) == 2, "value must be scale(float) and zeropoint"
|
|
assert tensor_name not in self.used_scale_zp_map, f"{tensor_name} has been setted before"
|
|
self.used_scale_zp_map[tensor_name] = value
|
|
|
|
def find_quant_scale_zp(self, input_name):
|
|
if input_name in self.used_scale_zp_map:
|
|
return self.used_scale_zp_map[input_name]
|
|
if self.parent is not None:
|
|
return self.parent.find_quantized_value(input_name)
|
|
return (None, None)
|
|
|
|
def find_quantized_value(self, input_name):
|
|
if input_name in self.quantized_value_map:
|
|
return self.quantized_value_map[input_name]
|
|
if self.parent is not None:
|
|
return self.parent.find_quantized_value(input_name)
|
|
return None
|
|
|
|
def quantize_bias_static(self, bias_name, input_name, weight_name, beta=1.0):
|
|
"""
|
|
Quantized the bias. Zero Point == 0 and Scale == Input_Scale * Weight_Scale
|
|
"""
|
|
|
|
# Handle case where bias already in quantization map
|
|
if bias_name in self.quantized_value_map:
|
|
return self.quantized_value_map[bias_name].q_name
|
|
|
|
# get scale for weight
|
|
weight_scale_name = self.quantized_value_map[weight_name].scale_name
|
|
weight_initializer = find_by_name(weight_scale_name, self.model.initializer())
|
|
weight_scale = tensor_proto_to_array(weight_initializer)
|
|
|
|
# get bias
|
|
bias_initializer = find_by_name(bias_name, self.model.initializer())
|
|
bias_data = tensor_proto_to_array(bias_initializer)
|
|
quantized_bias_name = bias_name + TENSOR_NAME_QUANT_SUFFIX
|
|
|
|
# get scale for input
|
|
if input_name in self.quantized_value_map:
|
|
input_scale_name = self.quantized_value_map[input_name].scale_name
|
|
elif input_name in self.quantization_params:
|
|
_, input_scale_name, _, _, _ = self._get_quantization_params(input_name)
|
|
else:
|
|
raise ValueError("Expected {} to be in quantized value map for static quantization".format(input_name))
|
|
|
|
inputscale_initializer = find_by_name(input_scale_name, self.model.initializer())
|
|
input_scale = tensor_proto_to_array(inputscale_initializer)
|
|
|
|
# calcuate scale for bias
|
|
bias_scale = input_scale * weight_scale * beta
|
|
|
|
# quantize bias
|
|
quantized_data = (np.asarray(bias_data) / bias_scale).round().astype(np.int32)
|
|
|
|
# update bias initializer
|
|
bias_np_data = np.asarray(quantized_data, dtype=np.int32).reshape(bias_initializer.dims)
|
|
packed_bias_initializer = onnx.numpy_helper.from_array(bias_np_data, quantized_bias_name)
|
|
self.model.initializer().extend([packed_bias_initializer])
|
|
|
|
# update scale initializer
|
|
quantized_bias_scale_name = quantized_bias_name + "_scale"
|
|
bias_scale_data = np.asarray(bias_scale, dtype=np.float32).reshape(-1)
|
|
if self.is_per_channel():
|
|
packed_bias_scale_initializer = onnx.numpy_helper.from_array(bias_scale_data, quantized_bias_scale_name)
|
|
else:
|
|
packed_bias_scale_initializer = onnx.helper.make_tensor(
|
|
quantized_bias_scale_name, onnx_proto.TensorProto.FLOAT, [], bias_scale_data
|
|
)
|
|
self.model.initializer().extend([packed_bias_scale_initializer])
|
|
|
|
# update zero initializer
|
|
quantized_bias_zp_name = quantized_bias_name + "_zero_point"
|
|
bias_zp_data = np.zeros(bias_scale.shape, dtype=np.int32).reshape(-1)
|
|
if self.is_per_channel():
|
|
packed_bias_zp_initializer = onnx.numpy_helper.from_array(bias_zp_data, quantized_bias_zp_name)
|
|
else:
|
|
packed_bias_zp_initializer = onnx.helper.make_tensor(
|
|
quantized_bias_zp_name, onnx_proto.TensorProto.INT32, [], bias_zp_data
|
|
)
|
|
self.model.initializer().extend([packed_bias_zp_initializer])
|
|
|
|
assert bias_name not in self.quantized_value_map
|
|
quantized_value = QuantizedValue(
|
|
bias_name,
|
|
quantized_bias_name,
|
|
quantized_bias_scale_name,
|
|
quantized_bias_zp_name,
|
|
QuantizedValueType.Initializer,
|
|
0 if bias_scale_data.size > 1 else None,
|
|
)
|
|
self.quantized_value_map[bias_name] = quantized_value
|
|
|
|
return quantized_bias_name
|
|
|
|
def contains_tensor(self, tensor_name):
|
|
"""
|
|
only check for value info and newly generated tensor names, initializers are checked separately
|
|
"""
|
|
return (
|
|
(tensor_name in self.value_infos)
|
|
or (tensor_name in self.tensor_names)
|
|
or (tensor_name in self.generated_value_names)
|
|
)
|
|
|
|
def quantize_activation(self, node, indices, from_subgraph=False):
|
|
return self.__quantize_inputs(
|
|
node=node,
|
|
indices=indices,
|
|
initializer_use_weight_qType=False,
|
|
reduce_range=False,
|
|
op_level_per_channel=False,
|
|
axis=-1,
|
|
from_subgraph=from_subgraph,
|
|
)
|
|
|
|
# In some circumstances a weight is not an initializer, for example of MatMul, if both A and B are not
|
|
# initializer, B can still be considered as Weight
|
|
def quantize_weight(
|
|
self,
|
|
node,
|
|
indices,
|
|
reduce_range=False,
|
|
op_level_per_channel=False,
|
|
axis=-1,
|
|
from_subgraph=False,
|
|
):
|
|
return self.__quantize_inputs(
|
|
node=node,
|
|
indices=indices,
|
|
initializer_use_weight_qType=True,
|
|
reduce_range=reduce_range,
|
|
op_level_per_channel=op_level_per_channel,
|
|
axis=axis,
|
|
from_subgraph=from_subgraph,
|
|
)
|
|
|
|
def __quantize_inputs(
|
|
self,
|
|
node,
|
|
indices,
|
|
initializer_use_weight_qType=True,
|
|
reduce_range=False,
|
|
op_level_per_channel=False,
|
|
axis=-1,
|
|
from_subgraph=False,
|
|
):
|
|
"""
|
|
Given a node, this function quantizes the inputs as follows:
|
|
- If input is an initializer, quantize the initializer data, replace old initializer
|
|
with new initializer
|
|
- Else, add QuantizeLinear nodes to perform quantization
|
|
parameter node: node being quantized in NodeProto format.
|
|
parameter indices: input indices to quantize.
|
|
return: (List of quantized input names,
|
|
List of zero point names used for input quantization,
|
|
List of scale names used for input quantization,
|
|
List of new QuantizeLinear nodes created)
|
|
"""
|
|
|
|
scale_names = []
|
|
zero_point_names = []
|
|
quantized_input_names = []
|
|
nodes = []
|
|
|
|
for input_index in indices:
|
|
node_input = node.input[input_index]
|
|
|
|
# Find if this input is already quantized
|
|
if node_input in self.quantized_value_map:
|
|
quantized_value = self.quantized_value_map[node_input]
|
|
scale_names.append(quantized_value.scale_name)
|
|
zero_point_names.append(quantized_value.zp_name)
|
|
quantized_input_names.append(quantized_value.q_name)
|
|
continue
|
|
|
|
# Quantize the input
|
|
initializer = find_by_name(node_input, self.model.initializer())
|
|
if initializer is not None:
|
|
if self.per_channel and op_level_per_channel:
|
|
(q_weight_name, zp_name, scale_name,) = self.quantize_weight_per_channel(
|
|
initializer.name,
|
|
self.weight_qType if initializer_use_weight_qType else self.activation_qType,
|
|
axis,
|
|
reduce_range,
|
|
)
|
|
else:
|
|
q_weight_name, zp_name, scale_name = self.quantize_initializer(
|
|
initializer,
|
|
self.weight_qType if initializer_use_weight_qType else self.activation_qType,
|
|
reduce_range,
|
|
)
|
|
|
|
quantized_input_names.append(q_weight_name)
|
|
zero_point_names.append(zp_name)
|
|
scale_names.append(scale_name)
|
|
elif self.contains_tensor(node_input):
|
|
# Add QuantizeLinear node.
|
|
qlinear_node = self.model.find_node_by_name(
|
|
node_input + "_QuantizeLinear", self.new_nodes, self.model.graph()
|
|
)
|
|
if qlinear_node is None:
|
|
quantize_input_nodes = self._get_quantize_input_nodes(node, input_index, self.activation_qType)
|
|
if quantize_input_nodes is None:
|
|
return (None, None, None, None)
|
|
if from_subgraph:
|
|
self.add_new_nodes(quantize_input_nodes)
|
|
else:
|
|
nodes.extend(quantize_input_nodes)
|
|
qlinear_node = quantize_input_nodes[-1]
|
|
|
|
if qlinear_node.op_type == "QuantizeLinear":
|
|
quantized_input_names.extend(qlinear_node.output)
|
|
scale_names.append(qlinear_node.input[1])
|
|
zero_point_names.append(qlinear_node.input[2])
|
|
else:
|
|
quantized_input_names.append(qlinear_node.output[0])
|
|
scale_names.append(qlinear_node.output[1])
|
|
zero_point_names.append(qlinear_node.output[2])
|
|
elif self.parent is not None:
|
|
(
|
|
parent_quantized_input_names,
|
|
parent_zero_point_names,
|
|
parent_scale_names,
|
|
_,
|
|
) = self.parent.__quantize_inputs(
|
|
node,
|
|
[input_index],
|
|
initializer_use_weight_qType=initializer_use_weight_qType,
|
|
reduce_range=reduce_range,
|
|
op_level_per_channel=op_level_per_channel,
|
|
axis=axis,
|
|
from_subgraph=True,
|
|
)
|
|
quantized_input_names.append(parent_quantized_input_names[0])
|
|
scale_names.append(parent_scale_names[0])
|
|
zero_point_names.append(parent_zero_point_names[0])
|
|
# node should not be add this child level here
|
|
else:
|
|
raise ValueError(
|
|
"Invalid tensor name to quantize: {} @graph scope{}".format(node_input, self.graph_scope)
|
|
)
|
|
|
|
return quantized_input_names, zero_point_names, scale_names, nodes
|
|
|
|
def quantize_initializer(self, weight, qType, reduce_range=False, keep_float_weight=False):
|
|
"""
|
|
:param weight: TensorProto initializer
|
|
:param qType: type to quantize to
|
|
:param keep_float_weight: Whether to quantize the weight. In some cases, we only want to qunatize scale and zero point.
|
|
If keep_float_weight is False, quantize the weight, or don't quantize the weight.
|
|
:return: quantized weight name, zero point name, scale name
|
|
"""
|
|
# Find if this input is already quantized
|
|
if weight.name in self.quantized_value_map:
|
|
quantized_value = self.quantized_value_map[weight.name]
|
|
return (
|
|
quantized_value.q_name,
|
|
quantized_value.zp_name,
|
|
quantized_value.scale_name,
|
|
)
|
|
|
|
q_weight_name = weight.name + TENSOR_NAME_QUANT_SUFFIX
|
|
zp_name = weight.name + "_zero_point"
|
|
scale_name = weight.name + "_scale"
|
|
|
|
# Update packed weight, zero point, and scale initializers
|
|
weight_data = tensor_proto_to_array(weight)
|
|
_, _, zero_point, scale, q_weight_data = quantize_data(
|
|
weight_data.flatten().tolist(),
|
|
qType,
|
|
self.is_weight_symmetric,
|
|
self.reduce_range and reduce_range,
|
|
)
|
|
scale_initializer = onnx.helper.make_tensor(scale_name, onnx_proto.TensorProto.FLOAT, [], [scale])
|
|
zero_initializer = onnx.helper.make_tensor(zp_name, qType, [], [zero_point])
|
|
self.model.initializer().extend([scale_initializer, zero_initializer])
|
|
|
|
if not keep_float_weight:
|
|
q_weight_data = np.asarray(q_weight_data, dtype=onnx.mapping.TENSOR_TYPE_TO_NP_TYPE[qType]).reshape(
|
|
weight.dims
|
|
)
|
|
q_weight_initializer = onnx.numpy_helper.from_array(q_weight_data, q_weight_name)
|
|
self.model.initializer().extend([q_weight_initializer])
|
|
|
|
# Log entry for this quantized weight
|
|
quantized_value = QuantizedValue(
|
|
weight.name,
|
|
q_weight_name,
|
|
scale_name,
|
|
zp_name,
|
|
QuantizedValueType.Initializer,
|
|
None,
|
|
)
|
|
self.quantized_value_map[weight.name] = quantized_value
|
|
|
|
return q_weight_name, zp_name, scale_name
|
|
|
|
def quantize_weight_per_channel(
|
|
self,
|
|
weight_name,
|
|
weight_qType,
|
|
channel_axis,
|
|
reduce_range=True,
|
|
keep_float_weight=False,
|
|
):
|
|
# Find if this input is already quantized
|
|
if weight_name in self.quantized_value_map:
|
|
quantized_value = self.quantized_value_map[weight_name]
|
|
return (
|
|
quantized_value.q_name,
|
|
quantized_value.zp_name,
|
|
quantized_value.scale_name,
|
|
)
|
|
|
|
initializer = find_by_name(weight_name, self.model.initializer())
|
|
if initializer is None:
|
|
raise ValueError("{} is not an initializer", weight_name)
|
|
|
|
weights = tensor_proto_to_array(initializer)
|
|
channel_count = weights.shape[channel_axis]
|
|
rmin_list = []
|
|
rmax_list = []
|
|
zero_point_list = []
|
|
scale_list = []
|
|
quantized_per_channel_data_list = []
|
|
for i in range(channel_count):
|
|
per_channel_data = weights.take(i, channel_axis)
|
|
rmin, rmax, zero_point, scale, quantized_per_channel_data = quantize_data(
|
|
per_channel_data.flatten().tolist(),
|
|
weight_qType,
|
|
self.is_weight_symmetric or weight_qType == onnx_proto.TensorProto.INT8,
|
|
self.reduce_range and reduce_range,
|
|
)
|
|
rmin_list.append(rmin)
|
|
rmax_list.append(rmax)
|
|
zero_point_list.append(zero_point)
|
|
scale_list.append(scale)
|
|
quantized_per_channel_data_list.append(quantized_per_channel_data)
|
|
|
|
# combine per_channel_data into one
|
|
reshape_dims = list(weights.shape) # deep copy
|
|
reshape_dims[channel_axis] = 1 # only one per channel for reshape
|
|
quantized_weights = np.asarray(quantized_per_channel_data_list[0]).reshape(reshape_dims)
|
|
for i in range(1, len(quantized_per_channel_data_list)):
|
|
channel_weights = np.asarray(quantized_per_channel_data_list[i]).reshape(reshape_dims)
|
|
quantized_weights = np.concatenate((quantized_weights, channel_weights), channel_axis)
|
|
|
|
q_weight_name = weight_name + TENSOR_NAME_QUANT_SUFFIX
|
|
zp_name = weight_name + "_zero_point"
|
|
scale_name = weight_name + "_scale"
|
|
|
|
quantized_value = QuantizedValue(
|
|
weight_name,
|
|
q_weight_name,
|
|
scale_name,
|
|
zp_name,
|
|
QuantizedValueType.Initializer,
|
|
None,
|
|
)
|
|
self.quantized_value_map[weight_name] = quantized_value
|
|
|
|
# Update packed weight, zero point, and scale initializers
|
|
zero_scale_shape = [initializer.dims[channel_axis]]
|
|
scale_initializer = onnx.helper.make_tensor(
|
|
scale_name, onnx_proto.TensorProto.FLOAT, zero_scale_shape, scale_list
|
|
)
|
|
zero_initializer = onnx.helper.make_tensor(zp_name, weight_qType, zero_scale_shape, zero_point_list)
|
|
|
|
self.model.initializer().extend([scale_initializer, zero_initializer])
|
|
|
|
if not keep_float_weight:
|
|
quantized_weights = np.asarray(
|
|
quantized_weights,
|
|
dtype=onnx.mapping.TENSOR_TYPE_TO_NP_TYPE[weight_qType],
|
|
).reshape(initializer.dims)
|
|
q_weight_initializer = onnx.numpy_helper.from_array(quantized_weights, q_weight_name)
|
|
self.model.initializer().extend([q_weight_initializer])
|
|
|
|
return q_weight_name, zp_name, scale_name
|
|
|
|
def _dequantize_value(self, value_name):
|
|
"""
|
|
Given a value (input/output) which is quantized, add a DequantizeLinear node to dequantize
|
|
it back to float32
|
|
parameter value_name: value to dequantize
|
|
parameter new_nodes_list: List of new nodes created before processing current node
|
|
return: None if there is already a DequantizeLinear node that dequantizes it
|
|
A DequantizeLinear node otherwise
|
|
"""
|
|
if (value_name in self.quantized_value_map) and (value_name not in self.generated_value_names):
|
|
quantized_value = self.quantized_value_map[value_name]
|
|
# Add DequantizeLinear Node for this input
|
|
dqlinear_name = value_name + "_DequantizeLinear"
|
|
dqlinear_node = self.model.find_node_by_name(dqlinear_name, self.new_nodes, self.model.graph())
|
|
if dqlinear_node is None:
|
|
dqlinear_inputs = [
|
|
quantized_value.q_name,
|
|
quantized_value.scale_name,
|
|
quantized_value.zp_name,
|
|
]
|
|
dequantize_node = onnx.helper.make_node(
|
|
"DequantizeLinear", dqlinear_inputs, [value_name], dqlinear_name
|
|
)
|
|
return dequantize_node
|
|
else:
|
|
# DQ op is already present, assert it's output matches the input of current node
|
|
assert value_name == dqlinear_node.output[0]
|
|
return None
|
|
|
|
def _dequantize_outputs(self):
|
|
"""
|
|
Dequantize output if it is quantized
|
|
parameter new_nodes_list: List of new nodes created before processing current node
|
|
return: List of new nodes created
|
|
"""
|
|
|
|
for output in self.model.graph().output:
|
|
dequantize_node = self._dequantize_value(output.name)
|
|
if dequantize_node is not None:
|
|
self.new_nodes.append(dequantize_node)
|
|
|
|
def calculate_quantization_params(self):
|
|
if self.tensors_range is None:
|
|
return
|
|
|
|
# adjust tensor_ranges for input of Clip and Relu node
|
|
for node in self.model.nodes():
|
|
if node.op_type not in ["Clip", "Relu"]:
|
|
continue
|
|
if self.is_activation_symmetric:
|
|
continue
|
|
if not self.should_quantize_node(node):
|
|
continue
|
|
if len(self.model.input_name_to_nodes()[node.input[0]]) != 1:
|
|
continue
|
|
if node.input[0] not in self.tensors_range.keys() or node.output[0] not in self.tensors_range.keys():
|
|
continue
|
|
self.tensors_range[node.input[0]] = self.tensors_range[node.output[0]]
|
|
quantization_params = {}
|
|
for tensor_name in self.tensors_range.keys():
|
|
rmin, rmax = self.tensors_range[tensor_name]
|
|
qmin, qmax = get_qmin_qmax_for_qType(self.activation_qType, symmetric=self.is_activation_symmetric)
|
|
|
|
quantization_params[tensor_name] = compute_scale_zp(rmin, rmax, qmin, qmax, self.is_activation_symmetric)
|
|
|
|
return quantization_params
|