1
use crate::transaction::{
2
    Cleanup, Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata, NewSegmentPage,
3
    PrepareCommit, ReadRecord, Rollback, Transaction, UpdateRecord,
4
};
5
use crate::{
6
    allocator::Allocator,
7
    config::TxStrategy,
8
    discref::{Page, PageOps, ReadPage, PAGE_METADATA_SIZE},
9
    error::PRes,
10
    flush_checksum::{double_buffer_check, write_root_page},
11
    id::SegmentId,
12
    io::{
13
        read_u64, write_u64, InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite,
14
        InfallibleWriteFormat, InfallibleWriteVarInt,
15
    },
16
    persy::RecoverStatus,
17
};
18
use std::{
19
    collections::hash_map::HashMap,
20
    str,
21
    sync::{Arc, Mutex, MutexGuard},
22
};
23

24
/// Size exponent passed to the allocator for every journal page.
pub const JOURNAL_PAGE_EXP: u8 = 10; // page size is 2^10 bytes
25
/// Bytes usable inside a journal page: the full `2^JOURNAL_PAGE_EXP` page minus `PAGE_METADATA_SIZE`.
const JOURNAL_PAGE_SIZE: u32 = (1 << JOURNAL_PAGE_EXP) - PAGE_METADATA_SIZE; // 2^10 - page metadata
26
/// Offset inside a journal page of the `u64` index of the next journal page (0 when there is none).
const JOURNAL_PAGE_NEXT_OFFSET: u32 = 0;
27
/// Offset inside a journal page of the `u64` index of the previous journal page.
const JOURNAL_PAGE_PREV_OFFSET: u32 = 8;
28
/// Offset inside a journal page where the logged entries begin, right after the next/prev links.
const JOURNAL_PAGE_CONTENT_OFFSET: u32 = 16;
29
/// Version byte of the journal root page layout handled by `Journal::new_version_0`.
const JOURNAL_ROOT_VERSION_0: u8 = 0;
30
/// Root page version written whenever the journal root page is (re)written.
const JOURNAL_ROOT_VERSION: u8 = JOURNAL_ROOT_VERSION_0;
31

32
struct StartListEntry {
33
    next: Option<JournalId>,
34
    prev: Option<JournalId>,
35
}
36

37
impl StartListEntry {
38 1
    pub fn new(prev: Option<JournalId>) -> StartListEntry {
39 1
        StartListEntry { next: None, prev }
40 1
    }
41
}
42

43
struct StartList {
44
    transactions: HashMap<JournalId, StartListEntry>,
45
    last: Option<JournalId>,
46
}
47

48
impl StartList {
49 1
    fn new() -> StartList {
50 1
        StartList {
51 1
            transactions: HashMap::new(),
52 1
            last: None,
53
        }
54 1
    }
55

56 1
    fn push(&mut self, id: &JournalId) {
57 1
        self.transactions
58 1
            .insert(id.clone(), StartListEntry::new(self.last.clone()));
59 1
        if let Some(ref lst) = self.last {
60 1
            self.transactions.get_mut(lst).unwrap().next = Some(id.clone());
61
        }
62 1
        self.last = Some(id.clone());
63 1
    }
64

65 1
    fn remove(&mut self, id: &JournalId) -> bool {
66 1
        if let Some(entry) = self.transactions.remove(id) {
67 1
            if let Some(ref next) = entry.next {
68 1
                self.transactions.get_mut(next).unwrap().prev = entry.prev.clone();
69
            }
70 1
            if let Some(ref prev) = entry.prev {
71 1
                self.transactions.get_mut(prev).unwrap().next = entry.next.clone();
72
            }
73 1
            if let Some(ref l) = self.last {
74 1
                if l == id {
75 1
                    self.last = entry.prev.clone();
76
                }
77
            }
78 1
            entry.prev.is_none()
79
        } else {
80 1
            false
81
        }
82 1
    }
83
}
84

85
struct JournalPagesToFree {
86
    free_tx_id: JournalId,
87
    pages: Vec<u64>,
88
}
89

90
struct JournalShared {
91
    root: u64,
92
    first_page: u64,
93
    last_page: u64,
94
    last_pos: u32,
95
    starts: StartList,
96
    current: Page,
97
    last_flush: u8,
98
    to_clear: Vec<JournalId>,
99
    to_free: Option<JournalPagesToFree>,
100
}
101

102
/// Journal segment is the area where the transactional log is kept
103
pub struct Journal {
104
    allocator: Arc<Allocator>,
105
    journal: Mutex<JournalShared>,
106
}
107

108
pub(crate) trait JournalEntry {
109
    fn get_type(&self) -> u8;
110
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()>;
111
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()>;
112
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus>;
113
}
114

115
/// Journal entry logged when a transaction starts; it carries no payload.
pub struct Start {}
116

117 1
/// Identifier of a journal transaction: the position of its `Start` entry in the journal.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct JournalId {
    /// Index of the journal page holding the start entry.
    page: u64,
    /// Position of the start entry inside that page.
    pos: u32,
}
122

123 1
fn recover_entry<T>(entry: &mut dyn JournalEntry, page: &mut ReadPage, found: &mut T, id: &JournalId) -> PRes<()>
124
where
125
    T: FnMut(&dyn JournalEntry, &JournalId),
126
{
127 1
    entry.read(page)?;
128 1
    found(entry, id);
129 1
    Ok(())
130 1
}
131

132
impl Journal {
133 1
    pub fn new(all: &Arc<Allocator>, page: u64) -> PRes<Journal> {
134 1
        let mut page = all.load_page(page)?;
135 1
        let journal = match page.read_u8() {
136 1
            JOURNAL_ROOT_VERSION_0 => Self::new_version_0(page, all)?,
137 0
            _ => panic!("version not supported"),
138
        };
139 1
        Ok(Journal {
140 1
            allocator: all.clone(),
141 1
            journal: Mutex::new(journal),
142 0
        })
143 1
    }
144 1
    fn new_version_0(mut page: ReadPage, all: &Allocator) -> PRes<JournalShared> {
145 1
        let first_page;
146
        let last_flush;
147 1
        let mut buffer_0 = [0; 11];
148 1
        let mut buffer_1 = [0; 11];
149

150
        let current;
151
        {
152 1
            page.read_exact(&mut buffer_0);
153 1
            page.read_exact(&mut buffer_1);
154 1
            let (last_flush_ret, first) = double_buffer_check(&buffer_0, &buffer_1);
155 1
            last_flush = last_flush_ret;
156 1
            first_page = read_u64(if first { &buffer_0[0..8] } else { &buffer_1[0..8] });
157
        }
158 1
        current = if first_page != 0 {
159 1
            all.write_page(first_page)?
160 0
        } else {
161
            // Empty 0 sized page
162 0
            Page::new(Vec::new(), 0, 0, 0)
163
        };
164 1
        Ok(JournalShared {
165 1
            root: page.get_index(),
166
            first_page,
167
            last_page: first_page,
168
            last_pos: 0,
169 1
            starts: StartList::new(),
170 1
            current,
171
            last_flush,
172 1
            to_clear: Vec::new(),
173 1
            to_free: None,
174 0
        })
175 1
    }
176

177 1
    pub fn init(allocator: &Allocator) -> PRes<u64> {
178 1
        let mut root_page = allocator.allocate(5)?;
179 1
        let root_page_index = root_page.get_index();
180 1
        let first_page = allocator.allocate(JOURNAL_PAGE_EXP)?;
181 1
        let mut buffer = [0; 11];
182 1
        write_u64(&mut buffer[0..8], first_page.get_index());
183 1
        write_root_page(&mut root_page, &mut buffer, JOURNAL_ROOT_VERSION, 0)?;
184 1
        allocator.flush_page(root_page)?;
185 1
        Ok(root_page_index)
186 1
    }
187

188 1
    pub fn start(&self) -> PRes<JournalId> {
189 1
        let val = self.internal_log(&Start::new(), &JournalId::new(0, 0), false)?;
190 1
        let id = JournalId::new(val.0, val.1);
191 1
        self.journal.lock()?.starts.push(&id);
192 1
        Ok(id)
193 1
    }
194

195 1
    /// Logs `entry` for the transaction `id` and flushes the current journal page.
    pub(crate) fn prepare(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
        self.internal_log(entry, id, true).map(|_| ())
    }
199

200 1
    /// Logs the closing `entry` of the transaction `id` and flushes the current journal page.
    pub(crate) fn end(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
        // TODO:optimization keep in memory last page and move the flush here.
        self.internal_log(entry, id, true).map(|_| ())
    }
205

206 1
    pub fn clear_in_queque(&self) -> PRes<()> {
207 1
        let mut pages_to_free = Vec::new();
208
        {
209 1
            let mut lock = self.journal.lock()?;
210 1
            let ids = lock.to_clear.clone();
211 1
            lock.to_clear.clear();
212 1
            for id in ids {
213 1
                if lock.starts.remove(&id) {
214 1
                    let first_page = lock.first_page;
215 1
                    let mut free_cursor = id.page;
216 0
                    loop {
217
                        let read;
218
                        {
219 1
                            let mut cur = self.allocator.load_page(free_cursor)?;
220 1
                            cur.seek(JOURNAL_PAGE_PREV_OFFSET);
221 1
                            read = cur.read_u64();
222 1
                        }
223 1
                        if free_cursor != id.page {
224 0
                            pages_to_free.push(free_cursor)
225
                        }
226 1
                        if free_cursor == first_page {
227
                            break;
228
                        }
229 0
                        free_cursor = read;
230
                    }
231

232 1
                    let mut buffer = [0; 11];
233 1
                    write_u64(&mut buffer[0..8], id.page);
234 1
                    let mut root_page = self.allocator.write_page(lock.root)?;
235 1
                    lock.last_flush =
236 1
                        write_root_page(&mut root_page, &mut buffer, JOURNAL_ROOT_VERSION, lock.last_flush)?;
237 1
                    self.allocator.flush_page(root_page)?;
238 1
                    lock.first_page = id.page;
239 1
                }
240 1
            }
241 1
        }
242 1
        self.free_pages_tx(pages_to_free)?;
243 1
        Ok(())
244 1
    }
245 1
    pub fn free_pages_tx(&self, pages_to_free: Vec<u64>) -> PRes<()> {
246 1
        let mut prev_tx_id = None;
247
        {
248
            //Free the pages of previous clear, logged 2 tx ahead, now synced because of the second
249
            //tx
250 1
            let mut lock = self.journal.lock()?;
251 1
            if let Some(to_free) = &lock.to_free {
252 0
                for page in &to_free.pages {
253 0
                    self.allocator.free(*page)?;
254
                }
255 0
                prev_tx_id = Some(to_free.free_tx_id.clone());
256
            }
257 1
            lock.to_free = None;
258 1
        }
259 1
        if let Some(id) = prev_tx_id {
260
            // Enqueue for clear the free pages journal pages tx
261 0
            self.finished_to_clean(&[id])?;
262
        }
263 1
        if !pages_to_free.is_empty() {
264
            // Log the pages removed from the journal for free
265 0
            let id = self.start()?;
266 0
            for page in &pages_to_free {
267 0
                self.log(&FreedPage::new(*page), &id)?;
268
            }
269 0
            self.log(&PrepareCommit::new(), &id)?;
270 0
            self.end(&Commit::new(), &id)?;
271 0
            let mut lock = self.journal.lock()?;
272 0
            lock.to_free = Some(JournalPagesToFree {
273 0
                free_tx_id: id,
274 0
                pages: pages_to_free,
275 0
            });
276 0
        }
277 1
        Ok(())
278 1
    }
279

280 1
    pub fn finished_to_clean(&self, ids: &[JournalId]) -> PRes<()> {
281 1
        for id in ids {
282 1
            self.log(&Cleanup::new(), id)?;
283
        }
284 1
        let mut lock = self.journal.lock()?;
285 1
        lock.to_clear.extend_from_slice(ids);
286 1
        Ok(())
287 1
    }
288

289 1
    /// Logs `entry` for the transaction `id` without flushing the journal page.
    pub(crate) fn log(&self, entry: &dyn JournalEntry, id: &JournalId) -> PRes<()> {
        self.internal_log(entry, id, false).map(|_| ())
    }
293

294 1
    /// Serializes `entry` (type tag, transaction id, payload) and appends it to the journal.
    ///
    /// The current page is flushed when `flush` is true. Returns the page index and the position
    /// at which the entry was written.
    fn internal_log(&self, entry: &dyn JournalEntry, id: &JournalId, flush: bool) -> PRes<(u64, u32)> {
        let mut serialized = Vec::<u8>::new();
        serialized.write_u8(entry.get_type());
        serialized.write_varint_u64(id.page);
        serialized.write_varint_u32(id.pos);
        entry.write(&mut serialized)?;
        let size = serialized.len() as u32;

        let mut shared = self.journal.lock()?;
        // May switch to a freshly allocated page, so the position is read only afterwards.
        self.required_space(size, &mut shared)?;
        let written_at = (shared.last_page, shared.last_pos);
        shared.current.seek(written_at.1);
        shared.current.write_all(&*serialized);
        if flush {
            self.allocator.flush_page(shared.current.clone())?;
        }
        shared.last_pos += size;
        Ok(written_at)
    }
317

318 1
    pub(crate) fn recover<T>(&self, mut found: T) -> PRes<Vec<u64>>
319
    where
320
        T: FnMut(&dyn JournalEntry, &JournalId),
321
    {
322 1
        let mut journal_pages = Vec::new();
323 1
        let mut jr = self.journal.lock()?;
324 1
        let mut cur_page = jr.first_page;
325 1
        jr.last_page = jr.first_page;
326 1
        journal_pages.push(cur_page);
327 1
        let mut page = self.allocator.load_page(cur_page)?;
328 1
        page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
329 1
        loop {
330 1
            let tp = page.read_u8();
331 1
            if tp == 0 {
332 1
                let last_pos = page.cursor_pos() as u32;
333 1
                page.seek(JOURNAL_PAGE_NEXT_OFFSET);
334 1
                cur_page = page.read_u64();
335 1
                if cur_page == 0 {
336 1
                    jr.last_pos = last_pos - 1;
337
                    break;
338
                }
339 0
                journal_pages.push(cur_page);
340 0
                page = self.allocator.load_page(cur_page)?;
341 0
                page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
342 0
                jr.last_page = cur_page;
343
            } else {
344 1
                let page_id = page.read_varint_u64();
345 1
                let pos = page.read_varint_u32();
346 1
                let id = JournalId::new(page_id, pos);
347 1
                let ref_page = &mut page;
348 1
                let ref_found = &mut found;
349 1
                match tp {
350
                    //The Start entry has no valid id, should not be recovered
351 1
                    1 => Start::new().read(&mut page)?,
352 1
                    2 => recover_entry(&mut InsertRecord::default(), ref_page, ref_found, &id)?,
353 1
                    3 => recover_entry(&mut PrepareCommit::default(), ref_page, ref_found, &id)?,
354 1
                    4 => recover_entry(&mut Commit::default(), ref_page, ref_found, &id)?,
355 1
                    5 => recover_entry(&mut UpdateRecord::default(), ref_page, ref_found, &id)?,
356 1
                    6 => recover_entry(&mut DeleteRecord::default(), ref_page, ref_found, &id)?,
357 1
                    7 => recover_entry(&mut Rollback::default(), ref_page, ref_found, &id)?,
358 1
                    8 => recover_entry(&mut CreateSegment::default(), ref_page, ref_found, &id)?,
359 0
                    9 => recover_entry(&mut DropSegment::default(), ref_page, ref_found, &id)?,
360 0
                    10 => recover_entry(&mut ReadRecord::default(), ref_page, ref_found, &id)?,
361 1
                    11 => recover_entry(&mut Metadata::default(), ref_page, ref_found, &id)?,
362 1
                    12 => recover_entry(&mut FreedPage::default(), ref_page, ref_found, &id)?,
363 0
                    13 => recover_entry(&mut NewSegmentPage::default(), ref_page, ref_found, &id)?,
364 1
                    14 => recover_entry(&mut Cleanup::default(), ref_page, ref_found, &id)?,
365 0
                    _ => panic!(" wrong log entry {} ", tp),
366
                };
367
            }
368
        }
369 1
        jr.current = self.allocator.write_page(jr.last_page)?;
370 1
        Ok(journal_pages)
371 1
    }
372

373 1
    fn required_space(&self, space: u32, jr: &mut MutexGuard<JournalShared>) -> PRes<()> {
374
        // if there is no page or the  'current content' + 'space required' + 'end marker' is more
375
        // than the page, allocate new page and link the previous one
376 1
        if jr.last_pos + space + 1 >= JOURNAL_PAGE_SIZE as u32 {
377 1
            let prev = jr.last_page;
378 1
            let last_pos = jr.last_pos;
379 1
            let new_page = self.allocator.allocate(JOURNAL_PAGE_EXP)?;
380 1
            if prev != 0 {
381 1
                jr.current.seek(JOURNAL_PAGE_NEXT_OFFSET);
382 1
                jr.current.write_u64(new_page.get_index());
383 1
                jr.current.seek(last_pos);
384 1
                jr.current.write_u8(0);
385 1
                self.allocator.flush_page(jr.current.clone())?;
386
            }
387 1
            jr.last_page = new_page.get_index();
388 1
            jr.current = new_page;
389 1
            jr.current.seek(JOURNAL_PAGE_PREV_OFFSET);
390 1
            jr.current.write_u64(prev);
391 1
            self.allocator.flush_page(jr.current.clone())?;
392 1
            jr.last_pos = JOURNAL_PAGE_CONTENT_OFFSET;
393 1
        }
394 1
        Ok(())
395 1
    }
396
}
397

398
impl JournalEntry for DeleteRecord {
399 1
    fn get_type(&self) -> u8 {
400
        6
401 1
    }
402

403 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
404 1
        self.segment.write_varint(buffer);
405 1
        buffer.write_varint_u64(self.recref.page);
406 1
        buffer.write_varint_u32(self.recref.pos);
407 1
        buffer.write_varint_u16(self.version);
408 1
        Ok(())
409 1
    }
410

411 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
412 1
        self.segment = SegmentId::read_varint(buffer);
413 1
        self.recref.page = buffer.read_varint_u64();
414 1
        self.recref.pos = buffer.read_varint_u32();
415 1
        self.version = buffer.read_varint_u16();
416 1
        Ok(())
417 1
    }
418

419 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
420 1
        tx.recover_delete(self);
421 1
        Ok(RecoverStatus::Started)
422 1
    }
423
}
424

425
impl JournalEntry for ReadRecord {
426 1
    fn get_type(&self) -> u8 {
427
        10
428 1
    }
429

430 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
431 1
        self.segment.write_varint(buffer);
432 1
        buffer.write_varint_u64(self.recref.page);
433 1
        buffer.write_varint_u32(self.recref.pos);
434 1
        buffer.write_varint_u16(self.version);
435 1
        Ok(())
436 1
    }
437

438 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
439 1
        self.segment = SegmentId::read_varint(buffer);
440 1
        self.recref.page = buffer.read_varint_u64();
441 1
        self.recref.pos = buffer.read_varint_u32();
442 1
        self.version = buffer.read_varint_u16();
443 1
        Ok(())
444 1
    }
445

446 0
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
447 0
        tx.recover_read(self);
448 0
        Ok(RecoverStatus::Started)
449 0
    }
450
}
451
impl JournalEntry for UpdateRecord {
452 1
    fn get_type(&self) -> u8 {
453
        5
454 1
    }
455

456 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
457 1
        self.segment.write_varint(buffer);
458 1
        buffer.write_varint_u64(self.recref.page);
459 1
        buffer.write_varint_u32(self.recref.pos);
460 1
        buffer.write_varint_u64(self.record_page);
461 1
        buffer.write_varint_u16(self.version);
462 1
        Ok(())
463 1
    }
464

465 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
466 1
        self.segment = SegmentId::read_varint(buffer);
467 1
        self.recref.page = buffer.read_varint_u64();
468 1
        self.recref.pos = buffer.read_varint_u32();
469 1
        self.record_page = buffer.read_varint_u64();
470 1
        self.version = buffer.read_varint_u16();
471 1
        Ok(())
472 1
    }
473

474 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
475 1
        tx.recover_update(self);
476 1
        Ok(RecoverStatus::Started)
477 1
    }
478
}
479

480
impl JournalEntry for InsertRecord {
481 1
    fn get_type(&self) -> u8 {
482
        2
483 1
    }
484

485 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
486 1
        self.segment.write_varint(buffer);
487 1
        buffer.write_varint_u64(self.recref.page);
488 1
        buffer.write_varint_u32(self.recref.pos);
489 1
        buffer.write_varint_u64(self.record_page);
490 1
        Ok(())
491 1
    }
492

493 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
494 1
        self.segment = SegmentId::read_varint(buffer);
495 1
        self.recref.page = buffer.read_varint_u64();
496 1
        self.recref.pos = buffer.read_varint_u32();
497 1
        self.record_page = buffer.read_varint_u64();
498 1
        Ok(())
499 1
    }
500

501 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
502 1
        tx.recover_insert(self);
503 1
        Ok(RecoverStatus::Started)
504 1
    }
505
}
506

507
impl JournalEntry for PrepareCommit {
508 1
    fn get_type(&self) -> u8 {
509
        3
510 1
    }
511

512 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
513 1
        Ok(())
514 1
    }
515

516 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
517 1
        Ok(())
518 1
    }
519

520 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
521 1
        Ok(RecoverStatus::PrepareCommit)
522 1
    }
523
}
524

525
impl JournalEntry for Commit {
526 1
    fn get_type(&self) -> u8 {
527
        4
528 1
    }
529

530 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
531 1
        Ok(())
532 1
    }
533

534 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
535 1
        Ok(())
536 1
    }
537

538 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
539 1
        Ok(RecoverStatus::Commit)
540 1
    }
541
}
542

543
impl JournalEntry for Cleanup {
544 1
    fn get_type(&self) -> u8 {
545
        14
546 1
    }
547

548 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
549 1
        Ok(())
550 1
    }
551

552 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
553 1
        Ok(())
554 1
    }
555

556 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
557 1
        Ok(RecoverStatus::Cleanup)
558 1
    }
559
}
560

561
impl JournalEntry for Metadata {
562 1
    fn get_type(&self) -> u8 {
563
        11
564 1
    }
565

566 1
    fn write(&self, write: &mut dyn InfallibleWrite) -> PRes<()> {
567 1
        write.write_varint_u8(self.strategy.value());
568 1
        let len = self.meta_id.len();
569 1
        write.write_varint_u16(len as u16);
570 1
        write.write_all(&self.meta_id);
571 1
        Ok(())
572 1
    }
573

574 1
    fn read(&mut self, read: &mut dyn InfallibleRead) -> PRes<()> {
575 1
        self.strategy = TxStrategy::from_value(read.read_varint_u8());
576 1
        let len = read.read_varint_u16();
577 1
        let mut slice: Vec<u8> = vec![0; len as usize];
578 1
        read.read_exact(&mut slice[0..len as usize]);
579 1
        self.meta_id = slice;
580 1
        Ok(())
581 1
    }
582

583 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
584 1
        tx.recover_metadata(self);
585 1
        Ok(RecoverStatus::Started)
586 1
    }
587
}
588

589
impl JournalEntry for Start {
590 1
    fn get_type(&self) -> u8 {
591
        1
592 1
    }
593

594 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
595 1
        Ok(())
596 1
    }
597

598 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
599 1
        Ok(())
600 1
    }
601

602 0
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
603 0
        panic!("this should never be called")
604
    }
605
}
606

607
impl JournalEntry for Rollback {
608 1
    fn get_type(&self) -> u8 {
609
        7
610 1
    }
611

612 1
    fn write(&self, _: &mut dyn InfallibleWrite) -> PRes<()> {
613 1
        Ok(())
614 1
    }
615

616 1
    fn read(&mut self, _: &mut dyn InfallibleRead) -> PRes<()> {
617 1
        Ok(())
618 1
    }
619 1
    fn recover(&self, _: &mut Transaction) -> PRes<RecoverStatus> {
620 1
        Ok(RecoverStatus::Rollback)
621 1
    }
622
}
623

624
impl JournalEntry for CreateSegment {
625 1
    fn get_type(&self) -> u8 {
626
        8
627 1
    }
628

629 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
630 1
        self.segment_id.write_varint(buffer);
631 1
        buffer.write_varint_u64(self.first_page);
632 1
        buffer.write_varint_u16(self.name.len() as u16);
633 1
        buffer.write_all(self.name.as_bytes());
634 1
        Ok(())
635 1
    }
636

637 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
638 1
        self.segment_id = SegmentId::read_varint(buffer);
639 1
        self.first_page = buffer.read_varint_u64();
640 1
        let string_size = buffer.read_varint_u16();
641 1
        let mut slice: Vec<u8> = vec![0; string_size as usize];
642 1
        buffer.read_exact(&mut slice[0..string_size as usize]);
643 1
        self.name = str::from_utf8(&slice[0..string_size as usize])?.into();
644 1
        Ok(())
645 1
    }
646

647 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
648 1
        tx.recover_add(self);
649 1
        Ok(RecoverStatus::Started)
650 1
    }
651
}
652

653
impl JournalEntry for DropSegment {
654 1
    fn get_type(&self) -> u8 {
655
        9
656 1
    }
657

658 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
659 1
        self.segment_id.write_varint(buffer);
660 1
        buffer.write_varint_u16(self.name.len() as u16);
661 1
        buffer.write_all(self.name.as_bytes());
662 1
        Ok(())
663 1
    }
664

665 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
666 1
        self.segment_id = SegmentId::read_varint(buffer);
667 1
        let string_size = buffer.read_varint_u16();
668 1
        let mut slice: Vec<u8> = vec![0; string_size as usize];
669 1
        buffer.read_exact(&mut slice[0..string_size as usize]);
670 1
        self.name = str::from_utf8(&slice[0..string_size as usize])?.into();
671 1
        Ok(())
672 1
    }
673

674 0
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
675 0
        tx.recover_drop(self);
676 0
        Ok(RecoverStatus::Started)
677 0
    }
678
}
679

680
impl JournalEntry for FreedPage {
681 1
    fn get_type(&self) -> u8 {
682
        12
683 1
    }
684

685 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
686 1
        buffer.write_varint_u64(self.page);
687 1
        Ok(())
688 1
    }
689

690 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
691 1
        self.page = buffer.read_varint_u64();
692 1
        Ok(())
693 1
    }
694

695 1
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
696 1
        tx.recover_freed_page(self);
697 1
        Ok(RecoverStatus::Started)
698 1
    }
699
}
700

701
impl JournalEntry for NewSegmentPage {
702 1
    fn get_type(&self) -> u8 {
703
        13
704 1
    }
705

706 1
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PRes<()> {
707 1
        self.segment.write_varint(buffer);
708 1
        buffer.write_varint_u64(self.page);
709 1
        buffer.write_varint_u64(self.previous);
710 1
        Ok(())
711 1
    }
712

713 1
    fn read(&mut self, buffer: &mut dyn InfallibleRead) -> PRes<()> {
714 1
        self.segment = SegmentId::read_varint(buffer);
715 1
        self.page = buffer.read_varint_u64();
716 1
        self.previous = buffer.read_varint_u64();
717 1
        Ok(())
718 1
    }
719

720 0
    fn recover(&self, tx: &mut Transaction) -> PRes<RecoverStatus> {
721 0
        tx.recover_new_segment_page(self);
722 0
        Ok(RecoverStatus::Started)
723 0
    }
724
}
725
impl JournalId {
    /// Builds the id of the transaction whose start entry is at position `pos` of page `page`.
    pub fn new(page: u64, pos: u32) -> JournalId {
        Self { page, pos }
    }
}
730

731
impl Start {
    /// Creates the (empty) start entry.
    fn new() -> Start {
        Self {}
    }
}
736

737
#[cfg(test)]
mod tests {
    use super::{Journal, JournalEntry, JournalId, StartList};
    use crate::transaction::{
        CreateSegment, DeleteRecord, DropSegment, InsertRecord, Metadata, NewSegmentPage, ReadRecord, UpdateRecord,
    };
    use crate::{
        allocator::Allocator,
        config::{Config, TxStrategy},
        discref::DiscRef,
        id::{RecRef, SegmentId},
    };
    use std::{
        io::{Cursor, Seek, SeekFrom},
        sync::Arc,
    };
    use tempfile::Builder;

    /// Serializes `source`, reads the bytes back into `target` and checks that the read consumed
    /// exactly what was written.
    fn round_trip<E: JournalEntry>(source: &E, target: &mut E) {
        let mut bytes = Vec::<u8>::new();
        source.write(&mut bytes).unwrap();
        let mut cursor = Cursor::<&Vec<u8>>::new(&bytes);
        target.read(&mut cursor).unwrap();
        assert_eq!(cursor.seek(SeekFrom::Current(0)).unwrap(), (bytes.len()) as u64);
    }

    #[test]
    fn start_list_add_remove() {
        let (first, second) = (JournalId::new(1, 2), JournalId::new(1, 4));
        let mut start = StartList::new();

        // Removing in start order: each removed transaction is the oldest one.
        start.push(&first);
        start.push(&second);
        assert!(start.remove(&first));
        assert!(start.remove(&second));

        // Removing the newest first: only the second removal hits the oldest one.
        start.push(&first);
        start.push(&second);
        assert!(!start.remove(&second));
        assert!(start.remove(&first));
    }

    #[test]
    fn start_list_add_remove_other_order() {
        let id = |pos| JournalId::new(1, pos);
        let mut start = StartList::new();
        start.push(&id(1));
        start.push(&id(2));
        assert!(!start.remove(&id(2)));
        start.push(&id(3));
        assert!(!start.remove(&id(3)));
        assert!(start.remove(&id(1)));
    }

    #[test]
    fn read_write_insert_record() {
        let seg_id = SegmentId::new(10);
        let mut restored = InsertRecord::default();
        round_trip(&InsertRecord::new(seg_id, &RecRef::new(20, 10), 3), &mut restored);
        assert_eq!(restored.segment, seg_id);
        assert_eq!(restored.recref.page, 20);
        assert_eq!(restored.recref.pos, 10);
        assert_eq!(restored.record_page, 3);
    }

    #[test]
    fn read_write_insert_read() {
        let seg_id = SegmentId::new(10);
        let mut restored = ReadRecord::default();
        round_trip(&ReadRecord::new(seg_id, &RecRef::new(20, 10), 3), &mut restored);
        assert_eq!(restored.segment, seg_id);
        assert_eq!(restored.recref.page, 20);
        assert_eq!(restored.recref.pos, 10);
        assert_eq!(restored.version, 3);
    }

    #[test]
    fn read_write_update_record() {
        let seg_id = SegmentId::new(10);
        let mut restored = UpdateRecord::default();
        round_trip(&UpdateRecord::new(seg_id, &RecRef::new(20, 10), 3, 1), &mut restored);
        assert_eq!(restored.segment, seg_id);
        assert_eq!(restored.recref.page, 20);
        assert_eq!(restored.recref.pos, 10);
        assert_eq!(restored.record_page, 3);
        assert_eq!(restored.version, 1);
    }

    #[test]
    fn read_write_delete_record() {
        let seg_id = SegmentId::new(10);
        let mut restored = DeleteRecord::default();
        round_trip(&DeleteRecord::new(seg_id, &RecRef::new(20, 10), 1), &mut restored);
        assert_eq!(restored.segment, seg_id);
        assert_eq!(restored.recref.page, 20);
        assert_eq!(restored.recref.pos, 10);
        assert_eq!(restored.version, 1);
    }

    #[test]
    fn read_write_create_segment() {
        let seg_id = SegmentId::new(10);
        let mut restored = CreateSegment::default();
        round_trip(&CreateSegment::new("some", seg_id, 20), &mut restored);
        assert_eq!(restored.name, "some");
        assert_eq!(restored.segment_id, seg_id);
        assert_eq!(restored.first_page, 20);
    }

    #[test]
    fn read_write_drop_segment() {
        let seg_id = SegmentId::new(20);
        let mut restored = DropSegment::default();
        round_trip(&DropSegment::new("some", seg_id), &mut restored);
        assert_eq!(restored.name, "some");
        assert_eq!(restored.segment_id, seg_id);
    }

    #[test]
    fn read_write_metadata() {
        let meta_id = vec![10, 3];
        let mut restored = Metadata::new(&TxStrategy::LastWin, Vec::new());
        round_trip(&Metadata::new(&TxStrategy::VersionOnWrite, meta_id.clone()), &mut restored);
        assert_eq!(restored.strategy, TxStrategy::VersionOnWrite);
        assert_eq!(restored.meta_id, meta_id);
    }

    #[test]
    fn read_write_new_segment_page() {
        let seg_id = SegmentId::new(10);
        let mut restored = NewSegmentPage::default();
        round_trip(&NewSegmentPage::new(seg_id, 20, 30), &mut restored);
        assert_eq!(restored.segment, seg_id);
        assert_eq!(restored.page, 20);
        assert_eq!(restored.previous, 30);
    }

    #[test]
    fn journal_log_and_recover() {
        let temp = Builder::new().prefix("journal_test").suffix(".persy").tempfile().unwrap();
        let file = temp.reopen().unwrap();
        let disc = Box::new(DiscRef::new(file).unwrap());
        let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
        let root_page = Journal::init(&allocator).unwrap();
        let journal = Journal::new(&Arc::new(allocator), root_page).unwrap();

        let record = InsertRecord::new(SegmentId::new(10), &RecRef::new(1, 1), 1);
        let id = JournalId::new(1, 20);
        for _ in 0..2 {
            journal.log(&record, &id).unwrap();
        }
        journal.recover(|entry, _| assert_eq!(entry.get_type(), 2)).unwrap();
    }
}

Read our documentation on viewing source code .

Loading