1
use crate::{
2
    allocator::Allocator,
3
    config::Config,
4
    discref::{Page, PageOps, PAGE_METADATA_SIZE},
5
    error::{PRes, PersyError},
6
    id::{PersyId, RecRef, SegmentId},
7
    locks::{LockManager, RwLockManager},
8
    segment::{AllocatedSegmentPage, SegmentPage, SegmentPageIterator, SegmentPageRead, Segments},
9
    transaction::{DeleteRecord, InsertRecord, NewSegmentPage, SegmentOperation, UpdateRecord},
10
};
11
use std::{
12
    collections::{hash_map::Entry, HashMap, HashSet},
13
    sync::{Arc, RwLock},
14
};
15

16
pub const ADDRESS_ROOT_PAGE_EXP: u8 = 6; // 2^6
17
pub const ADDRESS_PAGE_EXP: u8 = 10; // 2^10
18
pub const ADDRESS_PAGE_SIZE: u32 = (1 << ADDRESS_PAGE_EXP) - PAGE_METADATA_SIZE; // 2^10 -2 size - page header
19
pub const ADDRESS_PAGE_ENTRY_COUNT: u32 = (ADDRESS_PAGE_SIZE - SEGMENT_DATA_OFFSET) / ADDRESS_ENTRY_SIZE;
20
pub const FLAG_EXISTS: u8 = 0b000_0001;
21
pub const FLAG_DELETED: u8 = 0b000_0010;
22
pub const SEGMENT_HASH_OFFSET: u32 = 16;
23
pub const SEGMENT_PAGE_DELETE_COUNT_OFFSET: u32 = 24;
24
pub const SEGMENT_DATA_OFFSET: u32 = 26;
25
pub const ADDRESS_ENTRY_SIZE: u32 = 8 + 1 + 2; // Pointer to data page + flags + version management (not yet used)
26

27
pub struct OldRecordInfo {
28
    pub recref: RecRef,
29
    pub segment: SegmentId,
30
    pub record_page: u64,
31
    pub version: u16,
32
}
33

34
impl OldRecordInfo {
35 1
    fn new(recref: &RecRef, segment: SegmentId, record_page: u64, version: u16) -> OldRecordInfo {
36 1
        OldRecordInfo {
37 1
            recref: recref.clone(),
38
            segment,
39
            record_page,
40
            version,
41
        }
42 1
    }
43
}
44

45
/// Address segment keep the basic addressing of the data in the data segment for a specific
46
/// data block
47
pub struct Address {
48
    config: Arc<Config>,
49
    allocator: Arc<Allocator>,
50
    record_locks: LockManager<RecRef>,
51
    segment_locks: RwLockManager<SegmentId>,
52
    segments: RwLock<Segments>,
53
}
54

55
impl Address {
56 1
    pub fn new(all: &Arc<Allocator>, config: &Arc<Config>, page: u64) -> PRes<Address> {
57 1
        let segments = Segments::new(page, all)?;
58 1
        Ok(Address {
59 1
            config: config.clone(),
60 1
            allocator: all.clone(),
61 1
            record_locks: Default::default(),
62 1
            segment_locks: Default::default(),
63 1
            segments: RwLock::new(segments),
64 0
        })
65 1
    }
66

67 1
    pub fn init(all: &Allocator) -> PRes<u64> {
68 1
        let page = all.allocate(ADDRESS_ROOT_PAGE_EXP)?;
69 1
        let page_index = page.get_index();
70 1
        Segments::init(page, all)?;
71 1
        Ok(page_index)
72 1
    }
73

74 1
    pub fn scan(&self, segment: SegmentId) -> PRes<SegmentPageIterator> {
75 1
        let segments = self.segments.read()?;
76 1
        if let Some(segment) = segments.segment_by_id(segment) {
77 1
            Ok(SegmentPageIterator::new(segment.first_page))
78 1
        } else if let Some(temp_segment) = segments.segment_by_id_temp(segment) {
79 1
            Ok(SegmentPageIterator::new(temp_segment.first_page))
80
        } else {
81 0
            Err(PersyError::SegmentNotFound)
82
        }
83 1
    }
84

85 1
    pub fn scan_page_all(&self, cur_page: u64) -> PRes<(u64, [(u32, bool); ADDRESS_PAGE_ENTRY_COUNT as usize])> {
86
        // THIS IS ONLY FOR LOCK PROTECTION
87 1
        let _lock = self.segments.read()?;
88 1
        let mut page = self.allocator.load_page(cur_page)?;
89 1
        page.segment_scan_all_entries()
90 1
    }
91

92 1
    pub fn allocate_temp(&self, segment: SegmentId) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
93 1
        if let Some(found) = self.segments.write()?.get_temp_segment_mut(segment) {
94 1
            found.allocate_internal(&self.allocator)
95
        } else {
96 0
            Err(PersyError::SegmentNotFound)
97
        }
98 1
    }
99

100 1
    pub fn create_temp_segment(&self, segment: &str) -> PRes<(SegmentId, u64)> {
101 1
        self.segments.write()?.create_temp_segment(&self.allocator, segment)
102 1
    }
103

104 1
    pub fn drop_temp_segment(&self, segment: SegmentId) -> PRes<()> {
105 1
        self.segments.write()?.drop_temp_segment(&self.allocator, segment)
106 1
    }
107

108 1
    pub fn allocate(&self, segment: SegmentId) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
109 1
        if let Some(found) = self.segments.write()?.segments.get_mut(&segment) {
110 1
            found.allocate_internal(&self.allocator)
111
        } else {
112 0
            Err(PersyError::SegmentNotFound)
113
        }
114 1
    }
115

116 1
    pub fn acquire_locks(
117
        &self,
118
        records: &[(SegmentId, RecRef, u16)],
119
        created_updated: &[SegmentId],
120
        deleted: &[SegmentId],
121
    ) -> PRes<()> {
122 1
        let timeout = *self.config.transaction_lock_timeout();
123 1
        self.segment_locks.lock_all_write(&deleted, timeout)?;
124 1
        if let Err(x) = self.segment_locks.lock_all_read(&created_updated, timeout) {
125 0
            if let Err(e) = self.segment_locks.unlock_all_write(&deleted) {
126 0
                dbg!("unlock error: {}", e);
127 0
            }
128 0
            return Err(x);
129 1
        }
130

131 1
        let to_lock: Vec<_> = records.iter().map(|(_, id, _)| id.clone()).collect();
132 1
        if let Err(x) = self.record_locks.lock_all(&to_lock, timeout) {
133 0
            if let Err(e) = self.segment_locks.unlock_all_write(&deleted) {
134 0
                dbg!("unlock error: {}", e);
135 0
            }
136 0
            if let Err(e) = self.segment_locks.unlock_all_read(&created_updated) {
137 0
                dbg!("unlock error: {}", e);
138 0
            }
139 1
            return Err(x);
140 1
        }
141

142 1
        let segs = self.segments.read()?;
143 1
        for segment in created_updated {
144 1
            if !segs.exists_real_or_temp(*segment) {
145 1
                if let Err(e) = self.segment_locks.unlock_all_write(&deleted) {
146 0
                    dbg!("unlock error: {}", e);
147 0
                }
148 1
                if let Err(e) = self.segment_locks.unlock_all_read(&created_updated) {
149 0
                    dbg!("unlock error: {}", e);
150 0
                }
151 1
                if let Err(e) = self.record_locks.unlock_all(&to_lock) {
152 0
                    dbg!("unlock error: {}", e);
153 0
                }
154 1
                return Err(PersyError::SegmentNotFound);
155
            }
156
        }
157 1
        Ok(())
158 1
    }
159

160 1
    pub fn acquire_segment_read_lock(&self, segment: SegmentId) -> PRes<()> {
161 1
        let timeout = *self.config.transaction_lock_timeout();
162 1
        self.segment_locks.lock_all_read(&[segment], timeout)?;
163 1
        Ok(())
164 1
    }
165 1
    pub fn acquire_record_lock(&self, id: &RecRef) -> PRes<()> {
166 1
        let timeout = *self.config.transaction_lock_timeout();
167 1
        self.record_locks.lock_all(&[id.clone()], timeout)?;
168 1
        Ok(())
169 1
    }
170

171
    pub fn release_segment_read_lock(&self, segment: SegmentId) -> PRes<()> {
172
        self.segment_locks.unlock_all_read(&[segment])?;
173
        Ok(())
174
    }
175 1
    pub fn release_record_lock(&self, id: &RecRef) -> PRes<()> {
176 1
        self.record_locks.unlock_all(&[id.clone()])?;
177 1
        Ok(())
178 1
    }
179

180 1
    pub fn confirm_allocations(&self, segs: &[SegmentId], recover: bool) -> PRes<()> {
181 1
        let mut segments = self.segments.write()?;
182 1
        segments.confirm_allocations(&segs, &self.allocator, recover)?;
183 1
        Ok(())
184 1
    }
185

186 1
    pub fn check_persistent_records(
187
        &self,
188
        records: &[(SegmentId, RecRef, u16)],
189
        check_version: bool,
190
    ) -> PRes<Vec<OldRecordInfo>> {
191 1
        let mut current_record_pages = Vec::with_capacity(records.len());
192 1
        for &(segment, ref recref, version) in records {
193 1
            let val = self.read(recref, segment)?;
194 1
            if let Some((record, pers_version)) = val {
195 1
                current_record_pages.push(OldRecordInfo::new(&recref, segment, record, pers_version));
196 1
                if check_version && pers_version != version {
197 1
                    return Err(PersyError::VersionNotLastest);
198
                }
199
            } else {
200 1
                return Err(PersyError::RecordNotFound(PersyId(recref.clone())));
201
            }
202
        }
203 1
        Ok(current_record_pages)
204 1
    }
205

206 1
    pub fn release_locks<'a>(
207
        &self,
208
        records: impl Iterator<Item = &'a RecRef>,
209
        created_updated: &[SegmentId],
210
        deleted: &[SegmentId],
211
    ) -> PRes<()> {
212 1
        self.record_locks.unlock_all_iter(records)?;
213 1
        self.segment_locks.unlock_all_read(&created_updated)?;
214 1
        self.segment_locks.unlock_all_write(&deleted)?;
215 1
        Ok(())
216 1
    }
217

218 1
    pub fn rollback(&self, inserts: &[InsertRecord]) -> PRes<Vec<(SegmentId, u64)>> {
219 1
        let mut segments = self.segments.write()?;
220 1
        let mut pages_to_remove = Vec::new();
221 1
        let mut pages = HashMap::new();
222 1
        for insert in inserts {
223 1
            if segments.segments.contains_key(&insert.segment) {
224 1
                let page = insert.recref.page;
225 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
226 1
                if seg_page.segment_delete_entry(insert.segment, insert.recref.pos)? && seg_page.get_next()? != 0 {
227 0
                    pages_to_remove.push((insert.segment, page));
228
                }
229
            }
230
        }
231 1
        for (_, to_flush) in pages.into_iter() {
232 1
            self.allocator.flush_page(to_flush)?;
233 1
        }
234 1
        segments.flush_segments(&self.allocator)?;
235 1
        Ok(pages_to_remove)
236 1
    }
237

238 1
    pub fn apply(
239
        &self,
240
        segs_new_pages: &[NewSegmentPage],
241
        inserts: &[InsertRecord],
242
        updates: &[UpdateRecord],
243
        deletes: &[DeleteRecord],
244
        seg_ops: &[SegmentOperation],
245
        recover: bool,
246
    ) -> PRes<Vec<(SegmentId, u64)>> {
247 1
        let mut segments = self.segments.write()?;
248 1
        let mut dropped = HashSet::new();
249 1
        for seg_op in seg_ops {
250 1
            if let SegmentOperation::DROP(ref op) = *seg_op {
251 1
                dropped.insert(op.segment_id);
252
            }
253
        }
254 1
        let mut pages = HashMap::new();
255

256 1
        if recover {
257 1
            for new_page in segs_new_pages {
258 0
                let p_page = self.get_or_insert_mut(&mut pages, new_page.previous)?;
259 0
                p_page.set_next(new_page.page)?;
260 0
                let n_page = self.get_or_insert_mut(&mut pages, new_page.page)?;
261 0
                n_page.set_prev(new_page.previous)?;
262 0
                n_page.set_segment_id(new_page.segment)?;
263
            }
264
        }
265 1
        for insert in inserts {
266 1
            if !dropped.contains(&insert.segment) {
267 1
                let page = insert.recref.page;
268 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
269 1
                seg_page.segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page)?;
270
            }
271
        }
272

273 1
        for update in updates {
274 1
            if !dropped.contains(&update.segment) {
275 1
                let page = update.recref.page;
276 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
277 1
                seg_page.segment_update_entry(update.segment, update.recref.pos, update.record_page)?;
278
            }
279
        }
280 1
        let mut pages_to_remove = Vec::new();
281

282 1
        for delete in deletes {
283 1
            if !dropped.contains(&delete.segment) {
284 1
                let page = delete.recref.page;
285 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
286 1
                if seg_page.segment_delete_entry(delete.segment, delete.recref.pos)? {
287
                    // Avoid to remove last pages, for avoid concurrent operations with page
288
                    // creation
289 1
                    if seg_page.get_next()? != 0 {
290 1
                        pages_to_remove.push((delete.segment, page));
291
                    }
292
                }
293
            }
294
        }
295

296 1
        if recover {
297 1
            for (_, mut to_flush) in pages.into_iter() {
298 1
                to_flush.recalc_count()?;
299 1
                self.allocator.flush_page(to_flush)?;
300 1
            }
301

302 1
            let recover_page = |record_page: u64| {
303 1
                let page = self.allocator.load_page(record_page)?;
304 1
                self.allocator.remove_from_free(record_page, page.get_size_exp())
305 1
            };
306 1
            let mut segs = HashSet::new();
307 1
            for insert in inserts {
308 1
                recover_page(insert.record_page)?;
309 1
                segs.insert(insert.segment);
310
            }
311 1
            for update in updates {
312 1
                recover_page(update.record_page)?;
313 1
                segs.insert(update.segment);
314
            }
315 1
            for delete in deletes {
316 1
                segs.insert(delete.segment);
317
            }
318

319 1
            segments.confirm_allocations(&segs.into_iter().collect::<Vec<_>>(), &self.allocator, true)?;
320 1
        } else {
321 1
            for (_, to_flush) in pages.into_iter() {
322 1
                self.allocator.flush_page(to_flush)?;
323 1
            }
324
        }
325

326 1
        for seg_op in seg_ops {
327 1
            if let SegmentOperation::DROP(ref op) = *seg_op {
328 1
                segments.drop_segment(&op.name)?;
329
            }
330
        }
331

332 1
        for seg_op in seg_ops {
333 1
            if let SegmentOperation::CREATE(ref op) = *seg_op {
334 1
                segments.create_segment(op.segment_id, op.first_page)?;
335
            }
336
        }
337 1
        segments.flush_segments(&self.allocator)?;
338

339 1
        Ok(pages_to_remove)
340 1
    }
341

342 1
    pub fn collect_segment_pages(&self, segment: SegmentId) -> PRes<Vec<u64>> {
343 1
        let segments = self.segments.read()?;
344 1
        segments.collect_segment_pages(&self.allocator, segment)
345 1
    }
346

347 1
    pub fn clear_empty(&self, empty: &[(SegmentId, u64)]) -> PRes<()> {
348 1
        let mut segments = self.segments.write()?;
349 1
        for (segment, page) in empty {
350 1
            let mut p = self.allocator.load_page(*page)?;
351 1
            let next = p.get_next()?;
352 1
            let prev = p.get_prev()?;
353 1
            debug_assert!(next != 0);
354 1
            let mut next_page = self.allocator.write_page(next)?;
355
            // if (prev == 0), this is like doing setPrev(0)
356 1
            next_page.set_prev(prev)?;
357 1
            self.allocator.flush_page(next_page)?;
358 1
            if prev != 0 {
359 0
                let mut prev_page = self.allocator.write_page(prev)?;
360 0
                prev_page.set_next(next)?;
361 0
                self.allocator.flush_page(prev_page)?;
362 1
            } else if next != 0 {
363 1
                segments.set_first_page(*segment, next, &self.allocator)?;
364
            }
365 1
        }
366

367 1
        Ok(())
368 1
    }
369

370 1
    pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
371 1
        Ok(self.segments.read()?.has_segment(segment))
372 1
    }
373

374 1
    pub fn segment_id(&self, segment: &str) -> PRes<Option<SegmentId>> {
375 1
        Ok(self.segments.read()?.segment_id(segment))
376 1
    }
377

378
    pub fn segment_name_by_id(&self, segment: SegmentId) -> PRes<Option<String>> {
379
        Ok(self.segments.read()?.segment_name_by_id(segment))
380
    }
381

382
    // Used only from the tests
383
    #[allow(dead_code)]
384 1
    pub fn insert(&self, segment_id: SegmentId, recref: &RecRef, record_page: u64) -> PRes<()> {
385 1
        let mut page = self.allocator.write_page(recref.page)?;
386 1
        page.segment_insert_entry(segment_id, recref.pos, record_page)?;
387 1
        self.allocator.flush_page(page)?;
388 1
        Ok(())
389 1
    }
390

391 1
    pub fn read(&self, recref: &RecRef, segment: SegmentId) -> PRes<Option<(u64, u16)>> {
392 1
        let mut page = self.allocator.load_page(recref.page)?;
393 1
        page.segment_read_entry(segment, recref.pos)
394 1
    }
395

396 1
    fn get_or_insert_mut<'a>(&self, map: &'a mut HashMap<u64, Page>, k: u64) -> PRes<&'a mut Page> {
397 1
        Ok(match map.entry(k) {
398 1
            Entry::Occupied(entry) => entry.into_mut(),
399 1
            Entry::Vacant(entry) => entry.insert(self.allocator.write_page(k)?),
400
        })
401 1
    }
402

403 1
    pub fn list(&self) -> PRes<Vec<(String, SegmentId)>> {
404 1
        Ok(self.segments.read()?.list())
405 1
    }
406 1
    pub fn snapshot_list(&self) -> PRes<Vec<(String, SegmentId, u64)>> {
407 1
        Ok(self.segments.read()?.snapshot_list())
408 1
    }
409
}
410

411
#[cfg(test)]
412
mod tests {
413
    use super::Address;
414
    use crate::id::SegmentId;
415
    use crate::transaction::{CreateSegment, DeleteRecord, InsertRecord, SegmentOperation, UpdateRecord};
416
    use crate::{allocator::Allocator, config::Config, discref::DiscRef};
417
    use std::sync::Arc;
418
    use tempfile::Builder;
419

420 1
    fn init_test_address(file_name: &str) -> (Address, SegmentId) {
421 1
        let file = Builder::new().prefix(file_name).suffix(".persy").tempfile().unwrap();
422 1
        let config = Arc::new(Config::new());
423 1
        let disc = Box::new(DiscRef::new(file.reopen().unwrap()).unwrap());
424 1
        let (_, allocator) = Allocator::init(disc, &config).unwrap();
425 1
        let page = Address::init(&allocator).unwrap();
426 1
        let addr = Address::new(&Arc::new(allocator), &config, page).unwrap();
427 1
        let (id, fp) = addr.create_temp_segment("def").unwrap();
428 1
        addr.segments.write().unwrap().create_segment(id, fp).unwrap();
429 1
        (addr, id)
430 1
    }
431

432
    #[test]
433 1
    fn test_init_and_new_address() {
434 1
        let (add, segment_id) = init_test_address("./addr_test");
435 1
        assert_eq!(
436 1
            add.segments
437
                .read()
438
                .unwrap()
439
                .segment_by_id(segment_id)
440
                .unwrap()
441
                .alloc_page,
442
            1088
443
        );
444 1
        assert_eq!(
445 1
            add.segments
446
                .read()
447
                .unwrap()
448
                .segment_by_id(segment_id)
449
                .unwrap()
450
                .alloc_pos,
451
            26
452
        );
453 1
    }
454

455
    #[test]
456 1
    fn test_insert_update_delete_read_apply_pointer() {
457 1
        let (add, segment_id) = init_test_address("./addr_insert_update_delete_apply_test.persy");
458 1
        let (recref, _) = add.allocate(segment_id).unwrap();
459 1
        add.insert(segment_id, &recref, 10).unwrap();
460 1
        let (recref_1, _) = add.allocate(segment_id).unwrap();
461 1
        add.insert(segment_id, &recref_1, 20).unwrap();
462

463 1
        let mut inserted = Vec::new();
464 1
        let (recref_2, _) = add.allocate(segment_id).unwrap();
465 1
        inserted.push(InsertRecord::new(segment_id, &recref_2, 30));
466

467 1
        let mut updated = Vec::new();
468 1
        updated.push(UpdateRecord::new(segment_id, &recref_1, 40, 1));
469

470 1
        let mut deleted = Vec::new();
471

472 1
        deleted.push(DeleteRecord::new(segment_id, &recref, 1));
473 1
        let mut seg_ops = Vec::new();
474 1
        seg_ops.push(SegmentOperation::CREATE(CreateSegment::new(
475
            "def",
476 1
            SegmentId::new(20),
477
            20,
478
        )));
479

480 1
        add.apply(&[], &inserted, &updated, &deleted, &seg_ops, false).unwrap();
481

482 1
        let read = add.read(&recref, segment_id).unwrap();
483 1
        let read_1 = add.read(&recref_1, segment_id).unwrap();
484 1
        let read_2 = add.read(&recref_2, segment_id).unwrap();
485 0
        match read {
486 1
            Some(_) => assert!(false),
487
            None => assert!(true),
488
        }
489 0
        match read_1 {
490 1
            Some(val) => assert_eq!(val.0, 40),
491 0
            None => assert!(false),
492
        }
493 0
        match read_2 {
494 1
            Some(val) => assert_eq!(val.0, 30),
495 0
            None => assert!(false),
496
        }
497 1
    }
498

499
    #[test]
500 1
    fn test_insert_scan() {
501 1
        let (add, segment_id) = init_test_address("./addr_scan_test.persy");
502 1
        let (recref, _) = add.allocate(segment_id).unwrap();
503 1
        add.insert(segment_id, &recref, 10).unwrap();
504 1
        let (recref_1, _) = add.allocate(segment_id).unwrap();
505 1
        add.insert(segment_id, &recref_1, 20).unwrap();
506 1
        let mut to_iter = add.scan(segment_id).unwrap();
507 1
        let mut count = 0;
508 1
        while to_iter.next(&add).is_some() {
509 1
            count += 1;
510
        }
511 1
        assert_eq!(count, 2);
512 1
        let mut iter = add.scan(segment_id).unwrap();
513 1
        let re = iter.next(&add).unwrap();
514 1
        assert_eq!(re.page, recref.page);
515 1
        assert_eq!(re.pos, recref.pos);
516 1
        let re_1 = iter.next(&add).unwrap();
517 1
        assert_eq!(re_1.page, recref_1.page);
518 1
        assert_eq!(re_1.pos, recref_1.pos);
519 1
    }
520

521
    #[test]
522 1
    fn test_insert_over_page() {
523 1
        let (add, segment_id) = init_test_address("./addr_insert_over_page.persy");
524 1
        for z in 0..1000 {
525 1
            let (recref, _) = add.allocate(segment_id).unwrap();
526 1
            add.insert(segment_id, &recref, z).unwrap();
527
        }
528 1
        let mut to_iter = add.scan(segment_id).unwrap();
529 1
        let mut count = 0;
530 1
        while to_iter.next(&add).is_some() {
531 1
            count += 1;
532
        }
533 1
        assert_eq!(count, 1000);
534 1
    }
535
}

Read our documentation on viewing source code .

Loading