精华内容
下载资源
问答
  • redis主从保证数据一致性

    万次阅读 2020-03-11 11:00:04
    redis主从保证数据一致性 前言 在redis中为了保证redis的高可用,一般会搭建一种集群模式就是主从模式。 主从模式可以保证redis的高可用,那么redis是怎么保证主从服务器的数据一致性的,接下来我们浅谈下redis主...

    redis主从保证数据一致性

    1. 前言

    在redis中为了保证redis的高可用,一般会搭建一种集群模式就是主从模式。

    主从模式可以保证redis的高可用,那么redis是怎么保证主从服务器的数据一致性的,接下来我们浅谈下redis主(master)从(slave)同步的原理。

    1. 初次全量同步

    当一个redis服务器初次向主服务器发送salveof命令时,redis从服务器会进行一次全量同步,同步的步骤如下图所示:

    在这里插入图片描述

    • slave服务器向master发送psync命令(此时发送的是psync ? -1),告诉master我需要同步数据了。
    • master接收到psync命令后会进行BGSAVE命令生成RDB文件快照。
    • 生成完后,会将RDB文件发送给slave。
    • slave接收到文件会载入RDB快照,并且将数据库状态变更为master在执行BGSAVE时的状态一致。
    • master会发送保存在缓冲区里的所有写命令,告诉slave可以进行同步了
    • slave执行这些写命令。

    3.命令传播

    slave已经同步过master了,那么如果后续master进行了写操作,比如说一个简单的set name redis,那么master执行过当前命令后,会将当前命令发送给slave执行一遍,达成数据一致性。

    4.重新复制

    当slave断开重连之后会进行重新同步,重新同步分完全同步和部分同步

    首先来看看部分同步大致的走向

    在这里插入图片描述

    • 当slave断开重连后,会发送psync 命令给master。
    • master收到psync后会返回+continue回复,表示slave可以执行部分同步了。
    • master发送断线后的写命令给slave
    • slave执行写命令。
    实际上当slave发送psync命令给master之后,master还需要根据以下三点判断是否进行部分同步。

    先来介绍一下是哪三个方面:

    服务器运行ID
    每个redis服务器开启后会生成运行ID。

    当进行初次同步时,master会将自己的ID告诉slave,slave会记录下来,当slave断线重连后,发现ID是这个master的就会尝试进行部分重同步。当ID与现在连接的master不一样时会进行完整重同步。

    复制偏移量
    复制偏移量包括master复制偏移量和slave复制偏移量,当初次同步过后两个数据库的复制偏移量相同,之后master执行一次写命令,那么master的偏移量+1,master将写命令给slave,slave执行一次,slave偏移量+1,这样版本就能一致。

    复制积压缓冲区
    复制积压缓冲区是由master维护的固定长度的先进先出的队列。

    当slave发送psync,会将自己的偏移量也发送给master,当slave的偏移量之后的数据在缓冲区还存在,就会返回+continue通知slave进行部分重同步。

    当slave的偏移量之后的数据不在缓冲区了,就会进行完整重同步。

    结合以上三点,我们又可以总结下:

    • 当slave断开重连后,会发送psync 命令给master。
    • master首先会对服务器运行进行判断,如果与自己相同就进行判断偏移量
    • master会判断自己的偏移量与slave的偏移量是否一致。
    • 如果不一致,master会去缓冲区中判断slave的偏移量之后的数据是否存在。
    • 如果存在就会返回+continue回复,表示slave可以执行部分同步了。
    • master发送断线后的写命令给slave
    • slave执行写命令。
      5.主从同步最终流程

    在这里插入图片描述

    6.结语

    最近公司需要,我搭建了一套redis主从集群并且用哨兵进行监听实现主从切换。因此我根据《redis设计与实现》梳理了redis主从原理,给自己加深印象。

    展开全文
  • ZooKeeper是如何保证数据一致性

    万次阅读 2019-08-05 11:03:02
    在分布式系统里的多台服务器要对数据状态达成一致,其实是一件很有难度和挑战的事情,因为服务器集群环境的软硬件故障随时会发生,多台服务器对一个数据的记录保持一致,需要一些技巧和设计。 今天要讨论的是分布式...

    前面在讲HDFS和HBase架构分析的时候就提到了Zookeeper。在分布式系统里的多台服务器要对数据状态达成一致,其实是一件很有难度和挑战的事情,因为服务器集群环境的软硬件故障随时会发生,多台服务器对一个数据的记录保持一致,需要一些技巧和设计。

    今天要讨论的是分布式系统一致性与Zookeeper的架构

    通过之前的文章大家应该已经了解了HDFS为了保证整个集群的高可用,需要部署两台NameNode服务器一台作为主服务器,一台作为从服务器。当主服务器宕机时就切换到从服务器上访问。但是如果不同的应用程序或者DataNode做出的关于主服务器是否可用的判断不同,那么就会导致HDFS集群混乱。

    比如两个应用程序都需要对一个文件路径进行写操作,但是如果两个应用程序对于哪台是主服务器的判断不同,就会分别连接到两个不同的NameNode上,这样就会导致文件数据冲突,同一个文件指向了两份不同的数据。

    这种情况叫做脑裂,为了防止脑裂产生的根本,需要单独一个专门进行判断的服务器当裁判,让裁判决定哪个服务器是主服务器。。。

    但是这个做出判断决策的服务器也有可能出现故障不可访问,同样整个服务器集群也不能正常运行。所以这个判断决策的服务器必须由多台服务器组成,来保证高可用,任意一台服务器宕机都不会影响系统的可用性。

    那么问题又来了,这几台做出判断决策的服务器如何防止脑裂?

    这个时候比较常用的多台服务器状态一致性的解决方案是Zookeeper。

    Paxos算法与Zookeeper架构

    比如一个提供锁服务的分布式系统,由多台服务器构成一个集群对外提供锁服务,应用程序连接到任意一台服务器都可以获取或者释放锁,因此这些服务器必须严格保持状态一致,不能一台服务器将锁资源交给一个应用程序而另一台服务器将锁资源交给另一个应用程序,所以像这种分布式系统对数据一致性有更高的要求。

    Paxos算法就是用来解决这类问题,多台服务器通过内部投票表决机制决定一个数据的更新与写入。
    在这里插入图片描述
    应用程序连接到任意一台服务器后提起状态修改请求(也可以是获得某个状态所的请求),从图上看也就是服务器1,会将这个请求发送给集群中其他服务器进行表决。如果某个服务器同时收到了另一个应用程序同样的修改请求,它可能会拒绝服务器1的表决,并且自己也发起一个同样的表决请求,那么其他服务器就会根据时间戳和服务器排序进行表决。

    表决结果会发给其他所有服务器,发起表决的服务器会根据收到的表决结果决定请求是否可以执行,从而在收到请求的时候就保证了数据的一致性。

    Paxos算法比较复杂,为了简化实现,Zookeeper使用了一种叫ZAB(ZooKeeper Atomic
    Broadcast,ZooKeeper原子消息广播协议)的算法协议。基于ZAB算法,Zookeeper集群保证数据更新的一致性,并且通过集群方式保证了Zookeeper系统高可用。但是Zookeeper系统中所有服务器都存储相同的数据,也就是数据没有分片存储,因此不满足分区耐受性。

    Zookeeper通过一种树状结构记录数据,如下图:
    在这里插入图片描述
    应用程序可以通过路径的方式访问Zookeeper中的数据,比如/services/YaView/services/stupidname这样的路径方式修改、读取数据。Zookeeper还支持监听模式,当数据发生改变的时候,通知应用程序。

    因为大数据系统通常是主从架构,主服务器管理集群的状态和元信息,为了保证集群状态不发生脑裂,所以运行期只能有一个主服务器工作,举HDFS的例子来说,也就是一个Active状态的Namenode,但是为了保证高可用,所以还需要一个Standby状态的Namenode。

    那么问题就来了,其他服务器集群怎么知道哪个是active namenode,哪个是standby namenode?

    所以很多大数据系统都依赖Zookeeper提供的一致性数据服务,用于选举集群当前工作的主服
    务器。一台主服务器启动后向Zookeeper注册自己为当前工作的主服务器,因此另一台服务器
    就只能注册为热备主服务器,应用程序运行期都和当前工作的主服务器通信。

    因为Zookeeper系统的多台服务器存储相同的数据并且每次数据更新都要所有服务器投票表决, 所以和一般的分布式系统相反,Zookeeper集群的性能会随着服务器数量的增加而下降。
    在这里插入图片描述
    Zookeeper通过Paxos选举算法实现数据强一致性,并为各种大数据系统提高主服务器选举服务。虽然Zookeeper并没有什么特别强大的功能,但是在各类分布式系统和大数据系统中,Zookeeper出镜率非常高,因此也是很多系统的基础设施。

    展开全文
  • 1.分布式缓存的伸缩设计  1.1Memcached分布式缓存集群的访问模型 ...数据存储服务器必须保证数据的可靠存储,任何情况下都必须保证数据的可用和正确。  2.1关系数据库集群的伸...

      1.分布式缓存的伸缩性设计

      1.1Memcached分布式缓存集群的访问模型

      1.2分布式缓存的一致性Hash算法

     

      2.数据库存储服务器集群的伸缩性设计

      数据库存储服务器集群的伸缩性设计对数据的持久性和可用性提出了更高的要求。数据存储服务器必须保证数据的可靠存储,任何情况下都必须保证数据的可用性和正确性。

      2.1关系数据库集群的伸缩性设计

     

      除了数据库主从读写分离。不同业务数据表可以部署在不同的数据库集群上,俗称数据库分库。这种方式的限制条件是跨库的表不能Join操作。

     

     

     

     

     

      2.2NoSQL数据库的伸缩性设计

     

    转载于:https://www.cnblogs.com/wxgblogs/p/5484943.html

    展开全文
  • 负责通过数据,与其他服务器通信的端口 负责投票的端口 containsQuorum方法负责处理投票处理 判断是否过半 在配置文将中可以配置peerType属性,这个属性是服务器的类型(一般用于配置观察者) 快照和事务日志来...

    服务器的端口

    每个服务器会开放三个端口,这点可以通过配置文件中看出来
    在这里插入图片描述

    • clientPort:客户端访问的端口
    • ip第一个冒号后的端口:负责通过数据,与其他服务器通信的端口
    • ip第二个冒号后的端口:负责投票的端口

    Zxid

    在 ZAB 协议的事务编号 Zxid 设计中,Zxid 是一个 64 位的数字。

    其中低 32 位是一个简单的单调递增的计数器,针对客户端每一个事务请求,计数器加 1;

    而高 32 位则代表 Leader 周期 epoch 的编号。

    每个当选产生一个新的 Leader 服务器,就会从这个 Leader 服务器上取出其本地日志中最大事务的ZXID,并从中读取 epoch 值,然后加 1,以此作为新的 epoch,并将低 32 位从 0 开始计数。

    集群启动

    同样是通过QuorumPeerMain的main方法启动服务器,同样是先通过initializeAndRun方法进行初始化
    在这里插入图片描述
    这个parse方法就是对配置文件的解析
    在这里插入图片描述
    其中的parseProperties方法就是解析配置文件的方法,其中我会分析几个关键的地方
    在这里插入图片描述
    配置文件中的peerType属性就是配置服务器类型,一般都是配置观察者的时候使用的
    在这里插入图片描述
    后面会将配置的信息存放在一个map中,可以发现观察者服务器与其他服务器存放的位置不同
    在这里插入图片描述
    到最后会创建一个过半验证器,而这个过半数其实就是前面servers的数量的一般,是除开观察者的
    在这里插入图片描述
    创建过半验证器后会将所有服务器都放入servers中
    在这里插入图片描述
    集群启动的方式和单机模式有所不同
    在这里插入图片描述
    这个方法中主要干了三件事:

    1. 打开socket
    2. 配置服务器对象
    3. 启动服务器
    public void runFromConfig(QuorumPeerConfig config) throws IOException {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      }
    
      LOG.info("Starting quorum peer");
      try {
          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          // 打开socket
          cnxnFactory.configure(config.getClientPortAddress(),
                                config.getMaxClientCnxns());
    
          // 下面都是将配置文件中的配置设置到quorumPeer对象中
          // 这个quorumPeer相当于当前服务器
          quorumPeer = getQuorumPeer();
    
          quorumPeer.setQuorumPeers(config.getServers());
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                  new File(config.getDataLogDir()),
                  new File(config.getDataDir())));
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());
    
          // sets quorum sasl authentication configurations
          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
          if(quorumPeer.isQuorumSaslAuthEnabled()){
              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
          }
    
          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
          quorumPeer.initialize();
    
          // 启动服务器
          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }
    

    在启动服务器的方法中主要干了下面几件事,最后就是执行执行run方法

    @Override
    public synchronized void start() {
        // 导入快照信息到内存
        loadDataBase();
        // 开启线程接收客户端请求
        cnxnFactory.start();
        // 开始领导者选举
        startLeaderElection();
        // 启动线程,执行run方法
        super.start();
    }
    

    进入run方法我们主要关注那个while循环,这个while循环里有个选择语句,其中选择的就是当前服务器的状态,总共有下面四种状态

    1. LOOKING:选举状态
    2. OBSERVING:观察者
    3. FOLLOWING:跟随者
    4. LEADING:领导者

    这里就先不分析领导者选举了,就LEADING和FOLLOWING这两个状态来分析
    在这里插入图片描述

    • LEADING

    这里只分析lead方法
    在这里插入图片描述
    主要是开启了线程
    在这里插入图片描述
    在run方法中主要是接收了socket连接,这里的连接自然是learner的连接(learner包括follower和observer),然后会为每一个连接的服务器socket开启一个LearnerHandler线程
    在这里插入图片描述
    这里自然来分析一下LearnerHandler线程

    @Override
    public void run() {
        try {
            leader.addLearnerHandler(this);
            tickOfNextAckDeadline = leader.self.tick.get()
                    + leader.self.initLimit + leader.self.syncLimit;
    
            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);
    
            QuorumPacket qp = new QuorumPacket();
            // 接收learner发送的数据
            ia.readRecord(qp, "packet");
            if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
            	LOG.error("First packet " + qp.toString()
                        + " is not FOLLOWERINFO or OBSERVERINFO!");
                return;
            }
            byte learnerInfoData[] = qp.getData();
            if (learnerInfoData != null) {
            	if (learnerInfoData.length == 8) {
            		ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
            		this.sid = bbsid.getLong();
            	} else {
            		LearnerInfo li = new LearnerInfo();
            		ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
            		this.sid = li.getServerid();
            		this.version = li.getProtocolVersion();
            	}
            } else {
            	this.sid = leader.followerCounter.getAndDecrement();
            }
    
            LOG.info("Follower sid: " + sid + " : info : "
                    + leader.self.quorumPeers.get(sid));
                        
            if (qp.getType() == Leader.OBSERVERINFO) {
                  learnerType = LearnerType.OBSERVER;
            }
    
            // 拿到leader自身的领导者版本
            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
            
            long peerLastZxid;
            StateSummary ss = null;
            long zxid = qp.getZxid();
    
            // 这里this.getSid()是每个learner的id
            // 这个方法是得到当前最新的领导者版本号
            // 这个方法会把所有线程中对应的learner的版本号拿出来对比,得到一个最新的
            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            
            if (this.getVersion() < 0x10000) {
                // we are going to have to extrapolate the epoch information
                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                ss = new StateSummary(epoch, zxid);
                // fake the message
                leader.waitForEpochAck(this.getSid(), ss);
            } else {
                byte ver[] = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
    
                // 这里leader也会创建以和Packet对象封装需要发送给learner的信息
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
                // 将数据发送给learner
                oa.writeRecord(newEpochPacket, "packet");
                // 刷新
                bufferedOutput.flush();
                // 创建一个ack包接收follower发送回来的数据
                QuorumPacket ackEpochPacket = new QuorumPacket();
                // 接收follower发送的ack数据
                ia.readRecord(ackEpochPacket, "packet");
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error(ackEpochPacket.toString()
                            + " is not ACKEPOCH");
                    return;
    			}
    			// 省略若干代码、、、、、、、
    }
    

    这里主要干了如下几件事情

    1. 接收learner发送的FOLLOWERINFO数据
    2. 计算出最新的epoch,主要的逻辑就是通过所有自身的Zxid和超过一半learner的Zxid(这里使用了过半机制)找到最新的epoch,并且最后加1
    3. 向learner发送LEADERINFO数据
    4. 接收follower返回的ACKEPOCH数据

    这里比较复杂的就getEpochToPropose方法,这个方法就是用来寻找最新的epoch的

    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
        synchronized(connectingFollowers) {
            if (!waitingForNewEpoch) {
                return epoch;
            }
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch+1;
            }
            if (isParticipant(sid)) {
                connectingFollowers.add(sid);
            }
            QuorumVerifier verifier = self.getQuorumVerifier();
            // containsQuorum是过半机制的验证
            if (connectingFollowers.contains(self.getId()) && 
                                            verifier.containsQuorum(connectingFollowers)) {
                waitingForNewEpoch = false;
                self.setAcceptedEpoch(epoch);
                connectingFollowers.notifyAll();
            } else {
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.getInitLimit()*self.getTickTime();
                while(waitingForNewEpoch && cur < end) {
                    connectingFollowers.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (waitingForNewEpoch) {
                    throw new InterruptedException("Timeout while waiting for epoch from quorum");        
                }
            }
            return epoch;
        }
    }
    
    • FOLLOWING

    同样这里只是分析followLeader方法
    在这里插入图片描述
    这里只分析前面和leader建立连接和拿到最新的epoch
    在这里插入图片描述
    与leader连接只是单纯的socket连接,就不再分析了,这里主要分析registerWithLeader方法

    protected long registerWithLeader(int pktType) throws IOException{
    	
    	// 省略若干代码、、、、、、
    
        // 将数据发送给leader
        writePacket(qp, true);
        // 接收leader发送的数据
        readPacket(qp);        
        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    if (qp.getType() == Leader.LEADERINFO) {
        	// we are connected to a 1.0 server so accept the new epoch and read the next packet
        	leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        	byte epochBytes[] = new byte[4];
        	final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
        	if (newEpoch > self.getAcceptedEpoch()) {
        		wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
        		self.setAcceptedEpoch(newEpoch);
        	} else if (newEpoch == self.getAcceptedEpoch()) {
        		// since we have already acked an epoch equal to the leaders, we cannot ack
        		// again, but we still need to send our lastZxid to the leader so that we can
        		// sync with it if it does assume leadership of the epoch.
        		// the -1 indicates that this reply should not count as an ack for the new epoch
                wrappedEpochBytes.putInt(-1);
        	} else {
        		throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
        	}
        	// 封装ack数据
        	QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        	// 发送ack数据给leader
        	writePacket(ackNewEpoch, true);
            return ZxidUtils.makeZxid(newEpoch, 0);
    
    // 省略若干代码、、、、、、
    }
    

    这个方法主要干了下面几件事

    1. 将FOLLOWERINFO数据发送给leader
    2. 接收leader发送的LEADERINFO数据并得到最新的epoch
    3. 向leader发送ACKEPOCH数据

    至于observer的逻辑就不再分析了,和follower的相差无几,下面就是集群启动时的大致流程,至于数据同步会在下面分析。
    在这里插入图片描述

    同步数据

    在分析同步数据之前我们先回顾一下zookeeper的容灾备份机制,zookeeper有两种机制来保障数据的持久性,一个是快照数据,一个事务日志。

    其中快照数据包含了某一时刻之前的所有数据,但是zookeeper不会再每一次持久化的时候打快照,这个频率也不是固定的。

    不过肯定的是只是通过快照是无法保障数据的完整性的,还需要事务日志的协助,其中事务日志最多只能保存最近500次事务的日志。

    不过如果事务日志和快照数据同时起作用那么就可以保障数据的完整性,一般zookeeper再启动时会先将快照数据导入内存,然后查看快照数据最新的事务id时多少,然后查看事务日志是不是还有更新的数据,如果有就通过事务日志将没有持久化的数据重新执行一遍并持久化。

    承接上面的集群启动我们先回到LeaderHandler的run方法

    
    // 省略若干代码、、、、、、、、
    
     /* the default to send to the follower */
    // SNAP命令
    int packetToSend = Leader.SNAP;
    long zxidToSend = 0;
    long leaderLastZxid = 0;
    /** the packets that the follower needs to get updates from **/
    long updates = peerLastZxid;
    
    /* we are sending the diff check if we have proposals in memory to be able to 
     * send a diff to the 
     */ 
    ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
    ReadLock rl = lock.readLock();
    try {
        rl.lock();        
        final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
        final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
        LOG.info("Synchronizing with Follower sid: " + sid
                +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
                +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
                +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
    
        LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
    
        // peerLastZxid是learner的最新事务id
        if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
            // Follower is already sync with us, send empty diff
            LOG.info("leader and follower are in sync, zxid=0x{}",
                    Long.toHexString(peerLastZxid));
            packetToSend = Leader.DIFF;
            zxidToSend = peerLastZxid;
    
            // 如果learner的还有需要同步的数据的话
            // 就是learner的事务id不等于leader的事务id
        } else if (proposals.size() != 0) {
            LOG.debug("proposal size is {}", proposals.size());
            // 如果learner的事务id小于已经提交的事务id
            if ((maxCommittedLog >= peerLastZxid)
                    && (minCommittedLog <= peerLastZxid)) {
                LOG.debug("Sending proposals to follower");
    
                // as we look through proposals, this variable keeps track of previous
                // proposal Id.
                long prevProposalZxid = minCommittedLog;
    
                // Keep track of whether we are about to send the first packet.
                // Before sending the first packet, we have to tell the learner
                // whether to expect a trunc or a diff
                boolean firstPacket=true;
    
                // If we are here, we can use committedLog to sync with
                // follower. Then we only need to decide whether to
                // send trunc or not
                // 发送DIFF命令
                packetToSend = Leader.DIFF;
                zxidToSend = maxCommittedLog;
    
                for (Proposal propose: proposals) {
                    // skip the proposals the peer already has
                    if (propose.packet.getZxid() <= peerLastZxid) {
                        prevProposalZxid = propose.packet.getZxid();
                        continue;
                    } else {
                        // If we are sending the first packet, figure out whether to trunc
                        // in case the follower has some proposals that the leader doesn't
                        if (firstPacket) {
                            firstPacket = false;
                            // Does the peer have some proposals that the leader hasn't seen yet
                            if (prevProposalZxid < peerLastZxid) {
                                // send a trunc message before sending the diff
                                packetToSend = Leader.TRUNC;                                        
                                zxidToSend = prevProposalZxid;
                                updates = zxidToSend;
                            }
                        }
                        // 下面会将要同步的数据加入到一个queuedPackets队列中
                        queuePacket(propose.packet);
                        // 发送COMMIT命令
                        QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                null, null);
                        queuePacket(qcommit);
                    }
                }
    
                // 如果learner的事务id大于已经提交的事务id
            } else if (peerLastZxid > maxCommittedLog) {
                LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                        Long.toHexString(maxCommittedLog),
                        Long.toHexString(updates));
    
                // 发送TRUNC命令
                packetToSend = Leader.TRUNC;
                zxidToSend = maxCommittedLog;
                updates = zxidToSend;
            } else {
                LOG.warn("Unhandled proposal scenario");
            }
        } else {
            // just let the state transfer happen
            LOG.debug("proposals is empty");
        }               
    
        LOG.info("Sending " + Leader.getPacketType(packetToSend));
        leaderLastZxid = leader.startForwarding(this, updates);
    
    } finally {
        rl.unlock();
    }
    
     QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
            ZxidUtils.makeZxid(newEpoch, 0), null, null);
     if (getVersion() < 0x10000) {
        oa.writeRecord(newLeaderQP, "packet");
    } else {
        queuedPackets.add(newLeaderQP);
    }
    bufferedOutput.flush();
    //Need to set the zxidToSend to the latest zxid
    if (packetToSend == Leader.SNAP) {
        zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
    }
    oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
    bufferedOutput.flush();
    
    /* if we are not truncating or sending a diff just send a snapshot */
    // 快照的数据直接通过socket发送出去
    if (packetToSend == Leader.SNAP) {
        LOG.info("Sending snapshot last zxid of peer is 0x"
                + Long.toHexString(peerLastZxid) + " " 
                + " zxid of leader is 0x"
                + Long.toHexString(leaderLastZxid)
                + "sent zxid of db as 0x" 
                + Long.toHexString(zxidToSend));
        // Dump data to peer
        leader.zk.getZKDatabase().serializeSnapshot(oa);
        oa.writeString("BenWasHere", "signature");
    }
    bufferedOutput.flush();
    
    // Start sending packets
    // 下面是处理TRUNC和DIFF命令
    new Thread() {
        public void run() {
            Thread.currentThread().setName(
                    "Sender-" + sock.getRemoteSocketAddress());
            try {
                sendPackets();
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption",e);
            }
        }
    }.start();
    
    // 省略若干代码、、、、、、、、
    
    

    在上面的逻辑中,leader接收到learner的事务id,通过分析事务id,与自身已提交的事务id作对比,来决定与learner同步数据的方式,总共有如下三种方式:

    1. SNAP:如果learner的事务id比leader中事务日志的最小id还要小的时候,就需要通过快照数据的方式同步数据
    2. TRUNC:如果learner的事务id比leader中事务日志的最小id大但是比最大的id小的时候,就需要通过事务日志的方式同步数据
    3. DIFF:如果learner的事务id比leader中事务日志的最大id还要大的时候,就需要learner删除多余的数据

    下面会逐个分析每种方式的执行逻辑

    • SNAP

    一开始就会赋值SNAP命令
    在这里插入图片描述
    如果是SNAP方式同步数据会直接将需要同步的数据发送给learner
    在这里插入图片描述
    至于learner如果同步的数据就需要承接上面集群启动,在连接leader之后就是同步数据,通过syncWithLeader方法实现的
    在这里插入图片描述
    syncWithLeader方法中对于SNAP的处理如下,差不多就是两个步骤

    1. 清空数据
    2. 添加快照数据

    在这里插入图片描述

    • TRUNC

    如果learner的事务id大于已经提交的事务id就会发送TRUNC命令
    在这里插入图片描述
    在后面会开启一个线程去发送TRUNC命令
    在这里插入图片描述
    在learner的syncWithLeader方法中只是将多余的数据删除
    在这里插入图片描述

    • DIFF

    如果learner的事务id小于已经提交的事务id就会发送DIFF命令
    在这里插入图片描述
    并且会将需要同步的数据通过COMMIT发送过去,这里是先放到了一个队列中
    在这里插入图片描述
    同样是通过一个线程发送的数据
    在这里插入图片描述
    在learner的syncWithLeader方法先是设置了一个标志位
    在这里插入图片描述
    在这里插入图片描述
    前面我们直到leader发送了COMMIT命令,在这里也将需要同步的数据放到了一个队列中
    在这里插入图片描述
    不过这里需要注意的是这里已经进入了一个while循环中,只有通过leader发送UPTODATE命令才能跳出循环
    在这里插入图片描述
    在这里插入图片描述
    leader也是会发送UPTODATE命令
    在这里插入图片描述
    最后在learner中就会完成数据的提交并同步
    在这里插入图片描述

    这也就是集群模式下zookeeper的同步数据的方式,也是因为zookeeper通过快照和事务日志的方式持久化数据,所以zookeeper有如此的逻辑来同步数据。

    客户端发送请求

    处理器链

    首先我们来了解一下各种服务器端的处理器链,因为zookeeper服务器有三种角色,所以自然有三种不同的服务端对象,分别是

    1. LeaderZooKeeperServer
    2. FollowerZooKeeperServer
    3. ObserverZooKeeperServer

    自然各自的服务器都有其各自的处理器链,在下面setupRequestProcessors方法中就会初始化处理器链,这个方法每种服务器对象都做了重写。

        public synchronized void startup() {
            if (sessionTracker == null) {
                createSessionTracker();
            }
            startSessionTracker();
            // 初始化处理器链
            setupRequestProcessors();
            registerJMX();
            setState(State.RUNNING);
            notifyAll();
        }
    

    LeaderZooKeeperServer

    @Override
    protected void setupRequestProcessors() {
        /**
         * 下面总共初始化了两条处理器链
         * PrepRequestProcessor---->ProposalRequestProcessor---->CommitProcessor---->ToBeAppliedRequestProcessor---->FinalRequestProcessor
         * SyncRequestProcessor---->AckRequestProcessor
         */
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                finalProcessor, getLeader().toBeApplied);
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
        // 这里ProposalRequestProcessor的构造器将AckRequestProcessor添加到SyncRequestProcessor的后面
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        // 启动了SyncRequestProcessor处理器
        proposalProcessor.initialize();
        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }
    
    1. PrepRequestProcessor:ACL安全验证,接收数据
    2. ProposalRequestProcessor:负责投票
    3. CommitProcessor:负责提交
    4. ToBeAppliedRequestProcessor
    5. FinalRequestProcessor:更新内存,返回response,触发事件

    同时单独启动了SyncRequestProcessor处理器

    1. SyncRequestProcessor
    2. AckRequestProcessor

    需要注意在ProposalRequestProcessor的构造方法中将AckRequestProcessor添加到SyncRequestProcessor的后面
    在这里插入图片描述

    FollowerZooKeeperServer

    @Override
    protected void setupRequestProcessors() {
       /**
        * 下面总共初始化了两条处理器链
        * FollowerRequestProcessor---->CommitProcessor---->FinalRequestProcessor
        * SyncRequestProcessor---->SendAckRequestProcessor
        */
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        // 这里创建了一个SendAckRequestProcessor处理器,在SyncRequestProcessor的后面
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        // 开启了SyncRequestProcessor处理器
        syncProcessor.start();
    }
    
    1. FollowerRequestProcessor
    2. CommitProcessor
    3. FinalRequestProcessor

    同样的单独启动了SyncRequestProcessor处理器

    1. SyncRequestProcessor
    2. SendAckRequestProcessor

    ObserverZooKeeperServer

    @Override
    protected void setupRequestProcessors() {
        /**
         * 下面总共初始化了一条处理器链
         * 而且还可以设置开启一个单独的处理器SyncRequestProcessor
         * 这个处理器如果开启了就可以使得观察者能够对事务的持久化和打快照
         * ObserverRequestProcessor---->CommitProcessor---->FinalRequestProcessor
         */
        // We might consider changing the processor behaviour of 
        // Observers to, for example, remove the disk sync requirements.
        // Currently, they behave almost exactly the same as followers.
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
        ((ObserverRequestProcessor) firstProcessor).start();
    
        /*
         * Observer should write to disk, so that the it won't request
         * too old txn from the leader which may lead to getting an entire
         * snapshot.
         *
         * However, this may degrade performance as it has to write to disk
         * and do periodic snapshot which may double the memory requirements
         */
        // 判断是否开启SyncRequestProcessor处理器
        if (syncRequestProcessorEnabled) {
            syncProcessor = new SyncRequestProcessor(this, null);
            syncProcessor.start();
        }
    }
    
    1. ObserverRequestProcessor
    2. CommitProcessor
    3. FinalRequestProcessor

    需要注意的是观察者的服务器可以开启或者关闭SyncRequestProcessor处理器,如果开启这个处理器,就可以使得观察者能对事务的持久化和打快照。

    客户端向Leader发送请求

    流程解析

    这里我们直接从处理器链开始分析,而且PrepRequestProcessor处理器我们之前已经分析过了,就不再赘述了,直接从第二个处理器ProposalRequestProcessor开始分析。

    这个处理器是负责处理投票的,会发现这个处理器直接调用下一个处理器CommitProcessor的processRequest方法

    public void processRequest(Request request) throws RequestProcessorException {
        // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
        // request.type + " id = " + request.sessionId);
        // request.addRQRec(">prop");
                
        
        /* In the following IF-THEN-ELSE block, we process syncs on the leader. 
         * If the sync is coming from a follower, then the follower
         * handler adds it to syncHandler. Otherwise, if it is a client of
         * the leader that issued the sync command, then syncHandler won't 
         * contain the handler. In this case, we add it to syncHandler, and 
         * call processRequest on the next processor.
         */
        
        if(request instanceof LearnerSyncRequest){
            zks.getLeader().processSync((LearnerSyncRequest)request);
        } else {
                // 会直接调用下一个处理器
                // 调用下一个处理器的processRequest方法
                nextProcessor.processRequest(request);
            if (request.hdr != null) {
                // We need to sync and get consensus on any transactions
                try {
                    // propose方法会去投票
                    zks.getLeader().propose(request);
                } catch (XidRolloverException e) {
                    throw new RequestProcessorException(e.getMessage(), e);
                }
                // 持久化数据
                syncProcessor.processRequest(request);
            }
        }
    }
    

    主要还是将请求添加到queuedRequests队列中,主要逻辑在run方法中
    在这里插入图片描述
    这里第一次不会进入if语句,第二次queuedRequests中的请求会赋值给nextPending,所以会进入if语句,然后会被阻塞住
    在这里插入图片描述
    在这里插入图片描述
    那么回到ProposalRequestProcessor处理器,回调用propose去完成投票,或者说发出一个提议
    在这里插入图片描述
    这里只是向follower发送了一个PROPOSAL请求或者说提议
    在这里插入图片描述
    然后会调用SyncRequestProcessor处理器持久化数据
    在这里插入图片描述
    这里我们可以看看follower是怎么处理PROPOSAL请求的,在Follower对象中follower会不停的接收leader的请求并处理
    在这里插入图片描述
    下面是处理的逻辑
    在这里插入图片描述
    我们会发现这里会直接调用SyncRequestProcessor处理器持久化数据
    在这里插入图片描述
    如果SyncRequestProcessor持久化成功会调用下一个处理器就是SendAckRequestProcessor,这里会发现SendAckRequestProcessor的逻辑只是想leader发送Ack请求
    在这里插入图片描述
    那么就需要回到leader端对应的LearnerHandler接收请求,下面是处理的逻辑
    在这里插入图片描述
    这里主要是通过将发送ack请求的follower的id收集起来,然后通过过半验证机制判断是否可以提交。

    当验证通过后通过commit方法提交事务,发送COMMIT给所有的follower,通过inform方法发送INFORM给所有的observer,最后通过zk.commitProcessor.commit方法将请求添加到committedRequests队列中,唤醒之前的阻塞。

    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    	
    	// 省略若干代码、、、、、、
    	
        if (lastCommitted >= zxid) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                        Long.toHexString(lastCommitted), Long.toHexString(zxid));
            }
            // The proposal has already been committed
            return;
        }
        Proposal p = outstandingProposals.get(zxid);
        if (p == null) {
            LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                    Long.toHexString(zxid), followerAddr);
            return;
        }
    
        // 添加learner的id
        // 用来过半机制的验证
        p.ackSet.add(sid);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Count for zxid: 0x{} is {}",
                    Long.toHexString(zxid), p.ackSet.size());
        }
        // 这里的containsQuorum是过半机制验证
        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){             
            if (zxid != lastCommitted+1) {
                LOG.warn("Commiting zxid 0x{} from {} not first!",
                        Long.toHexString(zxid), followerAddr);
                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
            }
            outstandingProposals.remove(zxid);
            if (p.request != null) {
                toBeApplied.add(p);
            }
    
            if (p.request == null) {
                LOG.warn("Going to commmit null request for proposal: {}", p);
            }
            // 提交事务
            commit(zxid);
            inform(p);
            // 提交请求
            // 唤醒Commit处理器的阻塞
            zk.commitProcessor.commit(p.request);
            if(pendingSyncs.containsKey(zxid)){
                for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                    sendSync(r);
                }
            }
        }
    }
    

    这里就会将提交的请求交给下一个处理器处理
    在这里插入图片描述

    整体流程

    1. PrepRequestProcessor负责ACL安全验证,接收请求
    2. ProposalRequestProcessor直接调用CommitProcessor处理器,但是会被阻塞住
    3. ProposalRequestProcessor调用propose去投票,向follower发送PROPOSAL请求
    4. leader调用SyncRequestProcessor持久化数据
    5. follower接收PROPOSAL请求并通过SyncRequestProcessor持久化数据
    6. 如果follower持久化数据成功就会调用SendAckRequestProcessor处理器发送ACK请求
    7. leader接收ACK请求然后执行过半验证直到通过
    8. 过半验证通过会向所有的follower发送COMMIT,向所有的observer发送INFORM,将请求添加到一个提交队列中同时唤醒前面的阻塞
    9. 在leader端将请求交给CommitProcessor的下一个处理器处理

    客户端向follower发送请求

    流程解析

    同样我们直接分析FollowerRequestProcessor处理器,首先还是将请求添加到队列中,具体逻辑在run方法中
    在这里插入图片描述
    首先还是交给下一个处理器去处理,不过这里不会再做分析,主要还是对于请求转发的分析
    在这里插入图片描述
    这里会将请求交给request方法处理
    在这里插入图片描述
    这个request方法主要还是向leader发送REQUEST请求
    在这里插入图片描述
    回到LearnerHandler中,发现这里调用的是submitRequest方法
    在这里插入图片描述
    然后发现这个方法就是客户端直接对leader请求时调用的方法,那么接下来的逻辑就与上面客户端直接向leader请求的逻辑无异了
    在这里插入图片描述

    整体流程

    1. FollowerRequestProcessor向leader发送REQUEST请求
    2. leader接收REQUEST请求并调用submitRequest方法处理请求
    3. 同上面的客户端直接向leader发送请求
    展开全文
  • 我们的集群服务器多,并且有足够大多存储空间,可以多设置副本数,一般是1-3个副本数,如果集群服务器相对较少并且存储空间没有那么宽松,则可以只设定一份副本以保证容灾(副本数可以动态调整) 举个栗子: #在一个...
  • zookeeper数据一致性以及leader选举机制zookeeper数据一致性zk的数据监听机制、zk集群服务器的选举leader机制 zookeeper数据一致性 数据的一致通过zab协议保证数据的一致性 所有的写操作都从leader服务器操作,读...
  • 在Redis分布式集群中,保证数据能够均匀的分布在集群中每个机器中是Redis追求的基本操作,由于redis中的数据是动态变化的,所以为了保证通过最小的代价保证数据均匀分布,哈希一致性算法被提出 1.常规的解决方案导致...
  • 本文主要对zk的集群特性、一致性机制、选举机制进行简单分析。 一、zookeeper集群 配置多个实例共同构成一个集群对外提供服务以达到水平扩展的目的,每个服务器上的数据是相同的,每一个服务器均可以对外提供...
  • 很多文档说Zookeeper是强一致性保证,事实不然。关于一致性模型请参考http://bit1129.iteye.com/blog/2155336    Zookeeper的数据同步协议 Zookeeper采用称为Quorum Based Protocol的数据同步协议。假如...
  • 在 ZooKeeper 集群中,服务器分为 Leader 服务器、 Follower 服务器以及 Observer 服务器。 我们可以这样认为,Leader 选举是一个过程,在这个过程中 ZooKeeper 主要做了两个重要工作,一个是数据同步,另一个是选举...
  • 前言 互联网公司中,绝大部分都没有马爸爸系列的公司那样财大气粗,他们即没有强劲的服务器、也...这里我主要结合Redis集群来讲一下一致性Hash的相关问题。 Redis集群的使用 我们在使用Redis的过程中,为了保证Re
  • 1.1 zk可以用来保证数据在zk集群之间的数据的事务性一致。 Zookeeper 作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,但是 ...
  • zk的数据一致性问题

    2020-07-24 00:21:09
    1.假设有两个客户端A和B,分别连接到zk集群的server1和...答案是A或者B都有可能,原因是zk的数据一致性只是保证更新目录内容为Y时,有三个server服务器(比如server1,server4,server5)都进行了该目录的修改操作,
  • 多节点证明它们之间肯定会有数据的通信,同时,为了能够使zk集群对外是透明的,一个整体对外提供服务,那么客户端访问zk服务器的数据肯定是要数据同步,也即数据一致性。 zk集群是Leader/Follower模式来保证数据同步...
  • 基于该协议,ZooKeeper 实现了一种主从模式的系统架构来保持集群中各个副本之间的数据一致性。 分布式系统中leader负责外部客户端的写请求。follower服务器负责读跟同步。这时需要解决俩问题。 Leader 服务器是...
  • 对于分布式中间件缓存的节点同步其实还是很好处理的,应用服务器集群都是向中间件缓存操作缓存数据,只需要保证缓存中间件节点的数据一致性即可保证缓存数据一致性。当然,对于不同的缓存中间件,节点数据同步机制也...
  • 作用:Zookeeper 可以用来保证数据在zk集群之间的数据的事务性一致(原子操作)。 介绍:Zookeeper 是 Google 的 Chubby一个开源的实现,是 Hadoop 的分布式协调服务。 它包含一个简单的原语集,分布式应用程序可以...
  • 保证整个 ZooKeeper 集群服务器数据一致的前提下,ZooKeeper 集群才能对外提供服务。 why 介绍 ZooKeeper 集群数据同步之前,先要清楚为什么要进行数据同步。在 ZooKeeper 集群服务运行过程中,主要负责处理发送...
  • redis集群优势采用redis集群,可以保证数据分散存储(主机-主机),同时保证数据存储的一致性(主机-从机),并且在内部实现高可用的机制,实现了服务故障的自动迁移.搭建计划主从--3台主/3台从(1主1从) 端口--7000/7001/...
  • 我们搭建集群环境的时候,时间必须是要统一的,才能保证集群数据一致性。 一般操作是直接使用NTP,跟默认的时间服务器同步,但是最好还是让所有节点跟集群中的某台作为时间服务器的节点同步。 步骤:(节点有...
  • zookeeper的一致性

    2020-02-13 09:22:18
    zookeeper是分布式服务框架,zk是强一致性的,即分布式数据一致性。在多个zk中,其中一台服务器宕机,zk集群不可访问,选举完毕后,在对外暴露(服务中会缓存zk中的保存的url),保证每一个zk对外一致性。 集群中的的...
  • 1、需要在old cluster hbase停掉的情况下迁移或者停止收集数据进入hbase,保证迁移后的数据一致性 2、两个集群必须都配置外网IP,且new cluster每台服务器要把old cluster的所有外网IP、主机名配置hosts 3、在new .....
  • 而这些功能都是基于Zookeeper能在分布式的集群保证数据一致性的,这也是Zookeeper最核心的能力。 Zookeeper集群对数据的一致性的保证和Zookeeper集群高可用这两个核心功能归纳为两个知识点: 1.ZAB( ZooKeeper ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 400
精华内容 160
关键字:

服务器集群保证数据一致性