1
use fs2::FileExt;
2
pub use std::fs::OpenOptions;
3
use std::{
4
    collections::HashMap,
5
    fs::File,
6
    mem::replace,
7
    ops::{Bound, RangeBounds},
8
    path::Path,
9
    str,
10
    sync::Arc,
11
};
12

13
pub use crate::id::RecRef;
14
use crate::index::{
15
    config::{IndexType, IndexTypeId, Indexes, ValueMode, INDEX_DATA_PREFIX, INDEX_META_PREFIX},
16
    keeper::{IndexKeeper, IndexRawIter},
17
    nodes::{PageIter, PageIterBack, Value},
18
    tree::Index,
19
};
20
use crate::transaction::{
21
    PreparedState, SyncMode, Transaction, TxRead,
22
    TxSegCheck::{CREATED, DROPPED, NONE},
23
};
24
use crate::{
25
    address::Address,
26
    allocator::Allocator,
27
    config::{Config, TransactionConfig},
28
    discref::{Device, DiscRef, MemRef, PageOps, PAGE_METADATA_SIZE},
29
    error::{PRes, PersyError},
30
    id::{IndexId, PersyId, SegmentId, ToIndexId, ToSegmentId},
31
    io::{
32
        InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite, InfallibleWriteFormat,
33
        InfallibleWriteVarInt,
34
    },
35
    journal::{Journal, JournalId, JOURNAL_PAGE_EXP},
36
    record_scanner::{SegmentRawIter, SegmentSnapshotRawIter, TxSegmentRawIter},
37
    snapshot::{release_snapshot, EntryCase, SegmentSnapshop, SnapshotId, Snapshots},
38
};
39

40
#[cfg(feature = "background_ops")]
41
use crate::background::BackgroundOps;
42

43
const DEFAULT_PAGE_EXP: u8 = 10; // 2^10
44

45
pub struct PersyImpl {
46
    config: Arc<Config>,
47
    journal: Arc<Journal>,
48
    address: Address,
49
    indexes: Indexes,
50
    allocator: Arc<Allocator>,
51
    snapshots: Arc<Snapshots>,
52
    #[cfg(feature = "background_ops")]
53
    background_ops: BackgroundOps<SnapshotId>,
54
}
55

56
pub struct TxFinalize {
57
    transaction: Transaction,
58
    prepared: PreparedState,
59
}
60

61
/// Possible state of a transaction in the log
62 1
#[derive(PartialEq, Debug, Clone)]
63
pub enum RecoverStatus {
64
    /// Started but not completed
65
    Started,
66
    /// Successfully prepared
67
    PrepareCommit,
68
    /// rollback-ed
69
    Rollback,
70
    /// Successfully committed after prepared
71
    Commit,
72
    /// Successfully cleaned up resources after commit
73
    Cleanup,
74
}
75

76
/// Possible option for recover a transaction in prepared commit state
77 0
#[derive(PartialEq, Debug)]
78
pub enum CommitStatus {
79
    Rollback,
80
    Commit,
81
}
82
pub struct RecoverImpl {
83
    tx_id: HashMap<Vec<u8>, JournalId>,
84
    transactions: HashMap<JournalId, (RecoverStatus, Transaction, Option<CommitStatus>)>,
85
    order: Vec<JournalId>,
86
    journal_pages: Vec<u64>,
87
}
88

89
impl RecoverImpl {
90 1
    pub fn apply<C>(&mut self, recover: C) -> PRes<()>
91
    where
92
        C: Fn(&Vec<u8>) -> bool,
93
    {
94 1
        for (id, status) in self.list_transactions()? {
95 1
            if status == RecoverStatus::PrepareCommit {
96 0
                if recover(&id) {
97 0
                    self.commit(id)?;
98
                } else {
99 0
                    self.rollback(id)?;
100
                }
101
            }
102 1
        }
103 1
        Ok(())
104 1
    }
105

106 1
    pub fn list_transactions(&self) -> PRes<Vec<(Vec<u8>, RecoverStatus)>> {
107 1
        let mut res = Vec::new();
108 1
        for id in &self.order {
109 1
            if let Some((id, status)) = self
110
                .transactions
111
                .get(id)
112 1
                .map(|(s, tx, _)| (tx.meta_id().clone(), s.clone()))
113
            {
114 1
                res.push((id, status));
115
            }
116 1
        }
117 1
        Ok(res)
118 1
    }
119

120
    pub fn status(&self, tx_id: Vec<u8>) -> PRes<Option<RecoverStatus>> {
121
        if let Some(id) = self.tx_id.get(&tx_id) {
122
            Ok(self.transactions.get(id).map(|(s, _, _)| s.clone()))
123
        } else {
124
            Ok(None)
125
        }
126
    }
127

128 0
    pub fn commit(&mut self, tx_id: Vec<u8>) -> PRes<()> {
129 0
        if let Some(id) = self.tx_id.get(&tx_id) {
130 0
            if let Some(tx) = self.transactions.get_mut(id) {
131 0
                tx.2 = Some(CommitStatus::Commit);
132
            }
133
        }
134 0
        Ok(())
135 0
    }
136

137 0
    pub fn rollback(&mut self, tx_id: Vec<u8>) -> PRes<()> {
138 0
        if let Some(id) = self.tx_id.get(&tx_id) {
139 0
            if let Some(tx) = self.transactions.get_mut(id) {
140 0
                tx.2 = Some(CommitStatus::Rollback);
141
            }
142
        }
143 0
        Ok(())
144 0
    }
145
}
146

147
#[derive(Clone)]
148
/// Index definition details
149
pub struct IndexInfo {
150
    pub id: IndexId,
151
    pub value_mode: ValueMode,
152
    pub key_type: IndexTypeId,
153
    pub value_type: IndexTypeId,
154
}
155

156
#[cfg(feature = "background_ops")]
157 1
fn create_background_ops(
158
    journal: Arc<Journal>,
159
    allocator: Arc<Allocator>,
160
    snapshots: Arc<Snapshots>,
161
) -> PRes<BackgroundOps<SnapshotId>> {
162 1
    let all_sync = allocator.clone();
163 1
    BackgroundOps::new(
164 1
        move || all_sync.disc().sync(),
165 1
        move |all: &[SnapshotId]| {
166 1
            for snap in all {
167 1
                release_snapshot(*snap, &snapshots, &allocator, &journal)?;
168
            }
169 1
            Ok(())
170 1
        },
171
    )
172 0
}
173

174
impl PersyImpl {
175 1
    pub fn create(path: &Path) -> PRes<()> {
176 1
        let f = OpenOptions::new().write(true).read(true).create_new(true).open(path)?;
177 1
        PersyImpl::create_from_file(f)
178 1
    }
179

180 1
    pub fn create_from_file(f: File) -> PRes<()> {
181 1
        f.try_lock_exclusive()?;
182 1
        PersyImpl::init_file(f)?;
183 1
        Ok(())
184 1
    }
185

186 1
    fn init_file(fl: File) -> PRes<()> {
187 1
        PersyImpl::init(Box::new(DiscRef::new(fl)?))?;
188 1
        Ok(())
189 1
    }
190

191 1
    fn init(device: Box<dyn Device>) -> PRes<Box<dyn Device>> {
192
        // root_page is every time 0
193 1
        let root_page = device.create_page_raw(DEFAULT_PAGE_EXP)?;
194 1
        let (allocator_page, allocator) = Allocator::init(device, &Config::new())?;
195 1
        let address_page = Address::init(&allocator)?;
196 1
        let journal_page = Journal::init(&allocator)?;
197
        {
198 1
            let mut root = allocator.disc().load_page_raw(root_page, DEFAULT_PAGE_EXP)?;
199
            // Version of the disc format
200 1
            root.write_u16(0);
201
            // Position of the start of address structure
202 1
            root.write_u64(address_page);
203
            // Start of the Log data, if shutdown well this will be every time 0
204 1
            root.write_u64(journal_page);
205 1
            root.write_u64(allocator_page);
206 1
            allocator.flush_page(root)?;
207
            // TODO: check this never go over the first page
208 1
        }
209 1
        allocator.disc().sync()?;
210 1
        Ok(allocator.release())
211 1
    }
212

213 1
    fn new(disc: Box<dyn Device>, config: Config) -> PRes<PersyImpl> {
214 1
        let address_page;
215
        let journal_page;
216
        let allocator_page;
217
        {
218 1
            let mut pg = disc.load_page_raw(0, DEFAULT_PAGE_EXP)?;
219 1
            pg.read_u16(); //THIS NOW is 0 all the times
220 1
            address_page = pg.read_u64();
221 1
            journal_page = pg.read_u64();
222 1
            allocator_page = pg.read_u64();
223 1
        }
224 1
        let config = Arc::new(config);
225 1
        let allocator = Arc::new(Allocator::new(disc, &config, allocator_page)?);
226 1
        let address = Address::new(&allocator, &config, address_page)?;
227 1
        let journal = Arc::new(Journal::new(&allocator, journal_page)?);
228 1
        let indexes = Indexes::new(&config);
229 1
        let snapshots = Arc::new(Snapshots::new());
230
        #[cfg(feature = "background_ops")]
231 1
        let background_ops = create_background_ops(journal.clone(), allocator.clone(), snapshots.clone())?;
232 1
        Ok(PersyImpl {
233 1
            config,
234 1
            journal,
235 1
            address,
236 1
            indexes,
237 1
            allocator,
238 1
            snapshots,
239
            #[cfg(feature = "background_ops")]
240 1
            background_ops,
241
        })
242 1
    }
243

244 1
    fn recover(&self) -> PRes<RecoverImpl> {
245 1
        let mut commit_order = Vec::new();
246 1
        let mut transactions = HashMap::new();
247 1
        let journal = &self.journal;
248 1
        let jp = journal.recover(|record, id| {
249 1
            let tx = transactions
250 1
                .entry(id.clone())
251 1
                .or_insert_with(|| (RecoverStatus::Started, Transaction::recover(id.clone()), None));
252 1
            tx.0 = match record.recover(&mut tx.1) {
253 1
                Err(_) => RecoverStatus::Rollback,
254 1
                Ok(_) if tx.0 == RecoverStatus::Rollback => RecoverStatus::Rollback,
255 1
                Ok(x) => match x {
256 1
                    RecoverStatus::Started => RecoverStatus::Started,
257
                    RecoverStatus::PrepareCommit => {
258 1
                        commit_order.push(id.clone());
259 1
                        RecoverStatus::PrepareCommit
260
                    }
261 1
                    RecoverStatus::Rollback => RecoverStatus::Rollback,
262
                    RecoverStatus::Commit => {
263 1
                        commit_order.push(id.clone());
264 1
                        RecoverStatus::Commit
265
                    }
266 1
                    RecoverStatus::Cleanup => RecoverStatus::Cleanup,
267
                },
268
            }
269 1
        })?;
270

271 1
        let mut transactions_id = HashMap::new();
272 1
        for (id, (_, tx, _)) in &transactions {
273 1
            transactions_id.insert(tx.meta_id().clone(), id.clone());
274
        }
275 1
        Ok(RecoverImpl {
276 1
            tx_id: transactions_id,
277 1
            transactions,
278 1
            order: commit_order,
279 1
            journal_pages: jp,
280
        })
281 1
    }
282 1
    pub fn final_recover(&self, mut recover: RecoverImpl) -> PRes<()> {
283 1
        let mut last_id = None;
284 1
        let allocator = &self.allocator;
285 1
        for id in recover.order {
286 1
            if let Some((status, mut tx, choosed)) = recover.transactions.remove(&id) {
287 1
                if status == RecoverStatus::PrepareCommit {
288 0
                    if choosed == Some(CommitStatus::Commit) || choosed.is_none() {
289 0
                        let prepared = tx.recover_prepare(self)?;
290 0
                        tx.recover_commit(self, prepared)?;
291 0
                        last_id = Some(id);
292 0
                    } else {
293 0
                        tx.recover_rollback(self)?;
294
                    }
295 1
                } else if status == RecoverStatus::Commit {
296 1
                    tx.recover_cleanup(self)?;
297
                }
298 1
            }
299 1
        }
300 1
        for p in recover.journal_pages {
301 1
            allocator.remove_from_free(p, JOURNAL_PAGE_EXP)?;
302 1
        }
303

304 1
        for (_, (_, tx, _)) in recover.transactions.iter_mut() {
305 1
            tx.recover_rollback(self)?;
306
        }
307 1
        if let Some(id) = last_id {
308 0
            self.journal.finished_to_clean(&[id])?;
309
        }
310 1
        allocator.trim_free_at_end()?;
311 1
        Ok(())
312 1
    }
313

314 1
    pub fn open_recover(f: File, config: Config) -> PRes<(PersyImpl, RecoverImpl)> {
315 1
        f.try_lock_exclusive()?;
316 1
        let persy = PersyImpl::new(Box::new(DiscRef::new(f)?), config)?;
317 1
        let rec = persy.recover()?;
318 1
        Ok((persy, rec))
319 1
    }
320

321 1
    pub fn memory(config: Config) -> PRes<PersyImpl> {
322 1
        let device = PersyImpl::init(Box::new(MemRef::new()?))?;
323 1
        PersyImpl::new(device, config)
324 1
    }
325

326 1
    pub fn begin_with(&self, mut config: TransactionConfig) -> PRes<Transaction> {
327 1
        let journal = &self.journal;
328 1
        let strategy = if let Some(st) = config.tx_strategy {
329 0
            st
330
        } else {
331 1
            self.config.tx_strategy().clone()
332
        };
333 1
        let meta_id = if let Some(id) = replace(&mut config.transaction_id, None) {
334 0
            id
335
        } else {
336 1
            Vec::new()
337 1
        };
338 1
        let sync_mode = if Some(true) == config.background_sync {
339 1
            SyncMode::BackgroundSync
340
        } else {
341 1
            SyncMode::Sync
342
        };
343

344 1
        Ok(Transaction::new(journal, &strategy, sync_mode, meta_id)?)
345 1
    }
346

347 1
    pub fn create_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<SegmentId> {
348 1
        match tx.exists_segment(segment) {
349 1
            DROPPED => {}
350
            CREATED(_) => {
351 0
                return Err(PersyError::SegmentAlreadyExists);
352
            }
353
            NONE => {
354 1
                if self.address.exists_segment(&segment)? {
355 0
                    return Err(PersyError::SegmentAlreadyExists);
356
                }
357
            }
358
        }
359 1
        let (segment_id, first_segment_page) = self.address.create_temp_segment(segment)?;
360 1
        tx.add_create_segment(&self.journal, segment, segment_id, first_segment_page)?;
361 1
        Ok(SegmentId::new(segment_id))
362 1
    }
363

364 1
    pub fn drop_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
365 1
        let (_, segment_id) = self.check_segment_tx(tx, segment)?;
366 1
        tx.add_drop_segment(&self.journal, segment, segment_id)?;
367 1
        Ok(())
368 1
    }
369

370 1
    pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
371 1
        self.address.exists_segment(segment)
372 1
    }
373

374 1
    pub fn exists_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<bool> {
375 1
        match tx.exists_segment(segment) {
376 1
            DROPPED => Ok(false),
377 1
            CREATED(_) => Ok(true),
378 0
            NONE => self.address.exists_segment(segment),
379
        }
380 1
    }
381 1
    pub fn exists_index(&self, index: &str) -> PRes<bool> {
382 1
        self.exists_segment(&format!("{}{}", INDEX_META_PREFIX, index))
383 0
    }
384

385 1
    pub fn exists_index_tx(&self, tx: &Transaction, index: &str) -> PRes<bool> {
386 1
        self.exists_segment_tx(tx, &format!("{}{}", INDEX_META_PREFIX, index))
387 0
    }
388

389 1
    pub fn solve_segment_id(&self, segment: impl ToSegmentId) -> PRes<SegmentId> {
390 1
        segment.to_segment_id(&self.address)
391 1
    }
392

393 1
    pub fn solve_segment_id_tx(&self, tx: &Transaction, segment: impl ToSegmentId) -> PRes<SegmentId> {
394 1
        let (sid, _) = segment.to_segment_id_tx(self, tx)?;
395 1
        Ok(sid)
396 1
    }
397

398 1
    pub fn solve_segment_id_snapshot(&self, snapshot: SnapshotId, segment: impl ToSegmentId) -> PRes<SegmentId> {
399 1
        segment.to_segment_id_snapshot(&self.snapshots, snapshot)
400 1
    }
401

402 1
    pub fn solve_index_id(&self, index: impl ToIndexId) -> PRes<IndexId> {
403 1
        index.to_index_id(&self.address)
404 1
    }
405

406 1
    pub fn solve_index_id_tx(&self, tx: &Transaction, index: impl ToIndexId) -> PRes<(IndexId, bool)> {
407 1
        index.to_index_id_tx(self, tx)
408 1
    }
409

410 1
    pub fn solve_index_id_snapshot(&self, snapshot: SnapshotId, index: impl ToIndexId) -> PRes<IndexId> {
411 1
        index.to_index_id_snapshot(&self.snapshots, snapshot)
412 1
    }
413

414
    /// check if a segment exist persistent or in tx.
415
    ///
416
    /// @return true if the segment was created in tx.
417 1
    pub fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, u32)> {
418 1
        match tx.exists_segment(segment) {
419 1
            DROPPED => Err(PersyError::SegmentNotFound),
420 1
            CREATED(segment_id) => Ok((true, segment_id)),
421 1
            NONE => self
422
                .address
423 1
                .segment_id(segment)?
424 1
                .map_or(Err(PersyError::SegmentNotFound), |id| Ok((false, id))),
425
        }
426 1
    }
427

428 1
    pub fn write_record_metadata(len: u64, id: &RecRef) -> Vec<u8> {
429 1
        let mut val = Vec::new();
430 1
        val.write_varint_u64(len);
431 1
        id.write(&mut val);
432
        val
433 0
    }
434 1
    pub fn read_record_metadata(meta: &mut dyn InfallibleRead) -> (u64, RecRef) {
435 1
        let len = meta.read_varint_u64();
436 1
        let id = RecRef::read(meta);
437 1
        (len, id)
438 1
    }
439

440 1
    pub fn insert_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, rec: &[u8]) -> PRes<RecRef> {
441 1
        let (segment_id, in_tx) = segment.to_segment_id_tx(self, tx)?;
442 1
        let len = rec.len() as u64;
443 1
        let allocator = &self.allocator;
444 1
        let address = &self.address;
445 1
        let (rec_ref, maybe_new_page) = if in_tx {
446 1
            address.allocate_temp(segment_id.id)
447
        } else {
448 1
            address.allocate(segment_id.id)
449 1
        }?;
450 1
        let metadata = PersyImpl::write_record_metadata(len, &rec_ref);
451 1
        let allocation_exp = exp_from_content_size(len + metadata.len() as u64);
452 1
        let mut pg = allocator.allocate(allocation_exp)?;
453 1
        let page = pg.get_index();
454 1
        tx.add_insert(&self.journal, segment_id.id, &rec_ref, page)?;
455 1
        if let Some(new_page) = maybe_new_page {
456 1
            tx.add_new_segment_page(&self.journal, segment_id.id, new_page.new_page, new_page.previus_page)?;
457
        }
458 1
        pg.write_all(&metadata);
459 1
        pg.write_all(rec);
460 1
        allocator.flush_page(pg)?;
461 1
        Ok(rec_ref)
462 1
    }
463

464 1
    fn read_snapshot(&self) -> PRes<SnapshotId> {
465 1
        self.snapshots.current_snapshot()
466 1
    }
467

468 1
    pub fn snapshot(&self) -> PRes<SnapshotId> {
469 1
        let snapshot_id = self.snapshots.read_snapshot()?;
470 1
        let segs = self
471
            .address
472 1
            .snapshot_list()?
473
            .into_iter()
474 1
            .map(|(name, id, first_page)| SegmentSnapshop::new(&name, SegmentId::new(id), first_page))
475 0
            .collect::<Vec<_>>();
476 1
        self.snapshots.fill_segments(snapshot_id, &segs)?;
477 1
        Ok(snapshot_id)
478 1
    }
479

480 1
    pub fn release_snapshot(&self, snapshot_id: SnapshotId) -> PRes<()> {
481 1
        release_snapshot(snapshot_id, &self.snapshots, &self.allocator, &self.journal)
482 1
    }
483

484 1
    fn read_ref_segment(&self, tx: &Transaction, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<(u64, u16, u32)>> {
485 1
        Ok(match tx.read(rec_ref) {
486 1
            TxRead::RECORD(rec) => Some((rec.0, rec.1, segment_id)),
487 1
            TxRead::DELETED => None,
488 1
            TxRead::NONE => self
489
                .address
490 1
                .read(rec_ref, segment_id)?
491 1
                .map(|(pos, version)| (pos, version, segment_id)),
492
        })
493 1
    }
494

495 1
    fn read_page(&self, match_id: &RecRef, page: u64) -> PRes<Option<Vec<u8>>> {
496 1
        self.read_page_fn(match_id, page, |x| Vec::from(x))
497 1
    }
498

499 1
    fn read_page_fn<T, F>(&self, match_id: &RecRef, page: u64, f: F) -> PRes<Option<T>>
500
    where
501
        F: Fn(&[u8]) -> T,
502
    {
503 1
        if let Some(mut pg) = self.allocator.load_page_not_free(page)? {
504 1
            let (len, id) = PersyImpl::read_record_metadata(&mut pg);
505 1
            if id.page == match_id.page && id.pos == match_id.pos {
506 1
                Ok(Some(f(pg.slice(len as usize))))
507
            } else {
508 0
                Ok(None)
509
            }
510 1
        } else {
511 0
            Ok(None)
512
        }
513 1
    }
514

515 1
    pub fn read_tx_internal_fn<T, F>(
516
        &self,
517
        tx: &Transaction,
518
        segment_id: SegmentId,
519
        id: &RecRef,
520
        f: F,
521
    ) -> PRes<Option<(T, u16)>>
522
    where
523
        F: Fn(&[u8]) -> T,
524
    {
525 0
        loop {
526 1
            if let Some((page, version, _)) = self.read_ref_segment(tx, segment_id.id, id)? {
527 1
                if let Some(record) = self.read_page_fn(id, page, &f)? {
528 1
                    break Ok(Some((record, version)));
529
                }
530 0
            } else {
531 1
                break Ok(None);
532
            }
533 0
        }
534 1
    }
535

536 1
    pub fn read_tx_internal(
537
        &self,
538
        tx: &Transaction,
539
        segment_id: SegmentId,
540
        id: &RecRef,
541
    ) -> PRes<Option<(Vec<u8>, u16)>> {
542 1
        self.read_tx_internal_fn(tx, segment_id, id, |x| Vec::from(x))
543 1
    }
544

545 1
    pub fn read_tx(&self, tx: &mut Transaction, segment: SegmentId, id: &RecRef) -> PRes<Option<Vec<u8>>> {
546 1
        if let Some((rec, version)) = self.read_tx_internal(tx, segment, id)? {
547 1
            tx.add_read(&self.journal, segment.id, id, version)?;
548 1
            Ok(Some(rec))
549 0
        } else {
550 1
            Ok(None)
551
        }
552 1
    }
553

554 1
    pub fn lock_record(
555
        &self,
556
        tx: &mut Transaction,
557
        segment: impl ToSegmentId,
558
        id: &RecRef,
559
        version: u16,
560
    ) -> PRes<bool> {
561 1
        let segment_id = segment.to_segment_id(&self.address)?.id;
562 1
        tx.lock_record(&self.address, segment_id, id, version)
563 1
    }
564

565 1
    pub fn unlock_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, id: &RecRef) -> PRes<()> {
566 1
        let segment_id = segment.to_segment_id(&self.address)?.id;
567 1
        tx.unlock_record(&self.address, segment_id, id)
568 1
    }
569

570 1
    pub fn read(&self, segment: SegmentId, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
571 0
        loop {
572 1
            if let Some((page, _)) = self.address.read(rec_ref, segment.id)? {
573 1
                if let Some(record) = self.read_page(rec_ref, page)? {
574 1
                    break Ok(Some(record));
575
                }
576 0
            } else {
577 1
                break Ok(None);
578
            }
579
        }
580 1
    }
581

582 1
    pub fn read_snap(&self, segment: SegmentId, rec_ref: &RecRef, snapshot: SnapshotId) -> PRes<Option<Vec<u8>>> {
583 1
        self.read_snap_fn(segment, rec_ref, snapshot, |x| Vec::from(x))
584 1
    }
585

586 1
    pub fn read_snap_fn<T, F>(
587
        &self,
588
        segment: SegmentId,
589
        rec_ref: &RecRef,
590
        snapshot: SnapshotId,
591
        f: F,
592
    ) -> PRes<Option<T>>
593
    where
594
        F: Fn(&[u8]) -> T,
595
    {
596 1
        let segment_id = segment.id;
597 1
        loop {
598 1
            if let Some(rec_vers) = self.snapshots.read(snapshot, rec_ref)? {
599 0
                match rec_vers.case {
600 1
                    EntryCase::Change(change) => {
601 1
                        if let Some(record) = self.read_page_fn(rec_ref, change.pos, &f)? {
602 1
                            break Ok(Some(record));
603
                        }
604 0
                    }
605
                    EntryCase::Insert => {
606 1
                        break Ok(None);
607
                    }
608
                }
609 1
            } else if let Some((page, _)) = self.address.read(rec_ref, segment_id)? {
610 1
                if let Some(record) = self.read_page_fn(rec_ref, page, &f)? {
611 1
                    break Ok(Some(record));
612
                }
613 0
            } else {
614 0
                break Ok(None);
615 0
            }
616 0
        }
617 1
    }
618

619 1
    pub fn scan(&self, segment: SegmentId) -> PRes<SegmentRawIter> {
620 1
        let read_snapshot = self.read_snapshot()?;
621 1
        Ok(SegmentRawIter::new(
622
            segment,
623 1
            self.address.scan(segment.id)?,
624
            read_snapshot,
625
        ))
626 1
    }
627

628 1
    pub fn scan_snapshot(&self, segment_id: SegmentId, snapshot: SnapshotId) -> PRes<SegmentSnapshotRawIter> {
629 1
        let res = self.snapshots.scan(snapshot, segment_id)?.unwrap();
630 1
        Ok(SegmentSnapshotRawIter::new(segment_id, res, snapshot))
631 1
    }
632

633 1
    pub fn scan_tx<'a>(&'a self, tx: &'a Transaction, segment_id: SegmentId) -> PRes<TxSegmentRawIter> {
634 1
        let read_snapshot = self.read_snapshot()?;
635 1
        Ok(TxSegmentRawIter::new(
636
            tx,
637
            segment_id,
638 1
            self.address.scan(segment_id.id)?,
639
            read_snapshot,
640
        ))
641 1
    }
642

643 1
    pub fn update(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef, rec: &[u8]) -> PRes<()> {
644 1
        if let Some((_, version, segment)) = self.read_ref_segment(tx, segment.id, rec_ref)? {
645 1
            let allocator = &self.allocator;
646 1
            let journal = &self.journal;
647 1
            let len = rec.len();
648 1
            let metadata = PersyImpl::write_record_metadata(len as u64, &rec_ref);
649 1
            let allocation_exp = exp_from_content_size((len + metadata.len()) as u64);
650 1
            let mut pg = allocator.allocate(allocation_exp)?;
651 1
            let page = pg.get_index();
652 1
            tx.add_update(journal, segment, &rec_ref, page, version)?;
653 1
            pg.write_all(&metadata);
654 1
            pg.write_all(rec);
655 1
            allocator.flush_page(pg)
656 1
        } else {
657 1
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
658
        }
659 1
    }
660

661 1
    pub fn delete(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef) -> PRes<()> {
662 1
        if let Some((_, version, seg)) = self.read_ref_segment(tx, segment.id, rec_ref)? {
663 1
            tx.add_delete(&self.journal, seg, &rec_ref, version)
664
        } else {
665 0
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
666
        }
667 1
    }
668

669 1
    pub fn rollback(&self, mut tx: Transaction) -> PRes<()> {
670 1
        tx.rollback(self)
671 1
    }
672

673 1
    pub fn prepare(&self, tx: Transaction) -> PRes<TxFinalize> {
674 1
        let (tx, prepared) = tx.prepare(self)?;
675

676 1
        Ok(TxFinalize {
677 1
            transaction: tx,
678 1
            prepared,
679
        })
680 1
    }
681

682 1
    pub fn rollback_prepared(&self, finalizer: &mut TxFinalize) -> PRes<()> {
683 1
        let prepared = finalizer.prepared.clone();
684 1
        let tx = &mut finalizer.transaction;
685 1
        tx.rollback_prepared(self, prepared)
686 1
    }
687

688 1
    pub fn commit(&self, finalizer: &mut TxFinalize) -> PRes<()> {
689 1
        let prepared = finalizer.prepared.clone();
690 1
        let tx = &mut finalizer.transaction;
691 1
        tx.commit(self, prepared)
692 1
    }
693

694 1
    pub fn create_index<K, V>(&self, tx: &mut Transaction, index_name: &str, value_mode: ValueMode) -> PRes<()>
695
    where
696
        K: IndexType,
697
        V: IndexType,
698
    {
699 1
        Indexes::create_index::<K, V>(self, tx, index_name, 32, 128, value_mode)
700 1
    }
701

702 1
    pub fn drop_index(&self, tx: &mut Transaction, index_name: &str) -> PRes<()> {
703 1
        Indexes::drop_index(self, tx, index_name)
704 1
    }
705

706 1
    pub fn put<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: K, v: V) -> PRes<()>
707
    where
708
        K: IndexType,
709
        V: IndexType,
710
    {
711 1
        Indexes::check_index::<K, V>(self, tx, &index_id)?;
712 1
        tx.add_put(index_id, k, v);
713 1
        Ok(())
714 1
    }
715

716 1
    pub fn remove<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: K, v: Option<V>) -> PRes<()>
717
    where
718
        K: IndexType,
719
        V: IndexType,
720
    {
721 1
        Indexes::check_index::<K, V>(self, tx, &index_id)?;
722 1
        tx.add_remove(index_id, k, v);
723 1
        Ok(())
724 1
    }
725

726 1
    pub fn get_tx<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: &K) -> PRes<Option<Value<V>>>
727
    where
728
        K: IndexType,
729
        V: IndexType,
730
    {
731 1
        let (result, vm) = {
732 1
            let mut ik = Indexes::get_index_keeper_tx::<K, V>(self, tx, &index_id)?;
733 1
            self.indexes.read_lock(index_id.clone())?;
734 1
            (ik.get(k)?, IndexKeeper::<K, V>::value_mode(&ik))
735 1
        };
736 1
        self.indexes.read_unlock(index_id.clone())?;
737 1
        tx.apply_changes::<K, V>(vm, index_id, k, result)
738 1
    }
739

740 1
    pub fn get<K, V>(&self, index_id: IndexId, k: &K) -> PRes<Option<Value<V>>>
741
    where
742
        K: IndexType,
743
        V: IndexType,
744
    {
745 1
        let read_snapshot = self.snapshot()?;
746 1
        let r = self.get_snapshot(index_id, read_snapshot, k);
747 1
        release_snapshot(read_snapshot, &self.snapshots, &self.allocator, &self.journal)?;
748 1
        r
749 1
    }
750

751 1
    pub fn get_snapshot<K, V>(&self, index_id: IndexId, snapshot: SnapshotId, k: &K) -> PRes<Option<Value<V>>>
752
    where
753
        K: IndexType,
754
        V: IndexType,
755
    {
756 1
        Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?.get(k)
757 1
    }
758

759 1
    pub fn index_next<K, V>(
760
        &self,
761
        index_id: &IndexId,
762
        read_snapshot: SnapshotId,
763
        next: Bound<&K>,
764
    ) -> PRes<PageIter<K, V>>
765
    where
766
        K: IndexType,
767
        V: IndexType,
768
    {
769 1
        Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.iter_from(next)
770 1
    }
771

772 1
    pub fn index_back<K, V>(
773
        &self,
774
        index_id: &IndexId,
775
        read_snapshot: SnapshotId,
776
        next: Bound<&K>,
777
    ) -> PRes<PageIterBack<K, V>>
778
    where
779
        K: IndexType,
780
        V: IndexType,
781
    {
782 1
        Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.back_iter_from(next)
783 1
    }
784

785 1
    pub fn range<K, V, R>(&self, index_id: IndexId, range: R) -> PRes<(ValueMode, IndexRawIter<K, V>)>
786
    where
787
        K: IndexType,
788
        V: IndexType,
789
        R: RangeBounds<K>,
790
    {
791 1
        let read_snapshot = self.snapshot()?;
792 1
        self.range_snapshot(index_id, read_snapshot, range, true)
793 1
    }
794

795 1
    pub fn range_snapshot<K, V, R>(
796
        &self,
797
        index_id: IndexId,
798
        snapshot: SnapshotId,
799
        range: R,
800
        release_snapshot: bool,
801
    ) -> PRes<(ValueMode, IndexRawIter<K, V>)>
802
    where
803
        K: IndexType,
804
        V: IndexType,
805
        R: RangeBounds<K>,
806
    {
807 1
        let mut ik = Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?;
808 1
        let after = ik.iter_from(range.start_bound())?;
809 1
        let before = ik.back_iter_from(range.end_bound())?;
810 1
        Ok((
811 1
            IndexKeeper::<K, V>::value_mode(&ik),
812 1
            IndexRawIter::new(index_id, snapshot, after, before, release_snapshot),
813
        ))
814 1
    }
815

816
    pub fn segment_name_tx(&self, tx: &Transaction, id: u32) -> PRes<Option<(String, bool)>> {
817
        if tx.segment_created_in_tx(id) {
818
            Ok(tx.segment_name_by_id(id).map(|x| (x, true)))
819
        } else {
820
            self.address.segment_name_by_id(id).map(|k| k.map(|x| (x, false)))
821
        }
822
    }
823

824 1
    pub fn list_segments(&self) -> PRes<Vec<(String, u32)>> {
825 1
        Ok(self
826
            .address
827 1
            .list()?
828
            .into_iter()
829 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
830
            .collect())
831 1
    }
832

833 1
    pub fn list_segments_snapshot(&self, snapshot_id: SnapshotId) -> PRes<Vec<(String, SegmentId)>> {
834 1
        let list = self.snapshots.list(snapshot_id)?;
835 1
        Ok(list
836
            .into_iter()
837 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
838
            .collect())
839 1
    }
840

841 1
    pub fn list_indexes(&self) -> PRes<Vec<(String, IndexInfo)>> {
842 1
        let snapshot = self.snapshot()?;
843 1
        let res = self.list_indexes_snapshot(snapshot);
844 1
        release_snapshot(snapshot, &self.snapshots, &self.allocator, &self.journal)?;
845 1
        res
846 1
    }
847 1
    pub fn list_indexes_snapshot(&self, snapshot: SnapshotId) -> PRes<Vec<(String, IndexInfo)>> {
848 1
        let list = self.snapshots.list(snapshot)?;
849 1
        list.into_iter()
850 1
            .filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
851 1
            .map(|(mut name, _id)| -> PRes<(String, IndexInfo)> {
852 1
                name.drain(..INDEX_META_PREFIX.len());
853 1
                let info = self.index_info(snapshot, &name)?;
854 1
                Ok((name, info))
855 1
            })
856
            .collect()
857 1
    }
858

859 1
    pub fn list_segments_tx(&self, tx: &Transaction) -> PRes<Vec<(String, u32)>> {
860 1
        Ok(tx
861 1
            .filter_list(&self.address.list()?)
862 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
863 1
            .map(|(name, id)| (name.to_string(), id))
864
            .collect())
865 1
    }
866

867 1
    pub fn list_indexes_tx(&self, tx: &Transaction) -> PRes<Vec<(String, IndexInfo)>> {
868 1
        tx.filter_list(&self.address.list()?)
869 1
            .filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
870 1
            .map(|(name, id)| (name.to_string(), id))
871 1
            .map(|(mut name, _id)| -> PRes<(String, IndexInfo)> {
872 1
                name.drain(..INDEX_META_PREFIX.len());
873 1
                let info = self.index_info_tx(tx, &name)?;
874 1
                Ok((name, info))
875 1
            })
876
            .collect()
877 1
    }
878

879 1
    fn index_info(&self, snapshot: SnapshotId, name: &str) -> PRes<IndexInfo> {
880 1
        let id = self.solve_index_id_snapshot(snapshot, name)?;
881 1
        let index = Indexes::get_index(self, snapshot, &id)?;
882 1
        Ok(IndexInfo {
883 1
            id,
884 1
            value_mode: index.value_mode,
885 1
            key_type: IndexTypeId::from(index.key_type),
886 1
            value_type: IndexTypeId::from(index.value_type),
887
        })
888 1
    }
889 1
    fn index_info_tx(&self, tx: &Transaction, name: &str) -> PRes<IndexInfo> {
890 1
        let id = self.solve_index_id_tx(tx, name)?.0;
891 1
        let (index, _version) = Indexes::get_index_tx(self, tx, &id)?;
892 1
        Ok(IndexInfo {
893 1
            id,
894 1
            value_mode: index.value_mode,
895 1
            key_type: IndexTypeId::from(index.key_type),
896 1
            value_type: IndexTypeId::from(index.value_type),
897
        })
898 1
    }
899

900 1
    pub(crate) fn journal(&self) -> &Journal {
901 1
        &self.journal
902 1
    }
903

904 1
    pub(crate) fn address(&self) -> &Address {
905 1
        &self.address
906 1
    }
907

908 1
    pub(crate) fn allocator(&self) -> &Allocator {
909 1
        &self.allocator
910 1
    }
911

912 1
    pub(crate) fn indexes(&self) -> &Indexes {
913 1
        &self.indexes
914 1
    }
915

916 1
    pub(crate) fn snapshots(&self) -> &Snapshots {
917 1
        &self.snapshots
918 1
    }
919

920
    #[cfg(feature = "background_ops")]
921 1
    pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, snapshot_id: SnapshotId) -> PRes<()> {
922 1
        match sync_mode {
923 1
            SyncMode::Sync => {
924 1
                let allocator = self.allocator();
925 1
                allocator.disc().sync()
926
            }
927
            SyncMode::BackgroundSync => {
928 1
                self.snapshots.acquire(snapshot_id)?;
929 1
                self.background_ops.add_pending(snapshot_id)
930
            }
931
        }
932 1
    }
933

934
    #[cfg(not(feature = "background_ops"))]
935
    pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, _snapshot_id: SnapshotId) -> PRes<()> {
936
        match sync_mode {
937
            SyncMode::Sync => {
938
                let allocator = self.allocator();
939
                allocator.disc().sync()
940
            }
941
            SyncMode::BackgroundSync => unreachable!(),
942
        }
943
    }
944
}
945

946 1
pub fn exp_from_content_size(size: u64) -> u8 {
947
    // content + size + match_pointer:page+ match_pointer:pos  + page_header
948 1
    let final_size = size + u64::from(PAGE_METADATA_SIZE);
949 1
    let final_size = u64::max(final_size, 32);
950
    // Should be there a better way, so far is OK.
951 1
    let mut res: u8 = 1;
952 1
    loop {
953 1
        if final_size < (1 << res) {
954
            return res;
955
        }
956 1
        res += 1;
957
    }
958 1
}

Read our documentation on viewing source code .

Loading