1
use crate::transaction::{
2
    Cleanup, Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata, NewSegmentPage,
3
    PrepareCommit, ReadRecord, Rollback, Transaction, UpdateRecord,
4
};
5
use crate::{
6
    allocator::Allocator,
7
    config::TxStrategy,
8
    discref::{Page, PageOps, ReadPage, PAGE_METADATA_SIZE},
9
    error::PRes,
10
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
11
    io::{
12
        read_u64, write_u64, InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite,
13
        InfallibleWriteFormat, InfallibleWriteVarInt,
14
    },
15
    persy::RecoverStatus,
16
};
17
use std::{
18
    collections::hash_map::HashMap,
19
    str,
20
    sync::{Arc, Mutex, MutexGuard},
21
};
22

23
pub const JOURNAL_PAGE_EXP: u8 = 10; // 2^10
24
const JOURNAL_PAGE_SIZE: u32 = (1 << JOURNAL_PAGE_EXP) - PAGE_METADATA_SIZE; // 2^ 10 -2 size - page header
25
const JOURNAL_PAGE_NEXT_OFFSET: u32 = 0;
26
const JOURNAL_PAGE_PREV_OFFSET: u32 = 8;
27
const JOURNAL_PAGE_CONTENT_OFFSET: u32 = 16;
28

29
struct StartListEntry {
30
    next: Option<JournalId>,
31
    prev: Option<JournalId>,
32
}
33

34
impl StartListEntry {
35 1
    pub fn new(prev: Option<JournalId>) -> StartListEntry {
36 1
        StartListEntry { next: None, prev }
37 1
    }
38
}
39

40
struct StartList {
41
    transactions: HashMap<JournalId, StartListEntry>,
42
    last: Option<JournalId>,
43
}
44

45
impl StartList {
46 1
    fn new() -> StartList {
47 1
        StartList {
48 1
            transactions: HashMap::new(),
49 1
            last: None,
50
        }
51 1
    }
52

53 1
    fn push(&mut self, id: &JournalId) {
54 1
        self.transactions
55 1
            .insert(id.clone(), StartListEntry::new(self.last.clone()));
56 1
        if let Some(ref lst) = self.last {
57 1
            self.transactions.get_mut(lst).unwrap().next = Some(id.clone());
58
        }
59 1
        self.last = Some(id.clone());
60 1
    }
61

62 1
    fn remove(&mut self, id: &JournalId) -> bool {
63 1
        if let Some(entry) = self.transactions.remove(id) {
64 1
            if let Some(ref next) = entry.next {
65 1
                self.transactions.get_mut(next).unwrap().prev = entry.prev.clone();
66
            }
67 1
            if let Some(ref prev) = entry.prev {
68 1
                self.transactions.get_mut(prev).unwrap().next = entry.next.clone();
69
            }
70 1
            if let Some(ref l) = self.last {
71 1
                if l == id {
72 1
                    self.last = entry.prev.clone();
73
                }
74
            }
75 1
            entry.prev.is_none()
76
        } else {
77 1
            false
78
        }
79 1
    }
80
}
81

82
struct JournalPagesToFree {
83
    free_tx_id: JournalId,
84
    pages: Vec<u64>,
85
}
86

87
struct JournalShared {
88
    root: u64,
89
    first_page: u64,
90
    last_page: u64,
91
    last_pos: u32,
92
    starts: StartList,
93
    current: Page,
94
    last_flush: u8,
95
    to_clear: Vec<JournalId>,
96
    to_free: Option<JournalPagesToFree>,
97
}
98

99
/// Journal segment is the area where the transactional log is kept
100
pub struct Journal {
101
    allocator: Arc<Allocator>,
102
    journal: Mutex<JournalShared>,
103
}
104

105
pub(crate) trait JournalEntry {
106
    fn get_type(&self) -> u8;
107
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()>;
108
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()>;
109
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus>;
110
}
111

112
pub struct Start {}
113

114 1
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
115
pub struct JournalId {
116 1
    page: u64,
117 1
    pos: u32,
118
}
119

120 1
fn recover_entry<T>(entry: &mut dyn JournalEntry, page: &mut ReadPage, found: &mut T, id: &JournalId) -> PRes<()>
121
where
122
    T: FnMut(&dyn JournalEntry, &JournalId),
123
{
124 1
    entry.read(page)?;
125 1
    found(entry, id);
126 1
    Ok(())
127 1
}
128

129
impl Journal {
130 1
    pub fn new(all: &Arc<Allocator>, page: u64) -> PRes<Journal> {
131 1
        let first_page;
132
        let last_flush;
133 1
        let mut buffer_0 = [0; 11];
134 1
        let mut buffer_1 = [0; 11];
135

136
        let current;
137
        {
138 1
            let mut page = all.load_page(page)?;
139 1
            page.read_exact(&mut buffer_0);
140 1
            page.read_exact(&mut buffer_1);
141 1
            let (last_flush_ret, first) = double_buffer_check(&buffer_0, &buffer_1);
142 1
            last_flush = last_flush_ret;
143 1
            first_page = read_u64(if first { &buffer_0[0..8] } else { &buffer_1[0..8] });
144 1
        }
145 1
        current = if first_page != 0 {
146 1
            all.write_page(first_page)?
147 0
        } else {
148
            // Empty 0 sized page
149 0
            Page::new(Vec::new(), 0, 0, 0)
150
        };
151 1
        let journal = JournalShared {
152 1
            root: page,
153 1
            first_page,
154 1
            last_page: first_page,
155
            last_pos: 0,
156 1
            starts: StartList::new(),
157 1
            current,
158
            last_flush,
159 1
            to_clear: Vec::new(),
160 1
            to_free: None,
161 0
        };
162

163 1
        Ok(Journal {
164 1
            allocator: all.clone(),
165 1
            journal: Mutex::new(journal),
166 0
        })
167 1
    }
168

169 1
    pub fn init(allocator: &Allocator) -> PRes<u64> {
170 1
        let mut root_page = allocator.allocate(5)?;
171 1
        let first_page = allocator.allocate(JOURNAL_PAGE_EXP)?;
172 1
        let mut buffer = [0; 11];
173 1
        write_u64(&mut buffer[0..8], first_page.get_index());
174 1
        prepare_buffer_flush(&mut buffer, 0);
175 1
        let root_page_index = root_page.get_index();
176 1
        root_page.write_all(&buffer);
177 1
        allocator.flush_page(root_page)?;
178 1
        Ok(root_page_index)
179 1
    }
180

181 1
    pub fn start(&self) -> PRes<JournalId> {
182 1
        let val = self.internal_log(&Start::new(), &JournalId::new(0, 0), false)?;
183 1
        let id = JournalId::new(val.0, val.1);
184 1
        self.journal.lock()?.starts.push(&id);
185 1
        Ok(id)
186 1
    }
187

188 1
    pub(crate) fn prepare(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
189 1
        self.internal_log(entry, id, true)?;
190 1
        Ok(())
191 1
    }
192

193 1
    pub(crate) fn end(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
194 1
        self.internal_log(entry, id, true)?;
195
        // TODO:optimization keep in memory last page and move the flush here.
196 1
        Ok(())
197 1
    }
198

199 1
    pub fn clear_in_queque(&self) -> PRes<()> {
200 1
        let mut pages_to_free = Vec::new();
201
        {
202 1
            let mut lock = self.journal.lock()?;
203 1
            let ids = lock.to_clear.clone();
204 1
            lock.to_clear.clear();
205 1
            for id in ids {
206 1
                if lock.starts.remove(&id) {
207 1
                    let first_page = lock.first_page;
208 1
                    let mut free_cursor = id.page;
209 0
                    loop {
210
                        let read;
211
                        {
212 1
                            let mut cur = self.allocator.load_page(free_cursor)?;
213 1
                            cur.seek(JOURNAL_PAGE_PREV_OFFSET)?;
214 1
                            read = cur.read_u64();
215 1
                        }
216 1
                        if free_cursor != id.page {
217 0
                            pages_to_free.push(free_cursor)
218
                        }
219 1
                        if free_cursor == first_page {
220
                            break;
221
                        }
222 0
                        free_cursor = read;
223
                    }
224

225 1
                    let mut buffer = [0; 11];
226 1
                    write_u64(&mut buffer[0..8], id.page);
227 1
                    let (flush, offset) = prepare_buffer_flush(&mut buffer, lock.last_flush);
228 1
                    lock.last_flush = flush;
229 1
                    let mut root_page = self.allocator.write_page(lock.root)?;
230 1
                    root_page.seek(offset)?;
231 1
                    root_page.write_all(&buffer);
232 1
                    self.allocator.flush_page(root_page)?;
233 1
                    lock.first_page = id.page;
234 1
                }
235 1
            }
236 1
        }
237 1
        self.free_pages_tx(pages_to_free)?;
238 1
        Ok(())
239 1
    }
240 1
    pub fn free_pages_tx(&self, pages_to_free: Vec<u64>) -> PRes<()> {
241 1
        let mut prev_tx_id = None;
242
        {
243
            //Free the pages of previous clear, logged 2 tx ahead, now synced because of the second
244
            //tx
245 1
            let mut lock = self.journal.lock()?;
246 1
            if let Some(to_free) = &lock.to_free {
247 0
                for page in &to_free.pages {
248 0
                    self.allocator.free(*page)?;
249
                }
250 0
                prev_tx_id = Some(to_free.free_tx_id.clone());
251
            }
252 1
            lock.to_free = None;
253 1
        }
254 1
        if let Some(id) = prev_tx_id {
255
            // Enqueue for clear the free pages journal pages tx
256 0
            self.finished_to_clean(&[id])?;
257
        }
258 1
        if !pages_to_free.is_empty() {
259
            // Log the pages removed from the journal for free
260 0
            let id = self.start()?;
261 0
            for page in &pages_to_free {
262 0
                self.log(&FreedPage::new(*page), &id)?;
263
            }
264 0
            self.log(&PrepareCommit::new(), &id)?;
265 0
            self.end(&Commit::new(), &id)?;
266 0
            let mut lock = self.journal.lock()?;
267 0
            lock.to_free = Some(JournalPagesToFree {
268 0
                free_tx_id: id,
269 0
                pages: pages_to_free,
270 0
            });
271 0
        }
272 1
        Ok(())
273 1
    }
274

275 1
    pub fn finished_to_clean(&self, ids: &[JournalId]) -> PRes<()> {
276 1
        for id in ids {
277 1
            self.log(&Cleanup::new(), id)?;
278
        }
279 1
        let mut lock = self.journal.lock()?;
280 1
        lock.to_clear.extend_from_slice(ids);
281 1
        Ok(())
282 1
    }
283

284 1
    pub(crate) fn log(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
285 1
        self.internal_log(entry, id, false)?;
286 1
        Ok(())
287 1
    }
288

289 1
    fn internal_log(&self, entry: &dyn JournalEntry, id: &JournalId, flush: bool) -> PRes<(u64, u32)> {
290 1
        let mut buffer = Vec::<u8>::new();
291 1
        buffer.write_u8(entry.get_type());
292 1
        buffer.write_u64(id.page);
293 1
        buffer.write_u32(id.pos);
294 1
        entry.write(&mut buffer)?;
295

296
        let cur_page;
297
        let cur_pos;
298
        {
299 1
            let mut jr = self.journal.lock()?;
300 1
            self.required_space(buffer.len() as u32, &mut jr)?;
301 1
            cur_page = jr.last_page;
302 1
            cur_pos = jr.last_pos;
303 1
            jr.current.seek(cur_pos)?;
304 1
            jr.current.write_all(&*buffer);
305 1
            if flush {
306 1
                self.allocator.flush_page(jr.current.clone())?;
307
            }
308 1
            jr.last_pos += buffer.len() as u32;
309 1
        }
310 1
        Ok((cur_page, cur_pos))
311 1
    }
312

313 1
    pub(crate) fn recover<T>(&self, mut found: T) -> PRes<Vec<u64>>
314
    where
315
        T: FnMut(&dyn JournalEntry, &JournalId),
316
    {
317 1
        let mut journal_pages = Vec::new();
318 1
        let mut jr = self.journal.lock()?;
319 1
        let mut cur_page = jr.first_page;
320 1
        jr.last_page = jr.first_page;
321 1
        journal_pages.push(cur_page);
322 1
        let mut page = self.allocator.load_page(cur_page)?;
323 1
        page.seek(JOURNAL_PAGE_CONTENT_OFFSET)?;
324 1
        loop {
325 1
            let tp = page.read_u8();
326 1
            if tp == 0 {
327 1
                let last_pos = page.cursor_pos() as u32;
328 1
                page.seek(JOURNAL_PAGE_NEXT_OFFSET)?;
329 1
                cur_page = page.read_u64();
330 1
                if cur_page == 0 {
331 1
                    jr.last_pos = last_pos - 1;
332
                    break;
333
                }
334 0
                journal_pages.push(cur_page);
335 0
                page = self.allocator.load_page(cur_page)?;
336 0
                page.seek(JOURNAL_PAGE_CONTENT_OFFSET)?;
337 0
                jr.last_page = cur_page;
338
            } else {
339 1
                let page_id = page.read_u64();
340 1
                let pos = page.read_u32();
341 1
                let id = JournalId::new(page_id, pos);
342 1
                let ref_page = &mut page;
343 1
                let ref_found = &mut found;
344 1
                match tp {
345
                    //The Start entry has no valid id, should not be recovered
346 1
                    1 => Start::new().read(&mut page)?,
347 1
                    2 => recover_entry(&mut InsertRecord::default(), ref_page, ref_found, &id)?,
348 1
                    3 => recover_entry(&mut PrepareCommit::default(), ref_page, ref_found, &id)?,
349 1
                    4 => recover_entry(&mut Commit::default(), ref_page, ref_found, &id)?,
350 1
                    5 => recover_entry(&mut UpdateRecord::default(), ref_page, ref_found, &id)?,
351 1
                    6 => recover_entry(&mut DeleteRecord::default(), ref_page, ref_found, &id)?,
352 1
                    7 => recover_entry(&mut Rollback::default(), ref_page, ref_found, &id)?,
353 1
                    8 => recover_entry(&mut CreateSegment::default(), ref_page, ref_found, &id)?,
354 0
                    9 => recover_entry(&mut DropSegment::default(), ref_page, ref_found, &id)?,
355 0
                    10 => recover_entry(&mut ReadRecord::default(), ref_page, ref_found, &id)?,
356 1
                    11 => recover_entry(&mut Metadata::default(), ref_page, ref_found, &id)?,
357 1
                    12 => recover_entry(&mut FreedPage::default(), ref_page, ref_found, &id)?,
358 0
                    13 => recover_entry(&mut NewSegmentPage::default(), ref_page, ref_found, &id)?,
359 1
                    14 => recover_entry(&mut Cleanup::default(), ref_page, ref_found, &id)?,
360 0
                    _ => panic!(" wrong log entry {} ", tp),
361
                };
362
            }
363
        }
364 1
        jr.current = self.allocator.write_page(jr.last_page)?;
365 1
        Ok(journal_pages)
366 1
    }
367

368 1
    fn required_space(&self, space: u32, jr: &mut MutexGuard<JournalShared>) -> PRes<()> {
369
        // if there is no page or the  'current content' + 'space required' + 'end marker' is more
370
        // than the page, allocate new page and link the previous one
371 1
        if jr.last_pos + space + 1 >= JOURNAL_PAGE_SIZE as u32 {
372 1
            let prev = jr.last_page;
373 1
            let last_pos = jr.last_pos;
374 1
            let new_page = self.allocator.allocate(JOURNAL_PAGE_EXP)?;
375 1
            if prev != 0 {
376 1
                jr.current.seek(JOURNAL_PAGE_NEXT_OFFSET)?;
377 1
                jr.current.write_u64(new_page.get_index());
378 1
                jr.current.seek(last_pos)?;
379 1
                jr.current.write_u8(0);
380 1
                self.allocator.flush_page(jr.current.clone())?;
381
            }
382 1
            jr.last_page = new_page.get_index();
383 1
            jr.current = new_page;
384 1
            jr.current.seek(JOURNAL_PAGE_PREV_OFFSET)?;
385 1
            jr.current.write_u64(prev);
386 1
            self.allocator.flush_page(jr.current.clone())?;
387 1
            jr.last_pos = JOURNAL_PAGE_CONTENT_OFFSET;
388 1
        }
389 1
        Ok(())
390 1
    }
391
}
392

393
impl JournalEntry for DeleteRecord {
394 1
    fn get_type(&self) -> u8 {
395
        6
396 1
    }
397

398 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
399 1
        buffer.write_varint_u32(self.segment);
400 1
        buffer.write_varint_u64(self.recref.page);
401 1
        buffer.write_varint_u32(self.recref.pos);
402 1
        buffer.write_varint_u16(self.version);
403 1
        Ok(())
404 1
    }
405

406 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
407 1
        self.segment = buffer.read_varint_u32();
408 1
        self.recref.page = buffer.read_varint_u64();
409 1
        self.recref.pos = buffer.read_varint_u32();
410 1
        self.version = buffer.read_varint_u16();
411 1
        Ok(())
412 1
    }
413

414 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
415 1
        tx.recover_delete(self);
416 1
        Ok(RecoverStatus::Started)
417 1
    }
418
}
419

420
impl JournalEntry for ReadRecord {
421 1
    fn get_type(&self) -> u8 {
422
        10
423 1
    }
424

425 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
426 1
        buffer.write_varint_u32(self.segment);
427 1
        buffer.write_varint_u64(self.recref.page);
428 1
        buffer.write_varint_u32(self.recref.pos);
429 1
        buffer.write_varint_u16(self.version);
430 1
        Ok(())
431 1
    }
432

433 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
434 1
        self.segment = buffer.read_varint_u32();
435 1
        self.recref.page = buffer.read_varint_u64();
436 1
        self.recref.pos = buffer.read_varint_u32();
437 1
        self.version = buffer.read_varint_u16();
438 1
        Ok(())
439 1
    }
440

441 0
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
442 0
        tx.recover_read(self);
443 0
        Ok(RecoverStatus::Started)
444 0
    }
445
}
446
impl JournalEntry for UpdateRecord {
447 1
    fn get_type(&self) -> u8 {
448
        5
449 1
    }
450

451 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
452 1
        buffer.write_varint_u32(self.segment);
453 1
        buffer.write_varint_u64(self.recref.page);
454 1
        buffer.write_varint_u32(self.recref.pos);
455 1
        buffer.write_varint_u64(self.record_page);
456 1
        buffer.write_varint_u16(self.version);
457 1
        Ok(())
458 1
    }
459

460 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
461 1
        self.segment = buffer.read_varint_u32();
462 1
        self.recref.page = buffer.read_varint_u64();
463 1
        self.recref.pos = buffer.read_varint_u32();
464 1
        self.record_page = buffer.read_varint_u64();
465 1
        self.version = buffer.read_varint_u16();
466 1
        Ok(())
467 1
    }
468

469 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
470 1
        tx.recover_update(self);
471 1
        Ok(RecoverStatus::Started)
472 1
    }
473
}
474

475
impl JournalEntry for InsertRecord {
476 1
    fn get_type(&self) -> u8 {
477
        2
478 1
    }
479

480 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
481 1
        buffer.write_varint_u32(self.segment);
482 1
        buffer.write_varint_u64(self.recref.page);
483 1
        buffer.write_varint_u32(self.recref.pos);
484 1
        buffer.write_varint_u64(self.record_page);
485 1
        Ok(())
486 1
    }
487

488 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
489 1
        self.segment = buffer.read_varint_u32();
490 1
        self.recref.page = buffer.read_varint_u64();
491 1
        self.recref.pos = buffer.read_varint_u32();
492 1
        self.record_page = buffer.read_varint_u64();
493 1
        Ok(())
494 1
    }
495

496 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
497 1
        tx.recover_insert(self);
498 1
        Ok(RecoverStatus::Started)
499 1
    }
500
}
501

502
impl JournalEntry for PrepareCommit {
503 1
    fn get_type(&self) -> u8 {
504
        3
505 1
    }
506

507 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
508 1
        Ok(())
509 1
    }
510

511 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
512 1
        Ok(())
513 1
    }
514

515 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
516 1
        Ok(RecoverStatus::PrepareCommit)
517 1
    }
518
}
519

520
impl JournalEntry for Commit {
521 1
    fn get_type(&self) -> u8 {
522
        4
523 1
    }
524

525 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
526 1
        Ok(())
527 1
    }
528

529 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
530 1
        Ok(())
531 1
    }
532

533 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
534 1
        Ok(RecoverStatus::Commit)
535 1
    }
536
}
537

538
impl JournalEntry for Cleanup {
539 1
    fn get_type(&self) -> u8 {
540
        14
541 1
    }
542

543 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
544 1
        Ok(())
545 1
    }
546

547 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
548 1
        Ok(())
549 1
    }
550

551 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
552 1
        Ok(RecoverStatus::Cleanup)
553 1
    }
554
}
555

556
impl JournalEntry for Metadata {
557 1
    fn get_type(&self) -> u8 {
558
        11
559 1
    }
560

561 1
    fn write(&self, write: &mut dyn InfallibleWrite) -> PRes<()> {
562 1
        write.write_varint_u8(self.strategy.value());
563 1
        let len = self.meta_id.len();
564 1
        write.write_varint_u16(len as u16);
565 1
        write.write_all(&self.meta_id);
566 1
        Ok(())
567 1
    }
568

569 1
    fn read(&mut self, read: &mut dyn InfallibleRead) -> PRes<()> {
570 1
        self.strategy = TxStrategy::from_value(read.read_varint_u8());
571 1
        let len = read.read_varint_u16();
572 1
        let mut slice: Vec<u8> = vec![0; len as usize];
573 1
        read.read_exact(&mut slice[0..len as usize]);
574 1
        self.meta_id = slice;
575 1
        Ok(())
576 1
    }
577

578 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
579 1
        tx.recover_metadata(self);
580 1
        Ok(RecoverStatus::Started)
581 1
    }
582
}
583

584
impl JournalEntry for Start {
585 1
    fn get_type(&self) -> u8 {
586
        1
587 1
    }
588

589 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
590 1
        Ok(())
591 1
    }
592

593 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
594 1
        Ok(())
595 1
    }
596

597 0
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
598 0
        panic!("this should never be called")
599
    }
600
}
601

602
impl JournalEntry for Rollback {
603 1
    fn get_type(&self) -> u8 {
604
        7
605 1
    }
606

607 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
608 1
        Ok(())
609 1
    }
610

611 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
612 1
        Ok(())
613 1
    }
614 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
615 1
        Ok(RecoverStatus::Rollback)
616 1
    }
617
}
618

619
impl JournalEntry for CreateSegment {
620 1
    fn get_type(&self) -> u8 {
621
        8
622 1
    }
623

624 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
625 1
        buffer.write_varint_u32(self.segment_id);
626 1
        buffer.write_varint_u64(self.first_page);
627 1
        buffer.write_varint_u16(self.name.len() as u16);
628 1
        buffer.write_all(self.name.as_bytes());
629 1
        Ok(())
630 1
    }
631

632 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
633 1
        self.segment_id = buffer.read_varint_u32();
634 1
        self.first_page = buffer.read_varint_u64();
635 1
        let string_size = buffer.read_varint_u16();
636 1
        let mut slice: Vec<u8> = vec![0; string_size as usize];
637 1
        buffer.read_exact(&mut slice[0..string_size as usize]);
638 1
        self.name = str::from_utf8(&slice[0..string_size as usize])?.into();
639 1
        Ok(())
640 1
    }
641

642 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
643 1
        tx.recover_add(self);
644 1
        Ok(RecoverStatus::Started)
645 1
    }
646
}
647

648
impl JournalEntry for DropSegment {
649 1
    fn get_type(&self) -> u8 {
650
        9
651 1
    }
652

653 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
654 1
        buffer.write_varint_u32(self.segment_id);
655 1
        buffer.write_varint_u16(self.name.len() as u16);
656 1
        buffer.write_all(self.name.as_bytes());
657 1
        Ok(())
658 1
    }
659

660 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
661 1
        self.segment_id = buffer.read_varint_u32();
662 1
        let string_size = buffer.read_varint_u16();
663 1
        let mut slice: Vec<u8> = vec![0; string_size as usize];
664 1
        buffer.read_exact(&mut slice[0..string_size as usize]);
665 1
        self.name = str::from_utf8(&slice[0..string_size as usize])?.into();
666 1
        Ok(())
667 1
    }
668

669 0
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
670 0
        tx.recover_drop(self);
671 0
        Ok(RecoverStatus::Started)
672 0
    }
673
}
674

675
impl JournalEntry for FreedPage {
676 1
    fn get_type(&self) -> u8 {
677
        12
678 1
    }
679

680 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
681 1
        buffer.write_u64(self.page);
682 1
        Ok(())
683 1
    }
684

685 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
686 1
        self.page = buffer.read_u64();
687 1
        Ok(())
688 1
    }
689

690 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
691 1
        tx.recover_freed_page(self);
692 1
        Ok(RecoverStatus::Started)
693 1
    }
694
}
695

696
impl JournalEntry for NewSegmentPage {
697 1
    fn get_type(&self) -> u8 {
698
        13
699 1
    }
700

701 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
702 1
        buffer.write_varint_u32(self.segment);
703 1
        buffer.write_varint_u64(self.page);
704 1
        buffer.write_varint_u64(self.previous);
705 1
        Ok(())
706 1
    }
707

708 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
709 1
        self.segment = buffer.read_varint_u32();
710 1
        self.page = buffer.read_varint_u64();
711 1
        self.previous = buffer.read_varint_u64();
712 1
        Ok(())
713 1
    }
714

715 0
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
716 0
        tx.recover_new_segment_page(self);
717 0
        Ok(RecoverStatus::Started)
718 0
    }
719
}
720
impl JournalId {
721 1
    pub fn new(page: u64, pos: u32) -> JournalId {
722 1
        JournalId { page, pos }
723 1
    }
724
}
725

726
impl Start {
727 0
    fn new() -> Start {
728
        Start {}
729 1
    }
730
}
731

732
#[cfg(test)]
733
mod tests {
734
    use super::{Journal, JournalEntry, JournalId, StartList};
735
    use crate::transaction::{
736
        CreateSegment, DeleteRecord, DropSegment, InsertRecord, Metadata, NewSegmentPage, ReadRecord, UpdateRecord,
737
    };
738
    use crate::{
739
        allocator::Allocator,
740
        config::{Config, TxStrategy},
741
        discref::DiscRef,
742
        id::RecRef,
743
    };
744
    use std::{
745
        io::{Cursor, Seek, SeekFrom},
746
        sync::Arc,
747
    };
748
    use tempfile::Builder;
749

750
    #[test]
751 1
    fn start_list_add_remove() {
752 1
        let mut start = StartList::new();
753 1
        start.push(&JournalId::new(1, 2));
754 1
        start.push(&JournalId::new(1, 4));
755 1
        assert!(start.remove(&JournalId::new(1, 2)));
756 1
        assert!(start.remove(&JournalId::new(1, 4)));
757

758 1
        start.push(&JournalId::new(1, 2));
759 1
        start.push(&JournalId::new(1, 4));
760 1
        assert!(!start.remove(&JournalId::new(1, 4)));
761 1
        assert!(start.remove(&JournalId::new(1, 2)));
762 1
    }
763

764
    #[test]
765 1
    fn start_list_add_remove_other_order() {
766 1
        let mut start = StartList::new();
767 1
        start.push(&JournalId::new(1, 1));
768 1
        start.push(&JournalId::new(1, 2));
769 1
        assert!(!start.remove(&JournalId::new(1, 2)));
770 1
        start.push(&JournalId::new(1, 3));
771 1
        assert!(!start.remove(&JournalId::new(1, 3)));
772 1
        assert!(start.remove(&JournalId::new(1, 1)));
773 1
    }
774

775
    #[test]
776 1
    fn read_write_insert_record() {
777 1
        let mut buffer = Vec::<u8>::new();
778 1
        let to_write = InsertRecord::new(10, &RecRef::new(20, 10), 3);
779

780 1
        to_write.write(&mut buffer).unwrap();
781

782 1
        let mut to_read = InsertRecord::new(0, &RecRef::new(0, 0), 0);
783 1
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
784 1
        to_read.read(&mut cursor).unwrap();
785 1
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
786 1
        assert_eq!(to_read.segment, 10);
787 1
        assert_eq!(to_read.recref.page, 20);
788 1
        assert_eq!(to_read.recref.pos, 10);
789 1
        assert_eq!(to_read.record_page, 3);
790 1
    }
791

792
    #[test]
793 1
    fn read_write_insert_read() {
794 1
        let mut buffer = Vec::<u8>::new();
795 1
        let to_write = ReadRecord::new(10, &RecRef::new(20, 10), 3);
796

797 1
        to_write.write(&mut buffer).unwrap();
798

799 1
        let mut to_read = ReadRecord::new(0, &RecRef::new(0, 0), 0);
800 1
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
801 1
        to_read.read(&mut cursor).unwrap();
802 1
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
803 1
        assert_eq!(to_read.segment, 10);
804 1
        assert_eq!(to_read.recref.page, 20);
805 1
        assert_eq!(to_read.recref.pos, 10);
806 1
        assert_eq!(to_read.version, 3);
807 1
    }
808

809
    #[test]
810 1
    fn read_write_update_record() {
811 1
        let mut buffer = Vec::<u8>::new();
812 1
        let to_write = UpdateRecord::new(10, &RecRef::new(20, 10), 3, 1);
813

814 1
        to_write.write(&mut buffer).unwrap();
815

816 1
        let mut to_read = UpdateRecord::new(0, &RecRef::new(0, 0), 0, 0);
817 1
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
818 1
        to_read.read(&mut cursor).unwrap();
819 1
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
820 1
        assert_eq!(to_read.segment, 10);
821 1
        assert_eq!(to_read.recref.page, 20);
822 1
        assert_eq!(to_read.recref.pos, 10);
823 1
        assert_eq!(to_read.record_page, 3);
824 1
        assert_eq!(to_read.version, 1);
825 1
    }
826

827
    #[test]
828 1
    fn read_write_delete_record() {
829 1
        let mut buffer = Vec::<u8>::new();
830 1
        let to_write = DeleteRecord::new(10, &RecRef::new(20, 10), 1);
831

832 1
        to_write.write(&mut buffer).unwrap();
833

834 1
        let mut to_read = DeleteRecord::new(0, &RecRef::new(0, 0), 1);
835 1
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
836 1
        to_read.read(&mut cursor).unwrap();
837 1
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
838 1
        assert_eq!(to_read.segment, 10);
839 1
        assert_eq!(to_read.recref.page, 20);
840 1
        assert_eq!(to_read.recref.pos, 10);
841 1
        assert_eq!(to_read.version, 1);
842 1
    }
843

844
    #[test]
845 1
    fn read_write_create_segment() {
846 1
        let mut buffer = Vec::<u8>::new();
847 1
        let to_write = CreateSegment::new("some", 10, 20);
848 1
        to_write.write(&mut buffer).unwrap();
849 1
        let mut to_read = CreateSegment::new("", 0, 0);
850 1
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
851 1
        to_read.read(&mut cursor).unwrap();
852 1
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
853 1
        assert_eq!(to_read.name, "some");
854 1
        assert_eq!(to_read.segment_id, 10);
855 1
        assert_eq!(to_read.first_page, 20);
856 1
    }
857

858
    #[test]
859 1
    fn read_write_drop_segment() {
860 1
        let mut buffer = Vec::<u8>::new();
861 1
        let to_write = DropSegment::new("some", 20);
862 1
        to_write.write(&mut buffer).unwrap();
863 1
        let mut to_read = DropSegment::new("", 0);
864 1
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
865 1
        to_read.read(&mut cursor).unwrap();
866 1
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
867 1
        assert_eq!(to_read.name, "some");
868 1
        assert_eq!(to_read.segment_id, 20);
869 1
    }
870

871
    #[test]
872 1
    fn read_write_metadata() {
873 1
        let mut buffer = Vec::<u8>::new();
874 1
        let meta_id = vec![10, 3];
875 1
        let to_write = Metadata::new(&TxStrategy::VersionOnWrite, meta_id.clone());
876 1
        to_write.write(&mut buffer).unwrap();
877 1
        let mut to_read = Metadata::new(&TxStrategy::LastWin, Vec::new());
878 1
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
879 1
        to_read.read(&mut cursor).unwrap();
880 1
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
881 1
        assert_eq!(to_read.strategy, TxStrategy::VersionOnWrite);
882 1
        assert_eq!(to_read.meta_id, meta_id);
883 1
    }
884

885
    #[test]
886 1
    fn read_write_new_segment_page() {
887 1
        let mut buffer = Vec::<u8>::new();
888 1
        let to_write = NewSegmentPage::new(10, 20, 30);
889 1
        to_write.write(&mut buffer).unwrap();
890 1
        let mut to_read = NewSegmentPage::new(0, 0, 0);
891 1
        let mut cursor = Cursor::<&Vec<u8>>::new(&buffer);
892 1
        to_read.read(&mut cursor).unwrap();
893 1
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (buffer.len()) as u64);
894 1
        assert_eq!(to_read.segment, 10);
895 1
        assert_eq!(to_read.page, 20);
896 1
        assert_eq!(to_read.previous, 30);
897 1
    }
898

899
    #[test]
900 1
    fn journal_log_and_recover() {
901 1
        let file = Builder::new()
902
            .prefix("journal_test")
903
            .suffix(".persy")
904
            .tempfile()
905
            .unwrap()
906
            .reopen()
907 1
            .unwrap();
908 1
        let disc = Box::new(DiscRef::new(file).unwrap());
909 1
        let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
910 1
        let rp = Journal::init(&allocator).unwrap();
911 1
        let journal = Journal::new(&Arc::new(allocator), rp).unwrap();
912 1
        let rec = InsertRecord::new(10, &RecRef::new(1, 1), 1);
913 1
        let id = JournalId::new(1, 20);
914 1
        journal.log(&rec, &id).unwrap();
915 1
        journal.log(&rec, &id).unwrap();
916 1
        journal.recover(|e, _| assert_eq!(e.get_type(), 2)).unwrap();
917 1
    }
918
}

Read our documentation on viewing source code .

Loading