Urgent: error when consuming messages with Stream
Source: 9-24 Implementing distributed transactions with Spring Cloud Stream + RocketMQ 02 - Refactoring the consumer

hthonor
2020-10-23
Consuming a message fails with: java.lang.String cannot be cast to com.itmuch.usercenter.domain.dto.UserAddBonusMsgDTO
My code:
Sending the message
this.source.output().send(
    MessageBuilder
        .withPayload(
            UserAddBonusMsgDTO.builder()
                .userId(share.getUserId())
                .bonus(50)
                .build()
        )
        .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
        .setHeader("shared_id", id)
        .setHeader("dto", JSON.toJSONString(shareAuditDTO))
        .build()
);
Consuming the message
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusStreamConsumer {
    private final UserService userService;

    @StreamListener(Sink.INPUT)
    public void receive(Message<UserAddBonusMsgDTO> message) {
        UserAddBonusMsgDTO msgDTO = (UserAddBonusMsgDTO) message.getPayload(); // this line throws the error
        this.userService.addBonus(msgDTO);
    }
}
Error output
2020-10-23 21:49:11.659 ERROR 9682 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking AddBonusStreamConsumer#receive[1 args]; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.itmuch.usercenter.domain.dto.UserAddBonusMsgDTO, failedMessage=GenericMessage [payload=byte[23], headers={rocketmq_QUEUE_ID=0, rocketmq_TOPIC=add-bonus, rocketmq_FLAG=0, rocketmq_RECONSUME_TIMES=12, rocketmq_MESSAGE_ID=C0A802A4D93418B4AAC261194C2E0000, rocketmq_SYS_FLAG=0, id=e356a457-7dae-b6a1-65c5-550378b70c4d, rocketmq_BORN_HOST=192.168.2.164, contentType=text/plain, rocketmq_BORN_TIMESTAMP=1603110647863, timestamp=1603460948627}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter.access$300(RocketMQInboundChannelAdapter.java:42)
at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.lambda$onMessage$0(RocketMQInboundChannelAdapter.java:148)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.onMessage(RocketMQInboundChannelAdapter.java:147)
at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.onMessage(RocketMQInboundChannelAdapter.java:140)
at com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer$DefaultMessageListenerConcurrently.consumeMessage(RocketMQListenerBindingContainer.java:420)
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:412)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to com.itmuch.usercenter.domain.dto.UserAddBonusMsgDTO
at com.itmuch.usercenter.rocketmq.AddBonusStreamConsumer.receive(AddBonusStreamConsumer.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 26 more
2020-10-23 21:49:11.667 WARN 9682 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume message failed. messageExt:MessageExt [brokerName=hthonor.local, queueId=0, storeSize=404, queueOffset=11, sysFlag=0, bornTimestamp=1603110647863, bornHost=/192.168.2.164:51475, storeTimestamp=1603380551053, storeHost=/192.168.2.164:10911, msgId=C0A802A400002A9F000000000000AD38, commitLogOffset=44344, bodyCRC=51874911, reconsumeTimes=12, preparedTransactionOffset=0, toString()=Message{topic='add-bonus', flag=0, properties={rocketmq_RECONSUME_TIMES=12, CONSUME_START_TIME=1603460948612, MIN_OFFSET=0, REAL_TOPIC=%RETRY%bind, ORIGIN_MESSAGE_ID=C0A802A400002A9F0000000000000000, RETRY_TOPIC=add-bonus, MAX_OFFSET=12, id=f4eaa2c3-413a-0a7d-3e22-a8b47a12b0c1, UNIQ_KEY=C0A802A4D93418B4AAC261194C2E0000, CLUSTER=DefaultCluster, WAIT=false, contentType=text/plain, DELAY=14, timestamp=1603110647673, REAL_QID=0}, body=[123, 34, 117, 115, 101, 114, 73, 100, 34, 58, 50, 44, 34, 98, 111, 110, 117, 115, 34, 58, 53, 48, 125], transactionId='null'}]
org.springframework.messaging.MessagingException: Exception thrown while invoking AddBonusStreamConsumer#receive[1 args]; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.itmuch.usercenter.domain.dto.UserAddBonusMsgDTO
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter.access$300(RocketMQInboundChannelAdapter.java:42) ~[spring-cloud-starter-stream-rocketmq-2.2.1.RELEASE.jar:2.2.1.RELEASE]
at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.lambda$onMessage$0(RocketMQInboundChannelAdapter.java:148) ~[spring-cloud-starter-stream-rocketmq-2.2.1.RELEASE.jar:2.2.1.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.5.RELEASE.jar:na]
at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.onMessage(RocketMQInboundChannelAdapter.java:147) ~[spring-cloud-starter-stream-rocketmq-2.2.1.RELEASE.jar:2.2.1.RELEASE]
at com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter$BindingRocketMQListener.onMessage(RocketMQInboundChannelAdapter.java:140) ~[spring-cloud-starter-stream-rocketmq-2.2.1.RELEASE.jar:2.2.1.RELEASE]
at com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer$DefaultMessageListenerConcurrently.consumeMessage(RocketMQListenerBindingContainer.java:420) ~[spring-cloud-starter-stream-rocketmq-2.2.1.RELEASE.jar:2.2.1.RELEASE]
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:412) [rocketmq-client-4.7.1.jar:4.7.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_65]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to com.itmuch.usercenter.domain.dto.UserAddBonusMsgDTO
at com.itmuch.usercenter.rocketmq.AddBonusStreamConsumer.receive(AddBonusStreamConsumer.java:27) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_65]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_65]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_65]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_65]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
... 26 common frames omitted
1 answer
weixin_精慕门0269720
2020-10-24
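message.getPayload() returns a String at runtime, yet you cast it to UserAddBonusMsgDTO, so the cast is bound to fail. Please convert the payload from JSON instead.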
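To make the answer concrete, here is a minimal sketch in plain Java that reproduces the failure without Spring. The `Message` interface and the `UserAddBonusMsgDTO` class below are hypothetical stand-ins for the real types, and `deliver` only simulates a framework handing over a String payload. The byte array is copied from the `body=[...]` field in the WARN log line above. The log also shows `contentType=text/plain`, which is consistent with the payload arriving as a String.

```java
import java.nio.charset.StandardCharsets;

class PayloadCastDemo {

    // Hypothetical stand-in for org.springframework.messaging.Message<T>.
    interface Message<T> {
        T getPayload();
    }

    // Hypothetical stand-in for the real UserAddBonusMsgDTO.
    static class UserAddBonusMsgDTO {
        Integer userId;
        Integer bonus;
    }

    // Body bytes exactly as printed in the WARN log line.
    static final byte[] BODY = {123, 34, 117, 115, 101, 114, 73, 100, 34, 58, 50, 44,
            34, 98, 111, 110, 117, 115, 34, 58, 53, 48, 125};

    // Decodes the logged body so the actual payload text can be inspected.
    static String decodeBody() {
        return new String(BODY, StandardCharsets.UTF_8);
    }

    // Simulates a String payload being passed under a Message<DTO> signature.
    // The unchecked cast succeeds because generic types are erased at runtime.
    @SuppressWarnings("unchecked")
    static Message<UserAddBonusMsgDTO> deliver(String json) {
        Message<String> raw = () -> json;
        return (Message<UserAddBonusMsgDTO>) (Message<?>) raw;
    }

    // Returns true if reading the payload as the DTO throws ClassCastException.
    static boolean castFails() {
        Message<UserAddBonusMsgDTO> message = deliver(decodeBody());
        try {
            // The compiler inserts a runtime cast to UserAddBonusMsgDTO here.
            UserAddBonusMsgDTO dto = message.getPayload();
            return dto == null;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(decodeBody()); // prints {"userId":2,"bonus":50}
        System.out.println(castFails());  // prints true
    }
}
```

In the real consumer, one way to apply the answer's suggestion would be to declare the parameter as Message<String> and parse the payload with fastjson, the library the sending code already uses: JSON.parseObject(message.getPayload(), UserAddBonusMsgDTO.class). This assumes UserAddBonusMsgDTO can be deserialized by fastjson, for example through a no-argument constructor and setters.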