1
use crate::index::{
2
    config::{IndexType, ValueMode},
3
    keeper::IndexTransactionKeeper,
4
    tree::nodes::Value,
5
};
6
use crate::{
7
    address::Address,
8
    config::TxStrategy,
9
    error::{PRes, PersyError},
10
    id::{index_id_to_segment_id_data, index_id_to_segment_id_meta, IndexId, RecRef, SegmentId},
11
    journal::{Journal, JournalId},
12
    persy::PersyImpl,
13
    snapshot::{release_snapshot, SnapshotEntry, SnapshotId},
14
};
15
use std::{
16
    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
17
    ops::RangeBounds,
18
    vec::{self, IntoIter},
19
};
20

21 1
#[derive(Clone, Default)]
22
pub struct NewSegmentPage {
23 1
    pub segment: SegmentId,
24 1
    pub page: u64,
25 1
    pub previous: u64,
26
}
27

28 1
#[derive(Clone, Default)]
29
pub struct InsertRecord {
30 1
    pub segment: SegmentId,
31 1
    pub recref: RecRef,
32 1
    pub record_page: u64,
33
}
34

35 1
#[derive(Clone, Default)]
36
pub struct UpdateRecord {
37 1
    pub segment: SegmentId,
38 1
    pub recref: RecRef,
39 1
    pub record_page: u64,
40 1
    pub version: u16,
41
}
42

43 1
#[derive(Clone, Default)]
44
pub struct ReadRecord {
45 1
    pub segment: SegmentId,
46 1
    pub recref: RecRef,
47 1
    pub version: u16,
48
}
49

50 1
#[derive(Clone, Default)]
51
pub struct DeleteRecord {
52 1
    pub segment: SegmentId,
53 1
    pub recref: RecRef,
54 1
    pub version: u16,
55
}
56

57 1
#[derive(Clone, Default)]
58
pub struct CreateSegment {
59 1
    pub name: String,
60 1
    pub segment_id: SegmentId,
61 1
    pub first_page: u64,
62
}
63

64 1
#[derive(Clone, Default)]
65
pub struct DropSegment {
66 1
    pub name: String,
67 1
    pub segment_id: SegmentId,
68
}
69

70 1
/// Journal entry for a page released by the transaction; ordered by page number.
#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct FreedPage {
    /// The released page.
    pub page: u64,
}
74

75 1
/// Journal marker: the transaction has been prepared.
#[derive(Default)]
pub struct PrepareCommit {}

/// Journal marker: the transaction has been committed.
#[derive(Default)]
pub struct Commit {}

/// Journal marker: the transaction cleanup step.
#[derive(Default)]
pub struct Cleanup {}

/// Journal marker: the transaction has been rolled back.
#[derive(Default)]
pub struct Rollback {}
86

87 1
#[derive(Default)]
88
pub struct Metadata {
89 1
    pub strategy: TxStrategy,
90 1
    pub meta_id: Vec<u8>,
91
}
92

93
pub enum SegmentOperation {
94
    CREATE(CreateSegment),
95
    DROP(DropSegment),
96
}
97

98 1
#[derive(Clone)]
99
pub struct PreparedState {
100 1
    locked_indexes: Option<Vec<IndexId>>,
101 1
    snapshot_id: Option<SnapshotId>,
102 1
    data_locks: Option<(Vec<(SegmentId, RecRef, u16)>, Vec<SegmentId>, Vec<SegmentId>)>,
103
}
104

105
impl PreparedState {
106 1
    fn new() -> PreparedState {
107 1
        PreparedState {
108 1
            locked_indexes: None,
109 1
            snapshot_id: None,
110 1
            data_locks: None,
111
        }
112 1
    }
113
}
114

115
/// How a prepared transaction is synced to disk.
pub enum SyncMode {
    /// Sync before the prepare call returns.
    Sync,
    /// Sync in the background.
    BackgroundSync,
}
119

120
pub struct Transaction {
121
    strategy: TxStrategy,
122
    sync_mode: SyncMode,
123
    meta_id: Vec<u8>,
124
    id: JournalId,
125
    inserted: Vec<InsertRecord>,
126
    updated: Vec<UpdateRecord>,
127
    deleted: Vec<DeleteRecord>,
128
    read: HashMap<RecRef, ReadRecord>,
129
    segments_operations: Vec<SegmentOperation>,
130
    segs_created_names: HashSet<String>,
131
    segs_dropped_names: HashSet<String>,
132
    segs_created: HashSet<SegmentId>,
133
    segs_dropped: HashSet<SegmentId>,
134
    segs_updated: HashSet<SegmentId>,
135
    freed_pages: Option<Vec<FreedPage>>,
136
    indexes: Option<IndexTransactionKeeper>,
137
    segs_new_pages: Vec<NewSegmentPage>,
138
    locked_index_segs: HashSet<SegmentId>,
139
    locked_index_pages: HashSet<RecRef>,
140
    locked_index_tracking: HashSet<(SegmentId, RecRef, u16)>,
141
}
142

143
/// Result of looking up a record in the transaction's own changes.
pub enum TxRead {
    /// The record was written in this tx: `(record_page, version)`.
    RECORD((u64, u16)),
    /// The record was deleted in this tx.
    DELETED,
    /// The transaction did not touch the record.
    NONE,
}
148

149
pub enum TxSegCheck {
150
    CREATED(SegmentId),
151
    DROPPED,
152
    NONE,
153
}
154

155
pub struct TransactionInsertScanner<'a> {
156
    tx: &'a Transaction,
157
    segment: SegmentId,
158
}
159

160
pub struct TransactionInsertIterator {
161
    iter: vec::IntoIter<InsertRecord>,
162
    segment: SegmentId,
163
}
164

165
impl<'a> IntoIterator for TransactionInsertScanner<'a> {
166
    type Item = RecRef;
167
    type IntoIter = TransactionInsertIterator;
168

169 1
    fn into_iter(self) -> Self::IntoIter {
170 1
        let iter: vec::IntoIter<InsertRecord> = self.tx.inserted.clone().into_iter();
171 1
        TransactionInsertIterator {
172 1
            iter,
173
            segment: self.segment,
174
        }
175 1
    }
176
}
177

178
impl Iterator for TransactionInsertIterator {
179
    type Item = RecRef;
180 1
    fn next(&mut self) -> Option<RecRef> {
181 1
        loop {
182 1
            let next = self.iter.next();
183 1
            if let Some(rec) = next {
184 1
                if rec.segment == self.segment {
185 1
                    return Some(rec.recref);
186
                }
187
            } else {
188 1
                return None;
189
            }
190
        }
191 1
    }
192
}
193

194
impl Transaction {
195 1
    pub fn new(journal: &Journal, strategy: &TxStrategy, sync_mode: SyncMode, meta_id: Vec<u8>) -> PRes<Transaction> {
196 1
        let id = journal.start()?;
197 1
        journal.log(&Metadata::new(strategy, meta_id.clone()), &id)?;
198 1
        Ok(Transaction {
199 1
            strategy: strategy.clone(),
200
            sync_mode,
201 1
            meta_id,
202 1
            id,
203 1
            inserted: Vec::new(),
204 1
            updated: Vec::new(),
205 1
            deleted: Vec::new(),
206 1
            read: HashMap::new(),
207 1
            segments_operations: Vec::new(),
208 1
            segs_created_names: HashSet::new(),
209 1
            segs_dropped_names: HashSet::new(),
210 1
            segs_created: HashSet::new(),
211 1
            segs_dropped: HashSet::new(),
212 1
            segs_updated: HashSet::new(),
213 1
            freed_pages: None,
214 1
            indexes: Some(IndexTransactionKeeper::new()),
215 1
            segs_new_pages: Vec::new(),
216 1
            locked_index_segs: HashSet::new(),
217 1
            locked_index_pages: HashSet::new(),
218 1
            locked_index_tracking: HashSet::new(),
219 0
        })
220 1
    }
221

222 1
    pub fn recover(id: JournalId) -> Transaction {
223 1
        Transaction {
224 1
            strategy: TxStrategy::LastWin,
225 1
            sync_mode: SyncMode::Sync,
226 1
            meta_id: Vec::new(),
227
            id,
228 1
            inserted: Vec::new(),
229 1
            updated: Vec::new(),
230 1
            deleted: Vec::new(),
231 1
            read: HashMap::new(),
232 1
            segments_operations: Vec::new(),
233 1
            segs_created_names: HashSet::new(),
234 1
            segs_dropped_names: HashSet::new(),
235 1
            segs_created: HashSet::new(),
236 1
            segs_dropped: HashSet::new(),
237 1
            segs_updated: HashSet::new(),
238 1
            freed_pages: None,
239 1
            indexes: Some(IndexTransactionKeeper::new()),
240 1
            segs_new_pages: Vec::new(),
241 1
            locked_index_segs: HashSet::new(),
242 1
            locked_index_pages: HashSet::new(),
243 1
            locked_index_tracking: HashSet::new(),
244 0
        }
245 1
    }
246

247 1
    /// Returns `true` when `segment` was created by this transaction.
    pub fn segment_created_in_tx(&self, segment: SegmentId) -> bool {
        let created = &self.segs_created;
        created.contains(&segment)
    }
250

251
    pub fn segment_name_by_id(&self, segment: SegmentId) -> Option<String> {
252
        for info in &self.segments_operations {
253
            if let SegmentOperation::CREATE(ref c) = info {
254
                if c.segment_id == segment {
255
                    return Some(c.name.clone());
256
                }
257
            }
258
        }
259
        None
260
    }
261 1
    pub fn exists_segment(&self, segment: &str) -> TxSegCheck {
262 1
        if self.segs_created_names.contains(segment) {
263 1
            for a in &self.segments_operations {
264 1
                if let SegmentOperation::CREATE(ref c) = a {
265 1
                    if c.name == segment {
266 1
                        return TxSegCheck::CREATED(c.segment_id);
267
                    }
268
                }
269
            }
270 1
        } else if self.segs_dropped_names.contains(segment) {
271 1
            return TxSegCheck::DROPPED;
272
        }
273 1
        TxSegCheck::NONE
274 1
    }
275

276 1
    pub fn add_create_segment(
277
        &mut self,
278
        journal: &Journal,
279
        name: &str,
280
        segment_id: SegmentId,
281
        first_page: u64,
282
    ) -> PRes<()> {
283 1
        let create = CreateSegment::new(name, segment_id, first_page);
284

285 1
        journal.log(&create, &self.id)?;
286 1
        self.segments_operations.push(SegmentOperation::CREATE(create));
287 1
        self.segs_created.insert(segment_id);
288 1
        self.segs_created_names.insert(name.into());
289 1
        Ok(())
290 1
    }
291

292 1
    pub fn recover_add(&mut self, create: &CreateSegment) {
293 1
        self.segments_operations.push(SegmentOperation::CREATE(create.clone()));
294 1
        self.segs_created.insert(create.segment_id);
295 1
        self.segs_created_names.insert(create.name.clone());
296 1
    }
297

298 1
    pub fn add_drop_segment(&mut self, journal: &Journal, name: &str, segment_id: SegmentId) -> PRes<()> {
299 1
        if self.segs_created_names.contains(name) {
300 1
            Err(PersyError::CannotDropSegmentCreatedInTx)
301
        } else {
302 1
            let drop = DropSegment::new(name, segment_id);
303 1
            journal.log(&drop, &self.id)?;
304 1
            self.segments_operations.push(SegmentOperation::DROP(drop));
305 1
            self.segs_dropped.insert(segment_id);
306 1
            self.segs_dropped_names.insert(name.into());
307 1
            Ok(())
308 1
        }
309 1
    }
310

311 0
    pub fn recover_drop(&mut self, drop: &DropSegment) {
312 0
        self.segments_operations.push(SegmentOperation::DROP(drop.clone()));
313 0
        self.segs_dropped.insert(drop.segment_id);
314 0
        self.segs_dropped_names.insert(drop.name.clone());
315 0
    }
316

317 1
    pub fn add_read(&mut self, journal: &Journal, segment: SegmentId, recref: &RecRef, version: u16) -> PRes<()> {
318 1
        if self.strategy == TxStrategy::VersionOnRead {
319 1
            let read = ReadRecord::new(segment, recref, version);
320 1
            journal.log(&read, &self.id)?;
321 1
            self.read.insert(recref.clone(), read);
322
        }
323 1
        Ok(())
324 1
    }
325

326 0
    /// Replays a journaled read into the transaction state.
    pub fn recover_read(&mut self, read: &ReadRecord) {
        let key = read.recref.clone();
        self.read.insert(key, read.clone());
    }
329

330 1
    pub fn add_insert(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, record: u64) -> PRes<()> {
331 1
        self.segs_updated.insert(segment);
332 1
        let insert = InsertRecord::new(segment, rec_ref, record);
333

334 1
        journal.log(&insert, &self.id)?;
335 1
        self.inserted.push(insert);
336 1
        Ok(())
337 1
    }
338 1
    pub fn add_new_segment_page(
339
        &mut self,
340
        journal: &Journal,
341
        segment: SegmentId,
342
        new_page: u64,
343
        previous_page: u64,
344
    ) -> PRes<()> {
345 1
        let new_page = NewSegmentPage::new(segment, new_page, previous_page);
346

347 1
        journal.log(&new_page, &self.id)?;
348 1
        self.segs_new_pages.push(new_page);
349 1
        Ok(())
350 1
    }
351

352 1
    /// Replays a journaled insert into the transaction state.
    pub fn recover_insert(&mut self, insert: &InsertRecord) {
        let segment = insert.segment;
        self.segs_updated.insert(segment);
        self.inserted.push(insert.clone());
    }
356

357 1
    pub fn add_update(
358
        &mut self,
359
        journal: &Journal,
360
        segment: SegmentId,
361
        rec_ref: &RecRef,
362
        record: u64,
363
        version: u16,
364
    ) -> PRes<()> {
365 1
        self.segs_updated.insert(segment);
366 1
        let update = UpdateRecord::new(segment, rec_ref, record, version);
367 1
        journal.log(&update, &self.id)?;
368 1
        self.updated.push(update);
369 1
        Ok(())
370 1
    }
371

372 1
    /// Replays a journaled update into the transaction state.
    pub fn recover_update(&mut self, update: &UpdateRecord) {
        let segment = update.segment;
        self.segs_updated.insert(segment);
        self.updated.push(update.clone());
    }
376

377 1
    pub fn add_delete(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, version: u16) -> PRes<()> {
378 1
        self.segs_updated.insert(segment);
379 1
        let delete = DeleteRecord::new(segment, rec_ref, version);
380 1
        journal.log(&delete, &self.id)?;
381 1
        self.deleted.push(delete);
382 1
        Ok(())
383 1
    }
384

385 1
    pub fn add_put<K, V>(&mut self, index: IndexId, k: K, v: V)
386
    where
387
        K: IndexType,
388
        V: IndexType,
389
    {
390 1
        if let Some(ref mut indexes) = self.indexes {
391 1
            indexes.put(index, k, v);
392
        }
393 1
    }
394

395 1
    pub fn add_remove<K, V>(&mut self, index: IndexId, k: K, v: Option<V>)
396
    where
397
        K: IndexType,
398
        V: IndexType,
399
    {
400 1
        if let Some(ref mut indexes) = self.indexes {
401 1
            indexes.remove(index, k, v);
402
        }
403 1
    }
404

405 1
    pub fn apply_changes<K, V>(
406
        &self,
407
        vm: ValueMode,
408
        index: IndexId,
409
        k: &K,
410
        pers: Option<Value<V>>,
411
    ) -> PRes<Option<Value<V>>>
412
    where
413
        K: IndexType,
414
        V: IndexType,
415
    {
416 1
        if let Some(ref indexes) = self.indexes {
417 1
            indexes.apply_changes(index, vm, k, pers)
418
        } else {
419 0
            Ok(pers)
420
        }
421 1
    }
422

423 1
    pub fn index_range<K, V, R>(&self, index: IndexId, range: R) -> Option<IntoIter<K>>
424
    where
425
        K: IndexType,
426
        V: IndexType,
427
        R: RangeBounds<K>,
428
    {
429 1
        if let Some(ind) = &self.indexes {
430 1
            ind.range::<K, V, R>(index, range)
431
        } else {
432 0
            None
433
        }
434 1
    }
435

436 1
    /// Replays a journaled delete into the transaction state.
    pub fn recover_delete(&mut self, delete: &DeleteRecord) {
        let segment = delete.segment;
        self.segs_updated.insert(segment);
        self.deleted.push(delete.clone());
    }
440

441 1
    pub fn scan_insert(&self, seg: SegmentId) -> TransactionInsertScanner {
442 1
        TransactionInsertScanner { tx: self, segment: seg }
443 1
    }
444

445 1
    pub fn read(&self, rec_ref: &RecRef) -> TxRead {
446 1
        for ele in &self.deleted {
447 1
            if ele.recref.page == rec_ref.page && ele.recref.pos == rec_ref.pos {
448 1
                return TxRead::DELETED;
449
            }
450
        }
451 1
        if let Some(ele) = self
452
            .updated
453
            .iter()
454
            .rev()
455 1
            .find(|ele| ele.recref.page == rec_ref.page && ele.recref.pos == rec_ref.pos)
456
        {
457 1
            return TxRead::RECORD((ele.record_page, ele.version));
458
        }
459 1
        for ele in &self.inserted {
460 1
            if ele.recref.page == rec_ref.page && ele.recref.pos == rec_ref.pos {
461 1
                return TxRead::RECORD((ele.record_page, 1));
462
            }
463
        }
464 1
        TxRead::NONE
465 1
    }
466

467 1
    pub fn recover_prepare(&mut self, persy_impl: &PersyImpl) -> PRes<PreparedState> {
468 1
        let address = persy_impl.address();
469

470 1
        let mut prepared = PreparedState::new();
471 1
        let _ = self.collapse_operations();
472 1
        let (records, crt_upd_segs, dropped_segs) = self.coll_locks();
473 1
        if let Err(x) = address.acquire_locks(&records, &crt_upd_segs, &dropped_segs) {
474 0
            self.recover_rollback(persy_impl)?;
475 0
            return Err(x);
476 1
        }
477 1
        prepared.data_locks = Some((records.clone(), crt_upd_segs.clone(), dropped_segs));
478 1
        let check_version = self.strategy != TxStrategy::LastWin;
479 1
        if let Err(x) = address.check_persistent_records(&records, check_version) {
480 0
            self.recover_rollback(persy_impl)?;
481 0
            return Err(x);
482 1
        }
483 1
        if let Err(x) = address.confirm_allocations(&crt_upd_segs, true) {
484 0
            self.recover_rollback(persy_impl)?;
485 0
            return Err(x);
486 1
        }
487 1
        Ok(prepared)
488 1
    }
489

490 1
    fn solve_index_locks(&self) -> Vec<(SegmentId, RecRef, u16)> {
491 1
        let mut records = HashSet::new();
492 1
        for update in &self.updated {
493 1
            if self.locked_index_pages.contains(&update.recref) {
494 1
                records.insert((update.segment, update.recref.clone(), update.version));
495
            }
496
        }
497

498 1
        for delete in &self.deleted {
499 1
            if self.locked_index_pages.contains(&delete.recref) {
500 0
                records.insert((delete.segment, delete.recref.clone(), delete.version));
501
            }
502
        }
503

504 1
        records.into_iter().collect()
505 1
    }
506

507 1
    pub fn prepare(mut self, persy_impl: &PersyImpl) -> PRes<(Transaction, PreparedState)> {
508 1
        let indexes = persy_impl.indexes();
509 1
        let allocator = persy_impl.allocator();
510 1
        let journal = persy_impl.journal();
511 1
        let snapshots = persy_impl.snapshots();
512 1
        let address = persy_impl.address();
513

514 1
        let mut prepared = PreparedState::new();
515 1
        let ind = self.indexes;
516 1
        self.indexes = None;
517 1
        if let Some(mut ind_change) = ind {
518
            // TODO: handle dropped indexes in the same transaction
519 1
            let changed_indexes = ind_change.changed_indexes();
520 1
            for check in changed_indexes {
521 1
                let segment_meta = index_id_to_segment_id_meta(&check);
522 1
                if self.segs_dropped.contains(&segment_meta) {
523 0
                    ind_change.remove_changes(&check);
524
                }
525 1
            }
526

527 1
            let mut to_lock = ind_change.changed_indexes();
528 1
            to_lock.sort();
529 1
            if let Err(err) = indexes.read_lock_all(&to_lock) {
530 0
                prepared.locked_indexes = Some(to_lock);
531 1
                self.rollback_prepared(persy_impl, prepared)?;
532 0
                return Err(err);
533 1
            }
534 1
            for index_id in &to_lock {
535 1
                let segment_meta = index_id_to_segment_id_meta(index_id);
536 1
                address.acquire_segment_read_lock(segment_meta)?;
537 1
                self.locked_index_segs.insert(segment_meta);
538 1
                let segment_data = index_id_to_segment_id_data(index_id);
539 1
                address.acquire_segment_read_lock(segment_data)?;
540 1
                self.locked_index_segs.insert(segment_data);
541
            }
542 1
            prepared.locked_indexes = Some(to_lock);
543 1
            if let Err(x) = ind_change.apply(persy_impl, &mut self) {
544 1
                self.rollback_prepared(persy_impl, prepared)?;
545 1
                return Err(x);
546 1
            }
547 1
        }
548

549 1
        let mut freed_pages = self.collapse_operations();
550

551 1
        let (mut records, crt_upd_segs, dropped_segs) = self.coll_locks();
552 1
        if let Err(x) = address.acquire_locks(&records, &crt_upd_segs, &dropped_segs) {
553 1
            self.rollback_prepared(persy_impl, prepared)?;
554 1
            return Err(x);
555 1
        };
556 1
        records.extend_from_slice(&self.solve_index_locks());
557 1
        prepared.data_locks = Some((records.clone(), crt_upd_segs, dropped_segs));
558

559 1
        let check_version = self.strategy != TxStrategy::LastWin;
560 1
        let old_records = match address.check_persistent_records(&records, check_version) {
561 1
            Ok(old) => old,
562 1
            Err(x) => {
563 1
                self.rollback_prepared(persy_impl, prepared)?;
564 1
                return Err(x);
565 1
            }
566 0
        };
567 1
        let segs: Vec<_> = self.segs_updated.iter().copied().collect();
568 1
        if let Err(x) = address.confirm_allocations(&segs, false) {
569 0
            self.rollback_prepared(persy_impl, prepared)?;
570 0
            return Err(x);
571 1
        }
572

573 1
        for dropped_seg in &self.segs_dropped {
574 1
            let pages = address.collect_segment_pages(*dropped_seg)?;
575 1
            for p in pages.into_iter().map(FreedPage::new) {
576 1
                freed_pages.insert(p);
577 1
            }
578 1
        }
579 1
        let mut snapshot_entries = Vec::with_capacity(old_records.len());
580 1
        for old_record in &old_records {
581 1
            freed_pages.insert(FreedPage::new(old_record.record_page));
582 1
            snapshot_entries.push(SnapshotEntry::change(
583
                &old_record.recref,
584 1
                old_record.record_page,
585
                old_record.version,
586
            ));
587
        }
588 1
        for insert in &self.inserted {
589 1
            snapshot_entries.push(SnapshotEntry::insert(&insert.recref));
590
        }
591 1
        for freed_page in &freed_pages {
592 1
            journal.log(freed_page, &self.id)?;
593
        }
594 1
        let mut freed_pages_vec: Vec<_> = freed_pages.into_iter().collect();
595 1
        freed_pages_vec.reverse();
596 1
        let snapshot_id = snapshots.snapshot(snapshot_entries, freed_pages_vec.clone(), self.id.clone())?;
597 1
        prepared.snapshot_id = Some(snapshot_id);
598 1
        self.freed_pages = Some(freed_pages_vec);
599 1
        journal.prepare(&PrepareCommit::new(), &self.id)?;
600 1
        allocator.flush_free_list()?;
601 1
        self.sync(persy_impl, snapshot_id)?;
602 1
        journal.clear_in_queque()?;
603 1
        Ok((self, prepared))
604 1
    }
605

606 1
    /// Syncs the prepared transaction through `persy_impl` using this transaction's sync mode.
    fn sync(&self, persy_impl: &PersyImpl, snapshot_id: SnapshotId) -> PRes<()> {
        let mode = &self.sync_mode;
        persy_impl.transaction_sync(mode, snapshot_id)
    }
609

610 1
    fn collapse_operations(&mut self) -> BTreeSet<FreedPage> {
611 1
        let mut pages_to_free = BTreeSet::new();
612 1
        let mut inserted_by_id = HashMap::new();
613 1
        for insert in self.inserted.drain(..) {
614 1
            inserted_by_id.insert(insert.recref.clone(), insert);
615 1
        }
616

617 1
        let mut updated_by_id = HashMap::new();
618 1
        for update in self.updated.drain(..) {
619 1
            match updated_by_id.entry(update.recref.clone()) {
620 1
                Entry::Vacant(e) => {
621 1
                    e.insert(update);
622
                }
623 1
                Entry::Occupied(mut e) => {
624 1
                    pages_to_free.insert(FreedPage::new(e.get().record_page));
625 1
                    e.get_mut().record_page = update.record_page;
626
                }
627
            }
628 1
        }
629

630 1
        for (k, insert) in &mut inserted_by_id {
631 1
            if let Some(update) = updated_by_id.remove(&k) {
632 1
                pages_to_free.insert(FreedPage::new(insert.record_page));
633 1
                insert.record_page = update.record_page;
634
            }
635
        }
636

637 1
        let mut i = 0;
638 1
        while i != self.deleted.len() {
639 1
            if let Some(insert) = inserted_by_id.remove(&self.deleted[i].recref) {
640 1
                self.deleted.remove(i);
641 1
                pages_to_free.insert(FreedPage::new(insert.record_page));
642
            } else {
643 1
                i += 1;
644
            }
645
        }
646

647 1
        for delete in &self.deleted {
648 1
            if let Some(update) = updated_by_id.remove(&delete.recref) {
649 1
                pages_to_free.insert(FreedPage::new(update.record_page));
650
            }
651
        }
652

653 1
        for (_, insert) in inserted_by_id.drain() {
654 1
            if self.segs_dropped.contains(&insert.segment) {
655 1
                pages_to_free.insert(FreedPage::new(insert.record_page));
656
            } else {
657 1
                self.inserted.push(insert);
658
            }
659 1
        }
660

661 1
        for (_, update) in updated_by_id.drain() {
662 1
            if self.segs_dropped.contains(&update.segment) {
663 0
                pages_to_free.insert(FreedPage::new(update.record_page));
664
            } else {
665 1
                self.updated.push(update);
666
            }
667 1
        }
668 1
        pages_to_free
669 1
    }
670

671 1
    fn coll_locks(&self) -> (Vec<(SegmentId, RecRef, u16)>, Vec<SegmentId>, Vec<SegmentId>) {
672 1
        let mut crt_upd_segs = Vec::new();
673 1
        for create in &self.segs_created {
674 1
            if !&self.segs_dropped.contains(create) && !self.locked_index_segs.contains(create) {
675 1
                crt_upd_segs.push(*create);
676
            }
677
        }
678 1
        for update in &self.segs_updated {
679 1
            if !&self.segs_dropped.contains(update) && !self.locked_index_segs.contains(update) {
680 1
                crt_upd_segs.push(*update);
681
            }
682
        }
683

684 1
        let mut dropped_segs: Vec<_> = self.segs_dropped.iter().copied().collect();
685 1
        let mut records = HashSet::new();
686

687
        // No need to lock on inserted records the new id unique is managed by the address.
688
        //
689 1
        for update in &self.updated {
690 1
            let mut version = update.version;
691
            // I found values in the read only for VersionOnRead
692 1
            if let Some(read_v) = self.read.get(&update.recref) {
693 1
                version = read_v.version;
694
            }
695 1
            if !self.locked_index_pages.contains(&update.recref) {
696 1
                records.insert((update.segment, update.recref.clone(), version));
697
            }
698
        }
699

700 1
        for delete in &self.deleted {
701 1
            let mut version = delete.version;
702
            // I found values in the read only for VersionOnRead
703 1
            if let Some(read_v) = self.read.get(&delete.recref) {
704 0
                version = read_v.version;
705
            }
706 1
            if !self.locked_index_pages.contains(&delete.recref) {
707 1
                records.insert((delete.segment, delete.recref.clone(), version));
708
            }
709
        }
710

711 1
        for insert in &self.inserted {
712 1
            records.remove(&(insert.segment, insert.recref.clone(), 1));
713
        }
714

715 1
        let mut records: Vec<(SegmentId, RecRef, u16)> = records.into_iter().collect();
716 1
        records.sort_by_key(|ref x| x.1.clone());
717 1
        crt_upd_segs.sort();
718 1
        dropped_segs.sort();
719 1
        (records, crt_upd_segs, dropped_segs)
720 1
    }
721

722 1
    /// Rolls back the record and segment changes shared by every rollback path.
    ///
    /// Steps, in order:
    /// 1. roll back the inserted records in the address;
    /// 2. free the record pages of inserts into segments created in this tx;
    /// 3. drop those temporary segments;
    /// 4. free the record pages of updates into those segments.
    ///
    /// Returns the address entries reported by `address.rollback`, left for the caller
    /// to release.
    fn internal_rollback(&self, persy_impl: &PersyImpl) -> PRes<Vec<(SegmentId, u64)>> {
        let allocator = persy_impl.allocator();
        let address = persy_impl.address();

        // Membership is checked directly on the `segs_created` HashSet. Previously it was
        // copied into a Vec (misleadingly named `dropped_segs`) and scanned linearly per record.
        let created = &self.segs_created;
        let address_to_free = address.rollback(&self.inserted)?;
        for insert in self.inserted.iter().filter(|i| created.contains(&i.segment)) {
            allocator.free(insert.record_page)?;
        }

        for create in created {
            address.drop_temp_segment(*create)?;
        }

        for update in self.updated.iter().filter(|u| created.contains(&u.segment)) {
            allocator.free(update.record_page)?;
        }
        Ok(address_to_free)
    }
745

746 1
    pub fn recover_rollback(&self, persy_impl: &PersyImpl) -> PRes<()> {
747 1
        let allocator = persy_impl.allocator();
748 1
        let journal = persy_impl.journal();
749 1
        let address = persy_impl.address();
750

751 1
        journal.end(&Rollback::new(), &self.id)?;
752 1
        let address_to_free = self.internal_rollback(persy_impl)?;
753 1
        journal.finished_to_clean(&[self.id.clone()])?;
754 1
        if !address_to_free.is_empty() {
755 0
            address.clear_empty(&address_to_free)?;
756 0
            for (_, page) in address_to_free {
757 0
                allocator.free(page)?;
758 0
            }
759
        }
760 1
        Ok(())
761 1
    }
762

763 1
    pub fn rollback(&mut self, persy_impl: &PersyImpl) -> PRes<()> {
764 1
        let journal = persy_impl.journal();
765 1
        journal.end(&Rollback::new(), &self.id)?;
766 1
        let address_to_free = self.internal_rollback(persy_impl)?;
767 1
        journal.finished_to_clean(&[self.id.clone()])?;
768 1
        self.free_address_structures(address_to_free, persy_impl)?;
769 1
        Ok(())
770 1
    }
771

772 1
    pub fn rollback_prepared(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PRes<()> {
773 1
        let indexes = persy_impl.indexes();
774 1
        let allocator = persy_impl.allocator();
775 1
        let journal = persy_impl.journal();
776 1
        let snapshots = persy_impl.snapshots();
777 1
        let address = persy_impl.address();
778

779 1
        journal.end(&Rollback::new(), &self.id)?;
780

781 1
        let address_to_free = address.rollback(&self.inserted)?;
782 1
        self.internal_rollback(persy_impl)?;
783

784 1
        let mut indexes_to_unlock = self.locked_index_pages.clone();
785 1
        if let Some((records, crt_upd_segs, delete_segs)) = &prepared.data_locks {
786 1
            for rec in records {
787 1
                indexes_to_unlock.remove(&rec.1);
788
            }
789 1
            address.release_locks(records.iter().map(|(_, id, _)| id), crt_upd_segs, delete_segs)?;
790
        }
791

792 1
        address.release_locks(
793 1
            indexes_to_unlock.iter(),
794 1
            &self.locked_index_segs.iter().copied().collect::<Vec<_>>(),
795
            &[],
796 1
        )?;
797

798 1
        if let Some(il) = &prepared.locked_indexes {
799 1
            indexes.read_unlock_all(il)?;
800
        }
801 1
        self.free_address_structures(address_to_free, persy_impl)?;
802 1
        if let Some(snapshot_id) = prepared.snapshot_id {
803 1
            release_snapshot(snapshot_id, snapshots, allocator, journal)?;
804
        }
805 1
        Ok(())
806 1
    }
807

808 1
    pub fn recover_commit(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PRes<()> {
809 1
        let allocator = persy_impl.allocator();
810 1
        let journal = persy_impl.journal();
811 1
        let address = persy_impl.address();
812

813 1
        let address_to_free = self.internal_commit(persy_impl, true, &prepared)?;
814 1
        journal.end(&Commit::new(), &self.id)?;
815 1
        if !address_to_free.is_empty() {
816 0
            address.clear_empty(&address_to_free)?;
817 0
            for (_, page) in address_to_free {
818 0
                allocator.recover_free(page)?;
819 0
            }
820
        }
821 1
        self.recover_cleanup(persy_impl)
822 1
    }
823

824 1
    /// Frees, via `recover_free`, the pages recorded in `freed_pages` (if any), then syncs
    /// the allocator's disc.
    pub fn recover_cleanup(&self, persy_impl: &PersyImpl) -> PRes<()> {
        let allocator = persy_impl.allocator();
        for to_free in self.freed_pages.iter().flatten() {
            allocator.recover_free(to_free.page)?;
        }
        persy_impl.allocator().disc().sync()?;
        Ok(())
    }
834

835 1
    fn free_pages_tx(
836
        &self,
837
        journal: &Journal,
838
        pages_to_free: &[(SegmentId, u64)],
839
    ) -> PRes<(JournalId, Vec<FreedPage>)> {
840 1
        let id = journal.start()?;
841 1
        let mut freed = Vec::new();
842 1
        for (_, page) in pages_to_free {
843 1
            let fp = FreedPage::new(*page);
844 1
            journal.log(&fp, &id)?;
845 1
            freed.push(fp);
846
        }
847 1
        journal.log(&PrepareCommit::new(), &id)?;
848 1
        journal.end(&Commit::new(), &id)?;
849 1
        Ok((id, freed))
850 1
    }
851

852 1
    fn internal_commit(
853
        &mut self,
854
        persy_impl: &PersyImpl,
855
        recover: bool,
856
        prepared: &PreparedState,
857
    ) -> PRes<Vec<(SegmentId, u64)>> {
858 1
        let indexes = persy_impl.indexes();
859 1
        let address = persy_impl.address();
860

861 1
        let pages_to_unlink = address.apply(
862 1
            &self.segs_new_pages,
863 1
            &self.inserted,
864 1
            &self.updated,
865 1
            &self.deleted,
866 1
            &self.segments_operations,
867
            recover,
868 1
        )?;
869 1
        let mut indexes_to_unlock = self.locked_index_pages.clone();
870 1
        if let Some((records, crt_upd_segs, deleted_segs)) = &prepared.data_locks {
871 1
            for rec in records {
872 1
                indexes_to_unlock.remove(&rec.1);
873
            }
874 1
            address.release_locks(records.iter().map(|(_, id, _)| id), &crt_upd_segs, &deleted_segs)?;
875
        }
876

877 1
        address.release_locks(
878 1
            indexes_to_unlock.iter(),
879 1
            &self.locked_index_segs.iter().copied().collect::<Vec<_>>(),
880
            &[],
881 1
        )?;
882

883 1
        if let Some(il) = &prepared.locked_indexes {
884 1
            indexes.read_unlock_all(il)?;
885
        }
886

887 1
        Ok(pages_to_unlink)
888 1
    }
889

890 1
    fn free_address_structures(&self, address_to_free: Vec<(SegmentId, u64)>, persy_impl: &PersyImpl) -> PRes<()> {
891 1
        let allocator = persy_impl.allocator();
892 1
        let journal = persy_impl.journal();
893 1
        let snapshots = persy_impl.snapshots();
894 1
        let address = persy_impl.address();
895 1
        if !address_to_free.is_empty() {
896 1
            let (tx_id, freed) = self.free_pages_tx(journal, &address_to_free)?;
897 1
            address.clear_empty(&address_to_free)?;
898 1
            let add_snap_id = snapshots.snapshot(Vec::new(), freed, tx_id)?;
899 1
            self.sync(persy_impl, add_snap_id)?;
900
            // Is a only free page snapshot not a logic snapshot, release straight away, the
901
            // purpose of this is to delay the free of pages till are not in use anymore
902 1
            release_snapshot(add_snap_id, snapshots, allocator, journal)?;
903 1
        }
904 1
        Ok(())
905 1
    }
906

907 1
    pub fn commit(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PRes<()> {
908 1
        let allocator = persy_impl.allocator();
909 1
        let journal = persy_impl.journal();
910 1
        let snapshots = persy_impl.snapshots();
911

912 1
        let address_to_free = self.internal_commit(persy_impl, false, &prepared)?;
913 1
        journal.end(&Commit::new(), &self.id)?;
914 1
        self.free_address_structures(address_to_free, persy_impl)?;
915

916 1
        if let Some(snapshot_id) = prepared.snapshot_id {
917 1
            release_snapshot(snapshot_id, snapshots, allocator, journal)?;
918
        }
919 1
        Ok(())
920 1
    }
921

922 1
    /// Restores the transaction strategy and metadata id from a recovered `Metadata` entry,
    /// overwriting the current values.
    pub fn recover_metadata(&mut self, metadata: &Metadata) {
        // `clone_from` lets the existing `meta_id` buffer be reused where possible.
        self.meta_id.clone_from(&metadata.meta_id);
        self.strategy.clone_from(&metadata.strategy);
    }
926

927 1
    /// Remembers a `FreedPage` read back during recovery. The list is created lazily;
    /// `recover_cleanup` later returns these pages to the allocator.
    pub fn recover_freed_page(&mut self, freed: &FreedPage) {
        let pages = self.freed_pages.get_or_insert_with(Vec::new);
        pages.push(freed.clone());
    }
930

931 0
    /// Appends a `NewSegmentPage` read back during recovery to `segs_new_pages`, which
    /// `internal_commit` later passes to `Address::apply`.
    pub fn recover_new_segment_page(&mut self, new_page: &NewSegmentPage) {
        let owned = new_page.clone();
        self.segs_new_pages.push(owned);
    }
934

935 1
    pub fn meta_id<'a>(&'a self) -> &'a Vec<u8> {
936
        &self.meta_id
937 1
    }
938

939 1
    pub fn filter_list<'a>(
940
        &'a self,
941
        pers: &'a [(String, SegmentId)],
942
    ) -> impl Iterator<Item = (&'a str, SegmentId)> + 'a {
943 1
        let outer = pers.iter().map(|(name, id)| (name.as_str(), *id));
944 1
        let inner = self.segments_operations.iter().filter_map(|seg| {
945 1
            if let SegmentOperation::CREATE(c) = seg {
946 1
                Some((c.name.as_str(), c.segment_id))
947
            } else {
948 1
                None
949
            }
950 1
        });
951

952 1
        outer.chain(inner).filter(move |x| !self.segs_dropped.contains(&x.1))
953 1
    }
954

955 1
    /// Locks record `id` for this transaction and checks it against `version` in `segment`.
    ///
    /// The record lock is acquired only if `id` is not already in `locked_index_pages`.
    /// What happens next depends on whether the lock was acquired by this call and on the
    /// result of `check_persistent_records`:
    /// - acquired here, check passes: the record is remembered in `locked_index_pages`
    ///   and `locked_index_tracking`;
    /// - acquired here, check fails: the lock is released again;
    /// - already held: no lock bookkeeping is changed.
    ///
    /// Returns `Ok(true)` when the check passed and `Ok(false)` otherwise. An error from
    /// `check_persistent_records` counts as a failed check and is not propagated; errors
    /// from acquiring or releasing the lock are.
    pub fn lock_record(&mut self, address: &Address, segment: SegmentId, id: &RecRef, version: u16) -> PRes<bool> {
        let acquired_here = !self.locked_index_pages.contains(id);
        if acquired_here {
            address.acquire_record_lock(id)?;
        }

        let valid = address
            .check_persistent_records(&[(segment, id.clone(), version)], true)
            .is_ok();

        if acquired_here {
            if valid {
                self.locked_index_pages.insert(id.clone());
                self.locked_index_tracking.insert((segment, id.clone(), version));
            } else {
                address.release_record_lock(id)?;
            }
        }
        Ok(valid)
    }
979

980 1
    /// Releases a record lock previously taken through `lock_record`.
    ///
    /// # Panics
    /// Panics if `id` is not in `locked_index_pages`.
    ///
    /// NOTE(review): `_segment` is unused, and the matching `locked_index_tracking` entry
    /// inserted by `lock_record` is not removed here. Confirm that this is intended.
    pub fn unlock_record(&mut self, address: &Address, _segment: SegmentId, id: &RecRef) -> PRes<()> {
        // The removal must stay inside the assert: it is both the bookkeeping update and the check.
        assert!(self.locked_index_pages.remove(id));
        address.release_record_lock(id)?;
        Ok(())
    }
985
}
986

987
impl DeleteRecord {
988 1
    pub fn new(segment: SegmentId, rec_ref: &RecRef, version: u16) -> DeleteRecord {
989 1
        DeleteRecord {
990
            segment,
991 1
            recref: rec_ref.clone(),
992
            version,
993
        }
994 1
    }
995
}
996

997
impl UpdateRecord {
998 1
    pub fn new(segment: SegmentId, rec_ref: &RecRef, record: u64, version: u16) -> UpdateRecord {
999 1
        UpdateRecord {
1000
            segment,
1001 1
            recref: rec_ref.clone(),
1002
            record_page: record,
1003
            version,
1004
        }
1005 1
    }
1006
}
1007

1008
impl ReadRecord {
1009 1
    pub fn new(segment: SegmentId, recref: &RecRef, version: u16) -> ReadRecord {
1010 1
        ReadRecord {
1011
            segment,
1012 1
            recref: recref.clone(),
1013
            version,
1014
        }
1015 1
    }
1016
}
1017

1018
impl PrepareCommit {
    /// Creates the field-less marker that is logged to the journal after a transaction's
    /// entries and before it is ended with `Commit`.
    pub fn new() -> PrepareCommit {
        Self {}
    }
}
1023

1024
impl Commit {
    /// Creates the field-less marker passed to `Journal::end` to close a committed
    /// transaction.
    pub fn new() -> Commit {
        Self {}
    }
}
1029

1030
impl Rollback {
    /// Creates an empty `Rollback` value. It is presumably the journal marker for an
    /// aborted transaction; confirm against its users.
    pub fn new() -> Rollback {
        Self {}
    }
}
1035

1036
impl Cleanup {
    /// Creates an empty `Cleanup` value. It is presumably a journal marker for the
    /// post-commit cleanup phase; confirm against its users.
    pub fn new() -> Cleanup {
        Self {}
    }
}
1041

1042
impl InsertRecord {
1043 1
    pub fn new(segment: SegmentId, rec_ref: &RecRef, record: u64) -> InsertRecord {
1044 1
        InsertRecord {
1045
            segment,
1046 1
            recref: rec_ref.clone(),
1047
            record_page: record,
1048
        }
1049 1
    }
1050
}
1051

1052
impl CreateSegment {
1053 1
    pub fn new(name: &str, segment_id: SegmentId, first_page: u64) -> CreateSegment {
1054 1
        CreateSegment {
1055 1
            name: name.into(),
1056
            segment_id,
1057
            first_page,
1058
        }
1059 1
    }
1060
}
1061

1062
impl DropSegment {
1063 1
    pub fn new(name: &str, segment_id: SegmentId) -> DropSegment {
1064 1
        DropSegment {
1065 1
            name: name.into(),
1066
            segment_id,
1067
        }
1068 1
    }
1069
}
1070

1071
impl Metadata {
1072 1
    pub fn new(strategy: &TxStrategy, meta_id: Vec<u8>) -> Metadata {
1073 1
        Metadata {
1074 1
            strategy: strategy.clone(),
1075 1
            meta_id,
1076
        }
1077 1
    }
1078
}
1079

1080
impl FreedPage {
    /// Wraps `page` as a `FreedPage` entry; `free_pages_tx` logs these to the journal.
    pub fn new(page: u64) -> FreedPage {
        Self { page }
    }
}
1085

1086
impl NewSegmentPage {
1087 1
    pub fn new(segment: SegmentId, page: u64, previous: u64) -> NewSegmentPage {
1088 1
        NewSegmentPage {
1089
            segment,
1090
            page,
1091
            previous,
1092
        }
1093 1
    }
1094
}
1095

1096
#[cfg(test)]
mod tests {
    use super::{DeleteRecord, FreedPage, InsertRecord, Transaction, UpdateRecord};
    use crate::{
        id::{RecRef, SegmentId},
        journal::JournalId,
    };

    /// `scan_insert` must yield only the inserts that belong to the scanned segment.
    #[test]
    fn test_scan_insert() {
        let scanned = SegmentId::new(10);
        let other = SegmentId::new(20);
        let mut tx = Transaction::recover(JournalId::new(0, 0));

        // (segment, RecRef page, RecRef pos, record_page)
        let inserts = [(scanned, 3, 2, 2), (scanned, 4, 2, 2), (other, 0, 1, 3)];
        for &(segment, page, pos, record) in inserts.iter() {
            tx.inserted.push(InsertRecord::new(segment, &RecRef::new(page, pos), record));
        }

        let mut found = 0;
        for rec in tx.scan_insert(scanned) {
            assert_eq!(rec.pos, 2);
            found += 1;
        }
        assert_eq!(found, 2);
    }

    /// After `collapse_operations`, two inserts, two updates and two deletes remain, and
    /// exactly the pages 1, 2, 3, 5, 6, 8 and 10 are reported as freed.
    #[test]
    fn test_collapse() {
        let segment = SegmentId::new(10);
        let other = SegmentId::new(20);
        let mut tx = Transaction::recover(JournalId::new(0, 0));

        // (segment, RecRef pos, record_page); every RecRef is on page 3.
        let inserts = [(segment, 1, 1), (segment, 2, 2), (other, 3, 3), (other, 4, 4)];
        for &(seg, pos, record) in inserts.iter() {
            tx.inserted.push(InsertRecord::new(seg, &RecRef::new(3, pos), record));
        }

        // (RecRef pos, record_page); every update is at version 1 in `segment`.
        let updates = [(1, 5), (1, 6), (2, 7), (5, 8), (5, 9), (6, 10), (7, 11)];
        for &(pos, record) in updates.iter() {
            tx.updated.push(UpdateRecord::new(segment, &RecRef::new(3, pos), record, 1));
        }

        // (RecRef pos, version); every delete is in `segment`.
        let deletes = [(1, 0), (3, 1), (6, 2), (8, 2)];
        for &(pos, version) in deletes.iter() {
            tx.deleted.push(DeleteRecord::new(segment, &RecRef::new(3, pos), version));
        }

        let freed = tx.collapse_operations();
        assert_eq!(freed.len(), 7);
        for page in [1, 2, 3, 5, 6, 8, 10].iter() {
            assert!(freed.contains(&FreedPage::new(*page)));
        }
        assert_eq!(tx.inserted.len(), 2);
        assert_eq!(tx.updated.len(), 2);
        assert_eq!(tx.deleted.len(), 2);
    }
}

Read our documentation on viewing source code.

Loading