From 1e219a23c8039f65cf14fc463b80d28910351470 Mon Sep 17 00:00:00 2001 From: Gunhan Gulsoy Date: Thu, 29 Aug 2019 13:19:33 -0700 Subject: [PATCH] Remove kafka code from TF main repo. This code has moved to github.com/tensorflow/io PiperOrigin-RevId: 266212852 --- .bazelrc | 1 - CODEOWNERS | 1 - configure.py | 1 - tensorflow/BUILD | 6 - tensorflow/contrib/BUILD | 27 -- tensorflow/contrib/kafka/BUILD | 121 ------- tensorflow/contrib/kafka/__init__.py | 32 -- .../kafka/kernels/kafka_dataset_ops.cc | 322 ------------------ tensorflow/contrib/kafka/ops/dataset_ops.cc | 44 --- .../kafka/python/kernel_tests/kafka_test.py | 117 ------- .../kafka/python/kernel_tests/kafka_test.sh | 50 --- .../kafka/python/ops/kafka_dataset_ops.py | 73 ---- .../kafka/python/ops/kafka_op_loader.py | 24 -- tensorflow/opensource_only.files | 2 - tensorflow/tools/pip_package/BUILD | 9 - tensorflow/workspace.bzl | 12 - third_party/kafka/BUILD | 180 ---------- third_party/kafka/config.patch | 44 --- 18 files changed, 1066 deletions(-) delete mode 100644 tensorflow/contrib/kafka/BUILD delete mode 100644 tensorflow/contrib/kafka/__init__.py delete mode 100644 tensorflow/contrib/kafka/kernels/kafka_dataset_ops.cc delete mode 100644 tensorflow/contrib/kafka/ops/dataset_ops.cc delete mode 100644 tensorflow/contrib/kafka/python/kernel_tests/kafka_test.py delete mode 100755 tensorflow/contrib/kafka/python/kernel_tests/kafka_test.sh delete mode 100644 tensorflow/contrib/kafka/python/ops/kafka_dataset_ops.py delete mode 100644 tensorflow/contrib/kafka/python/ops/kafka_op_loader.py delete mode 100644 third_party/kafka/BUILD delete mode 100644 third_party/kafka/config.patch diff --git a/.bazelrc b/.bazelrc index 0491eb500c7..9b33f50a134 100644 --- a/.bazelrc +++ b/.bazelrc @@ -99,7 +99,6 @@ build:numa --define=with_numa_support=true build:noaws --define=no_aws_support=true build:nogcp --define=no_gcp_support=true build:nohdfs --define=no_hdfs_support=true -build:nokafka --define=no_kafka_support=true build:noignite --define=no_ignite_support=true build:nonccl --define=no_nccl_support=true diff --git a/CODEOWNERS b/CODEOWNERS index 25ff318d2d8..a9bc9a661d1 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -37,7 +37,6 @@ /tensorflow/contrib/hadoop @yongtang /tensorflow/contrib/hvx/ @satok16 /tensorflow/contrib/integrate/ @shoyer -/tensorflow/contrib/kafka @yongtang /tensorflow/contrib/kernel_methods/ @petrosmol /tensorflow/contrib/kinesis @yongtang /tensorflow/contrib/ios_examples/ @petewarden diff --git a/configure.py b/configure.py index 743198eea47..6a8593319fc 100644 --- a/configure.py +++ b/configure.py @@ -1515,7 +1515,6 @@ def main(): config_info_line('nogcp', 'Disable GCP support.') config_info_line('nohdfs', 'Disable HDFS support.') config_info_line('noignite', 'Disable Apache Ignite support.') - config_info_line('nokafka', 'Disable Apache Kafka support.') config_info_line('nonccl', 'Disable NVIDIA NCCL support.') diff --git a/tensorflow/BUILD b/tensorflow/BUILD index ed1fb3a4618..4433e6d33e8 100644 --- a/tensorflow/BUILD +++ b/tensorflow/BUILD @@ -273,12 +273,6 @@ config_setting( visibility = ["//visibility:public"], ) -config_setting( - name = "no_kafka_support", - define_values = {"no_kafka_support": "true"}, - visibility = ["//visibility:public"], -) - config_setting( name = "no_nccl_support", define_values = {"no_nccl_support": "true"}, diff --git a/tensorflow/contrib/BUILD b/tensorflow/contrib/BUILD index 034ecd85fd0..cbb622598bd 100644 --- a/tensorflow/contrib/BUILD +++ b/tensorflow/contrib/BUILD @@ -108,15 +108,6 @@ 
py_library(
         "//tensorflow/python:util",
         "//tensorflow/python/estimator:estimator_py",
     ] + select({
-        "//tensorflow:android": [],
-        "//tensorflow:ios": [],
-        "//tensorflow:linux_s390x": [],
-        "//tensorflow:windows": [],
-        "//tensorflow:no_kafka_support": [],
-        "//conditions:default": [
-            "//tensorflow/contrib/kafka",
-        ],
-    }) + select({
         "//tensorflow:android": [],
         "//tensorflow:ios": [],
         "//tensorflow:linux_s390x": [],
@@ -175,15 +166,6 @@ cc_library(
         "//tensorflow/contrib/tensor_forest:tensor_forest_kernels",
         "//tensorflow/contrib/text:all_kernels",
     ] + select({
-        "//tensorflow:android": [],
-        "//tensorflow:ios": [],
-        "//tensorflow:linux_s390x": [],
-        "//tensorflow:windows": [],
-        "//tensorflow:no_kafka_support": [],
-        "//conditions:default": [
-            "//tensorflow/contrib/kafka:dataset_kernels",
-        ],
-    }) + select({
         "//tensorflow:android": [],
         "//tensorflow:ios": [],
         "//tensorflow:linux_s390x": [],
@@ -215,15 +197,6 @@ cc_library(
         "//tensorflow/contrib/tensor_forest:tensor_forest_ops_op_lib",
         "//tensorflow/contrib/text:all_ops",
     ] + select({
-        "//tensorflow:android": [],
-        "//tensorflow:ios": [],
-        "//tensorflow:linux_s390x": [],
-        "//tensorflow:windows": [],
-        "//tensorflow:no_kafka_support": [],
-        "//conditions:default": [
-            "//tensorflow/contrib/kafka:dataset_ops_op_lib",
-        ],
-    }) + select({
         "//tensorflow:android": [],
         "//tensorflow:ios": [],
         "//tensorflow:linux_s390x": [],
diff --git a/tensorflow/contrib/kafka/BUILD b/tensorflow/contrib/kafka/BUILD
deleted file mode 100644
index 21f2af3deef..00000000000
--- a/tensorflow/contrib/kafka/BUILD
+++ /dev/null
@@ -1,121 +0,0 @@
-package(default_visibility = ["//tensorflow:internal"])
-
-licenses(["notice"])  # Apache 2.0
-
-exports_files(["LICENSE"])
-
-load(
-    "//tensorflow:tensorflow.bzl",
-    "tf_custom_op_library",
-    "tf_custom_op_py_library",
-    "tf_gen_op_libs",
-    "tf_gen_op_wrapper_py",
-    "tf_kernel_library",
-    "tf_py_test",
-)
-
-py_library(
-    name = "kafka",
-    srcs = ["__init__.py"],
-    srcs_version = "PY2AND3",
-    deps = [
-        ":dataset_ops",
-    ],
-)
-
-tf_custom_op_library(
-    name = "_dataset_ops.so",
-    srcs = ["ops/dataset_ops.cc"],
-    deps = [":dataset_kernels"],
-)
-
-tf_gen_op_libs(
-    op_lib_names = ["dataset_ops"],
-)
-
-cc_library(
-    name = "dataset_kernels",
-    srcs = ["kernels/kafka_dataset_ops.cc"],
-    deps = [
-        "//tensorflow/core:framework_headers_lib",
-        "//third_party/eigen3",
-        "@com_google_protobuf//:protobuf_headers",
-        "@kafka",
-    ],
-    alwayslink = 1,
-)
-
-py_library(
-    name = "dataset_ops",
-    srcs = [
-        "python/ops/kafka_dataset_ops.py",
-    ],
-    srcs_version = "PY2AND3",
-    deps = [
-        ":kafka_op_loader",
-        "//tensorflow/python:dataset_ops_gen",
-        "//tensorflow/python:util",
-        "//tensorflow/python/data/ops:dataset_ops",
-        "//tensorflow/python/data/util:nest",
-    ],
-)
-
-tf_gen_op_wrapper_py(
-    name = "gen_dataset_ops",
-    out = "python/ops/gen_dataset_ops.py",
-    deps = ["//tensorflow/contrib/kafka:dataset_ops_op_lib"],
-)
-
-tf_kernel_library(
-    name = "dataset_ops_kernels",
-    deps = [
-        ":dataset_kernels",
-        "//tensorflow/core:framework",
-    ],
-    alwayslink = 1,
-)
-
-tf_custom_op_py_library(
-    name = "kafka_op_loader",
-    srcs = ["python/ops/kafka_op_loader.py"],
-    dso = ["//tensorflow/contrib/kafka:_dataset_ops.so"],
-    kernels = [
-        ":dataset_ops_kernels",
-        "//tensorflow/contrib/kafka:dataset_ops_op_lib",
-    ],
-    srcs_version = "PY2AND3",
-    deps = [
-        ":gen_dataset_ops",
-        "//tensorflow/contrib/util:util_py",
-        "//tensorflow/python:platform",
-    ],
-)
-
-# The Kafka server has to be set up before running the test.
-# The Kafka server is set up through Docker, so the Docker engine
-# has to be installed.
-#
-# Once the Docker engine is ready:
-# To set up the Kafka server:
-# $ bash tensorflow/contrib/kafka/python/kernel_tests/kafka_test.sh start kafka
-#
-# After the test is complete:
-# To tear down the Kafka server:
-# $ bash tensorflow/contrib/kafka/python/kernel_tests/kafka_test.sh stop kafka
-tf_py_test(
-    name = "kafka_test",
-    srcs = ["python/kernel_tests/kafka_test.py"],
-    additional_deps = [
-        ":kafka",
-        "//third_party/py/numpy",
-        "//tensorflow/python:client_testlib",
-        "//tensorflow/python:framework",
-        "//tensorflow/python:framework_test_lib",
-        "//tensorflow/python:platform_test",
-    ],
-    tags = [
-        "manual",
-        "no_windows",
-        "notap",
-    ],
-)
diff --git a/tensorflow/contrib/kafka/__init__.py b/tensorflow/contrib/kafka/__init__.py
deleted file mode 100644
index 4d755c40568..00000000000
--- a/tensorflow/contrib/kafka/__init__.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-"""Kafka Dataset.
-
-@@KafkaDataset
-"""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-from tensorflow.contrib.kafka.python.ops.kafka_dataset_ops import KafkaDataset
-
-from tensorflow.python.util.all_util import remove_undocumented
-
-_allowed_symbols = [
-    "KafkaDataset",
-]
-
-remove_undocumented(__name__)
diff --git a/tensorflow/contrib/kafka/kernels/kafka_dataset_ops.cc b/tensorflow/contrib/kafka/kernels/kafka_dataset_ops.cc
deleted file mode 100644
index a3875bb4a19..00000000000
--- a/tensorflow/contrib/kafka/kernels/kafka_dataset_ops.cc
+++ /dev/null
@@ -1,322 +0,0 @@
-/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#include "tensorflow/core/framework/dataset.h"
-
-#include "rdkafkacpp.h"
-
-namespace tensorflow {
-
-class KafkaDatasetOp : public DatasetOpKernel {
- public:
-  using DatasetOpKernel::DatasetOpKernel;
-
-  void MakeDataset(OpKernelContext* ctx, DatasetBase** output) override {
-    const Tensor* topics_tensor;
-    OP_REQUIRES_OK(ctx, ctx->input("topics", &topics_tensor));
-    OP_REQUIRES(
-        ctx, topics_tensor->dims() <= 1,
-        errors::InvalidArgument("`topics` must be a scalar or a vector."));
-
-    std::vector<string> topics;
-    topics.reserve(topics_tensor->NumElements());
-    for (int i = 0; i < topics_tensor->NumElements(); ++i) {
-      topics.push_back(topics_tensor->flat<string>()(i));
-    }
-
-    std::string servers = "";
-    OP_REQUIRES_OK(
-        ctx, data::ParseScalarArgument<std::string>(ctx, "servers", &servers));
-    std::string group = "";
-    OP_REQUIRES_OK(
-        ctx, data::ParseScalarArgument<std::string>(ctx, "group", &group));
-    bool eof = false;
-    OP_REQUIRES_OK(ctx, data::ParseScalarArgument<bool>(ctx, "eof", &eof));
-    int64 timeout = -1;
-    OP_REQUIRES_OK(
-        ctx, data::ParseScalarArgument<int64>(ctx, "timeout", &timeout));
-    OP_REQUIRES(ctx, (timeout > 0),
-                errors::InvalidArgument(
-                    "Timeout value should be larger than 0, got ", timeout));
-    *output = new Dataset(ctx, std::move(topics), servers, group, eof, timeout);
-  }
-
- private:
-  class Dataset : public DatasetBase {
-   public:
-    Dataset(OpKernelContext* ctx, std::vector<string> topics,
-            const string& servers, const string& group, const bool eof,
-            const int64 timeout)
-        : DatasetBase(DatasetContext(ctx)),
-          topics_(std::move(topics)),
-          servers_(servers),
-          group_(group),
-          eof_(eof),
-          timeout_(timeout) {}
-
-    std::unique_ptr<IteratorBase> MakeIteratorInternal(
-        const string& prefix) const override {
-      return std::unique_ptr<IteratorBase>(
-          new Iterator({this, strings::StrCat(prefix, "::Kafka")}));
-    }
-
-    const DataTypeVector& output_dtypes() const override {
-      static DataTypeVector* dtypes = new DataTypeVector({DT_STRING});
-      return *dtypes;
-    }
-
-    const std::vector<PartialTensorShape>& output_shapes() const override {
-      static std::vector<PartialTensorShape>* shapes =
-          new std::vector<PartialTensorShape>({{}});
-      return *shapes;
-    }
-
-    string DebugString() const override { return "KafkaDatasetOp::Dataset"; }
-
-   protected:
-    Status AsGraphDefInternal(SerializationContext* ctx,
-                              DatasetGraphDefBuilder* b,
-                              Node** output) const override {
-      Node* topics = nullptr;
-      TF_RETURN_IF_ERROR(b->AddVector(topics_, &topics));
-      Node* servers = nullptr;
-      TF_RETURN_IF_ERROR(b->AddScalar(servers_, &servers));
-      Node* group = nullptr;
-      TF_RETURN_IF_ERROR(b->AddScalar(group_, &group));
-      Node* eof = nullptr;
-      TF_RETURN_IF_ERROR(b->AddScalar(eof_, &eof));
-      Node* timeout = nullptr;
-      TF_RETURN_IF_ERROR(b->AddScalar(timeout_, &timeout));
-      TF_RETURN_IF_ERROR(
-          b->AddDataset(this, {topics, servers, group, eof, timeout}, output));
-      return Status::OK();
-    }
-
-   private:
-    class Iterator : public DatasetIterator<Dataset> {
-     public:
-      explicit Iterator(const Params& params)
-          : DatasetIterator<Dataset>(params) {}
-
-      Status GetNextInternal(IteratorContext* ctx,
-                             std::vector<Tensor>* out_tensors,
-                             bool* end_of_sequence) override {
-        mutex_lock l(mu_);
-        do {
-          // We are currently processing a topic, so try to read the next line.
-          if (consumer_.get()) {
-            while (true) {
-              if (limit_ >= 0 &&
-                  (topic_partition_->offset() >= limit_ || offset_ >= limit_)) {
-                // EOF current topic
-                break;
-              }
-              std::unique_ptr<RdKafka::Message> message(
-                  consumer_->consume(dataset()->timeout_));
-              if (message->err() == RdKafka::ERR_NO_ERROR) {
-                // Produce the line as output.
-                Tensor line_tensor(cpu_allocator(), DT_STRING, {});
-                line_tensor.scalar<string>()().assign(
-                    static_cast<const char*>(message->payload()),
-                    message->len());
-                out_tensors->emplace_back(std::move(line_tensor));
-                *end_of_sequence = false;
-                // Sync offset
-                offset_ = message->offset();
-                return Status::OK();
-              }
-
-              if (message->err() == RdKafka::ERR__PARTITION_EOF &&
-                  dataset()->eof_) {
-                // EOF current topic
-                break;
-              }
-              if (message->err() != RdKafka::ERR__TIMED_OUT) {
-                return errors::Internal("Failed to consume:",
-                                        message->errstr());
-              }
-              message.reset(nullptr);
-              consumer_->poll(0);
-            }
-
-            // We have reached the end of the current topic, so maybe
-            // move on to next topic.
-            ResetStreamsLocked();
-            ++current_topic_index_;
-          }
-
-          // Iteration ends when there are no more topics to process.
-          if (current_topic_index_ == dataset()->topics_.size()) {
-            *end_of_sequence = true;
-            return Status::OK();
-          }
-
-          TF_RETURN_IF_ERROR(SetupStreamsLocked(ctx->env()));
-        } while (true);
-      }
-
-     protected:
-      Status SaveInternal(IteratorStateWriter* writer) override {
-        mutex_lock l(mu_);
-        TF_RETURN_IF_ERROR(writer->WriteScalar(full_name("current_topic_index"),
-                                               current_topic_index_));
-
-        // `consumer_` is empty if
-        // 1. GetNext has not been called even once.
-        // 2. All topics have been read and iterator has been exhausted.
-        if (consumer_.get()) {
-          TF_RETURN_IF_ERROR(
-              writer->WriteScalar(full_name("current_pos"), offset_));
-        }
-        return Status::OK();
-      }
-
-      Status RestoreInternal(IteratorContext* ctx,
-                             IteratorStateReader* reader) override {
-        mutex_lock l(mu_);
-        ResetStreamsLocked();
-        int64 current_topic_index;
-        TF_RETURN_IF_ERROR(reader->ReadScalar(full_name("current_topic_index"),
-                                              &current_topic_index));
-        current_topic_index_ = size_t(current_topic_index);
-        // The key "current_pos" is written only if the iterator was saved
-        // with an open topic.
-        if (reader->Contains(full_name("current_pos"))) {
-          int64 current_pos;
-          TF_RETURN_IF_ERROR(
-              reader->ReadScalar(full_name("current_pos"), &current_pos));
-
-          TF_RETURN_IF_ERROR(SetupStreamsLocked(ctx->env()));
-          topic_partition_->set_offset(current_pos);
-          if (topic_partition_->offset() != current_pos) {
-            return errors::Internal("Failed to restore to offset ",
-                                    current_pos);
-          }
-          offset_ = current_pos;
-        }
-        return Status::OK();
-      }
-
-     private:
-      // Sets up Kafka streams to read from the topic at
-      // `current_topic_index_`.
-      Status SetupStreamsLocked(Env* env) EXCLUSIVE_LOCKS_REQUIRED(mu_) {
-        if (current_topic_index_ >= dataset()->topics_.size()) {
-          return errors::InvalidArgument(
-              "current_topic_index_:", current_topic_index_,
-              " >= topics_.size():", dataset()->topics_.size());
-        }
-
-        // Actually move on to next topic.
-        string entry = dataset()->topics_[current_topic_index_];
-
-        std::vector<string> parts = str_util::Split(entry, ":");
-        if (parts.size() < 1) {
-          return errors::InvalidArgument("Invalid parameters: ", entry);
-        }
-        string topic = parts[0];
-        int32 partition = 0;
-        if (parts.size() > 1) {
-          if (!strings::safe_strto32(parts[1], &partition)) {
-            return errors::InvalidArgument("Invalid parameters: ", entry);
-          }
-        }
-        int64 offset = 0;
-        if (parts.size() > 2) {
-          if (!strings::safe_strto64(parts[2], &offset)) {
-            return errors::InvalidArgument("Invalid parameters: ", entry);
-          }
-        }
-
-        topic_partition_.reset(
-            RdKafka::TopicPartition::create(topic, partition, offset));
-
-        offset_ = topic_partition_->offset();
-        limit_ = -1;
-        if (parts.size() > 3) {
-          if (!strings::safe_strto64(parts[3], &limit_)) {
-            return errors::InvalidArgument("Invalid parameters: ", entry);
-          }
-        }
-
-        std::unique_ptr<RdKafka::Conf> conf(
-            RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
-        std::unique_ptr<RdKafka::Conf> topic_conf(
-            RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
-
-        std::string errstr;
-
-        RdKafka::Conf::ConfResult result =
-            conf->set("default_topic_conf", topic_conf.get(), errstr);
-        if (result != RdKafka::Conf::CONF_OK) {
-          return errors::Internal("Failed to set default_topic_conf:", errstr);
-        }
-
-        result = conf->set("bootstrap.servers", dataset()->servers_, errstr);
-        if (result != RdKafka::Conf::CONF_OK) {
-          return errors::Internal("Failed to set bootstrap.servers ",
-                                  dataset()->servers_, ":", errstr);
-        }
-        result = conf->set("group.id", dataset()->group_, errstr);
-        if (result != RdKafka::Conf::CONF_OK) {
-          return errors::Internal("Failed to set group.id ", dataset()->group_,
-                                  ":", errstr);
-        }
-
-        consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
-        if (!consumer_.get()) {
-          return errors::Internal("Failed to create consumer:", errstr);
-        }
-
-        std::vector<RdKafka::TopicPartition*> partitions;
-        partitions.emplace_back(topic_partition_.get());
-        RdKafka::ErrorCode err = consumer_->assign(partitions);
-        if (err != RdKafka::ERR_NO_ERROR) {
-          return errors::Internal(
-              "Failed to assign partition [", topic_partition_->topic(), ", ",
-              topic_partition_->partition(), ", ", topic_partition_->offset(),
-              "]:", RdKafka::err2str(err));
-        }
-
-        return Status::OK();
-      }
-
-      // Resets all Kafka streams.
-      void ResetStreamsLocked() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
-        consumer_->unassign();
-        consumer_->close();
-        consumer_.reset(nullptr);
-      }
-
-      mutex mu_;
-      size_t current_topic_index_ GUARDED_BY(mu_) = 0;
-      int64 offset_ GUARDED_BY(mu_) = 0;
-      int64 limit_ GUARDED_BY(mu_) = -1;
-      std::unique_ptr<RdKafka::TopicPartition> topic_partition_ GUARDED_BY(mu_);
-      std::unique_ptr<RdKafka::KafkaConsumer> consumer_ GUARDED_BY(mu_);
-    };
-
-    const std::vector<string> topics_;
-    const std::string servers_;
-    const std::string group_;
-    const bool eof_;
-    const int64 timeout_;
  };
-};
-
-REGISTER_KERNEL_BUILDER(Name("KafkaDataset").Device(DEVICE_CPU),
-                        KafkaDatasetOp);
-
-}  // namespace tensorflow
diff --git a/tensorflow/contrib/kafka/ops/dataset_ops.cc b/tensorflow/contrib/kafka/ops/dataset_ops.cc
deleted file mode 100644
index 8cdf16103ba..00000000000
--- a/tensorflow/contrib/kafka/ops/dataset_ops.cc
+++ /dev/null
@@ -1,44 +0,0 @@
-/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#include "tensorflow/core/framework/common_shape_fns.h"
-#include "tensorflow/core/framework/op.h"
-#include "tensorflow/core/framework/shape_inference.h"
-
-namespace tensorflow {
-
-REGISTER_OP("KafkaDataset")
-    .Input("topics: string")
-    .Input("servers: string")
-    .Input("group: string")
-    .Input("eof: bool")
-    .Input("timeout: int64")
-    .Output("handle: variant")
-    .SetIsStateful()
-    .SetShapeFn(shape_inference::ScalarShape)
-    .Doc(R"doc(
-Creates a dataset that emits the messages of one or more Kafka topics.
-
-topics: A `tf.string` tensor containing one or more subscriptions,
-  in the format of [topic:partition:offset:length],
-  by default length is -1 for unlimited.
-servers: A list of bootstrap servers.
-group: The consumer group id.
-eof: If True, the Kafka reader will stop on EOF.
-timeout: The timeout value for the Kafka Consumer to wait
-  (in milliseconds).
-)doc");
-
-}  // namespace tensorflow
diff --git a/tensorflow/contrib/kafka/python/kernel_tests/kafka_test.py b/tensorflow/contrib/kafka/python/kernel_tests/kafka_test.py
deleted file mode 100644
index 3651275f935..00000000000
--- a/tensorflow/contrib/kafka/python/kernel_tests/kafka_test.py
+++ /dev/null
@@ -1,117 +0,0 @@
-# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-# ==============================================================================
-"""Tests for KafkaDataset."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-from tensorflow.contrib.kafka.python.ops import kafka_dataset_ops
-from tensorflow.python.data.ops import dataset_ops
-from tensorflow.python.data.ops import iterator_ops
-from tensorflow.python.framework import dtypes
-from tensorflow.python.framework import errors
-from tensorflow.python.ops import array_ops
-from tensorflow.python.platform import test
-
-
-class KafkaDatasetTest(test.TestCase):
-
-  def setUp(self):
-    # The Kafka server has to be set up before the test
-    # and torn down after the test manually.
-    # The Docker engine has to be installed.
-    #
-    # To set up the Kafka server:
-    # $ bash kafka_test.sh start kafka
-    #
-    # To tear down the Kafka server:
-    # $ bash kafka_test.sh stop kafka
-    pass
-
-  def testKafkaDataset(self):
-    topics = array_ops.placeholder(dtypes.string, shape=[None])
-    num_epochs = array_ops.placeholder(dtypes.int64, shape=[])
-    batch_size = array_ops.placeholder(dtypes.int64, shape=[])
-
-    repeat_dataset = kafka_dataset_ops.KafkaDataset(
-        topics, group="test", eof=True).repeat(num_epochs)
-    batch_dataset = repeat_dataset.batch(batch_size)
-
-    iterator = iterator_ops.Iterator.from_structure(
-        dataset_ops.get_legacy_output_types(batch_dataset))
-    init_op = iterator.make_initializer(repeat_dataset)
-    init_batch_op = iterator.make_initializer(batch_dataset)
-    get_next = iterator.get_next()
-
-    with self.cached_session() as sess:
-      # Basic test: read from the first subscription (offsets 0-4).
-      sess.run(init_op, feed_dict={topics: ["test:0:0:4"], num_epochs: 1})
-      for i in range(5):
-        self.assertEqual("D" + str(i), sess.run(get_next))
-      with self.assertRaises(errors.OutOfRangeError):
-        sess.run(get_next)
-
-      # Basic test: read from the second subscription (offset 5 onwards).
-      sess.run(init_op, feed_dict={topics: ["test:0:5:-1"], num_epochs: 1})
-      for i in range(5):
-        self.assertEqual("D" + str(i + 5), sess.run(get_next))
-      with self.assertRaises(errors.OutOfRangeError):
-        sess.run(get_next)
-
-      # Basic test: read from both subscriptions.
-      sess.run(
-          init_op,
-          feed_dict={
-              topics: ["test:0:0:4", "test:0:5:-1"],
-              num_epochs: 1
-          })
-      for j in range(2):
-        for i in range(5):
-          self.assertEqual("D" + str(i + j * 5), sess.run(get_next))
-      with self.assertRaises(errors.OutOfRangeError):
-        sess.run(get_next)
-
-      # Test repeated iteration through both subscriptions.
-      sess.run(
-          init_op,
-          feed_dict={
-              topics: ["test:0:0:4", "test:0:5:-1"],
-              num_epochs: 10
-          })
-      for _ in range(10):
-        for j in range(2):
-          for i in range(5):
-            self.assertEqual("D" + str(i + j * 5), sess.run(get_next))
-      with self.assertRaises(errors.OutOfRangeError):
-        sess.run(get_next)
-
-      # Test batched and repeated iteration through both subscriptions.
-      sess.run(
-          init_batch_op,
-          feed_dict={
-              topics: ["test:0:0:4", "test:0:5:-1"],
-              num_epochs: 10,
-              batch_size: 5
-          })
-      for _ in range(10):
-        self.assertAllEqual(["D" + str(i) for i in range(5)],
-                            sess.run(get_next))
-        self.assertAllEqual(["D" + str(i + 5) for i in range(5)],
-                            sess.run(get_next))
-
-
-if __name__ == "__main__":
-  test.main()
diff --git a/tensorflow/contrib/kafka/python/kernel_tests/kafka_test.sh b/tensorflow/contrib/kafka/python/kernel_tests/kafka_test.sh
deleted file mode 100755
index 69553c3bd15..00000000000
--- a/tensorflow/contrib/kafka/python/kernel_tests/kafka_test.sh
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/usr/bin/env bash
-# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-set -e
-set -o pipefail
-
-if [ "$#" -ne 2 ]; then
-  echo "Usage: $0 start|stop <container>" >&2
-  exit 1
-fi
-
-action=$1
-container=$2
-if [ "$action" == "start" ]; then
-  echo pull spotify/kafka
-  docker pull spotify/kafka
-  echo pulled spotify/kafka successfully
-  docker run -d --rm --net=host --name=$container spotify/kafka
-  echo Wait 5 secs until kafka is up and running
-  sleep 5
-  echo Create test topic
-  docker exec $container bash -c '/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test'
-  echo Create test messages
-  docker exec $container bash -c 'echo -e "D0\nD1\nD2\nD3\nD4\nD5\nD6\nD7\nD8\nD9" > /test'
-  echo Produce test messages
-  docker exec $container bash -c '/opt/kafka_2.11-0.10.1.0/bin/kafka-console-producer.sh --topic test --broker-list 127.0.0.1:9092 < /test'
-  echo Container $container started successfully
-elif [ "$action" == "stop" ]; then
-  docker rm -f $container
-  echo Container $container removed successfully
-else
-  echo "Usage: $0 start|stop <container>" >&2
-  exit 1
-fi
diff --git a/tensorflow/contrib/kafka/python/ops/kafka_dataset_ops.py b/tensorflow/contrib/kafka/python/ops/kafka_dataset_ops.py
deleted file mode 100644
index ab915d9e029..00000000000
--- a/tensorflow/contrib/kafka/python/ops/kafka_dataset_ops.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-"""Kafka Dataset."""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-from tensorflow.contrib.kafka.python.ops import gen_dataset_ops
-from tensorflow.contrib.kafka.python.ops import kafka_op_loader  # pylint: disable=unused-import
-from tensorflow.python.data.ops import dataset_ops
-from tensorflow.python.framework import dtypes
-from tensorflow.python.framework import ops
-from tensorflow.python.framework import tensor_spec
-from tensorflow.python.util import deprecation
-
-
-class KafkaDataset(dataset_ops.DatasetSource):
-  """A Kafka Dataset that consumes messages."""
-
-  @deprecation.deprecated(
-      None,
-      "tf.contrib.kafka will be removed in 2.0, the support for Apache Kafka "
-      "will continue to be provided through the tensorflow/io GitHub project.")
-  def __init__(self,
-               topics,
-               servers="localhost",
-               group="",
-               eof=False,
-               timeout=1000):
-    """Create a KafkaDataset.
-
-    Args:
-      topics: A `tf.string` tensor containing one or more subscriptions,
-        in the format of [topic:partition:offset:length],
-        by default length is -1 for unlimited.
-      servers: A list of bootstrap servers.
-      group: The consumer group id.
-      eof: If True, the Kafka reader will stop on EOF.
-      timeout: The timeout value for the Kafka Consumer to wait
-        (in milliseconds).
- """ - self._topics = ops.convert_to_tensor( - topics, dtype=dtypes.string, name="topics") - self._servers = ops.convert_to_tensor( - servers, dtype=dtypes.string, name="servers") - self._group = ops.convert_to_tensor( - group, dtype=dtypes.string, name="group") - self._eof = ops.convert_to_tensor(eof, dtype=dtypes.bool, name="eof") - self._timeout = ops.convert_to_tensor( - timeout, dtype=dtypes.int64, name="timeout") - - super(KafkaDataset, self).__init__(self._as_variant_tensor()) - - def _as_variant_tensor(self): - return gen_dataset_ops.kafka_dataset(self._topics, self._servers, - self._group, self._eof, self._timeout) - - @property - def element_spec(self): - return tensor_spec.TensorSpec([], dtypes.string) diff --git a/tensorflow/contrib/kafka/python/ops/kafka_op_loader.py b/tensorflow/contrib/kafka/python/ops/kafka_op_loader.py deleted file mode 100644 index ec2fdea962e..00000000000 --- a/tensorflow/contrib/kafka/python/ops/kafka_op_loader.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright 2018 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""Python helper for loading kafka ops and kernels.""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from tensorflow.contrib.util import loader -from tensorflow.python.platform import resource_loader - -_dataset_ops = loader.load_op_library( - resource_loader.get_path_to_datafile("../../_dataset_ops.so")) diff --git a/tensorflow/opensource_only.files b/tensorflow/opensource_only.files index 6551b35edd6..ec66f4947c2 100644 --- a/tensorflow/opensource_only.files +++ b/tensorflow/opensource_only.files @@ -93,8 +93,6 @@ tensorflow/third_party/gpus/rocm_configure.bzl tensorflow/third_party/grpc/BUILD tensorflow/third_party/icu/udata.patch tensorflow/third_party/jsoncpp.BUILD -tensorflow/third_party/kafka/BUILD -tensorflow/third_party/kafka/config.patch tensorflow/third_party/libxsmm.BUILD tensorflow/third_party/linenoise.BUILD tensorflow/third_party/llvm/BUILD diff --git a/tensorflow/tools/pip_package/BUILD b/tensorflow/tools/pip_package/BUILD index 68b1ba2de1d..b0e32a2296b 100644 --- a/tensorflow/tools/pip_package/BUILD +++ b/tensorflow/tools/pip_package/BUILD @@ -183,15 +183,6 @@ filegroup( "@com_github_googleapis_googleapis//:LICENSE", "@com_github_googlecloudplatform_google_cloud_cpp//:LICENSE", ], - }) + select({ - "//tensorflow:android": [], - "//tensorflow:ios": [], - "//tensorflow:linux_s390x": [], - "//tensorflow:windows": [], - "//tensorflow:no_kafka_support": [], - "//conditions:default": [ - "@kafka//:LICENSE", - ], }) + select({ "//tensorflow/core/kernels:xsmm": [ "@libxsmm_archive//:LICENSE.md", diff --git a/tensorflow/workspace.bzl b/tensorflow/workspace.bzl index 3a0a0b82eb9..25ba1831c6f 100755 --- a/tensorflow/workspace.bzl +++ b/tensorflow/workspace.bzl @@ -637,18 +637,6 @@ def tf_repositories(path_prefix = "", tf_repo_name = ""): ], ) - 
tf_http_archive( - name = "kafka", - build_file = clean_dep("//third_party:kafka/BUILD"), - patch_file = clean_dep("//third_party/kafka:config.patch"), - sha256 = "cc6ebbcd0a826eec1b8ce1f625ffe71b53ef3290f8192b6cae38412a958f4fd3", - strip_prefix = "librdkafka-0.11.5", - urls = [ - "https://storage.googleapis.com/mirror.tensorflow.org/github.com/edenhill/librdkafka/archive/v0.11.5.tar.gz", - "https://github.com/edenhill/librdkafka/archive/v0.11.5.tar.gz", - ], - ) - java_import_external( name = "junit", jar_sha256 = "59721f0805e223d84b90677887d9ff567dc534d7c502ca903c0c2b17f05c116a", diff --git a/third_party/kafka/BUILD b/third_party/kafka/BUILD deleted file mode 100644 index 11ec50069a3..00000000000 --- a/third_party/kafka/BUILD +++ /dev/null @@ -1,180 +0,0 @@ -# Description: -# Kafka C/C++ (librdkafka) client library - -licenses(["notice"]) # 2-clause BSD license - -exports_files(["LICENSE"]) - -cc_library( - name = "kafka", - srcs = [ - "config.h", - "src-cpp/ConfImpl.cpp", - "src-cpp/ConsumerImpl.cpp", - "src-cpp/HandleImpl.cpp", - "src-cpp/KafkaConsumerImpl.cpp", - "src-cpp/MessageImpl.cpp", - "src-cpp/MetadataImpl.cpp", - "src-cpp/ProducerImpl.cpp", - "src-cpp/QueueImpl.cpp", - "src-cpp/RdKafka.cpp", - "src-cpp/TopicImpl.cpp", - "src-cpp/TopicPartitionImpl.cpp", - "src/crc32c.c", - "src/crc32c.h", - "src/lz4.c", - "src/lz4.h", - "src/lz4frame.c", - "src/lz4frame.h", - "src/lz4frame_static.h", - "src/lz4hc.c", - "src/lz4hc.h", - "src/lz4opt.h", - "src/queue.h", - "src/rd.h", - "src/rdaddr.c", - "src/rdaddr.h", - "src/rdatomic.h", - "src/rdavg.h", - "src/rdavl.c", - "src/rdavl.h", - "src/rdbuf.c", - "src/rdbuf.h", - "src/rdcrc32.h", - "src/rddl.h", - "src/rdendian.h", - "src/rdgz.c", - "src/rdgz.h", - "src/rdinterval.h", - "src/rdkafka.c", - "src/rdkafka.h", - "src/rdkafka_admin.c", - "src/rdkafka_admin.h", - "src/rdkafka_assignor.c", - "src/rdkafka_assignor.h", - "src/rdkafka_aux.c", - "src/rdkafka_aux.h", - "src/rdkafka_background.c", - "src/rdkafka_broker.c", - "src/rdkafka_broker.h", - "src/rdkafka_buf.c", - "src/rdkafka_buf.h", - "src/rdkafka_cgrp.c", - "src/rdkafka_cgrp.h", - "src/rdkafka_conf.c", - "src/rdkafka_conf.h", - "src/rdkafka_confval.h", - "src/rdkafka_event.h", - "src/rdkafka_feature.c", - "src/rdkafka_feature.h", - "src/rdkafka_header.c", - "src/rdkafka_header.h", - "src/rdkafka_int.h", - "src/rdkafka_interceptor.c", - "src/rdkafka_interceptor.h", - "src/rdkafka_lz4.c", - "src/rdkafka_lz4.h", - "src/rdkafka_metadata.c", - "src/rdkafka_metadata.h", - "src/rdkafka_metadata_cache.c", - "src/rdkafka_msg.c", - "src/rdkafka_msg.h", - "src/rdkafka_msgset.h", - "src/rdkafka_msgset_reader.c", - "src/rdkafka_msgset_writer.c", - "src/rdkafka_offset.c", - "src/rdkafka_offset.h", - "src/rdkafka_op.c", - "src/rdkafka_op.h", - "src/rdkafka_partition.c", - "src/rdkafka_partition.h", - "src/rdkafka_pattern.c", - "src/rdkafka_pattern.h", - "src/rdkafka_proto.h", - "src/rdkafka_queue.c", - "src/rdkafka_queue.h", - "src/rdkafka_range_assignor.c", - "src/rdkafka_request.c", - "src/rdkafka_request.h", - "src/rdkafka_roundrobin_assignor.c", - "src/rdkafka_sasl.c", - "src/rdkafka_sasl.h", - "src/rdkafka_sasl_int.h", - "src/rdkafka_sasl_plain.c", - "src/rdkafka_subscription.c", - "src/rdkafka_timer.c", - "src/rdkafka_timer.h", - "src/rdkafka_topic.c", - "src/rdkafka_topic.h", - "src/rdkafka_transport.c", - "src/rdkafka_transport.h", - "src/rdkafka_transport_int.h", - "src/rdlist.c", - "src/rdlist.h", - "src/rdlog.c", - "src/rdlog.h", - "src/rdmurmur2.c", - "src/rdmurmur2.h", - "src/rdports.c", 
- "src/rdports.h", - "src/rdposix.h", - "src/rdrand.c", - "src/rdrand.h", - "src/rdregex.c", - "src/rdregex.h", - "src/rdstring.c", - "src/rdstring.h", - "src/rdsysqueue.h", - "src/rdtime.h", - "src/rdtypes.h", - "src/rdunittest.c", - "src/rdunittest.h", - "src/rdvarint.c", - "src/rdvarint.h", - "src/snappy.c", - "src/snappy.h", - "src/tinycthread.c", - "src/tinycthread.h", - "src/xxhash.c", - "src/xxhash.h", - ] + select({ - "@org_tensorflow//tensorflow:windows": [ - "src/rdkafka_sasl_win32.c", - "src/rdwin32.h", - "src/regexp.c", - "src/regexp.h", - ], - "//conditions:default": [], - }), - hdrs = [ - "config.h", - "src-cpp/rdkafkacpp.h", - "src-cpp/rdkafkacpp_int.h", - "src/lz4.c", - "src/snappy_compat.h", - ], - copts = select({ - "@org_tensorflow//tensorflow:windows": [ - "-DWIN32_LEAN_AND_MEAN", - "-DWITHOUT_WIN32_CONFIG", - "-DWITH_ZLIB=1", - "-DWITH_SSL=1", - "-DWITH_SNAPPY=1", - ], - "//conditions:default": [], - }), - defines = ["LIBRDKAFKA_STATICLIB"], - includes = [ - "src", - "src-cpp", - ], - linkopts = select({ - "@org_tensorflow//tensorflow:windows": ["-defaultlib:crypt32.lib"], - "//conditions:default": ["-lpthread"], - }), - visibility = ["//visibility:public"], - deps = [ - "@boringssl//:ssl", - "@zlib_archive//:zlib", - ], -) diff --git a/third_party/kafka/config.patch b/third_party/kafka/config.patch deleted file mode 100644 index fa5c2d35b40..00000000000 --- a/third_party/kafka/config.patch +++ /dev/null @@ -1,44 +0,0 @@ -diff -Naur a/config.h b/config.h ---- a/config.h 1970-01-01 00:00:00.000000000 +0000 -+++ b/config.h 2017-10-28 00:57:03.316957390 +0000 -@@ -0,0 +1,40 @@ -+#pragma once -+#define WITHOUT_OPTIMIZATION 0 -+#define ENABLE_DEVEL 0 -+#define ENABLE_REFCNT_DEBUG 0 -+#define ENABLE_SHAREDPTR_DEBUG 0 -+ -+#define HAVE_ATOMICS_32 1 -+#define HAVE_ATOMICS_32_SYNC 1 -+ -+#if (HAVE_ATOMICS_32) -+# if (HAVE_ATOMICS_32_SYNC) -+# define ATOMIC_OP32(OP1,OP2,PTR,VAL) __sync_ ## OP1 ## _and_ ## OP2(PTR, VAL) -+# else -+# define ATOMIC_OP32(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST) -+# endif -+#endif -+ -+#define HAVE_ATOMICS_64 1 -+#define HAVE_ATOMICS_64_SYNC 1 -+ -+#if (HAVE_ATOMICS_64) -+# if (HAVE_ATOMICS_64_SYNC) -+# define ATOMIC_OP64(OP1,OP2,PTR,VAL) __sync_ ## OP1 ## _and_ ## OP2(PTR, VAL) -+# else -+# define ATOMIC_OP64(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST) -+# endif -+#endif -+ -+ -+#define WITH_ZLIB 1 -+#define WITH_LIBDL 1 -+#define WITH_PLUGINS 0 -+#define WITH_SNAPPY 1 -+#define WITH_SOCKEM 1 -+#define WITH_SSL 1 -+#define WITH_SASL 0 -+#define WITH_SASL_SCRAM 0 -+#define WITH_SASL_CYRUS 0 -+#define HAVE_REGEX 1 -+#define HAVE_STRNDUP 1
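
---

Editor's note on migration: this commit removes tf.contrib.kafka from the main
repository, and the deprecation notice in kafka_dataset_ops.py points users at
the tensorflow/io project. The sketch below is a hypothetical port, assuming
the tensorflow-io pip package is installed and exposes
`tensorflow_io.kafka.KafkaDataset` with the same constructor arguments as the
removed contrib class (topics, servers, group, eof, timeout); it is not part
of this patch, and the exact module layout may differ by release.

    # Hypothetical migration example; package and module names are assumptions,
    # not guaranteed by this patch.
    # pip install tensorflow-io
    import tensorflow_io.kafka as kafka_io

    # Same subscription format as the removed KafkaDataset op:
    # topic:partition:offset:length, where length -1 means unlimited.
    dataset = kafka_io.KafkaDataset(
        topics=["test:0:0:4"],     # offsets 0 through 4 of partition 0
        servers="localhost:9092",  # bootstrap server(s)
        group="test",              # consumer group id
        eof=True,                  # stop at end of partition instead of blocking
        timeout=1000)              # consumer poll timeout in milliseconds

    for message in dataset:        # each element is a scalar tf.string payload
        print(message)

Code written against the removed contrib class should port by changing only the
import path, since the constructor mirrors the inputs of the deleted op.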