Added a PriorityReadyManager that uses given node priorities to scheduler ready nodes.

PiperOrigin-RevId: 247498851
This commit is contained in:
Peter Ma 2019-05-09 14:39:17 -07:00 committed by TensorFlower Gardener
parent 94751d7ba3
commit 7a8aadc06c
3 changed files with 122 additions and 24 deletions

View File

@ -112,31 +112,25 @@ void LIFOManager::RemoveCurrNode() {
curr_pos_ = nodes_.end(); // Reset curr_pos_.
}
FirstReadyManager::FirstReadyManager() : ReadyNodeManager() {
HeapReadyManager::HeapReadyManager() : ReadyNodeManager() {
std::make_heap(nodes_.begin(), nodes_.end());
}
Status FirstReadyManager::Init(
Status HeapReadyManager::Init(
const std::unordered_map<const NodeDef*, NodeState>* node_map) {
// Reset the node state since different instances of the scheduler can reuse
// Resets the node state since different instances of the scheduler can reuse
// the same node_manager.
node_map_ = node_map;
nodes_.clear();
waiting_queue_.clear();
greater_ = [this](const NodeDef* a, const NodeDef* b) -> bool {
if (node_map_->at(a).time_ready == node_map_->at(b).time_ready) {
// Use Node name as tie-breaker for deterministic node scheduling.
return a->name().compare(b->name()) > 0;
} else {
// Note: we need a node with minimum time_ready, not maximum; hence, using
// a > b for comparison function.
return node_map_->at(a).time_ready > node_map_->at(b).time_ready;
}
};
// Sets up the comparator for the heap.
greater_ = Greater();
return Status::OK();
}
const NodeDef* FirstReadyManager::GetCurrNode() {
const NodeDef* HeapReadyManager::GetCurrNode() {
if (nodes_.empty()) {
// Nothing in the node_; probably, the very first call. Move waiting_queue_
// to node_.
@ -146,7 +140,7 @@ const NodeDef* FirstReadyManager::GetCurrNode() {
return nodes_.front();
}
void FirstReadyManager::RemoveCurrNode() {
void HeapReadyManager::RemoveCurrNode() {
if (nodes_.empty()) {
// Make sure that there is a node to be removed at the front of nodes_.
GetCurrNode();
@ -156,11 +150,11 @@ void FirstReadyManager::RemoveCurrNode() {
DrainWaitingQueue();
}
bool FirstReadyManager::Empty() const {
bool HeapReadyManager::Empty() const {
return nodes_.empty() && waiting_queue_.empty();
}
void FirstReadyManager::DrainWaitingQueue() {
void HeapReadyManager::DrainWaitingQueue() {
for (const auto* node : waiting_queue_) {
// push_heap in AddNode() and pop_heap in RemoveCurrNode() guarantees that
// the first element is the node with minimum time_ready.
@ -170,6 +164,44 @@ void FirstReadyManager::DrainWaitingQueue() {
waiting_queue_.clear();
}
std::function<bool(const NodeDef*, const NodeDef*)>
FirstReadyManager::Greater() {
auto greater = [this](const NodeDef* a, const NodeDef* b) -> bool {
if (node_map_->at(a).time_ready == node_map_->at(b).time_ready) {
// Use Node name as tie-breaker for deterministic node scheduling.
return a->name().compare(b->name()) > 0;
} else {
// Note: we need a node with minimum time_ready, not maximum; hence, using
// a > b for comparison function.
return node_map_->at(a).time_ready > node_map_->at(b).time_ready;
}
};
return greater;
}
std::function<bool(const NodeDef*, const NodeDef*)>
PriorityReadyManager::Greater() {
auto greater = [this](const NodeDef* a, const NodeDef* b) -> bool {
return node_priority_.at(a->name()) > node_priority_.at(b->name());
};
return greater;
}
Status PriorityReadyManager::SetPriority(
const std::unordered_map<string, int>& node_priority) {
// Checks each node has a unique priority.
std::unordered_set<int> priorities;
for (const auto& it : node_priority_) {
if (priorities.find(it.second) != priorities.end()) {
return errors::InvalidArgument("Non-unique priority found");
}
priorities.insert(it.second);
}
node_priority_ = node_priority;
return Status::OK();
}
CompositeNodeManager::CompositeNodeManager()
: ReadyNodeManager(), send_manager_(), recv_manager_() {}

View File

@ -180,21 +180,22 @@ class LIFOManager : public ReadyNodeManager {
std::list<const NodeDef*>::iterator curr_pos_ = nodes_.end();
};
// FirstReadyManager picks a node with the minimum time_ready value.
// Behavior is deterministic when there are more than one nodes with the minimum
// time_ready value with unique node names as the tie-breaker.
class FirstReadyManager : public ReadyNodeManager {
// Abstract class that maintains a heap/priority queue for scheduling ready
// nodes. Derived class needs to implement the Greater() function which returns
// the comparator for the heap.
class HeapReadyManager : public ReadyNodeManager {
public:
FirstReadyManager();
HeapReadyManager();
Status Init(
const std::unordered_map<const NodeDef*, NodeState>* node_map) override;
~FirstReadyManager() override {}
~HeapReadyManager() override {}
void AddNode(const NodeDef* node) override { waiting_queue_.push_back(node); }
const NodeDef* GetCurrNode() override;
void RemoveCurrNode() override;
bool Empty() const override;
private:
protected:
virtual std::function<bool(const NodeDef*, const NodeDef*)> Greater() = 0;
// Move all the nodes in the waiting_queue_ to nodes_.
void DrainWaitingQueue();
@ -214,6 +215,37 @@ class FirstReadyManager : public ReadyNodeManager {
const std::unordered_map<const NodeDef*, NodeState>* node_map_;
};
// FirstReadyManager picks a node with the minimum time_ready value.
// Behavior is deterministic when there are more than one nodes with the minimum
// time_ready value with unique node names as the tie-breaker.
class FirstReadyManager : public HeapReadyManager {
public:
FirstReadyManager() : HeapReadyManager() {}
~FirstReadyManager() override {}
protected:
std::function<bool(const NodeDef*, const NodeDef*)> Greater() override;
};
// PriorityReadyManager uses the given node priorities when picking up next node
// from all the ready nodes.
class PriorityReadyManager : public HeapReadyManager {
public:
PriorityReadyManager() : HeapReadyManager() {}
~PriorityReadyManager() override {}
// Note this should be called after Init().
Status SetPriority(const std::unordered_map<string, int>& node_priority);
protected:
std::function<bool(const NodeDef*, const NodeDef*)> Greater() override;
private:
// A map from unique node name to unique priority. Lower number means higher
// priority.
std::unordered_map<string, int> node_priority_;
};
// CompositeNodeManager has a few other NodeManagers: per-device LIFO for normal
// ops (neither _Send nor _Recv) and FirstReadyManagers for _Send ops and _Recv
// ops, and then it chooses FirstReady among the ops chosen from each

View File

@ -372,6 +372,40 @@ TEST_F(ReadyNodeManagerTest, DeterminismInFirstReadyManager) {
EXPECT_TRUE(manager2.Empty());
}
TEST_F(ReadyNodeManagerTest, GetAndRemoveMultiplePriorityReadyManager) {
PriorityReadyManager manager;
TF_EXPECT_OK(manager.Init(&node_states_));
// Sets up node priorities.
std::unordered_map<string, int> node_priority = {{"Node1", 1}, {"Node2", 2},
{"Node3", 3}, {"Node4", 4},
{"Node5", 5}, {"Node6", 6}};
TF_EXPECT_OK(manager.SetPriority(node_priority));
// Inserts nodes in some random order.
manager.AddNode(&node2_);
manager.AddNode(&node1_);
manager.AddNode(&node4_);
manager.AddNode(&node5_);
manager.AddNode(&node3_);
manager.AddNode(&node6_);
// Expects nodes scheduled based on priority.
EXPECT_EQ(manager.GetCurrNode()->name(), "Node1");
manager.RemoveCurrNode();
EXPECT_EQ(manager.GetCurrNode()->name(), "Node2");
manager.RemoveCurrNode();
EXPECT_EQ(manager.GetCurrNode()->name(), "Node3");
manager.RemoveCurrNode();
EXPECT_EQ(manager.GetCurrNode()->name(), "Node4");
manager.RemoveCurrNode();
EXPECT_EQ(manager.GetCurrNode()->name(), "Node5");
manager.RemoveCurrNode();
EXPECT_EQ(manager.GetCurrNode()->name(), "Node6");
manager.RemoveCurrNode();
EXPECT_TRUE(manager.Empty());
}
TEST_F(ReadyNodeManagerTest, RemoveSingleNodeCompositeNodeManager) {
CompositeNodeManager manager;
TF_EXPECT_OK(manager.Init(&node_states_));