1
use crate::transaction::{
2
    Cleanup, Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata, NewSegmentPage,
3
    PrepareCommit, ReadRecord, Rollback, Transaction, UpdateRecord,
4
};
5
use crate::{
6
    allocator::Allocator,
7
    config::TxStrategy,
8
    discref::{Page, PageOps, ReadPage, PAGE_METADATA_SIZE},
9
    error::PRes,
10
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
11
    id::SegmentId,
12
    io::{
13
        read_u64, write_u64, InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite,
14
        InfallibleWriteFormat, InfallibleWriteVarInt,
15
    },
16
    persy::RecoverStatus,
17
};
18
use std::{
19
    collections::hash_map::HashMap,
20
    str,
21
    sync::{Arc, Mutex, MutexGuard},
22
};
23

24
pub const JOURNAL_PAGE_EXP: u8 = 10; // 2^10
25
const JOURNAL_PAGE_SIZE: u32 = (1 << JOURNAL_PAGE_EXP) - PAGE_METADATA_SIZE; // 2^ 10 -2 size - page header
26
const JOURNAL_PAGE_NEXT_OFFSET: u32 = 0;
27
const JOURNAL_PAGE_PREV_OFFSET: u32 = 8;
28
const JOURNAL_PAGE_CONTENT_OFFSET: u32 = 16;
29

30
struct StartListEntry {
31
    next: Option<JournalId>,
32
    prev: Option<JournalId>,
33
}
34

35
impl StartListEntry {
36 1
    pub fn new(prev: Option<JournalId>) -> StartListEntry {
37 1
        StartListEntry { next: None, prev }
38 1
    }
39
}
40

41
struct StartList {
42
    transactions: HashMap<JournalId, StartListEntry>,
43
    last: Option<JournalId>,
44
}
45

46
impl StartList {
47 1
    fn new() -> StartList {
48 1
        StartList {
49 1
            transactions: HashMap::new(),
50 1
            last: None,
51
        }
52 1
    }
53

54 1
    fn push(&mut self, id: &JournalId) {
55 1
        self.transactions
56 1
            .insert(id.clone(), StartListEntry::new(self.last.clone()));
57 1
        if let Some(ref lst) = self.last {
58 1
            self.transactions.get_mut(lst).unwrap().next = Some(id.clone());
59
        }
60 1
        self.last = Some(id.clone());
61 1
    }
62

63 1
    fn remove(&mut self, id: &JournalId) -> bool {
64 1
        if let Some(entry) = self.transactions.remove(id) {
65 1
            if let Some(ref next) = entry.next {
66 1
                self.transactions.get_mut(next).unwrap().prev = entry.prev.clone();
67
            }
68 1
            if let Some(ref prev) = entry.prev {
69 1
                self.transactions.get_mut(prev).unwrap().next = entry.next.clone();
70
            }
71 1
            if let Some(ref l) = self.last {
72 1
                if l == id {
73 1
                    self.last = entry.prev.clone();
74
                }
75
            }
76 1
            entry.prev.is_none()
77
        } else {
78 1
            false
79
        }
80 1
    }
81
}
82

83
struct JournalPagesToFree {
84
    free_tx_id: JournalId,
85
    pages: Vec<u64>,
86
}
87

88
struct JournalShared {
89
    root: u64,
90
    first_page: u64,
91
    last_page: u64,
92
    last_pos: u32,
93
    starts: StartList,
94
    current: Page,
95
    last_flush: u8,
96
    to_clear: Vec<JournalId>,
97
    to_free: Option<JournalPagesToFree>,
98
}
99

100
/// Journal segment is the area where the transactional log is kept
101
pub struct Journal {
102
    allocator: Arc<Allocator>,
103
    journal: Mutex<JournalShared>,
104
}
105

106
pub(crate) trait JournalEntry {
107
    fn get_type(&self) -> u8;
108
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()>;
109
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()>;
110
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus>;
111
}
112

113
/// Journal entry marking the start of a transaction; it carries no payload.
pub struct Start {}
114

115 1
/// Identifier of a journal transaction: the location of its start entry in the journal.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct JournalId {
    /// Index of the journal page holding the start entry.
    page: u64,
    /// Position of the start entry inside that page.
    pos: u32,
}
120

121 1
fn recover_entry<T>(entry: &mut dyn JournalEntry, page: &mut ReadPage, found: &mut T, id: &JournalId) -> PRes<()>
122
where
123
    T: FnMut(&dyn JournalEntry, &JournalId),
124
{
125 1
    entry.read(page)?;
126 1
    found(entry, id);
127 1
    Ok(())
128 1
}
129

130
impl Journal {
131 1
    pub fn new(all: &Arc<Allocator>, page: u64) -> PRes<Journal> {
132 1
        let first_page;
133
        let last_flush;
134 1
        let mut buffer_0 = [0; 11];
135 1
        let mut buffer_1 = [0; 11];
136

137
        let current;
138
        {
139 1
            let mut page = all.load_page(page)?;
140 1
            page.read_exact(&mut buffer_0);
141 1
            page.read_exact(&mut buffer_1);
142 1
            let (last_flush_ret, first) = double_buffer_check(&buffer_0, &buffer_1);
143 1
            last_flush = last_flush_ret;
144 1
            first_page = read_u64(if first { &buffer_0[0..8] } else { &buffer_1[0..8] });
145 1
        }
146 1
        current = if first_page != 0 {
147 1
            all.write_page(first_page)?
148 0
        } else {
149
            // Empty 0 sized page
150 0
            Page::new(Vec::new(), 0, 0, 0)
151
        };
152 1
        let journal = JournalShared {
153 1
            root: page,
154 1
            first_page,
155 1
            last_page: first_page,
156
            last_pos: 0,
157 1
            starts: StartList::new(),
158 1
            current,
159
            last_flush,
160 1
            to_clear: Vec::new(),
161 1
            to_free: None,
162 0
        };
163

164 1
        Ok(Journal {
165 1
            allocator: all.clone(),
166 1
            journal: Mutex::new(journal),
167 0
        })
168 1
    }
169

170 1
    pub fn init(allocator: &Allocator) -> PRes<u64> {
171 1
        let mut root_page = allocator.allocate(5)?;
172 1
        let first_page = allocator.allocate(JOURNAL_PAGE_EXP)?;
173 1
        let mut buffer = [0; 11];
174 1
        write_u64(&mut buffer[0..8], first_page.get_index());
175 1
        prepare_buffer_flush(&mut buffer, 0);
176 1
        let root_page_index = root_page.get_index();
177 1
        root_page.write_all(&buffer);
178 1
        allocator.flush_page(root_page)?;
179 1
        Ok(root_page_index)
180 1
    }
181

182 1
    pub fn start(&self) -> PRes<JournalId> {
183 1
        let val = self.internal_log(&Start::new(), &JournalId::new(0, 0), false)?;
184 1
        let id = JournalId::new(val.0, val.1);
185 1
        self.journal.lock()?.starts.push(&id);
186 1
        Ok(id)
187 1
    }
188

189 1
    pub(crate) fn prepare(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
190 1
        self.internal_log(entry, id, true)?;
191 1
        Ok(())
192 1
    }
193

194 1
    pub(crate) fn end(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
195 1
        self.internal_log(entry, id, true)?;
196
        // TODO:optimization keep in memory last page and move the flush here.
197 1
        Ok(())
198 1
    }
199

200 1
    pub fn clear_in_queque(&self) -> PRes<()> {
201 1
        let mut pages_to_free = Vec::new();
202
        {
203 1
            let mut lock = self.journal.lock()?;
204 1
            let ids = lock.to_clear.clone();
205 1
            lock.to_clear.clear();
206 1
            for id in ids {
207 1
                if lock.starts.remove(&id) {
208 1
                    let first_page = lock.first_page;
209 1
                    let mut free_cursor = id.page;
210 0
                    loop {
211
                        let read;
212
                        {
213 1
                            let mut cur = self.allocator.load_page(free_cursor)?;
214 1
                            cur.seek(JOURNAL_PAGE_PREV_OFFSET)?;
215 1
                            read = cur.read_u64();
216 1
                        }
217 1
                        if free_cursor != id.page {
218 0
                            pages_to_free.push(free_cursor)
219
                        }
220 1
                        if free_cursor == first_page {
221
                            break;
222
                        }
223 0
                        free_cursor = read;
224
                    }
225

226 1
                    let mut buffer = [0; 11];
227 1
                    write_u64(&mut buffer[0..8], id.page);
228 1
                    let (flush, offset) = prepare_buffer_flush(&mut buffer, lock.last_flush);
229 1
                    lock.last_flush = flush;
230 1
                    let mut root_page = self.allocator.write_page(lock.root)?;
231 1
                    root_page.seek(offset)?;
232 1
                    root_page.write_all(&buffer);
233 1
                    self.allocator.flush_page(root_page)?;
234 1
                    lock.first_page = id.page;
235 1
                }
236 1
            }
237 1
        }
238 1
        self.free_pages_tx(pages_to_free)?;
239 1
        Ok(())
240 1
    }
241 1
    pub fn free_pages_tx(&self, pages_to_free: Vec<u64>) -> PRes<()> {
242 1
        let mut prev_tx_id = None;
243
        {
244
            //Free the pages of previous clear, logged 2 tx ahead, now synced because of the second
245
            //tx
246 1
            let mut lock = self.journal.lock()?;
247 1
            if let Some(to_free) = &lock.to_free {
248 0
                for page in &to_free.pages {
249 0
                    self.allocator.free(*page)?;
250
                }
251 0
                prev_tx_id = Some(to_free.free_tx_id.clone());
252
            }
253 1
            lock.to_free = None;
254 1
        }
255 1
        if let Some(id) = prev_tx_id {
256
            // Enqueue for clear the free pages journal pages tx
257 0
            self.finished_to_clean(&[id])?;
258
        }
259 1
        if !pages_to_free.is_empty() {
260
            // Log the pages removed from the journal for free
261 0
            let id = self.start()?;
262 0
            for page in &pages_to_free {
263 0
                self.log(&FreedPage::new(*page), &id)?;
264
            }
265 0
            self.log(&PrepareCommit::new(), &id)?;
266 0
            self.end(&Commit::new(), &id)?;
267 0
            let mut lock = self.journal.lock()?;
268 0
            lock.to_free = Some(JournalPagesToFree {
269 0
                free_tx_id: id,
270 0
                pages: pages_to_free,
271 0
            });
272 0
        }
273 1
        Ok(())
274 1
    }
275

276 1
    pub fn finished_to_clean(&self, ids: &[JournalId]) -> PRes<()> {
277 1
        for id in ids {
278 1
            self.log(&Cleanup::new(), id)?;
279
        }
280 1
        let mut lock = self.journal.lock()?;
281 1
        lock.to_clear.extend_from_slice(ids);
282 1
        Ok(())
283 1
    }
284

285 1
    pub(crate) fn log(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
286 1
        self.internal_log(entry, id, false)?;
287 1
        Ok(())
288 1
    }
289

290 1
    fn internal_log(&self, entry: &dyn JournalEntry, id: &JournalId, flush: bool) -> PRes<(u64, u32)> {
291 1
        let mut buffer = Vec::<u8>::new();
292 1
        buffer.write_u8(entry.get_type());
293 1
        buffer.write_varint_u64(id.page);
294 1
        buffer.write_varint_u32(id.pos);
295 1
        entry.write(&mut buffer)?;
296

297
        let cur_page;
298
        let cur_pos;
299
        {
300 1
            let mut jr = self.journal.lock()?;
301 1
            self.required_space(buffer.len() as u32, &mut jr)?;
302 1
            cur_page = jr.last_page;
303 1
            cur_pos = jr.last_pos;
304 1
            jr.current.seek(cur_pos)?;
305 1
            jr.current.write_all(&*buffer);
306 1
            if flush {
307 1
                self.allocator.flush_page(jr.current.clone())?;
308
            }
309 1
            jr.last_pos += buffer.len() as u32;
310 1
        }
311 1
        Ok((cur_page, cur_pos))
312 1
    }
313

314 1
    pub(crate) fn recover<T>(&self, mut found: T) -> PRes<Vec<u64>>
315
    where
316
        T: FnMut(&dyn JournalEntry, &JournalId),
317
    {
318 1
        let mut journal_pages = Vec::new();
319 1
        let mut jr = self.journal.lock()?;
320 1
        let mut cur_page = jr.first_page;
321 1
        jr.last_page = jr.first_page;
322 1
        journal_pages.push(cur_page);
323 1
        let mut page = self.allocator.load_page(cur_page)?;
324 1
        page.seek(JOURNAL_PAGE_CONTENT_OFFSET)?;
325 1
        loop {
326 1
            let tp = page.read_u8();
327 1
            if tp == 0 {
328 1
                let last_pos = page.cursor_pos() as u32;
329 1
                page.seek(JOURNAL_PAGE_NEXT_OFFSET)?;
330 1
                cur_page = page.read_u64();
331 1
                if cur_page == 0 {
332 1
                    jr.last_pos = last_pos - 1;
333
                    break;
334
                }
335 0
                journal_pages.push(cur_page);
336 0
                page = self.allocator.load_page(cur_page)?;
337 0
                page.seek(JOURNAL_PAGE_CONTENT_OFFSET)?;
338 0
                jr.last_page = cur_page;
339
            } else {
340 1
                let page_id = page.read_varint_u64();
341 1
                let pos = page.read_varint_u32();
342 1
                let id = JournalId::new(page_id, pos);
343 1
                let ref_page = &mut page;
344 1
                let ref_found = &mut found;
345 1
                match tp {
346
                    //The Start entry has no valid id, should not be recovered
347 1
                    1 => Start::new().read(&mut page)?,
348 1
                    2 => recover_entry(&mut InsertRecord::default(), ref_page, ref_found, &id)?,
349 1
                    3 => recover_entry(&mut PrepareCommit::default(), ref_page, ref_found, &id)?,
350 1
                    4 => recover_entry(&mut Commit::default(), ref_page, ref_found, &id)?,
351 1
                    5 => recover_entry(&mut UpdateRecord::default(), ref_page, ref_found, &id)?,
352 1
                    6 => recover_entry(&mut DeleteRecord::default(), ref_page, ref_found, &id)?,
353 1
                    7 => recover_entry(&mut Rollback::default(), ref_page, ref_found, &id)?,
354 1
                    8 => recover_entry(&mut CreateSegment::default(), ref_page, ref_found, &id)?,
355 0
                    9 => recover_entry(&mut DropSegment::default(), ref_page, ref_found, &id)?,
356 0
                    10 => recover_entry(&mut ReadRecord::default(), ref_page, ref_found, &id)?,
357 1
                    11 => recover_entry(&mut Metadata::default(), ref_page, ref_found, &id)?,
358 1
                    12 => recover_entry(&mut FreedPage::default(), ref_page, ref_found, &id)?,
359 0
                    13 => recover_entry(&mut NewSegmentPage::default(), ref_page, ref_found, &id)?,
360 1
                    14 => recover_entry(&mut Cleanup::default(), ref_page, ref_found, &id)?,
361 0
                    _ => panic!(" wrong log entry {} ", tp),
362
                };
363
            }
364
        }
365 1
        jr.current = self.allocator.write_page(jr.last_page)?;
366 1
        Ok(journal_pages)
367 1
    }
368

369 1
    fn required_space(&self, space: u32, jr: &mut MutexGuard<JournalShared>) -> PRes<()> {
370
        // if there is no page or the  'current content' + 'space required' + 'end marker' is more
371
        // than the page, allocate new page and link the previous one
372 1
        if jr.last_pos + space + 1 >= JOURNAL_PAGE_SIZE as u32 {
373 1
            let prev = jr.last_page;
374 1
            let last_pos = jr.last_pos;
375 1
            let new_page = self.allocator.allocate(JOURNAL_PAGE_EXP)?;
376 1
            if prev != 0 {
377 1
                jr.current.seek(JOURNAL_PAGE_NEXT_OFFSET)?;
378 1
                jr.current.write_u64(new_page.get_index());
379 1
                jr.current.seek(last_pos)?;
380 1
                jr.current.write_u8(0);
381 1
                self.allocator.flush_page(jr.current.clone())?;
382
            }
383 1
            jr.last_page = new_page.get_index();
384 1
            jr.current = new_page;
385 1
            jr.current.seek(JOURNAL_PAGE_PREV_OFFSET)?;
386 1
            jr.current.write_u64(prev);
387 1
            self.allocator.flush_page(jr.current.clone())?;
388 1
            jr.last_pos = JOURNAL_PAGE_CONTENT_OFFSET;
389 1
        }
390 1
        Ok(())
391 1
    }
392
}
393

394
impl JournalEntry for DeleteRecord {
395 1
    fn get_type(&self) -> u8 {
396
        6
397 1
    }
398

399 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
400 1
        self.segment.write_varint(buffer);
401 1
        buffer.write_varint_u64(self.recref.page);
402 1
        buffer.write_varint_u32(self.recref.pos);
403 1
        buffer.write_varint_u16(self.version);
404 1
        Ok(())
405 1
    }
406

407 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
408 1
        self.segment = SegmentId::read_varint(buffer);
409 1
        self.recref.page = buffer.read_varint_u64();
410 1
        self.recref.pos = buffer.read_varint_u32();
411 1
        self.version = buffer.read_varint_u16();
412 1
        Ok(())
413 1
    }
414

415 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
416 1
        tx.recover_delete(self);
417 1
        Ok(RecoverStatus::Started)
418 1
    }
419
}
420

421
impl JournalEntry for ReadRecord {
422 1
    fn get_type(&self) -> u8 {
423
        10
424 1
    }
425

426 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
427 1
        self.segment.write_varint(buffer);
428 1
        buffer.write_varint_u64(self.recref.page);
429 1
        buffer.write_varint_u32(self.recref.pos);
430 1
        buffer.write_varint_u16(self.version);
431 1
        Ok(())
432 1
    }
433

434 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
435 1
        self.segment = SegmentId::read_varint(buffer);
436 1
        self.recref.page = buffer.read_varint_u64();
437 1
        self.recref.pos = buffer.read_varint_u32();
438 1
        self.version = buffer.read_varint_u16();
439 1
        Ok(())
440 1
    }
441

442 0
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
443 0
        tx.recover_read(self);
444 0
        Ok(RecoverStatus::Started)
445 0
    }
446
}
447
impl JournalEntry for UpdateRecord {
448 1
    fn get_type(&self) -> u8 {
449
        5
450 1
    }
451

452 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
453 1
        self.segment.write_varint(buffer);
454 1
        buffer.write_varint_u64(self.recref.page);
455 1
        buffer.write_varint_u32(self.recref.pos);
456 1
        buffer.write_varint_u64(self.record_page);
457 1
        buffer.write_varint_u16(self.version);
458 1
        Ok(())
459 1
    }
460

461 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
462 1
        self.segment = SegmentId::read_varint(buffer);
463 1
        self.recref.page = buffer.read_varint_u64();
464 1
        self.recref.pos = buffer.read_varint_u32();
465 1
        self.record_page = buffer.read_varint_u64();
466 1
        self.version = buffer.read_varint_u16();
467 1
        Ok(())
468 1
    }
469

470 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
471 1
        tx.recover_update(self);
472 1
        Ok(RecoverStatus::Started)
473 1
    }
474
}
475

476
impl JournalEntry for InsertRecord {
477 1
    fn get_type(&self) -> u8 {
478
        2
479 1
    }
480

481 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
482 1
        self.segment.write_varint(buffer);
483 1
        buffer.write_varint_u64(self.recref.page);
484 1
        buffer.write_varint_u32(self.recref.pos);
485 1
        buffer.write_varint_u64(self.record_page);
486 1
        Ok(())
487 1
    }
488

489 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
490 1
        self.segment = SegmentId::read_varint(buffer);
491 1
        self.recref.page = buffer.read_varint_u64();
492 1
        self.recref.pos = buffer.read_varint_u32();
493 1
        self.record_page = buffer.read_varint_u64();
494 1
        Ok(())
495 1
    }
496

497 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
498 1
        tx.recover_insert(self);
499 1
        Ok(RecoverStatus::Started)
500 1
    }
501
}
502

503
impl JournalEntry for PrepareCommit {
504 1
    fn get_type(&self) -> u8 {
505
        3
506 1
    }
507

508 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
509 1
        Ok(())
510 1
    }
511

512 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
513 1
        Ok(())
514 1
    }
515

516 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
517 1
        Ok(RecoverStatus::PrepareCommit)
518 1
    }
519
}
520

521
impl JournalEntry for Commit {
522 1
    fn get_type(&self) -> u8 {
523
        4
524 1
    }
525

526 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
527 1
        Ok(())
528 1
    }
529

530 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
531 1
        Ok(())
532 1
    }
533

534 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
535 1
        Ok(RecoverStatus::Commit)
536 1
    }
537
}
538

539
impl JournalEntry for Cleanup {
540 1
    fn get_type(&self) -> u8 {
541
        14
542 1
    }
543

544 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
545 1
        Ok(())
546 1
    }
547

548 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
549 1
        Ok(())
550 1
    }
551

552 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
553 1
        Ok(RecoverStatus::Cleanup)
554 1
    }
555
}
556

557
impl JournalEntry for Metadata {
558 1
    fn get_type(&self) -> u8 {
559
        11
560 1
    }
561

562 1
    fn write(&self, write: &mut dyn InfallibleWrite) -> PRes<()> {
563 1
        write.write_varint_u8(self.strategy.value());
564 1
        let len = self.meta_id.len();
565 1
        write.write_varint_u16(len as u16);
566 1
        write.write_all(&self.meta_id);
567 1
        Ok(())
568 1
    }
569

570 1
    fn read(&mut self, read: &mut dyn InfallibleRead) -> PRes<()> {
571 1
        self.strategy = TxStrategy::from_value(read.read_varint_u8());
572 1
        let len = read.read_varint_u16();
573 1
        let mut slice: Vec<u8> = vec![0; len as usize];
574 1
        read.read_exact(&mut slice[0..len as usize]);
575 1
        self.meta_id = slice;
576 1
        Ok(())
577 1
    }
578

579 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
580 1
        tx.recover_metadata(self);
581 1
        Ok(RecoverStatus::Started)
582 1
    }
583
}
584

585
impl JournalEntry for Start {
586 1
    fn get_type(&self) -> u8 {
587
        1
588 1
    }
589

590 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
591 1
        Ok(())
592 1
    }
593

594 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
595 1
        Ok(())
596 1
    }
597

598 0
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
599 0
        panic!("this should never be called")
600
    }
601
}
602

603
impl JournalEntry for Rollback {
604 1
    fn get_type(&self) -> u8 {
605
        7
606 1
    }
607

608 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
609 1
        Ok(())
610 1
    }
611

612 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
613 1
        Ok(())
614 1
    }
615 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
616 1
        Ok(RecoverStatus::Rollback)
617 1
    }
618
}
619

620
impl JournalEntry for CreateSegment {
621 1
    fn get_type(&self) -> u8 {
622
        8
623 1
    }
624

625 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
626 1
        self.segment_id.write_varint(buffer);
627 1
        buffer.write_varint_u64(self.first_page);
628 1
        buffer.write_varint_u16(self.name.len() as u16);
629 1
        buffer.write_all(self.name.as_bytes());
630 1
        Ok(())
631 1
    }
632

633 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
634 1
        self.segment_id = SegmentId::read_varint(buffer);
635 1
        self.first_page = buffer.read_varint_u64();
636 1
        let string_size = buffer.read_varint_u16();
637 1
        let mut slice: Vec<u8> = vec![0; string_size as usize];
638 1
        buffer.read_exact(&mut slice[0..string_size as usize]);
639 1
        self.name = str::from_utf8(&slice[0..string_size as usize])?.into();
640 1
        Ok(())
641 1
    }
642

643 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
644 1
        tx.recover_add(self);
645 1
        Ok(RecoverStatus::Started)
646 1
    }
647
}
648

649
impl JournalEntry for DropSegment {
650 1
    fn get_type(&self) -> u8 {
651
        9
652 1
    }
653

654 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
655 1
        self.segment_id.write_varint(buffer);
656 1
        buffer.write_varint_u16(self.name.len() as u16);
657 1
        buffer.write_all(self.name.as_bytes());
658 1
        Ok(())
659 1
    }
660

661 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
662 1
        self.segment_id = SegmentId::read_varint(buffer);
663 1
        let string_size = buffer.read_varint_u16();
664 1
        let mut slice: Vec<u8> = vec![0; string_size as usize];
665 1
        buffer.read_exact(&mut slice[0..string_size as usize]);
666 1
        self.name = str::from_utf8(&slice[0..string_size as usize])?.into();
667 1
        Ok(())
668 1
    }
669

670 0
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
671 0
        tx.recover_drop(self);
672 0
        Ok(RecoverStatus::Started)
673 0
    }
674
}
675

676
impl JournalEntry for FreedPage {
677 1
    fn get_type(&self) -> u8 {
678
        12
679 1
    }
680

681 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
682 1
        buffer.write_varint_u64(self.page);
683 1
        Ok(())
684 1
    }
685

686 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
687 1
        self.page = buffer.read_varint_u64();
688 1
        Ok(())
689 1
    }
690

691 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
692 1
        tx.recover_freed_page(self);
693 1
        Ok(RecoverStatus::Started)
694 1
    }
695
}
696

697
impl JournalEntry for NewSegmentPage {
698 1
    fn get_type(&self) -> u8 {
699
        13
700 1
    }
701

702 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
703 1
        self.segment.write_varint(buffer);
704 1
        buffer.write_varint_u64(self.page);
705 1
        buffer.write_varint_u64(self.previous);
706 1
        Ok(())
707 1
    }
708

709 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
710 1
        self.segment = SegmentId::read_varint(buffer);
711 1
        self.page = buffer.read_varint_u64();
712 1
        self.previous = buffer.read_varint_u64();
713 1
        Ok(())
714 1
    }
715

716 0
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
717 0
        tx.recover_new_segment_page(self);
718 0
        Ok(RecoverStatus::Started)
719 0
    }
720
}
721
impl JournalId {
    /// Builds the id of the transaction whose start entry is at `pos` in journal page `page`.
    pub fn new(page: u64, pos: u32) -> JournalId {
        Self { page, pos }
    }
}
726

727
impl Start {
    /// Creates the (payload-less) start entry.
    fn new() -> Start {
        Self {}
    }
}
732

733
#[cfg(test)]
mod tests {
    use super::{Journal, JournalEntry, JournalId, StartList};
    use crate::transaction::{
        CreateSegment, DeleteRecord, DropSegment, InsertRecord, Metadata, NewSegmentPage, ReadRecord, UpdateRecord,
    };
    use crate::{
        allocator::Allocator,
        config::{Config, TxStrategy},
        discref::DiscRef,
        id::{RecRef, SegmentId},
    };
    use std::{
        io::{Cursor, Seek, SeekFrom},
        sync::Arc,
    };
    use tempfile::Builder;

    /// Serializes `source`, reads the bytes back into `target` and checks that the
    /// reader consumed exactly what the writer produced.
    fn round_trip<E: JournalEntry>(source: &E, target: &mut E) {
        let mut buffer = Vec::<u8>::new();
        source.write(&mut buffer).unwrap();

        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
        target.read(&mut cursor).unwrap();
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
    }

    // Removing the oldest transaction reports `true`, removing a newer one `false`.
    #[test]
    fn start_list_add_remove() {
        let first = JournalId::new(1, 2);
        let second = JournalId::new(1, 4);
        let mut start = StartList::new();

        start.push(&first);
        start.push(&second);
        assert!(start.remove(&first));
        assert!(start.remove(&second));

        start.push(&first);
        start.push(&second);
        assert!(!start.remove(&second));
        assert!(start.remove(&first));
    }

    // Newer transactions removed while the oldest is still open never report `true`.
    #[test]
    fn start_list_add_remove_other_order() {
        let oldest = JournalId::new(1, 1);
        let middle = JournalId::new(1, 2);
        let newest = JournalId::new(1, 3);
        let mut start = StartList::new();

        start.push(&oldest);
        start.push(&middle);
        assert!(!start.remove(&middle));
        start.push(&newest);
        assert!(!start.remove(&newest));
        assert!(start.remove(&oldest));
    }

    #[test]
    fn read_write_insert_record() {
        let seg_id = SegmentId::new(10);
        let to_write = InsertRecord::new(seg_id, &RecRef::new(20, 10), 3);
        let mut to_read = InsertRecord::default();

        round_trip(&to_write, &mut to_read);

        assert_eq!(to_read.segment, seg_id);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.record_page, 3);
    }

    #[test]
    fn read_write_insert_read() {
        let seg_id = SegmentId::new(10);
        let to_write = ReadRecord::new(seg_id, &RecRef::new(20, 10), 3);
        let mut to_read = ReadRecord::default();

        round_trip(&to_write, &mut to_read);

        assert_eq!(to_read.segment, seg_id);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.version, 3);
    }

    #[test]
    fn read_write_update_record() {
        let seg_id = SegmentId::new(10);
        let to_write = UpdateRecord::new(seg_id, &RecRef::new(20, 10), 3, 1);
        let mut to_read = UpdateRecord::default();

        round_trip(&to_write, &mut to_read);

        assert_eq!(to_read.segment, seg_id);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.record_page, 3);
        assert_eq!(to_read.version, 1);
    }

    #[test]
    fn read_write_delete_record() {
        let seg_id = SegmentId::new(10);
        let to_write = DeleteRecord::new(seg_id, &RecRef::new(20, 10), 1);
        let mut to_read = DeleteRecord::default();

        round_trip(&to_write, &mut to_read);

        assert_eq!(to_read.segment, seg_id);
        assert_eq!(to_read.recref.page, 20);
        assert_eq!(to_read.recref.pos, 10);
        assert_eq!(to_read.version, 1);
    }

    #[test]
    fn read_write_create_segment() {
        let seg_id = SegmentId::new(10);
        let to_write = CreateSegment::new("some", seg_id, 20);
        let mut to_read = CreateSegment::default();

        round_trip(&to_write, &mut to_read);

        assert_eq!(to_read.name, "some");
        assert_eq!(to_read.segment_id, seg_id);
        assert_eq!(to_read.first_page, 20);
    }

    #[test]
    fn read_write_drop_segment() {
        let seg_id = SegmentId::new(20);
        let to_write = DropSegment::new("some", seg_id);
        let mut to_read = DropSegment::default();

        round_trip(&to_write, &mut to_read);

        assert_eq!(to_read.name, "some");
        assert_eq!(to_read.segment_id, seg_id);
    }

    #[test]
    fn read_write_metadata() {
        let meta_id = vec![10, 3];
        let to_write = Metadata::new(&TxStrategy::VersionOnWrite, meta_id.clone());
        // Start from different values so the assertions prove `read` overwrote them.
        let mut to_read = Metadata::new(&TxStrategy::LastWin, Vec::new());

        round_trip(&to_write, &mut to_read);

        assert_eq!(to_read.strategy, TxStrategy::VersionOnWrite);
        assert_eq!(to_read.meta_id, meta_id);
    }

    #[test]
    fn read_write_new_segment_page() {
        let seg_id = SegmentId::new(10);
        let to_write = NewSegmentPage::new(seg_id, 20, 30);
        let mut to_read = NewSegmentPage::default();

        round_trip(&to_write, &mut to_read);

        assert_eq!(to_read.segment, seg_id);
        assert_eq!(to_read.page, 20);
        assert_eq!(to_read.previous, 30);
    }

    // Entries logged to a fresh journal are the ones handed back by `recover`.
    #[test]
    fn journal_log_and_recover() {
        let temp = Builder::new()
            .prefix("journal_test")
            .suffix(".persy")
            .tempfile()
            .unwrap();
        let file = temp.reopen().unwrap();
        // The temporary file handle is released as soon as the reopened handle exists.
        drop(temp);
        let disc = Box::new(DiscRef::new(file).unwrap());
        let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
        let rp = Journal::init(&allocator).unwrap();
        let journal = Journal::new(&Arc::new(allocator), rp).unwrap();

        let rec = InsertRecord::new(SegmentId::new(10), &RecRef::new(1, 1), 1);
        let id = JournalId::new(1, 20);
        for _ in 0..2 {
            journal.log(&rec, &id).unwrap();
        }
        journal.recover(|e, _| assert_eq!(e.get_type(), 2)).unwrap();
    }
}

Read our documentation on viewing source code.

Loading