From d118d28e002cc1c59e21053396b3bba928b38185 Mon Sep 17 00:00:00 2001
From: Phil Pearl
Date: Wed, 24 Jun 2020 15:52:58 +0100
Subject: [PATCH] Avoid memory 'leak' in Go with multiple inputs

Fixes #40758. Go map iteration order is random. If we send randomly
ordered lists of inputs to the C core, it creates a key in the session's
executors_ cache for each distinct ordering. With more than a handful of
inputs this can get big very quickly.
---
 tensorflow/go/session.go      |  41 ++++++++++++
 tensorflow/go/session_test.go | 114 ++++++++++++++++++++++++++++++++++
 2 files changed, 155 insertions(+)

diff --git a/tensorflow/go/session.go b/tensorflow/go/session.go
index 48909ffe39e..309dc21432a 100644
--- a/tensorflow/go/session.go
+++ b/tensorflow/go/session.go
@@ -24,6 +24,7 @@ import (
 	"errors"
 	"fmt"
 	"runtime"
+	"sort"
 	"sync"
 	"unsafe"
 )
@@ -359,16 +360,56 @@ type cRunArgs struct {
 	targets      []*C.TF_Operation
 }
 
+type feedsort struct {
+	feeds       []C.TF_Output
+	feedTensors []*C.TF_Tensor
+}
+
+func (f *feedsort) Less(i, j int) bool {
+	// Ideally we would sort on the output names, but that is not easy to do
+	// efficiently: lots of Go strings would be created from the C strings
+	// and then destroyed. Instead we sort on the addresses of the operation
+	// names. This won't sort alphabetically, but for a given set of feeds it
+	// gives consistent results from one run to the next.
+	ni := uintptr(unsafe.Pointer(C.TF_OperationName(f.feeds[i].oper)))
+	nj := uintptr(unsafe.Pointer(C.TF_OperationName(f.feeds[j].oper)))
+	if ni == nj {
+		// If the names are the same, the output index may still differ.
+		return f.feeds[i].index < f.feeds[j].index
+	}
+	return ni < nj
+}
+
+func (f *feedsort) Swap(i, j int) {
+	f.feeds[i], f.feeds[j] = f.feeds[j], f.feeds[i]
+	f.feedTensors[i], f.feedTensors[j] = f.feedTensors[j], f.feedTensors[i]
+}
+
+func (f *feedsort) Len() int {
+	return len(f.feeds)
+}
+
 func newCRunArgs(feeds map[Output]*Tensor, fetches []Output, targets []*Operation) *cRunArgs {
 	c := &cRunArgs{
 		fetches:      make([]C.TF_Output, len(fetches)),
 		fetchTensors: make([]*C.TF_Tensor, len(fetches)),
 		targets:      make([]*C.TF_Operation, len(targets)),
 	}
+	// Go map iteration order is random, so our list of input names will be
+	// in a different order for each Run. This interacts badly with the TF
+	// core code, which builds an executor cache key from these names in the
+	// order we provide them. We would eventually enumerate every possible
+	// ordering and store each one in the executor cache. With n inputs that
+	// is n! entries, which gets very big very quickly.
 	for o, t := range feeds {
 		c.feeds = append(c.feeds, o.c())
 		c.feedTensors = append(c.feedTensors, t.c)
 	}
+	if len(c.feeds) > 1 {
+		fs := feedsort{feeds: c.feeds, feedTensors: c.feedTensors}
+		sort.Sort(&fs)
+	}
+
 	for i, o := range fetches {
 		c.fetches[i] = o.c()
 	}
diff --git a/tensorflow/go/session_test.go b/tensorflow/go/session_test.go
index c9bda001671..f5479aadca6 100644
--- a/tensorflow/go/session_test.go
+++ b/tensorflow/go/session_test.go
@@ -74,6 +74,120 @@ func TestSessionRunNeg(t *testing.T) {
 	}
 }
 
+func TestMultipleInput(t *testing.T) {
+	// The inputs to the graph get sorted. This test checks that the sorting
+	// works correctly and that we still get the right output.
+	graph := NewGraph()
+
+	inputs := make([]Output, 20)
+	layer2 := make([]Output, len(inputs))
+	for i := range inputs {
+		in, err := Placeholder(graph, fmt.Sprintf("input%d", i), Int64)
+		if err != nil {
+			t.Fatal(err)
+		}
+		inputs[i] = in
+
+		factor, err := Const(graph, fmt.Sprintf("factor%d", i), int64(i+1))
+		if err != nil {
+			t.Fatal(err)
+		}
+		l2, err := graph.AddOperation(OpSpec{
+			Type: "Mul",
+			Name: fmt.Sprintf("Mul%d", i),
+			Input: []Input{
+				in,
+				factor,
+			},
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		layer2[i] = l2.Output(0)
+	}
+
+	fetch, err := graph.AddOperation(OpSpec{
+		Type: "AddN",
+		Input: []Input{
+			OutputList(layer2),
+		},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	session, err := NewSession(graph, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := session.Close(); err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	feeds := make(map[Output]*Tensor, len(inputs))
+	for i, in := range inputs {
+		tensor, err := NewTensor(int64(i + 1))
+		if err != nil {
+			t.Fatal(err)
+		}
+		feeds[in] = tensor
+	}
+
+	output, err := session.Run(
+		feeds,
+		[]Output{
+			fetch.Output(0),
+		},
+		nil,
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var exp int64
+	for i := range inputs {
+		exp += int64((i + 1) * (i + 1))
+	}
+	if v := output[0].Value().(int64); v != exp {
+		t.Fatalf("expected %d got %d", exp, v)
+	}
+}
+
+func TestInputOrderStable(t *testing.T) {
+	graph := NewGraph()
+
+	inputs := make([]Output, 20)
+	for i := range inputs {
+		in, err := Placeholder(graph, fmt.Sprintf("input%d", i), Int64)
+		if err != nil {
+			t.Fatal(err)
+		}
+		in.Index = i
+		inputs[i] = in
+	}
+
+	makeArgs := func() *cRunArgs {
+		feeds := make(map[Output]*Tensor, len(inputs))
+		for i, in := range inputs {
+			tensor, err := NewTensor(int64(i + 1))
+			if err != nil {
+				t.Fatal(err)
+			}
+			feeds[in] = tensor
+		}
+
+		return newCRunArgs(feeds, nil, nil)
+	}
+	args1 := makeArgs()
+	args2 := makeArgs()
+
+	if !reflect.DeepEqual(args1.feeds, args2.feeds) {
+		t.Fatalf("order is not stable")
+	}
+}
+
 func TestSessionRunConcat(t *testing.T) {
 	// Runs the Concat operation on two matrices: m1 and m2, along the
 	// first dimension (dim1).
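
As background for the commit message above, here is a small standalone Go sketch, not part of the patch, that shows why ranging over a map yields a different ordering on different runs and how sorting the collected slice makes a derived cache key deterministic. The helper name buildKey is made up for illustration; the patch applies the same idea to the feeds slice, sorting by the C operation-name pointer and output index rather than by string.

    package main

    import (
    	"fmt"
    	"sort"
    	"strings"
    )

    // buildKey mimics a cache key derived from feed names in the order they
    // are supplied; different orderings produce different keys.
    func buildKey(names []string) string {
    	return strings.Join(names, ",")
    }

    func main() {
    	feeds := map[string]int{"input0": 0, "input1": 1, "input2": 2}

    	// Ranging over a map: Go deliberately randomizes this order, so
    	// repeated calls can yield different keys (and, in the real session
    	// code, a new entry in the C core's executor cache for each distinct
    	// ordering).
    	var names []string
    	for name := range feeds {
    		names = append(names, name)
    	}
    	fmt.Println("unsorted key:", buildKey(names))

    	// Sorting before building the key makes it deterministic; feedsort in
    	// the patch does the equivalent for the feeds/feedTensors slices.
    	sort.Strings(names)
    	fmt.Println("sorted key:  ", buildKey(names))
    }

Sorting by operation-name pointer rather than by name string, as the patch does, avoids allocating a Go string per feed on every Run, at the cost of an ordering that is stable but not human-meaningful.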