1
pub mod nodes;
2
use crate::{
3
    error::PRes,
4
    index::{
5
        config::{IndexOrd, IndexType, ValueMode},
6
        keeper::{IndexKeeper, IndexModify},
7
        tree::nodes::{Leaf, Node, NodeRef, Nodes, PageIter, PageIterBack, Value},
8
    },
9
};
10
use std::{cmp::Ordering, ops::Bound, rc::Rc};
11

12
pub enum ValueChange<V> {
13
    ADD(V),
14
    REMOVE(Option<V>),
15
}
16

17
pub struct KeyChanges<K, V> {
18
    pub k: K,
19
    changes: Vec<ValueChange<V>>,
20
}
21

22
impl<K: IndexType, V: IndexType> KeyChanges<K, V> {
23
    #[allow(dead_code)]
24 1
    pub fn single_add(k: K, v: V) -> KeyChanges<K, V> {
25 1
        KeyChanges {
26
            k,
27 1
            changes: vec![ValueChange::ADD(v)],
28 0
        }
29 1
    }
30

31
    #[allow(dead_code)]
32 1
    fn single_delete(k: K, v: Option<V>) -> KeyChanges<K, V> {
33 1
        KeyChanges {
34
            k,
35 1
            changes: vec![ValueChange::REMOVE(v)],
36 0
        }
37 1
    }
38

39 1
    pub fn new(k: K, changes: Vec<ValueChange<V>>) -> KeyChanges<K, V> {
40 1
        KeyChanges { k, changes }
41 1
    }
42 1
    fn apply(&self, leaf: &mut Leaf<K, V>, value_mode: ValueMode, index: &str) -> PRes<bool> {
43 1
        let mut update = false;
44 1
        for vc in &self.changes {
45 1
            update |= match vc {
46 1
                ValueChange::ADD(v) => {
47 1
                    if leaf.len() > 0 {
48 1
                        leaf.insert_or_update(&self.k, &v, value_mode.clone(), index)?;
49
                    } else {
50 1
                        leaf.add(0, &self.k, v, value_mode.clone());
51
                    }
52 1
                    true
53
                }
54 1
                ValueChange::REMOVE(r) => leaf.remove(&self.k, &r),
55
            };
56
        }
57 1
        Ok(update)
58 1
    }
59
}
60

61
pub trait Index<K, V> {
62
    fn get(&mut self, k: &K) -> PRes<Option<Value<V>>>;
63
    fn iter_from(&mut self, first: Bound<&K>) -> PRes<PageIter<K, V>>;
64
    fn back_iter_from(&mut self, first: Bound<&K>) -> PRes<PageIterBack<K, V>>;
65
}
66

67
pub trait IndexApply<K, V>: Index<K, V> {
68
    fn apply(&mut self, adds: &[KeyChanges<K, V>]) -> PRes<()>;
69
}
70

71 1
#[derive(PartialEq, Clone, Debug)]
72
pub struct PosRef<K> {
73 1
    pub k: K,
74 1
    pub pos: usize,
75 1
    pub node_ref: NodeRef,
76 1
    pub sibling: Option<NodeRef>,
77 1
    pub version: u16,
78
}
79

80
impl<K: Clone> PosRef<K> {
81 1
    pub(crate) fn new(k: &K, pos: usize, node_ref: NodeRef, sibling: Option<NodeRef>) -> PosRef<K> {
82 1
        PosRef {
83 1
            k: k.clone(),
84
            pos,
85
            node_ref,
86 1
            sibling,
87
            version: 0,
88
        }
89 1
    }
90
}
91

92
struct ParentNodeChanged<K> {
93
    path: Vec<PosRef<K>>,
94
    children: Vec<PosRef<K>>,
95
}
96

97 1
fn split_link_save<K: IndexOrd + Clone, V, I: IndexModify<K, V>>(
98
    index: &mut I,
99
    mut prev_id: NodeRef,
100
    mut first: Node<K, V>,
101
    cur_version: u16,
102
) -> PRes<Vec<(K, NodeRef)>> {
103 1
    let new_nodes = first.split(index.top_limit());
104 1
    let mut ids = Vec::new();
105 1
    for (k, new_node) in new_nodes {
106 1
        let saved_id = index.insert(new_node)?;
107 1
        ids.push((k, saved_id));
108 1
    }
109 1
    if let Node::LEAF(leaf) = first {
110 1
        let mut prev_leaf = leaf;
111 1
        let mut prev_version = cur_version;
112 1
        for (_, id) in &ids {
113
            // Just unwrap, the leaf should be locked so expected to exist
114 1
            let (sibling_node, version) = index.load_modify(&id)?.unwrap();
115 1
            if let Node::LEAF(sibling) = index.owned(&id, sibling_node) {
116 1
                index.update(&prev_id, Node::LEAF(prev_leaf), prev_version)?;
117 1
                prev_id = id.clone();
118 1
                prev_leaf = sibling;
119 1
                prev_version = version;
120 0
            }
121 1
        }
122 1
        index.update(&prev_id, Node::LEAF(prev_leaf), prev_version)?;
123 1
    } else {
124 1
        index.update(&prev_id, first, cur_version)?;
125
    }
126 1
    Ok(ids)
127 1
}
128

129 1
fn group_by_parent<K>(updates: Vec<Vec<PosRef<K>>>) -> Vec<ParentNodeChanged<K>> {
130 1
    let mut parent_updates = Vec::new();
131 1
    let mut parent_node: Option<NodeRef> = None;
132 1
    let mut new_update: Option<ParentNodeChanged<K>> = None;
133 1
    for mut update in updates {
134 1
        if let Some(last) = update.pop() {
135 1
            if parent_node == update.last().map(|x| x.node_ref.clone()) {
136 1
                if let Some(p) = &mut new_update {
137 1
                    p.children.push(last);
138
                }
139
            } else {
140 1
                if let Some(p) = new_update {
141 1
                    parent_updates.push(p);
142
                }
143 1
                parent_node = update.last().map(|x| x.node_ref.clone());
144 1
                let mut children = Vec::new();
145 1
                children.push(last);
146 1
                new_update = Some(ParentNodeChanged { path: update, children });
147 1
            }
148 1
        }
149 1
    }
150 1
    if let Some(p) = new_update {
151 1
        parent_updates.push(p);
152
    }
153 1
    parent_updates
154 1
}
155 1
fn lock_parents<K: IndexOrd + Clone, V, I: IndexModify<K, V>>(
156
    index: &mut I,
157
    mut updates: Vec<Vec<PosRef<K>>>,
158
) -> PRes<Vec<Vec<PosRef<K>>>> {
159 1
    for update in &mut updates {
160 1
        loop {
161 1
            debug_assert!(
162 1
                update.len() >= 2,
163
                "never lock a path that shorter than 2 because that would be the root",
164
            );
165
            let locked = {
166 1
                let update = &update;
167 1
                let len = update.len();
168 1
                let rec_ref = &update[len - 2];
169 1
                if let Some((node, version)) = index.load_modify(&rec_ref.node_ref)? {
170 1
                    lock_logic(index, rec_ref, node, version)?.is_some()
171 1
                } else {
172 0
                    false
173
                }
174 1
            };
175 1
            if locked {
176
                break;
177
            } else {
178 1
                let last = update.last().unwrap().clone();
179 1
                if let Some(ref node) = index.get_root_refresh()? {
180 1
                    let mut path = Vec::new();
181 1
                    let mut cur_node = PosRef::new(&last.k, 0, node.clone(), None);
182 1
                    while cur_node.node_ref != last.node_ref {
183 1
                        let mut restart = true;
184 1
                        if let Some((node_modify, version)) = index.load_modify(&cur_node.node_ref)? {
185 1
                            if let Node::NODE(n) = &*node_modify {
186 1
                                cur_node.version = version;
187 1
                                path.push(cur_node.clone());
188 1
                                if let Some(x) = n.find_write(&last.k) {
189 1
                                    cur_node = x;
190 1
                                    restart = false;
191
                                }
192 1
                            }
193 1
                        }
194 1
                        if restart {
195 0
                            if let Some(ref node) = index.get_root_refresh()? {
196 0
                                cur_node = PosRef::new(&last.k, 0, node.clone(), None);
197 0
                                path.clear();
198
                            } else {
199 0
                                panic!("restart node finding but not root present");
200
                            }
201 0
                        }
202
                    }
203 1
                    path.push(last);
204 1
                    *update = path;
205 1
                }
206 1
            }
207
        }
208
    }
209 1
    Ok(updates)
210 1
}
211

212 1
fn merge_and_save<K: IndexOrd + Clone, V, I: IndexModify<K, V>>(
213
    index: &mut I,
214
    parent: &mut Nodes<K>,
215
    pos: usize,
216
    mut cur: Node<K, V>,
217
    version: u16,
218
) -> PRes<bool> {
219 1
    Ok(if pos > 0 {
220 1
        let node_ref = parent.get(pos - 1);
221 1
        let (dest_node, dest_version) = index.load_modify(&node_ref)?.unwrap();
222 1
        let mut dest_merge = index.owned(&node_ref, dest_node);
223 1
        dest_merge.merge_right(parent.get_key(pos - 1), &mut cur);
224 1
        index.update(&node_ref, dest_merge, dest_version)?;
225 1
        index.delete(&parent.remove(pos).unwrap(), version)?;
226 1
        false
227 1
    } else {
228 1
        let node_ref = parent.get(pos + 1);
229 1
        let (source_node, source_version) = index.load_modify(&node_ref)?.unwrap();
230 1
        let mut source_merge = index.owned(&node_ref, source_node);
231 1
        cur.merge_right(parent.get_key(pos), &mut source_merge);
232 1
        index.delete(&parent.remove(pos + 1).unwrap(), source_version)?;
233 1
        index.update(&parent.get(pos), cur, version)?;
234 1
        true
235 1
    })
236 1
}
237

238 1
fn lock_logic<K: IndexOrd + Clone, V, I: IndexModify<K, V>>(
239
    index: &mut I,
240
    cur_node: &PosRef<K>,
241
    node: Rc<Node<K, V>>,
242
    node_version: u16,
243
) -> PRes<Option<(Rc<Node<K, V>>, u16)>> {
244 1
    if let Some(sibling) = &cur_node.sibling {
245 1
        let sibling_load = if let Some(load) = index.load_modify(sibling)? {
246 1
            load
247
        } else {
248 0
            return Ok(None);
249 0
        };
250 1
        let ((first, second), (f, s)) = if cur_node.pos > 0 {
251 1
            ((sibling, &cur_node.node_ref), (sibling_load, (node, node_version)))
252
        } else {
253 1
            ((&cur_node.node_ref, sibling), ((node, node_version), sibling_load))
254
        };
255

256 1
        let (first_node, first_version) = f;
257 1
        let (second_node, second_version) = s;
258

259 1
        if let (Some(f), Some(s)) = (first_node.get_next(), second_node.get_prev()) {
260 1
            if f.cmp(s) != Ordering::Equal {
261 1
                return Ok(None);
262
            }
263
        } else {
264 0
            return Ok(None);
265
        }
266 1
        if cur_node.pos == 0 && !first_node.check_range(&cur_node.k) {
267 0
            return Ok(None);
268
        }
269 1
        if cur_node.pos > 0 && !second_node.check_range(&cur_node.k) {
270 0
            return Ok(None);
271
        }
272 1
        if !index.lock(first, first_version)? {
273 1
            return Ok(None);
274
        }
275 1
        if !index.lock(second, second_version)? {
276 1
            index.unlock(first)?;
277 1
            return Ok(None);
278
        }
279 1
        if cur_node.pos > 0 {
280 1
            Ok(Some((second_node, second_version)))
281
        } else {
282 1
            Ok(Some((first_node, first_version)))
283
        }
284 1
    } else if !node.check_range(&cur_node.k) {
285 0
        Ok(None)
286 1
    } else if index.lock(&cur_node.node_ref, node_version)? {
287 1
        Ok(Some((node, node_version)))
288
    } else {
289 1
        Ok(None)
290
    }
291 1
}
292

293
impl<K: IndexType, V: IndexType, T> IndexApply<K, V> for T
294
where
295
    T: IndexModify<K, V>,
296
    T: Index<K, V>,
297
{
298 1
    fn apply(&mut self, adds: &[KeyChanges<K, V>]) -> PRes<()> {
299 1
        let mut updates = Vec::new();
300 1
        let mut prev_leaf_id = None;
301 1
        for add in adds {
302 1
            let mut next: Option<PosRef<K>> = None;
303 1
            let mut path = Vec::new();
304 1
            loop {
305 1
                next = if let Some(cur_node) = &mut next {
306 1
                    let mut new_next = None;
307 1
                    if let Some((node_modify, version)) = self.load_modify(&cur_node.node_ref)? {
308 1
                        if let Node::NODE(n) = &*node_modify {
309 1
                            cur_node.version = version;
310 1
                            path.push(cur_node.clone());
311 1
                            new_next = n.find_write(&add.k);
312 1
                        } else if let Node::LEAF(ref _none) = &*node_modify {
313 1
                            cur_node.version = version;
314 1
                            if let Some((leaf_modify, version)) = lock_logic(self, cur_node, node_modify, version)? {
315 1
                                path.push(cur_node.clone());
316 1
                                if let Node::LEAF(mut leaf) = self.owned(&cur_node.node_ref, leaf_modify) {
317 1
                                    let node_ref = &cur_node.node_ref;
318 1
                                    if add.apply(&mut leaf, self.value_mode(), self.index_name())? {
319 1
                                        self.update(node_ref, Node::LEAF(leaf), version)?;
320 1
                                        if Some(node_ref.clone()) != prev_leaf_id {
321 1
                                            updates.push(path);
322
                                        }
323 1
                                        prev_leaf_id = Some(node_ref.clone())
324
                                    }
325 1
                                    break;
326 1
                                }
327 1
                            }
328 1
                        }
329 1
                    }
330 1
                    new_next
331 1
                } else if let Some(r) = self.get_root_refresh()? {
332 1
                    path.clear();
333 1
                    Some(PosRef::new(&add.k, 0, r.clone(), None))
334
                } else {
335 1
                    self.lock_config()?;
336 1
                    if let Some(r) = self.get_root_refresh()? {
337 1
                        path.clear();
338 1
                        Some(PosRef::new(&add.k, 0, r.clone(), None))
339
                    } else {
340 1
                        let mut leaf = Leaf::new();
341 1
                        add.apply(&mut leaf, self.value_mode(), self.index_name())?;
342 1
                        let leaf_ref = self.insert(Node::LEAF(leaf))?;
343 1
                        self.set_root(Some(leaf_ref))?;
344 1
                        break;
345 1
                    }
346 0
                }
347
            }
348 1
        }
349 1
        while updates.len() > 1 || (updates.len() == 1 && updates[0].len() > 1) {
350 1
            updates = lock_parents(self, updates)?;
351 1
            let parent_updates = group_by_parent(updates);
352 1
            updates = Vec::new();
353 1
            for update in parent_updates {
354 1
                let parent_id = update.path.last().unwrap().node_ref.clone();
355
                // It's locked, should not have miss read unwrap.
356 1
                let (read_node, n_version) = self.load_modify(&parent_id)?.unwrap();
357 1
                if let Node::NODE(mut n) = self.owned(&parent_id, read_node) {
358 1
                    let mut save = false;
359 1
                    let mut flags = vec![false; n.len()];
360 1
                    for ch in update.children {
361 1
                        let pos = n.find(&ch.k);
362 1
                        flags[pos.pos] = true;
363 1
                    }
364 1
                    let mut i = 0;
365 1
                    while i < n.len() {
366 1
                        let pos = i;
367 1
                        i += 1;
368 1
                        if flags[pos] && n.len() > 1 {
369
                            // It's locked, should not have miss read unwrap.
370 1
                            let cur_id = n.get(pos);
371 1
                            let (cur, version) = self.load_modify(&cur_id)?.unwrap();
372 1
                            if cur.len() < self.bottom_limit() {
373 1
                                let mut new_pos = pos;
374 1
                                let to_modify = self.owned(&cur_id, cur);
375 1
                                if merge_and_save(self, &mut n, pos, to_modify, version)? {
376 1
                                    new_pos += 1;
377
                                }
378 1
                                flags.remove(new_pos);
379 1
                                flags[new_pos - 1] = true;
380 1
                                i = new_pos - 1;
381 1
                                save = true;
382
                            }
383 1
                        }
384
                    }
385 1
                    for pos in 0..n.len() {
386 1
                        if flags[pos] {
387
                            // It's locked, should not have miss read unwrap.
388 1
                            let cur_id = n.get(pos);
389 1
                            let (cur, cur_version) = self.load_modify(&cur_id)?.unwrap();
390 1
                            if cur.len() > self.top_limit() {
391 1
                                let to_modify = self.owned(&cur_id, cur);
392 1
                                let mut ids = split_link_save(self, n.get(pos), to_modify, cur_version)?;
393 1
                                for _ in 0..ids.len() {
394 1
                                    flags.insert(pos, false);
395
                                }
396 1
                                n.insert_after(pos, &mut ids);
397 1
                                save = true;
398 1
                            }
399 1
                        }
400
                    }
401 1
                    if save {
402 1
                        self.update(&parent_id, Node::NODE(n), n_version)?;
403 1
                        if !update.path.is_empty() {
404 1
                            updates.push(update.path);
405
                        } else {
406 0
                            break;
407
                        }
408
                    }
409 1
                }
410 1
            }
411 1
        }
412 1
        if updates.len() == 1 && updates[0].len() == 1 {
413 1
            self.lock_config()?;
414 1
            while let Some(r) = self.get_root_refresh()? {
415 1
                if let Some((n, n_version)) = self.load_modify(&r)? {
416 1
                    if n.len() > self.top_limit() {
417 1
                        let to_modify = self.owned(&r, n);
418 1
                        let ids = split_link_save(self, r.clone(), to_modify, n_version)?;
419 1
                        let node = Node::NODE(Nodes::new_from_split(r, &ids));
420 1
                        let cur_id = self.insert(node)?;
421 1
                        self.set_root(Some(cur_id))?;
422 1
                    } else if n.len() == 1 {
423 1
                        if let Node::NODE(cn) = &*n {
424 1
                            self.delete(&r, n_version)?;
425 1
                            self.set_root(Some(cn.get(0)))?;
426
                        } else {
427 1
                            break;
428
                        }
429 1
                    } else if n.len() == 0 {
430 1
                        self.set_root(None)?;
431
                    } else {
432 1
                        break;
433
                    }
434 1
                }
435 1
            }
436
        }
437 1
        Ok(())
438 1
    }
439
}
440

441
impl<K: IndexType, V: IndexType, T> Index<K, V> for T
442
where
443
    T: IndexKeeper<K, V>,
444
{
445 1
    fn get(&mut self, k: &K) -> PRes<Option<Value<V>>> {
446 1
        Ok(if let Some(node) = self.get_root()? {
447 1
            let mut cur_node = node;
448 1
            loop {
449 1
                match self.load(&cur_node)? {
450 1
                    Node::NODE(n) => {
451 1
                        cur_node = n.find(k).node_ref;
452 1
                    }
453 1
                    Node::LEAF(leaf) => {
454 1
                        break leaf.find(k).map(|el| el.1).ok();
455 1
                    }
456
                }
457 0
            }
458
        } else {
459 1
            None
460
        })
461 1
    }
462

463 1
    fn iter_from(&mut self, first: Bound<&K>) -> PRes<PageIter<K, V>> {
464 1
        let mut path = Vec::new();
465 1
        let mut iter = if let Some(mut cur_node) = self.get_root()? {
466 1
            path.push((0, cur_node.clone()));
467 1
            loop {
468 1
                match self.load(&cur_node)? {
469 1
                    Node::NODE(n) => {
470 1
                        let value = match first {
471 1
                            Bound::Included(f) => {
472 1
                                let found = n.find(f);
473 1
                                (found.pos, found.node_ref)
474 1
                            }
475 1
                            Bound::Excluded(f) => {
476 1
                                let found = n.find(f);
477 1
                                (found.pos, found.node_ref)
478 1
                            }
479 1
                            Bound::Unbounded => (0, n.get(0)),
480
                        };
481 1
                        cur_node = value.1.clone();
482 1
                        path.push(value);
483 1
                    }
484 1
                    Node::LEAF(leaf) => {
485 1
                        break PageIter {
486 1
                            iter: leaf.iter_from(first).peekable(),
487
                        };
488 1
                    }
489
                }
490 0
            }
491
        } else {
492 1
            PageIter {
493 1
                iter: Vec::new().into_iter().peekable(),
494
            }
495 1
        };
496

497 1
        while iter.iter.peek().is_none() {
498 1
            let prev = path.pop();
499 1
            if let Some((_, node)) = path.last() {
500 1
                let (pos, _) = prev.unwrap();
501 1
                match self.load(&node)? {
502 1
                    Node::NODE(n) => {
503
                        // check if there are more elements in the node
504 1
                        if n.len() > pos + 1 {
505 1
                            let mut cur_node = n.get(pos + 1);
506 0
                            loop {
507 1
                                match self.load(&cur_node)? {
508 1
                                    Node::NODE(n) => {
509 0
                                        cur_node = n.get(0);
510 0
                                    }
511 1
                                    Node::LEAF(leaf) => {
512
                                        // Use here the key anyway, should start from the first in
513
                                        // any case
514 1
                                        iter = PageIter {
515 1
                                            iter: leaf.iter_from(first).peekable(),
516
                                        };
517
                                        break;
518 1
                                    }
519
                                }
520 0
                            }
521
                        }
522 1
                    }
523 0
                    Node::LEAF(_leaf) => {
524 0
                        panic!("can't happen");
525 0
                    }
526
                }
527 0
            } else {
528 1
                break;
529
            }
530
        }
531 1
        Ok(iter)
532 1
    }
533

534 1
    fn back_iter_from(&mut self, last: Bound<&K>) -> PRes<PageIterBack<K, V>> {
535 1
        let mut path = Vec::new();
536 1
        let mut iter = if let Some(mut cur_node) = self.get_root()? {
537 1
            path.push((0, cur_node.clone()));
538 1
            loop {
539 1
                match self.load(&cur_node)? {
540 1
                    Node::NODE(n) => {
541 0
                        let value = match last {
542 1
                            Bound::Included(f) => {
543 0
                                let found = n.find(f);
544 0
                                (found.pos, found.node_ref)
545 0
                            }
546 1
                            Bound::Excluded(f) => {
547 1
                                let found = n.find(f);
548 1
                                (found.pos, found.node_ref)
549 1
                            }
550
                            Bound::Unbounded => {
551 1
                                let pos = n.len() - 1;
552 1
                                (pos, n.get(pos))
553
                            }
554
                        };
555 1
                        cur_node = value.1.clone();
556 1
                        path.push(value);
557 1
                    }
558 1
                    Node::LEAF(leaf) => {
559 1
                        break PageIterBack {
560 1
                            iter: leaf.back_iter_from(last).peekable(),
561
                        };
562 1
                    }
563
                }
564 0
            }
565
        } else {
566 1
            PageIterBack {
567 1
                iter: Vec::new().into_iter().rev().peekable(),
568
            }
569 1
        };
570

571 1
        while iter.iter.peek().is_none() {
572 1
            let prev = path.pop();
573 1
            if let Some((_, node)) = path.last() {
574 1
                let (pos, _) = prev.unwrap();
575 1
                match self.load(&node)? {
576 1
                    Node::NODE(n) => {
577
                        // check if there are more elements in the node
578 1
                        if pos > 0 {
579 1
                            let mut cur_node = n.get(pos - 1);
580 0
                            loop {
581 1
                                match self.load(&cur_node)? {
582 1
                                    Node::NODE(n) => {
583 0
                                        cur_node = n.get(n.len() - 1);
584 0
                                    }
585 1
                                    Node::LEAF(leaf) => {
586
                                        // Use here the key anyway, should start from the first in
587
                                        // any case
588 1
                                        iter = PageIterBack {
589 1
                                            iter: leaf.back_iter_from(last).peekable(),
590
                                        };
591
                                        break;
592 1
                                    }
593
                                }
594 0
                            }
595
                        }
596 1
                    }
597 0
                    Node::LEAF(_leaf) => {
598 0
                        panic!("can't happen");
599 0
                    }
600
                }
601 0
            } else {
602 1
                break;
603
            }
604
        }
605 1
        Ok(iter)
606 1
    }
607
}
608

609
#[cfg(test)]
610
mod tests;

Read our documentation on viewing source code .

Loading