use crate::{
    config::Config,
    discref::{Device, Page, PageOps, ReadPage, UpdateList},
    error::PRes,
    flush_checksum::{double_buffer_check, prepare_buffer_flush},
    io::{read_u64, write_u64},
};
use linked_hash_map::LinkedHashMap;
use std::{
    io::{Read, Write},
    sync::Mutex,
};

const ALLOCATOR_PAGE_EXP: u8 = 10; // 2^10

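/// Size-bounded LRU cache of read-only pages, keyed by page index.
///
/// `size` tracks the sum of the cached page sizes (`1 << size_exp` for each
/// entry); once it exceeds `limit`, the least recently used entries are
/// evicted in `put`. `get` refreshes an entry's position on access.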
pub struct Cache {
    cache: LinkedHashMap<u64, ReadPage>,
    size: u64,
    limit: u64,
}

impl Cache {
    pub fn new(limit: u64) -> Cache {
        Cache {
            cache: LinkedHashMap::new(),
            size: 0,
            limit,
        }
    }

    fn get(&mut self, key: u64) -> Option<ReadPage> {
        self.cache.get_refresh(&key).map(|val| val.clone_read())
    }

    fn put(&mut self, key: u64, value: ReadPage) {
        self.size += 1 << value.get_size_exp();
        self.cache.insert(key, value);
        while self.size > self.limit {
            if let Some(en) = self.cache.pop_front() {
                self.size -= 1 << en.1.get_size_exp();
            } else {
                break;
            }
        }
    }

    fn remove(&mut self, key: u64) {
        self.cache.remove(&key);
    }
}

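/// Heads of the per-size free page lists.
///
/// `list[exp]` holds the index of the first free page of size `2^exp`
/// (0 means the list is empty); the rest of each list is chained on disk
/// through the pages' next-free pointers. The heads are persisted in two
/// alternating copies on the allocator page (see `flush_checksum`), so an
/// interrupted write can be detected and the previous copy used.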
struct FreeList {
    list: [u64; 32],
    last_flush: u8,
}

impl FreeList {
    fn new(list: [u64; 32], last_flush: u8) -> Self {
        FreeList { list, last_flush }
    }

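    /// Reads the free list from `page`, checking both buffered copies and
    /// keeping the one that `double_buffer_check` reports as valid, together
    /// with the flush counter to continue from.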
    fn read(page: &mut ReadPage, offset: u32) -> PRes<FreeList> {
        let mut buffer_0 = [0; 259];
        let mut buffer_1 = [0; 259];
        page.seek(offset)?;
        page.read_exact(&mut buffer_0)?;
        page.read_exact(&mut buffer_1)?;
        // `double_buffer_check` selects which of the two buffered copies to use.
        let (last_flush, first) = double_buffer_check(&buffer_0, &buffer_1);
        let freelist = if first {
            Self::read_free_list(&buffer_0)
        } else {
            Self::read_free_list(&buffer_1)
        };
        Ok(FreeList::new(freelist, last_flush))
    }

    pub fn read_free_list(buffer: &[u8]) -> [u64; 32] {
        let mut freelist = [0; 32];
        for (index, entry) in freelist.iter_mut().enumerate() {
            let pos = index * 8;
            *entry = read_u64(&buffer[pos..pos + 8]);
        }
        freelist
    }

    pub fn write_free_list(list: &[u64]) -> [u8; 259] {
        let mut buffer = [0; 259];
        for (index, page) in list.iter().enumerate() {
            // Each entry is stored as 8 bytes, mirroring read_free_list.
            let pos = index * 8;
            write_u64(&mut buffer[pos..pos + 8], *page);
        }
        buffer
    }

    pub fn write_list(&mut self, page: &mut Page, base_offset: u32) -> PRes<()> {
        let mut buffer = Self::write_free_list(&self.list);
        let (last_flush, offset) = prepare_buffer_flush(&mut buffer, self.last_flush);
        self.last_flush = last_flush;
        page.seek(base_offset + offset)?;
        page.write_all(&buffer)?;
        Ok(())
    }
}

// TODO: Manage defragmentation by merging/splitting pages in the free list
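/// Page allocator built on top of a `Device`.
///
/// `allocate(exp)` serves pages by size exponent, reusing pages from the
/// per-size free list when possible and asking the device for a new page
/// otherwise. Recently loaded pages are kept in a size-bounded `Cache`, and
/// the free-list heads are persisted on a dedicated allocator page (`page`).
///
/// A minimal usage sketch, not compiled here and based on the tests below;
/// `file` is assumed to be an already opened `std::fs::File`:
///
/// ```ignore
/// let disc = Box::new(DiscRef::new(file)?);
/// let (allocator_page, allocator) = Allocator::init(disc, &Config::new())?;
/// let page = allocator.allocate(10)?; // a page with size exponent 10
/// allocator.flush_page(page)?;
/// allocator.flush_free_list()?;
/// ```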
pub struct Allocator {
    disc: Box<dyn Device>,
    free_list: Mutex<FreeList>,
    cache: Mutex<Cache>,
    page: u64,
}

impl Allocator {
    pub fn new(dr: Box<dyn Device>, config: &Config, page: u64) -> PRes<Allocator> {
        let mut pg = dr.load_page(page)?;
        let freelist = FreeList::read(&mut pg, 0)?;

        let cache_size = config.cache_size();
        Ok(Allocator {
            disc: dr,
            free_list: Mutex::new(freelist),
            cache: Mutex::new(Cache::new(cache_size)),
            page,
        })
    }

    pub fn init(dr: Box<dyn Device>, config: &Config) -> PRes<(u64, Allocator)> {
        let mut page = dr.create_page(ALLOCATOR_PAGE_EXP)?;
        let mut list = FreeList::new([0; 32], 0);
        list.write_list(&mut page, 0)?;
        dr.flush_page(&page)?;
        let allocate_page = page.get_index();
        Ok((allocate_page, Allocator::new(dr, config, allocate_page)?))
    }

    pub fn load_page_not_free(&self, page: u64) -> PRes<Option<ReadPage>> {
        let load = self.read_page_int(page)?;
        if load.is_free()? {
            Ok(None)
        } else {
            Ok(Some(load))
        }
    }

    pub fn load_page(&self, page: u64) -> PRes<ReadPage> {
        let load = self.read_page_int(page)?;
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }

    pub fn write_page(&self, page: u64) -> PRes<Page> {
        let load = self.write_page_int(page)?;
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }

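    // Cache-through read: return the cached page if present, otherwise load it
    // from the device and cache a read-only copy. `write_page_int` below
    // follows the same pattern, returning a writable clone.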
    fn read_page_int(&self, page: u64) -> PRes<ReadPage> {
        {
            let mut cache = self.cache.lock()?;
            if let Some(pg) = cache.get(page) {
                return Ok(pg);
            }
        }
        let load = self.disc.load_page(page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page, load.clone_read());
        }
        Ok(load)
    }

    fn write_page_int(&self, page: u64) -> PRes<Page> {
        let cache_result;
        {
            let mut cache = self.cache.lock()?;
            cache_result = cache.get(page);
        }
        if let Some(pg) = cache_result {
            return Ok(pg.clone_write());
        }
        let load = self.disc.load_page(page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page, load.clone_read());
        }
        Ok(load.clone_write())
    }

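    /// Allocates a page of size `2^exp`.
    ///
    /// If the free list has a page of that size, it is marked as allocated on
    /// the device, the list head is advanced to its next-free pointer, and the
    /// page is reset and returned; otherwise a brand new page is created.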
    pub fn allocate(&self, exp: u8) -> PRes<Page> {
        {
            let mut fl = self.free_list.lock()?;
            let page = fl.list[exp as usize];
            if page != 0 {
                let next = self.disc.mark_allocated(page)?;
                fl.list[exp as usize] = next;
                let mut pg = self.write_page_int(page)?;
                pg.reset()?;
                return Ok(pg);
            }
        }
        let page = self.disc.create_page(exp)?;
        Ok(page)
    }

    pub fn flush_page(&self, page: Page) -> PRes<()> {
        self.disc.flush_page(&page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page.get_index(), page.make_read());
        }
        Ok(())
    }

    pub fn remove_from_free(&self, page: u64, exp: u8) -> PRes<()> {
        // TODO: at the moment this may leak free pages when recovering after a crash
        let mut fl = self.free_list.lock()?;
        if fl.list[exp as usize] == page {
            fl.list[exp as usize] = 0;
        } else {
            let mut p = fl.list[exp as usize];
            while p != 0 {
                let mut pg = self.write_page_int(p)?;
                p = pg.get_next_free()?;
                if p == page {
                    pg.set_free(false)?;
                    pg.set_next_free(0)?;
                    self.flush_page(pg)?;
                    break;
                }
            }
        }
        Ok(())
    }

    /// Free used during recovery: unlike `free`, it tolerates pages that are
    /// already free without triggering debug asserts.
    pub fn recover_free(&self, page: u64) -> PRes<()> {
        if !self.disc.load_page(page)?.is_free()? {
            self.free(page)?;
        }
        Ok(())
    }

    pub fn trim_free_at_end(&self) -> PRes<()> {
        let mut fl = self.free_list.lock()?;
        self.disc.trim_end_pages(&mut fl.list)?;
        Ok(())
    }

    pub fn free(&self, page: u64) -> PRes<()> {
        self.cache.lock()?.remove(page);
        let mut fl = self.free_list.lock()?;
        self.disc.trim_or_free_page(page, &mut fl.list)?;
        Ok(())
    }

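    /// Persists the current free-list heads to the allocator page, using the
    /// alternating double-buffer scheme handled by `FreeList::write_list`.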
    pub fn flush_free_list(&self) -> PRes<()> {
        let mut lock = self.free_list.lock()?;
        let mut pag = self.disc.load_page(self.page)?.clone_write();
        lock.write_list(&mut pag, 0)?;
        self.disc.flush_page(&pag)?;
        // No disk sync here: the caller always performs it.
        Ok(())
    }

    pub fn disc(&self) -> &dyn Device {
        &*self.disc
    }

    pub fn release(self) -> Box<dyn Device> {
        self.disc
    }
}

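// In-memory update of the free-list heads: `update` pushes a freed page as the
// new head of its size list and returns the previous head, while `remove`
// expects the trimmed page to be the current head and replaces it with `next`.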
impl UpdateList for [u64; 32] {
    fn update(&mut self, size: u8, page: u64) -> PRes<u64> {
        let old = self[size as usize];
        self[size as usize] = page;
        debug_assert!(old != page, "freeing: {} already free: {}", page, old);
        Ok(old)
    }

    fn remove(&mut self, size: u8, page: u64, next: u64) -> PRes<()> {
        let old = self[size as usize];
        self[size as usize] = next;
        debug_assert!(
            old == page,
            "trimmed page not in top list expected: {} current: {}",
            page,
            old
        );
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::{Allocator, Cache};
    use crate::{
        config::Config,
        discref::{DiscRef, PageOps, ReadPage},
    };
    use std::sync::Arc;
    use tempfile::Builder;

    #[test]
    fn test_reuse_freed_page() {
        let file = Builder::new()
            .prefix("all_reuse_test")
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let disc = Box::new(DiscRef::new(file).unwrap());
        let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
        // This is needed to avoid the 0 page
        allocator.allocate(2).unwrap();
        let first = allocator.allocate(10).unwrap().get_index();
        let second = allocator.allocate(10).unwrap().get_index();
        let third = allocator.allocate(11).unwrap().get_index();
        let _fourth_to_avoid_trim = allocator.allocate(11).unwrap();

        allocator.free(first).unwrap();
        allocator.free(second).unwrap();
        allocator.free(third).unwrap();

        let val = allocator.allocate(10).unwrap().get_index();
        assert_eq!(val, second);
        let val = allocator.allocate(10).unwrap().get_index();
        assert_eq!(val, first);
        let val = allocator.allocate(10).unwrap().get_index();
        assert!(val != first);
        assert!(val != second);
        let val = allocator.allocate(11).unwrap().get_index();
        assert_eq!(val, third);
        let val = allocator.allocate(11).unwrap().get_index();
        assert!(val != third);
    }

    #[test]
    fn test_remove_freed_page() {
        let file = Builder::new()
            .prefix("remove_free_test")
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let disc = Box::new(DiscRef::new(file).unwrap());
        let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
        // This is needed to avoid the 0 page
        allocator.allocate(2).unwrap();
        let first = allocator.allocate(10).unwrap().get_index();
        let second = allocator.allocate(10).unwrap().get_index();
        let third = allocator.allocate(10).unwrap().get_index();
        let _fourth_to_avoid_trim = allocator.allocate(11).unwrap();
        println!("{} {} {}", first, second, third);
        allocator.free(first).unwrap();
        allocator.free(second).unwrap();
        allocator.free(third).unwrap();
        allocator.remove_from_free(second, 10).unwrap();
        let val = allocator.allocate(10).unwrap().get_index();
        assert_eq!(val, third);
        let val = allocator.allocate(10).unwrap().get_index();
        assert!(val != first);
        assert!(val != second);
        assert!(val != third);
    }

    #[test]
    fn test_cache_limit_evict() {
        let mut cache = Cache::new(1050);
        cache.put(10, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        cache.put(20, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        cache.put(30, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        assert!(cache.get(10).is_none(), "the oldest entry should have been evicted");
        assert!(cache.get(20).is_some());
        assert!(cache.get(30).is_some());
    }

    #[test]
    fn test_cache_limit_refresh_evict() {
        let mut cache = Cache::new(1050);
        cache.put(10, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        cache.put(20, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        // Reading entry 10 refreshes it, so entry 20 becomes the eviction candidate.
        assert!(cache.get(10).is_some());

        cache.put(30, ReadPage::new(Arc::new(Vec::new()), 0, 10, 9));
        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        assert!(cache.get(10).is_some());
        assert!(cache.get(20).is_none(), "the least recently used entry should have been evicted");
        assert!(cache.get(30).is_some());
    }
}
