use crate::{
    config::Config,
    discref::{Device, Page, PageOps, ReadPage, UpdateList},
    error::PRes,
    flush_checksum::{double_buffer_check, write_root_page},
    io::{read_u64, write_u64, InfallibleReadFormat},
};
use linked_hash_map::LinkedHashMap;
use std::{io::Read, sync::Mutex};

// Size exponent of the allocator root page: 2^10 = 1024 bytes.
const ALLOCATOR_PAGE_EXP: u8 = 10; // 2^10
// On-disk format version tag for the allocator root page (V0 layout).
const ALLOCATER_ROOT_PAGE_VERSION_V0: u8 = 0;
// Version written when flushing the root page; currently always V0.
const ALLOCATER_ROOT_PAGE_VERSION: u8 = ALLOCATER_ROOT_PAGE_VERSION_V0;

/// Size-bounded cache of read pages keyed by page index, with
/// least-recently-used eviction (backed by `LinkedHashMap` order).
pub struct Cache {
    // Access-ordered map; the front entry is the least recently used.
    cache: LinkedHashMap<u64, ReadPage>,
    // Running total of cached page sizes, in bytes (1 << size_exp each).
    size: u64,
    // Maximum total size in bytes before eviction starts.
    limit: u64,
}

21
impl Cache {
22 1
    pub fn new(limit: u64) -> Cache {
23 1
        Cache {
24 1
            cache: LinkedHashMap::new(),
25
            size: 0,
26
            limit,
27
        }
28 1
    }
29

30 1
    fn get(&mut self, key: u64) -> Option<ReadPage> {
31 1
        self.cache.get_refresh(&key).map(|val| val.clone_read())
32 1
    }
33

34 1
    fn put(&mut self, key: u64, value: ReadPage) {
35 1
        self.size += 1 << value.get_size_exp();
36 1
        self.cache.insert(key, value);
37 1
        while self.size > self.limit {
38 1
            if let Some(en) = self.cache.pop_front() {
39 1
                self.size -= 1 << en.1.get_size_exp();
40 1
            } else {
41 0
                break;
42
            }
43 0
        }
44 1
    }
45 1
    fn remove(&mut self, key: u64) {
46 1
        self.cache.remove(&key);
47 1
    }
48
}
49

50
struct FreeList {
51
    list: [u64; 32],
52
    last_flush: u8,
53
}
54
impl FreeList {
55 1
    fn new(list: [u64; 32], last_flush: u8) -> Self {
56 1
        FreeList { list, last_flush }
57 1
    }
58

59 1
    fn read(page: &mut ReadPage) -> PRes<FreeList> {
60 1
        match page.read_u8() {
61 1
            ALLOCATER_ROOT_PAGE_VERSION_V0 => Self::read_v0(page),
62 0
            _ => panic!("unsupported format"),
63
        }
64 1
    }
65 1
    fn read_v0(page: &mut ReadPage) -> PRes<FreeList> {
66 1
        let mut buffer_0 = [0; 259];
67 1
        let mut buffer_1 = [0; 259];
68
        let freelist;
69
        let last_flush;
70 1
        page.read_exact(&mut buffer_0)?;
71 1
        page.read_exact(&mut buffer_1)?;
72 1
        let (flush_number, first) = double_buffer_check(&buffer_0, &buffer_1);
73 1
        last_flush = flush_number;
74 1
        if first {
75 1
            freelist = Self::read_free_list(&buffer_0);
76
        } else {
77 1
            freelist = Self::read_free_list(&buffer_1);
78
        }
79 1
        Ok(FreeList::new(freelist, last_flush))
80 1
    }
81

82 1
    pub fn read_free_list(buffer: &[u8]) -> [u64; 32] {
83 1
        let mut freelist = [0; 32];
84 1
        for p in &mut freelist.iter_mut().enumerate() {
85 1
            let pos = 8 * p.0;
86 1
            (*p.1) = read_u64(&buffer[pos..pos + 8]);
87
        }
88
        freelist
89 1
    }
90

91 1
    pub fn write_free_list(list: &[u64]) -> [u8; 259] {
92 1
        let mut buffer = [0; 259];
93 1
        for (pos, page) in list.iter().enumerate() {
94 1
            write_u64(&mut buffer[pos..pos + 8], *page);
95
        }
96
        buffer
97 1
    }
98

99 1
    pub fn write_list(&mut self, page: &mut Page) -> PRes<()> {
100 1
        let mut buffer = Self::write_free_list(&self.list);
101 1
        self.last_flush = write_root_page(page, &mut buffer, ALLOCATER_ROOT_PAGE_VERSION, self.last_flush)?;
102 1
        Ok(())
103 1
    }
104
}
105

// TODO: Manage defragmentation by merging/splitting pages in the free list
/// Page allocator over a `Device`: hands out pages, recycles freed ones
/// through per-size free lists, and caches loaded pages.
pub struct Allocator {
    // Underlying storage device.
    disc: Box<dyn Device>,
    // In-memory free-list heads, locked for concurrent allocate/free.
    free_list: Mutex<FreeList>,
    // Shared read-page cache.
    cache: Mutex<Cache>,
    // Index of the root page where the free list is persisted.
    page: u64,
}

impl Allocator {
    /// Opens an allocator whose persisted free list is stored at `page`.
    pub fn new(dr: Box<dyn Device>, config: &Config, page: u64) -> PRes<Allocator> {
        let mut pg = dr.load_page(page)?;
        let freelist = FreeList::read(&mut pg)?;

        let cache_size = config.cache_size();
        Ok(Allocator {
            disc: dr,
            free_list: Mutex::new(freelist),
            cache: Mutex::new(Cache::new(cache_size)),
            page,
        })
    }

    /// Bootstraps a fresh device: creates the root page, flushes an empty
    /// free list into it, and returns its index with the opened allocator.
    pub fn init(dr: Box<dyn Device>, config: &Config) -> PRes<(u64, Allocator)> {
        let mut page = dr.create_page(ALLOCATOR_PAGE_EXP)?;
        let mut list = FreeList::new([0; 32], 0);
        list.write_list(&mut page)?;
        dr.flush_page(&page)?;
        let allocate_page = page.get_index();
        Ok((allocate_page, Allocator::new(dr, config, allocate_page)?))
    }

    /// Loads a page, returning `None` when the page is marked free.
    pub fn load_page_not_free(&self, page: u64) -> PRes<Option<ReadPage>> {
        let load = self.read_page_int(page)?;
        if load.is_free()? {
            Ok(None)
        } else {
            Ok(Some(load))
        }
    }

    /// Loads a page the caller expects to be allocated (debug-asserted).
    pub fn load_page(&self, page: u64) -> PRes<ReadPage> {
        let load = self.read_page_int(page)?;
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }

    /// Loads a writable page the caller expects to be allocated
    /// (debug-asserted).
    pub fn write_page(&self, page: u64) -> PRes<Page> {
        let load = self.write_page_int(page)?;
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }

    // Cache-through read: try the cache, otherwise load from disc and
    // populate the cache. The cache lock is released around the disc access,
    // so two threads may race to load the same page; the second put simply
    // refreshes the entry.
    fn read_page_int(&self, page: u64) -> PRes<ReadPage> {
        {
            let mut cache = self.cache.lock()?;
            if let Some(pg) = cache.get(page) {
                return Ok(pg);
            }
        }
        let load = self.disc.load_page(page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page, load.clone_read());
        }
        Ok(load)
    }

    // Same cache-through pattern as read_page_int, but hands back a
    // writable clone of the loaded page.
    fn write_page_int(&self, page: u64) -> PRes<Page> {
        let cache_result;
        {
            let mut cache = self.cache.lock()?;
            cache_result = cache.get(page);
        }
        if let Some(pg) = cache_result {
            return Ok(pg.clone_write());
        }
        let load = self.disc.load_page(page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page, load.clone_read());
        }
        Ok(load.clone_write())
    }

    /// Allocates a page of size 2^exp: pops the head of the matching free
    /// list when available, otherwise creates a new page on the device.
    pub fn allocate(&self, exp: u8) -> PRes<Page> {
        {
            let mut fl = self.free_list.lock()?;
            let page = fl.list[exp as usize];
            if page != 0 as u64 {
                // Pop the head: mark it allocated on disc, relink the list
                // to the following free page, and reset the page content.
                let next = self.disc.mark_allocated(page)?;
                fl.list[exp as usize] = next;
                let mut pg = self.write_page_int(page)?;
                pg.reset()?;
                return Ok(pg);
            }
        }
        let page = self.disc.create_page(exp)?;
        Ok(page)
    }

    /// Flushes a page to the device and refreshes the cached copy.
    pub fn flush_page(&self, page: Page) -> PRes<()> {
        self.disc.flush_page(&page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page.get_index(), page.make_read());
        }
        Ok(())
    }

    /// Unlinks `page` from the free list for size `exp` and marks it as
    /// allocated, repairing the doubly-linked free chain around it.
    pub fn remove_from_free(&self, page: u64, exp: u8) -> PRes<()> {
        let mut fl = self.free_list.lock()?;
        let mut pg = self.write_page_int(page)?;
        if pg.is_free()? {
            if pg.get_prev_free() == 0 {
                // Page is the list head: advance the head to its successor.
                // NOTE(review): the successor's prev pointer is not cleared
                // here — confirm whether prev links are rebuilt elsewhere.
                fl.list[exp as usize] = pg.get_next_free();
            } else {
                // Page is in the middle: splice neighbours together.
                // NOTE(review): if get_next_free() is 0 this writes page 0 —
                // verify that a tail page can never reach this branch.
                let mut next = self.write_page_int(pg.get_next_free())?;
                next.set_prev_free(pg.get_prev_free());
                self.flush_page(next)?;
                let mut prev = self.write_page_int(pg.get_prev_free())?;
                prev.set_next_free(pg.get_next_free());
                self.flush_page(prev)?;
            }
            pg.set_free(false)?;
            self.flush_page(pg)?;
        } else {
            // If we hit this branch and the page is found in the free list
            // it means that the page free was flushed but not the free list
            // in this case the safest approach is to truncate the free list
            // with potential pages leaks, to fix in future free list refactors
            if fl.list[exp as usize] == page {
                fl.list[exp as usize] = 0;
            } else {
                // Walk the chain and cut it just before the stale entry.
                let mut p = fl.list[exp as usize];
                while p != 0 {
                    let mut pg_in_list = self.write_page_int(p)?;
                    p = pg_in_list.get_next_free();
                    if p == page {
                        pg_in_list.set_next_free(0);
                        self.flush_page(pg_in_list)?;
                        break;
                    }
                }
            }
        }
        Ok(())
    }

    /// Recovery-time free: unlike `free` via `UpdateList::update`, this path
    /// has no debug asserts for pages that are already free.
    pub fn recover_free(&self, page: u64) -> PRes<()> {
        if !self.disc.load_page(page)?.is_free()? {
            self.free(page)?;
        }
        Ok(())
    }

    /// Releases free pages sitting at the end of the file back to the device.
    pub fn trim_free_at_end(&self) -> PRes<()> {
        let mut fl = self.free_list.lock()?;
        self.disc.trim_end_pages(&mut fl.list)?;
        Ok(())
    }

    /// Frees a page: drops it from the cache, then either trims it (if at
    /// the end of the file) or pushes it onto the matching free list.
    pub fn free(&self, page: u64) -> PRes<()> {
        self.cache.lock()?.remove(page);
        let mut fl = self.free_list.lock()?;
        self.disc.trim_or_free_page(page, &mut fl.list)?;
        Ok(())
    }

    /// Persists the in-memory free list to the root page.
    pub fn flush_free_list(&self) -> PRes<()> {
        let mut lock = self.free_list.lock()?;
        let mut pag = self.disc.load_page(self.page)?.clone_write();
        lock.write_list(&mut pag)?;
        self.disc.flush_page(&pag)?;
        // I do not do the disk sync here because is every time done by the caller.
        Ok(())
    }

    /// Borrows the underlying device.
    pub fn disc(&self) -> &dyn Device {
        &*self.disc
    }

    /// Consumes the allocator and returns ownership of the device.
    pub fn release(self) -> Box<dyn Device> {
        self.disc
    }
}

292
impl UpdateList for [u64; 32] {
293 1
    fn update(&mut self, size: u8, page: u64) -> PRes<u64> {
294 1
        let old = self[size as usize];
295 1
        self[size as usize] = page;
296 1
        debug_assert!(old != page, "freeing: {} already free: {} ", page, old);
297 1
        Ok(old)
298 1
    }
299

300 0
    fn remove(&mut self, size: u8, page: u64, next: u64) -> PRes<()> {
301 0
        let old = self[size as usize];
302 0
        self[size as usize] = next;
303 0
        debug_assert!(
304 0
            old == page,
305
            "trimmed page not in top list expected:{} current:{} ",
306
            page,
307
            old
308
        );
309 0
        Ok(())
310 0
    }
311
}
312

#[cfg(test)]
mod tests {
    use super::{Allocator, Cache};
    use crate::{
        config::Config,
        discref::{DiscRef, PageOps, ReadPage},
    };
    use std::sync::Arc;
    use tempfile::Builder;

    // Freed pages are reallocated before the device grows, in LIFO order
    // per size exponent, and different exponents are tracked independently.
    #[test]
    fn test_reuse_freed_page() {
        let file = Builder::new()
            .prefix("all_reuse_test")
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let disc = Box::new(DiscRef::new(file).unwrap());
        let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
        // This is needed to avoid the 0 page
        allocator.allocate(10).unwrap();
        let first = allocator.allocate(10).unwrap().get_index();
        let second = allocator.allocate(10).unwrap().get_index();
        let third = allocator.allocate(11).unwrap().get_index();
        let _forth_to_avoid_trim = allocator.allocate(11).unwrap();

        allocator.free(first).unwrap();
        allocator.free(second).unwrap();
        allocator.free(third).unwrap();

        // Last freed comes back first (free list is a stack).
        let val = allocator.allocate(10).unwrap().get_index();
        assert_eq!(val, second);
        let val = allocator.allocate(10).unwrap().get_index();
        assert_eq!(val, first);
        // Free list for exp 10 is exhausted; a brand-new page is created.
        let val = allocator.allocate(10).unwrap().get_index();
        assert!(val != first);
        assert!(val != second);
        let val = allocator.allocate(11).unwrap().get_index();
        assert_eq!(val, third);
        let val = allocator.allocate(11).unwrap().get_index();
        assert!(val != third);
    }

    // remove_from_free unlinks a middle entry; subsequent allocations skip it.
    #[test]
    fn test_remove_freed_page() {
        let file = Builder::new()
            .prefix("remove_free_test")
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let disc = Box::new(DiscRef::new(file).unwrap());
        let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
        // This is needed to avoid the 0 page
        allocator.allocate(2).unwrap();
        let first = allocator.allocate(10).unwrap().get_index();
        let second = allocator.allocate(10).unwrap().get_index();
        let third = allocator.allocate(10).unwrap().get_index();
        let _forth_to_avoid_trim = allocator.allocate(11).unwrap();
        println!("{} {} {}", first, second, third);
        allocator.free(first).unwrap();
        allocator.free(second).unwrap();
        allocator.free(third).unwrap();
        // `second` is in the middle of the free stack (third -> second -> first).
        allocator.remove_from_free(second, 10).unwrap();
        let val = allocator.allocate(10).unwrap().get_index();
        assert_eq!(val, third);
        let val = allocator.allocate(10).unwrap().get_index();
        assert_eq!(val, first);
    }

    // Exceeding the byte limit evicts the least recently used entry.
    #[test]
    fn test_cache_limit_evict() {
        let mut cache = Cache::new(1050 as u64);
        cache.put(10, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        cache.put(20, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        cache.put(30, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        // Oldest entry (10) was evicted; 20 and 30 survive.
        let ten = 10 as u64;
        match cache.get(ten) {
            Some(_) => assert!(false),
            None => assert!(true),
        }
        let ten = 20 as u64;
        match cache.get(ten) {
            Some(_) => assert!(true),
            None => assert!(false),
        }
        let ten = 30 as u64;
        match cache.get(ten) {
            Some(_) => assert!(true),
            None => assert!(false),
        }
    }

    // A get() refreshes recency, so the eviction victim changes.
    #[test]
    fn test_cache_limit_refresh_evict() {
        let mut cache = Cache::new(1050 as u64);
        cache.put(10, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        cache.put(20, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        let ten = 10 as u64;
        match cache.get(ten) {
            Some(_) => assert!(true),
            None => assert!(false),
        }

        cache.put(30, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        // 10 was refreshed, so 20 became the LRU victim.
        let ten = 10 as u64;
        match cache.get(ten) {
            Some(_) => assert!(true),
            None => assert!(false),
        }
        let ten = 20 as u64;
        match cache.get(ten) {
            Some(_) => assert!(false),
            None => assert!(true),
        }
        let ten = 30 as u64;
        match cache.get(ten) {
            Some(_) => assert!(true),
            None => assert!(false),
        }
    }
}
