重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

KafkaConnect及FileConnector的示例分析

这篇文章将为大家详细讲解有关Kafka Connect及FileConnector的示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

站在用户的角度思考问题,与客户深入沟通,找到北屯网站设计与北屯网站推广的解决方案,凭借多年的经验,让设计与互联网技术结合,创造个性化、用户体验好的作品,建站类型包括:成都做网站、成都网站建设、企业官网、英文网站、手机端网站、网站推广、空间域名、网页空间、企业邮箱。业务覆盖北屯地区。

一. Kafka Connect简介

Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为何集成其他系统和解耦应用,经常使用Producer来发送消息到Broker,并使用Consumer来消费Broker中的消息。Kafka Connect是到0.9版本才提供的并极大的简化了其他系统与Kafka的集成。Kafka Connect运用用户快速定义并实现各种Connector(File,Jdbc,Hdfs等),这些功能让大批量数据导入/导出Kafka很方便。

Kafka Connect及FileConnector的示例分析

如图中所示,左侧的Sources负责从其他异构系统中读取数据并导入到Kafka中;右侧的Sinks是把Kafka中的数据写入到其他的系统中。

二. 各种Kafka Connector

Kafka Connector很多,包括开源和商业版本的。如下列表中是常用的开源的Connector

ConnectorsReferences
JdbcSource, Sink
Elastic SearchSink1, Sink2, Sink3
CassandraSource1, Source 2, Sink1, Sink2
MongoDBSource
HBaseSink
SyslogSource
MQTT (Source)Source
Twitter (Source)Source, Sink
S3Sink1, Sink2

商业版的可以通过Confluent.io获得

三. 示例

3.1 FileConnector Demo

本例演示如何使用Kafka Connect把Source(test.txt)转为流数据再写入到Destination(test.sink.txt)中。如下图所示:

Kafka Connect及FileConnector的示例分析

本例使用到了两个Connector:

  • FileStreamSource:从test.txt中读取并发布到Broker中

  • FileStreamSink:从Broker中读取数据并写入到test.sink.txt文件中 其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties

bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
3.2 运行Demo

需要熟悉Kafka的一些命令行,参考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)

3.2.1 启动Kafka Broker
[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/
[root@localhost kafka_2.11-0.11.0.0]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &
3.2.2 启动Source Connector和Sink Connector
[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

3.3.3 打开console-consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
3.3.4 写入到test.txt文件中,并观察3.3.3中的变化
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
3.3.3中打开的窗口输出如下
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}
3.3.5 查看test.sink.txt
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt
firest line
second line

关于Kafka Connect及FileConnector的示例分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


网站栏目:KafkaConnect及FileConnector的示例分析
标题来源:http://cqcxhl.cn/article/pcescg.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP