receive() throws an error after finishing this lesson

Source: 9-24 Spring Cloud Stream + RocketMQ distributed transactions 02: refactoring the consumer

L灬Lawliet

2019-08-15

This is probably the same problem another student ran into:
https://coding.imooc.com/learn/questiondetail/136262.html
Here is the exception I get:

2019-08-15 18:01:31.337 ERROR 256252 --- [MessageThread_1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.itmuch.usercenter.rocketmq.AddBonusStreamConsumer#receive[1 args]; nested exception is java.lang.IllegalStateException: java.lang.ClassCastException@5077e9e8
Endpoint [com.itmuch.usercenter.rocketmq.AddBonusStreamConsumer]
Method [public void com.itmuch.usercenter.rocketmq.AddBonusStreamConsumer.receive(com.itmuch.usercenter.domain.dto.messaging.UserAddBonusMsgDTO)] with argument values:
 [0] [type=java.lang.String] [value={"userId":1,"bonus":50}] , failedMessage=GenericMessage [payload=byte[23], headers={rocketmq_QUEUE_ID=0, rocketmq_TOPIC=add-bonus, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=0A144E7B4A9818B4AAC24AD3AAC10000, rocketmq_SYS_FLAG=0, id=4351797a-bb66-2473-b9c4-b9a2d7789088, rocketmq_BORN_HOST=10.20.78.123, contentType=text/plain, rocketmq_BORN_TIMESTAMP=1565844185932, timestamp=1565863288334}]
	at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
	at org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter.access$300(RocketMQInboundChannelAdapter.java:41)
	at org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.lambda$onMessage$0(RocketMQInboundChannelAdapter.java:147)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
	at org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.onMessage(RocketMQInboundChannelAdapter.java:146)
	at org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.onMessage(RocketMQInboundChannelAdapter.java:139)
	at org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer$DefaultMessageListenerConcurrently.consumeMessage(RocketMQListenerBindingContainer.java:377)
	at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:411)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: java.lang.ClassCastException@5077e9e8
Endpoint [com.itmuch.usercenter.rocketmq.AddBonusStreamConsumer]
Method [public void com.itmuch.usercenter.rocketmq.AddBonusStreamConsumer.receive(com.itmuch.usercenter.domain.dto.messaging.UserAddBonusMsgDTO)] with argument values:
 [0] [type=java.lang.String] [value={"userId":1,"bonus":50}] 
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:176)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
	at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
	... 27 more
Caused by: java.lang.IllegalArgumentException: java.lang.ClassCastException@5077e9e8
	at sun.reflect.GeneratedMethodAccessor114.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
	... 29 more

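That is, after the message is received it really is consumed successfully, but then an exception is thrown. In theory this should roll back.
If my question is still not clear, I can post my code.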

2 answers

大目

2019-08-15

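Hello, I looked at your code and the code itself is fine. Is your RocketMQ server version 4.5.1? Judging from the exception, a String cannot be converted to UserAddBonusMsgDTO: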

Method [public void com.itmuch.usercenter.rocketmq.AddBonusStreamConsumer.receive(com.itmuch.usercenter.domain.dto.messaging.UserAddBonusMsgDTO)] with argument values:
 [0] [type=java.lang.String] [value={"userId":1,"bonus":50}] 
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:176)

This is a typical serialization/deserialization problem.
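To make the failure mode concrete, here is a minimal sketch using only the JDK. It is not the course code: BonusMsg, DemoConsumer and TypeMismatchDemo are hypothetical stand-ins. It shows that reflectively invoking a handler that expects a DTO with a raw String argument fails with an IllegalArgumentException, which is the innermost cause in the trace above.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the DTO; the field types are assumed.
class BonusMsg {
    Integer userId;
    Integer bonus;
}

// Hypothetical stand-in for AddBonusStreamConsumer.
class DemoConsumer {
    public void receive(BonusMsg msg) {
        System.out.println("received bonus " + msg.bonus);
    }
}

class TypeMismatchDemo {
    // Invokes receive(BonusMsg) reflectively with the given argument.
    // Returns "ok" on success, or the class name of the IllegalArgumentException
    // that reflection raises when the argument type does not match.
    static String invokeWith(Object arg) {
        try {
            Method receive = DemoConsumer.class.getMethod("receive", BonusMsg.class);
            receive.invoke(new DemoConsumer(), arg);
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getClass().getName();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The payload arrives as a String, but the method wants a BonusMsg.
        System.out.println(invokeWith("{\"userId\":1,\"bonus\":50}"));

        // With an already converted object the call goes through.
        BonusMsg msg = new BonusMsg();
        msg.userId = 1;
        msg.bonus = 50;
        System.out.println(invokeWith(msg));
    }
}
```

In the real application the framework is expected to convert the payload before the call; the sketch only illustrates what happens when that conversion has not taken place.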

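There are two ways to fix it:

  1. Use the same RocketMQ version as in the course;

  2. Go along with the mismatch and do the deserialization yourself, like this: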

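    // Accept the raw JSON payload as a String and convert it by hand.
    // JSON is assumed to be fastjson's com.alibaba.fastjson.JSON.
    public void receive(String message) {
        UserAddBonusMsgDTO userAddBonusMsgDTO = JSON.parseObject(message, UserAddBonusMsgDTO.class);
        this.userService.addBonus(userAddBonusMsgDTO);
    }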


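Finally, there is one point you have misunderstood:

"That is, after the message is received it really is consumed successfully, but then an exception is thrown. In theory this should roll back."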

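Once a transactional message has been delivered to the MQ, the transaction in your content center can no longer be rolled back. RocketMQ guarantees that your consumer gets to consume the message: it retries many times (15 or 18, I don't remember exactly), and if the message still cannot be consumed, it goes to the dead-letter queue.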

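Judging from your code, RocketMQ will wait for a while and then try to redeliver the same message. The retry intervals are listed below; they can be changed with this setting in conf/broker.conf: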

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
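As a rough illustration of how these levels turn into retry intervals, here is a small JDK-only sketch. It rests on an assumption that is not confirmed in this thread: as far as I understand RocketMQ's default behavior for concurrent consumers, the n-th redelivery uses delay level n + 2, so the first retry waits 10s and the last of 16 retries waits 2h. RetryDelayDemo and delayForRetry are hypothetical names, not RocketMQ APIs.

```java
import java.util.Arrays;
import java.util.List;

class RetryDelayDemo {
    // The messageDelayLevel value quoted above.
    static final String MESSAGE_DELAY_LEVEL =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Returns the delay before the given redelivery attempt (1-based).
    // Assumption: attempt n maps to delay level n + 2 (levels are 1-based).
    static String delayForRetry(int retry) {
        List<String> levels = Arrays.asList(MESSAGE_DELAY_LEVEL.split(" "));
        int level = retry + 2;
        if (retry < 1 || level > levels.size()) {
            throw new IllegalArgumentException("no delay level for retry " + retry);
        }
        return levels.get(level - 1);
    }

    public static void main(String[] args) {
        // Print the delay that would precede each redelivery attempt.
        for (int retry = 1; retry <= 16; retry++) {
            System.out.println("retry " + retry + " -> " + delayForRetry(retry));
        }
    }
}
```

If that mapping is right, editing messageDelayLevel in conf/broker.conf changes both the delay levels and the consumer retry intervals derived from them.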

Of course, these are RocketMQ details that the course did not go into.



By the way, if you are interested in the check-back interval on the message producer side, have a look at another student's question: http://coding.imooc.com/learn/questiondetail/135842.html

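大目 replying to L灬Lawliet
I'd recommend using the same versions as in the course, to avoid unnecessary pitfalls. Quite a lot of course time already went into sorting out versions.
2019-08-16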

风舞炫动

2019-11-25

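Sharing my experience: I ran into the same problem, and after I made the DTO implement the serialization interface (Serializable), everything worked.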
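For reference, a minimal sketch of what that change might look like. The field names come from the JSON payload in the trace; the field types, the constructor and the SerializableDtoDemo helper are assumptions, and the thread does not explain why this change helped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// DTO marked as Serializable; field types are assumed from {"userId":1,"bonus":50}.
class UserAddBonusMsgDTO implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Integer userId;
    private final Integer bonus;

    UserAddBonusMsgDTO(Integer userId, Integer bonus) {
        this.userId = userId;
        this.bonus = bonus;
    }

    Integer getUserId() { return userId; }
    Integer getBonus() { return bonus; }
}

class SerializableDtoDemo {
    // Writes the DTO with Java serialization and reads it back,
    // which only works because the class implements Serializable.
    static UserAddBonusMsgDTO roundTrip(UserAddBonusMsgDTO dto) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(dto);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (UserAddBonusMsgDTO) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        UserAddBonusMsgDTO copy = roundTrip(new UserAddBonusMsgDTO(1, 50));
        System.out.println(copy.getUserId() + " " + copy.getBonus());
    }
}
```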
