为什么我的@SendTo发送的消息@StreamListener有时接收不到?

来源:9-25 Spring Cloud Stream知识盘点【必看】

他门说这就是人生

2020-03-08

  1. 先看看我的日志吧:

第一次正常:

020-03-08 10:54:54.239  INFO 12644 --- [nio-9090-exec-1] c.g.r.a.mq.rabbit.StreamReceiver         : gaojingsi
2020-03-08 10:54:54.252  INFO 12644 --- [nio-9090-exec-1] c.g.r.a.mq.rabbit.StreamReceiver         : 1.png

第二次@SendTo(OUTPUT)发送的消息@StreamListener(OUTPUT)接收不到,并有日志信息:

2020-03-08 10:54:55.680  INFO 12644 --- [nio-9090-exec-9] c.g.r.a.mq.rabbit.StreamReceiver         : gaojingsi
2020-03-08 10:54:55.684  INFO 12644 --- [nio-9090-exec-9] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.0.253:5672]
2020-03-08 10:54:55.806  INFO 12644 --- [nio-9090-exec-9] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#75d16129:0/SimpleConnection@2b0ca81e [delegate=amqp://guest@192.168.0.253:5672/, localPort= 54442]
2020-03-08 10:54:55.826  INFO 12644 --- [nio-9090-exec-9] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (input.anonymous.wYZ3EkNeSwK0QKQy5arbFg) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2020-03-08 10:54:55.826  INFO 12644 --- [nio-9090-exec-9] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (springCloudBus.anonymous.3qNczjXIQQ2KhSrd8UwA9A) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.

第三次正常:

2020-03-08 10:56:03.637  INFO 12644 --- [nio-9090-exec-7] c.g.r.a.mq.rabbit.StreamReceiver         : gaojingsi
2020-03-08 10:56:03.637  INFO 12644 --- [nio-9090-exec-7] c.g.r.a.mq.rabbit.StreamReceiver         : 1.png

第四次@SendTo(OUTPUT)发送的消息@StreamListener(OUTPUT)接收不到,并没有提示:

2020-03-08 10:56:07.325  INFO 12644 --- [nio-9090-exec-3] c.g.r.a.mq.rabbit.StreamReceiver         : gaojingsi

第五次正常:

2020-03-08 10:56:17.806  INFO 12644 --- [nio-9090-exec-5] c.g.r.a.mq.rabbit.StreamReceiver         : gaojingsi
2020-03-08 10:56:17.807  INFO 12644 --- [nio-9090-exec-5] c.g.r.a.mq.rabbit.StreamReceiver         : 1.png

第六次@SendTo(OUTPUT)发送的消息@StreamListener(OUTPUT)接收不到,并没有提示:

2020-03-08 11:08:08.591  INFO 12644 --- [nio-9090-exec-6] c.g.r.a.mq.rabbit.StreamReceiver         : gaojingsi

......

下面是我的主要代码:

消息发送和接收的实体SysUser, SysUser的toString()返回的是name字段的值。

controller:

@RestController
@RequestMapping("/stream")
public class StreamMqController {

    @Autowired
    private Processor processor;

    @GetMapping("")
    public void testQueue() {
        SysUser sysUser = new SysUser();
        sysUser.setAvatar("1.png");
        sysUser.setName("gaojingsi");
        processor.input().send(MessageBuilder.withPayload(sysUser).build());
    }

}

消息的监听器:

@Component
@EnableBinding({Processor.class})
@Slf4j
public class StreamReceiver {

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public SysUser oneMessageIn(SysUser sysUser) {
        log.info("{}", sysUser);
        return sysUser;
    }

    @StreamListener(Processor.OUTPUT)
    public void oneMessageOut(SysUser sysUser) {
        log.info("{}", sysUser.getAvatar());
    }

}

请问这是怎么回事啊?

写回答

2回答

大目

2020-03-11

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-destination
          group: my-group
        output:
          destination: my-destination


添加这段配置即可。

0
1
他门说这就是人生
太感谢老师了
2020-03-11
共1条回复

大目

2020-03-08

您好,从代码看,是没有问题的。我看貌似收到和收不到呈101010的状态。建议排查下配置,看是否因为配置了partition导致。

0
6
他门说这就是人生
回复
大目
老师,麻烦抽空帮我看看哦
2020-03-10
共6条回复

Spring Cloud Alibaba微服务从入门到进阶

面向未来微服务:熟练掌握Spring Cloud Alibaba

3085 学习 · 1324 问题

查看课程