订阅关系

来源:5-2 核心配置参数讲解

雏鹰飞翔

2020-07-30

同一个组中,同一个消费者,底下的topic和tag必须保持一致 。意思是一个组中,一个topic只能定义一个tag?
如果有多个tag,就需要定义多个组?

比如我使用单例如此构造我的消费者,,他们也会造成订阅关系的混乱?

@Slf4j
@Component
public class DefaultConsumer {

    private DefaultMQPushConsumer consumer;

    @Autowired
    private OnMessage onMessage;

    private DefaultConsumer(@Value("${whl.rocketMQ.namesrv}") String srv, @Value("${whl.rocketMQ.group}") String group) {
        this.consumer = new DefaultMQPushConsumer(group);
        this.consumer.setNamesrvAddr(srv);
        this.consumer.setConsumeThreadMin(10);
        this.consumer.setConsumeThreadMax(30);
    }

    public void getMessage(String topic,String tag) {

        try {
            consumer.subscribe(topic,tag);
            // Register callback to execute on arrival of messages fetched from brokers.
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    onMessage.onMessage(msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (Exception e){
            e.printStackTrace();
        }
    }

    public void start(){
        try{
            this.consumer.start();
            System.out.printf("Consumer Started.%n");
        }catch (MQClientException e){
            e.printStackTrace();
        }
    }
}
@Component
public class TestImpl {

    @Autowired
    private DefaultConsumer defaultConsumer;

    @PostConstruct
    public void test(){
        defaultConsumer.getMessage(TopicName.TEST, "*");
        defaultConsumer.getMessage(TopicName.PEOPLE,"*");
        defaultConsumer.getMessage(TopicName.PAYMENT, "*");
        defaultConsumer.getMessage(TopicName.PRODUCT, "*");
        defaultConsumer.getMessage(TopicName.ORDER, "*");
        defaultConsumer.start();
    }
}
写回答

1回答

阿神

2020-08-03

tag一般是用于做过滤的,或者判断走不同的逻辑的,而不是用于区分不同的消费者

0
0

RocketMQ核心技术精讲与高并发抗压实战

理论+实战,系统掌握RocketMQ核心技术, 挑战高并发抗压实战

1182 学习 · 304 问题

查看课程