精华内容
下载资源
问答
  • Redis-leader-by-lock 使用Redis Lock轻松实现集群领导者选举 仅使用Spring-Boot和Redis 动机 几乎所有使用Spring Boot进行领导力选举的例子都转到Hazelcast(现在已弃用)和Zookeeper(在简单情况下过强)中的...
  • 领导 查找etcd领导者的简单方法。 用法 设置ETCDCTL_PEERS环境变量: export ETCDCTL_PEERS=http://etcd0.example.com:2379,...运行etcd-leader命令: ./etcd-leader http://etcd1.example.com:2379
  • LEADER LV5600使用手册

    2020-04-20 22:57:52
    LEADER 4K示波器 LV5600 LV7600 英文版 日文版 操作手册 说明书 希望可以帮助到需要的人(英文版14.3M 共计662页)
  • var Leader = require ( 'redis-leader' ) ; 应用程序接口 新领导者(redis,选项) 创建一个新的领导者 redis是标识锁的字符串键 选项 ttl锁定时间,以毫秒为单位(到时间后会自动释放) wait Time between 2 ...
  • LEADER LT4610 操作手册 英日文版 4K同步信号发生器 4K信号发生器 内包含英文 日文版操作手册 英文版2.09M 181页 还有修改LOGO的程序 希望可以帮助到需要的人
  • 外包项目中的Leader

    2021-02-04 12:53:59
    本文结合实际参加项目的一些经历,谈一点关于作项目Leader的体会,错误之处,请批评指正。1.作一个项目leader带项目就是领兵打仗,胜仗的灵魂是统帅,项目负责人就是统帅。统帅之根本在于:知人善用,爱才如命。一个...
  • 关注Leader ROS计划 一个针对基本领导者的ROS软件包,以恒定的领导者速度追逐领导者位置。 ·· 目录 设置 (可选)单击Fork 安装ROS(运动或旋律) 在本地计算机上克隆项目。 $ cd ~ /catkin_ws/src $ git clone ...
  • Becoming a Technical Leader The Psychology of Technology, original epub.
  • Leader-follower 算法的matlab算法实现
  • Leader-following consensus problem with an accelerated motion leader
  • 下行异构网络中基于多领导者多跟随者博弈的功率控制算法,韩乔妮,杨博,异构网络(Heterogeneous networks, HetNets)是正在扩张的3G网络和新兴的4G网络的重要组成部分。然而,为了达到所有用户的服务质量(Quality ...
  • Leader-Follower Output Synchronization of Linear Heterogeneous Systems With Active Leader Using Reinforcement Learning
  • etcd-leader 正在开发中,尚不适合生产使用。 领导者选举模块建立在强大的选举算法上,并经过彻底的测试。 用法 提供一个配置客户端。 请注意,此包不依赖于 node-etcd。 它与^4.0.0版本的 node-etcd 兼容。 var...
  • leader-follower.zip

    2021-04-01 19:43:05
    多智能体系统一阶有领导者实现一致性的matlab仿真代码
  • Kafka Leader 倾斜
  • leader:咨询机构-源码

    2021-03-07 09:35:54
    leader:咨询机构
  • 关注当前的PGA巡回赛活动 跟随当前的PGA巡回赛赛事。 支持语言:English
  • 在战场环境中,战术分队的队形在面对复杂静态或动态障碍物时难以较好地保持,针对此问题,提出了基于Leader-Follower算法的改进队形控制方法。在leader寻径阶段,通过在战场导航网格中应用两阶段路径搜索方法,先...
  • zookepper中leader作用

    2019-02-20 10:39:01
    zookeeper中的leader的主要作用,一个zookeeper 集群 只有一个leader: 类似master/slave模式
  • This is simulation of leader follower control system.
  • In this paper a leader-following consensus problem of second-order multi-agent systems with fixed and switching topologies as well as non-uniform time-varying delays is considered For the case of ...
  • Algorithm-leader.js.cool.zip

    2019-09-17 11:43:58
    Algorithm-leader.js.cool.zip,英国,算法是为计算机程序高效、彻底地完成任务而创建的一组详细的准则。
  • Consensus control of nonlinear leader-follower multi-agent systems with actuating disturbances
  • 前一篇文章介绍了Leader节点的选举过程,选举完成之后,集群中的各节点根据选举结果设置当前结果为LEADER或FOLLOWING。 设置完成之后,根据各自的节点状态进行启动服务。本文主要介绍下LEADER和Follower节点的启动...

    前言:

    前一篇文章介绍了Leader节点的选举过程,选举完成之后,集群中的各节点根据选举结果设置当前结果为LEADER或FOLLOWING。

    设置完成之后,根据各自的节点状态进行启动服务。本文主要介绍下LEADER和Follower节点的启动过程。

    1.leader节点启动

    leader节点的启动入口依然是QuorumPeer.run()方法

    public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
     
        @Override
        public void run() {
         
            while (running) {
                switch (getPeerState()) {
                    case LOOKING:
                        ...
    
                    // follower节点启动        
                    case FOLLOWING:
                        try {
                            LOG.info("FOLLOWING");
                            setFollower(makeFollower(logFactory));
                            follower.followLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                        } finally {
                            follower.shutdown();
                            setFollower(null);
                            setPeerState(ServerState.LOOKING);
                        }
                        break; 
                    // leader节点启动    
                    case LEADING:
                        LOG.info("LEADING");
                        try {
                            // 重点在这里,设置leader,并执行lead方法
                            setLeader(makeLeader(logFactory));
                            leader.lead();
                            setLeader(null);
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                        } finally {
                            if (leader != null) {
                                leader.shutdown("Forcing shutdown");
                                setLeader(null);
                            }
                            setPeerState(ServerState.LOOKING);
                        }
                        break;
                }          
                        
        }
    }

    1.1 QuorumPeer.makeLeader() 生成Leader

    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
        // 生成LeaderZooKeeperServer,通过1.2 我们来研究下
        return new Leader(this, new LeaderZooKeeperServer(logFactory,this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }

    1.2 LeaderZooKeeperServer

    先来看下其继承结构,如下图所示

    其本质上还是我们之前分析过的ZookeeperServer,只不过它有自己特定的方法

    public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
        CommitProcessor commitProcessor;
     
        // 最重要的一个方法
        @Override
        protected void setupRequestProcessors() {
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                    finalProcessor, getLeader().toBeApplied);
            commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                    Long.toString(getServerId()), false,
                    getZooKeeperServerListener());
            commitProcessor.start();
            ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                    commitProcessor);
            proposalProcessor.initialize();
            firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
            ((PrepRequestProcessor)firstProcessor).start();
        }
    
    }

    有关于LeaderZooKeeperServer的最重要的方法就是setupRequestProcessors(),它自定义设置了RequestProcessor的处理链,这个不同于我们之前分析过的ZookeeperServer。

    ZookeeperServer处理链为:PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor

    LeaderZooKeeperServer处理链为:PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor

    后续我们在分析LEADER处理请求时再详细分析这个处理链

    2.启动leader,同步followers

    在leader选举结束后,是不是leader节点就直接启动就结束了呢?

    实际不是这样的,leader选举完成后,当前leader节点只是一个准leader节点,后续还需要与follower一系列的操作之后,才会真正的变成leader节点,同时对外提供服务。

    具体需要哪些操作呢,我们直接看源码。

    2.1 Leader.lead() 入口方法

    public class Leader {
    	void lead() throws IOException, InterruptedException {
            ...
            try {
                self.tick.set(0);
                // 加载snapshot数据
                zk.loadData();
                
                leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
    
                // LearnerCnxAcceptor用于接收follower连接,详见2.2
                cnxAcceptor = new LearnerCnxAcceptor();
                cnxAcceptor.start();
                
                readyToStart = true;
                // 重新设置当前leader的epoch,并根据该epoch设置新的zxid
                long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
                zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
                
                synchronized(this){
                    lastProposed = zk.getZxid();
                }
                
                newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                        null, null);
    
                // TODO,这里我们还不知道这个ack是什么含义,后续介绍
                waitForEpochAck(self.getId(), leaderStateSummary);
                self.setCurrentEpoch(epoch);
                try {
                    waitForNewLeaderAck(self.getId(), zk.getZxid());
                } catch (InterruptedException e) {
                    ...
                }
                
                // 执行到这里,说明所有的follower都已经回复了ack,说明整个集群是可运行态,当前leader也可以启动服务接收客户端请求了
                startZkServer();
                
                ...
                boolean tickSkip = true;
        
                while (true) {
                    Thread.sleep(self.tickTime / 2);
                    if (!tickSkip) {
                        self.tick.incrementAndGet();
                    }
                    HashSet<Long> syncedSet = new HashSet<Long>();
    
                    // lock on the followers when we use it.
                    syncedSet.add(self.getId());
    
                    for (LearnerHandler f : getLearners()) {
                        // Synced set is used to check we have a supporting quorum, so only
                        // PARTICIPANT, not OBSERVER, learners should be used
                        if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                            syncedSet.add(f.getSid());
                        }
                        // leader主动向follower发起ping请求
                        f.ping();
                    }
    				...
                  tickSkip = !tickSkip;
                }
            } finally {
                zk.unregisterJMX(this);
            }
        }
    }

    2.2 LearnerCnxAcceptor(接收follower连接的线程)

    class LearnerCnxAcceptor extends ZooKeeperThread{
        private volatile boolean stop = false;
    
        @Override
        public void run() {
            try {
                while (!stop) {
                    try{
                        Socket s = ss.accept();
                        // 设置socket读取超时时间为tickTime * initLimit
                        s.setSoTimeout(self.tickTime * self.initLimit);
                        s.setTcpNoDelay(nodelay);
    
                        BufferedInputStream is = new BufferedInputStream(
                            s.getInputStream());
                        // 这里又创建新的线程LearnerHandler,详见2.3
                        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                        fh.start();
                    } catch (SocketException e) {
                        ...
                    } catch (SaslException e){
                        LOG.error("Exception while connecting to quorum learner", e);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Exception while accepting follower", e);
            }
        }
    ...
    }

    2.3 LearnerHandler 真正处理leader-follower之间的请求

    /**
     * There will be an instance of this class created by the Leader for each
     * learner. All communication with a learner is handled by this
     * class.
     */
    public class LearnerHandler extends ZooKeeperThread {
        
    }

    通过LearnerHandler的注释可以很明确知道:Leader会为每一个连接上来的Follower都创建一个LearnerHandler线程,两者之间的所有交流都通过这个线程来处理。具体如何处理呢,暂且不表,继续回到主线。

    到这里,我们Leader节点的启动可以先告一段落。

    通过目前我们掌握的知识点,可以知道:

    * Leader监听Follower的连接,为每一个连接都创建一个LearnerHandler线程来处理所有的请求响应;

    * leader与follower之间有一系列的动作(需要等待ack,具体是什么动作我们还不知道),等所有的动作都结束后,最终才会真正启动leader服务,接收客户端请求。

    那么leader与follower之间是如何交互的,交互了哪些东西呢?下面我们就需要从Follower节点的启动看起了。

    3.Follower节点启动

    3.1 创建Follower对象

    public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
    	protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
            return new Follower(this, new FollowerZooKeeperServer(logFactory, 
                    this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
        }
    }

    3.2 FollowerZooKeeperServer的创建

    其继承结构与LeaderZookeeperServer类似,就是中间多了一层继承LearnerZooKeeperServer,这里不再重复展示,只展示下其最重要的方法

    public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
        protected void setupRequestProcessors() {
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            commitProcessor = new CommitProcessor(finalProcessor,
                    Long.toString(getServerId()), true,
                    getZooKeeperServerListener());
            commitProcessor.start();
            firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
            ((FollowerRequestProcessor) firstProcessor).start();
            syncProcessor = new SyncRequestProcessor(this,
                    new SendAckRequestProcessor((Learner)getFollower()));
            syncProcessor.start();
        }

    可以看到,FollowerZooKeeperServer接收到客户端请求时的处理链为FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor

    后续在分析到客户端请求时再具体分析。

    4.启动Follower

    在上述1中,QuorumPeer通过调用Follower.followLeader()方法来启动并同步Leader信息,具体如下:

    public class Follower extends Learner{
    	void followLeader() throws InterruptedException {
            ...
            try {
                // 获取被选举的Leader信息
                QuorumServer leaderServer = findLeader();            
                try {
                    // 创建对Leader连接
                    connectToLeader(leaderServer.addr, leaderServer.hostname);
                    // 1.注册当前节点信息到leader
                    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
    
                    long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                    if (newEpoch < self.getAcceptedEpoch()) {
                        LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                                + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                        throw new IOException("Error: Epoch of leader is lower");
                    }
                    // 2.同步leader数据
                    syncWithLeader(newEpochZxid);                
                    QuorumPacket qp = new QuorumPacket();
                    while (this.isRunning()) {
                        // 处理leader请求
                        readPacket(qp);
                        processPacket(qp);
                    }
                } catch (Exception e) {
                    LOG.warn("Exception when following the leader", e);
                    try {
                        sock.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
        
                    // clear pending revalidations
                    pendingRevalidations.clear();
                }
            } finally {
                zk.unregisterJMX((Learner)this);
            }
        }
    }

    总结:目前通过对Follower的粗略了解,我们知道:

    * 在启动时,Follower会主动连接Leader

    * 连接后,Follower会同步Leader数据

    具体怎么做呢,下面我们结合Leader和Follower的代码一起来看下。

    5.Leader/Follower启动期间交互

    下面我们串着分析Leader/Follower的代码。

    5.1 follower创建对leader长连接

    5.1.1 follower创建连接

    public class Learner {    
    	protected void connectToLeader(InetSocketAddress addr, String hostname)
                throws IOException, ConnectException, InterruptedException {
            // 直接获取到Leader的address后,创建socket连接
            sock = new Socket();        
            sock.setSoTimeout(self.tickTime * self.initLimit);
            for (int tries = 0; tries < 5; tries++) {
                try {
                    // 连接超时时间为tickTime * syncLimit
                    sock.connect(addr, self.tickTime * self.syncLimit);
                    sock.setTcpNoDelay(nodelay);
                    break;
                } catch (IOException e) {
                    ...
                }
                Thread.sleep(1000);
            }
    		...
        } 
    }

    5.1.2 Leader监听连接

    class LearnerCnxAcceptor extends ZooKeeperThread{
        private volatile boolean stop = false;
    	...
        @Override
        public void run() {
            try {
                while (!stop) {
                    try{
                        Socket s = ss.accept();
                        s.setSoTimeout(self.tickTime * self.initLimit);
                        s.setTcpNoDelay(nodelay);
    
                        BufferedInputStream is = new BufferedInputStream(
                            s.getInputStream());
                        // 监听到follower的连接后,为每一个连接创建一个LearnerHandler来处理两者之间的请求
                        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                        fh.start();
                    } catch (SocketException e) {
                        ...
                }
            } catch (Exception e) {
                LOG.warn("Exception while accepting follower", e);
            }
        }
    	...
    }

    5.2 follower向leader注册当前节点信息

    5.2.1 follower发送注册信息

    public class Learner {      
    	protected long registerWithLeader(int pktType) throws IOException{
        	long lastLoggedZxid = self.getLastLoggedZxid();
            QuorumPacket qp = new QuorumPacket();                
            qp.setType(pktType);
            qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
            
            // 发送当前节点信息
            LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
            ByteArrayOutputStream bsid = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
            boa.writeRecord(li, "LearnerInfo");
            qp.setData(bsid.toByteArray());
            // 发送请求到leader,并从leader获取响应结果
            writePacket(qp, true);
            // leader接收到注册信息后,将leader的基本信息发送回来
            readPacket(qp);        
            final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    		if (qp.getType() == Leader.LEADERINFO) {
            	leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
            	byte epochBytes[] = new byte[4];
            	final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
                // 获取leader epoch信息
            	if (newEpoch > self.getAcceptedEpoch()) {
            		wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
            		self.setAcceptedEpoch(newEpoch);
            	} else if (newEpoch == self.getAcceptedEpoch()) {
                    wrappedEpochBytes.putInt(-1);
            	} else {
            		throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
            	}
                // follower发送ack信息,一次完整的请求注册就结束了
            	QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
            	writePacket(ackNewEpoch, true);
                return ZxidUtils.makeZxid(newEpoch, 0);
            } else {
            	...
            }
        } 
    }

    5.2.2 Leader接收注册信息

    public class LearnerHandler extends ZooKeeperThread {
    	public void run() {
            try {
    
                // 获取follower请求注册信息
                ia = BinaryInputArchive.getArchive(bufferedInput);
                bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
                oa = BinaryOutputArchive.getArchive(bufferedOutput);
    
                QuorumPacket qp = new QuorumPacket();
                ia.readRecord(qp, "packet");
                ...
                byte learnerInfoData[] = qp.getData();
                // 获取learner基本信息
                if (learnerInfoData != null) {
                	if (learnerInfoData.length == 8) {
                		ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                		this.sid = bbsid.getLong();
                	} else {
                		LearnerInfo li = new LearnerInfo();
                		ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
                		this.sid = li.getServerid();
                		this.version = li.getProtocolVersion();
                	}
                } else {
                	this.sid = leader.followerCounter.getAndDecrement();
                }
    
               ...
                
                long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
                
                long peerLastZxid;
                StateSummary ss = null;
                long zxid = qp.getZxid();
                long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
                // 正常情况下,follower发送过来的session就是0x10000
                if (this.getVersion() < 0x10000) {
                    ...
                } else {
                    byte ver[] = new byte[4];
                    ByteBuffer.wrap(ver).putInt(0x10000);
                    // leader将epoch信息发送到follower
                    QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
                    oa.writeRecord(newEpochPacket, "packet");
                    bufferedOutput.flush();
                    // leader接收follower的ack响应
                    QuorumPacket ackEpochPacket = new QuorumPacket();
                    ia.readRecord(ackEpochPacket, "packet");
                    if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                        LOG.error(ackEpochPacket.toString()
                                + " is not ACKEPOCH");
                        return;
    				}
                    ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                    ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                    // 等待followers返回ack响应
                    leader.waitForEpochAck(this.getSid(), ss);
                }
                
                ...
            }
        }
    }

    总结:笔者刚开始直接看learner这块代码也是很迷糊,为啥会有这么多东西,不知所云,但是一旦结合着follower的代码一起看的时候就很明了了。一个请求一个响应,每个follower都完成注册的时候,后续的工作就可以展开了。

    5.3 follower从leader同步数据信息

    数据同步就是将leader服务器上那些没有在learner服务器上提交过的事务请求同步给learner服务器,以保证learner和leader数据的相同。

    而同步又可以分为以下四类:

    * 直接差异化同步(DIFF同步)

    * 先回滚再差异化同步(TRUNC+DIFF同步)

    * 仅回滚同步(TRUNC同步)

    * 全量同步(SNAP同步)

    5.3.1 leader发送同步信息

    public class LearnerHandler extends ZooKeeperThread {
    	public void run() {
     
            ...
            ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
                ReadLock rl = lock.readLock();
                try {
                    rl.lock();  
                    // 如何确定以上四种状态,就是通过maxCommittedLog  minCommittedLog 和 peerLastZxid之间的关系比对进行的
                    final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                    final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
                    LOG.info("Synchronizing with Follower sid: " + sid
                            +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
                            +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
                            +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
    
                    LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
    
                    // follower已同步过
                    if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                        LOG.info("leader and follower are in sync, zxid=0x{}",
                                Long.toHexString(peerLastZxid));
                        packetToSend = Leader.DIFF;
                        zxidToSend = peerLastZxid;
                    } else if (proposals.size() != 0) {
                        LOG.debug("proposal size is {}", proposals.size());
                        // 直接进行差异化同步
                        if ((maxCommittedLog >= peerLastZxid)
                                && (minCommittedLog <= peerLastZxid)) {
                            LOG.debug("Sending proposals to follower");
                            long prevProposalZxid = minCommittedLog;
                            boolean firstPacket=true;
                            packetToSend = Leader.DIFF;
                            zxidToSend = maxCommittedLog;
                            // 将差异化的事务提交直接发送出去
                            for (Proposal propose: proposals) {
                                // skip the proposals the peer already has
                                if (propose.packet.getZxid() <= peerLastZxid) {
                                    prevProposalZxid = propose.packet.getZxid();
                                    continue;
                                } else {
                                    // If we are sending the first packet, figure out whether to trunc
                                    // in case the follower has some proposals that the leader doesn't
                                    if (firstPacket) {
                                        firstPacket = false;
                                        // Does the peer have some proposals that the leader hasn't seen yet
                                        if (prevProposalZxid < peerLastZxid) {
                                            // send a trunc message before sending the diff
                                            packetToSend = Leader.TRUNC;                                        
                                            zxidToSend = prevProposalZxid;
                                            updates = zxidToSend;
                                        }
                                    }
                                    queuePacket(propose.packet);
                                    QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                            null, null);
                                    queuePacket(qcommit);
                                }
                            }
                        // 回滚同步    
                        } else if (peerLastZxid > maxCommittedLog) {
                            LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                                    Long.toHexString(maxCommittedLog),
                                    Long.toHexString(updates));
    
                            packetToSend = Leader.TRUNC;
                            zxidToSend = maxCommittedLog;
                            updates = zxidToSend;
                        } else {
                            LOG.warn("Unhandled proposal scenario");
                        }
                    } else {
                        // just let the state transfer happen
                        LOG.debug("proposals is empty");
                    }               
    
                    LOG.info("Sending " + Leader.getPacketType(packetToSend));
                    leaderLastZxid = leader.startForwarding(this, updates);
    
                } finally {
                    rl.unlock();
                }
    
            	// 默认进行全量同步
                if (packetToSend == Leader.SNAP) {
                    zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                }
                oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
                bufferedOutput.flush();
        }
    }

    5.3.2 follower接收同步数据信息

    public class Learner {    
        protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
         
            readPacket(qp);
            LinkedList<Long> packetsCommitted = new LinkedList<Long>();
            LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
            synchronized (zk) {
                // 准备接收数据
                if (qp.getType() == Leader.DIFF) {
                    LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                    snapshotNeeded = false;
                }
                // 直接进行全量同步
                else if (qp.getType() == Leader.SNAP) {
                    LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
                    zk.getZKDatabase().clear();
                    zk.getZKDatabase().deserializeSnapshot(leaderIs);
                    String signature = leaderIs.readString("signature");
                    if (!signature.equals("BenWasHere")) {
                        LOG.error("Missing signature. Got " + signature);
                        throw new IOException("Missing signature");                   
                    }
                    zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
                // 删除差异性数据    
                } else if (qp.getType() == Leader.TRUNC) {
                    //we need to truncate the log to the lastzxid of the leader
                    LOG.warn("Truncating log to get in sync with the leader 0x"
                            + Long.toHexString(qp.getZxid()));
                    boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
                    if (!truncated) {
                        // not able to truncate the log
                        LOG.error("Not able to truncate the log "
                                + Long.toHexString(qp.getZxid()));
                        System.exit(13);
                    }
                    zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
                }
                else {
                    LOG.error("Got unexpected packet from leader "
                            + qp.getType() + " exiting ... " );
                    System.exit(13);
    
                }
                ...
                    
                while (self.isRunning()) {
                    // 读取同步数据信息,同步到当前节点
                    readPacket(qp);
                    switch(qp.getType()) {
                    }
        }
    }

    最终通过这种同步方式,让leader与learner的数据达到一致。

    总结:

    Leader与Follower节点的启动交互过程大致就如上所述。只有当过半的机器节点完成与leader的数据同步之后,此时的leader才可以启动端口真正的对外提供服务。

    过程可以归结为以下这张图(来自<<从Paxos到Zookeeper 分布式一致性原理与实践>>)

    展开全文
  • 死磕Zookeeper Leader选举算法

    千次阅读 2019-08-31 11:38:06
    文章目录摘要一、引言二、背景知识1、Zookeeper 集群状态2、Zookeeper 集群节点角色3、术语定义4、算法简介三、Zookeeper Leader选举流程1、何时选举2、为何选举3、如何选举四、Zookeeper 节点间数据同步1、何时选举...

    摘要

    本文旨在介绍Zookeeper集群Leader过程及相关算法分析,对Zookeeper数据同步及其他功能涉及有限。

    一、引言

    最近部门组织技术分享,分享内容是架构相关的,其中考虑线上集群数据一致性问题。由此引发了Zookeeper是如何实现数据一致的讨论,中间最重要环节就是Zookeeper 集群Leader的选举过程及算法,之前自己也网上搜罗过一些Zookeepre集群Leader选举相关的文章,但多数文章对一些选举的细节讲的都不够深入和透彻,以至于直到这次分享讨论时都没有完全明白整个Leader选举过程,遂以此文终结该问题。

    二、背景知识

    正文开始之前,先介绍一下Zookeeper相关的基本概念及下文中用到的相关术语。

    1、Zookeeper 集群角色

    分布式集群中最典型的部署就是Master/Slave 模式(主备模式)和Master/Follower 模式(主从模式),然而Zookeeper并未没有采用上述模式,而是将集群中的服务分为:Leader、Follower 和 Observer 三种角色,每个角色的职责与承担功能略有差异。

    • Leader:既能提供读服务也能提供写服务,负责更新(同步)数据及数据更新(同步)前的写请求的投票发起与决议,是整个集群(半数以上Server)数据一致性的保障,所有Follower与Observer的数据均由Leader同步所得;
    • Follower:用于接受客户端请求并想客户端返回结果,在选主过程中参与投票;
    • Observer:接受客户端连接,将写请求转发给leader,但不参加投票过程,只同步leader的状态(数据),设计observer的目的是为了扩展系统,提升数据读取速度与集群的负载能力;
    • Learner:Follower和Observer的统称;
    • Client:请求发起方,获取配置信息及更新配置请求;

    图1(集群角色关系)
    集群角色

    图2(集群角色职能)
    集群角色职能

    2、Zookeeper 集群服务器(Server)的状态

    Zookeeper集群中服务器具有以下四种状态,分别为:LOOKING、FOLLOWING、LEADING及OBSERVING。

    • LOOKING:寻找Leader的状态(选举Leader状态),当服务器处于该状态时,它会认为当前集群中没有Leader,因此需要进入Leader选举状态;
    • FOLLOWING:跟随者状态,表明当前服务器角色是Follower;
    • LEADING:领导者状态,表明当前服务器角色是Leader;
    • OBSERVING:观察者状态,表明当前服务器角色是Observer;

    3、术语定义

    Zookeeper集群常用术语及Leader选举过程中名次解释:
    常用术语:

    • znode:Zookeeper中的节点,数据操作与承载的主体,对zk的操作主要是对znode的操作,根据存活时间可分为:持久节点临时节点持久节点的存活时间不依赖于客户端会话,只有客户端在显式执行删除节点操作时节点才消失,
      临时节点的存活时间依赖于客户端会话,当会话结束,临时节点将会被自动删除(也可以手动删除),临时节点也不能拥有子节点 ;

    • ZAB协议(ZooKeeper Atomic Broadcast 原子广播):是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议,是Zookeeper实现分布式数据一致性的基础,基于该协议ZooKeeper 实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性,协议有两种基本模式:恢复模式消息广播模式

    • ZAB协议的两种基本模式

      1、整个服务框架在启动或是 Leader 服务器出现网络中断、崩溃退出与重启等异常情况时,ZAB 协议就会进人*恢复模式*并选举产生新的Leader服务器。当选举产生了新的 Leader 服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态(数据)同步之后,ZAB协议就会退出*恢复模式*进入*消息广播模式*。其中,所谓的状态(数据)同步用来保证集群中存在过半的机器能够和Leader服务器的数据状态保持一致。
      
      2、当一台同样遵守ZAB协议的服务器启动后加入到集群中时,如果此时集群中已经存在一个Leader服务器在负责进行消息广播,那么新加入的服务器就会进人数据恢复模式(Leader服务器暂停更新):找到Leader所在的服务器,并与其进行数据同步,数据同步完成后进入消息广播模式。
      
      3、为了实现数据一致性,ZooKeeper设计成只允许唯一的一个Leader服务器来进行事务请求的处理。Leader服务器在接收到客户端的事务请求后,会生成对应的事务提案(zxid)并发起一轮广播协议,决定是否进行事务处理;而集群中的Follower接收到客户端的事务请求时,会将这个事务请求转发给Leader服务器,由Leader发起广播协议进而处理事务请求。
      

    Leader选举名词:

    • zxid(Zookeeper Transaction Id):Client发起事务请求Leader接收后,会对该事务请求进行提议(Proposal),针对这个提议Leader服务器会生成一个64位的数字(zxid),它的高32位是epoch(选举周期)用来标识Leader关系是否改变,低32位是electionEpoch/logicalclock(本地选举周期)用来判断是否为同一次选举。zxid会被记录到transaction log中, 值越大,表示数据越新;
    • epoch(peerEpoch):选举周期,每次选举最终确定完leader结束选举流程时会自增(zxid的前32位);
    • logicalclock(electionEpoch):本地选举周期,每次投票都会自增(zxid的后32位);
    • sid/myid(Server Id):服务器在集群中被标记id;

    4、算法简介

    Zookeeper集群Leader选取过程中主要有3中选举算法:

    • LeaderElection:LeaderElection是Fast Paxos最简单的一种实现,每个Server启动以后都询问其它的Server它要投票给谁,收到所有Server回复以后,就计算出zxid最大的哪个Server,并将这个Server相关信息设置成下一次要投票的Server。该算法于Zookeeper 3.4以后的版本废弃;

      算法流程如下
       a.选举线程首先向所有Server发起一次询问(包括自己); 
      
       b.选举线程收到回复后,验证是否是自己发起的询问(验证zxid是否一致),然后获取对方的sid,并存储到当前询问对象列表中,最后获取对方提议的leader相关信息(sid,zxid),并将这些信息存储到当次选举的投票记录表中;
      
       c.收到所有Server回复以后,就计算出zxid最大的那个Server,并将这个Server相关信息设置成下一次要投票的Server;
      
       d.线程将当前zxid最大的Server设置为当前Server要推荐的Leader,如果此时获胜的Server获得多数Server票数, 将当前获胜的Server设置为Leader,其他Server根据Leader相关信息设置自己的状态(数据),否则,继续这个过程,直到leader被选举出来;
      

      异常问题的处理:

      a.选举过程中,新Server的加入:
        新Server启动后它都会发起一次选举投票,由选举线程发起相关流程,该Server会获得当前zxid最大的Server,如果当次最大的Server没有获得不低于 n/2+1 (n为所有的Server数量)的票数,那么下一次投票时,该Server将Leader选举投票给zxid最大的Server,重复以上流程,最后一定能选举出一个Leader。
      b.选举过程中,Server的退出:
        只要保证集群中至少有n/2+1个Server是正常的就没有任何问题,如果少于n/2+1个Server存活,那么该集群就不能正常提供服务(Zookeeper集群协议决定),也就没有必要进行Leader选举了。
      c.选举过程中,Leader死亡:
        选举出Leader以后,每个Server的状态(FLLOWING)都已经确定,如果此时Leader死亡,Fllower都会向Leader发送Ping消息,检查Leader状态,以便数据同步,如果无法ping通,就改变自己的状为(FLLOWING ==> LOOKING),发起新的一轮选举。
      d.双主问题:
        Leader的选举是保证有且只有一个Leader产生,而且Follower重新选举与旧Leader恢复并退出基本上是同时发生的,当有一半以上Follower无法Ping通Leader时是就会认为Leader已经出问题开始重新选举。
      
    • FastLeaderElection:由于LeaderElection收敛速度较慢(需要等n台Server全部都收到其他对应的n-1台Server回复自己信息及自己投票信息后才进行下一轮选举,一般2-3轮投票能选出Leader),所以Zookeeper引入了FastLeaderElection选举算法,FastLeaderElection也是Zookeeper默认的Leader选举算法。FastLeaderElection是标准的Fast Paxos的实现,它首先向所有Server提议自己要成为Leader,当其它Server收到提议以后,比较投票中 sid 和 zxid 的值,并决定是否接受对方的提议,然后向对方发送接受提议完成的消息。FastLeaderElection算法通过异步的通信方式来收集其它节点的选票,同时在分析选票时又根据投票者的当前状态来作不同的处理,以加快Leader的选举进程;

      算法流程如下
       A.发起一轮投票选举,推举自己作为Leader,通知所有的服务器,等待接收外部选票; 
      
       B.只要当前服务器状态为LOOKING,进入循环,不断地读取其它Server发来的通知、进行比较、更新自己的投票、发送自己的投票、统计投票结果,直到Leader选出或出错退出;具体实现如下:
      
      B.算法实现
       从队列中取出一个Notification(选票),则根据消息中对方的状态进行相应的处理:
       1.LOOKING状态:
         a.如果其他Server发送过来Notification的逻辑时钟大于当前的逻辑时钟,说明这是一次新的选举投票,此时更新本机的逻辑时钟(logicalclock),清空投票箱(数据已经过期),判断Notification是否优于当前本机的投票,是的话用对方推荐的Leader更新下一次的投票,否则使用本机的投票(投自己),通知其它Server我的投票,跳到d;
         b.如果对方处于上轮投票,不予理睬,回到B;
         c.如果对方也处于本轮投票,判断对方的投票是否优于当前的投票,是的话更新当前的投票,否则使用初始的投票(投自己)并新生成Notification消息放入发送队列。通知其它Server我的投票
         d.将收到的投票放入自己的投票箱中。
         e.判断所推荐的Leader是否得到集群多数人的同意(根据计票器的实现不同,可以是单纯看数量是否超过n/2,也可以是按权重来判断,我们这里假设单纯看数量),如果得到多数人同意,那么还需等待一段时间,看是否有比当前更优的提议,如果没有,则认为投票结束。根据投票结果修改自己的状态。以上任何一条不满足,则继续循环。
         
       2.OBSERVING状态:不做任何事;
       
       3.FOLLOWING或LEADING状态:
        a.如果选举周期相同(选票是同一轮选举产生),将该数据保存到投票箱,根据当前投票箱的投票判断对方推荐的Leader是否得到多数人的同意,如果是则设置状态退出选举过程,否则到b;
        b.这是一条与当前逻辑时钟不符合的消息,或者对方推荐的Leader没有得到多数人的同意(有可能是收集到的投票数不够),那么说明可能在另一个选举过程中已经有了选举结果,于是将该选举结果加入到outofelection集合中,再根据outofelection来判断是否可以结束选举,如果可以也是保存逻辑时钟,设置状态,退出选举过程。否则继续循环。outofelection用于保存那些状态为FOLLOWING或者LEADING的ZooKeeper节点发送的选票,由于对方的状态为FOLLOWING或者LEADING,所以它们当前不参与选举过程(可能人家已经选完了),因此称为“out of election”;
      
    • AuthFastLeaderElection:AuthFastLeaderElection算法同FastLeaderElection算法基本一致,只是在消息中加入了认证信息,该算法在最新的Zookeeper中也建议弃用,这里就不做过多的介绍了,有需要的可自行网上查阅了解;

    三、Zookeeper Leader选举过程

    上面介绍了一些Zookeeper相关的知识,特别是Leader选举算法中的FastLeaderElection算法,实现起来比较复杂,看完之后也未必能够明白,下面重点通过实例的方式对该算法进行讲解说明。

    1、何时选举Leader

    • 服务器初始化启动时;
    • 服务器运行期间无法和Leader保持连接(Follower Ping不通 Leader);

    2、为何选举Leader

    Leader是保证分布式数据一致性的关键所在,所有Follower服务器都需要从Leader同步数据,如果集群中Leader不存在将无法保证集群中服务器上的数据一致性,Zookeeper也就失去了其存在的价值与意义。

    3、如何选举Leader

    Leader选举一般有两种情况:服务器初始化启动和服务器运行期间无法和Leader保持连接,下面分别针对这两种情况进行说明:

    服务器初始化启动时期Leader选举:

    若进行Leader选举,则至少需要3台机器(否则无法达到Leader收到投票过半数的要求),假设服务器集群中有5台机器(编号依次为Server1-Server5),在集群初始化阶段,所有服务器均启动完成后,此时任意两台机器都可以相互通信,每台机器都试图找到Leader,于是就进入Leader选举过程。
    选举过程如下:

    1.  各个Server发起投票。每个Server将自己作为Leader服务器来进行投票,每次投票会包含所推举的服务器的myid(sid)和zxid,使用(myid, zxid)来表示,此时Server1的投票为(1, 0),Server2的投票为(2, 0),Server3的投票为(3, 0),Server4的投票为(4, 0),Server5的投票为(5, 0),然后各自将这个投票发给集群中其他机器;
    2.  接受来自其他Server的投票。集群的每个服务器收到其他Server的投票后,首先判断该投票的有效性,如检查是否是本轮投票(基于zxid)、是否来自LOOKING状态的服务器等;
    3. 处理投票。针对每一个接收到的投票,服务器都需要将其他Server的投票和自己的投票进行PK,PK规则如下:
      a. 比较zxid,zxid大的投票信息保留,并作为下一轮的投票信息;
      b.zxid相同比较myid(sid),myid(sid)大的投票信息保留,并作为下一轮的投票信息;
      对于Server1而言,它自己的投票是(1, 0),接收到的投票为(2, 0)、(3, 0)、(4, 0)、(5, 0),规矩比较规则,会更新自己的投票为(5, 0),然后重新投票;对于Server3、Server4同理也会更新自己的投票为(5,0);对Server5而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。
    4. 统计投票结果。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于第二轮投票Server1、Server2、Server3而言,都统计出集群中已经有3台机器接受了(5, 0)的投票信息,此时便认为已经选出了Leader 为 Server5;
    5. 改变服务器状态。一旦确定了Leader,每个服务器就会更新自己的状态,如果是Follower,那么就变更为FOLLOWING,如果是Leader,就变更为LEADING;
    
    服务器运行期间无法和Leader保持连接:

    在Zookeeper运行期间,Leader与Follower服务器各司其职,即便当有Follower服务器宕机或新加入,此时也不会影响Leader,但是一旦Leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮Leader选举,其过程和启动时期的Leader选举过程基本一致。假设Zookeepre集群中正在运行的服务器有Server1、Server2、Server3、Server4、Server5,当前Leader是Server5,若某一时刻Server5挂了,此时便开始新Leader的选举。选举过程如下:

    1.  变更状态。Leader挂后,余下的Follower服务器都会将自己的服务器状态变更为LOOKING,然后开始进入Leader选举;
    2.  每个Server会发出一个投票。在运行期间,每个服务器上的zxid可能不同,此时假定Server1的zxid为121,Server2的zxid为122,Server3的zxid为122,Server4的zxid为121;在第一轮投票中,Server1、Server2、Server3、Server4都会投自己,产生投票(1, 121),(2, 122),(3, 122),(4, 121),然后各自将投票发送给集群中所有机器;
    3. 处理投票。针对每一个接收到的投票,服务器都需要将其他Server的投票和自己的投票进行PK,PK规则如下:
      a. 比较zxid,zxid大的投票信息保留,并作为下一轮的投票信息;
      b.zxid相同比较myid(sid),myid(sid)大的投票信息保留,并作为下一轮的投票信息;
      对于Server1而言,它自己的投票是(1, 121),接收到的投票为(2, 122)、(3, 122)、(4, 121),规矩比较规则,会更新自己的投票为(3,122),然后重新投票;对于Server2、Server4同理也会更新自己的投票为(3,122);对Server3而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。
    4. 统计投票结果。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于第二轮投票Server1、Server2、Server4而言,都统计出集群中已经有3台机器接受了(3,122)的投票信息,此时便认为已经选出了Leader位Server3;
    5. 改变服务器状态。一旦确定了Leader,每个服务器就会更新自己的状态,如果是Follower,那么状态会有LOOKING变更为FOLLOWING,如果是Leader,就变更为LEADING;
    

    四、Zookeeper 一致性协议与数据同步

    Zookeeper实现数据一致性的核心是ZAB协议,该协议需要做到以下几点:

    • 集群在半数以下节点宕机的情况下,能正常对外提供服务;
    • 客户端的写请求全部转交给leader来处理,leader需确保写变更能实时同步给所有follower及observer;
    • leader宕机或整个集群重启时,需要确保那些已经在leader服务器上提交的事务最终被所有服务器都提交,确保丢弃那些只在leader服务器上被提出的事务,并保证集群能快速恢复到故障前的状态;

    Zab协议有两种模式: 崩溃恢复(选主+数据同步)和消息广播(事务操作),任何时候都需要保证只有一个主进程负责进行事务操作,而如果主进程崩溃了,就需要迅速选举出一个新的主进程,主进程的选举机制与事务操作机制是紧密相关的。

    1、选主后数据同步:选主算法中的zxid是从内存数据库中取的最新事务id,事务操作分为两个阶段:提出阶段和提交阶段,leader生成提议并广播给followers,收到半数以上的ack后,再广播commit消息,同时将事务操作应用到内存中。follower收到提议后先将事务写到本地事务日志,然后反馈ack,等接到leader的commit消息时,才会将事务操作应用到内存中。选主其实只是选出了内存数据是最新的节点,仅仅靠这个是无法保证已经在leader服务器上提交的事务最终被所有服务器都提交。比如leader发起提议p1,并收到半数以上follower关于p1的ack后,在广播commit消息之前宕机了,选举产生的新leader之前是follower,未收到关于p1的commit消息,内存中是没有p1的数据。而ZAB协议的设计是需要保证选主后,p1是需要应用到集群中的。这块的逻辑是通过选主后的数据同步来弥补。

    选主后,节点需要切换状态,leader切换成LEADING状态后的流程如下:

    • 重新加载本地磁盘上的数据快照至内存,并从日志文件中取出快照之后的所有事务操作,逐条应用至内存,并添加到已提交事务缓存commitedProposals。这样能保证日志文件中的事务操作,必定会应用到leader的内存数据库中。
    • 获取learner发送的FOLLOWERINFO/OBSERVERINFO信息,并与自身commitedProposals比对,确定采用哪种同步方式,不同的learner可能采用不同同步方式(DIFF同步、TRUNC+DIFF同步、SNAP同步)。这里是拿learner内存中的zxid与leader内存中的commitedProposals(min、max)比对,如果zxid介于min与max之间,但又不存在于commitedProposals中时,说明该zxid对应的事务需要TRUNC回滚;如果 zxid 介于min与max之间且存在于commitedProposals中,则leader需要将zxid+1~max 间所有事务同步给learner,这些内存缺失数据,很可能是因为leader切换过程中造成commit消息丢失,learner只完成了事务日志写入,未完成提交事务,未应用到内存。
    • leader主动向所有learner发送同步数据消息,每个learner有自己的发送队列,互不干扰。同步结束时,leader会向learner发送NEWLEADER指令,同时learner会反馈一个ack。当leader接收到来自learner的ack消息后,就认为当前learner已经完成了数据同步,同时进入“过半策略”等待阶段。当leader统计到收到了一半已上的ack时,会向所有已经完成数据同步的learner发送一个UPTODATE指令,用来通知learner集群已经完成了数据同步,可以对外服务了。

    2、事务操作:ZAB协议对于事务操作的处理是一个类似于二阶段提交过程。针对客户端的事务请求,leader服务器会为其生成对应的事务proposal,并将其发送给集群中所有follower机器,然后收集各自的选票,最后进行事务提交。流程如下图:
    事务操作
    ZAB协议的二阶段提交过程中,移除了中断逻辑(事务回滚),所有follower服务器要么正常反馈leader提出的事务proposal,要么就抛弃leader服务器。follower收到proposal后的处理很简单,将该proposal写入到事务日志,然后立即反馈ack给leader,也就是说如果不是网络、内存或磁盘等问题,follower肯定会写入成功,并正常反馈ack。leader收到过半follower的ack后,会广播commit消息给所有learner,并将事务应用到内存;learner收到commit消息后会将事务应用到内存。

    五、参考文献

    1.https://segmentfault.com/a/1190000016349824
    2.https://juejin.im/post/5b949d595188255c6a041c22
    3.https://www.cnblogs.com/hongdada/p/8145075.html
    4.https://blog.csdn.net/zhengzhihust/article/details/53456371
    5.https://zhuanlan.zhihu.com/p/25594630

    展开全文
  • zookeeper的Leader选举机制详解

    千次阅读 2019-07-22 08:33:01
      该函数用于判断Leader选举是否结束,即是否有一半以上的服务器选出了相同的Leader,其过程是将收到的选票与当前选票进行对比,选票相同的放入同一个集合,之后判断选票相同的集合是否超过了半数。   ...

    转载自:https://www.toutiao.com/i6701570306445672963/?tt_from=copy_link&utm_campaign=client_share&timestamp=1563546713&app=news_article_lite&utm_source=copy_link&utm_medium=toutiao_ios&req_id=201907192231531720170000013322D80&group_id=6701570306445672963

    分布式开发必须知道的Zookeeper知识及其的Leader选举机制(ZAB原子广播协议)

      ZooKeeper是Hadoop下的一个子项目,它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、分布式同步、组服务等; 它的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

    ZooKeeper系统架构

      下图就是Zookeeper的架构图:

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

      从上面的架构图中,我们需要了解的主要的信息有:

      ①ZooKeeper分为服务器端(Server)和客户端(Client),客户端可以连接到整个ZooKeeper服务的任意服务器上(Leader除外)。

      ②ZooKeeper 启动时,将从实例中选举一个Leader,Leader 负责处理数据更新等操作,一个更新操作成功的标志是当且仅当大多数Server在内存中成功修改数据(Quorom机制)。每个Server 在内存中存储了一份数据。

      ③Zookeeper是可以集群复制的,集群间通过Zab协议(Zookeeper Atomic Broadcast)来保持数据的一致性;

      ④Zab协议包含两个阶段:Leader Election阶段和Atomic Brodcast阶段。群中将选举出一个Leader,其他的机器则称为Follower,所有的写操作都被传送给Leader,并通过Brodcast将所有的更新告诉给Follower。 当Leader被选举出来,且大多数服务器完成了和leader的状态同步后,Leadder Election 的过程就结束了,就将会进入到Atomic Brodcast的过程。Atomic Brodcast同步Leader和Follower之间的信息,保证Leader和Follower具有形同的系统状态。


    Quorom机制简介

      在分布式系统中,冗余数据是保证可靠性的手段,因此冗余数据的一致性维护就非常重要。一般而言,一个写操作必须要对所有的冗余数据都更新完成了,才能称为成功结束。比如一份数据在5台设备上有冗余,因为不知道读数据会落在哪一台设备上,那么一次写操作,必须5台设备都更新完成,写操作才能返回。

      对于写操作比较频繁的系统,这个操作的瓶颈非常大。Quorum算法可以让写操作只要写完3台就返回。剩下的由系统内部缓慢同步完成。而读操作,则需要也至少读3台,才能保证至少可以读到一个最新的数据。


    Zookeeper中的四种角色

    ①Leader:领导者,负责进行投票的发起和决议,更新系统状态。

    ②Learner:学习者

    ③Follower(Learner的子类):跟随者,用于接受客户端请求并向客户端返回结结果,在选主过程中参与投票,Follower可以接收Client请求,如果是写请求将转发给Leader来更新系统状态。

    ④Observer:观察者,可以接收客户端连接,将写请求转发给Leader节点,但是不参与投票过程,只是同步Leader状态,因为Follower增多会导致投票阶段延迟增大,影响性能。Observer的目的是为了扩展系统,提高读取数据。


    为什么Zookeeper中的Server数目一般为基数?

      我们知道在Zookeeper中 Leader 选举算法采用了Quorom算法。该算法的核心思想是当多数Server写成功,则任务数据写成功。假设有3个Server,则最多允许一个Server挂掉;如果有4个Server,则同样最多允许一个Server挂掉。既然3个或者4个Server,同样最多允许1个Server挂掉,那么它们的可靠性是一样的,所以选择奇数个ZooKeeper Server即可,这里选择3个Server。


    Zookeeper用于Leader选举的算法

    ①基于UDP的LeaderElection

    ②基于UDP的FastLeaderElection

    ③基于UDP和认证的FastLeaderElection

    ④基于TCP的FastLeaderElection(默认值)


    FastLeaderElection机制

      接下来要说的就是Zookeeper的Leader选举机制核心算法FastLeaderElection类。FastLeaderElection实现了Election接口,其需要实现接口中定义的lookForLeader(核心的选举算法入口)方法和shutdown方法FastLeaderElection选举算法是标准的Fast Paxos算法实现,可解决LeaderElection选举算法收敛速度慢的问题。

    术语介绍

    sid(myid)

      每个Zookeeper服务器,都需要在数据文件夹下创建一个名为myid的文件,该文件包含整个Zookeeper集群唯一的ID(整数)。例如某Zookeeper集群包含三台服务器,hostname分别为zoo1、zoo2和zoo3,其myid分别为1、2和3,则在配置文件中其ID与hostname必须一一对应,如下所示。在该配置文件中,server.后面的数据即为myid(Leader选举时用的sid或者leader)。

    server.1=zoo1:2888:3888
    server.2=zoo2:2888:3888
    server.3=zoo3:2888:3888
    

    zxid

      类似于RDBMS中的事务ID,用于标识一次更新操作的Proposal ID。为了保证顺序性,该zkid必须单调递增。因此Zookeeper使用一个64位的数来表示,高32位是Leader的epoch,从1开始,每次选出新的Leader,epoch加一。低32位为该epoch内的序号,每次epoch变化,都将低32位的序号重置。这样保证了zxid的全局递增性。


    Zookeeper节点的四种状态

      截图为Zookeeper定义的四种服务器节点状态:

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    • LOOKING: 不确定Leader状态。该状态下的服务器认为当前集群中没有Leader,会发起Leader选举。
    • FOLLOWING: 跟随者状态。表明当前服务器角色是Follower,并且它知道Leader是谁。
    • LEADING: 领导者状态。表明当前服务器角色是Leader,它会维护与Follower间的心跳。
    • OBSERVING: 观察者状态。表明当前服务器角色是Observer,与Folower唯一的不同在于不参与选举,也不参与集群写操作时的投票。

    FastLeaderElection内部类

    FastLeaderElection的内部类的情况如下图:

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    • Notification:表示收到的选举投票信息(其他服务器发来的选举投票信息),其包含了被选举者的id、zxid、选举周期等信息。
    • ToSend:表示发送给其他服务器的选举投票信息(其他服务器发来的选举投票信息),其包含了被选举者的id、zxid、选举周期等信息。
    • Messenger:包含了WorkerReceiverWorkerSender两个内部类。WorkerReceiver实现了Runnable接口,是选票接收器。WorkerSender也实现了Runnable接口,为选票发送器

    Notification(收到的投票信息)

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    • leader:被推选的leader的id。
    • zxid:被推选的leader的事务id。
    • electionEpoch:推选者的选举周期。
    • state:推选者的状态。
    • sid:推选者的id。
    • peerEpoch:被推选者的选举周期。

    ToSend(发送的投票信息)

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    • leader:被推选的leader的id。
    • zxid:被推选的leader的事务id。
    • electionEpoch:推选者的选举周期。
    • state:推选者的状态。
    • sid:推选者的id。
    • peerEpoch:被推选者的选举周期。

    WorkerSender(选票发送器)

      WorkerSender也实现了Runnable接口,为选票发送器,其会不断地从sendqueue中获取待发送的选票,并将其传递到底层QuorumCnxManager中。

    • 获取选票

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    • 发送选票

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     


    WorkerReceiver(选票接收器)

      WorkerReceiver实现了Runnable接口,是选票接收器。其会不断地从QuorumCnxManager中获取其他服务器发来的选举消息中。先会从QuorumCnxManager中的pollRecvQueue队列中取出其他服务器发来的选举消息,消息封装在Message数据结构中。然后判断消息中的服务器id是否包含在可以投票的服务器集合中,若不是,则会将本服务器的内部投票发送给该服务器,其流程如下:

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

      若包含该服务器,则根据消息(Message)解析出投票服务器的投票信息并将其封装为Notification,然后判断当前服务器是否为LOOKING,若为LOOKING,则直接将Notification放入FastLeaderElection的recvqueue。然后判断投票服务器是否为LOOKING状态,并且其选举周期小于当前服务器的逻辑时钟,则将本(当前)服务器的内部投票发送给该服务器,否则,直接忽略掉该投票。其流程如下:

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

      若本服务器的状态不为LOOKING,则会根据投票服务器中解析的version信息来构造ToSend消息,放入sendqueue,等待发送,起流程如下:

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     


    核心函数分析

    sendNotifications函数

      其会遍历所有的参与者投票集合,然后将自己的选票信息发送至上述所有的投票者集合,其并非同步发送,而是将ToSend消息放置于sendqueue中,之后由WorkerSender进行发送。

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    totalOrderPredicate函数

      该函数将接收的投票与自身投票进行PK,查看是否消息中包含的服务器id是否更优,其按照epoch、zxid、id的优先级进行PK。

    • 判断消息里的epoch是不是比当前的大,如果大则消息中id对应的服务器就是leader。
    • 如果epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader。
    • 如果前面两个都相等那就比较服务器id,如果大,则其就是leader。

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    termPredicate函数

      该函数用于判断Leader选举是否结束,即是否有一半以上的服务器选出了相同的Leader,其过程是将收到的选票与当前选票进行对比,选票相同的放入同一个集合,之后判断选票相同的集合是否超过了半数。

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    checkLeader函数

      该函数检查是否已经完成了Leader的选举,此时Leader的状态应该是LEADING状态。

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    lookForLeader函数

      该函数就是leader选举的核心方法,代码行数有点多,这里仅分析其中的一部分。

    • 更新逻辑时钟、更新选票、发送选票

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    • 获取投票数据、连接服务器

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    • 选举Leader

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

    • LEADING状态处理

    这可能是全网把zookeeper的Leader选举机制讲的最透彻的一篇文章

     

      以上就是关于zookeeper的所有基本知识与Leader选举机制的讲解。

    展开全文
  • Teamleader API Teamleader APIPHP客户端 警告 该应用程序仍在开发中,可以实现重大更改。 请您自己承担风险使用。 安装 您可以使用composer安装软件包 composer require justijndepover/teamleader-api 用法 连接...
  • Leader-following consensus of nonlinear multiagent Systems With Stochastic Sampling
  • 对es的ccr功能研读后,进行一些参数解释及测试结论猜测
  • Kafka如何修改分区Leader | 文末送书8本

    千次阅读 多人点赞 2021-09-26 17:14:40
    他们有个需求是, 想指定某个分区中的其中一个副本为Leader 需求分析 对于这么一个问题,在我们生产环境还是挺常见的,经常有需要修改某个Topic中某分区的Leader 比如 topic1-0这个分区有3个副本[0,1,2], 按照「优先...

    该文章可能已过期,已不做勘误并更新,请访问原文地址(持续更新) 如何修改分区的指定副本为Leader|设计方案
    kafka知识图谱: Kafka知识图谱大全

    直接跳到末尾:读者福利 赠8本书籍!

    🔥《Kafka运维管控平台》🔥
    ✏️更强大的管控能力✏️
    🎾更高效的问题定位能力🎾
    🌅更便捷的集群运维能力🌅
    🎼更专业的资源治理🎼
    🌞更友好的运维生态🌞

    大家好,我是石臻臻,这是 「kafka专栏」 连载中的第「N」篇文章…

    前几天有个群友问我: kafka如何修改优先副本?
    他们有个需求是, 想指定某个分区中的其中一个副本为Leader

    在这里插入图片描述

    需求分析

    对于这么一个问题,在我们生产环境还是挺常见的,经常有需要修改某个Topic中某分区的Leader
    比如 topic1-0这个分区有3个副本[0,1,2], 按照「优先副本」的规则,那么 0 号副本肯定就是Leader
    我们都知道分区中的只有Leader副本才会提供读写副本,其他副本作为备份
    假如在某些情况下,「0」 号副本性能资源不够,或者网络不太好,或者IO压力比较大,那么肯定对Topic的整体读写性能有很大影响, 这个时候切换一台压力较小副本作为Leader就显得很重要;

    优先副本: 分区中的AR(所有副本)信息, 优先选择排在第一位的副本作为Leader
    Leader机制: 分区中只有一个Leader来承担读写,其他副本只是作为备份

    那么如何实现这样一个需求呢?

    解决方案

    知道了原理之后,我们就能想到对应的解决方案了
    只要将 分区的 AR 中的第一个位置,替换成你指定副本就行了;
    AR = { 0,1,2 } ==> AR = {2,1,0}

    一般能够达到这个目的有两种方案,下面我们来分析一下

    方案一: 分区副本重分配

    之前关于分区副本重分配 我已经写过很多文章了,如果想详细了解 分区副本重分配、数据迁移、副本扩缩容 可以看看链接的文章, 这里我就简单说一下;

    一般分区副本重分配主要有三个流程

    1. 生成推荐的迁移Json文件
    2. 执行迁移Json文件
    3. 验证迁移流程是否完成

    这里我们主要看第2步骤, 来看看迁移文件一般是什么样子的

    {
    	"version": 1,
    	"partitions": [{
    		"topic": "topic1",
    		"partition": 0,
    		"replicas": [0,1,2]
    	}]
    }
    

    这个迁移Json意思是, 把topic1的「0」号分区的副本分配成[0,1,2] ,也就是说 topic1-0号分区最终有3个副本分别在 {brokerId-0,brokerId-1,brokerId-2} ; 如果你有看过我之前写的 分区副本重分配原理源码分析 ,那么肯定就知道,不管你之前的分配方式是什么样子的, 最终副本分配都是 [0,1,2] , 之前副本多的,会被删掉,少的会被新增;

    那么我们想要实现 我们的需求
    是不是把这个Json文件 中的 “replicas”: [0,1,2] 改一下就行了,比如改成 “replicas”: [2,1,0]
    改完Json后执行,执行execute, 正式开始重分配流程! 迁移完成之后, 就会发现,Leader已经变成上面的第一个位置的副本「2」

    优缺点

    优点: 实现了需求, 并且主动切换了Leader

    缺点: 操作比较复杂容易出错,需要先获取原先的分区分配数据,然后手动修改Json文件,这里比较容易出错,影响会比较大,当然这些都可以通过校验接口来做好限制, 最重要的一点是 副本重分配当前只能有一个任务 !
    假如你当前有一个「副本重分配」的任务在,那么这里就不能够执行了, 「副本重分配」是一个比较「重」 了的操作,出错对集群的影响比较大

    方案二: 手动修改AR顺序

    首先,我们知道分区副本的分配数据是保存在zookeeper中的节点brokers/topics/{topicName} 中; 我们看个Topic1的节点数据例子;

    {
    	"version": 2,
    	"partitions": {
    		"2": [3, 2, 1],
    		"1": [2, 1, 3],
    		"4": [2, 3, 1],
    		"0": [1, 3, 2],
    		"3": [1, 2, 3]
    	},
    	"adding_replicas": {},
    	"removing_replicas": {}
    }
    

    数据解释:
    version:
    版本信息, 现在有 「1」、「2」 两个版本

    removing_replicas:
    需要删除的副本数据, 在进行分区副本重分配过程中,
    多余的副本会在数据迁移快完成的时候被删除掉,删除成功这里的数据会被清除

    adding_replicas:
    需要新增的副本数据,在进行分区副本重分配过程中,
    新增加的副本将会被新增,新增完成这里的数据会清除;

    partitions:
    Topic的所有分区副本分配方式; 上面表示总共有5个分区,以及对应的副本位置;

    知道了这些之后,想要修改优先副本,是不是可以通过直接修改zookeeper中的节点数据就行了; 比如
    我们把 「1」号分区的副本位置改成 [2,1,3]

    在这里插入图片描述
    改成这样之后, 还需要 执行 重新进行优先副本选举操作 ,例如通过kafka的命令执行

    
    sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic Topic1--election-type PREFERRED --partition 1
    
    

    --election-type : PREFERRED 这个表示的以优先副本的方式进行重新选举

    那么做完这两步之后, 我们的修改优先副本的目的就达成了…吗 ?

    实则并没有, 因为这里仅仅只是修改了 zookeeper节点的数据, 而bin/kafka-leader-election.sh 重选举的操作是Controller来进行的; 如果你对Controller的作用和源码足够了解, 肯定知道Controller里面保存了每个Topic的分区副本信息, 是保存在JVM内存中的, 然后我们手动修改Zookeeper中的节点,并没有触发 Controller更新自身的内存
    也就是说 就算我们执行了kafka-leader-election.sh, 它也不会有任何变化,因为优先副本没有被感知到修改了;

    解决这个问题也很简单,让Controller感知到数据的变更就行了
    最简单的方法, 让Controller发生重新选举, 数据重新加载!

    总结

    1. 手动修改zookeeper中的「AR」顺序
    2. Controller 重新选举
    3. 执行 分区副本重选举操作(优先副本策略)

    简单代码
    当然上面功能,肯定是要集成到LogiKM中的咯; 简单代码如下

    			// 这里转换成HashMap类型,切勿自定义类型,以防kafka节点数据后续新增数据节点,导致数据丢失
                HashMap partitionMap = zkConfig.get(ZkPathUtil.getBrokerTopicRoot(topicName), HashMap.class);
                JSONObject partitionJson = (JSONObject)partitionMap.get("partitions");
                JSONArray partitions = (JSONArray)partitionJson.get(partition);
    			
    			//部分代码省略
    			
    			 //调换序列 优先副本
                Integer first = partitions.getInteger(0);
                partitions.set(0,targetBroker);
                partitions.set(index,first);            
    
                zkUtils = ZookeeperUtils.getKafkaZkUtils(clusterDO.getZookeeper());
                String json = JSON.toJSONString(partitionMap);
    
                zkUtils.updatePersistentPath(ZkPathUtil.getBrokerTopicRoot(topicName), json,null);
    
                //写入成功之后触发一下 异步去优先副本选举
                new Thread(()->{
                    try {
                        // 1. 先让Controller重新选举 (不然上面修改的还没有生效)  (TODO.. 待优化  -> 频繁的Controller重选举对集群性能会有影响)
                        zkConfig.deletePath(ZkPathUtil.CONTROLLER_ROOT_NODE);
                        // 等待 Controller 选举一下
                        Thread.sleep(1000);
                        //2. 然后再发起副本重新选举
                        preferredReplicalElectCommand.preferredReplicaElection(clusterId,topicName,partition,"");
                    } catch (ConfigException | InterruptedException e) {
                        LOGGER.error("重新选举异常.e:{}",e);
                        e.printStackTrace();
                    }
    
                }).start();
    
    

    优缺点

    优点: 实现了目标需求, 简单, 操作方便

    缺点: 频繁的Controller重选举对生产环境来说会有一些影响;

    优化与改进

    第二种方案中,需要Controller 重选举, 频繁的选举肯定是对生产环境有影响的;
    Controller承担了非常多的责任,比如分区副本重分配删除topicLeader选举 等等还有很多都是它在干

    那么如何不进行Controller的重选举,也能达到我们的需求呢?

    我们的需求是,当我们 修改了zookeeper中的节点数据的时候,能够迅速的让Controller感知到,并更新自己的内存数据就行了;

    对于这个问题,我会在下一期文章中介绍

    问题

    看完这篇文章,提几个相关的问题给大家思考一下;

    1. 如果我在修改zk中的「AR」信息时候不仅仅是调换顺序,而是有新增或者删除副本会发生什么情况呢?
    2. 如果手动修改brokers/topics/{topicName}/partitions/{分区号}/state 节点里面的leader信息,能不能直接更新Leader?
    3. 副本选举的整个流程是什么样子的?

    大家可以思考一下, 问题答案我会在后面的文章中一一讲解!

    点个关注, 推送更多 干货 内容, 一起进 【滴滴技术答疑群 】 跟众多技术专家交流技术吧!


    再赠8本,欢度每周五

    从上周五开始决定送书到现在,已经送出去 「 30 」本书了, 又快到周五啦,接着搞, 本周五一口气送
    8 」本, 提供3款书任选,其中一本是 《分布式一致性算法开发实战》 ,

    参与方式:

    1. 给本文「一键三连」 支持博主
    2. 扫描下面二维码关注博主,在公众号里面回复 「66」参与抽奖

    【编辑推荐】
    1.系统:选举、日志和多个高级主题逐步深入讲解。
    2.详尽:通过3万行源码和测试,详细分析设计细节及实现难点。
    3.生产级:基于Netty的生产级异步IO实现。
    4.完整:包含交互式客户端的简易分布式KV服务。


    以下三选一

    👇🏻 扫描 下方 公众号 回复 「66」参与抽奖👇🏻

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 156,920
精华内容 62,768
关键字:

leader