背景
某些时候,kafka上游生产者生产的消息有错误,或者下游消费者并不需要消费某部分的数据,这时候,通常有两个解决方案,一种是对数据做不解析处理,直接略过。另一种就是暂时关掉kafka的消费者组,等到生产者正常后再进行消费,但由于kafka本身是默认断点续传的,此时就需要我们先重置kafka中当前kafka组的offset。
解决方案
更改消费者组
由于kafka对某topic中offset的管理是以组的形式来进行的,因此,在新建或更改消费者组后,对于offset的管理也会重新开始,策略取决于配置的auto.offset.reset参数
在重启动时指定起始offset
在再次启动时,通过配置指定要消费topic中分区的offset
@KafkaListener(groupId = "topic_group_test",topicPartitions = { @TopicPartition(topic = "topic_test",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "9830")) })
java springboot版本
通过kafka服务端脚本指定重置
kafka-consumer-groups.sh --bootstrap-server 10.202.13.27:9092 \ --group cjw --reset-offsets --topic cjw-test --to-earliest --execute
具体支持8种操作
–to-earliest
–to-latest
–to-current
–to-offset
–shift-by N: 把位移调整到当前位移+N 处,N可以为负数
–to-datetime : 把位移调整到大于给定时间的最早位移处,datetime格式yyyy-MM-ddTHH:mm:ss.xxx
–by-duration :把位移调整到距离当前时间指定间隔的位移处,格式为PnDTnHnMnS
–from-file :从CSV文件中读取调整策略
通过API代码来指定
consumer.seekToEnd( consumer.partitionsFor(topic).stream().map(partitionInfo-> new TopicPartition(topic,partitionInfo.partition())) .collect(Collectors.toList()) );
void seek(TopicPartition partition, long offset);
Void seek(TopicPartition partition,OffsetAndMetadata offsetAndMetadata);
Void seekToBeginning(Collection partitions)
Void seekToEnd(Collection partitions)
注意
以上所有操作都需要在消费者组处于未激活的情况下进行
使用代码方式时,需要指定所有分区的消费策略
如有侵犯您的版权,请及时联系3500663466#qq.com(#换@),我们将第一时间删除本站数据。
暂无评论内容