1
use crate::{
2
    allocator::Allocator,
3
    config::Config,
4
    discref::{Page, PageOps, PAGE_METADATA_SIZE},
5
    error::{PRes, PersyError},
6
    id::{PersyId, RecRef},
7
    locks::{LockManager, RwLockManager},
8
    segment::{AllocatedSegmentPage, SegmentPage, SegmentPageIterator, SegmentPageRead, Segments},
9
    transaction::{DeleteRecord, InsertRecord, NewSegmentPage, SegmentOperation, UpdateRecord},
10
};
11
use std::{
12
    collections::{hash_map::Entry, HashMap, HashSet},
13
    sync::{Arc, RwLock},
14
};
15

16
pub const ADDRESS_ROOT_PAGE_EXP: u8 = 6; // 2^6
17
pub const ADDRESS_PAGE_EXP: u8 = 10; // 2^10
18
pub const ADDRESS_PAGE_SIZE: u32 = (1 << ADDRESS_PAGE_EXP) - PAGE_METADATA_SIZE; // 2^10 -2 size - page header
19
pub const ADDRESS_PAGE_ENTRY_COUNT: u32 = (ADDRESS_PAGE_SIZE - SEGMENT_DATA_OFFSET) / ADDRESS_ENTRY_SIZE;
20
pub const FLAG_EXISTS: u8 = 0b000_0001;
21
pub const FLAG_DELETED: u8 = 0b000_0010;
22
pub const SEGMENT_HASH_OFFSET: u32 = 16;
23
pub const SEGMENT_PAGE_DELETE_COUNT_OFFSET: u32 = 20;
24
pub const SEGMENT_DATA_OFFSET: u32 = 22;
25
pub const ADDRESS_ENTRY_SIZE: u32 = 8 + 1 + 2; // Pointer to data page + flags + version management (not yet used)
26

27
pub struct OldRecordInfo {
28
    pub recref: RecRef,
29
    pub segment: u32,
30
    pub record_page: u64,
31
    pub version: u16,
32
}
33

34
impl OldRecordInfo {
35 1
    fn new(recref: &RecRef, segment: u32, record_page: u64, version: u16) -> OldRecordInfo {
36 1
        OldRecordInfo {
37 1
            recref: recref.clone(),
38
            segment,
39
            record_page,
40
            version,
41
        }
42 1
    }
43
}
44

45
/// Address segment keep the basic addressing of the data in the data segment for a specific
46
/// data block
47
pub struct Address {
48
    config: Arc<Config>,
49
    allocator: Arc<Allocator>,
50
    record_locks: LockManager<RecRef>,
51
    segment_locks: RwLockManager<u32>,
52
    segments: RwLock<Segments>,
53
}
54

55
impl Address {
56 1
    pub fn new(all: &Arc<Allocator>, config: &Arc<Config>, page: u64) -> PRes<Address> {
57 1
        let segments = Segments::new(page, all)?;
58 1
        Ok(Address {
59 1
            config: config.clone(),
60 1
            allocator: all.clone(),
61 1
            record_locks: Default::default(),
62 1
            segment_locks: Default::default(),
63 1
            segments: RwLock::new(segments),
64 0
        })
65 1
    }
66

67 1
    pub fn init(all: &Allocator) -> PRes<u64> {
68 1
        let page = all.allocate(ADDRESS_ROOT_PAGE_EXP)?;
69 1
        let page_index = page.get_index();
70 1
        Segments::init(page, all)?;
71 1
        Ok(page_index)
72 1
    }
73

74 1
    pub fn scan(&self, segment: u32) -> PRes<SegmentPageIterator> {
75 1
        let segments = self.segments.read()?;
76 1
        if let Some(segment) = segments.segment_by_id(segment) {
77 1
            Ok(SegmentPageIterator::new(segment.first_page))
78 1
        } else if let Some(temp_segment) = segments.segment_by_id_temp(segment) {
79 1
            Ok(SegmentPageIterator::new(temp_segment.first_page))
80
        } else {
81 0
            Err(PersyError::SegmentNotFound)
82
        }
83 1
    }
84

85 1
    /// Loads the address page `cur_page` and returns the result of `segment_scan_entries` on it.
    pub fn scan_page(&self, cur_page: u64) -> PRes<(u64, Vec<u32>)> {
        // The guard is never read: it is held only for lock protection during the scan.
        let _segments_guard = self.segments.read()?;
        let mut address_page = self.allocator.load_page(cur_page)?;
        address_page.segment_scan_entries()
    }
91

92 1
    /// Loads the address page `cur_page` and returns the result of `segment_scan_all_entries`
    /// on it.
    pub fn scan_page_all(&self, cur_page: u64) -> PRes<(u64, Vec<(u32, bool)>)> {
        // The guard is never read: it is held only for lock protection during the scan.
        let _segments_guard = self.segments.read()?;
        let mut address_page = self.allocator.load_page(cur_page)?;
        address_page.segment_scan_all_entries()
    }
98

99 1
    pub fn allocate_temp(&self, segment: u32) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
100 1
        if let Some(found) = self.segments.write()?.get_temp_segment_mut(segment) {
101 1
            found.allocate_internal(&self.allocator)
102
        } else {
103 0
            Err(PersyError::SegmentNotFound)
104
        }
105 1
    }
106

107 1
    /// Creates a temporary segment named `segment` under the segments write lock and returns
    /// what `Segments::create_temp_segment` produces (used by the tests as id and first page).
    pub fn create_temp_segment(&self, segment: &str) -> PRes<(u32, u64)> {
        let mut guard = self.segments.write()?;
        guard.create_temp_segment(&self.allocator, segment)
    }
110

111 1
    /// Drops the temporary segment `segment` under the segments write lock.
    pub fn drop_temp_segment(&self, segment: u32) -> PRes<()> {
        let mut guard = self.segments.write()?;
        guard.drop_temp_segment(&self.allocator, segment)
    }
114

115 1
    pub fn allocate(&self, segment: u32) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
116 1
        if let Some(found) = self.segments.write()?.segments.get_mut(&segment) {
117 1
            found.allocate_internal(&self.allocator)
118
        } else {
119 0
            Err(PersyError::SegmentNotFound)
120
        }
121 1
    }
122

123 1
    pub fn acquire_locks(&self, records: &[(u32, RecRef, u16)], created_updated: &[u32], deleted: &[u32]) -> PRes<()> {
124 1
        let timeout = *self.config.transaction_lock_timeout();
125 1
        self.segment_locks.lock_all_write(&deleted, timeout)?;
126 1
        if let Err(x) = self.segment_locks.lock_all_read(&created_updated, timeout) {
127 0
            if let Err(e) = self.segment_locks.unlock_all_write(&deleted) {
128 0
                dbg!("unlock error: {}", e);
129 0
            }
130 0
            return Err(x);
131 1
        }
132

133 1
        let to_lock: Vec<_> = records.iter().map(|(_, id, _)| id.clone()).collect();
134 1
        if let Err(x) = self.record_locks.lock_all(&to_lock, timeout) {
135 0
            if let Err(e) = self.segment_locks.unlock_all_write(&deleted) {
136 0
                dbg!("unlock error: {}", e);
137 0
            }
138 0
            if let Err(e) = self.segment_locks.unlock_all_read(&created_updated) {
139 0
                dbg!("unlock error: {}", e);
140 0
            }
141 1
            return Err(x);
142 1
        }
143

144 1
        let segs = self.segments.read()?;
145 1
        for segment in created_updated {
146 1
            if !segs.exists_real_or_temp(*segment) {
147 1
                if let Err(e) = self.segment_locks.unlock_all_write(&deleted) {
148 0
                    dbg!("unlock error: {}", e);
149 0
                }
150 1
                if let Err(e) = self.segment_locks.unlock_all_read(&created_updated) {
151 0
                    dbg!("unlock error: {}", e);
152 0
                }
153 1
                if let Err(e) = self.record_locks.unlock_all(&to_lock) {
154 0
                    dbg!("unlock error: {}", e);
155 0
                }
156 1
                return Err(PersyError::SegmentNotFound);
157
            }
158
        }
159 1
        Ok(())
160 1
    }
161

162 1
    /// Takes the read lock of a single segment, waiting at most the configured transaction
    /// lock timeout.
    pub fn acquire_segment_read_lock(&self, segment: u32) -> PRes<()> {
        let wait = *self.config.transaction_lock_timeout();
        let wanted = [segment];
        self.segment_locks.lock_all_read(&wanted, wait)?;
        Ok(())
    }
167 1
    /// Takes the lock of a single record, waiting at most the configured transaction lock
    /// timeout.
    pub fn acquire_record_lock(&self, id: &RecRef) -> PRes<()> {
        let wait = *self.config.transaction_lock_timeout();
        let wanted = [id.clone()];
        self.record_locks.lock_all(&wanted, wait)?;
        Ok(())
    }
172

173
    /// Releases the read lock of a single segment.
    pub fn release_segment_read_lock(&self, segment: u32) -> PRes<()> {
        let held = [segment];
        self.segment_locks.unlock_all_read(&held)?;
        Ok(())
    }
177 1
    /// Releases the lock of a single record.
    pub fn release_record_lock(&self, id: &RecRef) -> PRes<()> {
        let held = [id.clone()];
        self.record_locks.unlock_all(&held)?;
        Ok(())
    }
181

182 1
    /// Forwards to `Segments::confirm_allocations` for the given segment ids while holding the
    /// segments write lock; `recover` is passed through unchanged.
    pub fn confirm_allocations(&self, segs: &[u32], recover: bool) -> PRes<()> {
        let mut guard = self.segments.write()?;
        guard.confirm_allocations(segs, &self.allocator, recover)?;
        Ok(())
    }
187

188 1
    pub fn check_persistent_records(
189
        &self,
190
        records: &[(u32, RecRef, u16)],
191
        check_version: bool,
192
    ) -> PRes<Vec<OldRecordInfo>> {
193 1
        let mut current_record_pages = Vec::with_capacity(records.len());
194 1
        for &(segment, ref recref, version) in records {
195 1
            let val = self.read(recref, segment)?;
196 1
            if let Some((record, pers_version)) = val {
197 1
                current_record_pages.push(OldRecordInfo::new(&recref, segment, record, pers_version));
198 1
                if check_version && pers_version != version {
199 1
                    return Err(PersyError::VersionNotLastest);
200
                }
201
            } else {
202 1
                return Err(PersyError::RecordNotFound(PersyId(recref.clone())));
203
            }
204
        }
205 1
        Ok(current_record_pages)
206 1
    }
207

208 1
    pub fn release_locks<'a>(
209
        &self,
210
        records: impl Iterator<Item = &'a RecRef>,
211
        created_updated: &[u32],
212
        deleted: &[u32],
213
    ) -> PRes<()> {
214 1
        self.record_locks.unlock_all_iter(records)?;
215 1
        self.segment_locks.unlock_all_read(&created_updated)?;
216 1
        self.segment_locks.unlock_all_write(&deleted)?;
217 1
        Ok(())
218 1
    }
219

220 1
    pub fn rollback(&self, inserts: &[InsertRecord]) -> PRes<Vec<(u32, u64)>> {
221 1
        let mut segments = self.segments.write()?;
222 1
        let mut pages_to_remove = Vec::new();
223 1
        let mut pages = HashMap::new();
224 1
        for insert in inserts {
225 1
            if segments.segments.contains_key(&insert.segment) {
226 1
                let page = insert.recref.page;
227 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
228 1
                if seg_page.segment_delete_entry(insert.segment, insert.recref.pos)? && seg_page.get_next()? != 0 {
229 0
                    pages_to_remove.push((insert.segment, page));
230
                }
231
            }
232
        }
233 1
        for (_, to_flush) in pages.into_iter() {
234 1
            self.allocator.flush_page(to_flush)?;
235 1
        }
236 1
        segments.flush_segments(&self.allocator)?;
237 1
        Ok(pages_to_remove)
238 1
    }
239

240 1
    pub fn apply(
241
        &self,
242
        segs_new_pages: &[NewSegmentPage],
243
        inserts: &[InsertRecord],
244
        updates: &[UpdateRecord],
245
        deletes: &[DeleteRecord],
246
        seg_ops: &[SegmentOperation],
247
        recover: bool,
248
    ) -> PRes<Vec<(u32, u64)>> {
249 1
        let mut segments = self.segments.write()?;
250 1
        let mut dropped = HashSet::new();
251 1
        for seg_op in seg_ops {
252 1
            if let SegmentOperation::DROP(ref op) = *seg_op {
253 1
                dropped.insert(op.segment_id);
254
            }
255
        }
256 1
        let mut pages = HashMap::new();
257

258 1
        if recover {
259 1
            for new_page in segs_new_pages {
260 0
                let p_page = self.get_or_insert_mut(&mut pages, new_page.previous)?;
261 0
                p_page.set_next(new_page.page)?;
262 0
                let n_page = self.get_or_insert_mut(&mut pages, new_page.page)?;
263 0
                n_page.set_prev(new_page.previous)?;
264 0
                n_page.set_segment_id(new_page.segment)?;
265
            }
266
        }
267 1
        for insert in inserts {
268 1
            if !dropped.contains(&insert.segment) {
269 1
                let page = insert.recref.page;
270 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
271 1
                seg_page.segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page)?;
272
            }
273
        }
274

275 1
        for update in updates {
276 1
            if !dropped.contains(&update.segment) {
277 1
                let page = update.recref.page;
278 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
279 1
                seg_page.segment_update_entry(update.segment, update.recref.pos, update.record_page)?;
280
            }
281
        }
282 1
        let mut pages_to_remove = Vec::new();
283

284 1
        for delete in deletes {
285 1
            if !dropped.contains(&delete.segment) {
286 1
                let page = delete.recref.page;
287 1
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
288 1
                if seg_page.segment_delete_entry(delete.segment, delete.recref.pos)? {
289
                    // Avoid to remove last pages, for avoid concurrent operations with page
290
                    // creation
291 1
                    if seg_page.get_next()? != 0 {
292 1
                        pages_to_remove.push((delete.segment, page));
293
                    }
294
                }
295
            }
296
        }
297

298 1
        if recover {
299 1
            for (_, mut to_flush) in pages.into_iter() {
300 1
                to_flush.recalc_count()?;
301 1
                self.allocator.flush_page(to_flush)?;
302 1
            }
303

304 1
            let recover_page = |record_page: u64| {
305 1
                let page = self.allocator.load_page(record_page)?;
306 1
                self.allocator.remove_from_free(record_page, page.get_size_exp())
307 1
            };
308 1
            let mut segs = HashSet::new();
309 1
            for insert in inserts {
310 1
                recover_page(insert.record_page)?;
311 1
                segs.insert(insert.segment);
312
            }
313 1
            for update in updates {
314 1
                recover_page(update.record_page)?;
315 1
                segs.insert(update.segment);
316
            }
317 1
            for delete in deletes {
318 1
                segs.insert(delete.segment);
319
            }
320

321 1
            segments.confirm_allocations(&segs.into_iter().collect::<Vec<_>>(), &self.allocator, true)?;
322 1
        } else {
323 1
            for (_, to_flush) in pages.into_iter() {
324 1
                self.allocator.flush_page(to_flush)?;
325 1
            }
326
        }
327

328 1
        for seg_op in seg_ops {
329 1
            if let SegmentOperation::DROP(ref op) = *seg_op {
330 1
                segments.drop_segment(&op.name)?;
331
            }
332
        }
333

334 1
        for seg_op in seg_ops {
335 1
            if let SegmentOperation::CREATE(ref op) = *seg_op {
336 1
                segments.create_segment(op.segment_id, op.first_page)?;
337
            }
338
        }
339 1
        segments.flush_segments(&self.allocator)?;
340

341 1
        Ok(pages_to_remove)
342 1
    }
343

344 1
    /// Returns the pages of `segment` as collected by `Segments::collect_segment_pages`, read
    /// under the segments read lock.
    pub fn collect_segment_pages(&self, segment: u32) -> PRes<Vec<u64>> {
        let guard = self.segments.read()?;
        guard.collect_segment_pages(&self.allocator, segment)
    }
348

349 1
    /// Unlinks each `(segment, page)` in `empty` from its page chain.
    ///
    /// The following page gets the removed page's predecessor as its `prev`. The predecessor,
    /// when there is one, gets the following page as its `next`; otherwise the following page
    /// becomes the first page of the segment.
    pub fn clear_empty(&self, empty: &[(u32, u64)]) -> PRes<()> {
        let mut segments = self.segments.write()?;
        for &(segment, page) in empty {
            let mut removed = self.allocator.load_page(page)?;
            let next = removed.get_next()?;
            let prev = removed.get_prev()?;
            debug_assert!(next != 0);

            // With prev == 0 this simply marks the following page as having no predecessor.
            let mut following = self.allocator.write_page(next)?;
            following.set_prev(prev)?;
            self.allocator.flush_page(following)?;

            if prev == 0 {
                if next != 0 {
                    segments.set_first_page(segment, next, &self.allocator)?;
                }
            } else {
                let mut preceding = self.allocator.write_page(prev)?;
                preceding.set_next(next)?;
                self.allocator.flush_page(preceding)?;
            }
        }

        Ok(())
    }
371

372 1
    /// Tells whether `Segments::has_segment` knows a segment named `segment`.
    pub fn exists_segment(&self, segment: &str) -> PRes<bool> {
        let guard = self.segments.read()?;
        Ok(guard.has_segment(segment))
    }
375

376 1
    /// Looks up the id of the segment named `segment`, if any.
    pub fn segment_id(&self, segment: &str) -> PRes<Option<u32>> {
        let guard = self.segments.read()?;
        Ok(guard.segment_id(segment))
    }
379

380
    /// Looks up the name of the segment with id `segment`, if any.
    pub fn segment_name_by_id(&self, segment: u32) -> PRes<Option<String>> {
        let guard = self.segments.read()?;
        Ok(guard.segment_name_by_id(segment))
    }
383

384
    // Used only from the tests
385
    #[allow(dead_code)]
386 1
    pub fn insert(&self, segment_id: u32, recref: &RecRef, record_page: u64) -> PRes<()> {
387 1
        let mut page = self.allocator.write_page(recref.page)?;
388 1
        page.segment_insert_entry(segment_id, recref.pos, record_page)?;
389 1
        self.allocator.flush_page(page)?;
390 1
        Ok(())
391 1
    }
392

393 1
    /// Reads the address entry of `recref` in `segment`.
    ///
    /// Returns whatever `segment_read_entry` yields; callers in this file treat the pair as
    /// `(record page, version)` and `None` as "record not found".
    pub fn read(&self, recref: &RecRef, segment: u32) -> PRes<Option<(u64, u16)>> {
        let mut address_page = self.allocator.load_page(recref.page)?;
        address_page.segment_read_entry(segment, recref.pos)
    }
397

398 1
    fn get_or_insert_mut<'a>(&self, map: &'a mut HashMap<u64, Page>, k: u64) -> PRes<&'a mut Page> {
399 1
        Ok(match map.entry(k) {
400 1
            Entry::Occupied(entry) => entry.into_mut(),
401 1
            Entry::Vacant(entry) => entry.insert(self.allocator.write_page(k)?),
402
        })
403 1
    }
404

405 1
    /// Returns `Segments::list`, read under the segments read lock.
    pub fn list(&self) -> PRes<Vec<(String, u32)>> {
        let guard = self.segments.read()?;
        Ok(guard.list())
    }
408 1
    /// Returns `Segments::snapshot_list`, read under the segments read lock.
    pub fn snapshot_list(&self) -> PRes<Vec<(String, u32, u64)>> {
        let guard = self.segments.read()?;
        Ok(guard.snapshot_list())
    }
411
}
412

413
#[cfg(test)]
mod tests {
    use super::Address;
    use crate::transaction::{CreateSegment, DeleteRecord, InsertRecord, SegmentOperation, UpdateRecord};
    use crate::{allocator::Allocator, config::Config, discref::DiscRef};
    use std::sync::Arc;
    use tempfile::Builder;

    /// Builds an `Address` on a fresh temporary file and creates the segment "def", first as
    /// a temporary segment and then registered as a regular one. Returns the address and the
    /// segment id.
    fn init_test_address(file_name: &str) -> (Address, u32) {
        let file = Builder::new().prefix(file_name).suffix(".persy").tempfile().unwrap();
        let config = Arc::new(Config::new());
        let disc = Box::new(DiscRef::new(file.reopen().unwrap()).unwrap());
        let (_, allocator) = Allocator::init(disc, &config).unwrap();
        let page = Address::init(&allocator).unwrap();
        let addr = Address::new(&Arc::new(allocator), &config, page).unwrap();
        let (id, fp) = addr.create_temp_segment("def").unwrap();
        addr.segments.write().unwrap().create_segment(id, fp).unwrap();
        (addr, id)
    }

    /// Counts the entries yielded by a full scan of `segment_id`.
    fn count_scanned(add: &Address, segment_id: u32) -> usize {
        let mut to_iter = add.scan(segment_id).unwrap();
        let mut count = 0;
        while to_iter.next(add).is_some() {
            count += 1;
        }
        count
    }

    #[test]
    fn test_init_and_new_address() {
        let (add, segment_id) = init_test_address("./addr_test");
        // Take the read lock and look the segment up once for both assertions.
        let segments = add.segments.read().unwrap();
        let segment = segments.segment_by_id(segment_id).unwrap();
        assert_eq!(segment.alloc_page, 1088);
        assert_eq!(segment.alloc_pos, 22);
    }

    #[test]
    fn test_insert_update_delete_read_apply_pointer() {
        let (add, segment_id) = init_test_address("./addr_insert_update_delete_apply_test.persy");
        let (recref, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref, 10).unwrap();
        let (recref_1, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref_1, 20).unwrap();

        let (recref_2, _) = add.allocate(segment_id).unwrap();
        let inserted = vec![InsertRecord::new(segment_id, &recref_2, 30)];
        let updated = vec![UpdateRecord::new(segment_id, &recref_1, 40, 1)];
        let deleted = vec![DeleteRecord::new(segment_id, &recref, 1)];
        let seg_ops = vec![SegmentOperation::CREATE(CreateSegment::new("def", 20, 20))];

        add.apply(&[], &inserted, &updated, &deleted, &seg_ops, false).unwrap();

        let read = add.read(&recref, segment_id).unwrap();
        let read_1 = add.read(&recref_1, segment_id).unwrap();
        let read_2 = add.read(&recref_2, segment_id).unwrap();
        // The deleted record is gone, the updated and inserted ones point to the new pages.
        assert!(read.is_none());
        assert_eq!(read_1.map(|val| val.0), Some(40));
        assert_eq!(read_2.map(|val| val.0), Some(30));
    }

    #[test]
    fn test_insert_scan() {
        let (add, segment_id) = init_test_address("./addr_scan_test.persy");
        let (recref, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref, 10).unwrap();
        let (recref_1, _) = add.allocate(segment_id).unwrap();
        add.insert(segment_id, &recref_1, 20).unwrap();
        assert_eq!(count_scanned(&add, segment_id), 2);

        // The scan yields the records in insertion order.
        let mut iter = add.scan(segment_id).unwrap();
        let re = iter.next(&add).unwrap();
        assert_eq!(re.page, recref.page);
        assert_eq!(re.pos, recref.pos);
        let re_1 = iter.next(&add).unwrap();
        assert_eq!(re_1.page, recref_1.page);
        assert_eq!(re_1.pos, recref_1.pos);
    }

    #[test]
    fn test_insert_over_page() {
        let (add, segment_id) = init_test_address("./addr_insert_over_page.persy");
        // 1000 records do not fit in a single address page, so the scan must cross pages.
        for z in 0..1000 {
            let (recref, _) = add.allocate(segment_id).unwrap();
            add.insert(segment_id, &recref, z).unwrap();
        }
        assert_eq!(count_scanned(&add, segment_id), 1000);
    }
}

Read our documentation on viewing source code.

Loading