[data-stats] Adds support for gathering statistics as metrics with stats_aggregator.
Also collects metrics for `examples_count`, `features_count`, `feature_values_count`, `feature_lists_count` and `sequence_examples_count` when `feature_stats()` transformation is applied to the dataset. PiperOrigin-RevId: 202667632
This commit is contained in:
parent
34df1eea29
commit
f139bc3f5c
@ -57,6 +57,10 @@ class StatsAggregator {
|
||||
// interface. It is possible that not all implementations will support
|
||||
// encoding their state as a protocol buffer.
|
||||
virtual void EncodeToProto(Summary* out_summary) = 0;
|
||||
|
||||
// Increments the `label` cell of the metric mapped to `name` by the given `val`.
|
||||
virtual void IncrementCounter(const string& name, const string& label,
|
||||
int64 val) = 0;
|
||||
};
|
||||
|
||||
// A `StatsAggregatorResource` wraps a shareable `StatsAggregator` as a resource
|
||||
|
@ -20,11 +20,25 @@ limitations under the License.
|
||||
#include "tensorflow/core/framework/resource_op_kernel.h"
|
||||
#include "tensorflow/core/framework/summary.pb.h"
|
||||
#include "tensorflow/core/lib/histogram/histogram.h"
|
||||
#include "tensorflow/core/lib/monitoring/counter.h"
|
||||
#include "tensorflow/core/lib/monitoring/gauge.h"
|
||||
#include "tensorflow/core/lib/monitoring/sampler.h"
|
||||
#include "tensorflow/core/platform/macros.h"
|
||||
|
||||
namespace tensorflow {
|
||||
namespace {
|
||||
|
||||
// Returns a pointer to the mutex that callers hold while reading or modifying
// the map returned by get_counters_map().
static mutex* get_counters_map_lock() {
  // Function-local static: every call hands back the address of the same
  // mutex object.
  static mutex lock_instance(LINKER_INITIALIZED);
  mutex* const lock_ptr = &lock_instance;
  return lock_ptr;
}
|
||||
|
||||
static std::unordered_map<string, monitoring::Counter<1>*>* get_counters_map() {
|
||||
static std::unordered_map<string, monitoring::Counter<1>*>* counters_map =
|
||||
new std::unordered_map<string, monitoring::Counter<1>*>;
|
||||
return counters_map;
|
||||
}
|
||||
|
||||
class StatsAggregatorImpl : public StatsAggregator {
|
||||
public:
|
||||
StatsAggregatorImpl() {}
|
||||
@ -61,6 +75,21 @@ class StatsAggregatorImpl : public StatsAggregator {
|
||||
}
|
||||
}
|
||||
|
||||
void IncrementCounter(const string& name, const string& label,
|
||||
int64 val) override {
|
||||
mutex_lock l(*get_counters_map_lock());
|
||||
auto counters_map = get_counters_map();
|
||||
if (counters_map->find(name) == counters_map->end()) {
|
||||
counters_map->emplace(
|
||||
name, monitoring::Counter<1>::New(
|
||||
/*streamz name*/ "/tensorflow/" + name,
|
||||
/*streamz description*/
|
||||
name + " generated or consumed by the component.",
|
||||
/*streamz label name*/ "component_descriptor"));
|
||||
}
|
||||
counters_map->at(name)->GetCell(label)->IncrementBy(val);
|
||||
}
|
||||
|
||||
private:
|
||||
mutex mu_;
|
||||
std::unordered_map<string, histogram::Histogram> histograms_ GUARDED_BY(mu_);
|
||||
|
@ -316,10 +316,14 @@ class FeatureStatsDatasetOp : public UnaryDatasetOpKernel {
|
||||
// changes to parse_example() where it returns stats as well.
|
||||
for (int i = 0; i < record_t.size(); ++i) {
|
||||
if (example.ParseFromString(record_t(i))) {
|
||||
stats_aggregator->IncrementCounter("examples_count", "trainer",
|
||||
1);
|
||||
AddStatsFeatures(example, stats_aggregator);
|
||||
} else {
|
||||
SequenceExample sequence_example;
|
||||
if (sequence_example.ParseFromString(record_t(i))) {
|
||||
stats_aggregator->IncrementCounter("sequence_examples_count",
|
||||
"trainer", 1);
|
||||
AddStatsFeatures(sequence_example, stats_aggregator);
|
||||
}
|
||||
}
|
||||
@ -360,8 +364,11 @@ class FeatureStatsDatasetOp : public UnaryDatasetOpKernel {
|
||||
|
||||
int feature_values_list_size_sum = 0;
|
||||
for (const auto& feature : example.features().feature()) {
|
||||
stats_aggregator->IncrementCounter("features_count", "trainer", 1);
|
||||
feature_values_list_size_sum += AddStatsFeatureValues(feature.second);
|
||||
}
|
||||
stats_aggregator->IncrementCounter("feature_values_count", "trainer",
|
||||
feature_values_list_size_sum);
|
||||
stats_aggregator->AddToHistogram(
|
||||
strings::StrCat(dataset()->tag_, ":feature-values"),
|
||||
{static_cast<double>(feature_values_list_size_sum)});
|
||||
@ -378,16 +385,20 @@ class FeatureStatsDatasetOp : public UnaryDatasetOpKernel {
|
||||
|
||||
int feature_values_list_size_sum = 0;
|
||||
for (const auto& feature : example.context().feature()) {
|
||||
stats_aggregator->IncrementCounter("features_count", "trainer", 1);
|
||||
feature_values_list_size_sum += AddStatsFeatureValues(feature.second);
|
||||
}
|
||||
|
||||
// Count every feature list and add the value counts of its features to the
// running total. The label is "trainer", matching the other counters
// incremented in this file; it was previously misspelled "reainer", which sent
// these increments to a different label cell.
for (const auto& feature_list : example.feature_lists().feature_list()) {
  stats_aggregator->IncrementCounter("feature_lists_count", "trainer", 1);
  for (const auto& feature : feature_list.second.feature()) {
    feature_values_list_size_sum += AddStatsFeatureValues(feature);
  }
}
|
||||
|
||||
stats_aggregator->IncrementCounter("feature_values_count", "trainer",
|
||||
feature_values_list_size_sum);
|
||||
stats_aggregator->AddToHistogram(
|
||||
strings::StrCat(dataset()->tag_, ":feature-values"),
|
||||
{static_cast<double>(feature_values_list_size_sum)});
|
||||
|
Loading…
Reference in New Issue
Block a user