tags 标签
顾名思义,tags就是一个消息的标签。
生成方式
Message msg = new Message("PullTopic", "pull", i+"", 1+"".getBytes());
Message msg = new Message();
msg.setTags("pull");上面代码中的 pull 就是标签。
如何使用了。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅PushTopic下Tag为push的消息
consumer.subscribe("PullTopic", "pull");
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext Context) {
//这里你可以得到 pull标签的 消息
Message msg = list.get(0);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();哇!!! 是不是很棒。不同的MessageListenerConcurrently 实例消费不同的消息。能解决很多问题。
呵呵,呵呵
真相如何
tags,对Broker 是没有一点意义的。也不做任何的处理。
Consumer只是依据 tags尽心消息过滤而已
下面是 tags 处理的核心方法
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
public void subscribe(String topic, String fullClassName, String filterClassSource)
throws MQClientException {
try {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
topic, "*");
subscriptionData.setSubString(fullClassName);
subscriptionData.setClassFilterMode(true);
subscriptionData.setFilterClassSource(filterClassSource);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
}消息过滤的方法
public class PullAPIWrapper {
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
if (this.hasHook()) {
FilterMessageContext filterMessageContext = new FilterMessageContext();
filterMessageContext.setUnitMode(unitMode);
filterMessageContext.setMsgList(msgListFilterAgain);
this.executeHook(filterMessageContext);
}
for (MessageExt msg : msgListFilterAgain) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
}
pullResultExt.setMsgFoundList(msgListFilterAgain);
}
pullResultExt.setMessageBinary(null);
return pullResult;
}
}
//processPullResult 会被 pull操作成功之后调用
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private void pullAsyncImpl(//
final MessageQueue mq,//
final String subExpression,//
final long offset,//
final int maxNums,//
final PullCallback pullCallback,//
final boolean block,//
final long timeout) throws MQClientException, RemotingException, InterruptedException {
.......
this.pullAPIWrapper.pullKernelImpl(//
mq, // 1
subscriptionData.getSubString(), // 2
0L, // 3
offset, // 4
maxNums, // 5
sysFlag, // 6
0, // 7
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
timeoutMillis, // 9
CommunicationMode.ASYNC, // 10
new PullCallback() {
@Override
public void onException(Throwable e) {
pullCallback.onException(e);
}
@Override
public void onSuccess(PullResult pullResult) {
pullCallback.onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper
.processPullResult(mq, pullResult, subscriptionData));
}
});
}
public void pullMessage(final PullRequest pullRequest) {
..........
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult =DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
pullRequest.getMessageQueue(), pullResult, subscriptionData);
........
}
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums,
boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
this.makeSureStateOK();
if (null == mq) {
throw new MQClientException("mq is null", null);
}
if (offset < 0) {
throw new MQClientException("offset < 0", null);
}
if (maxNums <= 0) {
throw new MQClientException("maxNums <= 0", null);
}
this.subscriptionAutomatically(mq.getTopic());
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
long timeoutMillis =
block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
mq, // 1
subscriptionData.getSubString(), // 2
0L, // 3
offset, // 4
maxNums, // 5
sysFlag, // 6
0, // 7
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
timeoutMillis, // 9
CommunicationMode.SYNC, // 10
null// 11
);
return this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
}所以 tags 作用只是 在 Consumer 做消息过滤。
比如 所有订单到 push到一个 主题里面,有N个服务这个主题进行消费,每个服务只对应一个平台的商家。类似的场景可以使用tags
DefaultMQPullConsumer 没有subscribe方法,所以消息过滤需要自己做。很好。
分享到:
相关推荐
TagsInput 是一种可编辑的输入框,通过回车或者分号来分割每个标签,用回退键删除上一个标签。用 vue 来实现还是比较简单的。 先看效果图,下面会一步一步实现他。 注:以下代码需要vue-cli环境才能执行 (一)伪造...
rocketmq-flume Source&Sink 该项目用于与之间的消息接收和投递。 首先请确定您已经对和有了基本的了解 确保本地maven库中已经存在,或者下载RocketMQ源码自行编译...tags 可选 空字符串 Tag名称,遵循RocketMQ配置方式
HOME/lib目录中(具体包会在后面描述)SinkSink配置说明配置项必填默认值说明namesrvAddr必填nullName Server地址,遵循RocketMQ配置方式producerGroup可选DEFAULT_PRODUCERProducer分组topic必填nullTopic名称tags可...
rocketmq-flume Source&Sink 该项目用于与之间的消息接收和投递。 首先请确定您已经对和有了基本的了解 确保本地maven库中已经存在,或者下载RocketMQ源码自行编译...tags 可选 空字符串 Tag名称,遵循RocketMQ配置方式
struts-tags.tld struts-tags.tldstruts-tags.tld struts-tags.tld struts-tags.tld
JS自动获取TAGS关键词,通过JS脚本JS自动获取TAGS关键词
SVN中的Branches分支以及Tags标签详解与应用举例
spark-tags_2.11-2.1.3-SNAPSHOT.jar
bash tags,搜索代码方便
struts2-tags-API 轻松学习tags标签
System tags elements
tags标签伪静态的方法
ColdFusion Tags大部分 集合
Struts2 WebWork 2.0 Tags API 中文文档 [CHM] webwork提供了一套不依赖于显示层技术的标签库。这一章我们将概括性的描述每一个标签, 比如此标签支持的属性,标签的行为等等。 大多数的标签都可以用于所有的模板...
(Android 11)Event所有的tags:/system/etc/event-log-tags
com.jagregory.shiro.freemarker.ShiroTags 已经打包过的权限标签,可直接使用
SVN的标准目录结构:trunk、branches、tags
gen_tags.vim, 用来轻松使用 ctags/gtags的vim和neovim的异步插件 gen_tags.vim 为方便用户使用 Vim/ NeoVim,简化了 ctags/ gtags的使用。它用于为你生成和维护多个平台支持的标签,在 Windows/Linux/macOS. 上测试...
struts2 WebWork 2 标签 tags API 参考文档 参考手册