重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
1- Kafka的相关使用操作
创新互联公司专业为企业提供张家港网站建设、张家港做网站、张家港网站设计、张家港网站制作等企业网站建设、网页设计与制作、张家港企业网站模板建站服务,10年张家港做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。2- Kafka的核心原理:
什么是消息队列呢?
消息: 数据 只不过这个数据具有一种流动状态
队列: 存储数据的容器 只不过这个容器具有FIFO(先进先出)特性
消息队列: 数据在队列中, 从队列的一端传递到另一端的过程, 数据在整个队列中产生一种流动状态
1.2 常见的消息队列的产品常见的消息队列的产品:
Kafka是Apache旗下的一款开源免费的消息队列的中间件产品,最早是由领英公司开发的, 后期共享给Apache, 目前已经是Apache旗下的顶级开源的项目, 采用语言为Scala
官方网站: http://www.kafka.apache.org
适用场景: 数据传递工作, 需要将数据从一端传递到另一端, 此时可以通过Kafka来实现, 不局限两端的程序
在实时领域中, 主要是用于流式的数据处理工作
3 Kafka的架构Kafka Cluster: kafka集群
broker: kafka的节点
producer: 生产者
consumer: 消费者
Topic: 主题/话题 理解就是一个大的逻辑容器(管道)
shard: 分片. 一个Topic可以被分为N多个分片, 分片的数量与节点数据没有关系
replicas: 副本, 可以对每一个分片构建多个副本, 副本数量最多和节点数量一致(包含本身) 保证数据不丢失
zookeeper: 存储管理集群的元数据信息
4 Kafka的安装操作参考Kafka的集群安装文档 完成整个安装工作即可
如果安装后, 无法启动, 可能遇到的问题:
1) 配置文件中忘记修改broker id
2) 忘记修改监听的地址, 或者修改了但是前面的注释没有打开
如何启动Kafka集群:
启动zookeeper集群: 每个节点都要执行
cd /export/server/zookeeper/bin
./zkServer.sh start
启动完成后 需要通过 jps -m 查看是否启动 , 并且还需要通过:
./zkServer.sh status 查看状态, 必须看到一个leader 两个follower才认为启动成功了
启动Kafka集群:
单节点: 每个节点都需要执行
cd /export/server/kafka_2.12-2.4.1/bin
前台启动:
./kafka-server-start.sh ../config/server.properties
后台启动:
nohup ./kafka-server-start.sh ../config/server.properties 2>&1 &
注意: 第一次启动, 建议先前台启动, 观察是否可以正常启动, 如果OK, ctrl +C 退出, 然后挂载到后台
如何停止:
单节点: 每个节点都需要执行
cd /export/server/kafka_2.12-2.4.1/bin
操作:
jps 然后通过 kill -9
或者:
./kafka-server-stop.sh
配置一键化脚本: 仅用于启动Kafka 不会启动zookeeper, zookeeper还是需要单独启动, 或者配置zookeeper的一键化脚本
mkdir -p /export/onekey
cd /export/onekey/
上传即可
cd /export/onekey/
chmod 755 *.sh
Kafka是一个消息队列的中间件产品, 主要的作用: 将数据从程序一端传递到另一端的操作, 所以说学习Kafka主要学习如何使用Kafka生产数据, 以及如何使用Kafka消费数据
5.1 Kafka的shell命令使用./kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 3 --replication-factor 2
./kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181
./kafka-topics.sh --describe --zookeeper node1:2181,node2:2181,node3:2181 --topic test01
./kafka-topics.sh --delete --zookeeper node1:2181,node2:2181,node3:2181 --topic test01
注意:
默认情况下, 删除一个topic 仅仅是标记删除, 主要原因: Kafka担心直接删除, 会导致误删数据
如果想执行删除的时候, 直接将topic完整的删除掉: 此时需要在server.properties配置中修改一下配置为True
delete.topic.enable=true
如果本身Topic中的数据量非常少, 或者没有任何的使用, 此时Topic会自动先执行逻辑删除, 然后在物理删除, 不管是否配置delete.topic.enable
Topic 仅允许增大分片, 不允许减少分片 同时也不支持修改副本的数量
增大分区:
./kafka-topics.sh --alter --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 5
./kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test01
>
./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test01
默认从当前的时间开始消费数据, 如果想从头开始消费, 可以添加 --from-beginning 参数即可
5.2 Kafka的基准测试 Kafka的基准测试: 主要指的是将安装完成的Kafka集群, 进行测试操作, 测试其能否承载多大的并发量(读写的效率)
注意: 在进行Kafka的基准测试的时候, 受Topic的分片和副本的数量影响会比较大, 一般在测试的时候, 会构建多个topic, 每一个topic设置不同的分片和副本的数量, 比如: 一个设置分片多一些, 副本多一些 一个设置分片多一些, 副本少些…
./kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test02 --partitions 6 --replication-factor 1
cd /export/server/kafka/bin
./kafka-producer-perf-test.sh --topic test02 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
属性说明:
--num-records : 发送的总消息量
--throughput : 指定吞吐量(限流) -1 不限制
--record-size: 每条数据的大小(字节)
--producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1 : 设置kafka的地址和消息发送模式
cd /export/server/kafka/bin
./kafka-consumer-perf-test.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test02 --fetch-size 1048576 --messages 5000000
属性说明:
--fetch-size : 每次从Kafka端拉取数据量
--message: 测试的总消息量
假设Kafka的节点数量是无限多的:
topic分片数量越多, 理论上读写效率越高
topic副本数量越多, 整体执行效率越差
一般可以将分片的数量设置为副本数量的三倍左右 可以测试出比较最佳的性能 副本调整为1
5.3 Kafka的Java API的操作aliyun http://maven.aliyun.com/nexus/content/groups/public/ true false never org.apache.kafka kafka-clients 2.4.1 org.apache.commons commons-io 1.3.2 org.slf4j slf4j-log4j12 1.7.6 log4j log4j 1.2.16 org.apache.maven.plugins maven-compiler-plugin 3.1 1.8
package com.itheima.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerTest {public static void main(String[] args) {// 第一步: 创建kafka的生产者核心对象: KafkaProducer 传入相关的配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producerproducer = new KafkaProducer<>(props);
//2. 执行发送数据操作
for (int i = 0; i< 10; i++) {ProducerRecordproducerRecord = new ProducerRecord<>(
"test01", "张三"+i
);
producer.send(producerRecord);
}
//3. 执行close 释放资源
producer.close();
}
}
5.3.2 演示如何从Kafka消费数据package com.itheima.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerTest {public static void main(String[] args) {// 1- 创建Kafka的消费者的核心对象: KafkaConsumer
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("group.id", "test"); // 消费者组的ID
props.put("enable.auto.commit", "true"); // 是否自动提交偏移量offset
props.put("auto.commit.interval.ms", "1000"); // 自动提交的间隔时间
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key值的反序列化的类
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的值反序列化的类
KafkaConsumerconsumer = new KafkaConsumer<>(props);
//2. 订阅topic: 表示消费者从那个topic来消费数据 可以指定多个
consumer.subscribe(Arrays.asList("test01"));
while (true) {// 3. 从kafka中获取消息数据, 参数表示当kafka中没有消息的时候, 等待的超时时间, 如果过了等待的时间, 返回空对象(对象存在, 但是内部没有数据 相当于空容器)
ConsumerRecordsrecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecordrecord : records) {long offset = record.offset();
String key = record.key();
String value = record.value();
// 偏移量: 每一条数据 其实就是一个偏移量 , 每个分片单独统计消息到达了第几个偏移量 偏移量从 0 开始的
System.out.println("消息的偏移量为:"+offset+"; key值为:"+key + "; value的值为:"+ value);
}
}
}
}
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧