Completely rewrite GetMatchingPaths.

The current parallel implementation is too complex (lambda inside lambda, two levels of parallelism) and has a read outside of bounds issue.

The new implementation cleans up artifacts from the previous implementations that were left in the code as it evolves. We add multiple helper functions, and document invariants and preconditions as well as every major step. This way, we fix the security issue and a potential new one which was not caught before

PiperOrigin-RevId: 346146220
Change-Id: Iec0f44673f43349797bf9944dffe9b2f779137d8
This commit is contained in:
Mihai Maruseac 2020-12-07 11:57:01 -08:00 committed by TensorFlower Gardener
parent 458c626026
commit 8b5b9dc966

View File

@ -52,115 +52,217 @@ void ForEach(int first, int last, const std::function<void(int)>& f) {
#endif #endif
} }
// A globbing pattern can only start with these characters:
static const char kGlobbingChars[] = "*?[\\";
static inline bool IsGlobbingPattern(const std::string& pattern) {
return (pattern.find_first_of(kGlobbingChars) != std::string::npos);
}
// Make sure that the first entry in `dirs` during glob expansion does not
// contain a glob pattern. This is to prevent a corner-case bug where
// `<pattern>` would be treated differently than `./<pattern>`.
static std::string PatchPattern(const std::string& pattern) {
const std::string fixed_prefix =
pattern.substr(0, pattern.find_first_of(kGlobbingChars));
// Patching is needed when there is no directory part in `prefix`
if (io::Dirname(fixed_prefix).empty()) {
return io::JoinPath(".", pattern);
}
// No patching needed
return pattern;
}
static std::vector<std::string> AllDirectoryPrefixes(const std::string& d) {
std::vector<std::string> dirs;
const std::string patched = PatchPattern(d);
StringPiece dir(patched);
// If the pattern ends with a `/` (or `\\` on Windows), we need to strip it
// otherwise we would have one additional matching step and the result set
// would be empty.
bool is_directory = d[d.size() - 1] == '/';
#ifdef PLATFORM_WINDOWS
is_directory = is_directory || (d[d.size() - 1] == '\\');
#endif
if (is_directory) {
dir = io::Dirname(dir);
}
while (!dir.empty()) {
dirs.emplace_back(dir);
StringPiece new_dir(io::Dirname(dir));
// io::Dirname("/") returns "/" so we need to break the loop.
// On Windows, io::Dirname("C:\\") would return "C:\\", so we check for
// identity of the result instead of checking for dir[0] == `/`.
if (dir == new_dir) break;
dir = new_dir;
}
// Order the array from parent to ancestor (reverse order).
std::reverse(dirs.begin(), dirs.end());
return dirs;
}
static inline int GetFirstGlobbingEntry(const std::vector<std::string>& dirs) {
int i = 0;
for (const auto& d : dirs) {
if (IsGlobbingPattern(d)) {
break;
}
i++;
}
return i;
}
} // namespace } // namespace
Status GetMatchingPaths(FileSystem* fs, Env* env, const string& pattern, Status GetMatchingPaths(FileSystem* fs, Env* env, const string& pattern,
std::vector<string>* results) { std::vector<string>* results) {
// Check that `fs`, `env` and `results` are non-null.
if (fs == nullptr || env == nullptr || results == nullptr) {
return Status(tensorflow::error::INVALID_ARGUMENT,
"Filesystem calls GetMatchingPaths with nullptr arguments");
}
// By design, we don't match anything on empty pattern
results->clear(); results->clear();
if (pattern.empty()) { if (pattern.empty()) {
return Status::OK(); return Status::OK();
} }
string fixed_prefix = pattern.substr(0, pattern.find_first_of("*?[\\")); // The pattern can contain globbing characters at multiple levels, e.g.:
string eval_pattern = pattern; //
string dir(io::Dirname(fixed_prefix)); // foo/ba?/baz/f*r
// If dir is empty then we need to fix up fixed_prefix and eval_pattern to //
// include . as the top level directory. // To match the full pattern, we must match every prefix subpattern and then
if (dir.empty()) { // operate on the children for each match. Thus, we separate all subpatterns
dir = "."; // in the `dirs` vector below.
fixed_prefix = io::JoinPath(dir, fixed_prefix); std::vector<std::string> dirs = AllDirectoryPrefixes(pattern);
eval_pattern = io::JoinPath(dir, eval_pattern);
// We can have patterns that have several parents where no globbing is being
// done, for example, `foo/bar/baz/*`. We don't need to expand the directories
// which don't contain the globbing characters.
int matching_index = GetFirstGlobbingEntry(dirs);
// If we don't have globbing characters in the pattern then it specifies a
// path in the filesystem. We add it to the result set if it exists.
if (matching_index == dirs.size()) {
if (fs->FileExists(pattern).ok()) {
results->emplace_back(pattern);
}
return Status::OK();
} }
bool is_directory = pattern[pattern.size() - 1] == '/';
#ifdef PLATFORM_WINDOWS // To expand the globbing, we do a BFS from `dirs[matching_index-1]`.
is_directory = is_directory || pattern[pattern.size() - 1] == '\\'; // At every step, we work on a pair `{dir, ix}` such that `dir` is a real
#endif // directory, `ix < dirs.size() - 1` and `dirs[ix+1]` is a globbing pattern.
std::vector<string> dirs; // To expand the pattern, we select from all the children of `dir` only those
if (!is_directory) { // that match against `dirs[ix+1]`.
dirs.emplace_back(eval_pattern); // If there are more entries in `dirs` after `dirs[ix+1]` this mean we have
} // more patterns to match. So, we add to the queue only those children that
StringPiece tmp_dir(io::Dirname(eval_pattern)); // are also directories, paired with `ix+1`.
while (tmp_dir.size() > dir.size()) { // If there are no more entries in `dirs`, we return all children as part of
dirs.emplace_back(string(tmp_dir)); // the answer.
tmp_dir = io::Dirname(tmp_dir); // Since we can get into a combinatorial explosion issue (e.g., pattern
} // `/*/*/*`), we process the queue in parallel. Each parallel processing takes
dirs.emplace_back(dir); // elements from `expand_queue` and adds them to `next_expand_queue`, after
std::reverse(dirs.begin(), dirs.end()); // which we swap these two queues (similar to double buffering algorithms).
// Setup a parallel BFS to explore everything under dir. // PRECONDITION: `IsGlobbingPattern(dirs[0]) == false`
std::deque<std::pair<string, int>> dir_q; // PRECONDITION: `matching_index > 0`
std::deque<std::pair<string, int>> next_dir_q; // INVARIANT: If `{d, ix}` is in queue, then `d` and `dirs[ix]` are at the
dir_q.emplace_back(std::make_pair(dirs[0], 0)); // same level in the filesystem tree.
Status ret; // Status to return. // INVARIANT: If `{d, _}` is in queue, then `IsGlobbingPattern(d) == false`.
mutex results_mutex; // INVARIANT: If `{d, _}` is in queue, then `d` is a real directory.
condition_variable results_cond; // INVARIANT: If `{_, ix}` is in queue, then `ix < dirs.size() - 1`.
mutex next_que_mutex; // INVARIANT: If `{_, ix}` is in queue, `IsGlobbingPattern(dirs[ix + 1])`.
condition_variable next_que_cond; std::deque<std::pair<string, int>> expand_queue;
while (!dir_q.empty()) { std::deque<std::pair<string, int>> next_expand_queue;
next_dir_q.clear(); expand_queue.emplace_back(dirs[matching_index - 1], matching_index - 1);
std::vector<Status> new_rets(dir_q.size());
auto handle_level = [fs, &results, &dir_q, &next_dir_q, &new_rets, // Adding to `result` or `new_expand_queue` need to be protected by mutexes
&is_directory, &dirs, &results_mutex, &results_cond, // since there are multiple threads writing to these.
&next_que_mutex, &next_que_cond](int i) { mutex result_mutex;
string current_dir = dir_q.at(i).first; mutex queue_mutex;
int dir_index = dir_q.at(i).second;
dir_index++; while (!expand_queue.empty()) {
std::vector<string> children; next_expand_queue.clear();
Status s = fs->GetChildren(current_dir, &children);
// In case PERMISSION_DENIED is encountered, we bail here. // The work item for every item in `expand_queue`.
// pattern, we process them in parallel.
auto handle_level = [&fs, &results, &dirs, &expand_queue,
&next_expand_queue, &result_mutex,
&queue_mutex](int i) {
// See invariants above, all of these are valid accesses.
const auto& queue_item = expand_queue.at(i);
const std::string& parent = queue_item.first;
const int index = queue_item.second + 1;
const std::string& match_pattern = dirs[index];
// Get all children of `parent`. If this fails, return early.
std::vector<std::string> children;
Status s = fs->GetChildren(parent, &children);
if (s.code() == tensorflow::error::PERMISSION_DENIED) { if (s.code() == tensorflow::error::PERMISSION_DENIED) {
return; return;
} }
new_rets[i] = s;
if (children.empty()) return;
// children_dir_status holds is_dir status for children. It can have three // Also return early if we don't have any children
// possible values: OK for true; FAILED_PRECONDITION for false; CANCELLED if (children.empty()) {
// if we don't calculate IsDirectory (we might do that because there isn't return;
// any point in exploring that child path). }
std::vector<Status> children_dir_status;
// This IsDirectory call can be expensive for some FS. Parallelizing it. // Since we can get extremely many children here and on some filesystems
children_dir_status.resize(children.size()); // `IsDirectory` is expensive, we process the children in parallel.
auto handle_children = [fs, &current_dir, &children, &dirs, dir_index, // We also check that children match the pattern in parallel, for speedup.
is_directory, &children_dir_status](int j) { // We store the status of the match and `IsDirectory` in
const string child_path = io::JoinPath(current_dir, children[j]); // `children_status` array, one element for each children.
if (!fs->Match(child_path, dirs[dir_index])) { std::vector<Status> children_status(children.size());
children_dir_status[j] = auto handle_children = [&fs, &match_pattern, &parent, &children,
&children_status](int j) {
const std::string path = io::JoinPath(parent, children[j]);
if (!fs->Match(path, match_pattern)) {
children_status[j] =
Status(tensorflow::error::CANCELLED, "Operation not needed"); Status(tensorflow::error::CANCELLED, "Operation not needed");
} else if (dir_index != dirs.size() - 1) {
children_dir_status[j] = fs->IsDirectory(child_path);
} else { } else {
children_dir_status[j] = children_status[j] = fs->IsDirectory(path);
is_directory ? fs->IsDirectory(child_path) : Status::OK();
} }
}; };
ForEach(0, children.size(), handle_children); ForEach(0, children.size(), handle_children);
for (size_t j = 0; j < children.size(); ++j) { // At this point, pairing `children` with `children_status` will tell us
const string child_path = io::JoinPath(current_dir, children[j]); // if a children:
// If the IsDirectory call was cancelled we bail. // * does not match the pattern
if (children_dir_status[j].code() == tensorflow::error::CANCELLED) { // * matches the pattern and is a directory
// * matches the pattern and is not a directory
// We fully ignore the first case.
// If we matched the last pattern (`index == dirs.size() - 1`) then all
// remaining children get added to the result.
// Otherwise, only the directories get added to the next queue.
for (size_t j = 0; j < children.size(); j++) {
if (children_status[j].code() == tensorflow::error::CANCELLED) {
continue; continue;
} }
if (children_dir_status[j].ok()) {
if (dir_index != dirs.size() - 1) { const std::string path = io::JoinPath(parent, children[j]);
mutex_lock lk(next_que_mutex); if (index == dirs.size() - 1) {
next_dir_q.emplace_back(std::make_pair(child_path, dir_index)); mutex_lock l(result_mutex);
next_que_cond.notify_one(); results->emplace_back(path);
} else { } else if (children_status[j].ok()) {
mutex_lock lk(results_mutex); mutex_lock l(queue_mutex);
results->emplace_back(child_path); next_expand_queue.emplace_back(path, index);
results_cond.notify_one();
}
} }
} }
}; };
ForEach(0, dir_q.size(), handle_level); ForEach(0, expand_queue.size(), handle_level);
ret.Update(new_rets[dir_q.size() - 1]); // After evaluating one level, swap the "buffers"
std::swap(dir_q, next_dir_q); std::swap(expand_queue, next_expand_queue);
} }
return ret;
return Status::OK();
} }
} // namespace internal } // namespace internal