Refactor DirectoryWatcher to take a path provider as input.

This will allow us to watch things that aren't local files on disk, such as
GCS directories.
Change: 115708990
A. Unique TensorFlower 2016-02-26 14:24:22 -08:00 committed by TensorFlower Gardener
parent 6390a06d80
commit 6f74cd15a6
4 changed files with 100 additions and 53 deletions


@@ -524,8 +524,10 @@ def _GeneratorFromPath(path):
   """Create an event generator for file or directory at given path string."""
   loader_factory = event_file_loader.EventFileLoader
   if gfile.IsDirectory(path):
-    return directory_watcher.DirectoryWatcher(path, loader_factory,
-                                              IsTensorFlowEventsFile)
+    provider = directory_watcher.SequentialGFileProvider(
+        path,
+        path_filter=IsTensorFlowEventsFile)
+    return directory_watcher.DirectoryWatcher(provider, loader_factory)
   else:
     return loader_factory(path)
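
With the directory listing moved behind a provider callback, _GeneratorFromPath no longer assumes local files; anything satisfying the provider contract can be plugged in. As a rough sketch of the GCS case mentioned in the description (not part of this change; list_gcs_paths is a hypothetical listing helper, and the bucket path is a placeholder):

# Sketch only: `list_gcs_paths` stands in for whatever returns the current set
# of event-file paths under a GCS prefix; it is not a real TensorFlow API.
def GcsSequentialProvider(prefix, list_gcs_paths):
  """Returns a path provider that walks the paths under `prefix` in order."""
  def _Provider(current_path):
    # Only consider paths strictly after the one currently being read.
    candidates = [p for p in list_gcs_paths(prefix)
                  if current_path is None or p > current_path]
    return min(candidates) if candidates else None
  return _Provider

# watcher = directory_watcher.DirectoryWatcher(
#     GcsSequentialProvider('gs://some-bucket/run1', list_gcs_paths),
#     event_file_loader.EventFileLoader)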


@@ -25,52 +25,56 @@ from tensorflow.python.platform import logging
 class DirectoryWatcher(object):
-  """A DirectoryWatcher wraps a loader to load from a directory.
+  """A DirectoryWatcher wraps a loader to load from a sequence of paths.
 
-  A loader reads a file on disk and produces some kind of values as an
-  iterator. A DirectoryWatcher takes a directory with one file at a time being
-  written to and a factory for loaders and watches all the files at once.
+  A loader reads a path and produces some kind of values as an iterator. A
+  DirectoryWatcher takes a directory, a path provider (see below) to call to
+  find new paths to load from, and a factory for loaders and watches all the
+  paths inside that directory.
+
+  A path provider is a function that, given either a path or None, returns the
+  next path to load from (or None if there is no such path). This class is only
+  valid under the assumption that only one path will be written to by the data
+  source at a time, and that the path_provider will return the oldest data
+  source that contains fresh data.
 
-  This class is *only* valid under the assumption that files are never removed
-  and the only file ever changed is whichever one is lexicographically last.
   """
 
-  def __init__(self, directory, loader_factory, path_filter=lambda x: True):
+  def __init__(self, path_provider, loader_factory):
     """Constructs a new DirectoryWatcher.
 
     Args:
-      directory: The directory to watch. The directory doesn't have to exist.
+      path_provider: The callback to invoke when trying to find a new path to
+        load from. See the class documentation for the semantics of a path
+        provider.
       loader_factory: A factory for creating loaders. The factory should take a
-        file path and return an object that has a Load method returning an
+        path and return an object that has a Load method returning an
        iterator that will yield all events that have not been yielded yet.
-      path_filter: Only files whose full path matches this predicate will be
-        loaded. If not specified, all files are loaded.
 
     Raises:
-      ValueError: If directory or loader_factory is None.
+      ValueError: If path_provider or loader_factory are None.
     """
-    if directory is None:
-      raise ValueError('A directory is required')
+    if path_provider is None:
+      raise ValueError('A path provider is required')
     if loader_factory is None:
       raise ValueError('A loader factory is required')
-    self._directory = directory
+    self._path_provider = path_provider
+    self._path = None
     self._loader_factory = loader_factory
     self._loader = None
-    self._path = ''
-    self._path_filter = path_filter
 
   def Load(self):
-    """Loads new values from disk.
+    """Loads new values.
 
-    The watcher will load from one file at a time; as soon as that file stops
-    yielding events, it will move on to the next file. We assume that old files
-    are never modified after a newer file has been written. As a result, Load()
+    The watcher will load from one path at a time; as soon as that path stops
+    yielding events, it will move on to the next path. We assume that old paths
+    are never modified after a newer path has been written. As a result, Load()
     can be called multiple times in a row without losing events that have not
     been yielded yet. In other words, we guarantee that every event will be
     yielded exactly once.
 
     Yields:
-      All values that were written to disk that have not been yielded yet.
+      All values that have not been yielded yet.
     """
     # If the loader exists, check it for a value.
@@ -78,39 +82,39 @@ class DirectoryWatcher(object):
       self._InitializeLoader()
 
     while True:
-      # Yield all the new events in the file we're currently loading from.
+      # Yield all the new events in the path we're currently loading from.
       for event in self._loader.Load():
         yield event
 
       next_path = self._GetNextPath()
       if not next_path:
-        logging.info('No more files in %s', self._directory)
-        # Current file is empty and there are no new files, so we're done.
+        logging.info('No path found after %s', self._path)
+        # Current path is empty and there are no new paths, so we're done.
         return
 
-      # There's a new file, so check to make sure there weren't any events
-      # written between when we finished reading the current file and when we
+      # There's a new path, so check to make sure there weren't any events
+      # written between when we finished reading the current path and when we
       # checked for the new one. The sequence of events might look something
       # like this:
       #
-      # 1. Event #1 written to file #1.
-      # 2. We check for events and yield event #1 from file #1
-      # 3. We check for events and see that there are no more events in file #1.
-      # 4. Event #2 is written to file #1.
-      # 5. Event #3 is written to file #2.
-      # 6. We check for a new file and see that file #2 exists.
+      # 1. Event #1 written to path #1.
+      # 2. We check for events and yield event #1 from path #1
+      # 3. We check for events and see that there are no more events in path #1.
+      # 4. Event #2 is written to path #1.
+      # 5. Event #3 is written to path #2.
+      # 6. We check for a new path and see that path #2 exists.
       #
       # Without this loop, we would miss event #2. We're also guaranteed by the
-      # loader contract that no more events will be written to file #1 after
-      # events start being written to file #2, so we don't have to worry about
+      # loader contract that no more events will be written to path #1 after
+      # events start being written to path #2, so we don't have to worry about
       # that.
       for event in self._loader.Load():
         yield event
 
-      logging.info('Directory watcher for %s advancing to file %s',
-                   self._directory, next_path)
+      logging.info('Directory watcher advancing from %s to %s', self._path,
+                   next_path)
 
-      # Advance to the next file and start over.
+      # Advance to the next path and start over.
       self._SetPath(next_path)
 
   def _InitializeLoader(self):
@@ -125,10 +129,49 @@ class DirectoryWatcher(object):
     self._loader = self._loader_factory(path)
 
   def _GetNextPath(self):
-    """Returns the path of the next file to use or None if no file exists."""
-    sorted_paths = [os.path.join(self._directory, path)
-                    for path in sorted(gfile.ListDirectory(self._directory))]
-    # We filter here so the filter gets the full directory name.
-    filtered_paths = (path for path in sorted_paths
-                      if self._path_filter(path) and path > self._path)
-    return next(filtered_paths, None)
+    """Returns the next path to use or None if no such path exists."""
+    return self._path_provider(self._path)
+
+
+def _SequentialProvider(path_source):
+  """A provider that iterates over the output of a function that produces paths.
+
+  _SequentialProvider takes in a path_source, which is a function that returns a
+  list of all currently available paths. _SequentialProvider returns a path
+  provider (see documentation for the |DirectoryWatcher| class for the
+  semantics) that will return the alphabetically next path after the current one
+  (or the earliest path if the current path is None).
+
+  The provider will never return a path which is alphanumerically less than the
+  current path; as such, if the path source provides a high path (e.g. "c") and
+  later doubles back and provides a low path (e.g. "b"), once the current path
+  was set to "c" the _SequentialProvider will ignore the "b" and never return
+  it.
+
+  Args:
+    path_source: A function that returns an iterable of paths.
+
+  Returns:
+    A path provider for use with DirectoryWatcher.
+  """
+
+  def _Provider(current_path):
+    next_paths = list(path
+                      for path in path_source()
+                      if current_path is None or path > current_path)
+    if next_paths:
+      return min(next_paths)
+    else:
+      return None
+
+  return _Provider
+
+
+def SequentialGFileProvider(directory, path_filter=lambda x: True):
+  """Provides the files in a directory that match the given filter."""
+  def _Source():
+    paths = (os.path.join(directory, path)
+             for path in gfile.ListDirectory(directory))
+    return (path for path in paths if path_filter(path))
+
+  return _SequentialProvider(_Source)
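
To make the provider contract above concrete, here is a small illustrative walk-through of _SequentialProvider (a sketch only; the in-memory path source and event-file names below are made up, and the call assumes _SequentialProvider is accessible from this module):

# Sketch of the _SequentialProvider semantics using a fake path source.
fake_paths = ['events.1', 'events.3']
provider = _SequentialProvider(lambda: fake_paths)

provider(None)        # -> 'events.1': the earliest path when starting out
provider('events.1')  # -> 'events.3': the alphabetically next path

fake_paths.append('events.2')
provider('events.3')  # -> None: 'events.2' sorts below the current path, so it
                      #    is ignored rather than returned out of order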


@@ -52,7 +52,7 @@ class DirectoryWatcherTest(test_util.TensorFlowTestCase):
     self._directory = os.path.join(self.get_temp_dir(), 'monitor_dir')
     os.mkdir(self._directory)
     self._watcher = directory_watcher.DirectoryWatcher(
-        self._directory, _ByteLoader)
+        directory_watcher.SequentialGFileProvider(self._directory), _ByteLoader)
 
   def tearDown(self):
     shutil.rmtree(self._directory)
@@ -69,7 +69,7 @@ class DirectoryWatcherTest(test_util.TensorFlowTestCase):
     with self.assertRaises(ValueError):
       directory_watcher.DirectoryWatcher(None, lambda x: [])
     with self.assertRaises(ValueError):
-      directory_watcher.DirectoryWatcher('asdf', None)
+      directory_watcher.DirectoryWatcher(lambda x: None, None)
 
   def testEmptyDirectory(self):
     self.assertWatcherYields([])
@@ -110,15 +110,17 @@ class DirectoryWatcherTest(test_util.TensorFlowTestCase):
     self._WriteToFile('c', 'c')
     self.assertWatcherYields(['a', 'c'])
 
-  def testFileFilter(self):
-    self._watcher = directory_watcher.DirectoryWatcher(
-        self._directory, _ByteLoader,
+  def testPathFilter(self):
+    provider = directory_watcher.SequentialGFileProvider(
+        self._directory,
         path_filter=lambda path: 'do_not_watch_me' not in path)
+    self._watcher = directory_watcher.DirectoryWatcher(provider, _ByteLoader)
     self._WriteToFile('a', 'a')
     self._WriteToFile('do_not_watch_me', 'b')
     self._WriteToFile('c', 'c')
     self.assertWatcherYields(['a', 'c'])
 
 
 if __name__ == '__main__':
   googletest.main()
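
Outside the tests, callers of the old three-argument constructor migrate the same way setUp and testPathFilter do above. A minimal sketch, assuming a caller that previously filtered for TensorFlow events files (logdir and Process are placeholders):

# Old call shape (removed by this change):
#   directory_watcher.DirectoryWatcher(logdir, event_file_loader.EventFileLoader,
#                                      path_filter=IsTensorFlowEventsFile)

# New call shape: build the provider first, then hand it to the watcher.
provider = directory_watcher.SequentialGFileProvider(
    logdir, path_filter=IsTensorFlowEventsFile)
watcher = directory_watcher.DirectoryWatcher(
    provider, event_file_loader.EventFileLoader)

for event in watcher.Load():  # still yields each event exactly once across calls
  Process(event)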


@@ -9,7 +9,7 @@ load("//tensorflow/core:platform/default/build_config_root.bzl",
 # List of proto files for android builds
 def tf_android_core_proto_sources():
   return [
-      "//google/protobuf",  # any.proto
+      "//google/protobuf:any.proto",
       "//tensorflow/core:example/example.proto",
       "//tensorflow/core:example/feature.proto",
       "//tensorflow/core:framework/allocation_description.proto",