Remove kafka code from TF main repo.

This code has moved to github.com/tensorflow/io

PiperOrigin-RevId: 266212852
Gunhan Gulsoy 2019-08-29 13:19:33 -07:00 committed by TensorFlower Gardener
parent f03510b71a
commit 1e219a23c8
18 changed files with 0 additions and 1066 deletions

View File

@@ -99,7 +99,6 @@ build:numa --define=with_numa_support=true
build:noaws --define=no_aws_support=true
build:nogcp --define=no_gcp_support=true
build:nohdfs --define=no_hdfs_support=true
build:nokafka --define=no_kafka_support=true
build:noignite --define=no_ignite_support=true
build:nonccl --define=no_nccl_support=true

View File

@@ -37,7 +37,6 @@
/tensorflow/contrib/hadoop @yongtang
/tensorflow/contrib/hvx/ @satok16
/tensorflow/contrib/integrate/ @shoyer
/tensorflow/contrib/kafka @yongtang
/tensorflow/contrib/kernel_methods/ @petrosmol
/tensorflow/contrib/kinesis @yongtang
/tensorflow/contrib/ios_examples/ @petewarden

View File

@@ -1515,7 +1515,6 @@ def main():
config_info_line('nogcp', 'Disable GCP support.')
config_info_line('nohdfs', 'Disable HDFS support.')
config_info_line('noignite', 'Disable Apache Ignite support.')
config_info_line('nokafka', 'Disable Apache Kafka support.')
config_info_line('nonccl', 'Disable NVIDIA NCCL support.')

View File

@@ -273,12 +273,6 @@ config_setting(
visibility = ["//visibility:public"],
)
config_setting(
name = "no_kafka_support",
define_values = {"no_kafka_support": "true"},
visibility = ["//visibility:public"],
)
config_setting(
name = "no_nccl_support",
define_values = {"no_nccl_support": "true"},

View File

@@ -108,15 +108,6 @@ py_library(
"//tensorflow/python:util",
"//tensorflow/python/estimator:estimator_py",
] + select({
"//tensorflow:android": [],
"//tensorflow:ios": [],
"//tensorflow:linux_s390x": [],
"//tensorflow:windows": [],
"//tensorflow:no_kafka_support": [],
"//conditions:default": [
"//tensorflow/contrib/kafka",
],
}) + select({
"//tensorflow:android": [],
"//tensorflow:ios": [],
"//tensorflow:linux_s390x": [],
@@ -175,15 +166,6 @@ cc_library(
"//tensorflow/contrib/tensor_forest:tensor_forest_kernels",
"//tensorflow/contrib/text:all_kernels",
] + select({
"//tensorflow:android": [],
"//tensorflow:ios": [],
"//tensorflow:linux_s390x": [],
"//tensorflow:windows": [],
"//tensorflow:no_kafka_support": [],
"//conditions:default": [
"//tensorflow/contrib/kafka:dataset_kernels",
],
}) + select({
"//tensorflow:android": [],
"//tensorflow:ios": [],
"//tensorflow:linux_s390x": [],
@@ -215,15 +197,6 @@ cc_library(
"//tensorflow/contrib/tensor_forest:tensor_forest_ops_op_lib",
"//tensorflow/contrib/text:all_ops",
] + select({
"//tensorflow:android": [],
"//tensorflow:ios": [],
"//tensorflow:linux_s390x": [],
"//tensorflow:windows": [],
"//tensorflow:no_kafka_support": [],
"//conditions:default": [
"//tensorflow/contrib/kafka:dataset_ops_op_lib",
],
}) + select({
"//tensorflow:android": [],
"//tensorflow:ios": [],
"//tensorflow:linux_s390x": [],

View File

@@ -1,121 +0,0 @@
package(default_visibility = ["//tensorflow:internal"])
licenses(["notice"]) # Apache 2.0
exports_files(["LICENSE"])
load(
"//tensorflow:tensorflow.bzl",
"tf_custom_op_library",
"tf_custom_op_py_library",
"tf_gen_op_libs",
"tf_gen_op_wrapper_py",
"tf_kernel_library",
"tf_py_test",
)
py_library(
name = "kafka",
srcs = ["__init__.py"],
srcs_version = "PY2AND3",
deps = [
":dataset_ops",
],
)
tf_custom_op_library(
name = "_dataset_ops.so",
srcs = ["ops/dataset_ops.cc"],
deps = [":dataset_kernels"],
)
tf_gen_op_libs(
op_lib_names = ["dataset_ops"],
)
cc_library(
name = "dataset_kernels",
srcs = ["kernels/kafka_dataset_ops.cc"],
deps = [
"//tensorflow/core:framework_headers_lib",
"//third_party/eigen3",
"@com_google_protobuf//:protobuf_headers",
"@kafka",
],
alwayslink = 1,
)
py_library(
name = "dataset_ops",
srcs = [
"python/ops/kafka_dataset_ops.py",
],
srcs_version = "PY2AND3",
deps = [
":kafka_op_loader",
"//tensorflow/python:dataset_ops_gen",
"//tensorflow/python:util",
"//tensorflow/python/data/ops:dataset_ops",
"//tensorflow/python/data/util:nest",
],
)
tf_gen_op_wrapper_py(
name = "gen_dataset_ops",
out = "python/ops/gen_dataset_ops.py",
deps = ["//tensorflow/contrib/kafka:dataset_ops_op_lib"],
)
tf_kernel_library(
name = "dataset_ops_kernels",
deps = [
":dataset_kernels",
"//tensorflow/core:framework",
],
alwayslink = 1,
)
tf_custom_op_py_library(
name = "kafka_op_loader",
srcs = ["python/ops/kafka_op_loader.py"],
dso = ["//tensorflow/contrib/kafka:_dataset_ops.so"],
kernels = [
":dataset_ops_kernels",
"//tensorflow/contrib/kafka:dataset_ops_op_lib",
],
srcs_version = "PY2AND3",
deps = [
":gen_dataset_ops",
"//tensorflow/contrib/util:util_py",
"//tensorflow/python:platform",
],
)
# The Kafka server has to be set up before running the test.
# The Kafka server is set up through Docker, so the Docker engine
# has to be installed.
#
# Once the Docker engine is ready:
# To setup the Kafka server:
# $ bash tensorflow/contrib/kafka/python/kernel_tests/kafka_test.sh start kafka
#
# After the test is complete:
# To tear down the Kafka server:
# $ bash tensorflow/contrib/kafka/python/kernel_tests/kafka_test.sh stop kafka
tf_py_test(
name = "kafka_test",
srcs = ["python/kernel_tests/kafka_test.py"],
additional_deps = [
":kafka",
"//third_party/py/numpy",
"//tensorflow/python:client_testlib",
"//tensorflow/python:framework",
"//tensorflow/python:framework_test_lib",
"//tensorflow/python:platform_test",
],
tags = [
"manual",
"no_windows",
"notap",
],
)

View File

@@ -1,32 +0,0 @@
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Kafka Dataset.
@@KafkaDataset
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.contrib.kafka.python.ops.kafka_dataset_ops import KafkaDataset
from tensorflow.python.util.all_util import remove_undocumented
_allowed_symbols = [
"KafkaDataset",
]
remove_undocumented(__name__)
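
Because __init__.py above re-exported KafkaDataset through _allowed_symbols, TF 1.x user code could also reach the class via the contrib namespace. A one-line import sketch (this path is removed along with this commit):

from tensorflow.contrib.kafka import KafkaDataset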

View File

@@ -1,322 +0,0 @@
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/framework/dataset.h"
#include "rdkafkacpp.h"
namespace tensorflow {
class KafkaDatasetOp : public DatasetOpKernel {
public:
using DatasetOpKernel::DatasetOpKernel;
void MakeDataset(OpKernelContext* ctx, DatasetBase** output) override {
const Tensor* topics_tensor;
OP_REQUIRES_OK(ctx, ctx->input("topics", &topics_tensor));
OP_REQUIRES(
ctx, topics_tensor->dims() <= 1,
errors::InvalidArgument("`topics` must be a scalar or a vector."));
std::vector<string> topics;
topics.reserve(topics_tensor->NumElements());
for (int i = 0; i < topics_tensor->NumElements(); ++i) {
topics.push_back(topics_tensor->flat<tstring>()(i));
}
std::string servers = "";
OP_REQUIRES_OK(
ctx, data::ParseScalarArgument<tstring>(ctx, "servers", &servers));
std::string group = "";
OP_REQUIRES_OK(ctx,
data::ParseScalarArgument<tstring>(ctx, "group", &group));
bool eof = false;
OP_REQUIRES_OK(ctx, data::ParseScalarArgument<bool>(ctx, "eof", &eof));
int64 timeout = -1;
OP_REQUIRES_OK(ctx,
data::ParseScalarArgument<int64>(ctx, "timeout", &timeout));
OP_REQUIRES(ctx, (timeout > 0),
errors::InvalidArgument(
"Timeout value should be large than 0, got ", timeout));
*output = new Dataset(ctx, std::move(topics), servers, group, eof, timeout);
}
private:
class Dataset : public DatasetBase {
public:
Dataset(OpKernelContext* ctx, std::vector<string> topics,
const string& servers, const string& group, const bool eof,
const int64 timeout)
: DatasetBase(DatasetContext(ctx)),
topics_(std::move(topics)),
servers_(servers),
group_(group),
eof_(eof),
timeout_(timeout) {}
std::unique_ptr<IteratorBase> MakeIteratorInternal(
const string& prefix) const override {
return std::unique_ptr<IteratorBase>(
new Iterator({this, strings::StrCat(prefix, "::Kafka")}));
}
const DataTypeVector& output_dtypes() const override {
static DataTypeVector* dtypes = new DataTypeVector({DT_STRING});
return *dtypes;
}
const std::vector<PartialTensorShape>& output_shapes() const override {
static std::vector<PartialTensorShape>* shapes =
new std::vector<PartialTensorShape>({{}});
return *shapes;
}
string DebugString() const override { return "KafkaDatasetOp::Dataset"; }
protected:
Status AsGraphDefInternal(SerializationContext* ctx,
DatasetGraphDefBuilder* b,
Node** output) const override {
Node* topics = nullptr;
TF_RETURN_IF_ERROR(b->AddVector(topics_, &topics));
Node* servers = nullptr;
TF_RETURN_IF_ERROR(b->AddScalar(servers_, &servers));
Node* group = nullptr;
TF_RETURN_IF_ERROR(b->AddScalar(group_, &group));
Node* eof = nullptr;
TF_RETURN_IF_ERROR(b->AddScalar(eof_, &eof));
Node* timeout = nullptr;
TF_RETURN_IF_ERROR(b->AddScalar(timeout_, &timeout));
TF_RETURN_IF_ERROR(
b->AddDataset(this, {topics, servers, group, eof, timeout}, output));
return Status::OK();
}
private:
class Iterator : public DatasetIterator<Dataset> {
public:
explicit Iterator(const Params& params)
: DatasetIterator<Dataset>(params) {}
Status GetNextInternal(IteratorContext* ctx,
std::vector<Tensor>* out_tensors,
bool* end_of_sequence) override {
mutex_lock l(mu_);
do {
// We are currently processing a topic, so try to read the next message.
if (consumer_.get()) {
while (true) {
if (limit_ >= 0 &&
(topic_partition_->offset() >= limit_ || offset_ >= limit_)) {
// EOF current topic
break;
}
std::unique_ptr<RdKafka::Message> message(
consumer_->consume(dataset()->timeout_));
if (message->err() == RdKafka::ERR_NO_ERROR) {
// Produce the message as output.
Tensor line_tensor(cpu_allocator(), DT_STRING, {});
line_tensor.scalar<tstring>()().assign(
static_cast<const char*>(message->payload()),
message->len());
out_tensors->emplace_back(std::move(line_tensor));
*end_of_sequence = false;
// Sync offset
offset_ = message->offset();
return Status::OK();
}
if (message->err() == RdKafka::ERR__PARTITION_EOF &&
dataset()->eof_) {
// EOF current topic
break;
}
if (message->err() != RdKafka::ERR__TIMED_OUT) {
return errors::Internal("Failed to consume:",
message->errstr());
}
message.reset(nullptr);
consumer_->poll(0);
}
// We have reached the end of the current topic, so maybe
// move on to the next topic.
ResetStreamsLocked();
++current_topic_index_;
}
// Iteration ends when there are no more topics to process.
if (current_topic_index_ == dataset()->topics_.size()) {
*end_of_sequence = true;
return Status::OK();
}
TF_RETURN_IF_ERROR(SetupStreamsLocked(ctx->env()));
} while (true);
}
protected:
Status SaveInternal(IteratorStateWriter* writer) override {
mutex_lock l(mu_);
TF_RETURN_IF_ERROR(writer->WriteScalar(full_name("current_topic_index"),
current_topic_index_));
// `consumer_` is empty if
// 1. GetNext has not been called even once.
// 2. All topics have been read and the iterator has been exhausted.
if (consumer_.get()) {
TF_RETURN_IF_ERROR(
writer->WriteScalar(full_name("current_pos"), offset_));
}
return Status::OK();
}
Status RestoreInternal(IteratorContext* ctx,
IteratorStateReader* reader) override {
mutex_lock l(mu_);
ResetStreamsLocked();
int64 current_topic_index;
TF_RETURN_IF_ERROR(reader->ReadScalar(full_name("current_topic_index"),
&current_topic_index));
current_topic_index_ = size_t(current_topic_index);
// The key "current_pos" is written only if the iterator was saved
// with an open topic.
if (reader->Contains(full_name("current_pos"))) {
int64 current_pos;
TF_RETURN_IF_ERROR(
reader->ReadScalar(full_name("current_pos"), &current_pos));
TF_RETURN_IF_ERROR(SetupStreamsLocked(ctx->env()));
topic_partition_->set_offset(current_pos);
if (topic_partition_->offset() != current_pos) {
return errors::Internal("Failed to restore to offset ",
current_pos);
}
offset_ = current_pos;
}
return Status::OK();
}
private:
// Sets up Kafka streams to read from the topic at
// `current_topic_index_`.
Status SetupStreamsLocked(Env* env) EXCLUSIVE_LOCKS_REQUIRED(mu_) {
if (current_topic_index_ >= dataset()->topics_.size()) {
return errors::InvalidArgument(
"current_topic_index_:", current_topic_index_,
" >= topics_.size():", dataset()->topics_.size());
}
// Actually move on to the next topic.
string entry = dataset()->topics_[current_topic_index_];
std::vector<string> parts = str_util::Split(entry, ":");
if (parts.size() < 1) {
return errors::InvalidArgument("Invalid parameters: ", entry);
}
string topic = parts[0];
int32 partition = 0;
if (parts.size() > 1) {
if (!strings::safe_strto32(parts[1], &partition)) {
return errors::InvalidArgument("Invalid parameters: ", entry);
}
}
int64 offset = 0;
if (parts.size() > 2) {
if (!strings::safe_strto64(parts[2], &offset)) {
return errors::InvalidArgument("Invalid parameters: ", entry);
}
}
topic_partition_.reset(
RdKafka::TopicPartition::create(topic, partition, offset));
offset_ = topic_partition_->offset();
limit_ = -1;
if (parts.size() > 3) {
if (!strings::safe_strto64(parts[3], &limit_)) {
return errors::InvalidArgument("Invalid parameters: ", entry);
}
}
std::unique_ptr<RdKafka::Conf> conf(
RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
std::unique_ptr<RdKafka::Conf> topic_conf(
RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
std::string errstr;
RdKafka::Conf::ConfResult result =
conf->set("default_topic_conf", topic_conf.get(), errstr);
if (result != RdKafka::Conf::CONF_OK) {
return errors::Internal("Failed to set default_topic_conf:", errstr);
}
result = conf->set("bootstrap.servers", dataset()->servers_, errstr);
if (result != RdKafka::Conf::CONF_OK) {
return errors::Internal("Failed to set bootstrap.servers ",
dataset()->servers_, ":", errstr);
}
result = conf->set("group.id", dataset()->group_, errstr);
if (result != RdKafka::Conf::CONF_OK) {
return errors::Internal("Failed to set group.id ", dataset()->group_,
":", errstr);
}
consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
if (!consumer_.get()) {
return errors::Internal("Failed to create consumer:", errstr);
}
std::vector<RdKafka::TopicPartition*> partitions;
partitions.emplace_back(topic_partition_.get());
RdKafka::ErrorCode err = consumer_->assign(partitions);
if (err != RdKafka::ERR_NO_ERROR) {
return errors::Internal(
"Failed to assign partition [", topic_partition_->topic(), ", ",
topic_partition_->partition(), ", ", topic_partition_->offset(),
"]:", RdKafka::err2str(err));
}
return Status::OK();
}
// Resets all Kafka streams.
void ResetStreamsLocked() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
consumer_->unassign();
consumer_->close();
consumer_.reset(nullptr);
}
mutex mu_;
size_t current_topic_index_ GUARDED_BY(mu_) = 0;
int64 offset_ GUARDED_BY(mu_) = 0;
int64 limit_ GUARDED_BY(mu_) = -1;
std::unique_ptr<RdKafka::TopicPartition> topic_partition_ GUARDED_BY(mu_);
std::unique_ptr<RdKafka::KafkaConsumer> consumer_ GUARDED_BY(mu_);
};
const std::vector<string> topics_;
const std::string servers_;
const std::string group_;
const bool eof_;
const int64 timeout_;
};
};
REGISTER_KERNEL_BUILDER(Name("KafkaDataset").Device(DEVICE_CPU),
KafkaDatasetOp);
} // namespace tensorflow
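
SetupStreamsLocked above splits each `topics` entry on ":" and falls back to partition 0, offset 0, and no limit (-1) when parts are missing. For illustration only, a rough Python sketch of that same parsing; the helper name is hypothetical and was never part of these sources:

def parse_topic_spec(entry):
    """Split "topic[:partition[:offset[:limit]]]" into its parts, mirroring SetupStreamsLocked."""
    parts = entry.split(":")
    if not parts or not parts[0]:
        raise ValueError("Invalid parameters: %s" % entry)
    topic = parts[0]
    partition = int(parts[1]) if len(parts) > 1 else 0  # defaults to partition 0
    offset = int(parts[2]) if len(parts) > 2 else 0     # defaults to offset 0
    limit = int(parts[3]) if len(parts) > 3 else -1     # -1 means no upper bound
    return topic, partition, offset, limit

# "test:0:0:4" -> ("test", 0, 0, 4): partition 0 of topic "test", starting at
# offset 0 and stopping once the consumed offset reaches 4.
print(parse_topic_spec("test:0:0:4"))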

View File

@@ -1,44 +0,0 @@
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/framework/common_shape_fns.h"
#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/shape_inference.h"
namespace tensorflow {
REGISTER_OP("KafkaDataset")
.Input("topics: string")
.Input("servers: string")
.Input("group: string")
.Input("eof: bool")
.Input("timeout: int64")
.Output("handle: variant")
.SetIsStateful()
.SetShapeFn(shape_inference::ScalarShape)
.Doc(R"doc(
Creates a dataset that emits the messages of one or more Kafka topics.
topics: A `tf.string` tensor containing one or more subscriptions,
each in the format of topic:partition:offset:length;
length defaults to -1 for unlimited.
servers: A list of bootstrap servers.
group: The consumer group id.
eof: If True, the Kafka reader will stop on EOF.
timeout: The timeout value for the Kafka consumer to wait
(in milliseconds).
)doc");
} // namespace tensorflow

View File

@@ -1,117 +0,0 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
# ==============================================================================
"""Tests for KafkaDataset."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.contrib.kafka.python.ops import kafka_dataset_ops
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.data.ops import iterator_ops
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import errors
from tensorflow.python.ops import array_ops
from tensorflow.python.platform import test
class KafkaDatasetTest(test.TestCase):
def setUp(self):
# The Kafka server has to be set up before the test
# and torn down after the test manually.
# The Docker engine has to be installed.
#
# To setup the Kafka server:
# $ bash kafka_test.sh start kafka
#
# To tear down the Kafka server:
# $ bash kafka_test.sh stop kafka
pass
def testKafkaDataset(self):
topics = array_ops.placeholder(dtypes.string, shape=[None])
num_epochs = array_ops.placeholder(dtypes.int64, shape=[])
batch_size = array_ops.placeholder(dtypes.int64, shape=[])
repeat_dataset = kafka_dataset_ops.KafkaDataset(
topics, group="test", eof=True).repeat(num_epochs)
batch_dataset = repeat_dataset.batch(batch_size)
iterator = iterator_ops.Iterator.from_structure(
dataset_ops.get_legacy_output_types(batch_dataset))
init_op = iterator.make_initializer(repeat_dataset)
init_batch_op = iterator.make_initializer(batch_dataset)
get_next = iterator.get_next()
with self.cached_session() as sess:
# Basic test: read the first range of messages (offsets 0-4).
sess.run(init_op, feed_dict={topics: ["test:0:0:4"], num_epochs: 1})
for i in range(5):
self.assertEqual("D" + str(i), sess.run(get_next))
with self.assertRaises(errors.OutOfRangeError):
sess.run(get_next)
# Basic test: read the second range of messages (offset 5 onward).
sess.run(init_op, feed_dict={topics: ["test:0:5:-1"], num_epochs: 1})
for i in range(5):
self.assertEqual("D" + str(i + 5), sess.run(get_next))
with self.assertRaises(errors.OutOfRangeError):
sess.run(get_next)
# Basic test: read from both subscription ranges.
sess.run(
init_op,
feed_dict={
topics: ["test:0:0:4", "test:0:5:-1"],
num_epochs: 1
})
for j in range(2):
for i in range(5):
self.assertEqual("D" + str(i + j * 5), sess.run(get_next))
with self.assertRaises(errors.OutOfRangeError):
sess.run(get_next)
# Test repeated iteration through both ranges.
sess.run(
init_op,
feed_dict={
topics: ["test:0:0:4", "test:0:5:-1"],
num_epochs: 10
})
for _ in range(10):
for j in range(2):
for i in range(5):
self.assertEqual("D" + str(i + j * 5), sess.run(get_next))
with self.assertRaises(errors.OutOfRangeError):
sess.run(get_next)
# Test batched and repeated iteration through both ranges.
sess.run(
init_batch_op,
feed_dict={
topics: ["test:0:0:4", "test:0:5:-1"],
num_epochs: 10,
batch_size: 5
})
for _ in range(10):
self.assertAllEqual(["D" + str(i) for i in range(5)],
sess.run(get_next))
self.assertAllEqual(["D" + str(i + 5) for i in range(5)],
sess.run(get_next))
if __name__ == "__main__":
test.main()

View File

@@ -1,50 +0,0 @@
#!/usr/bin/env bash
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
set -e
set -o pipefail
if [ "$#" -ne 2 ]; then
echo "Usage: $0 start|stop <kafka container name>" >&2
exit 1
fi
action=$1
container=$2
if [ "$action" == "start" ]; then
echo Pulling spotify/kafka
docker pull spotify/kafka
echo Pulled spotify/kafka successfully
docker run -d --rm --net=host --name=$container spotify/kafka
echo Waiting 5 seconds until Kafka is up and running
sleep 5
echo Create test topic
docker exec $container bash -c '/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test'
echo Create test message
docker exec $container bash -c 'echo -e "D0\nD1\nD2\nD3\nD4\nD5\nD6\nD7\nD8\nD9" > /test'
echo Produce test message
docker exec $container bash -c '/opt/kafka_2.11-0.10.1.0/bin/kafka-console-producer.sh --topic test --broker-list 127.0.0.1:9092 < /test'
echo Container $container started successfully
elif [ "$action" == "stop" ]; then
docker rm -f $container
echo Container $container removed successfully
else
echo "Usage: $0 start|stop <kafka container name>" >&2
exit 1
fi

View File

@@ -1,73 +0,0 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Kafka Dataset."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.contrib.kafka.python.ops import gen_dataset_ops
from tensorflow.contrib.kafka.python.ops import kafka_op_loader # pylint: disable=unused-import
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_spec
from tensorflow.python.util import deprecation
class KafkaDataset(dataset_ops.DatasetSource):
"""A Kafka Dataset that consumes the message.
"""
@deprecation.deprecated(
None,
"tf.contrib.kafka will be removed in 2.0, the support for Apache Kafka "
"will continue to be provided through the tensorflow/io GitHub project.")
def __init__(self,
topics,
servers="localhost",
group="",
eof=False,
timeout=1000):
"""Create a KafkaReader.
Args:
topics: A `tf.string` tensor containing one or more subscriptions,
each in the format of topic:partition:offset:length;
length defaults to -1 for unlimited.
servers: A list of bootstrap servers.
group: The consumer group id.
eof: If True, the Kafka reader will stop on EOF.
timeout: The timeout value for the Kafka consumer to wait
(in milliseconds).
"""
self._topics = ops.convert_to_tensor(
topics, dtype=dtypes.string, name="topics")
self._servers = ops.convert_to_tensor(
servers, dtype=dtypes.string, name="servers")
self._group = ops.convert_to_tensor(
group, dtype=dtypes.string, name="group")
self._eof = ops.convert_to_tensor(eof, dtype=dtypes.bool, name="eof")
self._timeout = ops.convert_to_tensor(
timeout, dtype=dtypes.int64, name="timeout")
super(KafkaDataset, self).__init__(self._as_variant_tensor())
def _as_variant_tensor(self):
return gen_dataset_ops.kafka_dataset(self._topics, self._servers,
self._group, self._eof, self._timeout)
@property
def element_spec(self):
return tensor_spec.TensorSpec([], dtypes.string)
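
For context, a minimal end-to-end sketch of the removed contrib API in TF 1.x graph mode. The broker address, consumer group id, and topic contents are assumptions that mirror the Docker setup in kafka_test.sh above:

import tensorflow as tf
from tensorflow.contrib.kafka.python.ops import kafka_dataset_ops

# Assumes a local broker on localhost:9092 with a "test" topic populated
# with messages D0..D9, as kafka_test.sh does.
dataset = kafka_dataset_ops.KafkaDataset(
    topics=["test:0:0:4"],    # topic:partition:offset:length
    servers="localhost:9092",
    group="example-group",    # hypothetical consumer group id
    eof=True)
get_next = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    try:
        while True:
            print(sess.run(get_next))  # b'D0' ... b'D4'
    except tf.errors.OutOfRangeError:
        pass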

View File

@@ -1,24 +0,0 @@
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Python helper for loading kafka ops and kernels."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.contrib.util import loader
from tensorflow.python.platform import resource_loader
_dataset_ops = loader.load_op_library(
resource_loader.get_path_to_datafile("../../_dataset_ops.so"))

View File

@@ -93,8 +93,6 @@ tensorflow/third_party/gpus/rocm_configure.bzl
tensorflow/third_party/grpc/BUILD
tensorflow/third_party/icu/udata.patch
tensorflow/third_party/jsoncpp.BUILD
tensorflow/third_party/kafka/BUILD
tensorflow/third_party/kafka/config.patch
tensorflow/third_party/libxsmm.BUILD
tensorflow/third_party/linenoise.BUILD
tensorflow/third_party/llvm/BUILD

View File

@@ -183,15 +183,6 @@ filegroup(
"@com_github_googleapis_googleapis//:LICENSE",
"@com_github_googlecloudplatform_google_cloud_cpp//:LICENSE",
],
}) + select({
"//tensorflow:android": [],
"//tensorflow:ios": [],
"//tensorflow:linux_s390x": [],
"//tensorflow:windows": [],
"//tensorflow:no_kafka_support": [],
"//conditions:default": [
"@kafka//:LICENSE",
],
}) + select({
"//tensorflow/core/kernels:xsmm": [
"@libxsmm_archive//:LICENSE.md",

View File

@@ -637,18 +637,6 @@ def tf_repositories(path_prefix = "", tf_repo_name = ""):
],
)
tf_http_archive(
name = "kafka",
build_file = clean_dep("//third_party:kafka/BUILD"),
patch_file = clean_dep("//third_party/kafka:config.patch"),
sha256 = "cc6ebbcd0a826eec1b8ce1f625ffe71b53ef3290f8192b6cae38412a958f4fd3",
strip_prefix = "librdkafka-0.11.5",
urls = [
"https://storage.googleapis.com/mirror.tensorflow.org/github.com/edenhill/librdkafka/archive/v0.11.5.tar.gz",
"https://github.com/edenhill/librdkafka/archive/v0.11.5.tar.gz",
],
)
java_import_external(
name = "junit",
jar_sha256 = "59721f0805e223d84b90677887d9ff567dc534d7c502ca903c0c2b17f05c116a",

View File

@@ -1,180 +0,0 @@
# Description:
# Kafka C/C++ (librdkafka) client library
licenses(["notice"]) # 2-clause BSD license
exports_files(["LICENSE"])
cc_library(
name = "kafka",
srcs = [
"config.h",
"src-cpp/ConfImpl.cpp",
"src-cpp/ConsumerImpl.cpp",
"src-cpp/HandleImpl.cpp",
"src-cpp/KafkaConsumerImpl.cpp",
"src-cpp/MessageImpl.cpp",
"src-cpp/MetadataImpl.cpp",
"src-cpp/ProducerImpl.cpp",
"src-cpp/QueueImpl.cpp",
"src-cpp/RdKafka.cpp",
"src-cpp/TopicImpl.cpp",
"src-cpp/TopicPartitionImpl.cpp",
"src/crc32c.c",
"src/crc32c.h",
"src/lz4.c",
"src/lz4.h",
"src/lz4frame.c",
"src/lz4frame.h",
"src/lz4frame_static.h",
"src/lz4hc.c",
"src/lz4hc.h",
"src/lz4opt.h",
"src/queue.h",
"src/rd.h",
"src/rdaddr.c",
"src/rdaddr.h",
"src/rdatomic.h",
"src/rdavg.h",
"src/rdavl.c",
"src/rdavl.h",
"src/rdbuf.c",
"src/rdbuf.h",
"src/rdcrc32.h",
"src/rddl.h",
"src/rdendian.h",
"src/rdgz.c",
"src/rdgz.h",
"src/rdinterval.h",
"src/rdkafka.c",
"src/rdkafka.h",
"src/rdkafka_admin.c",
"src/rdkafka_admin.h",
"src/rdkafka_assignor.c",
"src/rdkafka_assignor.h",
"src/rdkafka_aux.c",
"src/rdkafka_aux.h",
"src/rdkafka_background.c",
"src/rdkafka_broker.c",
"src/rdkafka_broker.h",
"src/rdkafka_buf.c",
"src/rdkafka_buf.h",
"src/rdkafka_cgrp.c",
"src/rdkafka_cgrp.h",
"src/rdkafka_conf.c",
"src/rdkafka_conf.h",
"src/rdkafka_confval.h",
"src/rdkafka_event.h",
"src/rdkafka_feature.c",
"src/rdkafka_feature.h",
"src/rdkafka_header.c",
"src/rdkafka_header.h",
"src/rdkafka_int.h",
"src/rdkafka_interceptor.c",
"src/rdkafka_interceptor.h",
"src/rdkafka_lz4.c",
"src/rdkafka_lz4.h",
"src/rdkafka_metadata.c",
"src/rdkafka_metadata.h",
"src/rdkafka_metadata_cache.c",
"src/rdkafka_msg.c",
"src/rdkafka_msg.h",
"src/rdkafka_msgset.h",
"src/rdkafka_msgset_reader.c",
"src/rdkafka_msgset_writer.c",
"src/rdkafka_offset.c",
"src/rdkafka_offset.h",
"src/rdkafka_op.c",
"src/rdkafka_op.h",
"src/rdkafka_partition.c",
"src/rdkafka_partition.h",
"src/rdkafka_pattern.c",
"src/rdkafka_pattern.h",
"src/rdkafka_proto.h",
"src/rdkafka_queue.c",
"src/rdkafka_queue.h",
"src/rdkafka_range_assignor.c",
"src/rdkafka_request.c",
"src/rdkafka_request.h",
"src/rdkafka_roundrobin_assignor.c",
"src/rdkafka_sasl.c",
"src/rdkafka_sasl.h",
"src/rdkafka_sasl_int.h",
"src/rdkafka_sasl_plain.c",
"src/rdkafka_subscription.c",
"src/rdkafka_timer.c",
"src/rdkafka_timer.h",
"src/rdkafka_topic.c",
"src/rdkafka_topic.h",
"src/rdkafka_transport.c",
"src/rdkafka_transport.h",
"src/rdkafka_transport_int.h",
"src/rdlist.c",
"src/rdlist.h",
"src/rdlog.c",
"src/rdlog.h",
"src/rdmurmur2.c",
"src/rdmurmur2.h",
"src/rdports.c",
"src/rdports.h",
"src/rdposix.h",
"src/rdrand.c",
"src/rdrand.h",
"src/rdregex.c",
"src/rdregex.h",
"src/rdstring.c",
"src/rdstring.h",
"src/rdsysqueue.h",
"src/rdtime.h",
"src/rdtypes.h",
"src/rdunittest.c",
"src/rdunittest.h",
"src/rdvarint.c",
"src/rdvarint.h",
"src/snappy.c",
"src/snappy.h",
"src/tinycthread.c",
"src/tinycthread.h",
"src/xxhash.c",
"src/xxhash.h",
] + select({
"@org_tensorflow//tensorflow:windows": [
"src/rdkafka_sasl_win32.c",
"src/rdwin32.h",
"src/regexp.c",
"src/regexp.h",
],
"//conditions:default": [],
}),
hdrs = [
"config.h",
"src-cpp/rdkafkacpp.h",
"src-cpp/rdkafkacpp_int.h",
"src/lz4.c",
"src/snappy_compat.h",
],
copts = select({
"@org_tensorflow//tensorflow:windows": [
"-DWIN32_LEAN_AND_MEAN",
"-DWITHOUT_WIN32_CONFIG",
"-DWITH_ZLIB=1",
"-DWITH_SSL=1",
"-DWITH_SNAPPY=1",
],
"//conditions:default": [],
}),
defines = ["LIBRDKAFKA_STATICLIB"],
includes = [
"src",
"src-cpp",
],
linkopts = select({
"@org_tensorflow//tensorflow:windows": ["-defaultlib:crypt32.lib"],
"//conditions:default": ["-lpthread"],
}),
visibility = ["//visibility:public"],
deps = [
"@boringssl//:ssl",
"@zlib_archive//:zlib",
],
)

View File

@ -1,44 +0,0 @@
diff -Naur a/config.h b/config.h
--- a/config.h 1970-01-01 00:00:00.000000000 +0000
+++ b/config.h 2017-10-28 00:57:03.316957390 +0000
@@ -0,0 +1,40 @@
+#pragma once
+#define WITHOUT_OPTIMIZATION 0
+#define ENABLE_DEVEL 0
+#define ENABLE_REFCNT_DEBUG 0
+#define ENABLE_SHAREDPTR_DEBUG 0
+
+#define HAVE_ATOMICS_32 1
+#define HAVE_ATOMICS_32_SYNC 1
+
+#if (HAVE_ATOMICS_32)
+# if (HAVE_ATOMICS_32_SYNC)
+# define ATOMIC_OP32(OP1,OP2,PTR,VAL) __sync_ ## OP1 ## _and_ ## OP2(PTR, VAL)
+# else
+# define ATOMIC_OP32(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST)
+# endif
+#endif
+
+#define HAVE_ATOMICS_64 1
+#define HAVE_ATOMICS_64_SYNC 1
+
+#if (HAVE_ATOMICS_64)
+# if (HAVE_ATOMICS_64_SYNC)
+# define ATOMIC_OP64(OP1,OP2,PTR,VAL) __sync_ ## OP1 ## _and_ ## OP2(PTR, VAL)
+# else
+# define ATOMIC_OP64(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST)
+# endif
+#endif
+
+
+#define WITH_ZLIB 1
+#define WITH_LIBDL 1
+#define WITH_PLUGINS 0
+#define WITH_SNAPPY 1
+#define WITH_SOCKEM 1
+#define WITH_SSL 1
+#define WITH_SASL 0
+#define WITH_SASL_SCRAM 0
+#define WITH_SASL_CYRUS 0
+#define HAVE_REGEX 1
+#define HAVE_STRNDUP 1