Avoid memory 'leak' in Go with multiple inputs

Fixes #40758. Go map iteration order is random. If we send randomly ordered lists of inputs to the C core it will create a key in the session executors_ cache for each of the random orderings. With more than a handful of inputs this can get big very quickly.
This commit is contained in:
Phil Pearl 2020-06-24 15:52:58 +01:00
parent 098a3eacfd
commit d118d28e00
2 changed files with 155 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"runtime"
"sort"
"sync"
"unsafe"
)
@ -359,16 +360,56 @@ type cRunArgs struct {
targets []*C.TF_Operation
}
// feedsort implements sort.Interface over a pair of parallel slices: the
// feed outputs handed to the C API and their matching input tensors. Sorting
// them together gives the C core a deterministic feed ordering from one
// Session.Run to the next.
type feedsort struct {
	feeds       []C.TF_Output
	feedTensors []*C.TF_Tensor
}
// Less reports whether feed i should be ordered before feed j.
//
// Sorting on the textual operation names would force a Go string to be
// materialized (and garbage-collected) from each C string on every
// comparison. Instead we order by the address of each operation's C name
// string: not alphabetical, but stable for a given set of feeds, which is
// all the executor cache needs.
func (f *feedsort) Less(i, j int) bool {
	ai := uintptr(unsafe.Pointer(C.TF_OperationName(f.feeds[i].oper)))
	aj := uintptr(unsafe.Pointer(C.TF_OperationName(f.feeds[j].oper)))
	if ai != aj {
		return ai < aj
	}
	// Same operation: break the tie on the output index.
	return f.feeds[i].index < f.feeds[j].index
}
// Swap exchanges elements i and j, keeping each feed paired with its tensor.
func (f *feedsort) Swap(i, j int) {
	tmpFeed := f.feeds[i]
	f.feeds[i] = f.feeds[j]
	f.feeds[j] = tmpFeed

	tmpTensor := f.feedTensors[i]
	f.feedTensors[i] = f.feedTensors[j]
	f.feedTensors[j] = tmpTensor
}
// Len returns the number of feeds to be sorted.
func (f *feedsort) Len() int {
	return len(f.feeds)
}
func newCRunArgs(feeds map[Output]*Tensor, fetches []Output, targets []*Operation) *cRunArgs {
c := &cRunArgs{
fetches: make([]C.TF_Output, len(fetches)),
fetchTensors: make([]*C.TF_Tensor, len(fetches)),
targets: make([]*C.TF_Operation, len(targets)),
}
// Go map iteration order is random. So our list of input names will be
// random for each Run. This interacts badly with the TF core code which
// builds a executor cache key from these names in the order we provide
// them. We'll eventually enumerate every possible order and store it in the
// executor cache. With n inputs that's n! entries. That gets very big very
// quickly.
for o, t := range feeds {
c.feeds = append(c.feeds, o.c())
c.feedTensors = append(c.feedTensors, t.c)
}
if len(c.feeds) > 1 {
fs := feedsort{feeds: c.feeds, feedTensors: c.feedTensors}
sort.Sort(&fs)
}
for i, o := range fetches {
c.fetches[i] = o.c()
}

View File

@ -74,6 +74,120 @@ func TestSessionRunNeg(t *testing.T) {
}
}
// TestMultipleInput runs a graph with many feeds. newCRunArgs sorts the
// feeds internally, so this checks that the sorting leaves each feed paired
// with the right tensor and the computed result is unchanged.
func TestMultipleInput(t *testing.T) {
	graph := NewGraph()

	const n = 20
	placeholders := make([]Output, n)
	scaled := make([]Output, n)
	for i := 0; i < n; i++ {
		ph, err := Placeholder(graph, fmt.Sprintf("input%d", i), Int64)
		if err != nil {
			t.Fatal(err)
		}
		placeholders[i] = ph

		k, err := Const(graph, fmt.Sprintf("factor%d", i), int64(i+1))
		if err != nil {
			t.Fatal(err)
		}

		mul, err := graph.AddOperation(OpSpec{
			Type:  "Mul",
			Name:  fmt.Sprintf("Mul%d", i),
			Input: []Input{ph, k},
		})
		if err != nil {
			t.Fatal(err)
		}
		scaled[i] = mul.Output(0)
	}

	sum, err := graph.AddOperation(OpSpec{
		Type:  "AddN",
		Input: []Input{OutputList(scaled)},
	})
	if err != nil {
		t.Fatal(err)
	}

	session, err := NewSession(graph, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := session.Close(); err != nil {
			t.Fatal(err)
		}
	}()

	feeds := make(map[Output]*Tensor, n)
	for i, ph := range placeholders {
		tensor, err := NewTensor(int64(i + 1))
		if err != nil {
			t.Fatal(err)
		}
		feeds[ph] = tensor
	}

	out, err := session.Run(
		feeds,
		[]Output{
			sum.Output(0),
		},
		nil,
	)
	if err != nil {
		t.Fatal(err)
	}

	// Input i is fed i+1 and multiplied by the constant i+1, so the AddN
	// output is the sum of squares 1+4+...+n*n.
	var want int64
	for i := 0; i < n; i++ {
		want += int64((i + 1) * (i + 1))
	}
	if got := out[0].Value().(int64); got != want {
		t.Fatalf("expected %d got %d", want, got)
	}
}
// TestInputOrderStable verifies that newCRunArgs emits the feeds in the same
// order on every call, despite Go's randomized map iteration order. Without
// this the C core builds one executor-cache entry per feed permutation.
func TestInputOrderStable(t *testing.T) {
	graph := NewGraph()

	const n = 20
	outputs := make([]Output, n)
	for i := 0; i < n; i++ {
		ph, err := Placeholder(graph, fmt.Sprintf("input%d", i), Int64)
		if err != nil {
			t.Fatal(err)
		}
		// NOTE(review): distinct Index values presumably exercise the
		// index tie-break in feedsort.Less — confirm against that code.
		ph.Index = i
		outputs[i] = ph
	}

	buildArgs := func() *cRunArgs {
		feeds := make(map[Output]*Tensor, n)
		for i, o := range outputs {
			tensor, err := NewTensor(int64(i + 1))
			if err != nil {
				t.Fatal(err)
			}
			feeds[o] = tensor
		}
		return newCRunArgs(feeds, nil, nil)
	}

	first := buildArgs()
	second := buildArgs()
	if !reflect.DeepEqual(first.feeds, second.feeds) {
		t.Fatalf("order is not stable")
	}
}
func TestSessionRunConcat(t *testing.T) {
// Runs the Concat operation on two matrices: m1 and m2, along the
// first dimension (dim1).