[tfdbg2] Support reading multiple DebugEvent file sets from the same tfdbg run
TensorFlow jobs that involve multiple hosts (e.g., parameter-server setups and TPU coordinator-worker setups) can generate more than one DebugEvent file set when instrumented with tfdbg2's `tf.debugging.experimental.enable_dump_debug_info()`. This CL adds the capability to load multiple file sets belonging to the same tfdbg_run_id into DebugEventsReader and DebugDataReader.

PiperOrigin-RevId: 317765159
Change-Id: Ifcf593bd8b404e3e1c3a6f3f3be70bd6b8b73555
parent 3cc9264e30
commit 1a3b7af373
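Before the diff itself, a brief usage sketch may help; it shows how a dump root containing file sets from multiple hosts is read after this change. The dump-root path and printed fields are illustrative assumptions; the reader calls (`update()`, `graph_execution_traces(digest=True)`, `read_graph_execution_trace()`, `tfdbg_run_id()`, `starting_wall_time()`) are the ones exercised by the tests in this CL.

# Hypothetical usage sketch; the dump-root path is illustrative.
# Each host in a distributed job is instrumented with, e.g.:
#   tf.debugging.experimental.enable_dump_debug_info("/tmp/tfdbg2_dump")
# All resulting file sets share one tfdbg_run_id and can be placed under a
# single dump root, which the readers now load together.
from tensorflow.python.debug.lib import debug_events_reader

with debug_events_reader.DebugDataReader("/tmp/tfdbg2_dump") as reader:
  reader.update()  # Incrementally loads data from all file sets.
  print("tfdbg run ID:", reader.tfdbg_run_id())
  print("started at:", reader.starting_wall_time())  # Earliest host's timestamp.
  # Trace digests now carry a (file_index, offset) locator rather than a
  # bare offset, so detailed reads are routed to the correct file set.
  for digest in reader.graph_execution_traces(digest=True):
    trace = reader.read_graph_execution_trace(digest)
    print(trace.op_name, trace.op_type)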
@@ -46,28 +46,37 @@ class DebugEventsReader(object):
   # penalty.
   _READER_RELEASE_PER = 100
 
+  _METADATA_SUFFIX = ".metadata"
+  _SOURCE_FILE_SUFFIX = ".source_files"
+  _STACK_FRAMES_SUFFIX = ".stack_frames"
+  _GRAPHS_SUFFIX = ".graphs"
+  _EXECUTION_SUFFIX = ".execution"
+  _GRAPH_EXECUTION_TRACES_SUFFIX = ".graph_execution_traces"
+
   def __init__(self, dump_root):
     if not file_io.is_directory(dump_root):
       raise ValueError("Specified dump_root is not a directory: %s" % dump_root)
-    metadata_paths = file_io.get_matching_files(
-        os.path.join(dump_root, "*.metadata"))
-    if not metadata_paths:
-      raise ValueError("Cannot find any metadata file in directory: %s" %
-                       dump_root)
-    elif len(metadata_paths) > 1:
-      raise ValueError(
-          "Unexpected: Found multiple (%d) metadata in directory: %s" %
-          (len(metadata_paths), dump_root))
-    self._metadata_path = compat.as_bytes(metadata_paths[0])
-    self._metadata_reader = None
+    self._dump_root = dump_root
+    self._metadata_paths = self._load_metadata_files()
 
-    prefix = metadata_paths[0][:-len(".metadata")]
-    self._source_files_path = compat.as_bytes("%s.source_files" % prefix)
-    self._stack_frames_path = compat.as_bytes("%s.stack_frames" % prefix)
-    self._graphs_path = compat.as_bytes("%s.graphs" % prefix)
-    self._execution_path = compat.as_bytes("%s.execution" % prefix)
-    self._graph_execution_traces_path = compat.as_bytes(
-        "%s.graph_execution_traces" % prefix)
+    prefixes = [
+        metadata_path[:-len(self._METADATA_SUFFIX)]
+        for metadata_path in self._metadata_paths
+    ]
+    prefix = prefixes[0]  # This is the prefix of the main file set.
+    self._source_files_path = compat.as_bytes(prefix + self._SOURCE_FILE_SUFFIX)
+    self._stack_frames_path = compat.as_bytes(prefix +
+                                              self._STACK_FRAMES_SUFFIX)
+    self._graphs_path = compat.as_bytes(prefix + self._GRAPHS_SUFFIX)
+    self._execution_path = compat.as_bytes(prefix + self._EXECUTION_SUFFIX)
+    # There can be multiple .graph_execution_trace files each belonging
+    # to a file set generated on an individual host, in the case of
+    # a distributed TensorFlow job.
+    # This is different from the other debug event files in the file set.
+    self._graph_execution_traces_paths = [
+        compat.as_bytes(prefix + self._GRAPH_EXECUTION_TRACES_SUFFIX)
+        for prefix in prefixes
+    ]
     self._readers = dict()  # A map from file path to reader.
     # A map from file path to current reading offset.
     self._reader_offsets = dict()
@@ -78,6 +87,91 @@ class DebugEventsReader(object):
 
     self._offsets = dict()
 
+  def _load_metadata_files(self):
+    """Load and parse metadata files in the dump root.
+
+    Check that all metadata files have a common tfdbg_run_id, and raise
+    a ValueError if their tfdbg_run_ids differ.
+
+    Returns:
+      A list of metadata file paths in ascending order of their starting
+      wall_time timestamp.
+    """
+
+    metadata_paths = file_io.get_matching_files(
+        os.path.join(self._dump_root, "*%s" % self._METADATA_SUFFIX))
+    if not metadata_paths:
+      raise ValueError("Cannot find any tfdbg metadata file in directory: %s" %
+                       self._dump_root)
+    wall_times = []
+    run_ids = []
+    tensorflow_versions = []
+    file_versions = []
+    for metadata_path in metadata_paths:
+      reader = tf_record.tf_record_random_reader(metadata_path)
+      try:
+        record = reader.read(0)[0]
+        debug_event = debug_event_pb2.DebugEvent.FromString(record)
+        wall_times.append(debug_event.wall_time)
+        run_ids.append(debug_event.debug_metadata.tfdbg_run_id)
+        tensorflow_versions.append(
+            debug_event.debug_metadata.tensorflow_version)
+        file_versions.append(debug_event.debug_metadata.file_version)
+      finally:
+        reader.close()
+    self._starting_wall_time = wall_times[0]
+    self._tfdbg_run_id = run_ids[0]
+    self._tensorflow_version = tensorflow_versions[0]
+    self._file_version = file_versions[0]
+    if len(metadata_paths) == 1:
+      # Fast path for a common case (only one DebugEvent file set).
+      return metadata_paths
+
+    num_no_id = len([run_id for run_id in run_ids if not run_id])
+    if num_no_id:
+      paths_without_run_id = [
+          metadata_path
+          for metadata_path, run_id in zip(metadata_paths, run_ids)
+          if not run_id
+      ]
+      raise ValueError(
+          "Found %d tfdbg metadata files and %d of them do not "
+          "have tfdbg run ids. The metadata files without run ids are: %s" %
+          (len(run_ids), num_no_id, paths_without_run_id))
+    elif len(set(run_ids)) != 1:
+      raise ValueError(
+          "Unexpected: Found multiple (%d) tfdbg2 runs in directory %s" %
+          (len(set(run_ids)), self._dump_root))
+    # Return the metadata files in ascending order of their timestamps.
+    paths_and_timestamps = sorted(
+        zip(metadata_paths, wall_times), key=lambda t: t[1])
+    self._starting_wall_time = paths_and_timestamps[0][1]
+    return [path[0] for path in paths_and_timestamps]
+
+  def starting_wall_time(self):
+    """Get the starting timestamp of the instrumented TensorFlow program.
+
+    When there are multiple hosts (i.e., multiple tfdbg file sets), the earliest
+    timestamp among the file sets is returned. It is assumed to belong to the
+    job that starts first (e.g., the coordinator).
+
+    Returns:
+      Starting timestamp in seconds since the epoch, as a float.
+    """
+    return self._starting_wall_time
+
+  def tfdbg_run_id(self):
+    """Get the run ID of the instrumented TensorFlow program."""
+    return self._tfdbg_run_id
+
+  def tensorflow_version(self):
+    """Get the version string of TensorFlow that the debugged program ran on."""
+    return self._tensorflow_version
+
+  def tfdbg_file_version(self):
+    """Get the tfdbg file format version."""
+    return self._file_version
+
   def __enter__(self):
     return self
 
@@ -139,9 +233,6 @@ class DebugEventsReader(object):
           self._reader_offsets[file_path] = 0
     return self._readers[file_path]
 
-  def metadata_iterator(self):
-    return self._generic_iterator(self._metadata_path)
-
   def source_files_iterator(self):
     return self._generic_iterator(self._source_files_path)
 
@@ -193,14 +284,18 @@ class DebugEventsReader(object):
       proto_string = self._get_reader(self._execution_path).read(offset)[0]
     return debug_event_pb2.DebugEvent.FromString(proto_string)
 
-  def graph_execution_traces_iterator(self):
-    return self._generic_iterator(self._graph_execution_traces_path)
+  def graph_execution_traces_iterators(self):
+    return [
+        self._generic_iterator(path)
+        for path in self._graph_execution_traces_paths
+    ]
 
-  def read_graph_execution_traces_event(self, offset):
-    """Read DebugEvent at given offset from .graph_execution_traces file.
+  def read_graph_execution_traces_event(self, locator):
+    """Read DebugEvent at a given locator from a .graph_execution_traces file.
 
     Args:
-      offset: Offset to read the DebugEvent proto from.
+      locator: A (file_index, offset) tuple that locates the DebugEvent
+        containing the graph execution trace.
 
     Returns:
       A DebugEventProto.
@@ -209,9 +304,11 @@ class DebugEventsReader(object):
       `errors.DataLossError` if offset is at a wrong location.
       `IndexError` if offset is out of range of the file.
     """
-    with self._reader_read_locks[self._graph_execution_traces_path]:
-      proto_string = self._get_reader(
-          self._graph_execution_traces_path).read(offset)[0]
+    file_index, offset = locator
+    graph_execution_traces_path = self._graph_execution_traces_paths[file_index]
+    with self._reader_read_locks[graph_execution_traces_path]:
+      proto_string = self._get_reader(graph_execution_traces_path).read(
+          offset)[0]
     return debug_event_pb2.DebugEvent.FromString(proto_string)
 
   def close(self):
@@ -227,21 +324,27 @@ class BaseDigest(object):
 
   Properties:
     wall_time: A timestamp for the digest as a `float` (unit: s).
-    offset: An offset number in the corresponding file that can be used for
-      fast random read access.
+    locator: A datum that allows tracing the digest to its original
+      location. It can be either of the two:
+      1. Bytes offset from the beginning of the file as a single integer,
+         for the case of all digests of the same kind coming from the same
+         file.
+      2. A tuple of a file index and a byte offset. This applies to the case
+         in which the same type of debugger data may come from multiple files,
+         e.g., graph execution traces.
   """
 
-  def __init__(self, wall_time, offset):
+  def __init__(self, wall_time, locator):
     self._wall_time = wall_time
-    self._offset = offset
+    self._locator = locator
 
   @property
   def wall_time(self):
    return self._wall_time
 
   @property
-  def offset(self):
-    return self._offset
+  def locator(self):
+    return self._locator
 
   def to_json(self):
     return {"wall_time": self.wall_time}
@@ -265,10 +368,10 @@ class ExecutionDigest(BaseDigest):
 
   def __init__(self,
                wall_time,
-               offset,
+               locator,
               op_type,
                output_tensor_device_ids=None):
-    super(ExecutionDigest, self).__init__(wall_time, offset)
+    super(ExecutionDigest, self).__init__(wall_time, locator)
     self._op_type = op_type
     self._output_tensor_device_ids = _tuple_or_none(output_tensor_device_ids)
 
@@ -332,7 +435,7 @@ class Execution(ExecutionDigest):
                debug_tensor_values=None):
     super(Execution, self).__init__(
         execution_digest.wall_time,
-        execution_digest.offset,
+        execution_digest.locator,
         execution_digest.op_type,
         output_tensor_device_ids=execution_digest.output_tensor_device_ids)
     self._host_name = host_name
@@ -556,7 +659,7 @@ class GraphOpCreationDigest(BaseDigest):
 
   def __init__(self,
                wall_time,
-               offset,
+               locator,
                graph_id,
                op_type,
                op_name,
@@ -565,7 +668,7 @@ class GraphOpCreationDigest(BaseDigest):
                stack_frame_ids,
                input_names=None,
                device_name=None):
-    super(GraphOpCreationDigest, self).__init__(wall_time, offset)
+    super(GraphOpCreationDigest, self).__init__(wall_time, locator)
     self._graph_id = graph_id
     self._op_type = op_type
     self._op_name = op_name
@@ -640,14 +743,9 @@ class GraphExecutionTraceDigest(BaseDigest):
       graph.
   """
 
-  def __init__(self,
-               wall_time,
-               offset,
-               op_type,
-               op_name,
-               output_slot,
+  def __init__(self, wall_time, locator, op_type, op_name, output_slot,
                graph_id):
-    super(GraphExecutionTraceDigest, self).__init__(wall_time, offset)
+    super(GraphExecutionTraceDigest, self).__init__(wall_time, locator)
     self._op_type = op_type
     self._op_name = op_name
     self._output_slot = output_slot
@@ -701,13 +799,13 @@ class GraphExecutionTrace(GraphExecutionTraceDigest):
                tensor_debug_mode,
                debug_tensor_value=None,
                device_name=None):
-    super(GraphExecutionTrace, self).__init__(
-        graph_execution_trace_digest.wall_time,
-        graph_execution_trace_digest.offset,
-        graph_execution_trace_digest.op_type,
-        graph_execution_trace_digest.op_name,
-        graph_execution_trace_digest.output_slot,
-        graph_execution_trace_digest.graph_id)
+    super(GraphExecutionTrace,
+          self).__init__(graph_execution_trace_digest.wall_time,
+                         graph_execution_trace_digest.locator,
+                         graph_execution_trace_digest.op_type,
+                         graph_execution_trace_digest.op_name,
+                         graph_execution_trace_digest.output_slot,
+                         graph_execution_trace_digest.graph_id)
     self._graph_ids = tuple(graph_ids)
     self._tensor_debug_mode = tensor_debug_mode
     self._debug_tensor_value = debug_tensor_value
@@ -780,17 +878,17 @@ def _parse_tensor_value(tensor_proto, return_list=False):
     return None
 
 
-def _execution_digest_from_debug_event_proto(debug_event, offset):
+def _execution_digest_from_debug_event_proto(debug_event, locator):
   """Convert a DebugEvent proto into an ExecutionDigest data object."""
   return ExecutionDigest(
       debug_event.wall_time,
-      offset,
+      locator,
       debug_event.execution.op_type,
-      output_tensor_device_ids=(
-          debug_event.execution.output_tensor_device_ids or None))
+      output_tensor_device_ids=(debug_event.execution.output_tensor_device_ids
+                                or None))
 
 
-def _execution_from_debug_event_proto(debug_event, offset):
+def _execution_from_debug_event_proto(debug_event, locator):
   """Convert a DebugEvent proto into an Execution data object."""
   execution_proto = debug_event.execution
 
@@ -806,7 +904,7 @@ def _execution_from_debug_event_proto(debug_event, offset):
       debug_tensor_values.append(
           _parse_tensor_value(tensor_proto, return_list=True))
   return Execution(
-      _execution_digest_from_debug_event_proto(debug_event, offset),
+      _execution_digest_from_debug_event_proto(debug_event, locator),
      execution_proto.code_location.host_name,
      tuple(execution_proto.code_location.stack_frame_ids),
      execution_proto.tensor_debug_mode,
@@ -832,7 +930,6 @@ class DebugDataReader(object):
 
   def __init__(self, dump_root):
     self._reader = DebugEventsReader(dump_root)
-    self._load_metadata()
 
     # TODO(cais): Implement pagination for memory constraints.
     self._execution_digests = []
@@ -858,13 +955,6 @@ class DebugDataReader(object):
   def _add_monitor(self, monitor):
     self._monitors.append(monitor)
 
-  def _load_metadata(self):
-    metadata_iter = self._reader.metadata_iterator()
-    debug_event = next(metadata_iter).debug_event
-    self._starting_wall_time = debug_event.wall_time
-    self._tensorflow_version = debug_event.debug_metadata.tensorflow_version
-    self._tfdbg_run_id = debug_event.debug_metadata.tfdbg_run_id
-
   def _load_source_files(self):
     """Incrementally read the .source_files DebugEvent file."""
     source_files_iter = self._reader.source_files_iterator()
@@ -944,37 +1034,32 @@ class DebugDataReader(object):
 
   def _load_graph_execution_traces(self):
     """Incrementally load the .graph_execution_traces file."""
-    traces_iter = self._reader.graph_execution_traces_iterator()
-    for debug_event, offset in traces_iter:
-      self._graph_execution_trace_digests.append(
-          self._graph_execution_trace_digest_from_debug_event_proto(
-              debug_event, offset))
-      if self._monitors:
-        graph_execution_trace = (
-            self._graph_execution_trace_from_debug_event_proto(
-                debug_event, offset))
-        for monitor in self._monitors:
-          monitor.on_graph_execution_trace(
-              len(self._graph_execution_trace_digests) - 1,
-              graph_execution_trace)
+    for i, traces_iter in enumerate(
+        self._reader.graph_execution_traces_iterators()):
+      for debug_event, offset in traces_iter:
+        self._graph_execution_trace_digests.append(
+            self._graph_execution_trace_digest_from_debug_event_proto(
+                debug_event, (i, offset)))
+        if self._monitors:
+          graph_execution_trace = (
+              self._graph_execution_trace_from_debug_event_proto(
+                  debug_event, (i, offset)))
+          for monitor in self._monitors:
+            monitor.on_graph_execution_trace(
+                len(self._graph_execution_trace_digests) - 1,
+                graph_execution_trace)
 
-  def _graph_execution_trace_digest_from_debug_event_proto(self,
-                                                           debug_event,
-                                                           offset):
+  def _graph_execution_trace_digest_from_debug_event_proto(
+      self, debug_event, locator):
     trace_proto = debug_event.graph_execution_trace
     op_name = trace_proto.op_name
     op_type = self._lookup_op_type(trace_proto.tfdbg_context_id, op_name)
     return GraphExecutionTraceDigest(
-        debug_event.wall_time,
-        offset,
-        op_type,
-        op_name,
+        debug_event.wall_time, locator, op_type, op_name,
         trace_proto.output_slot,
         debug_event.graph_execution_trace.tfdbg_context_id)
 
-  def _graph_execution_trace_from_debug_event_proto(self,
-                                                    debug_event,
-                                                    offset):
+  def _graph_execution_trace_from_debug_event_proto(self, debug_event, locator):
     """Convert a DebugEvent proto into a GraphExecutionTrace data object."""
     trace_proto = debug_event.graph_execution_trace
     graph_ids = [trace_proto.tfdbg_context_id]
@@ -995,7 +1080,7 @@ class DebugDataReader(object):
           trace_proto.tensor_proto, return_list=True)
     return GraphExecutionTrace(
         self._graph_execution_trace_digest_from_debug_event_proto(
-            debug_event, offset),
+            debug_event, locator),
         graph_ids=graph_ids,
         tensor_debug_mode=trace_proto.tensor_debug_mode,
         debug_tensor_value=debug_tensor_value,
@@ -1059,7 +1144,7 @@ class DebugDataReader(object):
     Returns:
       Starting wall time as seconds since the epoch, as a `float`.
     """
-    return self._starting_wall_time
+    return self._reader.starting_wall_time()
 
   def tensorflow_version(self):
     """TensorFlow version used in the debugged TensorFlow program.
@@ -1070,11 +1155,11 @@ class DebugDataReader(object):
     Returns:
       TensorFlow version used by the debugged program, as a `str`.
     """
-    return self._tensorflow_version
+    return self._reader.tensorflow_version()
 
   def tfdbg_run_id(self):
     """Get the debugger run ID of the debugged TensorFlow program."""
-    return self._tfdbg_run_id
+    return self._reader.tfdbg_run_id()
 
   def outermost_graphs(self):
     """Get the number of outermost graphs read so far."""
@@ -1171,9 +1256,9 @@ class DebugDataReader(object):
 
   def read_execution(self, execution_digest):
     """Read a detailed Execution object."""
-    debug_event = self._reader.read_execution_event(execution_digest.offset)
-    return _execution_from_debug_event_proto(
-        debug_event, execution_digest.offset)
+    debug_event = self._reader.read_execution_event(execution_digest.locator)
+    return _execution_from_debug_event_proto(debug_event,
+                                             execution_digest.locator)
 
   def read_graph_execution_trace(self, graph_execution_trace_digest):
     """Read the detailed graph execution trace.
@@ -1185,9 +1270,9 @@ class DebugDataReader(object):
       The corresponding `GraphExecutionTrace` object.
     """
     debug_event = self._reader.read_graph_execution_traces_event(
-        graph_execution_trace_digest.offset)
+        graph_execution_trace_digest.locator)
     return self._graph_execution_trace_from_debug_event_proto(
-        debug_event, graph_execution_trace_digest.offset)
+        debug_event, graph_execution_trace_digest.locator)
 
   def read_execution_stack_trace(self, execution):
     """Read the stack trace of a given Execution object.
@@ -1234,7 +1319,7 @@ class DebugDataReader(object):
       A list of numpy arrays representing the output tensor values of the
       execution event.
     """
-    debug_event = self._reader.read_execution_event(execution.offset)
+    debug_event = self._reader.read_execution_event(execution.locator)
     return [_parse_tensor_value(tensor_proto)
             for tensor_proto in debug_event.execution.tensor_protos]
 
@@ -1248,8 +1333,7 @@ class DebugDataReader(object):
       A numpy array representing the output tensor value of the intra-graph
       tensor execution event.
     """
-    debug_event = self._reader.read_graph_execution_traces_event(
-        trace.offset)
+    debug_event = self._reader.read_graph_execution_traces_event(trace.locator)
     return _parse_tensor_value(debug_event.graph_execution_trace.tensor_proto)
 
   def symbolic_tensor_id(self, graph_id, op_name, output_slot):
@@ -21,6 +21,7 @@ from __future__ import print_function
 
 import glob
 import json as json_lib
 import os
+import re
 import threading
 import time
 
@@ -264,14 +265,14 @@ class DebugEventsWriterTest(dumping_callback_test_lib.DumpingCallbackTestBase,
       writer.WriteGraphExecutionTrace(trace)
 
     with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
-      actuals = list(reader.graph_execution_traces_iterator())
+      actuals = list(reader.graph_execution_traces_iterators()[0])
       # Before FlushExecutionFiles() is called. No data should have been written
       # to the file.
      self.assertEmpty(actuals)
 
       writer.FlushExecutionFiles()
       actuals = list(item.debug_event.graph_execution_trace
-                     for item in reader.graph_execution_traces_iterator())
+                     for item in reader.graph_execution_traces_iterators()[0])
       self.assertLen(actuals, debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE)
       for i in range(debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE):
         self.assertEqual(
@@ -291,7 +292,7 @@ class DebugEventsWriterTest(dumping_callback_test_lib.DumpingCallbackTestBase,
 
     with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
       actuals = list(item.debug_event.graph_execution_trace
-                     for item in reader.graph_execution_traces_iterator())
+                     for item in reader.graph_execution_traces_iterators()[0])
       self.assertLen(actuals, num_execution_events)
       for i in range(num_execution_events):
         self.assertEqual(actuals[i].op_name, "Op%d" % i)
@@ -598,6 +599,86 @@ class DebugEventsWriterTest(dumping_callback_test_lib.DumpingCallbackTestBase,
       self.assertEqual(traces[-1].op_name, "Op_%d" % (expected_end - 1))
 
 
+class MultiSetReaderTest(dumping_callback_test_lib.DumpingCallbackTestBase):
+  """Test for DebugDataReader for multiple file sets under a dump root."""
+
+  def testReadingTwoFileSetsWithTheSameDumpRootSucceeds(self):
+    # To simulate a multi-host data dump, we first generate file sets in two
+    # different directories, with the same tfdbg_run_id, and then combine them.
+    tfdbg_run_id = "foo"
+    for i in range(2):
+      writer = debug_events_writer.DebugEventsWriter(
+          os.path.join(self.dump_root, str(i)),
+          tfdbg_run_id,
+          circular_buffer_size=-1)
+      if i == 0:
+        debugged_graph = debug_event_pb2.DebuggedGraph(
+            graph_id="graph1", graph_name="graph1")
+        writer.WriteDebuggedGraph(debugged_graph)
+        op_name = "Op_0"
+        graph_op_creation = debug_event_pb2.GraphOpCreation(
+            op_type="FooOp", op_name=op_name, graph_id="graph1")
+        writer.WriteGraphOpCreation(graph_op_creation)
+        op_name = "Op_1"
+        graph_op_creation = debug_event_pb2.GraphOpCreation(
+            op_type="FooOp", op_name=op_name, graph_id="graph1")
+        writer.WriteGraphOpCreation(graph_op_creation)
+      for _ in range(10):
+        trace = debug_event_pb2.GraphExecutionTrace(
+            op_name="Op_%d" % i, tfdbg_context_id="graph1")
+        writer.WriteGraphExecutionTrace(trace)
+      writer.FlushNonExecutionFiles()
+      writer.FlushExecutionFiles()
+
+    # Move all files from the subdirectory /1 to subdirectory /0.
+    dump_root_0 = os.path.join(self.dump_root, "0")
+    src_paths = glob.glob(os.path.join(self.dump_root, "1", "*"))
+    for src_path in src_paths:
+      dst_path = os.path.join(
+          dump_root_0,
+          # Rename the file set to avoid file name collision.
+          re.sub(r"(tfdbg_events\.\d+)", r"\g<1>1", os.path.basename(src_path)))
+      os.rename(src_path, dst_path)
+
+    with debug_events_reader.DebugDataReader(dump_root_0) as reader:
+      reader.update()
+      # Verify the content of the .graph_execution_traces file.
+      trace_digests = reader.graph_execution_traces(digest=True)
+      self.assertLen(trace_digests, 20)
+      for i in range(10):
+        trace = reader.read_graph_execution_trace(trace_digests[i])
+        self.assertEqual(trace.op_name, "Op_0")
+      for i in range(10):
+        trace = reader.read_graph_execution_trace(trace_digests[i + 10])
+        self.assertEqual(trace.op_name, "Op_1")
+
+  def testReadingTwoFileSetsWithTheDifferentRunIdsLeadsToError(self):
+    # To simulate a multi-host data dump, we first generate file sets in two
+    # different directories, with different tfdbg_run_ids, and then combine
+    # them.
+    for i in range(2):
+      writer = debug_events_writer.DebugEventsWriter(
+          os.path.join(self.dump_root, str(i)),
+          "run_id_%d" % i,
+          circular_buffer_size=-1)
+      writer.FlushNonExecutionFiles()
+      writer.FlushExecutionFiles()
+
+    # Move all files from the subdirectory /1 to subdirectory /0.
+    dump_root_0 = os.path.join(self.dump_root, "0")
+    src_paths = glob.glob(os.path.join(self.dump_root, "1", "*"))
+    for src_path in src_paths:
+      dst_path = os.path.join(
+          dump_root_0,
+          # Rename the file set to avoid file name collision.
+          re.sub(r"(tfdbg_events\.\d+)", r"\g<1>1", os.path.basename(src_path)))
+      os.rename(src_path, dst_path)
+
+    with self.assertRaisesRegexp(ValueError,
+                                 r"Found multiple \(2\) tfdbg2 runs"):
+      debug_events_reader.DebugDataReader(dump_root_0)
+
+
 class DataObjectsTest(test_util.TensorFlowTestCase, parameterized.TestCase):
 
   def jsonRoundTripCheck(self, obj):
@@ -92,16 +92,13 @@ class DebugIdentityV2OpTest(dumping_callback_test_lib.DumpingCallbackTestBase):
         write_debug_trace(x), [9.0 + np.sqrt(3.0), 16.0 + 2.0])
 
     with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
-      metadata_iter = reader.metadata_iterator()
       # Check that the .metadata DebugEvents data file has been created, even
       # before FlushExecutionFiles() is called.
-      debug_event = next(metadata_iter).debug_event
-      self.assertGreater(debug_event.wall_time, 0)
-      self.assertTrue(debug_event.debug_metadata.tensorflow_version)
-      self.assertTrue(
-          debug_event.debug_metadata.file_version.startswith("debug.Event:"))
+      self.assertGreater(reader.starting_wall_time(), 0)
+      self.assertTrue(reader.tensorflow_version())
+      self.assertTrue(reader.tfdbg_file_version().startswith("debug.Event"))
 
-      graph_trace_iter = reader.graph_execution_traces_iterator()
+      graph_trace_iter = reader.graph_execution_traces_iterators()[0]
       # Before FlushExecutionFiles() is called, the .graph_execution_traces file
       # ought to be empty.
       with self.assertRaises(StopIteration):
@@ -109,7 +106,7 @@ class DebugIdentityV2OpTest(dumping_callback_test_lib.DumpingCallbackTestBase):
 
       # Flush the circular buffer.
       self.writer.FlushExecutionFiles()
-      graph_trace_iter = reader.graph_execution_traces_iterator()
+      graph_trace_iter = reader.graph_execution_traces_iterators()[0]
 
       # The circular buffer has a size of 4. So only the data from the
       # last two iterations should have been written to self.dump_root.
@@ -167,7 +164,7 @@ class DebugIdentityV2OpTest(dumping_callback_test_lib.DumpingCallbackTestBase):
 
     self.writer.FlushExecutionFiles()
     with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
-      graph_trace_iter = reader.graph_execution_traces_iterator()
+      graph_trace_iter = reader.graph_execution_traces_iterators()[0]
       try:
         x_values = []
         timestamp = 0
@@ -216,7 +213,7 @@ class DebugIdentityV2OpTest(dumping_callback_test_lib.DumpingCallbackTestBase):
 
     for debug_root in (self.dump_root, another_dump_root):
       with debug_events_reader.DebugEventsReader(debug_root) as reader:
-        graph_trace_iter = reader.graph_execution_traces_iterator()
+        graph_trace_iter = reader.graph_execution_traces_iterators()[0]
 
         debug_event = next(graph_trace_iter).debug_event
         trace = debug_event.graph_execution_trace
@@ -272,7 +269,7 @@ class DebugIdentityV2OpUninitializedWriterTest(
       writer.FlushExecutionFiles()
 
     with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
-      graph_trace_iter = reader.graph_execution_traces_iterator()
+      graph_trace_iter = reader.graph_execution_traces_iterators()[0]
       graph_execution_traces = []
       while True:
        try:
@@ -48,7 +48,6 @@ class DumpingCallbackTestBase(test_util.TensorFlowTestCase):
   def _readAndCheckMetadataFile(self):
     """Read and check the .metadata debug-events file."""
     with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
-      metadata_iter = reader.metadata_iterator()
-      metadata = next(metadata_iter).debug_event.debug_metadata
-      self.assertEqual(metadata.tensorflow_version, versions.__version__)
-      self.assertTrue(metadata.file_version.startswith("debug.Event"))
+      self.assertTrue(reader.tfdbg_run_id())
+      self.assertEqual(reader.tensorflow_version(), versions.__version__)
+      self.assertTrue(reader.tfdbg_file_version().startswith("debug.Event"))
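A closing note on the locator convention that runs through this change: digests previously recorded a bare byte offset, which can address only a single file per data kind. With multiple .graph_execution_traces files under one dump root, the digest's locator becomes a (file_index, offset) pair, and reads first select the file set, then seek within it. A minimal standalone sketch of that dispatch follows; the class and method names are hypothetical, not part of the TensorFlow API.

# Illustrative sketch of (file_index, offset) locator dispatch; names are
# hypothetical and not part of TensorFlow.
class TraceFileSets(object):

  def __init__(self, trace_paths):
    # One .graph_execution_traces file per host's file set.
    self._trace_paths = list(trace_paths)

  def read_record(self, locator):
    # A bare int offset could address only one file; the tuple first picks
    # the file set, then seeks within that file.
    file_index, offset = locator
    with open(self._trace_paths[file_index], "rb") as f:
      f.seek(offset)
      return f.read()  # Rest of file; tfdbg2 instead reads one TFRecord here.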