B-Tree的Rust实现及其原理

本文基于用 Rust 实现的源码 b-tree 进行撰写

B-Tree 介绍

B-Tree 特例

在实现 B-Tree 之前,需要先了解下 2-32-3-42-4 Tree,因为它们都是 B-Tree 的特例

那么什么是 2-32-3-42-4 Tree 呢? 简单点来说,它们都是一种 多路查找树(Multiway Trees)

这些 Tree 中的 2、3、4 的意思如下

2-3 的意思是 Tree 中包含 2 节点3 节点
2-3-4 的意思是 Tree 中包含 2 节点3 节点4 节点
2-4 的意思是 Tree 中包含 2 节点4 节点

那么 2 节点3 节点 以及 4 节点 是什么?

  • 2 节点

    就是这个 2 节点 只包含 1 个 element/key2 个 children,对于这个 2 节点 来说,节点对应的值有如下的关系

    左子树节点值 < 2 节点 值 < 右子树节点值

    这个 2 节点 要么有 2 个 children,要么有 0 个 children

  • 3 节点

    就是这个 3 节点 只包含 一小一大 2 个 element/key3 个 children,对于这个 3 节点 来说,节点对应的值有如下的关系

    左子树节点值 < 3 节点 较小值 < 3 节点 较大值 < 右子树节点值

    这个 3 节点 要么有 3 个 children,要么有 0 个 children

  • 4 节点

    就是这个 4 节点 只包含 一小一中一大 3 个 element/key4 个 children,对于这个 4 节点 来说,节点对应的值有如下的关系

    左子树节点值 < 4 节点 最小值 < 4 节点 中间值 < 4 节点 最大值 < 右子树节点值

    这个 4 节点 要么有 4 个 children,要么有 0 个 children

可以看到 234 代表的就是这个节点 element 的数量 + 1 以及这个节点所能拥有的 children 的数量

比如下面这张图就是一个典型的 2-3 Tree 结构

B-Tree 一些重要的性质

对于 B-Tree 来讲,它有 3 种节点

  1. Internal nodes,也被称为 Inner nodes,就是除了 root nodeLeaf nodes 的那些节点
  2. root node,顾名思义就是根节点
  3. Leaf nodes,顾名思义就是叶子节点,所谓的叶子节点,就是指没有 children 的那些节点

比如上面的 2-3 Tree 结构的图,其对应的节点为

在 Knuth 对于 B-Tree 的定义中,第 1 条就指定了,B-Tree 节点的 children 的数量,即

节点最多只能拥有 m 个 children(无论这个节点是 Internal nodesroot node 还是 Leaf nodes)

在这个大前题下,根据 Knuth 对 B-Tree 的定义,它接下来几个性质是

  1. 除了 root node 之外,每个 Internal nodes 都至少有 m/2(向上取整) 个 children
  2. Leaf nodes 都至少有 2 个 children
  3. Leaf nodes 都有 k 个 children 和 k-1 个 key
  4. 所有的叶子节点(Leaf nodes)都在同一层级,它们不保存信息,也没有 children

其中比较重要的还是第 1 条,这个 m/2 很关键

B-Tree 实现

B-Tree 的数据结构

如果用 Rust 来实现 B-Tree,那么可以构造如下的结构

#[derive(Clone, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
pub struct BTree<T: Clone + PartialOrd> {
max_children_amount: usize,
B: usize,
n: usize,
root_index: usize,
bs: BlockStore<Node<T>>,
}

其中

  1. max_children_amount: 每个 node 的 children 的最大数量,也就是上文提到的 m
  2. B: max_children_amount / 2,这个很重要,用来判断节点后面需不需要 merge
  3. n: 在 B-Tree 中总共存放的 key 的数量
  4. root_index: 根节点索引
  5. bs: 存放 node 的堆

注意: 这里并不对 BlockStore 展开讲,因为它本身就是一个基于 Array 实现的一个 store,感兴趣的可以看源码 BlockStore 以及 ArrayStack,这里可以看作是一个读写 B-Tree 数据的地方

而对于 Node,则是如下的数据结构

#[derive(Clone, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
struct Node<T: Clone + PartialOrd> {
id: usize,
keys: Box<[Option<T>]>,
children: Box<[i32]>,
}
  1. id: 存放在 BlockStore 这个 store 中的一个标识符 id,通过这个 id 可以读取到在 BlockStore 中对应的 Node,同时也方便对 Node 进行更新操作。每次在 BlockStore 中存储一个 Node 时都会返回一个 id
  2. keys: 用 Box 指针包裹的一个 Option 的数组。keys 属于 B-Tree 的关键数据
  3. children: 即这个 Nodechildren,同样用 Box 指针包裹的一个 i32 的数组,数组里面存放的就是下一个层级 Nodeid如果当前节点的某个 key 下没有 children,那么这个 key 对应的 children[i] 的值为 -1

这里的一个疑问是,为什么 B-Tree 实现的数据结构中,它不像 二叉搜索树 一样,Node 维护一个指向 parent 的指针(二叉搜索树在删除的时候会用到 parent 指针)?

答案是

  1. B-Tree 中,没有 parent 指针,用递归实现 addremove 就会很方便
  2. B-Tree 有个操作叫做 split,即一个 parent 节点可能会被 split 成 3 个(上文中 max_children_amount = 2 这种情况下)新的节点: 新的 parent 节点、新的 parent 子节点 u 以及另一个新的 parent 子节点 w,对于新子节点 uw 来说,如果 Node 数据结构中存在一个 parent 指针,那么对于 uw 的 children 节点来讲,它们也需要更改它们的 parent 指针,以正确地指向 uw。这就会增加额外的内存访问的次数,max_children_amount 值小这种开销可以接受,但是 max_children_amount 值一大这种开销就很可观了,会降低 B-Tree 操作的效率

find 实现

BlockStore

在实现 find 之前,先简单的说下 BlockStore

每当 B-Tree 执行 add 操作或者节点 Node 执行 split 操作,成功添加了一个新节点 Node 时,就会调用 BlockStorenew_block 方法,代码如下

impl<T: Clone> BlockStore<T> {
...
pub fn new_block(&mut self, block: T) -> usize {
...
let id = self.blocks.size();
self.blocks.add(id, block);
id
}
...
}

可以看到,blocks 的作用很像是数组(或者说栈),把最新的值加入进去,返回的是当前最新的值在这个 blocks “数组”的位置

B-Tree find

对于 find 而言,它的逻辑也很简单

fn find(&self, x: &T) -> Option<T> {
let mut z = None;
// 根节点 index
let mut ui = self.root_index as i32;
while ui >= 0 {
// 从根节点开始找
let u = self.bs.read_block(ui as usize)?;
let i = Self::find_it(&u.keys, &x);
// 找到返回
if i < 0 {
return u.keys[(-(i + 1)) as usize].clone();
}
// 没找到继续找
if u.keys[i as usize].is_some() {
z = u.keys[i as usize].clone()
}

// 更新 children 指针
ui = u.children[i as usize];
}
z
}

逻辑是这样的

  1. 从根节点开始找
  2. 调用 find_it 开始二分查找,找到了就返回 -m - 1,没找到返回的是 lom 就是 mid,lo 就是 low,熟悉二分查找的都知道 mid = (low + high) / 2
  3. 没找到返回的就是 lo,需要重新沿着某个 children 一路往下找,比如在图中

    • 要找 0005,从根节点对应的 keys0004 开始找,调用 find_it 后,返回的是 1,即从 0004 节点的 children[1] 开始找

    • 当前 children[1] 对应的节点的 keys[0006, 0008],再调用 find_it,返回的是 0,即从 [0006, 0008] 节点的 children[0] 开始找

    • 当前 children[0] 对应的节点的 keys[0005],再调用 find_it,返回的是 -0 - 1,最终返回 keys[-(-0 -1 + 1)]keys[0],成功返回 0005

B-Tree find_it

那么,find_it 代码是怎么样的?

fn find_it(a: &[Option<T>], x: &T) -> i32 {
let mut lo = 0;
let mut hi = a.len();
// 二分查找
while hi != lo {
let m = (hi + lo) / 2;
match &a[m] {
// None 区域,继续找,直到 hi == lo
None => hi = m,
Some(v) if x < v => hi = m,
Some(v) if x > v => lo = m + 1,
// 如果 x 正好在数组 a 中,就返回 -m-1
_ => return -(m as i32) - 1,
}
}
// 找到需要插入的位置,返回 lo
lo as i32
}

很经典的二分查找,用一张图就可以表示这个过程

图中画 - 的位置就表示 None

add 实现

B-Tree add

add 实现主要还是依靠 递归 来实现

fn add(&mut self, x: T) -> bool {
// 注意这里传入的是根节点 root_index
match self.add_recursive(x, self.root_index) {
Ok(w) => {
// 执行递归添加之后
// 如果 w 不是 None,说明 B-Tree 执行 add 之后,发生了 split 操作
// w 是根节点执行 add 之后 split 出来的新节点
// 此时根节点需要做对应的更新
if let Some(mut w) = w {
// w 需要把它的第一个 key 拿出来作为新的根节点的 key
let x = w.remove(0);

// new 一个新节点作为新的根节点
let mut new_root = Node::new(self);

// w 需要把它的第一个 key 拿出来作为新的根节点的 key
new_root.keys[0] = x;

// 新根节点 children[0] 指向的是旧根节点
new_root.children[0] = self.root_index as i32;
// 新根节点 children[1] 指向的是 w
new_root.children[1] = w.id as i32;

// 更新新根节点 index
self.root_index = new_root.id;

// 更新 w 节点和新根节点在内存中的信息
self.bs.write_block(w.id, w);
self.bs.write_block(self.root_index, new_root);
}
// 新加了一个 key,当然 size 要 +1
self.n += 1;
true
}
Err(()) => false,
}
}

整体的过程如下图

max_children_amount = 2 时,假如根节点对应的 keys[0001, 0002]

当根节点 add0003 时,根节点对应的 keys 变化为

现在根节点对应的 keys 数量为 3 个,太多了,需要 split

对应于上面代码就是,根节点执行 split 后有两个新节点

  • keys[0001] 的节点,注意此时该节点的 root_index 还是指向旧的根节点的位置
  • keys[0002, 0003] 的节点,即 w

那么新的根节点的 keys 是什么呢? 就是 [0002],也就是 w 节点的 keys第 0 个

然后再通过改变新根节点的 children[0]children[1] 让它们分别指向 [0001][0003] 即可完成新根节点的替换

B-Tree add_recursive

当然最重点的代码还是 add_recursive 这个函数,看函数名可以知道,就是对 add 进行递归操作,那么怎么做递归呢?下面来看下代码

fn add_recursive(&mut self, mut x: T, ui: usize) -> Result<Option<Node<T>>, ()> {
// 从 store 中的 u id 位置读取出 u 节点
if let Some(mut u) = self.bs.read_block(ui) {
// 在 u 里面的 keys 中,查找 x 应该要放入到 u.keys 中的位置
let i = Self::find_it(&u.keys, &x);

// 如果 x 已经在 u 的 keys 中,那么不需要 add,直接返回 Err
if i < 0 {
return Err(());
}

// 如果要放入 x 的 u 节点的 children[i] 是一个 leaf 节点
// 说明 u 节点未满(not full)
if u.children[i as usize] < 0 {
// 直接放入到 u 节点中
u.add(x, -1);
// 更新这个 u 节点的信息
self.bs.write_block(u.id, u.clone());
} else {
// 如果要放入 x 的 u 节点的 children[i] 不是一个 leaf 节点
// 说明 u 节点已满(is full)
// 就需要深度递归到 u 的子节点去找能放入 x 的节点

// 在 u 节点的某一个子节点 u' 递归地把 x 添加进去
// 此时可能会生成一个新节点 w,上一次递归时 split 生成的 w
let w = self.add_recursive(x, u.children[i as usize] as usize)?;

// 如果真的生成了,说明 u' 被 split 过了
// w 节点是 u' 节点的一半
if let Some(mut w) = w {
// 这个时候 u 节点成为 w 节点 和已经 split 的 u' 节点的父节点,u 节点需要拿走 w 节点的第一个 key
x = w.remove(0).unwrap();
u.add(x, w.id as i32);

// 更新这个 w 和 u 节点的信息
self.bs.write_block(w.id, w);
self.bs.write_block(u.id, u.clone());
}
}

// 如果 u 满了,说明需要对 u 这个节点进行 split
if u.is_full() {
// 返回 split 之后的节点
Ok(u.split(self))
} else {
// 如果 u 未满,直接返回 None
Ok(None)
}

} else {
Err(())
}
}

对于 add_recursive 操作来说,首先需要确认的一件事情是: 要执行 add 操作的 x 需要加在哪个节点的哪个子树上?,对于某个节点 Node 上的 keys 数组来讲,要加入的 x,有 2 种情况

  1. 如果 x 比 keys 中某个 key 的值要小,那么它应该尽可能往这个 key 的左边的 children 去递归
  2. 如果 x 比 keys 中某个 key 的值要大,那么它应该尽可能往这个 key 的右边的 children 去递归

为什么? 因为我们的 keys 的值的顺序是从小到大排列的,这也是我们在实现 find 时能使用二分查找的原因

用图来解释上面的代码就是,假如此时 max_children_amount = 4,数据状态如下图所示

此时如果要加入 0015,那么它会先从根节点对应的 keys[0002, 0005, 0008, 0011] 开始

此时代码中的 u 节点即为 [0002, 0005, 0008, 0011],但 经过二分查找后 ,它发现 0015 > 0011,那么 应该要从 0011 右边的 children 去寻找(这里出现了向下递归)

此时找到的节点对应的 keys[0012, 0013, 0014],然后在这个 keys 中做 二分查找,它发现 [0014] 这个位置 2 对应的 children[2] 的值是 -1,说明 [0012, 0013, 0014] 这个节点是 未满

这个时候直接在这个节点的 keys 中插入 0015 变成 [0012, 0013, 0014, 0015]

那么,递归时 split 是什么样子呢?

比如现在继续加入 0016

依然是从根节点开始寻找要插入的节点

经过 二分查找后发现,它发现 0016 > 0011,那么 应该要从 0011 右边的 children 去寻找(这里出现了向下递归)

此时找到的节点对应的 keys[0012, 0013, 0014,0015],然后在这个 keys 中做 二分查找,它发现 [0015] 这个位置 3 对应的 children[3] 的值是 -1,说明 [0012, 0013, 0014, 0015] 这个节点是 “未满” 的(严格意义上这个其实已经满了,但是后面会有 is_full 的判断条件)

这个时候直接在这个节点的 keys 中插入 0016 变成 [0012, 0013, 0014, 0015, 0016]

这个时候节点 已满,开始做 split

经过 split 操作后,原来的节点的 keys: [0012, 0013, 0014, 0015, 0016] 会被拆分成

  1. [0012, 0013]: u' 节点
  2. [0014, 0015, 0016]: w 节点

而此时的 u 节点是 [0002, 0005, 0008, 0011]u 节点需要加入 w 节点的 第 1 个 key,然后变成下图

此时 u 节点变为了 [0002, 0005, 0008, 0011, 0014]u 节点 已满,开始做 split,同样的 u 节点 keys 会继续拆分成

  1. [0002, 0005]: u'' 节点
  2. [0008, 0011, 0014]: w' 节点

到最后,又回到了最开始说 B-Tree add 那一小节,将 w' 节点的 第 1 个 key 取出作为根节点,它的 children[0][0002, 0005],它的 children[1][0011, 0014],如下图

Node split

对于 Node 来讲,执行 split 的逻辑很简单

// 对该 Node 砍掉一半的信息
fn split(&mut self, t: &mut BTree<T>) -> Option<Node<T>> {
// 先生成一个新的 Node 节点
let mut w = Self::new(t);

// 取中间的 key
let j = self.keys.len() / 2;

// 处理 key
// 复制中间的 key 的后面的 key 给这个新 Node 节点
for (i, key) in self.keys[j..].iter_mut().enumerate() {
w.keys[i] = key.take();
}

// 处理 children
// 复制中间的 children 的后面的 children 给这个新 Node 节点
for (i, chd) in self.children[(j + 1)..].iter_mut().enumerate() {
w.children[i] = *chd;
*chd = -1;
}

// 一分为二,一部分 split 的节点留给自己,即 self
t.bs.write_block(self.id, self.clone());

// 另一部分给 w
Some(w)
}

核心逻辑就是将一个数组进行拆分,返回拆分出来的那部分,即 w,剩下的留给自己,即 self

比如将 [0002, 0005, 0008, 0011, 0014] 拆分成 [0002, 0005](对应 self) 以及 [0008, 0011, 0014](对应 w)

remove 实现

B-Tree remove

fn remove(&mut self, x: &T) -> Option<T> {
// 从根节点开始做递归遍历删除
match self.remove_recursive(x, self.root_index as i32) {
Some(y) => {
// 删除成功
self.n -= 1;
let r = self.bs.read_block(self.root_index);
if let Some(r) = r {
// 根节点 Node 没有 key 但是整个 B-Tree 有其他 Node 有 key
if r.size() == 0 && self.n > 0 {
// 就需要更新根节点 index
self.root_index = r.children[0] as usize;
}
}
Some(y)
}
None => None,
}
}

比如像 max_children_amount = 2 这种情况

假如要删除节点 [0002]

删除后变成

这个时候如果删除节点 [0001],最终会变成

此时,只剩下 [0000] 这个节点,但是此时 self.root_index 还是指向之前 [0001] 的位置(之前 [0001] 节点的 children[0] 指向的就是 [0000] 节点),,所以此时需要更新根节点的位置,即 self.root_index = r.children[0] as usize;

当然,上面的例子是先删除 [0002] 后删除 [0001],如果是 先删除 [0000] 后删除 [0001],则不需要更新根节点位置,因为此时 self.root_index 还指向之前 [0001] 的位置,然而这个位置已经放入了 [0002] 这个 keys,此时 r.size() == 1self.n == 1

也就是说,对于根节点来讲,除非是整个 B-Tree 删除到只剩下 1 个 key,否则 root_index 是不用变的,对于我们实现的 B-Tree 来讲,内部会有算法自动调整其他的 key 到根节点的位置

B-Tree remove_recursive

对于 remove_recursive 来说,也是递归删除

fn remove_recursive(&mut self, x: &T, ui: i32) -> Option<T> {
// 递归到 Leaf nodes 就 return
// 因为 Leaf nodes 的 children 没必要进行递归删除了
if ui < 0 {
return None;
}
let mut u = self.bs.read_block(ui as usize);
match u {
Some(ref mut u) => {
// 首先要找到要删除的 x
let mut i = Self::find_it(&u.keys, x);
// i < 0 表示找到了要删除的 x
if i < 0 {
// 找到的 x 的位置是 -m-1,这里先把它位置 Abs
i = -(i + 1);
// 如果 x 是在 leaf 节点上
if u.is_leaf() {
// 直接删除即可
let y = u.remove(i as usize);
self.bs.write_block(u.id, u.clone());
y
// 如果 x 不是在 leaf 节点上,而是在 internal 或者 root 节点上
} else {
// 先在 u 节点的某个 children 节点 u' 中删除最小的那个值 x',位置是 i+1
// x' 是 比 x 大的最小值,所以是选择 children[i+1]
let x = self.remove_smallest(u.children[i as usize + 1]);

// 删除 x
let y = u.keys[i as usize].take();

// 原来要删除 x 的位置,替换为 x'
u.keys[i as usize] = x;

self.bs.write_block(u.id, u.clone());

// 确保 u 节点的 children[i+1] 至少还有 B-1 个 key
self.check_underflow(u, i as usize + 1);

y
}
// i >= 0 表示没找到要删除的 x
} else {
// 如果选择 children[i] 则是比 x 小的值
// 从 u 节点的 children[i] 节点处递归删除
let y = self.remove_recursive(x, u.children[i as usize]);
if y.is_some() {
// 删除成功
// 确保 u 节点的 children[i] 至少还有 B-1 个 key
self.check_underflow(u, i as usize);
y
} else {
None
}
}
}
None => None,
}
}

由以上代码可知,递归的逻辑其实也比较简单

  1. 递归的终止条件是什么? 终止条件应该是,如果递归到了 Leaf nodes 的 children,那么就应该终止,因为 Leaf nodes 的 children 没有任何信息
  2. 递归到某个 Node 时,需要先查找要删除的 x 在不在该 Node 对应的 keys 中? 如果不在,需要继续递归。注意此时代码中 find_it 返回的 i 已经 包含了应该要从当前 Node 对应的某个 key 的哪个方向进行遍历,如果 x 对于这个 key 来说比较小,那么从这个 key 的左边寻找;反之则从右边寻找
  3. 如果在该 Node 对应的 keys 中,那么需要先确定这个 Node 是否属于 Leaf nodes? 如果是,那么直接删除即可
  4. 如果这个 Node 不属于 Leaf nodes,那么说明这个 Node 可能属于 Internal nodes 或者 root node,但无论是这 2 种的哪一种,都需要做如下步骤

    • Nodechildren[i+1] 中遍历,寻找刚好能大于 x最小值 x',将它删除掉
    • Nodekeys 中删除掉 x
    • 原来在 Nodekeys 中删除掉 x 的位置,替代为 x'

      比如当 max_children_amount = 3 时,在 B-Tree 中有如下的数据

      假如要删除的是 [0007] 即根节点,刚好大于 [0007] 的是哪个呢

      [0008]。什么? 难道 remove_smallest 这个函数做的也是递归删除吗?
      是的,它经历了如下的过程

      那么 remove_smallest 函数的实现细节是怎样的?

B-Tree remove_smallest

fn remove_smallest(&mut self, ui: i32) -> Option<T> {
// 读取节点信息
let mut u = self.bs.read_block(ui as usize);
if let Some(ref mut u) = u {
// 如果读取到的节点是 leaf 节点,直接删除即可
if u.is_leaf() {
// 注意这里要删除最小的,那么只能是删除 u 节点第一个 key
let y = u.remove(0);
self.bs.write_block(u.id, u.clone());
y
// 如果读取到的节点不是 leaf 节点,即是 internal 节点,需要做递归删除
} else {
// 注意这里要删除最小的,那么只能是 u 节点的 children[0] 开始递归遍历
let y = self.remove_smallest(u.children[0]);
// 确保 u 节点的 第 0 个子节点至少还有 B-1 个 key
self.check_underflow(u, 0);
y
}
} else {
None
}
}

逻辑也很简单,即

  1. 如果已经遍历到 Leaf nodes 节点了,那么直接删除这个 Leaf nodes 节点的 第一个 key,因为 第一个 keyNodekeys 总是最小的
  2. 如果还没遍历到 Leaf nodes 节点,那么继续从该 Node 节点的 children[0] 开始遍历,因为 第一个 childrenNode 对应的 key 中总是最小的

这里可以看到很多地方都调用到了 Noderemove 函数,那么它是怎么实现的?

Node remove

fn remove(&mut self, i: usize) -> Option<T> {
let n = self.keys.len();
// 把 keys[i] 变成 None
let y = self.keys.get_mut(i)?.take();
// keys 整体左移一位
self.keys[i..n].rotate_left(1);
y
}

可以看到,因为 keys 本身就是一个数组,那么只需要删除掉对应位置 ikey,然后 ikeys 末端的数据 整体左移 1 位,即完成了删除的目的,逻辑非常简单

B-Tree check_underflow

同样的,这里可以看到很多地方都调用到了 Nodecheck_underflow 函数,那么它有什么用呢?

其实这个函数的作用,如同注释一样,是 确保某个 Node 节点的第 i 个子节点至少还有 B-1 个 key,这里的 B 也就是之前提到的 max_children_amount / 2,跟能不能 merge 有关

代码实现如下

fn check_underflow(&mut self, u: &mut Node<T>, i: usize) {
// 叶子节点没必要做 check
if u.children[i] < 0 {
return;
}
if i == 0 {
// 确保 u 节点的 第 0 个子节点至少还有 B-1 个 key
self.check_underflow_zero(u, i);
} else {
// 确保 u 节点的 第 i 个子节点至少还有 B-1 个 key
self.check_underflow_nonzero(u, i);
}
}

为什么 u.children[i] < 0 就能判断是否是 Leaf nodes 即叶子节点? 因为对于 B-Tree 来讲,所有的叶子节点(Leaf nodes)都在同一层级,它们不保存信息,也没有 children,而对于非 Leaf nodes 的节点来说,由于保证了每个节点都会有 children,所以不可能存在 u.children[i] < 0 的情况

对于整个 check_underflow,它的核心逻辑是: 如果在删除过程中,某个节点的 key 删着删着 keys 个数太少了,比如小于 B - 1 个,这个时候要么叫这个节点的兄弟节点跟它 merge 合并成一个新节点,要么从兄弟节点那里 borrow 借一些节点过来以满足 B-Tree 的要求

比如下图

  • merge 节点

    $ \Downarrow$

  • borrow 节点

    $ \Downarrow$

注意上图中 key = m 的位置变化

这里有 2 种情况

  1. 如果 u 节点的第 0childrenw 节点的 keys 个数不够了,那么它只能同它 右边的兄弟节点 merge 成一个新节点或者 borrow 一些 keys 过来
  2. 如果 u 节点的 i(i > 0)childrenw 节点的 keys 个数不够了,那么它 可以 同它 左边的兄弟节点 merge 成一个新节点或者 borrow 一些 keys 过来

注意这里 情况 2 我用的词是 可以,其实也可以用 右边的兄弟节点,在代码中主要就是 u.children[i-1] 配合 shift_lru.children[i+1] 配合 shift_rl 的区别。这里主要采取的还是用 左边的兄弟节点 来做

对于 check_underflow_zero

fn check_underflow_zero(&mut self, u: &mut Node<T>, i: usize) {
// i = 0
// u 节点的第 0 个子节点,w
if let Some(ref mut w) = self.bs.read_block(u.children[i] as usize) {
// 如果 w 中拥有的 key 的数量 < B-1
if w.size() < self.B - 1 {
// w 的兄弟节点 v,节点顺序为 [w, v, ...]
if let Some(ref mut v) = self.bs.read_block(u.children[i + 1] as usize) {
// 如果 v 节点有足够多的 key,至少 > B 个
if v.size() > self.B {
// 那么可以从 v 向 w 移动一些 key,保证 v 和 w 都至少有 B-1 个 key
// i 是 0,即为 w 所在位置
self.shift_rl(u, i, v, w);
} else {
// 如果 v 节点的 key 都不够,则合并 v 和 w 节点,保证 v 和 w 都至少有 B-1 个 key
// i 是 0,即为 w 所在位置
// 注意这里参数是反的
self.merge(u, i, w, v);
}
}
}
}
}

对于 check_underflow_nonzero

fn check_underflow_nonzero(&mut self, u: &mut Node<T>, i: usize) {
// u 节点的第 i 个子节点,w
if let Some(ref mut w) = self.bs.read_block(u.children[i] as usize) {
// 如果 w 中拥有的 key 的数量 < B-1
if w.size() < self.B - 1 {
// w 的兄弟节点 v,节点顺序为 [..., v, w, ...]
if let Some(ref mut v) = self.bs.read_block(u.children[i - 1] as usize) {
// 如果 v 节点有足够多的 key,至少 > B 个
if v.size() > self.B {
// 那么可以从 v 向 w 移动一些 key,保证 v 和 w 都至少有 B-1 个 key
// i-1,即为 v 所在位置
self.shift_lr(u, i - 1, v, w);
} else {
// 如果 v 节点的 key 都不够,则合并 v 和 w 节点,保证 v 和 w 都至少有 B-1 个 key
// i-1,即为 v 所在位置
self.merge(u, i - 1, v, w);
}
}
}
}
}

其中

  • shift_rl: 即从右边的兄弟节点 borrow 一些 keyschildren左边的节点
  • shift_lr: 即从左边的兄弟节点 borrow 一些 keyschildren右边的节点
  • merge: 即合并两个节点成一个新节点

B-Tree shift_rl

代码以及注释如下

// w 的兄弟节点 v,节点顺序为 [w, v, ...]
// 从 w <- v
// u 是 v 和 w 的父节点
// 这里 i 传的为 w 的位置
// i = 0
fn shift_rl(&mut self, u: &mut Node<T>, i: usize, v: &mut Node<T>, w: &mut Node<T>) {
let sv = v.size();
let sw = w.size();

// 要从 v -> w 移动的 key 数量,其实就是 (sv - sw) / 2
let shift = (sw + sv) / 2 - sw;

// 这里就不需要腾出空间了,因为空间是有的

// 在 w 的 sw 的位置,u 需要把原来指向 w 的那个 key 给 w
w.keys[sw] = u.keys[i].take();

// 然后开始移动 key
// 开始移动从 v -> w 移动 key
for (i, key) in v.keys[0..(shift - 1)].iter_mut().enumerate() {
w.keys[sw + 1 + i] = key.take();
}
// children 也要移动
for (i, chd) in v.children[0..shift].iter_mut().enumerate() {
w.children[sw + 1 + i] = *chd;
*chd = -1;
}

// 移动完成
// 更新 u 的 key
// 在 v 的 shift - 1 的位置,v 需要把剩下的第一个 key 给 u
u.keys[i] = v.keys[shift - 1].take();

// 移动完成后,shift 前面的位置
// keys 都变成 None
// children 都是 -1
// 需要把后面的数据 rotate 到前面来
for i in 0..(self.max_children_amount - shift) {
v.keys.swap(i, shift + i);
}
// 然后为了安全起见,把剩下的再次设为 None
for key in v.keys[(sv - shift)..self.max_children_amount].iter_mut() {
key.take();
}

// children 的逻辑也是一样
// 需要把后面的数据 rotate 到前面来
for i in 0..(self.max_children_amount - shift + 1) {
v.children.swap(i, shift + i);
}
// 然后为了安全起见,把剩下的再次设为 -1
for chd in v.children[(sv - shift + 1)..(self.max_children_amount + 1)].iter_mut() {
*chd = -1;
}

// 更新 u v w 的节点信息
self.bs.write_block(u.id, u.clone());
self.bs.write_block(v.id, v.clone());
self.bs.write_block(w.id, w.clone());
}

过程如图所示

首先是初始状态,i = 0w 节点在 v 的左边,w 节点拥有较少的 key,需要从 v 节点 borrow

$ \Downarrow$

w 节点的 sw 的位置,u 节点需要把原来指向 w 节点的那个 keyw

$ \Downarrow$

开始从 v 节点 -> w 节点移动 key

$ \Downarrow$

移动完成后,更新 u 节点的 key,在 v 节点的 shift - 1 的位置,v 节点需要把 剩下的 第一个 keyu.key[i]

$ \Downarrow$

最后,需要把 v 节点后面的 key rotate 到前面来

B-Tree shift_lr

代码以及注释如下

// w 的兄弟节点 v,节点顺序为 [..., v, w, ...]
// 从 v -> w
// u 是 v 和 w 的父节点
// 这里 i 传的为 v 的位置
fn shift_lr(&mut self, u: &mut Node<T>, i: usize, v: &mut Node<T>, w: &mut Node<T>) {
let sv = v.size();
let sw = w.size();

// 要从 v -> w 移动的 key 数量,其实就是 (sv - sw) / 2
let shift = (sw + sv) / 2 - sw;

// w 要接收新的 key,这里需要腾出空间容纳这么多的 key
w.keys.rotate_right(shift);
w.children.rotate_right(shift);

// 首先要更新 u 的 key
// 在 w 的 shift - 1 的位置,u 需要把原来指向 v 的那个 key 给 w
w.keys[shift - 1] = u.keys[i].take();
// 更新 u 指向 v 的 key,把 v 的倒数第 shift 的 key 给 u
u.keys[i] = v.keys[sv - shift].take();

// 然后开始移动 key
// 开始移动从 v -> w 移动 key
for (i, key) in v.keys[(sv - shift + 1)..sv].iter_mut().enumerate() {
w.keys[i] = key.take();
}
// children 也要移动
for (i, chd) in v.children[(sv - shift + 1)..(sv + 1)].iter_mut().enumerate() {
w.children[i] = *chd;
*chd = -1;
}

// 更新 u v w 的节点信息
self.bs.write_block(u.id, u.clone());
self.bs.write_block(v.id, v.clone());
self.bs.write_block(w.id, w.clone());
}

过程如图所示

首先是初始状态,i > 0w 节点在 v 的右边,w 节点拥有较少的 key,需要从 v 节点 borrow

$ \Downarrow$

w 节点要接收新的 key,这里需要腾出空间去容纳要加入的 key

$ \Downarrow$

w 节点的 shift - 1 的位置,u 节点需要把原来指向 v 节点的那个 keyw 节点

$ \Downarrow$

更新 u 节点指向 vkey,把 v 节点的倒数第 shiftkeyu 节点

$ \Downarrow$

开始从 v 节点 -> w 移动 key

B-Tree merge

代码如下所示

fn merge(&mut self, u: &mut Node<T>, i: usize, v: &mut Node<T>, w: &mut Node<T>) {
// 确保 v 和 w 的顺序为 [..., v, w, ...]
// 逻辑上,合并成一个新的 v,删除 w
assert_eq!(v.id, u.children[i] as usize);
assert_eq!(w.id, u.children[i + 1] as usize);

let sv = v.size();
let sw = w.size();

// w 的 key 都给 v
for (i, key) in w.keys[0..sw].iter_mut().enumerate() {
v.keys[sv + 1 + i] = key.take();
}
// w 的 children 都给 v
for (i, chd) in w.children[0..(sw + 1)].iter_mut().enumerate() {
v.children[sv + 1 + i] = *chd;
*chd = -1;
}

// u 指向 v 位置的 key,给到 v 的 sv 这个位置
v.keys[sv] = u.keys[i].take();

// 那么 u 的 key 需要 rotate
for j in (i + 1)..self.max_children_amount {
u.keys.swap(j - 1, j);
}
u.keys[self.max_children_amount - 1].take();

// 那么 u 的 children 需要 rotate
for j in (i + 2)..(self.max_children_amount + 1) {
u.children.swap(j - 1, j);
}
u.children[self.max_children_amount] = -1;

// 更新 u v 的节点信息
self.bs.write_block(u.id, u.clone());
self.bs.write_block(v.id, v.clone());

// 删除 w
self.bs.free_block(w.id);
}

过程如图所示

首先是初始状态,w 节点在 v 的右边,w 节点和 v 节点都拥有较少的 key,它们俩都需要合并

$ \Downarrow$

w 节点的 key 都给 v 节点

$ \Downarrow$

u 节点指向 v 节点位置的 key,给到 v 节点的 sv 这个位置

$ \Downarrow$

然后 u 节点的 keys 需要 rotate

$ \Downarrow$

最后删除掉 w 节点

$ \Downarrow$

节点合并完成

加餐

算法复杂度分析

注意,下面要说的 Bmax_children_amout / 2

树高度 h 和 B 的关系

如果 B-Tree 的高度为 h,并且拥有 l 个叶子节点,那么它们和 B 的关系为

即可以推算出

find

对于函数 find_it,因为这里是一个在 Node 内执行的二分查找,所以对于一个数组 a 来说,它的时间复杂度是 $ O(\log({length}({a})))$ ,而因为 ${length}({a})=2B$,所以 $O(log(length(a)))$ = $O(log(2B)) $ = $O(log B)$

而因为有 l 个叶子节点的 B-Tree 的高度为 $O(\log_B\ell)$,所以对于拥有 nkeyB-Tree 来讲它的高度为 $O(\log_B{n})$

那么对于整个 B-Tree 来讲,每一层消耗的时间是 $O(log B)$,对于有 $O(\log_B{n})$ 这么大的高度而言的 B-Tree,它总的执行 find 函数所消耗的时间为

add

对于 add 来讲,每当加入一个 key 时,最多会发生 $O(\log_B{n})$ 次 split 操作,而每次执行 split 操作仅涉及到 3Node,并且每次 split 操作还要在 Node 之间移动 Bkeychildren,所以这个时候时间复杂度就是 $O(B\log{n})$

remove

对于 remove 来讲,每当删除一个 key 时,最多会发生 $O(\log_B{n})$ 次 mergeborrow 操作,而每次 merge 或者 borrow 操作花费的时间都是 $O(B)$,所以这个时候时间复杂度就是

$O(B\log{n})$

B-Tree 解决了什么问题

其实本质上,B-Tree 这种数据结构,是为了有效地管理大型随机访问文件的 index。上面的算法复杂度分析时提到,对于 B-Tree 来讲,它的 find/add/remove 复杂度基本都是 log,这是不是跟二分搜索树很像?那么为什么使用它呢?

因为对于大型 index 来讲,用二分搜索树的空间存储效率以及查找效率都太低了

比如对于拥有 100,000 条记录的数据来讲,用 B-Tree 来存,假如 B 足够大,那么基本这棵树的高度不超过 3 层,查找基本也只需要跳 3 次就能找到对应的 Node,但是对于二叉搜索树来讲就至少需要经过 $log(100000)$ / $log(2)$ = 16 次

并且对于磁盘来讲,B-Tree 由于一次性返回一堆数据到内存中,然后在这个内存中进行快速的查找,相比于二叉搜索树而言使用了更少的 CPU 时间(这个其实有点像缓存的思想)。因为对于二叉搜索树来讲这更像是一个点查询,每次查询都需要从磁盘拷贝数据到内存中,这个时间是非常耗时的

与 B+ Tree 的区别

B+ Tree 是基于 B-Tree 实现的,如图所示

我们可以看到跟 B-Tree 有几点不同

  1. Node 有多少个 element/key 就有多少个 children
  2. Nodeelement/key 个数的范围比 B-Tree 要多 1 个
  3. Leaf nodes 叶子节点包含信息, 信息主要是每个 记录(record) 对应的 地址
  4. 所有在同一层级的 Leaf nodes 叶子节点串联起来成一个单向链表,起始指针指向最小的那个 Leaf node

参考资料

B-Tree Visualization
B-Tree wiki
B-Trees
数据结构 23树 & 234树——b树 & b+树