重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

consumer数量变化会怎样

本篇文章给大家分享的是有关consumer数量变化会怎样,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名申请、网页空间、营销软件、网站建设、景县网站维护、网站推广。

consumer数量变化会怎样

ConsumerManager
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set subList, boolean isNotifyConsumerIdsChangedEnable) {

    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }

    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            //通知同组内的其他consumer
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }

    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

    return r1 || r2;
}

public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    boolean isNotifyConsumerIdsChangedEnable) {
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null != consumerGroupInfo) {
        consumerGroupInfo.unregisterChannel(clientChannelInfo);
        if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
            ConsumerGroupInfo remove = this.consumerTable.remove(group);
            if (remove != null) {
                log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);

                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
            }
        }
        if (isNotifyConsumerIdsChangedEnable) {
            //单向通知channel
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
}
DefaultConsumerIdsChangeListener
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
    case CHANGE:
        if (args == null || args.length < 1) {
            return;
        }
        List channels = (List) args[0];
        if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
            //对组内的其他consumer的channel连接发送单向通知(不管对方有木有收到)
            for (Channel chl : channels) {
                this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
            }
        }
        break;
}
Broker2Client
public void notifyConsumerIdsChanged(
    final Channel channel,
    final String consumerGroup) {
    if (null == consumerGroup) {
        log.error("notifyConsumerIdsChanged consumerGroup is null");
        return;
    }

    NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);

    try {
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        //发送异常,只是打印log
        log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
    }
}

通知channel是单向的,也就是不管对方有没有答复,都认为发送成功了,这样会有两种情况发生:

  1. channel收到消息:收到消息后,channel会触发rebalance,正常逻辑

  2. channel没收到消息:该consumer不会触发rebalance,存在问题!

    1. register:该consumer不知道已经有新的consumer加入,造成同一个mq会有多个consumer进行消费

    2. unregister:该consumer不知道有consumer下线,造成部分mq没有consumer负责消费

我们先看unregister这种情况

在consumer启动时,会同时启动一个RebalanceService线程,这个线程做的事就是每隔20秒主动进行一次rebalance,这样就能把unregister这种影响降低,最多导致该mq的消息会延迟20秒之后才有consumer负责消费

RebalanceService
private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

接下来分析比较大条的Register

同一个mq在同一组内有不同的consumer消费,这种情况在clustering模式下是有大问题的,会造成重复消费,消费进度错误等问题,带着rocketmq应该不至于犯如此低级错误的想法再继续看代码,果然别有洞天

RebalanceImpl
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    //rebalance过程
    //关键点在这,在上面rebalance完之后, 就能知道自己该负责哪些mq的消费   
    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
}

private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet, final boolean isOrder) {
    for (MessageQueue mq : mqSet) {
        //如果是新增的mq,会尝试调用远程broker lock mq,获取锁失败,则说明有其他consumer获取了锁,自己应该放弃消费该mq
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }   
        }
    }
}

以上就是consumer数量变化会怎样,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。


当前文章:consumer数量变化会怎样
当前地址:http://cqcxhl.cn/article/jigeph.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP