当前位置: 首页 > news >正文

成都网站开发建设wordpress怎样添加轮播图

成都网站开发建设,wordpress怎样添加轮播图,wordpress支持七牛,wordpress+屏蔽ip插件RocketMQ的Broker分为Master和Slave两个角色,为了保证高可用性,Master角色的机器接收到消息后,要把内容同步到Slave机器上,这样一旦Master宕机,Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实…

        RocketMQ的Broker分为Master和Slave两个角色,为了保证高可用性,Master角色的机器接收到消息后,要把内容同步到Slave机器上,这样一旦Master宕机,Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实现的源码。

1 同步属性信息

Slave需要和Master同步的不只是消息本身,一些元数据信息也需要同步,比如TopicConfig信息、ConsumerOffset信息、DelayOffset和SubscriptionGroupConfig信息。Broker在启动的时候,判断自己的角色是否是Slave,是的话就启动定时同步任务,如代码清单12-1所示。

代码清单12-1 Slave角色定时同步元数据信息

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}

 

在syncAll函数里,调用syncTopicConfig()、getAllConsumerOffset()、syncDelayOffset()和syncSubscriptionGroupConfig()进行元数据同步。我们以syncConsumerOffset为例,来看看底层的具体实现,如代码清单12-2所示。

代码清单12-2 getAllConsumerOffset具体实现

public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBroker-Exception {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark());
}

 

getAllConsumerOffset()的基本逻辑是组装一个RemotingCommand,底层通过Netty将消息发送到Master角色的Broker,然后获取Offset信息。

2 同步消息体

下面介绍Master和Slave之间同步消息体内容的方法,也就是同步CommitLog内容的方法。CommitLog和元数据信息不同:首先,CommitLog的数据量比元数据要大;其次,对实时性和可靠性要求也不一样。元数据信息是定时同步的,在两次同步的时间差里,如果出现异常可能会造成Master上的元数据内容和Slave上的元数据内容不一致,不过这种情况还可以补救(手动调整Offset,重启Consumer等)。CommitLog在高可靠性场景下如果没有及时同步,一旦Master机器出故障,消息就彻底丢失了。所以有专门的代码来实现Master和Slave之间消息体内容的同步。

主要的实现代码在Broker模块的org.apache.rocketmq.store.ha包中,里面包括HAService、HAConnection和WaitNotifyObject这三个类。

HAService是实现commitLog同步的主体,它在Master机器和Slave机器上执行的逻辑不同,默认是在Master机器上执行,见代码清单12-3。

代码清单12-3 根据Broker角色,确定是否设置HaMasterAddress

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig
        .getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }

 

当Broker角色是Slave的时候,MasterAddr的值会被正确设置,这样HAService在启动的时候,在HAClient这个内部类中,connectMaster会被正确执行,如代码清单12-4所示。

代码清单12-4 Slave角色连接Master

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}

 

从代码中可以看出,HAClient试图通过Java NIO函数去连接Master角色的Broker。Master角色有相应的监听代码,如代码清单12-5所示。

代码清单12-5 监听Slave的HA连接

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

CommitLog的同步,不是经过netty command的方式,而是直接进行TCP连接,这样效率更高。连接成功以后,通过对比Master和Slave的Offset,不断进行同步。

3 sync_master和async_master

sync_master和async_master是写在Broker配置文件里的配置参数,这个参数影响的是主从同步的方式。从字面意思理解,sync_master是同步方式,也就是Master角色Broker中的消息要立刻同步过去;async_master是异步方式,也就是Master角色Broker中的消息是通过异步处理的方式同步到Slave角色的机器上的。下面结合代码来分析,sync_master下的消息同步如代码清单12-6所示。

代码清单12-6 sync_master下的消息同步

public void handleHA(AppendMessageResult result,
    PutMessageResult putMessageResult, MessageExt messageExt) {
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore
        .getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result
                .getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest
                    (result.getWroteOffset() + result
                    .getWroteBytes());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore
                        .getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, " +
                        "but failed, topic: " + messageExt
                        .getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " +
                        messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus
                        .FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus
                    .SLAVE_NOT_AVAILABLE);
            }
        }
    }
}

 

在CommitLog类的putMessage函数末尾,调用handleHA函数。代码中的关键词是wakeupAll和waitForFlush,在同步方式下,Master每次写消息的时候,都会等待向Slave同步消息的过程,同步完成后再返回,如代码清单12-7所示。(putMessage函数比较长,仅列出关键的代码)。

代码清单12-7 putMessage中调用handleHA

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore
        .getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

  ……

    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}

 

http://www.yayakq.cn/news/943469/

相关文章:

  • html5 网站建设搜索引擎营销的原理是什么
  • 单片机做网站如何快速模仿一个网站
  • 机械行业做网站电子商城网站开发价格
  • 什么是网站的访问流量微信网站怎么收款
  • 免费网站模板大全欢迎页网页设计作品欣赏
  • wap类网站淘宝指数入口
  • 好便宜建站公众号网页制作软件
  • 安徽兴罗建设集团网站免费服务器主机
  • 做网站能赚多少钱抖音的商业营销手段
  • 做网站最下面写什么软件wordpress ap
  • 网站seo关键词排名优化视频号如何绑定小程序商店
  • 江西省城乡住房建设厅培训网站旅游网站建设网站
  • 在线免费网站wordpress 固定链接如何设置
  • 校园网站建设划分vlan工业和信息部网站备案
  • 腾讯云快速建站烟台做网站排名
  • 咋做黄页网站谷歌镜像网站怎么做
  • 网站网页是怎么做的物业管理系统业务流程图
  • iis如何用ip地址做域名访问网站医院网络营销推广方案
  • 企业网站建设的缺点自己制作小程序怎么做
  • dedecms模板站源码淘宝客wordpress引流
  • 汕头网站建站公司wordpress说说
  • 网站打开404错误怎么解决百度竞价排名名词解释
  • 织梦旅游网站教育机构招聘网站建设
  • .net 企业网站源码即墨网站建设招聘
  • 东莞seo建站怎么投放网络营销主要是什么
  • 站长统计幸福宝网站统计重庆今天刚刚发生的新闻
  • 长沙模板建站源码群晖ds1817做网站
  • 百度快照是怎么做上去的seo优化有哪些
  • 网站怎么做404 301怎样做元古建筑的网站结构图
  • 如何做公司的英文网站网站容易做吗