[tf.data service] Relax locking during round robin deepcopy.
Copying data out of the round robin buffer requires only a shared lock, but we were using an exclusive lock. This causes contention among consumers, especially when dataset elements are large.
PiperOrigin-RevId: 351879643
Change-Id: I407fd1d5b296dcf514bc055fa5477c91430e8ccd
parent e97de5bfd7
commit 631fcba18c
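The diff below moves the per-element deep copy out from under the exclusive mutex_lock and performs it under tf_shared_lock instead, so consumers can copy buffered elements concurrently while writers still take the lock exclusively. As a rough standalone illustration of that reader/writer split (not the tf.data service code: it uses std::shared_mutex and std::shared_lock in place of TensorFlow's mutex, mutex_lock, and tf_shared_lock, and the class and method names are invented for the sketch):

// Sketch of the locking split: mutate the buffer under an exclusive
// (writer) lock, copy elements out under a shared (reader) lock.
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

class RoundBuffer {
 public:
  // Refilling the buffer mutates shared state, so it still needs the
  // exclusive lock.
  void Fill(std::vector<std::string> elements) {
    std::unique_lock<std::shared_mutex> lock(mu_);
    buffer_ = std::move(elements);
  }

  // Copying an element out only reads the buffer, so a shared lock is
  // enough; multiple consumers can hold it and copy at the same time.
  std::string CopyOut(int consumer_index) const {
    std::shared_lock<std::shared_mutex> lock(mu_);
    return buffer_[consumer_index];  // deep copy of the element
  }

 private:
  mutable std::shared_mutex mu_;
  std::vector<std::string> buffer_;
};

int main() {
  RoundBuffer buffer;
  buffer.Fill({"element-0", "element-1"});
  // Two consumers copying concurrently would both hold the shared lock.
  std::string a = buffer.CopyOut(0);
  std::string b = buffer.CopyOut(1);
  return (a.empty() || b.empty()) ? 1 : 0;
}

The shared lock still keeps a writer from refilling the buffer mid-copy, but it no longer serializes the copies themselves, which is where the time goes when elements are large.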
@@ -101,41 +101,45 @@ Status RoundRobinTaskRunner::GetNext(const Request& request,
   }
   VLOG(2) << "Received request from consumer index " << request.consumer_index
           << " for round " << request.round_index;
-  mutex_lock l(mu_);
-  absl::flat_hash_set<int64>& round = requests_[request.round_index];
-  first_round_ = std::min(first_round_, request.round_index);
-  round.insert(request.consumer_index);
-  if (current_round_ < request.round_index && round.size() == num_consumers_) {
-    VLOG(1) << "Starting normal round with round index "
-            << request.round_index;
-    // This was the last request to arrive, time to start a new round.
-    TF_RETURN_IF_ERROR(FillBuffer());
-    VLOG(1) << "Finished preparing data for round " << request.round_index;
-    current_round_ = request.round_index;
-    new_round_cv_.notify_all();
-  }
-  if (current_round_ < 0 &&
-      requests_[first_round_].size() + requests_[first_round_ + 1].size() ==
-          num_consumers_) {
-    VLOG(1) << "Starting partial round for " << requests_[first_round_].size()
-            << " consumers";
-    // Indicates that we need a partial round to get consumers back in sync.
-    TF_RETURN_IF_ERROR(FillBuffer());
-    current_round_ = first_round_;
-    new_round_cv_.notify_all();
-  }
-  while (current_round_ < request.round_index) {
-    std::cv_status s =
-        new_round_cv_.wait_for(l, std::chrono::microseconds(kTimeoutUs));
-    if (s == std::cv_status::timeout) {
-      // Clients will retry Unavailable.
-      return errors::Unavailable(
-          "Timeout waiting for other round-robin consumers to be ready.");
-    }
-  }
-  end_of_task = end_of_task_;
+  {
+    mutex_lock l(mu_);
+    absl::flat_hash_set<int64>& round = requests_[request.round_index];
+    first_round_ = std::min(first_round_, request.round_index);
+    round.insert(request.consumer_index);
+    if (current_round_ < request.round_index &&
+        round.size() == num_consumers_) {
+      VLOG(1) << "Starting normal round with round index "
+              << request.round_index;
+      // This was the last request to arrive, time to start a new round.
+      TF_RETURN_IF_ERROR(FillBuffer());
+      VLOG(1) << "Finished preparing data for round " << request.round_index;
+      current_round_ = request.round_index;
+      new_round_cv_.notify_all();
+    }
+    if (current_round_ < 0 &&
+        requests_[first_round_].size() + requests_[first_round_ + 1].size() ==
+            num_consumers_) {
+      VLOG(1) << "Starting partial round for " << requests_[first_round_].size()
+              << " consumers";
+      // Indicates that we need a partial round to get consumers back in sync.
+      TF_RETURN_IF_ERROR(FillBuffer());
+      current_round_ = first_round_;
+      new_round_cv_.notify_all();
+    }
+    while (current_round_ < request.round_index) {
+      std::cv_status s =
+          new_round_cv_.wait_for(l, std::chrono::microseconds(kTimeoutUs));
+      if (s == std::cv_status::timeout) {
+        // Clients will retry Unavailable.
+        return errors::Unavailable(
+            "Timeout waiting for other round-robin consumers to be ready.");
+      }
+    }
+    end_of_task = end_of_task_;
+  }
   if (!end_of_task) {
     element.clear();
+    tf_shared_lock l(mu_);
     for (auto& component : buffer_[request.consumer_index]) {
       element.push_back(tensor::DeepCopy(component));
     }