Urgent: error when consuming a message with Stream

Source: 9-24 Spring Cloud Stream + RocketMQ distributed transactions, part 02: refactoring the consumer

hthonor

2020-10-23

Consuming the message fails with: java.lang.String cannot be cast to com.itmuch.usercenter.domain.dto.UserAddBonusMsgDTO


My code:

Sending the message

this.source.output().send(

       MessageBuilder
               .withPayload(
                       UserAddBonusMsgDTO.builder()
                               .userId(share.getUserId())
                               .bonus(50)
                               .build()
               )
               .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
               .setHeader("shared_id", id)
               .setHeader("dto", JSON.toJSONString(shareAuditDTO))
               .build()
);


Consuming the message

@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusStreamConsumer {
   private final UserService userService;

   @StreamListener(Sink.INPUT)
   public void receive(Message<UserAddBonusMsgDTO> message){
       UserAddBonusMsgDTO msgDTO = (UserAddBonusMsgDTO)message.getPayload();    // this line throws the exception
       this.userService.addBonus(msgDTO);
   }
}


Error output

2020-10-23 21:49:11.659 ERROR 9682 --- [MessageThread_1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking AddBonusStreamConsumer#receive[1 args]; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.itmuch.usercenter.domain.dto.UserAddBonusMsgDTO, failedMessage=GenericMessage [payload=byte[23], headers={rocketmq_QUEUE_ID=0, rocketmq_TOPIC=add-bonus, rocketmq_FLAG=0, rocketmq_RECONSUME_TIMES=12, rocketmq_MESSAGE_ID=C0A802A4D93418B4AAC261194C2E0000, rocketmq_SYS_FLAG=0, id=e356a457-7dae-b6a1-65c5-550378b70c4d, rocketmq_BORN_HOST=192.168.2.164, contentType=text/plain, rocketmq_BORN_TIMESTAMP=1603110647863, timestamp=1603460948627}]

at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)

at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)

at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)

at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)

at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)

at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)

at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)

at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)

at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)

at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)

at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)

at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter.access$300(RocketMQInboundChannelAdapter.java:42)

at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.lambda$onMessage$0(RocketMQInboundChannelAdapter.java:148)

at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)

at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)

at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.onMessage(RocketMQInboundChannelAdapter.java:147)

at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.onMessage(RocketMQInboundChannelAdapter.java:140)

at com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer$DefaultMessageListenerConcurrently.consumeMessage(RocketMQListenerBindingContainer.java:420)

at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:412)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to com.itmuch.usercenter.domain.dto.UserAddBonusMsgDTO

at com.itmuch.usercenter.rocketmq.AddBonusStreamConsumer.receive(AddBonusStreamConsumer.java:27)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)

at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)

at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)

... 26 more


2020-10-23 21:49:11.667  WARN 9682 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume message failed. messageExt:MessageExt [brokerName=hthonor.local, queueId=0, storeSize=404, queueOffset=11, sysFlag=0, bornTimestamp=1603110647863, bornHost=/192.168.2.164:51475, storeTimestamp=1603380551053, storeHost=/192.168.2.164:10911, msgId=C0A802A400002A9F000000000000AD38, commitLogOffset=44344, bodyCRC=51874911, reconsumeTimes=12, preparedTransactionOffset=0, toString()=Message{topic='add-bonus', flag=0, properties={rocketmq_RECONSUME_TIMES=12, CONSUME_START_TIME=1603460948612, MIN_OFFSET=0, REAL_TOPIC=%RETRY%bind, ORIGIN_MESSAGE_ID=C0A802A400002A9F0000000000000000, RETRY_TOPIC=add-bonus, MAX_OFFSET=12, id=f4eaa2c3-413a-0a7d-3e22-a8b47a12b0c1, UNIQ_KEY=C0A802A4D93418B4AAC261194C2E0000, CLUSTER=DefaultCluster, WAIT=false, contentType=text/plain, DELAY=14, timestamp=1603110647673, REAL_QID=0}, body=[123, 34, 117, 115, 101, 114, 73, 100, 34, 58, 50, 44, 34, 98, 111, 110, 117, 115, 34, 58, 53, 48, 125], transactionId='null'}]


org.springframework.messaging.MessagingException: Exception thrown while invoking AddBonusStreamConsumer#receive[1 args]; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.itmuch.usercenter.domain.dto.UserAddBonusMsgDTO

at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]

at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter.access$300(RocketMQInboundChannelAdapter.java:42) ~[spring-cloud-starter-stream-rocketmq-2.2.1.RELEASE.jar:2.2.1.RELEASE]

at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.lambda$onMessage$0(RocketMQInboundChannelAdapter.java:148) ~[spring-cloud-starter-stream-rocketmq-2.2.1.RELEASE.jar:2.2.1.RELEASE]

at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.5.RELEASE.jar:na]

at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.5.RELEASE.jar:na]

at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.onMessage(RocketMQInboundChannelAdapter.java:147) ~[spring-cloud-starter-stream-rocketmq-2.2.1.RELEASE.jar:2.2.1.RELEASE]

at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.onMessage(RocketMQInboundChannelAdapter.java:140) ~[spring-cloud-starter-stream-rocketmq-2.2.1.RELEASE.jar:2.2.1.RELEASE]

at com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer$DefaultMessageListenerConcurrently.consumeMessage(RocketMQListenerBindingContainer.java:420) ~[spring-cloud-starter-stream-rocketmq-2.2.1.RELEASE.jar:2.2.1.RELEASE]

at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:412) [rocketmq-client-4.7.1.jar:4.7.1]

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_65]

at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_65]

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_65]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_65]

at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to com.itmuch.usercenter.domain.dto.UserAddBonusMsgDTO

at com.itmuch.usercenter.rocketmq.AddBonusStreamConsumer.receive(AddBonusStreamConsumer.java:27) ~[classes/:na]

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_65]

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_65]

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_65]

at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_65]

at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]

at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]

... 26 common frames omitted
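
The `body=[...]` array in the WARN line above is easier to reason about once it is decoded. The sketch below copies those 23 bytes from the log and decodes them as UTF-8. The charset is an assumption, and `DecodeBody` is only an illustrative class name, not part of the project.

```java
import java.nio.charset.StandardCharsets;

class DecodeBody {
    // Bytes copied verbatim from the body=[...] field of the WARN log line.
    static final byte[] BODY = {
            123, 34, 117, 115, 101, 114, 73, 100, 34, 58, 50, 44,
            34, 98, 111, 110, 117, 115, 34, 58, 53, 48, 125
    };

    // Assumption: the producer wrote the body as UTF-8 text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decode(BODY)); // prints {"userId":2,"bonus":50}
    }
}
```

So the stored message is the DTO serialized as JSON text. Together with the `contentType=text/plain` header in the same log entry, this is consistent with the payload reaching the listener as a `String` rather than as a `UserAddBonusMsgDTO`.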




1 answer

weixin_精慕门0269720

2020-10-24

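message.getPayload() actually returns a String, yet you cast it straight to the DTO, so the cast is bound to fail. Please convert the payload from JSON instead.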
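
A minimal sketch of why the cast compiles but fails at runtime, and of one defensive way to handle it. `Msg` and `BonusDto` are hypothetical stand-ins for Spring's `Message<T>` and for `UserAddBonusMsgDTO`, so that the example runs with the JDK alone. The `jsonParser` argument is a placeholder for whatever JSON library the project already uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class PayloadCastDemo {
    // Hypothetical stand-in for Message<T>: the type parameter is erased at runtime.
    static final class Msg<T> {
        private final Object payload;
        Msg(Object payload) { this.payload = payload; }
        @SuppressWarnings("unchecked")
        T getPayload() { return (T) payload; } // unchecked cast, no runtime check here
    }

    // Hypothetical stand-in for UserAddBonusMsgDTO.
    static final class BonusDto {
        final int userId;
        final int bonus;
        BonusDto(int userId, int bonus) { this.userId = userId; this.bonus = bonus; }
    }

    // Reproduces the failure: the payload is really a String.
    static boolean castFails() {
        Msg<BonusDto> msg = new Msg<>("{\"userId\":2,\"bonus\":50}");
        try {
            BonusDto dto = msg.getPayload(); // the compiler inserts the real cast at this call site
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Accepts whatever type arrives and converts text payloads through the supplied parser.
    static BonusDto toDto(Object payload, Function<String, BonusDto> jsonParser) {
        if (payload instanceof BonusDto) {
            return (BonusDto) payload;
        }
        if (payload instanceof String) {
            return jsonParser.apply((String) payload);
        }
        if (payload instanceof byte[]) {
            return jsonParser.apply(new String((byte[]) payload, StandardCharsets.UTF_8));
        }
        throw new IllegalArgumentException("Unsupported payload: " + payload);
    }

    public static void main(String[] args) {
        System.out.println("cast fails: " + castFails());
        // Placeholder parser for the demo only; real code would call a JSON library here.
        BonusDto dto = toDto("{\"userId\":2,\"bonus\":50}", text -> new BonusDto(2, 50));
        System.out.println(dto.userId + " " + dto.bonus);
    }
}
```

In the real consumer, the same idea would mean declaring the parameter as `Message<?>` (or `String`) and parsing the text explicitly, instead of relying on the generic type to force a conversion.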

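大目 replying to hthonor:
From what I can see, your code looks fine, and the course also receives the payload as an object without hitting this problem, so I suspect a version difference.
2020-10-24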
