tglman / persy
1
use crate::{
2
    error::{IndexChangeError, PERes, PIRes},
3
    id::{index_id_to_segment_id_data, index_id_to_segment_id_meta, IndexId, SegmentId},
4
    index::{
5
        config::{ByteVec, IndexOrd, IndexType, Indexes, ValueMode, WrapperType},
6
        serialization::{deserialize, serialize},
7
        tree::{
8
            nodes::{compare, Node, NodeRef, PageIter, PageIterBack, Value},
9
            IndexApply, KeyChanges, ValueChange as TreeValue,
10
        },
11
    },
12
    persy::PersyImpl,
13
    snapshots::SnapshotId,
14
    transaction_impl::TransactionImpl,
15
    PersyId,
16
};
17
use std::{
18
    cmp::Ordering,
19
    collections::{btree_map::Entry as BTreeEntry, hash_map::Entry, BTreeMap, HashMap},
20
    iter::DoubleEndedIterator,
21
    ops::{Bound, RangeBounds},
22
    rc::Rc,
23
    sync::Arc,
24
    vec::IntoIter,
25
};
26

27 1
#[derive(Clone)]
28
pub struct Container<T: IndexOrd> {
29 1
    value: T,
30
}
31
impl<T> WrapperType<T> for Container<T>
32
where
33
    T: IndexType,
34
    Self: Extractor,
35
{
36 1
    fn value(self) -> T {
37 1
        self.value
38 1
    }
39
}
40
impl<T: IndexOrd> PartialOrd for Container<T> {
41 1
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
42 1
        Some(IndexOrd::cmp(&self.value, &other.value))
43 1
    }
44
}
45
impl<T: IndexOrd> Ord for Container<T> {
46 1
    fn cmp(&self, other: &Self) -> Ordering {
47 1
        IndexOrd::cmp(&self.value, &other.value)
48 1
    }
49
}
50
impl<T: IndexOrd> PartialEq for Container<T> {
51 0
    fn eq(&self, other: &Self) -> bool {
52 0
        IndexOrd::cmp(&self.value, &other.value) == Ordering::Equal
53 0
    }
54
}
55
impl<T: IndexOrd> Eq for Container<T> {}
56
impl<T: IndexOrd> From<T> for Container<T> {
57 1
    fn from(value: T) -> Self {
58 1
        Container { value }
59 1
    }
60
}
61

62 1
/// A single change recorded for one key, with the affected value resolved to its real type.
#[derive(Clone, Debug, PartialEq)]
63
pub enum ValueChange<V> {
64 1
    /// The value has been added to the key.
    Add(V),
65 1
    /// The given value has been removed from the key; with `None` no specific value is
    /// named and `IndexTransactionKeeper::apply_changes` drops whatever the key held.
    Remove(Option<V>),
66
}
67

68 1
/// A single change recorded for one key, referring to its value by position.
///
/// The `usize` is the index returned by `add_value`, i.e. the slot of the value inside the
/// value vector of the index's `ValueContainer`.
#[derive(Clone)]
69
pub enum Change {
70 1
    /// Add the value stored at this position.
    Add(usize),
71 1
    /// Remove the value stored at this position, or no specific value when `None`.
    Remove(Option<usize>),
72
}
73

74 1
#[derive(Clone)]
75
pub struct Changes {
76 1
    changes: Vec<Change>,
77
}
78

79
impl Changes {
80 1
    fn new(change: Change) -> Changes {
81 1
        Changes { changes: vec![change] }
82 1
    }
83 1
    fn push(&mut self, change: Change) {
84 1
        self.changes.push(change);
85 1
    }
86
}
87

88 1
/// Appends `val` to the value vector inside `values` and returns the position it was stored at.
///
/// Panics (inside `get_vec_mut`) when `values` does not hold values of type `V`.
fn add_value<V: IndexType>(values: &mut ValueContainer, val: V) -> usize {
    let slots = V::Wrapper::get_vec_mut(values);
    // The new element lands right after the current last one.
    let position = slots.len();
    slots.push(val.into());
    position
}
94 1
fn resolve_values<V: IndexType>(values: &ValueContainer, changes: Changes) -> Vec<ValueChange<V>> {
95 1
    let v = V::Wrapper::get_vec(values);
96 1
    changes
97
        .changes
98
        .iter()
99 1
        .map(|c| match c {
100 1
            Change::Add(p) => ValueChange::Add(v[*p].clone().value()),
101 1
            Change::Remove(o) => ValueChange::Remove(o.map(|p| v[p].clone().value())),
102 1
        })
103
        .collect()
104 1
}
105 1
fn add_entry<K: IndexType>(entries: &mut EntriesContainer, k: K, change: Change) {
106 1
    let v = K::Wrapper::get_entries_mut(entries).expect("wrong match from the type and the value container");
107 1
    match v.entry(k.into()) {
108 1
        BTreeEntry::Occupied(ref mut o) => {
109 1
            o.get_mut().push(change);
110
        }
111 1
        BTreeEntry::Vacant(va) => {
112 1
            va.insert(Changes::new(change));
113
        }
114
    }
115 1
}
116

117 1
/// Returns a copy of the changes recorded for key `x`.
///
/// Yields `None` when `entries` holds keys of another type or nothing was recorded for `x`.
fn get_changes<K: IndexType>(entries: &EntriesContainer, x: &K) -> Option<Changes> {
    let by_key = K::Wrapper::get_entries(entries)?;
    let wrapped: K::Wrapper = x.clone().into();
    by_key.get(&wrapped).cloned()
}
125

126 1
fn resolve_range<T: IndexType, R>(entries: &EntriesContainer, range: R) -> IntoIter<T>
127
where
128
    R: RangeBounds<T>,
129
{
130 1
    let into_range: (Bound<T::Wrapper>, Bound<T::Wrapper>) =
131 1
        (map_bound(range.start_bound()), map_bound(range.end_bound()));
132 1
    let v = T::Wrapper::get_entries(entries).expect("wrong match from the type and the value container");
133 1
    BTreeMap::range(v, into_range)
134 1
        .map(|x| x.0.clone().value())
135
        .collect::<Vec<T>>()
136
        .into_iter()
137 1
}
138

139
// Implements `Extractor` for the wrapper of `$t`.
//
// `$v` is the `ValueContainer` variant and `$v2` the `EntriesContainer` variant that store
// data of that type. Value accessors panic on a mismatching variant, entry accessors
// answer `None`.
macro_rules! impl_index_data_type {
    ($t:ty, $v:path, $v2:path) => {
        impl Extractor for <$t as IndexType>::Wrapper {
            fn get_vec_mut(vc: &mut ValueContainer) -> &mut Vec<Self> {
                match vc {
                    $v(stored) => stored,
                    _ => panic!("wrong match from type and value container"),
                }
            }

            fn get_vec(vc: &ValueContainer) -> &Vec<Self> {
                match vc {
                    $v(stored) => stored,
                    _ => panic!("wrong match from type and value container"),
                }
            }

            fn get_entries(vc: &EntriesContainer) -> Option<&BTreeMap<Self, Changes>> {
                match vc {
                    $v2(by_key) => Some(by_key),
                    _ => None,
                }
            }

            fn get_entries_mut(vc: &mut EntriesContainer) -> Option<&mut BTreeMap<Self, Changes>> {
                match vc {
                    $v2(by_key) => Some(by_key),
                    _ => None,
                }
            }

            fn new_entries() -> EntriesContainer {
                let empty = BTreeMap::new();
                $v2(empty)
            }

            fn new_values() -> ValueContainer {
                let empty = Vec::new();
                $v(empty)
            }
        }
    };
}
181

182
// Generates, from a list of `Variant<Type>` pairs:
// - `EntriesContainer`: one variant per key type, holding the per-key change lists;
// - `ValueContainer`: one variant per value type, holding the values the changes point at;
// - `eapplier` / `valapplier`: a two-step dispatch that recovers first the key type and then
//   the value type from the runtime variants before calling `apply_to_index`;
// - the `Extractor` implementation for every listed type.
macro_rules! container_enums {
    ($($name:ident<$ty:ty>),+,) => {
        #[derive(Clone)]
        pub enum EntriesContainer {
            $(
            $name(BTreeMap<<$ty as IndexType>::Wrapper, Changes>),
            )+
        }

        #[derive(Clone)]
        pub enum ValueContainer {
            $(
            $name(Vec<<$ty as IndexType>::Wrapper>),
            )+
        }

        // First dispatch step: pick the key type from the entries variant.
        fn eapplier(
            keys: &EntriesContainer,
            values: &ValueContainer,
            index_id: &IndexId,
            persy: &PersyImpl,
            tx: &mut TransactionImpl,
        ) -> PIRes<()> {
            match keys {
                $(
                EntriesContainer::$name(entries) => valapplier::<$ty>(values, entries, index_id, persy, tx),
                )+
            }
        }

        // Second dispatch step: pick the value type from the values variant.
        fn valapplier<K>(
            values: &ValueContainer,
            k: &BTreeMap<K::Wrapper, Changes>,
            index_id: &IndexId,
            persy: &PersyImpl,
            tx: &mut TransactionImpl,
        ) -> PIRes<()>
        where
            K: IndexType,
        {
            match values {
                $(
                ValueContainer::$name(stored) => apply_to_index::<K, $ty>(persy, tx, index_id, k, stored),
                )+
            }
        }

        $(
            impl_index_data_type!($ty, ValueContainer::$name, EntriesContainer::$name);
        )+
    }
}
234

235
// Instantiates the container enums, the apply dispatchers and the `Extractor`
// implementations for every listed `Variant<Type>` pair. Each entry is the enum variant
// name followed by the key/value type stored in that variant.
container_enums!(
236
    U8<u8>,
237
    U16<u16>,
238
    U32<u32>,
239
    U64<u64>,
240
    U128<u128>,
241
    I8<i8>,
242
    I16<i16>,
243
    I32<i32>,
244
    I64<i64>,
245
    I128<i128>,
246
    F32W<f32>,
247
    F64W<f64>,
248
    String<String>,
249
    PersyId<PersyId>,
250
    ByteVec<ByteVec>,
251
);
252

253
/// Typed access to the type-erased `ValueContainer` / `EntriesContainer` enums.
///
/// Implemented by `impl_index_data_type!` for the wrapper of every supported type.
pub trait Extractor: Sized {
254
    /// Mutable access to the value vector; the macro implementation panics when `vc`
    /// holds another type.
    fn get_vec_mut(vc: &mut ValueContainer) -> &mut Vec<Self>;
255
    /// Shared access to the value vector; the macro implementation panics when `vc`
    /// holds another type.
    fn get_vec(vc: &ValueContainer) -> &Vec<Self>;
256
    /// Shared access to the per-key change map, or `None` when `vc` holds another key type.
    fn get_entries(vc: &EntriesContainer) -> Option<&BTreeMap<Self, Changes>>;
257
    /// Mutable access to the per-key change map, or `None` when `vc` holds another key type.
    fn get_entries_mut(vc: &mut EntriesContainer) -> Option<&mut BTreeMap<Self, Changes>>;
258
    /// Creates an empty entries container of the variant matching `Self`.
    fn new_entries() -> EntriesContainer;
259
    /// Creates an empty value container of the variant matching `Self`.
    fn new_values() -> ValueContainer;
260
}
261

262 1
fn apply_to_index<K, V>(
263
    persy: &PersyImpl,
264
    tx: &mut TransactionImpl,
265
    index_id: &IndexId,
266
    keys: &BTreeMap<K::Wrapper, Changes>,
267
    values: &[V::Wrapper],
268
) -> PIRes<()>
269
where
270
    K: IndexType,
271
    V: IndexType,
272
{
273 1
    let changes: Vec<_> = keys
274
        .iter()
275 1
        .map(|(k, c)| {
276 1
            let vals: Vec<_> = c
277
                .changes
278
                .iter()
279 1
                .map(|ch| match *ch {
280 1
                    Change::Add(pos) => TreeValue::Add(values[pos].clone().value()),
281 1
                    Change::Remove(pos) => TreeValue::Remove(pos.map(|p| values[p].clone().value())),
282 1
                })
283
                .collect();
284 1
            KeyChanges::new(k.clone().value(), vals)
285 1
        })
286
        .collect();
287 1
    let mut index = Indexes::get_index_keeper_tx::<K, V>(persy, tx, index_id)?;
288 1
    index.apply(&changes)?;
289 1
    index.update_changed()?;
290 1
    Ok(())
291 1
}
292

293 1
/// Converts a borrowed range bound into an owned bound of the target type.
///
/// The kind of bound (included, excluded, unbounded) is preserved; the borrowed value is
/// cloned and converted with `Into`.
fn map_bound<T: Into<T1> + Clone, T1>(b: Bound<&T>) -> Bound<T1> {
    let convert = |borrowed: &T| -> T1 { borrowed.clone().into() };
    match b {
        Bound::Unbounded => Bound::Unbounded,
        Bound::Included(x) => Bound::Included(convert(x)),
        Bound::Excluded(x) => Bound::Excluded(convert(x)),
    }
}
300

301
pub struct IndexTransactionKeeper {
302
    indexex_changes: BTreeMap<IndexId, (EntriesContainer, ValueContainer)>,
303
}
304

305
impl IndexTransactionKeeper {
306 1
    pub fn new() -> IndexTransactionKeeper {
307 1
        IndexTransactionKeeper {
308 1
            indexex_changes: BTreeMap::new(),
309
        }
310 1
    }
311

312 1
    pub fn put<K, V>(&mut self, index: IndexId, k: K, v: V)
313
    where
314
        K: IndexType,
315
        V: IndexType,
316
    {
317 1
        match self.indexex_changes.entry(index) {
318 1
            BTreeEntry::Occupied(ref mut o) => {
319 1
                let (entries, values) = o.get_mut();
320 1
                let pos = add_value(values, v);
321 1
                add_entry(entries, k, Change::Add(pos));
322
            }
323 1
            BTreeEntry::Vacant(va) => {
324 1
                let mut values = V::Wrapper::new_values();
325 1
                let mut keys = K::Wrapper::new_entries();
326 1
                let pos = add_value(&mut values, v);
327 1
                add_entry(&mut keys, k, Change::Add(pos));
328 1
                va.insert((keys, values));
329 1
            }
330
        }
331 1
    }
332

333 1
    pub fn remove<K, V>(&mut self, index: IndexId, k: K, v: Option<V>)
334
    where
335
        K: IndexType,
336
        V: IndexType,
337
    {
338 1
        match self.indexex_changes.entry(index) {
339 1
            BTreeEntry::Occupied(ref mut o) => {
340 1
                let pos = v.map(|val| add_value(&mut o.get_mut().1, val));
341 1
                add_entry(&mut o.get_mut().0, k, Change::Remove(pos));
342
            }
343 1
            BTreeEntry::Vacant(va) => {
344 1
                let mut values = V::Wrapper::new_values();
345 1
                let mut keys = K::Wrapper::new_entries();
346 1
                let pos = v.map(|val| add_value(&mut values, val));
347 1
                add_entry(&mut keys, k, Change::Remove(pos));
348 1
                va.insert((keys, values));
349 1
            }
350
        }
351 1
    }
352

353 1
    pub fn get_changes<K, V>(&self, index: IndexId, k: &K) -> Option<Vec<ValueChange<V>>>
354
    where
355
        K: IndexType,
356
        V: IndexType,
357
    {
358 1
        self.indexex_changes
359
            .get(&index)
360 1
            .map(|ref o| get_changes(&o.0, k).map(|c| resolve_values(&o.1, c)))
361
            .and_then(std::convert::identity)
362 1
    }
363

364 1
    pub fn apply_changes<K, V>(
365
        &self,
366
        index_id: IndexId,
367
        vm: ValueMode,
368
        k: &K,
369
        pers: Option<Value<V>>,
370
    ) -> Result<Option<Value<V>>, IndexChangeError>
371
    where
372
        K: IndexType,
373
        V: IndexType,
374
    {
375 1
        let mut result = pers;
376 1
        if let Some(key_changes) = self.get_changes::<K, V>(index_id, k) {
377 1
            for change in key_changes {
378 1
                result = match change {
379 1
                    ValueChange::Add(add_value) => Some(if let Some(s_result) = result {
380 1
                        match s_result {
381 1
                            Value::Single(v) => match vm {
382 1
                                ValueMode::Replace => Value::Single(add_value),
383
                                ValueMode::Exclusive => {
384 1
                                    if compare(&v, &add_value) == Ordering::Equal {
385 1
                                        Value::Single(v)
386
                                    } else {
387
                                        // TODO: recover index name
388 1
                                        return Err(IndexChangeError::IndexDuplicateKey(
389 1
                                            "".to_string(),
390 1
                                            format!("{}", k),
391 0
                                        ));
392
                                    }
393
                                }
394
                                ValueMode::Cluster => {
395 1
                                    if compare(&v, &add_value) == Ordering::Equal {
396 1
                                        Value::Single(v)
397
                                    } else {
398 1
                                        Value::Cluster(vec![v, add_value])
399
                                    }
400
                                }
401 1
                            },
402 0
                            Value::Cluster(mut values) => {
403 0
                                if let Ok(pos) = values.binary_search_by(|x| compare(x, &add_value)) {
404 0
                                    values.insert(pos, add_value);
405
                                }
406 0
                                Value::Cluster(values)
407 0
                            }
408
                        }
409 0
                    } else {
410 1
                        Value::Single(add_value)
411 1
                    }),
412 1
                    ValueChange::Remove(rv) => rv.and_then(|remove_value| {
413 0
                        result.and_then(|s_result| match s_result {
414 0
                            Value::Single(v) => {
415 0
                                if compare(&v, &remove_value) == Ordering::Equal {
416 0
                                    None
417
                                } else {
418 0
                                    Some(Value::Single(v))
419
                                }
420 0
                            }
421 0
                            Value::Cluster(mut values) => {
422 0
                                if let Ok(pos) = values.binary_search_by(|x| compare(x, &remove_value)) {
423 0
                                    values.remove(pos);
424
                                }
425 0
                                Some(if values.len() == 1 {
426 0
                                    Value::Single(values.pop().unwrap())
427
                                } else {
428 0
                                    Value::Cluster(values)
429
                                })
430 0
                            }
431 0
                        })
432 0
                    }),
433
                };
434 1
            }
435 1
        }
436 1
        Ok(result)
437 1
    }
438

439 1
    pub fn apply(&self, persy: &PersyImpl, tx: &mut TransactionImpl) -> PIRes<()> {
440 1
        for (index, values) in &self.indexex_changes {
441 1
            eapplier(&values.0, &values.1, &index, persy, tx)?;
442
        }
443 1
        Ok(())
444 1
    }
445

446 1
    pub fn range<K, V, R>(&self, index: IndexId, range: R) -> Option<IntoIter<K>>
447
    where
448
        K: IndexType,
449
        V: IndexType,
450
        R: RangeBounds<K>,
451
    {
452 1
        self.indexex_changes.get(&index).map(|x| resolve_range(&x.0, range))
453 1
    }
454

455 1
    pub fn changed_indexes(&self) -> Vec<IndexId> {
456 1
        self.indexex_changes.keys().cloned().collect()
457 1
    }
458 0
    pub fn remove_changes(&mut self, index_id: &IndexId) {
459 0
        self.indexex_changes.remove(index_id);
460 0
    }
461
}
462

463
/// Read-only access to the nodes and the configuration of an index tree.
pub trait IndexKeeper<K, V> {
464
    /// Loads the node referenced by `node`.
    fn load(&self, node: &NodeRef) -> PERes<Node<K, V>>;
465
    /// Returns the reference to the root node, `None` when the index has no root.
    fn get_root(&self) -> PERes<Option<NodeRef>>;
466
    /// Returns the value mode of the index.
    fn value_mode(&self) -> ValueMode;
467
    /// Returns the name of the index.
    fn index_name(&self) -> &String;
468
}
469

470
/// Operations needed to modify an index tree: versioned node loading, locking, and
/// node insert/update/delete.
///
/// NOTE(review): the per-method notes below describe what the `IndexSegmentKeeperTx`
/// implementation in this file does; other implementors may differ.
pub trait IndexModify<K, V>: IndexKeeper<K, V> {
471
    /// Loads the node together with a `u16` version, `None` when the node cannot be read.
    fn load_modify(&self, node: &NodeRef) -> PIRes<Option<(Rc<Node<K, V>>, u16)>>;
472
    /// Tries to lock the node at the given version; `Ok(false)` when the lock is not acquired.
    fn lock(&mut self, node: &NodeRef, version: u16) -> PIRes<bool>;
473
    /// Turns a shared node into an owned one that can be modified.
    fn owned(&mut self, node_ref: &NodeRef, node: Rc<Node<K, V>>) -> Node<K, V>;
474
    /// Releases one lock taken on the node; `Ok(true)` when the node is actually unlocked.
    fn unlock(&mut self, node: &NodeRef) -> PIRes<bool>;
475
    /// Releases one lock taken on the index configuration; `Ok(true)` when it is actually unlocked.
    fn unlock_config(&mut self) -> PIRes<bool>;
476
    /// Returns the root reference, re-reading it unless the root was changed through `set_root`.
    fn get_root_refresh(&mut self) -> PIRes<Option<NodeRef>>;
477
    /// Tries to lock the index configuration; `Ok(false)` when the lock is not acquired.
    fn lock_config(&mut self) -> PIRes<bool>;
478
    /// Stores a new node and returns its reference.
    fn insert(&mut self, node: Node<K, V>) -> PIRes<NodeRef>;
479
    /// Replaces the content of an existing node.
    fn update(&mut self, node_ref: &NodeRef, node: Node<K, V>, version: u16) -> PIRes<()>;
480
    /// Deletes a node.
    fn delete(&mut self, node: &NodeRef, version: u16) -> PIRes<()>;
481
    /// Sets the root of the tree, `None` for no root.
    fn set_root(&mut self, root: Option<NodeRef>) -> PIRes<()>;
482
    /// Lower size limit of a node — presumably the threshold that triggers a merge; TODO confirm.
    fn bottom_limit(&self) -> usize;
483
    /// Upper size limit of a node — presumably the threshold that triggers a split; TODO confirm.
    fn top_limit(&self) -> usize;
484
}
485

486
pub struct IndexSegmentKeeper<'a> {
487
    name: String,
488
    segment: SegmentId,
489
    root: Option<NodeRef>,
490
    store: &'a PersyImpl,
491
    snapshot: SnapshotId,
492
    value_mode: ValueMode,
493
}
494

495
impl<'a> IndexSegmentKeeper<'a> {
496 1
    pub fn new(
497
        name: &str,
498
        index_id: &IndexId,
499
        root: Option<NodeRef>,
500
        store: &'a PersyImpl,
501
        snapshot: SnapshotId,
502
        value_mode: ValueMode,
503
    ) -> IndexSegmentKeeper<'a> {
504 1
        IndexSegmentKeeper {
505 1
            name: name.to_string(),
506 1
            segment: index_id_to_segment_id_data(index_id),
507 1
            root,
508
            store,
509
            snapshot,
510
            value_mode,
511 0
        }
512 1
    }
513
}
514

515
impl<'a, K: IndexType, V: IndexType> IndexKeeper<K, V> for IndexSegmentKeeper<'a> {
516 1
    fn load(&self, node: &NodeRef) -> PERes<Node<K, V>> {
517 1
        let rec = self
518
            .store
519 1
            .read_snap_fn(self.segment, &node, self.snapshot, |x| deserialize(x))?
520 1
            .unwrap()?;
521 1
        Ok(rec)
522 1
    }
523 1
    fn get_root(&self) -> PERes<Option<NodeRef>> {
524 1
        Ok(self.root.clone())
525 1
    }
526 1
    fn value_mode(&self) -> ValueMode {
527 1
        self.value_mode.clone()
528 1
    }
529

530
    fn index_name(&self) -> &String {
531
        &self.name
532
    }
533
}
534

535
/// Bookkeeping for a record locked by `IndexSegmentKeeperTx`.
struct LockData {
536
    /// Version the record had when the lock was taken.
    version: u16,
537
    /// Number of nested lock requests; the record is unlocked when it drops back to zero.
    counter: u32,
538
}
539

540
pub struct IndexSegmentKeeperTx<'a, K: IndexType, V: IndexType> {
541
    name: String,
542
    index_id: IndexId,
543
    root: Option<NodeRef>,
544
    config_version: u16,
545
    store: &'a PersyImpl,
546
    tx: &'a mut TransactionImpl,
547
    value_mode: ValueMode,
548
    changed: Option<HashMap<NodeRef, (Rc<Node<K, V>>, u16)>>,
549
    bottom_limit: usize,
550
    top_limit: usize,
551
    locked: HashMap<NodeRef, LockData>,
552
    updated_root: bool,
553
}
554

555
impl<'a, K: IndexType, V: IndexType> IndexSegmentKeeperTx<'a, K, V> {
556 1
    pub fn new(
557
        name: &str,
558
        index_id: &IndexId,
559
        root: Option<NodeRef>,
560
        config_version: u16,
561
        store: &'a PersyImpl,
562
        tx: &'a mut TransactionImpl,
563
        value_mode: ValueMode,
564
        bottom_limit: usize,
565
        top_limit: usize,
566
    ) -> IndexSegmentKeeperTx<'a, K, V> {
567 1
        IndexSegmentKeeperTx {
568 1
            name: name.to_string(),
569 1
            index_id: index_id.clone(),
570 1
            root,
571
            config_version,
572
            store,
573
            tx,
574
            value_mode,
575 1
            changed: None,
576
            bottom_limit,
577
            top_limit,
578 1
            locked: HashMap::new(),
579
            updated_root: false,
580 0
        }
581 1
    }
582 1
    pub fn update_changed(&mut self) -> PIRes<()> {
583 1
        let segment = index_id_to_segment_id_data(&self.index_id);
584 1
        if let Some(m) = &self.changed {
585 1
            for (node_ref, node) in m {
586 1
                self.store.update(self.tx, segment, &node_ref, &serialize(&node.0)?)?;
587
            }
588
        }
589 1
        if self.updated_root {
590 1
            Indexes::update_index_root(self.store, self.tx, &self.index_id, self.root.clone())?;
591
        }
592 1
        Ok(())
593 1
    }
594
}
595

596
impl<'a, K: IndexType, V: IndexType> IndexModify<K, V> for IndexSegmentKeeperTx<'a, K, V> {
597 1
    fn load_modify(&self, node: &NodeRef) -> PIRes<Option<(Rc<Node<K, V>>, u16)>> {
598 1
        if let Some(m) = &self.changed {
599 1
            if let Some(n) = m.get(node) {
600 1
                return Ok(Some(n.clone()));
601
            }
602
        }
603 1
        let segment = index_id_to_segment_id_data(&self.index_id);
604 1
        if let Some((rec, version)) = self
605
            .store
606 1
            .read_tx_internal_fn(self.tx, segment, &node, |x| deserialize(x))?
607
        {
608 1
            Ok(Some((Rc::new(rec?), version)))
609
        } else {
610 0
            Ok(None)
611
        }
612 1
    }
613 1
    fn lock(&mut self, node: &NodeRef, version: u16) -> PIRes<bool> {
614 1
        if let Some(lock_data) = self.locked.get_mut(node) {
615 1
            if version == lock_data.version {
616 1
                lock_data.counter += 1;
617 1
                Ok(true)
618 0
            } else if version < lock_data.version {
619 0
                Ok(false)
620
            } else {
621 0
                panic!("wrong matched versions {} {}", version, lock_data.version);
622
            }
623
        } else {
624 1
            let segment = index_id_to_segment_id_data(&self.index_id);
625 1
            if self.store.lock_record(self.tx, segment, node, version)? {
626 1
                self.locked.insert(node.clone(), LockData { version, counter: 1 });
627 1
                Ok(true)
628
            } else {
629 1
                Ok(false)
630
            }
631
        }
632 1
    }
633

634 1
    fn owned(&mut self, node_ref: &NodeRef, mut node: Rc<Node<K, V>>) -> Node<K, V> {
635 1
        debug_assert!(self.locked.contains_key(node_ref));
636 1
        if let Some(changed) = &mut self.changed {
637 1
            changed.remove(node_ref);
638
        }
639 1
        Rc::make_mut(&mut node);
640 1
        Rc::try_unwrap(node).ok().unwrap()
641 1
    }
642

643 1
    fn unlock(&mut self, node: &NodeRef) -> PIRes<bool> {
644 1
        if let Entry::Occupied(mut x) = self.locked.entry(node.clone()) {
645 1
            x.get_mut().counter -= 1;
646 1
            if x.get().counter == 0 {
647 1
                x.remove();
648 1
                let segment = index_id_to_segment_id_data(&self.index_id);
649 1
                self.store.unlock_record(self.tx, segment, node)?;
650 1
                Ok(true)
651
            } else {
652 0
                Ok(false)
653
            }
654
        } else {
655 0
            Ok(false)
656
        }
657 0
    }
658

659 1
    fn get_root_refresh(&mut self) -> PIRes<Option<NodeRef>> {
660 1
        if !self.updated_root {
661 1
            let (config, version) = Indexes::get_index_tx(self.store, self.tx, &self.index_id)?;
662 1
            self.root = config.get_root();
663 1
            self.config_version = version;
664 1
        }
665 1
        Ok(self.root.clone())
666 1
    }
667
    fn unlock_config(&mut self) -> PIRes<bool> {
668
        let config_id = Indexes::get_config_id(self.store, self.tx, &self.index_id)?.0;
669
        if let Entry::Occupied(mut x) = self.locked.entry(config_id.clone()) {
670
            x.get_mut().counter -= 1;
671
            if x.get().counter == 0 {
672
                x.remove();
673
                let segment = index_id_to_segment_id_meta(&self.index_id);
674
                self.store.unlock_record(self.tx, segment, &config_id)?;
675
                Ok(true)
676
            } else {
677
                Ok(false)
678
            }
679
        } else {
680
            Ok(false)
681
        }
682
    }
683 1
    fn lock_config(&mut self) -> PIRes<bool> {
684 1
        let config_id = Indexes::get_config_id(self.store, self.tx, &self.index_id)?.0;
685

686 1
        let segment = index_id_to_segment_id_meta(&self.index_id);
687 1
        if let Some(lock_data) = self.locked.get_mut(&config_id) {
688 1
            if self.config_version == lock_data.version {
689 1
                lock_data.counter += 1;
690 1
                Ok(true)
691
            } else {
692 0
                panic!("this should never happen");
693
            }
694 1
        } else if self
695
            .store
696 1
            .lock_record(self.tx, segment, &config_id, self.config_version)?
697
        {
698 1
            self.locked.insert(
699 1
                config_id.clone(),
700 1
                LockData {
701 1
                    version: self.config_version,
702
                    counter: 1,
703
                },
704
            );
705 1
            Ok(true)
706
        } else {
707 1
            let (config, version) = Indexes::get_index_tx(self.store, self.tx, &self.index_id)?;
708 1
            self.root = config.get_root();
709 1
            self.config_version = version;
710 1
            Ok(false)
711 1
        }
712 1
    }
713

714 1
    fn insert(&mut self, node: Node<K, V>) -> PIRes<NodeRef> {
715 1
        let segment = index_id_to_segment_id_data(&self.index_id);
716 1
        let node_ref = self.store.insert_record(self.tx, &segment, &serialize(&node)?)?;
717 1
        self.changed
718
            .get_or_insert_with(HashMap::new)
719 1
            .insert(node_ref.clone(), (Rc::new(node), 0));
720 1
        self.locked
721 1
            .insert(node_ref.clone(), LockData { version: 0, counter: 1 });
722 1
        Ok(node_ref)
723 1
    }
724

725 1
    fn update(&mut self, node_ref: &NodeRef, node: Node<K, V>, version: u16) -> PIRes<()> {
726 1
        debug_assert!(self.locked.contains_key(node_ref));
727 1
        self.changed
728
            .get_or_insert_with(HashMap::new)
729 1
            .insert(node_ref.clone(), (Rc::new(node), version));
730 1
        Ok(())
731 1
    }
732

733 0
    fn delete(&mut self, node: &NodeRef, _version: u16) -> PIRes<()> {
734 0
        debug_assert!(self.locked.contains_key(node));
735 0
        if let Some(m) = &mut self.changed {
736 0
            m.remove(node);
737
        }
738 0
        let segment = index_id_to_segment_id_data(&self.index_id);
739 0
        self.store.delete(self.tx, segment, &node)?;
740 0
        Ok(())
741 0
    }
742 1
    fn set_root(&mut self, root: Option<NodeRef>) -> PIRes<()> {
743 1
        self.root = root;
744 1
        self.updated_root = true;
745 1
        Ok(())
746 1
    }
747

748 1
    fn bottom_limit(&self) -> usize {
749 1
        self.bottom_limit
750 1
    }
751 1
    fn top_limit(&self) -> usize {
752 1
        self.top_limit
753 1
    }
754
}
755

756
impl<'a, K: IndexType, V: IndexType> IndexKeeper<K, V> for IndexSegmentKeeperTx<'a, K, V> {
757 1
    fn load(&self, node: &NodeRef) -> PERes<Node<K, V>> {
758 1
        if let Some(m) = &self.changed {
759 0
            if let Some(n) = m.get(node) {
760 0
                return Ok(n.0.as_ref().clone());
761
            }
762
        }
763 1
        let segment = index_id_to_segment_id_data(&self.index_id);
764 1
        let (rec, _) = self
765
            .store
766 1
            .read_tx_internal_fn(self.tx, segment, &node, |x| deserialize(x))?
767
            .unwrap();
768 1
        rec
769 1
    }
770 1
    fn get_root(&self) -> PERes<Option<NodeRef>> {
771 1
        Ok(self.root.clone())
772 1
    }
773 1
    fn value_mode(&self) -> ValueMode {
774 1
        self.value_mode.clone()
775 1
    }
776

777 1
    fn index_name(&self) -> &String {
778
        &self.name
779 1
    }
780
}
781

782
/// Index iterator over a range of keys that also takes into account the changes made
783
/// inside the current transaction.
784
pub struct TxIndexRawIter<K: IndexType, V: IndexType> {
785
    /// Id of the index being iterated (despite the field name it is an `IndexId`).
    index_name: IndexId,
786
    /// Keys changed in the transaction, if any.
    in_tx: Option<IntoIter<K>>,
787
    /// Iterator over the persistent content of the index, if any.
    persistent: Option<IndexRawIter<K, V>>,
788
    /// Look-ahead for the front of `in_tx`: outer `None` means not fetched yet.
    in_tx_front: Option<Option<K>>,
789
    /// Look-ahead for the front of `persistent` — presumably same convention; TODO confirm.
    persistent_front: Option<Option<(K, Value<V>)>>,
790
    /// Look-ahead for the back of `in_tx` — presumably same convention; TODO confirm.
    in_tx_back: Option<Option<K>>,
791
    /// Look-ahead for the back of `persistent` — presumably same convention; TODO confirm.
    persistent_back: Option<Option<(K, Value<V>)>>,
792
    /// Value mode used when layering the transaction changes over the persistent values.
    value_mode: ValueMode,
793
}
794

795
impl<K, V> TxIndexRawIter<K, V>
796
where
797
    K: IndexType,
798
    V: IndexType,
799
{
800 1
    pub fn new(
801
        index_name: IndexId,
802
        in_tx: Option<IntoIter<K>>,
803
        persistent: Option<IndexRawIter<K, V>>,
804
        value_mode: ValueMode,
805
    ) -> TxIndexRawIter<K, V> {
806 1
        TxIndexRawIter {
807
            index_name,
808 1
            in_tx,
809 1
            persistent,
810 1
            in_tx_front: None,
811 1
            persistent_front: None,
812 1
            in_tx_back: None,
813 1
            persistent_back: None,
814
            value_mode,
815
        }
816 1
    }
817

818 1
    fn apply_changes(
819
        tx: &mut TransactionImpl,
820
        vm: ValueMode,
821
        index: IndexId,
822
        k: K,
823
        pers: Option<Value<V>>,
824
    ) -> Option<(K, Value<V>)> {
825 1
        tx.apply_changes(vm, index, &k, pers).unwrap_or(None).map(|v| (k, v))
826 1
    }
827

828 1
    pub fn next(&mut self, persy_impl: &Arc<PersyImpl>, tx: &mut TransactionImpl) -> Option<(K, Value<V>)> {
829
        loop {
830 1
            let vm = self.value_mode.clone();
831 1
            let index = self.index_name.clone();
832 1
            let apply_changes = |k, o| Self::apply_changes(tx, vm, index, k, o);
833 1
            match (&mut self.in_tx, &mut self.persistent) {
834 1
                (Some(it), Some(pers)) => {
835 1
                    match (
836 1
                        self.in_tx_front.get_or_insert_with(|| it.next()).clone(),
837 1
                        self.persistent_front
838 1
                            .get_or_insert_with(|| pers.next(persy_impl))
839
                            .clone(),
840 0
                    ) {
841 1
                        (Some(tx_k), Some((pers_k, vals))) => match tx_k.cmp(&pers_k) {
842 1
                            Ordering::Less => {
843 0
                                self.in_tx_front = None;
844 0
                                let res = apply_changes(tx_k, None);
845 0
                                if res.is_some() {
846 0
                                    break res;
847
                                }
848 0
                            }
849
                            Ordering::Equal => {
850 1
                                self.in_tx_front = None;
851 1
                                self.persistent_front = None;
852 1
                                let res = apply_changes(tx_k, Some(vals));
853

854 1
                                if res.is_some() {
855 0
                                    break res;
856
                                }
857 1
                            }
858
                            Ordering::Greater => {
859 1
                                self.persistent_front = None;
860 1
                                break Some((pers_k, vals));
861
                            }
862 1
                        },
863 1
                        (Some(tx_k), None) => {
864 1
                            self.in_tx_front = None;
865 1
                            let res = apply_changes(tx_k, None);
866 1
                            if res.is_some() {
867 1
                                break res;
868
                            }
869 1
                        }
870 1
                        (None, Some((pers_k, vals))) => {
871 0
                            self.persistent_front = None;
872 0
                            break Some((pers_k, vals));
873 0
                        }
874 1
                        (None, None) => break None,
875
                    }
876 1
                }
877 0
                (Some(it), None) => {
878 0
                    let res = apply_changes(it.next().unwrap(), None);
879 0
                    if res.is_some() {
880 0
                        break res;
881
                    }
882 0
                }
883 0
                (None, Some(pers)) => break pers.next(persy_impl),
884 0
                (None, None) => break None,
885
            }
886
        }
887 1
    }
888

889 1
    pub fn next_back(&mut self, persy_impl: &Arc<PersyImpl>, tx: &mut TransactionImpl) -> Option<(K, Value<V>)> {
890
        loop {
891 1
            let vm = self.value_mode.clone();
892 1
            let index = self.index_name.clone();
893 1
            let apply_changes = |k, o| Self::apply_changes(tx, vm, index, k, o);
894 1
            match (&mut self.in_tx, &mut self.persistent) {
895 1
                (Some(it), Some(pers)) => {
896 1
                    match (
897 1
                        self.in_tx_back.get_or_insert_with(|| it.next_back()).clone(),
898 1
                        self.persistent_back
899 1
                            .get_or_insert_with(|| pers.next_back(persy_impl))
900
                            .clone(),
901 0
                    ) {
902 1
                        (Some(tx_k), Some((pers_k, vals))) => match tx_k.cmp(&pers_k) {
903 1
                            Ordering::Less => {
904 1
                                self.persistent_back = None;
905 1
                                break Some((pers_k, vals));
906
                            }
907
                            Ordering::Equal => {
908 1
                                self.in_tx_back = None;
909 1
                                self.persistent_back = None;
910 1
                                let res = apply_changes(tx_k, Some(vals));
911

912 1
                                if res.is_some() {
913 0
                                    break res;
914
                                }
915 1
                            }
916
                            Ordering::Greater => {
917 1
                                self.in_tx_back = None;
918 1
                                let res = apply_changes(tx_k, None);
919

920 1
                                if res.is_some() {
921 1
                                    break res;
922
                                }
923 0
                            }
924 1
                        },
925 1
                        (Some(tx_k), None) => {
926 1
                            self.in_tx_back = None;
927 1
                            let res = apply_changes(tx_k, None);
928 1
                            if res.is_some() {
929 1
                                break res;
930
                            }
931 1
                        }
932 1
                        (None, Some((pers_k, vals))) => {
933 1
                            self.persistent_back = None;
934 1
                            break Some((pers_k, vals));
935 0
                        }
936 1
                        (None, None) => break None,
937
                    }
938 1
                }
939 0
                (Some(it), None) => {
940 0
                    let res = apply_changes(it.next_back().unwrap(), None);
941 0
                    if res.is_some() {
942 0
                        break res;
943
                    }
944 0
                }
945 0
                (None, Some(pers)) => break pers.next_back(persy_impl),
946 0
                (None, None) => break None,
947
            }
948
        }
949 1
    }
950
}
951

952
pub struct IndexRawIter<K: IndexType, V: IndexType> {
953
    index_id: IndexId,
954
    read_snapshot: SnapshotId,
955
    iter: PageIter<K, V>,
956
    back: PageIterBack<K, V>,
957
    release_snapshot: bool,
958
}
959

960
impl<K, V> IndexRawIter<K, V>
961
where
962
    K: IndexType,
963
    V: IndexType,
964
{
965 1
    pub fn new(
966
        index_id: IndexId,
967
        read_snapshot: SnapshotId,
968
        iter: PageIter<K, V>,
969
        back: PageIterBack<K, V>,
970
        release_snapshot: bool,
971
    ) -> IndexRawIter<K, V> {
972 1
        IndexRawIter {
973
            index_id,
974
            read_snapshot,
975 1
            iter,
976 1
            back,
977
            release_snapshot,
978
        }
979 1
    }
980

981 1
    pub fn next(&mut self, persy_impl: &PersyImpl) -> Option<(K, Value<V>)> {
982 1
        let back_keep = self.back.iter.peek();
983 1
        if let (Some(s), Some(e)) = (self.iter.iter.peek(), back_keep) {
984 1
            if s.key.cmp(&e.key) == Ordering::Greater {
985 1
                return None;
986
            }
987
        }
988 1
        if let Some(n) = self.iter.iter.next() {
989 1
            if self.iter.iter.peek().is_none() {
990 1
                if let Ok(iter) = persy_impl.index_next(&self.index_id, self.read_snapshot, Bound::Excluded(&n.key)) {
991 1
                    self.iter = iter;
992
                }
993 1
            }
994 1
            Some((n.key, n.value))
995 0
        } else {
996 1
            None
997
        }
998 1
    }
999

1000 1
    pub fn next_back(&mut self, persy_impl: &PersyImpl) -> Option<(K, Value<V>)> {
1001 1
        let front_keep = self.iter.iter.peek();
1002 1
        if let (Some(s), Some(e)) = (self.back.iter.peek(), front_keep) {
1003 1
            if s.key.cmp(&e.key) == Ordering::Less {
1004 1
                return None;
1005
            }
1006
        }
1007 1
        if let Some(n) = self.back.iter.next() {
1008 1
            if self.back.iter.peek().is_none() {
1009 1
                if let Ok(back) = persy_impl.index_back(&self.index_id, self.read_snapshot, Bound::Excluded(&n.key)) {
1010 1
                    self.back = back;
1011
                }
1012 1
            }
1013 1
            Some((n.key, n.value))
1014 0
        } else {
1015 1
            None
1016
        }
1017 1
    }
1018 1
    pub fn release(&self, persy_impl: &PersyImpl) -> PERes<()> {
1019 1
        if self.release_snapshot {
1020 1
            persy_impl.release_snapshot(self.read_snapshot)?;
1021
        }
1022 1
        Ok(())
1023 1
    }
1024
}
1025

1026
#[cfg(test)]
mod tests {
    use super::{ByteVec, IndexTransactionKeeper, IndexType, ValueChange};
    use crate::id::{IndexId, PersyId, RecRef, SegmentId};
    use std::fmt::Debug;

    /// Records a put of `v` under `k` and a removal of `v` under `dk`, checking after
    /// each step that the keeper reports exactly that change for the key.
    fn keeper_test_for_type<K: IndexType + PartialEq, V: IndexType + Debug + PartialEq>(k: K, dk: K, v: V) {
        let index = IndexId::new(SegmentId::new(30), SegmentId::new(40));
        let mut keeper = IndexTransactionKeeper::new();

        keeper.put(index.clone(), k.clone(), v.clone());
        let added = keeper.get_changes(index.clone(), &k);
        assert_eq!(added, Some(vec![ValueChange::Add(v.clone())]));

        keeper.remove(index.clone(), dk.clone(), Some(v.clone()));
        let removed = keeper.get_changes(index, &dk);
        assert_eq!(removed, Some(vec![ValueChange::Remove(Some(v))]));
    }

    #[test]
    fn simple_tx_keeper_test() {
        // Every integer type is exercised with the same key/value literals
        macro_rules! check_integers {
            ($($t:ty),*) => {
                $( keeper_test_for_type::<$t, $t>(10, 15, 10); )*
            };
        }
        check_integers!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128);

        keeper_test_for_type::<f32, f32>(20.0, 10.0, 20.0);
        keeper_test_for_type::<f64, f64>(20.0, 10.0, 20.0);
        keeper_test_for_type::<String, String>("a".to_string(), "b".to_string(), "a".to_string());
        keeper_test_for_type::<ByteVec, ByteVec>(vec![0, 1].into(), vec![0, 2].into(), vec![0, 1].into());

        let key = PersyId(RecRef::new(10, 20));
        let removed_key = PersyId(RecRef::new(20, 20));
        let value = PersyId(RecRef::new(30, 20));
        keeper_test_for_type::<PersyId, PersyId>(key, removed_key, value);
    }
}

Read our documentation on viewing source code.

Loading