[RunHandler] Fix wait-for-handler code when timeout is not set.
Previously, we set a (very short) deadline when `call_timeout == 0`; a zero timeout should instead be treated as an indefinite deadline (wait until a handler becomes free).
PiperOrigin-RevId: 292241523
Change-Id: I659886f0f1642b6683c4c2ff44d74ae7bec29620
This commit is contained in:
parent
39fd4e7c7b
commit
f3117e8ec1
@ -36,7 +36,9 @@ limitations under the License.
|
|||||||
|
|
||||||
namespace tensorflow {
|
namespace tensorflow {
|
||||||
namespace {
|
namespace {
|
||||||
|
// LINT.IfChange
|
||||||
static constexpr int32 kMaxConcurrentHandlers = 128;
|
static constexpr int32 kMaxConcurrentHandlers = 128;
|
||||||
|
// LINT.ThenChange(//tensorflow/core/framework/run_handler_test.cc)
|
||||||
|
|
||||||
// TODO(azaks): Refactor with thread:ThreadPool
|
// TODO(azaks): Refactor with thread:ThreadPool
|
||||||
class RunHandlerEnvironment {
|
class RunHandlerEnvironment {
|
||||||
@ -948,16 +950,18 @@ class RunHandlerPool::Impl {
|
|||||||
RunHandler::Impl* handler_impl;
|
RunHandler::Impl* handler_impl;
|
||||||
{
|
{
|
||||||
mutex_lock l(mu_);
|
mutex_lock l(mu_);
|
||||||
if (free_handlers_.empty()) {
|
if (!has_free_handler()) {
|
||||||
profiler::TraceMe activity(
|
profiler::TraceMe activity(
|
||||||
[&] {
|
[&] {
|
||||||
return strings::StrCat("WaitingForHandler#step_id=", step_id,
|
return strings::StrCat("WaitingForHandler#step_id=", step_id,
|
||||||
"#");
|
"#");
|
||||||
},
|
},
|
||||||
profiler::TraceMeLevel::kInfo);
|
profiler::TraceMeLevel::kInfo);
|
||||||
if (!mu_.AwaitWithDeadline(
|
if (timeout_in_ms == 0) {
|
||||||
Condition(this, &Impl::has_free_handler),
|
mu_.Await(Condition(this, &Impl::has_free_handler));
|
||||||
EnvTime::NowNanos() + timeout_in_ms * 1000 * 1000)) {
|
} else if (!mu_.AwaitWithDeadline(
|
||||||
|
Condition(this, &Impl::has_free_handler),
|
||||||
|
EnvTime::NowNanos() + timeout_in_ms * 1000 * 1000)) {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -205,5 +205,37 @@ TEST_F(RunHandlerTest, TestConcurrencyUseRunHandlerPool) {
|
|||||||
delete tp;
|
delete tp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(RunHandlerTest, TestWaitTimeout) {
|
||||||
|
std::unique_ptr<RunHandlerPool> pool(new RunHandlerPool(1, 1));
|
||||||
|
|
||||||
|
// Acquire all kMaxConcurrentHandlers handlers so that none remain free.
|
||||||
|
std::vector<std::unique_ptr<RunHandler>> blocking_handles;
|
||||||
|
const int32 kMaxConcurrentHandlers = 128; // Copied from run_handler.cc.
|
||||||
|
blocking_handles.reserve(kMaxConcurrentHandlers);
|
||||||
|
for (int i = 0; i < kMaxConcurrentHandlers; ++i) {
|
||||||
|
blocking_handles.push_back(pool->Get(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
// A subsequent request with a non-zero timeout will fail by returning
|
||||||
|
// nullptr.
|
||||||
|
auto null_handle = pool->Get(128, 1);
|
||||||
|
EXPECT_EQ(null_handle.get(), nullptr);
|
||||||
|
|
||||||
|
// A subsequent request with no timeout (0) will block, and then succeed once
|
||||||
|
// one of the blocking handles is returned.
|
||||||
|
auto tp = std::make_unique<thread::ThreadPool>(Env::Default(), "test", 4);
|
||||||
|
std::atomic<int64> release_time;
|
||||||
|
|
||||||
|
tp->Schedule([&blocking_handles, &release_time]() {
|
||||||
|
Env::Default()->SleepForMicroseconds(5000);
|
||||||
|
release_time = EnvTime::NowNanos();
|
||||||
|
blocking_handles[0].reset();
|
||||||
|
});
|
||||||
|
|
||||||
|
auto next_handle = pool->Get(129, 0);
|
||||||
|
EXPECT_GT(EnvTime::NowNanos(), release_time);
|
||||||
|
EXPECT_NE(next_handle.get(), nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
} // namespace tensorflow
|
} // namespace tensorflow
|
||||||
|
Loading…
Reference in New Issue
Block a user