Added scalar caching to tensorflow::ConvertToEagerTensor

Note that cache key contains PyObject* and is therefore not easily reusable
from other languages.

CPU

| Benchmark                       | Before (calls/sec) | After (calls/sec) |
|---------------------------------|--------------------|-------------------|
| benchmark_add_float_scalars     |      96697.1650772 |     122549.093512 |
| benchmark_add_int_scalars       |      100551.000642 |     124905.320251 |
| benchmark_create_float_constant |      269135.927106 |     368643.600035 |
| benchmark_create_int32_constant |      250023.088998 |      347383.13732 |

GPU

| Benchmark                       | Before (calls/sec) | After (calls/sec) |
|---------------------------------|--------------------|-------------------|
| benchmark_add_float_scalars     |      9478.74450315 |     17181.8063021 |
| benchmark_add_int_scalars       |      99584.0439651 |     117965.869066 |
| benchmark_create_float_constant |      275277.007219 |     381577.874818 |

Notes:

* The timings between CPU and GPU are incomparable because they were measured
  on different hardware;
* I suspect that benchmark_add_int_scalars on GPU performs the addition on the
  CPU and copies the result to the GPU afterwards, which would explain the gap
  between *_add_float_* and *_add_int_*.

PiperOrigin-RevId: 261293772
This commit is contained in:
Sergei Lebedev 2019-08-02 03:29:32 -07:00 committed by TensorFlower Gardener
parent eb4504defc
commit f124540c29
7 changed files with 220 additions and 57 deletions

View File

@ -14,10 +14,12 @@ cc_library(
name = "pywrap_tfe_lib",
srcs = [
"pywrap_tensor.cc",
"pywrap_tensor_conversion.cc",
"pywrap_tfe_src.cc",
],
hdrs = [
"pywrap_tensor.h",
"pywrap_tensor_conversion.h",
"pywrap_tfe.h",
],
visibility = [
@ -42,6 +44,8 @@ cc_library(
"//tensorflow/python:safe_ptr",
"//third_party/py/numpy:headers",
"//third_party/python_runtime:headers",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/hash",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/types:variant",
],

View File

@ -156,7 +156,6 @@ class _TensorCaches(threading.local):
def __init__(self):
super(_TensorCaches, self).__init__()
self.scalar_cache = {}
self._ones_rank_cache = None
self._zeros_cache = None
@ -502,9 +501,9 @@ class Context(object):
self._initialize_logical_devices()
def _clear_caches(self):
self.scalar_cache().clear()
self.ones_rank_cache().flush()
self.zeros_cache().flush()
pywrap_tensorflow.TFE_ClearScalarCache()
def set_server_def(self, server_def, keep_alive_secs=600):
"""Allow setting a server_def on the context.
@ -534,12 +533,11 @@ class Context(object):
server_def_str = server_def.SerializeToString()
pywrap_tensorflow.TFE_ContextSetServerDef(self._context_handle,
keep_alive_secs, server_def_str)
# Clear all the caches in case there are remote tensors in them.
self._clear_caches()
self._initialize_logical_devices()
# Clear all the caches in case there are remote tensors in them.
self._clear_caches()
def enable_collective_ops(self, server_def):
"""Enable distributed collective ops with an appropriate server_def.
@ -651,10 +649,6 @@ class Context(object):
"""Returns True if current thread has eager executing enabled."""
return self._thread_local_data.is_eager
def scalar_cache(self):
"""Per-device cache for scalars."""
return _tensor_caches_map[self._id].scalar_cache
def ones_rank_cache(self):
"""Per-device cache for scalars."""
return _tensor_caches_map[self._id].ones_rank_cache

View File

@ -24,6 +24,7 @@ limitations under the License.
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/framework/types.pb.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/python/eager/pywrap_tensor_conversion.h"
#include "tensorflow/python/eager/pywrap_tfe.h"
#include "tensorflow/python/lib/core/ndarray_tensor.h"
#include "tensorflow/python/lib/core/ndarray_tensor_bridge.h"
@ -265,9 +266,10 @@ TFE_TensorHandle* PySeqToTFE_TensorHandle(PyObject* value, DataType dtype) {
return new TFE_TensorHandle(handle);
}
TFE_TensorHandle* ConvertToEagerTensor(TFE_Context* ctx, PyObject* value,
tensorflow::DataType dtype,
const char* device_name) {
TFE_TensorHandle* ConvertToEagerTensorUncached(TFE_Context* ctx,
PyObject* value,
tensorflow::DataType dtype,
const char* device_name) {
tensorflow::Safe_PyObjectPtr value_decrefer;
if (PyArray_IsScalar(value, Generic)) {
// Convert numpy scalars to numpy arrays.
@ -385,6 +387,26 @@ TFE_TensorHandle* ConvertToEagerTensor(TFE_Context* ctx, PyObject* value,
return handle.release();
}
// Converts `value` to an eager tensor handle placed on `device_name`.
//
// Reduces the overhead of allocation/transfer-to-device for scalars by
// routing Python numeric scalars through a process-wide handle cache;
// all other values are converted directly. Returns nullptr when the
// uncached conversion fails.
// TODO(slebedev): also cache singleton NumPy arrays and scalars?
TFE_TensorHandle* ConvertToEagerTensor(TFE_Context* ctx, PyObject* value,
                                       DataType dtype,
                                       const char* device_name) {
  if (!PyArray_IsPythonNumber(value)) {
    // Only Python scalars are cached; everything else takes the slow path.
    return ConvertToEagerTensorUncached(ctx, value, dtype, device_name);
  }

  auto* cache = TFE_TensorHandleCache::Get();
  TFE_TensorHandle* cached = cache->Lookup(value, dtype, device_name);
  if (cached != nullptr) return cached;

  TFE_TensorHandle* converted =
      ConvertToEagerTensorUncached(ctx, value, dtype, device_name);
  if (converted != nullptr) {
    cache->Insert(value, dtype, device_name, converted);
  }
  return converted;
}
} // namespace tensorflow
extern "C" {
@ -484,12 +506,10 @@ int EagerTensor_init(EagerTensor* self, PyObject* args, PyObject* kwds) {
PyObject* value;
const char* device_name = nullptr;
tensorflow::DataType dtype = tensorflow::DataType::DT_INVALID;
PyObject* other_value = nullptr;
const char* kwlist[] = {"value", "device", "dtype", "other_value", nullptr};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "OO&|O&O",
const_cast<char**>(kwlist), &value,
ConvertDeviceName, &device_name,
ConvertDataType, &dtype, &other_value)) {
const char* kwlist[] = {"value", "device", "dtype", nullptr};
if (!PyArg_ParseTupleAndKeywords(
args, kwds, "OO&|O&", const_cast<char**>(kwlist), &value,
ConvertDeviceName, &device_name, ConvertDataType, &dtype)) {
return -1;
}
@ -497,27 +517,6 @@ int EagerTensor_init(EagerTensor* self, PyObject* args, PyObject* kwds) {
if (py_context == nullptr) return -1;
self->context = py_context;
if (other_value != nullptr) {
if (!EagerTensor_CheckExact(other_value)) {
PyErr_SetString(PyExc_TypeError,
tensorflow::strings::StrCat(
"Expecting an EagerTensor for other_value, got ",
Py_TYPE(other_value)->tp_name)
.c_str());
return -1;
}
EagerTensor* other = reinterpret_cast<EagerTensor*>(other_value);
self->handle =
TFE_TensorHandleCopySharingTensor(other->handle, self->status);
if (MaybeRaiseExceptionFromTFStatus(self->status, PyExc_ValueError)) {
return -1;
}
return 0;
}
auto* handle = tensorflow::ConvertToEagerTensor(GetContextHandle(py_context),
value, dtype, device_name);
if (handle == nullptr) return -1;
@ -673,6 +672,7 @@ static PyObject* EagerTensor_copy_to_device(EagerTensor* self, PyObject* args,
TF_SetStatus(self->status, TF_OK, "");
return nullptr;
}
return EagerTensorFromHandle(handle);
}

View File

@ -0,0 +1,69 @@
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/python/eager/pywrap_tensor_conversion.h"
#include "absl/container/flat_hash_map.h"
#include "absl/hash/hash.h"
#include "tensorflow/c/eager/c_api_internal.h"
#include "tensorflow/core/lib/monitoring/counter.h"
#include "tensorflow/core/platform/logging.h"
namespace tensorflow {
auto* scalar_cache_hits = tensorflow::monitoring::Counter<0>::New(
"/tensorflow/eager/python/scalar_cache_hits",
"Number of times a scalar TFE_TensorHandle was retrieved from cache");
auto* scalar_cache_misses = tensorflow::monitoring::Counter<0>::New(
"/tensorflow/eager/python/scalar_cache_misses",
"Number of times a scalar TFE_TensorHandle was not available in cache");
// Returns the process-wide singleton scalar-handle cache. Thread safety
// relies on the GIL being held by all callers (see the comment on the
// `cache` member in pywrap_tensor_conversion.h).
TFE_TensorHandleCache* TFE_TensorHandleCache::Get() {
// TODO(slebedev): link with Context (in context.py) instead of having
// a static global?
static auto* cache = new TFE_TensorHandleCache();
return cache;
}
// Returns a freshly allocated TFE_TensorHandle wrapping the cached
// on-device TensorHandle for (value, dtype, device_name), or nullptr on
// a cache miss. The caller owns the returned wrapper. Updates the
// hit/miss monitoring counters on every call.
TFE_TensorHandle* TFE_TensorHandleCache::Lookup(
PyObject* value, tensorflow::DataType dtype,
absl::string_view device_name) const {
CHECK_NOTNULL(value);
// Key hashing/equality dispatch to Python's __hash__/__eq__ via
// PyObjectPtr (see pywrap_tensor_conversion.h).
const auto& it = cache.find(Key{PyObjectPtr{value}, dtype, device_name});
if (it == cache.end()) {
scalar_cache_misses->GetCell()->IncrementBy(1);
return nullptr;
}
scalar_cache_hits->GetCell()->IncrementBy(1);
auto* handle = it->second;
// Take an extra ref for the new wrapper so the cache entry stays alive
// independently of it. NOTE(review): presumably the wrapper releases
// this ref when destroyed — confirm TFE_TensorHandle ownership rules.
handle->Ref();
return new TFE_TensorHandle(handle);
}
// Caches `handle` under (value, dtype, device_name), taking out one
// Python reference on `value` and one ref on the underlying
// tensorflow::TensorHandle. Both references are released in
// DecrefUnrefAll() (via Clear() or the destructor).
void TFE_TensorHandleCache::Insert(PyObject* value, tensorflow::DataType dtype,
                                   absl::string_view device_name,
                                   TFE_TensorHandle* handle) {
  Py_INCREF(value);
  handle->handle->Ref();
  const auto result = cache.emplace(Key{PyObjectPtr{value}, dtype, device_name},
                                    handle->handle);
  if (!result.second) {
    // The key was already present, so emplace did not insert. Release
    // the references taken above to avoid leaking them; the existing
    // cache entry is kept. (In the expected Lookup-miss-then-Insert
    // call pattern this branch should not trigger, but Insert is public
    // and must not leak on a duplicate key.)
    Py_DECREF(value);
    handle->handle->Unref();
  }
}
// Empties the cache, releasing the Python references and TensorHandle
// refs taken by Insert().
void TFE_TensorHandleCache::Clear() {
DecrefUnrefAll();
cache.clear();
}
} // namespace tensorflow

View File

@ -0,0 +1,101 @@
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_PYTHON_EAGER_PYWRAP_TENSOR_CONVERSION_H_
#define TENSORFLOW_PYTHON_EAGER_PYWRAP_TENSOR_CONVERSION_H_
// Place `<locale>` before <Python.h> to avoid build failure in macOS.
#include <locale>
// The empty line above is on purpose as otherwise clang-format will
// automatically move <Python.h> before <locale>.
#include <Python.h>
#include "absl/container/flat_hash_map.h"
#include "absl/hash/hash.h"
#include "absl/strings/string_view.h"
#include "tensorflow/c/eager/c_api.h"
#include "tensorflow/core/common_runtime/eager/tensor_handle.h"
#include "tensorflow/core/framework/types.pb.h"
namespace tensorflow {
// Wrapper-class allowing to use Python hashing/comparison functions
// for PyObject*.
//
// Note that unlike Safe_PyObjectPtr this class does not steal a
// reference to a Python object. The caller is responsible for doing
// Py_INCREF/Py_DECREF.
struct PyObjectPtr {
// Hashes by delegating to Python's __hash__ via PyObject_Hash.
// NOTE(review): PyObject_Hash returns -1 (with a Python error set) for
// unhashable objects; that -1 would be silently mixed into the hash —
// confirm callers only ever pass hashable scalars.
template <typename H>
friend H AbslHashValue(H h, const PyObjectPtr& obj) {
return H::combine(std::move(h), PyObject_Hash(obj.ptr));
}
// Non-owning: the caller is responsible for Py_INCREF/Py_DECREF.
explicit PyObjectPtr(PyObject* ptr) : ptr(ptr) {}
explicit inline operator PyObject*() const { return ptr; }
inline bool operator==(const PyObjectPtr& other) const {
// We require exact type equality to account for 0 == 0.0 == False.
if (Py_TYPE(ptr) != Py_TYPE(other.ptr)) {
return false;
}
// PyObject_RichCompareBool returns -1 on error, which the `> 0` test
// maps to false; the CHECK then turns any pending Python error into a
// hard crash instead of propagating it to the interpreter.
bool result = PyObject_RichCompareBool(ptr, other.ptr, Py_EQ) > 0;
CHECK(!PyErr_Occurred());
return result;
}
private:
PyObject* ptr;
};
// Cache mapping PyObject* to the corresponding on-device TFE_TensorHandles.
// Used to speed up ConvertToEagerTensor for scalars.
// TODO(slebedev): move ConvertToEagerTensor here.
struct TFE_TensorHandleCache {
// Returns the process-wide singleton instance (defined in the .cc).
static TFE_TensorHandleCache* Get();
// Pre-sizes the table for the common small-working-set case.
TFE_TensorHandleCache() { cache.reserve(64); }
// Releases all cached Python and TensorHandle references.
~TFE_TensorHandleCache() { DecrefUnrefAll(); }
// Returns a new TFE_TensorHandle sharing the cached TensorHandle for
// the given key, or nullptr if absent. The caller owns the result.
TFE_TensorHandle* Lookup(PyObject* value, tensorflow::DataType dtype,
absl::string_view device_name) const;
// Caches `handle` under (value, dtype, device_name); takes a Python
// reference on `value` and a ref on the underlying TensorHandle.
void Insert(PyObject* value, tensorflow::DataType dtype,
absl::string_view device_name, TFE_TensorHandle* handle);
// Empties the cache, releasing all references taken by Insert.
void Clear();
private:
// TODO(slebedev): should the key depend on TFE_Context?
// NOTE(review): the key stores an absl::string_view of the device name,
// which assumes the backing string outlives the cache entry — confirm
// the device names passed in are stable for the process lifetime.
using Key = std::tuple<PyObjectPtr, tensorflow::DataType, absl::string_view>;
// Releases one Python reference per key and one TensorHandle ref per
// value; used by Clear() and the destructor.
void DecrefUnrefAll() {
for (const auto& p : cache) {
Py_DECREF(static_cast<PyObject*>(std::get<0>(p.first)));
p.second->Unref();
}
}
// Not guarded by a mutex because the code is only used while the
// GIL is held.
absl::flat_hash_map<Key, tensorflow::TensorHandle*> cache;
};
} // namespace tensorflow
#endif // TENSORFLOW_PYTHON_EAGER_PYWRAP_TENSOR_CONVERSION_H_

View File

@ -23,8 +23,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import six
from tensorflow.core.framework import attr_value_pb2
from tensorflow.core.framework import types_pb2
from tensorflow.python.eager import context
@ -95,21 +93,7 @@ def convert_to_eager_tensor(value, ctx, dtype=None):
except AttributeError:
dtype = dtypes.as_dtype(dtype).as_datatype_enum
ctx.ensure_initialized()
device = ctx.device_name
if isinstance(value, (float,) + six.integer_types):
# Use a scalar cache. This will put each scalar of each type only once on
# each device. Scalars don't use much device memory but copying scalars can
# trigger memcpys which are slow.
cache_key = device, value, dtype, type(value)
scalar_cache = ctx.scalar_cache()
tensor = scalar_cache.get(cache_key, None)
if tensor is not None:
return ops.EagerTensor(value, device, dtype, tensor)
t = ops.EagerTensor(value, device, dtype)
scalar_cache[cache_key] = t
return t
else:
return ops.EagerTensor(value, device, dtype)
return ops.EagerTensor(value, ctx.device_name, dtype)
@tf_export(v1=["constant"])

View File

@ -167,6 +167,7 @@ limitations under the License.
%rename("%s") TFE_CancellationManagerStartCancel;
%rename("%s") TFE_DeleteCancellationManager;
%rename("%s") TF_ImportGraphDefOptionsSetValidateColocationConstraints;
%rename("%s") TFE_ClearScalarCache;
%{
#include "tensorflow/python/eager/pywrap_tfe.h"
@ -194,6 +195,16 @@ static PyObject* TF_ListPhysicalDevices(TF_Status* status) {
%}
static PyObject* TF_ListPhysicalDevices(TF_Status* status);
%{
#include "tensorflow/python/eager/pywrap_tensor_conversion.h"
// SWIG-exposed helper: empties the process-wide scalar handle cache.
// Invoked from context.py's _clear_caches (e.g. after set_server_def,
// when cached handles may reference remote tensors).
static PyObject* TFE_ClearScalarCache() {
tensorflow::TFE_TensorHandleCache::Get()->Clear();
Py_RETURN_NONE;
}
%}
static PyObject* TFE_ClearScalarCache();
%typemap(in) (const void* proto) {
char* c_string;
Py_ssize_t py_size;