1
use crate::{
2
    address::{
3
        Address, ADDRESS_ENTRY_SIZE, ADDRESS_PAGE_ENTRY_COUNT, ADDRESS_PAGE_EXP, ADDRESS_PAGE_SIZE, FLAG_DELETED,
4
        FLAG_EXISTS, SEGMENT_DATA_OFFSET, SEGMENT_HASH_OFFSET, SEGMENT_PAGE_DELETE_COUNT_OFFSET,
5
    },
6
    allocator::Allocator,
7
    discref::{Page, PageOps},
8
    error::{PRes, PersyError},
9
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
10
    id::{PersyId, RecRef, SegmentId},
11
    io::{read_u64, write_u64, InfallibleRead, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat},
12
    persy::exp_from_content_size,
13
};
14
use std::{
15
    collections::{hash_map::DefaultHasher, HashMap},
16
    hash::{Hash, Hasher},
17
    str,
18
    sync::Arc,
19
    vec,
20
};
21

22
pub struct AllocatedSegmentPage {
23
    pub new_page: u64,
24
    pub previus_page: u64,
25
}
26

27
pub struct Segment {
28
    pub first_page: u64,
29
    persistent_page: u64,
30
    persistent_pos: u32,
31
    pub alloc_page: u64,
32
    pub alloc_pos: u32,
33
    segment_id: SegmentId,
34
    name: String,
35
}
36

37
impl Segment {
38 1
    pub fn new(
39
        first_page: u64,
40
        persistent_page: u64,
41
        persistent_pos: u32,
42
        alloc_page: u64,
43
        alloc_pos: u32,
44
        segment_id: SegmentId,
45
        name: &str,
46
    ) -> Segment {
47 1
        Segment {
48
            first_page,
49
            persistent_page,
50
            persistent_pos,
51
            alloc_page,
52
            alloc_pos,
53
            segment_id,
54 1
            name: name.to_string(),
55
        }
56 1
    }
57

58 1
    pub fn allocate_internal(&mut self, allocator: &Allocator) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
59 1
        let page = self.alloc_page;
60 1
        let pos = self.alloc_pos;
61 1
        let new_pos = pos + ADDRESS_ENTRY_SIZE;
62 1
        Ok(if new_pos > ADDRESS_PAGE_SIZE {
63 1
            let mut pg = allocator.write_page(page)?;
64
            // It can happen that the last page has all entry deleted, we never free the last
65
            // page, so may be there a last page to free, but with no space, in that case is better
66
            // reuse the page directly and avoid to allocate another page and push the last page in
67
            // the back leaking it.
68 1
            if pg.empty()? {
69 0
                let prev = pg.get_prev()?;
70 0
                pg.reset()?;
71 0
                pg.set_next(0)?;
72 0
                pg.set_prev(prev)?;
73 0
                pg.set_segment_id(self.segment_id)?;
74 0
                allocator.flush_page(pg)?;
75 0
                self.alloc_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
76 0
                (RecRef::new(self.alloc_page, SEGMENT_DATA_OFFSET), None)
77
            } else {
78 1
                let mut new_pg = allocator.allocate(ADDRESS_PAGE_EXP)?;
79 1
                let new_page = new_pg.get_index();
80 1
                pg.set_next(new_page)?;
81 1
                allocator.flush_page(pg)?;
82 1
                new_pg.set_next(0)?;
83 1
                new_pg.set_prev(page)?;
84 1
                new_pg.set_segment_id(self.segment_id)?;
85 1
                allocator.flush_page(new_pg)?;
86 1
                self.alloc_page = new_page;
87 1
                self.alloc_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
88 1
                (
89 1
                    RecRef::new(self.alloc_page, SEGMENT_DATA_OFFSET),
90 1
                    Some(AllocatedSegmentPage {
91
                        new_page,
92
                        previus_page: page,
93
                    }),
94
                )
95 1
            }
96 1
        } else {
97 1
            self.alloc_pos = new_pos;
98 1
            (RecRef::new(page, pos), None)
99
        })
100 1
    }
101

102 1
    pub fn collect_segment_pages(&self, allocator: &Allocator) -> PRes<Vec<u64>> {
103 1
        let mut pages = Vec::new();
104 1
        let mut page = self.first_page;
105 1
        let last = self.persistent_page;
106 0
        loop {
107 1
            let mut pag = allocator.load_page(page)?;
108 1
            let next = pag.read_u64();
109 1
            let mut pos = SEGMENT_DATA_OFFSET;
110 1
            loop {
111 1
                pag.seek(pos)?;
112 1
                let data_page = pag.read_u64();
113 1
                let flag = pag.read_u8();
114 1
                if entry_exits(flag) {
115 1
                    pages.push(data_page);
116
                }
117 1
                pos += ADDRESS_ENTRY_SIZE;
118 1
                if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
119
                    break;
120
                }
121
            }
122 1
            pages.push(page);
123 1
            if page == last {
124
                break;
125
            }
126 0
            page = next;
127 0
        }
128 1
        Ok(pages)
129 1
    }
130
}
131

132
pub(crate) trait SegmentPageRead: PageOps {
133
    fn segment_read_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<Option<(u64, u16)>>;
134
    fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)>;
135
    fn segment_scan_all_entries(&mut self) -> PRes<(u64, Vec<(u32, bool)>)>;
136
    fn segment_first_available_pos(&mut self) -> PRes<u32>;
137
    fn get_next(&mut self) -> PRes<u64>;
138
    fn get_prev(&mut self) -> PRes<u64>;
139
    fn empty(&mut self) -> PRes<bool>;
140
}
141
pub(crate) trait SegmentPage: SegmentPageRead {
142
    fn segment_insert_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()>;
143
    fn segment_update_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()>;
144
    fn segment_delete_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<bool>;
145
    fn set_segment_id(&mut self, id: SegmentId) -> PRes<()>;
146
    fn set_next(&mut self, next: u64) -> PRes<()>;
147
    fn set_prev(&mut self, prev: u64) -> PRes<()>;
148
    fn recalc_count(&mut self) -> PRes<()>;
149
}
150

151 1
fn entry_exits(flags: u8) -> bool {
152 1
    flags & FLAG_EXISTS == 1 && flags & FLAG_DELETED == 0
153 1
}
154

155
impl<T: InfallibleRead + PageOps> SegmentPageRead for T {
156 1
    fn segment_read_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<Option<(u64, u16)>> {
157 1
        self.seek(SEGMENT_HASH_OFFSET)?;
158 1
        let persistent_id = SegmentId::read(self);
159 1
        if persistent_id != segment_id {
160 0
            return Ok(None);
161
        }
162 1
        self.seek(pos)?;
163 1
        let record = self.read_u64();
164 1
        let flag = self.read_u8();
165 1
        let version = self.read_u16();
166 1
        Ok(if !entry_exits(flag) || record == 0 {
167 1
            None
168
        } else {
169 1
            Some((record, version))
170
        })
171 1
    }
172

173 1
    fn segment_scan_all_entries(&mut self) -> PRes<(u64, Vec<(u32, bool)>)> {
174 1
        let next_page = self.read_u64();
175 1
        let mut pos = SEGMENT_DATA_OFFSET;
176 1
        let mut recs = Vec::new();
177 1
        loop {
178 1
            self.seek(pos + 8)?;
179 1
            let flag = self.read_u8();
180 1
            recs.push((pos, flag & FLAG_EXISTS == 1));
181 1
            pos += ADDRESS_ENTRY_SIZE;
182 1
            if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
183
                break;
184
            }
185
        }
186 1
        Ok((next_page, recs))
187 1
    }
188

189 1
    fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)> {
190 1
        let next_page = self.read_u64();
191 1
        let mut pos = SEGMENT_DATA_OFFSET;
192 1
        let mut recs = Vec::new();
193 1
        loop {
194 1
            self.seek(pos + 8)?;
195 1
            let flag = self.read_u8();
196 1
            if entry_exits(flag) {
197 1
                recs.push(pos);
198
            }
199 1
            pos += ADDRESS_ENTRY_SIZE;
200 1
            if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
201
                break;
202
            }
203
        }
204 1
        Ok((next_page, recs))
205 1
    }
206

207 1
    fn segment_first_available_pos(&mut self) -> PRes<u32> {
208 1
        let elements = (ADDRESS_PAGE_SIZE - SEGMENT_DATA_OFFSET) / ADDRESS_ENTRY_SIZE;
209 1
        let mut pos = SEGMENT_DATA_OFFSET + (elements - 1) * ADDRESS_ENTRY_SIZE;
210 1
        loop {
211 1
            self.seek(pos + 8)?;
212 1
            let flag = self.read_u8();
213 1
            if entry_exits(flag) {
214 1
                pos += ADDRESS_ENTRY_SIZE;
215
                break;
216
            }
217 1
            if pos == SEGMENT_DATA_OFFSET {
218 1
                break;
219
            }
220 1
            pos -= ADDRESS_ENTRY_SIZE;
221 1
            debug_assert!(pos >= SEGMENT_DATA_OFFSET);
222
        }
223 1
        Ok(pos)
224 1
    }
225

226 1
    fn get_next(&mut self) -> PRes<u64> {
227 1
        self.seek(0)?;
228 1
        Ok(self.read_u64())
229 1
    }
230

231 1
    fn get_prev(&mut self) -> PRes<u64> {
232 1
        self.seek(8)?;
233 1
        Ok(self.read_u64())
234 1
    }
235 1
    fn empty(&mut self) -> PRes<bool> {
236 1
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET)?;
237 1
        Ok(self.read_u16() as u32 == ADDRESS_PAGE_ENTRY_COUNT)
238 1
    }
239
}
240

241 1
fn inc_version(mut version: u16) -> u16 {
242 1
    version += 1;
243 1
    if version == 0 {
244 0
        1
245
    } else {
246 1
        version
247
    }
248 1
}
249

250
impl<T: InfallibleRead + InfallibleWrite + PageOps> SegmentPage for T {
251 1
    fn segment_insert_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()> {
252 1
        debug_assert!(pos >= SEGMENT_DATA_OFFSET, "invalid page position {}", pos);
253 1
        self.seek(SEGMENT_HASH_OFFSET)?;
254 1
        let persistent_id = SegmentId::read(self);
255 1
        if persistent_id != segment_id {
256 0
            return Err(PersyError::SegmentNotFound);
257
        }
258
        // TODO: In case of restore this may get a wrong value, so on restore should be
259
        // re-calculated from the persistent flags.
260 1
        self.seek(pos)?;
261 1
        self.write_u64(record_page);
262 1
        self.write_u8(FLAG_EXISTS);
263 1
        self.write_u16(1);
264 1
        Ok(())
265 1
    }
266 1
    fn segment_update_entry(&mut self, segment_id: SegmentId, pos: u32, record_page: u64) -> PRes<()> {
267 1
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
268 1
        self.seek(SEGMENT_HASH_OFFSET)?;
269 1
        let persistent_id = SegmentId::read(self);
270 1
        if persistent_id != segment_id {
271 0
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
272
        }
273 1
        self.seek(pos + 9)?;
274 1
        let version = self.read_u16();
275 1
        self.seek(pos)?;
276 1
        self.write_u64(record_page);
277 1
        self.seek(pos + 9)?;
278 1
        self.write_u16(inc_version(version));
279 1
        Ok(())
280 1
    }
281 1
    fn segment_delete_entry(&mut self, segment_id: SegmentId, pos: u32) -> PRes<bool> {
282 1
        debug_assert!(pos >= SEGMENT_DATA_OFFSET);
283 1
        self.seek(SEGMENT_HASH_OFFSET)?;
284 1
        let persistent_id = SegmentId::read(self);
285 1
        if persistent_id != segment_id {
286 0
            return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
287
        }
288 1
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET)?;
289 1
        let count = self.read_u16() + 1;
290 1
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET)?;
291 1
        self.write_u16(count);
292 1
        self.seek(pos + 8)?;
293 1
        let flag = self.read_u8();
294 1
        self.seek(pos + 8)?;
295 1
        self.write_u8(flag | FLAG_DELETED);
296 1
        Ok(count as u32 == ADDRESS_PAGE_ENTRY_COUNT)
297 1
    }
298

299 1
    fn set_segment_id(&mut self, id: SegmentId) -> PRes<()> {
300 1
        self.seek(SEGMENT_HASH_OFFSET)?;
301 1
        id.write(self);
302 1
        Ok(())
303 1
    }
304

305 1
    fn set_next(&mut self, next: u64) -> PRes<()> {
306 1
        self.seek(0)?;
307 1
        self.write_u64(next);
308 1
        Ok(())
309 1
    }
310

311 1
    fn set_prev(&mut self, prev: u64) -> PRes<()> {
312 1
        self.seek(8)?;
313 1
        self.write_u64(prev);
314 1
        Ok(())
315 1
    }
316

317 1
    fn recalc_count(&mut self) -> PRes<()> {
318 1
        let mut pos = SEGMENT_DATA_OFFSET;
319 1
        let mut count = 0;
320 1
        loop {
321 1
            self.seek(pos + 8)?;
322 1
            let flag = self.read_u8();
323 1
            if flag & FLAG_DELETED == FLAG_DELETED {
324 1
                count += 1;
325
            }
326 1
            pos += ADDRESS_ENTRY_SIZE;
327 1
            if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
328
                break;
329
            }
330
        }
331 1
        self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET)?;
332 1
        self.write_u8(count as u8);
333 1
        Ok(())
334 1
    }
335
}
336

337
pub struct Segments {
338
    pub root_page: u64,
339
    content_page: u64,
340
    other_page: u64,
341
    last_flush: u8,
342
    pub segments: HashMap<SegmentId, Segment>,
343
    pub segments_id: HashMap<String, SegmentId>,
344
    pub temp_segments: HashMap<SegmentId, Segment>,
345
    pub temp_segments_id: HashMap<String, SegmentId>,
346
}
347

348 1
pub fn segment_hash(segment: &str) -> u32 {
349
    let mut val: u32;
350 1
    let hasher = &mut DefaultHasher::new();
351 1
    segment.hash(hasher);
352 1
    val = hasher.finish() as u32;
353 1
    val <<= 16;
354 1
    val |= u32::from(rand::random::<u16>());
355
    val
356 1
}
357

358
impl Segments {
359 1
    pub fn new(root_page: u64, allocator: &Arc<Allocator>) -> PRes<Segments> {
360 1
        let mut buffer_0 = [0; 19];
361 1
        let mut buffer_1 = [0; 19];
362
        let page_id;
363
        let other_page_id;
364
        let last_flush;
365
        {
366 1
            let mut root = allocator.load_page(root_page)?;
367 1
            root.read_exact(&mut buffer_0);
368 1
            root.read_exact(&mut buffer_1);
369 1
            let (flush, first) = double_buffer_check(&buffer_0, &buffer_1);
370 1
            last_flush = flush;
371 1
            if first {
372 1
                page_id = read_u64(&buffer_0[0..8]);
373 1
                other_page_id = read_u64(&buffer_0[8..16]);
374
            } else {
375 1
                page_id = read_u64(&buffer_1[0..8]);
376 1
                other_page_id = read_u64(&buffer_1[8..16]);
377
            }
378 1
        }
379 1
        let mut segments = HashMap::new();
380 1
        let mut segments_id = HashMap::new();
381 1
        if page_id != 0 {
382 1
            let mut page = allocator.load_page(page_id)?;
383 1
            loop {
384 1
                let flag = page.read_u8();
385 1
                if flag == 1 {
386 1
                    let first_page = page.read_u64();
387 1
                    let persistent_page = page.read_u64();
388 1
                    let persistent_pos = page.read_u32();
389 1
                    let pers_hash = SegmentId::new(page.read_u32());
390 1
                    let name_size = page.read_u16() as usize;
391

392 1
                    let mut slice: Vec<u8> = vec![0; name_size];
393 1
                    page.read_exact(&mut slice);
394 1
                    let name: String = str::from_utf8(&slice[0..name_size])?.into();
395 1
                    segments.insert(
396
                        pers_hash,
397 1
                        Segment::new(
398
                            first_page,
399
                            persistent_page,
400
                            persistent_pos,
401
                            persistent_page,
402
                            persistent_pos,
403
                            pers_hash,
404 1
                            &name,
405
                        ),
406 1
                    );
407 1
                    segments_id.insert(name, pers_hash);
408 1
                } else {
409
                    break;
410
                }
411
            }
412 1
        }
413 1
        Ok(Segments {
414
            root_page,
415 1
            content_page: page_id,
416 1
            other_page: other_page_id,
417
            last_flush,
418 1
            segments,
419 1
            segments_id,
420 1
            temp_segments: HashMap::new(),
421 1
            temp_segments_id: HashMap::new(),
422 0
        })
423 1
    }
424

425 1
    pub fn init(mut root: Page, allocator: &Allocator) -> PRes<()> {
426 1
        let mut buffer = [0; 19];
427 1
        write_u64(&mut buffer[0..8], 0);
428 1
        write_u64(&mut buffer[8..16], 0);
429 1
        prepare_buffer_flush(&mut buffer, 0);
430 1
        root.write_all(&buffer);
431 1
        allocator.flush_page(root)?;
432 1
        Ok(())
433 1
    }
434

435 1
    pub fn segment_id(&self, segment: &str) -> Option<SegmentId> {
436 1
        if let Some(id) = self.segments_id.get(segment) {
437 1
            self.segments.get(id).map(|x| x.segment_id)
438
        } else {
439 1
            None
440
        }
441 1
    }
442

443 1
    pub fn segment_by_id(&self, id: SegmentId) -> Option<&Segment> {
444 1
        self.segments.get(&id)
445 1
    }
446

447
    pub fn segment_name_by_id(&self, id: SegmentId) -> Option<String> {
448
        self.segments.get(&id).map(|s| s.name.clone())
449
    }
450

451 1
    pub fn segment_by_id_temp(&self, id: SegmentId) -> Option<&Segment> {
452 1
        self.temp_segments.get(&id)
453 1
    }
454

455 1
    pub fn has_segment(&self, segment: &str) -> bool {
456 1
        self.segments_id.contains_key(segment)
457 1
    }
458

459 1
    pub fn create_temp_segment(&mut self, allocator: &Allocator, segment: &str) -> PRes<(SegmentId, u64)> {
460 1
        let mut allocated = allocator.allocate(ADDRESS_PAGE_EXP)?;
461 1
        let allocated_id = allocated.get_index();
462 1
        let segment_id = SegmentId::new(segment_hash(segment));
463 1
        let seg = Segment::new(
464
            allocated_id,
465
            allocated_id,
466
            SEGMENT_DATA_OFFSET,
467
            allocated_id,
468
            SEGMENT_DATA_OFFSET,
469 1
            segment_id,
470
            segment,
471
        );
472 1
        self.temp_segments.insert(segment_id, seg);
473 1
        self.temp_segments_id.insert(segment.to_string(), segment_id);
474 1
        allocated.write_u64(0);
475 1
        allocated.write_u64(0);
476 1
        segment_id.write(&mut allocated);
477 1
        allocator.flush_page(allocated)?;
478 1
        Ok((segment_id, allocated_id))
479 1
    }
480

481 1
    pub fn get_temp_segment_mut(&mut self, segment: SegmentId) -> Option<&mut Segment> {
482 1
        self.temp_segments.get_mut(&segment)
483 1
    }
484

485 1
    pub fn drop_temp_segment(&mut self, allocator: &Allocator, segment: SegmentId) -> PRes<()> {
486 1
        if let Some(segment) = self.temp_segments.remove(&segment) {
487 1
            self.temp_segments_id.remove(&segment.name);
488
            //TODO: to review this, may cause disc leaks in case of crash on rollback of segment
489
            // creation
490 1
            let pages = segment.collect_segment_pages(allocator)?;
491 1
            for page in pages {
492 1
                allocator.free(page)?;
493 1
            }
494 1
        }
495 1
        Ok(())
496 1
    }
497

498 1
    pub fn exists_real_or_temp(&self, segment: SegmentId) -> bool {
499 1
        self.segments.contains_key(&segment) || self.temp_segments.contains_key(&segment)
500 1
    }
501

502 1
    pub fn create_segment(&mut self, segment: SegmentId, first_page: u64) -> PRes<()> {
503 1
        if let Some(mut s) = self.temp_segments.remove(&segment) {
504
            // This is needed mainly for recover
505 1
            s.first_page = first_page;
506 1
            self.temp_segments_id.remove(&s.name);
507 1
            self.segments_id.insert(s.name.clone(), s.segment_id);
508 1
            self.segments.insert(s.segment_id, s);
509 1
        }
510 1
        Ok(())
511 1
    }
512

513 1
    pub fn drop_segment(&mut self, segment: &str) -> PRes<()> {
514 1
        if let Some(seg) = self.segments_id.remove(segment) {
515 1
            self.segments.remove(&seg);
516
        }
517

518 1
        Ok(())
519 1
    }
520 1
    pub fn collect_segment_pages(&self, allocator: &Allocator, segment: SegmentId) -> PRes<Vec<u64>> {
521 1
        if let Some(seg) = self.segments.get(&segment) {
522 1
            seg.collect_segment_pages(allocator)
523
        } else {
524 1
            Ok(Vec::new())
525
        }
526 1
    }
527

528 1
    pub fn set_first_page(&mut self, segment: SegmentId, first_page: u64, allocator: &Allocator) -> PRes<()> {
529 1
        if let Some(seg) = self.segments.get_mut(&segment) {
530 1
            seg.first_page = first_page;
531
        }
532 1
        self.flush_segments(allocator)?;
533 1
        Ok(())
534 1
    }
535

536 1
    pub fn confirm_allocations(&mut self, segments: &[SegmentId], allocator: &Allocator, recover: bool) -> PRes<()> {
537 1
        for seg in segments {
538
            // If happen that a segment is not found something went wrong and is better crash
539 1
            let segment = if let Some(s) = self.segments.get_mut(seg) {
540 1
                s
541 1
            } else if let Some(s) = self.temp_segments.get_mut(seg) {
542
                s
543
            } else {
544 0
                panic!("segment operation when segment do not exists");
545
            };
546 1
            segment.persistent_page = segment.alloc_page;
547 1
            if recover {
548
                // In case of recover recalculate the last available position
549 1
                let mut page = allocator.load_page(segment.alloc_page)?;
550 1
                segment.alloc_pos = page.segment_first_available_pos()?;
551 1
            }
552 1
            segment.persistent_pos = segment.alloc_pos;
553
        }
554 1
        Ok(())
555 1
    }
556

557 1
    pub fn flush_segments(&mut self, allocator: &Allocator) -> PRes<()> {
558 1
        let mut buffer = Vec::<u8>::new();
559 1
        for segment in self.segments.values() {
560 1
            buffer.write_u8(1);
561 1
            buffer.write_u64(segment.first_page);
562 1
            buffer.write_u64(segment.persistent_page);
563 1
            buffer.write_u32(segment.persistent_pos);
564 1
            segment.segment_id.write(&mut buffer);
565 1
            buffer.write_u16(segment.name.len() as u16);
566 1
            buffer.write_all(segment.name.as_bytes());
567
        }
568 1
        buffer.write_u8(0);
569 1
        let exp = exp_from_content_size(buffer.len() as u64);
570 1
        let other_page_id = self.content_page;
571 1
        let mut content_page_id = self.other_page;
572
        {
573 1
            let mut content_page = if content_page_id == 0 {
574 1
                allocator.allocate(exp)?
575 0
            } else {
576 1
                let mut page = allocator.write_page(content_page_id)?;
577 1
                if page.get_size_exp() != exp {
578 1
                    allocator.free(content_page_id)?;
579 1
                    page = allocator.allocate(exp)?;
580
                }
581 1
                page
582 0
            };
583 1
            content_page_id = content_page.get_index();
584 1
            content_page.write_all(&buffer);
585 1
            allocator.flush_page(content_page)?;
586 1
        }
587

588 1
        let mut root_buffer = [0; 19];
589 1
        write_u64(&mut root_buffer[0..8], content_page_id);
590 1
        write_u64(&mut root_buffer[8..16], other_page_id);
591 1
        let (last_flush, offset) = prepare_buffer_flush(&mut root_buffer, self.last_flush);
592 1
        self.last_flush = last_flush;
593
        {
594 1
            let mut root = allocator.write_page(self.root_page)?;
595 1
            root.seek(offset)?;
596 1
            root.write_all(&root_buffer);
597 1
            allocator.flush_page(root)?;
598 1
        }
599 1
        self.content_page = content_page_id;
600 1
        self.other_page = other_page_id;
601 1
        Ok(())
602 1
    }
603

604 1
    pub fn list(&self) -> Vec<(String, SegmentId)> {
605 1
        self.segments_id.iter().map(|(n, id)| (n.clone(), *id)).collect()
606 1
    }
607 1
    pub fn snapshot_list(&self) -> Vec<(String, SegmentId, u64)> {
608 1
        self.segments
609
            .iter()
610 1
            .map(|(id, seg)| (seg.name.clone(), *id, seg.first_page))
611
            .collect()
612 1
    }
613
}
614

615
pub struct SegmentPageIterator {
616
    cur_page: u64,
617
    next_page: u64,
618
    per_page_iterator: vec::IntoIter<u32>,
619
}
620

621
impl SegmentPageIterator {
622 1
    pub fn new(first_page: u64) -> SegmentPageIterator {
623 1
        SegmentPageIterator {
624
            cur_page: first_page,
625
            next_page: first_page,
626 1
            per_page_iterator: Vec::new().into_iter(),
627
        }
628 1
    }
629

630 1
    pub fn next(&mut self, address: &Address) -> Option<RecRef> {
631
        // This loop is needed because some pages may be empty
632 1
        loop {
633 1
            let iter = self.per_page_iterator.next();
634 1
            if iter.is_none() && self.next_page != 0 {
635 1
                self.cur_page = self.next_page;
636 1
                if let Ok((next_page, elements)) = address.scan_page(self.cur_page) {
637 1
                    self.next_page = next_page;
638 1
                    self.per_page_iterator = elements.into_iter();
639 1
                    continue;
640
                }
641 1
            }
642 1
            break iter;
643
        }
644 1
        .map(|pos| RecRef::new(self.cur_page, pos))
645 1
    }
646
}
647

648
pub struct SnapshotSegmentPageIterator {
649
    cur_page: u64,
650
    next_page: u64,
651
    per_page_iterator: vec::IntoIter<(u32, bool)>,
652
}
653

654
impl SnapshotSegmentPageIterator {
655 1
    pub fn new(first_page: u64) -> SnapshotSegmentPageIterator {
656 1
        SnapshotSegmentPageIterator {
657
            cur_page: first_page,
658
            next_page: first_page,
659 1
            per_page_iterator: Vec::new().into_iter(),
660
        }
661 1
    }
662

663 1
    pub fn next(&mut self, address: &Address) -> Option<RecRef> {
664
        // This loop is needed because some pages may be empty
665 1
        loop {
666 1
            let iter = self.per_page_iterator.next();
667 1
            if iter.is_none() && self.next_page != 0 {
668 1
                self.cur_page = self.next_page;
669 1
                if let Ok((next_page, elements)) = address.scan_page_all(self.cur_page) {
670 1
                    self.next_page = next_page;
671 1
                    self.per_page_iterator = elements.into_iter();
672 1
                    continue;
673
                }
674 1
            }
675 1
            break iter;
676
        }
677 1
        .map(|(pos, _exist)| RecRef::new(self.cur_page, pos))
678 1
    }
679
}
680

681
#[cfg(test)]
682
mod tests {
683
    use super::{segment_hash, SegmentPage, SegmentPageRead, Segments};
684
    use crate::{
685
        address::ADDRESS_PAGE_EXP,
686
        allocator::Allocator,
687
        config::Config,
688
        discref::{DiscRef, Page, PageOps},
689
        id::SegmentId,
690
    };
691
    use std::sync::Arc;
692
    use tempfile::Builder;
693

694 1
    fn create_allocator(file_name: &str) -> Allocator {
695 1
        let file = Builder::new()
696
            .prefix(file_name)
697
            .suffix(".persy")
698
            .tempfile()
699
            .unwrap()
700
            .reopen()
701 1
            .unwrap();
702 1
        let config = Arc::new(Config::new());
703 1
        let disc = Box::new(DiscRef::new(file).unwrap());
704 1
        let (_, allocator) = Allocator::init(disc, &config).unwrap();
705 1
        allocator.allocate(5).unwrap(); //Just to be sure it not start from 0, it cannot happen in not test cases.
706

707
        allocator
708 1
    }
709

710
    #[test]
711 1
    fn test_create_drop_segment() {
712 1
        let allocator = create_allocator("./raw_segment_create_delete.persy");
713 1
        let all = Arc::new(allocator);
714 1
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
715 1
        let root_index = root.get_index();
716 1
        Segments::init(root, &all).unwrap();
717 1
        let mut segments = Segments::new(root_index, &all).unwrap();
718

719 1
        let (id, fp) = segments.create_temp_segment(&all, "some").unwrap();
720 1
        segments.create_segment(id, fp).unwrap();
721 1
        segments.flush_segments(&all).unwrap();
722 1
        assert!(segments.segments_id.contains_key("some"));
723 1
        assert!(segments.segments.contains_key(&id));
724 1
        segments.drop_segment("some").unwrap();
725 1
        segments.flush_segments(&all).unwrap();
726 1
        assert!(!segments.segments_id.contains_key("some"));
727 1
        assert!(!segments.segments.contains_key(&id));
728 1
    }
729

730
    #[test]
731 1
    fn test_create_drop_temp_segment() {
732 1
        let allocator = create_allocator("./segment_create_delete.persy");
733 1
        let all = Arc::new(allocator);
734 1
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
735 1
        let root_index = root.get_index();
736 1
        Segments::init(root, &all).unwrap();
737 1
        let mut segments = Segments::new(root_index, &all).unwrap();
738

739 1
        let (id, _) = segments.create_temp_segment(&all, "some").unwrap();
740 1
        assert!(segments.temp_segments.contains_key(&id));
741 1
        assert!(segments.temp_segments_id.contains_key("some"));
742 1
        segments.drop_temp_segment(&all, id).unwrap();
743 1
        assert!(!segments.temp_segments.contains_key(&id));
744 1
        assert!(!segments.temp_segments_id.contains_key("some"));
745 1
    }
746

747
    #[test]
748 1
    fn test_create_close_drop_close_segment() {
749 1
        let allocator = create_allocator("./segment_pers_create_delete.persy");
750 1
        let all = Arc::new(allocator);
751 1
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
752 1
        let root_index = root.get_index();
753 1
        Segments::init(root, &all).unwrap();
754
        let id;
755
        {
756 1
            let mut segments = Segments::new(root_index, &all).unwrap();
757 1
            let (c_id, fp) = segments.create_temp_segment(&all, "some").unwrap();
758 1
            id = c_id;
759 1
            segments.create_segment(id, fp).unwrap();
760 1
            segments.flush_segments(&all).unwrap();
761 1
        }
762
        {
763 1
            let mut segments = Segments::new(root_index, &all).unwrap();
764 1
            assert_eq!(segments.segments.len(), 1);
765 1
            assert!(segments.segments_id.contains_key("some"));
766 1
            assert!(segments.segments.contains_key(&id));
767 1
            segments.drop_segment("some").unwrap();
768 1
            segments.flush_segments(&all).unwrap();
769 1
        }
770
        {
771 1
            let segments = Segments::new(root_index, &all).unwrap();
772 1
            assert!(!segments.segments_id.contains_key("some"));
773 1
            assert!(!segments.segments.contains_key(&id));
774 1
        }
775 1
    }
776

777
    #[test]
778 1
    fn test_create_close_drop_close_segment_off_page() {
779 1
        let allocator = create_allocator("./segment_pers_create_delete_off_page.persy");
780 1
        let all = Arc::new(allocator);
781 1
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
782 1
        let root_index = root.get_index();
783 1
        Segments::init(root, &all).unwrap();
784
        {
785 1
            let mut segments = Segments::new(root_index, &all).unwrap();
786 1
            for i in 0..100 {
787 1
                let (id, fp) = segments.create_temp_segment(&all, &format!("some{}", i)).unwrap();
788 1
                segments.create_segment(id, fp).unwrap();
789
            }
790 1
            segments.flush_segments(&all).unwrap();
791 1
        }
792
        {
793 1
            let mut segments = Segments::new(root_index, &all).unwrap();
794 1
            for i in 0..100 {
795 1
                assert!(segments.segments_id.contains_key(&format!("some{}", i)));
796 1
                segments.drop_segment(&format!("some{}", i)).unwrap();
797
            }
798 1
            segments.flush_segments(&all).unwrap();
799 1
        }
800
        {
801 1
            let segments = Segments::new(root_index, &all).unwrap();
802 1
            for i in 0..100 {
803 1
                assert!(!segments.segments_id.contains_key(&format!("some{}", i)));
804
            }
805 1
        }
806 1
    }
807

808
    #[test]
809 1
    fn test_seg_insert_read_pointer() {
810 1
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
811 1
        let segment_id = SegmentId::new(0);
812 1
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
813 1
        let read = page.segment_read_entry(segment_id, 30).unwrap();
814 0
        match read {
815 1
            Some(val) => assert_eq!(val.0, 10),
816 0
            None => assert!(false),
817
        }
818 1
    }
819

820
    #[test]
821 1
    fn test_seg_insert_update_read_pointer() {
822 1
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
823 1
        let segment_id = SegmentId::new(0);
824 1
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
825 1
        page.segment_update_entry(segment_id, 30, 15).unwrap();
826 1
        let read = page.segment_read_entry(segment_id, 30).unwrap();
827 0
        match read {
828 1
            Some(val) => assert_eq!(val.0, 15),
829 0
            None => assert!(false),
830
        }
831 1
    }
832

833
    #[test]
834 1
    fn test_seg_insert_delete_read_pointer() {
835 1
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
836 1
        let segment_id = SegmentId::new(0);
837 1
        page.segment_insert_entry(segment_id, 30, 10).unwrap();
838 1
        page.segment_delete_entry(segment_id, 30).unwrap();
839 1
        let read = page.segment_read_entry(segment_id, 30).unwrap();
840 0
        match read {
841 1
            Some(_) => assert!(false),
842
            None => assert!(true),
843
        }
844 1
    }
845

846
    #[test]
847 1
    fn test_hash_id_generator() {
848 1
        assert!(0 != segment_hash("some"));
849 1
    }
850
}

Read our documentation on viewing source code .

Loading