Python 기반 Paxos 알고리즘 구현

알고리즘을 가장 빠르고 깊이 있게 이해하는 방법은 직접 손으로 구현해 보는 것이라고 생각합니다. 실제 프로젝트에서는 이미 잘 패키징된 알고리즘 라이브러리를 호출하면 되므로 직접 구현할 필요가 없지만, 그래도 한 번은 직접 실습해 볼 필요가 있다고 생각합니다.
먼저 말씀드리자면, Python 같은 동적 언어는 익숙하지 않은 분에게 어색할 수 있습니다. Java처럼 매개변수 타입이 고정되어 있지 않아서 코드를 읽기가 다소 불편할 수 있습니다. 여기서는 Python 2.7 환경을 사용합니다.

class Message:
  """One Paxos protocol message exchanged between leaders, acceptors and clients.

  The routing and payload attributes (``proposalID``, ``instanceID``, ``to``,
  ``source``, ``value``) start out as ``None`` so that a freshly built message
  can always be inspected or passed to :meth:`copyAsReply` without raising
  ``AttributeError``. Senders overwrite the ones they need.
  """

  # Command codes stored in ``command``.
  MSG_ACCEPTOR_AGREE = 0  # acceptor agrees to a proposal
  MSG_ACCEPTOR_ACCEPT = 1  # acceptor accepted a value
  MSG_ACCEPTOR_REJECT = 2  # acceptor rejects a proposal
  MSG_ACCEPTOR_UNACCEPT = 3  # acceptor did not accept a value
  MSG_ACCEPT = 4  # leader asks the acceptors to accept a value
  MSG_PROPOSE = 5  # leader proposes
  MSG_EXT_PROPOSE = 6  # proposal submitted from outside the group
  MSG_HEARTBEAT = 7  # leader heartbeat

  def __init__(self, command=None):
    """Create a message carrying ``command`` (one of the ``MSG_*`` codes)."""
    self.command = command
    self.proposalID = None
    self.instanceID = None
    self.to = None
    self.source = None
    self.value = None

  def copyAsReply(self, message):
    """Turn this message into a reply to ``message``.

    Proposal ID, instance ID and value are copied; sender and recipient are
    swapped so the reply travels back to where ``message`` came from.
    """
    self.proposalID = message.proposalID
    self.instanceID = message.instanceID
    # Swap the endpoints: the original sender becomes the recipient.
    self.to = message.source
    self.source = message.to
    self.value = message.value
다음은 socket, 스레드, 큐를 이용한 메시지 전달 장치(MessagePump)입니다.

# Message transport over a UDP socket: a receiver thread feeds a queue that the pump thread drains
import threading
import pickle
import socket
import queue
class MessagePump(threading.Thread):
  """Thread that moves pickled messages between a UDP socket and its owner.

  A helper thread reads datagrams from the socket and puts the decoded
  messages on an internal queue; the pump thread itself takes messages off
  that queue and hands them to ``owner.recvMessage``.
  """

  class MPHelper(threading.Thread):
    """Receiver thread: reads datagrams from the pump's socket into its queue."""

    def __init__(self, owner):
      """``owner`` is the MessagePump whose socket and queue are used."""
      self.owner = owner
      threading.Thread.__init__(self)

    def run(self):
      """Receive and decode datagrams until the owning pump is aborted."""
      while not self.owner.abort:
        try:
          (data, addr) = self.owner.socket.recvfrom(2048)
        except socket.error:
          # Includes socket.timeout: nothing arrived within the socket
          # timeout, so loop round and re-check the abort flag.
          continue
        try:
          # SECURITY: pickle.loads executes arbitrary code from the payload.
          # Only acceptable here because the socket is bound to localhost
          # and peers are trusted; do not expose this port to untrusted hosts.
          msg = pickle.loads(data)
          msg.source = addr[1]  # record the sender's port
        except Exception:
          # Malformed or non-message datagram: drop it instead of letting
          # the receiver thread die.
          continue
        self.owner.queue.put(msg)

  def __init__(self, owner, port, timeout=2):
    """Bind a UDP socket on ``localhost:port`` for ``owner``.

    ``owner`` must provide ``recvMessage(message)``. ``timeout`` is the
    socket timeout in seconds.
    """
    threading.Thread.__init__(self)
    self.owner = owner
    self.abort = False
    self.timeout = timeout  # was hard-coded to 2, ignoring the argument
    self.port = port
    self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000)
    self.socket.bind(("localhost", port))
    self.socket.settimeout(timeout)
    self.queue = queue.Queue()
    self.helper = MessagePump.MPHelper(self)

  def run(self):
    """Start the receiver thread, then dispatch messages until aborted."""
    self.helper.start()
    while not self.abort:
      message = self.waitForMessage()
      # ``message`` is None when nothing arrived in time; the owner is still
      # called so it can do periodic work.
      self.owner.recvMessage(message)

  def waitForMessage(self):
    """Return the next queued message, or None if none arrives within 3 s."""
    try:
      return self.queue.get(True, 3)
    except queue.Empty:
      return None

  def sendMessage(self, message):
    """Pickle ``message`` and send it to ``localhost:message.to``; return True."""
    payload = pickle.dumps(message)
    address = ("localhost", message.to)
    self.socket.sendto(payload, address)
    return True

  def doAbort(self):
    """Ask both the pump thread and the receiver thread to stop."""
    self.abort = True
메시지 처리기를 하나 더 추가합니다. 메시지의 전달, 지연, 패킷 손실을 시뮬레이션합니다. 사실 이 클래스는 별 쓸모가 없으며, 시뮬레이션 테스트를 위해 준비한 것입니다.

from MessagePump import MessagePump
import random
class AdversarialMessagePump(MessagePump):
  """MessagePump that hands out buffered messages in random order.

  Received messages are held in a set; each call to ``waitForMessage`` either
  returns one of them picked at random or returns None, which delays,
  reorders and sometimes withholds delivery for testing.
  """

  def __init__(self, owner, port, timeout=2):
    """Set up the underlying pump and an empty holding set."""
    MessagePump.__init__(self, owner, port, timeout)
    self.messages = set()  # messages received but not yet delivered

  def waitForMessage(self):
    """Return a randomly chosen held message, or None.

    Waits up to 0.1 s for a new message and adds it to the holding set. With
    probability 0.95 (when anything is held) one held message is removed and
    returned; otherwise None is returned.
    """
    try:
      self.messages.add(self.queue.get(True, 0.1))
    except Exception:
      # Nothing new arrived in time; carry on with what is already held.
      pass

    held = self.messages
    # random.random() is only consulted when something is held.
    if not held or random.random() >= 0.95:  # Arbitrary!
      return None
    picked = random.choice(list(held))
    held.remove(picked)
    return picked
다음은 기록(InstanceRecord) 클래스입니다.

# InstanceRecord: per-instance record of the protocols seen, the highest proposal ID and the value
from PaxosLeaderProtocol import PaxosLeaderProtocol
class InstanceRecord:
  """Bookkeeping for one Paxos instance.

  Holds every protocol object seen for the instance keyed by proposal ID,
  the highest proposal ID seen so far, and the instance's value.
  """

  def __init__(self):
    self.protocols = {}  # proposalID -> protocol object
    self.highestID = (-1, -1)  # (port, count)
    self.value = None

  def addProtocol(self, protocol):
    """Register ``protocol`` and raise ``highestID`` if its ID is higher.

    Proposal IDs are ``(port, count)`` tuples ordered by count first, with
    the port as tie-breaker.
    """
    self.protocols[protocol.proposalID] = protocol
    new_port, new_count = protocol.proposalID[0], protocol.proposalID[1]
    cur_port, cur_count = self.highestID[0], self.highestID[1]
    if new_count > cur_count or (new_count == cur_count and new_port > cur_port):
      self.highestID = protocol.proposalID

  def getProtocol(self, protocolID):
    """Return the protocol registered under ``protocolID``.

    Raises:
      KeyError: if no protocol with that ID has been added.
    """
    return self.protocols[protocolID]

  def cleanProtocols(self):
    """Drop every protocol that has reached ``STATE_ACCEPTED``."""
    # Iterate over a snapshot of the keys: deleting from a dict while
    # iterating its live key view raises RuntimeError on Python 3.
    for key in list(self.protocols):
      if self.protocols[key].state == PaxosLeaderProtocol.STATE_ACCEPTED:
        print("    ")
        del self.protocols[key]
다음은 Acceptor의 구현입니다.

#    
from MessagePump import MessagePump
from Message import Message
from InstanceRecord import InstanceRecord
from PaxosAcceptorProtocol import PaxosAcceptorProtocol
class PaxosAcceptor:
  """Paxos acceptor node listening on a local UDP port.

  Keeps one InstanceRecord per Paxos instance and delegates the handling of
  each proposal to a PaxosAcceptorProtocol.
  """

  def __init__(self, port, leaders):
    """Create an acceptor on ``port`` that reports to the ``leaders`` ports."""
    self.port = port
    self.leaders = leaders
    self.instances = {}  # instanceID -> InstanceRecord
    self.msgPump = MessagePump(self, self.port)
    self.failed = False

  def start(self):
    """Start the message pump thread."""
    self.msgPump.start()

  def stop(self):
    """Ask the message pump to stop."""
    self.msgPump.doAbort()

  def fail(self):
    """Simulate a failure: incoming messages are ignored until recover()."""
    self.failed = True

  def recover(self):
    """Resume handling incoming messages after fail()."""
    self.failed = False

  def sendMessage(self, message):
    """Send ``message`` through the message pump."""
    self.msgPump.sendMessage(message)

  def recvMessage(self, message):
    """Handle one incoming message (called by the message pump).

    ``None`` (no message arrived) and anything received while failed are
    ignored. A MSG_PROPOSE starts a new acceptor protocol; every other
    command is routed to the protocol registered for its proposal. Messages
    that refer to an unknown instance or proposal are dropped: raising here
    would terminate the message pump thread.
    """
    if message is None or self.failed:
      return

    if message.command == Message.MSG_PROPOSE:
      if message.instanceID not in self.instances:
        self.instances[message.instanceID] = InstanceRecord()
      protocol = PaxosAcceptorProtocol(self)
      # recvProposal runs before addProtocol so that it compares against the
      # highest ID seen *before* this proposal.
      protocol.recvProposal(message)
      self.instances[message.instanceID].addProtocol(protocol)
      return

    record = self.instances.get(getattr(message, "instanceID", None))
    if record is None:
      return
    proposal_id = getattr(message, "proposalID", None)
    if proposal_id not in record.protocols:
      return
    record.getProtocol(proposal_id).doTransition(message)

  def notifyClient(self, protocol, message):
    """Record the value once ``protocol`` reports that it was accepted."""
    if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED:
      self.instances[protocol.instanceID].value = message.value
      print(u"         %s" % message.value)

  def getHighestAgreedProposal(self, instance):
    """Return the highest proposal ID, ``(port, count)``, seen for ``instance``."""
    return self.instances[instance].highestID

  def getInstanceValue(self, instance):
    """Return the value currently recorded for ``instance``."""
    return self.instances[instance].value
그럼 AcceptorProtocol의 구현을 살펴보겠습니다.

from Message import Message
class PaxosAcceptorProtocol(object):
  """Acceptor-side state machine for a single proposal."""

  # State variables
  STATE_UNDEFINED = -1  # no proposal received yet
  STATE_PROPOSAL_RECEIVED = 0
  STATE_PROPOSAL_REJECTED = 1
  STATE_PROPOSAL_AGREED = 2
  STATE_PROPOSAL_ACCEPTED = 3
  STATE_PROPOSAL_UNACCEPTED = 4

  def __init__(self, client):
    """``client`` is the acceptor that owns this protocol."""
    self.client = client
    self.state = PaxosAcceptorProtocol.STATE_UNDEFINED

  def recvProposal(self, message):
    """Handle a MSG_PROPOSE and agree to it or reject it.

    The proposal is agreed to when its ID is higher (count first, then port)
    than the highest ID the client has seen for the instance; in that case a
    MSG_ACCEPTOR_AGREE reply carrying the instance's current value and that
    previous highest ID is sent. Otherwise the state becomes
    STATE_PROPOSAL_REJECTED and nothing is sent.

    Returns:
      The proposal ID, or None when ``message`` is not a MSG_PROPOSE.
    """
    if message.command != Message.MSG_PROPOSE:
      # Not a proposal: nothing to do.
      return None

    self.proposalID = message.proposalID
    self.instanceID = message.instanceID
    (port, count) = self.client.getHighestAgreedProposal(message.instanceID)
    if count < self.proposalID[1] or (count == self.proposalID[1] and port < self.proposalID[0]):
      self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED
      print("    :%s, %s " % (message.instanceID, message.value))
      value = self.client.getInstanceValue(message.instanceID)
      msg = Message(Message.MSG_ACCEPTOR_AGREE)
      msg.copyAsReply(message)
      msg.value = value
      msg.sequence = (port, count)
      self.client.sendMessage(msg)
    else:
      self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
    return self.proposalID

  def doTransition(self, message):
    """Advance the state machine with ``message``.

    Only MSG_ACCEPT in STATE_PROPOSAL_AGREED is acted on: the state becomes
    STATE_PROPOSAL_ACCEPTED, a MSG_ACCEPTOR_ACCEPT is sent to every leader and
    the client is notified.

    Returns:
      True if the message was acted on, False if it was ignored. Unexpected
      state/command combinations (duplicate or late datagrams, an ACCEPT for
      a rejected proposal) are ignored rather than raised, because an
      exception here would terminate the thread delivering messages.
    """
    if (self.state != PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED
        or message.command != Message.MSG_ACCEPT):
      return False

    self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED
    msg = Message(Message.MSG_ACCEPTOR_ACCEPT)
    msg.copyAsReply(message)
    for l in self.client.leaders:
      msg.to = l
      self.client.sendMessage(msg)
    self.notifyClient(message)
    return True

  def notifyClient(self, message):
    """Tell the owning client about this protocol's current state."""
    self.client.notifyClient(self, message)
이어서 Leader와 LeaderProtocol의 구현을 살펴보겠습니다.

#    
import threading
try:
  import queue as Queue  # Python 3 module name
except ImportError:
  import Queue  # Python 2 module name
import time
from Message import Message
from MessagePump import MessagePump
from InstanceRecord import InstanceRecord
from PaxosLeaderProtocol import PaxosLeaderProtocol
class PaxosLeader:
  """Paxos leader (proposer) node listening on a local UDP port.

  Owns a message pump, a heartbeat sender and a heartbeat listener, and keeps
  one InstanceRecord per Paxos instance.
  """

  class HeartbeatListener(threading.Thread):
    """Decides from incoming heartbeats whether the leader is primary."""

    def __init__(self, leader):
      self.leader = leader
      self.queue = Queue.Queue()  # heartbeats handed over by recvMessage
      self.abort = False
      threading.Thread.__init__(self)

    def newHB(self, message):
      """Queue a received heartbeat for the listener thread."""
      self.queue.put(message)

    def doAbort(self):
      """Ask the listener thread to stop."""
      self.abort = True

    def run(self):
      """Step down for higher-port leaders; take over after 2 s of silence."""
      while not self.abort:
        try:
          hb = self.queue.get(True, 2)
        except Queue.Empty:
          # No heartbeat within 2 seconds: become primary.
          self.leader.setPrimary(True)
          continue
        # A leader on a higher port is alive, so this one yields.
        if hb.source > self.leader.port:
          self.leader.setPrimary(False)

  class HeartbeatSender(threading.Thread):
    """Sends a heartbeat to the other leaders once a second while primary."""

    def __init__(self, leader):
      threading.Thread.__init__(self)
      self.leader = leader
      self.abort = False

    def doAbort(self):
      """Ask the sender thread to stop."""
      self.abort = True

    def run(self):
      while not self.abort:
        time.sleep(1)
        if self.leader.isPrimary:
          msg = Message(Message.MSG_HEARTBEAT)
          msg.source = self.leader.port
          for leader in self.leader.leaders:
            msg.to = leader
            self.leader.sendMessage(msg)

  def __init__(self, port, leaders=None, acceptors=None):
    """Create a leader on ``port``.

    ``leaders`` are the ports of the other leaders and ``acceptors`` the
    ports of the acceptors; both default to an empty list.
    """
    self.port = port
    self.leaders = [] if leaders is None else leaders
    self.acceptors = [] if acceptors is None else acceptors
    self.group = self.leaders + self.acceptors
    self.isPrimary = False
    self.proposalCount = 0
    self.msgPump = MessagePump(self, port)
    self.instances = {}  # instanceID -> InstanceRecord
    self.hbListener = PaxosLeader.HeartbeatListener(self)
    self.hbSender = PaxosLeader.HeartbeatSender(self)
    self.highestInstance = -1
    self.stoped = True  # messages are ignored until start()
    self.lasttime = time.time()  # time of the last gap-filling pass

  def sendMessage(self, message):
    """Send ``message`` through the message pump."""
    self.msgPump.sendMessage(message)

  def start(self):
    """Start the heartbeat threads and the message pump."""
    self.hbSender.start()
    self.hbListener.start()
    self.msgPump.start()
    self.stoped = False

  def stop(self):
    """Ask all threads to stop and ignore further messages."""
    self.hbSender.doAbort()
    self.hbListener.doAbort()
    self.msgPump.doAbort()
    self.stoped = True

  def setPrimary(self, primary):
    """Set whether this leader is the primary, logging only on change."""
    if self.isPrimary != primary:
      # Only print if something's changed
      if primary:
        print(u"  leader%s" % self.port)
      else:
        print(u"   leader%s" % self.port)
    self.isPrimary = primary

  def getGroup(self):
    """Return the ports of all other leaders and all acceptors."""
    return self.group

  def getLeaders(self):
    return self.leaders

  def getAcceptors(self):
    return self.acceptors

  def getQuorumSize(self):
    """Return the number of acceptors that form a majority.

    Uses floor division so the result is an int on Python 3 as well; with
    true division five acceptors gave 3.5, which three replies never reach.
    """
    return (len(self.getAcceptors()) // 2) + 1

  def getInstanceValue(self, instanceID):
    """Return the value recorded for ``instanceID``, or None if unknown."""
    if instanceID in self.instances:
      return self.instances[instanceID].value
    return None

  def getHistory(self):
    """Return the values of instances 1..highestInstance, in order.

    NOTE(review): newProposal numbers the first instance 0, which this range
    skips - confirm whether that is intended.
    """
    return [self.getInstanceValue(i) for i in range(1, self.highestInstance + 1)]

  def getNumAccpted(self):
    """Return how many entries of the history have a value."""
    return len([v for v in self.getHistory() if v is not None])

  def findAndFillGaps(self):
    """Propose the value 0 for every instance below the highest without a value."""
    for i in range(1, self.highestInstance):
      if self.getInstanceValue(i) is None:
        print("    ", i)
        self.newProposal(0, i)
    self.lasttime = time.time()

  def garbageCollect(self):
    """Let every instance record drop its finished protocols."""
    for record in self.instances.values():
      record.cleanProtocols()

  def recvMessage(self, message):
    """Handle one incoming message (called by the message pump).

    ``None`` means no message arrived in time; a primary uses that idle
    moment, at most every 15 seconds, to fill gaps and clean up. Messages for
    an unknown instance or proposal are dropped, since raising here would
    terminate the message pump thread.
    """
    if self.stoped:
      return
    if message is None:
      if self.isPrimary and time.time() - self.lasttime > 15.0:
        self.findAndFillGaps()
        self.garbageCollect()
      return

    if message.command == Message.MSG_HEARTBEAT:
      self.hbListener.newHB(message)
      return True

    if message.command == Message.MSG_EXT_PROPOSE:
      print("     ", self.port, self.highestInstance)
      # Only the primary turns external requests into proposals.
      if self.isPrimary:
        self.newProposal(message.value)
      return True

    if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:
      record = self.instances.get(getattr(message, "instanceID", None))
      proposal_id = getattr(message, "proposalID", None)
      if record is not None and proposal_id in record.protocols:
        record.getProtocol(proposal_id).doTransition(message)

    if message.command == Message.MSG_ACCEPTOR_ACCEPT:
      if message.instanceID not in self.instances:
        self.instances[message.instanceID] = InstanceRecord()
      record = self.instances[message.instanceID]
      if message.proposalID not in record.protocols:
        # A proposal this leader did not make: track it from the agreed
        # state so accept replies can be counted.
        protocol = PaxosLeaderProtocol(self)
        protocol.state = PaxosLeaderProtocol.STATE_AGREED
        protocol.proposalID = message.proposalID
        protocol.instanceID = message.instanceID
        protocol.value = message.value
        record.addProtocol(protocol)
      else:
        protocol = record.getProtocol(message.proposalID)

      protocol.doTransition(message)

    return True

  def newProposal(self, value, instance=None):
    """Propose ``value``, for a new instance unless ``instance`` is given."""
    protocol = PaxosLeaderProtocol(self)
    if instance is None:
      self.highestInstance += 1
      instanceID = self.highestInstance
    else:
      instanceID = instance
    self.proposalCount += 1
    proposal_id = (self.port, self.proposalCount)
    if instanceID in self.instances:
      record = self.instances[instanceID]
    else:
      record = InstanceRecord()
      self.instances[instanceID] = record
    protocol.propose(value, proposal_id, instanceID)
    record.addProtocol(protocol)

  def notifyLeader(self, protocol, message):
    """React to ``protocol`` reaching an accepted or rejected state."""
    if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
      print("    %s %s  " % (message.instanceID, message.value))
      self.instances[message.instanceID].accepted = True
      self.instances[message.instanceID].value = message.value
      self.highestInstance = max(message.instanceID, self.highestInstance)
      return
    if protocol.state == PaxosLeaderProtocol.STATE_REJECTED:
      # Move past the rejecting side's proposal count and try again.
      # NOTE(review): nothing in this file sets ``message.highestPID`` -
      # confirm that the sender of the reject message provides it.
      self.proposalCount = max(self.proposalCount, message.highestPID[1])
      self.newProposal(message.value)
      return True
    if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:
      pass
LeaderProtocol 구현:

from Message import Message
class PaxosLeaderProtocol(object):
  """Leader-side state machine for a single proposal."""

  STATE_UNDEFINED = -1  # nothing proposed yet
  STATE_PROPOSED = 0
  STATE_REJECTED = 1
  STATE_AGREED = 2
  STATE_ACCEPTED = 3
  STATE_UNACCEPTED = 4

  def __init__(self, leader):
    """``leader`` is the PaxosLeader that owns this protocol."""
    self.leader = leader
    self.state = PaxosLeaderProtocol.STATE_UNDEFINED
    self.proposalID = (-1, -1)
    self.agreecount, self.acceptcount = (0, 0)
    self.rejectcount, self.unacceptcount = (0, 0)
    self.instanceID = -1
    self.highestseen = (0, 0)

  def propose(self, value, pID, instanceID):
    """Send a MSG_PROPOSE for ``value`` to every acceptor.

    Returns:
      The proposal ID ``pID``.
    """
    self.proposalID = pID
    self.value = value
    self.instanceID = instanceID
    message = Message(Message.MSG_PROPOSE)
    message.proposalID = pID
    message.instanceID = instanceID
    message.value = value
    for server in self.leader.getAcceptors():
      message.to = server
      self.leader.sendMessage(message)
    self.state = PaxosLeaderProtocol.STATE_PROPOSED

    return self.proposalID

  def doTransition(self, message):
    """Advance the state machine with a reply from an acceptor.

    In STATE_PROPOSED, agree and reject replies are counted; a quorum of
    agrees moves to STATE_AGREED and broadcasts MSG_ACCEPT, a quorum of
    rejects moves to STATE_REJECTED. In STATE_AGREED, accept and unaccept
    replies are counted the same way. Each time a quorum is reached the
    leader is notified. Other combinations are ignored.

    Returns:
      True for an agree/reject reply handled in STATE_PROPOSED, else None.
    """
    if self.state == PaxosLeaderProtocol.STATE_PROPOSED:
      if message.command == Message.MSG_ACCEPTOR_AGREE:
        self.agreecount += 1
        if self.agreecount >= self.leader.getQuorumSize():
          print(u"         ,        :%s" % message.value)
          # An acceptor that already holds a value reports it; adopt it when
          # its sequence is higher than anything seen so far. With no
          # reported value the leader keeps its own.
          if message.value is not None:
            if message.sequence[0] > self.highestseen[0] or (
                message.sequence[0] == self.highestseen[0]
                and message.sequence[1] > self.highestseen[1]):
              self.value = message.value
              self.highestseen = message.sequence

          # The transition and the accept broadcast must happen whether or
          # not a value was reported; they used to be nested under the value
          # check, so fresh instances never progressed.
          self.state = PaxosLeaderProtocol.STATE_AGREED
          msg = Message(Message.MSG_ACCEPT)
          msg.copyAsReply(message)
          msg.value = self.value
          msg.leaderID = msg.to
          for server in self.leader.getAcceptors():
            msg.to = server
            self.leader.sendMessage(msg)
          self.leader.notifyLeader(self, message)
        return True

      # Sibling of the agree branch (it used to be nested inside it, which
      # made it unreachable).
      if message.command == Message.MSG_ACCEPTOR_REJECT:
        self.rejectcount += 1
        if self.rejectcount >= self.leader.getQuorumSize():
          self.state = PaxosLeaderProtocol.STATE_REJECTED
          self.leader.notifyLeader(self, message)
        return True

    if self.state == PaxosLeaderProtocol.STATE_AGREED:
      if message.command == Message.MSG_ACCEPTOR_ACCEPT:
        self.acceptcount += 1
        if self.acceptcount >= self.leader.getQuorumSize():
          self.state = PaxosLeaderProtocol.STATE_ACCEPTED
          self.leader.notifyLeader(self, message)
      if message.command == Message.MSG_ACCEPTOR_UNACCEPT:
        self.unacceptcount += 1
        if self.unacceptcount >= self.leader.getQuorumSize():
          self.state = PaxosLeaderProtocol.STATE_UNACCEPTED
          self.leader.notifyLeader(self, message)
테스트 모듈:

import socket, pickle, time
from Message import Message
from PaxosAcceptor import PaxosAcceptor
from PaxosLeader import PaxosLeader

if __name__ == "__main__":
  # Demo: five acceptors and two leaders on localhost, with 1000 values
  # submitted by an external UDP client.
  numclients = 5
  clients = [PaxosAcceptor(port, [54321, 54322]) for port in range(64320, 64320 + numclients)]
  # Each leader knows the other leader's port and every acceptor's port.
  leader1 = PaxosLeader(54321, [54322], [c.port for c in clients])
  leader2 = PaxosLeader(54322, [54321], [c.port for c in clients])

  # Start the leaders (both initially marked primary) and the acceptors.
  leader1.start()
  leader1.setPrimary(True)
  leader2.setPrimary(True)
  leader2.start()
  for c in clients:
    c.start()

  # Two of the five acceptors stop handling messages.
  clients[0].fail()
  clients[1].fail()

  # External client: send 1000 proposals to leader2 over UDP.
  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  start = time.time()
  try:
    for i in range(1000):
      m = Message(Message.MSG_EXT_PROPOSE)
      m.value = 0 + i
      m.to = 54322
      payload = pickle.dumps(m)
      s.sendto(payload, ("localhost", m.to))
  finally:
    # The client socket is not needed once everything is sent.
    s.close()

  # Poll once a second until leader2 reports 999 values in its history.
  while leader2.getNumAccpted() < 999:
    print("       %d " % leader2.getNumAccpted())
    time.sleep(1)
  print(u"  10 ")
  time.sleep(10)
  print(u"  leaders")
  leader1.stop()
  leader2.stop()
  print(u"     ")
  for c in clients:
    c.stop()
  print(u"leader1    ")
  print(leader1.getHistory())
  print(u"leader2    ")
  print(leader2.getHistory())
  end = time.time()
  print(u"    %f " % (end - start))
코드가 확실히 길고 어려워 보입니다. PyCharm에서 이 로직을 따라가며 보는 것이 좋습니다. 매개변수 등을 빠르게 찾아볼 수 있기 때문입니다. 잘못된 부분이 있으면 지적해 주시면 감사하겠습니다.
이상이 본문의 전체 내용입니다. 여러분의 학습에 도움이 되기를 바라며, 많은 응원 부탁드립니다.

좋은 웹페이지 즐겨찾기