1
use crate::index::{
2
    config::{ByteVec, IndexOrd, IndexType, Indexes, ValueMode, WrapperType},
3
    nodes::{compare, Node, NodeRef, PageIter, PageIterBack, Value},
4
    serialization::{deserialize, serialize},
5
    tree::{IndexApply, KeyChanges, ValueChange as TreeValue},
6
};
7
use crate::{
8
    error::{PRes, PersyError},
9
    id::{index_id_to_segment_id_data, index_id_to_segment_id_meta, IndexId, SegmentId},
10
    persy::PersyImpl,
11
    snapshot::SnapshotId,
12
    transaction::Transaction,
13
    PersyId,
14
};
15
use std::{
16
    cmp::Ordering,
17
    collections::{btree_map::Entry as BTreeEntry, hash_map::Entry, BTreeMap, HashMap},
18
    iter::DoubleEndedIterator,
19
    ops::{Bound, RangeBounds},
20
    rc::Rc,
21
    sync::Arc,
22
    vec::IntoIter,
23
};
24

25 1
#[derive(Clone)]
26
pub struct Container<T: IndexOrd> {
27 1
    value: T,
28
}
29
impl<T> WrapperType<T> for Container<T>
30
where
31
    T: IndexType,
32
    Self: Extractor,
33
{
34 1
    fn value(self) -> T {
35 1
        self.value
36 1
    }
37
}
38
impl<T: IndexOrd> PartialOrd for Container<T> {
39 1
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
40 1
        Some(IndexOrd::cmp(&self.value, &other.value))
41 1
    }
42
}
43
impl<T: IndexOrd> Ord for Container<T> {
44 1
    fn cmp(&self, other: &Self) -> Ordering {
45 1
        IndexOrd::cmp(&self.value, &other.value)
46 1
    }
47
}
48
impl<T: IndexOrd> PartialEq for Container<T> {
49 0
    fn eq(&self, other: &Self) -> bool {
50 0
        IndexOrd::cmp(&self.value, &other.value) == Ordering::Equal
51 0
    }
52
}
53
/// Marker impl: equality on [`Container`] follows the `IndexOrd`-based `PartialEq` impl.
impl<T> Eq for Container<T> where T: IndexOrd {}
54
/// Wraps a plain value into a [`Container`].
impl<T> From<T> for Container<T>
where
    T: IndexOrd,
{
    fn from(value: T) -> Self {
        Self { value }
    }
}
59

60 1
/// A pending change to the values of a key, carrying the value itself.
///
/// `resolve_values` builds these from the position-based [`Change`] entries.
#[derive(Clone, Debug, PartialEq)]
61
pub enum ValueChange<V> {
62 1
    /// Add the given value to the key.
    ADD(V),
63 1
    /// Remove the given value from the key. With `None`,
    /// `IndexTransactionKeeper::apply_changes` drops whatever is currently held for the key.
    REMOVE(Option<V>),
64
}
65

66 1
/// A pending change to a key, referring to its value by position instead of by value.
///
/// The positions are the indexes returned by `add_value`, i.e. offsets into the value vector
/// stored next to the entries of the same index.
#[derive(Clone)]
67
pub enum Change {
68 1
    /// Add the value stored at this position.
    ADD(usize),
69 1
    /// Remove the value stored at this position, or `None` when no value was given.
    REMOVE(Option<usize>),
70
}
71

72 1
#[derive(Clone)]
73
pub struct Changes {
74 1
    changes: Vec<Change>,
75
}
76

77
impl Changes {
78 1
    fn new(change: Change) -> Changes {
79 1
        Changes { changes: vec![change] }
80 1
    }
81 1
    fn push(&mut self, change: Change) {
82 1
        self.changes.push(change);
83 1
    }
84
}
85

86 1
/// Appends `val` to the value vector held by `values` and returns the position it was stored at.
///
/// The vector is obtained through `Extractor::get_vec_mut`; the implementations generated
/// in this file panic when `values` holds a different value type than `V`.
fn add_value<V>(values: &mut ValueContainer, val: V) -> usize
where
    V: IndexType,
{
    let store = V::Wrapper::get_vec_mut(values);
    store.push(val.into());
    // The new element is the last one.
    store.len() - 1
}
92 1
fn resolve_values<V: IndexType>(values: &ValueContainer, changes: Changes) -> Vec<ValueChange<V>> {
93 1
    let v = V::Wrapper::get_vec(values);
94 1
    changes
95
        .changes
96
        .iter()
97 1
        .map(|c| match c {
98 1
            Change::ADD(p) => ValueChange::ADD(v[*p].clone().value()),
99 1
            Change::REMOVE(o) => ValueChange::REMOVE(o.map(|p| v[p].clone().value())),
100 1
        })
101
        .collect()
102 1
}
103 1
fn add_entry<K: IndexType>(entries: &mut EntriesContainer, k: K, change: Change) {
104 1
    let v = K::Wrapper::get_entries_mut(entries).expect("wrong match from the type and the value container");
105 1
    match v.entry(k.into()) {
106 1
        BTreeEntry::Occupied(ref mut o) => {
107 1
            o.get_mut().push(change);
108
        }
109 1
        BTreeEntry::Vacant(va) => {
110 1
            va.insert(Changes::new(change));
111
        }
112
    }
113 1
}
114

115 1
/// Returns a copy of the changes recorded for key `x`.
///
/// Yields `None` when `entries` does not hold the map for key type `K`, or when the key has
/// no recorded changes.
fn get_changes<K>(entries: &EntriesContainer, x: &K) -> Option<Changes>
where
    K: IndexType,
{
    let map = K::Wrapper::get_entries(entries)?;
    let key: K::Wrapper = x.clone().into();
    map.get(&key).cloned()
}
123

124 1
fn resolve_range<T: IndexType, R>(entries: &EntriesContainer, range: R) -> IntoIter<T>
125
where
126
    R: RangeBounds<T>,
127
{
128 1
    let into_range: (Bound<T::Wrapper>, Bound<T::Wrapper>) =
129 1
        (map_bound(range.start_bound()), map_bound(range.end_bound()));
130 1
    let v = T::Wrapper::get_entries(entries).expect("wrong match from the type and the value container");
131 1
    BTreeMap::range(v, into_range)
132 1
        .map(|x| x.0.clone().value())
133
        .collect::<Vec<T>>()
134
        .into_iter()
135 1
}
136

137
// Implements `Extractor` for the wrapper type of `$ty`.
//
// `$values` is the `ValueContainer` variant and `$entries` the `EntriesContainer` variant
// that store data for that type.
macro_rules! impl_index_data_type {
    ($ty:ty, $values:path, $entries:path) => {
        impl Extractor for <$ty as IndexType>::Wrapper {
            fn get_vec_mut(vc: &mut ValueContainer) -> &mut Vec<Self> {
                match vc {
                    $values(stored) => stored,
                    _ => panic!("wrong match from type and value container"),
                }
            }

            fn get_vec(vc: &ValueContainer) -> &Vec<Self> {
                match vc {
                    $values(stored) => stored,
                    _ => panic!("wrong match from type and value container"),
                }
            }

            fn get_entries(vc: &EntriesContainer) -> Option<&BTreeMap<Self, Changes>> {
                match vc {
                    $entries(map) => Some(map),
                    _ => None,
                }
            }

            fn get_entries_mut(vc: &mut EntriesContainer) -> Option<&mut BTreeMap<Self, Changes>> {
                match vc {
                    $entries(map) => Some(map),
                    _ => None,
                }
            }

            fn new_entries() -> EntriesContainer {
                $entries(BTreeMap::new())
            }

            fn new_values() -> ValueContainer {
                $values(Vec::new())
            }
        }
    };
}
179

180
// Generates, from a list of `VARIANT<type>` pairs:
// - `EntriesContainer` and `ValueContainer`, with one variant per listed type;
// - `eapplier` / `valapplier`, which dispatch on the key variant and then on the value
//   variant to call `apply_to_index` with the matching concrete types;
// - the `Extractor` implementation of every listed type.
macro_rules! container_enums {
    ($($name:ident<$ty:ty>),+,) => {
        /// Per-key change lists, one variant per supported key type.
        #[derive(Clone)]
        pub enum EntriesContainer {
            $(
            $name(BTreeMap<<$ty as IndexType>::Wrapper, Changes>),
            )+
        }

        /// Values referenced by the change lists, one variant per supported value type.
        #[derive(Clone)]
        pub enum ValueContainer {
            $(
            $name(Vec<<$ty as IndexType>::Wrapper>),
            )+
        }

        fn eapplier(
            keys: &EntriesContainer,
            values: &ValueContainer,
            index_id: &IndexId,
            persy: &PersyImpl,
            tx: &mut Transaction,
        ) -> PRes<()> {
            // First dispatch: pick the concrete key type.
            match keys {
                $(
                EntriesContainer::$name(key_map) => valapplier::<$ty>(values, key_map, index_id, persy, tx),
                )+
            }
        }

        fn valapplier<K: IndexType>(
            values: &ValueContainer,
            k: &BTreeMap<K::Wrapper, Changes>,
            index_id: &IndexId,
            persy: &PersyImpl,
            tx: &mut Transaction,
        ) -> PRes<()> {
            // Second dispatch: pick the concrete value type.
            match values {
                $(
                ValueContainer::$name(stored) => apply_to_index::<K, $ty>(persy, tx, index_id, k, stored),
                )+
            }
        }

        $(
            impl_index_data_type!($ty, ValueContainer::$name, EntriesContainer::$name);
        )+
    }
}
232

233
// Instantiates the container enums, the dispatch helpers and the `Extractor`
// implementations for every listed index type.
container_enums! {
    U8<u8>,
    U16<u16>,
    U32<u32>,
    U64<u64>,
    U128<u128>,
    I8<i8>,
    I16<i16>,
    I32<i32>,
    I64<i64>,
    I128<i128>,
    F32W<f32>,
    F64W<f64>,
    STRING<String>,
    PERSYID<PersyId>,
    BYTEVEC<ByteVec>,
}
250

251
/// Access to the typed storage inside [`ValueContainer`] and [`EntriesContainer`].
///
/// The implementations in this file are generated by `impl_index_data_type!`, one per
/// wrapper type.
pub trait Extractor: Sized {
252
    /// Mutable access to the value vector in `vc`; the generated implementations panic when
    /// `vc` holds a different variant.
    fn get_vec_mut(vc: &mut ValueContainer) -> &mut Vec<Self>;
253
    /// Shared access to the value vector in `vc`; the generated implementations panic when
    /// `vc` holds a different variant.
    fn get_vec(vc: &ValueContainer) -> &Vec<Self>;
254
    /// The change map in `vc`; the generated implementations return `None` when `vc` holds
    /// a different variant.
    fn get_entries(vc: &EntriesContainer) -> Option<&BTreeMap<Self, Changes>>;
255
    /// Mutable counterpart of `get_entries`.
    fn get_entries_mut(vc: &mut EntriesContainer) -> Option<&mut BTreeMap<Self, Changes>>;
256
    /// Creates an empty entries container of the variant matching this type.
    fn new_entries() -> EntriesContainer;
257
    /// Creates an empty value container of the variant matching this type.
    fn new_values() -> ValueContainer;
258
}
259

260 1
fn apply_to_index<K, V>(
261
    persy: &PersyImpl,
262
    tx: &mut Transaction,
263
    index_id: &IndexId,
264
    keys: &BTreeMap<K::Wrapper, Changes>,
265
    values: &[V::Wrapper],
266
) -> PRes<()>
267
where
268
    K: IndexType,
269
    V: IndexType,
270
{
271 1
    let changes: Vec<_> = keys
272
        .iter()
273 1
        .map(|(k, c)| {
274 1
            let vals: Vec<_> = c
275
                .changes
276
                .iter()
277 1
                .map(|ch| match *ch {
278 1
                    Change::ADD(pos) => TreeValue::ADD(values[pos].clone().value()),
279 1
                    Change::REMOVE(pos) => TreeValue::REMOVE(pos.map(|p| values[p].clone().value())),
280 1
                })
281
                .collect();
282 1
            KeyChanges::new(k.clone().value(), vals)
283 1
        })
284
        .collect();
285 1
    let mut index = Indexes::get_index_keeper_tx::<K, V>(persy, tx, index_id)?;
286 1
    index.apply(&changes)?;
287 1
    index.update_changed()?;
288 1
    Ok(())
289 1
}
290

291 1
/// Converts a borrowed range bound into an owned bound of the target type.
///
/// The bound kind is preserved; the payload, when present, is cloned and converted with
/// `Into`.
fn map_bound<T, T1>(b: Bound<&T>) -> Bound<T1>
where
    T: Into<T1> + Clone,
{
    let convert = |item: &T| -> T1 { item.clone().into() };
    match b {
        Bound::Unbounded => Bound::Unbounded,
        Bound::Included(item) => Bound::Included(convert(item)),
        Bound::Excluded(item) => Bound::Excluded(convert(item)),
    }
}
298

299
pub struct IndexTransactionKeeper {
300
    indexex_changes: HashMap<IndexId, (EntriesContainer, ValueContainer)>,
301
}
302

303
impl IndexTransactionKeeper {
304 1
    pub fn new() -> IndexTransactionKeeper {
305 1
        IndexTransactionKeeper {
306 1
            indexex_changes: HashMap::new(),
307
        }
308 1
    }
309

310 1
    pub fn put<K, V>(&mut self, index: IndexId, k: K, v: V)
311
    where
312
        K: IndexType,
313
        V: IndexType,
314
    {
315 1
        match self.indexex_changes.entry(index) {
316 1
            Entry::Occupied(ref mut o) => {
317 1
                let (entries, values) = o.get_mut();
318 1
                let pos = add_value(values, v);
319 1
                add_entry(entries, k, Change::ADD(pos));
320
            }
321 1
            Entry::Vacant(va) => {
322 1
                let mut values = V::Wrapper::new_values();
323 1
                let mut keys = K::Wrapper::new_entries();
324 1
                let pos = add_value(&mut values, v);
325 1
                add_entry(&mut keys, k, Change::ADD(pos));
326 1
                va.insert((keys, values));
327 1
            }
328
        }
329 1
    }
330

331 1
    pub fn remove<K, V>(&mut self, index: IndexId, k: K, v: Option<V>)
332
    where
333
        K: IndexType,
334
        V: IndexType,
335
    {
336 1
        match self.indexex_changes.entry(index) {
337 1
            Entry::Occupied(ref mut o) => {
338 1
                let pos = if let Some(val) = v {
339 1
                    Some(add_value(&mut o.get_mut().1, val))
340 1
                } else {
341 1
                    None
342
                };
343 1
                add_entry(&mut o.get_mut().0, k, Change::REMOVE(pos));
344
            }
345 1
            Entry::Vacant(va) => {
346 1
                let mut values = V::Wrapper::new_values();
347 1
                let mut keys = K::Wrapper::new_entries();
348 1
                let pos = if let Some(val) = v {
349 0
                    Some(add_value(&mut values, val))
350
                } else {
351 1
                    None
352
                };
353 1
                add_entry(&mut keys, k, Change::REMOVE(pos));
354 1
                va.insert((keys, values));
355 1
            }
356
        }
357 1
    }
358

359 1
    pub fn get_changes<K, V>(&self, index: IndexId, k: &K) -> Option<Vec<ValueChange<V>>>
360
    where
361
        K: IndexType,
362
        V: IndexType,
363
    {
364 1
        self.indexex_changes
365
            .get(&index)
366 1
            .map(|ref o| get_changes(&o.0, k).map(|c| resolve_values(&o.1, c)))
367
            .and_then(std::convert::identity)
368 1
    }
369

370 1
    pub fn apply_changes<K, V>(
371
        &self,
372
        index_id: IndexId,
373
        vm: ValueMode,
374
        k: &K,
375
        pers: Option<Value<V>>,
376
    ) -> PRes<Option<Value<V>>>
377
    where
378
        K: IndexType,
379
        V: IndexType,
380
    {
381 1
        let mut result = pers;
382 1
        if let Some(key_changes) = self.get_changes::<K, V>(index_id, k) {
383 1
            for change in key_changes {
384 1
                result = match change {
385 1
                    ValueChange::ADD(add_value) => Some(if let Some(s_result) = result {
386 1
                        match s_result {
387 1
                            Value::SINGLE(v) => match vm {
388 1
                                ValueMode::REPLACE => Value::SINGLE(add_value),
389
                                ValueMode::EXCLUSIVE => {
390 1
                                    if compare(&v, &add_value) == Ordering::Equal {
391 1
                                        Value::SINGLE(v)
392
                                    } else {
393
                                        // TODO: recover index name
394 1
                                        return Err(PersyError::IndexDuplicateKey("".to_string(), format!("{}", k)));
395
                                    }
396
                                }
397
                                ValueMode::CLUSTER => {
398 1
                                    if compare(&v, &add_value) == Ordering::Equal {
399 1
                                        Value::SINGLE(v)
400
                                    } else {
401 1
                                        Value::CLUSTER(vec![v, add_value])
402
                                    }
403
                                }
404 1
                            },
405 0
                            Value::CLUSTER(mut values) => {
406 0
                                if let Ok(pos) = values.binary_search_by(|x| compare(x, &add_value)) {
407 0
                                    values.insert(pos, add_value);
408
                                }
409 0
                                Value::CLUSTER(values)
410 0
                            }
411
                        }
412 0
                    } else {
413 1
                        Value::SINGLE(add_value)
414 1
                    }),
415 1
                    ValueChange::REMOVE(rv) => rv.and_then(|remove_value| {
416 0
                        result.and_then(|s_result| match s_result {
417 0
                            Value::SINGLE(v) => {
418 0
                                if compare(&v, &remove_value) == Ordering::Equal {
419 0
                                    None
420
                                } else {
421 0
                                    Some(Value::SINGLE(v))
422
                                }
423 0
                            }
424 0
                            Value::CLUSTER(mut values) => {
425 0
                                if let Ok(pos) = values.binary_search_by(|x| compare(x, &remove_value)) {
426 0
                                    values.remove(pos);
427
                                }
428 0
                                Some(if values.len() == 1 {
429 0
                                    Value::SINGLE(values.pop().unwrap())
430
                                } else {
431 0
                                    Value::CLUSTER(values)
432
                                })
433 0
                            }
434 0
                        })
435 0
                    }),
436
                };
437 1
            }
438 1
        }
439 1
        Ok(result)
440 1
    }
441

442 1
    pub fn apply(&self, persy: &PersyImpl, tx: &mut Transaction) -> PRes<()> {
443 1
        for (index, values) in &self.indexex_changes {
444 1
            eapplier(&values.0, &values.1, index, persy, tx)?;
445
        }
446 1
        Ok(())
447 1
    }
448

449 1
    pub fn range<K, V, R>(&self, index: IndexId, range: R) -> Option<IntoIter<K>>
450
    where
451
        K: IndexType,
452
        V: IndexType,
453
        R: RangeBounds<K>,
454
    {
455 1
        self.indexex_changes.get(&index).map(|x| resolve_range(&x.0, range))
456 1
    }
457

458 1
    pub fn changed_indexes(&self) -> Vec<IndexId> {
459 1
        self.indexex_changes.keys().cloned().collect()
460 1
    }
461 0
    pub fn remove_changes(&mut self, index_id: &IndexId) {
462 0
        self.indexex_changes.remove(index_id);
463 0
    }
464
}
465

466
/// Read access to the nodes and the configuration of an index.
pub trait IndexKeeper<K, V> {
467
    /// Loads the node referenced by `node`.
    fn load(&self, node: &NodeRef) -> PRes<Node<K, V>>;
468
    /// Returns the reference to the root node, if there is one.
    fn get_root(&self) -> PRes<Option<NodeRef>>;
469
    /// Returns the value mode of the index.
    fn value_mode(&self) -> ValueMode;
470
    /// Returns the name of the index.
    fn index_name(&self) -> &String;
471
}
472

473
/// Write access to an index, on top of the read access given by [`IndexKeeper`].
///
/// The notes on each method describe the `IndexSegmentKeeperTx` implementation in this
/// file; other implementors may differ.
pub trait IndexModify<K, V>: IndexKeeper<K, V> {
474
    /// Loads a node together with its version, or `None` when the node cannot be read.
    fn load_modify(&self, node: &NodeRef) -> PRes<Option<(Rc<Node<K, V>>, u16)>>;
475
    /// Tries to lock `node` at `version`; returns whether the lock is held afterwards.
    fn lock(&mut self, node: &NodeRef, version: u16) -> PRes<bool>;
476
    /// Turns a shared node into an owned one, ready to be modified.
    fn owned(&mut self, node_ref: &NodeRef, node: Rc<Node<K, V>>) -> Node<K, V>;
477
    /// Gives up one hold on the lock of `node`; returns `true` when the record lock itself
    /// was released.
    fn unlock(&mut self, node: &NodeRef) -> PRes<bool>;
478
    /// Same as `unlock`, for the index configuration record.
    fn unlock_config(&mut self) -> PRes<bool>;
479
    /// Returns the root reference, reloading it from the index configuration unless the
    /// root was changed through `set_root`.
    fn get_root_refresh(&mut self) -> PRes<Option<NodeRef>>;
480
    /// Tries to lock the index configuration record; returns whether the lock is held
    /// afterwards.
    fn lock_config(&mut self) -> PRes<bool>;
481
    /// Stores a new node and returns its reference.
    fn insert(&mut self, node: Node<K, V>) -> PRes<NodeRef>;
482
    /// Registers `node` as the new content of `node_ref`.
    fn update(&mut self, node_ref: &NodeRef, node: Node<K, V>, version: u16) -> PRes<()>;
483
    /// Deletes the node referenced by `node`.
    fn delete(&mut self, node: &NodeRef, version: u16) -> PRes<()>;
484
    /// Sets the root node reference.
    fn set_root(&mut self, root: Option<NodeRef>) -> PRes<()>;
485
    /// Returns the configured bottom limit (presumably the minimum node size before a
    /// merge; confirm against the tree implementation).
    fn bottom_limit(&self) -> usize;
486
    /// Returns the configured top limit (presumably the maximum node size before a split;
    /// confirm against the tree implementation).
    fn top_limit(&self) -> usize;
487
}
488

489
pub struct IndexSegmentKeeper<'a> {
490
    name: String,
491
    segment: SegmentId,
492
    root: Option<NodeRef>,
493
    store: &'a PersyImpl,
494
    snapshot: SnapshotId,
495
    value_mode: ValueMode,
496
}
497

498
impl<'a> IndexSegmentKeeper<'a> {
499 1
    pub fn new(
500
        name: &str,
501
        index_id: &IndexId,
502
        root: Option<NodeRef>,
503
        store: &'a PersyImpl,
504
        snapshot: SnapshotId,
505
        value_mode: ValueMode,
506
    ) -> IndexSegmentKeeper<'a> {
507 1
        IndexSegmentKeeper {
508 1
            name: name.to_string(),
509 1
            segment: index_id_to_segment_id_data(index_id),
510 1
            root,
511
            store,
512
            snapshot,
513
            value_mode,
514 0
        }
515 1
    }
516
}
517

518
impl<'a, K: IndexType, V: IndexType> IndexKeeper<K, V> for IndexSegmentKeeper<'a> {
519 1
    fn load(&self, node: &NodeRef) -> PRes<Node<K, V>> {
520 1
        let rec = self
521
            .store
522 1
            .read_snap_fn(self.segment, &node, self.snapshot, |x| deserialize(x))?
523 1
            .unwrap()?;
524 1
        Ok(rec)
525 1
    }
526 1
    fn get_root(&self) -> PRes<Option<NodeRef>> {
527 1
        Ok(self.root.clone())
528 1
    }
529 1
    fn value_mode(&self) -> ValueMode {
530 1
        self.value_mode.clone()
531 1
    }
532

533
    fn index_name(&self) -> &String {
534
        &self.name
535
    }
536
}
537

538
/// Bookkeeping for a record lock held by an `IndexSegmentKeeperTx`.
struct LockData {
539
    /// Record version the lock was taken at.
    version: u16,
540
    /// Number of holds on the lock; the record is unlocked when it drops back to zero.
    counter: u32,
541
}
542

543
pub struct IndexSegmentKeeperTx<'a, K: IndexType, V: IndexType> {
544
    name: String,
545
    index_id: IndexId,
546
    root: Option<NodeRef>,
547
    config_version: u16,
548
    store: &'a PersyImpl,
549
    tx: &'a mut Transaction,
550
    value_mode: ValueMode,
551
    changed: Option<HashMap<NodeRef, (Rc<Node<K, V>>, u16)>>,
552
    bottom_limit: usize,
553
    top_limit: usize,
554
    locked: HashMap<NodeRef, LockData>,
555
    updated_root: bool,
556
}
557

558
impl<'a, K: IndexType, V: IndexType> IndexSegmentKeeperTx<'a, K, V> {
559 1
    pub fn new(
560
        name: &str,
561
        index_id: &IndexId,
562
        root: Option<NodeRef>,
563
        config_version: u16,
564
        store: &'a PersyImpl,
565
        tx: &'a mut Transaction,
566
        value_mode: ValueMode,
567
        bottom_limit: usize,
568
        top_limit: usize,
569
    ) -> IndexSegmentKeeperTx<'a, K, V> {
570 1
        IndexSegmentKeeperTx {
571 1
            name: name.to_string(),
572 1
            index_id: index_id.clone(),
573 1
            root,
574
            config_version,
575
            store,
576
            tx,
577
            value_mode,
578 1
            changed: None,
579
            bottom_limit,
580
            top_limit,
581 1
            locked: HashMap::new(),
582
            updated_root: false,
583 0
        }
584 1
    }
585 1
    pub fn update_changed(&mut self) -> PRes<()> {
586 1
        let segment = index_id_to_segment_id_data(&self.index_id);
587 1
        if let Some(m) = &self.changed {
588 1
            for (node_ref, node) in m {
589 1
                self.store.update(self.tx, segment, &node_ref, &serialize(&node.0)?)?;
590
            }
591
        }
592 1
        if self.updated_root {
593 1
            Indexes::update_index_root(self.store, self.tx, &self.index_id, self.root.clone())?;
594
        }
595 1
        Ok(())
596 1
    }
597
}
598

599
impl<'a, K: IndexType, V: IndexType> IndexModify<K, V> for IndexSegmentKeeperTx<'a, K, V> {
600 1
    fn load_modify(&self, node: &NodeRef) -> PRes<Option<(Rc<Node<K, V>>, u16)>> {
601 1
        if let Some(m) = &self.changed {
602 1
            if let Some(n) = m.get(node) {
603 1
                return Ok(Some(n.clone()));
604
            }
605
        }
606 1
        let segment = index_id_to_segment_id_data(&self.index_id);
607 1
        if let Some((rec, version)) = self
608
            .store
609 1
            .read_tx_internal_fn(self.tx, segment, &node, |x| deserialize(x))?
610
        {
611 1
            Ok(Some((Rc::new(rec?), version)))
612 0
        } else {
613 0
            Ok(None)
614
        }
615 1
    }
616 1
    fn lock(&mut self, node: &NodeRef, version: u16) -> PRes<bool> {
617 1
        if let Some(lock_data) = self.locked.get_mut(node) {
618 1
            if version == lock_data.version {
619 1
                lock_data.counter += 1;
620 1
                Ok(true)
621 0
            } else if version < lock_data.version {
622 0
                Ok(false)
623
            } else {
624 0
                panic!("wrong matched versions {} {}", version, lock_data.version);
625
            }
626
        } else {
627 1
            let segment = index_id_to_segment_id_data(&self.index_id);
628 1
            if self.store.lock_record(self.tx, &segment, node, version)? {
629 1
                self.locked.insert(node.clone(), LockData { version, counter: 1 });
630 1
                Ok(true)
631
            } else {
632 1
                Ok(false)
633
            }
634
        }
635 1
    }
636

637 1
    fn owned(&mut self, node_ref: &NodeRef, mut node: Rc<Node<K, V>>) -> Node<K, V> {
638 1
        debug_assert!(self.locked.contains_key(node_ref));
639 1
        if let Some(changed) = &mut self.changed {
640 1
            changed.remove(node_ref);
641
        }
642 1
        Rc::make_mut(&mut node);
643 1
        Rc::try_unwrap(node).ok().unwrap()
644 1
    }
645

646 1
    fn unlock(&mut self, node: &NodeRef) -> PRes<bool> {
647 1
        if let Entry::Occupied(mut x) = self.locked.entry(node.clone()) {
648 1
            x.get_mut().counter -= 1;
649 1
            if x.get().counter == 0 {
650 1
                x.remove();
651 1
                let segment = index_id_to_segment_id_data(&self.index_id);
652 1
                self.store.unlock_record(self.tx, &segment, node)?;
653 1
                Ok(true)
654
            } else {
655 0
                Ok(false)
656
            }
657
        } else {
658 0
            Ok(false)
659
        }
660 1
    }
661

662 1
    fn get_root_refresh(&mut self) -> PRes<Option<NodeRef>> {
663 1
        if !self.updated_root {
664 1
            let (config, version) = Indexes::get_index_tx(self.store, self.tx, &self.index_id)?;
665 1
            self.root = config.get_root();
666 1
            self.config_version = version;
667 1
        }
668 1
        Ok(self.root.clone())
669 1
    }
670
    fn unlock_config(&mut self) -> PRes<bool> {
671
        let config_id = Indexes::get_config_id(self.store, self.tx, &self.index_id)?.0;
672
        if let Entry::Occupied(mut x) = self.locked.entry(config_id.clone()) {
673
            x.get_mut().counter -= 1;
674
            if x.get().counter == 0 {
675
                x.remove();
676
                let segment = index_id_to_segment_id_meta(&self.index_id);
677
                self.store.unlock_record(self.tx, &segment, &config_id)?;
678
                Ok(true)
679
            } else {
680
                Ok(false)
681
            }
682
        } else {
683
            Ok(false)
684
        }
685
    }
686 1
    fn lock_config(&mut self) -> PRes<bool> {
687 1
        let config_id = Indexes::get_config_id(self.store, self.tx, &self.index_id)?.0;
688

689 1
        let segment = index_id_to_segment_id_meta(&self.index_id);
690 1
        if let Some(lock_data) = self.locked.get_mut(&config_id) {
691 1
            if self.config_version == lock_data.version {
692 1
                lock_data.counter += 1;
693 1
                Ok(true)
694
            } else {
695 0
                panic!("this should never happen");
696
            }
697 1
        } else if self
698
            .store
699 1
            .lock_record(self.tx, &segment, &config_id, self.config_version)?
700
        {
701 1
            self.locked.insert(
702 1
                config_id.clone(),
703 1
                LockData {
704 1
                    version: self.config_version,
705
                    counter: 1,
706
                },
707
            );
708 1
            Ok(true)
709
        } else {
710 1
            let (config, version) = Indexes::get_index_tx(self.store, self.tx, &self.index_id)?;
711 1
            self.root = config.get_root();
712 1
            self.config_version = version;
713 1
            Ok(false)
714 1
        }
715 1
    }
716

717 1
    fn insert(&mut self, node: Node<K, V>) -> PRes<NodeRef> {
718 1
        let segment = index_id_to_segment_id_data(&self.index_id);
719 1
        let node_ref = self.store.insert_record(self.tx, &segment, &serialize(&node)?)?;
720 1
        self.changed
721
            .get_or_insert_with(HashMap::new)
722 1
            .insert(node_ref.clone(), (Rc::new(node), 0));
723 1
        self.locked
724 1
            .insert(node_ref.clone(), LockData { version: 0, counter: 1 });
725 1
        Ok(node_ref)
726 1
    }
727

728 1
    fn update(&mut self, node_ref: &NodeRef, node: Node<K, V>, version: u16) -> PRes<()> {
729 1
        debug_assert!(self.locked.contains_key(node_ref));
730 1
        self.changed
731
            .get_or_insert_with(HashMap::new)
732 1
            .insert(node_ref.clone(), (Rc::new(node), version));
733 1
        Ok(())
734 1
    }
735

736 0
    fn delete(&mut self, node: &NodeRef, _version: u16) -> PRes<()> {
737 0
        debug_assert!(self.locked.contains_key(node));
738 0
        if let Some(m) = &mut self.changed {
739 0
            m.remove(node);
740
        }
741 0
        let segment = index_id_to_segment_id_data(&self.index_id);
742 0
        self.store.delete(self.tx, segment, &node)?;
743 0
        Ok(())
744 0
    }
745 1
    fn set_root(&mut self, root: Option<NodeRef>) -> PRes<()> {
746 1
        self.root = root;
747 1
        self.updated_root = true;
748 1
        Ok(())
749 1
    }
750

751 1
    fn bottom_limit(&self) -> usize {
752 1
        self.bottom_limit
753 1
    }
754 1
    fn top_limit(&self) -> usize {
755 1
        self.top_limit
756 1
    }
757
}
758

759
impl<'a, K: IndexType, V: IndexType> IndexKeeper<K, V> for IndexSegmentKeeperTx<'a, K, V> {
760 1
    fn load(&self, node: &NodeRef) -> PRes<Node<K, V>> {
761 1
        if let Some(m) = &self.changed {
762 0
            if let Some(n) = m.get(node) {
763 0
                return Ok(n.0.as_ref().clone());
764
            }
765
        }
766 1
        let segment = index_id_to_segment_id_data(&self.index_id);
767 1
        let (rec, _) = self
768
            .store
769 1
            .read_tx_internal_fn(self.tx, segment, &node, |x| deserialize(x))?
770 0
            .unwrap();
771 1
        Ok(rec?)
772 1
    }
773 1
    fn get_root(&self) -> PRes<Option<NodeRef>> {
774 1
        Ok(self.root.clone())
775 1
    }
776 1
    fn value_mode(&self) -> ValueMode {
777 1
        self.value_mode.clone()
778 1
    }
779

780 1
    fn index_name(&self) -> &String {
781
        &self.name
782 1
    }
783
}
784

785
/// Index Iterator implementation for iterating on a range of keys considering in transaction
786
/// changes
787
pub struct TxIndexRawIter<K: IndexType, V: IndexType> {
788
    index_name: IndexId,
789
    in_tx: Option<IntoIter<K>>,
790
    persistent: Option<IndexRawIter<K, V>>,
791
    in_tx_front: Option<Option<K>>,
792
    persistent_front: Option<Option<(K, Value<V>)>>,
793
    in_tx_back: Option<Option<K>>,
794
    persistent_back: Option<Option<(K, Value<V>)>>,
795
    value_mode: ValueMode,
796
}
797

798
impl<K, V> TxIndexRawIter<K, V>
799
where
800
    K: IndexType,
801
    V: IndexType,
802
{
803 1
    pub fn new(
804
        index_name: IndexId,
805
        in_tx: Option<IntoIter<K>>,
806
        persistent: Option<IndexRawIter<K, V>>,
807
        value_mode: ValueMode,
808
    ) -> TxIndexRawIter<K, V> {
809 1
        TxIndexRawIter {
810
            index_name,
811 1
            in_tx,
812 1
            persistent,
813 1
            in_tx_front: None,
814 1
            persistent_front: None,
815 1
            in_tx_back: None,
816 1
            persistent_back: None,
817
            value_mode,
818
        }
819 1
    }
820

821 1
    fn apply_changes(
822
        tx: &mut Transaction,
823
        vm: ValueMode,
824
        index: IndexId,
825
        k: K,
826
        pers: Option<Value<V>>,
827
    ) -> Option<(K, Value<V>)> {
828 1
        tx.apply_changes(vm, index, &k, pers).unwrap_or(None).map(|v| (k, v))
829 1
    }
830

831 1
    pub fn next(&mut self, persy_impl: &Arc<PersyImpl>, tx: &mut Transaction) -> Option<(K, Value<V>)> {
832 1
        loop {
833 1
            let vm = self.value_mode.clone();
834 1
            let index = self.index_name.clone();
835 1
            let apply_changes = |k, o| Self::apply_changes(tx, vm, index, k, o);
836 1
            match (&mut self.in_tx, &mut self.persistent) {
837 1
                (Some(it), Some(pers)) => {
838 1
                    match (
839 1
                        self.in_tx_front.get_or_insert_with(|| it.next()).clone(),
840 1
                        self.persistent_front
841 1
                            .get_or_insert_with(|| pers.next(persy_impl))
842
                            .clone(),
843 0
                    ) {
844 1
                        (Some(tx_k), Some((pers_k, vals))) => match tx_k.cmp(&pers_k) {
845 1
                            Ordering::Less => {
846 0
                                self.in_tx_front = None;
847 0
                                let res = apply_changes(tx_k, None);
848 0
                                if res.is_some() {
849 1
                                    break res;
850
                                }
851 0
                            }
852
                            Ordering::Equal => {
853 1
                                self.in_tx_front = None;
854 1
                                self.persistent_front = None;
855 1
                                let res = apply_changes(tx_k, Some(vals));
856

857 1
                                if res.is_some() {
858 0
                                    break res;
859
                                }
860 0
                            }
861
                            Ordering::Greater => {
862 1
                                self.persistent_front = None;
863 1
                                break Some((pers_k, vals));
864
                            }
865 1
                        },
866 1
                        (Some(tx_k), None) => {
867 1
                            self.in_tx_front = None;
868 1
                            let res = apply_changes(tx_k, None);
869 1
                            if res.is_some() {
870 1
                                break res;
871
                            }
872 1
                        }
873 1
                        (None, Some((pers_k, vals))) => {
874 0
                            self.persistent_front = None;
875 0
                            break Some((pers_k, vals));
876 0
                        }
877 1
                        (None, None) => break None,
878
                    }
879 1
                }
880 0
                (Some(it), None) => {
881 0
                    let res = apply_changes(it.next().unwrap(), None);
882 0
                    if res.is_some() {
883 0
                        break res;
884
                    }
885 0
                }
886 0
                (None, Some(pers)) => break pers.next(persy_impl),
887 0
                (None, None) => break None,
888
            }
889
        }
890 1
    }
891

892 1
    pub fn next_back(&mut self, persy_impl: &Arc<PersyImpl>, tx: &mut Transaction) -> Option<(K, Value<V>)> {
893 1
        loop {
894 1
            let vm = self.value_mode.clone();
895 1
            let index = self.index_name.clone();
896 1
            let apply_changes = |k, o| Self::apply_changes(tx, vm, index, k, o);
897 1
            match (&mut self.in_tx, &mut self.persistent) {
898 1
                (Some(it), Some(pers)) => {
899 1
                    match (
900 1
                        self.in_tx_back.get_or_insert_with(|| it.next_back()).clone(),
901 1
                        self.persistent_back
902 1
                            .get_or_insert_with(|| pers.next_back(persy_impl))
903
                            .clone(),
904 0
                    ) {
905 1
                        (Some(tx_k), Some((pers_k, vals))) => match tx_k.cmp(&pers_k) {
906 1
                            Ordering::Less => {
907 1
                                self.persistent_back = None;
908 1
                                break Some((pers_k, vals));
909
                            }
910
                            Ordering::Equal => {
911 1
                                self.in_tx_back = None;
912 1
                                self.persistent_back = None;
913 1
                                let res = apply_changes(tx_k, Some(vals));
914

915 1
                                if res.is_some() {
916 0
                                    break res;
917
                                }
918 0
                            }
919
                            Ordering::Greater => {
920 1
                                self.in_tx_back = None;
921 1
                                let res = apply_changes(tx_k, None);
922

923 1
                                if res.is_some() {
924 1
                                    break res;
925
                                }
926 0
                            }
927 1
                        },
928 1
                        (Some(tx_k), None) => {
929 1
                            self.in_tx_back = None;
930 1
                            let res = apply_changes(tx_k, None);
931 1
                            if res.is_some() {
932 1
                                break res;
933
                            }
934 1
                        }
935 1
                        (None, Some((pers_k, vals))) => {
936 1
                            self.persistent_back = None;
937 1
                            break Some((pers_k, vals));
938 0
                        }
939 1
                        (None, None) => break None,
940
                    }
941 1
                }
942 0
                (Some(it), None) => {
943 0
                    let res = apply_changes(it.next_back().unwrap(), None);
944 0
                    if res.is_some() {
945 0
                        break res;
946
                    }
947 0
                }
948 0
                (None, Some(pers)) => break pers.next_back(persy_impl),
949 0
                (None, None) => break None,
950
            }
951
        }
952 1
    }
953
}
954

955
pub struct IndexRawIter<K: IndexType, V: IndexType> {
956
    index_id: IndexId,
957
    read_snapshot: SnapshotId,
958
    iter: PageIter<K, V>,
959
    back: PageIterBack<K, V>,
960
    release_snapshot: bool,
961
}
962

963
impl<K, V> IndexRawIter<K, V>
964
where
965
    K: IndexType,
966
    V: IndexType,
967
{
968 1
    pub fn new(
969
        index_id: IndexId,
970
        read_snapshot: SnapshotId,
971
        iter: PageIter<K, V>,
972
        back: PageIterBack<K, V>,
973
        release_snapshot: bool,
974
    ) -> IndexRawIter<K, V> {
975 1
        IndexRawIter {
976
            index_id,
977
            read_snapshot,
978 1
            iter,
979 1
            back,
980
            release_snapshot,
981
        }
982 1
    }
983

984 1
    pub fn next(&mut self, persy_impl: &PersyImpl) -> Option<(K, Value<V>)> {
985 1
        let back_keep = self.back.iter.peek();
986 1
        if let (Some(s), Some(e)) = (self.iter.iter.peek(), back_keep) {
987 1
            if s.key.cmp(&e.key) == Ordering::Greater {
988 1
                return None;
989
            }
990
        }
991 1
        if let Some(n) = self.iter.iter.next() {
992 1
            if self.iter.iter.peek().is_none() {
993 1
                if let Ok(iter) = persy_impl.index_next(&self.index_id, self.read_snapshot, Bound::Excluded(&n.key)) {
994 1
                    self.iter = iter;
995
                }
996 1
            }
997 1
            Some((n.key, n.value))
998 0
        } else {
999 1
            None
1000
        }
1001 1
    }
1002

1003 1
    pub fn next_back(&mut self, persy_impl: &PersyImpl) -> Option<(K, Value<V>)> {
1004 1
        let front_keep = self.iter.iter.peek();
1005 1
        if let (Some(s), Some(e)) = (self.back.iter.peek(), front_keep) {
1006 1
            if s.key.cmp(&e.key) == Ordering::Less {
1007 1
                return None;
1008
            }
1009
        }
1010 1
        if let Some(n) = self.back.iter.next() {
1011 1
            if self.back.iter.peek().is_none() {
1012 1
                if let Ok(back) = persy_impl.index_back(&self.index_id, self.read_snapshot, Bound::Excluded(&n.key)) {
1013 1
                    self.back = back;
1014
                }
1015 1
            }
1016 1
            Some((n.key, n.value))
1017 0
        } else {
1018 1
            None
1019
        }
1020 1
    }
1021 1
    pub fn release(&self, persy_impl: &PersyImpl) -> PRes<()> {
1022 1
        if self.release_snapshot {
1023 1
            persy_impl.release_snapshot(self.read_snapshot)?;
1024
        }
1025 1
        Ok(())
1026 1
    }
1027
}
1028

1029
#[cfg(test)]
mod tests {
    use super::{ByteVec, IndexTransactionKeeper, IndexType, ValueChange};
    use crate::id::{IndexId, PersyId, RecRef, SegmentId};
    use std::fmt::Debug;

    /// Records a put of `v` under `k` and a removal of `v` under `dk`, checking
    /// after each step that the keeper reports exactly that change for the key.
    fn keeper_test_for_type<K: IndexType + PartialEq, V: IndexType + Debug + PartialEq>(k: K, dk: K, v: V) {
        let index_id = IndexId::new(SegmentId::new(30), SegmentId::new(40));
        let mut keeper = IndexTransactionKeeper::new();

        keeper.put(index_id.clone(), k.clone(), v.clone());
        let after_put = keeper.get_changes(index_id.clone(), &k);
        assert_eq!(after_put, Some(vec![ValueChange::ADD(v.clone())]));

        keeper.remove(index_id.clone(), dk.clone(), Some(v.clone()));
        let after_remove = keeper.get_changes(index_id, &dk);
        assert_eq!(after_remove, Some(vec![ValueChange::REMOVE(Some(v))]));
    }

    #[test]
    fn simple_tx_keeper_test() {
        // Same literal triple for every integer width, in the listed order.
        macro_rules! check_integers {
            ($($int:ty),* $(,)?) => {
                $( keeper_test_for_type::<$int, $int>(10, 15, 10); )*
            };
        }
        check_integers!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128);

        keeper_test_for_type::<f32, f32>(20.0, 10.0, 20.0);
        keeper_test_for_type::<f64, f64>(20.0, 10.0, 20.0);
        keeper_test_for_type::<String, String>("a".to_string(), "b".to_string(), "a".to_string());
        keeper_test_for_type::<ByteVec, ByteVec>(vec![0, 1].into(), vec![0, 2].into(), vec![0, 1].into());

        let put_key = PersyId(RecRef::new(10, 20));
        let remove_key = PersyId(RecRef::new(20, 20));
        let value = PersyId(RecRef::new(30, 20));
        keeper_test_for_type::<PersyId, PersyId>(put_key, remove_key, value);
    }
}

Read our documentation on viewing source code.

Loading