1
use crate::{
2
    address::{
3
        Address, ADDRESS_ENTRY_SIZE, ADDRESS_PAGE_ENTRY_COUNT, ADDRESS_PAGE_EXP, ADDRESS_PAGE_SIZE, FLAG_DELETED,
4
        FLAG_EXISTS, SEGMENT_DATA_OFFSET, SEGMENT_HASH_OFFSET, SEGMENT_PAGE_DELETE_COUNT_OFFSET,
5
    },
6
    allocator::Allocator,
7
    discref::{Page, PageOps},
8
    error::{PRes, PersyError},
9
    flush_checksum::{double_buffer_check, write_root_page},
10
    id::{PersyId, RecRef, SegmentId},
11
    io::{read_u64, write_u64, InfallibleRead, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat},
12
    persy::exp_from_content_size,
13
};
14
use std::{
15
    collections::{hash_map::DefaultHasher, HashMap},
16
    hash::{Hash, Hasher},
17
    str,
18
    sync::Arc,
19
    vec,
20
};
21

22
/// Version tag of the segments root page layout; `Segments::new` reads the first
/// byte of the root page and panics on any other value.
const SEGMENTS_ROOT_PAGE_VERSION_0: u8 = 0;
23
/// Version tag handed to `write_root_page` when the segments root page is written.
const SEGMENTS_ROOT_PAGE_VERSION: u8 = SEGMENTS_ROOT_PAGE_VERSION_0;
24

25
/// Page indexes reported by `Segment::allocate_internal` when the address chain
/// of a segment grows by one page.
///
/// The spelling of `previus_page` is kept as is because the field is public.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AllocatedSegmentPage {
    /// Index of the freshly allocated address page.
    pub new_page: u64,
    /// Index of the page that was full and now links to `new_page`.
    pub previus_page: u64,
}
29

30
pub struct Segment {
31
    pub first_page: u64,
32
    persistent_page: u64,
33
    persistent_pos: u32,
34
    pub alloc_page: u64,
35
    pub alloc_pos: u32,
36
    segment_id: SegmentId,
37
    name: String,
38
}
39

40
impl Segment {
41 1
    pub fn new(
42
        first_page: u64,
43
        persistent_page: u64,
44
        persistent_pos: u32,
45
        alloc_page: u64,
46
        alloc_pos: u32,
47
        segment_id: SegmentId,
48
        name: &str,
49
    ) -> Segment {
50 1
        Segment {
51
            first_page,
52
            persistent_page,
53
            persistent_pos,
54
            alloc_page,
55
            alloc_pos,
56
            segment_id,
57 1
            name: name.to_string(),
58
        }
59 1
    }
60

61 1
    pub fn allocate_internal(&mut self, allocator: &Allocator) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
62 1
        let page = self.alloc_page;
63 1
        let pos = self.alloc_pos;
64 1
        let new_pos = pos + ADDRESS_ENTRY_SIZE;
65 1
        Ok(if new_pos > ADDRESS_PAGE_SIZE {
66 1
            let mut pg = allocator.write_page(page)?;
67
            // It can happen that the last page has all entry deleted, we never free the last
68
            // page, so may be there a last page to free, but with no space, in that case is better
69
            // reuse the page directly and avoid to allocate another page and push the last page in
70
            // the back leaking it.
71 1
            if pg.empty()? {
72 0
                let prev = pg.get_prev()?;
73 0
                pg.reset()?;
74 0
                pg.set_next(0)?;
75 0
                pg.set_prev(prev)?;
76 0
                pg.set_segment_id(self.segment_id)?;
77 0
                allocator.flush_page(pg)?;
78 0
                self.alloc_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
79 0
                (RecRef::new(self.alloc_page, SEGMENT_DATA_OFFSET), None)
80
            } else {
81 1
                let mut new_pg = allocator.allocate(ADDRESS_PAGE_EXP)?;
82 1
                let new_page = new_pg.get_index();
83 1
                pg.set_next(new_page)?;
84 1
                allocator.flush_page(pg)?;
85 1
                new_pg.set_next(0)?;
86 1
                new_pg.set_prev(page)?;
87 1
                new_pg.set_segment_id(self.segment_id)?;
88 1
                allocator.flush_page(new_pg)?;
89 1
                self.alloc_page = new_page;
90 1
                self.alloc_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
91 1
                (
92 1
                    RecRef::new(self.alloc_page, SEGMENT_DATA_OFFSET),
93 1
                    Some(AllocatedSegmentPage {
94
                        new_page,
95
                        previus_page: page,
96
                    }),
97
                )
98 1
            }
99 1
        } else {
100 1
            self.alloc_pos = new_pos;
101 1
            (RecRef::new(page, pos), None)
102
        })
103 1
    }
104

105 1
    pub fn collect_segment_pages(&self, allocator: &Allocator) -> PRes<Vec<u64>> {
106 1
        let mut pages = Vec::new();
107 1
        let mut page = self.first_page;
108 1
        let last = self.persistent_page;
109 0
        loop {
110 1
            let mut pag = allocator.load_page(page)?;
111 1
            let next = pag.read_u64();
112 1
            let mut pos = SEGMENT_DATA_OFFSET;
113 1
            loop {
114 1
                pag.seek(pos);
115 1
                let data_page = pag.read_u64();
116 1
                let flag = pag.read_u8();
117 1
                if entry_exits(flag) {
118 1
                    pages.push(data_page);
119
                }
120 1
                pos += ADDRESS_ENTRY_SIZE;
121 1
                if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
122
                    break;
123
                }
124
            }
125 1
            pages.push(page);
126 1
            if page == last {
127
                break;
128
            }
129 0
            page = next;
130 0
        }
131 1
        Ok(pages)
132 1
    }
133
}
134

135
pub(crate) trait SegmentPageRead: PageOps {
136
    fn segment_read_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<Option<(u64, u16)>>;
137
    fn segment_scan_all_entries(&mut self) -> PRes<(u64, [(u32, bool); ADDRESS_PAGE_ENTRY_COUNT as usize])>;
138
    fn segment_first_available_pos(&mut self) -> PRes<u32>;
139
    fn get_next(&mut self) -> PRes<u64>;
140
    fn get_prev(&mut self) -> PRes<u64>;
141
    fn empty(&mut self) -> PRes<bool>;
142
}
143
pub(crate) trait SegmentPage: SegmentPageRead {
144
    fn segment_insert_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()>;
145
    fn segment_update_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()>;
146
    fn segment_delete_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<bool>;
147
    fn set_segment_id(&mut self, id: SegmentId) -> PRes<()>;
148
    fn set_next(&mut self, next: u64) -> PRes<()>;
149
    fn set_prev(&mut self, prev: u64) -> PRes<()>;
150
    fn recalc_count(&mut self) -> PRes<()>;
151
}
152

153 1
fn entry_exits(flags: u8) -> bool {
154 1
    flags & FLAG_EXISTS == 1 && flags & FLAG_DELETED == 0
155 1
}
156

157
impl<T: InfallibleRead + PageOps> SegmentPageRead for T {
158 1
    fn segment_read_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<Option<(u64, u16)>> {
159 1
        self.seek(SEGMENT_HASH_OFFSET);
160 1
        let persistent_id = SegmentId::read(self);
161 1
        if persistent_id != segment_id {
162 0
            return Ok(None);
163
        }
164 1
        self.seek(pos);
165 1
        let record = self.read_u64();
166 1
        let flag = self.read_u8();
167 1
        let version = self.read_u16();
168 1
        Ok(if !entry_exits(flag) || record == 0 {
169 1
            None
170
        } else {
171 1
            Some((record, version))
172
        })
173 1
    }
174

175 1
    fn segment_scan_all_entries(&mut self) -> PRes<(u64, [(u32, bool); ADDRESS_PAGE_ENTRY_COUNT as usize])> {
176 1
        let next_page = self.read_u64();
177 1
        let mut pos = SEGMENT_DATA_OFFSET;
178 1
        let mut recs = [(0, false); ADDRESS_PAGE_ENTRY_COUNT as usize];
179 1
        let mut iter = 0;
180 1
        loop {
181 1
            self.seek(pos + 8);
182 1
            let flag = self.read_u8();
183 1
            recs[iter] = (pos, flag & FLAG_EXISTS == 1);
184 1
            pos += ADDRESS_ENTRY_SIZE;
185 1
            iter += 1;
186 1
            if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
187
                break;
188
            }
189
        }
190 1
        Ok((next_page, recs))
191 1
    }
192

193 1
    fn segment_first_available_pos(&mut self) -> PRes<u32> {
194 1
        let elements = (ADDRESS_PAGE_SIZE - SEGMENT_DATA_OFFSET) / ADDRESS_ENTRY_SIZE;
195 1
        let mut pos = SEGMENT_DATA_OFFSET + (elements - 1) * ADDRESS_ENTRY_SIZE;
196 1
        loop {
197 1
            self.seek(pos + 8);
198 1
            let flag = self.read_u8();
199 1
            if entry_exits(flag) {
200 1
                pos += ADDRESS_ENTRY_SIZE;
201
                break;
202
            }
203 1
            if pos == SEGMENT_DATA_OFFSET {
204 1
                break;
205
            }
206 1
            pos -= ADDRESS_ENTRY_SIZE;
207 1
            debug_assert!(pos >= SEGMENT_DATA_OFFSET);
208
        }
209 1
        Ok(pos)
210 1
    }
211

212 1
    fn get_next(&mut self) -> PRes<u64> {
213 1
        self.seek(0);
214 1
        Ok(self.read_u64())
215 1
    }
216

217 1
    fn get_prev(&mut self) -> PRes<u64> {
218 1
        self.seek(8);
219 1
        Ok(self.read_u64())
220 1
    }
221 1
    fn empty(&mut self) -> PRes<bool> {
222 1
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET);
223 1
        Ok(self.read_u16() as u32 == ADDRESS_PAGE_ENTRY_COUNT)
224 1
    }
225
}
226

227 1
/// Returns the version following `version`, skipping 0: after `u16::MAX` the
/// sequence restarts from 1.
///
/// Uses wrapping arithmetic so that the wrap-around is reached in every build
/// profile instead of panicking on overflow in debug builds.
fn inc_version(version: u16) -> u16 {
    match version.wrapping_add(1) {
        0 => 1,
        next => next,
    }
}
235

236
impl<T: InfallibleRead + InfallibleWrite + PageOps> SegmentPage for T {
237 1
    fn segment_insert_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()> {
238 1
        debug_assert!(pos >= SEGMENT_DATA_OFFSET, "invalid page position {}", pos);
239 1
        self.seek(SEGMENT_HASH_OFFSET);
240 1
        let persistent_id = SegmentId::read(self);
241 1
        if persistent_id != segment_id {
242 0
            return Err(PersyError::SegmentNotFound);
243
        }
244
        // TODO: In case of restore this may get a wrong value, so on restore should be
245
        // re-calculated from the persistent flags.
246 1
        self.seek(pos);
247 1
        self.write_u64(record_page);
248 1
        self.write_u8(FLAG_EXISTS);
249 1
        self.write_u16(1);
250 1
        Ok(())
251 1
    }
252 1
    fn segment_update_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()> {
253 1
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
254 1
        self.seek(SEGMENT_HASH_OFFSET);
255 1
        let persistent_id = SegmentId::read(self);
256 1
        if persistent_id != segment_id {
257 0
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
258
        }
259 1
        self.seek(pos + 9);
260 1
        let version = self.read_u16();
261 1
        self.seek(pos);
262 1
        self.write_u64(record_page);
263 1
        self.seek(pos + 9);
264 1
        self.write_u16(inc_version(version));
265 1
        Ok(())
266 1
    }
267 1
    fn segment_delete_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<bool> {
268 1
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
269 1
        self.seek(SEGMENT_HASH_OFFSET);
270 1
        let persistent_id = SegmentId::read(self);
271 1
        if persistent_id != segment_id {
272 0
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
273
        }
274 1
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET);
275 1
        let count = self.read_u16() + 1;
276 1
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET);
277 1
        self.write_u16(count);
278 1
        self.seek(pos + 8);
279 1
        let flag = self.read_u8();
280 1
        self.seek(pos + 8);
281 1
        self.write_u8(flag | FLAG_DELETED);
282 1
        Ok(count as u32 == ADDRESS_PAGE_ENTRY_COUNT)
283 1
    }
284

285 1
    fn set_segment_id(&mut self, id: SegmentId) -> PRes<()> {
286 1
        self.seek(SEGMENT_HASH_OFFSET);
287 1
        id.write(self);
288 1
        Ok(())
289 1
    }
290

291 1
    fn set_next(&mut self, next: u64) -> PRes<()> {
292 1
        self.seek(0);
293 1
        self.write_u64(next);
294 1
        Ok(())
295 1
    }
296

297 1
    fn set_prev(&mut self, prev: u64) -> PRes<()> {
298 1
        self.seek(8);
299 1
        self.write_u64(prev);
300 1
        Ok(())
301 1
    }
302

303 1
    fn recalc_count(&mut self) -> PRes<()> {
304 1
        let mut pos = SEGMENT_DATA_OFFSET;
305 1
        let mut count = 0;
306 1
        loop {
307 1
            self.seek(pos + 8);
308 1
            let flag = self.read_u8();
309 1
            if flag & FLAG_DELETED == FLAG_DELETED {
310 1
                count += 1;
311
            }
312 1
            pos += ADDRESS_ENTRY_SIZE;
313 1
            if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
314
                break;
315
            }
316
        }
317 1
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET);
318 1
        self.write_u8(count as u8);
319 1
        Ok(())
320 1
    }
321
}
322

323
pub struct Segments {
324
    pub root_page: u64,
325
    content_page: u64,
326
    other_page: u64,
327
    last_flush: u8,
328
    pub segments: HashMap<SegmentId, Segment>,
329
    pub segments_id: HashMap<String, SegmentId>,
330
    pub temp_segments: HashMap<SegmentId, Segment>,
331
    pub temp_segments_id: HashMap<String, SegmentId>,
332
}
333

334 1
pub fn segment_hash(segment: &str) -> u64 {
335
    let mut val: u64;
336 1
    let hasher = &mut DefaultHasher::new();
337 1
    segment.hash(hasher);
338 1
    val = hasher.finish();
339 1
    val <<= 32;
340 1
    val |= u64::from(rand::random::<u32>());
341
    val
342 1
}
343

344
impl Segments {
345 1
    pub fn new(root_page: u64, allocator: &Arc<Allocator>) -> PRes<Segments> {
346 1
        let mut buffer_0 = [0; 19];
347 1
        let mut buffer_1 = [0; 19];
348
        let page_id;
349
        let other_page_id;
350
        let last_flush;
351
        {
352 1
            let mut root = allocator.load_page(root_page)?;
353 1
            match root.read_u8() {
354 1
                SEGMENTS_ROOT_PAGE_VERSION_0 => {
355 1
                    root.read_exact(&mut buffer_0);
356 1
                    root.read_exact(&mut buffer_1);
357 1
                    let (flush, first) = double_buffer_check(&buffer_0, &buffer_1);
358 1
                    last_flush = flush;
359 1
                    if first {
360 1
                        page_id = read_u64(&buffer_0[0..8]);
361 1
                        other_page_id = read_u64(&buffer_0[8..16]);
362
                    } else {
363 1
                        page_id = read_u64(&buffer_1[0..8]);
364 1
                        other_page_id = read_u64(&buffer_1[8..16]);
365
                    }
366
                }
367 0
                _ => panic!("version not supported"),
368
            }
369 1
        }
370 1
        let mut segments = HashMap::new();
371 1
        let mut segments_id = HashMap::new();
372 1
        if page_id != 0 {
373 1
            let mut page = allocator.load_page(page_id)?;
374 1
            loop {
375 1
                let flag = page.read_u8();
376 1
                if flag == 1 {
377 1
                    let first_page = page.read_u64();
378 1
                    let persistent_page = page.read_u64();
379 1
                    let persistent_pos = page.read_u32();
380 1
                    let pers_hash = SegmentId::read(&mut page);
381 1
                    let name_size = page.read_u16() as usize;
382

383 1
                    let mut slice: Vec<u8> = vec![0; name_size];
384 1
                    page.read_exact(&mut slice);
385 1
                    let name: String = str::from_utf8(&slice[0..name_size])?.into();
386 1
                    segments.insert(
387
                        pers_hash,
388 1
                        Segment::new(
389
                            first_page,
390
                            persistent_page,
391
                            persistent_pos,
392
                            persistent_page,
393
                            persistent_pos,
394
                            pers_hash,
395 1
                            &name,
396
                        ),
397 1
                    );
398 1
                    segments_id.insert(name, pers_hash);
399 1
                } else {
400
                    break;
401
                }
402
            }
403 1
        }
404 1
        Ok(Segments {
405
            root_page,
406 1
            content_page: page_id,
407 1
            other_page: other_page_id,
408
            last_flush,
409 1
            segments,
410 1
            segments_id,
411 1
            temp_segments: HashMap::new(),
412 1
            temp_segments_id: HashMap::new(),
413 0
        })
414 1
    }
415

416 1
    pub fn init(mut root: Page, allocator: &Allocator) -> PRes<()> {
417 1
        let mut buffer = [0; 19];
418 1
        write_u64(&mut buffer[0..8], 0);
419 1
        write_u64(&mut buffer[8..16], 0);
420 1
        write_root_page(&mut root, &mut buffer, SEGMENTS_ROOT_PAGE_VERSION, 0)?;
421 1
        allocator.flush_page(root)?;
422 1
        Ok(())
423 1
    }
424

425 1
    pub fn segment_id(&self, segment: &str) -> Option<SegmentId> {
426 1
        if let Some(id) = self.segments_id.get(segment) {
427 1
            self.segments.get(id).map(|x| x.segment_id)
428
        } else {
429 1
            None
430
        }
431 1
    }
432

433 1
    pub fn segment_by_id(&self, id: SegmentId) -> Option<&Segment> {
434 1
        self.segments.get(&id)
435 1
    }
436

437
    pub fn segment_name_by_id(&self, id: SegmentId) -> Option<String> {
438
        self.segments.get(&id).map(|s| s.name.clone())
439
    }
440

441 1
    pub fn segment_by_id_temp(&self, id: SegmentId) -> Option<&Segment> {
442 1
        self.temp_segments.get(&id)
443 1
    }
444

445 1
    pub fn has_segment(&self, segment: &str) -> bool {
446 1
        self.segments_id.contains_key(segment)
447 1
    }
448

449 1
    pub fn create_temp_segment(&mut self, allocator: &Allocator, segment: &str) -> PRes<(SegmentId, u64)> {
450 1
        let mut allocated = allocator.allocate(ADDRESS_PAGE_EXP)?;
451 1
        let allocated_id = allocated.get_index();
452 1
        let segment_id = SegmentId::new(segment_hash(segment));
453 1
        let seg = Segment::new(
454
            allocated_id,
455
            allocated_id,
456
            SEGMENT_DATA_OFFSET,
457
            allocated_id,
458
            SEGMENT_DATA_OFFSET,
459 1
            segment_id,
460
            segment,
461
        );
462 1
        self.temp_segments.insert(segment_id, seg);
463 1
        self.temp_segments_id.insert(segment.to_string(), segment_id);
464 1
        allocated.write_u64(0);
465 1
        allocated.write_u64(0);
466 1
        segment_id.write(&mut allocated);
467 1
        allocator.flush_page(allocated)?;
468 1
        Ok((segment_id, allocated_id))
469 1
    }
470

471 1
    pub fn get_temp_segment_mut(&mut self, segment: SegmentId) -> Option<&mut Segment> {
472 1
        self.temp_segments.get_mut(&segment)
473 1
    }
474

475 1
    pub fn drop_temp_segment(&mut self, allocator: &Allocator, segment: SegmentId) -> PRes<()> {
476 1
        if let Some(segment) = self.temp_segments.remove(&segment) {
477 1
            self.temp_segments_id.remove(&segment.name);
478
            //TODO: to review this, may cause disc leaks in case of crash on rollback of segment
479
            // creation
480 1
            let pages = segment.collect_segment_pages(allocator)?;
481 1
            for page in pages {
482 1
                allocator.free(page)?;
483 1
            }
484 1
        }
485 1
        Ok(())
486 1
    }
487

488 1
    pub fn exists_real_or_temp(&self, segment: SegmentId) -> bool {
489 1
        self.segments.contains_key(&segment) || self.temp_segments.contains_key(&segment)
490 1
    }
491

492 1
    pub fn create_segment(&mut self, segment: SegmentId, first_page: u64) -> PRes<()> {
493 1
        if let Some(mut s) = self.temp_segments.remove(&segment) {
494
            // This is needed mainly for recover
495 1
            s.first_page = first_page;
496 1
            self.temp_segments_id.remove(&s.name);
497 1
            self.segments_id.insert(s.name.clone(), s.segment_id);
498 1
            self.segments.insert(s.segment_id, s);
499 1
        }
500 1
        Ok(())
501 1
    }
502

503 1
    pub fn drop_segment(&mut self, segment: &str) -> PRes<()> {
504 1
        if let Some(seg) = self.segments_id.remove(segment) {
505 1
            self.segments.remove(&seg);
506
        }
507

508 1
        Ok(())
509 1
    }
510 1
    pub fn collect_segment_pages(&self, allocator: &Allocator, segment: SegmentId) -> PRes<Vec<u64>> {
511 1
        if let Some(seg) = self.segments.get(&segment) {
512 1
            seg.collect_segment_pages(allocator)
513
        } else {
514 1
            Ok(Vec::new())
515
        }
516 1
    }
517

518 1
    pub fn set_first_page(&mut self, segment: SegmentId, first_page: u64, allocator: &Allocator) -> PRes<()> {
519 1
        if let Some(seg) = self.segments.get_mut(&segment) {
520 1
            seg.first_page = first_page;
521
        }
522 1
        self.flush_segments(allocator)?;
523 1
        Ok(())
524 1
    }
525

526 1
    pub fn confirm_allocations(&mut self, segments: &[SegmentId], allocator: &Allocator, recover: bool) -> PRes<()> {
527 1
        for seg in segments {
528
            // If happen that a segment is not found something went wrong and is better crash
529 1
            let segment = if let Some(s) = self.segments.get_mut(seg) {
530 1
                s
531 1
            } else if let Some(s) = self.temp_segments.get_mut(seg) {
532
                s
533
            } else {
534 0
                panic!("segment operation when segment do not exists");
535
            };
536 1
            segment.persistent_page = segment.alloc_page;
537 1
            if recover {
538
                // In case of recover recalculate the last available position
539 1
                let mut page = allocator.load_page(segment.alloc_page)?;
540 1
                segment.alloc_pos = page.segment_first_available_pos()?;
541 1
            }
542 1
            segment.persistent_pos = segment.alloc_pos;
543
        }
544 1
        Ok(())
545 1
    }
546

547 1
    pub fn flush_segments(&mut self, allocator: &Allocator) -> PRes<()> {
548 1
        let mut buffer = Vec::<u8>::new();
549 1
        for segment in self.segments.values() {
550 1
            buffer.write_u8(1);
551 1
            buffer.write_u64(segment.first_page);
552 1
            buffer.write_u64(segment.persistent_page);
553 1
            buffer.write_u32(segment.persistent_pos);
554 1
            segment.segment_id.write(&mut buffer);
555 1
            buffer.write_u16(segment.name.len() as u16);
556 1
            buffer.write_all(segment.name.as_bytes());
557
        }
558 1
        buffer.write_u8(0);
559 1
        let exp = exp_from_content_size(buffer.len() as u64);
560 1
        let other_page_id = self.content_page;
561 1
        let mut content_page_id = self.other_page;
562
        {
563 1
            let mut content_page = if content_page_id == 0 {
564 1
                allocator.allocate(exp)?
565 0
            } else {
566 1
                let mut page = allocator.write_page(content_page_id)?;
567 1
                if page.get_size_exp() != exp {
568 1
                    allocator.free(content_page_id)?;
569 1
                    page = allocator.allocate(exp)?;
570
                }
571 1
                page
572 0
            };
573 1
            content_page_id = content_page.get_index();
574 1
            content_page.write_all(&buffer);
575 1
            allocator.flush_page(content_page)?;
576 1
        }
577

578 1
        let mut root_buffer = [0; 19];
579 1
        write_u64(&mut root_buffer[0..8], content_page_id);
580 1
        write_u64(&mut root_buffer[8..16], other_page_id);
581 1
        let mut root = allocator.write_page(self.root_page)?;
582 1
        self.last_flush = write_root_page(&mut root, &mut root_buffer, SEGMENTS_ROOT_PAGE_VERSION, self.last_flush)?;
583 1
        allocator.flush_page(root)?;
584 1
        self.content_page = content_page_id;
585 1
        self.other_page = other_page_id;
586 1
        Ok(())
587 1
    }
588

589 1
    pub fn list(&self) -> Vec<(String, SegmentId)> {
590 1
        self.segments_id.iter().map(|(n, id)| (n.clone(), *id)).collect()
591 1
    }
592 1
    pub fn snapshot_list(&self) -> Vec<(String, SegmentId, u64)> {
593 1
        self.segments
594
            .iter()
595 1
            .map(|(id, seg)| (seg.name.clone(), *id, seg.first_page))
596
            .collect()
597 1
    }
598
}
599

600
pub struct SegmentPageIterator {
601
    cur_page: u64,
602
    next_page: u64,
603
    per_page_iterator: [(u32, bool); ADDRESS_PAGE_ENTRY_COUNT as usize],
604
    iter_pos: usize,
605
    include_deleted: bool,
606
}
607

608
impl SegmentPageIterator {
609 1
    pub fn new(first_page: u64) -> SegmentPageIterator {
610 1
        SegmentPageIterator {
611
            cur_page: first_page,
612
            next_page: first_page,
613 1
            per_page_iterator: [(0, false); ADDRESS_PAGE_ENTRY_COUNT as usize],
614
            iter_pos: 0,
615
            include_deleted: false,
616
        }
617 1
    }
618 1
    pub fn snapshot(first_page: u64) -> SegmentPageIterator {
619 1
        SegmentPageIterator {
620
            cur_page: first_page,
621
            next_page: first_page,
622 1
            per_page_iterator: [(0, false); ADDRESS_PAGE_ENTRY_COUNT as usize],
623
            iter_pos: 0,
624
            include_deleted: true,
625
        }
626 1
    }
627

628 1
    pub fn next(&mut self, address: &Address) -> Option<RecRef> {
629
        // This loop is needed because some pages may be empty
630 1
        loop {
631 1
            if self.iter_pos < self.per_page_iterator.len() {
632 1
                let (pos, exists) = self.per_page_iterator[self.iter_pos];
633 1
                self.iter_pos += 1;
634 1
                if exists || self.include_deleted {
635 1
                    break Some(RecRef::new(self.cur_page, pos));
636
                } else {
637 1
                    continue;
638
                }
639 1
            } else if self.next_page != 0 {
640 1
                self.cur_page = self.next_page;
641 1
                if let Ok((next_page, elements)) = address.scan_page_all(self.cur_page) {
642 1
                    self.next_page = next_page;
643 1
                    self.per_page_iterator = elements;
644 1
                    self.iter_pos = 0;
645
                }
646 1
            } else {
647 1
                break None;
648
            }
649
        }
650 1
    }
651
}
652

653
#[cfg(test)]
mod tests {
    use super::{segment_hash, SegmentPage, SegmentPageRead, Segments};
    use crate::{
        address::ADDRESS_PAGE_EXP,
        allocator::Allocator,
        config::Config,
        discref::{DiscRef, Page, PageOps},
        id::SegmentId,
    };
    use std::sync::Arc;
    use tempfile::Builder;

    /// Creates an allocator backed by a temporary file.
    fn create_allocator(file_name: &str) -> Allocator {
        let file = Builder::new()
            .prefix(file_name)
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let config = Arc::new(Config::new());
        let disc = Box::new(DiscRef::new(file).unwrap());
        let (_, allocator) = Allocator::init(disc, &config).unwrap();
        allocator.allocate(5).unwrap(); //Just to be sure it not start from 0, it cannot happen in not test cases.

        allocator
    }

    /// Creates an allocator and an initialised segments root page on it,
    /// returning the allocator and the index of the root page.
    fn init_segments_root(file_name: &str) -> (Arc<Allocator>, u64) {
        let all = Arc::new(create_allocator(file_name));
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
        let root_index = root.get_index();
        Segments::init(root, &all).unwrap();
        (all, root_index)
    }

    #[test]
    fn test_create_drop_segment() {
        let (all, root_index) = init_segments_root("./raw_segment_create_delete.persy");
        let mut segments = Segments::new(root_index, &all).unwrap();

        let (id, fp) = segments.create_temp_segment(&all, "some").unwrap();
        segments.create_segment(id, fp).unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(segments.segments_id.contains_key("some"));
        assert!(segments.segments.contains_key(&id));
        segments.drop_segment("some").unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(!segments.segments_id.contains_key("some"));
        assert!(!segments.segments.contains_key(&id));
    }

    #[test]
    fn test_create_drop_temp_segment() {
        let (all, root_index) = init_segments_root("./segment_create_delete.persy");
        let mut segments = Segments::new(root_index, &all).unwrap();

        let (id, _) = segments.create_temp_segment(&all, "some").unwrap();
        assert!(segments.temp_segments.contains_key(&id));
        assert!(segments.temp_segments_id.contains_key("some"));
        segments.drop_temp_segment(&all, id).unwrap();
        assert!(!segments.temp_segments.contains_key(&id));
        assert!(!segments.temp_segments_id.contains_key("some"));
    }

    #[test]
    fn test_create_close_drop_close_segment() {
        let (all, root_index) = init_segments_root("./segment_pers_create_delete.persy");
        let id;
        {
            let mut segments = Segments::new(root_index, &all).unwrap();
            let (c_id, fp) = segments.create_temp_segment(&all, "some").unwrap();
            id = c_id;
            segments.create_segment(id, fp).unwrap();
            segments.flush_segments(&all).unwrap();
        }
        {
            let mut segments = Segments::new(root_index, &all).unwrap();
            assert_eq!(segments.segments.len(), 1);
            assert!(segments.segments_id.contains_key("some"));
            assert!(segments.segments.contains_key(&id));
            segments.drop_segment("some").unwrap();
            segments.flush_segments(&all).unwrap();
        }
        {
            let segments = Segments::new(root_index, &all).unwrap();
            assert!(!segments.segments_id.contains_key("some"));
            assert!(!segments.segments.contains_key(&id));
        }
    }

    #[test]
    fn test_create_close_drop_close_segment_off_page() {
        let (all, root_index) = init_segments_root("./segment_pers_create_delete_off_page.persy");
        {
            let mut segments = Segments::new(root_index, &all).unwrap();
            for i in 0..100 {
                let (id, fp) = segments.create_temp_segment(&all, &format!("some{}", i)).unwrap();
                segments.create_segment(id, fp).unwrap();
            }
            segments.flush_segments(&all).unwrap();
        }
        {
            let mut segments = Segments::new(root_index, &all).unwrap();
            for i in 0..100 {
                assert!(segments.segments_id.contains_key(&format!("some{}", i)));
                segments.drop_segment(&format!("some{}", i)).unwrap();
            }
            segments.flush_segments(&all).unwrap();
        }
        {
            let segments = Segments::new(root_index, &all).unwrap();
            for i in 0..100 {
                assert!(!segments.segments_id.contains_key(&format!("some{}", i)));
            }
        }
    }

    #[test]
    fn test_seg_insert_read_pointer() {
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        let segment_id = SegmentId::new(0);
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
        let read = page.segment_read_entry(segment_id, 30).unwrap();
        assert_eq!(read.map(|(record, _)| record), Some(10));
    }

    #[test]
    fn test_seg_insert_update_read_pointer() {
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        let segment_id = SegmentId::new(0);
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
        page.segment_update_entry(segment_id, 30, 15).unwrap();
        let read = page.segment_read_entry(segment_id, 30).unwrap();
        assert_eq!(read.map(|(record, _)| record), Some(15));
    }

    #[test]
    fn test_seg_insert_delete_read_pointer() {
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        let segment_id = SegmentId::new(0);
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
        page.segment_delete_entry(segment_id, 30).unwrap();
        let read = page.segment_read_entry(segment_id, 30).unwrap();
        assert!(read.is_none());
    }

    #[test]
    fn test_hash_id_generator() {
        assert_ne!(0, segment_hash("some"));
    }
}

Read our documentation on viewing source code .

Loading