1
use crate::{
2
    allocator::Allocator,
3
    config::Config,
4
    discref::{Page, PageOps, PAGE_METADATA_SIZE},
5
    error::{PRes, PersyError},
6
    id::{PersyId, RecRef, SegmentId},
7
    locks::{LockManager, RwLockManager},
8
    segment::{AllocatedSegmentPage, SegmentPage, SegmentPageIterator, SegmentPageRead, Segments},
9
    transaction::{DeleteRecord, InsertRecord, NewSegmentPage, SegmentOperation, UpdateRecord},
10
};
11
use std::{
12
    collections::{hash_map::Entry, HashMap, HashSet},
13
    sync::{Arc, RwLock},
14
};
15

16
pub const ADDRESS_ROOT_PAGE_EXP: u8 = 6; // 2^6
17
pub const ADDRESS_PAGE_EXP: u8 = 10; // 2^10
18
pub const ADDRESS_PAGE_SIZE: u32 = (1 << ADDRESS_PAGE_EXP) - PAGE_METADATA_SIZE; // 2^10 -2 size - page header
19
pub const ADDRESS_PAGE_ENTRY_COUNT: u32 = (ADDRESS_PAGE_SIZE - SEGMENT_DATA_OFFSET) / ADDRESS_ENTRY_SIZE;
20
pub const FLAG_EXISTS: u8 = 0b000_0001;
21
pub const FLAG_DELETED: u8 = 0b000_0010;
22
pub const SEGMENT_HASH_OFFSET: u32 = 16;
23
pub const SEGMENT_PAGE_DELETE_COUNT_OFFSET: u32 = 20;
24
pub const SEGMENT_DATA_OFFSET: u32 = 22;
25
pub const ADDRESS_ENTRY_SIZE: u32 = 8 + 1 + 2; // Pointer to data page + flags + version management (not yet used)
26

27
pub struct OldRecordInfo {
28
    pub recref: RecRef,
29
    pub segment: SegmentId,
30
    pub record_page: u64,
31
    pub version: u16,
32
}
33

34
impl OldRecordInfo {
35 1
    fn new(recref: &RecRef, segment: SegmentId, record_page: u64, version: u16) -> OldRecordInfo {
36 1
        OldRecordInfo {
37 1
            recref: recref.clone(),
38
            segment,
39
            record_page,
40
            version,
41
        }
42 1
    }
43
}
44

45
/// Address segment keep the basic addressing of the data in the data segment for a specific
46
/// data block
47
pub struct Address {
48
    config: Arc<Config>,
49
    allocator: Arc<Allocator>,
50
    record_locks: LockManager<RecRef>,
51
    segment_locks: RwLockManager<SegmentId>,
52
    segments: RwLock<Segments>,
53
}
54

55
impl Address {
56 1
    pub fn new(all: &Arc<Allocator>, config: &Arc<Config>, page: u64) -> PRes<Address> {
57 1
        let segments = Segments::new(page, all)?;
58 1
        Ok(Address {
59 1
            config: config.clone(),
60 1
            allocator: all.clone(),
61 1
            record_locks: Default::default(),
62 1
            segment_locks: Default::default(),
63 1
            segments: RwLock::new(segments),
64 0
        })
65 1
    }
66

67 1
    pub fn init(all: &Allocator) -> PRes<u64> {
68 1
        let page = all.allocate(ADDRESS_ROOT_PAGE_EXP)?;
69 1
        let page_index = page.get_index();
70 1
        Segments::init(page, all)?;
71 1
        Ok(page_index)
72 1
    }
73

74 1
    pub fn scan(&self, segment: SegmentId) -> PRes<SegmentPageIterator> {
75 1
        let segments = self.segments.read()?;
76 1
        if let Some(segment) = segments.segment_by_id(segment) {
77 1
            Ok(SegmentPageIterator::new(segment.first_page))
78 1
        } else if let Some(temp_segment) = segments.segment_by_id_temp(segment) {
79 1
            Ok(SegmentPageIterator::new(temp_segment.first_page))
80
        } else {
81 0
            Err(PersyError::SegmentNotFound)
82
        }
83 1
    }
84

85 1
    /// Loads the address page `cur_page` and returns the result of
    /// `segment_scan_entries` on it.
    ///
    /// # Errors
    /// Propagates lock, page-loading and scan failures.
    pub fn scan_page(&self, cur_page: u64) -> PRes<(u64, Vec<u32>)> {
        // The guard is never read: it is held only for lock protection during the scan.
        let _guard = self.segments.read()?;
        let mut loaded = self.allocator.load_page(cur_page)?;
        loaded.segment_scan_entries()
    }
91

92 1
    /// Loads the address page `cur_page` and returns the result of
    /// `segment_scan_all_entries` on it (each position paired with a flag).
    ///
    /// # Errors
    /// Propagates lock, page-loading and scan failures.
    pub fn scan_page_all(&self, cur_page: u64) -> PRes<(u64, Vec<(u32, bool)>)> {
        // The guard is never read: it is held only for lock protection during the scan.
        let _guard = self.segments.read()?;
        let mut loaded = self.allocator.load_page(cur_page)?;
        loaded.segment_scan_all_entries()
    }
98

99 1
    pub fn allocate_temp(&self, segment: SegmentId) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
100 1
        if let Some(found) = self.segments.write()?.get_temp_segment_mut(segment) {
101 1
            found.allocate_internal(&self.allocator)
102
        } else {
103 0
            Err(PersyError::SegmentNotFound)
104
        }
105 1
    }
106

107 1
    /// Creates a temporary segment named `segment`, under the segment-table write lock.
    ///
    /// Returns the pair produced by `Segments::create_temp_segment` (the tests
    /// use it as segment id and first page).
    pub fn create_temp_segment(&self, segment: &str) -> PRes<(SegmentId, u64)> {
        let mut table = self.segments.write()?;
        table.create_temp_segment(&self.allocator, segment)
    }
110

111 1
    /// Drops the temporary segment `segment`, under the segment-table write lock.
    pub fn drop_temp_segment(&self, segment: SegmentId) -> PRes<()> {
        let mut table = self.segments.write()?;
        table.drop_temp_segment(&self.allocator, segment)
    }
114

115 1
    pub fn allocate(&self, segment: SegmentId) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
116 1
        if let Some(found) = self.segments.write()?.segments.get_mut(&segment) {
117 1
            found.allocate_internal(&self.allocator)
118
        } else {
119 0
            Err(PersyError::SegmentNotFound)
120
        }
121 1
    }
122

123 1
    pub fn acquire_locks(
124
        &self,
125
        records: &[(SegmentId, RecRef, u16)],
126
        created_updated: &[SegmentId],
127
        deleted: &[SegmentId],
128
    ) -> PRes<()> {
129 1
        let timeout = *self.config.transaction_lock_timeout();
130 1
        self.segment_locks.lock_all_write(&deleted, timeout)?;
131 1
        if let Err(x) = self.segment_locks.lock_all_read(&created_updated, timeout) {
132 0
            if let Err(e) = self.segment_locks.unlock_all_write(&deleted) {
133 0
                dbg!("unlock error: {}", e);
134 0
            }
135 0
            return Err(x);
136 1
        }
137

138 1
        let to_lock: Vec<_> = records.iter().map(|(_, id, _)| id.clone()).collect();
139 1
        if let Err(x) = self.record_locks.lock_all(&to_lock, timeout) {
140 0
            if let Err(e) = self.segment_locks.unlock_all_write(&deleted) {
141 0
                dbg!("unlock error: {}", e);
142 0
            }
143 0
            if let Err(e) = self.segment_locks.unlock_all_read(&created_updated) {
144 0
                dbg!("unlock error: {}", e);
145 0
            }
146 1
            return Err(x);
147 1
        }
148

149 1
        let segs = self.segments.read()?;
150 1
        for segment in created_updated {
151 1
            if !segs.exists_real_or_temp(*segment) {
152 1
                if let Err(e) = self.segment_locks.unlock_all_write(&deleted) {
153 0
                    dbg!("unlock error: {}", e);
154 0
                }
155 1
                if let Err(e) = self.segment_locks.unlock_all_read(&created_updated) {
156 0
                    dbg!("unlock error: {}", e);
157 0
                }
158 1
                if let Err(e) = self.record_locks.unlock_all(&to_lock) {
159 0
                    dbg!("unlock error: {}", e);
160 0
                }
161 1
                return Err(PersyError::SegmentNotFound);
162
            }
163
        }
164 1
        Ok(())
165 1
    }
166

167 1
    /// Takes a read lock on `segment`, using the configured transaction lock timeout.
    pub fn acquire_segment_read_lock(&self, segment: SegmentId) -> PRes<()> {
        let wait = *self.config.transaction_lock_timeout();
        let targets = [segment];
        self.segment_locks.lock_all_read(&targets, wait)?;
        Ok(())
    }
172 1
    /// Takes the lock of record `id`, using the configured transaction lock timeout.
    pub fn acquire_record_lock(&self, id: &RecRef) -> PRes<()> {
        let wait = *self.config.transaction_lock_timeout();
        let targets = [id.clone()];
        self.record_locks.lock_all(&targets, wait)?;
        Ok(())
    }
177

178
    /// Releases the read lock held on `segment`.
    pub fn release_segment_read_lock(&self, segment: SegmentId) -> PRes<()> {
        let targets = [segment];
        self.segment_locks.unlock_all_read(&targets)?;
        Ok(())
    }
182 1
    /// Releases the lock held on record `id`.
    pub fn release_record_lock(&self, id: &RecRef) -> PRes<()> {
        let targets = [id.clone()];
        self.record_locks.unlock_all(&targets)?;
        Ok(())
    }
186

187 1
    /// Confirms the pending allocations of the segments in `segs`, under the
    /// segment-table write lock.
    ///
    /// `recover` is forwarded unchanged to `Segments::confirm_allocations`.
    pub fn confirm_allocations(&self, segs: &[SegmentId], recover: bool) -> PRes<()> {
        let mut table = self.segments.write()?;
        table.confirm_allocations(segs, &self.allocator, recover)?;
        Ok(())
    }
192

193 1
    pub fn check_persistent_records(
194
        &self,
195
        records: &[(SegmentId, RecRef, u16)],
196
        check_version: bool,
197
    ) -> PRes<Vec<OldRecordInfo>> {
198 1
        let mut current_record_pages = Vec::with_capacity(records.len());
199 1
        for &(segment, ref recref, version) in records {
200 1
            let val = self.read(recref, segment)?;
201 1
            if let Some((record, pers_version)) = val {
202 1
                current_record_pages.push(OldRecordInfo::new(&recref, segment, record, pers_version));
203 1
                if check_version && pers_version != version {
204 1
                    return Err(PersyError::VersionNotLastest);
205
                }
206
            } else {
207 1
                return Err(PersyError::RecordNotFound(PersyId(recref.clone())));
208
            }
209
        }
210 1
        Ok(current_record_pages)
211 1
    }
212

213 1
    pub fn release_locks<'a>(
214
        &self,
215
        records: impl Iterator<Item = &'a RecRef>,
216
        created_updated: &[SegmentId],
217
        deleted: &[SegmentId],
218
    ) -> PRes<()> {
219 1
        self.record_locks.unlock_all_iter(records)?;
220 1
        self.segment_locks.unlock_all_read(&created_updated)?;
221 1
        self.segment_locks.unlock_all_write(&deleted)?;
222 1
        Ok(())
223 1
    }
224

225 1
    pub fn rollback(&self, inserts: &[InsertRecord]) -> PRes<Vec<(SegmentId, u64)>> {
226 1
        let mut segments = self.segments.write()?;
227 1
        let mut pages_to_remove = Vec::new();
228 1
        let mut pages = HashMap::new();
229 1
        for insert in inserts {
230 1
            if segments.segments.contains_key(&insert.segment) {
231 1
                let page = insert.recref.page;
232 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
233 1
                if seg_page.segment_delete_entry(insert.segment, insert.recref.pos)? && seg_page.get_next()? != 0 {
234 0
                    pages_to_remove.push((insert.segment, page));
235
                }
236
            }
237
        }
238 1
        for (_, to_flush) in pages.into_iter() {
239 1
            self.allocator.flush_page(to_flush)?;
240 1
        }
241 1
        segments.flush_segments(&self.allocator)?;
242 1
        Ok(pages_to_remove)
243 1
    }
244

245 1
    pub fn apply(
246
        &self,
247
        segs_new_pages: &[NewSegmentPage],
248
        inserts: &[InsertRecord],
249
        updates: &[UpdateRecord],
250
        deletes: &[DeleteRecord],
251
        seg_ops: &[SegmentOperation],
252
        recover: bool,
253
    ) -> PRes<Vec<(SegmentId, u64)>> {
254 1
        let mut segments = self.segments.write()?;
255 1
        let mut dropped = HashSet::new();
256 1
        for seg_op in seg_ops {
257 1
            if let SegmentOperation::DROP(ref op) = *seg_op {
258 1
                dropped.insert(op.segment_id);
259
            }
260
        }
261 1
        let mut pages = HashMap::new();
262

263 1
        if recover {
264 1
            for new_page in segs_new_pages {
265 0
                let p_page = self.get_or_insert_mut(&mut pages, new_page.previous)?;
266 0
                p_page.set_next(new_page.page)?;
267 0
                let n_page = self.get_or_insert_mut(&mut pages, new_page.page)?;
268 0
                n_page.set_prev(new_page.previous)?;
269 0
                n_page.set_segment_id(new_page.segment)?;
270
            }
271
        }
272 1
        for insert in inserts {
273 1
            if !dropped.contains(&insert.segment) {
274 1
                let page = insert.recref.page;
275 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
276 1
                seg_page.segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page)?;
277
            }
278
        }
279

280 1
        for update in updates {
281 1
            if !dropped.contains(&update.segment) {
282 1
                let page = update.recref.page;
283 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
284 1
                seg_page.segment_update_entry(update.segment, update.recref.pos, update.record_page)?;
285
            }
286
        }
287 1
        let mut pages_to_remove = Vec::new();
288

289 1
        for delete in deletes {
290 1
            if !dropped.contains(&delete.segment) {
291 1
                let page = delete.recref.page;
292 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
293 1
                if seg_page.segment_delete_entry(delete.segment, delete.recref.pos)? {
294
                    // Avoid to remove last pages, for avoid concurrent operations with page
295
                    // creation
296 1
                    if seg_page.get_next()? != 0 {
297 1
                        pages_to_remove.push((delete.segment, page));
298
                    }
299
                }
300
            }
301
        }
302

303 1
        if recover {
304 1
            for (_, mut to_flush) in pages.into_iter() {
305 1
                to_flush.recalc_count()?;
306 1
                self.allocator.flush_page(to_flush)?;
307 1
            }
308

309 1
            let recover_page = |record_page: u64| {
310 1
                let page = self.allocator.load_page(record_page)?;
311 1
                self.allocator.remove_from_free(record_page, page.get_size_exp())
312 1
            };
313 1
            let mut segs = HashSet::new();
314 1
            for insert in inserts {
315 1
                recover_page(insert.record_page)?;
316 1
                segs.insert(insert.segment);
317
            }
318 1
            for update in updates {
319 1
                recover_page(update.record_page)?;
320 1
                segs.insert(update.segment);
321
            }
322 1
            for delete in deletes {
323 1
                segs.insert(delete.segment);
324
            }
325

326 1
            segments.confirm_allocations(&segs.into_iter().collect::<Vec<_>>(), &self.allocator, true)?;
327 1
        } else {
328 1
            for (_, to_flush) in pages.into_iter() {
329 1
                self.allocator.flush_page(to_flush)?;
330 1
            }
331
        }
332

333 1
        for seg_op in seg_ops {
334 1
            if let SegmentOperation::DROP(ref op) = *seg_op {
335 1
                segments.drop_segment(&op.name)?;
336
            }
337
        }
338

339 1
        for seg_op in seg_ops {
340 1
            if let SegmentOperation::CREATE(ref op) = *seg_op {
341 1
                segments.create_segment(op.segment_id, op.first_page)?;
342
            }
343
        }
344 1
        segments.flush_segments(&self.allocator)?;
345

346 1
        Ok(pages_to_remove)
347 1
    }
348

349 1
    /// Returns the pages of `segment` as collected by
    /// `Segments::collect_segment_pages`, under the segment-table read lock.
    pub fn collect_segment_pages(&self, segment: SegmentId) -> PRes<Vec<u64>> {
        let table = self.segments.read()?;
        table.collect_segment_pages(&self.allocator, segment)
    }
353

354 1
    /// Unlinks every `(segment, page)` of `empty` from the chain of pages of
    /// its segment.
    ///
    /// For each page the next page gets the `prev` link of the removed page.
    /// Then either the previous page gets its `next` link, or, when there is
    /// no previous page, the next page becomes the first page of the segment.
    ///
    /// A page without a next page is not expected here (checked in debug builds).
    pub fn clear_empty(&self, empty: &[(SegmentId, u64)]) -> PRes<()> {
        let mut segments = self.segments.write()?;
        for &(segment, page) in empty {
            let mut unlinked = self.allocator.load_page(page)?;
            let next = unlinked.get_next()?;
            let prev = unlinked.get_prev()?;
            debug_assert!(next != 0);

            let mut following = self.allocator.write_page(next)?;
            // With prev == 0 this simply marks the next page as having no previous page.
            following.set_prev(prev)?;
            self.allocator.flush_page(following)?;

            if prev == 0 {
                if next != 0 {
                    segments.set_first_page(segment, next, &self.allocator)?;
                }
            } else {
                let mut preceding = self.allocator.write_page(prev)?;
                preceding.set_next(next)?;
                self.allocator.flush_page(preceding)?;
            }
        }

        Ok(())
    }
376

377 1
    /// Tells whether the segment table has a segment named `segment`.
    pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
        let table = self.segments.read()?;
        Ok(table.has_segment(segment))
    }
380

381 1
    /// Looks up the id of the segment named `segment`; `None` when the segment
    /// table does not return one.
    pub fn segment_id(&self, segment: &str) -> PRes<Option<SegmentId>> {
        let table = self.segments.read()?;
        Ok(table.segment_id(segment))
    }
384

385
    /// Looks up the name of the segment with id `segment`; `None` when the
    /// segment table does not return one.
    pub fn segment_name_by_id(&self, segment: SegmentId) -> PRes<Option<String>> {
        let table = self.segments.read()?;
        Ok(table.segment_name_by_id(segment))
    }
388

389
    // Used only from the tests
390
    #[allow(dead_code)]
391 1
    pub fn insert(&self, segment_id: SegmentId, recref: &RecRef, record_page: u64) -> PRes<()> {
392 1
        let mut page = self.allocator.write_page(recref.page)?;
393 1
        page.segment_insert_entry(segment_id, recref.pos, record_page)?;
394 1
        self.allocator.flush_page(page)?;
395 1
        Ok(())
396 1
    }
397

398 1
    /// Reads the address entry of `recref` for `segment` from its address page.
    ///
    /// Returns whatever `segment_read_entry` yields: `None` when it finds no
    /// entry, otherwise a `(u64, u16)` pair (used by callers as record page
    /// and version).
    pub fn read(&self, recref: &RecRef, segment: SegmentId) -> PRes<Option<(u64, u16)>> {
        let mut loaded = self.allocator.load_page(recref.page)?;
        loaded.segment_read_entry(segment, recref.pos)
    }
402

403 1
    fn get_or_insert_mut<'a>(&self, map: &'a mut HashMap<u64, Page>, k: u64) -> PRes<&'a mut Page> {
404 1
        Ok(match map.entry(k) {
405 1
            Entry::Occupied(entry) => entry.into_mut(),
406 1
            Entry::Vacant(entry) => entry.insert(self.allocator.write_page(k)?),
407
        })
408 1
    }
409

410 1
    /// Returns the `(name, id)` pairs listed by the segment table.
    pub fn list(&self) -> PRes<Vec<(String, SegmentId)>> {
        let table = self.segments.read()?;
        Ok(table.list())
    }
413 1
    /// Returns the `(name, id, u64)` triples produced by
    /// `Segments::snapshot_list` (the last element is presumably the first
    /// page of the segment).
    pub fn snapshot_list(&self) -> PRes<Vec<(String, SegmentId, u64)>> {
        let table = self.segments.read()?;
        Ok(table.snapshot_list())
    }
416
}
417

418
#[cfg(test)]
419
mod tests {
420
    use super::Address;
421
    use crate::id::SegmentId;
422
    use crate::transaction::{CreateSegment, DeleteRecord, InsertRecord, SegmentOperation, UpdateRecord};
423
    use crate::{allocator::Allocator, config::Config, discref::DiscRef};
424
    use std::sync::Arc;
425
    use tempfile::Builder;
426

427 1
    fn init_test_address(file_name: &str) -> (Address, SegmentId) {
428 1
        let file = Builder::new().prefix(file_name).suffix(".persy").tempfile().unwrap();
429 1
        let config = Arc::new(Config::new());
430 1
        let disc = Box::new(DiscRef::new(file.reopen().unwrap()).unwrap());
431 1
        let (_, allocator) = Allocator::init(disc, &config).unwrap();
432 1
        let page = Address::init(&allocator).unwrap();
433 1
        let addr = Address::new(&Arc::new(allocator), &config, page).unwrap();
434 1
        let (id, fp) = addr.create_temp_segment("def").unwrap();
435 1
        addr.segments.write().unwrap().create_segment(id, fp).unwrap();
436 1
        (addr, id)
437 1
    }
438

439
    /// A freshly created segment starts allocating at the expected page and offset.
    #[test]
    fn test_init_and_new_address() {
        let (add, segment_id) = init_test_address("./addr_test");
        let table = add.segments.read().unwrap();
        let segment = table.segment_by_id(segment_id).unwrap();
        assert_eq!(segment.alloc_page, 1088);
        assert_eq!(segment.alloc_pos, 22);
    }
461

462
    /// `apply` must make an insert and an update visible to `read` and make a
    /// deleted record unreadable.
    #[test]
    fn test_insert_update_delete_read_apply_pointer() {
        let (add, segment_id) = init_test_address("./addr_insert_update_delete_apply_test.persy");
        let (recref, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref, 10).unwrap();
        let (recref_1, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref_1, 20).unwrap();
        let (recref_2, _) = add.allocate(segment_id).unwrap();

        let inserted = vec![InsertRecord::new(segment_id, &recref_2, 30)];
        let updated = vec![UpdateRecord::new(segment_id, &recref_1, 40, 1)];
        let deleted = vec![DeleteRecord::new(segment_id, &recref, 1)];
        let seg_ops = vec![SegmentOperation::CREATE(CreateSegment::new(
            "def",
            SegmentId::new(20),
            20,
        ))];

        add.apply(&[], &inserted, &updated, &deleted, &seg_ops, false).unwrap();

        let read = add.read(&recref, segment_id).unwrap();
        let read_1 = add.read(&recref_1, segment_id).unwrap();
        let read_2 = add.read(&recref_2, segment_id).unwrap();
        assert!(read.is_none(), "the deleted record must not be readable");
        assert_eq!(read_1.map(|(page, _)| page), Some(40));
        assert_eq!(read_2.map(|(page, _)| page), Some(30));
    }
505

506
    /// Scanning a segment yields the inserted records, in insertion order.
    #[test]
    fn test_insert_scan() {
        let (add, segment_id) = init_test_address("./addr_scan_test.persy");
        let (recref, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref, 10).unwrap();
        let (recref_1, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref_1, 20).unwrap();

        // First pass: only count the entries.
        let mut counting = add.scan(segment_id).unwrap();
        let mut found = 0;
        while counting.next(&add).is_some() {
            found += 1;
        }
        assert_eq!(found, 2);

        // Second pass: check each entry against the allocated references.
        let mut walking = add.scan(segment_id).unwrap();
        let first = walking.next(&add).unwrap();
        assert_eq!(first.page, recref.page);
        assert_eq!(first.pos, recref.pos);
        let second = walking.next(&add).unwrap();
        assert_eq!(second.page, recref_1.page);
        assert_eq!(second.pos, recref_1.pos);
    }
527

528
    /// Inserting 1000 records and scanning the segment returns all of them.
    #[test]
    fn test_insert_over_page() {
        let (add, segment_id) = init_test_address("./addr_insert_over_page.persy");
        for record_page in 0..1000 {
            let (recref, _) = add.allocate(segment_id).unwrap();
            add.insert(segment_id, &recref, record_page).unwrap();
        }
        let mut scanner = add.scan(segment_id).unwrap();
        let mut found = 0;
        while scanner.next(&add).is_some() {
            found += 1;
        }
        assert_eq!(found, 1000);
    }
542
}

Read our documentation on viewing source code.

Loading