1
|
|
use crate::{
|
2
|
|
config::Config,
|
3
|
|
discref::{Device, Page, PageOps, ReadPage, UpdateList},
|
4
|
|
error::PRes,
|
5
|
|
flush_checksum::{double_buffer_check, prepare_buffer_flush},
|
6
|
|
io::{read_u64, write_u64},
|
7
|
|
};
|
8
|
|
use linked_hash_map::LinkedHashMap;
|
9
|
|
use std::{
|
10
|
|
io::{Read, Write},
|
11
|
|
sync::Mutex,
|
12
|
|
};
|
13
|
|
|
14
|
|
/// Size exponent of the allocator's own metadata page (2^10), used by
/// `Allocator::init` when creating the page that persists the free list.
const ALLOCATOR_PAGE_EXP: u8 = 10; // 2^10
|
15
|
|
|
16
|
|
/// LRU cache of read pages, keyed by page index.
///
/// Eviction is size-based: entries are dropped in least-recently-used order
/// once the accounted size exceeds `limit`.
pub struct Cache {
    // Access-ordered map; `get_refresh` moves an entry to the back, so the
    // front is always the least recently used entry.
    cache: LinkedHashMap<u64, ReadPage>,
    // Accounted size: sum of `1 << page.get_size_exp()` over cached entries.
    size: u64,
    // Threshold for `size` above which eviction kicks in.
    limit: u64,
}
|
21
|
|
|
22
|
|
impl Cache {
|
23
|
1
|
pub fn new(limit: u64) -> Cache {
|
24
|
1
|
Cache {
|
25
|
1
|
cache: LinkedHashMap::new(),
|
26
|
|
size: 0,
|
27
|
|
limit,
|
28
|
|
}
|
29
|
1
|
}
|
30
|
|
|
31
|
1
|
fn get(&mut self, key: u64) -> Option<ReadPage> {
|
32
|
1
|
self.cache.get_refresh(&key).map(|val| val.clone_read())
|
33
|
1
|
}
|
34
|
|
|
35
|
1
|
fn put(&mut self, key: u64, value: ReadPage) {
|
36
|
1
|
self.size += 1 << value.get_size_exp();
|
37
|
1
|
self.cache.insert(key, value);
|
38
|
1
|
while self.size > self.limit {
|
39
|
1
|
if let Some(en) = self.cache.pop_front() {
|
40
|
1
|
self.size -= 1 << en.1.get_size_exp();
|
41
|
1
|
} else {
|
42
|
0
|
break;
|
43
|
|
}
|
44
|
0
|
}
|
45
|
1
|
}
|
46
|
1
|
fn remove(&mut self, key: u64) {
|
47
|
1
|
self.cache.remove(&key);
|
48
|
1
|
}
|
49
|
|
}
|
50
|
|
|
51
|
|
/// In-memory copy of the persisted free-page lists: one chain head per
/// page-size exponent, plus the flush counter for the double-buffered write.
struct FreeList {
    // list[exp] holds the index of the first free page of size 2^exp;
    // 0 marks an empty chain.
    list: [u64; 32],
    // Sequence number of the last flush, fed to `prepare_buffer_flush` to
    // alternate between the two on-disk buffers.
    last_flush: u8,
}
|
55
|
|
impl FreeList {
|
56
|
1
|
fn new(list: [u64; 32], last_flush: u8) -> Self {
|
57
|
1
|
FreeList { list, last_flush }
|
58
|
1
|
}
|
59
|
|
|
60
|
1
|
fn read(page: &mut ReadPage, offset: u32) -> PRes<FreeList> {
|
61
|
1
|
let mut buffer_0 = [0; 259];
|
62
|
1
|
let mut buffer_1 = [0; 259];
|
63
|
|
let freelist;
|
64
|
|
let last_flush;
|
65
|
1
|
page.seek(offset)?;
|
66
|
1
|
page.read_exact(&mut buffer_0)?;
|
67
|
1
|
page.read_exact(&mut buffer_1)?;
|
68
|
1
|
let (flush_number, first) = double_buffer_check(&buffer_0, &buffer_1);
|
69
|
1
|
last_flush = flush_number;
|
70
|
1
|
if first {
|
71
|
1
|
freelist = Self::read_free_list(&buffer_0);
|
72
|
|
} else {
|
73
|
1
|
freelist = Self::read_free_list(&buffer_1);
|
74
|
|
}
|
75
|
1
|
Ok(FreeList::new(freelist, last_flush))
|
76
|
1
|
}
|
77
|
|
|
78
|
1
|
pub fn read_free_list(buffer: &[u8]) -> [u64; 32] {
|
79
|
1
|
let mut freelist = [0; 32];
|
80
|
1
|
for p in &mut freelist.iter_mut().enumerate() {
|
81
|
1
|
let pos = 8 * p.0;
|
82
|
1
|
(*p.1) = read_u64(&buffer[pos..pos + 8]);
|
83
|
|
}
|
84
|
|
freelist
|
85
|
1
|
}
|
86
|
|
|
87
|
1
|
pub fn write_free_list(list: &[u64]) -> [u8; 259] {
|
88
|
1
|
let mut buffer = [0; 259];
|
89
|
1
|
for (pos, page) in list.iter().enumerate() {
|
90
|
1
|
write_u64(&mut buffer[pos..pos + 8], *page);
|
91
|
|
}
|
92
|
|
buffer
|
93
|
1
|
}
|
94
|
|
|
95
|
1
|
pub fn write_list(&mut self, page: &mut Page, base_offset: u32) -> PRes<()> {
|
96
|
1
|
let mut buffer = Self::write_free_list(&self.list);
|
97
|
1
|
let (last_flush, offset) = prepare_buffer_flush(&mut buffer, self.last_flush);
|
98
|
1
|
self.last_flush = last_flush;
|
99
|
1
|
page.seek(base_offset + offset)?;
|
100
|
1
|
page.write_all(&buffer)?;
|
101
|
1
|
Ok(())
|
102
|
1
|
}
|
103
|
|
}
|
104
|
|
|
105
|
|
// TODO: Manage defragmentation by merging/splitting pages in the free list
/// Page allocator over a storage `Device`: serves pages by reusing freed
/// pages of the same size class from a persisted free list, falling back to
/// creating new pages, with an LRU cache in front of device reads.
pub struct Allocator {
    // Underlying storage device.
    disc: Box<dyn Device>,
    // In-memory free-chain heads (one per size exponent), persisted to `page`.
    free_list: Mutex<FreeList>,
    // LRU cache of pages read from `disc`.
    cache: Mutex<Cache>,
    // Index of the device page where the free list is persisted.
    page: u64,
}
|
112
|
|
|
113
|
|
impl Allocator {
    /// Opens an allocator whose free list is persisted in device page `page`,
    /// with a cache sized from `config`.
    pub fn new(dr: Box<dyn Device>, config: &Config, page: u64) -> PRes<Allocator> {
        let mut pg = dr.load_page(page)?;
        let freelist = FreeList::read(&mut pg, 0)?;

        let cache_size = config.cache_size();
        Ok(Allocator {
            disc: dr,
            free_list: Mutex::new(freelist),
            cache: Mutex::new(Cache::new(cache_size)),
            page,
        })
    }

    /// Bootstraps a fresh allocator: creates its metadata page, flushes an
    /// empty free list into it, and returns the page index alongside the
    /// allocator opened on it.
    pub fn init(dr: Box<dyn Device>, config: &Config) -> PRes<(u64, Allocator)> {
        let mut page = dr.create_page(ALLOCATOR_PAGE_EXP)?;
        let mut list = FreeList::new([0; 32], 0);
        list.write_list(&mut page, 0)?;
        dr.flush_page(&page)?;
        let allocate_page = page.get_index();
        Ok((allocate_page, Allocator::new(dr, config, allocate_page)?))
    }

    /// Loads a page, returning `None` instead of the page when it is
    /// currently marked free.
    pub fn load_page_not_free(&self, page: u64) -> PRes<Option<ReadPage>> {
        let load = self.read_page_int(page)?;
        if load.is_free()? {
            Ok(None)
        } else {
            Ok(Some(load))
        }
    }

    /// Loads a page for reading; debug builds assert it is not free.
    pub fn load_page(&self, page: u64) -> PRes<ReadPage> {
        let load = self.read_page_int(page)?;
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }

    /// Loads a page for writing; debug builds assert it is not free.
    pub fn write_page(&self, page: u64) -> PRes<Page> {
        let load = self.write_page_int(page)?;
        debug_assert!(!load.is_free()?, "page {} should not be marked as free", page);
        Ok(load)
    }

    /// Cache-through read: hit returns the cached clone, miss loads from the
    /// device and populates the cache.
    fn read_page_int(&self, page: u64) -> PRes<ReadPage> {
        // The cache lock is released before the device read to avoid holding
        // it across I/O.
        {
            let mut cache = self.cache.lock()?;
            if let Some(pg) = cache.get(page) {
                return Ok(pg);
            }
        }
        // NOTE(review): two threads missing concurrently both load the page
        // and both `put`; presumably benign (last insert wins) — confirm.
        let load = self.disc.load_page(page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page, load.clone_read());
        }
        Ok(load)
    }

    /// Like `read_page_int` but returns a writable clone of the page.
    fn write_page_int(&self, page: u64) -> PRes<Page> {
        let cache_result;
        {
            let mut cache = self.cache.lock()?;
            cache_result = cache.get(page);
        }
        if let Some(pg) = cache_result {
            return Ok(pg.clone_write());
        }
        let load = self.disc.load_page(page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page, load.clone_read());
        }
        Ok(load.clone_write())
    }

    /// Allocates a page of size 2^exp: pops the head of the matching free
    /// chain if one exists, otherwise creates a brand-new page.
    pub fn allocate(&self, exp: u8) -> PRes<Page> {
        {
            let mut fl = self.free_list.lock()?;
            let page = fl.list[exp as usize];
            // 0 is the "empty chain" sentinel, never a reusable page index.
            if page != 0 as u64 {
                // `mark_allocated` returns the next page of the chain, which
                // becomes the new head.
                let next = self.disc.mark_allocated(page)?;
                fl.list[exp as usize] = next;
                let mut pg = self.write_page_int(page)?;
                // Reset so the caller sees a blank page, not stale content.
                pg.reset()?;
                return Ok(pg);
            }
        }
        let page = self.disc.create_page(exp)?;
        Ok(page)
    }

    /// Writes the page to the device, then refreshes the cache with the
    /// flushed content.
    pub fn flush_page(&self, page: Page) -> PRes<()> {
        self.disc.flush_page(&page)?;
        {
            let mut cache = self.cache.lock()?;
            cache.put(page.get_index(), page.make_read());
        }
        Ok(())
    }

    /// Unlinks `page` from the free chain of size class `exp`.
    pub fn remove_from_free(&self, page: u64, exp: u8) -> PRes<()> {
        //TODO: this at the moment may leak free pages on recover after crash
        let mut fl = self.free_list.lock()?;
        if fl.list[exp as usize] == page {
            // Head removal: the chain hanging off `page` is dropped, not
            // re-linked (see TODO above about leaked free pages).
            fl.list[exp as usize] = 0;
        } else {
            // Walk the chain looking for the predecessor of `page`.
            let mut p = fl.list[exp as usize];
            while p != 0 as u64 {
                let mut pg = self.write_page_int(p)?;
                p = pg.get_next_free()?;
                if p == page {
                    // NOTE(review): the predecessor is marked non-free and its
                    // next pointer cleared, truncating the chain at this point;
                    // pages after `page` appear to be leaked — confirm this is
                    // the intent covered by the TODO above.
                    pg.set_free(false)?;
                    pg.set_next_free(0)?;
                    self.flush_page(pg)?;
                    break;
                }
            }
        }
        Ok(())
    }

    /// Recovery variant of `free`: unlike the normal path it tolerates (and
    /// skips) pages that are already marked free, with no debug asserts.
    pub fn recover_free(&self, page: u64) -> PRes<()> {
        if !self.disc.load_page(page)?.is_free()? {
            self.free(page)?;
        }
        Ok(())
    }
    /// Releases free pages sitting at the end of the device back to it.
    pub fn trim_free_at_end(&self) -> PRes<()> {
        let mut fl = self.free_list.lock()?;
        self.disc.trim_end_pages(&mut fl.list)?;
        Ok(())
    }

    /// Frees a page: drops it from the cache, then either trims it or links
    /// it into the free chain of its size class.
    pub fn free(&self, page: u64) -> PRes<()> {
        // Lock order: cache first (released immediately), then free list.
        self.cache.lock()?.remove(page);
        let mut fl = self.free_list.lock()?;
        self.disc.trim_or_free_page(page, &mut fl.list)?;
        Ok(())
    }

    /// Persists the in-memory free list to the allocator's metadata page.
    pub fn flush_free_list(&self) -> PRes<()> {
        let mut lock = self.free_list.lock()?;
        let mut pag = self.disc.load_page(self.page)?.clone_write();
        lock.write_list(&mut pag, 0)?;
        self.disc.flush_page(&pag)?;
        // I do not do the disk sync here because is every time done by the caller.
        Ok(())
    }

    /// Borrows the underlying device.
    pub fn disc(&self) -> &dyn Device {
        &*self.disc
    }

    /// Consumes the allocator and returns ownership of the device.
    pub fn release(self) -> Box<dyn Device> {
        self.disc
    }
}
|
272
|
|
|
273
|
|
impl UpdateList for [u64; 32] {
|
274
|
1
|
fn update(&mut self, size: u8, page: u64) -> PRes<u64> {
|
275
|
1
|
let old = self[size as usize];
|
276
|
1
|
self[size as usize] = page;
|
277
|
1
|
debug_assert!(old != page, "freeing: {} already free: {} ", page, old);
|
278
|
1
|
Ok(old)
|
279
|
1
|
}
|
280
|
|
|
281
|
0
|
fn remove(&mut self, size: u8, page: u64, next: u64) -> PRes<()> {
|
282
|
0
|
let old = self[size as usize];
|
283
|
0
|
self[size as usize] = next;
|
284
|
0
|
debug_assert!(
|
285
|
0
|
old == page,
|
286
|
0
|
"trimmed page not in top list expected:{} current:{} ",
|
287
|
|
page,
|
288
|
|
old
|
289
|
|
);
|
290
|
0
|
Ok(())
|
291
|
0
|
}
|
292
|
|
}
|
293
|
|
|
294
|
|
#[cfg(test)]
mod tests {
    use super::{Allocator, Cache};
    use crate::{
        config::Config,
        discref::{DiscRef, PageOps, ReadPage},
    };
    use std::sync::Arc;
    use tempfile::Builder;

    /// Builds an allocator backed by a fresh temporary file.
    fn create_allocator(prefix: &str) -> Allocator {
        let file = Builder::new()
            .prefix(prefix)
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let disc = Box::new(DiscRef::new(file).unwrap());
        let (_, allocator) = Allocator::init(disc, &Config::new()).unwrap();
        allocator
    }

    /// Dummy cached page with size exponent 9 (accounted as 512 by `Cache`).
    fn dummy_page() -> ReadPage {
        ReadPage::new(Arc::new(Vec::new()), 0, 10, 9)
    }

    #[test]
    fn test_reuse_freed_page() {
        let allocator = create_allocator("all_reuse_test");
        // This is needed to avoid the 0 page
        allocator.allocate(2).unwrap();
        let first = allocator.allocate(10).unwrap().get_index();
        let second = allocator.allocate(10).unwrap().get_index();
        let third = allocator.allocate(11).unwrap().get_index();
        let _forth_to_avoid_trim = allocator.allocate(11).unwrap();

        allocator.free(first).unwrap();
        allocator.free(second).unwrap();
        allocator.free(third).unwrap();

        // Freed pages are reused LIFO within each size class.
        assert_eq!(allocator.allocate(10).unwrap().get_index(), second);
        assert_eq!(allocator.allocate(10).unwrap().get_index(), first);
        let fresh = allocator.allocate(10).unwrap().get_index();
        assert!(fresh != first);
        assert!(fresh != second);
        assert_eq!(allocator.allocate(11).unwrap().get_index(), third);
        assert!(allocator.allocate(11).unwrap().get_index() != third);
    }

    #[test]
    fn test_remove_freed_page() {
        let allocator = create_allocator("remove_free_test");
        // This is needed to avoid the 0 page
        allocator.allocate(2).unwrap();
        let first = allocator.allocate(10).unwrap().get_index();
        let second = allocator.allocate(10).unwrap().get_index();
        let third = allocator.allocate(10).unwrap().get_index();
        let _forth_to_avoid_trim = allocator.allocate(11).unwrap();
        allocator.free(first).unwrap();
        allocator.free(second).unwrap();
        allocator.free(third).unwrap();
        allocator.remove_from_free(second, 10).unwrap();
        // The head of the free chain is still served first...
        assert_eq!(allocator.allocate(10).unwrap().get_index(), third);
        // ...but after the removal none of the old pages come back.
        let fresh = allocator.allocate(10).unwrap().get_index();
        assert!(fresh != first);
        assert!(fresh != second);
        assert!(fresh != third);
    }

    #[test]
    fn test_cache_limit_evict() {
        let mut cache = Cache::new(1050);
        cache.put(10, dummy_page());
        cache.put(20, dummy_page());
        cache.put(30, dummy_page());
        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        // The oldest entry was evicted; the two newest survive.
        assert!(cache.get(10).is_none());
        assert!(cache.get(20).is_some());
        assert!(cache.get(30).is_some());
    }

    #[test]
    fn test_cache_limit_refresh_evict() {
        let mut cache = Cache::new(1050);
        cache.put(10, dummy_page());
        cache.put(20, dummy_page());
        // Touch 10 so that 20 becomes the least recently used entry.
        assert!(cache.get(10).is_some());

        cache.put(30, dummy_page());
        assert!(cache.size < 1050);
        assert_eq!(cache.cache.len(), 2);
        assert!(cache.get(10).is_some());
        assert!(cache.get(20).is_none());
        assert!(cache.get(30).is_some());
    }
}
|