1
use fs2::FileExt;
2
pub use std::fs::OpenOptions;
3
use std::{
4
    collections::HashMap,
5
    fs::File,
6
    mem::replace,
7
    ops::{Bound, RangeBounds},
8
    path::Path,
9
    str,
10
    sync::Arc,
11
};
12

13
pub use crate::id::RecRef;
14
use crate::index::{
15
    config::{IndexType, IndexTypeId, Indexes, ValueMode, INDEX_DATA_PREFIX, INDEX_META_PREFIX},
16
    keeper::{IndexKeeper, IndexRawIter},
17
    nodes::{PageIter, PageIterBack, Value},
18
    tree::Index,
19
};
20
use crate::transaction::{
21
    PreparedState, SyncMode, Transaction, TxRead,
22
    TxSegCheck::{CREATED, DROPPED, NONE},
23
};
24
use crate::{
25
    address::Address,
26
    allocator::Allocator,
27
    config::{Config, TransactionConfig},
28
    discref::{Device, DiscRef, MemRef, PageOps, PAGE_METADATA_SIZE},
29
    error::{PRes, PersyError},
30
    id::{IndexId, PersyId, SegmentId, ToIndexId, ToSegmentId},
31
    io::{
32
        InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite, InfallibleWriteFormat,
33
        InfallibleWriteVarInt,
34
    },
35
    journal::{Journal, JournalId, JOURNAL_PAGE_EXP},
36
    record_scanner::{SegmentRawIter, SegmentSnapshotRawIter, TxSegmentRawIter},
37
    snapshot::{release_snapshot, EntryCase, SegmentSnapshop, SnapshotId, Snapshots},
38
};
39

40
#[cfg(feature = "background_ops")]
41
use crate::background::BackgroundOps;
42

43
const DEFAULT_PAGE_EXP: u8 = 10; // 2^10
44

45
pub struct PersyImpl {
46
    config: Arc<Config>,
47
    journal: Arc<Journal>,
48
    address: Address,
49
    indexes: Indexes,
50
    allocator: Arc<Allocator>,
51
    snapshots: Arc<Snapshots>,
52
    #[cfg(feature = "background_ops")]
53
    background_ops: BackgroundOps<SnapshotId>,
54
}
55

56
pub struct TxFinalize {
57
    transaction: Transaction,
58
    prepared: PreparedState,
59
}
60

61
/// Possible state of a transaction in the log
62 1
#[derive(PartialEq, Debug, Clone)]
63
pub enum RecoverStatus {
64
    /// Started but not completed
65
    Started,
66
    /// Successfully prepared
67
    PrepareCommit,
68
    /// rollback-ed
69
    Rollback,
70
    /// Successfully committed after prepared
71
    Commit,
72
    /// Successfully cleaned up resources after commit
73
    Cleanup,
74
}
75

76
/// Possible option for recover a transaction in prepared commit state
77 0
#[derive(PartialEq, Debug)]
78
pub enum CommitStatus {
79
    Rollback,
80
    Commit,
81
}
82
pub struct RecoverImpl {
83
    tx_id: HashMap<Vec<u8>, JournalId>,
84
    transactions: HashMap<JournalId, (RecoverStatus, Transaction, Option<CommitStatus>)>,
85
    order: Vec<JournalId>,
86
    journal_pages: Vec<u64>,
87
}
88

89
impl RecoverImpl {
90 1
    pub fn apply<C>(&mut self, recover: C) -> PRes<()>
91
    where
92
        C: Fn(&Vec<u8>) -> bool,
93
    {
94 1
        for (id, status) in self.list_transactions()? {
95 1
            if status == RecoverStatus::PrepareCommit {
96 0
                if recover(&id) {
97 0
                    self.commit(id)?;
98
                } else {
99 0
                    self.rollback(id)?;
100
                }
101
            }
102 1
        }
103 1
        Ok(())
104 1
    }
105

106 1
    pub fn list_transactions(&self) -> PRes<Vec<(Vec<u8>, RecoverStatus)>> {
107 1
        let mut res = Vec::new();
108 1
        for id in &self.order {
109 1
            if let Some((id, status)) = self
110
                .transactions
111
                .get(id)
112 1
                .map(|(s, tx, _)| (tx.meta_id().clone(), s.clone()))
113
            {
114 1
                res.push((id, status));
115
            }
116 1
        }
117 1
        Ok(res)
118 1
    }
119

120
    pub fn status(&self, tx_id: Vec<u8>) -> PRes<Option<RecoverStatus>> {
121
        if let Some(id) = self.tx_id.get(&tx_id) {
122
            Ok(self.transactions.get(id).map(|(s, _, _)| s.clone()))
123
        } else {
124
            Ok(None)
125
        }
126
    }
127

128 0
    pub fn commit(&mut self, tx_id: Vec<u8>) -> PRes<()> {
129 0
        if let Some(id) = self.tx_id.get(&tx_id) {
130 0
            if let Some(tx) = self.transactions.get_mut(id) {
131 0
                tx.2 = Some(CommitStatus::Commit);
132
            }
133
        }
134 0
        Ok(())
135 0
    }
136

137 0
    pub fn rollback(&mut self, tx_id: Vec<u8>) -> PRes<()> {
138 0
        if let Some(id) = self.tx_id.get(&tx_id) {
139 0
            if let Some(tx) = self.transactions.get_mut(id) {
140 0
                tx.2 = Some(CommitStatus::Rollback);
141
            }
142
        }
143 0
        Ok(())
144 0
    }
145
}
146

147
#[derive(Clone)]
148
/// Index definition details
149
pub struct IndexInfo {
150
    pub id: IndexId,
151
    pub value_mode: ValueMode,
152
    pub key_type: IndexTypeId,
153
    pub value_type: IndexTypeId,
154
}
155

156
#[cfg(feature = "background_ops")]
157 1
fn create_background_ops(
158
    journal: Arc<Journal>,
159
    allocator: Arc<Allocator>,
160
    snapshots: Arc<Snapshots>,
161
) -> PRes<BackgroundOps<SnapshotId>> {
162 1
    let all_sync = allocator.clone();
163 1
    BackgroundOps::new(
164 1
        move || all_sync.disc().sync(),
165 1
        move |all: &[SnapshotId]| {
166 1
            for snap in all {
167 1
                release_snapshot(*snap, &snapshots, &allocator, &journal)?;
168
            }
169 1
            Ok(())
170 1
        },
171
    )
172 0
}
173

174
impl PersyImpl {
175 1
    pub fn create(path: &Path) -> PRes<()> {
176 1
        let f = OpenOptions::new().write(true).read(true).create_new(true).open(path)?;
177 1
        PersyImpl::create_from_file(f)
178 1
    }
179

180 1
    pub fn create_from_file(f: File) -> PRes<()> {
181 1
        f.try_lock_exclusive()?;
182 1
        PersyImpl::init_file(f)?;
183 1
        Ok(())
184 1
    }
185

186 1
    fn init_file(fl: File) -> PRes<()> {
187 1
        PersyImpl::init(Box::new(DiscRef::new(fl)?))?;
188 1
        Ok(())
189 1
    }
190

191 1
    fn init(device: Box<dyn Device>) -> PRes<Box<dyn Device>> {
192
        // root_page is every time 0
193 1
        let root_page = device.create_page_raw(DEFAULT_PAGE_EXP)?;
194 1
        let (allocator_page, allocator) = Allocator::init(device, &Config::new())?;
195 1
        let address_page = Address::init(&allocator)?;
196 1
        let journal_page = Journal::init(&allocator)?;
197
        {
198 1
            let mut root = allocator.disc().load_page_raw(root_page, DEFAULT_PAGE_EXP)?;
199
            // Version of the disc format
200 1
            root.write_u16(0);
201
            // Position of the start of address structure
202 1
            root.write_u64(address_page);
203
            // Start of the Log data, if shutdown well this will be every time 0
204 1
            root.write_u64(journal_page);
205 1
            root.write_u64(allocator_page);
206 1
            allocator.flush_page(root)?;
207
            // TODO: check this never go over the first page
208 1
        }
209 1
        allocator.disc().sync()?;
210 1
        Ok(allocator.release())
211 1
    }
212

213 1
    fn new(disc: Box<dyn Device>, config: Config) -> PRes<PersyImpl> {
214 1
        let address_page;
215
        let journal_page;
216
        let allocator_page;
217
        {
218 1
            let mut pg = disc.load_page_raw(0, DEFAULT_PAGE_EXP)?;
219 1
            pg.read_u16(); //THIS NOW is 0 all the times
220 1
            address_page = pg.read_u64();
221 1
            journal_page = pg.read_u64();
222 1
            allocator_page = pg.read_u64();
223 1
        }
224 1
        let config = Arc::new(config);
225 1
        let allocator = Arc::new(Allocator::new(disc, &config, allocator_page)?);
226 1
        let address = Address::new(&allocator, &config, address_page)?;
227 1
        let journal = Arc::new(Journal::new(&allocator, journal_page)?);
228 1
        let indexes = Indexes::new(&config);
229 1
        let snapshots = Arc::new(Snapshots::new());
230
        #[cfg(feature = "background_ops")]
231 1
        let background_ops = create_background_ops(journal.clone(), allocator.clone(), snapshots.clone())?;
232 1
        Ok(PersyImpl {
233 1
            config,
234 1
            journal,
235 1
            address,
236 1
            indexes,
237 1
            allocator,
238 1
            snapshots,
239
            #[cfg(feature = "background_ops")]
240 1
            background_ops,
241
        })
242 1
    }
243

244 1
    fn recover(&self) -> PRes<RecoverImpl> {
245 1
        let mut commit_order = Vec::new();
246 1
        let mut transactions = HashMap::new();
247 1
        let journal = &self.journal;
248 1
        let jp = journal.recover(|record, id| {
249 1
            let tx = transactions
250 1
                .entry(id.clone())
251 1
                .or_insert_with(|| (RecoverStatus::Started, Transaction::recover(id.clone()), None));
252 1
            tx.0 = match record.recover(&mut tx.1) {
253 1
                Err(_) => RecoverStatus::Rollback,
254 1
                Ok(_) if tx.0 == RecoverStatus::Rollback => RecoverStatus::Rollback,
255 1
                Ok(x) => match x {
256 1
                    RecoverStatus::Started => RecoverStatus::Started,
257
                    RecoverStatus::PrepareCommit => {
258 1
                        commit_order.push(id.clone());
259 1
                        RecoverStatus::PrepareCommit
260
                    }
261 1
                    RecoverStatus::Rollback => RecoverStatus::Rollback,
262
                    RecoverStatus::Commit => {
263 1
                        commit_order.push(id.clone());
264 1
                        RecoverStatus::Commit
265
                    }
266 1
                    RecoverStatus::Cleanup => RecoverStatus::Cleanup,
267
                },
268
            }
269 1
        })?;
270

271 1
        let mut transactions_id = HashMap::new();
272 1
        for (id, (_, tx, _)) in &transactions {
273 1
            transactions_id.insert(tx.meta_id().clone(), id.clone());
274
        }
275 1
        Ok(RecoverImpl {
276 1
            tx_id: transactions_id,
277 1
            transactions,
278 1
            order: commit_order,
279 1
            journal_pages: jp,
280
        })
281 1
    }
282 1
    pub fn final_recover(&self, mut recover: RecoverImpl) -> PRes<()> {
283 1
        let mut last_id = None;
284 1
        let allocator = &self.allocator;
285 1
        for id in recover.order {
286 1
            if let Some((status, mut tx, choosed)) = recover.transactions.remove(&id) {
287 1
                if status == RecoverStatus::PrepareCommit {
288 0
                    if choosed == Some(CommitStatus::Commit) || choosed.is_none() {
289 0
                        let prepared = tx.recover_prepare(self)?;
290 0
                        tx.recover_commit(self, prepared)?;
291 0
                        last_id = Some(id);
292 0
                    } else {
293 0
                        tx.recover_rollback(self)?;
294
                    }
295 1
                } else if status == RecoverStatus::Commit {
296 1
                    tx.recover_cleanup(self)?;
297
                }
298 1
            }
299 1
        }
300 1
        for p in recover.journal_pages {
301 1
            allocator.remove_from_free(p, JOURNAL_PAGE_EXP)?;
302 1
        }
303

304 1
        for (_, (_, tx, _)) in recover.transactions.iter_mut() {
305 1
            tx.recover_rollback(self)?;
306
        }
307 1
        if let Some(id) = last_id {
308 0
            self.journal.finished_to_clean(&[id])?;
309
        }
310 1
        allocator.trim_free_at_end()?;
311 1
        Ok(())
312 1
    }
313

314 1
    pub fn open_recover(f: File, config: Config) -> PRes<(PersyImpl, RecoverImpl)> {
315 1
        f.try_lock_exclusive()?;
316 1
        let persy = PersyImpl::new(Box::new(DiscRef::new(f)?), config)?;
317 1
        let rec = persy.recover()?;
318 1
        Ok((persy, rec))
319 1
    }
320

321 1
    pub fn memory(config: Config) -> PRes<PersyImpl> {
322 1
        let device = PersyImpl::init(Box::new(MemRef::new()?))?;
323 1
        PersyImpl::new(device, config)
324 1
    }
325

326 1
    pub fn begin_with(&self, mut config: TransactionConfig) -> PRes<Transaction> {
327 1
        let journal = &self.journal;
328 1
        let strategy = if let Some(st) = config.tx_strategy {
329 0
            st
330
        } else {
331 1
            self.config.tx_strategy().clone()
332
        };
333 1
        let meta_id = if let Some(id) = replace(&mut config.transaction_id, None) {
334 0
            id
335
        } else {
336 1
            Vec::new()
337 1
        };
338 1
        let sync_mode = if Some(true) == config.background_sync {
339 1
            SyncMode::BackgroundSync
340
        } else {
341 1
            SyncMode::Sync
342
        };
343

344 1
        Ok(Transaction::new(journal, &strategy, sync_mode, meta_id)?)
345 1
    }
346

347 1
    pub fn create_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<SegmentId> {
348 1
        match tx.exists_segment(segment) {
349 1
            DROPPED => {}
350
            CREATED(_) => {
351 0
                return Err(PersyError::SegmentAlreadyExists);
352
            }
353
            NONE => {
354 1
                if self.address.exists_segment(&segment)? {
355 0
                    return Err(PersyError::SegmentAlreadyExists);
356
                }
357
            }
358
        }
359 1
        let (segment_id, first_segment_page) = self.address.create_temp_segment(segment)?;
360 1
        tx.add_create_segment(&self.journal, segment, segment_id, first_segment_page)?;
361 1
        Ok(segment_id)
362 1
    }
363

364 1
    pub fn drop_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
365 1
        let (_, segment_id) = self.check_segment_tx(tx, segment)?;
366 1
        tx.add_drop_segment(&self.journal, segment, segment_id)?;
367 1
        Ok(())
368 1
    }
369

370 1
    pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
371 1
        self.address.exists_segment(segment)
372 1
    }
373

374 1
    pub fn exists_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<bool> {
375 1
        match tx.exists_segment(segment) {
376 1
            DROPPED => Ok(false),
377 1
            CREATED(_) => Ok(true),
378 0
            NONE => self.address.exists_segment(segment),
379
        }
380 1
    }
381 1
    pub fn exists_index(&self, index: &str) -> PRes<bool> {
382 1
        self.exists_segment(&format!("{}{}", INDEX_META_PREFIX, index))
383 0
    }
384

385 1
    pub fn exists_index_tx(&self, tx: &Transaction, index: &str) -> PRes<bool> {
386 1
        self.exists_segment_tx(tx, &format!("{}{}", INDEX_META_PREFIX, index))
387 0
    }
388

389 1
    pub fn solve_segment_id(&self, segment: impl ToSegmentId) -> PRes<SegmentId> {
390 1
        segment.to_segment_id(&self.address)
391 1
    }
392

393 1
    pub fn solve_segment_id_tx(&self, tx: &Transaction, segment: impl ToSegmentId) -> PRes<SegmentId> {
394 1
        let (sid, _) = segment.to_segment_id_tx(self, tx)?;
395 1
        Ok(sid)
396 1
    }
397

398 1
    pub fn solve_segment_id_snapshot(&self, snapshot: SnapshotId, segment: impl ToSegmentId) -> PRes<SegmentId> {
399 1
        segment.to_segment_id_snapshot(&self.snapshots, snapshot)
400 1
    }
401

402 1
    pub fn solve_index_id(&self, index: impl ToIndexId) -> PRes<IndexId> {
403 1
        index.to_index_id(&self.address)
404 1
    }
405

406 1
    pub fn solve_index_id_tx(&self, tx: &Transaction, index: impl ToIndexId) -> PRes<(IndexId, bool)> {
407 1
        index.to_index_id_tx(self, tx)
408 1
    }
409

410 1
    pub fn solve_index_id_snapshot(&self, snapshot: SnapshotId, index: impl ToIndexId) -> PRes<IndexId> {
411 1
        index.to_index_id_snapshot(&self.snapshots, snapshot)
412 1
    }
413

414
    /// check if a segment exist persistent or in tx.
415
    ///
416
    /// @return true if the segment was created in tx.
417 1
    pub fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, SegmentId)> {
418 1
        match tx.exists_segment(segment) {
419 1
            DROPPED => Err(PersyError::SegmentNotFound),
420 1
            CREATED(segment_id) => Ok((true, segment_id)),
421 1
            NONE => self
422
                .address
423 1
                .segment_id(segment)?
424 1
                .map_or(Err(PersyError::SegmentNotFound), |id| Ok((false, id))),
425
        }
426 1
    }
427

428 1
    pub fn write_record_metadata(len: u64, id: &RecRef) -> Vec<u8> {
429 1
        let mut val = Vec::new();
430 1
        val.write_u8(0);
431 1
        val.write_varint_u64(len);
432 1
        id.write(&mut val);
433
        val
434 1
    }
435 1
    pub fn read_record_metadata(meta: &mut dyn InfallibleRead) -> (u64, RecRef) {
436 1
        let _metadata_version = meta.read_u8();
437 1
        let len = meta.read_varint_u64();
438 1
        let id = RecRef::read(meta);
439 1
        (len, id)
440 1
    }
441

442 1
    pub fn insert_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, rec: &[u8]) -> PRes<RecRef> {
443 1
        let (segment_id, in_tx) = segment.to_segment_id_tx(self, tx)?;
444 1
        let len = rec.len() as u64;
445 1
        let allocator = &self.allocator;
446 1
        let address = &self.address;
447 1
        let (rec_ref, maybe_new_page) = if in_tx {
448 1
            address.allocate_temp(segment_id)
449
        } else {
450 1
            address.allocate(segment_id)
451 1
        }?;
452 1
        let metadata = PersyImpl::write_record_metadata(len, &rec_ref);
453 1
        let allocation_exp = exp_from_content_size(len + metadata.len() as u64);
454 1
        let mut pg = allocator.allocate(allocation_exp)?;
455 1
        let page = pg.get_index();
456 1
        tx.add_insert(&self.journal, segment_id, &rec_ref, page)?;
457 1
        if let Some(new_page) = maybe_new_page {
458 1
            tx.add_new_segment_page(&self.journal, segment_id, new_page.new_page, new_page.previus_page)?;
459
        }
460 1
        pg.write_all(&metadata);
461 1
        pg.write_all(rec);
462 1
        allocator.flush_page(pg)?;
463 1
        Ok(rec_ref)
464 1
    }
465

466 1
    fn read_snapshot(&self) -> PRes<SnapshotId> {
467 1
        self.snapshots.current_snapshot()
468 1
    }
469

470 1
    pub fn snapshot(&self) -> PRes<SnapshotId> {
471 1
        let snapshot_id = self.snapshots.read_snapshot()?;
472 1
        let segs = self
473
            .address
474 1
            .snapshot_list()?
475
            .into_iter()
476 1
            .map(|(name, id, first_page)| SegmentSnapshop::new(&name, id, first_page))
477 0
            .collect::<Vec<_>>();
478 1
        self.snapshots.fill_segments(snapshot_id, &segs)?;
479 1
        Ok(snapshot_id)
480 1
    }
481

482 1
    pub fn release_snapshot(&self, snapshot_id: SnapshotId) -> PRes<()> {
483 1
        release_snapshot(snapshot_id, &self.snapshots, &self.allocator, &self.journal)
484 1
    }
485

486 1
    fn read_ref_segment(
487
        &self,
488
        tx: &Transaction,
489
        segment_id: SegmentId,
490
        rec_ref: &RecRef,
491
    ) -> PRes<Option<(u64, u16, SegmentId)>> {
492 1
        Ok(match tx.read(rec_ref) {
493 1
            TxRead::RECORD(rec) => Some((rec.0, rec.1, segment_id)),
494 1
            TxRead::DELETED => None,
495 1
            TxRead::NONE => self
496
                .address
497 1
                .read(rec_ref, segment_id)?
498 1
                .map(|(pos, version)| (pos, version, segment_id)),
499
        })
500 1
    }
501

502 1
    fn read_page(&self, match_id: &RecRef, page: u64) -> PRes<Option<Vec<u8>>> {
503 1
        self.read_page_fn(match_id, page, |x| Vec::from(x))
504 1
    }
505

506 1
    fn read_page_fn<T, F>(&self, match_id: &RecRef, page: u64, f: F) -> PRes<Option<T>>
507
    where
508
        F: Fn(&[u8]) -> T,
509
    {
510 1
        if let Some(mut pg) = self.allocator.load_page_not_free(page)? {
511 1
            let (len, id) = PersyImpl::read_record_metadata(&mut pg);
512 1
            if id.page == match_id.page && id.pos == match_id.pos {
513 1
                Ok(Some(f(pg.slice(len as usize))))
514
            } else {
515 0
                Ok(None)
516
            }
517 1
        } else {
518 0
            Ok(None)
519
        }
520 1
    }
521

522 1
    pub fn read_tx_internal_fn<T, F>(
523
        &self,
524
        tx: &Transaction,
525
        segment_id: SegmentId,
526
        id: &RecRef,
527
        f: F,
528
    ) -> PRes<Option<(T, u16)>>
529
    where
530
        F: Fn(&[u8]) -> T,
531
    {
532 0
        loop {
533 1
            if let Some((page, version, _)) = self.read_ref_segment(tx, segment_id, id)? {
534 1
                if let Some(record) = self.read_page_fn(id, page, &f)? {
535 1
                    break Ok(Some((record, version)));
536
                }
537 0
            } else {
538 1
                break Ok(None);
539
            }
540 0
        }
541 1
    }
542

543 1
    pub fn read_tx_internal(
544
        &self,
545
        tx: &Transaction,
546
        segment_id: SegmentId,
547
        id: &RecRef,
548
    ) -> PRes<Option<(Vec<u8>, u16)>> {
549 1
        self.read_tx_internal_fn(tx, segment_id, id, |x| Vec::from(x))
550 1
    }
551

552 1
    pub fn read_tx(&self, tx: &mut Transaction, segment: SegmentId, id: &RecRef) -> PRes<Option<Vec<u8>>> {
553 1
        if let Some((rec, version)) = self.read_tx_internal(tx, segment, id)? {
554 1
            tx.add_read(&self.journal, segment, id, version)?;
555 1
            Ok(Some(rec))
556 0
        } else {
557 1
            Ok(None)
558
        }
559 1
    }
560

561 1
    pub fn lock_record(
562
        &self,
563
        tx: &mut Transaction,
564
        segment: impl ToSegmentId,
565
        id: &RecRef,
566
        version: u16,
567
    ) -> PRes<bool> {
568 1
        let segment_id = segment.to_segment_id(&self.address)?;
569 1
        tx.lock_record(&self.address, segment_id, id, version)
570 1
    }
571

572 1
    pub fn unlock_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, id: &RecRef) -> PRes<()> {
573 1
        let segment_id = segment.to_segment_id(&self.address)?;
574 1
        tx.unlock_record(&self.address, segment_id, id)
575 1
    }
576

577 1
    pub fn read(&self, segment: SegmentId, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
578 0
        loop {
579 1
            if let Some((page, _)) = self.address.read(rec_ref, segment)? {
580 1
                if let Some(record) = self.read_page(rec_ref, page)? {
581 1
                    break Ok(Some(record));
582
                }
583 0
            } else {
584 1
                break Ok(None);
585
            }
586
        }
587 1
    }
588

589 1
    pub fn read_snap(&self, segment: SegmentId, rec_ref: &RecRef, snapshot: SnapshotId) -> PRes<Option<Vec<u8>>> {
590 1
        self.read_snap_fn(segment, rec_ref, snapshot, |x| Vec::from(x))
591 1
    }
592

593 1
    pub fn read_snap_fn<T, F>(
594
        &self,
595
        segment: SegmentId,
596
        rec_ref: &RecRef,
597
        snapshot: SnapshotId,
598
        f: F,
599
    ) -> PRes<Option<T>>
600
    where
601
        F: Fn(&[u8]) -> T,
602
    {
603 1
        let segment_id = segment;
604 1
        loop {
605 1
            if let Some(rec_vers) = self.snapshots.read(snapshot, rec_ref)? {
606 0
                match rec_vers.case {
607 1
                    EntryCase::Change(change) => {
608 1
                        if let Some(record) = self.read_page_fn(rec_ref, change.pos, &f)? {
609 1
                            break Ok(Some(record));
610
                        }
611 0
                    }
612
                    EntryCase::Insert => {
613 1
                        break Ok(None);
614
                    }
615
                }
616 1
            } else if let Some((page, _)) = self.address.read(rec_ref, segment_id)? {
617 1
                if let Some(record) = self.read_page_fn(rec_ref, page, &f)? {
618 1
                    break Ok(Some(record));
619
                }
620 0
            } else {
621 1
                break Ok(None);
622 0
            }
623 0
        }
624 1
    }
625

626 1
    pub fn scan(&self, segment: SegmentId) -> PRes<SegmentRawIter> {
627 1
        let read_snapshot = self.read_snapshot()?;
628 1
        Ok(SegmentRawIter::new(segment, self.address.scan(segment)?, read_snapshot))
629 1
    }
630

631 1
    pub fn scan_snapshot_index(&self, segment_id: SegmentId, snapshot: SnapshotId) -> PRes<SegmentSnapshotRawIter> {
632 1
        let res = if let Some(r) = self.snapshots.scan(snapshot, segment_id)? {
633 1
            r
634
        } else {
635 1
            self.address.scan(segment_id)?
636 0
        };
637 1
        Ok(SegmentSnapshotRawIter::new(segment_id, res, snapshot))
638 1
    }
639

640 1
    pub fn scan_snapshot(&self, segment_id: SegmentId, snapshot: SnapshotId) -> PRes<SegmentSnapshotRawIter> {
641 1
        let res = self.snapshots.scan(snapshot, segment_id)?.unwrap();
642 1
        Ok(SegmentSnapshotRawIter::new(segment_id, res, snapshot))
643 1
    }
644

645 1
    pub fn scan_tx<'a>(&'a self, tx: &'a Transaction, segment_id: SegmentId) -> PRes<TxSegmentRawIter> {
646 1
        let read_snapshot = self.read_snapshot()?;
647 1
        Ok(TxSegmentRawIter::new(
648
            tx,
649
            segment_id,
650 1
            self.address.scan(segment_id)?,
651
            read_snapshot,
652
        ))
653 1
    }
654

655 1
    pub fn update(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef, rec: &[u8]) -> PRes<()> {
656 1
        if let Some((_, version, segment)) = self.read_ref_segment(tx, segment, rec_ref)? {
657 1
            let allocator = &self.allocator;
658 1
            let journal = &self.journal;
659 1
            let len = rec.len();
660 1
            let metadata = PersyImpl::write_record_metadata(len as u64, &rec_ref);
661 1
            let allocation_exp = exp_from_content_size((len + metadata.len()) as u64);
662 1
            let mut pg = allocator.allocate(allocation_exp)?;
663 1
            let page = pg.get_index();
664 1
            tx.add_update(journal, segment, &rec_ref, page, version)?;
665 1
            pg.write_all(&metadata);
666 1
            pg.write_all(rec);
667 1
            allocator.flush_page(pg)
668 1
        } else {
669 1
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
670
        }
671 1
    }
672

673 1
    pub fn delete(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef) -> PRes<()> {
674 1
        if let Some((_, version, seg)) = self.read_ref_segment(tx, segment, rec_ref)? {
675 1
            tx.add_delete(&self.journal, seg, &rec_ref, version)
676
        } else {
677 0
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
678
        }
679 1
    }
680

681 1
    pub fn rollback(&self, mut tx: Transaction) -> PRes<()> {
682 1
        tx.rollback(self)
683 1
    }
684

685 1
    pub fn prepare(&self, tx: Transaction) -> PRes<TxFinalize> {
686 1
        let (tx, prepared) = tx.prepare(self)?;
687

688 1
        Ok(TxFinalize {
689 1
            transaction: tx,
690 1
            prepared,
691
        })
692 1
    }
693

694 1
    pub fn rollback_prepared(&self, finalizer: &mut TxFinalize) -> PRes<()> {
695 1
        let prepared = finalizer.prepared.clone();
696 1
        let tx = &mut finalizer.transaction;
697 1
        tx.rollback_prepared(self, prepared)
698 1
    }
699

700 1
    pub fn commit(&self, finalizer: &mut TxFinalize) -> PRes<()> {
701 1
        let prepared = finalizer.prepared.clone();
702 1
        let tx = &mut finalizer.transaction;
703 1
        tx.commit(self, prepared)
704 1
    }
705

706 1
    pub fn create_index<K, V>(&self, tx: &mut Transaction, index_name: &str, value_mode: ValueMode) -> PRes<()>
707
    where
708
        K: IndexType,
709
        V: IndexType,
710
    {
711 1
        Indexes::create_index::<K, V>(self, tx, index_name, 32, 128, value_mode)
712 1
    }
713

714 1
    pub fn drop_index(&self, tx: &mut Transaction, index_name: &str) -> PRes<()> {
715 1
        Indexes::drop_index(self, tx, index_name)
716 1
    }
717

718 1
    pub fn put<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: K, v: V) -> PRes<()>
719
    where
720
        K: IndexType,
721
        V: IndexType,
722
    {
723 1
        Indexes::check_index::<K, V>(self, tx, &index_id)?;
724 1
        tx.add_put(index_id, k, v);
725 1
        Ok(())
726 1
    }
727

728 1
    pub fn remove<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: K, v: Option<V>) -> PRes<()>
729
    where
730
        K: IndexType,
731
        V: IndexType,
732
    {
733 1
        Indexes::check_index::<K, V>(self, tx, &index_id)?;
734 1
        tx.add_remove(index_id, k, v);
735 1
        Ok(())
736 1
    }
737

738 1
    pub fn get_tx<K, V>(&self, tx: &mut Transaction, index_id: IndexId, k: &K) -> PRes<Option<Value<V>>>
739
    where
740
        K: IndexType,
741
        V: IndexType,
742
    {
743 1
        let (result, vm) = {
744 1
            let mut ik = Indexes::get_index_keeper_tx::<K, V>(self, tx, &index_id)?;
745 1
            self.indexes.read_lock(index_id.clone())?;
746 1
            (ik.get(k)?, IndexKeeper::<K, V>::value_mode(&ik))
747 1
        };
748 1
        self.indexes.read_unlock(index_id.clone())?;
749 1
        tx.apply_changes::<K, V>(vm, index_id, k, result)
750 1
    }
751

752 1
    pub fn get<K, V>(&self, index_id: IndexId, k: &K) -> PRes<Option<Value<V>>>
753
    where
754
        K: IndexType,
755
        V: IndexType,
756
    {
757 1
        let read_snapshot = self.snapshots.read_snapshot()?;
758 1
        let r = self.get_snapshot(index_id, read_snapshot, k);
759 1
        release_snapshot(read_snapshot, &self.snapshots, &self.allocator, &self.journal)?;
760 1
        r
761 1
    }
762

763 1
    pub fn get_snapshot<K, V>(&self, index_id: IndexId, snapshot: SnapshotId, k: &K) -> PRes<Option<Value<V>>>
764
    where
765
        K: IndexType,
766
        V: IndexType,
767
    {
768 1
        Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?.get(k)
769 1
    }
770

771 1
    pub fn index_next<K, V>(
772
        &self,
773
        index_id: &IndexId,
774
        read_snapshot: SnapshotId,
775
        next: Bound<&K>,
776
    ) -> PRes<PageIter<K, V>>
777
    where
778
        K: IndexType,
779
        V: IndexType,
780
    {
781 1
        Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.iter_from(next)
782 1
    }
783

784 1
    pub fn index_back<K, V>(
785
        &self,
786
        index_id: &IndexId,
787
        read_snapshot: SnapshotId,
788
        next: Bound<&K>,
789
    ) -> PRes<PageIterBack<K, V>>
790
    where
791
        K: IndexType,
792
        V: IndexType,
793
    {
794 1
        Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.back_iter_from(next)
795 1
    }
796

797 1
    pub fn range<K, V, R>(&self, index_id: IndexId, range: R) -> PRes<(ValueMode, IndexRawIter<K, V>)>
798
    where
799
        K: IndexType,
800
        V: IndexType,
801
        R: RangeBounds<K>,
802
    {
803 1
        let read_snapshot = self.snapshots.read_snapshot()?;
804 1
        self.range_snapshot(index_id, read_snapshot, range, true)
805 1
    }
806

807 1
    pub fn range_snapshot<K, V, R>(
808
        &self,
809
        index_id: IndexId,
810
        snapshot: SnapshotId,
811
        range: R,
812
        release_snapshot: bool,
813
    ) -> PRes<(ValueMode, IndexRawIter<K, V>)>
814
    where
815
        K: IndexType,
816
        V: IndexType,
817
        R: RangeBounds<K>,
818
    {
819 1
        let mut ik = Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?;
820 1
        let after = ik.iter_from(range.start_bound())?;
821 1
        let before = ik.back_iter_from(range.end_bound())?;
822 1
        Ok((
823 1
            IndexKeeper::<K, V>::value_mode(&ik),
824 1
            IndexRawIter::new(index_id, snapshot, after, before, release_snapshot),
825
        ))
826 1
    }
827

828
    pub fn segment_name_tx(&self, tx: &Transaction, id: SegmentId) -> PRes<Option<(String, bool)>> {
829
        if tx.segment_created_in_tx(id) {
830
            Ok(tx.segment_name_by_id(id).map(|x| (x, true)))
831
        } else {
832
            self.address.segment_name_by_id(id).map(|k| k.map(|x| (x, false)))
833
        }
834
    }
835

836 1
    pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
837 1
        Ok(self
838
            .address
839 1
            .list()?
840
            .into_iter()
841 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
842
            .collect())
843 1
    }
844

845 1
    pub fn list_segments_snapshot(&self, snapshot_id: SnapshotId) -> PRes<Vec<(String, SegmentId)>> {
846 1
        let list = self.snapshots.list(snapshot_id)?;
847 1
        Ok(list
848
            .into_iter()
849 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
850
            .collect())
851 1
    }
852

853 1
    pub fn list_indexes(&self) -> PRes<Vec<(String, IndexInfo)>> {
854 1
        let snapshot = self.snapshot()?;
855 1
        let res = self.list_indexes_snapshot(snapshot);
856 1
        release_snapshot(snapshot, &self.snapshots, &self.allocator, &self.journal)?;
857 1
        res
858 1
    }
859

860 1
    pub fn list_indexes_snapshot(&self, snapshot: SnapshotId) -> PRes<Vec<(String, IndexInfo)>> {
861 1
        let list = self.snapshots.list(snapshot)?;
862 1
        list.into_iter()
863 1
            .filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
864 1
            .map(|(mut name, _id)| -> PRes<(String, IndexInfo)> {
865 1
                name.drain(..INDEX_META_PREFIX.len());
866 1
                let info = self.index_info(snapshot, &name)?;
867 1
                Ok((name, info))
868 1
            })
869
            .collect()
870 1
    }
871

872 1
    pub fn list_segments_tx(&self, tx: &Transaction) -> PRes<Vec<(String, SegmentId)>> {
873 1
        Ok(tx
874 1
            .filter_list(&self.address.list()?)
875 1
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))
876 1
            .map(|(name, id)| (name.to_string(), id))
877
            .collect())
878 1
    }
879

880 1
    pub fn list_indexes_tx(&self, tx: &Transaction) -> PRes<Vec<(String, IndexInfo)>> {
881 1
        tx.filter_list(&self.address.list()?)
882 1
            .filter(|(name, _)| name.starts_with(INDEX_META_PREFIX))
883 1
            .map(|(name, id)| (name.to_string(), id))
884 1
            .map(|(mut name, _id)| -> PRes<(String, IndexInfo)> {
885 1
                name.drain(..INDEX_META_PREFIX.len());
886 1
                let info = self.index_info_tx(tx, &name)?;
887 1
                Ok((name, info))
888 1
            })
889
            .collect()
890 1
    }
891

892 1
    fn index_info(&self, snapshot: SnapshotId, name: &str) -> PRes<IndexInfo> {
893 1
        let id = self.solve_index_id_snapshot(snapshot, name)?;
894 1
        let index = Indexes::get_index(self, snapshot, &id)?;
895 1
        Ok(IndexInfo {
896 1
            id,
897 1
            value_mode: index.value_mode,
898 1
            key_type: IndexTypeId::from(index.key_type),
899 1
            value_type: IndexTypeId::from(index.value_type),
900
        })
901 1
    }
902 1
    fn index_info_tx(&self, tx: &Transaction, name: &str) -> PRes<IndexInfo> {
903 1
        let id = self.solve_index_id_tx(tx, name)?.0;
904 1
        let (index, _version) = Indexes::get_index_tx(self, tx, &id)?;
905 1
        Ok(IndexInfo {
906 1
            id,
907 1
            value_mode: index.value_mode,
908 1
            key_type: IndexTypeId::from(index.key_type),
909 1
            value_type: IndexTypeId::from(index.value_type),
910
        })
911 1
    }
912

913 1
    pub(crate) fn journal(&self) -> &Journal {
914 1
        &self.journal
915 1
    }
916

917 1
    pub(crate) fn address(&self) -> &Address {
918 1
        &self.address
919 1
    }
920

921 1
    pub(crate) fn allocator(&self) -> &Allocator {
922 1
        &self.allocator
923 1
    }
924

925 1
    pub(crate) fn indexes(&self) -> &Indexes {
926 1
        &self.indexes
927 1
    }
928

929 1
    pub(crate) fn snapshots(&self) -> &Snapshots {
930 1
        &self.snapshots
931 1
    }
932

933
    #[cfg(feature = "background_ops")]
934 1
    pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, snapshot_id: SnapshotId) -> PRes<()> {
935 1
        match sync_mode {
936 1
            SyncMode::Sync => {
937 1
                let allocator = self.allocator();
938 1
                allocator.disc().sync()
939
            }
940
            SyncMode::BackgroundSync => {
941 1
                self.snapshots.acquire(snapshot_id)?;
942 1
                self.background_ops.add_pending(snapshot_id)
943
            }
944
        }
945 1
    }
946

947
    #[cfg(not(feature = "background_ops"))]
948
    pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, _snapshot_id: SnapshotId) -> PRes<()> {
949
        match sync_mode {
950
            SyncMode::Sync => {
951
                let allocator = self.allocator();
952
                allocator.disc().sync()
953
            }
954
            SyncMode::BackgroundSync => unreachable!(),
955
        }
956
    }
957
}
958

959 1
pub fn exp_from_content_size(size: u64) -> u8 {
960
    // content + size + match_pointer:page+ match_pointer:pos  + page_header
961 1
    let final_size = size + u64::from(PAGE_METADATA_SIZE);
962 1
    let final_size = u64::max(final_size, 32);
963
    // Should be there a better way, so far is OK.
964 1
    let mut res: u8 = 1;
965 1
    loop {
966 1
        if final_size < (1 << res) {
967
            return res;
968
        }
969 1
        res += 1;
970
    }
971 1
}

Read our documentation on viewing source code .

Loading