1
use crate::index::{
2
    config::{IndexType, ValueMode},
3
    keeper::IndexTransactionKeeper,
4
    nodes::Value,
5
};
6
use crate::{
7
    address::Address,
8
    config::TxStrategy,
9
    error::{PRes, PersyError},
10
    id::{index_id_to_segment_id_data, index_id_to_segment_id_meta, IndexId, RecRef},
11
    journal::{Journal, JournalId},
12
    persy::PersyImpl,
13
    snapshot::{release_snapshot, SnapshotEntry, SnapshotId},
14
};
15
use std::{
16
    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
17
    ops::RangeBounds,
18
    vec::{self, IntoIter},
19
};
20

21 0
#[derive(Clone, Default)]
22
pub struct NewSegmentPage {
23 0
    pub segment: u32,
24 0
    pub page: u64,
25 0
    pub previous: u64,
26
}
27

28 1
#[derive(Clone, Default)]
29
pub struct InsertRecord {
30 1
    pub segment: u32,
31 1
    pub recref: RecRef,
32 1
    pub record_page: u64,
33
}
34

35 1
#[derive(Clone, Default)]
36
pub struct UpdateRecord {
37 1
    pub segment: u32,
38 1
    pub recref: RecRef,
39 1
    pub record_page: u64,
40 1
    pub version: u16,
41
}
42

43 0
#[derive(Clone, Default)]
44
pub struct ReadRecord {
45 0
    pub segment: u32,
46 0
    pub recref: RecRef,
47 0
    pub version: u16,
48
}
49

50 1
#[derive(Clone, Default)]
51
pub struct DeleteRecord {
52 1
    pub segment: u32,
53 1
    pub recref: RecRef,
54 1
    pub version: u16,
55
}
56

57 1
#[derive(Clone, Default)]
58
pub struct CreateSegment {
59 1
    pub name: String,
60 1
    pub segment_id: u32,
61 1
    pub first_page: u64,
62
}
63

64 0
#[derive(Clone, Default)]
65
pub struct DropSegment {
66 0
    pub name: String,
67 0
    pub segment_id: u32,
68
}
69

70 1
#[derive(Clone, PartialEq, Debug, PartialOrd, Ord, Eq, Default)]
71
pub struct FreedPage {
72 1
    pub page: u64,
73
}
74

75 1
#[derive(Default)]
76
pub struct PrepareCommit {}
77

78 1
#[derive(Default)]
79
pub struct Commit {}
80

81 1
#[derive(Default)]
82
pub struct Cleanup {}
83

84 1
#[derive(Default)]
85
pub struct Rollback {}
86

87 1
#[derive(Default)]
88
pub struct Metadata {
89 1
    pub strategy: TxStrategy,
90 1
    pub meta_id: Vec<u8>,
91
}
92

93
pub enum SegmentOperation {
94
    CREATE(CreateSegment),
95
    DROP(DropSegment),
96
}
97

98 1
#[derive(Clone)]
99
pub struct PreparedState {
100 1
    locked_indexes: Option<Vec<IndexId>>,
101 1
    snapshot_id: Option<SnapshotId>,
102 1
    data_locks: Option<(Vec<(u32, RecRef, u16)>, Vec<u32>, Vec<u32>)>,
103
}
104

105
impl PreparedState {
106 1
    fn new() -> PreparedState {
107 1
        PreparedState {
108 1
            locked_indexes: None,
109 1
            snapshot_id: None,
110 1
            data_locks: None,
111
        }
112 1
    }
113
}
114

115
pub enum SyncMode {
116
    Sync,
117
    BackgroundSync,
118
}
119

120
pub struct Transaction {
121
    strategy: TxStrategy,
122
    sync_mode: SyncMode,
123
    meta_id: Vec<u8>,
124
    id: JournalId,
125
    inserted: Vec<InsertRecord>,
126
    updated: Vec<UpdateRecord>,
127
    deleted: Vec<DeleteRecord>,
128
    read: HashMap<RecRef, ReadRecord>,
129
    segments_operations: Vec<SegmentOperation>,
130
    segs_created_names: HashSet<String>,
131
    segs_dropped_names: HashSet<String>,
132
    segs_created: HashSet<u32>,
133
    segs_dropped: HashSet<u32>,
134
    segs_updated: HashSet<u32>,
135
    freed_pages: Option<Vec<FreedPage>>,
136
    indexes: Option<IndexTransactionKeeper>,
137
    segs_new_pages: Vec<NewSegmentPage>,
138
    locked_index_segs: HashSet<u32>,
139
    locked_index_pages: HashSet<RecRef>,
140
    locked_index_tracking: HashSet<(u32, RecRef, u16)>,
141
}
142

143
pub enum TxRead {
144
    RECORD((u64, u16)),
145
    DELETED,
146
    NONE,
147
}
148

149
pub enum TxSegCheck {
150
    CREATED(u32),
151
    DROPPED,
152
    NONE,
153
}
154

155
pub struct TransactionInsertScanner<'a> {
156
    tx: &'a Transaction,
157
    segment: u32,
158
}
159

160
pub struct TransactionInsertIterator {
161
    iter: vec::IntoIter<InsertRecord>,
162
    segment: u32,
163
}
164

165
impl<'a> IntoIterator for TransactionInsertScanner<'a> {
166
    type Item = RecRef;
167
    type IntoIter = TransactionInsertIterator;
168

169 1
    fn into_iter(self) -> Self::IntoIter {
170 1
        let iter: vec::IntoIter<InsertRecord> = self.tx.inserted.clone().into_iter();
171 1
        TransactionInsertIterator {
172 1
            iter,
173
            segment: self.segment,
174
        }
175 1
    }
176
}
177

178
impl Iterator for TransactionInsertIterator {
179
    type Item = RecRef;
180 1
    fn next(&mut self) -> Option<RecRef> {
181 1
        loop {
182 1
            let next = self.iter.next();
183 1
            if let Some(rec) = next {
184 1
                if rec.segment == self.segment {
185 1
                    return Some(rec.recref);
186
                }
187
            } else {
188 1
                return None;
189
            }
190
        }
191 1
    }
192
}
193

194
impl Transaction {
195 1
    pub fn new(journal: &Journal, strategy: &TxStrategy, sync_mode: SyncMode, meta_id: Vec<u8>) -> PRes<Transaction> {
196 1
        let id = journal.start()?;
197 1
        journal.log(&Metadata::new(strategy, meta_id.clone()), &id)?;
198 1
        Ok(Transaction {
199 1
            strategy: strategy.clone(),
200
            sync_mode,
201 1
            meta_id,
202 1
            id,
203 1
            inserted: Vec::new(),
204 1
            updated: Vec::new(),
205 1
            deleted: Vec::new(),
206 1
            read: HashMap::new(),
207 1
            segments_operations: Vec::new(),
208 1
            segs_created_names: HashSet::new(),
209 1
            segs_dropped_names: HashSet::new(),
210 1
            segs_created: HashSet::new(),
211 1
            segs_dropped: HashSet::new(),
212 1
            segs_updated: HashSet::new(),
213 1
            freed_pages: None,
214 1
            indexes: Some(IndexTransactionKeeper::new()),
215 1
            segs_new_pages: Vec::new(),
216 1
            locked_index_segs: HashSet::new(),
217 1
            locked_index_pages: HashSet::new(),
218 1
            locked_index_tracking: HashSet::new(),
219 0
        })
220 1
    }
221

222 1
    pub fn recover(id: JournalId) -> Transaction {
223 1
        Transaction {
224
            strategy: TxStrategy::LastWin,
225
            sync_mode: SyncMode::Sync,
226 1
            meta_id: Vec::new(),
227
            id,
228 1
            inserted: Vec::new(),
229 1
            updated: Vec::new(),
230 1
            deleted: Vec::new(),
231 1
            read: HashMap::new(),
232 1
            segments_operations: Vec::new(),
233 1
            segs_created_names: HashSet::new(),
234 1
            segs_dropped_names: HashSet::new(),
235 1
            segs_created: HashSet::new(),
236 1
            segs_dropped: HashSet::new(),
237 1
            segs_updated: HashSet::new(),
238 1
            freed_pages: None,
239 1
            indexes: Some(IndexTransactionKeeper::new()),
240 1
            segs_new_pages: Vec::new(),
241 1
            locked_index_segs: HashSet::new(),
242 1
            locked_index_pages: HashSet::new(),
243 1
            locked_index_tracking: HashSet::new(),
244 0
        }
245 1
    }
246

247 1
    pub fn segment_created_in_tx(&self, segment: u32) -> bool {
248 1
        self.segs_created.contains(&segment)
249 1
    }
250

251
    pub fn segment_name_by_id(&self, segment: u32) -> Option<String> {
252
        for info in &self.segments_operations {
253
            if let SegmentOperation::CREATE(ref c) = info {
254
                if c.segment_id == segment {
255
                    return Some(c.name.clone());
256
                }
257
            }
258
        }
259
        None
260
    }
261 1
    pub fn exists_segment(&self, segment: &str) -> TxSegCheck {
262 1
        if self.segs_created_names.contains(segment) {
263 1
            for a in &self.segments_operations {
264 1
                if let SegmentOperation::CREATE(ref c) = a {
265 1
                    if c.name == segment {
266 1
                        return TxSegCheck::CREATED(c.segment_id);
267
                    }
268
                }
269
            }
270 1
        } else if self.segs_dropped_names.contains(segment) {
271 1
            return TxSegCheck::DROPPED;
272
        }
273 1
        TxSegCheck::NONE
274 1
    }
275

276 1
    pub fn add_create_segment(&mut self, journal: &Journal, name: &str, segment_id: u32, first_page: u64) -> PRes<()> {
277 1
        let create = CreateSegment::new(name, segment_id, first_page);
278

279 1
        journal.log(&create, &self.id)?;
280 1
        self.segments_operations.push(SegmentOperation::CREATE(create));
281 1
        self.segs_created.insert(segment_id);
282 1
        self.segs_created_names.insert(name.into());
283 1
        Ok(())
284 1
    }
285

286 1
    pub fn recover_add(&mut self, create: &CreateSegment) {
287 1
        self.segments_operations.push(SegmentOperation::CREATE(create.clone()));
288 1
        self.segs_created.insert(create.segment_id);
289 1
        self.segs_created_names.insert(create.name.clone());
290 1
    }
291

292 1
    pub fn add_drop_segment(&mut self, journal: &Journal, name: &str, segment_id: u32) -> PRes<()> {
293 1
        if self.segs_created_names.contains(name) {
294 1
            Err(PersyError::CannotDropSegmentCreatedInTx)
295
        } else {
296 1
            let drop = DropSegment::new(name, segment_id);
297 1
            journal.log(&drop, &self.id)?;
298 1
            self.segments_operations.push(SegmentOperation::DROP(drop));
299 1
            self.segs_dropped.insert(segment_id);
300 1
            self.segs_dropped_names.insert(name.into());
301 1
            Ok(())
302 1
        }
303 1
    }
304

305 0
    pub fn recover_drop(&mut self, drop: &DropSegment) {
306 0
        self.segments_operations.push(SegmentOperation::DROP(drop.clone()));
307 0
        self.segs_dropped.insert(drop.segment_id);
308 0
        self.segs_dropped_names.insert(drop.name.clone());
309 0
    }
310

311 1
    pub fn add_read(&mut self, journal: &Journal, segment: u32, recref: &RecRef, version: u16) -> PRes<()> {
312 1
        if self.strategy == TxStrategy::VersionOnRead {
313 1
            let read = ReadRecord::new(segment, recref, version);
314 1
            journal.log(&read, &self.id)?;
315 1
            self.read.insert(recref.clone(), read);
316
        }
317 1
        Ok(())
318 1
    }
319

320 0
    pub fn recover_read(&mut self, read: &ReadRecord) {
321 0
        self.read.insert(read.recref.clone(), read.clone());
322 0
    }
323

324 1
    pub fn add_insert(&mut self, journal: &Journal, segment: u32, rec_ref: &RecRef, record: u64) -> PRes<()> {
325 1
        self.segs_updated.insert(segment);
326 1
        let insert = InsertRecord::new(segment, rec_ref, record);
327

328 1
        journal.log(&insert, &self.id)?;
329 1
        self.inserted.push(insert);
330 1
        Ok(())
331 1
    }
332 1
    pub fn add_new_segment_page(
333
        &mut self,
334
        journal: &Journal,
335
        segment: u32,
336
        new_page: u64,
337
        previous_page: u64,
338
    ) -> PRes<()> {
339 1
        let new_page = NewSegmentPage::new(segment, new_page, previous_page);
340

341 1
        journal.log(&new_page, &self.id)?;
342 1
        self.segs_new_pages.push(new_page);
343 1
        Ok(())
344 1
    }
345

346 1
    pub fn recover_insert(&mut self, insert: &InsertRecord) {
347 1
        self.segs_updated.insert(insert.segment);
348 1
        self.inserted.push(insert.clone());
349 1
    }
350

351 1
    pub fn add_update(
352
        &mut self,
353
        journal: &Journal,
354
        segment: u32,
355
        rec_ref: &RecRef,
356
        record: u64,
357
        version: u16,
358
    ) -> PRes<()> {
359 1
        self.segs_updated.insert(segment);
360 1
        let update = UpdateRecord::new(segment, rec_ref, record, version);
361 1
        journal.log(&update, &self.id)?;
362 1
        self.updated.push(update);
363 1
        Ok(())
364 1
    }
365

366 1
    pub fn recover_update(&mut self, update: &UpdateRecord) {
367 1
        self.segs_updated.insert(update.segment);
368 1
        self.updated.push(update.clone());
369 1
    }
370

371 1
    pub fn add_delete(&mut self, journal: &Journal, segment: u32, rec_ref: &RecRef, version: u16) -> PRes<()> {
372 1
        self.segs_updated.insert(segment);
373 1
        let delete = DeleteRecord::new(segment, rec_ref, version);
374 1
        journal.log(&delete, &self.id)?;
375 1
        self.deleted.push(delete);
376 1
        Ok(())
377 1
    }
378

379 1
    pub fn add_put<K, V>(&mut self, index: IndexId, k: K, v: V)
380
    where
381
        K: IndexType,
382
        V: IndexType,
383
    {
384 1
        if let Some(ref mut indexes) = self.indexes {
385 1
            indexes.put(index, k, v);
386
        }
387 1
    }
388

389 1
    pub fn add_remove<K, V>(&mut self, index: IndexId, k: K, v: Option<V>)
390
    where
391
        K: IndexType,
392
        V: IndexType,
393
    {
394 1
        if let Some(ref mut indexes) = self.indexes {
395 1
            indexes.remove(index, k, v);
396
        }
397 1
    }
398

399 1
    pub fn apply_changes<K, V>(
400
        &self,
401
        vm: ValueMode,
402
        index: IndexId,
403
        k: &K,
404
        pers: Option<Value<V>>,
405
    ) -> PRes<Option<Value<V>>>
406
    where
407
        K: IndexType,
408
        V: IndexType,
409
    {
410 1
        if let Some(ref indexes) = self.indexes {
411 1
            indexes.apply_changes(index, vm, k, pers)
412
        } else {
413 0
            Ok(pers)
414
        }
415 1
    }
416

417 1
    pub fn index_range<K, V, R>(&self, index: IndexId, range: R) -> Option<IntoIter<K>>
418
    where
419
        K: IndexType,
420
        V: IndexType,
421
        R: RangeBounds<K>,
422
    {
423 1
        if let Some(ind) = &self.indexes {
424 1
            ind.range::<K, V, R>(index, range)
425
        } else {
426 0
            None
427
        }
428 1
    }
429

430 1
    pub fn recover_delete(&mut self, delete: &DeleteRecord) {
431 1
        self.segs_updated.insert(delete.segment);
432 1
        self.deleted.push(delete.clone());
433 1
    }
434

435 1
    pub fn scan_insert(&self, seg: u32) -> TransactionInsertScanner {
436 1
        TransactionInsertScanner { tx: self, segment: seg }
437 1
    }
438

439 1
    pub fn read(&self, rec_ref: &RecRef) -> TxRead {
440 1
        for ele in &self.deleted {
441 1
            if ele.recref.page == rec_ref.page && ele.recref.pos == rec_ref.pos {
442 1
                return TxRead::DELETED;
443
            }
444
        }
445 1
        if let Some(ele) = self
446
            .updated
447
            .iter()
448
            .rev()
449 1
            .find(|ele| ele.recref.page == rec_ref.page && ele.recref.pos == rec_ref.pos)
450
        {
451 1
            return TxRead::RECORD((ele.record_page, ele.version));
452
        }
453 1
        for ele in &self.inserted {
454 1
            if ele.recref.page == rec_ref.page && ele.recref.pos == rec_ref.pos {
455 1
                return TxRead::RECORD((ele.record_page, 1));
456
            }
457
        }
458 1
        TxRead::NONE
459 1
    }
460

461 1
    pub fn recover_prepare(&mut self, persy_impl: &PersyImpl) -> PRes<PreparedState> {
462 1
        let address = persy_impl.address();
463

464 1
        let mut prepared = PreparedState::new();
465 1
        let _ = self.collapse_operations();
466 1
        let (records, crt_upd_segs, dropped_segs) = self.coll_locks();
467 1
        if let Err(x) = address.acquire_locks(&records, &crt_upd_segs, &dropped_segs) {
468 0
            self.recover_rollback(persy_impl)?;
469 0
            return Err(x);
470 1
        }
471 1
        prepared.data_locks = Some((records.clone(), crt_upd_segs.clone(), dropped_segs));
472 1
        let check_version = self.strategy != TxStrategy::LastWin;
473 1
        if let Err(x) = address.check_persistent_records(&records, check_version) {
474 0
            self.recover_rollback(persy_impl)?;
475 0
            return Err(x);
476 1
        }
477 1
        if let Err(x) = address.confirm_allocations(&crt_upd_segs, true) {
478 0
            self.recover_rollback(persy_impl)?;
479 0
            return Err(x);
480 1
        }
481 1
        Ok(prepared)
482 1
    }
483

484 1
    fn solve_index_locks(&self) -> Vec<(u32, RecRef, u16)> {
485 1
        let mut records = HashSet::new();
486 1
        for update in &self.updated {
487 1
            if self.locked_index_pages.contains(&update.recref) {
488 1
                records.insert((update.segment, update.recref.clone(), update.version));
489
            }
490
        }
491

492 1
        for delete in &self.deleted {
493 1
            if self.locked_index_pages.contains(&delete.recref) {
494 0
                records.insert((delete.segment, delete.recref.clone(), delete.version));
495
            }
496
        }
497

498 1
        records.into_iter().collect()
499 1
    }
500

501 1
    pub fn prepare(mut self, persy_impl: &PersyImpl) -> PRes<(Transaction, PreparedState)> {
502 1
        let indexes = persy_impl.indexes();
503 1
        let allocator = persy_impl.allocator();
504 1
        let journal = persy_impl.journal();
505 1
        let snapshots = persy_impl.snapshots();
506 1
        let address = persy_impl.address();
507

508 1
        let mut prepared = PreparedState::new();
509 1
        let ind = self.indexes;
510 1
        self.indexes = None;
511 1
        if let Some(mut ind_change) = ind {
512
            // TODO: handle dropped indexes in the same transaction
513 1
            let changed_indexes = ind_change.changed_indexes();
514 1
            for check in changed_indexes {
515 1
                let segment_meta = index_id_to_segment_id_meta(&check);
516 1
                if self.segs_dropped.contains(&segment_meta.id) {
517 0
                    ind_change.remove_changes(&check);
518
                }
519 1
            }
520

521 1
            let mut to_lock = ind_change.changed_indexes();
522 1
            to_lock.sort();
523 1
            if let Err(err) = indexes.read_lock_all(&to_lock) {
524 0
                prepared.locked_indexes = Some(to_lock);
525 1
                self.rollback_prepared(persy_impl, prepared)?;
526 0
                return Err(err);
527 1
            }
528 1
            for index_id in &to_lock {
529 1
                let segment_meta = index_id_to_segment_id_meta(index_id);
530 1
                address.acquire_segment_read_lock(segment_meta.id)?;
531 1
                self.locked_index_segs.insert(segment_meta.id);
532 1
                let segment_data = index_id_to_segment_id_data(index_id);
533 1
                address.acquire_segment_read_lock(segment_data.id)?;
534 1
                self.locked_index_segs.insert(segment_data.id);
535
            }
536 1
            prepared.locked_indexes = Some(to_lock);
537 1
            if let Err(x) = ind_change.apply(persy_impl, &mut self) {
538 1
                self.rollback_prepared(persy_impl, prepared)?;
539 1
                return Err(x);
540 1
            }
541 1
        }
542

543 1
        let mut freed_pages = self.collapse_operations();
544

545 1
        let (mut records, crt_upd_segs, dropped_segs) = self.coll_locks();
546 1
        if let Err(x) = address.acquire_locks(&records, &crt_upd_segs, &dropped_segs) {
547 1
            self.rollback_prepared(persy_impl, prepared)?;
548 1
            return Err(x);
549 1
        };
550 1
        records.extend_from_slice(&self.solve_index_locks());
551 1
        prepared.data_locks = Some((records.clone(), crt_upd_segs, dropped_segs));
552

553 1
        let check_version = self.strategy != TxStrategy::LastWin;
554 1
        let old_records = match address.check_persistent_records(&records, check_version) {
555 1
            Ok(old) => old,
556 1
            Err(x) => {
557 1
                self.rollback_prepared(persy_impl, prepared)?;
558 1
                return Err(x);
559 1
            }
560 0
        };
561 1
        let segs: Vec<_> = self.segs_updated.iter().copied().collect();
562 1
        if let Err(x) = address.confirm_allocations(&segs, false) {
563 0
            self.rollback_prepared(persy_impl, prepared)?;
564 0
            return Err(x);
565 1
        }
566

567 1
        for dropped_seg in &self.segs_dropped {
568 1
            let pages = address.collect_segment_pages(*dropped_seg)?;
569 1
            for p in pages.into_iter().map(FreedPage::new) {
570 1
                freed_pages.insert(p);
571 1
            }
572 1
        }
573 1
        let mut snapshot_entries = Vec::with_capacity(old_records.len());
574 1
        for old_record in &old_records {
575 1
            freed_pages.insert(FreedPage::new(old_record.record_page));
576 1
            snapshot_entries.push(SnapshotEntry::change(
577
                &old_record.recref,
578 1
                old_record.record_page,
579
                old_record.version,
580
            ));
581
        }
582 1
        for insert in &self.inserted {
583 1
            snapshot_entries.push(SnapshotEntry::insert(&insert.recref));
584
        }
585 1
        for freed_page in &freed_pages {
586 1
            journal.log(freed_page, &self.id)?;
587
        }
588 1
        let mut freed_pages_vec: Vec<_> = freed_pages.into_iter().collect();
589 1
        freed_pages_vec.reverse();
590 1
        let snapshot_id = snapshots.snapshot(snapshot_entries, freed_pages_vec.clone(), self.id.clone())?;
591 1
        prepared.snapshot_id = Some(snapshot_id);
592 1
        self.freed_pages = Some(freed_pages_vec);
593 1
        journal.prepare(&PrepareCommit::new(), &self.id)?;
594 1
        allocator.flush_free_list()?;
595 1
        self.sync(persy_impl, snapshot_id)?;
596 1
        journal.clear_in_queque()?;
597 1
        Ok((self, prepared))
598 1
    }
599

600 1
    fn sync(&self, persy_impl: &PersyImpl, snapshot_id: SnapshotId) -> PRes<()> {
601 1
        persy_impl.transaction_sync(&self.sync_mode, snapshot_id)
602 1
    }
603

604 1
    fn collapse_operations(&mut self) -> BTreeSet<FreedPage> {
605 1
        let mut pages_to_free = BTreeSet::new();
606 1
        let mut inserted_by_id = HashMap::new();
607 1
        for insert in self.inserted.drain(..) {
608 1
            inserted_by_id.insert(insert.recref.clone(), insert);
609 1
        }
610

611 1
        let mut updated_by_id = HashMap::new();
612 1
        for update in self.updated.drain(..) {
613 1
            match updated_by_id.entry(update.recref.clone()) {
614 1
                Entry::Vacant(e) => {
615 1
                    e.insert(update);
616
                }
617 1
                Entry::Occupied(mut e) => {
618 1
                    pages_to_free.insert(FreedPage::new(e.get().record_page));
619 1
                    e.get_mut().record_page = update.record_page;
620
                }
621
            }
622 1
        }
623

624 1
        for (k, insert) in &mut inserted_by_id {
625 1
            if let Some(update) = updated_by_id.remove(&k) {
626 1
                pages_to_free.insert(FreedPage::new(insert.record_page));
627 1
                insert.record_page = update.record_page;
628
            }
629
        }
630

631 1
        let mut i = 0;
632 1
        while i != self.deleted.len() {
633 1
            if let Some(insert) = inserted_by_id.remove(&self.deleted[i].recref) {
634 1
                self.deleted.remove(i);
635 1
                pages_to_free.insert(FreedPage::new(insert.record_page));
636
            } else {
637 1
                i += 1;
638
            }
639
        }
640

641 1
        for delete in &self.deleted {
642 1
            if let Some(update) = updated_by_id.remove(&delete.recref) {
643 1
                pages_to_free.insert(FreedPage::new(update.record_page));
644
            }
645
        }
646

647 1
        for (_, insert) in inserted_by_id.drain() {
648 1
            if self.segs_dropped.contains(&insert.segment) {
649 1
                pages_to_free.insert(FreedPage::new(insert.record_page));
650
            } else {
651 1
                self.inserted.push(insert);
652
            }
653 1
        }
654

655 1
        for (_, update) in updated_by_id.drain() {
656 1
            if self.segs_dropped.contains(&update.segment) {
657 0
                pages_to_free.insert(FreedPage::new(update.record_page));
658
            } else {
659 1
                self.updated.push(update);
660
            }
661 1
        }
662 1
        pages_to_free
663 1
    }
664

665 1
    fn coll_locks(&self) -> (Vec<(u32, RecRef, u16)>, Vec<u32>, Vec<u32>) {
666 1
        let mut crt_upd_segs = Vec::new();
667 1
        for create in &self.segs_created {
668 1
            if !&self.segs_dropped.contains(create) && !self.locked_index_segs.contains(create) {
669 1
                crt_upd_segs.push(*create);
670
            }
671
        }
672 1
        for update in &self.segs_updated {
673 1
            if !&self.segs_dropped.contains(update) && !self.locked_index_segs.contains(update) {
674 1
                crt_upd_segs.push(*update);
675
            }
676
        }
677

678 1
        let mut dropped_segs: Vec<_> = self.segs_dropped.iter().copied().collect();
679 1
        let mut records = HashSet::new();
680

681
        // No need to lock on inserted records the new id unique is managed by the address.
682
        //
683 1
        for update in &self.updated {
684 1
            let mut version = update.version;
685
            // I found values in the read only for VersionOnRead
686 1
            if let Some(read_v) = self.read.get(&update.recref) {
687 1
                version = read_v.version;
688
            }
689 1
            if !self.locked_index_pages.contains(&update.recref) {
690 1
                records.insert((update.segment, update.recref.clone(), version));
691
            }
692
        }
693

694 1
        for delete in &self.deleted {
695 1
            let mut version = delete.version;
696
            // I found values in the read only for VersionOnRead
697 1
            if let Some(read_v) = self.read.get(&delete.recref) {
698 0
                version = read_v.version;
699
            }
700 1
            if !self.locked_index_pages.contains(&delete.recref) {
701 1
                records.insert((delete.segment, delete.recref.clone(), version));
702
            }
703
        }
704

705 1
        for insert in &self.inserted {
706 1
            records.remove(&(insert.segment, insert.recref.clone(), 1));
707
        }
708

709 1
        let mut records: Vec<(u32, RecRef, u16)> = records.into_iter().collect();
710 1
        records.sort_by_key(|ref x| x.1.clone());
711 1
        crt_upd_segs.sort();
712 1
        dropped_segs.sort();
713 1
        (records, crt_upd_segs, dropped_segs)
714 1
    }
715

716 1
    fn internal_rollback(&self, persy_impl: &PersyImpl) -> PRes<Vec<(u32, u64)>> {
717 1
        let allocator = persy_impl.allocator();
718 1
        let address = persy_impl.address();
719

720 1
        let dropped_segs: Vec<_> = self.segs_created.iter().copied().collect();
721 1
        let address_to_free = address.rollback(&self.inserted)?;
722 1
        for insert in &self.inserted {
723 1
            if dropped_segs.contains(&insert.segment) {
724 1
                allocator.free(insert.record_page)?;
725
            }
726
        }
727

728 1
        for create in &self.segs_created {
729 1
            address.drop_temp_segment(*create)?;
730
        }
731

732 1
        for update in &self.updated {
733 1
            if dropped_segs.contains(&update.segment) {
734 0
                allocator.free(update.record_page)?;
735
            }
736
        }
737 1
        Ok(address_to_free)
738 1
    }
739

740 1
    pub fn recover_rollback(&self, persy_impl: &PersyImpl) -> PRes<()> {
741 1
        let allocator = persy_impl.allocator();
742 1
        let journal = persy_impl.journal();
743 1
        let address = persy_impl.address();
744

745 1
        journal.end(&Rollback::new(), &self.id)?;
746 1
        let address_to_free = self.internal_rollback(persy_impl)?;
747 1
        journal.finished_to_clean(&[self.id.clone()])?;
748 1
        if !address_to_free.is_empty() {
749 0
            address.clear_empty(&address_to_free)?;
750 0
            for (_, page) in address_to_free {
751 0
                allocator.free(page)?;
752 0
            }
753
        }
754 1
        Ok(())
755 1
    }
756

757 1
    pub fn rollback(&mut self, persy_impl: &PersyImpl) -> PRes<()> {
758 1
        let journal = persy_impl.journal();
759 1
        journal.end(&Rollback::new(), &self.id)?;
760 1
        let address_to_free = self.internal_rollback(persy_impl)?;
761 1
        journal.finished_to_clean(&[self.id.clone()])?;
762 1
        self.free_address_structures(address_to_free, persy_impl)?;
763 1
        Ok(())
764 1
    }
765

766 1
    pub fn rollback_prepared(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PRes<()> {
767 1
        let indexes = persy_impl.indexes();
768 1
        let allocator = persy_impl.allocator();
769 1
        let journal = persy_impl.journal();
770 1
        let snapshots = persy_impl.snapshots();
771 1
        let address = persy_impl.address();
772

773 1
        journal.end(&Rollback::new(), &self.id)?;
774

775 1
        let address_to_free = address.rollback(&self.inserted)?;
776 1
        self.internal_rollback(persy_impl)?;
777

778 1
        let mut indexes_to_unlock = self.locked_index_pages.clone();
779 1
        if let Some((records, crt_upd_segs, delete_segs)) = &prepared.data_locks {
780 1
            for rec in records {
781 1
                indexes_to_unlock.remove(&rec.1);
782
            }
783 1
            address.release_locks(records.iter().map(|(_, id, _)| id), crt_upd_segs, delete_segs)?;
784
        }
785

786 1
        address.release_locks(
787 1
            indexes_to_unlock.iter(),
788 1
            &self.locked_index_segs.iter().copied().collect::<Vec<_>>(),
789 1
            &[],
790 1
        )?;
791

792 1
        if let Some(il) = &prepared.locked_indexes {
793 1
            indexes.read_unlock_all(il)?;
794
        }
795 1
        self.free_address_structures(address_to_free, persy_impl)?;
796 1
        if let Some(snapshot_id) = prepared.snapshot_id {
797 1
            release_snapshot(snapshot_id, snapshots, allocator, journal)?;
798
        }
799 1
        Ok(())
800 1
    }
801

802 1
    pub fn recover_commit(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PRes<()> {
803 1
        let allocator = persy_impl.allocator();
804 1
        let journal = persy_impl.journal();
805 1
        let address = persy_impl.address();
806

807 1
        let address_to_free = self.internal_commit(persy_impl, true, &prepared)?;
808 1
        journal.end(&Commit::new(), &self.id)?;
809 1
        if !address_to_free.is_empty() {
810 0
            address.clear_empty(&address_to_free)?;
811 0
            for (_, page) in address_to_free {
812 0
                allocator.recover_free(page)?;
813 0
            }
814
        }
815 1
        self.recover_cleanup(persy_impl)
816 1
    }
817

818 1
    pub fn recover_cleanup(&self, persy_impl: &PersyImpl) -> PRes<()> {
819 1
        let allocator = persy_impl.allocator();
820 1
        if let Some(ref up_free) = self.freed_pages {
821 1
            for to_free in up_free {
822 1
                allocator.recover_free(to_free.page)?;
823
            }
824
        }
825 1
        persy_impl.allocator().disc().sync()?;
826 1
        Ok(())
827 1
    }
828

829 1
    fn free_pages_tx(&self, journal: &Journal, pages_to_free: &[(u32, u64)]) -> PRes<(JournalId, Vec<FreedPage>)> {
830 1
        let id = journal.start()?;
831 1
        let mut freed = Vec::new();
832 1
        for (_, page) in pages_to_free {
833 1
            let fp = FreedPage::new(*page);
834 1
            journal.log(&fp, &id)?;
835 1
            freed.push(fp);
836
        }
837 1
        journal.log(&PrepareCommit::new(), &id)?;
838 1
        journal.end(&Commit::new(), &id)?;
839 1
        Ok((id, freed))
840 1
    }
841

842 1
    fn internal_commit(
843
        &mut self,
844
        persy_impl: &PersyImpl,
845
        recover: bool,
846
        prepared: &PreparedState,
847
    ) -> PRes<Vec<(u32, u64)>> {
848 1
        let indexes = persy_impl.indexes();
849 1
        let address = persy_impl.address();
850

851 1
        let pages_to_unlink = address.apply(
852 1
            &self.segs_new_pages,
853 1
            &self.inserted,
854 1
            &self.updated,
855 1
            &self.deleted,
856 1
            &self.segments_operations,
857
            recover,
858 1
        )?;
859 1
        let mut indexes_to_unlock = self.locked_index_pages.clone();
860 1
        if let Some((records, crt_upd_segs, deleted_segs)) = &prepared.data_locks {
861 1
            for rec in records {
862 1
                indexes_to_unlock.remove(&rec.1);
863
            }
864 1
            address.release_locks(records.iter().map(|(_, id, _)| id), &crt_upd_segs, &deleted_segs)?;
865
        }
866

867 1
        address.release_locks(
868 1
            indexes_to_unlock.iter(),
869 1
            &self.locked_index_segs.iter().copied().collect::<Vec<_>>(),
870 1
            &[],
871 1
        )?;
872

873 1
        if let Some(il) = &prepared.locked_indexes {
874 1
            indexes.read_unlock_all(il)?;
875
        }
876

877 1
        Ok(pages_to_unlink)
878 1
    }
879

880 1
    fn free_address_structures(&self, address_to_free: Vec<(u32, u64)>, persy_impl: &PersyImpl) -> PRes<()> {
881 1
        let allocator = persy_impl.allocator();
882 1
        let journal = persy_impl.journal();
883 1
        let snapshots = persy_impl.snapshots();
884 1
        let address = persy_impl.address();
885 1
        if !address_to_free.is_empty() {
886 1
            let (tx_id, freed) = self.free_pages_tx(journal, &address_to_free)?;
887 1
            address.clear_empty(&address_to_free)?;
888 1
            let add_snap_id = snapshots.snapshot(Vec::new(), freed, tx_id)?;
889 1
            self.sync(persy_impl, add_snap_id)?;
890
            // Is a only free page snapshot not a logic snapshot, release straight away, the
891
            // purpose of this is to delay the free of pages till are not in use anymore
892 1
            release_snapshot(add_snap_id, snapshots, allocator, journal)?;
893 1
        }
894 1
        Ok(())
895 1
    }
896

897 1
    pub fn commit(&mut self, persy_impl: &PersyImpl, prepared: PreparedState) -> PRes<()> {
898 1
        let allocator = persy_impl.allocator();
899 1
        let journal = persy_impl.journal();
900 1
        let snapshots = persy_impl.snapshots();
901

902 1
        let address_to_free = self.internal_commit(persy_impl, false, &prepared)?;
903 1
        journal.end(&Commit::new(), &self.id)?;
904 1
        self.free_address_structures(address_to_free, persy_impl)?;
905

906 1
        if let Some(snapshot_id) = prepared.snapshot_id {
907 1
            release_snapshot(snapshot_id, snapshots, allocator, journal)?;
908
        }
909 1
        Ok(())
910 1
    }
911

912 1
    pub fn recover_metadata(&mut self, metadata: &Metadata) {
913 1
        self.strategy = metadata.strategy.clone();
914 1
        self.meta_id = metadata.meta_id.clone();
915 1
    }
916

917 1
    pub fn recover_freed_page(&mut self, freed: &FreedPage) {
918 1
        self.freed_pages.get_or_insert(Vec::new()).push(freed.clone());
919 1
    }
920

921 0
    pub fn recover_new_segment_page(&mut self, new_page: &NewSegmentPage) {
922 0
        self.segs_new_pages.push(new_page.clone());
923 0
    }
924

925 1
    pub fn meta_id<'a>(&'a self) -> &'a Vec<u8> {
926
        &self.meta_id
927 1
    }
928

929 1
    pub fn filter_list<'a>(&'a self, pers: &'a [(String, u32)]) -> impl Iterator<Item = (&'a str, u32)> + 'a {
930 1
        let outer = pers.iter().map(|(name, id)| (name.as_str(), *id));
931 1
        let inner = self.segments_operations.iter().filter_map(|seg| {
932 1
            if let SegmentOperation::CREATE(c) = seg {
933 1
                Some((c.name.as_str(), c.segment_id))
934
            } else {
935 1
                None
936
            }
937 1
        });
938

939 1
        outer.chain(inner).filter(move |x| !self.segs_dropped.contains(&x.1))
940 1
    }
941

942 1
    pub fn lock_record(&mut self, address: &Address, segment: u32, id: &RecRef, version: u16) -> PRes<bool> {
943 1
        let locked_page = if !self.locked_index_pages.contains(id) {
944 1
            address.acquire_record_lock(id)?;
945 1
            true
946
        } else {
947 0
            false
948
        };
949

950 1
        if address
951 1
            .check_persistent_records(&[(segment, id.clone(), version)], true)
952 1
            .is_ok()
953
        {
954 1
            if locked_page {
955 1
                self.locked_index_pages.insert(id.clone());
956 1
                self.locked_index_tracking.insert((segment, id.clone(), version));
957
            }
958 1
            Ok(true)
959
        } else {
960 1
            if locked_page {
961 1
                address.release_record_lock(id)?;
962
            }
963 1
            Ok(false)
964
        }
965 1
    }
966

967 1
    pub fn unlock_record(&mut self, address: &Address, _segment: u32, id: &RecRef) -> PRes<()> {
968 1
        assert!(self.locked_index_pages.remove(id));
969 1
        address.release_record_lock(id)?;
970 1
        Ok(())
971 1
    }
972
}
973

974
impl DeleteRecord {
975 1
    pub fn new(segment: u32, rec_ref: &RecRef, version: u16) -> DeleteRecord {
976 1
        DeleteRecord {
977
            segment,
978 1
            recref: rec_ref.clone(),
979
            version,
980
        }
981 1
    }
982
}
983

984
impl UpdateRecord {
985 1
    pub fn new(segment: u32, rec_ref: &RecRef, record: u64, version: u16) -> UpdateRecord {
986 1
        UpdateRecord {
987
            segment,
988 1
            recref: rec_ref.clone(),
989
            record_page: record,
990
            version,
991
        }
992 1
    }
993
}
994

995
impl ReadRecord {
996 1
    pub fn new(segment: u32, recref: &RecRef, version: u16) -> ReadRecord {
997 1
        ReadRecord {
998
            segment,
999 1
            recref: recref.clone(),
1000
            version,
1001
        }
1002 1
    }
1003
}
1004

1005
impl PrepareCommit {
1006 0
    pub fn new() -> PrepareCommit {
1007
        PrepareCommit {}
1008 1
    }
1009
}
1010

1011
impl Commit {
1012 0
    pub fn new() -> Commit {
1013
        Commit {}
1014 1
    }
1015
}
1016

1017
impl Rollback {
1018 0
    pub fn new() -> Rollback {
1019
        Rollback {}
1020 1
    }
1021
}
1022

1023
impl Cleanup {
1024 0
    pub fn new() -> Cleanup {
1025
        Cleanup {}
1026 1
    }
1027
}
1028

1029
impl InsertRecord {
1030 1
    pub fn new(segment: u32, rec_ref: &RecRef, record: u64) -> InsertRecord {
1031 1
        InsertRecord {
1032
            segment,
1033 1
            recref: rec_ref.clone(),
1034
            record_page: record,
1035
        }
1036 1
    }
1037
}
1038

1039
impl CreateSegment {
1040 1
    pub fn new(name: &str, segment_id: u32, first_page: u64) -> CreateSegment {
1041 1
        CreateSegment {
1042 1
            name: name.into(),
1043
            segment_id,
1044
            first_page,
1045
        }
1046 1
    }
1047
}
1048

1049
impl DropSegment {
1050 1
    pub fn new(name: &str, segment_id: u32) -> DropSegment {
1051 1
        DropSegment {
1052 1
            name: name.into(),
1053
            segment_id,
1054
        }
1055 1
    }
1056
}
1057

1058
impl Metadata {
1059 1
    pub fn new(strategy: &TxStrategy, meta_id: Vec<u8>) -> Metadata {
1060 1
        Metadata {
1061 1
            strategy: strategy.clone(),
1062 1
            meta_id,
1063
        }
1064 1
    }
1065
}
1066

1067
impl FreedPage {
1068 1
    pub fn new(page: u64) -> FreedPage {
1069 1
        FreedPage { page }
1070 1
    }
1071
}
1072

1073
impl NewSegmentPage {
1074 1
    pub fn new(segment: u32, page: u64, previous: u64) -> NewSegmentPage {
1075 1
        NewSegmentPage {
1076
            segment,
1077
            page,
1078
            previous,
1079
        }
1080 1
    }
1081
}
1082

1083
#[cfg(test)]
1084
mod tests {
1085
    use super::{DeleteRecord, FreedPage, InsertRecord, Transaction, UpdateRecord};
1086
    use crate::{id::RecRef, journal::JournalId};
1087

1088
    #[test]
1089 1
    fn test_scan_insert() {
1090 1
        let mut tx = Transaction::recover(JournalId::new(0, 0));
1091 1
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(3, 2), 2));
1092 1
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(4, 2), 2));
1093 1
        tx.inserted.push(InsertRecord::new(20, &RecRef::new(0, 1), 3));
1094 1
        let mut count = 0;
1095 1
        for x in tx.scan_insert(10) {
1096 1
            assert_eq!(x.pos, 2);
1097 1
            count += 1;
1098 1
        }
1099 1
        assert_eq!(count, 2);
1100 1
    }
1101

1102
    #[test]
1103 1
    fn test_collapse() {
1104 1
        let mut tx = Transaction::recover(JournalId::new(0, 0));
1105 1
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(3, 1), 1));
1106 1
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(3, 2), 2));
1107 1
        tx.inserted.push(InsertRecord::new(20, &RecRef::new(3, 3), 3));
1108 1
        tx.inserted.push(InsertRecord::new(20, &RecRef::new(3, 4), 4));
1109 1
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 1), 5, 1));
1110 1
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 1), 6, 1));
1111 1
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 2), 7, 1));
1112 1
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 5), 8, 1));
1113 1
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 5), 9, 1));
1114 1
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 6), 10, 1));
1115 1
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 7), 11, 1));
1116 1
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 1), 0));
1117 1
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 3), 1));
1118 1
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 6), 2));
1119 1
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 8), 2));
1120 1
        let free = tx.collapse_operations();
1121 1
        assert_eq!(free.len(), 7);
1122 1
        for e in [1, 2, 3, 5, 6, 8, 10].iter().map(|x| FreedPage::new(*x)) {
1123 1
            assert!(free.contains(&e));
1124
        }
1125 1
        assert_eq!(tx.inserted.len(), 2);
1126 1
        assert_eq!(tx.updated.len(), 2);
1127 1
        assert_eq!(tx.deleted.len(), 2);
1128 1
    }
1129
}

Read our documentation on viewing source code .

Loading