# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import argparse
import logging
import os
import pathlib
import tempfile
from collections import deque
from enum import IntEnum
import onnx
from ..onnx_model_utils import (
get_producer_consumer_maps,
is_fixed_size_tensor,
iterate_graph_per_graph_func,
iterate_graph_per_node_func,
optimize_model,
)
class _SupportedOpsChecker:
"""
Class to process the md file with list of supported ops and caveats for an execution provider.
e.g. /tools/ci_build/github/android/nnapi_supported_ops.md
/tools/ci_build/github/apple/coreml_supported_ops.md
"""
def __init__(self, filename):
self._filename = filename
self._ops = {} # op to caveats
self._ops_seen = set()
with open(filename, "r") as f:
for line in f.readlines():
# we're looking for a markdown table with 2 columns. first is op name. second is caveats
# op name is domain:op
if line.startswith("|"):
pieces = line.strip().split("|")
                    if len(pieces) == 4:  # empty before first '|', op name, caveat, empty after last '|'
domain_op = pieces[1]
caveat = pieces[2]
caveat = caveat.replace("<br/>", " ") # remove some HTML tags
# skip lines that don't have the ':' which separates the domain and op
# e.g. the table header will fail this check
if ":" in domain_op:
self._ops[domain_op] = caveat
def is_op_supported(self, node):
domain = node.domain if node.domain else "ai.onnx"
domain_op = domain + ":" + node.op_type
is_supported = domain_op in self._ops
if is_supported:
self._ops_seen.add(domain_op)
return is_supported
def get_caveats(self):
caveats = []
for op in sorted(self._ops_seen):
caveat = self._ops[op]
if caveat:
caveats.append(f"{op}:{caveat}")
return caveats
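# Minimal usage sketch for _SupportedOpsChecker (the config path is hypothetical and `model`
# is assumed to be a loaded onnx.ModelProto):
#   checker = _SupportedOpsChecker("nnapi_supported_ops.md")
#   supported = [node for node in model.graph.node if checker.is_op_supported(node)]
#   print(checker.get_caveats())  # caveats for the supported ops that were seen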
class PartitioningInfo:
class TryWithEP(IntEnum):
        NO = 0
        MAYBE = 1
        YES = 2
def __init__(self):
self.num_nodes = -1 # main graph only
self.num_supported_nodes = -1
self.num_partitions = -1
self.num_nodes_in_subgraphs = -1 # nodes not covered as we don't currently handle subgraphs in nnapi/coreml
self.supported_ops_checker = None
self.supported_groups = []
self.unsupported_ops = set()
self.nodes_unsupported_due_to_op = -1
self.nodes_unsupported_due_to_dynamic_input = -1
def suitability(self):
# for now add up all the nodes. if there are subgraphs, the percentage of covered nodes will be reduced by all
# nodes in the subgraphs.
num_nodes = self.num_nodes + self.num_nodes_in_subgraphs
# semi-arbitrary choices that err on the side of MAYBE.
# having 1 partition is always preferred, but if that is small it may not be useful.
# having 2 partitions may be okay if they cover most nodes
        # with more than 2 partitions, the device copy cost is almost guaranteed to outweigh the benefit of using the NPU
# NOTE: This assumes the EP is not CPU based and there is device copy overhead to consider
pct_supported = self.num_supported_nodes / num_nodes * 100
if self.num_partitions == 1:
if pct_supported > 75:
return PartitioningInfo.TryWithEP.YES
elif pct_supported > 50:
return PartitioningInfo.TryWithEP.MAYBE
else:
return PartitioningInfo.TryWithEP.NO
if self.num_partitions == 2:
if pct_supported > 75:
return PartitioningInfo.TryWithEP.MAYBE
else:
return PartitioningInfo.TryWithEP.NO
return PartitioningInfo.TryWithEP.NO
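    # Worked example: 80 supported nodes out of 100 in a single partition gives
    # pct_supported == 80.0 -> YES. The same coverage split across 3 or more partitions
    # returns NO, as the device copies between partitions are assumed to outweigh the
    # benefit of using the NPU.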
def dump_analysis(self, logger: logging.Logger, ep_name: str):
"""
Analyze the partitioning information and log the analysis
:param logger: Logger to use
:param ep_name: Execution provider name to use in the log messages
"""
num_nodes = self.num_nodes + self.num_nodes_in_subgraphs
logger.info(
f"{self.num_partitions} partitions with a total of {self.num_supported_nodes}/{num_nodes} "
f"nodes can be handled by the {ep_name} EP."
)
if self.num_nodes_in_subgraphs:
logger.info(f"{self.num_nodes_in_subgraphs} nodes are in subgraphs, which are currently not handled.")
if self.supported_groups:
logger.info(f'Partition sizes: [{", ".join([str(len(partition)) for partition in self.supported_groups])}]')
logger.info(f"Unsupported nodes due to operator={self.nodes_unsupported_due_to_op}")
if self.nodes_unsupported_due_to_dynamic_input:
logger.info(
"Unsupported nodes due to input having a dynamic shape=%d",
self.nodes_unsupported_due_to_dynamic_input,
)
if logger.getEffectiveLevel() <= logging.DEBUG:
# Enable this manually if you need to look at specific partitions.
            # for group in self.supported_groups:
            #     logger.debug(f'Nodes in group: {",".join([f"{node.name}:{node.op_type}" for node in group])}')
if self.unsupported_ops:
logger.info(f'Unsupported ops: {",".join(sorted(self.unsupported_ops))}')
caveats = self.supported_ops_checker.get_caveats()
if caveats:
indent = " " * 5
logger.debug(
"Caveats that have not been checked and may result in a node not being supported: "
f'{"".join([os.linesep + indent + caveat for caveat in caveats])}'
)
pct_nodes_using_ep = self.num_supported_nodes / num_nodes * 100
if self.num_partitions == 0:
logger.info(f"{ep_name} cannot run any nodes in this model.")
elif self.num_partitions == 1:
if pct_nodes_using_ep > 75:
logger.info(
f"{ep_name} should work well for this model as there is one partition "
f"covering {pct_nodes_using_ep:.1f}% of the nodes in the model."
)
elif pct_nodes_using_ep > 50:
logger.info(
f"{ep_name} may work well for this model, however only {pct_nodes_using_ep:.1f}% of nodes "
"will use it. Performance testing is required to validate."
)
else:
logger.info(
f"{ep_name} will probably not work will for this model as only {pct_nodes_using_ep:.2f}% "
"of nodes will use it."
)
elif self.num_partitions == 2 and pct_nodes_using_ep > 75:
logger.info(
f"{ep_name} can be considered for this model as there are two partitions "
f"covering {pct_nodes_using_ep:.1f}% of the nodes. "
"Performance testing is required to validate."
)
else:
logger.info(
f"{ep_name} is not recommended with this model as there are {self.num_partitions} partitions "
f"covering {pct_nodes_using_ep:.1f}% of the nodes in the model. "
"This will most likely result in worse performance than just using the CPU EP."
)
def check_partitioning(
graph: onnx.GraphProto,
supported_ops_checker: _SupportedOpsChecker,
require_fixed_input_sizes: bool = False,
value_info: dict = None,
):
"""
    Estimate the partitions the graph will be split into for nodes that supported_ops_checker reports as supported.
The check on whether a node is supported is purely based on the operator type. Additional limitations
(e.g. NNAPI EP only supports 2D Conv) are not checked, so partitions may not be 100% accurate. The limitations
for operators in the partitions are printed so the user can manually check.
:param graph: Graph to process
:param supported_ops_checker: Checker with info on supported ops.
:param require_fixed_input_sizes: If True, require that the inputs to a potentially supported node are
fixed size tensors for it to be considered as supported.
If True, onnx.shape_inference.infer_shapes should have been run on the model
to populate the shape information.
:param value_info: Map of value name to ValueInfoProto. Required if require_fixed_input_sizes is True to lookup
the shape of a value.
    :return: PartitioningInfo instance with details
"""
if require_fixed_input_sizes and not value_info:
raise ValueError("value_info must be provided if require_fixed_input_sizes is True.")
node_to_producers, node_to_consumers = get_producer_consumer_maps(graph)
# initializers have fixed sizes.
    # TODO: when adding subgraph support we also need to match against initializers in ancestor graphs as they are
    # accessible from the outer scope (unless shadowed locally)
initializers = [i.name for i in graph.initializer]
def _is_fixed_shape_value(value):
if value in value_info:
return is_fixed_size_tensor(value_info[value])
if value in initializers:
return True
# if something has an unknown shape (e.g. something downstream of a Reshape with dynamic input for the shape)
# it won't have an entry in value_info
return False
#
# Replicate logic from /onnxruntime/core/providers/partitioning_utils.cc:CreateSupportedPartitionNodeGroups
# to roughly estimate number of partitions for nodes that is_node_supported_fn returns true for.
#
# We keep the structure and variable names as close as possible to the C++ implementation to simplify keeping them
# in sync if future updates are needed.
#
# we don't currently support a callback for additional group closure checks in the python implementation
on_group_closed_fn = None
supported_groups = []
# number of inputs from unprocessed nodes (in-degree) per node
in_degree = {}
# nodes that are ready to process
nodes_to_process = deque() # deque of Node instances
# nodes that will be processed when considering the next partition node group
nodes_to_process_with_next_group = deque()
# initialize in-degrees and find root nodes
for node in graph.node:
node_input_edge_count = len(node_to_producers[node]) if node in node_to_producers else 0
in_degree[node] = node_input_edge_count
if node_input_edge_count == 0:
# node is only dependent on graph input or initializers
nodes_to_process.append(node)
# currently we don't support checking subgraphs in the partitioning as they're not handled by NNAPI/CoreML.
# check how many nodes are in that blind spot so we can adjust the recommendation accordingly.
    # note: pass the count in a single-element list so the nested function can update it by reference
    def _count_subgraph_nodes(cur_graph: onnx.GraphProto, original_graph: onnx.GraphProto, count: list[int]):
if cur_graph != original_graph:
count[0] += len(cur_graph.node)
nodes_in_subgraphs = [0] # array with single value
iterate_graph_per_graph_func(graph, _count_subgraph_nodes, original_graph=graph, count=nodes_in_subgraphs)
supported_group = []
# the partition node group's border is the aggregate of its nodes' output nodes
supported_group_border = set()
num_supported_nodes = 0
num_unsupported_nodes_due_to_op = 0
num_unsupported_nodes_due_to_dynamic_input = 0
unsupported_ops = set()
def close_group():
if supported_group:
keep_partition = not on_group_closed_fn or on_group_closed_fn(supported_group)
if keep_partition:
supported_groups.append(supported_group.copy())
supported_group.clear()
supported_group_border.clear()
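    # Worked example (hypothetical ops): for a chain A -> B -> C where A and C are supported
    # but B is not, A joins the current group and puts B on the group border. B is then
    # deferred until the group closes, so A and C end up in separate partitions of one node each.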
while nodes_to_process or nodes_to_process_with_next_group:
if not nodes_to_process:
close_group()
nodes_to_process = nodes_to_process_with_next_group
nodes_to_process_with_next_group = deque()
continue
node = nodes_to_process.popleft()
is_op_supported = supported_ops_checker.is_op_supported(node)
is_input_shape_supported = not require_fixed_input_sizes or all(_is_fixed_shape_value(i) for i in node.input)
is_node_supported = is_op_supported and is_input_shape_supported
if not is_node_supported:
if node in supported_group_border:
# an unsupported node on the border will be processed after the current partition node group
# so skip any additional processing/counting here
nodes_to_process_with_next_group.append(node)
continue
if not is_op_supported:
unsupported_ops.add(f'{node.domain if node.domain else "ai.onnx"}:{node.op_type}')
num_unsupported_nodes_due_to_op += 1
else:
num_unsupported_nodes_due_to_dynamic_input += 1
if is_node_supported:
num_supported_nodes += 1
# add node to the partition node group
supported_group.append(node)
# remove node from the border and add its outputs to the border
if node in supported_group_border:
supported_group_border.remove(node)
# for each consumer node add to supported_group_border
if node in node_to_consumers:
for consumer in node_to_consumers[node]:
supported_group_border.add(consumer)
# adjust in-degrees of the node outputs and add any new nodes to process
if node in node_to_consumers:
for consumer in node_to_consumers[node]:
consumer_node_in_degree = in_degree[consumer]
consumer_node_in_degree -= 1
if consumer_node_in_degree == 0:
nodes_to_process.append(consumer)
in_degree[consumer] = consumer_node_in_degree
close_group()
    # find any subgraphs and check whether their nodes are supported. this won't change the partitioning as we skip
    # Scan/Loop/If nodes, but it provides additional info on operators that would not be supported if we changed that.
iterate_graph_per_node_func(graph, supported_ops_checker.is_op_supported)
num_nodes = len(graph.node)
num_partitions = len(supported_groups)
info = PartitioningInfo()
info.num_nodes = num_nodes
info.num_supported_nodes = num_supported_nodes
info.num_partitions = num_partitions
info.num_nodes_in_subgraphs = nodes_in_subgraphs[0]
info.supported_ops_checker = supported_ops_checker
info.supported_groups = supported_groups
info.unsupported_ops = unsupported_ops
info.nodes_unsupported_due_to_op = num_unsupported_nodes_due_to_op
info.nodes_unsupported_due_to_dynamic_input = num_unsupported_nodes_due_to_dynamic_input
return info
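# Usage sketch (assumes onnx.shape_inference.infer_shapes has been run on the model and
# value_to_shape maps value names to ValueInfoProto entries, as built in checker() below):
#   ops_checker = _SupportedOpsChecker("coreml_supported_ops.md")  # hypothetical local path
#   info = check_partitioning(model.graph, ops_checker, require_fixed_input_sizes=True, value_info=value_to_shape)
#   info.dump_analysis(logging.getLogger(__name__), "CoreML")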
def _check_ep_partitioning(model, supported_ops_config, value_info: dict = None):
supported_ops = _SupportedOpsChecker(supported_ops_config)
partition_info = check_partitioning(model.graph, supported_ops, value_info is not None, value_info)
return partition_info
def check_nnapi_partitions(model, value_info: dict = None):
# if we're running in the ORT python package the file should be local. otherwise assume we're running from the
# ORT repo
script_dir = pathlib.Path(__file__).parent
local_config = script_dir / "nnapi_supported_ops.md"
if local_config.exists():
config_path = local_config
else:
ort_root = script_dir.parents[3]
config_path = ort_root / "tools" / "ci_build" / "github" / "android" / "nnapi_supported_ops.md"
return _check_ep_partitioning(model, config_path, value_info)
def check_coreml_partitions(model, value_info: dict = None):
# if we're running in the ORT python package the file should be local. otherwise assume we're running from the
# ORT repo
script_dir = pathlib.Path(__file__).parent
local_config = script_dir / "coreml_supported_ops.md"
if local_config.exists():
config_path = local_config
else:
ort_root = script_dir.parents[3]
config_path = ort_root / "tools" / "ci_build" / "github" / "apple" / "coreml_supported_ops.md"
return _check_ep_partitioning(model, config_path, value_info)
def check_shapes(graph: onnx.GraphProto, logger: logging.Logger = None):
"""
Check the shapes of graph inputs, values and graph outputs to determine if they have static or dynamic sizes.
NNAPI and CoreML do not support dynamically sized values.
:param graph: Graph to check. If shape inferencing has been run the checks on values will be meaningful.
:param logger: Optional logger for diagnostic information.
    :return: Tuple of (list of graph inputs with dynamic shapes, number of dynamic values found)
"""
# it's OK if the input is dynamically sized and we do a Resize early to a fixed size.
# it's not good if lots of ops have dynamic inputs
num_fixed_values = 0
num_dynamic_values = 0
dynamic_inputs = []
for i in graph.input:
if not is_fixed_size_tensor(i):
dynamic_inputs.append(i)
# split/join to remove repeated whitespace and newlines from str(i)
if logger:
logger.info(f"Input is not a fixed size tensor: {' '.join(str(i).split())}")
num_dynamic_values += 1
else:
num_fixed_values += 1
dynamic_outputs = []
for o in graph.output:
if not is_fixed_size_tensor(o):
dynamic_outputs.append(o)
if logger:
logger.info(f"Output is not a fixed size tensor: {' '.join(str(o).split())}")
num_dynamic_values += 1
else:
num_fixed_values += 1
# check we have value info.
# special case some test graphs with a single node which only have graph input and output values, and
# a model where all inputs are dynamic (results in no value_info)
    if not graph.value_info and not (len(graph.node) == 1 or len(dynamic_inputs) == len(graph.input)):
        if logger:
            logger.warning(
                "Unable to check shapes within model. "
                "ONNX shape inferencing should be run on the model prior to checking."
            )
for vi in graph.value_info:
if is_fixed_size_tensor(vi):
num_fixed_values += 1
else:
num_dynamic_values += 1
if logger:
logger.info(
f"Num values with fixed shape={num_fixed_values}. " f"Num values with dynamic shape={num_dynamic_values}"
)
    if dynamic_inputs and logger:
if dynamic_outputs:
logger.info(
"Model has dynamic inputs and outputs. Consider re-exporting model with fixed sizes "
"if NNAPI or CoreML can be used with this model."
)
else:
logger.info(
"""Model has dynamically sized inputs but fixed sized outputs.
If the sizes become fixed early in the model (e.g. pre-processing of a dynamic input size
results in a fixed input size for the majority of the model) performance with NNAPI and CoreML,
if applicable, should not be significantly impacted."""
)
return dynamic_inputs, num_dynamic_values
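# Usage sketch: run shape inferencing first so graph.value_info is populated (path is hypothetical):
#   inferred = onnx.shape_inference.infer_shapes(onnx.load("model.onnx"))
#   dynamic_inputs, num_dynamic_values = check_shapes(inferred.graph, logging.getLogger(__name__))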
def checker(model_path, logger: logging.Logger):
model = onnx.load(model_path)
model_with_shape_info = onnx.shape_inference.infer_shapes(model)
# create lookup map for efficiency
value_to_shape = {}
for v in model_with_shape_info.graph.input:
value_to_shape[v.name] = v
for v in model_with_shape_info.graph.output:
value_to_shape[v.name] = v
for v in model_with_shape_info.graph.value_info:
value_to_shape[v.name] = v
dynamic_inputs, num_dynamic_values = check_shapes(model_with_shape_info.graph)
def check_ep(ep_name, checker_func):
logger.info(f"Checking {ep_name}")
        # check with shape info first so the set of supported nodes takes values with dynamic shapes into account
partition_info = checker_func(model_with_shape_info, value_to_shape)
if logger.getEffectiveLevel() <= logging.DEBUG:
partition_info.dump_analysis(logger, ep_name)
suitability = partition_info.suitability()
logger.info(f"Model should perform well with {ep_name} as is: {suitability.name}")
if suitability != PartitioningInfo.TryWithEP.YES and dynamic_inputs:
logger.info("Checking if model will perform better if the dynamic shapes are fixed...")
partition_info_with_fixed_shapes = checker_func(model_with_shape_info)
if logger.getEffectiveLevel() <= logging.DEBUG:
# analyze and log detailed info
logger.info("Partition information if the model was updated to make the shapes fixed:")
partition_info_with_fixed_shapes.dump_analysis(logger, ep_name)
fixed_shape_suitability = partition_info_with_fixed_shapes.suitability()
logger.info(
f"Model should perform well with {ep_name} if modified to have fixed input shapes: "
f"{fixed_shape_suitability.name}"
)
if fixed_shape_suitability != PartitioningInfo.TryWithEP.NO:
logger.info("Shapes can be altered using python -m onnxruntime.tools.make_dynamic_shape_fixed")
if fixed_shape_suitability.value > suitability.value:
suitability = fixed_shape_suitability
return suitability
nnapi_suitability = check_ep("NNAPI", check_nnapi_partitions)
coreml_suitability = check_ep("CoreML", check_coreml_partitions)
if (
nnapi_suitability != PartitioningInfo.TryWithEP.YES or coreml_suitability != PartitioningInfo.TryWithEP.YES
) and logger.getEffectiveLevel() > logging.DEBUG:
logger.info("Re-run with log level of DEBUG for more details on the NNAPI/CoreML issues.")
logger.info("---------------")
return nnapi_suitability != PartitioningInfo.TryWithEP.NO or coreml_suitability != PartitioningInfo.TryWithEP.NO
def analyze_model(model_path: pathlib.Path, skip_optimize: bool = False, logger: logging.Logger = None):
"""
Analyze the provided model to determine if it's likely to work well with the NNAPI or CoreML Execution Providers
:param model_path: Model to analyze.
    :param skip_optimize: Skip optimizing to BASIC level before checking. When exporting to ORT format we will do this
                          optimization.
:param logger: Logger for output
:return: True if either the NNAPI or CoreML Execution Providers may work well with this model.
"""
if not logger:
logger = logging.getLogger("usability_checker")
logger.setLevel(logging.INFO)
logger.info(f"Checking {model_path} for usability with ORT Mobile.")
with tempfile.TemporaryDirectory() as tmp:
if not skip_optimize:
tmp_path = pathlib.Path(tmp) / model_path.name
optimize_model(model_path, tmp_path)
model_path = tmp_path
try_eps = checker(str(model_path.resolve(strict=True)), logger)
return try_eps
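# Programmatic usage sketch (hypothetical model path):
#   import logging, pathlib
#   try_eps = analyze_model(pathlib.Path("model.onnx"), logger=logging.getLogger("usability_checker"))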
def parse_args():
parser = argparse.ArgumentParser(
        os.path.basename(__file__), description="""Analyze an ONNX model for usage with ORT Mobile."""
)
parser.add_argument(
"--log_level", choices=["debug", "info", "warning", "error"], default="info", help="Logging level"
)
parser.add_argument(
"--skip_optimize",
action="store_true",
help="Don't optimize the model to BASIC level prior to analyzing. "
"Optimization will occur when exporting the model to ORT format, so in general "
"should not be skipped unless you have a specific reason to do so.",
)
parser.add_argument("model_path", type=pathlib.Path, help="Provide path to ONNX model")
return parser.parse_args()
def run_analyze_model():
args = parse_args()
logger = logging.getLogger("default")
if args.log_level == "debug":
logger.setLevel(logging.DEBUG)
elif args.log_level == "info":
logger.setLevel(logging.INFO)
elif args.log_level == "warning":
logger.setLevel(logging.WARNING)
else:
logger.setLevel(logging.ERROR)
model_path = args.model_path.resolve()
analyze_model(model_path, args.skip_optimize, logger)
if __name__ == "__main__":
run_analyze_model()
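# Example CLI invocation (the module path is an assumption and may differ between the ORT repo
# and the onnxruntime python package, which exposes a similar check via the
# check_onnx_model_mobile_usability helper script):
#   python -m onnxruntime.tools.mobile_helpers.usability_checker --log_level debug /path/to/model.onnx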