[tfdbg] Implement random file read & DebugDataReader; Simplify & improve tests.

- Support offset-based random read access in DebugEventsReader
- Support yielding offsets from the iterators of DebugEventsReader to enable subsequent random-access reading
- Check the tensor IDs in the debug tensor values under the CURT_HEALTH, CONCISE_HEALTH and SHAPE modes, tackling multiple TODO items.
- Use new DebugDataReader in tests to simplify code.
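
A rough sketch of the offset-based reading flow enabled by this change (reader
API as in the diff below; the dump-root path is a hypothetical example):

    # Iterators now yield (debug_event, offset) pairs; offsets can later be
    # used for random-access re-reads of individual records.
    from tensorflow.python.debug.lib import debug_events_reader

    with debug_events_reader.DebugEventsReader("/tmp/tfdbg2_dump") as reader:
      offsets = [offset for _, offset in reader.execution_iterator()]
      if offsets:
        # Random read of the first execution record by its stored offset.
        event = reader.read_execution_debug_event(offsets[0])
        print(event.execution.op_type)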

Per design for scalable reading of large tfdbg v2 datasets:
- Create light-weight digest classes: ExecutionDigest and GraphExecutionTraceDigest
  - Loaded by DebugDataReader.executions() and .graph_execution_traces() with
    kwarg digest=True.
- Corresponding detailed data classes: Execution and GraphExecutionTrace.
- Other data classes:
  - DebuggedGraph
  - GraphOpCreationDigest
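
A rough usage sketch of the two-stage (digest, then detailed) loading design
(the dump-root path is a hypothetical example):

    # Load light-weight digests first; fetch detailed objects on demand via
    # offset-based random reads.
    from tensorflow.python.debug.lib import debug_events_reader

    with debug_events_reader.DebugDataReader("/tmp/tfdbg2_dump") as reader:
      reader.update()  # Incremental read of the DebugEvent file set.
      for digest in reader.executions(digest=True)[:3]:  # ExecutionDigests.
        execution = reader.read_execution(digest)  # Detailed Execution.
        print(execution.op_type, execution.num_outputs)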

PiperOrigin-RevId: 286955104
Change-Id: I750fc085fd75a7df11637413389b68dd0a6733c6
Author: Shanqing Cai, 2019-12-23 16:04:06 -08:00 (committed by TensorFlower Gardener)
Parent: f92fc5d442
Commit: e724d9e162
7 changed files with 1522 additions and 662 deletions


@@ -162,6 +162,7 @@ message GraphOpCreation {
string graph_name = 3;
// Unique ID of the graph (generated by debugger).
// This is the ID of the immediately-enclosing graph.
string graph_id = 4;
// Name of the device that the op is assigned to (if available).


@@ -120,6 +120,7 @@ py_library(
deps = [
"//tensorflow/core:protos_all_py",
"//tensorflow/python:framework",
"@six_archive//:six",
],
)


@@ -18,17 +18,24 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import glob
import os
import threading
from six.moves import map
import six
from tensorflow.core.protobuf import debug_event_pb2
from tensorflow.python.lib.io import tf_record
from tensorflow.python import pywrap_tensorflow
from tensorflow.python.framework import errors
from tensorflow.python.framework import tensor_util
from tensorflow.python.util import compat
DebugEventWithOffset = collections.namedtuple(
"DebugEventWithOffset", "debug_event offset")
class DebugEventsReader(object):
"""Reader class for a tfdbg v2 DebugEvents directory."""
@@ -56,6 +63,8 @@ class DebugEventsReader(object):
self._readers = dict() # A map from file path to reader.
self._readers_lock = threading.Lock()
self._offsets = dict()
def __enter__(self):
return self
@@ -64,15 +73,48 @@
self.close()
def _generic_iterator(self, file_path):
"""A helper method that makes an iterator given a debug-events file path."""
"""A helper method that makes an iterator given a debug-events file path.
Repeated calls to this method create iterators that remember the last
successful reading position (offset) for each given `file_path`. So the
iterators are meant for incremental reading of the file.
Args:
file_path: Path to the file to create the iterator for.
Yields:
A tuple of (offset, debug_event_proto) on each `next()` call.
"""
# The following code uses the double-checked locking pattern to optimize
# the common case (where the reader is already initialized).
if file_path not in self._readers: # 1st check, without lock.
with self._readers_lock:
if file_path not in self._readers: # 2nd check, with lock.
self._readers[file_path] = tf_record.tf_record_iterator(file_path)
with errors.raise_exception_on_not_ok_status() as status:
# TODO(b/136474806): Use tf_record.tf_record_iterator() once it
# supports offset.
self._readers[file_path] = pywrap_tensorflow.PyRecordReader_New(
compat.as_bytes(file_path), 0, b"", status)
reader = self._readers[file_path]
while True:
offset = reader.offset()
try:
reader.GetNext()
except (errors.DataLossError, errors.OutOfRangeError):
# We ignore partial read exceptions, because a record may be truncated.
# PyRecordReader holds the offset prior to the failed read, so retrying
# will succeed.
break
yield DebugEventWithOffset(
debug_event=debug_event_pb2.DebugEvent.FromString(reader.record()),
offset=offset)
return map(debug_event_pb2.DebugEvent.FromString, self._readers[file_path])
def _create_offset_reader(self, file_path, offset):
with errors.raise_exception_on_not_ok_status() as status:
# TODO(b/136474806): Use tf_record.tf_record_iterator() once it
# supports offset.
return pywrap_tensorflow.PyRecordReader_New(
compat.as_bytes(file_path), offset, b"", status)
def metadata_iterator(self):
return self._generic_iterator(self._metadata_path)
@@ -86,12 +128,839 @@ class DebugEventsReader(object):
def graphs_iterator(self):
return self._generic_iterator(self._graphs_path)
def read_graphs_event(self, offset):
"""Read a DebugEvent proto at a given offset from the .graphs file.
Args:
offset: Offset to read the DebugEvent proto from.
Returns:
A DebugEvent proto.
Raises:
`errors.DataLossError` if offset is at a wrong location.
`errors.OutOfRangeError` if offset is out of range of the file.
"""
# TODO(cais): After switching to new Python wrapper of tfrecord reader,
# use seeking instead of repeated file opening. Same below.
reader = self._create_offset_reader(self._graphs_path, offset)
reader.GetNext()
debug_event = debug_event_pb2.DebugEvent.FromString(reader.record())
reader.Close()
return debug_event
def execution_iterator(self):
return self._generic_iterator(self._execution_path)
def read_execution_debug_event(self, offset):
"""Read a DebugEvent proto at a given offset from the .execution file.
Args:
offset: Offset to read the DebugEvent proto from.
Returns:
A DebugEvent proto.
Raises:
`errors.DataLossError` if offset is at a wrong location.
`errors.OutOfRangeError` if offset is out of range of the file.
"""
reader = self._create_offset_reader(self._execution_path, offset)
reader.GetNext()
debug_event = debug_event_pb2.DebugEvent.FromString(reader.record())
reader.Close()
return debug_event
def graph_execution_traces_iterator(self):
return self._generic_iterator(self._graph_execution_traces_path)
def read_graph_execution_traces_event(self, offset):
"""Read DebugEvent at given offset from .graph_execution_traces file.
Args:
offset: Offset to read the DebugEvent proto from.
Returns:
A DebugEvent proto.
Raises:
`errors.DataLossError` if offset is at a wrong location.
`errors.OutOfRangeError` if offset is out of range of the file.
"""
reader = self._create_offset_reader(
self._graph_execution_traces_path, offset)
reader.GetNext()
debug_event = debug_event_pb2.DebugEvent.FromString(reader.record())
reader.Close()
return debug_event
def close(self):
with self._readers_lock:
for reader in self._readers.values():
reader.Close()
self._readers.clear()
class BaseDigest(object):
"""Base class for digest.
Properties:
wall_time: A timestamp for the digest (unit: s).
offset: A offset number in the corresponding file that can be used for
fast random read access.
"""
def __init__(self, wall_time, offset):
self._wall_time = wall_time
self._offset = offset
@property
def wall_time(self):
return self._wall_time
@property
def offset(self):
return self._offset
class ExecutionDigest(BaseDigest):
"""Light-weight digest summarizing top-level execution event.
Use `DebugDataReader.read_execution(execution_digest)` to load the more
detailed data object concerning the execution event (`Execution`).
Properties:
op_type: Type name of the executed op. In the case of the eager execution of
an individual op, this is the op's type name (e.g., "MatMul").
In the case of the execution of a tf.function (FuncGraph), this is the
internally-generated name of the function (e.g.,
"__inference_my_func_123").
"""
def __init__(self,
wall_time,
offset,
op_type):
super(ExecutionDigest, self).__init__(wall_time, offset)
self._op_type = op_type
@property
def op_type(self):
return self._op_type
# TODO(cais): Implement to_json().
class Execution(ExecutionDigest):
"""Detailed data relating to a top-level execution event.
The execution is of an individual op or a tf.function, which may have any
number of output tensors.
Properties (beyond the base class `ExecutionDigest`):
stack_frame_ids: Reference IDs for stack frames, ordered from bottommost to
topmost. Use `DebugDataReader.read_execution_stack_trace()` to load the
detailed stack frames (filepath, lineno and function name).
tensor_debug_mode: TensorDebugMode enum value, as an `int`.
graph_id: ID of the executed FuncGraph (applicable only to the execution of a
tf.function). `None` for the eager execution of an individual op.
input_tensor_ids: IDs of the input (eager) tensor(s) for this execution, if
any.
output_tensor_ids: IDs of the output (eager) tensor(s) from this execution,
if any.
debug_tensor_values: Values of the debug tensor(s), applicable only to
non-FULL_TENSOR tensor debug modes. A tuple of lists of numbers. Each
element of the tuple corresponds to an output tensor of the execution.
See documentation of the various TensorDebugModes for the semantics of the
numbers.
"""
def __init__(self,
execution_digest,
stack_frame_ids,
tensor_debug_mode,
graph_id=None,
input_tensor_ids=None,
output_tensor_ids=None,
debug_tensor_values=None):
super(Execution, self).__init__(
execution_digest.wall_time,
execution_digest.offset,
execution_digest.op_type)
self._stack_frame_ids = stack_frame_ids
self._tensor_debug_mode = tensor_debug_mode
self._graph_id = graph_id
self._input_tensor_ids = input_tensor_ids
self._output_tensor_ids = output_tensor_ids
self._debug_tensor_values = debug_tensor_values
@property
def stack_frame_ids(self):
return self._stack_frame_ids
@property
def tensor_debug_mode(self):
return self._tensor_debug_mode
@property
def graph_id(self):
return self._graph_id
@property
def input_tensor_ids(self):
return self._input_tensor_ids
@property
def num_outputs(self):
return len(self._output_tensor_ids)
@property
def output_tensor_ids(self):
return self._output_tensor_ids
@property
def debug_tensor_values(self):
return self._debug_tensor_values
# TODO(cais): Implement to_json().
class DebuggedGraph(object):
"""Data object representing debugging information about a tf.Graph.
Includes `FuncGraph`s.
Properties:
name: Name of the graph (if any). May be `None` for non-function graphs.
graph_id: Debugger-generated ID for the graph.
inner_graph_ids: A list of the debugger-generated IDs for the graphs
enclosed by this graph.
outer_graph_id: If this graph is nested within an outer graph, ID of the
outer graph. If this is an outermost graph, `None`.
"""
def __init__(self,
name,
graph_id,
outer_graph_id=None):
self._name = name
self._graph_id = graph_id
self._outer_graph_id = outer_graph_id
self._inner_graph_ids = []
# A dictionary from op name to GraphOpCreationDigest.
self._op_by_name = dict()
def add_inner_graph_id(self, inner_graph_id):
"""Add the debugger-generated ID of a graph nested within this graph.
Args:
inner_graph_id: The debugger-generated ID of the nested inner graph.
"""
assert isinstance(inner_graph_id, six.string_types)
self._inner_graph_ids.append(inner_graph_id)
def add_op(self, graph_op_creation_digest):
"""Add an op creation data object.
Args:
graph_op_creation_digest: A GraphOpCreationDigest data object describing
the creation of an op inside this graph.
"""
assert graph_op_creation_digest.op_name not in self._op_by_name
self._op_by_name[
graph_op_creation_digest.op_name] = graph_op_creation_digest
@property
def name(self):
return self._name
@property
def graph_id(self):
return self._graph_id
@property
def outer_graph_id(self):
return self._outer_graph_id
@property
def inner_graph_ids(self):
return self._inner_graph_ids
def get_op_type(self, op_name):
return self._op_by_name[op_name].op_type
def get_tensor_id(self, op_name, output_slot):
"""Get the ID of a symbolic tensor in this graph."""
return self._op_by_name[op_name].output_tensor_ids[output_slot]
# TODO(cais): Implement to_json().
class GraphOpCreationDigest(BaseDigest):
"""Data object describing the creation of an op inside a graph.
For size efficiency, this digest object does not contain any stack frames or
any references to them. To obtain the stack frames, use
`DebugDataReader.read_graph_op_creation_stack_trace()`.
Properties (beyond the base class):
graph_id: Debugger-generated ID of the immediately-enclosing graph.
op_type: Type name of the op (e.g., "MatMul").
op_name: Name of the op (e.g., "dense_1/MatMul").
output_tensor_ids: Debugger-generated IDs for the output(s) of the op.
input_names: Names of the input tensors to the op.
device_name: The name of the device that the op is placed on (if available).
"""
def __init__(self,
wall_time,
offset,
graph_id,
op_type,
op_name,
output_tensor_ids,
input_names=None,
device_name=None):
super(GraphOpCreationDigest, self).__init__(wall_time, offset)
self._graph_id = graph_id
self._op_type = op_type
self._op_name = op_name
self._output_tensor_ids = output_tensor_ids
self._input_names = input_names
self._device_name = device_name
@property
def graph_id(self):
return self._graph_id
@property
def op_type(self):
return self._op_type
@property
def op_name(self):
return self._op_name
@property
def output_tensor_ids(self):
return self._output_tensor_ids
@property
def num_outputs(self):
return len(self._output_tensor_ids)
@property
def input_names(self):
return self._input_names
@property
def device_name(self):
return self._device_name
# TODO(cais): Implement to_json().
class GraphExecutionTraceDigest(BaseDigest):
"""Light-weight summary of a intra-graph tensor execution event.
Use `DebugDataReader.read_graph_execution_trace()` on this object to read more
detailed data (`GraphExecutionTrace`).
Properties (beyond the base class):
op_type: Type name of the executed op (e.g., "Conv2D").
op_name: Name of the op (e.g., "conv_2d_3/Conv2D").
output_slot: Output slot index of the tensor.
"""
def __init__(self,
wall_time,
offset,
op_type,
op_name,
output_slot):
super(GraphExecutionTraceDigest, self).__init__(wall_time, offset)
self._op_type = op_type
self._op_name = op_name
self._output_slot = output_slot
@property
def op_type(self):
return self._op_type
@property
def op_name(self):
return self._op_name
@property
def output_slot(self):
return self._output_slot
# TODO(cais): Implement to_json().
class GraphExecutionTrace(GraphExecutionTraceDigest):
"""Detailed data object describing an intra-graph tensor execution.
Properties (beyond the base class `GraphExecutionTraceDigest`):
graph_ids: The debugger-generated IDs of the graphs that enclose the
executed op (tensor), ordered from the outermost to the innermost.
graph_id: The debugger-generated ID of the innermost (immediately-enclosing)
graph.
tensor_debug_mode: TensorDebugMode enum value.
debug_tensor_value: Debug tensor values (only for non-FULL_TENSOR
tensor_debug_mode). A list of numbers. See the documentation of the
TensorDebugModes for the semantics of the numbers.
device_name: Device on which the tensor resides (if available).
"""
def __init__(self,
graph_execution_trace_digest,
graph_ids,
tensor_debug_mode,
debug_tensor_value=None,
device_name=None):
super(GraphExecutionTrace, self).__init__(
graph_execution_trace_digest.wall_time,
graph_execution_trace_digest.offset,
graph_execution_trace_digest.op_type,
graph_execution_trace_digest.op_name,
graph_execution_trace_digest.output_slot)
self._graph_ids = graph_ids
self._tensor_debug_mode = tensor_debug_mode
self._debug_tensor_value = debug_tensor_value
self._device_name = device_name
@property
def graph_ids(self):
return self._graph_ids
@property
def graph_id(self):
return self._graph_ids[-1]
@property
def tensor_debug_mode(self):
return self._tensor_debug_mode
@property
def debug_tensor_value(self):
return self._debug_tensor_value
@property
def device_name(self):
return self._device_name
# TODO(cais): Implement to_json().
def _parse_tensor_value(tensor_proto, return_list=False):
"""Helper method for reading a tensor value from a tensor proto.
The rationale for the distinction between the `True` and `False` values of
`return_list` is as follows:
- `return_list=True` is used for TensorDebugMode values other than
FULL_TENSOR, e.g., CONCISE_HEALTH, SHAPE and FULL_HEALTH. Under
those modes, the value is guaranteed (by contract) to be a 1D float64
tensor.
- `return_list=False` is used for the FULL_TENSOR TensorDebugMode
specifically. Under that mode, we use `numpy.ndarray` to maximally preserve
the shape, dtype and value information regarding the underlying tensor
value. We don't use a Python list to represent the tensor value because
that can lead to loss of information (e.g., both float16 and float32
dtypes get mapped to Python floats).
Args:
tensor_proto: The TensorProto instance from which the tensor value will be
loaded.
return_list: Whether the return value will be a nested Python list that
comes out from `numpy.ndarray.tolist()`.
Returns:
If parsing is successful, the tensor value as a `numpy.ndarray` or the
nested Python list converted from it.
If parsing fails, `None`.
"""
try:
ndarray = tensor_util.MakeNdarray(tensor_proto)
return ndarray.tolist() if return_list else ndarray
except TypeError:
# Depending on tensor_debug_mode, tensors of certain dtypes don't
# have logged debug tensor values.
return None
class DebugDataReader(object):
"""A reader that reads structured debugging data in the tfdbg v2 format.
The set of data read by an object of this class concerns the execution history
of a tfdbg2-instrumented TensorFlow program.
Note:
- An object of this class incrementally reads data from files that belong to
the tfdbg v2 DebugEvent file set. Calling `update()` triggers the reading
from the last-successful reading positions in the files.
- This object can be used as a context manager. Its `__exit__()` call
closes the file readers cleanly.
"""
def __init__(self, dump_root):
self._reader = DebugEventsReader(dump_root)
# TODO(cais): Implement pagination for memory constraints.
self._execution_digests = []
# A list of (host_name, file_path) tuples.
self._host_name_file_paths = []
# A dict mapping id to (host_name, file_path, lineno, func) tuple.
self._stack_frame_by_id = dict()
# Stores unprocessed stack frame IDs. This is necessary to handle the
# case in which reading of the .stack_frames file gets ahead of the reading
# of the .source_files file.
self._unprocessed_stack_frames = dict()
# A dict mapping id to DebuggedGraph objects.
self._graph_by_id = dict()
self._graph_op_digests = []
# TODO(cais): Implement pagination for memory constraints.
self._graph_execution_trace_digests = []
# The following timestamps keep track of where we've reached in each
# file of the DebugEvent file set, so that we don't run into race
# conditions with the writer.
self._source_files_timestamp = 0
# Temporary object used to hold DebugEvent protos with stack_frames
# field that has been read beyond max_wall_time.
# self._last_successful_stack_frames_offset = -1 # TODO(cais): Fix.
# TODO(cais): Read metadata.
def _load_source_files(self):
"""Incrementally read the .source_files DebugEvent file."""
source_files_iter = self._reader.source_files_iterator()
for debug_event, _ in source_files_iter:
source_file = debug_event.source_file
self._host_name_file_paths.append(
(source_file.host_name, source_file.file_path))
self._source_files_timestamp = debug_event.wall_time
def _load_stack_frames(self):
"""Incrementally read the .stack_frames file.
This must be called after _load_source_files().
It assumes that the following contract is honored by the writer of the tfdbg
v2 data file set:
- Before a stack frame is written to the .stack_frames file, the
corresponding source file information must have been written to the
.source_files file first.
"""
stack_frames_iter = self._reader.stack_frames_iterator()
for debug_event, _ in stack_frames_iter:
stack_frame_with_id = debug_event.stack_frame_with_id
file_line_col = stack_frame_with_id.file_line_col
self._unprocessed_stack_frames[stack_frame_with_id.id] = file_line_col
# We do the processing in a separate stage, because the reading of the
# .stack_frames file may sometimes get ahead of the .source_files file.
unprocessed_stack_frame_ids = tuple(self._unprocessed_stack_frames.keys())
for stack_frame_id in unprocessed_stack_frame_ids:
file_line_col = self._unprocessed_stack_frames[stack_frame_id]
if len(self._host_name_file_paths) > file_line_col.file_index:
self._stack_frame_by_id[stack_frame_id] = (
self._host_name_file_paths[file_line_col.file_index][0],
self._host_name_file_paths[file_line_col.file_index][1],
file_line_col.line,
file_line_col.func)
del self._unprocessed_stack_frames[stack_frame_id]
def _load_graphs(self):
"""Incrementally read the .graphs file.
Compiles the DebuggedGraph and GraphOpCreation data.
"""
graphs_iter = self._reader.graphs_iterator()
for debug_event, offset in graphs_iter:
if debug_event.graph_op_creation.ByteSize():
op_creation_proto = debug_event.graph_op_creation
op_digest = GraphOpCreationDigest(
debug_event.wall_time,
offset,
op_creation_proto.graph_id,
op_creation_proto.op_type,
op_creation_proto.op_name,
tuple(op_creation_proto.output_tensor_ids),
input_names=tuple(op_creation_proto.input_names))
self._graph_op_digests.append(op_digest)
self._graph_by_id[op_creation_proto.graph_id].add_op(op_digest)
elif debug_event.debugged_graph.ByteSize():
graph_proto = debug_event.debugged_graph
graph = DebuggedGraph(
graph_proto.graph_name or None,
graph_proto.graph_id,
outer_graph_id=graph_proto.outer_context_id or None)
self._graph_by_id[graph_proto.graph_id] = graph
if graph_proto.outer_context_id:
self._graph_by_id[
graph_proto.outer_context_id].add_inner_graph_id(graph.graph_id)
def _load_graph_execution_traces(self):
"""Incrementally load the .graph_execution_traces file."""
traces_iter = self._reader.graph_execution_traces_iterator()
for debug_event, offset in traces_iter:
trace_proto = debug_event.graph_execution_trace
op_name = trace_proto.op_name
op_type = self._lookup_op_type(trace_proto.tfdbg_context_id, op_name)
digest = GraphExecutionTraceDigest(
debug_event.wall_time,
offset,
op_type,
op_name,
trace_proto.output_slot)
self._graph_execution_trace_digests.append(digest)
def _lookup_op_type(self, graph_id, op_name):
"""Lookup the type of an op by name and the immediately enclosing graph.
Args:
graph_id: Debugger-generated ID of the immediately-enclosing graph.
op_name: Name of the op.
Returns:
Op type as a str.
"""
return self._graph_by_id[graph_id].get_op_type(op_name)
def _load_execution(self):
"""Incrementally read the .execution file."""
execution_iter = self._reader.execution_iterator()
for debug_event, offset in execution_iter:
self._execution_digests.append(ExecutionDigest(
debug_event.wall_time,
offset,
debug_event.execution.op_type))
def update(self):
"""Perform incremental read of the file set."""
self._load_source_files()
self._load_stack_frames()
self._load_graphs()
self._load_graph_execution_traces()
self._load_execution()
def outermost_graphs(self):
"""Get the number of outer most graphs read so far."""
return [graph for graph in self._graph_by_id.values()
if not graph.outer_graph_id]
def graph_by_id(self, graph_id):
"""Get a DebuggedGraph object by its ID."""
return self._graph_by_id[graph_id]
def graph_op_digests(self, op_type=None):
"""Get the list of the digests for graph-op creation so far.
Args:
op_type: Optional op type to filter the creation events with.
Returns:
A list of `GraphOpCreationDigest` objects.
"""
if op_type is not None:
return [digest for digest in self._graph_op_digests
if digest.op_type == op_type]
else:
return self._graph_op_digests
def graph_execution_traces(self, digest=False):
"""Get all the intra-graph execution tensor traces read so far.
TODO(cais): Support begin and end to enable partial loading.
Args:
digest: Whether the results will be returned in the more light-weight
digest form.
Returns:
If `digest`: a `list` of `GraphExecutionTraceDigest` objects.
Else: a `list` of `GraphExecutionTrace` objects.
"""
if digest:
return self._graph_execution_trace_digests
else:
return [self.read_graph_execution_trace(digest)
for digest in self._graph_execution_trace_digests]
def num_graph_execution_traces(self):
"""Get the number of graph execution traces read so far."""
return len(self._graph_execution_trace_digests)
def executions(self, digest=False):
"""Get `Execution`s or `ExecutionDigest`s this reader has read so far.
TODO(cais): Support begin index and end index to enable partial loading.
Args:
digest: Whether the results are returned in a digest form, i.e.,
`ExecutionDigest` format, instead of the more detailed `Execution`
format.
Returns:
If `digest`: a `list` of `ExecutionDigest` objects.
Else: a `list` of `Execution` objects.
"""
if digest:
return self._execution_digests
else:
# TODO(cais): Optimize performance by removing repeated file open/close.
return [self.read_execution(digest) for digest in self._execution_digests]
def num_executions(self):
"""Get the number of execution events read so far."""
return len(self._execution_digests)
def read_execution(self, execution_digest):
"""Read a detailed Execution object."""
debug_event = self._reader.read_execution_debug_event(
execution_digest.offset)
execution_proto = debug_event.execution
debug_tensor_values = None
if (execution_proto.tensor_debug_mode ==
debug_event_pb2.TensorDebugMode.FULL_TENSOR):
pass # TODO(cais): Build tensor store.
elif (execution_proto.tensor_debug_mode !=
debug_event_pb2.TensorDebugMode.NO_TENSOR):
debug_tensor_values = []
for tensor_proto in execution_proto.tensor_protos:
# TODO(cais): Refactor into a helper method.
debug_tensor_values.append(
_parse_tensor_value(tensor_proto, return_list=True))
return Execution(
execution_digest,
tuple(execution_proto.code_location.stack_frame_ids),
execution_proto.tensor_debug_mode,
graph_id=execution_proto.graph_id,
input_tensor_ids=tuple(execution_proto.input_tensor_ids),
output_tensor_ids=tuple(execution_proto.output_tensor_ids),
debug_tensor_values=tuple(
debug_tensor_values) if debug_tensor_values else None)
def read_graph_execution_trace(self, graph_execution_trace_digest):
"""Read the detailed graph execution trace.
Args:
graph_execution_trace_digest: A `GraphExecutionTraceDigest` object.
Returns:
The corresponding `GraphExecutionTrace` object.
"""
debug_event = self._reader.read_graph_execution_traces_event(
graph_execution_trace_digest.offset)
trace_proto = debug_event.graph_execution_trace
graph_ids = [trace_proto.tfdbg_context_id]
# Exhaust the outer contexts (graphs).
while True:
graph = self.graph_by_id(graph_ids[0])
if graph.outer_graph_id:
graph_ids.insert(0, graph.outer_graph_id)
else:
break
debug_tensor_value = None
if (trace_proto.tensor_debug_mode ==
debug_event_pb2.TensorDebugMode.FULL_TENSOR):
pass # TODO(cais): Build tensor store.
else:
debug_tensor_value = _parse_tensor_value(
trace_proto.tensor_proto, return_list=True)
return GraphExecutionTrace(
graph_execution_trace_digest,
graph_ids=graph_ids,
tensor_debug_mode=trace_proto.tensor_debug_mode,
debug_tensor_value=debug_tensor_value,
device_name=trace_proto.device_name or None)
def read_execution_stack_trace(self, execution):
"""Read the stack trace of a given Execution object.
Args:
execution: The Execution object of interest.
Returns:
A tuple consisting of:
1. The host name.
2. The stack trace, as a list of (file_path, lineno, func) tuples.
"""
host_name = self._stack_frame_by_id[execution.stack_frame_ids[0]][0]
return (host_name, [
self._stack_frame_by_id[frame_id][1:]
for frame_id in execution.stack_frame_ids])
def read_graph_op_creation_stack_trace(self, graph_op_creation_digest):
"""Read the stack trace of a given graph op creation object.
Args:
graph_op_creation_digest: The GraphOpCreationDigest object of interest.
Returns:
A tuple consisting of:
1. The host name.
2. The stack trace, as a list of (file_path, lineno, func) tuples.
"""
debug_event = self._reader.read_graphs_event(
graph_op_creation_digest.offset)
graph_op_creation = debug_event.graph_op_creation
host_name = graph_op_creation.code_location.host_name
return host_name, [
self._stack_frame_by_id[frame_id][1:]
for frame_id in graph_op_creation.code_location.stack_frame_ids]
# TODO(cais): Add graph_execution_digests() with an ExecutionDigest
# as a kwarg, to establish the association between top-level and intra-graph
# execution events.
def execution_to_tensor_values(self, execution):
"""Read the full tensor values from an Execution or ExecutionDigest.
Args:
execution: An `Execution` or `ExecutionDigest` object.
Returns:
A list of numpy arrays representing the output tensor values of the
execution event.
"""
debug_event = self._reader.read_execution_debug_event(execution.offset)
return [_parse_tensor_value(tensor_proto)
for tensor_proto in debug_event.execution.tensor_protos]
def graph_execution_trace_to_tensor_value(self, trace):
"""Read full tensor values from an Execution or ExecutionDigest.
Args:
trace: A `GraphExecutionTraceDigest` or `GraphExecutionTrace` object.
Returns:
A numpy array representing the output tensor value of the intra-graph
tensor execution event.
"""
debug_event = self._reader.read_graph_execution_traces_event(trace.offset)
return _parse_tensor_value(debug_event.graph_execution_trace.tensor_proto)
def symbolic_tensor_id(self, graph_id, op_name, output_slot):
"""Get the ID of a symbolic tensor.
Args:
graph_id: The ID of the immediately-enclosing graph.
op_name: Name of the op.
output_slot: Output slot as an int.
Returns:
The ID of the symbolic tensor as an int.
"""
return self._graph_by_id[graph_id].get_tensor_id(op_name, output_slot)
def graph_execution_trace_to_tensor_id(self, trace):
"""Get symbolic tensor ID from a GraphExecutoinTraceDigest object."""
return self.symbolic_tensor_id(
trace.graph_id, trace.op_name, trace.output_slot)
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, traceback):
del exception_type, exception_value, traceback # Unused
self._reader.close()
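
For context, a rough sketch of how the intra-graph trace side of the new
DebugDataReader is meant to be used (dump-root path hypothetical; API as
defined above):

    from tensorflow.python.debug.lib import debug_events_reader

    with debug_events_reader.DebugDataReader("/tmp/tfdbg2_dump") as reader:
      reader.update()
      for digest in reader.graph_execution_traces(digest=True):
        trace = reader.read_graph_execution_trace(digest)
        # Resolve the symbolic tensor ID; read the full tensor value, which
        # is None when the proto cannot be parsed into an ndarray.
        tensor_id = reader.graph_execution_trace_to_tensor_id(trace)
        value = reader.graph_execution_trace_to_tensor_value(trace)
        print(trace.op_name, trace.output_slot, tensor_id, value)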


@@ -76,20 +76,20 @@ class DebugEventsWriterTest(dumping_callback_test_lib.DumpingCallbackTestBase):
writer.FlushNonExecutionFiles()
with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
actuals = list(reader.source_files_iterator())
actuals = list(item.debug_event.source_file
for item in reader.source_files_iterator())
self.assertLen(actuals, num_protos)
for i in range(num_protos):
self.assertEqual(actuals[i].source_file.file_path,
"/home/tf2user/main.py")
self.assertEqual(actuals[i].source_file.host_name, "machine.cluster")
self.assertEqual(actuals[i].source_file.lines, ["print(%d)" % i])
self.assertEqual(actuals[i].file_path, "/home/tf2user/main.py")
self.assertEqual(actuals[i].host_name, "machine.cluster")
self.assertEqual(actuals[i].lines, ["print(%d)" % i])
actuals = list(reader.stack_frames_iterator())
actuals = list(item.debug_event.stack_frame_with_id
for item in reader.stack_frames_iterator())
self.assertLen(actuals, num_protos)
for i in range(num_protos):
self.assertEqual(actuals[i].stack_frame_with_id.id, "stack_%d" % i)
self.assertEqual(
actuals[i].stack_frame_with_id.file_line_col.file_index, i * 10)
self.assertEqual(actuals[i].id, "stack_%d" % i)
self.assertEqual(actuals[i].file_line_col.file_index, i * 10)
def testWriteGraphOpCreationAndDebuggedGraphs(self):
writer = debug_events_writer.DebugEventsWriter(self.dump_root)
@@ -106,7 +106,7 @@ class DebugEventsWriterTest(dumping_callback_test_lib.DumpingCallbackTestBase):
writer.FlushNonExecutionFiles()
reader = debug_events_reader.DebugEventsReader(self.dump_root)
actuals = list(reader.graphs_iterator())
actuals = list(item.debug_event for item in reader.graphs_iterator())
self.assertLen(actuals, num_op_creations + 1)
for i in range(num_op_creations):
self.assertEqual(actuals[i].graph_op_creation.op_type, "Conv2D")
@@ -172,24 +172,24 @@ class DebugEventsWriterTest(dumping_callback_test_lib.DumpingCallbackTestBase):
# Verify the content of the .source_files file.
with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
source_files_iter = reader.source_files_iterator()
actuals = list(source_files_iter)
file_paths = sorted([actual.source_file.file_path for actual in actuals])
actuals = list(item.debug_event.source_file for item in source_files_iter)
file_paths = sorted([actual.file_path for actual in actuals])
self.assertEqual(file_paths, [
"/home/tf2user/file_0.py", "/home/tf2user/file_1.py",
"/home/tf2user/file_2.py"
])
# Verify the content of the .stack_frames file.
actuals = list(reader.stack_frames_iterator())
stack_frame_ids = sorted(
[actual.stack_frame_with_id.id for actual in actuals])
actuals = list(item.debug_event.stack_frame_with_id
for item in reader.stack_frames_iterator())
stack_frame_ids = sorted([actual.id for actual in actuals])
self.assertEqual(stack_frame_ids,
["stack_frame_0", "stack_frame_1", "stack_frame_2"])
# Verify the content of the .graphs file.
actuals = list(reader.graphs_iterator())
graph_op_names = sorted(
[actual.graph_op_creation.op_name for actual in actuals])
actuals = list(item.debug_event.graph_op_creation
for item in reader.graphs_iterator())
graph_op_names = sorted([actual.op_name for actual in actuals])
self.assertEqual(graph_op_names, ["Op0", "Op1", "Op2"])
def testWriteExecutionEventsWithCircularBuffer(self):
@@ -242,11 +242,12 @@ class DebugEventsWriterTest(dumping_callback_test_lib.DumpingCallbackTestBase):
self.assertEqual(len(actuals), 0)
writer.FlushExecutionFiles()
actuals = list(reader.graph_execution_traces_iterator())
actuals = list(item.debug_event.graph_execution_trace
for item in reader.graph_execution_traces_iterator())
self.assertLen(actuals, debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE)
for i in range(debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE):
self.assertEqual(
actuals[i].graph_execution_trace.op_name,
actuals[i].op_name,
"Op%d" % (i + debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE))
def testWriteGraphExecutionTraceEventsWithoutCircularBufferBehavior(self):
@@ -260,10 +261,11 @@ class DebugEventsWriterTest(dumping_callback_test_lib.DumpingCallbackTestBase):
writer.FlushExecutionFiles()
with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
actuals = list(reader.graph_execution_traces_iterator())
actuals = list(item.debug_event.graph_execution_trace
for item in reader.graph_execution_traces_iterator())
self.assertLen(actuals, num_execution_events)
for i in range(num_execution_events):
self.assertEqual(actuals[i].graph_execution_trace.op_name, "Op%d" % i)
self.assertEqual(actuals[i].op_name, "Op%d" % i)
def testConcurrentWritesToExecutionFiles(self):
circular_buffer_size = 5
@@ -308,9 +310,9 @@ class DebugEventsWriterTest(dumping_callback_test_lib.DumpingCallbackTestBase):
# Verify the content of the .execution file.
with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
actuals = list(reader.graph_execution_traces_iterator())
op_names = sorted(
[actual.graph_execution_trace.op_name for actual in actuals])
actuals = list(item.debug_event.graph_execution_trace
for item in reader.graph_execution_traces_iterator())
op_names = sorted([actual.op_name for actual in actuals])
self.assertLen(op_names, circular_buffer_size)
self.assertLen(op_names, len(set(op_names)))


@@ -88,7 +88,7 @@ class DebugIdentityV2OpTest(dumping_callback_test_lib.DumpingCallbackTestBase):
metadata_iter = reader.metadata_iterator()
# Check that the .metadata DebugEvents data file has been created, even
# before FlushExecutionFiles() is called.
debug_event = next(metadata_iter)
debug_event = next(metadata_iter).debug_event
self.assertGreater(debug_event.wall_time, 0)
self.assertTrue(debug_event.debug_metadata.tensorflow_version)
self.assertTrue(
@@ -107,7 +107,7 @@ class DebugIdentityV2OpTest(dumping_callback_test_lib.DumpingCallbackTestBase):
# The circular buffer has a size of 4. So only the data from the
# last two iterations should have been written to self.dump_root.
for _ in range(2):
debug_event = next(graph_trace_iter)
debug_event = next(graph_trace_iter).debug_event
self.assertGreater(debug_event.wall_time, 0)
trace = debug_event.graph_execution_trace
self.assertEqual(trace.tfdbg_context_id, "deadbeaf")
@@ -118,7 +118,7 @@ class DebugIdentityV2OpTest(dumping_callback_test_lib.DumpingCallbackTestBase):
tensor_value = tensor_util.MakeNdarray(trace.tensor_proto)
self.assertAllClose(tensor_value, [9.0, 16.0])
debug_event = next(graph_trace_iter)
debug_event = next(graph_trace_iter).debug_event
self.assertGreater(debug_event.wall_time, 0)
trace = debug_event.graph_execution_trace
self.assertEqual(trace.tfdbg_context_id, "beafdead")
@@ -165,7 +165,7 @@ class DebugIdentityV2OpTest(dumping_callback_test_lib.DumpingCallbackTestBase):
x_values = []
timestamp = 0
while True:
debug_event = next(graph_trace_iter)
debug_event = next(graph_trace_iter).debug_event
self.assertGreater(debug_event.wall_time, timestamp)
timestamp = debug_event.wall_time
trace = debug_event.graph_execution_trace
@@ -210,7 +210,7 @@ class DebugIdentityV2OpTest(dumping_callback_test_lib.DumpingCallbackTestBase):
with debug_events_reader.DebugEventsReader(debug_root) as reader:
graph_trace_iter = reader.graph_execution_traces_iterator()
debug_event = next(graph_trace_iter)
debug_event = next(graph_trace_iter).debug_event
trace = debug_event.graph_execution_trace
self.assertEqual(trace.tfdbg_context_id, "deadbeaf")
self.assertEqual(trace.op_name, "")

File diff suppressed because it is too large.


@@ -52,7 +52,7 @@ class DumpingCallbackTestBase(test_util.TensorFlowTestCase):
"""Read and check the .metadata debug-events file."""
with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
metadata_iter = reader.metadata_iterator()
metadata = next(metadata_iter).debug_metadata
metadata = next(metadata_iter).debug_event.debug_metadata
self.assertEqual(metadata.tensorflow_version, versions.__version__)
self.assertTrue(metadata.file_version.startswith("debug.Event"))
@@ -67,7 +67,7 @@ class DumpingCallbackTestBase(test_util.TensorFlowTestCase):
source_files_iter = reader.source_files_iterator()
source_file_paths = []
prev_wall_time = 1
for debug_event in source_files_iter:
for debug_event, _ in source_files_iter:
self.assertGreaterEqual(debug_event.wall_time, prev_wall_time)
prev_wall_time = debug_event.wall_time
source_file = debug_event.source_file
@@ -84,7 +84,7 @@ class DumpingCallbackTestBase(test_util.TensorFlowTestCase):
stack_frame_by_id = collections.OrderedDict()
stack_frames_iter = reader.stack_frames_iterator()
prev_wall_time = 0
for debug_event in stack_frames_iter:
for debug_event, _ in stack_frames_iter:
self.assertGreaterEqual(debug_event.wall_time, prev_wall_time)
prev_wall_time = debug_event.wall_time
stack_frame_with_id = debug_event.stack_frame_with_id
@@ -133,7 +133,7 @@ class DumpingCallbackTestBase(test_util.TensorFlowTestCase):
# outermost contexts).
context_id_to_outer_id = dict()
for debug_event in graphs_iter:
for debug_event, _ in graphs_iter:
self.assertGreaterEqual(debug_event.wall_time, prev_wall_time)
prev_wall_time = debug_event.wall_time
# A DebugEvent in the .graphs file contains either of the two fields:
@@ -219,7 +219,7 @@ class DumpingCallbackTestBase(test_util.TensorFlowTestCase):
output_tensor_ids = []
tensor_debug_modes = []
tensor_values = []
for debug_event in execution_iter:
for debug_event, _ in execution_iter:
self.assertGreaterEqual(debug_event.wall_time, prev_wall_time)
prev_wall_time = debug_event.wall_time
execution = debug_event.execution
@@ -260,7 +260,7 @@ class DumpingCallbackTestBase(test_util.TensorFlowTestCase):
device_names = []
output_slots = []
tensor_values = []
for debug_event in graph_execution_traces_iter:
for debug_event, _ in graph_execution_traces_iter:
self.assertGreaterEqual(debug_event.wall_time, 0)
graph_execution_trace = debug_event.graph_execution_trace
op_names.append(graph_execution_trace.op_name)