Facebook Cassandra

2021-08-07

Cassandra的目标是构建在上百台的节点之上(可能会跨越多个不同的data center)。Cassandra用于设计满足Facebook的Inbox Search的存储需求,这要求该存储系统需要能够处理非常高的写吞吐,每天数十亿的写入,以及随着用户规模增长而扩容。并且由于用户根据地理位置可能被不同的data center服务,所以在不同的data center之间复制数据是很关键的。

Data Model

Cassandra中的表是一个分布式的、由key索引的多维map。其中:

  • value是一个高度结构化的object。

  • row key是一个string类型数据,没有大小限制(通常是16-36 bytes大小)。

  • Column可以组成一个set叫做column family,这与Bigtable很像

  • 对于一个replica中的单行读写,不管其涉及到的列有多少个,其操作都是原子的。。

Cassandra有两种column family:

  • Simple column family

  • Super column family。可被视为在一个column family之中的column family

应用可以指定Super column family和Simple column faily中的column排列顺序,包括按时间排序和按名字排序。

column family中的列都需要通过column_family:column的形式来访问,super column family中的列需要通过column_family:super_column:column的形式来访问。

API

Cassandra API包括以下三种简单的方法:

insert(table, key, rowMutation)

get(table, key, columnName)

delete(table, key, columnName)

columnName可以是一个column family中的指定的column、一个column family、一个super column family或者一个super column中的column

System Architecture

该篇论文主要集中讲述Cassandra中应用的核心的分布式技术:partitioning, replication, membership, failure handling and scaling,所有这些模块协同起来处理读写请求。

一个对于key的读/写请求被路由到Cassandra中的节点。

  • 对于写入,系统将该请求路由到所有的副本,并且等待quorum数量的副本写入完成的ack。

  • 对于读取,根据client需要的一致性保证,系统可能会将请求路由到最近的副本,或者路由到所有的副本,等待quorum数量的response。

Partitioning

Cassandra的一个关键特性是其拥有逐渐扩展的能力,这就需要能够动态的为数据在集群的一系列的节点上分区的能力。Cassandra使用一致性hash来对数据进行分区,该hash函数使用了保序hash函数。一致性hash的细节可以参考我以前写的文章数据分片

基础的一致性hash有几个挑战:

  • 每个节点在环上的位置时随机的,这会带来不同节点不统一的数据和负载分布

  • 它忽略了节点的异构性问题

面对这些问题,通常有两种解决办法:

  1. 为每个节点分配环上的多个位置

  2. 分析环上的负载情况,然后具有轻负载的节点从环上向前移动,用以减轻高负载节点的负载

Cassandra选择第二种方法,主要基于一下两点考虑:

  1. 它的设计和实现更容易驾驭

  2. 可以帮助负载均衡作出更加确定的选择

另外需要说明,每一个节点都知道系统中的所有其他节点以及其负责的数据range。

Replication

Cassandra使用Replication来实现高可用以及持久性。每个数据都被复制到N个节点上,其中N是每个Cassandra自己的replication配置。如Partitioning一节所述,每个key都会被分配到一个coordinator节点,除了将数据在本地持久化以外,coordinator还需要将数据复制到环上的N-1个节点之上。Cassandra为客户端提供了数据复制的多种不同选择,包括:Rack Unaware, Rack ware, Datacenter aware,前两者在一个data center,后者跨越多个data center。根据客户端选择的replication policy来选择副本:

  • Rack Unaware。如果客户端选择了Rack Unware,那么选择coordinator在环下游的N-1个节点作为non-coordinator副本。

  • Rack Aware和Datacenter Aware。这两种情况稍微复杂。Cassandra在集群中通过zookeeper选择一个leader节点出来,该leader节点告诉一个节点其需要作为副本的环上的range。并且leader会保证每个节点不会为超过N-1个range作副本。关于一个节点负责作为副本的range的meta信息会缓存在本地,并且持久化在zk上,这样当一个节点重启的时候可以从zk拉取该信息。我们借用了Dynamo的说法,将负责一个指定range的节点叫做preference list。需要说明的一点是,Rack Aware和Datacenter aware的区别在于前者的副本在同一个data center,而后者需要跨data center。

如上一节所述,每一个节点都知道系统中的所有其他节点,以及其负责的range。通过放宽quorum要求,即使在节点发生故障或者发生网络分区的情况下,Cassandra仍然可以提供持久化保证。Cassandra可以通过配置,让每一个row都复制到多个data center,本质上,一个key的preference list是由跨越多个datacenter的存储节点构成的,这些datacenter通过高速网络连接,这种设计让Cassandra可以在整个data center都挂掉的时候仍然可以正常提供服务。

在Cassandra中,并没有使用一致性协议来进行replication,所以Cassandra是一个最终一致性的系统

Membership

Cassandra的cluster membership是基于Scuttlebutt实现的,Scuttlebutt是一个非常高效的、反熵的Gossip协议。其突出特性是其高效的CPU利用率以及高效的gossip channel利用率。在Cassandra中,Gossip不仅应用在membership,也应用在传播其他系统相关控制状态。

Failure Detection

failure detection是一个节点用来判断其他节点up或者down的机制,用来防止尝试向已经下线的机器发送请求。Cassandra使用了Φ Accrual Failure Detector的变种。Accrual Failure Detector的思想是,不简单的使用Bool值来判断节点的up或者down,而是使用一个value来代表每个节点的怀疑度。该value是φ,该值是动态调整的,用来表示当前被监控节点的网络和负载条件。

其判断方法如下:

  1. 一个给定的Φ

  2. 每个节点维护一个窗口,用于记录gossip message从其他节点的到达间隔时间,从而获得一个时间分布,并且根据分布获取φ,

  3. 当φ > Φ时,就可以认为该主机已经宕机了。

当然这里可能会产生误判,误判的可能如下:

  • Φ=1, 10%

  • Φ=2, 1%

  • Φ=3, 0.1%

通过查阅Cassandra手册,Cassandra默认采用8。并且Cassandra建议,在不稳定的网络中可以将其提高到10-12,用于避免false failure。值大于12或者小于5都是不推荐的。

另外需要说明的一点,在Scuttlebutt的paper中,其使用的是Gaussian分布,而Cassandra采用的是指数分布,因为Cassandra认为由于gossip channel以及其对延迟的影响,指数分布更加相似。

Bootstraping

当一个节点第一次启动时:

  • 它首先选择一个随机的token,用以匹配环上的位置。为了容错的需要,该信息会被持久化到本地和zk中。

  • 随后,该信息会通过gossip传输到整个集群。这就是我们怎么知道所有节点以及其对应的环上位置的这使得所有的节点都可以转发一个请求到正确的节点

  • 读取配置,该配置中包括集群中的几个联络点,我们称这个初始的联络点叫做集群的种子(seed)。种子也可以来自一个类似于zk的服务。

Scaling the Cluster

当一个新节点加入系统时,会被分配一个token,这样便可以缓解负载较高的节点的负载。这样新的节点会分担部分先前其他节点负责的范围。Cassandra的boostrap算法是由系统中的其他节点发起,通过命令行工具或者Cassandra的web dashboard。

准备转移数据的节点通过内核到内核的copy技术将数据copy到新节点。根据运维经验,单个节点的数据传输速率可以达到40MB/sec,Cassandra还在努力通过多个副本参与并行化传输来进行改善,类似于Bittorrent技术。

Local Persistence

Cassandra依赖本地文件系统做持久化,数据以一种有助于高效地数据检索的格式存储在磁盘上。与RocksDB类似,一个写入操作包括:

  • 一次commit log写入,该写入用于持久化以及恢复使用。系统中有一个专门的磁盘用于顺序写入日志,因此可以最大化磁盘吞吐

  • 一次内存写入。内存写入执行要在commit log写入之后。当内存中的数据结构的大小(根据数据量大小和对象数量)超过一个给定的阈值,它将会把自己dump到磁盘中。这个dump写入时执行在系统中的一个磁盘上。

所有的写入是顺序的,并且会产生一个基于row key的index(block index)用于高效的检索,该index文件将会与data文件一起持久化。

经过一段时间,很多文件可能会存储在磁盘上,然后一个后台的merge操作会将这些文件合并整理成一个文件。这个过程与Bigtable的compaction过程非常类似。

对于读操作:

  • 首先查询in-memory数据结构,查询是根据从新到旧的顺序

  • 查询磁盘文件。当磁盘查询时,需要从磁盘上多个文件中查询该key。为了避免查询不包含该key的文件,针对每个文件的bloom filter需要保存在内存中,查询文件之前,需要首先查询bloom filter查看该key是否在该文件中。一个column family可能包含很多个column,当需要检索的column距离key较远时需要利用一些特殊的索引。为了避免扫描磁盘上的每一列,Cassandra维护了一份列索引来帮助我们直接定位到正确的磁盘chunk。由于指定key的列已经被序列化并写入到磁盘,因此按照每个磁盘256K的chunk boundary创建索引的。该boundary是可以配置的,但是256K大小对Cassandra来说运行的很好。

Implementation Details

单台机器上的Cassandra主要包括如下几个模块:partitioning module, the cluster membership and failure detection module and the storage engine module。所有这些模块依赖一个事件驱动模块,其将消息处理pipeline和task pipeline划分成了多个阶段。另外,该事件驱动模块是按照SEDA实现的。

Cassandra内部使用了TCP和UDP两种网络协议,其中:

  • 所有的系统控制信息都依赖于基于UDP协议的消息传输,

  • 用于replication和request routing等应用相关的消息则依赖于TCP协议传输。

请求路由模块的实现使用了一个状态机,当集群的任一节点收到请求时,状态机都会在一下几种状态之间切换:

  • 定位拥有这个key的节点

  • 将请求路由到此节点并等待响应到达

  • 如果response没有在配置的超时时间内到达,则将此请求设置为失败并返回给客户端

  • 根据时间戳计算出最新的response

  • 为任何数据不是最新的副本安排数据修复。

为了论述起见,这里不讨论故障的详细情况。系统可以被配置为同步写入或者异步写入。对于特定的需要高吞吐的系统,我们会选择异步replication。对于使用同步的情况,我们需要等待quorum数量的response返回后才会返回结果给客户端。

commit log

任何的日志系统都存在一个清除commit log的机制。在Cassandra中使用一种滚动的提交日志,在一个旧的提交日志超过一个特定的可配置大小后,就推出一个新的提交日志。在Facebook线上生产环境中,128MB的大小运行的特别好。每个commit log都有一个header,该header是一个固定大小的bit vector,其大小大于可能的column family数量。每个commit log都有一个bit vector并且会在内存中维护。对该bit vector有如下几个操作:

  • 修改

每个column family都有一个in-memory数据结构和一个磁盘上的data file。每次当一个column family的in-memory数据结构被持久化到磁盘中,在commit header中设置一个bit,用于表示该column family已经成功持久化到磁盘上。

  • 检查

每次滚动提交日志时,都会检查其bit vector,并检查之前滚动的提交日志的所有bit vector。如果发现所有的column family都持久化到了磁盘上,那么该commit log就可以清除了。

问题:既然每个column family都有独立的data file,为什么对于commit log要等所有的column family都持久化完成再清除?能否以column family为粒度进行清理?

写入到commit log的操作可以分为normal模式和fast sync模式两种。在fast sync模式中,写入commit log的数据会被缓存起来。这意味着当机器宕机时,是有可能数据丢失的。在这种模式下,in-memory数据结构被持久化到磁盘时,也是会进行缓存的。

写操作

传统的数据库并不是设计用来处理高写入吞吐的,在Cassandra中,所有向磁盘的写入都是顺序的,以最大化磁盘写吞吐。由于文件dump到磁盘后不会再修改了,所以在读取时无需进行加锁。Cassandra的服务实例的读写操作实际上都是无锁操作,因此并不需要应付基于B-Tree的数据库实现中存在的并发问题。

Cassandra是通过primary key来索引数据的。磁盘上的data file会被分成一系列的block。每个block最多包含128个key,并根据block index进行区分。block index记录block内key的相对偏移及其数据的大小。当in-memory数据结构被dump到磁盘上时,一个block index将会生成。为了快速存取,该block index同样会在内存中维护。

读操作

读操作同样会先查询in-memory数据结构,如果找到则返回给客户端,因为该in-memory数据结构肯定会包含最新的数据。如果没有找到则会以反向时间顺序去依次查找磁盘,因为这样先去查询最新的文件,一旦找到数据便可以返回给客户端。

compaction

随着时间,磁盘上的数据文件数量将会增多,Cassandra会运行一个非常类似于Bigtable系统的Compaction进程,通过它将多个文件合并成一个。merge操作是在一系列排好序的文件进行合并,系统总会对大小相近的两个文件进行compaction。例如,永远不会出现一个100GB的文件和一个50GB的文件进行compaction的情况。每隔一段时间,一个major compaction将会执行,将所有相关的文件合并成一个大的文件。compaction是一个IO密集型操作,需要对此进行大量的优化以做到不影响后续的读请求。