消费者的close方法
来源:10-1 Kafka集群篇介绍

乃好
2020-04-25
老师您好,我实验发现调用消费者的close方法,其会将所有本次拉取到的数据全部提交,比如第一次拉取到1000条数据,我在处理第500条数据的时候一旦执行close方法,会直接提交offset为1000,请问老师有没有什么办法既可以关闭消费者,又不让他全部提交呢?
再详细描述一下场景吧,消费者接收到1000条数据,
遍历每一条数据{ 1. 对数据进行处理生成sql; 2. 执行sql; 3. 数据库事务进行提交; 4. consumer.commitAsync(); }
假设执行到步骤2的时候发生了比如违反唯一性约束条件此类的数据库错误,肯定会抛出异常,那么此时程序不会再继续执行了,在这样的情况下,我们肯定会按下IDE上的程序终止按钮,然后检查数据库,解决数据库的数据问题,然后再次按下运行按钮,消费者又可以接收到数据,并且顺利操作数据进入数据库!
但是现在遇到的问题是,代码运行在系统当中,发生了错误,除非关闭整个系统,那么刚刚那个消费者对象仍然会存在一段时间,我也不清楚它到底是个什么情况,总之不是在正常的运行,现在我能做的就是去解决数据库中数据的问题,并且我还新建了一个消费者,接着上次的位置开始消费,但是此时我发现这个新建的消费者是接收不到数据的,emmm也许是在消费者那章提到的“单个分区的消息只能由ConsumerGroup中某个Consumer消费”,因为之前那个消费者还没死透,所以这个新建的消费者无法收到消息。然后我又等待了一段时间,大概5分钟的样子,再次启动刚刚新建的消费者,发现又可以接着上次提交的位置开始消费了!
所以现在我现在就想,
5分钟的时间太长了,是不是有什么消费者的参数可以设置,在消费代码中发生异常,其“死亡”得可以快一点,以便解决了数据库数据问题后快速再次进行数据的消费?
或者在抛出异常的时候直接关闭掉这个消费者,这样我也不用等,问题解决了再新建一个消费者就是了。这也是我提这个问题的初衷,如果调用close,虽然可以关闭消费者,但是所有数据都被提交了,没法再接着发生异常的数据位置的地方重新开始消费。
1回答
-
Allen
2020-04-26
并不是全提交, 而是consumer的poll的结果进行提交,如果想要控制, 就改成手动提交, 提交可以指定offset
122020-04-27
相似问题