概述
需求来源,在review前人留下的屎山代码时发现如下截图所示的代码片段:
也就是说代码是空实现的。另外,从类名定义也知道需求未实现。
于是有此需求:已经消费过的消息重新消费。
调研
调研下来,主要有以下3种可能性方案
实现方案
- 修改偏移量,即offset,可通过脚本快速实现
- 新增Group,这将使 Kafka 认为您正在使用一个新的消费者组,并从起始偏移量开始重新消费消息。需通过代码实现
- 消息产生者重新发送消息,实际业务中不太常见,也不现实
Kafka的偏移量的保存方式,根据不同版本号有3种方式:保存在zookeeper中、保存在kafka的自带_consumer_offset这个topic中、保存在自定义的存储系统中。
版本
首先需要知道Kafka什么版本,找到Kafka的安装目录。可以通过find命令:find / -name '*kafka*'。
根据命令输出得知安装目录为:/usr/local/kafka,进入到libs目录,发现很多kafka_2.11-2.2.0.*文件。其中,2.11为scala版本,2.2.0为kafka版本。
脚本
使用Kafka自带的bin目录下的kafka-consumer-groups.sh脚本设置消费者组(consumer group)的位移, 这是0.11.0.0版本提供的新功能且只适用于新版本consumer。在此版本之前,如果要为已有的consumer group调整位移必须要手动编写Java程序调用KafkaConsumer.seek()方法。
使用此脚本可修改consumer group的位移,有个前提:consumer group必须是inactive的,即不能是处于正在工作中的状态。
格式:
./kafka-consumer-groups.sh --bootstrap-server ip:9092 --group <> --topic
如:./kafka-consumer-groups.sh --bootstrap-server 172.100.200.200:9092 --group collect_data_business_service --topic topic_event:0 --reset-offsets --to-offset 195725 --execute
topic:指定topic的作用域
- --all-topics:为consumer group下所有topic的所有分区调整位移
- --topic t1 --topic t2:为指定的若干个topic的所有分区调整位移
- --topic t1:0,1,2:为指定的topic的某个分区调整位移
reset-offsets:确定位移重设策略
- --to-earliest:把位移调整到分区当前最小位移
- --to-latest:把位移调整到分区当前最新位移
- --to-current:把位移调整到分区当前位移
- --to-offset
:把位移调整到指定位移处 - --shift-by N:把位移调整到当前位移 + N处,N为负数表示向前移动
- --to-datetime
:把位移调整到大于给定时间的最早位移处,datetime格式yyyy-MM-ddTHH:mm:ss.xxx - --by-duration
:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,如PT0H5M0S - --from-file
:从CSV文件中读取调整策略 常见报错:
- Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
原因:命令指定的IP有误,或端口不对,或未开放等原因导致连接超时
- Error: Assignments can only be reset if the group ‘collect_data_business_service’ is inactive, but the current state is Stable.
原因:在修改offset时,需要停止消费者应用程序,如杀掉Java进程,停止container容器等
- WARN New offset (0) is lower than earliest offset for topic partition topic_event-0. Value will be set to 195551 (kafka.admin.ConsumerGroupCommand$)
原因:
代码
也可以通过代码的方式,指定一个新的消费者组,即group.id。
@Override public void afterPropertiesSet() throws Exception { Properties p = new Properties(); p.setProperty("bootstrap.servers", "172.100.200.200:9092"); p.setProperty("group.id", "collect_data_business_service"); p.setProperty("auto.offset.reset", "earliest"); p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer
consumer = new KafkaConsumer<>(p); consumer.subscribe(Collections.singleton("topic_event")); consumer.poll(Duration.ofSeconds(1)); consumer.seekToBeginning(consumer.assignment()); while (true) { ConsumerRecords message = consumer.poll(Duration.ofSeconds(1)); log.info("topic_event---collect_data_business_service--消费消息:" + message); CollectDataV2Content content = JsonUtil.jsonToBean(message.toString(), CollectDataV2Content.class); service.saveContent(content); } } 配置:
- auto.offset.reset:只能配置以下3种情况:latest, earliest, none,Spring-Kafka应用启动时会检查此项配置,不正确的配置会报错,应用启动失败
- key.deserializer:必须配置,否则报错:Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.,应用启动失败。
- value.deserializer:同上
参考
- Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
猜你喜欢
- 4天前(a级景区评定机构)全国A级旅游景区创建与提升培训班在敦煌市举办
- 4天前(三亚海棠湾君悦度假酒店)三亚海棠湾君悦酒店暑期夏令营悦趣海岛游招募中
- 4天前(安徽民宿发展报告)首届安徽省乡村民宿创意设计大赛启动
- 4天前(澳涞山庄见证北欧零碳到中国实践,世界十佳环境保护城市榜单发布)澳涞山庄见证北欧零碳到中国实践,世界十佳环境保护城市榜单发布
- 4天前(星级饭店的发展困境)星级饭店转型之路:从市场逻辑到行业实践的深度探索
- 4天前(美诺酒店集团旗下臻选品牌m collection)美诺酒店集团启动盛橡品牌战略焕新 开启全球扩张新篇章
- 4天前(“三天跨两城”催生租车新需求,神州租车清明跨城订单同比增长416%)“三天跨两城”催生租车新需求,神州租车清明跨城订单同比增长416%
- 4天前(福州“一县一桌菜”“两马乡宴”品鉴会圆满举办,马尾美食共叙血脉亲情)福州“一县一桌菜”“两马乡宴”品鉴会圆满举办,马尾美食共叙血脉亲情
- 4天前(携程租车加盟合作)携程租车加盟优势全解析:开启旅游出行市场新篇章
- 4天前(阿斯塔纳航空属于哪个联盟)阿斯塔纳航空荣获Skytrax世界航空公司大奖,将继续助力中哈交流往来
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章