重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章主要讲解了“怎么理解kafka分区、生产和消费”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么理解kafka分区、生产和消费”吧!
周村网站建设公司成都创新互联公司,周村网站设计制作,有大型网站制作公司丰富经验。已为周村上千多家提供企业网站建设服务。企业网站搭建\外贸网站制作要多少钱,请找那个售后服务好的周村做网站的公司定做!
分区规则指的是将每个Topic划分成多个分区(Partition),每个分区是一组有序的消息日志,生产者生产的每条消息只会被发送到其中一个分区。
分区 (Partition) 都是一个有序的、不可变的数据序列,消息数据被不断的添加到序列的尾部。分区中的每一条消息数据都被赋予了一个连续的数字ID,即偏移量 (offset) ,用于唯一标识分区中的每条消息数据。
分区(Partition)的作用就是提供负载均衡的能力,单个topic的不同分区可存储在相同或不同节点机上,为实现系统的高伸缩性(Scalability),不同的分区被放置到不同节点的机器上,各节点机独立地执行各自分区的读写任务,如果性能不足,可通过添加新的节点机器来增加整体系统的吞吐量。
Kafka分区下数据使用消息日志(Log)方式保存数据,具体方式是在磁盘上创建只能追加写(Append-only)消息的物理文件。因为只能追加写入,因此避免了缓慢的随机I/O操作,改为性能较好的顺序I/O写操作。Kafka日志文件分为多个日志段(Log Segment),消息被追加写到当前最新的日志段中,当写满一个日志段后Kafka会自动切分出一个新的日志段,并将旧的日志段封存。
Kafka将消息数据根据Partition进行存储,Partition分为若干Segment,每个Segment的大小相等。Segment由index file、log file、timeindex file等组成,后缀为".index"和".log",分别表示为Segment索引文件、数据文件,每一个Segment存储着多条信息。
分区策略是决定生产者将消息发送到哪个分区的算法。Kafka提供默认的分区策略,同时支持自定义分区策略。
Kafka 默认分区策略同时实现了两种策略:如果指定Key,那么默认实现按消息键保序策略;如果没有指定Key,则使用轮询策略
轮询策略(Round-robin),即顺序分配策略。如果一个Topic有3个分区,则第1条消息被发送到分区0,第2条被发送到分区1,第3条被发送到分区2,以此类推。当生产第4条消息时又会重新轮询将其分配到分区0。
轮询策略是Kafka Java生产者API默认提供的分区策略。如果未指定partitioner.class参数,那么生产者程序会按照轮询的方式在Topic的所有分区间均匀地存储消息。轮询策略有非常优秀的负载均衡表现,能保证消息最大限度地被平均分配到所有分区上。
随机策略(Randomness)是将消息随机地放置到任意一个分区上。如果要实现随机策略版的partition方法,Java版如下:
Listpartitions = cluster.partitionsForTopic(topic); return ThreadLocalRandom.current().nextInt(partitions.size());
先计算出Topic的总分区数,然后随机地返回一个小于分区数的正整数。随机策略本质上是力求将数据均匀地分散到各个分区,但实际表现要逊于轮询策略,如果追求数据的均匀分布,推荐使用轮询策略。
Kafka允许为每条消息定义消息键,简称为Key。Key可以是一个有着明确业务含义的字符串,如客户代码、部门编号或是业务ID等,也可以用来表征消息元数据。一旦消息被定义了Key,就可以保证同一个Key的所有消息都进入到相同的分区中。
实现分区策略的partition方法只需要两行代码即可:
Listpartitions = cluster.partitionsForTopic(topic); return Math.abs(key.hashCode()) % partitions.size();
基于地理位置的分区策略通常只针对大规模的Kafka集群,特别是跨城市、跨国家甚至跨大洲的集群。假设天猫计划为每个新注册用户提供一份注册礼品,比如欧美的用户注册天猫时可以免费得到一台iphone SE手机,而中国的新注册用户可以得到一台华为P40 Pro。为了实现相应的注册业务逻辑,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理欧美和中国用户的注册用户逻辑即可,同时必须把不同地理位置的用户注册的消息发送到不同机房中,因为处理注册消息的消费者程序只可能在某一个机房中启动着。基于地理位置的分区策略可以根据Broker的IP地址实现定制化的分区策略。
Listpartitions = cluster.partitionsForTopic(topic); return partitions.stream().filter(p -> isChina(p.leader().host())).map(PartitionInfo::partition).findAny().get();
可以从所有分区中找出Leader副本在中国的所有分区,然后随机挑选一个进行消息发送。
如果要自定义分区策略,需要显式地配置生产者端的参数partitioner.class。编写生产者程序时,可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner
接口(partition()和close()),通常只需要实现最重要的partition方法。
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前Kafka集群共有多少主题、多少Broker等)。设置partitioner.class参数为自己实现类的Full Qualified Name,生产者程序就会按照自定义分区策略的代码逻辑对消息进行分区。
无论消息是否被消费,kafka都会保留所有消息,同时定期检查旧的日志段是否能够被删除,从而回收磁盘空间,删除策略有两种:
基于时间:log.retention.hours=168
基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka 性能无关。
Kafka 2.1.0版本前,支持GZIP、Snappy、LZ4三种压缩算法。2.1.0版本开始正式支持Zstandard算法(简写为zstd ,Facebook开源的一个压缩算法),该算法能够提供超高的压缩比(compression ratio)。压缩算法可以使用压缩比和压缩/解压缩吞吐量两个指标进行衡量。不同压缩算法的性能比较如下:
生产环境中,GZIP、Snappy、LZ4、zstd性能表现各有千秋,在吞吐量方面:LZ4 > Snappy > zstd > GZIP;在压缩比方面,zstd > LZ4 > GZIP > Snappy。
如果要启用Producer端的压缩,Producer程序运行机器上的CPU资源必须充足。除了CPU资源充足,如果生产环境中带宽资源有限,也建议Producer端开启压缩。通常,带宽比CPU和内存要昂贵的多,因此千兆网络中Kafka集群带宽资源耗尽很容易出现。如果客户端机器CPU资源富余,建议Producer端开启zstd压缩,可以极大地节省网络资源消耗。对于解压缩,需要避免非正常的解压缩,如消息格式转换的解压缩操作、Broker与Producer解压缩算法不一致。
Producer发送压缩消息到Broker后,Broker会原封不动保存。当Consumer程序请求消息时,Broker 会原样发出,当消息到达Consumer端后,Consumer自行解压缩消息。Kafka会将使用的压缩算法封装进消息集合中,当Consumer读取到消息集合时,会知道消息使用的压缩算法。除了在Consumer端解压缩,Broker端也会进行解压缩,每个压缩过的消息集合在Broker端写入时都要发生解压缩操作,对消息执行各种验证。解压缩对Broker端性能是有一定影响的。
如果将Topic设置成单分区,该Topic的所有的消息都只在一个分区内读写,保证全局的顺序性,但将丧失Kafka多分区带来的高吞吐量和负载均衡的性能优势。
多分区消息保序的方法是按消息键保序策略,根据业务提取出需要保序的消息的逻辑主体,并建立消息标志位ID,,对标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区,既可以保证分区内的消息顺序,也可以享受到多分区带来的搞吞吐量。
说明:消息重试只是简单将消息重新发送到原来的分区,不会重新选择分区。
kafka只能保证分区内有序,无法保证分区间有序,所以消费时,数据是相对有序的。
在通过API方式发布消息时,生产者是以Record为消息进行发布的。Record中包含key与value,value才是消息本身,而key用于路由消息所要存放Partition。消息要写入到哪个Partition并不是随机的,而是由路由策略决定。
指定Partition,直接写入指定Partition。
没有指定Partition但指定了key,则通过对key的hash值与Partition数量取模,结果就是要选出的Partition索引。
Partition和key都未指定,则使用轮询算法选出一个Partition。
增加分区时,Partition内的消息不会重新进行分配,随着数据继续写入,新分区才会参与再平衡。
Producer先通过分区策略确定数据录入的partition,再从Zookeeper中找到Partition的Leader
Producer将消息发送给分区的Leader。
Leader将消息接入本地的Log,并通知ISR(In-sync Replicas,副本同步列表)的Followers。
ISR中的Followers从Leader中pull消息,写入本地Log后向Leader发送ACK(消息发送确认机制)。
Leader收到所有ISR中的Followers的ACK后,增加HW(high watermark,最后commit 的offset)并向Producer发送ACK,表示消息写入成功。
必须使用producer.send(msg, callback)接口发送消息。
Producer端设置acks参数值为all。acks参数值为all表示ISR中所有Broker副本都接收到消息,消息才算已提交。
设置Producer端retries参数值为一个较大值,表示Producer自动重试次数。当出现网络瞬时抖动时,消息发送可能会失败,此时Producer能够自动重试消息发送,避免消息丢失。
设置Broker端unclean.leader.election.enable = false,unclean.leader.election.enable参数用于控制有资格竞选分区Leader的Broker。如果一个Broker落后原Leader太多,那么成为新Leader必然会造成消息丢失。因此,要将unclean.leader.election.enable参数设置成false。
设置Broker端参数replication.factor >= 3,将消息保存多份副本。
设置Broker参数min.insync.replicas > 1,保证ISR中Broker副本的最少个数,在acks=-1时才生效。设置成大于1可以提升消息持久性,生产环境中不能使用默认值 1。
必须确保replication.factor > min.insync.replicas,如果两者相等,那么只要有一个副本挂机,整个分区无法正常工作。推荐设置成replication.factor = min.insync.replicas + 1。
确保消息消费完成再提交。设置Consumer端参数enable.auto.commit为false,并采用手动提交位移的方式。
Producer端拦截器实现类都要继承org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口有两个核心的方法:
onSend:在消息发送前被调用。
onAcknowledgement:在消息成功提交或发送失败后被调用。onAcknowledgement 调用要早于发送回调通知callback的调用。onAcknowledgement与onSend 方法不是在同一个线程中被调用,因此如果两个方法中使用了某个共享可变对象,要保证线程安全。
假设第一个拦截器的完整类路径是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个拦截器是com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,Producer指定拦截器的Java代码示例如下:
Properties props = new Properties(); Listinterceptors = new ArrayList<>(); interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1 interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Consumer向Broker提交连接请求,连接的Broker会向其发送Broker Controller的通信URL,即配置文件中的listeners地址;
当Consumer指定了要消费的Topic后,会向Broker Controller发送消费请求;
Broker Controller会为Consumer分配一个或几个Partition Leader,并将Partition的当前offset发送给Consumer;
Consumer会按照Broker Controller分配的Partition对其中的消息进行消费;
当Consumer消费完消息后,Consumer会向Broker发送一个消息已经被消费反馈,即消息的offset;
在Broker接收到Consumer的offset后,会更新相应的__consumer_offset中;
Consumer拦截器的实现类要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口,ConsumerInterceptor有两个核心方法。
onConsume:在消息返回给Consumer程序前调用。在开始正式处理消息前,拦截器会先做一些处理,再返回给Consumer。
onCommit:Consumer在提交位移后调用,可以进行一些打日志操作等。
同一个Consumer重复消费
当Consumer由于消费能力低而引发了消费超时,则可能会形成重复消费。
在某数据刚好消费完毕,但正准备提交offset时,消费时间超时,则Broker认为消息未消费成功,产生重复消费问题。
其解决方案:延长offset提交时间。
不同的Consumer重复消费
当Consumer消费了消息,但还没有提交offset时宕机,则已经被消费过的消息会被重复消费。
感谢各位的阅读,以上就是“怎么理解kafka分区、生产和消费”的内容了,经过本文的学习后,相信大家对怎么理解kafka分区、生产和消费这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!