订阅关系
来源:5-2 核心配置参数讲解

雏鹰飞翔
2020-07-30
同一个组中,同一个消费者,底下的topic和tag必须保持一致 。意思是一个组中,一个topic只能定义一个tag?
如果有多个tag,就需要定义多个组?
比如我使用单例如此构造我的消费者,,他们也会造成订阅关系的混乱?
@Slf4j
@Component
public class DefaultConsumer {
private DefaultMQPushConsumer consumer;
@Autowired
private OnMessage onMessage;
private DefaultConsumer(@Value("${whl.rocketMQ.namesrv}") String srv, @Value("${whl.rocketMQ.group}") String group) {
this.consumer = new DefaultMQPushConsumer(group);
this.consumer.setNamesrvAddr(srv);
this.consumer.setConsumeThreadMin(10);
this.consumer.setConsumeThreadMax(30);
}
public void getMessage(String topic,String tag) {
try {
consumer.subscribe(topic,tag);
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
onMessage.onMessage(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
} catch (MQClientException e) {
e.printStackTrace();
} catch (Exception e){
e.printStackTrace();
}
}
public void start(){
try{
this.consumer.start();
System.out.printf("Consumer Started.%n");
}catch (MQClientException e){
e.printStackTrace();
}
}
}
@Component
public class TestImpl {
@Autowired
private DefaultConsumer defaultConsumer;
@PostConstruct
public void test(){
defaultConsumer.getMessage(TopicName.TEST, "*");
defaultConsumer.getMessage(TopicName.PEOPLE,"*");
defaultConsumer.getMessage(TopicName.PAYMENT, "*");
defaultConsumer.getMessage(TopicName.PRODUCT, "*");
defaultConsumer.getMessage(TopicName.ORDER, "*");
defaultConsumer.start();
}
}
写回答
1回答
-
阿神
2020-08-03
tag一般是用于做过滤的,或者判断走不同的逻辑的,而不是用于区分不同的消费者
00
相似问题