1
use fs2::FileExt;
2
pub use std::fs::OpenOptions;
3
use std::{
4
    collections::HashMap,
5
    fs::File,
6
    mem::replace,
7
    ops::{Bound, RangeBounds},
8
    path::Path,
9
    str,
10
    sync::Arc,
11
};
12

13
pub use crate::id::RecRef;
14
use crate::index::{
15
    config::{IndexType, IndexTypeId, Indexes, ValueMode, INDEX_DATA_PREFIX, INDEX_META_PREFIX},
16
    keeper::{IndexKeeper, IndexRawIter},
17
    nodes::{PageIter, PageIterBack, Value},
18
    tree::Index,
19
};
20
use crate::transaction::{
21
    PreparedState, SyncMode, Transaction, TxRead,
22
    TxSegCheck::{CREATED, DROPPED, NONE},
23
};
24
use crate::{
25
    address::Address,
26
    allocator::Allocator,
27
    config::{Config, TransactionConfig},
28
    discref::{Device, DiscRef, MemRef, PageOps, PAGE_METADATA_SIZE},
29
    error::{PRes, PersyError},
30
    id::{IndexId, PersyId, SegmentId, ToIndexId, ToSegmentId},
31
    io::{
32
        InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite, InfallibleWriteFormat,
33
        InfallibleWriteVarInt,
34
    },
35
    journal::{Journal, JournalId, JOURNAL_PAGE_EXP},
36
    record_scanner::{SegmentRawIter, SegmentSnapshotRawIter, TxSegmentRawIter},
37
    snapshot::{release_snapshot, EntryCase, SegmentSnapshop, SnapshotId, Snapshots},
38
};
39

40
#[cfg(feature = "background_ops")]
41
use crate::background::BackgroundOps;
42

43
const DEFAULT_PAGE_EXP: u8 = 10; // 2^10
44

45
pub struct PersyImpl {
46
    config: Arc<Config>,
47
    journal: Arc<Journal>,
48
    address: Address,
49
    indexes: Indexes,
50
    allocator: Arc<Allocator>,
51
    snapshots: Arc<Snapshots>,
52
    #[cfg(feature = "background_ops")]
53
    background_ops: BackgroundOps<SnapshotId>,
54
}
55

56
pub struct TxFinalize {
57
    transaction: Transaction,
58
    prepared: PreparedState,
59
}
60

61
/// Possible state of a transaction in the log
62 1
#[derive(PartialEq, Debug, Clone)]
63
pub enum RecoverStatus {
64
    /// Started but not completed
65
    Started,
66
    /// Successfully prepared
67
    PrepareCommit,
68
    /// rollback-ed
69
    Rollback,
70
    /// Successfully committed after prepared
71
    Commit,
72
    /// Successfully cleaned up resources after commit
73
    Cleanup,
74
}
75

76
/// Possible option for recover a transaction in prepared commit state
77 0
#[derive(PartialEq, Debug)]
78
pub enum CommitStatus {
79
    Rollback,
80
    Commit,
81
}
82
pub struct RecoverImpl {
83
    tx_id: HashMap<Vec<u8>, JournalId>,
84
    transactions: HashMap<JournalId, (RecoverStatus, Transaction, Option<CommitStatus>)>,
85
    order: Vec<JournalId>,
86
    journal_pages: Vec<u64>,
87
}
88

89
impl RecoverImpl {
90 1
    pub fn apply<C>(&mut self, recover: C) -> PRes<()>
91
    where
92
        C: Fn(&Vec<u8>) -> bool,
93
    {
94 1
        for (id, status) in self.list_transactions()? {
95 1
            if status == RecoverStatus::PrepareCommit {
96 0
                if recover(&id) {
97 0
                    self.commit(id)?;
98
                } else {
99 0
                    self.rollback(id)?;
100
                }
101
            }
102 1
        }
103 1
        Ok(())
104 1
    }
105

106 1
    pub fn list_transactions(&self) -> PRes<Vec<(Vec<u8>, RecoverStatus)>> {
107 1
        let mut res = Vec::new();
108 1
        for id in &self.order {
109 1
            if let Some((id, status)) = self
110
                .transactions
111
                .get(id)
112 1
                .map(|(s, tx, _)| (tx.meta_id().clone(), s.clone()))
113
            {
114 1
                res.push((id, status));
115
            }
116 1
        }
117 1
        Ok(res)
118 1
    }
119

120
    pub fn status(&self, tx_id: Vec<u8>) -> PRes<Option<RecoverStatus>> {
121
        if let Some(id) = self.tx_id.get(&tx_id) {
122
            Ok(self.transactions.get(id).map(|(s, _, _)| s.clone()))
123
        } else {
124
            Ok(None)
125
        }
126
    }
127

128 0
    pub fn commit(&mut self, tx_id: Vec<u8>) -> PRes<()> {
129 0
        if let Some(id) = self.tx_id.get(&tx_id) {
130 0
            if let Some(tx) = self.transactions.get_mut(id) {
131 0
                tx.2 = Some(CommitStatus::Commit);
132
            }
133
        }
134 0
        Ok(())
135 0
    }
136

137 0
    pub fn rollback(&mut self, tx_id: Vec<u8>) -> PRes<()> {
138 0
        if let Some(id) = self.tx_id.get(&tx_id) {
139 0
            if let Some(tx) = self.transactions.get_mut(id) {
140 0
                tx.2 = Some(CommitStatus::Rollback);
141
            }
142
        }
143 0
        Ok(())
144 0
    }
145
}
146

147
#[derive(Clone)]
148
/// Index definition details
149
pub struct IndexInfo {
150
    pub id: IndexId,
151
    pub value_mode: ValueMode,
152
    pub key_type: IndexTypeId,
153
    pub value_type: IndexTypeId,
154
}
155

156
#[cfg(feature = "background_ops")]
157 1
fn create_background_ops(
158
    journal: Arc<Journal>,
159
    allocator: Arc<Allocator>,
160
    snapshots: Arc<Snapshots>,
161
) -> PRes<BackgroundOps<SnapshotId>> {
162 1
    let all_sync = allocator.clone();
163 1
    BackgroundOps::new(
164 1
        move || all_sync.disc().sync(),
165 1
        move |all: &[SnapshotId]| {
166 1
            for snap in all {
167 1
                release_snapshot(*snap, &snapshots, &allocator, &journal)?;
168
            }
169 1
            Ok(())
170 1
        },
171
    )
172 0
}
173

174
impl PersyImpl {
175 1
    pub fn create(path: &Path) -> PRes<()> {
176 1
        let f = OpenOptions::new().write(true).read(true).create_new(true).open(path)?;
177 1
        PersyImpl::create_from_file(f)
178 1
    }
179

180 1
    pub fn create_from_file(f: File) -> PRes<()> {
181 1
        f.try_lock_exclusive()?;
182 1
        PersyImpl::init_file(f)?;
183 1
        Ok(())
184 1
    }
185

186 1
    fn init_file(fl: File) -> PRes<()> {
187 1
        PersyImpl::init(Box::new(DiscRef::new(fl)?))?;
188 1
        Ok(())
189 1
    }
190

191 1
    fn init(device: Box<dyn Device>) -> PRes<Box<dyn Device>> {
192
        // root_page is every time 0
193 1
        let root_page = device.create_page_raw(DEFAULT_PAGE_EXP)?;
194 1
        let (allocator_page, allocator) = Allocator::init(device, &Config::new())?;
195 1
        let address_page = Address::init(&allocator)?;
196 1
        let journal_page = Journal::init(&allocator)?;
197
        {
198 1
            let mut root = allocator.disc().load_page_raw(root_page, DEFAULT_PAGE_EXP)?;
199
            // Version of the disc format
200 1
            root.write_u16(0);
201
            // Position of the start of address structure
202 1
            root.write_u64(address_page);
203
            // Start of the Log data, if shutdown well this will be every time 0
204 1
            root.write_u64(journal_page);
205 1
            root.write_u64(allocator_page);
206 1
            allocator.flush_page(root)?;
207
            // TODO: check this never go over the first page
208 1
        }
209 1
        allocator.disc().sync()?;
210 1
        Ok(allocator.release())
211 1
    }
212

213 1
    fn new(disc: Box<dyn Device>, config: Config) -> PRes<PersyImpl> {
214 1
        let address_page;
215
        let journal_page;
216
        let allocator_page;
217
        {
218 1
            let mut pg = disc.load_page_raw(0, DEFAULT_PAGE_EXP)?;
219 1
            pg.read_u16(); //THIS NOW is 0 all the times
220 1
            address_page = pg.read_u64();
221 1
            journal_page = pg.read_u64();
222 1
            allocator_page = pg.read_u64();
223 1
        }
224 1
        let config = Arc::new(config);
225 1
        let allocator = Arc::new(Allocator::new(disc, &config, allocator_page)?);
226 1
        let address = Address::new(&allocator, &config, address_page)?;
227 1
        let journal = Arc::new(Journal::new(&allocator, journal_page)?);
228 1
        let indexes = Indexes::new(&config);
229 1
        let snapshots = Arc::new(Snapshots::new());
230
        #[cfg(feature = "background_ops")]
231 1
        let background_ops = create_background_ops(journal.clone(), allocator.clone(), snapshots.clone())?;
232 1
        Ok(PersyImpl {
233 1
            config,
234 1
            journal,
235 1
            address,
236 1
            indexes,
237 1
            allocator,
238 1
            snapshots,
239
            #[cfg(feature = "background_ops")]
240 1
            background_ops,
241
        })
242 1
    }
243

244 1
    fn recover(&self) -> PRes<RecoverImpl> {
245 1
        let mut commit_order = Vec::new();
246 1
        let mut transactions = HashMap::new();
247 1
        let journal = &self.journal;
248 1
        let jp = journal.recover(|record, id| {
249 1
            let tx = transactions
250 1
                .entry(id.clone())
251 1
                .or_insert_with(|| (RecoverStatus::Started, Transaction::recover(id.clone()), None));
252 1
            tx.0 = match record.recover(&mut tx.1) {
253 1
                Err(_) => RecoverStatus::Rollback,
254 1
                Ok(_) if tx.0 == RecoverStatus::Rollback => RecoverStatus::Rollback,
255 1
                Ok(x) => match x {
256 1
                    RecoverStatus::Started => RecoverStatus::Started,
257
                    RecoverStatus::PrepareCommit => {
258 1
                        commit_order.push(id.clone());
259 1
                        RecoverStatus::PrepareCommit
260
                    }
261 1
                    RecoverStatus::Rollback => RecoverStatus::Rollback,
262
                    RecoverStatus::Commit => {
263 1
                        commit_order.push(id.clone());
264 1
                        RecoverStatus::Commit
265
                    }
266 1
                    RecoverStatus::Cleanup => RecoverStatus::Cleanup,
267
                },
268
            }
269 1
        })?;
270

271 1
        let mut transactions_id = HashMap::new();
272 1
        for (id, (_, tx, _)) in &transactions {
273 1
            transactions_id.insert(tx.meta_id().clone(), id.clone());
274
        }
275 1
        Ok(RecoverImpl {
276 1
            tx_id: transactions_id,
277 1
            transactions,
278 1
            order: commit_order,
279 1
            journal_pages: jp,
280
        })
281 1
    }
282 1
    pub fn final_recover(&self, mut recover: RecoverImpl) -> PRes<()> {
283 1
        let mut last_id = None;
284 1
        let allocator = &self.allocator;
285 1
        for id in recover.order {
286 1
            if let Some((status, mut tx, choosed)) = recover.transactions.remove(&id) {
287 1
                if status == RecoverStatus::PrepareCommit {
288 0
                    if choosed == Some(CommitStatus::Commit) || choosed.is_none() {
289 0
                        let prepared = tx.recover_prepare(self)?;
290 0
                        tx.recover_commit(self, prepared)?;
291 0
                        last_id = Some(id);
292 0
                    } else {
293 0
                        tx.recover_rollback(self)?;
294
                    }
295 1
                } else if status == RecoverStatus::Commit {
296 1
                    tx.recover_cleanup(self)?;
297
                }
298 1
            }
299 1
        }
300 1
        for p in recover.journal_pages {
301 1
            allocator.remove_from_free(p, JOURNAL_PAGE_EXP)?;
302 1
        }
303

304 1
        for (_, (_, tx, _)) in recover.transactions.iter_mut() {
305 1
            tx.recover_rollback(self)?;
306
        }
307 1
        if let Some(id) = last_id {
308 0
            self.journal.finished_to_clean(&[id])?;
309
        }
310 1
        allocator.trim_free_at_end()?;
311 1
        Ok(())
312 1
    }
313

314 1
    pub fn open_recover(f: File, config: Config) -> PRes<(PersyImpl, RecoverImpl)> {
315 1
        f.try_lock_exclusive()?;
316 1
        let persy = PersyImpl::new(Box::new(DiscRef::new(f)?), config)?;
317 1
        let rec = persy.recover()?;
318 1
        Ok((persy, rec))
319 1
    }
320

321 1
    pub fn memory(config: Config) -> PRes<PersyImpl> {
322 1
        let device = PersyImpl::init(Box::new(MemRef::new()?))?;
323 1
        PersyImpl::new(device, config)
324 1
    }
325

326 1
    pub fn begin_with(&self, mut config: TransactionConfig) -> PRes<Transaction> {
327 1
        let journal = &self.journal;
328 1
        let strategy = if let Some(st) = config.tx_strategy {
329 0
            st
330
        } else {
331 1
            self.config.tx_strategy().clone()
332
        };
333 1
        let meta_id = if let Some(id) = replace(&mut config.transaction_id, None) {
334 0
            id
335
        } else {
336 1
            Vec::new()
337 1
        };
338 1
        let sync_mode = if Some(true) == config.background_sync {
339 1
            SyncMode::BackgroundSync
340
        } else {
341 1
            SyncMode::Sync
342
        };
343

344 1
        Ok(Transaction::new(journal, &strategy, sync_mode, meta_id)?)
345 1
    }
346

347 1
    pub fn create_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<SegmentId> {
348 1
        match tx.exists_segment(segment) {
349 1
            DROPPED => {}
350
            CREATED(_) => {
351 0
                return Err(PersyError::SegmentAlreadyExists);
352
            }
353
            NONE => {
354 1
                if self.address.exists_segment(&segment)? {
355 0
                    return Err(PersyError::SegmentAlreadyExists);
356
                }
357
            }
358
        }
359 1
        let (segment_id, first_segment_page) = self.address.create_temp_segment(segment)?;
360 1
        tx.add_create_segment(&self.journal, segment, segment_id, first_segment_page)?;
361 1
        Ok(segment_id)
362 1
    }
363

364 1
    pub fn drop_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
365 1
        let (_, segment_id) = self.check_segment_tx(tx, segment)?;
366 1
        tx.add_drop_segment(&self.journal, segment, segment_id)?;
367 1
        Ok(())
368 1
    }
369

370 1
    pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
371 1
        self.address.exists_segment(segment)
372 1
    }
373

374 1
    pub fn exists_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<bool> {
375 1
        match tx.exists_segment(segment) {
376 1
            DROPPED => Ok(false),
377 1
            CREATED(_) => Ok(true),
378 0
            NONE => self.address.exists_segment(segment),
379
        }
380 1
    }
381 1
    pub fn exists_index(&self, index: &str) -> PRes<bool> {
382 1
        self.exists_segment(&format!("{}{}", INDEX_META_PREFIX, index))
383 0
    }
384

385 1
    pub fn exists_index_tx(&self, tx: &Transaction, index: &str) -> PRes<bool> {
386 1
        self.exists_segment_tx(tx, &format!("{}{}", INDEX_META_PREFIX, index))
387 0
    }
388

389 1
    pub fn solve_segment_id(&self, segment: impl ToSegmentId) -> PRes<SegmentId> {
390 1
        segment.to_segment_id(&self.address)
391 1
    }
392

393 1
    pub fn solve_segment_id_tx(&self, tx: &Transaction, segment: impl ToSegmentId) -> PRes<SegmentId> {
394 1
        let (sid, _) = segment.to_segment_id_tx(self, tx)?;
395 1
        Ok(sid)
396 1
    }
397

398 1
    pub fn solve_segment_id_snapshot(&self, snapshot: SnapshotId, segment: impl ToSegmentId) -> PRes<SegmentId> {
399 1
        segment.to_segment_id_snapshot(&self.snapshots, snapshot)
400 1
    }
401

402 1
    pub fn solve_index_id(&self, index: impl ToIndexId) -> PRes<IndexId> {
403 1
        index.to_index_id(&self.address)
404 1
    }
405

406 1
    pub fn solve_index_id_tx(&self, tx: &Transaction, index: impl ToIndexId) -> PRes<(IndexId, bool)> {
407 1
        index.to_index_id_tx(self, tx)
408 1
    }
409

410 1
    pub fn solve_index_id_snapshot(&self, snapshot: SnapshotId, index: impl ToIndexId) -> PRes<IndexId> {
411 1
        index.to_index_id_snapshot(&self.snapshots, snapshot)
412 1
    }
413

414
    /// check if a segment exist persistent or in tx.
415
    ///
416
    /// @return true if the segment was created in tx.
417 1
    pub fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, SegmentId)> {
418 1
        match tx.exists_segment(segment) {
419 1
            DROPPED => Err(PersyError::SegmentNotFound),
420 1
            CREATED(segment_id) => Ok((true, segment_id)),
421 1
            NONE => self
422
                .address
423 1
                .segment_id(segment)?
424 1
                .map_or(Err(PersyError::SegmentNotFound), |id| Ok((false, id))),
425
        }
426 1
    }
427

428 1
    pub fn write_record_metadata(len: u64, id: &RecRef) -> Vec<u8> {
429 1
        let mut val = Vec::new();
430 1
        val.write_u8(0);
431 1
        val.write_varint_u64(len);
432 1
        id.write(&mut val);
433
        val
434 1
    }
435 1
    pub fn read_record_metadata(meta: &mut dyn InfallibleRead) -> (u64, RecRef) {
436 1
        let _metadata_version = meta.read_u8();
437 1
        let len = meta.read_varint_u64();
438 1
        let id = RecRef::read(meta);
439 1
        (len, id)
440 1
    }
441

442 1
    pub fn insert_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, rec: &[u8]) -> PRes<RecRef> {
443 1
        let (segment_id, in_tx) = segment.to_segment_id_tx(self, tx)?;
444 1
        let len = rec.len() as u64;
445 1
        let allocator = &self.allocator;
446 1
        let address = &self.address;
447 1
        let (rec_ref, maybe_new_page) = if in_tx {
448 1
            address.allocate_temp(segment_id)
449
        } else {
450 1
            address.allocate(segment_id)
451 1
        }?;
452 1
        let metadata = PersyImpl::write_record_metadata(len, &rec_ref);
453 1
        let allocation_exp = exp_from_content_size(len + metadata.len() as u64);
454 1
        let mut pg = allocator.allocate(allocation_exp)?;
455 1
        let page = pg.get_index();
456 1
        tx.add_insert(&self.journal, segment_id, &rec_ref, page)?;
457 1
        if let Some(new_page) = maybe_new_page {
458 1
            tx.add_new_segment_page(&self.journal, segment_id, new_page.new_page, new_page.previus_page)?;
459
        }
460 1
        pg.write_all(&metadata);
461 1
        pg.write_all(rec);
462 1
        allocator.flush_page(pg)?;
463 1
        Ok(rec_ref)
464 1
    }
465

466 1
    fn read_snapshot(&self) -> PRes<SnapshotId> {
467 1
        self.snapshots.current_snapshot()
468 1
    }
469

470 1
    pub fn snapshot(&self) -> PRes<SnapshotId> {
471 1
        let snapshot_id = self.snapshots.read_snapshot()?;
472 1
        let segs = self
473
            .address
474 1
            .snapshot_list()?
475
            .into_iter()
476 1
            .map(|(name, id, first_page)| SegmentSnapshop::new(&name, id, first_page))
477 0
            .collect::<Vec<_>>();
478 1
        self.snapshots.fill_segments(snapshot_id, &segs)?;
479 1
        Ok(snapshot_id)
480 1
    }
481

482 1
    pub fn release_snapshot(&self, snapshot_id: SnapshotId) -> PRes<()> {
483 1
        release_snapshot(snapshot_id, &self.snapshots, &self.allocator, &self.journal)
484 1
    }
485

486 1
    fn read_ref_segment(
487
        &self,
488
        tx: &Transaction,
489
        segment_id: SegmentId,
490
        rec_ref: &RecRef,
491
    ) -> PRes<Option<(u64, u16, SegmentId)>> {
492 1
        Ok(match tx.read(rec_ref) {
493 1
            TxRead::RECORD(rec) => Some((rec.0, rec.1, segment_id)),
494 1
            TxRead::DELETED => None,
495 1
            TxRead::NONE => self
496
                .address
497 1
                .read(rec_ref, segment_id)?
498 1
                .map(|(pos, version)| (pos, version, segment_id)),
499
        })
500 1
    }
501

502 1
    fn read_page(&self, match_id: &RecRef, page: u64) -> PRes<Option<Vec<u8>>> {
503 1
        self.read_page_fn(match_id, page, |x| Vec::from(x))
504 1
    }
505

506 1
    fn read_page_fn<T, F>(&self, match_id: &RecRef, page: u64, f: F) -> PRes<Option<T>>
507
    where
508
        F: Fn(&[u8]) -> T,
509
    {
510 1
        if let Some(mut pg) = self.allocator.load_page_not_free(page)? {
511 1
            let (len, id) = PersyImpl::read_record_metadata(&mut pg);
512 1
            if id.page == match_id.page && id.pos == match_id.pos {
513 1
                Ok(Some(f(pg.slice(len as usize))))
514
            } else {
515 0
                Ok(None)
516
            }
517 1
        } else {
518 0
            Ok(None)
519
        }
520 1
    }
521

522 1
    pub fn read_tx_internal_fn<T, F>(
523
        &self,
524
        tx: &Transaction,
525
        segment_id: SegmentId,
526
        id: &RecRef,
527
        f: F,
528
    ) -> PRes<Option<(T, u16)>>
529
    where
530
        F: Fn(&[u8]) -> T,
531
    {
532 0
        loop {
533 1
            if let Some((page, version, _)) = self.read_ref_segment(tx, segment_id, id)? {
534 1
                if let Some(record) = self.read_page_fn(id, page, &f)? {
535 1
                    break Ok(Some((record, version)));
536
                }
537 0
            } else {
538 1
                break Ok(None);
539
            }
540 0
        }
541 1
    }
542

543 1
    pub fn read_tx_internal(
544
        &self,
545
        tx: &Transaction,
546
        segment_id: SegmentId,
547
        id: &RecRef,
548
    ) -> PRes<Option<(Vec<u8>, u16)>> {
549 1
        self.read_tx_internal_fn(tx, segment_id, id, |x| Vec::from(x))
550 1
    }
551

552 1
    pub fn read_tx(&self, tx: &mut Transaction, segment: SegmentId, id: &RecRef) -> PRes<Option<Vec<u8>>> {
553 1
        if let Some((rec, version)) = self.read_tx_internal(tx, segment, id)? {
554 1
            tx.add_read(&self.journal, segment, id, version)?;
555 1
            Ok(Some(rec))
556 0
        } else {
557 1
            Ok(None)
558
        }
559 1
    }
560

561 1
    pub fn lock_record(
562
        &self,
563
        tx: &mut Transaction,
564
        segment: impl ToSegmentId,
565
        id: &RecRef,
566
        version: u16,
567
    ) -> PRes<bool> {
568 1
        let segment_id = segment.to_segment_id(&self.address)?;
569 1
        tx.lock_record(&self.address, segment_id, id, version)
570 1
    }
571

572 1
    pub fn unlock_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, id: &RecRef) -> PRes<()> {
573 1
        let segment_id = segment.to_segment_id(&self.address)?;
574 1
        tx.unlock_record(&self.address, segment_id, id)
575 1
    }
576

577 1
    pub fn read(&self, segment: SegmentId, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
578 0
        loop {
579 1
            if let Some((page, _)) = self.address.read(rec_ref, segment)? {
580 1
                if let Some(record) = self.read_page(rec_ref, page)? {
581 1
                    break Ok(Some(record));
582
                }
583 0
            } else {
584 1
                break Ok(None);
585
            }
586
        }
587 1
    }
588

589 1
    pub fn read_snap(&self, segment: SegmentId, rec_ref: &RecRef, snapshot: SnapshotId) -> PRes<Option<Vec<u8>>> {
590 1
        self.read_snap_fn(segment, rec_ref, snapshot, |x| Vec::from(x))
591 1
    }
592

593 1
    pub fn read_snap_fn<T, F>(
594
        &self,
595
        segment: SegmentId,
596
        rec_ref: &RecRef,
597
        snapshot: SnapshotId,
598
        f: F,
599
    ) -> PRes<Option<T>>
600
    where
601
        F: Fn(&[u8]) -> T,
602
    {
603 1
        let segment_id = segment;
604 1
        loop {
605 1
            if let Some(rec_vers) = self.snapshots.read(snapshot, rec_ref)? {
606 0
                match rec_vers.case {
607 1
                    EntryCase::Change(change) => {
608 1
                        if let Some(record) = self.read_page_fn(rec_ref, change.pos, &f)? {
609 1
                            break Ok(Some(record));
610
                        }
611 0
                    }
612
                    EntryCase::Insert => {
613 1
                        break Ok(None);
614
                    }
615
                }
616 1
            } else if let Some((page, _)) = self.address.read(rec_ref, segment_id)? {
617 1
                if let Some(record) = self.read_page_fn(rec_ref, page, &f)? {
618 1
                    break Ok(Some(record));
619
                }
620 0
            } else {
621 0
                break Ok(None);
622 0
            }
623 0
        }
624 1
    }
625

626 1
    pub fn scan(&self, segment: SegmentId) -> PRes<SegmentRawIter> {
627 1
        let read_snapshot = self.read_snapshot()?;
628 1
        Ok(SegmentRawIter::new(segment, self.address.scan(segment)?, read_snapshot))
629 1
    }
630

631 1
    pub fn scan_snapshot(&self, segment_id: SegmentId, snapshot: SnapshotId) -> PRes<SegmentSnapshotRawIter> {
632 1
        let res = self.snapshots.scan(snapshot, segment_id)?.unwrap();
633 1
        Ok(SegmentSnapshotRawIter::new(segment_id, res, snapshot))
634 1
    }
635

636 1
    pub fn scan_tx<'a>(&'a self, tx: &'a Transaction, segment_id: SegmentId) -> PRes<TxSegmentRawIter> {
637 1
        let read_snapshot = self.read_snapshot()?;
638 1
        Ok(TxSegmentRawIter::new(
639
            tx,
640
            segment_id,
641 1
            self.address.scan(segment_id)?,
642
            read_snapshot,
643
        ))
644 1
    }
645

646 1
    pub fn update(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef, rec: &[u8]) -> PRes<()> {
647 1
        if let Some((_, version, segment)) = self.read_ref_segment(tx, segment, rec_ref)? {
648 1
            let allocator = &self.allocator;
649 1
            let journal = &self.journal;
650 1
            let len = rec.len();
651 1
            let metadata = PersyImpl::write_record_metadata(len as u64, &rec_ref);
652 1
            let allocation_exp = exp_from_content_size((len + metadata.len()) as u64);
653 1
            let mut pg = allocator.allocate(allocation_exp)?;
654 1
            let page = pg.get_index();
655 1
            tx.add_update(journal, segment, &rec_ref, page, version)?;
656 1
            pg.write_all(&metadata);
657 1
            pg.write_all(rec);
658 1
            allocator.flush_page(pg)
659 1
        } else {
660 1
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
661
        }
662 1
    }
663

664 1
    pub fn delete(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef) -> PRes<()> {
665 1
        if let Some((_, version, seg)) = self.read_ref_segment(tx, segment, rec_ref)? {
666 1
            tx.add_delete(&self.journal, seg, &rec_ref, version)
667
        } else {
668 0
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
669
        }
670 1
    }
671

672 1
    pub fn rollback(&self, mut tx: Transaction) -> PRes<()> {
673 1
        tx.rollback(self)
674 1
    }
675

676 1
    pub fn prepare(&self, tx: Transaction) -> PRes<TxFinalize> {
677 1
        let (tx, prepared) = tx.prepare(self)?;
678

679 1
        Ok(TxFinalize {
680 1
            transaction: tx,
681 1
            prepared,
682
        })
683 1
    }
684

685 1
    pub fn rollback_prepared(&self, finalizer: &mut TxFinalize) -> PRes<()> {
686 1
        let prepared = finalizer.prepared.clone();
687 1
        let tx = &mut finalizer.transaction;
688 1
        tx.rollback_prepared(self, prepared)
689 1
    }
690

691 1
    pub fn commit(&self, finalizer: &mut TxFinalize) -> PRes<()> {
692 1
        let prepared = finalizer.prepared.clone();
693 1
        let tx = &mut finalizer.transaction;
694 1
        tx.commit(self, prepared)
695 1
    }
696

697 1
    pub fn create_index<K, V>(&self, tx: &mut Transaction, index_name: &str, value_mode: ValueMode) -> PRes<()>
698
    where
699
        K: IndexType,
700
        V: IndexType,
701
    {
702 1
        Indexes::create_index::<K, V>(self, tx, index_name, 32, 128, value_mode)
703 1
    }
704

705 1
    pub fn drop_index(&self, tx: &mut Transaction, index_name: &str) -> PRes<()> {
706 1
        Indexes::drop_index(self, tx, index_name)
707 1
    }
708

709 1
    pub fn put<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: K, v: V) -> PRes<()>
710
    where
711
        K: IndexType,
712
        V: IndexType,
713
    {
714 1
        Indexes::check_index::<K, V>(self, tx, &index_id)?;
715 1
        tx.add_put(index_id, k, v);
716 1
        Ok(())
717 1
    }
718

719 1
    pub fn remove<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: K, v: Option<V>) -> PRes<()>
720
    where
721
        K: IndexType,
722
        V: IndexType,
723
    {
724 1
        Indexes::check_index::<K, V>(self, tx, &index_id)?;
725 1
        tx.add_remove(index_id, k, v);
726 1
        Ok(())
727 1
    }
728

729 1
    pub fn get_tx<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: &K) -> PRes<Option<Value<V>>>
730
    where
731
        K: IndexType,
732
        V: IndexType,
733
    {
734 1
        let (result, vm) = {
735 1
            let mut ik = Indexes::get_index_keeper_tx::<K, V>(self, tx, &index_id)?;
736 1
            self.indexes.read_lock(index_id.clone())?;
737 1
            (ik.get(k)?, IndexKeeper::<K, V>::value_mode(&ik))
738 1
        };
739 1
        self.indexes.read_unlock(index_id.clone())?;
740 1
        tx.apply_changes::<K, V>(vm, index_id, k, result)
741 1
    }
742

743 1
    pub fn get<K, V>(&self, index_id: IndexId, k: &K) -> PRes<Option<Value<V>>>
744
    where
745
        K: IndexType,
746
        V: IndexType,
747
    {
748 1
        let read_snapshot = self.snapshot()?;
749 1
        let r = self.get_snapshot(index_id, read_snapshot, k);
750 1
        release_snapshot(read_snapshot, &self.snapshots, &self.allocator, &self.journal)?;
751 1
        r
752 1
    }
753

754 1
    pub fn get_snapshot<K, V>(&self, index_id: IndexId, snapshot: SnapshotId, k: &K) -> PRes<Option<Value<V>>>
755
    where
756
        K: IndexType,
757
        V: IndexType,
758
    {
759 1
        Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?.get(k)
760 1
    }
761

762 1
    pub fn index_next<K, V>(
763
        &self,
764
        index_id: &IndexId,
765
        read_snapshot: SnapshotId,
766
        next: Bound<&K>,
767
    ) -> PRes<PageIter<K, V>>
768
    where
769
        K: IndexType,
770
        V: IndexType,
771
    {
772 1
        Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.iter_from(next)
773 1
    }
774

775 1
    pub fn index_back<K, V>(
776
        &self,
777
        index_id: &IndexId,
778
        read_snapshot: SnapshotId,
779
        next: Bound<&K>,
780
    ) -> PRes<PageIterBack<K, V>>
781
    where
782
        K: IndexType,
783
        V: IndexType,
784
    {
785 1
        Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.back_iter_from(next)
786 1
    }
787

788 1
    pub fn range<K, V, R>(&self, index_id: IndexId, range: R) -> PRes<(ValueMode, IndexRawIter<K, V>)>
789
    where
790
        K: IndexType,
791
        V: IndexType,
792
        R: RangeBounds<K>,
793
    {
794 1
        let read_snapshot = self.snapshot()?;
795 1
        self.range_snapshot(index_id, read_snapshot, range, true)
796 1
    }
797

798 1
    pub fn range_snapshot<K, V, R>(
799
        &self,
800
        index_id: IndexId,
801
        snapshot: SnapshotId,
802
        range: R,
803
        release_snapshot: bool,
804
    ) -> PRes<(ValueMode, IndexRawIter<K, V>)>
805
    where
806
        K: IndexType,
807
        V: IndexType,
808
        R: RangeBounds<K>,
809
    {
810 1
        let mut ik = Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?;
811 1
        let after = ik.iter_from(range.start_bound())?;
812 1
        let before = ik.back_iter_from(range.end_bound())?;
813 1
        Ok((
814 1
            IndexKeeper::<K, V>::value_mode(&ik),
815 1
            IndexRawIter::new(index_id, snapshot, after, before, release_snapshot),
816
        ))
817 1
    }
818

819
    pub fn segment_name_tx(&self, tx: &Transaction, id: SegmentId) -> PRes<Option<(String, bool)>> {
820
        if tx.segment_created_in_tx(id) {
821
            Ok(tx.segment_name_by_id(id).map(|x| (x, true)))
822
        } else {
823
            self.address.segment_name_by_id(id).map(|k| k.map(|x| (x, false)))
824
        }
825
    }
826

827 1
    pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
828 1
        Ok(self
829
            .address
830 1
            .list()?
831
            .into_iter()
832 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
833
            .collect())
834 1
    }
835

836 1
    pub fn list_segments_snapshot(&self, snapshot_id: SnapshotId) -> PRes<Vec<(String, SegmentId)>> {
837 1
        let list = self.snapshots.list(snapshot_id)?;
838 1
        Ok(list
839
            .into_iter()
840 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
841
            .collect())
842 1
    }
843

844 1
    pub fn list_indexes(&self) -> PRes<Vec<(String, IndexInfo)>> {
845 1
        let snapshot = self.snapshot()?;
846 1
        let res = self.list_indexes_snapshot(snapshot);
847 1
        release_snapshot(snapshot, &self.snapshots, &self.allocator, &self.journal)?;
848 1
        res
849 1
    }
850 1
    pub fn list_indexes_snapshot(&self, snapshot: SnapshotId) -> PRes<Vec<(String, IndexInfo)>> {
851 1
        let list = self.snapshots.list(snapshot)?;
852 1
        list.into_iter()
853 1
            .filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
854 1
            .map(|(mut name, _id)| -> PRes<(String, IndexInfo)> {
855 1
                name.drain(..INDEX_META_PREFIX.len());
856 1
                let info = self.index_info(snapshot, &name)?;
857 1
                Ok((name, info))
858 1
            })
859
            .collect()
860 1
    }
861

862 1
    pub fn list_segments_tx(&self, tx: &Transaction) -> PRes<Vec<(String, SegmentId)>> {
863 1
        Ok(tx
864 1
            .filter_list(&self.address.list()?)
865 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
866 1
            .map(|(name, id)| (name.to_string(), id))
867
            .collect())
868 1
    }
869

870 1
    pub fn list_indexes_tx(&self, tx: &Transaction) -> PRes<Vec<(String, IndexInfo)>> {
871 1
        tx.filter_list(&self.address.list()?)
872 1
            .filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
873 1
            .map(|(name, id)| (name.to_string(), id))
874 1
            .map(|(mut name, _id)| -> PRes<(String, IndexInfo)> {
875 1
                name.drain(..INDEX_META_PREFIX.len());
876 1
                let info = self.index_info_tx(tx, &name)?;
877 1
                Ok((name, info))
878 1
            })
879
            .collect()
880 1
    }
881

882 1
    fn index_info(&self, snapshot: SnapshotId, name: &str) -> PRes<IndexInfo> {
883 1
        let id = self.solve_index_id_snapshot(snapshot, name)?;
884 1
        let index = Indexes::get_index(self, snapshot, &id)?;
885 1
        Ok(IndexInfo {
886 1
            id,
887 1
            value_mode: index.value_mode,
888 1
            key_type: IndexTypeId::from(index.key_type),
889 1
            value_type: IndexTypeId::from(index.value_type),
890
        })
891 1
    }
892 1
    fn index_info_tx(&self, tx: &Transaction, name: &str) -> PRes<IndexInfo> {
893 1
        let id = self.solve_index_id_tx(tx, name)?.0;
894 1
        let (index, _version) = Indexes::get_index_tx(self, tx, &id)?;
895 1
        Ok(IndexInfo {
896 1
            id,
897 1
            value_mode: index.value_mode,
898 1
            key_type: IndexTypeId::from(index.key_type),
899 1
            value_type: IndexTypeId::from(index.value_type),
900
        })
901 1
    }
902

903 1
    pub(crate) fn journal(&self) -> &Journal {
904 1
        &self.journal
905 1
    }
906

907 1
    pub(crate) fn address(&self) -> &Address {
908 1
        &self.address
909 1
    }
910

911 1
    pub(crate) fn allocator(&self) -> &Allocator {
912 1
        &self.allocator
913 1
    }
914

915 1
    pub(crate) fn indexes(&self) -> &Indexes {
916 1
        &self.indexes
917 1
    }
918

919 1
    pub(crate) fn snapshots(&self) -> &Snapshots {
920 1
        &self.snapshots
921 1
    }
922

923
    #[cfg(feature = "background_ops")]
924 1
    pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, snapshot_id: SnapshotId) -> PRes<()> {
925 1
        match sync_mode {
926 1
            SyncMode::Sync => {
927 1
                let allocator = self.allocator();
928 1
                allocator.disc().sync()
929
            }
930
            SyncMode::BackgroundSync => {
931 1
                self.snapshots.acquire(snapshot_id)?;
932 1
                self.background_ops.add_pending(snapshot_id)
933
            }
934
        }
935 1
    }
936

937
    #[cfg(not(feature = "background_ops"))]
938
    pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, _snapshot_id: SnapshotId) -> PRes<()> {
939
        match sync_mode {
940
            SyncMode::Sync => {
941
                let allocator = self.allocator();
942
                allocator.disc().sync()
943
            }
944
            SyncMode::BackgroundSync => unreachable!(),
945
        }
946
    }
947
}
948

949 1
pub fn exp_from_content_size(size: u64) -> u8 {
950
    // content + size + match_pointer:page+ match_pointer:pos  + page_header
951 1
    let final_size = size + u64::from(PAGE_METADATA_SIZE);
952 1
    let final_size = u64::max(final_size, 32);
953
    // Should be there a better way, so far is OK.
954 1
    let mut res: u8 = 1;
955 1
    loop {
956 1
        if final_size < (1 << res) {
957
            return res;
958
        }
959 1
        res += 1;
960
    }
961 1
}

Read our documentation on viewing source code .

Loading