精华内容
下载资源
问答
  • Paxos:Paxos实现-源码

    2021-03-08 07:45:03
    Paxos Paxos实现
  • python-Paxos算法实现

    千次阅读 2018-06-20 15:33:43
    如果对Paxos算法逻辑理论不清楚的,请参考Paxos算法原理与推导这篇文章,这里不再赘述。 理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供我们调用,...

    如果对Paxos算法逻辑理论不清楚的,请参考Paxos算法原理与推导这篇文章,这里不再赘述。

    理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供我们调用,我觉着还是有必要自己亲自实践一下。

    这里首先说明一下,python这种动态语言,对不熟悉的人可能看着比较别扭,不像java那样参数类型是固定的,所以看着会有些蛋疼。这里环境用的是python2.7,
    这里写图片描述

    class Message:
        # command
        MSG_ACCEPTOR_AGREE = 0  # 追随者约定
        MSG_ACCEPTOR_ACCEPT = 1  # 追随者接受
        MSG_ACCEPTOR_REJECT = 2  # 追随者拒绝-网络不通
        MSG_ACCEPTOR_UNACCEPT = 3  # 追随者网络通-不同意
        MSG_ACCEPT = 4  # 接受
        MSG_PROPOSE = 5  # 提议
        MSG_EXT_PROPOSE = 6  # 额外提议
        MSG_HEARTBEAT = 7  # 心跳,每隔一段时间同步消息
    
        def __init__(self, command=None):
            self.command = command
    
        # 把收到的消息原原路返回,作为应答消息
        def copyAsReply(self, message):
            # 提议ID  #当前的ID  #发给谁  #谁发的
            self.proposalID, self.instanceID, self.to, self.source = message.proposalID, message.instanceID, message.source, message.to
            self.value = message.value  # 发的信息
    

    然后是利用socket,线程和队列实现的消息处理器:

    # 基于socket传递消息,封装网络传递消息
    import threading
    import pickle
    import socket
    import queue
    
    
    class MessagePump(threading.Thread):
        # 收取消息线程
        class MPHelper(threading.Thread):
            #
            def __init__(self, owner):
                self.owner = owner
                threading.Thread.__init__(self)
    
            def run(self):
                while not self.owner.abort:  # 只要所有者线程没有结束,一直接受消息
                    try:
                        (bytes, addr) = self.owner.socket.recvfrom(2048)  # 收取消息
                        msg = pickle.loads(bytes)  # 读取二进制数据转化为消息
                        msg.source = addr[1]
                        self.owner.queue.put(msg)  # 队列存入消息
                    except Exception as e:
                        pass
    
        def __init__(self, owner, port, timeout=2):
            threading.Thread.__init__(self)
            self.owner = owner
            self.abort = False
            self.timeout = 2
            self.port = port
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP通信
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000)  # 通信参数
            self.socket.bind(("localhost", port))  # 通信地址,ip,端口
            self.socket.settimeout(timeout)  # 超时设置
            self.queue = queue.Queue()  # 队列
            self.helper = MessagePump.MPHelper(self)  # 接收消息
    
        # 运行主线程
        def run(self):
            self.helper.start()  # 开启收消息的线程
            while not self.abort:
                message = self.waitForMessage()  # 阻塞等待
                self.owner.recvMessage(message)  # 收取消息
    
        # 等待消息
        def waitForMessage(self):
            try:
                msg = self.queue.get(True, 3)  # 抓取数据,最多等待3s
                return msg
            except:
                return None
    
        # 发送消息
        def sendMessage(self, message):
            bytes = pickle.dumps(message)  # 转化为二进制
            address = ("localhost", message.to)  # 地址ip,端口(ip,port)
            self.socket.sendto(bytes, address)
            return True
    
        #是否停止收取消息
        def doAbort(self):
            self.abort = True
    

    再来一个消息处理器,模拟消息的传递,延迟,丢包,其实这个类没什么卵用,这个是为模拟测试准备的

    from MessagePump import MessagePump
    import random
    
    
    class AdversarialMessagePump(MessagePump):  # 类的继承
        # 对抗消息传输,延迟消息并任意顺序传递,模拟网络的延迟,消息传送并不是顺序
        def __init__(self, owner, port, timeout=2):
            MessagePump.__init__(self, owner, port, timeout)  # 初始化父类
            self.messages = set()  # 集合避免重复
    
        def waitForMessage(self):
            try:
                msg = self.queue.get(True, 0.1)  # 从队列抓取数据
                self.messages.add(msg)  # 添加消息
            except Exception as e:  # 处理异常
                pass
                # print(e)
            if len(self.messages) > 0 and random.random() < 0.95:  # Arbitrary!
                msg = random.choice(list(self.messages))  # 随机抓取消息发送
                self.messages.remove(msg)  # 删除消息
            else:
                msg = None
            return msg
    

    再来一个是记录类

    #  InstanceRecord本地记录类,主要记录追随者、领导者最高编号的协议
    from PaxosLeaderProtocol import PaxosLeaderProtocol
    
    
    class InstanceRecord:
        def __init__(self):
            self.protocols = {}
            self.highestID = (-1, -1)  # (port,count)
            self.value = None
    
        def addProtocol(self, protocol):
            self.protocols[protocol.proposalID] = protocol
            #
            if protocol.proposalID[1] > self.highestID[1] or (
                    protocol.proposalID[1] == self.highestID[1] and protocol.proposalID[0] > self.highestID[0]):
                self.highestID = protocol.proposalID  # 取得编号最大的协议
    
        def getProtocol(self, protocolID):
            return self.protocols[protocolID]
    
        def cleanProtocols(self):
            keys = self.protocols.keys()
            for k in keys:
                protocol = self.protocols[k]
                if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
                    print("删除协议")
                    del self.protocols[k]
    

    下面就是Acceptor的实现:

    # 追随者
    from MessagePump import MessagePump
    from Message import Message
    from InstanceRecord import InstanceRecord
    from PaxosAcceptorProtocol import PaxosAcceptorProtocol
    
    class PaxosAcceptor:
        def __init__(self, port, leaders):
            self.port = port
            self.leaders = leaders
            self.instances = {}  # 接口列表
            self.msgPump = MessagePump(self, self.port)  # 消息传递器
            self.failed = False
    
        # 开始消息传送
        def start(self):
            self.msgPump.start()
    
        # 停止
        def stop(self):
            self.msgPump.doAbort()
    
        # 失败
        def fail(self):
            self.failed = True
    
        def recover(self):
            self.failed = False
    
        # 发送消息
        def sendMessage(self, message):
            self.msgPump.sendMessage(message)
    
        # 收消息,只收取为提议的消息
        def recvMessage(self, message):
            if message == None:
                return
            if self.failed:  # 失败状态不收取消息
                return
    
            if message.command == Message.MSG_PROPOSE:  # 判断消息是否为提议
                if message.instanceID not in self.instances:
                    record = InstanceRecord()  # 记录器
                    self.instances[message.instanceID] = record
                protocol = PaxosAcceptorProtocol(self)  # 创建协议
                protocol.recvProposal(message)  # 收取消息
                self.instances[message.instanceID].addProtocol(protocol)
            else:
                self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)
    
        # 通知客户端,
        def notifyClient(self, protocol, message):
            if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED:  # 提议被接受,通知
                self.instances[protocol.instanceID].value = message.value  # 储存信息
                print(u"协议被客户端接受 %s" % message.value)
    
        # 获取最高同意的建议
        def getHighestAgreedProposal(self, instance):
            return self.instances[instance].highestID  # (port,count)
    
        # 获取接口数据
        def getInstanceValue(self, instance):
            return self.instances[instance].value
    
    

    那再看下AcceptorProtocol的实现:

    from Message import Message
    
    
    class PaxosAcceptorProtocol(object):
        # State variables
        STATE_UNDEFINED = -1  # 协议没有定义的情况0
        STATE_PROPOSAL_RECEIVED = 0  # 收到消息
        STATE_PROPOSAL_REJECTED = 1  # 拒绝链接
        STATE_PROPOSAL_AGREED = 2  # 同意链接
        STATE_PROPOSAL_ACCEPTED = 3  # 同意请求
        STATE_PROPOSAL_UNACCEPTED = 4  # 拒绝请求
    
        def __init__(self, client):
            self.client = client
            self.state = PaxosAcceptorProtocol.STATE_UNDEFINED
    
        # 收取,只处理协议类型的消息
        def recvProposal(self, message):
    
            if message.command == Message.MSG_PROPOSE:  # 协议
                self.proposalID = message.proposalID
                self.instanceID = message.instanceID
                (port, count) = self.client.getHighestAgreedProposal(message.instanceID)  # 端口,协议内容的最高编号
                # 检测编号处理消息协议
                # 判断协议是否最高 
                if count < self.proposalID[1] or (count == self.proposalID[1] and port < self.proposalID[0]):
                    self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED  # 协议同意
                    print("同意协议:%s, %s " % (message.instanceID, message.value))
                    value = self.client.getInstanceValue(message.instanceID)
                    msg = Message(Message.MSG_ACCEPTOR_AGREE)  # 同意协议
                    msg.copyAsReply(message)
                    msg.value = value
                    msg.sequence = (port, count)
                    self.client.sendMessage(msg)  # 发送消息
                else:  # 不再接受比最高协议小的提议
                    self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
                return self.proposalID
            else:
                # 错误重试
                pass
    
        # 过度
        def doTransition(self, message):  # 如果当前协议状态是接受连接,消息类型是接受
            if self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command == Message.MSG_ACCEPT:
                self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED  # 接收协议
                msg = Message(Message.MSG_ACCEPTOR_ACCEPT)  # 创造消息
                msg.copyAsReply(message)  # 拷贝并回复
                for l in self.client.leaders:
                    msg.to = l
                    self.client.sendMessage(msg)  # 给领导发送消息
                self.notifyClient(message)  # 通知自己
                return True
            raise Exception("并非预期的状态和命令")
    
        # 通知 自己客户端
        def notifyClient(self, message):
            self.client.notifyClient(self, message)
    

    接着看下Leader和LeaderProtocol实现:

    # 领导者
    
    import threading
    import Queue
    import time
    from Message import Message
    from MessagePump import MessagePump
    from InstanceRecord import InstanceRecord
    from PaxosLeaderProtocol import PaxosLeaderProtocol
    
    
    class PaxosLeader:
        # 定时监听
        class HeartbeatListener(threading.Thread):
            def __init__(self, leader):
                self.leader = leader
                self.queue = Queue.Queue()  # 消息队列
                self.abort = False
                threading.Thread.__init__(self)
    
            def newHB(self, message):
                self.queue.put(message)
    
            def doAbort(self):
                self.abort = True
    
            def run(self):  # 读取消息
                elapsed = 0
                while not self.abort:
                    s = time.time()
                    try:
                        hb = self.queue.get(True, 2)
                        # 设定规则,谁的端口号比较高,谁就是领导
                        if hb.source > self.leader.port:
                            self.leader.setPrimary(False)
                    except:
                        self.leader.setPrimary(True)
    
        # 定时发送
        class HeartbeatSender(threading.Thread):
            def __init__(self, leader):
                threading.Thread.__init__(self)
                self.leader = leader
                self.abort = False
    
            def doAbort(self):
                self.abort = True
    
            def run(self):
                while not self.abort:
                    time.sleep(1)
                    if self.leader.isPrimary:
                        msg = Message(Message.MSG_HEARTBEAT)
                        msg.source = self.leader.port
                        for leader in self.leader.leaders:
                            msg.to = leader
                            self.leader.sendMessage(msg)
    
        def __init__(self, port, leaders=None, acceptors=None):
    
            self.port = port
            if leaders == None:
                self.leaders = []
            else:
                self.leaders = leaders
    
            if acceptors == None:
                self.acceptors = []
            else:
                self.acceptors = acceptors
    
            self.group = self.leaders + self.acceptors  # 集合合并
            self.isPrimary = False  # 自身是不是领导
            self.proposalCount = 0
            self.msgPump = MessagePump(self, port)  # 消息传送器
            self.instances = {}
            self.hbListener = PaxosLeader.HeartbeatListener(self)  # 监听
            self.hbSender = PaxosLeader.HeartbeatSender(self)  # 发送心跳
            self.highestInstance = -1  # 协议状态
            self.stoped = True  # 是否正在运行
            self.lasttime = time.time()  # 最后一次时间
    
        def sendMessage(self, message):
            self.msgPump.sendMessage(message)
    
        def start(self):
            self.hbSender.start()
            self.hbListener.start()
            self.msgPump.start()
            self.stoped = False
    
        def stop(self):
            self.hbSender.doAbort()
            self.hbListener.doAbort()
            self.msgPump.doAbort()
            self.stoped = True
    
        def setPrimary(self, primary):  # 设置领导者
            if self.isPrimary != primary:
                # Only print if something's changed
                if primary:
                    print(u"我是leader%s" % self.port)
                else:
                    print(u"我不是leader%s" % self.port)
            self.isPrimary = primary
    
        # 获取所有的领导下面的追随者
        def getGroup(self):
            return self.group
    
        def getLeaders(self):
            return self.leaders
    
        def getAcceptors(self):
            return self.acceptors
    
        # 必须获得1/2以上的人支持
        def getQuorumSize(self):
            return (len(self.getAcceptors()) / 2) + 1
    
        def getInstanceValue(self, instanceID):
            if instanceID in self.instances:
                return self.instances[instanceID].value
            return None
    
        def getHistory(self):  # 历史记录
            return [self.getInstanceValue(i) for i in range(1, self.highestInstance + 1)]
    
        # 抓取同意的数量
        def getNumAccpted(self):
            return len([v for v in self.getHistory() if v != None])
    
        # 抓取空白时间处理下事务
        def findAndFillGaps(self):
            for i in range(1, self.highestInstance):
                if self.getInstanceValue(i) == None:
                    print("填充空白", i)
                    self.newProposal(0, i)
            self.lasttime = time.time()
    
        # 采集无用信息
        def garbageCollect(self):
            for i in self.instances:
                self.instances[i].cleanProtocols()
    
        # 通知领导
        def recvMessage(self, message):
            if self.stoped:
                return
            if message == None:
                if self.isPrimary and time.time() - self.lasttime > 15.0:
                    self.findAndFillGaps()
                    self.garbageCollect()
                return
            #处理心跳信息
            if message.command == Message.MSG_HEARTBEAT:
                self.hbListener.newHB(message)
                return True
            #处理额外的提议
            if message.command == Message.MSG_EXT_PROPOSE:
                print("额外的协议", self.port, self.highestInstance)
                if self.isPrimary:
                    self.newProposal(message.value)
                return True
    
            if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:
                self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)
    
            if message.command == Message.MSG_ACCEPTOR_ACCEPT:
                if message.instanceID not in self.instances:
                    self.instances[message.instanceID] = InstanceRecord()
                record = self.instances[message.instanceID]
                if message.proposalID not in record.protocols:#创建协议
                    protocol = PaxosLeaderProtocol(self)
                    protocol.state = PaxosLeaderProtocol.STATE_AGREED
                    protocol.proposalID = message.proposalID
                    protocol.instanceID = message.instanceID
                    protocol.value = message.value
                    record.addProtocol(protocol)
                else:
                    protocol = record.getProtocol(message.proposalID)
    
                protocol.doTransition(message)
    
            return True
    
        # 新建提议
        def newProposal(self, value, instance=None):
            protocol = PaxosLeaderProtocol(self)
            if instance == None:  # 创建协议标号
                self.highestInstance += 1
                instanceID = self.highestInstance
            else:
                instanceID = instance
            self.proposalCount += 1
            id = (self.port, self.proposalCount)
            if instanceID in self.instances:
                record = self.instances[instanceID]
            else:
                record = InstanceRecord()
                self.instances[instanceID] = record
            protocol.propose(value, id, instanceID)
            record.addProtocol(protocol)
    
        def notifyLeader(self, protocol, message):
            if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
                print("协议接口%s被%s接受" % (message.instanceID, message.value))
                self.instances[message.instanceID].accepted = True
                self.instances[message.instanceID].value = message.value
                self.highestInstance = max(message.instanceID, self.highestInstance)
                return
            if protocol.state == PaxosLeaderProtocol.STATE_REJECTED:  # 重新尝试
                self.proposalCount = max(self.proposalCount, message.highestPID[1])
                self.newProposal(message.value)
                return True
            if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:
                pass
    
    

    LeaderProtocol实现:

    from Message import Message
    
    
    class PaxosLeaderProtocol(object):
        STATE_UNDEFINED = -1  # 协议没有定义的情况0
        STATE_PROPOSED = 0  # 协议消息
        STATE_REJECTED = 1  # 拒绝链接
        STATE_AGREED = 2  # 同意链接
        STATE_ACCEPTED = 3  # 同意请求
        STATE_UNACCEPTED = 4  # 拒绝请求
    
        def __init__(self, leader):
            self.leader = leader
            self.state = PaxosLeaderProtocol.STATE_UNDEFINED
            self.proposalID = (-1, -1)
            self.agreecount, self.acceptcount = (0, 0)
            self.rejectcount, self.unacceptcount = (0, 0)
            self.instanceID = -1
            self.highestseen = (0, 0)
    
        # 提议
        def propose(self, value, pID, instanceID):
            self.proposalID = pID
            self.value = value
            self.instanceID = instanceID
            message = Message(Message.MSG_PROPOSE)
            message.proposalID = pID
            message.instanceID = instanceID
            message.value = value
            for server in self.leader.getAcceptors():
                message.to = server
                self.leader.sendMessage(message)
            self.state = PaxosLeaderProtocol.STATE_PROPOSED
    
            return self.proposalID
    
        # 過度
        def doTransition(self, message):
            # 根據狀態運行協議
            if self.state == PaxosLeaderProtocol.STATE_PROPOSED:
                if message.command == Message.MSG_ACCEPTOR_AGREE:
                    self.agreecount += 1
                    if self.agreecount >= self.leader.getQuorumSize():  # 选举
                        print(u"达成协议的法定人数,最后的价值回答是:%s" % message.value)
                        if message.value != None:
                            if message.sequence[0] > self.highestseen[0] or (
                                    message.sequence[0] == self.highestseen[0] and message.sequence[1] > self.highestseen[
                                1]):
                                self.value = message.value
                                self.highestseen = message.sequence
    
                            self.state = PaxosLeaderProtocol.STATE_AGREED  # 同意更新
                            # 发送同意消息
                            msg = Message(Message.MSG_ACCEPT)
                            msg.copyAsReply(message)
                            msg.value = self.value
                            msg.leaderID = msg.to
                            for server in self.leader.getAcceptors():
                                msg.to = server
                                self.leader.sendMessage(msg)
                            self.leader.notifyLeader(self, message)
                        return True
    
                    if message.command == Message.MSG_ACCEPTOR_REJECT:
                        self.rejectcount += 1
                        if self.rejectcount >= self.leader.getQuorumSize():
                            self.state = PaxosLeaderProtocol.STATE_REJECTED
                            self.leader.notifyLeader(self, message)
                        return True
    
            if self.state == PaxosLeaderProtocol.STATE_AGREED:
                if message.command == Message.MSG_ACCEPTOR_ACCEPT:  # 同意协议
                    self.acceptcount += 1
                    if self.acceptcount >= self.leader.getQuorumSize():
                        self.state = PaxosLeaderProtocol.STATE_ACCEPTED  # 接受
                        self.leader.notifyLeader(self, message)
                if message.command == Message.MSG_ACCEPTOR_UNACCEPT:
                    self.unacceptcount += 1
                    if self.unacceptcount >= self.leader.getQuorumSize():
                        self.state = PaxosLeaderProtocol.STATE_UNACCEPTED
                        self.leader.notifyLeader(self, message)
    

    测试模块:

    import socket, pickle, time
    from Message import Message
    from PaxosAcceptor import PaxosAcceptor
    from PaxosLeader import PaxosLeader
    
    if __name__ == "__main__":
        # 设定5个客户端
        numclients = 5
        clients = [PaxosAcceptor(port, [54321, 54322]) for port in range(64320, 64320 + numclients)]
        # 两个领导者
        leader1 = PaxosLeader(54321, [54322], [c.port for c in clients])
        leader2 = PaxosLeader(54322, [54321], [c.port for c in clients])
    
        # 开启领导者与追随者
        leader1.start()
        leader1.setPrimary(True)
        leader2.setPrimary(True)
        leader2.start()
        for c in clients:
            c.start()
    
        # 破坏,客户端不链接
        clients[0].fail()
        clients[1].fail()
    
        # 通信
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # udp协议
        start = time.time()
        for i in range(1000):
            m = Message(Message.MSG_EXT_PROPOSE)  # 消息
            m.value = 0 + i  # 消息参数
            m.to = 54322  # 设置传递的端口
            bytes = pickle.dumps(m)  # 提取的二进制数据
            s.sendto(bytes, ("localhost", m.to))  # 发送消息
    
        while leader2.getNumAccpted() < 999:
            print("休眠的这一秒 %d " % leader2.getNumAccpted())
            time.sleep(1)
    
        print(u"休眠10秒")
        time.sleep(10)
        print(u"停止leaders")
        leader1.stop()
        leader2.stop()
        print(u"停止客户端")
        for c in clients:
            c.stop()
    
        print(u"leader1历史纪录")
        print(leader1.getHistory())
        print(u"leader2历史纪录")
        print(leader2.getHistory())
    
        end = time.time()
        print(u"一共用了%f秒" % (end - start))
    

    代码确实比较长,看起来有些困难,最好还是在pycharm上看这个逻辑,可以快速定位参数指向,如果有不对的地方欢迎指正

    展开全文
  • 在看这篇文章之前,如果之前对Paxos算法没有了解的童鞋可以看下这篇文章:Paxos算法原理与推导,相信了解Paxos算法后再来通过源码看算法实现应该会很酸爽。 Paxos算法中最重要的两个角色是Proposer和Acceptor。当然...

    可以进入我的博客查看原文。

    这篇主要来分析Paxos算法实现的部分,我想这应该也是读者最感兴趣的。在看这篇文章之前,如果之前对Paxos算法没有了解的童鞋可以看下这篇文章:Paxos算法原理与推导,相信了解Paxos算法后再来通过源码看算法实现应该会很酸爽。

    Paxos算法中最重要的两个角色是ProposerAcceptor。当然Leaner也很重要,特别是在PhxPaxos的实现中,Leaner具有重要的功能。但是因为《Paxos Made Simple》论文中主要还是Proposer和Acceptor,因此这篇文章还是以这两个角色为主,通过源码来回顾论文中Paxos算法的过程,同时也看看工程实现和论文的描述有什么区别。

    这里先贴出Paxos算法的过程,方便大家对照接下来的工程实现。

    • Prepare阶段:

      (a) Proposer选择一个提案编号N,然后向半数以上的Acceptor发送编号为N的Prepare请求。

      (b) 如果一个Acceptor收到一个编号为N的Prepare请求,且N大于该Acceptor已经响应过的所有Prepare请求的编号,那么它就会将它已经接受过的编号最大的提案(如果有的话)作为响应反馈给Proposer,同时该Acceptor承诺不再接受任何编号小于N的提案。

    • Accept阶段:

      (a) 如果Proposer收到半数以上Acceptor对其发出的编号为N的Prepare请求的响应,那么它就会发送一个针对[N,V]提案的Accept请求给半数以上的Acceptor。注意:V就是收到的响应中编号最大的提案的value,如果响应中不包含任何提案,那么V就由Proposer自己决定。

      (b) 如果Acceptor收到一个针对编号为N的提案的Accept请求,只要该Acceptor没有对编号大于N的Prepare请求做出过响应,它就接受该提案。

    Proposer

    因为Proposer需要维护或者说记录一些状态信息,包括自己的提案编号ProposalID、提出的Value、其他Proposer提出的最大的提案编号HighestOtherProposalID、Acceptor已经接受过的编号最大的提案的值等,因此这里专门有一个ProposerState类来管理这些信息。同样Acceptor也有一个AcceptorState类来管理Acceptor相关的信息。

    先来看下ProposerState的定义:

    class ProposerState
    {
    public:
        ProposerState(const Config * poConfig);
        ~ProposerState();
    
        void Init();
    
        void SetStartProposalID(const uint64_t llProposalID);
    
        void NewPrepare();
    
        void AddPreAcceptValue(const BallotNumber & oOtherPreAcceptBallot, const std::string & sOtherPreAcceptValue);
    
        /
    
        const uint64_t GetProposalID();
    
        const std::string & GetValue();
    
        void SetValue(const std::string & sValue);
    
        void SetOtherProposalID(const uint64_t llOtherProposalID);
    
        void ResetHighestOtherPreAcceptBallot();
    
    public:
        uint64_t m_llProposalID;
        uint64_t m_llHighestOtherProposalID;
        std::string m_sValue;
    
        BallotNumber m_oHighestOtherPreAcceptBallot;
    
        Config * m_poConfig;
    };复制代码

    基本都是对这些信息的set跟get,很容易理解。直接来看Proposer类的定义:

    class Proposer : public Base
    {
    public:
        Proposer(
                const Config * poConfig, 
                const MsgTransport * poMsgTransport,
                const Instance * poInstance,
                const Learner * poLearner,
                const IOLoop * poIOLoop);
        ~Proposer();
    
        //设置起始的ProposalID
        void SetStartProposalID(const uint64_t llProposalID);
    
        //初始化新的一轮Paxos过程,每一轮叫做一个Paxos Instance,每一轮确定一个值
        virtual void InitForNewPaxosInstance();
    
        //Proposer发起提案的入口函数。参数sValue即Proposer自己想提出的value,当然最终提出的value不一定是这个,需要根据Acceptor再Prepare阶段的回复来确定
        int NewValue(const std::string & sValue);
    
        //判断Proposer是否处于Prepare阶段或Accept阶段
        bool IsWorking();
    
        /
    
        //对应Paxos算法中的Prepare阶段
        void Prepare(const bool bNeedNewBallot = true);
    
        //Prepare阶段等待Acceptor的回复,统计投票并确定是否进入Accept阶段
        void OnPrepareReply(const PaxosMsg & oPaxosMsg);
    
        //Prepare阶段被拒绝
        void OnExpiredPrepareReply(const PaxosMsg & oPaxosMsg);
    
        //对应Paxos算法中的Accept阶段
        void Accept();
    
        //Accept阶段等待Acceptor的回复,统计投票并确定值(Value)是否被选定(Chosen)
        void OnAcceptReply(const PaxosMsg & oPaxosMsg);
    
        //Accept阶段被拒绝
        void OnExpiredAcceptReply(const PaxosMsg & oPaxosMsg);
    
        //Prepare阶段超时
        void OnPrepareTimeout();
    
        //Accept阶段超时
        void OnAcceptTimeout();
    
        //退出Prepare阶段
        void ExitPrepare();
    
        //退出Accept阶段
        void ExitAccept();
    
        //取消跳过Prepare阶段,也就是必须先Prepare阶段再Accept阶段
        void CancelSkipPrepare();
    
        /
    
        void AddPrepareTimer(const int iTimeoutMs = 0);
    
        void AddAcceptTimer(const int iTimeoutMs = 0);
    
    public:
        ProposerState m_oProposerState;
        MsgCounter m_oMsgCounter;
        Learner * m_poLearner;
    
        bool m_bIsPreparing;
        bool m_bIsAccepting;
    
        IOLoop * m_poIOLoop;
    
        uint32_t m_iPrepareTimerID;
        int m_iLastPrepareTimeoutMs;
        uint32_t m_iAcceptTimerID;
        int m_iLastAcceptTimeoutMs;
        uint64_t m_llTimeoutInstanceID;
    
        bool m_bCanSkipPrepare;
    
        bool m_bWasRejectBySomeone;
    
        TimeStat m_oTimeStat;
    };复制代码

    NewValue

    下面就从NewValue方法入手:

    int Proposer :: NewValue(const std::string & sValue)
    {
        BP->GetProposerBP()->NewProposal(sValue);
    
        if (m_oProposerState.GetValue().size() == 0)
        {
            m_oProposerState.SetValue(sValue);
        }
    
        m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
        m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;
    
        //如果可以跳过Prepare阶段并且没有被Acceptor拒绝过,则直接进入Accept阶段
        if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
        {
            BP->GetProposerBP()->NewProposalSkipPrepare();
    
            PLGHead("skip prepare, directly start accept");
            Accept();
        }
    
        //否则先进入Prepare阶段
        else
        {
            //if not reject by someone, no need to increase ballot
            Prepare(m_bWasRejectBySomeone);
        }
    
        return 0;
    }复制代码

    这里可以直接进入Accept阶段的前提是该Proposer已经发起过Prepare请求且得到半数以上的同意(即通过了Prepare阶段),并且没有被任何Acceptor拒绝(说明没有Acceptor响应过比该Proposer的提案编号更高的提案)。那么,什么情况下可以跳过Prepare请求呢,这里应该对应的是选出一个master的情况?相当于raft里的leader?

    Prepare

    接下来直接看Prepare阶段:

    void Proposer :: Prepare(const bool bNeedNewBallot)
    {
        PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
                GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
                m_oProposerState.GetValue().size());
    
        BP->GetProposerBP()->Prepare();
        m_oTimeStat.Point();
    
        ExitAccept();
    
        //表明Proposer正处于Prepare阶段
        m_bIsPreparing = true;
    
        //不能跳过Prepare阶段
        m_bCanSkipPrepare = false;
    
        //目前还未被任意一个Acceptor拒绝
        m_bWasRejectBySomeone = false;
    
        m_oProposerState.ResetHighestOtherPreAcceptBallot();
    
        //如果需要产生新的投票,就调用NewPrepare产生新的ProposalID,新的ProposalID为当前已知的最大ProposalID+1
        if (bNeedNewBallot)
        {
            m_oProposerState.NewPrepare();
        }
    
        PaxosMsg oPaxosMsg;
    
        //设置Prepare消息的各个字段
        oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
        oPaxosMsg.set_instanceid(GetInstanceID());
        oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
    
        //MsgCount是专门用来统计票数的,根据计算的结果确定是否通过Prepare阶段或者Accept阶段
        m_oMsgCounter.StartNewRound();
    
        //Prepare超时定时器
        AddPrepareTimer();
    
        PLGHead("END OK");
    
        //将Prepare消息发送到各个节点
        BroadcastMessage(oPaxosMsg);
    }复制代码

    Proposer在Prepare阶段主要做了这么几件事:

    1. 重置各个状态位,表明当前正处于Prepare阶段。
    2. 获取提案编号ProposalID。当bNeedNewBallot为true时需要将ProposalID+1。否则沿用之前的ProposalID。bNeedNewBallot是在NewValue中调用Prepare方法时传入的m_bWasRejectBySomeone参数。也就是如果之前没有被任何Acceptor拒绝(说明还没有明确出现更大的ProposalID),则不需要获取新的ProposalID。对应的场景是Prepare阶段超时了,在超时时间内没有收到过半Acceptor同意的消息,因此需要重新执行Prepare阶段,此时只需要沿用原来的ProposalID即可。
    3. 发送Prepare请求。该请求PaxosMsg是Protocol Buffer定义的一个message,包含MsgType、InstanceID、NodeID、ProposalID等字段。在BroadcastMessage(oPaxosMsg)中还会将oPaxosMsg序列化后才发送出去。

    PaxosMsg的定义如下,Prepare和Accept阶段Proposer和Acceptor的所有消息都用PaxosMsg来表示:

    message PaxosMsg
    {
        required int32 MsgType = 1;
        optional uint64 InstanceID = 2;
        optional uint64 NodeID = 3;
        optional uint64 ProposalID = 4;
        optional uint64 ProposalNodeID = 5;
        optional bytes Value = 6;
        optional uint64 PreAcceptID = 7;
        optional uint64 PreAcceptNodeID = 8;
        optional uint64 RejectByPromiseID = 9;
        optional uint64 NowInstanceID = 10;
        optional uint64 MinChosenInstanceID = 11;
        optional uint32 LastChecksum = 12;
        optional uint32 Flag = 13;
        optional bytes SystemVariables = 14;
        optional bytes MasterVariables = 15;
    };复制代码

    OnPrepareReply

    Proposer发出Prepare请求后就开始等待Acceptor的回复。当Proposer所在节点收到PaxosPrepareReply消息后,就会调用Proposer的OnPrepareReply(oPaxosMsg),其中oPaxosMsg是Acceptor回复的消息。

    void Proposer :: OnPrepareReply(const PaxosMsg & oPaxosMsg)
    {
        PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
                oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(), 
                oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
    
        BP->GetProposerBP()->OnPrepareReply();
    
        //如果Proposer不是在Prepare阶段,则忽略该消息
        if (!m_bIsPreparing)
        {
            BP->GetProposerBP()->OnPrepareReplyButNotPreparing();
            //PLGErr("Not preparing, skip this msg");
            return;
        }
    
        //如果ProposalID不同,也忽略
        if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
        {
            BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg();
            //PLGErr("ProposalID not same, skip this msg");
            return;
        }
    
        //加入一个收到的消息,用于MsgCounter统计
        m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
    
        //如果该消息不是拒绝,即Acceptor同意本次Prepare请求
        if (oPaxosMsg.rejectbypromiseid() == 0)
        {
            BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
            PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu", 
                    oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
            //加入MsgCounter用于统计投票
            m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
            //将Acceptor返回的它接受过的编号最大的提案记录下来(如果有的话),用于确定Accept阶段的Value
            m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
        }
    
        //Acceptor拒绝了Prepare请求
        else
        {
            PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
    
            //同样也要记录到MsgCounter用于统计投票
            m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
    
            //记录被Acceptor拒绝过,待会儿如果重新进入Prepare阶段的话就需要获取更大的ProposalID
            m_bWasRejectBySomeone = true;
    
            //记录下别的Proposer提出的更大的ProposalID。这样重新发起Prepare请求时才知道需要用多大的ProposalID
            m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
        }
    
    
        //本次Prepare请求通过了。也就是得到了半数以上Acceptor的同意
        if (m_oMsgCounter.IsPassedOnThisRound())
        {
            int iUseTimeMs = m_oTimeStat.Point();
            BP->GetProposerBP()->PreparePass(iUseTimeMs);
            PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
            m_bCanSkipPrepare = true;
    
            //进入Accept阶段
            Accept();
        }
    
        //本次Prepare请求没有通过
        else if (m_oMsgCounter.IsRejectedOnThisRound()
                || m_oMsgCounter.IsAllReceiveOnThisRound())
        {
            BP->GetProposerBP()->PrepareNotPass();
            PLGImp("[Not Pass] wait 30ms and restart prepare");
    
             //随机等待一段时间后重新发起Prepare请求
            AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
        }
    
        PLGHead("END");
    }复制代码

    该阶段Proposer主要做了以下事情:

    1. 判断消息是否有效。包括ProposalID是否相同,自身是否处于Prepare阶段等。因为网络是不可靠的,有些消息可能延迟很久,等收到的时候已经不需要了,所以需要做这些判断。

    2. 将收到的消息加入MsgCounter用于统计。

    3. 根据收到的消息更新自身状态。包括Acceptor承诺过的ProposalID,以及Acceptor接受过的编号最大的提案等。

    4. 根据MsgCounter统计的Acceptor投票结果决定是进入Acceptor阶段还是重新发起Prepare请求。这里如果判断需要重新发起Prepare请求的话,也不是立即进行,而是等待一段随机的时间,这样做的好处是减少不同Proposer之间的冲突,采取的策略跟raft中leader选举冲突时在一段随机的选举超时时间后重新发起选举的做法类似。

    注:这里跟Paxos算法中提案编号对应的并不是ProposalID,而是BallotNumber。BallotNumber由ProposalID和NodeID组成。还实现了运算符重载。如果ProposalID大,则BallotNumber(即提案编号)大。在ProposalID相同的情况下,NodeID大的BallotNumber大。

    Accept

    接下来Proposer就进入Accept阶段:

    void Proposer :: Accept()
    {
        PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu", 
                m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());
    
        BP->GetProposerBP()->Accept();
        m_oTimeStat.Point();
    
        ExitPrepare();
        m_bIsAccepting = true;
    
        //设置Accept请求的消息内容
        PaxosMsg oPaxosMsg;
        oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
        oPaxosMsg.set_instanceid(GetInstanceID());
        oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
        oPaxosMsg.set_value(m_oProposerState.GetValue());
        oPaxosMsg.set_lastchecksum(GetLastChecksum());
    
        m_oMsgCounter.StartNewRound();
    
        AddAcceptTimer();
    
        PLGHead("END");
    
        //发给各个节点
        BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
    }复制代码

    Accept请求中PaxosMsg里的Value是这样确定的:如果Prepare阶段有Acceptor的回复中带有提案值,则该Value为所有的Acceptor的回复中,编号最大的提案的值。否则就是Proposer在最初调用NewValue时传入的值。

    OnAcceptReply

    void Proposer :: OnAcceptReply(const PaxosMsg & oPaxosMsg)
    {
        PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
                oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(), 
                oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
    
        BP->GetProposerBP()->OnAcceptReply();
    
        if (!m_bIsAccepting)
        {
            //PLGErr("Not proposing, skip this msg");
            BP->GetProposerBP()->OnAcceptReplyButNotAccepting();
            return;
        }
    
        if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
        {
            //PLGErr("ProposalID not same, skip this msg");
            BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg();
            return;
        }
    
        m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
    
        if (oPaxosMsg.rejectbypromiseid() == 0)
        {
            PLGDebug("[Accept]");
            m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
        }
        else
        {
            PLGDebug("[Reject]");
            m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
    
            m_bWasRejectBySomeone = true;
    
            m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
        }
    
        if (m_oMsgCounter.IsPassedOnThisRound())
        {
            int iUseTimeMs = m_oTimeStat.Point();
            BP->GetProposerBP()->AcceptPass(iUseTimeMs);
            PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
            ExitAccept();
    
            //让Leaner学习被选定(Chosen)的值
            m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
        }
        else if (m_oMsgCounter.IsRejectedOnThisRound()
                || m_oMsgCounter.IsAllReceiveOnThisRound())
        {
            BP->GetProposerBP()->AcceptNotPass();
            PLGImp("[Not pass] wait 30ms and Restart prepare");
            AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
        }
    
        PLGHead("END");
    }复制代码

    这里跟OnPrepareReply的过程基本一致,因此就不加太多注释了。比较大的区别在于最后如果过半的Acceptor接受了该Accept请求,则说明该Value被选定(Chosen)了,就发送消息,让每个节点上的Learner学习该Value。因为Leaner不是本文的重点,这里就不详细介绍了。

    Acceptor

    Acceptor的逻辑比Proposer更简单。同样先看它的定义:

    class Acceptor : public Base
    {
    public:
        Acceptor(
                const Config * poConfig, 
                const MsgTransport * poMsgTransport, 
                const Instance * poInstance,
                const LogStorage * poLogStorage);
        ~Acceptor();
    
        virtual void InitForNewPaxosInstance();
    
        int Init();
    
        AcceptorState * GetAcceptorState();
    
        //Prepare阶段回复Prepare请求
        int OnPrepare(const PaxosMsg & oPaxosMsg);
    
        //Accept阶段回复Accept请求
        void OnAccept(const PaxosMsg & oPaxosMsg);
    
    //private:
        AcceptorState m_oAcceptorState;
    };复制代码

    OnPrepare

    OnPrepare用于处理收到的Prepare请求,逻辑如下:

    int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
    {
        PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
                oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());
    
        BP->GetAcceptorBP()->OnPrepare();
    
        PaxosMsg oReplyPaxosMsg;
        oReplyPaxosMsg.set_instanceid(GetInstanceID());
        oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
        oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);
    
        //构造接收到的Prepare请求里的提案编号
        BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
    
        //提案编号大于承诺过的提案编号
        if (oBallot >= m_oAcceptorState.GetPromiseBallot())
        {
            PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
                    "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID,
                    m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
                    m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
    
            //返回之前接受过的提案的编号
            oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
            oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
            //如果接受过的提案编号大于0(<=0说明没有接受过提案),则设置接受过的提案的Value
            if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
            {
                oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
            }
    
            //更新承诺的提案编号为新的提案编号(因为新的提案编号更大)
            m_oAcceptorState.SetPromiseBallot(oBallot);
    
            //信息持久化
            int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
            if (ret != 0)
            {
                BP->GetAcceptorBP()->OnPreparePersistFail();
                PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                        GetInstanceID(), ret);
    
                return -1;
            }
    
            BP->GetAcceptorBP()->OnPreparePass();
        }
    
        //提案编号小于承诺过的提案编号,需要拒绝
        else
        {
            BP->GetAcceptorBP()->OnPrepareReject();
    
            PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", 
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID);
    
            //拒绝该Prepare请求,并返回承诺过的ProposalID      
            oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
        }
    
        nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
    
        PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
                GetInstanceID(), oPaxosMsg.nodeid());;
    
        //向发出Prepare请求的Proposer回复消息
        SendMessage(iReplyNodeID, oReplyPaxosMsg);
    
        return 0;
    }复制代码

    OnAccept

    再来看看OnAccept:

    void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
    {
        PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
                oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());
    
        BP->GetAcceptorBP()->OnAccept();
    
        PaxosMsg oReplyPaxosMsg;
        oReplyPaxosMsg.set_instanceid(GetInstanceID());
        oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
        oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);
    
        BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
    
        //提案编号不小于承诺过的提案编号(注意:这里是“>=”,而再OnPrepare中是“>”,可以先思考下为什么),需要接受该提案
        if (oBallot >= m_oAcceptorState.GetPromiseBallot())
        {
            PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
                    "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID,
                    m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
                    m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
    
            //更新承诺的提案编号;接受的提案编号、提案值
            m_oAcceptorState.SetPromiseBallot(oBallot);
            m_oAcceptorState.SetAcceptedBallot(oBallot);
            m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());
    
            //信息持久化
            int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
            if (ret != 0)
            {
                BP->GetAcceptorBP()->OnAcceptPersistFail();
    
                PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                        GetInstanceID(), ret);
    
                return;
            }
    
            BP->GetAcceptorBP()->OnAcceptPass();
        }
    
        //需要拒绝该提案
        else
        {
            BP->GetAcceptorBP()->OnAcceptReject();
    
            PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", 
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID, 
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID);
    
            //拒绝的消息中附上承诺过的ProposalID
            oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
        }
    
        nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
    
        PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
                GetInstanceID(), oPaxosMsg.nodeid());
    
        //将响应发送给Proposer
        SendMessage(iReplyNodeID, oReplyPaxosMsg);
    }复制代码

    结语

    通过阅读源码可以发现,整个PhxPaxos完全基于Lamport的《Paxos Made Simple》进行工程化,没有进行任何算法变种。这对于学习Paxos算法的人来说真的是一笔宝贵的财富,所以如果对Paxos算法感兴趣,应该深入地去阅读PhxPaxos的源码,相信看完后大家对Paxos会有更深的理解。同时我们也发现,在工程实现上还是有很多细节需要注意,这比单纯理解算法要难得多。

    可以进入我的博客查看原文。

    欢迎关注公众号: FullStackPlan 获取更多干货

    展开全文
  • 前言 写完 paxos的直观解释 ...既然Talk is cheap, 那么就Show me the code, 这次我们把教程中描述的内容直接用代码实现出来, 希望能覆盖到教程中的涉及的每个细节. 帮助大家理解paxos的运行机制. 这是一个基于paxos,

    本文链接: https://blog.openacid.com/algo/paxoskv/
    200行代码实现基于paxos的kv存储

    前言

    写完 paxos的直观解释 之后, 网友都说疗效甚好, 但是也会对这篇教程中一些环节提出疑问(有疑问说明真的看懂了 🤔 ) , 例如怎么把只能确定一个值的paxos应用到实际场景中.

    既然Talk is cheap, 那么就Show me the code, 这次我们把教程中描述的内容直接用代码实现出来, 希望能覆盖到教程中的涉及的每个细节. 帮助大家理解paxos的运行机制.

    这是一个基于paxos, 200行代码的kv存储系统的简单实现, 作为 paxos的直观解释 这篇教程中的代码示例部分. Paxos的原理本文不再介绍了, 本文提到的数据结构使用protobuf定义, 网络部分使用grpc定义. 另外200行go代码实现paxos存储.

    文中的代码可能做了简化, 完整代码实现在 paxoskv 这个项目中(naive分支).

    运行和使用

    🚀

    跑一下:

    git clone https://github.com/openacid/paxoskv.git
    cd paxoskv
    go test -v ./...
    

    这个项目中除了paxos实现, 用3个test case描述了3个paxos运行的例子,

    测试代码描述了几个paxos运行例子的行为, 运行测试可以确认paxos的实现符合预期.

    本文中 protobuf 的数据结构定义如下:

    service PaxosKV {
        rpc Prepare (Proposer) returns (Acceptor) {}
        rpc Accept (Proposer) returns (Acceptor) {}
    }
    message BallotNum {
        int64 N          = 1;
        int64 ProposerId = 2;
    }
    message Value {
        int64 Vi64 = 1;
    }
    message PaxosInstanceId {
        string Key = 1;
        int64  Ver = 2;
    }
    message Acceptor {
        BallotNum LastBal = 1;
        Value     Val     = 2;
        BallotNum VBal    = 3;
    }
    message Proposer {
        PaxosInstanceId Id  = 1;
        BallotNum       Bal = 2;
        Value           Val = 3;
    }
    

    以及主要的函数实现:

    // struct KVServer
    Storage : map[string]Versions
    func Accept(c context.Context, r *Proposer) (*Acceptor, error)
    func Prepare(c context.Context, r *Proposer) (*Acceptor, error)
    func getLockedVersion(id *PaxosInstanceId) *Version
    
    // struct Proposer
    func Phase1(acceptorIds []int64, quorum int) (*Value, *BallotNum, error)
    func Phase2(acceptorIds []int64, quorum int) (*BallotNum, error)
    func RunPaxos(acceptorIds []int64, val *Value) (*Value)
    func rpcToAll(acceptorIds []int64, action string) ([]*Acceptor)
    
    func ServeAcceptors(acceptorIds []int64) ([]*grpc.Server)
    

    从头实现paxoskv

    Paxos 相关的数据结构

    在这个例子中我们的数据结构和服务框架使用 protobufgrpc 实现, 首先是最底层的paxos数据结构:

    Proposer 和 Acceptor

    slide-27 中我们介绍了1个 Acceptor 所需的字段:

    在存储端(Acceptor)也有几个概念:

    • last_rnd 是Acceptor记住的最后一次进行写前读取的Proposer(客户端)是谁, 以此来决定谁可以在后面真正把一个值写到存储中.
    • v 是最后被写入的值.
    • vrnd 跟v是一对, 它记录了在哪个Round中v被写入了.

    img

    原文中这些名词是参考了 paxos made simple 中的名称, 但在 Leslie Lamport 后面的几篇paper中都换了名称, 为了后续方便, 在paxoskv的代码实现中也做了相应的替换:

    rnd      ==> Bal   // 每一轮paxos的编号, BallotNum
    vrnd     ==> VBal  // 在哪个Ballot中v被Acceptor 接受(voted)
    last_rnd ==> LastBal
    

    Proposer的字段也很简单, 它需要记录:

    • 当前的ballot number: Bal,
    • 以及它选择在Phase2运行的值: Val (slide-29).

    于是在这个项目中用protobuf定义这两个角色的数据结构, 如代码 paxoskv.proto 中的声明, 如下:

    message Acceptor {
      BallotNum LastBal = 1;
      Value     Val = 2;
      BallotNum VBal = 3;
    }
    
    message Proposer {
      PaxosInstanceId Id = 1;
    
      BallotNum Bal = 2;
      Value     Val = 3;
    }
    

    其中Proposer还需要一个PaxosInstanceId, 来标识当前的paxos实例为哪个key的哪个version在做决定, paxos made simple 中只描述了一个paxos实例的算法(对应一个key的一次修改), 要实现多次修改, 就需要增加这个字段来区分不同的paxos实例:

    message PaxosInstanceId {
      string Key = 1;
      int64  Ver = 2;
    }
    

    paxoskv.proto 还定义了一个BallotNum, 因为要保证全系统内的BallotNum都有序且不重复, 一般的做法就是用一个本地单调递增的整数, 和一个全局唯一的id组合起来实现:

    message BallotNum {
        int64 N = 1;
        int64 ProposerId = 2;
    }
    

    定义RPC消息结构

    RPC消息定义了Proposer和Acceptor之间的通讯.

    在一个paxos系统中, 至少要有4个消息:

    • Phase1的 Prepare-request, Prepare-reply,
    • 和Phase2的 Accept-request, Accept-reply,

    slide-28 所描述的(原文中使用rnd, 这里使用Bal, 都是同一个概念):

    Phase-1(Prepare):

    request:
        Bal: int
    
    reply:
        LastBal: int
        Val:     string
        VBal:    int
    

    Phase-2(Accept):

    request:
        Bal: int
        Val:   string
    
    reply:
        LastBal: int
    

    在Prepare-request或Accept-request中, 发送的是一部分或全部的Proposer的字段, 因此我们在代码中:

    • 直接把Proposer的结构体作为request的结构体.
    • 同样把Acceptor的结构体作为reply的结构体.

    在使用的时候只使用其中几个字段. 对应我们的 RPC 服务 PaxosKV 定义如下:

    service PaxosKV {
        rpc Prepare (Proposer) returns (Acceptor) {}
        rpc Accept (Proposer) returns (Acceptor) {}
    }
    

    使用protobuf和grpc生成服务框架

    🚀

    protobuf可以将paxoskv.proto直接生成go代码( 代码库中已经包含了生成好的代码: paxoskv.pb.go, 只有修改paxoskv.proto 之后才需要重新生成)

    • 首先安装protobuf的编译器 protoc, 可以根据 install-protoc 中的步骤安装, 一般简单的一行命令就可以了:

      • Linux: apt install -y protobuf-compiler
      • Mac: brew install protobuf

      安装好之后通过protoc --version确认版本, 至少应该是3.x: libprotoc 3.13.0

    • 安装protoc的go语言生成插件 protoc-gen-go:

      go get -u github.com/golang/protobuf/protoc-gen-go

    • 重新编译protokv.proto文件: 直接make gen 或:

        protoc \
            --proto_path=proto \
            --go_out=plugins=grpc:paxoskv \
            paxoskv.proto
      

    生成后的paxoskv.pb.go代码中可以看到, 其中主要的数据结构例如Acceptor的定义:

    type Acceptor struct {
      LastBal *BallotNum ...
      Val     *Value ...
      VBal    *BallotNum ...
            ...
    }
    

    以及KV服务的client端和server端的代码, client端是实现好的, server端只有一个interface, 后面我们需要来完成它的实现:

    type paxosKVClient struct {
      cc *grpc.ClientConn
    }
    type PaxosKVClient interface {
      Prepare(
        ctx context.Context,
        in *Proposer,
        opts ...grpc.CallOption
      ) (*Acceptor, error)
    
      Accept(
        ctx context.Context,
        in *Proposer,
        opts ...grpc.CallOption
      ) (*Acceptor, error)
    }
    
    type PaxosKVServer interface {
      Prepare(context.Context,
              *Proposer) (*Acceptor, error)
      Accept(context.Context,
             *Proposer) (*Acceptor, error)
    }
    

    实现存储的服务器端

    impl.go 是所有实现部分, 我们定义一个KVServer结构体, 用来实现grpc服务的interface PaxosKVServer; 其中使用一个内存里的map结构模拟数据的存储:

    type Version struct {
      mu       sync.Mutex
      acceptor Acceptor
    }
    type Versions map[int64]*Version
    type KVServer struct {
      mu      sync.Mutex
      Storage map[string]Versions
    }
    

    其中Version对应一个key的一次变化, 也就是对应一个paxos实例. Versions对应一个key的一系列变化. Storage就是所有key的所有变化.

    实现 Acceptor 的 grpc 服务 handler

    Acceptor, 是这个系统里的server端, 监听一个端口, 等待Proposer发来的请求并处理, 然后给出应答.

    根据paxos的定义, Acceptor的逻辑很简单: 在 slide-28 中描述:

    img

    根据教程里的描述, 为 KVServer 定义handle Prepare-request的代码:

    func (s *KVServer) Prepare(
        c context.Context,
        r *Proposer) (*Acceptor, error) {
    
      v := s.getLockedVersion(r.Id)
      defer v.mu.Unlock()
    
      reply := v.acceptor
    
      if r.Bal.GE(v.acceptor.LastBal) {
        v.acceptor.LastBal = r.Bal
      }
    
      return &reply, nil
    }
    

    这段代码分3步:

    • 取得paxos实例,
    • 生成应答: Acceptor总是返回LastBal, Val, VBal 这3个字段, 所以直接把Acceptor赋值给reply.
    • 最后更新Acceptor的状态: 然后按照paxos算法描述, 如果请求中的ballot number更大, 则记录下来, 表示不在接受更小ballot number的Proposer.

    其中getLockedVersion()KVServer.Storage中根据request 发来的PaxosInstanceId中的字段key和ver获取一个指定Acceptor的实例:

    func (s *KVServer) getLockedVersion(
        id *PaxosInstanceId) *Version {
    
      s.mu.Lock()
      defer s.mu.Unlock()
    
      key := id.Key
      ver := id.Ver
      rec, found := s.Storage[key]
      if !found {
        rec = Versions{}
        s.Storage[key] = rec
      }
    
      v, found := rec[ver]
      if !found {
        // initialize an empty paxos instance
        rec[ver] = &Version{
          acceptor: Acceptor{
            LastBal: &BallotNum{},
            VBal:    &BallotNum{},
          },
        }
        v = rec[ver]
      }
    
      v.mu.Lock()
      return v
    }
    

    handle Accept-request的处理类似, 在 slide-31 中描述: img

    Accept() 要记录3个值,

    • LastBal: Acceptor看到的最大的ballot number;
    • Val: Proposer选择的值,
    • 以及VBal: Proposer的ballot number:
    func (s *KVServer) Accept(
        c context.Context,
        r *Proposer) (*Acceptor, error) {
    
      v := s.getLockedVersion(r.Id)
      defer v.mu.Unlock()
    
      reply := Acceptor{
        LastBal: &*v.acceptor.LastBal,
      }
    
      if r.Bal.GE(v.acceptor.LastBal) {
        v.acceptor.LastBal = r.Bal
        v.acceptor.Val = r.Val
        v.acceptor.VBal = r.Bal
      }
    
      return &reply, nil
    }
    

    Acceptor 的逻辑到此完整了, 再看Proposer:

    实现Proposer 逻辑

    Proposer的运行分2个阶段, Phase1 和 Phase2, 与 Prepare 和 Accept 对应.

    Phase1

    impl.go 的实现中, Proposer.Phase1()函数负责Phase1的逻辑:

    func (p *Proposer) Phase1(
        acceptorIds []int64,
        quorum int) (*Value, *BallotNum, error) {
    
      replies := p.rpcToAll(acceptorIds, "Prepare")
    
      ok := 0
      higherBal := *p.Bal
      maxVoted := &Acceptor{VBal: &BallotNum{}}
    
      for _, r := range replies {
        if !p.Bal.GE(r.LastBal) {
          higherBal = *r.LastBal
          continue
        }
    
        if r.VBal.GE(maxVoted.VBal) {
          maxVoted = r
        }
    
        ok += 1
        if ok == quorum {
          return maxVoted.Val, nil, nil
        }
      }
    
      return nil, &higherBal, NotEnoughQuorum
    }
    

    这段代码首先通过 rpcToAll() 向所有Acceptor发送Prepare-request请求, 然后找出所有的成功的reply:

    • 如果发现一个更大的ballot number, 表示一个Prepare失败: 有更新的Proposer存在;
    • 否则, 它是一个成功的应答, 再看它有没有返回一个已经被Acceptor接受(voted)的值.

    最后, 成功应答如果达到多数派(quorum), 则认为Phase1 完成, 返回最后一个被voted的值, 也就是VBal最大的那个. 让上层调用者继续Phase2;

    如果没有达到quorum, 这时可能是有多个Proposer并发运行而造成冲突, 有更大的ballot number, 这时则把见到的最大ballot number返回, 由上层调用者提升ballot number再重试.

    client 与 server 端的连接

    上面用到的 rpcToAll 在这个项目中的实现client端(Proposer)到server端(Acceptor)的通讯, 它是一个十分 简洁美观 简陋的 grpc 客户端实现:

    func (p *Proposer) rpcToAll(
        acceptorIds []int64,
        action string) []*Acceptor {
    
      replies := []*Acceptor{}
    
      for _, aid := range acceptorIds {
        var err error
        address := fmt.Sprintf("127.0.0.1:%d",
            AcceptorBasePort+int64(aid))
    
        conn, err := grpc.Dial(
            address, grpc.WithInsecure())
        if err != nil {
          log.Fatalf("did not connect: %v", err)
        }
        defer conn.Close()
    
        c := NewPaxosKVClient(conn)
    
        ctx, cancel := context.WithTimeout(
            context.Background(), time.Second)
        defer cancel()
    
        var reply *Acceptor
        if action == "Prepare" {
          reply, err = c.Prepare(ctx, p)
        } else if action == "Accept" {
          reply, err = c.Accept(ctx, p)
        }
        if err != nil {
          continue
        }
        replies = append(replies, reply)
      }
      return replies
    }
    

    Phase2

    Proposer运行的Phase2 在slide-30 中描述, 比Phase1更简单:

    在第2阶段phase-2, Proposer X将它选定的值写入到Acceptor中, 这个值可能是它自己要写入的值, 或者是它从某个Acceptor上读到的v(修复).

    func (p *Proposer) Phase2(
        acceptorIds []int64,
        quorum int) (*BallotNum, error) {
    
      replies := p.rpcToAll(acceptorIds, "Accept")
    
      ok := 0
      higherBal := *p.Bal
      for _, r := range replies {
        if !p.Bal.GE(r.LastBal) {
          higherBal = *r.LastBal
          continue
        }
        ok += 1
        if ok == quorum {
          return nil, nil
        }
      }
    
      return &higherBal, NotEnoughQuorum
    }
    

    我们看到, 它只需要确认成 Phase2 的功应答数量达到quorum就可以了. 另外同样它也有责任在 Phase2 失败时返回看到的更大的ballot number, 因为在 Phase1 和 Phase2 之间可能有其他 Proposer 使用更大的ballot number打断了当前Proposer的执行, 就像slide-33 的冲突解决的例子中描述的那样. 后面讲.

    完整的paxos逻辑

    完整的 paxos 由 Proposer 负责, 包括: 如何选择一个值, 使得一致性得以保证. 如 slide-29 中描述的:

    Proposer X收到多数(quorum)个应答, 就认为是可以继续运行的.如果没有联系到多于半数的acceptor, 整个系统就hang住了, 这也是paxos声称的只能运行少于半数的节点失效. 这时Proposer面临2种情况:

    所有应答中都没有任何非空的v, 这表示系统之前是干净的, 没有任何值已经被其他paxos客户端完成了写入(因为一个多数派读一定会看到一个多数派写的结果). 这时Proposer X继续将它要写的值在phase-2中真正写入到多于半数的Acceptor中.

    如果收到了某个应答包含被写入的v和vrnd, 这时, Proposer X 必须假设有其他客户端(Proposer) 正在运行, 虽然X不知道对方是否已经成功结束, 但任何已经写入的值都不能被修改!, 所以X必须保持原有的值. 于是X将看到的最大vrnd对应的v作为X的phase-2将要写入的值.

    这时实际上可以认为X执行了一次(不知是否已经中断的)其他客户端(Proposer)的修复.

    img

    基于 Acceptor 的服务端和 Proposer 2个 Phase 的实现, 最后把这些环节组合到一起组成一个完整的paxos, 在我们的代码 RunPaxos 这个函数中完成这些事情:

    func (p *Proposer) RunPaxos(
        acceptorIds []int64,
        val *Value) *Value {
    
      quorum := len(acceptorIds)/2 + 1
    
      for {
        p.Val = val
    
        maxVotedVal, higherBal, err := p.Phase1(
            acceptorIds, quorum)
    
        if err != nil {
          p.Bal.N = higherBal.N + 1
          continue
        }
    
        if maxVotedVal != nil {
          p.Val = maxVotedVal
        }
    
        // val == nil 是一个读操作,
        // 没有读到voted值不需要Phase2
        if p.Val == nil {
          return nil
        }
    
        higherBal, err = p.Phase2(
            acceptorIds, quorum)
    
        if err != nil {
          p.Bal.N = higherBal.N + 1
          continue
        }
    
        return p.Val
      }
    }
    

    这段代码完成了几件事: 运行 Phase1, 有voted的值就选它, 没有就选自己要写的值val, 然后运行 Phase2.

    就像 Phase1 Phase2 中描述的一样, 任何一个阶段, 如果没达到quorum, 就需要提升遇到的更大的ballot number, 重试去解决遇到的ballot number冲突.

    这个函数接受2个参数:

    • 所有Acceptor的列表(用一个整数的id表示一个Acceptor),
    • 以及要提交的值.

    其中, 按照paxos的描述, 这个值val不一定能提交: 如果paxos在 Phase1 完成后看到了其他已经接受的值(voted value), 那就要选择已接收的值, 放弃val. 遇到这种情况, 在我们的系统中, 例如要写入key=foo, ver=3的值为bar, 如果没能选择bar, 就要选择下一个版本key=foo, ver=4再尝试写入.

    这样不断的重试循环, 写操作最终都能成功写入一个值(一个key的一个版本的值).

    实现读操作

    在我们这个NB(naive and bsice)的系统中, 读和写一样都要通过一次paxos算法来完成. 因为写入过程就是一次paxos执行, 而paxos只保证在一个quorum中写入确定的值, 不保证所有节点都有这个值. 因此一次读操作如果要读到最后写入的值, 至少要进行一次多数派读.

    但多数派读还不够: 它可能读到一个未完成的paxos写入, 如 slide-11 中描述的脏读问题, 读取到的最大VBal的值, 可能不是确定的值(写入到多数派).

    例如下面的状态:

    Val=foo    Val=bar    ?
    VBal=3     VBal=2     ?
    -------    -------    --
    A0         A1         A2
    

    如果Proposer试图读, 在 Phase1 联系到A0 A1这2个Acceptor, 那么foo和bar这2个值哪个是确定下来的, 要取决于A2的状态. 所以这时要再把最大VBal的值跑完一次 Phase2, 让它被确定下来, 然后才能把结果返回给上层(否则另一个Proposer可能联系到A1 和 A2, 然后认为Val=bar是被确定的值).

    当然如果 Proposer 在读取流程的 Phase1 成功后没有看到任何已经voted的值(例如没有看到foo或bar), 就不用跑 Phase2 了.

    所以在这个版本的实现中, 读操作也是一次 RunPaxos 函数的调用, 除了它并不propose任何新的值, 为了支持读操作, 所以在上面的代码中 Phase2 之前加入一个判断, 如果传入的val和已voted的值都为空, 则直接返回:

    if p.Val == nil {
      return nil
    }
    

    Example_setAndGetByKeyVer 这个测试用例展示了如何使用paxos实现一个kv存储, 实现读和写的代码大概这样:

    prop := Proposer{
      Id: &PaxosInstanceId{
        Key: "foo",
        Ver: 0,
      },
      Bal: &BallotNum{N: 0, ProposerId: 2},
    }
    
    // 写:
    v := prop.RunPaxos(acceptorIds, &Value{Vi64: 5})
    
    // 读:
    v := prop.RunPaxos(acceptorIds, nil)
    

    到现在为止, 本文中涉及到的功能都实现完了, 完整实现在 impl.go 中.

    接着我们用测试用例实现1下 paxos的直观解释 中列出的2个例子, 从代码看poxos的运行:

    文中例子

    第1个例子是 paxos 无冲突的运行 slide-32:

    img

    把它写成test case, 确认教程中每步操作之后的结果都如预期 TestCase1SingleProposer:

    func TestCase1SingleProposer(t *testing.T) {
      ta := require.New(t)
    
      acceptorIds := []int64{0, 1, 2}
      quorum := 2
    
      // 启动3个Acceptor的服务
      servers := ServeAcceptors(acceptorIds)
      defer func() {
        for _, s := range servers {
          s.Stop()
        }
      }()
    
      // 用要更新的key和version定义paxos 实例的id
      paxosId := &PaxosInstanceId{
        Key: "i",
        Ver: 0,
      }
    
      var val int64 = 10
    
      // 定义Proposer, 随便选个Proposer id 10.
      var pidx int64 = 10
      px := Proposer{
        Id:  paxosId,
        Bal: &BallotNum{N: 0, ProposerId: pidx},
      }
    
      // 用左边2个Acceptor运行Phase1,
      // 成功, 没有看到其他的ballot number
      latestVal, higherBal, err := px.Phase1(
          []int64{0, 1}, quorum)
    
      ta.Nil(err, "constitued a quorum")
      ta.Nil(higherBal, "no other proposer is seen")
      ta.Nil(latestVal, "no voted value")
    
      // Phase1成功后, 因为没有看到其他voted的值,
      // Proposer选择它自己的值进行后面的Phase2
      px.Val = &Value{Vi64: val}
    
      // Phase 2
      higherBal, err = px.Phase2(
          []int64{0, 1}, quorum)
    
      ta.Nil(err, "constitued a quorum")
      ta.Nil(higherBal, "no other proposer is seen")
    }
    

    第2个例子对应2个Proposer遇到冲突并解决冲突的例子, 略长不贴在文中了, 代码可以在 TestCase2DoubleProposer 看到

    img

    下一步

    我们实现了指定key, ver的存储系统, 但相比真正生产可用的kv存储, 还缺少一些东西:

    • 写操作一般都不需要用户指定ver, 所以还需要实现对指定key查找最大ver的功能. 这些跟paxos关系不大, 现在这个实现中就省去了这些逻辑. 以后再讲. 🤔

    • 其次为了让读操作不需要指定ver, 还需要一个snapshot功能, 也就是保存一个key-value的map, 这个map中只需要记录每个key最新的value值(以及ver等). 有了这个map之后, 已经确认的值对应的version就可以删掉了. 也就是说Versions 结构只作为每个key的修改日志存在, 用于存储每次修改对应的paxos实例.

    • snapshot功能还会引入应另外一个需求, 就是paxos made simple 中的 learn 的行为, 对应Phase3, 本文中描述的这个存储中, 只有Proposer知道某个key-ver达到多数派, Acceptor还不知道, (所以读的时候还要走一遍paxos). 在论文中的描述是Acceptor接受一个值时(vote), 也要把这个事情通知其他 Learner角色, 我们可以给每个Acceptor也设定成Learner: Acceptor vote一个值时除了应答Proposer, 也广播这个事件给其他Acceptor, 这样每个Acceptor也就可以知道哪个值是达到quorum了(safe), 可以直接被读取.

      但在实际实现时, 这种方法产生的消息会达到 n² 级别的数量. 所以一般做法是让Proposer做这件事: 当Proposer收到一个quorum的Phase2应答后, 再广播一条消息告诉所有的Acceptor: 这个paxos实例已经safe了, 这个消息在大多数系统中都就称作Commit.

    以上这3块内容, 后续播出, 下个版本的实现将使用经典的log 加 snapshot的方式存储数据.

    各位朋友对哪些方面感兴趣, 欢迎催更 🤔…


    本文用到的代码在 paxoskv 项目的 naive 分支上: https://github.com/openacid/paxoskv/tree/naive

    如有什么本文遗漏的地方, 或有任何好想法, 欢迎随时交流讨论,

    本文相关问题可以在 paxoskv 这个项目上提 基hub issue.

    本文链接: https://blog.openacid.com/algo/paxoskv/

    openacid

    展开全文
  • paxos 算法实现和设计要点

    千次阅读 2013-11-21 22:42:49
    我们相信选择一种描述语言,与和系统其它部分揉合在一块的显式代码实现相比,要更容易对状态机进行推导和修改。 运行时一致性检查 :虽然算法正确,但是人为错误、代码bug会导致系统不一致, 所以工程上添加周期...


    先想到的几个点

    1. 用了fast Paxos, 只有一个leader(master,coordinator).  如果leader fail(单点),那么新的master需要选举。  master选举本身也是个 basic paxos instance, 所以多个节点都请求为master,这里多个节点都扮演了proposer(不是fast paxos了), 这就是普通的paxos算法了.

    2.  Phase 2 中如果有收到acceptor 返回的值,那么必须使用其序列号最高的那个值。

    3.  Paxos Group replica直接的连接维护: 如果是FC,那么使用connection事件;如果以太网连接,那么必须去polling/ping 对方的机器。其实这里问题很多。

    3.1  节点间的连接状态是否要维护在所有的节点上还是就master上。 个人觉得应该在所有的节点上。

    3.2  如何处理网络的transient ?

    3.3 网络出问题。 集群被隔成几个小集群,互相不能通信了。 某两台机器之间不能连接了。

    4. 假设用basic paxos,  有proposer 1, proposer 2和 proposer 3,他们先后发出一轮(one epouch) paxos, 分别是instance 1, instance 2, instance 3, 然后instance 1已经accept 了 proposal from proposer 1 ,  acceptor 2 在想 accept proposal from proposer 1之前promise了 proposal from instance 2 然后accept了proposal frominstance 3 但是在proposer 1和proposer 2获得多数派的acknowledge之前被proposer 3的proposal 抢占了。 所以, proposer 3 的paxos Instance 3会得到不同的值,但是不幸的是在instance 3 达到多数派回复之前,instance 1 达到多数派了,但是按paxos 协议,instance 3选取的是 instance 2 的值啊。  

        所以这里问题是, acceptor回复的是自己之前accept过的值和epoch号还是 回复自己之前accept过且已经被多数派accept也就是说已经choosed的值啊 ???????????


    5. Proposal是 单个的还是批量的,单个的话会导致效率问题,批量的话又如何组织呢 ?

    6. Learner的可以由proposer/master来担任, master如果故障那么learner也故障,反之也一样,而master的单点故障已经有master lease和paxos选举解决了。

    7. 选举quorum也即是replica 多数派的确定。 最好是可以动态管理的。


    参考chubby

    1. sector 4.1):

    用了fast paxos, 只有一个leader(master,coordinator).  如果leaderfail(单点),那么新的master需要选举。  master选举本身也是paxos instance, 所以多个节点都请求为master,这里多个节点都扮演了proposer(不是fast paxos了), 这就是普通的paxos算法了,这样就需要解决两个工程问题:

    1.1 编号的全局性通过取模偏序编号解决;

    1.2  如果有已经有accept的coordinator,一个instance会有多个round,那么新的propser最新一次propose必须使用编号最高的值,如果通过那么这儿proposer就是新的coordinator.  其实就是下面方案2的情况。

    收到多数派的promise

    根据包含的value不同,这些promise又分三种情况:

    • 多数派的value是相同的,说明之前已经达成了最终决议
    • value互不相同,之前并没有达成最终决议
    • 返回的value全部为null

    全部为null的情况比较好处理,只要proposer自由决定value即可;多数派达成一致的情况也好处理,选择已经达成决议的value提交即可,value互不相同的情况有两种处理方式:

    • 方案1:自由确定一个value。原因:反正之前没有达成决议,本次提交哪个value应该是没有限制的。
    • 方案2:选择promise编号最大的value。原因:本次选举(instance)已经有提案了,虽未获通过,但以后的提案应该继续之前的,Lamport在paxos simple的论文中选择这种方式。

    其实问题的本质是:在一个instance内,一个acceptor是否能accept多个value?约束P2只是要求,如果某个value v已被选出,那之后选出的还应该是v;反过来说,如果value v还没有被多数派accept,也没有限制acceptor只accept一个value。

    感觉两种处理方式都可以,只要选择一个value,能在paxos之后的过程中达成一致即可。其实不然,有可能value v已经成为了最终决议,但acceptor不知道,如果这时不选择value v而选其他value,会导致在一次instance内达成两个决议。


    2.   sector 4.2)

    2.1 如果有的节点比较慢,或者故障过,那么最近几次的paxos instance可能missed, so A catch-up mechanism is used to catch up with the master replica.

    2.2 Each replica has the persistent log to record all paxos behavior. So one instance requires a sequence of 5 writes( for each of the propose, promise, accept, acknowledgement and commit(这里或许该learn)) to disk. The disk flush can dominate the overall latency of the system. So has to reduce it. 

       This is achieved by ignore Propose message or promise step because Master ID does not change between instances in fast paxos, becuase only one proposer is master. So no live lock here.  With this optimization, only one write to disk per instance on each replica.  The master writes to disk immediately after sending its accept
    message and other replicas write to disk prior to sending their acknowledge message.


    3. 硬盘错误处理,persistent log

    可能发生文件内容被改变也可能是文件不访问。 内容改变检查的话用checksum of content的方法。 要区分是文件不可访问还是带着空盘的新节点,chubby用的是GFS maker, 这你可以用其他方法。 

    发生错误的节点得用catchup方法尽快同步到最新状态,期间算作paxos机器的一部分,但是不参加选举。这样既可以维护paxos的完整性又可以降低系统延迟。


    4. Master Lease

    基本是为了让replicas知道master还活着,更进一步可以让master知道replica还活着,这样master可以转让部分读取请求给replica。

    一个lease,一个租约,本质上是个定时器。 到期了就得更新,否则无效。 replica给之前的当然现在也活着master发送一个lease,期间replica拒绝任何来自其他节点paxos消息(propose, accept),lease快过期的时候,master向replica申请更新租约。  在实现上,master上的lease时间跨度比replica上的lease短一点点,这样是为了避免系统时钟频率漂移导致的误差。 这有力的保证了,老master lease失效前,新master不会产生。这样client不会向老的master发生读取请求。 


    paxos算法本身保证了数据在各个节点一致性,另外把lease 概念扩展到普通的relica上,这样可以让普通的replica 服务读取请求。 这样可以大大提高性能,毕竟在有些场景中读取远远比写来的多。


    如果在一个网络不稳定环境,master会暂时性的失去连接,那么新的master会被选举并工作,新的master会来维护sequence number在多个paxos 实例之间。 但是老的master 回来后,尝试连接成功后会发起新的paxos 实例并且以更高的sequence number来抢回master位置(这里开始不理解了,不是有lease机制保证只有一个master吗? 老的master回来后发现lease应该expire了啊,那么replicas会拒绝来自老的master的paxos消息啊 ???)。 所以建议,master要经常去提高sequence number,  就是有事没事的就去提高一下。


    这里引发了工程中的现实问题,master如何fail over到新的master, 这里的租约机制只说明了如何防止一个新的master的产生在老master过期之前,但是没有提及如何选举产生新的master, client如何知道新的master IP.  我这里提出很多问题

    •   譬如原来7个replica互相联通,某个为master,突然网络问题,3 replica互联,另外四个互联,这个时候为保证多数派一致性,必须选用4个节点集群担任新的paxos集群,但是4个节点在分裂为2对2呢 ? 或者master被完全隔离与网络之外。 
    • 新选出的master如何通知client, 假设简单的socket通知。或者采用client提交read 请求的时候,老的master的告诉client 他lease过期, 然后client随机去请求一个replica,直到选举新master的quorum中的replica告诉client新master的IP。
    • lease机制导致了 lease时间内无法选举产生新的master。 这一点SVC比 chubby强,SVC 不用paxos算法选举新的master。 而是网络连接地图会维护网络连接状态,他直接选举多数派中的第一个节点为 master。


    5.  sector 5.4 Group membership, sector 5.5 Snapshots  and 5.6 Database transactions


    Group membership 变更问题,本文没有透露。 可惜。 不过我觉得需要一层连接地图层。

    Snapshots 是为了catchup 变快,和 记录paxos 动作的磁盘不够大时候。 paxos决议本质上是对一些数据结构进行操作的 状态机命令。可以对数据结构进行快照,然后把老的日志数据清除, 但是Paxos本身不知道具体的应用的数据结构,所以应用程序进行snapshot.

    然后replica恢复的时候,从snapshot + snapshot之后的日志进行恢复,然后再向leader replica申请 leader的snapshot 和日志。

    chubby的状态最终都维护在那个 fault-tolerant DB上,所以对DB的一些操作进行封装。 诸如insert,delete,lookup,原子性的cas(compare and swap)等操作, 其中 CAS操作是具有原子性的,所以进行简单封装可以模拟数据库事务,这个非常有用。 我们采用针对整个数据库的snapshot以及应用在该snapshot之上的数据库操作日志,实现了一个log-structured设计。操作日志就是Paxos日志。该实现会周期性地对数据库进行snapshot,以及进行相应日志的截断。



    6.  sector ) Software Engineering

    高效的算法表示: 容错算法本就是很难正确地进行表达,即使是以伪代码的形式。在当算法的代码是与其他代码共同构建一个完整系统的时候,情况会更糟。核心算法的阅读,理解,或者是在出现bug时的debug都变得更加困难。 我们相信选择一种描述语言,与和系统其它部分揉合在一块的显式代码实现相比,要更容易对状态机进行推导和修改。

    运行时一致性检查:虽然算法正确,但是人为错误、代码bug会导致系统不一致, 所以工程上添加周期性的数据库一致性检查。Master周期性地提交一个针对数据库日志的checksum请求{!与读写请求一样,该请求也会被串行化,这样就可以保证它是对所有副本的相同状态计算的checksum}。 每个副本计算的checksun 会和master发过来的checksum进行比较,如果不一致则.....

    测试:

    并发性:总之,我们认为通过限制并发性来保证执行的可重复性,是一个正确的目标。不幸的是,产品的发展使得我们无法一直保持这些目标。






    展开全文
  • Ceph Monitor Paxos实现

    2020-06-15 21:42:03
    Monitor的Paxos实现主要包括Leveldb、Paxos和PaxosService三层,其中Leveldb负责底层存储,Paxos负责Paxos算法的具体实现,PaxosService则是基于Paxos提供的服务,包括OSDMap、MonitorMap、PGMap、CRUSHMap。...
  • Background200行代码实现paxos-kv中介绍了一款非常简洁的分布式kv存储实现, 它是基于classic-paxos实现分布式一致性. 在paxos的直观解释中我们...
  • ceph存储 ceph集群Paxos算法实现

    千次阅读 2014-12-17 11:30:20
    Ceph中Paxos算法的实现,省略了Prepare阶段,并且Leader选举成功后每次执行算法使用同一个提案号。在Prepare阶段要完成(a)、(b)和(c)三件事,前两件事在Recovery阶段完成,Leader和Peon的 已接受的最大提案号 保持...
  • Paxos算法Java实现

    2021-05-08 22:55:17
    Paxos 算法的简要 Java 实现。 前置知识 首先我们还是熟悉一下 gRPC 的使用,我们需要通过使用 ServerBuilder 构建 RPC 的服务端,以下是示例代码: ServerBuilder .forPort(node.peerSet.getSelf().getPort...
  • Paxos 实现之 Chubby

    2021-04-27 19:19:25
    Chubby的底层一致性实现就是以Paxos算法为基础,Chubby提供了粗粒度的分布式锁服务,开发人员直接调用Chubby的锁服务接口即可实现分布式系统中多个进程之间粗粒度的同控制,从而保证分布式数据的一致性。 1 设计目标...
  • Zookeeper介绍-Paxos实现

    2017-05-16 09:15:38
    ZooKeeper是近期比较热门的一个类Paxos实现。也是一个逐渐得到广泛应用的开源的分布式锁服务实现。被认为是Chubby的开源版,虽然具体实现有很多差异。ZooKeeper概要的介绍可以看官方文档:...
  • Paxos算法分析和实现

    2021-06-01 14:42:15
    Paxos算法背景内容算法过程代码实现 背景 Paxos算法,是用来解决分布式环境中一致性问题的算法,其在zookeeper等框架中均有体现。 提到一致性,这是一个特别常见的问题,缓存和数据库的一致性,事务的一致性,再到...
  • Paxos

    2020-07-31 11:34:15
    Paxos算法解决的问题正是分布式一致性问题,即一个分布式系统中的各个进程如何就某个值(决议)达成一致。Paxos算法运行在允许宕机故障的异步系统中,不要求可靠的消息传递,可容忍消息丢失、延迟、乱序以及重复。它...
  • 0 paxos算法解决了什么问题 现在有n个人组成提一个会议,这个会议的目的是为了确定今年的税率,那么每个人都会提出自己认为的今年的合理的税率,为了大家能够达成一致,有了paxos算法。实际里,这个会议就是一个集群...
  • paxos深入

    2017-05-25 08:25:43
    paxos深入 paxos实现
  • paxos

    千次阅读 2016-07-13 17:55:38
    作者:朱一聪 ...来源:知乎 著作权归作者所有。...作为一个因为毕设和这个密切相关从而有了解的人表示,paxos本身并不复杂,在> Lamport用两段话就描述清楚了它的流程。他老人家也说paxos其实是个简单的算法。但是是我
  • 前文《理解 Paxos》只包含伪代码,帮助了理解但又不够爽,既然现在都讲究Talk is cheap. Show me the code.这次就把文章中的伪代码用 Go 语言实现出来...
  • 1.paxos算法 为什么需要paxos 相关概念:Paxos算法是基于消息传递且具有高度容错特性的一致性算法,是目前公认的解决分布式一致性问题最有效的算法之一,其解决的问题就是在分布式系统中如何就某个值(决议)达成...
  • ZooKeeper是近期比较热门的一个类Paxos实现。也是一个逐渐得到广泛应用的开源的分布式锁服务实现。被认为是Chubby的开源版,虽然具体实现有很多差异。ZooKeeper概要的介绍可以看官方文档:...
  • 前文《理解 Paxos》只包含伪代码,帮助了理解但又不够爽,既然现在都讲究Talk is cheap. Show me the code.这次就把文章中的伪代码用 Go 语言实现出来...
  • Paxos分布式一致性算法 阅读《从Paxos到Zookeeper》一书第二章,学习Paxos算法。 这个算法对我这个小菜鸡来说,看一遍书还是非常晦涩难懂的,所以不断参考网上生动形象的讲解及重复看了好多遍书上的这块内容,算是...
  • Paxos 算法包含 2 个部分: 1、Basic Paxos : 描述多节点之间如何就某个值...这样对于业务代码就没有侵入性,不需要再业务代码实现算法逻辑。 接受者:对每个提议的值进行投票,并存储接受的值。一般来说,集群中的
  • 3.Java本身是一门繁琐的语言,所以难免会有很大冗余代码,这主要是为了保证程序的稳定性,还有一部分是出于习惯。Paxos其实并不是一个复杂的算法,至少基本的不是。程序中所采用的的面向对象的设计也不一定是合理的...
  • Paxos算法之旅(四)zookeeper代码解析

    千次阅读 2011-02-22 18:41:00
    Paxos算法之旅(四)zookeeper代码解析Posted by linxuan on 2010-11-10 Leave a comment (3)Go to comments ZooKeeper是近期比较热门的一个类Paxos实现。也是一个逐渐得到广泛应用的开源的分布式锁服务实现...
  • 腾讯开源的 Paxos库 PhxPaxos 代码解读---Accept阶段(一) 在看Accept阶段代码之前, 我们再回想一下 Basic Paxos算法; 1. Basic Paxos 算法是为了使集群中的Acceptor们达成一个最终的值, 或者不能达成一个最终...
  • 摘要:提到分布式算法,就不得不提 Paxos 算法,在过去几十年里,它基本上是分布式共识的代 名词,因为当前最常用的一批共识算法都是基于它改进的。比如,Fast Paxos 算法、 Cheap Paxos 算法、Raft 算法、ZAB 协议...
  • Paxos算法

    2020-08-14 17:36:50
    最终一致性的经典实现Paxos算法 Paxos算法又被称为两阶段算法,请注意不是实现分布式事务的两阶段提交协议。Paxos算法的工作场景是基于CAP原理构建的分布式系统,是在这样的环境中如何高效率的达到数据的最终一致...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,038
精华内容 4,415
关键字:

paxos代码实现