@@ -358,7 +358,7 @@
Loading
358 358
        }
359 359
        let (segment_id, first_segment_page) = self.address.create_temp_segment(segment)?;
360 360
        tx.add_create_segment(&self.journal, segment, segment_id, first_segment_page)?;
361 -
        Ok(SegmentId::new(segment_id))
361 +
        Ok(segment_id)
362 362
    }
363 363
364 364
    pub fn drop_segment(&self, tx: &mut Transaction, segment: &str) -> PRes<()> {
@@ -414,7 +414,7 @@
Loading
414 414
    /// check if a segment exist persistent or in tx.
415 415
    ///
416 416
    /// @return true if the segment was created in tx.
417 -
    pub fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, u32)> {
417 +
    pub fn check_segment_tx(&self, tx: &Transaction, segment: &str) -> PRes<(bool, SegmentId)> {
418 418
        match tx.exists_segment(segment) {
419 419
            DROPPED => Err(PersyError::SegmentNotFound),
420 420
            CREATED(segment_id) => Ok((true, segment_id)),
@@ -445,17 +445,17 @@
Loading
445 445
        let allocator = &self.allocator;
446 446
        let address = &self.address;
447 447
        let (rec_ref, maybe_new_page) = if in_tx {
448 -
            address.allocate_temp(segment_id.id)
448 +
            address.allocate_temp(segment_id)
449 449
        } else {
450 -
            address.allocate(segment_id.id)
450 +
            address.allocate(segment_id)
451 451
        }?;
452 452
        let metadata = PersyImpl::write_record_metadata(len, &rec_ref);
453 453
        let allocation_exp = exp_from_content_size(len + metadata.len() as u64);
454 454
        let mut pg = allocator.allocate(allocation_exp)?;
455 455
        let page = pg.get_index();
456 -
        tx.add_insert(&self.journal, segment_id.id, &rec_ref, page)?;
456 +
        tx.add_insert(&self.journal, segment_id, &rec_ref, page)?;
457 457
        if let Some(new_page) = maybe_new_page {
458 -
            tx.add_new_segment_page(&self.journal, segment_id.id, new_page.new_page, new_page.previus_page)?;
458 +
            tx.add_new_segment_page(&self.journal, segment_id, new_page.new_page, new_page.previus_page)?;
459 459
        }
460 460
        pg.write_all(&metadata);
461 461
        pg.write_all(rec);
@@ -473,7 +473,7 @@
Loading
473 473
            .address
474 474
            .snapshot_list()?
475 475
            .into_iter()
476 -
            .map(|(name, id, first_page)| SegmentSnapshop::new(&name, SegmentId::new(id), first_page))
476 +
            .map(|(name, id, first_page)| SegmentSnapshop::new(&name, id, first_page))
477 477
            .collect::<Vec<_>>();
478 478
        self.snapshots.fill_segments(snapshot_id, &segs)?;
479 479
        Ok(snapshot_id)
@@ -483,7 +483,12 @@
Loading
483 483
        release_snapshot(snapshot_id, &self.snapshots, &self.allocator, &self.journal)
484 484
    }
485 485
486 -
    fn read_ref_segment(&self, tx: &Transaction, segment_id: u32, rec_ref: &RecRef) -> PRes<Option<(u64, u16, u32)>> {
486 +
    fn read_ref_segment(
487 +
        &self,
488 +
        tx: &Transaction,
489 +
        segment_id: SegmentId,
490 +
        rec_ref: &RecRef,
491 +
    ) -> PRes<Option<(u64, u16, SegmentId)>> {
487 492
        Ok(match tx.read(rec_ref) {
488 493
            TxRead::RECORD(rec) => Some((rec.0, rec.1, segment_id)),
489 494
            TxRead::DELETED => None,
@@ -525,7 +530,7 @@
Loading
525 530
        F: Fn(&[u8]) -> T,
526 531
    {
527 532
        loop {
528 -
            if let Some((page, version, _)) = self.read_ref_segment(tx, segment_id.id, id)? {
533 +
            if let Some((page, version, _)) = self.read_ref_segment(tx, segment_id, id)? {
529 534
                if let Some(record) = self.read_page_fn(id, page, &f)? {
530 535
                    break Ok(Some((record, version)));
531 536
                }
@@ -546,7 +551,7 @@
Loading
546 551
547 552
    pub fn read_tx(&self, tx: &mut Transaction, segment: SegmentId, id: &RecRef) -> PRes<Option<Vec<u8>>> {
548 553
        if let Some((rec, version)) = self.read_tx_internal(tx, segment, id)? {
549 -
            tx.add_read(&self.journal, segment.id, id, version)?;
554 +
            tx.add_read(&self.journal, segment, id, version)?;
550 555
            Ok(Some(rec))
551 556
        } else {
552 557
            Ok(None)
@@ -560,18 +565,18 @@
Loading
560 565
        id: &RecRef,
561 566
        version: u16,
562 567
    ) -> PRes<bool> {
563 -
        let segment_id = segment.to_segment_id(&self.address)?.id;
568 +
        let segment_id = segment.to_segment_id(&self.address)?;
564 569
        tx.lock_record(&self.address, segment_id, id, version)
565 570
    }
566 571
567 572
    pub fn unlock_record(&self, tx: &mut Transaction, segment: impl ToSegmentId, id: &RecRef) -> PRes<()> {
568 -
        let segment_id = segment.to_segment_id(&self.address)?.id;
573 +
        let segment_id = segment.to_segment_id(&self.address)?;
569 574
        tx.unlock_record(&self.address, segment_id, id)
570 575
    }
571 576
572 577
    pub fn read(&self, segment: SegmentId, rec_ref: &RecRef) -> PRes<Option<Vec<u8>>> {
573 578
        loop {
574 -
            if let Some((page, _)) = self.address.read(rec_ref, segment.id)? {
579 +
            if let Some((page, _)) = self.address.read(rec_ref, segment)? {
575 580
                if let Some(record) = self.read_page(rec_ref, page)? {
576 581
                    break Ok(Some(record));
577 582
                }
@@ -595,7 +600,7 @@
Loading
595 600
    where
596 601
        F: Fn(&[u8]) -> T,
597 602
    {
598 -
        let segment_id = segment.id;
603 +
        let segment_id = segment;
599 604
        loop {
600 605
            if let Some(rec_vers) = self.snapshots.read(snapshot, rec_ref)? {
601 606
                match rec_vers.case {
@@ -620,11 +625,7 @@
Loading
620 625
621 626
    pub fn scan(&self, segment: SegmentId) -> PRes<SegmentRawIter> {
622 627
        let read_snapshot = self.read_snapshot()?;
623 -
        Ok(SegmentRawIter::new(
624 -
            segment,
625 -
            self.address.scan(segment.id)?,
626 -
            read_snapshot,
627 -
        ))
628 +
        Ok(SegmentRawIter::new(segment, self.address.scan(segment)?, read_snapshot))
628 629
    }
629 630
630 631
    pub fn scan_snapshot(&self, segment_id: SegmentId, snapshot: SnapshotId) -> PRes<SegmentSnapshotRawIter> {
@@ -637,13 +638,13 @@
Loading
637 638
        Ok(TxSegmentRawIter::new(
638 639
            tx,
639 640
            segment_id,
640 -
            self.address.scan(segment_id.id)?,
641 +
            self.address.scan(segment_id)?,
641 642
            read_snapshot,
642 643
        ))
643 644
    }
644 645
645 646
    pub fn update(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef, rec: &[u8]) -> PRes<()> {
646 -
        if let Some((_, version, segment)) = self.read_ref_segment(tx, segment.id, rec_ref)? {
647 +
        if let Some((_, version, segment)) = self.read_ref_segment(tx, segment, rec_ref)? {
647 648
            let allocator = &self.allocator;
648 649
            let journal = &self.journal;
649 650
            let len = rec.len();
@@ -661,7 +662,7 @@
Loading
661 662
    }
662 663
663 664
    pub fn delete(&self, tx: &mut Transaction, segment: SegmentId, rec_ref: &RecRef) -> PRes<()> {
664 -
        if let Some((_, version, seg)) = self.read_ref_segment(tx, segment.id, rec_ref)? {
665 +
        if let Some((_, version, seg)) = self.read_ref_segment(tx, segment, rec_ref)? {
665 666
            tx.add_delete(&self.journal, seg, &rec_ref, version)
666 667
        } else {
667 668
            Err(PersyError::RecordNotFound(PersyId(rec_ref.clone())))
@@ -815,7 +816,7 @@
Loading
815 816
        ))
816 817
    }
817 818
818 -
    pub fn segment_name_tx(&self, tx: &Transaction, id: u32) -> PRes<Option<(String, bool)>> {
819 +
    pub fn segment_name_tx(&self, tx: &Transaction, id: SegmentId) -> PRes<Option<(String, bool)>> {
819 820
        if tx.segment_created_in_tx(id) {
820 821
            Ok(tx.segment_name_by_id(id).map(|x| (x, true)))
821 822
        } else {
@@ -823,7 +824,7 @@
Loading
823 824
        }
824 825
    }
825 826
826 -
    pub fn list_segments(&self) -> PRes<Vec<(String, u32)>> {
827 +
    pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
827 828
        Ok(self
828 829
            .address
829 830
            .list()?
@@ -858,7 +859,7 @@
Loading
858 859
            .collect()
859 860
    }
860 861
861 -
    pub fn list_segments_tx(&self, tx: &Transaction) -> PRes<Vec<(String, u32)>> {
862 +
    pub fn list_segments_tx(&self, tx: &Transaction) -> PRes<Vec<(String, SegmentId)>> {
862 863
        Ok(tx
863 864
            .filter_list(&self.address.list()?)
864 865
            .filter(|(name, _)| !name.starts_with(INDEX_META_PREFIX) && !name.starts_with(INDEX_DATA_PREFIX))

@@ -1029,11 +1029,11 @@
Loading
1029 1029
#[cfg(test)]
1030 1030
mod tests {
1031 1031
    use super::{ByteVec, IndexTransactionKeeper, IndexType, ValueChange};
1032 -
    use crate::id::{IndexId, PersyId, RecRef};
1032 +
    use crate::id::{IndexId, PersyId, RecRef, SegmentId};
1033 1033
    use std::fmt::Debug;
1034 1034
1035 1035
    fn keeper_test_for_type<K: IndexType + PartialEq, V: IndexType + Debug + PartialEq>(k: K, dk: K, v: V) {
1036 -
        let name = IndexId::new(30, 40);
1036 +
        let name = IndexId::new(SegmentId::new(30), SegmentId::new(40));
1037 1037
        let mut keeper = IndexTransactionKeeper::new();
1038 1038
        keeper.put(name.clone(), k.clone(), v.clone());
1039 1039
        let ret = keeper.get_changes(name.clone(), &k);

@@ -8,6 +8,7 @@
Loading
8 8
    discref::{Page, PageOps, ReadPage, PAGE_METADATA_SIZE},
9 9
    error::PRes,
10 10
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
11 +
    id::SegmentId,
11 12
    io::{
12 13
        read_u64, write_u64, InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite,
13 14
        InfallibleWriteFormat, InfallibleWriteVarInt,
@@ -396,7 +397,7 @@
Loading
396 397
    }
397 398
398 399
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
399 -
        buffer.write_varint_u32(self.segment);
400 +
        self.segment.write_varint(buffer);
400 401
        buffer.write_varint_u64(self.recref.page);
401 402
        buffer.write_varint_u32(self.recref.pos);
402 403
        buffer.write_varint_u16(self.version);
@@ -404,7 +405,7 @@
Loading
404 405
    }
405 406
406 407
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
407 -
        self.segment = buffer.read_varint_u32();
408 +
        self.segment = SegmentId::read_varint(buffer);
408 409
        self.recref.page = buffer.read_varint_u64();
409 410
        self.recref.pos = buffer.read_varint_u32();
410 411
        self.version = buffer.read_varint_u16();
@@ -423,7 +424,7 @@
Loading
423 424
    }
424 425
425 426
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
426 -
        buffer.write_varint_u32(self.segment);
427 +
        self.segment.write_varint(buffer);
427 428
        buffer.write_varint_u64(self.recref.page);
428 429
        buffer.write_varint_u32(self.recref.pos);
429 430
        buffer.write_varint_u16(self.version);
@@ -431,7 +432,7 @@
Loading
431 432
    }
432 433
433 434
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
434 -
        self.segment = buffer.read_varint_u32();
435 +
        self.segment = SegmentId::read_varint(buffer);
435 436
        self.recref.page = buffer.read_varint_u64();
436 437
        self.recref.pos = buffer.read_varint_u32();
437 438
        self.version = buffer.read_varint_u16();
@@ -449,7 +450,7 @@
Loading
449 450
    }
450 451
451 452
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
452 -
        buffer.write_varint_u32(self.segment);
453 +
        self.segment.write_varint(buffer);
453 454
        buffer.write_varint_u64(self.recref.page);
454 455
        buffer.write_varint_u32(self.recref.pos);
455 456
        buffer.write_varint_u64(self.record_page);
@@ -458,7 +459,7 @@
Loading
458 459
    }
459 460
460 461
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
461 -
        self.segment = buffer.read_varint_u32();
462 +
        self.segment = SegmentId::read_varint(buffer);
462 463
        self.recref.page = buffer.read_varint_u64();
463 464
        self.recref.pos = buffer.read_varint_u32();
464 465
        self.record_page = buffer.read_varint_u64();
@@ -478,7 +479,7 @@
Loading
478 479
    }
479 480
480 481
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
481 -
        buffer.write_varint_u32(self.segment);
482 +
        self.segment.write_varint(buffer);
482 483
        buffer.write_varint_u64(self.recref.page);
483 484
        buffer.write_varint_u32(self.recref.pos);
484 485
        buffer.write_varint_u64(self.record_page);
@@ -486,7 +487,7 @@
Loading
486 487
    }
487 488
488 489
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
489 -
        self.segment = buffer.read_varint_u32();
490 +
        self.segment = SegmentId::read_varint(buffer);
490 491
        self.recref.page = buffer.read_varint_u64();
491 492
        self.recref.pos = buffer.read_varint_u32();
492 493
        self.record_page = buffer.read_varint_u64();
@@ -622,7 +623,7 @@
Loading
622 623
    }
623 624
624 625
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
625 -
        buffer.write_varint_u32(self.segment_id);
626 +
        self.segment_id.write_varint(buffer);
626 627
        buffer.write_varint_u64(self.first_page);
627 628
        buffer.write_varint_u16(self.name.len() as u16);
628 629
        buffer.write_all(self.name.as_bytes());
@@ -630,7 +631,7 @@
Loading
630 631
    }
631 632
632 633
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
633 -
        self.segment_id = buffer.read_varint_u32();
634 +
        self.segment_id = SegmentId::read_varint(buffer);
634 635
        self.first_page = buffer.read_varint_u64();
635 636
        let string_size = buffer.read_varint_u16();
636 637
        let mut slice: Vec<u8> = vec![0; string_size as usize];
@@ -651,14 +652,14 @@
Loading
651 652
    }
652 653
653 654
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
654 -
        buffer.write_varint_u32(self.segment_id);
655 +
        self.segment_id.write_varint(buffer);
655 656
        buffer.write_varint_u16(self.name.len() as u16);
656 657
        buffer.write_all(self.name.as_bytes());
657 658
        Ok(())
658 659
    }
659 660
660 661
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
661 -
        self.segment_id = buffer.read_varint_u32();
662 +
        self.segment_id = SegmentId::read_varint(buffer);
662 663
        let string_size = buffer.read_varint_u16();
663 664
        let mut slice: Vec<u8> = vec![0; string_size as usize];
664 665
        buffer.read_exact(&mut slice[0..string_size as usize]);
@@ -699,14 +700,14 @@
Loading
699 700
    }
700 701
701 702
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
702 -
        buffer.write_varint_u32(self.segment);
703 +
        self.segment.write_varint(buffer);
703 704
        buffer.write_varint_u64(self.page);
704 705
        buffer.write_varint_u64(self.previous);
705 706
        Ok(())
706 707
    }
707 708
708 709
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
709 -
        self.segment = buffer.read_varint_u32();
710 +
        self.segment = SegmentId::read_varint(buffer);
710 711
        self.page = buffer.read_varint_u64();
711 712
        self.previous = buffer.read_varint_u64();
712 713
        Ok(())
@@ -739,7 +740,7 @@
Loading
739 740
        allocator::Allocator,
740 741
        config::{Config, TxStrategy},
741 742
        discref::DiscRef,
742 -
        id::RecRef,
743 +
        id::{RecRef, SegmentId},
743 744
    };
744 745
    use std::{
745 746
        io::{Cursor, Seek, SeekFrom},
@@ -775,15 +776,16 @@
Loading
775 776
    #[test]
776 777
    fn read_write_insert_record() {
777 778
        let mut buffer = Vec::<u8>::new();
778 -
        let to_write = InsertRecord::new(10, &RecRef::new(20, 10), 3);
779 +
        let seg_id = SegmentId::new(10);
780 +
        let to_write = InsertRecord::new(seg_id, &RecRef::new(20, 10), 3);
779 781
780 782
        to_write.write(&mut buffer).unwrap();
781 783
782 -
        let mut to_read = InsertRecord::new(0, &RecRef::new(0, 0), 0);
784 +
        let mut to_read = InsertRecord::default();
783 785
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
784 786
        to_read.read(&mut cursor).unwrap();
785 787
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
786 -
        assert_eq!(to_read.segment, 10);
788 +
        assert_eq!(to_read.segment, seg_id);
787 789
        assert_eq!(to_read.recref.page, 20);
788 790
        assert_eq!(to_read.recref.pos, 10);
789 791
        assert_eq!(to_read.record_page, 3);
@@ -792,15 +794,16 @@
Loading
792 794
    #[test]
793 795
    fn read_write_insert_read() {
794 796
        let mut buffer = Vec::<u8>::new();
795 -
        let to_write = ReadRecord::new(10, &RecRef::new(20, 10), 3);
797 +
        let seg_id = SegmentId::new(10);
798 +
        let to_write = ReadRecord::new(seg_id, &RecRef::new(20, 10), 3);
796 799
797 800
        to_write.write(&mut buffer).unwrap();
798 801
799 -
        let mut to_read = ReadRecord::new(0, &RecRef::new(0, 0), 0);
802 +
        let mut to_read = ReadRecord::default();
800 803
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
801 804
        to_read.read(&mut cursor).unwrap();
802 805
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
803 -
        assert_eq!(to_read.segment, 10);
806 +
        assert_eq!(to_read.segment, seg_id);
804 807
        assert_eq!(to_read.recref.page, 20);
805 808
        assert_eq!(to_read.recref.pos, 10);
806 809
        assert_eq!(to_read.version, 3);
@@ -809,15 +812,16 @@
Loading
809 812
    #[test]
810 813
    fn read_write_update_record() {
811 814
        let mut buffer = Vec::<u8>::new();
812 -
        let to_write = UpdateRecord::new(10, &RecRef::new(20, 10), 3, 1);
815 +
        let seg_id = SegmentId::new(10);
816 +
        let to_write = UpdateRecord::new(seg_id, &RecRef::new(20, 10), 3, 1);
813 817
814 818
        to_write.write(&mut buffer).unwrap();
815 819
816 -
        let mut to_read = UpdateRecord::new(0, &RecRef::new(0, 0), 0, 0);
820 +
        let mut to_read = UpdateRecord::default();
817 821
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
818 822
        to_read.read(&mut cursor).unwrap();
819 823
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
820 -
        assert_eq!(to_read.segment, 10);
824 +
        assert_eq!(to_read.segment, seg_id);
821 825
        assert_eq!(to_read.recref.page, 20);
822 826
        assert_eq!(to_read.recref.pos, 10);
823 827
        assert_eq!(to_read.record_page, 3);
@@ -827,15 +831,16 @@
Loading
827 831
    #[test]
828 832
    fn read_write_delete_record() {
829 833
        let mut buffer = Vec::<u8>::new();
830 -
        let to_write = DeleteRecord::new(10, &RecRef::new(20, 10), 1);
834 +
        let seg_id = SegmentId::new(10);
835 +
        let to_write = DeleteRecord::new(seg_id, &RecRef::new(20, 10), 1);
831 836
832 837
        to_write.write(&mut buffer).unwrap();
833 838
834 -
        let mut to_read = DeleteRecord::new(0, &RecRef::new(0, 0), 1);
839 +
        let mut to_read = DeleteRecord::default();
835 840
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
836 841
        to_read.read(&mut cursor).unwrap();
837 842
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
838 -
        assert_eq!(to_read.segment, 10);
843 +
        assert_eq!(to_read.segment, seg_id);
839 844
        assert_eq!(to_read.recref.page, 20);
840 845
        assert_eq!(to_read.recref.pos, 10);
841 846
        assert_eq!(to_read.version, 1);
@@ -844,28 +849,30 @@
Loading
844 849
    #[test]
845 850
    fn read_write_create_segment() {
846 851
        let mut buffer = Vec::<u8>::new();
847 -
        let to_write = CreateSegment::new("some", 10, 20);
852 +
        let seg_id = SegmentId::new(10);
853 +
        let to_write = CreateSegment::new("some", seg_id, 20);
848 854
        to_write.write(&mut buffer).unwrap();
849 -
        let mut to_read = CreateSegment::new("", 0, 0);
855 +
        let mut to_read = CreateSegment::default();
850 856
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
851 857
        to_read.read(&mut cursor).unwrap();
852 858
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
853 859
        assert_eq!(to_read.name, "some");
854 -
        assert_eq!(to_read.segment_id, 10);
860 +
        assert_eq!(to_read.segment_id, seg_id);
855 861
        assert_eq!(to_read.first_page, 20);
856 862
    }
857 863
858 864
    #[test]
859 865
    fn read_write_drop_segment() {
860 866
        let mut buffer = Vec::<u8>::new();
861 -
        let to_write = DropSegment::new("some", 20);
867 +
        let seg_id = SegmentId::new(20);
868 +
        let to_write = DropSegment::new("some", seg_id);
862 869
        to_write.write(&mut buffer).unwrap();
863 -
        let mut to_read = DropSegment::new("", 0);
870 +
        let mut to_read = DropSegment::default();
864 871
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
865 872
        to_read.read(&mut cursor).unwrap();
866 873
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
867 874
        assert_eq!(to_read.name, "some");
868 -
        assert_eq!(to_read.segment_id, 20);
875 +
        assert_eq!(to_read.segment_id, seg_id);
869 876
    }
870 877
871 878
    #[test]
@@ -885,13 +892,14 @@
Loading
885 892
    #[test]
886 893
    fn read_write_new_segment_page() {
887 894
        let mut buffer = Vec::<u8>::new();
888 -
        let to_write = NewSegmentPage::new(10, 20, 30);
895 +
        let seg_id = SegmentId::new(10);
896 +
        let to_write = NewSegmentPage::new(seg_id, 20, 30);
889 897
        to_write.write(&mut buffer).unwrap();
890 -
        let mut to_read = NewSegmentPage::new(0, 0, 0);
898 +
        let mut to_read = NewSegmentPage::default();
891 899
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
892 900
        to_read.read(&mut cursor).unwrap();
893 901
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
894 -
        assert_eq!(to_read.segment, 10);
902 +
        assert_eq!(to_read.segment, seg_id);
895 903
        assert_eq!(to_read.page, 20);
896 904
        assert_eq!(to_read.previous, 30);
897 905
    }
@@ -909,7 +917,8 @@
Loading
909 917
        let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
910 918
        let rp = Journal::init(&allocator).unwrap();
911 919
        let journal = Journal::new(&Arc::new(allocator), rp).unwrap();
912 -
        let rec = InsertRecord::new(10, &RecRef::new(1, 1), 1);
920 +
        let seg_id = SegmentId::new(10);
921 +
        let rec = InsertRecord::new(seg_id, &RecRef::new(1, 1), 1);
913 922
        let id = JournalId::new(1, 20);
914 923
        journal.log(&rec, &id).unwrap();
915 924
        journal.log(&rec, &id).unwrap();

@@ -7,7 +7,7 @@
Loading
7 7
    discref::{Page, PageOps},
8 8
    error::{PRes, PersyError},
9 9
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
10 -
    id::{PersyId, RecRef},
10 +
    id::{PersyId, RecRef, SegmentId},
11 11
    io::{read_u64, write_u64, InfallibleRead, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat},
12 12
    persy::exp_from_content_size,
13 13
};
@@ -30,7 +30,7 @@
Loading
30 30
    persistent_pos: u32,
31 31
    pub alloc_page: u64,
32 32
    pub alloc_pos: u32,
33 -
    segment_id: u32,
33 +
    segment_id: SegmentId,
34 34
    name: String,
35 35
}
36 36
@@ -41,7 +41,7 @@
Loading
41 41
        persistent_pos: u32,
42 42
        alloc_page: u64,
43 43
        alloc_pos: u32,
44 -
        segment_id: u32,
44 +
        segment_id: SegmentId,
45 45
        name: &str,
46 46
    ) -> Segment {
47 47
        Segment {
@@ -130,7 +130,7 @@
Loading
130 130
}
131 131
132 132
pub(crate) trait SegmentPageRead: PageOps {
133 -
    fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>>;
133 +
    fn segment_read_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<Option<(u64, u16)>>;
134 134
    fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)>;
135 135
    fn segment_scan_all_entries(&mut self) -> PRes<(u64, Vec<(u32, bool)>)>;
136 136
    fn segment_first_available_pos(&mut self) -> PRes<u32>;
@@ -139,10 +139,10 @@
Loading
139 139
    fn empty(&mut self) -> PRes<bool>;
140 140
}
141 141
pub(crate) trait SegmentPage: SegmentPageRead {
142 -
    fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
143 -
    fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
144 -
    fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<bool>;
145 -
    fn set_segment_id(&mut self, id: u32) -> PRes<()>;
142 +
    fn segment_insert_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()>;
143 +
    fn segment_update_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()>;
144 +
    fn segment_delete_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<bool>;
145 +
    fn set_segment_id(&mut self, id: SegmentId) -> PRes<()>;
146 146
    fn set_next(&mut self, next: u64) -> PRes<()>;
147 147
    fn set_prev(&mut self, prev: u64) -> PRes<()>;
148 148
    fn recalc_count(&mut self) -> PRes<()>;
@@ -153,9 +153,9 @@
Loading
153 153
}
154 154
155 155
impl<T: InfallibleRead + PageOps> SegmentPageRead for T {
156 -
    fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>> {
156 +
    fn segment_read_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<Option<(u64, u16)>> {
157 157
        self.seek(SEGMENT_HASH_OFFSET)?;
158 -
        let persistent_id = self.read_u32();
158 +
        let persistent_id = SegmentId::read(self);
159 159
        if persistent_id != segment_id {
160 160
            return Ok(None);
161 161
        }
@@ -248,10 +248,10 @@
Loading
248 248
}
249 249
250 250
impl<T: InfallibleRead + InfallibleWrite + PageOps> SegmentPage for T {
251 -
    fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
251 +
    fn segment_insert_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()> {
252 252
        debug_assert!(pos >= SEGMENT_DATA_OFFSET, "invalid page position {}", pos);
253 253
        self.seek(SEGMENT_HASH_OFFSET)?;
254 -
        let persistent_id = self.read_u32();
254 +
        let persistent_id = SegmentId::read(self);
255 255
        if persistent_id != segment_id {
256 256
            return Err(PersyError::SegmentNotFound);
257 257
        }
@@ -263,10 +263,10 @@
Loading
263 263
        self.write_u16(1);
264 264
        Ok(())
265 265
    }
266 -
    fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
266 +
    fn segment_update_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()> {
267 267
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
268 268
        self.seek(SEGMENT_HASH_OFFSET)?;
269 -
        let persistent_id = self.read_u32();
269 +
        let persistent_id = SegmentId::read(self);
270 270
        if persistent_id != segment_id {
271 271
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
272 272
        }
@@ -278,10 +278,10 @@
Loading
278 278
        self.write_u16(inc_version(version));
279 279
        Ok(())
280 280
    }
281 -
    fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<bool> {
281 +
    fn segment_delete_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<bool> {
282 282
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
283 283
        self.seek(SEGMENT_HASH_OFFSET)?;
284 -
        let persistent_id = self.read_u32();
284 +
        let persistent_id = SegmentId::read(self);
285 285
        if persistent_id != segment_id {
286 286
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
287 287
        }
@@ -296,9 +296,9 @@
Loading
296 296
        Ok(count as u32 == ADDRESS_PAGE_ENTRY_COUNT)
297 297
    }
298 298
299 -
    fn set_segment_id(&mut self, id: u32) -> PRes<()> {
299 +
    fn set_segment_id(&mut self, id: SegmentId) -> PRes<()> {
300 300
        self.seek(SEGMENT_HASH_OFFSET)?;
301 -
        self.write_u32(id);
301 +
        id.write(self);
302 302
        Ok(())
303 303
    }
304 304
@@ -339,10 +339,10 @@
Loading
339 339
    content_page: u64,
340 340
    other_page: u64,
341 341
    last_flush: u8,
342 -
    pub segments: HashMap<u32, Segment>,
343 -
    pub segments_id: HashMap<String, u32>,
344 -
    pub temp_segments: HashMap<u32, Segment>,
345 -
    pub temp_segments_id: HashMap<String, u32>,
342 +
    pub segments: HashMap<SegmentId, Segment>,
343 +
    pub segments_id: HashMap<String, SegmentId>,
344 +
    pub temp_segments: HashMap<SegmentId, Segment>,
345 +
    pub temp_segments_id: HashMap<String, SegmentId>,
346 346
}
347 347
348 348
pub fn segment_hash(segment: &str) -> u32 {
@@ -386,7 +386,7 @@
Loading
386 386
                    let first_page = page.read_u64();
387 387
                    let persistent_page = page.read_u64();
388 388
                    let persistent_pos = page.read_u32();
389 -
                    let pers_hash = page.read_u32();
389 +
                    let pers_hash = SegmentId::new(page.read_u32());
390 390
                    let name_size = page.read_u16() as usize;
391 391
392 392
                    let mut slice: Vec<u8> = vec![0; name_size];
@@ -432,7 +432,7 @@
Loading
432 432
        Ok(())
433 433
    }
434 434
435 -
    pub fn segment_id(&self, segment: &str) -> Option<u32> {
435 +
    pub fn segment_id(&self, segment: &str) -> Option<SegmentId> {
436 436
        if let Some(id) = self.segments_id.get(segment) {
437 437
            self.segments.get(id).map(|x| x.segment_id)
438 438
        } else {
@@ -440,15 +440,15 @@
Loading
440 440
        }
441 441
    }
442 442
443 -
    pub fn segment_by_id(&self, id: u32) -> Option<&Segment> {
443 +
    pub fn segment_by_id(&self, id: SegmentId) -> Option<&Segment> {
444 444
        self.segments.get(&id)
445 445
    }
446 446
447 -
    pub fn segment_name_by_id(&self, id: u32) -> Option<String> {
447 +
    pub fn segment_name_by_id(&self, id: SegmentId) -> Option<String> {
448 448
        self.segments.get(&id).map(|s| s.name.clone())
449 449
    }
450 450
451 -
    pub fn segment_by_id_temp(&self, id: u32) -> Option<&Segment> {
451 +
    pub fn segment_by_id_temp(&self, id: SegmentId) -> Option<&Segment> {
452 452
        self.temp_segments.get(&id)
453 453
    }
454 454
@@ -456,10 +456,10 @@
Loading
456 456
        self.segments_id.contains_key(segment)
457 457
    }
458 458
459 -
    pub fn create_temp_segment(&mut self, allocator: &Allocator, segment: &str) -> PRes<(u32, u64)> {
459 +
    pub fn create_temp_segment(&mut self, allocator: &Allocator, segment: &str) -> PRes<(SegmentId, u64)> {
460 460
        let mut allocated = allocator.allocate(ADDRESS_PAGE_EXP)?;
461 461
        let allocated_id = allocated.get_index();
462 -
        let segment_id = segment_hash(segment);
462 +
        let segment_id = SegmentId::new(segment_hash(segment));
463 463
        let seg = Segment::new(
464 464
            allocated_id,
465 465
            allocated_id,
@@ -473,16 +473,16 @@
Loading
473 473
        self.temp_segments_id.insert(segment.to_string(), segment_id);
474 474
        allocated.write_u64(0);
475 475
        allocated.write_u64(0);
476 -
        allocated.write_u32(segment_id);
476 +
        segment_id.write(&mut allocated);
477 477
        allocator.flush_page(allocated)?;
478 478
        Ok((segment_id, allocated_id))
479 479
    }
480 480
481 -
    pub fn get_temp_segment_mut(&mut self, segment: u32) -> Option<&mut Segment> {
481 +
    pub fn get_temp_segment_mut(&mut self, segment: SegmentId) -> Option<&mut Segment> {
482 482
        self.temp_segments.get_mut(&segment)
483 483
    }
484 484
485 -
    pub fn drop_temp_segment(&mut self, allocator: &Allocator, segment: u32) -> PRes<()> {
485 +
    pub fn drop_temp_segment(&mut self, allocator: &Allocator, segment: SegmentId) -> PRes<()> {
486 486
        if let Some(segment) = self.temp_segments.remove(&segment) {
487 487
            self.temp_segments_id.remove(&segment.name);
488 488
            //TODO: to review this, may cause disc leaks in case of crash on rollback of segment
@@ -495,11 +495,11 @@
Loading
495 495
        Ok(())
496 496
    }
497 497
498 -
    pub fn exists_real_or_temp(&self, segment: u32) -> bool {
498 +
    pub fn exists_real_or_temp(&self, segment: SegmentId) -> bool {
499 499
        self.segments.contains_key(&segment) || self.temp_segments.contains_key(&segment)
500 500
    }
501 501
502 -
    pub fn create_segment(&mut self, segment: u32, first_page: u64) -> PRes<()> {
502 +
    pub fn create_segment(&mut self, segment: SegmentId, first_page: u64) -> PRes<()> {
503 503
        if let Some(mut s) = self.temp_segments.remove(&segment) {
504 504
            // This is needed mainly for recover
505 505
            s.first_page = first_page;
@@ -517,7 +517,7 @@
Loading
517 517
518 518
        Ok(())
519 519
    }
520 -
    pub fn collect_segment_pages(&self, allocator: &Allocator, segment: u32) -> PRes<Vec<u64>> {
520 +
    pub fn collect_segment_pages(&self, allocator: &Allocator, segment: SegmentId) -> PRes<Vec<u64>> {
521 521
        if let Some(seg) = self.segments.get(&segment) {
522 522
            seg.collect_segment_pages(allocator)
523 523
        } else {
@@ -525,7 +525,7 @@
Loading
525 525
        }
526 526
    }
527 527
528 -
    pub fn set_first_page(&mut self, segment: u32, first_page: u64, allocator: &Allocator) -> PRes<()> {
528 +
    pub fn set_first_page(&mut self, segment: SegmentId, first_page: u64, allocator: &Allocator) -> PRes<()> {
529 529
        if let Some(seg) = self.segments.get_mut(&segment) {
530 530
            seg.first_page = first_page;
531 531
        }
@@ -533,7 +533,7 @@
Loading
533 533
        Ok(())
534 534
    }
535 535
536 -
    pub fn confirm_allocations(&mut self, segments: &[u32], allocator: &Allocator, recover: bool) -> PRes<()> {
536 +
    pub fn confirm_allocations(&mut self, segments: &[SegmentId], allocator: &Allocator, recover: bool) -> PRes<()> {
537 537
        for seg in segments {
538 538
            // If happen that a segment is not found something went wrong and is better crash
539 539
            let segment = if let Some(s) = self.segments.get_mut(seg) {
@@ -561,7 +561,7 @@
Loading
561 561
            buffer.write_u64(segment.first_page);
562 562
            buffer.write_u64(segment.persistent_page);
563 563
            buffer.write_u32(segment.persistent_pos);
564 -
            buffer.write_u32(segment.segment_id);
564 +
            segment.segment_id.write(&mut buffer);
565 565
            buffer.write_u16(segment.name.len() as u16);
566 566
            buffer.write_all(segment.name.as_bytes());
567 567
        }
@@ -601,10 +601,10 @@
Loading
601 601
        Ok(())
602 602
    }
603 603
604 -
    pub fn list(&self) -> Vec<(String, u32)> {
604 +
    pub fn list(&self) -> Vec<(String, SegmentId)> {
605 605
        self.segments_id.iter().map(|(n, id)| (n.clone(), *id)).collect()
606 606
    }
607 -
    pub fn snapshot_list(&self) -> Vec<(String, u32, u64)> {
607 +
    pub fn snapshot_list(&self) -> Vec<(String, SegmentId, u64)> {
608 608
        self.segments
609 609
            .iter()
610 610
            .map(|(id, seg)| (seg.name.clone(), *id, seg.first_page))
@@ -686,6 +686,7 @@
Loading
686 686
        allocator::Allocator,
687 687
        config::Config,
688 688
        discref::{DiscRef, Page, PageOps},
689 +
        id::SegmentId,
689 690
    };
690 691
    use std::sync::Arc;
691 692
    use tempfile::Builder;
@@ -807,8 +808,9 @@
Loading
807 808
    #[test]
808 809
    fn test_seg_insert_read_pointer() {
809 810
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
810 -
        page.segment_insert_entry(0, 30, 10).unwrap();
811 -
        let read = page.segment_read_entry(0, 30).unwrap();
811 +
        let segment_id = SegmentId::new(0);
812 +
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
813 +
        let read = page.segment_read_entry(segment_id, 30).unwrap();
812 814
        match read {
813 815
            Some(val) => assert_eq!(val.0, 10),
814 816
            None => assert!(false),
@@ -818,9 +820,10 @@
Loading
818 820
    #[test]
819 821
    fn test_seg_insert_update_read_pointer() {
820 822
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
821 -
        page.segment_insert_entry(0, 30, 10).unwrap();
822 -
        page.segment_update_entry(0, 30, 15).unwrap();
823 -
        let read = page.segment_read_entry(0, 30).unwrap();
823 +
        let segment_id = SegmentId::new(0);
824 +
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
825 +
        page.segment_update_entry(segment_id, 30, 15).unwrap();
826 +
        let read = page.segment_read_entry(segment_id, 30).unwrap();
824 827
        match read {
825 828
            Some(val) => assert_eq!(val.0, 15),
826 829
            None => assert!(false),
@@ -830,9 +833,10 @@
Loading
830 833
    #[test]
831 834
    fn test_seg_insert_delete_read_pointer() {
832 835
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
833 -
        page.segment_insert_entry(0, 30, 10).unwrap();
834 -
        page.segment_delete_entry(0, 30).unwrap();
835 -
        let read = page.segment_read_entry(0, 30).unwrap();
836 +
        let segment_id = SegmentId::new(0);
837 +
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
838 +
        page.segment_delete_entry(segment_id, 30).unwrap();
839 +
        let read = page.segment_read_entry(segment_id, 30).unwrap();
836 840
        match read {
837 841
            Some(_) => assert!(false),
838 842
            None => assert!(true),

@@ -1041,12 +1041,7 @@
Loading
1041 1041
    /// # }
1042 1042
    /// ```
1043 1043
    pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
1044 -
        Ok(self
1045 -
            .persy_impl
1046 -
            .list_segments()?
1047 -
            .into_iter()
1048 -
            .map(|(name, id)| (name, SegmentId::new(id)))
1049 -
            .collect())
1044 +
        Ok(self.persy_impl.list_segments()?)
1050 1045
    }
1051 1046
1052 1047
    /// List all the existing indexes.
@@ -1640,12 +1635,7 @@
Loading
1640 1635
    /// # }
1641 1636
    /// ```
1642 1637
    pub fn list_segments(&self) -> PRes<Vec<(String, SegmentId)>> {
1643 -
        Ok(self
1644 -
            .persy_impl
1645 -
            .list_segments_tx(self.tx())?
1646 -
            .into_iter()
1647 -
            .map(|(name, id)| (name, SegmentId::new(id)))
1648 -
            .collect())
1638 +
        Ok(self.persy_impl.list_segments_tx(self.tx())?)
1649 1639
    }
1650 1640
1651 1641
    /// List all the existing indexes, considering changes in the transaction.

@@ -7,7 +7,7 @@
Loading
7 7
    address::Address,
8 8
    config::TxStrategy,
9 9
    error::{PRes, PersyError},
10 -
    id::{index_id_to_segment_id_data, index_id_to_segment_id_meta, IndexId, RecRef},
10 +
    id::{index_id_to_segment_id_data, index_id_to_segment_id_meta, IndexId, RecRef, SegmentId},
11 11
    journal::{Journal, JournalId},
12 12
    persy::PersyImpl,
13 13
    snapshot::{release_snapshot, SnapshotEntry, SnapshotId},
@@ -20,21 +20,21 @@
Loading
20 20
21 21
#[derive(Clone, Default)]
22 22
pub struct NewSegmentPage {
23 -
    pub segment: u32,
23 +
    pub segment: SegmentId,
24 24
    pub page: u64,
25 25
    pub previous: u64,
26 26
}
27 27
28 28
#[derive(Clone, Default)]
29 29
pub struct InsertRecord {
30 -
    pub segment: u32,
30 +
    pub segment: SegmentId,
31 31
    pub recref: RecRef,
32 32
    pub record_page: u64,
33 33
}
34 34
35 35
#[derive(Clone, Default)]
36 36
pub struct UpdateRecord {
37 -
    pub segment: u32,
37 +
    pub segment: SegmentId,
38 38
    pub recref: RecRef,
39 39
    pub record_page: u64,
40 40
    pub version: u16,
@@ -42,14 +42,14 @@
Loading
42 42
43 43
#[derive(Clone, Default)]
44 44
pub struct ReadRecord {
45 -
    pub segment: u32,
45 +
    pub segment: SegmentId,
46 46
    pub recref: RecRef,
47 47
    pub version: u16,
48 48
}
49 49
50 50
#[derive(Clone, Default)]
51 51
pub struct DeleteRecord {
52 -
    pub segment: u32,
52 +
    pub segment: SegmentId,
53 53
    pub recref: RecRef,
54 54
    pub version: u16,
55 55
}
@@ -57,14 +57,14 @@
Loading
57 57
#[derive(Clone, Default)]
58 58
pub struct CreateSegment {
59 59
    pub name: String,
60 -
    pub segment_id: u32,
60 +
    pub segment_id: SegmentId,
61 61
    pub first_page: u64,
62 62
}
63 63
64 64
#[derive(Clone, Default)]
65 65
pub struct DropSegment {
66 66
    pub name: String,
67 -
    pub segment_id: u32,
67 +
    pub segment_id: SegmentId,
68 68
}
69 69
70 70
#[derive(Clone, PartialEq, Debug, PartialOrd, Ord, Eq, Default)]
@@ -99,7 +99,7 @@
Loading
99 99
pub struct PreparedState {
100 100
    locked_indexes: Option<Vec<IndexId>>,
101 101
    snapshot_id: Option<SnapshotId>,
102 -
    data_locks: Option<(Vec<(u32, RecRef, u16)>, Vec<u32>, Vec<u32>)>,
102 +
    data_locks: Option<(Vec<(SegmentId, RecRef, u16)>, Vec<SegmentId>, Vec<SegmentId>)>,
103 103
}
104 104
105 105
impl PreparedState {
@@ -129,15 +129,15 @@
Loading
129 129
    segments_operations: Vec<SegmentOperation>,
130 130
    segs_created_names: HashSet<String>,
131 131
    segs_dropped_names: HashSet<String>,
132 -
    segs_created: HashSet<u32>,
133 -
    segs_dropped: HashSet<u32>,
134 -
    segs_updated: HashSet<u32>,
132 +
    segs_created: HashSet<SegmentId>,
133 +
    segs_dropped: HashSet<SegmentId>,
134 +
    segs_updated: HashSet<SegmentId>,
135 135
    freed_pages: Option<Vec<FreedPage>>,
136 136
    indexes: Option<IndexTransactionKeeper>,
137 137
    segs_new_pages: Vec<NewSegmentPage>,
138 -
    locked_index_segs: HashSet<u32>,
138 +
    locked_index_segs: HashSet<SegmentId>,
139 139
    locked_index_pages: HashSet<RecRef>,
140 -
    locked_index_tracking: HashSet<(u32, RecRef, u16)>,
140 +
    locked_index_tracking: HashSet<(SegmentId, RecRef, u16)>,
141 141
}
142 142
143 143
pub enum TxRead {
@@ -147,19 +147,19 @@
Loading
147 147
}
148 148
149 149
pub enum TxSegCheck {
150 -
    CREATED(u32),
150 +
    CREATED(SegmentId),
151 151
    DROPPED,
152 152
    NONE,
153 153
}
154 154
155 155
pub struct TransactionInsertScanner<'a> {
156 156
    tx: &'a Transaction,
157 -
    segment: u32,
157 +
    segment: SegmentId,
158 158
}
159 159
160 160
pub struct TransactionInsertIterator {
161 161
    iter: vec::IntoIter<InsertRecord>,
162 -
    segment: u32,
162 +
    segment: SegmentId,
163 163
}
164 164
165 165
impl<'a> IntoIterator for TransactionInsertScanner<'a> {
@@ -244,11 +244,11 @@
Loading
244 244
        }
245 245
    }
246 246
247 -
    pub fn segment_created_in_tx(&self, segment: u32) -> bool {
247 +
    pub fn segment_created_in_tx(&self, segment: SegmentId) -> bool {
248 248
        self.segs_created.contains(&segment)
249 249
    }
250 250
251 -
    pub fn segment_name_by_id(&self, segment: u32) -> Option<String> {
251 +
    pub fn segment_name_by_id(&self, segment: SegmentId) -> Option<String> {
252 252
        for info in &self.segments_operations {
253 253
            if let SegmentOperation::CREATE(ref c) = info {
254 254
                if c.segment_id == segment {
@@ -273,7 +273,13 @@
Loading
273 273
        TxSegCheck::NONE
274 274
    }
275 275
276 -
    pub fn add_create_segment(&mut self, journal: &Journal, name: &str, segment_id: u32, first_page: u64) -> PRes<()> {
276 +
    pub fn add_create_segment(
277 +
        &mut self,
278 +
        journal: &Journal,
279 +
        name: &str,
280 +
        segment_id: SegmentId,
281 +
        first_page: u64,
282 +
    ) -> PRes<()> {
277 283
        let create = CreateSegment::new(name, segment_id, first_page);
278 284
279 285
        journal.log(&create, &self.id)?;
@@ -289,7 +295,7 @@
Loading
289 295
        self.segs_created_names.insert(create.name.clone());
290 296
    }
291 297
292 -
    pub fn add_drop_segment(&mut self, journal: &Journal, name: &str, segment_id: u32) -> PRes<()> {
298 +
    pub fn add_drop_segment(&mut self, journal: &Journal, name: &str, segment_id: SegmentId) -> PRes<()> {
293 299
        if self.segs_created_names.contains(name) {
294 300
            Err(PersyError::CannotDropSegmentCreatedInTx)
295 301
        } else {
@@ -308,7 +314,7 @@
Loading
308 314
        self.segs_dropped_names.insert(drop.name.clone());
309 315
    }
310 316
311 -
    pub fn add_read(&mut self, journal: &Journal, segment: u32, recref: &RecRef, version: u16) -> PRes<()> {
317 +
    pub fn add_read(&mut self, journal: &Journal, segment: SegmentId, recref: &RecRef, version: u16) -> PRes<()> {
312 318
        if self.strategy == TxStrategy::VersionOnRead {
313 319
            let read = ReadRecord::new(segment, recref, version);
314 320
            journal.log(&read, &self.id)?;
@@ -321,7 +327,7 @@
Loading
321 327
        self.read.insert(read.recref.clone(), read.clone());
322 328
    }
323 329
324 -
    pub fn add_insert(&mut self, journal: &Journal, segment: u32, rec_ref: &RecRef, record: u64) -> PRes<()> {
330 +
    pub fn add_insert(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, record: u64) -> PRes<()> {
325 331
        self.segs_updated.insert(segment);
326 332
        let insert = InsertRecord::new(segment, rec_ref, record);
327 333
@@ -332,7 +338,7 @@
Loading
332 338
    pub fn add_new_segment_page(
333 339
        &mut self,
334 340
        journal: &Journal,
335 -
        segment: u32,
341 +
        segment: SegmentId,
336 342
        new_page: u64,
337 343
        previous_page: u64,
338 344
    ) -> PRes<()> {
@@ -351,7 +357,7 @@
Loading
351 357
    pub fn add_update(
352 358
        &mut self,
353 359
        journal: &Journal,
354 -
        segment: u32,
360 +
        segment: SegmentId,
355 361
        rec_ref: &RecRef,
356 362
        record: u64,
357 363
        version: u16,
@@ -368,7 +374,7 @@
Loading
368 374
        self.updated.push(update.clone());
369 375
    }
370 376
371 -
    pub fn add_delete(&mut self, journal: &Journal, segment: u32, rec_ref: &RecRef, version: u16) -> PRes<()> {
377 +
    pub fn add_delete(&mut self, journal: &Journal, segment: SegmentId, rec_ref: &RecRef, version: u16) -> PRes<()> {
372 378
        self.segs_updated.insert(segment);
373 379
        let delete = DeleteRecord::new(segment, rec_ref, version);
374 380
        journal.log(&delete, &self.id)?;
@@ -432,7 +438,7 @@
Loading
432 438
        self.deleted.push(delete.clone());
433 439
    }
434 440
435 -
    pub fn scan_insert(&self, seg: u32) -> TransactionInsertScanner {
441 +
    pub fn scan_insert(&self, seg: SegmentId) -> TransactionInsertScanner {
436 442
        TransactionInsertScanner { tx: self, segment: seg }
437 443
    }
438 444
@@ -481,7 +487,7 @@
Loading
481 487
        Ok(prepared)
482 488
    }
483 489
484 -
    fn solve_index_locks(&self) -> Vec<(u32, RecRef, u16)> {
490 +
    fn solve_index_locks(&self) -> Vec<(SegmentId, RecRef, u16)> {
485 491
        let mut records = HashSet::new();
486 492
        for update in &self.updated {
487 493
            if self.locked_index_pages.contains(&update.recref) {
@@ -513,7 +519,7 @@
Loading
513 519
            let changed_indexes = ind_change.changed_indexes();
514 520
            for check in changed_indexes {
515 521
                let segment_meta = index_id_to_segment_id_meta(&check);
516 -
                if self.segs_dropped.contains(&segment_meta.id) {
522 +
                if self.segs_dropped.contains(&segment_meta) {
517 523
                    ind_change.remove_changes(&check);
518 524
                }
519 525
            }
@@ -527,11 +533,11 @@
Loading
527 533
            }
528 534
            for index_id in &to_lock {
529 535
                let segment_meta = index_id_to_segment_id_meta(index_id);
530 -
                address.acquire_segment_read_lock(segment_meta.id)?;
531 -
                self.locked_index_segs.insert(segment_meta.id);
536 +
                address.acquire_segment_read_lock(segment_meta)?;
537 +
                self.locked_index_segs.insert(segment_meta);
532 538
                let segment_data = index_id_to_segment_id_data(index_id);
533 -
                address.acquire_segment_read_lock(segment_data.id)?;
534 -
                self.locked_index_segs.insert(segment_data.id);
539 +
                address.acquire_segment_read_lock(segment_data)?;
540 +
                self.locked_index_segs.insert(segment_data);
535 541
            }
536 542
            prepared.locked_indexes = Some(to_lock);
537 543
            if let Err(x) = ind_change.apply(persy_impl, &mut self) {
@@ -662,7 +668,7 @@
Loading
662 668
        pages_to_free
663 669
    }
664 670
665 -
    fn coll_locks(&self) -> (Vec<(u32, RecRef, u16)>, Vec<u32>, Vec<u32>) {
671 +
    fn coll_locks(&self) -> (Vec<(SegmentId, RecRef, u16)>, Vec<SegmentId>, Vec<SegmentId>) {
666 672
        let mut crt_upd_segs = Vec::new();
667 673
        for create in &self.segs_created {
668 674
            if !&self.segs_dropped.contains(create) && !self.locked_index_segs.contains(create) {
@@ -706,14 +712,14 @@
Loading
706 712
            records.remove(&(insert.segment, insert.recref.clone(), 1));
707 713
        }
708 714
709 -
        let mut records: Vec<(u32, RecRef, u16)> = records.into_iter().collect();
715 +
        let mut records: Vec<(SegmentId, RecRef, u16)> = records.into_iter().collect();
710 716
        records.sort_by_key(|ref x| x.1.clone());
711 717
        crt_upd_segs.sort();
712 718
        dropped_segs.sort();
713 719
        (records, crt_upd_segs, dropped_segs)
714 720
    }
715 721
716 -
    fn internal_rollback(&self, persy_impl: &PersyImpl) -> PRes<Vec<(u32, u64)>> {
722 +
    fn internal_rollback(&self, persy_impl: &PersyImpl) -> PRes<Vec<(SegmentId, u64)>> {
717 723
        let allocator = persy_impl.allocator();
718 724
        let address = persy_impl.address();
719 725
@@ -826,7 +832,11 @@
Loading
826 832
        Ok(())
827 833
    }
828 834
829 -
    fn free_pages_tx(&self, journal: &Journal, pages_to_free: &[(u32, u64)]) -> PRes<(JournalId, Vec<FreedPage>)> {
835 +
    fn free_pages_tx(
836 +
        &self,
837 +
        journal: &Journal,
838 +
        pages_to_free: &[(SegmentId, u64)],
839 +
    ) -> PRes<(JournalId, Vec<FreedPage>)> {
830 840
        let id = journal.start()?;
831 841
        let mut freed = Vec::new();
832 842
        for (_, page) in pages_to_free {
@@ -844,7 +854,7 @@
Loading
844 854
        persy_impl: &PersyImpl,
845 855
        recover: bool,
846 856
        prepared: &PreparedState,
847 -
    ) -> PRes<Vec<(u32, u64)>> {
857 +
    ) -> PRes<Vec<(SegmentId, u64)>> {
848 858
        let indexes = persy_impl.indexes();
849 859
        let address = persy_impl.address();
850 860
@@ -877,7 +887,7 @@
Loading
877 887
        Ok(pages_to_unlink)
878 888
    }
879 889
880 -
    fn free_address_structures(&self, address_to_free: Vec<(u32, u64)>, persy_impl: &PersyImpl) -> PRes<()> {
890 +
    fn free_address_structures(&self, address_to_free: Vec<(SegmentId, u64)>, persy_impl: &PersyImpl) -> PRes<()> {
881 891
        let allocator = persy_impl.allocator();
882 892
        let journal = persy_impl.journal();
883 893
        let snapshots = persy_impl.snapshots();
@@ -926,7 +936,10 @@
Loading
926 936
        &self.meta_id
927 937
    }
928 938
929 -
    pub fn filter_list<'a>(&'a self, pers: &'a [(String, u32)]) -> impl Iterator<Item = (&'a str, u32)> + 'a {
939 +
    pub fn filter_list<'a>(
940 +
        &'a self,
941 +
        pers: &'a [(String, SegmentId)],
942 +
    ) -> impl Iterator<Item = (&'a str, SegmentId)> + 'a {
930 943
        let outer = pers.iter().map(|(name, id)| (name.as_str(), *id));
931 944
        let inner = self.segments_operations.iter().filter_map(|seg| {
932 945
            if let SegmentOperation::CREATE(c) = seg {
@@ -939,7 +952,7 @@
Loading
939 952
        outer.chain(inner).filter(move |x| !self.segs_dropped.contains(&x.1))
940 953
    }
941 954
942 -
    pub fn lock_record(&mut self, address: &Address, segment: u32, id: &RecRef, version: u16) -> PRes<bool> {
955 +
    pub fn lock_record(&mut self, address: &Address, segment: SegmentId, id: &RecRef, version: u16) -> PRes<bool> {
943 956
        let locked_page = if !self.locked_index_pages.contains(id) {
944 957
            address.acquire_record_lock(id)?;
945 958
            true
@@ -964,7 +977,7 @@
Loading
964 977
        }
965 978
    }
966 979
967 -
    pub fn unlock_record(&mut self, address: &Address, _segment: u32, id: &RecRef) -> PRes<()> {
980 +
    pub fn unlock_record(&mut self, address: &Address, _segment: SegmentId, id: &RecRef) -> PRes<()> {
968 981
        assert!(self.locked_index_pages.remove(id));
969 982
        address.release_record_lock(id)?;
970 983
        Ok(())
@@ -972,7 +985,7 @@
Loading
972 985
}
973 986
974 987
impl DeleteRecord {
975 -
    pub fn new(segment: u32, rec_ref: &RecRef, version: u16) -> DeleteRecord {
988 +
    pub fn new(segment: SegmentId, rec_ref: &RecRef, version: u16) -> DeleteRecord {
976 989
        DeleteRecord {
977 990
            segment,
978 991
            recref: rec_ref.clone(),
@@ -982,7 +995,7 @@
Loading
982 995
}
983 996
984 997
impl UpdateRecord {
985 -
    pub fn new(segment: u32, rec_ref: &RecRef, record: u64, version: u16) -> UpdateRecord {
998 +
    pub fn new(segment: SegmentId, rec_ref: &RecRef, record: u64, version: u16) -> UpdateRecord {
986 999
        UpdateRecord {
987 1000
            segment,
988 1001
            recref: rec_ref.clone(),
@@ -993,7 +1006,7 @@
Loading
993 1006
}
994 1007
995 1008
impl ReadRecord {
996 -
    pub fn new(segment: u32, recref: &RecRef, version: u16) -> ReadRecord {
1009 +
    pub fn new(segment: SegmentId, recref: &RecRef, version: u16) -> ReadRecord {
997 1010
        ReadRecord {
998 1011
            segment,
999 1012
            recref: recref.clone(),
@@ -1027,7 +1040,7 @@
Loading
1027 1040
}
1028 1041
1029 1042
impl InsertRecord {
1030 -
    pub fn new(segment: u32, rec_ref: &RecRef, record: u64) -> InsertRecord {
1043 +
    pub fn new(segment: SegmentId, rec_ref: &RecRef, record: u64) -> InsertRecord {
1031 1044
        InsertRecord {
1032 1045
            segment,
1033 1046
            recref: rec_ref.clone(),
@@ -1037,7 +1050,7 @@
Loading
1037 1050
}
1038 1051
1039 1052
impl CreateSegment {
1040 -
    pub fn new(name: &str, segment_id: u32, first_page: u64) -> CreateSegment {
1053 +
    pub fn new(name: &str, segment_id: SegmentId, first_page: u64) -> CreateSegment {
1041 1054
        CreateSegment {
1042 1055
            name: name.into(),
1043 1056
            segment_id,
@@ -1047,7 +1060,7 @@
Loading
1047 1060
}
1048 1061
1049 1062
impl DropSegment {
1050 -
    pub fn new(name: &str, segment_id: u32) -> DropSegment {
1063 +
    pub fn new(name: &str, segment_id: SegmentId) -> DropSegment {
1051 1064
        DropSegment {
1052 1065
            name: name.into(),
1053 1066
            segment_id,
@@ -1071,7 +1084,7 @@
Loading
1071 1084
}
1072 1085
1073 1086
impl NewSegmentPage {
1074 -
    pub fn new(segment: u32, page: u64, previous: u64) -> NewSegmentPage {
1087 +
    pub fn new(segment: SegmentId, page: u64, previous: u64) -> NewSegmentPage {
1075 1088
        NewSegmentPage {
1076 1089
            segment,
1077 1090
            page,
@@ -1083,16 +1096,22 @@
Loading
1083 1096
#[cfg(test)]
1084 1097
mod tests {
1085 1098
    use super::{DeleteRecord, FreedPage, InsertRecord, Transaction, UpdateRecord};
1086 -
    use crate::{id::RecRef, journal::JournalId};
1099 +
    use crate::{
1100 +
        id::{RecRef, SegmentId},
1101 +
        journal::JournalId,
1102 +
    };
1087 1103
1088 1104
    #[test]
1089 1105
    fn test_scan_insert() {
1106 +
        let segment_id = SegmentId::new(10);
1107 +
        let segment_id_other = SegmentId::new(20);
1090 1108
        let mut tx = Transaction::recover(JournalId::new(0, 0));
1091 -
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(3, 2), 2));
1092 -
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(4, 2), 2));
1093 -
        tx.inserted.push(InsertRecord::new(20, &RecRef::new(0, 1), 3));
1109 +
        tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 2), 2));
1110 +
        tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(4, 2), 2));
1111 +
        tx.inserted
1112 +
            .push(InsertRecord::new(segment_id_other, &RecRef::new(0, 1), 3));
1094 1113
        let mut count = 0;
1095 -
        for x in tx.scan_insert(10) {
1114 +
        for x in tx.scan_insert(segment_id) {
1096 1115
            assert_eq!(x.pos, 2);
1097 1116
            count += 1;
1098 1117
        }
@@ -1101,22 +1120,28 @@
Loading
1101 1120
1102 1121
    #[test]
1103 1122
    fn test_collapse() {
1123 +
        let segment_id = SegmentId::new(10);
1124 +
        let segment_id_other = SegmentId::new(20);
1104 1125
        let mut tx = Transaction::recover(JournalId::new(0, 0));
1105 -
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(3, 1), 1));
1106 -
        tx.inserted.push(InsertRecord::new(10, &RecRef::new(3, 2), 2));
1107 -
        tx.inserted.push(InsertRecord::new(20, &RecRef::new(3, 3), 3));
1108 -
        tx.inserted.push(InsertRecord::new(20, &RecRef::new(3, 4), 4));
1109 -
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 1), 5, 1));
1110 -
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 1), 6, 1));
1111 -
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 2), 7, 1));
1112 -
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 5), 8, 1));
1113 -
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 5), 9, 1));
1114 -
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 6), 10, 1));
1115 -
        tx.updated.push(UpdateRecord::new(10, &RecRef::new(3, 7), 11, 1));
1116 -
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 1), 0));
1117 -
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 3), 1));
1118 -
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 6), 2));
1119 -
        tx.deleted.push(DeleteRecord::new(10, &RecRef::new(3, 8), 2));
1126 +
        tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 1), 1));
1127 +
        tx.inserted.push(InsertRecord::new(segment_id, &RecRef::new(3, 2), 2));
1128 +
        tx.inserted
1129 +
            .push(InsertRecord::new(segment_id_other, &RecRef::new(3, 3), 3));
1130 +
        tx.inserted
1131 +
            .push(InsertRecord::new(segment_id_other, &RecRef::new(3, 4), 4));
1132 +
        tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 1), 5, 1));
1133 +
        tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 1), 6, 1));
1134 +
        tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 2), 7, 1));
1135 +
        tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 5), 8, 1));
1136 +
        tx.updated.push(UpdateRecord::new(segment_id, &RecRef::new(3, 5), 9, 1));
1137 +
        tx.updated
1138 +
            .push(UpdateRecord::new(segment_id, &RecRef::new(3, 6), 10, 1));
1139 +
        tx.updated
1140 +
            .push(UpdateRecord::new(segment_id, &RecRef::new(3, 7), 11, 1));
1141 +
        tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 1), 0));
1142 +
        tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 3), 1));
1143 +
        tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 6), 2));
1144 +
        tx.deleted.push(DeleteRecord::new(segment_id, &RecRef::new(3, 8), 2));
1120 1145
        let free = tx.collapse_operations();
1121 1146
        assert_eq!(free.len(), 7);
1122 1147
        for e in [1, 2, 3, 5, 6, 8, 10].iter().map(|x| FreedPage::new(*x)) {

@@ -2,7 +2,10 @@
Loading
2 2
    address::Address,
3 3
    error::{PRes, PersyError},
4 4
    index::config::{format_segment_name_data, format_segment_name_meta, index_name_from_meta_segment},
5 -
    io::{InfallibleRead, InfallibleReadVarInt, InfallibleWrite, InfallibleWriteVarInt},
5 +
    io::{
6 +
        InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite, InfallibleWriteFormat,
7 +
        InfallibleWriteVarInt,
8 +
    },
6 9
    persy::PersyImpl,
7 10
    snapshot::{SnapshotId, Snapshots},
8 11
    transaction::{Transaction, TxSegCheck},
@@ -97,7 +100,7 @@
Loading
97 100
/// # Serialization
98 101
/// This type supports serialization,
99 102
/// It does NOT serialize to the segment name.
100 -
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
103 +
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Default)]
101 104
pub struct SegmentId {
102 105
    pub(crate) id: u32,
103 106
}
@@ -107,6 +110,24 @@
Loading
107 110
    pub(crate) fn new(id: u32) -> Self {
108 111
        SegmentId { id }
109 112
    }
113 +
114 +
    pub(crate) fn write_varint(&self, write: &mut dyn InfallibleWrite) {
115 +
        write.write_varint_u32(self.id);
116 +
    }
117 +
118 +
    pub(crate) fn read_varint(read: &mut dyn InfallibleRead) -> SegmentId {
119 +
        let id = read.read_varint_u32();
120 +
        Self::new(id)
121 +
    }
122 +
123 +
    pub(crate) fn write(&self, write: &mut dyn InfallibleWrite) {
124 +
        write.write_u32(self.id);
125 +
    }
126 +
127 +
    pub(crate) fn read(read: &mut dyn InfallibleRead) -> SegmentId {
128 +
        let id = read.read_u32();
129 +
        Self::new(id)
130 +
    }
110 131
}
111 132
112 133
pub trait ToSegmentId {
@@ -139,8 +160,7 @@
Loading
139 160
    fn to_segment_id_tx(self, _persy: &PersyImpl, tx: &Transaction) -> PRes<(SegmentId, bool)> {
140 161
        // Is safe to check only if is created in tx, because a segment cannot be created and
141 162
        // dropped in the same tx
142 -
        let id = self.id;
143 -
        Ok((self, tx.segment_created_in_tx(id)))
163 +
        Ok((self, tx.segment_created_in_tx(self)))
144 164
    }
145 165
    #[inline]
146 166
    fn to_segment_id_snapshot(self, _snapshots: &Snapshots, _snapshot: SnapshotId) -> PRes<SegmentId> {
@@ -153,13 +173,13 @@
Loading
153 173
    fn to_segment_id(self, address: &Address) -> PRes<SegmentId> {
154 174
        address
155 175
            .segment_id(self.as_ref())?
156 -
            .map_or(Err(PersyError::SegmentNotFound), |id| Ok(SegmentId { id }))
176 +
            .map_or(Err(PersyError::SegmentNotFound), |id| Ok(id))
157 177
    }
158 178
    #[inline]
159 179
    fn to_segment_id_tx(self, persy: &PersyImpl, tx: &Transaction) -> PRes<(SegmentId, bool)> {
160 180
        persy
161 181
            .check_segment_tx(tx, self.as_ref())
162 -
            .map(|(cr_in_tx, id)| (SegmentId::new(id), cr_in_tx))
182 +
            .map(|(cr_in_tx, id)| (id, cr_in_tx))
163 183
    }
164 184
    fn to_segment_id_snapshot(self, snapshots: &Snapshots, snapshot: SnapshotId) -> PRes<SegmentId> {
165 185
        snapshots
@@ -185,8 +205,8 @@
Loading
185 205
/// Unique identifier of an index
186 206
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug)]
187 207
pub struct IndexId {
188 -
    meta: u32,
189 -
    data: u32,
208 +
    meta: SegmentId,
209 +
    data: SegmentId,
190 210
}
191 211
192 212
pub trait ToIndexId {
@@ -212,7 +232,7 @@
Loading
212 232
impl ToIndexId for IndexId {
213 233
    #[inline]
214 234
    fn to_index_id(self, address: &Address) -> PRes<IndexId> {
215 -
        if self.data == 0 {
235 +
        if self.data.id == 0 {
216 236
            // backward compatible serialization does not have data, find it by name convention
217 237
            let meta_name = address
218 238
                .segment_name_by_id(self.meta)?
@@ -226,16 +246,16 @@
Loading
226 246
    }
227 247
228 248
    fn to_index_id_snapshot(self, snapshots: &Snapshots, snapshot: SnapshotId) -> PRes<IndexId> {
229 -
        if self.data == 0 {
249 +
        if self.data.id == 0 {
230 250
            // backward compatible serialization does not have data, find it by name convention
231 251
            let meta_name = snapshots
232 -
                .solve_segment_name(snapshot, SegmentId::new(self.meta))?
252 +
                .solve_segment_name(snapshot, self.meta)?
233 253
                .ok_or(PersyError::IndexNotFound)?;
234 254
            let data_name = format_segment_name_data(&index_name_from_meta_segment(&meta_name));
235 255
            let data = snapshots
236 256
                .solve_segment_id(snapshot, &data_name)?
237 257
                .ok_or(PersyError::IndexNotFound)?;
238 -
            Ok(IndexId::new(self.meta, data.id))
258 +
            Ok(IndexId::new(self.meta, data))
239 259
        } else {
240 260
            Ok(self)
241 261
        }
@@ -243,7 +263,7 @@
Loading
243 263
244 264
    #[inline]
245 265
    fn to_index_id_tx(self, persy: &PersyImpl, tx: &Transaction) -> PRes<(IndexId, bool)> {
246 -
        if self.data == 0 {
266 +
        if self.data.id == 0 {
247 267
            // backward compatible serialization does not have data, find it by name convention
248 268
            let (meta_name, in_tx) = persy.segment_name_tx(tx, self.meta)?.ok_or(PersyError::IndexNotFound)?;
249 269
            let data_name = format_segment_name_data(&index_name_from_meta_segment(&meta_name));
@@ -288,7 +308,7 @@
Loading
288 308
        let data = snapshots
289 309
            .solve_segment_id(snapshot, &data_name)?
290 310
            .ok_or(PersyError::IndexNotFound)?;
291 -
        Ok(IndexId::new(meta.id, data.id))
311 +
        Ok(IndexId::new(meta, data))
292 312
    }
293 313
    #[inline]
294 314
    fn to_index_id_tx(self, persy: &PersyImpl, tx: &Transaction) -> PRes<(IndexId, bool)> {
@@ -300,7 +320,7 @@
Loading
300 320
    }
301 321
}
302 322
impl IndexId {
303 -
    pub(crate) fn new(meta: u32, data: u32) -> IndexId {
323 +
    pub(crate) fn new(meta: SegmentId, data: SegmentId) -> IndexId {
304 324
        IndexId { meta, data }
305 325
    }
306 326
}
@@ -308,8 +328,8 @@
Loading
308 328
impl fmt::Display for IndexId {
309 329
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
310 330
        let mut buffer = Vec::new();
311 -
        buffer.write_all(u32_venc(self.meta, &mut u32_buffer()));
312 -
        buffer.write_all(u32_venc(self.data, &mut u32_buffer()));
331 +
        buffer.write_all(u32_venc(self.meta.id, &mut u32_buffer()));
332 +
        buffer.write_all(u32_venc(self.data.id, &mut u32_buffer()));
313 333
        buffer.push(0b0101_0101);
314 334
        write!(f, "{}", BASE32_DNSSEC.encode(&buffer))
315 335
    }
@@ -327,15 +347,15 @@
Loading
327 347
        } else {
328 348
            0
329 349
        };
330 -
        Ok(IndexId::new(meta, data))
350 +
        Ok(IndexId::new(SegmentId::new(meta), SegmentId::new(data)))
331 351
    }
332 352
}
333 353
334 354
pub fn index_id_to_segment_id_meta(id: &IndexId) -> SegmentId {
335 -
    SegmentId { id: id.meta }
355 +
    id.meta
336 356
}
337 357
pub fn index_id_to_segment_id_data(id: &IndexId) -> SegmentId {
338 -
    SegmentId { id: id.data }
358 +
    id.data
339 359
}
340 360
341 361
#[cfg(test)]
@@ -376,7 +396,7 @@
Loading
376 396
377 397
    #[test]
378 398
    fn test_index_id_string() {
379 -
        let id = IndexId::new(20, 30);
399 +
        let id = IndexId::new(SegmentId::new(20), SegmentId::new(30));
380 400
        let s = format!("{}", id);
381 401
        assert_eq!(s.parse::<IndexId>().ok(), Some(id));
382 402
    }

@@ -77,7 +77,7 @@
Loading
77 77
        iterator: SegmentPageIterator,
78 78
        snapshot_id: SnapshotId,
79 79
    ) -> TxSegmentRawIter {
80 -
        let iter = tx.scan_insert(segment_id.id).into_iter();
80 +
        let iter = tx.scan_insert(segment_id).into_iter();
81 81
        TxSegmentRawIter {
82 82
            segment_id,
83 83
            tx_iterator: iter,

@@ -3,7 +3,7 @@
Loading
3 3
    config::Config,
4 4
    discref::{Page, PageOps, PAGE_METADATA_SIZE},
5 5
    error::{PRes, PersyError},
6 -
    id::{PersyId, RecRef},
6 +
    id::{PersyId, RecRef, SegmentId},
7 7
    locks::{LockManager, RwLockManager},
8 8
    segment::{AllocatedSegmentPage, SegmentPage, SegmentPageIterator, SegmentPageRead, Segments},
9 9
    transaction::{DeleteRecord, InsertRecord, NewSegmentPage, SegmentOperation, UpdateRecord},
@@ -26,13 +26,13 @@
Loading
26 26
27 27
pub struct OldRecordInfo {
28 28
    pub recref: RecRef,
29 -
    pub segment: u32,
29 +
    pub segment: SegmentId,
30 30
    pub record_page: u64,
31 31
    pub version: u16,
32 32
}
33 33
34 34
impl OldRecordInfo {
35 -
    fn new(recref: &RecRef, segment: u32, record_page: u64, version: u16) -> OldRecordInfo {
35 +
    fn new(recref: &RecRef, segment: SegmentId, record_page: u64, version: u16) -> OldRecordInfo {
36 36
        OldRecordInfo {
37 37
            recref: recref.clone(),
38 38
            segment,
@@ -48,7 +48,7 @@
Loading
48 48
    config: Arc<Config>,
49 49
    allocator: Arc<Allocator>,
50 50
    record_locks: LockManager<RecRef>,
51 -
    segment_locks: RwLockManager<u32>,
51 +
    segment_locks: RwLockManager<SegmentId>,
52 52
    segments: RwLock<Segments>,
53 53
}
54 54
@@ -71,7 +71,7 @@
Loading
71 71
        Ok(page_index)
72 72
    }
73 73
74 -
    pub fn scan(&self, segment: u32) -> PRes<SegmentPageIterator> {
74 +
    pub fn scan(&self, segment: SegmentId) -> PRes<SegmentPageIterator> {
75 75
        let segments = self.segments.read()?;
76 76
        if let Some(segment) = segments.segment_by_id(segment) {
77 77
            Ok(SegmentPageIterator::new(segment.first_page))
@@ -96,7 +96,7 @@
Loading
96 96
        page.segment_scan_all_entries()
97 97
    }
98 98
99 -
    pub fn allocate_temp(&self, segment: u32) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
99 +
    pub fn allocate_temp(&self, segment: SegmentId) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
100 100
        if let Some(found) = self.segments.write()?.get_temp_segment_mut(segment) {
101 101
            found.allocate_internal(&self.allocator)
102 102
        } else {
@@ -104,15 +104,15 @@
Loading
104 104
        }
105 105
    }
106 106
107 -
    pub fn create_temp_segment(&self, segment: &str) -> PRes<(u32, u64)> {
107 +
    pub fn create_temp_segment(&self, segment: &str) -> PRes<(SegmentId, u64)> {
108 108
        self.segments.write()?.create_temp_segment(&self.allocator, segment)
109 109
    }
110 110
111 -
    pub fn drop_temp_segment(&self, segment: u32) -> PRes<()> {
111 +
    pub fn drop_temp_segment(&self, segment: SegmentId) -> PRes<()> {
112 112
        self.segments.write()?.drop_temp_segment(&self.allocator, segment)
113 113
    }
114 114
115 -
    pub fn allocate(&self, segment: u32) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
115 +
    pub fn allocate(&self, segment: SegmentId) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
116 116
        if let Some(found) = self.segments.write()?.segments.get_mut(&segment) {
117 117
            found.allocate_internal(&self.allocator)
118 118
        } else {
@@ -120,7 +120,12 @@
Loading
120 120
        }
121 121
    }
122 122
123 -
    pub fn acquire_locks(&self, records: &[(u32, RecRef, u16)], created_updated: &[u32], deleted: &[u32]) -> PRes<()> {
123 +
    pub fn acquire_locks(
124 +
        &self,
125 +
        records: &[(SegmentId, RecRef, u16)],
126 +
        created_updated: &[SegmentId],
127 +
        deleted: &[SegmentId],
128 +
    ) -> PRes<()> {
124 129
        let timeout = *self.config.transaction_lock_timeout();
125 130
        self.segment_locks.lock_all_write(&deleted, timeout)?;
126 131
        if let Err(x) = self.segment_locks.lock_all_read(&created_updated, timeout) {
@@ -159,7 +164,7 @@
Loading
159 164
        Ok(())
160 165
    }
161 166
162 -
    pub fn acquire_segment_read_lock(&self, segment: u32) -> PRes<()> {
167 +
    pub fn acquire_segment_read_lock(&self, segment: SegmentId) -> PRes<()> {
163 168
        let timeout = *self.config.transaction_lock_timeout();
164 169
        self.segment_locks.lock_all_read(&[segment], timeout)?;
165 170
        Ok(())
@@ -170,7 +175,7 @@
Loading
170 175
        Ok(())
171 176
    }
172 177
173 -
    pub fn release_segment_read_lock(&self, segment: u32) -> PRes<()> {
178 +
    pub fn release_segment_read_lock(&self, segment: SegmentId) -> PRes<()> {
174 179
        self.segment_locks.unlock_all_read(&[segment])?;
175 180
        Ok(())
176 181
    }
@@ -179,7 +184,7 @@
Loading
179 184
        Ok(())
180 185
    }
181 186
182 -
    pub fn confirm_allocations(&self, segs: &[u32], recover: bool) -> PRes<()> {
187 +
    pub fn confirm_allocations(&self, segs: &[SegmentId], recover: bool) -> PRes<()> {
183 188
        let mut segments = self.segments.write()?;
184 189
        segments.confirm_allocations(&segs, &self.allocator, recover)?;
185 190
        Ok(())
@@ -187,7 +192,7 @@
Loading
187 192
188 193
    pub fn check_persistent_records(
189 194
        &self,
190 -
        records: &[(u32, RecRef, u16)],
195 +
        records: &[(SegmentId, RecRef, u16)],
191 196
        check_version: bool,
192 197
    ) -> PRes<Vec<OldRecordInfo>> {
193 198
        let mut current_record_pages = Vec::with_capacity(records.len());
@@ -208,8 +213,8 @@
Loading
208 213
    pub fn release_locks<'a>(
209 214
        &self,
210 215
        records: impl Iterator<Item = &'a RecRef>,
211 -
        created_updated: &[u32],
212 -
        deleted: &[u32],
216 +
        created_updated: &[SegmentId],
217 +
        deleted: &[SegmentId],
213 218
    ) -> PRes<()> {
214 219
        self.record_locks.unlock_all_iter(records)?;
215 220
        self.segment_locks.unlock_all_read(&created_updated)?;
@@ -217,7 +222,7 @@
Loading
217 222
        Ok(())
218 223
    }
219 224
220 -
    pub fn rollback(&self, inserts: &[InsertRecord]) -> PRes<Vec<(u32, u64)>> {
225 +
    pub fn rollback(&self, inserts: &[InsertRecord]) -> PRes<Vec<(SegmentId, u64)>> {
221 226
        let mut segments = self.segments.write()?;
222 227
        let mut pages_to_remove = Vec::new();
223 228
        let mut pages = HashMap::new();
@@ -245,7 +250,7 @@
Loading
245 250
        deletes: &[DeleteRecord],
246 251
        seg_ops: &[SegmentOperation],
247 252
        recover: bool,
248 -
    ) -> PRes<Vec<(u32, u64)>> {
253 +
    ) -> PRes<Vec<(SegmentId, u64)>> {
249 254
        let mut segments = self.segments.write()?;
250 255
        let mut dropped = HashSet::new();
251 256
        for seg_op in seg_ops {
@@ -341,12 +346,12 @@
Loading
341 346
        Ok(pages_to_remove)
342 347
    }
343 348
344 -
    pub fn collect_segment_pages(&self, segment: u32) -> PRes<Vec<u64>> {
349 +
    pub fn collect_segment_pages(&self, segment: SegmentId) -> PRes<Vec<u64>> {
345 350
        let segments = self.segments.read()?;
346 351
        segments.collect_segment_pages(&self.allocator, segment)
347 352
    }
348 353
349 -
    pub fn clear_empty(&self, empty: &[(u32, u64)]) -> PRes<()> {
354 +
    pub fn clear_empty(&self, empty: &[(SegmentId, u64)]) -> PRes<()> {
350 355
        let mut segments = self.segments.write()?;
351 356
        for (segment, page) in empty {
352 357
            let mut p = self.allocator.load_page(*page)?;
@@ -373,24 +378,24 @@
Loading
373 378
        Ok(self.segments.read()?.has_segment(segment))
374 379
    }
375 380
376 -
    pub fn segment_id(&self, segment: &str) -> PRes<Option<u32>> {
381 +
    pub fn segment_id(&self, segment: &str) -> PRes<Option<SegmentId>> {
377 382
        Ok(self.segments.read()?.segment_id(segment))
378 383
    }
379 384
380 -
    pub fn segment_name_by_id(&self, segment: u32) -> PRes<Option<String>> {
385 +
    pub fn segment_name_by_id(&self, segment: SegmentId) -> PRes<Option<String>> {
381 386
        Ok(self.segments.read()?.segment_name_by_id(segment))
382 387
    }
383 388
384 389
    // Used only from the tests
385 390
    #[allow(dead_code)]
386 -
    pub fn insert(&self, segment_id: u32, recref: &RecRef, record_page: u64) -> PRes<()> {
391 +
    pub fn insert(&self, segment_id: SegmentId, recref: &RecRef, record_page: u64) -> PRes<()> {
387 392
        let mut page = self.allocator.write_page(recref.page)?;
388 393
        page.segment_insert_entry(segment_id, recref.pos, record_page)?;
389 394
        self.allocator.flush_page(page)?;
390 395
        Ok(())
391 396
    }
392 397
393 -
    pub fn read(&self, recref: &RecRef, segment: u32) -> PRes<Option<(u64, u16)>> {
398 +
    pub fn read(&self, recref: &RecRef, segment: SegmentId) -> PRes<Option<(u64, u16)>> {
394 399
        let mut page = self.allocator.load_page(recref.page)?;
395 400
        page.segment_read_entry(segment, recref.pos)
396 401
    }
@@ -402,10 +407,10 @@
Loading
402 407
        })
403 408
    }
404 409
405 -
    pub fn list(&self) -> PRes<Vec<(String, u32)>> {
410 +
    pub fn list(&self) -> PRes<Vec<(String, SegmentId)>> {
406 411
        Ok(self.segments.read()?.list())
407 412
    }
408 -
    pub fn snapshot_list(&self) -> PRes<Vec<(String, u32, u64)>> {
413 +
    pub fn snapshot_list(&self) -> PRes<Vec<(String, SegmentId, u64)>> {
409 414
        Ok(self.segments.read()?.snapshot_list())
410 415
    }
411 416
}
@@ -413,12 +418,13 @@
Loading
413 418
#[cfg(test)]
414 419
mod tests {
415 420
    use super::Address;
421 +
    use crate::id::SegmentId;
416 422
    use crate::transaction::{CreateSegment, DeleteRecord, InsertRecord, SegmentOperation, UpdateRecord};
417 423
    use crate::{allocator::Allocator, config::Config, discref::DiscRef};
418 424
    use std::sync::Arc;
419 425
    use tempfile::Builder;
420 426
421 -
    fn init_test_address(file_name: &str) -> (Address, u32) {
427 +
    fn init_test_address(file_name: &str) -> (Address, SegmentId) {
422 428
        let file = Builder::new().prefix(file_name).suffix(".persy").tempfile().unwrap();
423 429
        let config = Arc::new(Config::new());
424 430
        let disc = Box::new(DiscRef::new(file.reopen().unwrap()).unwrap());
@@ -472,7 +478,11 @@
Loading
472 478
473 479
        deleted.push(DeleteRecord::new(segment_id, &recref, 1));
474 480
        let mut seg_ops = Vec::new();
475 -
        seg_ops.push(SegmentOperation::CREATE(CreateSegment::new("def", 20, 20)));
481 +
        seg_ops.push(SegmentOperation::CREATE(CreateSegment::new(
482 +
            "def",
483 +
            SegmentId::new(20),
484 +
            20,
485 +
        )));
476 486
477 487
        add.apply(&[], &inserted, &updated, &deleted, &seg_ops, false).unwrap();
478 488
Files Coverage
src 92.94%
tests 98.56%
Project Totals (29 files) 93.94%
1
coverage:
2
  status:
3
    project:
4
      default:
5
        target: 80%
6
        threshold: 1.0%
7
    patch: off
Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading