Yong Tang 16a2a9f6bb Add KafkaReader for processing streaming data with Apache Kafka (#14098)
* Add KafkaReader for processing streaming data with Apache Kafka

Apache Kafka is a widely used distributed streaming platform in
the open source community. The goal of this fix is to create a contrib
Reader op (inheriting ReaderBase, similar to
TextLineReader/TFRecordReader) so that it is possible to read
Kafka streaming data from TensorFlow in a similar fashion.

This fix uses librdkafka, a C/C++ Apache Kafka client library which
is released under the 2-clause BSD license and is widely used in
a number of Kafka bindings such as Go, Python, C#/.NET, etc.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
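
For context, a minimal sketch of the ReaderBase pattern the new op is meant to mirror, shown with the existing TF 1.x TextLineReader and queue runners; this only illustrates the shared read(queue) -> (key, value) shape, not the Kafka op's own API, and the file name is a placeholder:

```python
import tensorflow as tf

# Queue-based reading pattern shared by ReaderBase subclasses such as
# TextLineReader/TFRecordReader (TF 1.x API). A Kafka reader would follow
# the same read(queue) -> (key, value) shape, with Kafka-specific inputs.
filename_queue = tf.train.string_input_producer(["data.txt"])
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)

with tf.Session() as sess:
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(sess=sess, coord=coord)
  print(sess.run([key, value]))  # e.g. key "data.txt:1" and the first line
  coord.request_stop()
  coord.join(threads)
```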

* Add KafkaReader Python wrapper.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add BUILD file and op registration for KafkaReader.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add C++ Kernel for KafkaReader

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add librdkafka to third_party packages in Bazel

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add contrib/kafka to part of the contrib bazel file.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Update workspace.bzl

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Comment out clean_deps of `tensorflow/core:framework` and `tensorflow/core:lib`

so that it is possible to build with ReaderBase.

See 1419 for details.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add group id flag.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Sync offset

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add test cases and a script to start and stop the Kafka server (with Docker)

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Convert from the legacy librdkafka Consumer to KafkaConsumer

so that the thread join does not hang.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Only output offset as the key.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add a timeout attr for the Kafka Consumer to use.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Build Kafka kernels by default to get around the linkage issue.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Convert KafkaReader to KafkaDataset.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
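
A minimal usage sketch of the resulting KafkaDataset, assuming the constructor exposes the topics/group/eof/timeout arguments described in the commits above; the import path and the "topic:partition:start:end" topic-string encoding are assumptions and may differ from the final code:

```python
import tensorflow as tf
from tensorflow.contrib.kafka.python.ops import kafka_dataset_ops  # path is an assumption

# Consume Kafka messages as a tf.data pipeline.
dataset = kafka_dataset_ops.KafkaDataset(
    topics=["test:0:0:4"],  # assumed "topic:partition:start:end" encoding
    group="test-consumer",  # consumer group id (see "Add group id flag")
    eof=True,               # stop at end of stream instead of blocking
    timeout=1000)           # consumer poll timeout in ms (see the timeout attr)

iterator = dataset.make_one_shot_iterator()
next_message = iterator.get_next()

with tf.Session() as sess:
  while True:
    try:
      print(sess.run(next_message))
    except tf.errors.OutOfRangeError:
      break
```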

* Fix workspace.bzl for kafka with tf_http_archive

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add public visibility

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Address review feedback

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Optionally select Kafka support through ./configure

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
2018-01-26 12:45:35 -08:00


diff -Naur a/config.h b/config.h
--- a/config.h 1970-01-01 00:00:00.000000000 +0000
+++ b/config.h 2017-10-28 00:57:03.316957390 +0000
@@ -0,0 +1,40 @@
+#pragma once
+#define WITHOUT_OPTIMIZATION 0
+#define ENABLE_DEVEL 0
+#define ENABLE_REFCNT_DEBUG 0
+#define ENABLE_SHAREDPTR_DEBUG 0
+
+#define HAVE_ATOMICS_32 1
+#define HAVE_ATOMICS_32_SYNC 1
+
+#if (HAVE_ATOMICS_32)
+# if (HAVE_ATOMICS_32_SYNC)
+# define ATOMIC_OP32(OP1,OP2,PTR,VAL) __sync_ ## OP1 ## _and_ ## OP2(PTR, VAL)
+# else
+# define ATOMIC_OP32(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST)
+# endif
+#endif
+
+#define HAVE_ATOMICS_64 1
+#define HAVE_ATOMICS_64_SYNC 1
+
+#if (HAVE_ATOMICS_64)
+# if (HAVE_ATOMICS_64_SYNC)
+# define ATOMIC_OP64(OP1,OP2,PTR,VAL) __sync_ ## OP1 ## _and_ ## OP2(PTR, VAL)
+# else
+# define ATOMIC_OP64(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST)
+# endif
+#endif
+
+
+#define WITH_ZLIB 1
+#define WITH_LIBDL 1
+#define WITH_PLUGINS 0
+#define WITH_SNAPPY 1
+#define WITH_SOCKEM 1
+#define WITH_SSL 1
+#define WITH_SASL 0
+#define WITH_SASL_SCRAM 0
+#define WITH_SASL_CYRUS 0
+#define HAVE_REGEX 1
+#define HAVE_STRNDUP 1