Fix EventFileWriter to surface writer thread exceptions and avoid deadlock
Fixes the deadlock issue where the background thread used by EventFileWriter could die silently and leave the calling code that uses EventFileWriter blocked. The specific issue was blocking in flush() due to the queue.join() call that would never return, but the issue could also arise in close() since it calls flush(), as well as in add_event() since queue.put() would block forever once the queue fills up with no consumer alive to empty it. We fix these issues via a combination of techniques: 1) Replace the standard library queue with a fork that strips out unused functionality (like task completion tracking, non-blocking get/put, and inheritance) and adds new close() functionality that does what we need - causes any pending or future calls to put() to raise an exception. This solves the deadlock issue for add_event() and ensures we can safely do blocking enqueues without risking deadlock. 2) Re-implement flush() to avoid using queue.join() and task completion tracking. Instead, we change flush() to use a sentinel queue element, akin to close(), indicate the point in the queue we wish to flush until, and then to use a threading.Event to block on completion. This way we can notify the main thread either when the flush completes successfully or when the writer thread exits unexpectedly and solve the deadlock issue in flush(). 3) Add logic to propagate exceptions encountered by the writer thread, so that after it dies, the next call to add_event() or flush()/close() that encounters the closed queue can discover the exception and reraise it to the calling code. Lastly, I added four tests and confirmed that prior to these changes, they time out on deadlock (except for `raisedFromAddEvent` which fails at assertRaises, which is expected since it tests behavior that was made newly strict in the fixed code), and after these changes, they pass with `--runs_per_test=100`. PiperOrigin-RevId: 293278306 Change-Id: Ib2ee8bbd34e31ed64d639a1b13eff094a8a7b971
This commit is contained in:
parent
bc09867768
commit
ab0e8d2629
@ -18,15 +18,17 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import collections
|
||||
import os.path
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
import six
|
||||
|
||||
from tensorflow.core.util import event_pb2
|
||||
from tensorflow.python import _pywrap_events_writer
|
||||
from tensorflow.python.platform import gfile
|
||||
from tensorflow.python.platform import tf_logging as logging
|
||||
from tensorflow.python.util import compat
|
||||
|
||||
|
||||
@ -36,6 +38,8 @@ class EventFileWriter(object):
|
||||
The `EventFileWriter` class creates an event file in the specified directory,
|
||||
and asynchronously writes Event protocol buffers to the file. The Event file
|
||||
is encoded using the tfrecord format, which is similar to RecordIO.
|
||||
|
||||
This class is not thread-safe.
|
||||
"""
|
||||
|
||||
def __init__(self, logdir, max_queue=10, flush_secs=120,
|
||||
@ -65,23 +69,32 @@ class EventFileWriter(object):
|
||||
self._logdir = str(logdir)
|
||||
if not gfile.IsDirectory(self._logdir):
|
||||
gfile.MakeDirs(self._logdir)
|
||||
self._event_queue = six.moves.queue.Queue(max_queue)
|
||||
self._max_queue = max_queue
|
||||
self._flush_secs = flush_secs
|
||||
self._flush_complete = threading.Event()
|
||||
self._flush_sentinel = object()
|
||||
self._close_sentinel = object()
|
||||
self._ev_writer = _pywrap_events_writer.EventsWriter(
|
||||
compat.as_bytes(os.path.join(self._logdir, "events")))
|
||||
self._flush_secs = flush_secs
|
||||
self._sentinel_event = self._get_sentinel_event()
|
||||
if filename_suffix:
|
||||
self._ev_writer.InitWithSuffix(compat.as_bytes(filename_suffix))
|
||||
self._initialize()
|
||||
self._closed = False
|
||||
|
||||
def _initialize(self):
|
||||
"""Initializes or re-initializes the queue and writer thread.
|
||||
|
||||
The EventsWriter itself does not need to be re-initialized explicitly,
|
||||
because it will auto-initialize itself if used after being closed.
|
||||
"""
|
||||
self._event_queue = CloseableQueue(self._max_queue)
|
||||
self._worker = _EventLoggerThread(self._event_queue, self._ev_writer,
|
||||
self._flush_secs, self._sentinel_event)
|
||||
self._flush_secs, self._flush_complete,
|
||||
self._flush_sentinel,
|
||||
self._close_sentinel)
|
||||
|
||||
self._worker.start()
|
||||
|
||||
def _get_sentinel_event(self):
|
||||
"""Generate a sentinel event for terminating worker."""
|
||||
return event_pb2.Event()
|
||||
|
||||
def get_logdir(self):
|
||||
"""Returns the directory where event file will be written."""
|
||||
return self._logdir
|
||||
@ -95,9 +108,7 @@ class EventFileWriter(object):
|
||||
Does nothing if the EventFileWriter was not closed.
|
||||
"""
|
||||
if self._closed:
|
||||
self._worker = _EventLoggerThread(self._event_queue, self._ev_writer,
|
||||
self._flush_secs, self._sentinel_event)
|
||||
self._worker.start()
|
||||
self._initialize()
|
||||
self._closed = False
|
||||
|
||||
def add_event(self, event):
|
||||
@ -107,7 +118,23 @@ class EventFileWriter(object):
|
||||
event: An `Event` protocol buffer.
|
||||
"""
|
||||
if not self._closed:
|
||||
self._event_queue.put(event)
|
||||
self._try_put(event)
|
||||
|
||||
def _try_put(self, item):
|
||||
"""Attempts to enqueue an item to the event queue.
|
||||
|
||||
If the queue is closed, this will close the EventFileWriter and reraise the
|
||||
exception that caused the queue closure, if one exists.
|
||||
|
||||
Args:
|
||||
item: the item to enqueue
|
||||
"""
|
||||
try:
|
||||
self._event_queue.put(item)
|
||||
except QueueClosedError:
|
||||
self._internal_close()
|
||||
if self._worker.failure_exc_info:
|
||||
six.reraise(*self._worker.failure_exc_info) # pylint: disable=no-value-for-parameter
|
||||
|
||||
def flush(self):
|
||||
"""Flushes the event file to disk.
|
||||
@ -115,58 +142,163 @@ class EventFileWriter(object):
|
||||
Call this method to make sure that all pending events have been written to
|
||||
disk.
|
||||
"""
|
||||
self._event_queue.join()
|
||||
self._ev_writer.Flush()
|
||||
if not self._closed:
|
||||
# Request a flush operation by enqueing a sentinel and then waiting for
|
||||
# the writer thread to mark the flush as complete.
|
||||
self._flush_complete.clear()
|
||||
self._try_put(self._flush_sentinel)
|
||||
self._flush_complete.wait()
|
||||
if self._worker.failure_exc_info:
|
||||
self._internal_close()
|
||||
six.reraise(*self._worker.failure_exc_info) # pylint: disable=no-value-for-parameter
|
||||
|
||||
def close(self):
|
||||
"""Flushes the event file to disk and close the file.
|
||||
|
||||
Call this method when you do not need the summary writer anymore.
|
||||
"""
|
||||
self.add_event(self._sentinel_event)
|
||||
self.flush()
|
||||
if not self._closed:
|
||||
self.flush()
|
||||
self._try_put(self._close_sentinel)
|
||||
self._internal_close()
|
||||
|
||||
def _internal_close(self):
|
||||
self._closed = True
|
||||
self._worker.join()
|
||||
self._ev_writer.Close()
|
||||
self._closed = True
|
||||
|
||||
|
||||
class _EventLoggerThread(threading.Thread):
|
||||
"""Thread that logs events."""
|
||||
|
||||
def __init__(self, queue, ev_writer, flush_secs, sentinel_event):
|
||||
def __init__(self, queue, ev_writer, flush_secs, flush_complete,
|
||||
flush_sentinel, close_sentinel):
|
||||
"""Creates an _EventLoggerThread.
|
||||
|
||||
Args:
|
||||
queue: A Queue from which to dequeue events.
|
||||
queue: A CloseableQueue from which to dequeue events. The queue will be
|
||||
closed just before the thread exits, whether due to `close_sentinel` or
|
||||
any exception raised in the writing loop.
|
||||
ev_writer: An event writer. Used to log brain events for
|
||||
the visualizer.
|
||||
the visualizer.
|
||||
flush_secs: How often, in seconds, to flush the
|
||||
pending file to disk.
|
||||
sentinel_event: A sentinel element in queue that tells this thread to
|
||||
terminate.
|
||||
flush_complete: A threading.Event that will be set whenever a flush
|
||||
operation requested via `flush_sentinel` has been completed.
|
||||
flush_sentinel: A sentinel element in queue that tells this thread to
|
||||
flush the writer and mark the current flush operation complete.
|
||||
close_sentinel: A sentinel element in queue that tells this thread to
|
||||
terminate and close the queue.
|
||||
"""
|
||||
threading.Thread.__init__(self)
|
||||
threading.Thread.__init__(self, name="EventLoggerThread")
|
||||
self.daemon = True
|
||||
self._queue = queue
|
||||
self._ev_writer = ev_writer
|
||||
self._flush_secs = flush_secs
|
||||
# The first event will be flushed immediately.
|
||||
self._next_event_flush_time = 0
|
||||
self._sentinel_event = sentinel_event
|
||||
self._flush_complete = flush_complete
|
||||
self._flush_sentinel = flush_sentinel
|
||||
self._close_sentinel = close_sentinel
|
||||
# Populated when writing logic raises an exception and kills the thread.
|
||||
self.failure_exc_info = ()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
event = self._queue.get()
|
||||
if event is self._sentinel_event:
|
||||
self._queue.task_done()
|
||||
break
|
||||
try:
|
||||
self._ev_writer.WriteEvent(event)
|
||||
# Flush the event writer every so often.
|
||||
now = time.time()
|
||||
if now > self._next_event_flush_time:
|
||||
try:
|
||||
while True:
|
||||
event = self._queue.get()
|
||||
if event is self._close_sentinel:
|
||||
return
|
||||
elif event is self._flush_sentinel:
|
||||
self._ev_writer.Flush()
|
||||
# Do it again in two minutes.
|
||||
self._next_event_flush_time = now + self._flush_secs
|
||||
finally:
|
||||
self._queue.task_done()
|
||||
self._flush_complete.set()
|
||||
else:
|
||||
self._ev_writer.WriteEvent(event)
|
||||
# Flush the event writer every so often.
|
||||
now = time.time()
|
||||
if now > self._next_event_flush_time:
|
||||
self._ev_writer.Flush()
|
||||
self._next_event_flush_time = now + self._flush_secs
|
||||
except Exception as e:
|
||||
logging.error("EventFileWriter writer thread error: %s", e)
|
||||
self.failure_exc_info = sys.exc_info()
|
||||
raise
|
||||
finally:
|
||||
# When exiting the thread, always complete any pending flush operation
|
||||
# (to unblock flush() calls) and close the queue (to unblock add_event()
|
||||
# calls, including those used by flush() and close()), which ensures that
|
||||
# code using EventFileWriter doesn't deadlock if this thread dies.
|
||||
self._flush_complete.set()
|
||||
self._queue.close()
|
||||
|
||||
|
||||
class CloseableQueue(object):
|
||||
"""Stripped-down fork of the standard library Queue that is closeable."""
|
||||
|
||||
def __init__(self, maxsize=0):
|
||||
"""Create a queue object with a given maximum size.
|
||||
|
||||
Args:
|
||||
maxsize: int size of queue. If <= 0, the queue size is infinite.
|
||||
"""
|
||||
self._maxsize = maxsize
|
||||
self._queue = collections.deque()
|
||||
self._closed = False
|
||||
# Mutex must be held whenever queue is mutating; shared by conditions.
|
||||
self._mutex = threading.Lock()
|
||||
# Notify not_empty whenever an item is added to the queue; a
|
||||
# thread waiting to get is notified then.
|
||||
self._not_empty = threading.Condition(self._mutex)
|
||||
# Notify not_full whenever an item is removed from the queue;
|
||||
# a thread waiting to put is notified then.
|
||||
self._not_full = threading.Condition(self._mutex)
|
||||
|
||||
def get(self):
|
||||
"""Remove and return an item from the queue.
|
||||
|
||||
If the queue is empty, blocks until an item is available.
|
||||
|
||||
Returns:
|
||||
an item from the queue
|
||||
"""
|
||||
with self._not_empty:
|
||||
while not self._queue:
|
||||
self._not_empty.wait()
|
||||
item = self._queue.popleft()
|
||||
self._not_full.notify()
|
||||
return item
|
||||
|
||||
def put(self, item):
|
||||
"""Put an item into the queue.
|
||||
|
||||
If the queue is closed, fails immediately.
|
||||
|
||||
If the queue is full, blocks until space is available or until the queue
|
||||
is closed by a call to close(), at which point this call fails.
|
||||
|
||||
Args:
|
||||
item: an item to add to the queue
|
||||
|
||||
Raises:
|
||||
QueueClosedError: if insertion failed because the queue is closed
|
||||
"""
|
||||
with self._not_full:
|
||||
if self._closed:
|
||||
raise QueueClosedError()
|
||||
if self._maxsize > 0:
|
||||
while len(self._queue) == self._maxsize:
|
||||
self._not_full.wait()
|
||||
if self._closed:
|
||||
raise QueueClosedError()
|
||||
self._queue.append(item)
|
||||
self._not_empty.notify()
|
||||
|
||||
def close(self):
|
||||
"""Closes the queue, causing any pending or future `put()` calls to fail."""
|
||||
with self._not_full:
|
||||
self._closed = True
|
||||
self._not_full.notify_all()
|
||||
|
||||
|
||||
class QueueClosedError(Exception):
|
||||
"""Raised when CloseableQueue.put() fails because the queue is closed."""
|
||||
|
@ -293,6 +293,8 @@ class FileWriter(SummaryToEventTransformer):
|
||||
instead forms a compatibility layer over new graph-based summaries
|
||||
(`tf.contrib.summary`) to facilitate the use of new summary writing with
|
||||
pre-existing code that expects a `FileWriter` instance.
|
||||
|
||||
This class is not thread-safe.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
|
@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# ==============================================================================
|
||||
"""Tests for training_coordinator.py."""
|
||||
"""Tests for tensorflow.python.summary.writer."""
|
||||
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
@ -21,6 +21,7 @@ from __future__ import print_function
|
||||
import glob
|
||||
import os.path
|
||||
import shutil
|
||||
import threading
|
||||
import time
|
||||
import warnings
|
||||
|
||||
@ -46,7 +47,7 @@ from tensorflow.python.summary.writer import writer_cache
|
||||
from tensorflow.python.util import compat
|
||||
|
||||
|
||||
class FileWriterTestCase(test.TestCase):
|
||||
class FileWriterTestBase(object):
|
||||
|
||||
def _FileWriter(self, *args, **kwargs):
|
||||
return writer.FileWriter(*args, **kwargs)
|
||||
@ -459,7 +460,87 @@ class FileWriterTestCase(test.TestCase):
|
||||
self.assertEqual(content, "bar!")
|
||||
|
||||
|
||||
class SessionBasedFileWriterTestCase(FileWriterTestCase):
|
||||
class FakeWriteError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FileWriterTestCase(FileWriterTestBase, test.TestCase):
|
||||
|
||||
@test_util.run_deprecated_v1
|
||||
def testWriterException_raisedFromFlush(self):
|
||||
test_dir = self.get_temp_dir()
|
||||
sw = self._FileWriter(test_dir)
|
||||
writer_thread = sw.event_writer._worker
|
||||
with test.mock.patch.object(
|
||||
writer_thread, "_ev_writer", autospec=True) as mock_writer:
|
||||
# Coordinate threads to ensure both events are added before the writer
|
||||
# thread dies, to avoid the second add_event() failing instead of flush().
|
||||
second_event_added = threading.Event()
|
||||
def _FakeWriteEvent(event):
|
||||
del event # unused
|
||||
second_event_added.wait()
|
||||
raise FakeWriteError()
|
||||
mock_writer.WriteEvent.side_effect = _FakeWriteEvent
|
||||
sw.add_event(event_pb2.Event())
|
||||
sw.add_event(event_pb2.Event())
|
||||
second_event_added.set()
|
||||
with self.assertRaises(FakeWriteError):
|
||||
sw.flush()
|
||||
|
||||
@test_util.run_deprecated_v1
|
||||
def testWriterException_raisedFromClose(self):
|
||||
test_dir = self.get_temp_dir()
|
||||
sw = self._FileWriter(test_dir)
|
||||
writer_thread = sw.event_writer._worker
|
||||
with test.mock.patch.object(
|
||||
writer_thread, "_ev_writer", autospec=True) as mock_writer:
|
||||
mock_writer.WriteEvent.side_effect = FakeWriteError()
|
||||
sw.add_event(event_pb2.Event())
|
||||
with self.assertRaises(FakeWriteError):
|
||||
sw.close()
|
||||
|
||||
@test_util.run_deprecated_v1
|
||||
def testWriterException_raisedFromAddEvent(self):
|
||||
test_dir = self.get_temp_dir()
|
||||
sw = self._FileWriter(test_dir)
|
||||
writer_thread = sw.event_writer._worker
|
||||
with test.mock.patch.object(
|
||||
writer_thread, "_ev_writer", autospec=True) as mock_writer:
|
||||
mock_writer.WriteEvent.side_effect = FakeWriteError()
|
||||
sw.add_event(event_pb2.Event())
|
||||
# Wait for writer thread to exit first, then try to add a new event.
|
||||
writer_thread.join()
|
||||
with self.assertRaises(FakeWriteError):
|
||||
sw.add_event(event_pb2.Event())
|
||||
|
||||
@test_util.run_deprecated_v1
|
||||
def testWriterException_raisedFromPendingAddEvent(self):
|
||||
test_dir = self.get_temp_dir()
|
||||
# Set max_queue=1 to allow the third add_event() call to block (first event
|
||||
# is consumed immediately, the second fills the queue, the third blocks).
|
||||
sw = self._FileWriter(test_dir, max_queue=1)
|
||||
writer_thread = sw.event_writer._worker
|
||||
with test.mock.patch.object(
|
||||
writer_thread, "_ev_writer", autospec=True) as mock_writer:
|
||||
# Coordinate threads to ensure the first two events are added and then
|
||||
# the writer thread sleeps briefly before exiting, to maximize the chance
|
||||
# that the third add_event() reaches the pending blocked state before the
|
||||
# queue closes on writer thread exit, since that's what we want to test.
|
||||
second_event_added = threading.Event()
|
||||
def _FakeWriteEvent(event):
|
||||
del event # unused
|
||||
second_event_added.wait()
|
||||
time.sleep(0.1)
|
||||
raise FakeWriteError()
|
||||
mock_writer.WriteEvent.side_effect = _FakeWriteEvent
|
||||
sw.add_event(event_pb2.Event())
|
||||
sw.add_event(event_pb2.Event())
|
||||
second_event_added.set()
|
||||
with self.assertRaises(FakeWriteError):
|
||||
sw.add_event(event_pb2.Event())
|
||||
|
||||
|
||||
class SessionBasedFileWriterTestCase(FileWriterTestBase, test.TestCase):
|
||||
"""Tests for FileWriter behavior when passed a Session argument."""
|
||||
|
||||
def _FileWriter(self, *args, **kwargs):
|
||||
|
Loading…
x
Reference in New Issue
Block a user