python 기반 Paxos 알고리즘 구현
먼저 설명해 두겠습니다. Python 같은 동적 언어는 익숙하지 않은 사람에게 어색할 수 있습니다. 자바처럼 매개변수 타입이 고정되어 있지 않기 때문에 코드를 읽기가 조금 골치 아플 수 있습니다. 이 글의 환경은 Python 2.7을 사용합니다.
class Message:
    """Plain data carrier for every datagram exchanged by leaders and acceptors.

    The constructor stores only ``command``.  Every other field is attached by
    the code that builds the message; ``copyAsReply`` fills ``proposalID``,
    ``instanceID``, ``to``, ``source`` and ``value`` from another message.
    """

    # Command codes, numbered 0..7 in declaration order.
    (
        MSG_ACCEPTOR_AGREE,
        MSG_ACCEPTOR_ACCEPT,
        MSG_ACCEPTOR_REJECT,
        MSG_ACCEPTOR_UNACCEPT,
        MSG_ACCEPT,
        MSG_PROPOSE,
        MSG_EXT_PROPOSE,
        MSG_HEARTBEAT,
    ) = range(8)

    def __init__(self, command=None):
        """Create a message carrying ``command`` (one of the ``MSG_*`` codes)."""
        self.command = command

    def copyAsReply(self, message):
        """Turn this message into a reply to ``message``.

        Proposal ID, instance ID and value are copied; the addressing is
        swapped, so the reply goes to the sender of ``message`` and names the
        original recipient as its source.
        """
        # Read every field before assigning any, exactly like a tuple assignment.
        copied = (message.proposalID, message.instanceID, message.source, message.to)
        self.proposalID, self.instanceID, self.to, self.source = copied
        self.value = message.value
다음은 소켓, 스레드, 큐를 이용한 메시지 처리기(MessagePump)입니다.
# socket ,
import threading
import pickle
import socket
import queue
class MessagePump(threading.Thread):
    """Thread that shuttles pickled messages between a UDP socket and an owner.

    A helper thread (``MPHelper``) reads datagrams from the socket, unpickles
    them and puts them on ``self.queue``.  The pump thread itself takes
    messages off that queue and hands each one (or ``None`` when nothing
    arrived in time) to ``owner.recvMessage``.
    """

    class MPHelper(threading.Thread):
        """Receiver thread: socket -> unpickle -> owner's queue."""

        def __init__(self, owner):
            """``owner`` is the MessagePump whose socket and queue are used."""
            self.owner = owner
            threading.Thread.__init__(self)

        def run(self):
            """Receive datagrams until the owning pump is aborted."""
            while not self.owner.abort:
                try:
                    (data, addr) = self.owner.socket.recvfrom(2048)
                except socket.error:
                    # Includes socket.timeout: nothing arrived within the
                    # socket timeout, so loop and re-check the abort flag.
                    continue
                try:
                    # SECURITY NOTE: pickle.loads can execute arbitrary code
                    # from the payload; only use this on a trusted network.
                    msg = pickle.loads(data)
                    # The sender is identified by the UDP port it sent from.
                    msg.source = addr[1]
                except Exception:
                    # Deliberate best effort: an undecodable datagram is
                    # dropped instead of killing the receiver thread.
                    continue
                self.owner.queue.put(msg)

    def __init__(self, owner, port, timeout=2):
        """Bind a UDP socket on ``("localhost", port)``.

        owner   -- object providing ``recvMessage(message)``.
        port    -- local UDP port to bind.
        timeout -- socket timeout in seconds, also stored as ``self.timeout``.
        """
        threading.Thread.__init__(self)
        self.owner = owner
        self.abort = False
        # Previously hard-coded to 2, silently ignoring the argument.
        self.timeout = timeout
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000)
        self.socket.bind(("localhost", port))
        self.socket.settimeout(timeout)
        self.queue = queue.Queue()
        self.helper = MessagePump.MPHelper(self)

    def run(self):
        """Start the receiver, then dispatch messages until aborted."""
        self.helper.start()
        while not self.abort:
            message = self.waitForMessage()
            # ``message`` is None when the wait timed out; the owner decides
            # what to do with idle ticks.
            self.owner.recvMessage(message)

    def waitForMessage(self):
        """Return the next queued message, or None after waiting 3 seconds."""
        try:
            return self.queue.get(True, 3)
        except queue.Empty:
            return None

    def sendMessage(self, message):
        """Pickle ``message`` and send it to ``("localhost", message.to)``.

        Always returns True.
        """
        payload = pickle.dumps(message)
        address = ("localhost", message.to)
        self.socket.sendto(payload, address)
        return True

    def doAbort(self):
        """Ask both the pump loop and the receiver loop to stop."""
        self.abort = True
메시지 처리기를 하나 더 추가합니다. 메시지의 전달, 지연, 패킷 손실을 시뮬레이션합니다. 사실 이 클래스는 별 쓸모가 없으며, 시뮬레이션 테스트를 위해 준비한 것입니다.
from MessagePump import MessagePump
import random
class AdversarialMessagePump(MessagePump):
    """MessagePump variant that buffers messages and releases them at random.

    Received messages are parked in a set.  Each call to ``waitForMessage``
    hands out one randomly chosen parked message most of the time and nothing
    otherwise, so the owner sees reordered and delayed traffic.
    """

    def __init__(self, owner, port, timeout=2):
        """Same arguments as ``MessagePump``; adds the parking set."""
        MessagePump.__init__(self, owner, port, timeout)
        self.messages = set()

    def waitForMessage(self):
        """Return a random parked message, or None.

        Waits up to 0.1 s for a new message and parks it.  Returns None when
        nothing is parked, and also in roughly 5% of the calls where something
        is (the 0.95 threshold is arbitrary).
        """
        try:
            self.messages.add(self.queue.get(True, 0.1))
        except Exception:
            # Nothing new arrived within the wait; continue with what is parked.
            pass

        if not self.messages or random.random() >= 0.95:
            return None

        chosen = random.choice(list(self.messages))
        self.messages.remove(chosen)
        return chosen
또 하나는 기록(레코드) 클래스입니다.
# InstanceRecord , 、
from PaxosLeaderProtocol import PaxosLeaderProtocol
class InstanceRecord:
    """Book-keeping for one Paxos instance.

    Holds the protocol objects started for the instance (keyed by proposal
    ID), the highest proposal ID seen so far and the instance's value.
    """

    def __init__(self):
        self.protocols = {}          # proposalID -> protocol object
        self.highestID = (-1, -1)    # (port, count) of the highest proposal
        self.value = None

    def addProtocol(self, protocol):
        """Register ``protocol`` and update ``highestID`` if it is newer.

        Proposal IDs are ``(port, count)`` tuples ordered by ``count`` first
        and by ``port`` as the tie-breaker.
        """
        self.protocols[protocol.proposalID] = protocol
        candidate = (protocol.proposalID[1], protocol.proposalID[0])
        current = (self.highestID[1], self.highestID[0])
        if candidate > current:
            self.highestID = protocol.proposalID

    def getProtocol(self, protocolID):
        """Return the protocol registered under ``protocolID``.

        Raises KeyError when no such protocol is registered.
        """
        return self.protocols[protocolID]

    def cleanProtocols(self):
        """Drop every protocol whose state is ``PaxosLeaderProtocol.STATE_ACCEPTED``."""
        # Iterate over a snapshot of the keys: deleting entries while walking
        # the live ``keys()`` view raises RuntimeError on Python 3.
        for key in list(self.protocols):
            if self.protocols[key].state == PaxosLeaderProtocol.STATE_ACCEPTED:
                print(" ")
                del self.protocols[key]
다음은 Acceptor의 구현입니다.
#
from MessagePump import MessagePump
from Message import Message
from InstanceRecord import InstanceRecord
from PaxosAcceptorProtocol import PaxosAcceptorProtocol
class PaxosAcceptor:
    """Paxos acceptor node.

    Owns a MessagePump bound to ``port`` and one ``InstanceRecord`` per
    instance ID it has seen a proposal for.  ``fail`` / ``recover`` toggle a
    flag that makes the acceptor silently drop all incoming messages.
    """

    def __init__(self, port, leaders):
        """
        port    -- UDP port this acceptor listens on.
        leaders -- ports of the leaders that are told about accepted values.
        """
        self.port = port
        self.leaders = leaders
        self.instances = {}  # instanceID -> InstanceRecord
        self.msgPump = MessagePump(self, self.port)
        self.failed = False

    def start(self):
        """Start the message pump thread."""
        self.msgPump.start()

    def stop(self):
        """Ask the message pump to stop."""
        self.msgPump.doAbort()

    def fail(self):
        """Simulate a crash: incoming messages are ignored from now on."""
        self.failed = True

    def recover(self):
        """Undo ``fail``: incoming messages are processed again."""
        self.failed = False

    def sendMessage(self, message):
        """Send ``message`` through the message pump."""
        self.msgPump.sendMessage(message)

    def recvMessage(self, message):
        """Handle one message from the pump (``None`` means "nothing arrived").

        A PROPOSE message starts a new PaxosAcceptorProtocol for its instance.
        Any other message is routed to the protocol registered for its
        ``(instanceID, proposalID)``.
        """
        if message is None or self.failed:
            return

        if message.command == Message.MSG_PROPOSE:
            if message.instanceID not in self.instances:
                self.instances[message.instanceID] = InstanceRecord()
            protocol = PaxosAcceptorProtocol(self)
            # recvProposal reads the record's highest ID, so the new protocol
            # must be registered only afterwards.
            protocol.recvProposal(message)
            self.instances[message.instanceID].addProtocol(protocol)
            return

        # A follow-up message can refer to a proposal this acceptor never saw
        # (for example it was "failed" while the PROPOSE went out).  Ignore it
        # instead of letting a KeyError kill the message-pump thread.
        record = self.instances.get(message.instanceID)
        if record is None:
            return
        protocol = record.protocols.get(message.proposalID)
        if protocol is None:
            return
        protocol.doTransition(message)

    def notifyClient(self, protocol, message):
        """Callback from a protocol: store the value once it was accepted."""
        if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED:
            self.instances[protocol.instanceID].value = message.value
            print(u" %s" % message.value)

    def getHighestAgreedProposal(self, instance):
        """Return the highest ``(port, count)`` proposal ID seen for ``instance``.

        Raises KeyError for an unknown instance.
        """
        return self.instances[instance].highestID

    def getInstanceValue(self, instance):
        """Return the value stored for ``instance`` (None if none yet).

        Raises KeyError for an unknown instance.
        """
        return self.instances[instance].value
이어서 AcceptorProtocol의 구현을 살펴보겠습니다.
from Message import Message
class PaxosAcceptorProtocol(object):
    """Acceptor-side state machine for a single proposal.

    ``client`` is the owning acceptor; it supplies ``getHighestAgreedProposal``,
    ``getInstanceValue``, ``sendMessage``, ``leaders`` and ``notifyClient``.
    """

    # Protocol states, numbered -1..4 in declaration order.
    (
        STATE_UNDEFINED,
        STATE_PROPOSAL_RECEIVED,
        STATE_PROPOSAL_REJECTED,
        STATE_PROPOSAL_AGREED,
        STATE_PROPOSAL_ACCEPTED,
        STATE_PROPOSAL_UNACCEPTED,
    ) = range(-1, 5)

    def __init__(self, client):
        self.client = client
        self.state = PaxosAcceptorProtocol.STATE_UNDEFINED

    def recvProposal(self, message):
        """Process a PROPOSE message and decide whether to agree to it.

        The proposal is agreed to when its ``(port, count)`` ID is higher than
        the highest ID the client reports for the instance (``count`` compared
        first, ``port`` as tie-breaker).  On agreement an ACCEPTOR_AGREE reply
        is sent that carries the instance's current value and that previous
        highest ID as ``sequence``.

        Returns the proposal ID for a PROPOSE message, None for anything else.
        """
        if message.command != Message.MSG_PROPOSE:
            # Not a proposal: nothing to do.
            return None

        self.proposalID = message.proposalID
        self.instanceID = message.instanceID
        (port, count) = self.client.getHighestAgreedProposal(message.instanceID)

        is_newer = (count, port) < (self.proposalID[1], self.proposalID[0])
        if not is_newer:
            # NOTE(review): only the state is recorded here; no
            # MSG_ACCEPTOR_REJECT reply is sent to the proposer.
            self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
            return self.proposalID

        self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED
        print(" :%s, %s " % (message.instanceID, message.value))
        current_value = self.client.getInstanceValue(message.instanceID)
        reply = Message(Message.MSG_ACCEPTOR_AGREE)
        reply.copyAsReply(message)
        reply.value = current_value
        reply.sequence = (port, count)
        self.client.sendMessage(reply)
        return self.proposalID

    def doTransition(self, message):
        """Advance the state machine with a follow-up message.

        Only one transition exists: AGREED + ACCEPT -> ACCEPTED.  In that case
        an ACCEPTOR_ACCEPT message is sent to every leader, the client is
        notified and True is returned.  Any other state/command combination
        raises ``Exception``.
        """
        expected = (
            self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED
            and message.command == Message.MSG_ACCEPT
        )
        if not expected:
            raise Exception(" ")

        self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED
        announcement = Message(Message.MSG_ACCEPTOR_ACCEPT)
        announcement.copyAsReply(message)
        # The same object is re-addressed and sent once per leader.
        for leader_port in self.client.leaders:
            announcement.to = leader_port
            self.client.sendMessage(announcement)
        self.notifyClient(message)
        return True

    def notifyClient(self, message):
        """Forward ``message`` and this protocol to the client's callback."""
        self.client.notifyClient(self, message)
이어서 Leader와 LeaderProtocol의 구현을 살펴보겠습니다.
#
import threading
import time

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module to ``queue``

from InstanceRecord import InstanceRecord
from Message import Message
from MessagePump import MessagePump
from PaxosLeaderProtocol import PaxosLeaderProtocol
class PaxosLeader:
    """Paxos leader node.

    Owns a MessagePump bound to ``port``, one ``InstanceRecord`` per instance
    ID, and two helper threads that exchange heartbeats with the other
    leaders to decide whether this leader is currently primary.
    """

    class HeartbeatListener(threading.Thread):
        """Derives primary status from the heartbeats handed to ``newHB``."""

        def __init__(self, leader):
            threading.Thread.__init__(self)
            self.leader = leader
            self.queue = Queue.Queue()  # heartbeats waiting to be examined
            self.abort = False

        def newHB(self, message):
            """Queue a received heartbeat message."""
            self.queue.put(message)

        def doAbort(self):
            """Ask the listener loop to stop."""
            self.abort = True

        def run(self):
            """Demote on a heartbeat from a higher port, promote after 2 s of silence."""
            while not self.abort:
                try:
                    hb = self.queue.get(True, 2)
                except Queue.Empty:
                    # Nobody else is announcing itself: take over as primary.
                    self.leader.setPrimary(True)
                    continue
                # The leader with the highest port wins.
                if hb.source > self.leader.port:
                    self.leader.setPrimary(False)

    class HeartbeatSender(threading.Thread):
        """While the leader is primary, sends a heartbeat to every other leader each second."""

        def __init__(self, leader):
            threading.Thread.__init__(self)
            self.leader = leader
            self.abort = False

        def doAbort(self):
            """Ask the sender loop to stop."""
            self.abort = True

        def run(self):
            while not self.abort:
                time.sleep(1)
                if self.leader.isPrimary:
                    msg = Message(Message.MSG_HEARTBEAT)
                    msg.source = self.leader.port
                    # One message object, re-addressed for each peer.
                    for leader in self.leader.leaders:
                        msg.to = leader
                        self.leader.sendMessage(msg)

    def __init__(self, port, leaders=None, acceptors=None):
        """
        port      -- UDP port this leader listens on.
        leaders   -- ports of the other leaders (default: none).
        acceptors -- ports of the acceptors (default: none).
        """
        self.port = port
        self.leaders = [] if leaders is None else leaders
        self.acceptors = [] if acceptors is None else acceptors
        self.group = self.leaders + self.acceptors
        self.isPrimary = False
        self.proposalCount = 0
        self.msgPump = MessagePump(self, port)
        self.instances = {}  # instanceID -> InstanceRecord
        self.hbListener = PaxosLeader.HeartbeatListener(self)
        self.hbSender = PaxosLeader.HeartbeatSender(self)
        self.highestInstance = -1
        self.stoped = True  # (sic) messages are ignored while True
        self.lasttime = time.time()  # time of the last gap-filling pass

    def sendMessage(self, message):
        """Send ``message`` through the message pump."""
        self.msgPump.sendMessage(message)

    def start(self):
        """Start the heartbeat threads and the message pump."""
        self.hbSender.start()
        self.hbListener.start()
        self.msgPump.start()
        self.stoped = False

    def stop(self):
        """Ask all threads to stop and ignore further messages."""
        self.hbSender.doAbort()
        self.hbListener.doAbort()
        self.msgPump.doAbort()
        self.stoped = True

    def setPrimary(self, primary):
        """Set the primary flag, printing a notice only when it changes."""
        if self.isPrimary != primary:
            # Only print if something's changed
            if primary:
                print(u" leader%s" % self.port)
            else:
                print(u" leader%s" % self.port)
        self.isPrimary = primary

    def getGroup(self):
        """Return the ports of all leaders and acceptors known at construction."""
        return self.group

    def getLeaders(self):
        """Return the ports of the other leaders."""
        return self.leaders

    def getAcceptors(self):
        """Return the ports of the acceptors."""
        return self.acceptors

    def getQuorumSize(self):
        """Return the smallest strict majority of the acceptors, as an int."""
        # Floor division: with ``/`` Python 3 yields e.g. 3.5 for five
        # acceptors, so three replies would never count as a quorum.
        return len(self.getAcceptors()) // 2 + 1

    def getInstanceValue(self, instanceID):
        """Return the value recorded for ``instanceID``, or None if unknown."""
        if instanceID in self.instances:
            return self.instances[instanceID].value
        return None

    def getHistory(self):
        """Return the values of instances 1..highestInstance (None for gaps).

        NOTE(review): ``newProposal`` hands out instance 0 first, which this
        range never reports -- confirm whether that is intended.
        """
        return [self.getInstanceValue(i) for i in range(1, self.highestInstance + 1)]

    def getNumAccpted(self):
        """Return how many entries of ``getHistory`` have a value."""
        return len([v for v in self.getHistory() if v is not None])

    def findAndFillGaps(self):
        """Propose the value 0 for every instance below the highest that has no value."""
        for i in range(1, self.highestInstance):
            if self.getInstanceValue(i) is None:
                print(" ", i)
                self.newProposal(0, i)
        self.lasttime = time.time()

    def garbageCollect(self):
        """Let every instance record drop its finished protocols."""
        for i in self.instances:
            self.instances[i].cleanProtocols()

    def recvMessage(self, message):
        """Handle one message from the pump (``None`` means "nothing arrived").

        Returns None while stopped or on an idle tick, True otherwise.
        """
        if self.stoped:
            return
        if message is None:
            # Idle tick: at most every 15 s the primary fills gaps and cleans up.
            if self.isPrimary and time.time() - self.lasttime > 15.0:
                self.findAndFillGaps()
                self.garbageCollect()
            return

        if message.command == Message.MSG_HEARTBEAT:
            self.hbListener.newHB(message)
            return True

        if message.command == Message.MSG_EXT_PROPOSE:
            print(" ", self.port, self.highestInstance)
            # Only the primary turns client requests into proposals.
            if self.isPrimary:
                self.newProposal(message.value)
            return True

        if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:
            # A late reply can refer to a protocol that garbageCollect already
            # removed; skip it rather than raise KeyError in the pump thread.
            record = self.instances.get(message.instanceID)
            protocol = None if record is None else record.protocols.get(message.proposalID)
            if protocol is not None:
                protocol.doTransition(message)

        if message.command == Message.MSG_ACCEPTOR_ACCEPT:
            if message.instanceID not in self.instances:
                self.instances[message.instanceID] = InstanceRecord()
            record = self.instances[message.instanceID]
            if message.proposalID not in record.protocols:
                # Proposal started by another leader: create a protocol that
                # is already waiting for accepts so the value is learned too.
                protocol = PaxosLeaderProtocol(self)
                protocol.state = PaxosLeaderProtocol.STATE_AGREED
                protocol.proposalID = message.proposalID
                protocol.instanceID = message.instanceID
                protocol.value = message.value
                record.addProtocol(protocol)
            else:
                protocol = record.getProtocol(message.proposalID)
            protocol.doTransition(message)
        return True

    def newProposal(self, value, instance=None):
        """Start a proposal for ``value``.

        With ``instance`` omitted the next unused instance ID is allocated;
        otherwise the proposal targets the given instance.
        """
        protocol = PaxosLeaderProtocol(self)
        if instance is None:
            self.highestInstance += 1
            instanceID = self.highestInstance
        else:
            instanceID = instance
        self.proposalCount += 1
        proposal_id = (self.port, self.proposalCount)
        if instanceID in self.instances:
            record = self.instances[instanceID]
        else:
            record = InstanceRecord()
            self.instances[instanceID] = record
        protocol.propose(value, proposal_id, instanceID)
        record.addProtocol(protocol)

    def notifyLeader(self, protocol, message):
        """Callback from a protocol after it changed state.

        ACCEPTED: record the value for the instance.  REJECTED: raise the
        proposal counter to the reported one and propose the value again.
        Other states are ignored.
        """
        if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
            print(" %s %s " % (message.instanceID, message.value))
            self.instances[message.instanceID].accepted = True
            self.instances[message.instanceID].value = message.value
            self.highestInstance = max(message.instanceID, self.highestInstance)
            return
        if protocol.state == PaxosLeaderProtocol.STATE_REJECTED:
            self.proposalCount = max(self.proposalCount, message.highestPID[1])
            self.newProposal(message.value)
            return True
        if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:
            pass
LeaderProtocol 구현:
from Message import Message
class PaxosLeaderProtocol(object):
    """Leader-side state machine for a single proposal.

    ``leader`` is the owning leader; it supplies ``getAcceptors``,
    ``getQuorumSize``, ``sendMessage`` and ``notifyLeader``.
    """

    STATE_UNDEFINED = -1
    STATE_PROPOSED = 0
    STATE_REJECTED = 1
    STATE_AGREED = 2
    STATE_ACCEPTED = 3
    STATE_UNACCEPTED = 4

    def __init__(self, leader):
        self.leader = leader
        self.state = PaxosLeaderProtocol.STATE_UNDEFINED
        self.proposalID = (-1, -1)
        self.agreecount, self.acceptcount = (0, 0)
        self.rejectcount, self.unacceptcount = (0, 0)
        self.instanceID = -1
        # (port, count) of the highest prior proposal whose value was adopted.
        self.highestseen = (0, 0)

    def propose(self, value, pID, instanceID):
        """Send a PROPOSE for ``value`` to every acceptor.

        Returns the proposal ID and leaves the protocol in STATE_PROPOSED.
        """
        self.proposalID = pID
        self.value = value
        self.instanceID = instanceID
        message = Message(Message.MSG_PROPOSE)
        message.proposalID = pID
        message.instanceID = instanceID
        message.value = value
        # One message object, re-addressed for each acceptor.
        for server in self.leader.getAcceptors():
            message.to = server
            self.leader.sendMessage(message)
        self.state = PaxosLeaderProtocol.STATE_PROPOSED
        return self.proposalID

    def _adoptPromisedValue(self, message):
        """Adopt the value reported by an AGREE reply if its proposal is the highest so far."""
        if message.value is None:
            # The acceptor has no value for this instance: nothing to adopt.
            return
        # ``sequence`` is (port, count); like everywhere else in this code,
        # proposals are ordered by count first and port second.
        reported = (message.sequence[1], message.sequence[0])
        best = (self.highestseen[1], self.highestseen[0])
        if reported > best:
            self.value = message.value
            self.highestseen = message.sequence

    def doTransition(self, message):
        """Advance the state machine with a reply from an acceptor.

        PROPOSED: AGREE replies are counted; at quorum an ACCEPT carrying the
        chosen value goes to every acceptor and the state becomes AGREED.
        REJECT replies are counted; at quorum the state becomes REJECTED.
        Both commands return True.
        AGREED: ACCEPTOR_ACCEPT / ACCEPTOR_UNACCEPT replies are counted; at
        quorum the state becomes ACCEPTED / UNACCEPTED.
        The leader is notified after every state change.  Messages that do not
        fit the current state are ignored and None is returned.
        """
        if self.state == PaxosLeaderProtocol.STATE_PROPOSED:
            if message.command == Message.MSG_ACCEPTOR_AGREE:
                self.agreecount += 1
                # Look at every promise, not only the one that completes the
                # quorum: an already accepted value may be reported by any of them.
                self._adoptPromisedValue(message)
                if self.agreecount >= self.leader.getQuorumSize():
                    print(u" , :%s" % message.value)
                    self.state = PaxosLeaderProtocol.STATE_AGREED
                    msg = Message(Message.MSG_ACCEPT)
                    msg.copyAsReply(message)
                    msg.value = self.value
                    msg.leaderID = msg.to
                    for server in self.leader.getAcceptors():
                        msg.to = server
                        self.leader.sendMessage(msg)
                    self.leader.notifyLeader(self, message)
                return True
            if message.command == Message.MSG_ACCEPTOR_REJECT:
                self.rejectcount += 1
                if self.rejectcount >= self.leader.getQuorumSize():
                    self.state = PaxosLeaderProtocol.STATE_REJECTED
                    self.leader.notifyLeader(self, message)
                return True
        if self.state == PaxosLeaderProtocol.STATE_AGREED:
            if message.command == Message.MSG_ACCEPTOR_ACCEPT:
                self.acceptcount += 1
                if self.acceptcount >= self.leader.getQuorumSize():
                    self.state = PaxosLeaderProtocol.STATE_ACCEPTED
                    self.leader.notifyLeader(self, message)
            if message.command == Message.MSG_ACCEPTOR_UNACCEPT:
                self.unacceptcount += 1
                if self.unacceptcount >= self.leader.getQuorumSize():
                    self.state = PaxosLeaderProtocol.STATE_UNACCEPTED
                    self.leader.notifyLeader(self, message)
테스트 모듈:
import socket, pickle, time
from Message import Message
from PaxosAcceptor import PaxosAcceptor
from PaxosLeader import PaxosLeader
if __name__ == "__main__":
    # Demo: five acceptors and two leaders on localhost, two acceptors
    # "failed", then 1000 client proposals sent to leader 54322.
    numclients = 5
    clients = [PaxosAcceptor(port, [54321, 54322]) for port in range(64320, 64320 + numclients)]

    # Each leader knows the other leader and all acceptor ports.
    leader1 = PaxosLeader(54321, [54322], [c.port for c in clients])
    leader2 = PaxosLeader(54322, [54321], [c.port for c in clients])

    leader1.start()
    leader1.setPrimary(True)
    leader2.setPrimary(True)
    leader2.start()
    for c in clients:
        c.start()

    # Two of the five acceptors ignore all traffic; three remain.
    clients[0].fail()
    clients[1].fail()

    # Act as an external client over plain UDP.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    start = time.time()
    try:
        for i in range(1000):
            m = Message(Message.MSG_EXT_PROPOSE)
            m.value = 0 + i
            m.to = 54322
            payload = pickle.dumps(m)
            s.sendto(payload, ("localhost", m.to))
    finally:
        # The client socket is only needed for sending; release it right away.
        s.close()

    # Poll until the leader's history holds 999 values.
    while leader2.getNumAccpted() < 999:
        print(" %d " % leader2.getNumAccpted())
        time.sleep(1)

    print(u" 10 ")
    time.sleep(10)
    print(u" leaders")
    leader1.stop()
    leader2.stop()
    print(u" ")
    for c in clients:
        c.stop()

    print(u"leader1 ")
    print(leader1.getHistory())
    print(u"leader2 ")
    print(leader2.getHistory())
    end = time.time()
    print(u" %f " % (end - start))
코드가 확실히 길고 어려워 보입니다. PyCharm에서 이 로직을 따라가 보는 것이 좋습니다. 매개변수를 빠르게 찾아볼 수 있기 때문입니다. 잘못된 부분이 있으면 지적해 주시기 바랍니다. 이상이 이 글의 전체 내용입니다. 여러분의 학습에 도움이 되기를 바라며, 많은 응원 부탁드립니다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
로마 숫자를 정수로 또는 그 반대로 변환그 중 하나는 로마 숫자를 정수로 변환하는 함수를 만드는 것이었고 두 번째는 그 반대를 수행하는 함수를 만드는 것이었습니다. 문자만 포함합니다'I', 'V', 'X', 'L', 'C', 'D', 'M' ; 문자열이 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.