1
use crate::index::{
2
    config::{ByteVec, IndexOrd, IndexType, Indexes, ValueMode, WrapperType},
3
    serialization::{deserialize, serialize},
4
    tree::{
5
        nodes::{compare, Node, NodeRef, PageIter, PageIterBack, Value},
6
        IndexApply, KeyChanges, ValueChange as TreeValue,
7
    },
8
};
9
use crate::{
10
    error::{PRes, PersyError},
11
    id::{index_id_to_segment_id_data, index_id_to_segment_id_meta, IndexId, SegmentId},
12
    persy::PersyImpl,
13
    snapshot::SnapshotId,
14
    transaction::Transaction,
15
    PersyId,
16
};
17
use std::{
18
    cmp::Ordering,
19
    collections::{btree_map::Entry as BTreeEntry, hash_map::Entry, BTreeMap, HashMap},
20
    iter::DoubleEndedIterator,
21
    ops::{Bound, RangeBounds},
22
    rc::Rc,
23
    sync::Arc,
24
    vec::IntoIter,
25
};
26

27 1
#[derive(Clone)]
28
pub struct Container<T: IndexOrd> {
29 1
    value: T,
30
}
31
impl<T> WrapperType<T> for Container<T>
32
where
33
    T: IndexType,
34
    Self: Extractor,
35
{
36 1
    fn value(self) -> T {
37 1
        self.value
38 1
    }
39
}
40
impl<T: IndexOrd> PartialOrd for Container<T> {
41 1
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
42 1
        Some(IndexOrd::cmp(&self.value, &other.value))
43 1
    }
44
}
45
impl<T: IndexOrd> Ord for Container<T> {
46 1
    fn cmp(&self, other: &Self) -> Ordering {
47 1
        IndexOrd::cmp(&self.value, &other.value)
48 1
    }
49
}
50
impl<T: IndexOrd> PartialEq for Container<T> {
51 0
    fn eq(&self, other: &Self) -> bool {
52 0
        IndexOrd::cmp(&self.value, &other.value) == Ordering::Equal
53 0
    }
54
}
55
impl<T: IndexOrd> Eq for Container<T> {}
56
impl<T: IndexOrd> From<T> for Container<T> {
57 1
    fn from(value: T) -> Self {
58 1
        Container { value }
59 1
    }
60
}
61

62 1
/// A change to the values of one index key, carrying the concrete value.
#[derive(Debug, Clone, PartialEq)]
pub enum ValueChange<V> {
    /// Add the carried value.
    ADD(V),
    /// Remove the carried value; `None` names no specific value.
    REMOVE(Option<V>),
}
67

68 1
/// A recorded change for a key.
///
/// The payload is a position inside the value vector of the matching
/// `ValueContainer`, not the value itself.
#[derive(Clone, Debug)]
pub enum Change {
    /// Add the value stored at the given position.
    ADD(usize),
    /// Remove the value stored at the given position; `None` names no
    /// specific value.
    REMOVE(Option<usize>),
}

/// Ordered log of the changes recorded for a single key.
#[derive(Clone, Debug)]
pub struct Changes {
    // Oldest change first; `push` appends.
    changes: Vec<Change>,
}

impl Changes {
    /// Starts a log containing only `change`.
    fn new(change: Change) -> Changes {
        Changes { changes: vec![change] }
    }

    /// Appends `change` at the end of the log.
    fn push(&mut self, change: Change) {
        self.changes.push(change);
    }
}
87

88 1
/// Appends `val` to the value vector held by `values` and returns the
/// position it was stored at.
fn add_value<V: IndexType>(values: &mut ValueContainer, val: V) -> usize {
    let slots = V::Wrapper::get_vec_mut(values);
    slots.push(val.into());
    // The new element is the last one.
    slots.len() - 1
}
94 1
fn resolve_values<V: IndexType>(values: &ValueContainer, changes: Changes) -> Vec<ValueChange<V>> {
95 1
    let v = V::Wrapper::get_vec(values);
96 1
    changes
97
        .changes
98
        .iter()
99 1
        .map(|c| match c {
100 1
            Change::ADD(p) => ValueChange::ADD(v[*p].clone().value()),
101 1
            Change::REMOVE(o) => ValueChange::REMOVE(o.map(|p| v[p].clone().value())),
102 1
        })
103
        .collect()
104 1
}
105 1
fn add_entry<K: IndexType>(entries: &mut EntriesContainer, k: K, change: Change) {
106 1
    let v = K::Wrapper::get_entries_mut(entries).expect("wrong match from the type and the value container");
107 1
    match v.entry(k.into()) {
108 1
        BTreeEntry::Occupied(ref mut o) => {
109 1
            o.get_mut().push(change);
110
        }
111 1
        BTreeEntry::Vacant(va) => {
112 1
            va.insert(Changes::new(change));
113
        }
114
    }
115 1
}
116

117 1
/// Returns a copy of the change log recorded for key `x`.
///
/// Yields `None` when `entries` holds a different key type or when nothing
/// was recorded for the key.
fn get_changes<K: IndexType>(entries: &EntriesContainer, x: &K) -> Option<Changes> {
    K::Wrapper::get_entries(entries).and_then(|by_key| {
        let wrapped: K::Wrapper = x.clone().into();
        by_key.get(&wrapped).cloned()
    })
}
125

126 1
fn resolve_range<T: IndexType, R>(entries: &EntriesContainer, range: R) -> IntoIter<T>
127
where
128
    R: RangeBounds<T>,
129
{
130 1
    let into_range: (Bound<T::Wrapper>, Bound<T::Wrapper>) =
131 1
        (map_bound(range.start_bound()), map_bound(range.end_bound()));
132 1
    let v = T::Wrapper::get_entries(entries).expect("wrong match from the type and the value container");
133 1
    BTreeMap::range(v, into_range)
134 1
        .map(|x| x.0.clone().value())
135
        .collect::<Vec<T>>()
136
        .into_iter()
137 1
}
138

139
// Implements `Extractor` for the wrapper of `$t`.
//
// `$v` is the `ValueContainer` variant and `$v2` the `EntriesContainer`
// variant that store data of this type.
macro_rules! impl_index_data_type {
    ($t:ty, $v:path, $v2:path) => {
        impl Extractor for <$t as IndexType>::Wrapper {
            fn get_vec_mut(vc: &mut ValueContainer) -> &mut Vec<Self> {
                match vc {
                    $v(ref mut v) => v,
                    _ => panic!("wrong match from type and value container"),
                }
            }

            fn get_vec(vc: &ValueContainer) -> &Vec<Self> {
                match vc {
                    $v(ref v) => v,
                    _ => panic!("wrong match from type and value container"),
                }
            }

            fn get_entries(vc: &EntriesContainer) -> Option<&BTreeMap<Self, Changes>> {
                match vc {
                    $v2(ref v) => Some(v),
                    _ => None,
                }
            }

            fn get_entries_mut(vc: &mut EntriesContainer) -> Option<&mut BTreeMap<Self, Changes>> {
                match vc {
                    $v2(ref mut v) => Some(v),
                    _ => None,
                }
            }

            fn new_entries() -> EntriesContainer {
                $v2(BTreeMap::new())
            }

            fn new_values() -> ValueContainer {
                $v(Vec::new())
            }
        }
    };
}
181

182
macro_rules! container_enums {
183
    ($($variant:ident<$t:ty>),+,) => {
184
        #[derive(Clone)]
185
        pub enum EntriesContainer {
186
            $(
187
            $variant(BTreeMap<<$t as IndexType>::Wrapper, Changes>),
188
            )+
189
        }
190

191
        #[derive(Clone)]
192
        pub enum ValueContainer {
193
            $(
194
            $variant(Vec<<$t as IndexType>::Wrapper>),
195
            )+
196
        }
197

198 1
        fn eapplier(
199
            keys: &EntriesContainer,
200
            values: &ValueContainer,
201
            index_id: &IndexId,
202
            persy: &PersyImpl,
203
            tx: &mut Transaction,
204
        ) -> PRes<()> {
205 1
            match keys {
206
                $(
207 1
                EntriesContainer::$variant(k) => valapplier::<$t>(values, k, index_id, persy, tx),
208
                )+
209
            }
210 1
        }
211

212 1
        fn valapplier<K>(
213
            values: &ValueContainer,
214
            k: &BTreeMap<K::Wrapper, Changes>,
215
            index_id: &IndexId,
216
            persy: &PersyImpl,
217
            tx: &mut Transaction,
218
        ) -> PRes<()>
219
        where
220
            K: IndexType,
221
        {
222 1
            match values {
223
                $(
224 1
                ValueContainer::$variant(v) => apply_to_index::<K, $t>(persy, tx, index_id, k, v),
225
                )+
226
            }
227 1
        }
228

229
        $(
230
            impl_index_data_type!($t, ValueContainer::$variant, EntriesContainer::$variant);
231
        )+
232
    }
233
}
234

235
container_enums!(
236
    U8<u8>,
237
    U16<u16>,
238
    U32<u32>,
239
    U64<u64>,
240
    U128<u128>,
241
    I8<i8>,
242
    I16<i16>,
243
    I32<i32>,
244
    I64<i64>,
245
    I128<i128>,
246
    F32W<f32>,
247
    F64W<f64>,
248
    STRING<String>,
249
    PERSYID<PersyId>,
250
    BYTEVEC<ByteVec>,
251
);
252

253
pub trait Extractor: Sized {
254
    fn get_vec_mut(vc: &mut ValueContainer) -> &mut Vec<Self>;
255
    fn get_vec(vc: &ValueContainer) -> &Vec<Self>;
256
    fn get_entries(vc: &EntriesContainer) -> Option<&BTreeMap<Self, Changes>>;
257
    fn get_entries_mut(vc: &mut EntriesContainer) -> Option<&mut BTreeMap<Self, Changes>>;
258
    fn new_entries() -> EntriesContainer;
259
    fn new_values() -> ValueContainer;
260
}
261

262 1
fn apply_to_index<K, V>(
263
    persy: &PersyImpl,
264
    tx: &mut Transaction,
265
    index_id: &IndexId,
266
    keys: &BTreeMap<K::Wrapper, Changes>,
267
    values: &[V::Wrapper],
268
) -> PRes<()>
269
where
270
    K: IndexType,
271
    V: IndexType,
272
{
273 1
    let changes: Vec<_> = keys
274
        .iter()
275 1
        .map(|(k, c)| {
276 1
            let vals: Vec<_> = c
277
                .changes
278
                .iter()
279 1
                .map(|ch| match *ch {
280 1
                    Change::ADD(pos) => TreeValue::ADD(values[pos].clone().value()),
281 1
                    Change::REMOVE(pos) => TreeValue::REMOVE(pos.map(|p| values[p].clone().value())),
282 1
                })
283
                .collect();
284 1
            KeyChanges::new(k.clone().value(), vals)
285 1
        })
286
        .collect();
287 1
    let mut index = Indexes::get_index_keeper_tx::<K, V>(persy, tx, index_id)?;
288 1
    index.apply(&changes)?;
289 1
    index.update_changed()?;
290 1
    Ok(())
291 1
}
292

293 1
/// Converts a borrowed range bound into an owned bound of the target type,
/// cloning the endpoint and converting it with `Into`.
fn map_bound<T: Into<T1> + Clone, T1>(b: Bound<&T>) -> Bound<T1> {
    let convert = |inner: &T| -> T1 { inner.clone().into() };
    match b {
        Bound::Unbounded => Bound::Unbounded,
        Bound::Included(inner) => Bound::Included(convert(inner)),
        Bound::Excluded(inner) => Bound::Excluded(convert(inner)),
    }
}
300

301
pub struct IndexTransactionKeeper {
302
    indexex_changes: HashMap<IndexId, (EntriesContainer, ValueContainer)>,
303
}
304

305
impl IndexTransactionKeeper {
306 1
    pub fn new() -> IndexTransactionKeeper {
307 1
        IndexTransactionKeeper {
308 1
            indexex_changes: HashMap::new(),
309
        }
310 1
    }
311

312 1
    pub fn put<K, V>(&mut self, index: IndexId, k: K, v: V)
313
    where
314
        K: IndexType,
315
        V: IndexType,
316
    {
317 1
        match self.indexex_changes.entry(index) {
318 1
            Entry::Occupied(ref mut o) => {
319 1
                let (entries, values) = o.get_mut();
320 1
                let pos = add_value(values, v);
321 1
                add_entry(entries, k, Change::ADD(pos));
322
            }
323 1
            Entry::Vacant(va) => {
324 1
                let mut values = V::Wrapper::new_values();
325 1
                let mut keys = K::Wrapper::new_entries();
326 1
                let pos = add_value(&mut values, v);
327 1
                add_entry(&mut keys, k, Change::ADD(pos));
328 1
                va.insert((keys, values));
329 1
            }
330
        }
331 1
    }
332

333 1
    pub fn remove<K, V>(&mut self, index: IndexId, k: K, v: Option<V>)
334
    where
335
        K: IndexType,
336
        V: IndexType,
337
    {
338 1
        match self.indexex_changes.entry(index) {
339 1
            Entry::Occupied(ref mut o) => {
340 1
                let pos = if let Some(val) = v {
341 1
                    Some(add_value(&mut o.get_mut().1, val))
342 1
                } else {
343 1
                    None
344
                };
345 1
                add_entry(&mut o.get_mut().0, k, Change::REMOVE(pos));
346
            }
347 1
            Entry::Vacant(va) => {
348 1
                let mut values = V::Wrapper::new_values();
349 1
                let mut keys = K::Wrapper::new_entries();
350 1
                let pos = if let Some(val) = v {
351 0
                    Some(add_value(&mut values, val))
352
                } else {
353 1
                    None
354
                };
355 1
                add_entry(&mut keys, k, Change::REMOVE(pos));
356 1
                va.insert((keys, values));
357 1
            }
358
        }
359 1
    }
360

361 1
    pub fn get_changes<K, V>(&self, index: IndexId, k: &K) -> Option<Vec<ValueChange<V>>>
362
    where
363
        K: IndexType,
364
        V: IndexType,
365
    {
366 1
        self.indexex_changes
367
            .get(&index)
368 1
            .map(|ref o| get_changes(&o.0, k).map(|c| resolve_values(&o.1, c)))
369
            .and_then(std::convert::identity)
370 1
    }
371

372 1
    pub fn apply_changes<K, V>(
373
        &self,
374
        index_id: IndexId,
375
        vm: ValueMode,
376
        k: &K,
377
        pers: Option<Value<V>>,
378
    ) -> PRes<Option<Value<V>>>
379
    where
380
        K: IndexType,
381
        V: IndexType,
382
    {
383 1
        let mut result = pers;
384 1
        if let Some(key_changes) = self.get_changes::<K, V>(index_id, k) {
385 1
            for change in key_changes {
386 1
                result = match change {
387 1
                    ValueChange::ADD(add_value) => Some(if let Some(s_result) = result {
388 1
                        match s_result {
389 1
                            Value::SINGLE(v) => match vm {
390 1
                                ValueMode::REPLACE => Value::SINGLE(add_value),
391
                                ValueMode::EXCLUSIVE => {
392 1
                                    if compare(&v, &add_value) == Ordering::Equal {
393 1
                                        Value::SINGLE(v)
394
                                    } else {
395
                                        // TODO: recover index name
396 1
                                        return Err(PersyError::IndexDuplicateKey("".to_string(), format!("{}", k)));
397
                                    }
398
                                }
399
                                ValueMode::CLUSTER => {
400 1
                                    if compare(&v, &add_value) == Ordering::Equal {
401 1
                                        Value::SINGLE(v)
402
                                    } else {
403 1
                                        Value::CLUSTER(vec![v, add_value])
404
                                    }
405
                                }
406 1
                            },
407 0
                            Value::CLUSTER(mut values) => {
408 0
                                if let Ok(pos) = values.binary_search_by(|x| compare(x, &add_value)) {
409 0
                                    values.insert(pos, add_value);
410
                                }
411 0
                                Value::CLUSTER(values)
412 0
                            }
413
                        }
414 0
                    } else {
415 1
                        Value::SINGLE(add_value)
416 1
                    }),
417 1
                    ValueChange::REMOVE(rv) => rv.and_then(|remove_value| {
418 0
                        result.and_then(|s_result| match s_result {
419 0
                            Value::SINGLE(v) => {
420 0
                                if compare(&v, &remove_value) == Ordering::Equal {
421 0
                                    None
422
                                } else {
423 0
                                    Some(Value::SINGLE(v))
424
                                }
425 0
                            }
426 0
                            Value::CLUSTER(mut values) => {
427 0
                                if let Ok(pos) = values.binary_search_by(|x| compare(x, &remove_value)) {
428 0
                                    values.remove(pos);
429
                                }
430 0
                                Some(if values.len() == 1 {
431 0
                                    Value::SINGLE(values.pop().unwrap())
432
                                } else {
433 0
                                    Value::CLUSTER(values)
434
                                })
435 0
                            }
436 0
                        })
437 0
                    }),
438
                };
439 1
            }
440 1
        }
441 1
        Ok(result)
442 1
    }
443

444 1
    pub fn apply(&self, persy: &PersyImpl, tx: &mut Transaction) -> PRes<()> {
445 1
        for (index, values) in &self.indexex_changes {
446 1
            eapplier(&values.0, &values.1, index, persy, tx)?;
447
        }
448 1
        Ok(())
449 1
    }
450

451 1
    pub fn range<K, V, R>(&self, index: IndexId, range: R) -> Option<IntoIter<K>>
452
    where
453
        K: IndexType,
454
        V: IndexType,
455
        R: RangeBounds<K>,
456
    {
457 1
        self.indexex_changes.get(&index).map(|x| resolve_range(&x.0, range))
458 1
    }
459

460 1
    /// Returns the ids of all the indexes with recorded changes.
    pub fn changed_indexes(&self) -> Vec<IndexId> {
        let mut ids = Vec::with_capacity(self.indexex_changes.len());
        ids.extend(self.indexex_changes.keys().cloned());
        ids
    }
463 0
    /// Forgets every change recorded for `index_id`, if any.
    pub fn remove_changes(&mut self, index_id: &IndexId) {
        drop(self.indexex_changes.remove(index_id));
    }
466
}
467

468
pub trait IndexKeeper<K, V> {
469
    fn load(&self, node: &NodeRef) -> PRes<Node<K, V>>;
470
    fn get_root(&self) -> PRes<Option<NodeRef>>;
471
    fn value_mode(&self) -> ValueMode;
472
    fn index_name(&self) -> &String;
473
}
474

475
pub trait IndexModify<K, V>: IndexKeeper<K, V> {
476
    fn load_modify(&self, node: &NodeRef) -> PRes<Option<(Rc<Node<K, V>>, u16)>>;
477
    fn lock(&mut self, node: &NodeRef, version: u16) -> PRes<bool>;
478
    fn owned(&mut self, node_ref: &NodeRef, node: Rc<Node<K, V>>) -> Node<K, V>;
479
    fn unlock(&mut self, node: &NodeRef) -> PRes<bool>;
480
    fn unlock_config(&mut self) -> PRes<bool>;
481
    fn get_root_refresh(&mut self) -> PRes<Option<NodeRef>>;
482
    fn lock_config(&mut self) -> PRes<bool>;
483
    fn insert(&mut self, node: Node<K, V>) -> PRes<NodeRef>;
484
    fn update(&mut self, node_ref: &NodeRef, node: Node<K, V>, version: u16) -> PRes<()>;
485
    fn delete(&mut self, node: &NodeRef, version: u16) -> PRes<()>;
486
    fn set_root(&mut self, root: Option<NodeRef>) -> PRes<()>;
487
    fn bottom_limit(&self) -> usize;
488
    fn top_limit(&self) -> usize;
489
}
490

491
pub struct IndexSegmentKeeper<'a> {
492
    name: String,
493
    segment: SegmentId,
494
    root: Option<NodeRef>,
495
    store: &'a PersyImpl,
496
    snapshot: SnapshotId,
497
    value_mode: ValueMode,
498
}
499

500
impl<'a> IndexSegmentKeeper<'a> {
501 1
    pub fn new(
502
        name: &str,
503
        index_id: &IndexId,
504
        root: Option<NodeRef>,
505
        store: &'a PersyImpl,
506
        snapshot: SnapshotId,
507
        value_mode: ValueMode,
508
    ) -> IndexSegmentKeeper<'a> {
509 1
        IndexSegmentKeeper {
510 1
            name: name.to_string(),
511 1
            segment: index_id_to_segment_id_data(index_id),
512 1
            root,
513
            store,
514
            snapshot,
515
            value_mode,
516 0
        }
517 1
    }
518
}
519

520
impl<'a, K: IndexType, V: IndexType> IndexKeeper<K, V> for IndexSegmentKeeper<'a> {
521 1
    fn load(&self, node: &NodeRef) -> PRes<Node<K, V>> {
522 1
        let rec = self
523
            .store
524 1
            .read_snap_fn(self.segment, &node, self.snapshot, |x| deserialize(x))?
525 1
            .unwrap()?;
526 1
        Ok(rec)
527 1
    }
528 1
    fn get_root(&self) -> PRes<Option<NodeRef>> {
529 1
        Ok(self.root.clone())
530 1
    }
531 1
    fn value_mode(&self) -> ValueMode {
532 1
        self.value_mode.clone()
533 1
    }
534

535
    fn index_name(&self) -> &String {
536
        &self.name
537
    }
538
}
539

540
/// Bookkeeping for a record locked through `IndexSegmentKeeperTx`.
struct LockData {
    // Version the record was locked at.
    version: u16,
    // Number of lock requests not yet matched by an unlock.
    counter: u32,
}
544

545
pub struct IndexSegmentKeeperTx<'a, K: IndexType, V: IndexType> {
546
    name: String,
547
    index_id: IndexId,
548
    root: Option<NodeRef>,
549
    config_version: u16,
550
    store: &'a PersyImpl,
551
    tx: &'a mut Transaction,
552
    value_mode: ValueMode,
553
    changed: Option<HashMap<NodeRef, (Rc<Node<K, V>>, u16)>>,
554
    bottom_limit: usize,
555
    top_limit: usize,
556
    locked: HashMap<NodeRef, LockData>,
557
    updated_root: bool,
558
}
559

560
impl<'a, K: IndexType, V: IndexType> IndexSegmentKeeperTx<'a, K, V> {
561 1
    pub fn new(
562
        name: &str,
563
        index_id: &IndexId,
564
        root: Option<NodeRef>,
565
        config_version: u16,
566
        store: &'a PersyImpl,
567
        tx: &'a mut Transaction,
568
        value_mode: ValueMode,
569
        bottom_limit: usize,
570
        top_limit: usize,
571
    ) -> IndexSegmentKeeperTx<'a, K, V> {
572 1
        IndexSegmentKeeperTx {
573 1
            name: name.to_string(),
574 1
            index_id: index_id.clone(),
575 1
            root,
576
            config_version,
577
            store,
578
            tx,
579
            value_mode,
580 1
            changed: None,
581
            bottom_limit,
582
            top_limit,
583 1
            locked: HashMap::new(),
584
            updated_root: false,
585 0
        }
586 1
    }
587 1
    pub fn update_changed(&mut self) -> PRes<()> {
588 1
        let segment = index_id_to_segment_id_data(&self.index_id);
589 1
        if let Some(m) = &self.changed {
590 1
            for (node_ref, node) in m {
591 1
                self.store.update(self.tx, segment, &node_ref, &serialize(&node.0)?)?;
592
            }
593
        }
594 1
        if self.updated_root {
595 1
            Indexes::update_index_root(self.store, self.tx, &self.index_id, self.root.clone())?;
596
        }
597 1
        Ok(())
598 1
    }
599
}
600

601
impl<'a, K: IndexType, V: IndexType> IndexModify<K, V> for IndexSegmentKeeperTx<'a, K, V> {
602 1
    fn load_modify(&self, node: &NodeRef) -> PRes<Option<(Rc<Node<K, V>>, u16)>> {
603 1
        if let Some(m) = &self.changed {
604 1
            if let Some(n) = m.get(node) {
605 1
                return Ok(Some(n.clone()));
606
            }
607
        }
608 1
        let segment = index_id_to_segment_id_data(&self.index_id);
609 1
        if let Some((rec, version)) = self
610
            .store
611 1
            .read_tx_internal_fn(self.tx, segment, &node, |x| deserialize(x))?
612
        {
613 1
            Ok(Some((Rc::new(rec?), version)))
614 0
        } else {
615 0
            Ok(None)
616
        }
617 1
    }
618 1
    fn lock(&mut self, node: &NodeRef, version: u16) -> PRes<bool> {
619 1
        if let Some(lock_data) = self.locked.get_mut(node) {
620 1
            if version == lock_data.version {
621 1
                lock_data.counter += 1;
622 1
                Ok(true)
623 0
            } else if version < lock_data.version {
624 0
                Ok(false)
625
            } else {
626 0
                panic!("wrong matched versions {} {}", version, lock_data.version);
627
            }
628
        } else {
629 1
            let segment = index_id_to_segment_id_data(&self.index_id);
630 1
            if self.store.lock_record(self.tx, &segment, node, version)? {
631 1
                self.locked.insert(node.clone(), LockData { version, counter: 1 });
632 1
                Ok(true)
633
            } else {
634 1
                Ok(false)
635
            }
636
        }
637 1
    }
638

639 1
    fn owned(&mut self, node_ref: &NodeRef, mut node: Rc<Node<K, V>>) -> Node<K, V> {
640 1
        debug_assert!(self.locked.contains_key(node_ref));
641 1
        if let Some(changed) = &mut self.changed {
642 1
            changed.remove(node_ref);
643
        }
644 1
        Rc::make_mut(&mut node);
645 1
        Rc::try_unwrap(node).ok().unwrap()
646 1
    }
647

648 1
    fn unlock(&mut self, node: &NodeRef) -> PRes<bool> {
649 1
        if let Entry::Occupied(mut x) = self.locked.entry(node.clone()) {
650 1
            x.get_mut().counter -= 1;
651 1
            if x.get().counter == 0 {
652 1
                x.remove();
653 1
                let segment = index_id_to_segment_id_data(&self.index_id);
654 1
                self.store.unlock_record(self.tx, &segment, node)?;
655 1
                Ok(true)
656
            } else {
657 0
                Ok(false)
658
            }
659
        } else {
660 0
            Ok(false)
661
        }
662 1
    }
663

664 1
    fn get_root_refresh(&mut self) -> PRes<Option<NodeRef>> {
665 1
        if !self.updated_root {
666 1
            let (config, version) = Indexes::get_index_tx(self.store, self.tx, &self.index_id)?;
667 1
            self.root = config.get_root();
668 1
            self.config_version = version;
669 1
        }
670 1
        Ok(self.root.clone())
671 1
    }
672
    fn unlock_config(&mut self) -> PRes<bool> {
673
        let config_id = Indexes::get_config_id(self.store, self.tx, &self.index_id)?.0;
674
        if let Entry::Occupied(mut x) = self.locked.entry(config_id.clone()) {
675
            x.get_mut().counter -= 1;
676
            if x.get().counter == 0 {
677
                x.remove();
678
                let segment = index_id_to_segment_id_meta(&self.index_id);
679
                self.store.unlock_record(self.tx, &segment, &config_id)?;
680
                Ok(true)
681
            } else {
682
                Ok(false)
683
            }
684
        } else {
685
            Ok(false)
686
        }
687
    }
688 1
    fn lock_config(&mut self) -> PRes<bool> {
689 1
        let config_id = Indexes::get_config_id(self.store, self.tx, &self.index_id)?.0;
690

691 1
        let segment = index_id_to_segment_id_meta(&self.index_id);
692 1
        if let Some(lock_data) = self.locked.get_mut(&config_id) {
693 1
            if self.config_version == lock_data.version {
694 1
                lock_data.counter += 1;
695 1
                Ok(true)
696
            } else {
697 0
                panic!("this should never happen");
698
            }
699 1
        } else if self
700
            .store
701 1
            .lock_record(self.tx, &segment, &config_id, self.config_version)?
702
        {
703 1
            self.locked.insert(
704 1
                config_id.clone(),
705 1
                LockData {
706 1
                    version: self.config_version,
707
                    counter: 1,
708
                },
709
            );
710 1
            Ok(true)
711
        } else {
712 1
            let (config, version) = Indexes::get_index_tx(self.store, self.tx, &self.index_id)?;
713 1
            self.root = config.get_root();
714 1
            self.config_version = version;
715 1
            Ok(false)
716 1
        }
717 1
    }
718

719 1
    fn insert(&mut self, node: Node<K, V>) -> PRes<NodeRef> {
720 1
        let segment = index_id_to_segment_id_data(&self.index_id);
721 1
        let node_ref = self.store.insert_record(self.tx, &segment, &serialize(&node)?)?;
722 1
        self.changed
723
            .get_or_insert_with(HashMap::new)
724 1
            .insert(node_ref.clone(), (Rc::new(node), 0));
725 1
        self.locked
726 1
            .insert(node_ref.clone(), LockData { version: 0, counter: 1 });
727 1
        Ok(node_ref)
728 1
    }
729

730 1
    fn update(&mut self, node_ref: &NodeRef, node: Node<K, V>, version: u16) -> PRes<()> {
731 1
        debug_assert!(self.locked.contains_key(node_ref));
732 1
        self.changed
733
            .get_or_insert_with(HashMap::new)
734 1
            .insert(node_ref.clone(), (Rc::new(node), version));
735 1
        Ok(())
736 1
    }
737

738 0
    fn delete(&mut self, node: &NodeRef, _version: u16) -> PRes<()> {
739 0
        debug_assert!(self.locked.contains_key(node));
740 0
        if let Some(m) = &mut self.changed {
741 0
            m.remove(node);
742
        }
743 0
        let segment = index_id_to_segment_id_data(&self.index_id);
744 0
        self.store.delete(self.tx, segment, &node)?;
745 0
        Ok(())
746 0
    }
747 1
    fn set_root(&mut self, root: Option<NodeRef>) -> PRes<()> {
748 1
        self.root = root;
749 1
        self.updated_root = true;
750 1
        Ok(())
751 1
    }
752

753 1
    fn bottom_limit(&self) -> usize {
754 1
        self.bottom_limit
755 1
    }
756 1
    fn top_limit(&self) -> usize {
757 1
        self.top_limit
758 1
    }
759
}
760

761
impl<'a, K: IndexType, V: IndexType> IndexKeeper<K, V> for IndexSegmentKeeperTx<'a, K, V> {
762 1
    fn load(&self, node: &NodeRef) -> PRes<Node<K, V>> {
763 1
        if let Some(m) = &self.changed {
764 0
            if let Some(n) = m.get(node) {
765 0
                return Ok(n.0.as_ref().clone());
766
            }
767
        }
768 1
        let segment = index_id_to_segment_id_data(&self.index_id);
769 1
        let (rec, _) = self
770
            .store
771 1
            .read_tx_internal_fn(self.tx, segment, &node, |x| deserialize(x))?
772 0
            .unwrap();
773 1
        Ok(rec?)
774 1
    }
775 1
    fn get_root(&self) -> PRes<Option<NodeRef>> {
776 1
        Ok(self.root.clone())
777 1
    }
778 1
    fn value_mode(&self) -> ValueMode {
779 1
        self.value_mode.clone()
780 1
    }
781

782 1
    fn index_name(&self) -> &String {
783
        &self.name
784 1
    }
785
}
786

787
/// Index Iterator implementation for iterating on a range of keys considering in transaction
788
/// changes
789
pub struct TxIndexRawIter<K: IndexType, V: IndexType> {
790
    index_name: IndexId,
791
    in_tx: Option<IntoIter<K>>,
792
    persistent: Option<IndexRawIter<K, V>>,
793
    in_tx_front: Option<Option<K>>,
794
    persistent_front: Option<Option<(K, Value<V>)>>,
795
    in_tx_back: Option<Option<K>>,
796
    persistent_back: Option<Option<(K, Value<V>)>>,
797
    value_mode: ValueMode,
798
}
799

800
impl<K, V> TxIndexRawIter<K, V>
801
where
802
    K: IndexType,
803
    V: IndexType,
804
{
805 1
    pub fn new(
806
        index_name: IndexId,
807
        in_tx: Option<IntoIter<K>>,
808
        persistent: Option<IndexRawIter<K, V>>,
809
        value_mode: ValueMode,
810
    ) -> TxIndexRawIter<K, V> {
811 1
        TxIndexRawIter {
812
            index_name,
813 1
            in_tx,
814 1
            persistent,
815 1
            in_tx_front: None,
816 1
            persistent_front: None,
817 1
            in_tx_back: None,
818 1
            persistent_back: None,
819
            value_mode,
820
        }
821 1
    }
822

823 1
    fn apply_changes(
824
        tx: &mut Transaction,
825
        vm: ValueMode,
826
        index: IndexId,
827
        k: K,
828
        pers: Option<Value<V>>,
829
    ) -> Option<(K, Value<V>)> {
830 1
        tx.apply_changes(vm, index, &k, pers).unwrap_or(None).map(|v| (k, v))
831 1
    }
832

833 1
    pub fn next(&mut self, persy_impl: &Arc<PersyImpl>, tx: &mut Transaction) -> Option<(K, Value<V>)> {
834 1
        loop {
835 1
            let vm = self.value_mode.clone();
836 1
            let index = self.index_name.clone();
837 1
            let apply_changes = |k, o| Self::apply_changes(tx, vm, index, k, o);
838 1
            match (&mut self.in_tx, &mut self.persistent) {
839 1
                (Some(it), Some(pers)) => {
840 1
                    match (
841 1
                        self.in_tx_front.get_or_insert_with(|| it.next()).clone(),
842 1
                        self.persistent_front
843 1
                            .get_or_insert_with(|| pers.next(persy_impl))
844
                            .clone(),
845 0
                    ) {
846 1
                        (Some(tx_k), Some((pers_k, vals))) => match tx_k.cmp(&pers_k) {
847 1
                            Ordering::Less => {
848 0
                                self.in_tx_front = None;
849 0
                                let res = apply_changes(tx_k, None);
850 0
                                if res.is_some() {
851 1
                                    break res;
852
                                }
853 0
                            }
854
                            Ordering::Equal => {
855 1
                                self.in_tx_front = None;
856 1
                                self.persistent_front = None;
857 1
                                let res = apply_changes(tx_k, Some(vals));
858

859 1
                                if res.is_some() {
860 0
                                    break res;
861
                                }
862 0
                            }
863
                            Ordering::Greater => {
864 1
                                self.persistent_front = None;
865 1
                                break Some((pers_k, vals));
866
                            }
867 1
                        },
868 1
                        (Some(tx_k), None) => {
869 1
                            self.in_tx_front = None;
870 1
                            let res = apply_changes(tx_k, None);
871 1
                            if res.is_some() {
872 1
                                break res;
873
                            }
874 1
                        }
875 1
                        (None, Some((pers_k, vals))) => {
876 0
                            self.persistent_front = None;
877 0
                            break Some((pers_k, vals));
878 0
                        }
879 1
                        (None, None) => break None,
880
                    }
881 1
                }
882 0
                (Some(it), None) => {
883 0
                    let res = apply_changes(it.next().unwrap(), None);
884 0
                    if res.is_some() {
885 0
                        break res;
886
                    }
887 0
                }
888 0
                (None, Some(pers)) => break pers.next(persy_impl),
889 0
                (None, None) => break None,
890
            }
891
        }
892 1
    }
893

894 1
    pub fn next_back(&mut self, persy_impl: &Arc<PersyImpl>, tx: &mut Transaction) -> Option<(K, Value<V>)> {
895 1
        loop {
896 1
            let vm = self.value_mode.clone();
897 1
            let index = self.index_name.clone();
898 1
            let apply_changes = |k, o| Self::apply_changes(tx, vm, index, k, o);
899 1
            match (&mut self.in_tx, &mut self.persistent) {
900 1
                (Some(it), Some(pers)) => {
901 1
                    match (
902 1
                        self.in_tx_back.get_or_insert_with(|| it.next_back()).clone(),
903 1
                        self.persistent_back
904 1
                            .get_or_insert_with(|| pers.next_back(persy_impl))
905
                            .clone(),
906 0
                    ) {
907 1
                        (Some(tx_k), Some((pers_k, vals))) => match tx_k.cmp(&pers_k) {
908 1
                            Ordering::Less => {
909 1
                                self.persistent_back = None;
910 1
                                break Some((pers_k, vals));
911
                            }
912
                            Ordering::Equal => {
913 1
                                self.in_tx_back = None;
914 1
                                self.persistent_back = None;
915 1
                                let res = apply_changes(tx_k, Some(vals));
916

917 1
                                if res.is_some() {
918 0
                                    break res;
919
                                }
920 0
                            }
921
                            Ordering::Greater => {
922 1
                                self.in_tx_back = None;
923 1
                                let res = apply_changes(tx_k, None);
924

925 1
                                if res.is_some() {
926 1
                                    break res;
927
                                }
928 0
                            }
929 1
                        },
930 1
                        (Some(tx_k), None) => {
931 1
                            self.in_tx_back = None;
932 1
                            let res = apply_changes(tx_k, None);
933 1
                            if res.is_some() {
934 1
                                break res;
935
                            }
936 1
                        }
937 1
                        (None, Some((pers_k, vals))) => {
938 1
                            self.persistent_back = None;
939 1
                            break Some((pers_k, vals));
940 0
                        }
941 1
                        (None, None) => break None,
942
                    }
943 1
                }
944 0
                (Some(it), None) => {
945 0
                    let res = apply_changes(it.next_back().unwrap(), None);
946 0
                    if res.is_some() {
947 0
                        break res;
948
                    }
949 0
                }
950 0
                (None, Some(pers)) => break pers.next_back(persy_impl),
951 0
                (None, None) => break None,
952
            }
953
        }
954 1
    }
955
}
956

957
pub struct IndexRawIter<K: IndexType, V: IndexType> {
958
    index_id: IndexId,
959
    read_snapshot: SnapshotId,
960
    iter: PageIter<K, V>,
961
    back: PageIterBack<K, V>,
962
    release_snapshot: bool,
963
}
964

965
impl<K, V> IndexRawIter<K, V>
966
where
967
    K: IndexType,
968
    V: IndexType,
969
{
970 1
    pub fn new(
971
        index_id: IndexId,
972
        read_snapshot: SnapshotId,
973
        iter: PageIter<K, V>,
974
        back: PageIterBack<K, V>,
975
        release_snapshot: bool,
976
    ) -> IndexRawIter<K, V> {
977 1
        IndexRawIter {
978
            index_id,
979
            read_snapshot,
980 1
            iter,
981 1
            back,
982
            release_snapshot,
983
        }
984 1
    }
985

986 1
    pub fn next(&mut self, persy_impl: &PersyImpl) -> Option<(K, Value<V>)> {
987 1
        let back_keep = self.back.iter.peek();
988 1
        if let (Some(s), Some(e)) = (self.iter.iter.peek(), back_keep) {
989 1
            if s.key.cmp(&e.key) == Ordering::Greater {
990 1
                return None;
991
            }
992
        }
993 1
        if let Some(n) = self.iter.iter.next() {
994 1
            if self.iter.iter.peek().is_none() {
995 1
                if let Ok(iter) = persy_impl.index_next(&self.index_id, self.read_snapshot, Bound::Excluded(&n.key)) {
996 1
                    self.iter = iter;
997
                }
998 1
            }
999 1
            Some((n.key, n.value))
1000 0
        } else {
1001 1
            None
1002
        }
1003 1
    }
1004

1005 1
    pub fn next_back(&mut self, persy_impl: &PersyImpl) -> Option<(K, Value<V>)> {
1006 1
        let front_keep = self.iter.iter.peek();
1007 1
        if let (Some(s), Some(e)) = (self.back.iter.peek(), front_keep) {
1008 1
            if s.key.cmp(&e.key) == Ordering::Less {
1009 1
                return None;
1010
            }
1011
        }
1012 1
        if let Some(n) = self.back.iter.next() {
1013 1
            if self.back.iter.peek().is_none() {
1014 1
                if let Ok(back) = persy_impl.index_back(&self.index_id, self.read_snapshot, Bound::Excluded(&n.key)) {
1015 1
                    self.back = back;
1016
                }
1017 1
            }
1018 1
            Some((n.key, n.value))
1019 0
        } else {
1020 1
            None
1021
        }
1022 1
    }
1023 1
    /// Releases the read snapshot if this iterator was created with `release_snapshot`
    /// set; otherwise does nothing.
    ///
    /// # Errors
    /// Propagates any error returned by `PersyImpl::release_snapshot`.
    pub fn release(&self, persy_impl: &PersyImpl) -> PRes<()> {
        if !self.release_snapshot {
            return Ok(());
        }
        persy_impl.release_snapshot(self.read_snapshot)?;
        Ok(())
    }
1029
}
1030

1031
#[cfg(test)]
mod tests {
    use super::{ByteVec, IndexTransactionKeeper, IndexType, ValueChange};
    use crate::id::{IndexId, PersyId, RecRef, SegmentId};
    use std::fmt::Debug;

    /// Records an add of `v` under key `k` and a remove of `v` under key `dk`, checking after
    /// each step that the keeper reports exactly that change for the key.
    fn keeper_test_for_type<K: IndexType + PartialEq, V: IndexType + Debug + PartialEq>(k: K, dk: K, v: V) {
        let index_id = IndexId::new(SegmentId::new(30), SegmentId::new(40));
        let mut keeper = IndexTransactionKeeper::new();

        keeper.put(index_id.clone(), k.clone(), v.clone());
        let added = keeper.get_changes(index_id.clone(), &k);
        assert_eq!(added, Some(vec![ValueChange::ADD(v.clone())]));

        keeper.remove(index_id.clone(), dk.clone(), Some(v.clone()));
        let removed = keeper.get_changes(index_id, &dk);
        assert_eq!(removed, Some(vec![ValueChange::REMOVE(Some(v))]));
    }

    /// Runs the add/remove round trip for every supported key and value type.
    #[test]
    fn simple_tx_keeper_test() {
        // Unsigned integers.
        keeper_test_for_type::<u8, u8>(10, 15, 10);
        keeper_test_for_type::<u16, u16>(10, 15, 10);
        keeper_test_for_type::<u32, u32>(10, 15, 10);
        keeper_test_for_type::<u64, u64>(10, 15, 10);
        keeper_test_for_type::<u128, u128>(10, 15, 10);

        // Signed integers.
        keeper_test_for_type::<i8, i8>(10, 15, 10);
        keeper_test_for_type::<i16, i16>(10, 15, 10);
        keeper_test_for_type::<i32, i32>(10, 15, 10);
        keeper_test_for_type::<i64, i64>(10, 15, 10);
        keeper_test_for_type::<i128, i128>(10, 15, 10);

        // Floating point.
        keeper_test_for_type::<f32, f32>(20.0, 10.0, 20.0);
        keeper_test_for_type::<f64, f64>(20.0, 10.0, 20.0);

        // Strings and byte vectors.
        keeper_test_for_type::<String, String>("a".to_string(), "b".to_string(), "a".to_string());
        keeper_test_for_type::<ByteVec, ByteVec>(vec![0, 1].into(), vec![0, 2].into(), vec![0, 1].into());

        // Record ids.
        let key_id = PersyId(RecRef::new(10, 20));
        let removed_key_id = PersyId(RecRef::new(20, 20));
        let value_id = PersyId(RecRef::new(30, 20));
        keeper_test_for_type::<PersyId, PersyId>(key_id, removed_key_id, value_id);
    }
}

Read our documentation on viewing source code.

Loading