`
jahu
  • 浏览: 58118 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

rocketmq之tags

 
阅读更多
tags 标签
顾名思义,tags就是一个消息的标签。
生成方式
Message  msg = new Message("PullTopic", "pull", i+"", 1+"".getBytes());
Message  msg = new Message();
msg.setTags("pull");上面代码中的 pull 就是标签。
如何使用了。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅PushTopic下Tag为push的消息
consumer.subscribe("PullTopic", "pull");
consumer.registerMessageListener(
    new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext Context) {
                        //这里你可以得到 pull标签的 消息
                        Message msg = list.get(0);       
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
            );
consumer.start();哇!!! 是不是很棒。不同的MessageListenerConcurrently 实例消费不同的消息。能解决很多问题。
呵呵,呵呵
真相如何
tags,对Broker 是没有一点意义的。也不做任何的处理。
Consumer只是依据 tags尽心消息过滤而已
下面是 tags 处理的核心方法
public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            SubscriptionData subscriptionData =
                    FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
                        topic, subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        }
        catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
   
    public void subscribe(String topic, String fullClassName, String filterClassSource)
        throws MQClientException {
        try {
            SubscriptionData subscriptionData =
                    FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
                        topic, "*");
            subscriptionData.setSubString(fullClassName);
            subscriptionData.setClassFilterMode(true);
            subscriptionData.setFilterClassSource(filterClassSource);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
       
        }
        catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
}消息过滤的方法
public class PullAPIWrapper {
   
    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
            final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            List<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() &amp;&amp; !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            for (MessageExt msg : msgListFilterAgain) {
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                    Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                    Long.toString(pullResult.getMaxOffset()));
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }
}

//processPullResult 会被 pull操作成功之后调用
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
    private void pullAsyncImpl(//
                               final MessageQueue mq,//
                               final String subExpression,//
                               final long offset,//
                               final int maxNums,//
                               final PullCallback pullCallback,//
                               final boolean block,//
                               final long timeout) throws MQClientException, RemotingException, InterruptedException {
    .......
    this.pullAPIWrapper.pullKernelImpl(//
                    mq, // 1
                    subscriptionData.getSubString(), // 2
                    0L, // 3
                    offset, // 4
                    maxNums, // 5
                    sysFlag, // 6
                    0, // 7
                    this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
                    timeoutMillis, // 9
                    CommunicationMode.ASYNC, // 10
                    new PullCallback() {

                        @Override
                        public void onException(Throwable e) {
                            pullCallback.onException(e);
                        }


                        @Override
                        public void onSuccess(PullResult pullResult) {
                            pullCallback.onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper
                                    .processPullResult(mq, pullResult, subscriptionData));
                        }
                    });
                              
    }
   
    public void pullMessage(final PullRequest pullRequest) {
        ..........
         PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult =DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                                pullRequest.getMessageQueue(), pullResult, subscriptionData);
        ........
    }

    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums,
                                boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException {
    this.makeSureStateOK();

    if (null == mq) {
        throw new MQClientException("mq is null", null);

    }

    if (offset < 0) {
        throw new MQClientException("offset < 0", null);
    }

    if (maxNums <= 0) {
        throw new MQClientException("maxNums <= 0", null);
    }

    this.subscriptionAutomatically(mq.getTopic());

    int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

    SubscriptionData subscriptionData;
    try {
        subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//
                mq.getTopic(), subExpression);
    } catch (Exception e) {
        throw new MQClientException("parse subscription error", e);
    }

    long timeoutMillis =
            block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
            mq, // 1
            subscriptionData.getSubString(), // 2
            0L, // 3
            offset, // 4
            maxNums, // 5
            sysFlag, // 6
            0, // 7
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
            timeoutMillis, // 9
            CommunicationMode.SYNC, // 10
            null// 11
    );

    return this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
}所以 tags 作用只是 在 Consumer 做消息过滤。
比如 所有订单到 push到一个 主题里面,有N个服务这个主题进行消费,每个服务只对应一个平台的商家。类似的场景可以使用tags
DefaultMQPullConsumer 没有subscribe方法,所以消息过滤需要自己做。很好。
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics