Compare 7ec3992 ... +5 ... 1eb8dc2


@@ -34,7 +34,7 @@
Loading
34 34
        InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite, InfallibleWriteFormat,
35 35
        InfallibleWriteVarInt,
36 36
    },
37 -
    journal::{Journal, JournalId, JOURNAL_PAGE_EXP},
37 +
    journal::{Journal, JournalId},
38 38
    record_scanner::{SegmentRawIter, SegmentSnapshotRawIter, TxSegmentRawIter},
39 39
    snapshot::{release_snapshot, EntryCase, SegmentSnapshop, SnapshotId, Snapshots},
40 40
};
@@ -85,7 +85,6 @@
Loading
85 85
    tx_id: HashMap<Vec<u8>, JournalId>,
86 86
    transactions: HashMap<JournalId, (RecoverStatus, Transaction, Option<CommitStatus>)>,
87 87
    order: Vec<JournalId>,
88 -
    journal_pages: Vec<u64>,
89 88
}
90 89
91 90
impl RecoverImpl {
@@ -247,7 +246,7 @@
Loading
247 246
        let mut commit_order = Vec::new();
248 247
        let mut transactions = HashMap::new();
249 248
        let journal = &self.journal;
250 -
        let jp = journal.recover(|record, id| {
249 +
        journal.recover(|record, id| {
251 250
            let tx = transactions
252 251
                .entry(id.clone())
253 252
                .or_insert_with(|| (RecoverStatus::Started, Transaction::recover(id.clone()), None));
@@ -278,7 +277,6 @@
Loading
278 277
            tx_id: transactions_id,
279 278
            transactions,
280 279
            order: commit_order,
281 -
            journal_pages: jp,
282 280
        })
283 281
    }
284 282
    pub fn final_recover(&self, mut recover: RecoverImpl) -> PRes<()> {
@@ -301,9 +299,6 @@
Loading
301 299
                }
302 300
            }
303 301
        }
304 -
        for p in recover.journal_pages {
305 -
            allocator.remove_from_free(p, JOURNAL_PAGE_EXP)?;
306 -
        }
307 302
        for (_, (_, tx, _)) in recover.transactions.iter_mut() {
308 303
            tx.recover_rollback(self)?;
309 304
        }

@@ -344,6 +344,10 @@
Loading
344 344
    fl.seek(SeekFrom::Start(page + 2))?;
345 345
    // Free pages are a linked list reading the next page and return
346 346
    let next = fl.read_u64()?;
347 +
    if next != 0 {
348 +
        fl.seek(SeekFrom::Start(next + 10))?;
349 +
        fl.write_u64(0)?;
350 +
    }
347 351
    fl.seek(SeekFrom::Start(page + 1))?;
348 352
    let mut moderator = fl.read_u8()?;
349 353
    moderator = free_flag_set(moderator, false);
@@ -359,6 +363,8 @@
Loading
359 363
) -> PRes<()> {
360 364
    fl.seek(SeekFrom::Start(page))?;
361 365
    let exp = fl.read_u8()?;
366 +
    let mut mitigator = fl.read_u8()?;
367 +
    debug_assert!(!is_free(mitigator), "freeing: {} already freed ", page);
362 368
    let size = (1 << exp) as u64; //EXP - (size_exp+size_mitigator);
363 369
    if page + size == fl.len()? {
364 370
        fl.set_len(page)?;
@@ -366,8 +372,6 @@
Loading
366 372
    } else {
367 373
        let old = update_list.update(exp, page)?;
368 374
        fl.seek(SeekFrom::Start(page + 1))?;
369 -
        let mut mitigator = fl.read_u8()?;
370 -
        debug_assert!(!is_free(mitigator), "freeing: {} already freed ", page);
371 375
        mitigator = free_flag_set(mitigator, true);
372 376
        let mut data: [u8; 9] = [0; 9];
373 377
        data[0] = mitigator;
@@ -405,20 +409,20 @@
Loading
405 409
    if page == 0 {
406 410
        return Ok(false);
407 411
    }
408 -
    fl.seek(SeekFrom::Start(page + 1))?;
412 +
    fl.seek(SeekFrom::Start(page))?;
409 413
    fl.read_exact(&mut header)?;
410 -
    if is_free(header[0]) {
414 +
    if is_free(header[1]) {
411 415
        let next = read_u64(&header[2..10]);
412 416
        let prev = read_u64(&header[10..18]);
413 417
        if prev == 0 {
414 418
            update_list.remove(exp, page, next)?;
415 419
        } else {
416 -
            let mut data = [8u8];
420 +
            let mut data = [0u8; 8];
417 421
            write_u64(&mut data, next);
418 -
            fl.seek(SeekFrom::Start(prev + 10))?;
422 +
            fl.seek(SeekFrom::Start(prev + 2))?;
419 423
            fl.write_all(&data)?;
420 424
            write_u64(&mut data, prev);
421 -
            fl.seek(SeekFrom::Start(next + 2))?;
425 +
            fl.seek(SeekFrom::Start(next + 10))?;
422 426
            fl.write_all(&data)?;
423 427
        }
424 428
        fl.set_len(page)?;
@@ -479,18 +483,18 @@
Loading
479 483
        if page == 0 {
480 484
            return Ok(false);
481 485
        }
482 -
        self.file.read_exact_at(&mut header, page + 1)?;
483 -
        if is_free(header[0]) {
486 +
        self.file.read_exact_at(&mut header, page)?;
487 +
        if is_free(header[1]) {
484 488
            let next = read_u64(&header[2..10]);
485 489
            let prev = read_u64(&header[10..18]);
486 490
            if prev == 0 {
487 491
                update_list.remove(exp[0], page, next)?;
488 492
            } else {
489 -
                let mut data = [8u8];
493 +
                let mut data = [0u8; 8];
490 494
                write_u64(&mut data, next);
491 -
                self.file.write_all_at(&data, prev + 10)?;
495 +
                self.file.write_all_at(&data, prev + 2)?;
492 496
                write_u64(&mut data, prev);
493 -
                self.file.write_all_at(&data, next + 2)?;
497 +
                self.file.write_all_at(&data, next + 10)?;
494 498
            }
495 499
            *lock = page;
496 500
            self.file.set_len(page)?;
@@ -554,12 +558,18 @@
Loading
554 558
        use std::os::unix::prelude::FileExt;
555 559
        let mut next_data = [0u8; 8];
556 560
        self.file.read_exact_at(&mut next_data, page + 2)?;
561 +
        let next_page = read_u64(&next_data);
562 +
        if next_page != 0 {
563 +
            let prev_data = [0u8; 8];
564 +
            self.file.write_all_at(&prev_data, next_page + 10)?;
565 +
        }
557 566
        // Free pages are a linked list reading the next page and return
558 567
        let mut mitigator = [0u8];
559 568
        self.file.read_exact_at(&mut mitigator, page + 1)?;
569 +
        debug_assert!(is_free(mitigator[0]), "allocating: {} already allocated ", page);
560 570
        mitigator[0] = free_flag_set(mitigator[0], false);
561 571
        self.file.write_all_at(&mitigator, page + 1)?;
562 -
        Ok(read_u64(&next_data))
572 +
        Ok(next_page)
563 573
    }
564 574
565 575
    fn sync(&self) -> PRes<()> {
@@ -572,11 +582,11 @@
Loading
572 582
        let mut exp = [0u8];
573 583
        self.file.read_exact_at(&mut exp, page)?;
574 584
        let size = (1 << exp[0]) as u64; //EXP - (size_exp+size_mitigator);
585 +
        let mut mitigator = [0u8];
586 +
        self.file.read_exact_at(&mut mitigator, page + 1)?;
587 +
        debug_assert!(!is_free(mitigator[0]), "freeing: {} already freed ", page);
575 588
        if !self.trim_if_possible(page, size)? {
576 589
            let old = update_list.update(exp[0], page)?;
577 -
            let mut mitigator = [0u8];
578 -
            self.file.read_exact_at(&mut mitigator, page + 1)?;
579 -
            debug_assert!(!is_free(mitigator[0]), "freeing: {} already freed ", page);
580 590
            mitigator[0] = free_flag_set(mitigator[0], true);
581 591
            let mut data: [u8; 9] = [0; 9];
582 592
            data[0] = mitigator[0];

@@ -321,15 +321,13 @@
Loading
321 321
        Ok((cur_page, cur_pos))
322 322
    }
323 323
324 -
    pub(crate) fn recover<T>(&self, mut found: T) -> PRes<Vec<u64>>
324 +
    pub(crate) fn recover<T>(&self, mut found: T) -> PRes<()>
325 325
    where
326 326
        T: FnMut(&dyn JournalEntry, &JournalId),
327 327
    {
328 -
        let mut journal_pages = Vec::new();
329 328
        let mut jr = self.journal.lock()?;
330 329
        let mut cur_page = jr.first_page;
331 330
        jr.last_page = jr.first_page;
332 -
        journal_pages.push(cur_page);
333 331
        let mut page = self.allocator.load_page(cur_page)?;
334 332
        page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
335 333
        loop {
@@ -343,7 +341,6 @@
Loading
343 341
                    jr.last_pos = last_pos - 1;
344 342
                    break;
345 343
                }
346 -
                journal_pages.push(cur_page);
347 344
                page = self.allocator.load_page(cur_page)?;
348 345
                page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
349 346
                jr.last_page = cur_page;
@@ -378,7 +375,7 @@
Loading
378 375
            }
379 376
        }
380 377
        jr.current = self.allocator.write_page(jr.last_page)?;
381 -
        Ok(journal_pages)
378 +
        Ok(())
382 379
    }
383 380
384 381
    fn required_space(&self, space: u32, jr: &mut MutexGuard<JournalShared>) -> PRes<()> {

@@ -487,6 +487,37 @@
Loading
487 487
    }
488 488
}
489 489
490 +
#[test]
491 +
pub fn test_open_close_restore_multiple_tx() {
492 +
    let file = Builder::new()
493 +
        .prefix("open_restore_close_multiple_tx")
494 +
        .suffix(".persy")
495 +
        .tempfile()
496 +
        .expect("expect temp file creation");
497 +
    Persy::create_from_file(file.reopen().unwrap()).unwrap();
498 +
    {
499 +
        let persy = Persy::open_from_file(file.reopen().unwrap(), Config::new()).unwrap();
500 +
        let mut tx = persy.begin().expect("error on transaction begin");
501 +
        tx.create_segment("def").expect("error on segment creation");
502 +
        let fin = tx.prepare().expect("error on commit prepare");
503 +
        fin.commit().expect("error on commit");
504 +
    }
505 +
    for ite in 1..20 {
506 +
        let recover = OpenOptions::new().recover_file(file.reopen().unwrap()).unwrap();
507 +
        assert!(recover.list_transactions().expect("can list transactions").len() <= 60);
508 +
        let persy = recover.finalize().expect("open correctly");
509 +
        for _ in 0..80 {
510 +
            let mut tx = persy.begin().expect("error on transaction begin");
511 +
            let val = String::from("aaa").into_bytes();
512 +
            tx.insert("def", &val).expect("error on insert value");
513 +
            let fin = tx.prepare().expect("error on commit prepare");
514 +
            fin.commit().expect("error on commit");
515 +
        }
516 +
        let counter = persy.scan("def").expect("read persistent records ").count();
517 +
        assert_eq!(ite * 80, counter);
518 +
    }
519 +
}
520 +
490 521
#[cfg(feature = "background_ops")]
491 522
#[test]
492 523
pub fn test_background_sync() {

@@ -110,11 +110,13 @@
Loading
110 110
    pub case: EntryCase,
111 111
}
112 112
113 +
#[derive(Debug)]
113 114
pub struct InternalSnapshots {
114 115
    mapping: HashMap<RecRef, Vec<RecordVersion>>,
115 116
    active_snapshots: Vec<SnapshotData>,
116 117
    snapshot_sequence: u64,
117 118
}
119 +
118 120
impl InternalSnapshots {
119 121
    fn next_snapshot_id(&mut self) -> SnapshotId {
120 122
        //TODO: this may overflow and should start again from 0 double check overflow failures

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Learn more Showing 3 files with coverage changes found.

Changes in src/io.rs
+3
Loading file...
Changes in src/discref.rs
-1
+1
Loading file...
Changes in src/allocator.rs
-7
+7
Loading file...
Files Coverage
src 0.07% 93.41%
tests 0.01% 99.13%
Project Totals (30 files) 94.46%
Loading