Compare dd0268a ... +-1 ... dbd19a7


@@ -358,7 +358,7 @@
Loading
358 358
        }
359 359
        let (segment_id, first_segment_page) = self.address.create_temp_segment(segment)?;
360 360
        tx.add_create_segment(&self.journal, segment, segment_id, first_segment_page)?;
361 -
        Ok(SegmentId::new(segment_id))
361 +
        Ok(segment_id)
362 362
    }
363 363
364 364
    pub fn drop_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
@@ -414,7 +414,7 @@
Loading
414 414
    /// check if a segment exist persistent or in tx.
415 415
    ///
416 416
    /// @return true if the segment was created in tx.
417 -
    pub fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, u32)> {
417 +
    pub fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, SegmentId)> {
418 418
        match tx.exists_segment(segment) {
419 419
            DROPPED => Err(PersyError::SegmentNotFound),
420 420
            CREATED(segment_id) => Ok((true, segment_id)),
@@ -445,17 +445,17 @@
Loading
445 445
        let allocator = &self.allocator;
446 446
        let address = &self.address;
447 447
        let (rec_ref, maybe_new_page) = if in_tx {
448 -
            address.allocate_temp(segment_id.id)
448 +
            address.allocate_temp(segment_id)
449 449
        } else {
450 -
            address.allocate(segment_id.id)
450 +
            address.allocate(segment_id)
451 451
        }?;
452 452
        let metadata = PersyImpl::write_record_metadata(len, &rec_ref);
453 453
        let allocation_exp = exp_from_content_size(len + metadata.len() as u64);
454 454
        let mut pg = allocator.allocate(allocation_exp)?;
455 455
        let page = pg.get_index();
456 -
        tx.add_insert(&self.journal, segment_id.id, &rec_ref, page)?;
456 +
        tx.add_insert(&self.journal, segment_id, &rec_ref, page)?;
457 457
        if let Some(new_page) = maybe_new_page {
458 -
            tx.add_new_segment_page(&self.journal, segment_id.id, new_page.new_page, new_page.previus_page)?;
458 +
            tx.add_new_segment_page(&self.journal, segment_id, new_page.new_page, new_page.previus_page)?;
459 459
        }
460 460
        pg.write_all(&metadata);
461 461
        pg.write_all(rec);
@@ -473,7 +473,7 @@
Loading
473 473
            .address
474 474
            .snapshot_list()?
475 475
            .into_iter()
476 -
            .map(|(name, id, first_page)| SegmentSnapshop::new(&name, SegmentId::new(id), first_page))
476 +
            .map(|(name, id, first_page)| SegmentSnapshop::new(&name, id, first_page))
477 477
            .collect::<Vec<_>>();
478 478
        self.snapshots.fill_segments(snapshot_id, &segs)?;
479 479
        Ok(snapshot_id)
@@ -483,7 +483,12 @@
Loading
483 483
        release_snapshot(snapshot_id, &self.snapshots, &self.allocator, &self.journal)
484 484
    }
485 485
486 -
    fn read_ref_segment(&self, tx: &Transaction, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<(u64, u16, u32)>> {
486 +
    fn read_ref_segment(
487 +
        &self,
488 +
        tx: &Transaction,
489 +
        segment_id: SegmentId,
490 +
        rec_ref: &RecRef,
491 +
    ) -> PRes<Option<(u64, u16, SegmentId)>> {
487 492
        Ok(match tx.read(rec_ref) {
488 493
            TxRead::RECORD(rec) => Some((rec.0, rec.1, segment_id)),
489 494
            TxRead::DELETED => None,
@@ -525,7 +530,7 @@
Loading
525 530
        F: Fn(&[u8]) -> T,
526 531
    {
527 532
        loop {
528 -
            if let Some((page, version, _)) = self.read_ref_segment(tx, segment_id.id, id)? {
533 +
            if let Some((page, version, _)) = self.read_ref_segment(tx, segment_id, id)? {
529 534
                if let Some(record) = self.read_page_fn(id, page, &f)? {
530 535
                    break Ok(Some((record, version)));
531 536
                }
@@ -546,7 +551,7 @@
Loading
546 551
547 552
    pub fn read_tx(&self, tx: &mut Transaction, segment: SegmentId, id: &RecRef) -> PRes<Option<Vec<u8>>> {
548 553
        if let Some((rec, version)) = self.read_tx_internal(tx, segment, id)? {
549 -
            tx.add_read(&self.journal, segment.id, id, version)?;
554 +
            tx.add_read(&self.journal, segment, id, version)?;
550 555
            Ok(Some(rec))
551 556
        } else {
552 557
            Ok(None)
@@ -560,18 +565,18 @@
Loading
560 565
        id: &RecRef,
561 566
        version: u16,
562 567
    ) -> PRes<bool> {
563 -
        let segment_id = segment.to_segment_id(&self.address)?.id;
568 +
        let segment_id = segment.to_segment_id(&self.address)?;
564 569
        tx.lock_record(&self.address, segment_id, id, version)
565 570
    }
566 571
567 572
    pub fn unlock_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, id: &RecRef) -> PRes<()> {
568 -
        let segment_id = segment.to_segment_id(&self.address)?.id;
573 +
        let segment_id = segment.to_segment_id(&self.address)?;
569 574
        tx.unlock_record(&self.address, segment_id, id)
570 575
    }
571 576
572 577
    pub fn read(&self, segment: SegmentId, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
573 578
        loop {
574 -
            if let Some((page, _)) = self.address.read(rec_ref, segment.id)? {
579 +
            if let Some((page, _)) = self.address.read(rec_ref, segment)? {
575 580
                if let Some(record) = self.read_page(rec_ref, page)? {
576 581
                    break Ok(Some(record));
577 582
                }
@@ -595,7 +600,7 @@
Loading
595 600
    where
596 601
        F: Fn(&[u8]) -> T,
597 602
    {
598 -
        let segment_id = segment.id;
603 +
        let segment_id = segment;
599 604
        loop {
600 605
            if let Some(rec_vers) = self.snapshots.read(snapshot, rec_ref)? {
601 606
                match rec_vers.case {
@@ -620,11 +625,7 @@
Loading
620 625
621 626
    pub fn scan(&self, segment: SegmentId) -> PRes<SegmentRawIter> {
622 627
        let read_snapshot = self.read_snapshot()?;
623 -
        Ok(SegmentRawIter::new(
624 -
            segment,
625 -
            self.address.scan(segment.id)?,
626 -
            read_snapshot,
627 -
        ))
628 +
        Ok(SegmentRawIter::new(segment, self.address.scan(segment)?, read_snapshot))
628 629
    }
629 630
630 631
    pub fn scan_snapshot(&self, segment_id: SegmentId, snapshot: SnapshotId) -> PRes<SegmentSnapshotRawIter> {
@@ -637,13 +638,13 @@
Loading
637 638
        Ok(TxSegmentRawIter::new(
638 639
            tx,
639 640
            segment_id,
640 -
            self.address.scan(segment_id.id)?,
641 +
            self.address.scan(segment_id)?,
641 642
            read_snapshot,
642 643
        ))
643 644
    }
644 645
645 646
    pub fn update(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef, rec: &[u8]) -> PRes<()> {
646 -
        if let Some((_, version, segment)) = self.read_ref_segment(tx, segment.id, rec_ref)? {
647 +
        if let Some((_, version, segment)) = self.read_ref_segment(tx, segment, rec_ref)? {
647 648
            let allocator = &self.allocator;
648 649
            let journal = &self.journal;
649 650
            let len = rec.len();
@@ -661,7 +662,7 @@
Loading
661 662
    }
662 663
663 664
    pub fn delete(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef) -> PRes<()> {
664 -
        if let Some((_, version, seg)) = self.read_ref_segment(tx, segment.id, rec_ref)? {
665 +
        if let Some((_, version, seg)) = self.read_ref_segment(tx, segment, rec_ref)? {
665 666
            tx.add_delete(&self.journal, seg, &rec_ref, version)
666 667
        } else {
667 668
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
@@ -815,7 +816,7 @@
Loading
815 816
        ))
816 817
    }
817 818
818 -
    pub fn segment_name_tx(&self, tx: &Transaction, id: u32) -> PRes<Option<(String, bool)>> {
819 +
    pub fn segment_name_tx(&self, tx: &Transaction, id: SegmentId) -> PRes<Option<(String, bool)>> {
819 820
        if tx.segment_created_in_tx(id) {
820 821
            Ok(tx.segment_name_by_id(id).map(|x| (x, true)))
821 822
        } else {
@@ -823,7 +824,7 @@
Loading
823 824
        }
824 825
    }
825 826
826 -
    pub fn list_segments(&self) -> PRes<Vec<(String, u32)>> {
827 +
    pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
827 828
        Ok(self
828 829
            .address
829 830
            .list()?
@@ -858,7 +859,7 @@
Loading
858 859
            .collect()
859 860
    }
860 861
861 -
    pub fn list_segments_tx(&self, tx: &Transaction) -> PRes<Vec<(String, u32)>> {
862 +
    pub fn list_segments_tx(&self, tx: &Transaction) -> PRes<Vec<(String, SegmentId)>> {
862 863
        Ok(tx
863 864
            .filter_list(&self.address.list()?)
864 865
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))

@@ -1029,11 +1029,11 @@
Loading
1029 1029
#[cfg(test)]
1030 1030
mod tests {
1031 1031
    use super::{ByteVec, IndexTransactionKeeper, IndexType, ValueChange};
1032 -
    use crate::id::{IndexId, PersyId, RecRef};
1032 +
    use crate::id::{IndexId, PersyId, RecRef, SegmentId};
1033 1033
    use std::fmt::Debug;
1034 1034
1035 1035
    fn keeper_test_for_type<K: IndexType + PartialEq, V: IndexType + Debug + PartialEq>(k: K, dk: K, v: V) {
1036 -
        let name = IndexId::new(30, 40);
1036 +
        let name = IndexId::new(SegmentId::new(30), SegmentId::new(40));
1037 1037
        let mut keeper = IndexTransactionKeeper::new();
1038 1038
        keeper.put(name.clone(), k.clone(), v.clone());
1039 1039
        let ret = keeper.get_changes(name.clone(), &k);

@@ -8,6 +8,7 @@
Loading
8 8
    discref::{Page, PageOps, ReadPage, PAGE_METADATA_SIZE},
9 9
    error::PRes,
10 10
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
11 +
    id::SegmentId,
11 12
    io::{
12 13
        read_u64, write_u64, InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite,
13 14
        InfallibleWriteFormat, InfallibleWriteVarInt,
@@ -396,7 +397,7 @@
Loading
396 397
    }
397 398
398 399
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
399 -
        buffer.write_varint_u32(self.segment);
400 +
        self.segment.write_varint(buffer);
400 401
        buffer.write_varint_u64(self.recref.page);
401 402
        buffer.write_varint_u32(self.recref.pos);
402 403
        buffer.write_varint_u16(self.version);
@@ -404,7 +405,7 @@
Loading
404 405
    }
405 406
406 407
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
407 -
        self.segment = buffer.read_varint_u32();
408 +
        self.segment = SegmentId::read_varint(buffer);
408 409
        self.recref.page = buffer.read_varint_u64();
409 410
        self.recref.pos = buffer.read_varint_u32();
410 411
        self.version = buffer.read_varint_u16();
@@ -423,7 +424,7 @@
Loading
423 424
    }
424 425
425 426
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
426 -
        buffer.write_varint_u32(self.segment);
427 +
        self.segment.write_varint(buffer);
427 428
        buffer.write_varint_u64(self.recref.page);
428 429
        buffer.write_varint_u32(self.recref.pos);
429 430
        buffer.write_varint_u16(self.version);
@@ -431,7 +432,7 @@
Loading
431 432
    }
432 433
433 434
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
434 -
        self.segment = buffer.read_varint_u32();
435 +
        self.segment = SegmentId::read_varint(buffer);
435 436
        self.recref.page = buffer.read_varint_u64();
436 437
        self.recref.pos = buffer.read_varint_u32();
437 438
        self.version = buffer.read_varint_u16();
@@ -449,7 +450,7 @@
Loading
449 450
    }
450 451
451 452
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
452 -
        buffer.write_varint_u32(self.segment);
453 +
        self.segment.write_varint(buffer);
453 454
        buffer.write_varint_u64(self.recref.page);
454 455
        buffer.write_varint_u32(self.recref.pos);
455 456
        buffer.write_varint_u64(self.record_page);
@@ -458,7 +459,7 @@
Loading
458 459
    }
459 460
460 461
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
461 -
        self.segment = buffer.read_varint_u32();
462 +
        self.segment = SegmentId::read_varint(buffer);
462 463
        self.recref.page = buffer.read_varint_u64();
463 464
        self.recref.pos = buffer.read_varint_u32();
464 465
        self.record_page = buffer.read_varint_u64();
@@ -478,7 +479,7 @@
Loading
478 479
    }
479 480
480 481
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
481 -
        buffer.write_varint_u32(self.segment);
482 +
        self.segment.write_varint(buffer);
482 483
        buffer.write_varint_u64(self.recref.page);
483 484
        buffer.write_varint_u32(self.recref.pos);
484 485
        buffer.write_varint_u64(self.record_page);
@@ -486,7 +487,7 @@
Loading
486 487
    }
487 488
488 489
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
489 -
        self.segment = buffer.read_varint_u32();
490 +
        self.segment = SegmentId::read_varint(buffer);
490 491
        self.recref.page = buffer.read_varint_u64();
491 492
        self.recref.pos = buffer.read_varint_u32();
492 493
        self.record_page = buffer.read_varint_u64();
@@ -622,7 +623,7 @@
Loading
622 623
    }
623 624
624 625
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
625 -
        buffer.write_varint_u32(self.segment_id);
626 +
        self.segment_id.write_varint(buffer);
626 627
        buffer.write_varint_u64(self.first_page);
627 628
        buffer.write_varint_u16(self.name.len() as u16);
628 629
        buffer.write_all(self.name.as_bytes());
@@ -630,7 +631,7 @@
Loading
630 631
    }
631 632
632 633
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
633 -
        self.segment_id = buffer.read_varint_u32();
634 +
        self.segment_id = SegmentId::read_varint(buffer);
634 635
        self.first_page = buffer.read_varint_u64();
635 636
        let string_size = buffer.read_varint_u16();
636 637
        let mut slice: Vec<u8> = vec![0; string_size as usize];
@@ -651,14 +652,14 @@
Loading
651 652
    }
652 653
653 654
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
654 -
        buffer.write_varint_u32(self.segment_id);
655 +
        self.segment_id.write_varint(buffer);
655 656
        buffer.write_varint_u16(self.name.len() as u16);
656 657
        buffer.write_all(self.name.as_bytes());
657 658
        Ok(())
658 659
    }
659 660
660 661
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
661 -
        self.segment_id = buffer.read_varint_u32();
662 +
        self.segment_id = SegmentId::read_varint(buffer);
662 663
        let string_size = buffer.read_varint_u16();
663 664
        let mut slice: Vec<u8> = vec![0; string_size as usize];
664 665
        buffer.read_exact(&mut slice[0..string_size as usize]);
@@ -699,14 +700,14 @@
Loading
699 700
    }
700 701
701 702
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
702 -
        buffer.write_varint_u32(self.segment);
703 +
        self.segment.write_varint(buffer);
703 704
        buffer.write_varint_u64(self.page);
704 705
        buffer.write_varint_u64(self.previous);
705 706
        Ok(())
706 707
    }
707 708
708 709
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
709 -
        self.segment = buffer.read_varint_u32();
710 +
        self.segment = SegmentId::read_varint(buffer);
710 711
        self.page = buffer.read_varint_u64();
711 712
        self.previous = buffer.read_varint_u64();
712 713
        Ok(())
@@ -739,7 +740,7 @@
Loading
739 740
        allocator::Allocator,
740 741
        config::{Config, TxStrategy},
741 742
        discref::DiscRef,
742 -
        id::RecRef,
743 +
        id::{RecRef, SegmentId},
743 744
    };
744 745
    use std::{
745 746
        io::{Cursor, Seek, SeekFrom},
@@ -775,15 +776,16 @@
Loading
775 776
    #[test]
776 777
    fn read_write_insert_record() {
777 778
        let mut buffer = Vec::<u8>::new();
778 -
        let to_write = InsertRecord::new(10, &RecRef::new(20, 10), 3);
779 +
        let seg_id = SegmentId::new(10);
780 +
        let to_write = InsertRecord::new(seg_id, &RecRef::new(20, 10), 3);
779 781
780 782
        to_write.write(&mut buffer).unwrap();
781 783
782 -
        let mut to_read = InsertRecord::new(0, &RecRef::new(0, 0), 0);
784 +
        let mut to_read = InsertRecord::default();
783 785
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
784 786
        to_read.read(&mut cursor).unwrap();
785 787
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
786 -
        assert_eq!(to_read.segment, 10);
788 +
        assert_eq!(to_read.segment, seg_id);
787 789
        assert_eq!(to_read.recref.page, 20);
788 790
        assert_eq!(to_read.recref.pos, 10);
789 791
        assert_eq!(to_read.record_page, 3);
@@ -792,15 +794,16 @@
Loading
792 794
    #[test]
793 795
    fn read_write_insert_read() {
794 796
        let mut buffer = Vec::<u8>::new();
795 -
        let to_write = ReadRecord::new(10, &RecRef::new(20, 10), 3);
797 +
        let seg_id = SegmentId::new(10);
798 +
        let to_write = ReadRecord::new(seg_id, &RecRef::new(20, 10), 3);
796 799
797 800
        to_write.write(&mut buffer).unwrap();
798 801
799 -
        let mut to_read = ReadRecord::new(0, &RecRef::new(0, 0), 0);
802 +
        let mut to_read = ReadRecord::default();
800 803
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
801 804
        to_read.read(&mut cursor).unwrap();
802 805
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
803 -
        assert_eq!(to_read.segment, 10);
806 +
        assert_eq!(to_read.segment, seg_id);
804 807
        assert_eq!(to_read.recref.page, 20);
805 808
        assert_eq!(to_read.recref.pos, 10);
806 809
        assert_eq!(to_read.version, 3);
@@ -809,15 +812,16 @@
Loading
809 812
    #[test]
810 813
    fn read_write_update_record() {
811 814
        let mut buffer = Vec::<u8>::new();
812 -
        let to_write = UpdateRecord::new(10, &RecRef::new(20, 10), 3, 1);
815 +
        let seg_id = SegmentId::new(10);
816 +
        let to_write = UpdateRecord::new(seg_id, &RecRef::new(20, 10), 3, 1);
813 817
814 818
        to_write.write(&mut buffer).unwrap();
815 819
816 -
        let mut to_read = UpdateRecord::new(0, &RecRef::new(0, 0), 0, 0);
820 +
        let mut to_read = UpdateRecord::default();
817 821
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
818 822
        to_read.read(&mut cursor).unwrap();
819 823
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
820 -
        assert_eq!(to_read.segment, 10);
824 +
        assert_eq!(to_read.segment, seg_id);
821 825
        assert_eq!(to_read.recref.page, 20);
822 826
        assert_eq!(to_read.recref.pos, 10);
823 827
        assert_eq!(to_read.record_page, 3);
@@ -827,15 +831,16 @@
Loading
827 831
    #[test]
828 832
    fn read_write_delete_record() {
829 833
        let mut buffer = Vec::<u8>::new();
830 -
        let to_write = DeleteRecord::new(10, &RecRef::new(20, 10), 1);
834 +
        let seg_id = SegmentId::new(10);
835 +
        let to_write = DeleteRecord::new(seg_id, &RecRef::new(20, 10), 1);
831 836
832 837
        to_write.write(&mut buffer).unwrap();
833 838
834 -
        let mut to_read = DeleteRecord::new(0, &RecRef::new(0, 0), 1);
839 +
        let mut to_read = DeleteRecord::default();
835 840
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
836 841
        to_read.read(&mut cursor).unwrap();
837 842
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
838 -
        assert_eq!(to_read.segment, 10);
843 +
        assert_eq!(to_read.segment, seg_id);
839 844
        assert_eq!(to_read.recref.page, 20);
840 845
        assert_eq!(to_read.recref.pos, 10);
841 846
        assert_eq!(to_read.version, 1);
@@ -844,28 +849,30 @@
Loading
844 849
    #[test]
845 850
    fn read_write_create_segment() {
846 851
        let mut buffer = Vec::<u8>::new();
847 -
        let to_write = CreateSegment::new("some", 10, 20);
852 +
        let seg_id = SegmentId::new(10);
853 +
        let to_write = CreateSegment::new("some", seg_id, 20);
848 854
        to_write.write(&mut buffer).unwrap();
849 -
        let mut to_read = CreateSegment::new("", 0, 0);
855 +
        let mut to_read = CreateSegment::default();
850 856
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
851 857
        to_read.read(&mut cursor).unwrap();
852 858
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
853 859
        assert_eq!(to_read.name, "some");
854 -
        assert_eq!(to_read.segment_id, 10);
860 +
        assert_eq!(to_read.segment_id, seg_id);
855 861
        assert_eq!(to_read.first_page, 20);
856 862
    }
857 863
858 864
    #[test]
859 865
    fn read_write_drop_segment() {
860 866
        let mut buffer = Vec::<u8>::new();
861 -
        let to_write = DropSegment::new("some", 20);
867 +
        let seg_id = SegmentId::new(20);
868 +
        let to_write = DropSegment::new("some", seg_id);
862 869
        to_write.write(&mut buffer).unwrap();
863 -
        let mut to_read = DropSegment::new("", 0);
870 +
        let mut to_read = DropSegment::default();
864 871
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
865 872
        to_read.read(&mut cursor).unwrap();
866 873
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
867 874
        assert_eq!(to_read.name, "some");
868 -
        assert_eq!(to_read.segment_id, 20);
875 +
        assert_eq!(to_read.segment_id, seg_id);
869 876
    }
870 877
871 878
    #[test]
@@ -885,13 +892,14 @@
Loading
885 892
    #[test]
886 893
    fn read_write_new_segment_page() {
887 894
        let mut buffer = Vec::<u8>::new();
888 -
        let to_write = NewSegmentPage::new(10, 20, 30);
895 +
        let seg_id = SegmentId::new(10);
896 +
        let to_write = NewSegmentPage::new(seg_id, 20, 30);
889 897
        to_write.write(&mut buffer).unwrap();
890 -
        let mut to_read = NewSegmentPage::new(0, 0, 0);
898 +
        let mut to_read = NewSegmentPage::default();
891 899
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
892 900
        to_read.read(&mut cursor).unwrap();
893 901
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
894 -
        assert_eq!(to_read.segment, 10);
902 +
        assert_eq!(to_read.segment, seg_id);
895 903
        assert_eq!(to_read.page, 20);
896 904
        assert_eq!(to_read.previous, 30);
897 905
    }
@@ -909,7 +917,8 @@
Loading
909 917
        let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
910 918
        let rp = Journal::init(&allocator).unwrap();
911 919
        let journal = Journal::new(&Arc::new(allocator), rp).unwrap();
912 -
        let rec = InsertRecord::new(10, &RecRef::new(1, 1), 1);
920 +
        let seg_id = SegmentId::new(10);
921 +
        let rec = InsertRecord::new(seg_id, &RecRef::new(1, 1), 1);
913 922
        let id = JournalId::new(1, 20);
914 923
        journal.log(&rec, &id).unwrap();
915 924
        journal.log(&rec, &id).unwrap();

@@ -7,7 +7,7 @@
Loading
7 7
    discref::{Page, PageOps},
8 8
    error::{PRes, PersyError},
9 9
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
10 -
    id::{PersyId, RecRef},
10 +
    id::{PersyId, RecRef, SegmentId},
11 11
    io::{read_u64, write_u64, InfallibleRead, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat},
12 12
    persy::exp_from_content_size,
13 13
};
@@ -30,7 +30,7 @@
Loading
30 30
    persistent_pos: u32,
31 31
    pub alloc_page: u64,
32 32
    pub alloc_pos: u32,
33 -
    segment_id: u32,
33 +
    segment_id: SegmentId,
34 34
    name: String,
35 35
}
36 36
@@ -41,7 +41,7 @@
Loading
41 41
        persistent_pos: u32,
42 42
        alloc_page: u64,
43 43
        alloc_pos: u32,
44 -
        segment_id: u32,
44 +
        segment_id: SegmentId,
45 45
        name: &str,
46 46
    ) -> Segment {
47 47
        Segment {
@@ -130,7 +130,7 @@
Loading
130 130
}
131 131
132 132
pub(crate) trait SegmentPageRead: PageOps {
133 -
    fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>>;
133 +
    fn segment_read_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<Option<(u64, u16)>>;
134 134
    fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)>;
135 135
    fn segment_scan_all_entries(&mut self) -> PRes<(u64, Vec<(u32, bool)>)>;
136 136
    fn segment_first_available_pos(&mut self) -> PRes<u32>;
@@ -139,10 +139,10 @@
Loading
139 139
    fn empty(&mut self) -> PRes<bool>;
140 140
}
141 141
pub(crate) trait SegmentPage: SegmentPageRead {
142 -
    fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
143 -
    fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
144 -
    fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<bool>;
145 -
    fn set_segment_id(&mut self, id: u32) -> PRes<()>;
142 +
    fn segment_insert_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()>;
143 +
    fn segment_update_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()>;
144 +
    fn segment_delete_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<bool>;
145 +
    fn set_segment_id(&mut self, id: SegmentId) -> PRes<()>;
146 146
    fn set_next(&mut self, next: u64) -> PRes<()>;
147 147
    fn set_prev(&mut self, prev: u64) -> PRes<()>;
148 148
    fn recalc_count(&mut self) -> PRes<()>;
@@ -153,9 +153,9 @@
Loading
153 153
}
154 154
155 155
impl<T: InfallibleRead + PageOps> SegmentPageRead for T {
156 -
    fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>> {
156 +
    fn segment_read_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<Option<(u64, u16)>> {
157 157
        self.seek(SEGMENT_HASH_OFFSET)?;
158 -
        let persistent_id = self.read_u32();
158 +
        let persistent_id = SegmentId::read(self);
159 159
        if persistent_id != segment_id {
160 160
            return Ok(None);
161 161
        }
@@ -248,10 +248,10 @@
Loading
248 248
}
249 249
250 250
impl<T: InfallibleRead + InfallibleWrite + PageOps> SegmentPage for T {
251 -
    fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
251 +
    fn segment_insert_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()> {
252 252
        debug_assert!(pos >= SEGMENT_DATA_OFFSET, "invalid page position {}", pos);
253 253
        self.seek(SEGMENT_HASH_OFFSET)?;
254 -
        let persistent_id = self.read_u32();
254 +
        let persistent_id = SegmentId::read(self);
255 255
        if persistent_id != segment_id {
256 256
            return Err(PersyError::SegmentNotFound);
257 257
        }
@@ -263,10 +263,10 @@
Loading
263 263
        self.write_u16(1);
264 264
        Ok(())
265 265
    }
266 -
    fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
266 +
    fn segment_update_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()> {
267 267
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
268 268
        self.seek(SEGMENT_HASH_OFFSET)?;
269 -
        let persistent_id = self.read_u32();
269 +
        let persistent_id = SegmentId::read(self);
270 270
        if persistent_id != segment_id {
271 271
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
272 272
        }
@@ -278,10 +278,10 @@
Loading
278 278
        self.write_u16(inc_version(version));
279 279
        Ok(())
280 280
    }
281 -
    fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<bool> {
281 +
    fn segment_delete_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<bool> {
282 282
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
283 283
        self.seek(SEGMENT_HASH_OFFSET)?;
284 -
        let persistent_id = self.read_u32();
284 +
        let persistent_id = SegmentId::read(self);
285 285
        if persistent_id != segment_id {
286 286
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
287 287
        }
@@ -296,9 +296,9 @@
Loading
296 296
        Ok(count as u32 == ADDRESS_PAGE_ENTRY_COUNT)
297 297
    }
298 298
299 -
    fn set_segment_id(&mut self, id: u32) -> PRes<()> {
299 +
    fn set_segment_id(&mut self, id: SegmentId) -> PRes<()> {
300 300
        self.seek(SEGMENT_HASH_OFFSET)?;
301 -
        self.write_u32(id);
301 +
        id.write(self);
302 302
        Ok(())
303 303
    }
304 304
@@ -339,10 +339,10 @@
Loading
339 339
    content_page: u64,
340 340
    other_page: u64,
341 341
    last_flush: u8,
342 -
    pub segments: HashMap<u32, Segment>,
343 -
    pub segments_id: HashMap<String, u32>,
344 -
    pub temp_segments: HashMap<u32, Segment>,
345 -
    pub temp_segments_id: HashMap<String, u32>,
342 +
    pub segments: HashMap<SegmentId, Segment>,
343 +
    pub segments_id: HashMap<String, SegmentId>,
344 +
    pub temp_segments: HashMap<SegmentId, Segment>,
345 +
    pub temp_segments_id: HashMap<String, SegmentId>,
346 346
}
347 347
348 348
pub fn segment_hash(segment: &str) -> u32 {
@@ -386,7 +386,7 @@
Loading
386 386
                    let first_page = page.read_u64();
387 387
                    let persistent_page = page.read_u64();
388 388
                    let persistent_pos = page.read_u32();
389 -
                    let pers_hash = page.read_u32();
389 +
                    let pers_hash = SegmentId::new(page.read_u32());
390 390
                    let name_size = page.read_u16() as usize;
391 391
392 392
                    let mut slice: Vec<u8> = vec![0; name_size];
@@ -432,7 +432,7 @@
Loading
432 432
        Ok(())
433 433
    }
434 434
435 -
    pub fn segment_id(&self, segment: &str) -> Option<u32> {
435 +
    pub fn segment_id(&self, segment: &str) -> Option<SegmentId> {
436 436
        if let Some(id) = self.segments_id.get(segment) {
437 437
            self.segments.get(id).map(|x| x.segment_id)
438 438
        } else {
@@ -440,15 +440,15 @@
Loading
440 440
        }
441 441
    }
442 442
443 -
    pub fn segment_by_id(&self, id: u32) -> Option<&Segment> {
443 +
    pub fn segment_by_id(&self, id: SegmentId) -> Option<&Segment> {
444 444
        self.segments.get(&id)
445 445
    }
446 446
447 -
    pub fn segment_name_by_id(&self, id: u32) -> Option<String> {
447 +
    pub fn segment_name_by_id(&self, id: SegmentId) -> Option<String> {
448 448
        self.segments.get(&id).map(|s| s.name.clone())
449 449
    }
450 450
451 -
    pub fn segment_by_id_temp(&self, id: u32) -> Option<&Segment> {
451 +
    pub fn segment_by_id_temp(&self, id: SegmentId) -> Option<&Segment> {
452 452
        self.temp_segments.get(&id)
453 453
    }
454 454
@@ -456,10 +456,10 @@
Loading
456 456
        self.segments_id.contains_key(segment)
457 457
    }
458 458
459 -
    pub fn create_temp_segment(&mut self, allocator: &Allocator, segment: &str) -> PRes<(u32, u64)> {
459 +
    pub fn create_temp_segment(&mut self, allocator: &Allocator, segment: &str) -> PRes<(SegmentId, u64)> {
460 460
        let mut allocated = allocator.allocate(ADDRESS_PAGE_EXP)?;
461 461
        let allocated_id = allocated.get_index();
462 -
        let segment_id = segment_hash(segment);
462 +
        let segment_id = SegmentId::new(segment_hash(segment));
463 463
        let seg = Segment::new(
464 464
            allocated_id,
465 465
            allocated_id,
@@ -473,16 +473,16 @@
Loading
473 473
        self.temp_segments_id.insert(segment.to_string(), segment_id);
474 474
        allocated.write_u64(0);
475 475
        allocated.write_u64(0);
476 -
        allocated.write_u32(segment_id);
476 +
        segment_id.write(&mut allocated);
477 477
        allocator.flush_page(allocated)?;
478 478
        Ok((segment_id, allocated_id))
479 479
    }
480 480
481 -
    pub fn get_temp_segment_mut(&mut self, segment: u32) -> Option<&mut Segment> {
481 +
    pub fn get_temp_segment_mut(&mut self, segment: SegmentId) -> Option<&mut Segment> {
482 482
        self.temp_segments.get_mut(&segment)
483 483
    }
484 484
485 -
    pub fn drop_temp_segment(&mut self, allocator: &Allocator, segment: u32) -> PRes<()> {
485 +
    pub fn drop_temp_segment(&mut self, allocator: &Allocator, segment: SegmentId) -> PRes<()> {
486 486
        if let Some(segment) = self.temp_segments.remove(&segment) {
487 487
            self.temp_segments_id.remove(&segment.name);
488 488
            //TODO: to review this, may cause disc leaks in case of crash on rollback of segment
@@ -495,11 +495,11 @@
Loading
495 495
        Ok(())
496 496
    }
497 497
498 -
    pub fn exists_real_or_temp(&self, segment: u32) -> bool {
498 +
    pub fn exists_real_or_temp(&self, segment: SegmentId) -> bool {
499 499
        self.segments.contains_key(&segment) || self.temp_segments.contains_key(&segment)
500 500
    }
501 501
502 -
    pub fn create_segment(&mut self, segment: u32, first_page: u64) -> PRes<()> {
502 +
    pub fn create_segment(&mut self, segment: SegmentId, first_page: u64) -> PRes<()> {
503 503
        if let Some(mut s) = self.temp_segments.remove(&segment) {
504 504
            // This is needed mainly for recover
505 505
            s.first_page = first_page;
@@ -517,7 +517,7 @@
Loading
517 517
518 518
        Ok(())
519 519
    }
520 -
    pub fn collect_segment_pages(&self, allocator: &Allocator, segment: u32) -> PRes<Vec<u64>> {
520 +
    pub fn collect_segment_pages(&self, allocator: &Allocator, segment: SegmentId) -> PRes<Vec<u64>> {
521 521
        if let Some(seg) = self.segments.get(&segment) {
522 522
            seg.collect_segment_pages(allocator)
523 523
        } else {
@@ -525,7 +525,7 @@
Loading
525 525
        }
526 526
    }
527 527
528 -
    pub fn set_first_page(&mut self, segment: u32, first_page: u64, allocator: &Allocator) -> PRes<()> {
528 +
    pub fn set_first_page(&mut self, segment: SegmentId, first_page: u64, allocator: &Allocator) -> PRes<()> {
529 529
        if let Some(seg) = self.segments.get_mut(&segment) {
530 530
            seg.first_page = first_page;
531 531
        }
@@ -533,7 +533,7 @@
Loading
533 533
        Ok(())
534 534
    }
535 535
536 -
    pub fn confirm_allocations(&mut self, segments: &[u32], allocator: &Allocator, recover: bool) -> PRes<()> {
536 +
    pub fn confirm_allocations(&mut self, segments: &[SegmentId], allocator: &Allocator, recover: bool) -> PRes<()> {
537 537
        for seg in segments {
538 538
            // If happen that a segment is not found something went wrong and is better crash
539 539
            let segment = if let Some(s) = self.segments.get_mut(seg) {
@@ -561,7 +561,7 @@
Loading
561 561
            buffer.write_u64(segment.first_page);
562 562
            buffer.write_u64(segment.persistent_page);
563 563
            buffer.write_u32(segment.persistent_pos);
564 -
            buffer.write_u32(segment.segment_id);
564 +
            segment.segment_id.write(&mut buffer);
565 565
            buffer.write_u16(segment.name.len() as u16);
566 566
            buffer.write_all(segment.name.as_bytes());
567 567
        }
@@ -601,10 +601,10 @@
Loading
601 601
        Ok(())
602 602
    }
603 603
604 -
    pub fn list(&self) -> Vec<(String, u32)> {
604 +
    pub fn list(&self) -> Vec<(String, SegmentId)> {
605 605
        self.segments_id.iter().map(|(n, id)| (n.clone(), *id)).collect()
606 606
    }
607 -
    pub fn snapshot_list(&self) -> Vec<(String, u32, u64)> {
607 +
    pub fn snapshot_list(&self) -> Vec<(String, SegmentId, u64)> {
608 608
        self.segments
609 609
            .iter()
610 610
            .map(|(id, seg)| (seg.name.clone(), *id, seg.first_page))
@@ -686,6 +686,7 @@
Loading
686 686
        allocator::Allocator,
687 687
        config::Config,
688 688
        discref::{DiscRef, Page, PageOps},
689 +
        id::SegmentId,
689 690
    };
690 691
    use std::sync::Arc;
691 692
    use tempfile::Builder;
@@ -807,8 +808,9 @@
Loading
807 808
    #[test]
808 809
    fn test_seg_insert_read_pointer() {
809 810
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
810 -
        page.segment_insert_entry(0, 30, 10).unwrap();
811 -
        let read = page.segment_read_entry(0, 30).unwrap();
811 +
        let segment_id = SegmentId::new(0);
812 +
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
813 +
        let read = page.segment_read_entry(segment_id, 30).unwrap();
812 814
        match read {
813 815
            Some(val) => assert_eq!(val.0, 10),
814 816
            None => assert!(false),
@@ -818,9 +820,10 @@
Loading
818 820
    #[test]
819 821
    fn test_seg_insert_update_read_pointer() {
820 822
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
821 -
        page.segment_insert_entry(0, 30, 10).unwrap();
822 -
        page.segment_update_entry(0, 30, 15).unwrap();
823 -
        let read = page.segment_read_entry(0, 30).unwrap();
823 +
        let segment_id = SegmentId::new(0);
824 +
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
825 +
        page.segment_update_entry(segment_id, 30, 15).unwrap();
826 +
        let read = page.segment_read_entry(segment_id, 30).unwrap();
824 827
        match read {
825 828
            Some(val) => assert_eq!(val.0, 15),
826 829
            None => assert!(false),
@@ -830,9 +833,10 @@
Loading
830 833
    #[test]
831 834
    fn test_seg_insert_delete_read_pointer() {
832 835
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
833 -
        page.segment_insert_entry(0, 30, 10).unwrap();
834 -
        page.segment_delete_entry(0, 30).unwrap();
835 -
        let read = page.segment_read_entry(0, 30).unwrap();
836 +
        let segment_id = SegmentId::new(0);
837 +
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
838 +
        page.segment_delete_entry(segment_id, 30).unwrap();
839 +
        let read = page.segment_read_entry(segment_id, 30).unwrap();
836 840
        match read {
837 841
            Some(_) => assert!(false),
838 842
            None => assert!(true),

@@ -1041,12 +1041,7 @@
Loading
1041 1041
    /// # }
1042 1042
    /// ```
1043 1043
    pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
1044 -
        Ok(self
1045 -
            .persy_impl
1046 -
            .list_segments()?
1047 -
            .into_iter()
1048 -
            .map(|(name, id)| (name, SegmentId::new(id)))
1049 -
            .collect())
1044 +
        Ok(self.persy_impl.list_segments()?)
1050 1045
    }
1051 1046
1052 1047
    /// List all the existing indexes.
@@ -1640,12 +1635,7 @@
Loading
1640 1635
    /// # }
1641 1636
    /// ```
1642 1637
    pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
1643 -
        Ok(self
1644 -
            .persy_impl
1645 -
            .list_segments_tx(self.tx())?
1646 -
            .into_iter()
1647 -
            .map(|(name, id)| (name, SegmentId::new(id)))
1648 -
            .collect())
1638 +
        Ok(self.persy_impl.list_segments_tx(self.tx())?)
1649 1639
    }
1650 1640
1651 1641
    /// List all the existing indexes, considering changes in the transaction.

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Learn more Showing 1 files with coverage changes found.

Changes in src/segment.rs
+1
Loading file...
Files Coverage
src 0.17% 92.94%
tests 98.56%
Project Totals (29 files) 93.94%
Loading