重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
Kafka所支持的分区策略:
1- 粘性分区策略(2.4版本下, 支持轮询策略) Java客户端支持, 但是Python客户端不支持
2- hash取模的策略
3- 指定分区策略
4- 随机分区策略: Python客户端是支持的 Java不支持
5- 自定义分区策略
1- 指定分区策略
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}
在生产端, 构建数据承载对象的时候, 采用此构造, 即可采用指定分区的策略
分区的编号从0开始的
注意: 指定分区与defaultPartitioner类没有任何的关系
2- Hash 取模策略
2.1 创建数据承载对象的时候, 必须使用仅传递 k v的构造方法, 即可使用Hash方式
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
}
注意: 当执行Hash取模分区策略, 其底层是通过一个默认的分区类来完成Hash取模计算: DefaultPartitioner(默认分区类)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// 当有key的时候, 采用Hash取模的方式
List
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
说明: 在使用此种分发策略的时候, Key值一定是可变的, 千万不要固定不变
3- 粘性分区策略
3.1 创建数据承载对象的时候,只需要传递value即可, 此时采用的粘性的分区策略
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
}
注意: 当执行Hash取模分区策略, 其底层是通过一个默认的分区类来完成Hash取模计算: DefaultPartitioner(默认分区类)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
// 当key为null的时候, 采用的分发的方式为stickyPartition(粘性分区)
return stickyPartitionCache.partition(topic, cluster);
}
// 当有key的时候, 采用Hash取模的方式
List
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
4- 自定义分区策略
如何自定义分区呢? 抄 抄DefaultPartitioner
1- 创建一个类, 实现 Partitioner 接口
2- 重写接口中方法: partition() 和 close方法, 主要是重写partition()
partition方法的参数列表说明:
String topic: 指定要写入到那个topic上
Object key : 表示 key值
byte[] keyBytes: 表示 key的字节数组
Object value: 表示value的值
byte[] valueBytes: 表示value的字节数组
Cluster cluster : 集群对象 可以帮助获取某个topic有多少个分片
3) 将自定义的分区类, 配置到生产者的配置信息中:
key: partitioner.class
value值:
默认值:org.apache.kafka.clients.producer.internals.DefaultPartitioner
修改为我们自己写的类即可
将其放置到生产者的properties中
# 博学谷IT 技术支持
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧