Cyrus Blog

FLAG{S0_H4PPY_C_U_H3R3} (>.<)

分布式入门0&1:系列目录 & CAP 原理和 Paxos 算法

本文共 3.1k 字,预计阅读时间 13 分钟。

2018 年 1 月,比特币从最高点开始下跌,本文初创于这段时间。由于种种原因,咕了一年,到了 2018 年底依旧没有写完。新年即将来临之际,我对文章内容进行了重新编排。将会按照以下顺序依次展开:

  • CAP 原理和 Paxos 算法,
  • 从 BFT 到 PBFT,
  • 从 Raft 和 ZAB 再看 Paxos
  • 分布式系统事务一致性(待定),
  • 分布式负载均衡算法(待定).

这一顺序是我的学习(和未来填坑)的顺序,我是偶然接触到 Paxos 从而接触到分布式的。

大部分相关系列文章会将 Paxos/Raft/ZAB 放在统一部分,并且按照 CAP,BFT 系列,Paxos 的顺序编排。没有关系,每篇文章都可以配合搜索引擎单独阅读,尤其是在查找特定算法时看到这个系列的初学者,自由阅读吧。

我会尽可能写出我的理解。通常我会用 Python 代码实现一下我的理解。

我只是一只玩了几年比特币却从零开始接触分布式一致性算法的萌新,目前工作和区块链 or 分布式并无关系,单纯的是填一个旧坑,充值 BTC 信仰,并希望这一系列文章能对零基础的、想了解分布式的新人们有些帮助。

最后,欢迎 dalao 指出错误。

新年快乐!梭哈愉快!

0x00 一致性算法用来做什么

CAP原理

一致性(Consistency)可用性(Availability)分区容忍性(Partition)
分区容忍性:具体指网络节点分区之后,区域间无法正常通信。我们在 0x03 Raft 算法部分会详细讲到。

很难满足怎么办

弱化一致性:允许差异化的场合
弱化可用性:需要绝对一致性,高并发下延长响应时间(Paxos、Raft算法等)或拒绝服务(金融业务等)
弱化分区容忍性:网络分区的概率小,如 ZooKeeper。

分布式系统的要求:一致性

可终止性:在有限时间内得到结果
共识性:不同节点结果完全相同
合法性:决策结果必须是其他进程的提案(proposal)
注:本文中的“达成共识”在某些情况下指的是一致性要求的三个性质同时达成,而不是仅包括共识性。

先来看一个算法吧

最基本也是初学者最难理解的一致性算法是 1990 年由 Leslie Lamport 提出的 Paxos 共识算法,也是第一个被证明的共识算法。我们从 Paxos 算法开始,逐步了解分布式一致性算法的实现过程。
需要说明的是,进程的提案者(proposer)和接受者(accepter)是松耦合的,不需要同一时间完成,一个进程也未必同时有两个角色功能。学习过程中我们将两者分开,实际生产环境中往往两个角色由一个进程担任。

0x01 Paxos算法怎么运行

中文版的 Paxos 算法运行过程讲解非常稀少,这里试图用最简单的语言和最程序化的逻辑讲解一下 Paxos 算法。

核心关键词

两阶段提交,预提案阶段的更新

第一阶段

提案者**只将 ID **递交给接受者。接受者二元组默认为 MaxID 负无穷,value 为空:

  1. 如果当前提案者的 ID 大于 MaxID,则更新 MaxID
    • 如果有提案,返回接受标志 Accept 同时返回 value,这时提案者的 value 会被更新。这种情况会因为别的提案者已经进行到了第二轮而导致。该步骤的目的是为了保证已接收的 value 不能更改,更大的 ID 而也不能更改,只能让这个 ID 的 value 变得一样。 我们称之为“预提案阶段的更新”。这个步骤非常关键。

    • 如果没有提案,返回接受标志 Accept 即可。

  2. 如果当前提案者的 ID 小于等于 MaxID,则返回拒绝标志 Refuse
    对于一个提案者而言,该阶段结束时:
  3. 每个提案者都给出了自己的 ID 给所有的接受者,他们知道了自己的提案被多少人接受了,也就是记录了此 ID。此时的提案者的 value 可能被修改,也可能不被修改。
  4. 每个接受者都已经有一个不是负无穷 MaxID 了吗?是的。因为只要有一个提案者结束了这个阶段,所有接受者都有了 MaxID。
  5. 每个接受者的 value 都是空的吗?不一定。可能有别的提案者已经给出过提案。这是“预提案阶段的更新”的产生原因。
  6. 如果有接受者的 value 不为空,提案者本来的 value 一定不在了吗?不一定,这个接受者的原 MaxID 比当前提案者要小的话,确实返回了回来,此时会有“预提案阶段的更新”。但是如果接受者的原提案 ID 比当前提案者要大,返回的只有一个 Refuse。
  7. 如果有接受者的 value 不为空,且 MaxID = 此提案者 ID,提案者本来的 value 一定不在了吗?是的,这就是“预提案阶段的更新”。

第二阶段

提案者根据自己的提案被接受的数目:

  1. 大多数接受:尝试将 ID 和 value 在此递交给所有接受的人
    • 如果接受者的 MaxID 大于等于 ID,就接受提案,锁定该接受者成功。
    • 如果接受者的 MaxID 小于 ID,拒绝提案,锁定失败。这种情况会因为别的提案者已经进行到了(下一次的)第一阶段而导致,另一个提案者正在第一阶段给出了更大的 ID。
  2. 大多数不接受:本轮提案失败,重新尝试第一阶段。
    该阶段结束时:
  3. 提案者知道了自己锁定了多少提案者,如果过半,向所有接受者广播当前提案并说明被确认接受。
  4. 如果锁定过半,一定是提案者原来的提案吗?不一定,存在“预提案阶段的更新”。
  5. 如果锁定没过半,假设没有“预提案阶段的更新”,原来的提案一定是没通过吗?不一定,可能锁定了一部分之后,另一个提案者对已锁定的和还未尝试锁定的接受者给出了更高的提案 ID,但是已锁定的会返回本提案者的 value,后面还未尝试锁定的接受者,可能无法锁定,但是依旧会被另一个提案者给出本提案者的 value。接受者实际接受的人数可能很多(部分未锁定的页用了本提案者 value),但是被另一个提案者广播确认。我们称之为“其他提案者的帮助”,下面会有案例说明。

0x02 用Python实现Paxos算法

关于 Paxos 算法的代码实现(甚至伪代码)比较难找,下面我们用简单的代码,清晰的给出 Paxos 算法的分支流程,方便于初学者理解。

提案和预提案

定义提案和预提案的类为:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Proposal:
pid = 0
pvalue = None

def __init__(self, pid, pvalue):
self.pid = pid
self.pvalue = pvalue

class PreProposal:
ppid = 0

def __init__(self, proposal):
self.ppid = proposal.pid

提案者

定义提案者的类为:

1
2
3
4
5
6
7
8
9
10
11
12
class Proposer:
name = "Proposer 0"
proposal = Proposal(0, None)
accepters = []

def __init__(self, nameid, proposal, accepters):
self.name = "Proposer %d" % nameid
self.proposal = proposal
self.accepters = accepters

def submit_proposal(self):
pass

提案行为中的预提案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# try pre-proposal
total_accept = 0
accepted_accepter = []
for accepter in self.accepters:
res = accepter.accept_pre_proposal(PreProposal(self.proposal))
if res[0] == "Accept":
total_accept += 1
accepted_accepter.append(accepter) #保留接受预提案者的集合,便于快速锁定提案
if res[1] is not None:
self.proposal.pvalue = res[1] #非空则进行“预提案阶段的更新”
print(">> Pre-Accepted and updated by %s" % self.name)
else:
print(">> Pre-Accepted by %s" % self.name)
elif res[0] == "Refuse":
print(">> Pre-Refused by %s" % self.name)
print("%s: %3d, %s\n" % (accepter.name, accepter.maxid, str(accepter.value)))

根据预提案的通过人数判断是否进行提案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

# try proposal
total_lock = 0
if total_accept > (len(self.accepters) / 2) + 1:
for accepter in accepted_accepter: #从接受提案者的集合中锁定
res = accepter.accept_proposal(self.proposal)
if res[0] == "Locked":
total_lock += 1
print(">> Locked by %s" % self.name) #锁定提案成功
elif res[0] == "Error":
print(">> Error: %s by %s" % (res[1], self.name)) #被其他提案者预提案并锁定
elif res[0] == "Refuse":
print(">> Refused by %s" % self.name)
print("%s: %3d, %s\n" % (accepter.name, accepter.maxid, str(accepter.value)))

再根据提案的所定人数判断是否进行广播:

1
2
3
4
5
6
7
8
9
10

# broadcast
if total_lock > (len(self.accepters) / 2) + 1:
for accepter in self.accepters:
res = accepter.accept_result(self.proposal)
if res[0] == "Accept":
print(">> %s by %s" % (res[1], self.name)) #广播提案给所有接受者
elif res[0] == "Error":
print(">> Error: %s by %s" % (res[1], self.name)) #一般不会出现,确保共识
print("%s: %3d, %s\n" % (accepter.name, accepter.maxid, str(accepter.value)))

接受者

定义接受者的类为:

1
2
3
4
5
6
7
8
9
10
11
class Accepter:
name = "Accepter 0"
maxid = -1
value = None
delay = 100

def __init__(self, nameid, delay = 100):
self.name = "Accepter %d" % nameid
self.maxid = -1
self.value = None
self.delay = delay

我们增加一个私有函数,模拟随机的网络延迟:

1
2
def __self_delay(self):
time.sleep(randint(0, self.delay) * 0.001)

对预提案的处理:

1
2
3
4
5
6
7
def accept_pre_proposal(self, pre_proposal):
self.__self_delay()
if pre_proposal.ppid > self.maxid:
self.maxid = pre_proposal.ppid
return ("Accept", self.value) # 同时返回提案内容,便于更新
else:
return ("Refuse",)

对提案的处理:

1
2
3
4
5
6
7
8
9
10
def accept_proposal(self, proposal):
self.__self_delay()
if proposal.pid >= self.maxid:
if self.value is None:
self.value = proposal.pvalue
return ("Locked",)
else:
return ("Error", "Can not edited at accept_proposal") # 已经被锁定时会出现
else:
return ("Refuse",)

对广播的最终提案的处理:

1
2
3
4
5
6
7
8
9
10
def accept_result(self, proposal):
self.__self_delay()
if self.value is None:
self.value = proposal.pvalue
return ("Accept", "Broadcast accept") # 广播到空提案
else:
if self.value == proposal.pvalue:
return ("Accept", "Broadcast already accept") # 广播到已锁定提案
else:
return ("Error", "Can not edited at accept_result") #广播到冲突的提案,一般不会出现

此时我们已经完成整个 Paxos 算法模型的搭建,我们通过一个 demo 来尝试运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def demo(n):

def make_proposal(proposer):
proposer.submit_proposal()

accepters = []
for i in range(0, n):
accepters.append(Accepter(i))

proposers = []
for i in range(0,n):
pid = randint(0, n * 100)
pvalue = "Content-%d" % randint(10000, 100000)
proposers.append(Proposer(i, Proposal(pid, pvalue), accepters))
print("Proposer %d: %3d, %s" % (i, pid, pvalue))
print()

threads = []
for i in range(0, n):
proposer = proposers[i]
threads.append(Thread(target=make_proposal, args=(proposer,)))
start_time = time()
for thread in threads:
thread.start()
for thread in threads:
thread.join()
end_time = time()
print("Time: %.6fs" % (end_time - start_time))
for accepter in accepters:
print("%s: %3d, %s" % (accepter.name, accepter.maxid, str(accepter.value)))

这个 demo 会给出详细的运行过程,以及相关的描述。我们这里看一个 n = 3 时非常典型的输出作为案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
Proposer 0:  17, Content-47679
Proposer 1: 28, Content-79747
Proposer 2: 74, Content-52168
>> Pre-Accepted by Proposer 0
Accepter 0: 17, None
>> Pre-Accepted by Proposer 1
Accepter 0: 28, None
>> Pre-Accepted by Proposer 0
Accepter 1: 17, None
>> Pre-Accepted by Proposer 1
Accepter 1: 28, None
>> Pre-Accepted by Proposer 0
Accepter 2: 17, None
>> Pre-Accepted by Proposer 1
Accepter 2: 28, None

# 此时 Proposer 0 和 Proposer 1 都未出局

# 但是显然 Proposer 0 的所有提案会在第二阶段被拒绝
>> Refused by Proposer 0
Accepter 0: 28, None
>> Refused by Proposer 0
Accepter 1: 28, None
>> Refused by Proposer 0
Accepter 2: 28, None

# Proposer 0 的所有提案在第二阶段被拒绝
>> Locked by Proposer 1
Accepter 0: 28, Content-79747

# 此时 Proposer 1 锁定了 Accepter 0

# 但是被 Proposer 2 插入
>> Pre-Accepted and updated by Proposer 2
Accepter 0: 74, Content-79747

# Accepter 0 已被锁定

# 把 Proposer 1 的提案返回给 Proposer 2

# Proposer 2 更新了提案,即“预提案阶段的更新”
>> Pre-Accepted by Proposer 2
Accepter 1: 74, None
>> Refused by Proposer 1
Accepter 1: 74, None
>> Pre-Accepted by Proposer 2
Accepter 2: 74, None
>> Refused by Proposer 1
Accepter 2: 74, None

# 此时 Proposer 1 已经无法锁定后两个提案

# 因为 Proposer 2 给出了更大的提案 ID
>> Error: Can not edited at accept_proposal by Proposer 2
Accepter 0: 74, Content-79747

# Proposer 2 无法更改已经被锁定的 Accepter 0
>> Locked by Proposer 2
Accepter 1: 74, Content-79747
>> Locked by Proposer 2
Accepter 2: 74, Content-79747

# Proposer 2 成功锁定后两个 Accepter
>> Broadcast already accept by Proposer 0
Accepter 0: 28, Content-79747
>> Broadcast already accept by Proposer 0
Accepter 1: 74, Content-79747
>> Broadcast already accept by Proposer 0
Accepter 2: 74, Content-79747

# Proposer 2 完成广播,但是其内容是 Proposer 1 的提案

# 即“其他提案者的帮助”
Time: 0.573580s
Accepter 0: 28, Content-79747
Accepter 1: 74, Content-79747
Accepter 2: 74, Content-79747

完整的代码可以在 Github 查看。

0x03 Reference

Gitbook:区块链技术指南
知乎问题:如何浅显易懂地解说 Paxos 的算法?