diff --git a/tensorflow/python/BUILD b/tensorflow/python/BUILD index 04a740e1f30..e6ea4fd36b2 100644 --- a/tensorflow/python/BUILD +++ b/tensorflow/python/BUILD @@ -3303,6 +3303,7 @@ py_library( ":smart_cond", ":summary_op_util", ":summary_ops_gen", + ":tensor_util", ":training_util", ":util", "//tensorflow/core:protos_all_py", diff --git a/tensorflow/python/kernel_tests/BUILD b/tensorflow/python/kernel_tests/BUILD index ea458969700..085c1d76b20 100644 --- a/tensorflow/python/kernel_tests/BUILD +++ b/tensorflow/python/kernel_tests/BUILD @@ -1093,6 +1093,7 @@ cuda_py_test( "//tensorflow/python:client_testlib", "//tensorflow/python:constant_op", "//tensorflow/python:dtypes", + "//tensorflow/python:errors", "//tensorflow/python:framework_ops", "//tensorflow/python:framework_test_lib", "//tensorflow/python:lib", diff --git a/tensorflow/python/kernel_tests/summary_ops_test.py b/tensorflow/python/kernel_tests/summary_ops_test.py index b7a05136c4b..0b4d58f6080 100644 --- a/tensorflow/python/kernel_tests/summary_ops_test.py +++ b/tensorflow/python/kernel_tests/summary_ops_test.py @@ -19,7 +19,6 @@ from __future__ import division from __future__ import print_function import os -import time import unittest from tensorflow.core.framework import graph_pb2 @@ -32,6 +31,7 @@ from tensorflow.python.eager import context from tensorflow.python.eager import def_function from tensorflow.python.framework import constant_op from tensorflow.python.framework import dtypes +from tensorflow.python.framework import errors from tensorflow.python.framework import ops from tensorflow.python.framework import tensor_spec from tensorflow.python.framework import tensor_util @@ -52,7 +52,7 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): def testWrite(self): logdir = self.get_temp_dir() with context.eager_mode(): - with summary_ops.create_file_writer(logdir).as_default(): + with summary_ops.create_file_writer_v2(logdir).as_default(): output = summary_ops.write('tag', 42, step=12) self.assertTrue(output.numpy()) events = events_from_logdir(logdir) @@ -64,11 +64,12 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): def testWrite_fromFunction(self): logdir = self.get_temp_dir() - @def_function.function - def f(): - with summary_ops.create_file_writer(logdir).as_default(): - return summary_ops.write('tag', 42, step=12) with context.eager_mode(): + writer = summary_ops.create_file_writer_v2(logdir) + @def_function.function + def f(): + with writer.as_default(): + return summary_ops.write('tag', 42, step=12) output = f() self.assertTrue(output.numpy()) events = events_from_logdir(logdir) @@ -83,7 +84,7 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): metadata = summary_pb2.SummaryMetadata() metadata.plugin_data.plugin_name = 'foo' with context.eager_mode(): - with summary_ops.create_file_writer(logdir).as_default(): + with summary_ops.create_file_writer_v2(logdir).as_default(): summary_ops.write('obj', 0, 0, metadata=metadata) summary_ops.write('bytes', 0, 0, metadata=metadata.SerializeToString()) m = constant_op.constant(metadata.SerializeToString()) @@ -104,7 +105,7 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): def testWrite_ndarray(self): logdir = self.get_temp_dir() with context.eager_mode(): - with summary_ops.create_file_writer(logdir).as_default(): + with summary_ops.create_file_writer_v2(logdir).as_default(): summary_ops.write('tag', [[1, 2], [3, 4]], step=12) events = events_from_logdir(logdir) value = events[1].summary.value[0] @@ -114,7 +115,7 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): logdir = self.get_temp_dir() with context.eager_mode(): t = constant_op.constant([[1, 2], [3, 4]]) - with summary_ops.create_file_writer(logdir).as_default(): + with summary_ops.create_file_writer_v2(logdir).as_default(): summary_ops.write('tag', t, step=12) expected = t.numpy() events = events_from_logdir(logdir) @@ -123,11 +124,12 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): def testWrite_tensor_fromFunction(self): logdir = self.get_temp_dir() - @def_function.function - def f(t): - with summary_ops.create_file_writer(logdir).as_default(): - summary_ops.write('tag', t, step=12) with context.eager_mode(): + writer = summary_ops.create_file_writer_v2(logdir) + @def_function.function + def f(t): + with writer.as_default(): + summary_ops.write('tag', t, step=12) t = constant_op.constant([[1, 2], [3, 4]]) f(t) expected = t.numpy() @@ -138,7 +140,7 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): def testWrite_stringTensor(self): logdir = self.get_temp_dir() with context.eager_mode(): - with summary_ops.create_file_writer(logdir).as_default(): + with summary_ops.create_file_writer_v2(logdir).as_default(): summary_ops.write('tag', [b'foo', b'bar'], step=12) events = events_from_logdir(logdir) value = events[1].summary.value[0] @@ -168,7 +170,7 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): def testWrite_recordIf_constant(self): logdir = self.get_temp_dir() with context.eager_mode(): - with summary_ops.create_file_writer(logdir).as_default(): + with summary_ops.create_file_writer_v2(logdir).as_default(): self.assertTrue(summary_ops.write('default', 1, step=0)) with summary_ops.record_if(True): self.assertTrue(summary_ops.write('set_on', 1, step=0)) @@ -181,16 +183,17 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): def testWrite_recordIf_constant_fromFunction(self): logdir = self.get_temp_dir() - @def_function.function - def f(): - with summary_ops.create_file_writer(logdir).as_default(): - # Use assertAllEqual instead of assertTrue since it works in a defun. - self.assertAllEqual(summary_ops.write('default', 1, step=0), True) - with summary_ops.record_if(True): - self.assertAllEqual(summary_ops.write('set_on', 1, step=0), True) - with summary_ops.record_if(False): - self.assertAllEqual(summary_ops.write('set_off', 1, step=0), False) with context.eager_mode(): + writer = summary_ops.create_file_writer_v2(logdir) + @def_function.function + def f(): + with writer.as_default(): + # Use assertAllEqual instead of assertTrue since it works in a defun. + self.assertAllEqual(summary_ops.write('default', 1, step=0), True) + with summary_ops.record_if(True): + self.assertAllEqual(summary_ops.write('set_on', 1, step=0), True) + with summary_ops.record_if(False): + self.assertAllEqual(summary_ops.write('set_off', 1, step=0), False) f() events = events_from_logdir(logdir) self.assertEqual(3, len(events)) @@ -204,7 +207,7 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): def record_fn(): step.assign_add(1) return int(step % 2) == 0 - with summary_ops.create_file_writer(logdir).as_default(): + with summary_ops.create_file_writer_v2(logdir).as_default(): with summary_ops.record_if(record_fn): self.assertTrue(summary_ops.write('tag', 1, step=step)) self.assertFalse(summary_ops.write('tag', 1, step=step)) @@ -220,6 +223,7 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): def testWrite_recordIf_callable_fromFunction(self): logdir = self.get_temp_dir() with context.eager_mode(): + writer = summary_ops.create_file_writer_v2(logdir) step = variables.Variable(-1, dtype=dtypes.int64) @def_function.function def record_fn(): @@ -227,7 +231,7 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): return math_ops.equal(step % 2, 0) @def_function.function def f(): - with summary_ops.create_file_writer(logdir).as_default(): + with writer.as_default(): with summary_ops.record_if(record_fn): return [ summary_ops.write('tag', 1, step=step), @@ -243,13 +247,14 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): def testWrite_recordIf_tensorInput_fromFunction(self): logdir = self.get_temp_dir() - @def_function.function(input_signature=[ - tensor_spec.TensorSpec(shape=[], dtype=dtypes.int64)]) - def f(step): - with summary_ops.create_file_writer(logdir).as_default(): - with summary_ops.record_if(math_ops.equal(step % 2, 0)): - return summary_ops.write('tag', 1, step=step) with context.eager_mode(): + writer = summary_ops.create_file_writer_v2(logdir) + @def_function.function(input_signature=[ + tensor_spec.TensorSpec(shape=[], dtype=dtypes.int64)]) + def f(step): + with writer.as_default(): + with summary_ops.record_if(math_ops.equal(step % 2, 0)): + return summary_ops.write('tag', 1, step=step) self.assertTrue(f(0)) self.assertFalse(f(1)) self.assertTrue(f(2)) @@ -311,77 +316,152 @@ class SummaryOpsCoreTest(test_util.TensorFlowTestCase): class SummaryWriterTest(test_util.TensorFlowTestCase): - def testWriterInitAndClose(self): + def testCreate_withInitAndClose(self): logdir = self.get_temp_dir() with context.eager_mode(): - writer = summary_ops.create_file_writer( + writer = summary_ops.create_file_writer_v2( logdir, max_queue=1000, flush_millis=1000000) - files = gfile.Glob(os.path.join(logdir, '*')) - self.assertEqual(1, len(files)) - file1 = files[0] - self.assertEqual(1, len(events_from_file(file1))) # file_version Event + get_total = lambda: len(events_from_logdir(logdir)) + self.assertEqual(1, get_total()) # file_version Event # Calling init() again while writer is open has no effect writer.init() - self.assertEqual(1, len(events_from_file(file1))) + self.assertEqual(1, get_total()) with writer.as_default(): summary_ops.write('tag', 1, step=0) - self.assertEqual(1, len(events_from_file(file1))) + self.assertEqual(1, get_total()) # Calling .close() should do an implicit flush writer.close() - self.assertEqual(2, len(events_from_file(file1))) - # Calling init() on a closed writer should start a new file - time.sleep(1.1) # Ensure filename has a different timestamp - writer.init() - files = gfile.Glob(os.path.join(logdir, '*')) - self.assertEqual(2, len(files)) - files.remove(file1) - file2 = files[0] - self.assertEqual(1, len(events_from_file(file2))) # file_version - self.assertEqual(2, len(events_from_file(file1))) # should be unchanged + self.assertEqual(2, get_total()) - def testSharedName(self): + def testCreate_fromFunction(self): + logdir = self.get_temp_dir() + @def_function.function + def f(): + # Returned SummaryWriter must be stored in a non-local variable so it + # lives throughout the function execution. + if not hasattr(f, 'writer'): + f.writer = summary_ops.create_file_writer_v2(logdir) + with context.eager_mode(): + f() + event_files = gfile.Glob(os.path.join(logdir, '*')) + self.assertEqual(1, len(event_files)) + + def testCreate_graphTensorArgument_raisesError(self): + logdir = self.get_temp_dir() + with context.graph_mode(): + logdir_tensor = constant_op.constant(logdir) + with context.eager_mode(): + with self.assertRaisesRegex( + ValueError, 'Invalid graph Tensor argument.*logdir'): + summary_ops.create_file_writer_v2(logdir_tensor) + self.assertEmpty(gfile.Glob(os.path.join(logdir, '*'))) + + def testCreate_fromFunction_graphTensorArgument_raisesError(self): + logdir = self.get_temp_dir() + @def_function.function + def f(): + summary_ops.create_file_writer_v2(constant_op.constant(logdir)) + with context.eager_mode(): + with self.assertRaisesRegex( + ValueError, 'Invalid graph Tensor argument.*logdir'): + f() + self.assertEmpty(gfile.Glob(os.path.join(logdir, '*'))) + + def testCreate_fromFunction_unpersistedResource_raisesError(self): + logdir = self.get_temp_dir() + @def_function.function + def f(): + with summary_ops.create_file_writer_v2(logdir).as_default(): + pass # Calling .as_default() is enough to indicate use. + with context.eager_mode(): + # TODO(nickfelt): change this to a better error + with self.assertRaisesRegex( + errors.NotFoundError, 'Resource.*does not exist'): + f() + # Even though we didn't use it, an event file will have been created. + self.assertEqual(1, len(gfile.Glob(os.path.join(logdir, '*')))) + + def testNoSharing(self): + # Two writers with the same logdir should not share state. logdir = self.get_temp_dir() with context.eager_mode(): - # Create with default shared name (should match logdir) - writer1 = summary_ops.create_file_writer(logdir) + writer1 = summary_ops.create_file_writer_v2(logdir) with writer1.as_default(): summary_ops.write('tag', 1, step=1) - summary_ops.flush() - # Create with explicit logdir shared name (should be same resource/file) - shared_name = 'logdir:' + logdir - writer2 = summary_ops.create_file_writer(logdir, name=shared_name) + event_files = gfile.Glob(os.path.join(logdir, '*')) + self.assertEqual(1, len(event_files)) + file1 = event_files[0] + + writer2 = summary_ops.create_file_writer_v2(logdir) with writer2.as_default(): summary_ops.write('tag', 1, step=2) - summary_ops.flush() - # Create with different shared name (should be separate resource/file) - time.sleep(1.1) # Ensure filename has a different timestamp - writer3 = summary_ops.create_file_writer(logdir, name='other') - with writer3.as_default(): - summary_ops.write('tag', 1, step=3) - summary_ops.flush() + event_files = gfile.Glob(os.path.join(logdir, '*')) + self.assertEqual(2, len(event_files)) + event_files.remove(file1) + file2 = event_files[0] - event_files = iter(sorted(gfile.Glob(os.path.join(logdir, '*')))) + # Extra writes to ensure interleaved usage works. + with writer1.as_default(): + summary_ops.write('tag', 1, step=1) + with writer2.as_default(): + summary_ops.write('tag', 1, step=2) - # First file has tags "one" and "two" - events = iter(events_from_file(next(event_files))) + events = iter(events_from_file(file1)) self.assertEqual('brain.Event:2', next(events).file_version) self.assertEqual(1, next(events).step) + self.assertEqual(1, next(events).step) + self.assertRaises(StopIteration, lambda: next(events)) + events = iter(events_from_file(file2)) + self.assertEqual('brain.Event:2', next(events).file_version) + self.assertEqual(2, next(events).step) self.assertEqual(2, next(events).step) self.assertRaises(StopIteration, lambda: next(events)) - # Second file has tag "three" - events = iter(events_from_file(next(event_files))) - self.assertEqual('brain.Event:2', next(events).file_version) - self.assertEqual(3, next(events).step) - self.assertRaises(StopIteration, lambda: next(events)) + def testNoSharing_fromFunction(self): + logdir = self.get_temp_dir() + @def_function.function + def f1(): + if not hasattr(f1, 'writer'): + f1.writer = summary_ops.create_file_writer_v2(logdir) + with f1.writer.as_default(): + summary_ops.write('tag', 1, step=1) + @def_function.function + def f2(): + if not hasattr(f2, 'writer'): + f2.writer = summary_ops.create_file_writer_v2(logdir) + with f2.writer.as_default(): + summary_ops.write('tag', 1, step=2) + with context.eager_mode(): + f1() + event_files = gfile.Glob(os.path.join(logdir, '*')) + self.assertEqual(1, len(event_files)) + file1 = event_files[0] - # No more files - self.assertRaises(StopIteration, lambda: next(event_files)) + f2() + event_files = gfile.Glob(os.path.join(logdir, '*')) + self.assertEqual(2, len(event_files)) + event_files.remove(file1) + file2 = event_files[0] + + # Extra writes to ensure interleaved usage works. + f1() + f2() + + events = iter(events_from_file(file1)) + self.assertEqual('brain.Event:2', next(events).file_version) + self.assertEqual(1, next(events).step) + self.assertEqual(1, next(events).step) + self.assertRaises(StopIteration, lambda: next(events)) + events = iter(events_from_file(file2)) + self.assertEqual('brain.Event:2', next(events).file_version) + self.assertEqual(2, next(events).step) + self.assertEqual(2, next(events).step) + self.assertRaises(StopIteration, lambda: next(events)) def testMaxQueue(self): logdir = self.get_temp_dir() with context.eager_mode(): - with summary_ops.create_file_writer( + with summary_ops.create_file_writer_v2( logdir, max_queue=1, flush_millis=999999).as_default(): get_total = lambda: len(events_from_logdir(logdir)) # Note: First tf.Event is always file_version. @@ -396,7 +476,7 @@ class SummaryWriterTest(test_util.TensorFlowTestCase): logdir = self.get_temp_dir() get_total = lambda: len(events_from_logdir(logdir)) with context.eager_mode(): - writer = summary_ops.create_file_writer( + writer = summary_ops.create_file_writer_v2( logdir, max_queue=1000, flush_millis=1000000) self.assertEqual(1, get_total()) # file_version Event with writer.as_default(): @@ -412,7 +492,7 @@ class SummaryWriterTest(test_util.TensorFlowTestCase): def testFlushFunction(self): logdir = self.get_temp_dir() with context.eager_mode(): - writer = summary_ops.create_file_writer( + writer = summary_ops.create_file_writer_v2( logdir, max_queue=999999, flush_millis=999999) with writer.as_default(), summary_ops.always_record_summaries(): get_total = lambda: len(events_from_logdir(logdir)) @@ -436,9 +516,24 @@ class SummaryWriterTest(test_util.TensorFlowTestCase): @test_util.assert_no_new_pyobjects_executing_eagerly def testEagerMemory(self): logdir = self.get_temp_dir() - with summary_ops.create_file_writer(logdir).as_default(): + with summary_ops.create_file_writer_v2(logdir).as_default(): summary_ops.write('tag', 1, step=0) + def testClose_preventsLaterUse(self): + logdir = self.get_temp_dir() + with context.eager_mode(): + writer = summary_ops.create_file_writer_v2(logdir) + writer.close() + writer.close() # redundant close() is a no-op + writer.flush() # redundant flush() is a no-op + with self.assertRaisesRegex(RuntimeError, 'already closed'): + writer.init() + with self.assertRaisesRegex(RuntimeError, 'already closed'): + with writer.as_default(): + self.fail('should not get here') + with self.assertRaisesRegex(RuntimeError, 'already closed'): + writer.set_as_default() + def testClose_closesOpenFile(self): try: import psutil # pylint: disable=g-import-not-at-top @@ -448,7 +543,7 @@ class SummaryWriterTest(test_util.TensorFlowTestCase): get_open_filenames = lambda: set(info[0] for info in proc.open_files()) logdir = self.get_temp_dir() with context.eager_mode(): - writer = summary_ops.create_file_writer(logdir) + writer = summary_ops.create_file_writer_v2(logdir) files = gfile.Glob(os.path.join(logdir, '*')) self.assertEqual(1, len(files)) eventfile = files[0] @@ -465,7 +560,7 @@ class SummaryWriterTest(test_util.TensorFlowTestCase): get_open_filenames = lambda: set(info[0] for info in proc.open_files()) logdir = self.get_temp_dir() with context.eager_mode(): - writer = summary_ops.create_file_writer(logdir) + writer = summary_ops.create_file_writer_v2(logdir) files = gfile.Glob(os.path.join(logdir, '*')) self.assertEqual(1, len(files)) eventfile = files[0] diff --git a/tensorflow/python/ops/summary_ops_v2.py b/tensorflow/python/ops/summary_ops_v2.py index cde00c766e9..bb570bafeec 100644 --- a/tensorflow/python/ops/summary_ops_v2.py +++ b/tensorflow/python/ops/summary_ops_v2.py @@ -19,6 +19,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import abc +import functools import getpass import os import re @@ -34,6 +36,7 @@ from tensorflow.python.framework import constant_op from tensorflow.python.framework import dtypes from tensorflow.python.framework import ops from tensorflow.python.framework import smart_cond +from tensorflow.python.framework import tensor_util from tensorflow.python.ops import array_ops from tensorflow.python.ops import control_flow_ops from tensorflow.python.ops import gen_summary_ops @@ -124,65 +127,120 @@ def never_record_summaries(): @tf_export("summary.SummaryWriter", v1=[]) +@six.add_metaclass(abc.ABCMeta) class SummaryWriter(object): - """Encapsulates a stateful summary writer resource. + """Interface representing a stateful summary writer object.""" - See also: - - `tf.summary.create_file_writer` - - `tf.summary.create_db_writer` - """ + @abc.abstractmethod + def set_as_default(self): + """Enables this summary writer for the current thread.""" + raise NotImplementedError() - def __init__(self, resource, init_op_fn): - self._resource = resource - # TODO(nickfelt): cache constructed ops in graph mode + @abc.abstractmethod + @tf_contextlib.contextmanager + def as_default(self): + """Returns a context manager that enables summary writing.""" + raise NotImplementedError() + + def init(self): + """Initializes the summary writer.""" + raise NotImplementedError() + + def flush(self): + """Flushes any buffered data.""" + raise NotImplementedError() + + def close(self): + """Flushes and closes the summary writer.""" + raise NotImplementedError() + + +class ResourceSummaryWriter(SummaryWriter): + """Implementation of SummaryWriter using a SummaryWriterInterface resource.""" + + def __init__(self, shared_name, init_op_fn, name=None, v2=False): + self._resource = gen_summary_ops.summary_writer( + shared_name=shared_name, name=name) + # TODO(nickfelt): cache other constructed ops in graph mode self._init_op_fn = init_op_fn - if context.executing_eagerly() and self._resource is not None: + self._init_op = init_op_fn(self._resource) + self._v2 = v2 + self._closed = False + if context.executing_eagerly(): self._resource_deleter = resource_variable_ops.EagerResourceDeleter( handle=self._resource, handle_device="cpu:0") + else: + global _SUMMARY_WRITER_INIT_OP + key = ops.get_default_graph()._graph_key # pylint: disable=protected-access + _SUMMARY_WRITER_INIT_OP.setdefault(key, []).append(self._init_op) def set_as_default(self): """Enables this summary writer for the current thread.""" + if self._v2 and context.executing_eagerly() and self._closed: + raise RuntimeError("SummaryWriter is already closed") context.context().summary_writer_resource = self._resource @tf_contextlib.contextmanager def as_default(self): - """Enables summary writing within a `with` block.""" - if self._resource is None: + """Returns a context manager that enables summary writing.""" + if self._v2 and context.executing_eagerly() and self._closed: + raise RuntimeError("SummaryWriter is already closed") + old = context.context().summary_writer_resource + try: + context.context().summary_writer_resource = self._resource yield self - else: - old = context.context().summary_writer_resource - try: - context.context().summary_writer_resource = self._resource - yield self - # Flushes the summary writer in eager mode or in graph functions, but - # not in legacy graph mode (you're on your own there). - with ops.device("cpu:0"): - gen_summary_ops.flush_summary_writer(self._resource) - finally: - context.context().summary_writer_resource = old + # Flushes the summary writer in eager mode or in graph functions, but + # not in legacy graph mode (you're on your own there). + self.flush() + finally: + context.context().summary_writer_resource = old def init(self): - """Operation to initialize the summary writer resource.""" - if self._resource is not None: - return self._init_op_fn() - - def _flush(self): - return _flush_fn(writer=self) + """Initializes the summary writer.""" + if self._v2: + if context.executing_eagerly() and self._closed: + raise RuntimeError("SummaryWriter is already closed") + return self._init_op + # Legacy behavior allows re-initializing the resource. + return self._init_op_fn(self._resource) def flush(self): - """Operation to force the summary writer to flush any buffered data.""" - if self._resource is not None: - return self._flush() - - def _close(self): - with ops.control_dependencies([self.flush()]): - with ops.device("cpu:0"): - return gen_summary_ops.close_summary_writer(self._resource) + """Flushes any buffered data.""" + if self._v2 and context.executing_eagerly() and self._closed: + return + return _flush_fn(writer=self) def close(self): - """Operation to flush and close the summary writer resource.""" - if self._resource is not None: - return self._close() + """Flushes and closes the summary writer.""" + if self._v2 and context.executing_eagerly() and self._closed: + return + try: + with ops.control_dependencies([self.flush()]): + with ops.device("cpu:0"): + return gen_summary_ops.close_summary_writer(self._resource) + finally: + if self._v2 and context.executing_eagerly(): + self._closed = True + + +class NoopSummaryWriter(SummaryWriter): + """A summary writer that does nothing, for create_noop_writer().""" + + def set_as_default(self): + pass + + @tf_contextlib.contextmanager + def as_default(self): + yield + + def init(self): + pass + + def flush(self): + pass + + def close(self): + pass @tf_export(v1=["summary.initialize"]) @@ -228,6 +286,66 @@ def initialize( @tf_export("summary.create_file_writer", v1=[]) +def create_file_writer_v2(logdir, + max_queue=None, + flush_millis=None, + filename_suffix=None, + name=None): + """Creates a summary file writer for the given log directory. + + Args: + logdir: a string specifying the directory in which to write an event file. + max_queue: the largest number of summaries to keep in a queue; will + flush once the queue gets bigger than this. Defaults to 10. + flush_millis: the largest interval between flushes. Defaults to 120,000. + filename_suffix: optional suffix for the event file name. Defaults to `.v2`. + name: a name for the op that creates the writer. + + Returns: + A SummaryWriter object. + """ + if logdir is None: + raise ValueError("logdir cannot be None") + inside_function = ops.inside_function() + with ops.name_scope(name, "create_file_writer") as scope, ops.device("cpu:0"): + # Run init inside an init_scope() to hoist it out of tf.functions. + with ops.init_scope(): + if context.executing_eagerly(): + _check_create_file_writer_args( + inside_function, + logdir=logdir, + max_queue=max_queue, + flush_millis=flush_millis, + filename_suffix=filename_suffix) + logdir = ops.convert_to_tensor(logdir, dtype=dtypes.string) + if max_queue is None: + max_queue = constant_op.constant(10) + if flush_millis is None: + flush_millis = constant_op.constant(2 * 60 * 1000) + if filename_suffix is None: + filename_suffix = constant_op.constant(".v2") + # Prepend the PID and a process-local UID to the filename suffix to avoid + # filename collisions within the machine (the filename already contains + # the hostname to avoid cross-machine collisions). + unique_prefix = constant_op.constant(".%s.%s" % (os.getpid(), ops.uid())) + filename_suffix = unique_prefix + filename_suffix + # Use a unique shared_name to prevent resource sharing. + if context.executing_eagerly(): + shared_name = context.shared_name() + else: + shared_name = ops._name_from_scope_name(scope) # pylint: disable=protected-access + return ResourceSummaryWriter( + shared_name=shared_name, + init_op_fn=functools.partial( + gen_summary_ops.create_summary_file_writer, + logdir=logdir, + max_queue=max_queue, + flush_millis=flush_millis, + filename_suffix=filename_suffix), + name=name, + v2=True) + + def create_file_writer(logdir, max_queue=None, flush_millis=None, @@ -254,7 +372,7 @@ def create_file_writer(logdir, summary writer. """ if logdir is None: - return SummaryWriter(None, None) + return NoopSummaryWriter() logdir = str(logdir) with ops.device("cpu:0"): if max_queue is None: @@ -265,13 +383,14 @@ def create_file_writer(logdir, filename_suffix = constant_op.constant(".v2") if name is None: name = "logdir:" + logdir - return _make_summary_writer( - name, - gen_summary_ops.create_summary_file_writer, - logdir=logdir, - max_queue=max_queue, - flush_millis=flush_millis, - filename_suffix=filename_suffix) + return ResourceSummaryWriter( + shared_name=name, + init_op_fn=functools.partial( + gen_summary_ops.create_summary_file_writer, + logdir=logdir, + max_queue=max_queue, + flush_millis=flush_millis, + filename_suffix=filename_suffix)) def create_db_writer(db_uri, @@ -316,26 +435,23 @@ def create_db_writer(db_uri, "experiment_name", _EXPERIMENT_NAME_PATTERNS, experiment_name) run_name = _cleanse_string("run_name", _RUN_NAME_PATTERNS, run_name) user_name = _cleanse_string("user_name", _USER_NAME_PATTERNS, user_name) - return _make_summary_writer( - name, - gen_summary_ops.create_summary_db_writer, - db_uri=db_uri, - experiment_name=experiment_name, - run_name=run_name, - user_name=user_name) + return ResourceSummaryWriter( + shared_name=name, + init_op_fn=functools.partial( + gen_summary_ops.create_summary_db_writer, + db_uri=db_uri, + experiment_name=experiment_name, + run_name=run_name, + user_name=user_name)) -def _make_summary_writer(name, factory, **kwargs): - resource = gen_summary_ops.summary_writer(shared_name=name) - init_op_fn = lambda: factory(resource, **kwargs) - init_op = init_op_fn() - if not context.executing_eagerly(): - # TODO(apassos): Consider doing this instead. - # ops.get_default_session().run(init_op) - global _SUMMARY_WRITER_INIT_OP - key = ops.get_default_graph()._graph_key # pylint: disable=protected-access - _SUMMARY_WRITER_INIT_OP.setdefault(key, []).append(init_op) - return SummaryWriter(resource, init_op_fn) +@tf_export("summary.create_noop_writer", v1=[]) +def create_noop_writer(): + """Returns a summary writer that does nothing. + + This is useful as a placeholder in code that expects a context manager. + """ + return NoopSummaryWriter() def _cleanse_string(name, pattern, value): @@ -436,7 +552,7 @@ def write(tag, tensor, step, metadata=None, name=None): tag: string tag used to identify the summary (e.g. in TensorBoard), usually generated with `tf.summary.summary_scope` tensor: the Tensor holding the summary data to write - step: `int64`-castable monotic step value for this summary + step: `int64`-castable monotonic step value for this summary metadata: Optional SummaryMetadata, as a proto or serialized bytes name: Optional string name for this op. @@ -734,6 +850,30 @@ def _choose_step(step): return step +def _check_create_file_writer_args(inside_function, **kwargs): + """Helper to check the validity of arguments to a create_file_writer() call. + + Args: + inside_function: whether the create_file_writer() call is in a tf.function + **kwargs: the arguments to check, as kwargs to give them names. + + Raises: + ValueError: if the arguments are graph tensors. + """ + for arg_name, arg in kwargs.items(): + if not isinstance(arg, ops.EagerTensor) and tensor_util.is_tensor(arg): + if inside_function: + raise ValueError( + "Invalid graph Tensor argument \"%s=%s\" to create_file_writer() " + "inside an @tf.function. The create call will be lifted into the " + "outer eager execution context, so it cannot consume graph tensors " + "defined inside the function body." % (arg_name, arg)) + else: + raise ValueError( + "Invalid graph Tensor argument \"%s=%s\" to eagerly executed " + "create_file_writer()." % (arg_name, arg)) + + def run_metadata(name, data, step): """Writes entire RunMetadata summary. diff --git a/tensorflow/tools/api/golden/v2/tensorflow.summary.-summary-writer.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.summary.-summary-writer.pbtxt index 6715c14e168..8f45ecbf776 100644 --- a/tensorflow/tools/api/golden/v2/tensorflow.summary.-summary-writer.pbtxt +++ b/tensorflow/tools/api/golden/v2/tensorflow.summary.-summary-writer.pbtxt @@ -4,7 +4,6 @@ tf_class { is_instance: "" member_method { name: "__init__" - argspec: "args=[\'self\', \'resource\', \'init_op_fn\'], varargs=None, keywords=None, defaults=None" } member_method { name: "as_default" diff --git a/tensorflow/tools/api/golden/v2/tensorflow.summary.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.summary.pbtxt index 85edef9d7e4..bdc9c83553c 100644 --- a/tensorflow/tools/api/golden/v2/tensorflow.summary.pbtxt +++ b/tensorflow/tools/api/golden/v2/tensorflow.summary.pbtxt @@ -12,6 +12,10 @@ tf_module { name: "create_file_writer" argspec: "args=[\'logdir\', \'max_queue\', \'flush_millis\', \'filename_suffix\', \'name\'], varargs=None, keywords=None, defaults=[\'None\', \'None\', \'None\', \'None\'], " } + member_method { + name: "create_noop_writer" + argspec: "args=[], varargs=None, keywords=None, defaults=None" + } member_method { name: "flush" argspec: "args=[\'writer\', \'name\'], varargs=None, keywords=None, defaults=[\'None\', \'None\'], "