1
use fs2::FileExt;
2
pub use std::fs::OpenOptions;
3
use std::{
4
    collections::HashMap,
5
    fs::File,
6
    mem::replace,
7
    ops::{Bound, RangeBounds},
8
    path::Path,
9
    str,
10
    sync::Arc,
11
};
12

13
pub use crate::id::RecRef;
14
use crate::index::{
15
    config::{IndexType, IndexTypeId, Indexes, ValueMode, INDEX_DATA_PREFIX, INDEX_META_PREFIX},
16
    keeper::{IndexKeeper, IndexRawIter},
17
    tree::{
18
        nodes::{PageIter, PageIterBack, Value},
19
        Index,
20
    },
21
};
22
use crate::transaction::{
23
    PreparedState, SyncMode, Transaction, TxRead,
24
    TxSegCheck::{CREATED, DROPPED, NONE},
25
};
26
use crate::{
27
    address::Address,
28
    allocator::Allocator,
29
    config::{Config, TransactionConfig},
30
    discref::{Device, DiscRef, MemRef, PageOps, PAGE_METADATA_SIZE},
31
    error::{PRes, PersyError},
32
    id::{IndexId, PersyId, SegmentId, ToIndexId, ToSegmentId},
33
    io::{
34
        InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite, InfallibleWriteFormat,
35
        InfallibleWriteVarInt,
36
    },
37
    journal::{Journal, JournalId, JOURNAL_PAGE_EXP},
38
    record_scanner::{SegmentRawIter, SegmentSnapshotRawIter, TxSegmentRawIter},
39
    snapshot::{release_snapshot, EntryCase, SegmentSnapshop, SnapshotId, Snapshots},
40
};
41

42
#[cfg(feature = "background_ops")]
43
use crate::background::BackgroundOps;
44

45
const DEFAULT_PAGE_EXP: u8 = 10; // 2^10
46

47
pub struct PersyImpl {
48
    config: Arc<Config>,
49
    journal: Arc<Journal>,
50
    address: Address,
51
    indexes: Indexes,
52
    allocator: Arc<Allocator>,
53
    snapshots: Arc<Snapshots>,
54
    #[cfg(feature = "background_ops")]
55
    background_ops: BackgroundOps<SnapshotId>,
56
}
57

58
pub struct TxFinalize {
59
    transaction: Transaction,
60
    prepared: PreparedState,
61
}
62

63
/// Possible state of a transaction in the log
64 1
#[derive(PartialEq, Debug, Clone)]
65
pub enum RecoverStatus {
66
    /// Started but not completed
67
    Started,
68
    /// Successfully prepared
69
    PrepareCommit,
70
    /// rollback-ed
71
    Rollback,
72
    /// Successfully committed after prepared
73
    Commit,
74
    /// Successfully cleaned up resources after commit
75
    Cleanup,
76
}
77

78
/// Possible option for recover a transaction in prepared commit state
79 0
#[derive(PartialEq, Debug)]
80
pub enum CommitStatus {
81
    Rollback,
82
    Commit,
83
}
84
pub struct RecoverImpl {
85
    tx_id: HashMap<Vec<u8>, JournalId>,
86
    transactions: HashMap<JournalId, (RecoverStatus, Transaction, Option<CommitStatus>)>,
87
    order: Vec<JournalId>,
88
    journal_pages: Vec<u64>,
89
}
90

91
impl RecoverImpl {
92 1
    pub fn apply<C>(&mut self, recover: C) -> PRes<()>
93
    where
94
        C: Fn(&Vec<u8>) -> bool,
95
    {
96 1
        for (id, status) in self.list_transactions()? {
97 1
            if status == RecoverStatus::PrepareCommit {
98 0
                if recover(&id) {
99 0
                    self.commit(id)?;
100
                } else {
101 0
                    self.rollback(id)?;
102
                }
103
            }
104 1
        }
105 1
        Ok(())
106 1
    }
107

108 1
    pub fn list_transactions(&self) -> PRes<Vec<(Vec<u8>, RecoverStatus)>> {
109 1
        let mut res = Vec::new();
110 1
        for id in &self.order {
111 1
            if let Some((id, status)) = self
112
                .transactions
113
                .get(id)
114 1
                .map(|(s, tx, _)| (tx.meta_id().clone(), s.clone()))
115
            {
116 1
                res.push((id, status));
117
            }
118 1
        }
119 1
        Ok(res)
120 1
    }
121

122
    pub fn status(&self, tx_id: Vec<u8>) -> PRes<Option<RecoverStatus>> {
123
        if let Some(id) = self.tx_id.get(&tx_id) {
124
            Ok(self.transactions.get(id).map(|(s, _, _)| s.clone()))
125
        } else {
126
            Ok(None)
127
        }
128
    }
129

130 0
    pub fn commit(&mut self, tx_id: Vec<u8>) -> PRes<()> {
131 0
        if let Some(id) = self.tx_id.get(&tx_id) {
132 0
            if let Some(tx) = self.transactions.get_mut(id) {
133 0
                tx.2 = Some(CommitStatus::Commit);
134
            }
135
        }
136 0
        Ok(())
137 0
    }
138

139 0
    pub fn rollback(&mut self, tx_id: Vec<u8>) -> PRes<()> {
140 0
        if let Some(id) = self.tx_id.get(&tx_id) {
141 0
            if let Some(tx) = self.transactions.get_mut(id) {
142 0
                tx.2 = Some(CommitStatus::Rollback);
143
            }
144
        }
145 0
        Ok(())
146 0
    }
147
}
148

149
#[derive(Clone)]
150
/// Index definition details
151
pub struct IndexInfo {
152
    pub id: IndexId,
153
    pub value_mode: ValueMode,
154
    pub key_type: IndexTypeId,
155
    pub value_type: IndexTypeId,
156
}
157

158
#[cfg(feature = "background_ops")]
159 1
fn create_background_ops(
160
    journal: Arc<Journal>,
161
    allocator: Arc<Allocator>,
162
    snapshots: Arc<Snapshots>,
163
) -> PRes<BackgroundOps<SnapshotId>> {
164 1
    let all_sync = allocator.clone();
165 1
    BackgroundOps::new(
166 1
        move || all_sync.disc().sync(),
167 1
        move |all: &[SnapshotId]| {
168 1
            for snap in all {
169 1
                release_snapshot(*snap, &snapshots, &allocator, &journal)?;
170
            }
171 1
            Ok(())
172 1
        },
173
    )
174 0
}
175

176
impl PersyImpl {
177 1
    pub fn create(path: &Path) -> PRes<()> {
178 1
        let f = OpenOptions::new().write(true).read(true).create_new(true).open(path)?;
179 1
        PersyImpl::create_from_file(f)
180 1
    }
181

182 1
    pub fn create_from_file(f: File) -> PRes<()> {
183 1
        f.try_lock_exclusive()?;
184 1
        PersyImpl::init_file(f)?;
185 1
        Ok(())
186 1
    }
187

188 1
    fn init_file(fl: File) -> PRes<()> {
189 1
        PersyImpl::init(Box::new(DiscRef::new(fl)?))?;
190 1
        Ok(())
191 1
    }
192

193 1
    fn init(device: Box<dyn Device>) -> PRes<Box<dyn Device>> {
194
        // root_page is every time 0
195 1
        let root_page = device.create_page_raw(DEFAULT_PAGE_EXP)?;
196 1
        let (allocator_page, allocator) = Allocator::init(device, &Config::new())?;
197 1
        let address_page = Address::init(&allocator)?;
198 1
        let journal_page = Journal::init(&allocator)?;
199
        {
200 1
            let mut root = allocator.disc().load_page_raw(root_page, DEFAULT_PAGE_EXP)?;
201
            // Version of the disc format
202 1
            root.write_u16(0);
203
            // Position of the start of address structure
204 1
            root.write_u64(address_page);
205
            // Start of the Log data, if shutdown well this will be every time 0
206 1
            root.write_u64(journal_page);
207 1
            root.write_u64(allocator_page);
208 1
            allocator.flush_page(root)?;
209
            // TODO: check this never go over the first page
210 1
        }
211 1
        allocator.disc().sync()?;
212 1
        Ok(allocator.release())
213 1
    }
214

215 1
    fn new(disc: Box<dyn Device>, config: Config) -> PRes<PersyImpl> {
216 1
        let address_page;
217
        let journal_page;
218
        let allocator_page;
219
        {
220 1
            let mut pg = disc.load_page_raw(0, DEFAULT_PAGE_EXP)?;
221 1
            pg.read_u16(); //THIS NOW is 0 all the times
222 1
            address_page = pg.read_u64();
223 1
            journal_page = pg.read_u64();
224 1
            allocator_page = pg.read_u64();
225 1
        }
226 1
        let config = Arc::new(config);
227 1
        let allocator = Arc::new(Allocator::new(disc, &config, allocator_page)?);
228 1
        let address = Address::new(&allocator, &config, address_page)?;
229 1
        let journal = Arc::new(Journal::new(&allocator, journal_page)?);
230 1
        let indexes = Indexes::new(&config);
231 1
        let snapshots = Arc::new(Snapshots::new());
232
        #[cfg(feature = "background_ops")]
233 1
        let background_ops = create_background_ops(journal.clone(), allocator.clone(), snapshots.clone())?;
234 1
        Ok(PersyImpl {
235 1
            config,
236 1
            journal,
237 1
            address,
238 1
            indexes,
239 1
            allocator,
240 1
            snapshots,
241
            #[cfg(feature = "background_ops")]
242 1
            background_ops,
243
        })
244 1
    }
245

246 1
    fn recover(&self) -> PRes<RecoverImpl> {
247 1
        let mut commit_order = Vec::new();
248 1
        let mut transactions = HashMap::new();
249 1
        let journal = &self.journal;
250 1
        let jp = journal.recover(|record, id| {
251 1
            let tx = transactions
252 1
                .entry(id.clone())
253 1
                .or_insert_with(|| (RecoverStatus::Started, Transaction::recover(id.clone()), None));
254 1
            tx.0 = match record.recover(&mut tx.1) {
255 1
                Err(_) => RecoverStatus::Rollback,
256 1
                Ok(_) if tx.0 == RecoverStatus::Rollback => RecoverStatus::Rollback,
257 1
                Ok(x) => match x {
258 1
                    RecoverStatus::Started => RecoverStatus::Started,
259
                    RecoverStatus::PrepareCommit => {
260 1
                        commit_order.push(id.clone());
261 1
                        RecoverStatus::PrepareCommit
262
                    }
263 1
                    RecoverStatus::Rollback => RecoverStatus::Rollback,
264
                    RecoverStatus::Commit => {
265 1
                        commit_order.push(id.clone());
266 1
                        RecoverStatus::Commit
267
                    }
268 1
                    RecoverStatus::Cleanup => RecoverStatus::Cleanup,
269
                },
270
            }
271 1
        })?;
272

273 1
        let mut transactions_id = HashMap::new();
274 1
        for (id, (_, tx, _)) in &transactions {
275 1
            transactions_id.insert(tx.meta_id().clone(), id.clone());
276
        }
277 1
        Ok(RecoverImpl {
278 1
            tx_id: transactions_id,
279 1
            transactions,
280 1
            order: commit_order,
281 1
            journal_pages: jp,
282
        })
283 1
    }
284 1
    pub fn final_recover(&self, mut recover: RecoverImpl) -> PRes<()> {
285 1
        let mut last_id = None;
286 1
        let allocator = &self.allocator;
287 1
        for id in recover.order {
288 1
            if let Some((status, mut tx, choosed)) = recover.transactions.remove(&id) {
289 1
                if status == RecoverStatus::PrepareCommit {
290 0
                    if choosed == Some(CommitStatus::Commit) || choosed.is_none() {
291 0
                        let prepared = tx.recover_prepare(self)?;
292 0
                        tx.recover_commit(self, prepared)?;
293 0
                        last_id = Some(id);
294 0
                    } else {
295 0
                        tx.recover_rollback(self)?;
296
                    }
297 1
                } else if status == RecoverStatus::Commit {
298 1
                    tx.recover_cleanup(self)?;
299
                }
300 1
            }
301 1
        }
302 1
        for p in recover.journal_pages {
303 1
            allocator.remove_from_free(p, JOURNAL_PAGE_EXP)?;
304 1
        }
305

306 1
        for (_, (_, tx, _)) in recover.transactions.iter_mut() {
307 1
            tx.recover_rollback(self)?;
308
        }
309 1
        if let Some(id) = last_id {
310 0
            self.journal.finished_to_clean(&[id])?;
311
        }
312 1
        allocator.trim_free_at_end()?;
313 1
        Ok(())
314 1
    }
315

316 1
    pub fn open_recover(f: File, config: Config) -> PRes<(PersyImpl, RecoverImpl)> {
317 1
        f.try_lock_exclusive()?;
318 1
        let persy = PersyImpl::new(Box::new(DiscRef::new(f)?), config)?;
319 1
        let rec = persy.recover()?;
320 1
        Ok((persy, rec))
321 1
    }
322

323 1
    pub fn memory(config: Config) -> PRes<PersyImpl> {
324 1
        let device = PersyImpl::init(Box::new(MemRef::new()?))?;
325 1
        PersyImpl::new(device, config)
326 1
    }
327

328 1
    pub fn begin_with(&self, mut config: TransactionConfig) -> PRes<Transaction> {
329 1
        let journal = &self.journal;
330 1
        let strategy = if let Some(st) = config.tx_strategy {
331 0
            st
332
        } else {
333 1
            self.config.tx_strategy().clone()
334
        };
335 1
        let meta_id = if let Some(id) = replace(&mut config.transaction_id, None) {
336 0
            id
337
        } else {
338 1
            Vec::new()
339 1
        };
340 1
        let sync_mode = if Some(true) == config.background_sync {
341 1
            SyncMode::BackgroundSync
342
        } else {
343 1
            SyncMode::Sync
344
        };
345

346 1
        Ok(Transaction::new(journal, &strategy, sync_mode, meta_id)?)
347 1
    }
348

349 1
    pub fn create_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<SegmentId> {
350 1
        match tx.exists_segment(segment) {
351 1
            DROPPED => {}
352
            CREATED(_) => {
353 0
                return Err(PersyError::SegmentAlreadyExists);
354
            }
355
            NONE => {
356 1
                if self.address.exists_segment(&segment)? {
357 0
                    return Err(PersyError::SegmentAlreadyExists);
358
                }
359
            }
360
        }
361 1
        let (segment_id, first_segment_page) = self.address.create_temp_segment(segment)?;
362 1
        tx.add_create_segment(&self.journal, segment, segment_id, first_segment_page)?;
363 1
        Ok(segment_id)
364 1
    }
365

366 1
    pub fn drop_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
367 1
        let (_, segment_id) = self.check_segment_tx(tx, segment)?;
368 1
        tx.add_drop_segment(&self.journal, segment, segment_id)?;
369 1
        Ok(())
370 1
    }
371

372 1
    pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
373 1
        self.address.exists_segment(segment)
374 1
    }
375

376 1
    pub fn exists_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<bool> {
377 1
        match tx.exists_segment(segment) {
378 1
            DROPPED => Ok(false),
379 1
            CREATED(_) => Ok(true),
380 0
            NONE => self.address.exists_segment(segment),
381
        }
382 1
    }
383 1
    pub fn exists_index(&self, index: &str) -> PRes<bool> {
384 1
        self.exists_segment(&format!("{}{}", INDEX_META_PREFIX, index))
385 0
    }
386

387 1
    pub fn exists_index_tx(&self, tx: &Transaction, index: &str) -> PRes<bool> {
388 1
        self.exists_segment_tx(tx, &format!("{}{}", INDEX_META_PREFIX, index))
389 0
    }
390

391 1
    pub fn solve_segment_id(&self, segment: impl ToSegmentId) -> PRes<SegmentId> {
392 1
        segment.to_segment_id(&self.address)
393 1
    }
394

395 1
    pub fn solve_segment_id_tx(&self, tx: &Transaction, segment: impl ToSegmentId) -> PRes<SegmentId> {
396 1
        let (sid, _) = segment.to_segment_id_tx(self, tx)?;
397 1
        Ok(sid)
398 1
    }
399

400 1
    pub fn solve_segment_id_snapshot(&self, snapshot: SnapshotId, segment: impl ToSegmentId) -> PRes<SegmentId> {
401 1
        segment.to_segment_id_snapshot(&self.snapshots, snapshot)
402 1
    }
403

404 1
    pub fn solve_index_id(&self, index: impl ToIndexId) -> PRes<IndexId> {
405 1
        index.to_index_id(&self.address)
406 1
    }
407

408 1
    pub fn solve_index_id_tx(&self, tx: &Transaction, index: impl ToIndexId) -> PRes<(IndexId, bool)> {
409 1
        index.to_index_id_tx(self, tx)
410 1
    }
411

412 1
    pub fn solve_index_id_snapshot(&self, snapshot: SnapshotId, index: impl ToIndexId) -> PRes<IndexId> {
413 1
        index.to_index_id_snapshot(&self.snapshots, snapshot)
414 1
    }
415

416
    /// check if a segment exist persistent or in tx.
417
    ///
418
    /// @return true if the segment was created in tx.
419 1
    pub fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, SegmentId)> {
420 1
        match tx.exists_segment(segment) {
421 1
            DROPPED => Err(PersyError::SegmentNotFound),
422 1
            CREATED(segment_id) => Ok((true, segment_id)),
423 1
            NONE => self
424
                .address
425 1
                .segment_id(segment)?
426 1
                .map_or(Err(PersyError::SegmentNotFound), |id| Ok((false, id))),
427
        }
428 1
    }
429

430 1
    pub fn write_record_metadata(len: u64, id: &RecRef) -> Vec<u8> {
431 1
        let mut val = Vec::new();
432 1
        val.write_u8(0);
433 1
        val.write_varint_u64(len);
434 1
        id.write(&mut val);
435
        val
436 1
    }
437 1
    pub fn read_record_metadata(meta: &mut dyn InfallibleRead) -> (u64, RecRef) {
438 1
        let _metadata_version = meta.read_u8();
439 1
        let len = meta.read_varint_u64();
440 1
        let id = RecRef::read(meta);
441 1
        (len, id)
442 1
    }
443

444 1
    pub fn insert_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, rec: &[u8]) -> PRes<RecRef> {
445 1
        let (segment_id, in_tx) = segment.to_segment_id_tx(self, tx)?;
446 1
        let len = rec.len() as u64;
447 1
        let allocator = &self.allocator;
448 1
        let address = &self.address;
449 1
        let (rec_ref, maybe_new_page) = if in_tx {
450 1
            address.allocate_temp(segment_id)
451
        } else {
452 1
            address.allocate(segment_id)
453 1
        }?;
454 1
        let metadata = PersyImpl::write_record_metadata(len, &rec_ref);
455 1
        let allocation_exp = exp_from_content_size(len + metadata.len() as u64);
456 1
        let mut pg = allocator.allocate(allocation_exp)?;
457 1
        let page = pg.get_index();
458 1
        tx.add_insert(&self.journal, segment_id, &rec_ref, page)?;
459 1
        if let Some(new_page) = maybe_new_page {
460 1
            tx.add_new_segment_page(&self.journal, segment_id, new_page.new_page, new_page.previus_page)?;
461
        }
462 1
        pg.write_all(&metadata);
463 1
        pg.write_all(rec);
464 1
        allocator.flush_page(pg)?;
465 1
        Ok(rec_ref)
466 1
    }
467

468 1
    fn read_snapshot(&self) -> PRes<SnapshotId> {
469 1
        self.snapshots.current_snapshot()
470 1
    }
471

472 1
    pub fn snapshot(&self) -> PRes<SnapshotId> {
473 1
        let snapshot_id = self.snapshots.read_snapshot()?;
474 1
        let segs = self
475
            .address
476 1
            .snapshot_list()?
477
            .into_iter()
478 1
            .map(|(name, id, first_page)| SegmentSnapshop::new(&name, id, first_page))
479 0
            .collect::<Vec<_>>();
480 1
        self.snapshots.fill_segments(snapshot_id, &segs)?;
481 1
        Ok(snapshot_id)
482 1
    }
483

484 1
    pub fn release_snapshot(&self, snapshot_id: SnapshotId) -> PRes<()> {
485 1
        release_snapshot(snapshot_id, &self.snapshots, &self.allocator, &self.journal)
486 1
    }
487

488 1
    fn read_ref_segment(
489
        &self,
490
        tx: &Transaction,
491
        segment_id: SegmentId,
492
        rec_ref: &RecRef,
493
    ) -> PRes<Option<(u64, u16, SegmentId)>> {
494 1
        Ok(match tx.read(rec_ref) {
495 1
            TxRead::RECORD(rec) => Some((rec.0, rec.1, segment_id)),
496 1
            TxRead::DELETED => None,
497 1
            TxRead::NONE => self
498
                .address
499 1
                .read(rec_ref, segment_id)?
500 1
                .map(|(pos, version)| (pos, version, segment_id)),
501
        })
502 1
    }
503

504 1
    fn read_page(&self, match_id: &RecRef, page: u64) -> PRes<Option<Vec<u8>>> {
505 1
        self.read_page_fn(match_id, page, |x| Vec::from(x))
506 1
    }
507

508 1
    fn read_page_fn<T>(&self, match_id: &RecRef, page: u64, f: fn(&[u8]) -> T) -> PRes<Option<T>> {
509 1
        if let Some(mut pg) = self.allocator.load_page_not_free(page)? {
510 1
            let (len, id) = PersyImpl::read_record_metadata(&mut pg);
511 1
            if id.page == match_id.page && id.pos == match_id.pos {
512 1
                Ok(Some(f(pg.slice(len as usize))))
513
            } else {
514 0
                Ok(None)
515
            }
516 1
        } else {
517 0
            Ok(None)
518
        }
519 1
    }
520

521 1
    pub fn read_tx_internal_fn<T>(
522
        &self,
523
        tx: &Transaction,
524
        segment_id: SegmentId,
525
        id: &RecRef,
526
        f: fn(&[u8]) -> T,
527
    ) -> PRes<Option<(T, u16)>> {
528 0
        loop {
529 1
            if let Some((page, version, _)) = self.read_ref_segment(tx, segment_id, id)? {
530 1
                if let Some(record) = self.read_page_fn(id, page, f)? {
531 1
                    break Ok(Some((record, version)));
532
                }
533 0
            } else {
534 1
                break Ok(None);
535
            }
536
        }
537 1
    }
538

539 1
    pub fn read_tx_internal(
540
        &self,
541
        tx: &Transaction,
542
        segment_id: SegmentId,
543
        id: &RecRef,
544
    ) -> PRes<Option<(Vec<u8>, u16)>> {
545 1
        self.read_tx_internal_fn(tx, segment_id, id, |x| Vec::from(x))
546 1
    }
547

548 1
    pub fn read_tx(&self, tx: &mut Transaction, segment: SegmentId, id: &RecRef) -> PRes<Option<Vec<u8>>> {
549 1
        if let Some((rec, version)) = self.read_tx_internal(tx, segment, id)? {
550 1
            tx.add_read(&self.journal, segment, id, version)?;
551 1
            Ok(Some(rec))
552 0
        } else {
553 1
            Ok(None)
554
        }
555 1
    }
556

557 1
    pub fn lock_record(
558
        &self,
559
        tx: &mut Transaction,
560
        segment: impl ToSegmentId,
561
        id: &RecRef,
562
        version: u16,
563
    ) -> PRes<bool> {
564 1
        let segment_id = segment.to_segment_id(&self.address)?;
565 1
        tx.lock_record(&self.address, segment_id, id, version)
566 1
    }
567

568 1
    pub fn unlock_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, id: &RecRef) -> PRes<()> {
569 1
        let segment_id = segment.to_segment_id(&self.address)?;
570 1
        tx.unlock_record(&self.address, segment_id, id)
571 1
    }
572

573 1
    pub fn read(&self, segment: SegmentId, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
574 0
        loop {
575 1
            if let Some((page, _)) = self.address.read(rec_ref, segment)? {
576 1
                if let Some(record) = self.read_page(rec_ref, page)? {
577 1
                    break Ok(Some(record));
578
                }
579 0
            } else {
580 1
                break Ok(None);
581
            }
582
        }
583 1
    }
584

585 1
    pub fn read_snap(&self, segment: SegmentId, rec_ref: &RecRef, snapshot: SnapshotId) -> PRes<Option<Vec<u8>>> {
586 1
        self.read_snap_fn(segment, rec_ref, snapshot, |x| Vec::from(x))
587 1
    }
588

589 1
    pub fn read_snap_fn<T>(
590
        &self,
591
        segment: SegmentId,
592
        rec_ref: &RecRef,
593
        snapshot: SnapshotId,
594
        f: fn(&[u8]) -> T,
595
    ) -> PRes<Option<T>> {
596 1
        let segment_id = segment;
597 0
        loop {
598 1
            if let Some(rec_vers) = self.snapshots.read(snapshot, rec_ref)? {
599 0
                match rec_vers.case {
600 1
                    EntryCase::Change(change) => {
601 1
                        if let Some(record) = self.read_page_fn(rec_ref, change.pos, f)? {
602 1
                            break Ok(Some(record));
603
                        }
604 0
                    }
605
                    EntryCase::Insert => {
606 1
                        break Ok(None);
607
                    }
608
                }
609 1
            } else if let Some((page, _)) = self.address.read(rec_ref, segment_id)? {
610 1
                if let Some(record) = self.read_page_fn(rec_ref, page, f)? {
611 1
                    break Ok(Some(record));
612
                }
613 0
            } else {
614 1
                break Ok(None);
615 0
            }
616
        }
617 1
    }
618

619 1
    pub fn scan(&self, segment: SegmentId) -> PRes<SegmentRawIter> {
620 1
        let read_snapshot = self.read_snapshot()?;
621 1
        Ok(SegmentRawIter::new(segment, self.address.scan(segment)?, read_snapshot))
622 1
    }
623

624 1
    pub fn scan_snapshot_index(&self, segment_id: SegmentId, snapshot: SnapshotId) -> PRes<SegmentSnapshotRawIter> {
625 1
        let res = if let Some(r) = self.snapshots.scan(snapshot, segment_id)? {
626 1
            r
627
        } else {
628 1
            self.address.scan(segment_id)?
629 0
        };
630 1
        Ok(SegmentSnapshotRawIter::new(segment_id, res, snapshot))
631 1
    }
632

633 1
    pub fn scan_snapshot(&self, segment_id: SegmentId, snapshot: SnapshotId) -> PRes<SegmentSnapshotRawIter> {
634 1
        let res = self.snapshots.scan(snapshot, segment_id)?.unwrap();
635 1
        Ok(SegmentSnapshotRawIter::new(segment_id, res, snapshot))
636 1
    }
637

638 1
    pub fn scan_tx<'a>(&'a self, tx: &'a Transaction, segment_id: SegmentId) -> PRes<TxSegmentRawIter> {
639 1
        let read_snapshot = self.read_snapshot()?;
640 1
        Ok(TxSegmentRawIter::new(
641
            tx,
642
            segment_id,
643 1
            self.address.scan(segment_id)?,
644
            read_snapshot,
645
        ))
646 1
    }
647

648 1
    pub fn update(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef, rec: &[u8]) -> PRes<()> {
649 1
        if let Some((_, version, segment)) = self.read_ref_segment(tx, segment, rec_ref)? {
650 1
            let allocator = &self.allocator;
651 1
            let journal = &self.journal;
652 1
            let len = rec.len();
653 1
            let metadata = PersyImpl::write_record_metadata(len as u64, &rec_ref);
654 1
            let allocation_exp = exp_from_content_size((len + metadata.len()) as u64);
655 1
            let mut pg = allocator.allocate(allocation_exp)?;
656 1
            let page = pg.get_index();
657 1
            tx.add_update(journal, segment, &rec_ref, page, version)?;
658 1
            pg.write_all(&metadata);
659 1
            pg.write_all(rec);
660 1
            allocator.flush_page(pg)
661 1
        } else {
662 1
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
663
        }
664 1
    }
665

666 1
    pub fn delete(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef) -> PRes<()> {
667 1
        if let Some((_, version, seg)) = self.read_ref_segment(tx, segment, rec_ref)? {
668 1
            tx.add_delete(&self.journal, seg, &rec_ref, version)
669
        } else {
670 0
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
671
        }
672 1
    }
673

674 1
    pub fn rollback(&self, mut tx: Transaction) -> PRes<()> {
675 1
        tx.rollback(self)
676 1
    }
677

678 1
    pub fn prepare(&self, tx: Transaction) -> PRes<TxFinalize> {
679 1
        let (tx, prepared) = tx.prepare(self)?;
680

681 1
        Ok(TxFinalize {
682 1
            transaction: tx,
683 1
            prepared,
684
        })
685 1
    }
686

687 1
    pub fn rollback_prepared(&self, finalizer: &mut TxFinalize) -> PRes<()> {
688 1
        let prepared = finalizer.prepared.clone();
689 1
        let tx = &mut finalizer.transaction;
690 1
        tx.rollback_prepared(self, prepared)
691 1
    }
692

693 1
    pub fn commit(&self, finalizer: &mut TxFinalize) -> PRes<()> {
694 1
        let prepared = finalizer.prepared.clone();
695 1
        let tx = &mut finalizer.transaction;
696 1
        tx.commit(self, prepared)
697 1
    }
698

699 1
    pub fn create_index<K, V>(&self, tx: &mut Transaction, index_name: &str, value_mode: ValueMode) -> PRes<()>
700
    where
701
        K: IndexType,
702
        V: IndexType,
703
    {
704 1
        Indexes::create_index::<K, V>(self, tx, index_name, 32, 128, value_mode)
705 1
    }
706

707 1
    pub fn drop_index(&self, tx: &mut Transaction, index_name: &str) -> PRes<()> {
708 1
        Indexes::drop_index(self, tx, index_name)
709 1
    }
710

711 1
    pub fn put<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: K, v: V) -> PRes<()>
712
    where
713
        K: IndexType,
714
        V: IndexType,
715
    {
716 1
        Indexes::check_index::<K, V>(self, tx, &index_id)?;
717 1
        tx.add_put(index_id, k, v);
718 1
        Ok(())
719 1
    }
720

721 1
    pub fn remove<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: K, v: Option<V>) -> PRes<()>
722
    where
723
        K: IndexType,
724
        V: IndexType,
725
    {
726 1
        Indexes::check_index::<K, V>(self, tx, &index_id)?;
727 1
        tx.add_remove(index_id, k, v);
728 1
        Ok(())
729 1
    }
730

731 1
    pub fn get_tx<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: &K) -> PRes<Option<Value<V>>>
732
    where
733
        K: IndexType,
734
        V: IndexType,
735
    {
736 1
        let (result, vm) = {
737 1
            let mut ik = Indexes::get_index_keeper_tx::<K, V>(self, tx, &index_id)?;
738 1
            self.indexes.read_lock(index_id.clone())?;
739 1
            (ik.get(k)?, IndexKeeper::<K, V>::value_mode(&ik))
740 1
        };
741 1
        self.indexes.read_unlock(index_id.clone())?;
742 1
        tx.apply_changes::<K, V>(vm, index_id, k, result)
743 1
    }
744

745 1
    pub fn get<K, V>(&self, index_id: IndexId, k: &K) -> PRes<Option<Value<V>>>
746
    where
747
        K: IndexType,
748
        V: IndexType,
749
    {
750 1
        let read_snapshot = self.snapshots.read_snapshot()?;
751 1
        let r = self.get_snapshot(index_id, read_snapshot, k);
752 1
        release_snapshot(read_snapshot, &self.snapshots, &self.allocator, &self.journal)?;
753 1
        r
754 1
    }
755

756 1
    pub fn get_snapshot<K, V>(&self, index_id: IndexId, snapshot: SnapshotId, k: &K) -> PRes<Option<Value<V>>>
757
    where
758
        K: IndexType,
759
        V: IndexType,
760
    {
761 1
        Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?.get(k)
762 1
    }
763

764 1
    pub fn index_next<K, V>(
765
        &self,
766
        index_id: &IndexId,
767
        read_snapshot: SnapshotId,
768
        next: Bound<&K>,
769
    ) -> PRes<PageIter<K, V>>
770
    where
771
        K: IndexType,
772
        V: IndexType,
773
    {
774 1
        Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.iter_from(next)
775 1
    }
776

777 1
    pub fn index_back<K, V>(
778
        &self,
779
        index_id: &IndexId,
780
        read_snapshot: SnapshotId,
781
        next: Bound<&K>,
782
    ) -> PRes<PageIterBack<K, V>>
783
    where
784
        K: IndexType,
785
        V: IndexType,
786
    {
787 1
        Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.back_iter_from(next)
788 1
    }
789

790 1
    pub fn range<K, V, R>(&self, index_id: IndexId, range: R) -> PRes<(ValueMode, IndexRawIter<K, V>)>
791
    where
792
        K: IndexType,
793
        V: IndexType,
794
        R: RangeBounds<K>,
795
    {
796 1
        let read_snapshot = self.snapshots.read_snapshot()?;
797 1
        self.range_snapshot(index_id, read_snapshot, range, true)
798 1
    }
799

800 1
    pub fn range_snapshot<K, V, R>(
801
        &self,
802
        index_id: IndexId,
803
        snapshot: SnapshotId,
804
        range: R,
805
        release_snapshot: bool,
806
    ) -> PRes<(ValueMode, IndexRawIter<K, V>)>
807
    where
808
        K: IndexType,
809
        V: IndexType,
810
        R: RangeBounds<K>,
811
    {
812 1
        let mut ik = Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?;
813 1
        let after = ik.iter_from(range.start_bound())?;
814 1
        let before = ik.back_iter_from(range.end_bound())?;
815 1
        Ok((
816 1
            IndexKeeper::<K, V>::value_mode(&ik),
817 1
            IndexRawIter::new(index_id, snapshot, after, before, release_snapshot),
818
        ))
819 1
    }
820

821
    pub fn segment_name_tx(&self, tx: &Transaction, id: SegmentId) -> PRes<Option<(String, bool)>> {
822
        if tx.segment_created_in_tx(id) {
823
            Ok(tx.segment_name_by_id(id).map(|x| (x, true)))
824
        } else {
825
            self.address.segment_name_by_id(id).map(|k| k.map(|x| (x, false)))
826
        }
827
    }
828

829 1
    pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
830 1
        Ok(self
831
            .address
832 1
            .list()?
833
            .into_iter()
834 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
835
            .collect())
836 1
    }
837

838 1
    pub fn list_segments_snapshot(&self, snapshot_id: SnapshotId) -> PRes<Vec<(String, SegmentId)>> {
839 1
        let list = self.snapshots.list(snapshot_id)?;
840 1
        Ok(list
841
            .into_iter()
842 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
843
            .collect())
844 1
    }
845

846 1
    pub fn list_indexes(&self) -> PRes<Vec<(String, IndexInfo)>> {
847 1
        let snapshot = self.snapshot()?;
848 1
        let res = self.list_indexes_snapshot(snapshot);
849 1
        release_snapshot(snapshot, &self.snapshots, &self.allocator, &self.journal)?;
850 1
        res
851 1
    }
852

853 1
    pub fn list_indexes_snapshot(&self, snapshot: SnapshotId) -> PRes<Vec<(String, IndexInfo)>> {
854 1
        let list = self.snapshots.list(snapshot)?;
855 1
        list.into_iter()
856 1
            .filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
857 1
            .map(|(mut name, _id)| -> PRes<(String, IndexInfo)> {
858 1
                name.drain(..INDEX_META_PREFIX.len());
859 1
                let info = self.index_info(snapshot, &name)?;
860 1
                Ok((name, info))
861 1
            })
862
            .collect()
863 1
    }
864

865 1
    pub fn list_segments_tx(&self, tx: &Transaction) -> PRes<Vec<(String, SegmentId)>> {
866 1
        Ok(tx
867 1
            .filter_list(&self.address.list()?)
868 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
869 1
            .map(|(name, id)| (name.to_string(), id))
870
            .collect())
871 1
    }
872

873 1
    pub fn list_indexes_tx(&self, tx: &Transaction) -> PRes<Vec<(String, IndexInfo)>> {
874 1
        tx.filter_list(&self.address.list()?)
875 1
            .filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
876 1
            .map(|(name, id)| (name.to_string(), id))
877 1
            .map(|(mut name, _id)| -> PRes<(String, IndexInfo)> {
878 1
                name.drain(..INDEX_META_PREFIX.len());
879 1
                let info = self.index_info_tx(tx, &name)?;
880 1
                Ok((name, info))
881 1
            })
882
            .collect()
883 1
    }
884

885 1
    fn index_info(&self, snapshot: SnapshotId, name: &str) -> PRes<IndexInfo> {
886 1
        let id = self.solve_index_id_snapshot(snapshot, name)?;
887 1
        let index = Indexes::get_index(self, snapshot, &id)?;
888 1
        Ok(IndexInfo {
889 1
            id,
890 1
            value_mode: index.value_mode,
891 1
            key_type: IndexTypeId::from(index.key_type),
892 1
            value_type: IndexTypeId::from(index.value_type),
893
        })
894 1
    }
895 1
    fn index_info_tx(&self, tx: &Transaction, name: &str) -> PRes<IndexInfo> {
896 1
        let id = self.solve_index_id_tx(tx, name)?.0;
897 1
        let (index, _version) = Indexes::get_index_tx(self, tx, &id)?;
898 1
        Ok(IndexInfo {
899 1
            id,
900 1
            value_mode: index.value_mode,
901 1
            key_type: IndexTypeId::from(index.key_type),
902 1
            value_type: IndexTypeId::from(index.value_type),
903
        })
904 1
    }
905

906 1
    pub(crate) fn journal(&self) -> &Journal {
907 1
        &self.journal
908 1
    }
909

910 1
    pub(crate) fn address(&self) -> &Address {
911 1
        &self.address
912 1
    }
913

914 1
    pub(crate) fn allocator(&self) -> &Allocator {
915 1
        &self.allocator
916 1
    }
917

918 1
    pub(crate) fn indexes(&self) -> &Indexes {
919 1
        &self.indexes
920 1
    }
921

922 1
    pub(crate) fn snapshots(&self) -> &Snapshots {
923 1
        &self.snapshots
924 1
    }
925

926
    #[cfg(feature = "background_ops")]
927 1
    pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, snapshot_id: SnapshotId) -> PRes<()> {
928 1
        match sync_mode {
929 1
            SyncMode::Sync => {
930 1
                let allocator = self.allocator();
931 1
                allocator.disc().sync()
932
            }
933
            SyncMode::BackgroundSync => {
934 1
                self.snapshots.acquire(snapshot_id)?;
935 1
                self.background_ops.add_pending(snapshot_id)
936
            }
937
        }
938 1
    }
939

940
    #[cfg(not(feature = "background_ops"))]
941
    pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, _snapshot_id: SnapshotId) -> PRes<()> {
942
        match sync_mode {
943
            SyncMode::Sync => {
944
                let allocator = self.allocator();
945
                allocator.disc().sync()
946
            }
947
            SyncMode::BackgroundSync => unreachable!(),
948
        }
949
    }
950
}
951

952 1
pub fn exp_from_content_size(size: u64) -> u8 {
953
    // content + size + match_pointer:page+ match_pointer:pos  + page_header
954 1
    let final_size = size + u64::from(PAGE_METADATA_SIZE);
955 1
    let final_size = u64::max(final_size, 32);
956
    // Should be there a better way, so far is OK.
957 1
    let mut res: u8 = 1;
958 1
    loop {
959 1
        if final_size < (1 << res) {
960
            return res;
961
        }
962 1
        res += 1;
963
    }
964 1
}

Read our documentation on viewing source code .

Loading