diff --git a/tensorflow/python/summary/event_accumulator.py b/tensorflow/python/summary/event_accumulator.py index 3715ca64cb1..c605b1b0078 100644 --- a/tensorflow/python/summary/event_accumulator.py +++ b/tensorflow/python/summary/event_accumulator.py @@ -524,8 +524,10 @@ def _GeneratorFromPath(path): """Create an event generator for file or directory at given path string.""" loader_factory = event_file_loader.EventFileLoader if gfile.IsDirectory(path): - return directory_watcher.DirectoryWatcher(path, loader_factory, - IsTensorFlowEventsFile) + provider = directory_watcher.SequentialGFileProvider( + path, + path_filter=IsTensorFlowEventsFile) + return directory_watcher.DirectoryWatcher(provider, loader_factory) else: return loader_factory(path) diff --git a/tensorflow/python/summary/impl/directory_watcher.py b/tensorflow/python/summary/impl/directory_watcher.py index 587c7a6d30c..d3ece1f01d4 100644 --- a/tensorflow/python/summary/impl/directory_watcher.py +++ b/tensorflow/python/summary/impl/directory_watcher.py @@ -25,52 +25,56 @@ from tensorflow.python.platform import logging class DirectoryWatcher(object): - """A DirectoryWatcher wraps a loader to load from a directory. + """A DirectoryWatcher wraps a loader to load from a sequence of paths. - A loader reads a file on disk and produces some kind of values as an - iterator. A DirectoryWatcher takes a directory with one file at a time being - written to and a factory for loaders and watches all the files at once. + A loader reads a path and produces some kind of values as an iterator. A + DirectoryWatcher takes a directory, a path provider (see below) to call to + find new paths to load from, and a factory for loaders and watches all the + paths inside that directory. + + A path provider is a function that, given either a path or None, returns the + next path to load from (or None if there is no such path). This class is only + valid under the assumption that only one path will be written to by the data + source at a time, and that the path_provider will return the oldest data + source that contains fresh data. - This class is *only* valid under the assumption that files are never removed - and the only file ever changed is whichever one is lexicographically last. """ - def __init__(self, directory, loader_factory, path_filter=lambda x: True): + def __init__(self, path_provider, loader_factory): """Constructs a new DirectoryWatcher. Args: - directory: The directory to watch. The directory doesn't have to exist. + path_provider: The callback to invoke when trying to find a new path to + load from. See the class documentation for the semantics of a path + provider. loader_factory: A factory for creating loaders. The factory should take a - file path and return an object that has a Load method returning an + path and return an object that has a Load method returning an iterator that will yield all events that have not been yielded yet. - path_filter: Only files whose full path matches this predicate will be - loaded. If not specified, all files are loaded. Raises: - ValueError: If directory or loader_factory is None. + ValueError: If path_provider or loader_factory are None. """ - if directory is None: - raise ValueError('A directory is required') + if path_provider is None: + raise ValueError('A path provider is required') if loader_factory is None: raise ValueError('A loader factory is required') - self._directory = directory + self._path_provider = path_provider + self._path = None self._loader_factory = loader_factory self._loader = None - self._path = '' - self._path_filter = path_filter def Load(self): - """Loads new values from disk. + """Loads new values. - The watcher will load from one file at a time; as soon as that file stops - yielding events, it will move on to the next file. We assume that old files - are never modified after a newer file has been written. As a result, Load() + The watcher will load from one path at a time; as soon as that path stops + yielding events, it will move on to the next path. We assume that old paths + are never modified after a newer path has been written. As a result, Load() can be called multiple times in a row without losing events that have not been yielded yet. In other words, we guarantee that every event will be yielded exactly once. Yields: - All values that were written to disk that have not been yielded yet. + All values that have not been yielded yet. """ # If the loader exists, check it for a value. @@ -78,39 +82,39 @@ class DirectoryWatcher(object): self._InitializeLoader() while True: - # Yield all the new events in the file we're currently loading from. + # Yield all the new events in the path we're currently loading from. for event in self._loader.Load(): yield event next_path = self._GetNextPath() if not next_path: - logging.info('No more files in %s', self._directory) - # Current file is empty and there are no new files, so we're done. + logging.info('No path found after %s', self._path) + # Current path is empty and there are no new paths, so we're done. return - # There's a new file, so check to make sure there weren't any events - # written between when we finished reading the current file and when we + # There's a new path, so check to make sure there weren't any events + # written between when we finished reading the current path and when we # checked for the new one. The sequence of events might look something # like this: # - # 1. Event #1 written to file #1. - # 2. We check for events and yield event #1 from file #1 - # 3. We check for events and see that there are no more events in file #1. - # 4. Event #2 is written to file #1. - # 5. Event #3 is written to file #2. - # 6. We check for a new file and see that file #2 exists. + # 1. Event #1 written to path #1. + # 2. We check for events and yield event #1 from path #1 + # 3. We check for events and see that there are no more events in path #1. + # 4. Event #2 is written to path #1. + # 5. Event #3 is written to path #2. + # 6. We check for a new path and see that path #2 exists. # # Without this loop, we would miss event #2. We're also guaranteed by the - # loader contract that no more events will be written to file #1 after - # events start being written to file #2, so we don't have to worry about + # loader contract that no more events will be written to path #1 after + # events start being written to path #2, so we don't have to worry about # that. for event in self._loader.Load(): yield event - logging.info('Directory watcher for %s advancing to file %s', - self._directory, next_path) + logging.info('Directory watcher advancing from %s to %s', self._path, + next_path) - # Advance to the next file and start over. + # Advance to the next path and start over. self._SetPath(next_path) def _InitializeLoader(self): @@ -125,10 +129,49 @@ class DirectoryWatcher(object): self._loader = self._loader_factory(path) def _GetNextPath(self): - """Returns the path of the next file to use or None if no file exists.""" - sorted_paths = [os.path.join(self._directory, path) - for path in sorted(gfile.ListDirectory(self._directory))] - # We filter here so the filter gets the full directory name. - filtered_paths = (path for path in sorted_paths - if self._path_filter(path) and path > self._path) - return next(filtered_paths, None) + """Returns the next path to use or None if no such path exists.""" + return self._path_provider(self._path) + + +def _SequentialProvider(path_source): + """A provider that iterates over the output of a function that produces paths. + + _SequentialProvider takes in a path_source, which is a function that returns a + list of all currently available paths. _SequentialProvider returns in a path + provider (see documentation for the |DirectoryWatcher| class for the + semantics) that will return the alphabetically next path after the current one + (or the earliest path if the current path is None). + + The provider will never return a path which is alphanumerically less than the + current path; as such, if the path source provides a high path (e.g. "c") and + later doubles back and provides a low path (e.g. "b"), once the current path + was set to "c" the _SequentialProvider will ignore the "b" and never return + it. + + Args: + path_source: A function that returns an iterable of paths. + + Returns: + A path provider for use with DirectoryWatcher. + + """ + def _Provider(current_path): + next_paths = list(path + for path in path_source() + if current_path is None or path > current_path) + if next_paths: + return min(next_paths) + else: + return None + + return _Provider + + +def SequentialGFileProvider(directory, path_filter=lambda x: True): + """Provides the files in a directory that match the given filter.""" + def _Source(): + paths = (os.path.join(directory, path) + for path in gfile.ListDirectory(directory)) + return (path for path in paths if path_filter(path)) + + return _SequentialProvider(_Source) diff --git a/tensorflow/python/summary/impl/directory_watcher_test.py b/tensorflow/python/summary/impl/directory_watcher_test.py index f020c4fd036..784d585dfad 100644 --- a/tensorflow/python/summary/impl/directory_watcher_test.py +++ b/tensorflow/python/summary/impl/directory_watcher_test.py @@ -52,7 +52,7 @@ class DirectoryWatcherTest(test_util.TensorFlowTestCase): self._directory = os.path.join(self.get_temp_dir(), 'monitor_dir') os.mkdir(self._directory) self._watcher = directory_watcher.DirectoryWatcher( - self._directory, _ByteLoader) + directory_watcher.SequentialGFileProvider(self._directory), _ByteLoader) def tearDown(self): shutil.rmtree(self._directory) @@ -69,7 +69,7 @@ class DirectoryWatcherTest(test_util.TensorFlowTestCase): with self.assertRaises(ValueError): directory_watcher.DirectoryWatcher(None, lambda x: []) with self.assertRaises(ValueError): - directory_watcher.DirectoryWatcher('asdf', None) + directory_watcher.DirectoryWatcher(lambda x: None, None) def testEmptyDirectory(self): self.assertWatcherYields([]) @@ -110,15 +110,17 @@ class DirectoryWatcherTest(test_util.TensorFlowTestCase): self._WriteToFile('c', 'c') self.assertWatcherYields(['a', 'c']) - def testFileFilter(self): - self._watcher = directory_watcher.DirectoryWatcher( - self._directory, _ByteLoader, + def testPathFilter(self): + provider = directory_watcher.SequentialGFileProvider( + self._directory, path_filter=lambda path: 'do_not_watch_me' not in path) + self._watcher = directory_watcher.DirectoryWatcher(provider, _ByteLoader) self._WriteToFile('a', 'a') self._WriteToFile('do_not_watch_me', 'b') self._WriteToFile('c', 'c') self.assertWatcherYields(['a', 'c']) + if __name__ == '__main__': googletest.main() diff --git a/tensorflow/tensorflow.bzl b/tensorflow/tensorflow.bzl index 4c1af94b0da..22bc61ec689 100644 --- a/tensorflow/tensorflow.bzl +++ b/tensorflow/tensorflow.bzl @@ -9,7 +9,7 @@ load("//tensorflow/core:platform/default/build_config_root.bzl", # List of proto files for android builds def tf_android_core_proto_sources(): return [ - "//google/protobuf", # any.proto + "//google/protobuf:any.proto", "//tensorflow/core:example/example.proto", "//tensorflow/core:example/feature.proto", "//tensorflow/core:framework/allocation_description.proto",