1
|
|
use crate::{
|
2
|
|
address::{
|
3
|
|
Address, ADDRESS_ENTRY_SIZE, ADDRESS_PAGE_ENTRY_COUNT, ADDRESS_PAGE_EXP, ADDRESS_PAGE_SIZE, FLAG_DELETED,
|
4
|
|
FLAG_EXISTS, SEGMENT_DATA_OFFSET, SEGMENT_HASH_OFFSET, SEGMENT_PAGE_DELETE_COUNT_OFFSET,
|
5
|
|
},
|
6
|
|
allocator::Allocator,
|
7
|
|
discref::{Page, PageOps},
|
8
|
|
error::{PRes, PersyError},
|
9
|
|
flush_checksum::{double_buffer_check, prepare_buffer_flush},
|
10
|
|
id::{PersyId, RecRef},
|
11
|
|
io::{read_u64, write_u64, InfallibleRead, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat},
|
12
|
|
persy::exp_from_content_size,
|
13
|
|
};
|
14
|
|
use std::{
|
15
|
|
collections::{hash_map::DefaultHasher, HashMap},
|
16
|
|
hash::{Hash, Hasher},
|
17
|
|
str,
|
18
|
|
sync::Arc,
|
19
|
|
vec,
|
20
|
|
};
|
21
|
|
|
22
|
|
/// Describes an address page that was linked to a segment while allocating an entry.
///
/// Returned by `Segment::allocate_internal` when the current allocation page had no room
/// left and a fresh page was chained after it.
pub struct AllocatedSegmentPage {
    /// Index of the freshly allocated page.
    pub new_page: u64,
    /// Index of the page that was the allocation page before `new_page` was chained to it.
    pub previus_page: u64,
}
|
26
|
|
|
27
|
|
/// In-memory descriptor of a segment and of its chain of address pages.
pub struct Segment {
    /// Index of the first address page of the segment chain.
    pub first_page: u64,
    /// Allocation page as of the last confirmed allocation; this is the value that
    /// `Segments::flush_segments` serializes.
    persistent_page: u64,
    /// Allocation position as of the last confirmed allocation.
    persistent_pos: u32,
    /// Address page where the next entry will be allocated.
    pub alloc_page: u64,
    /// Offset inside `alloc_page` where the next entry will be allocated.
    pub alloc_pos: u32,
    /// Identifier of the segment; it is also stamped in the header of every address page.
    segment_id: u32,
    /// Name of the segment.
    name: String,
}
|
36
|
|
|
37
|
|
impl Segment {
|
38
|
1
|
pub fn new(
|
39
|
|
first_page: u64,
|
40
|
|
persistent_page: u64,
|
41
|
|
persistent_pos: u32,
|
42
|
|
alloc_page: u64,
|
43
|
|
alloc_pos: u32,
|
44
|
|
segment_id: u32,
|
45
|
|
name: &str,
|
46
|
|
) -> Segment {
|
47
|
1
|
Segment {
|
48
|
|
first_page,
|
49
|
|
persistent_page,
|
50
|
|
persistent_pos,
|
51
|
|
alloc_page,
|
52
|
|
alloc_pos,
|
53
|
|
segment_id,
|
54
|
1
|
name: name.to_string(),
|
55
|
|
}
|
56
|
1
|
}
|
57
|
|
|
58
|
1
|
pub fn allocate_internal(&mut self, allocator: &Allocator) -> PRes<(RecRef, Option<AllocatedSegmentPage>)> {
|
59
|
1
|
let page = self.alloc_page;
|
60
|
1
|
let pos = self.alloc_pos;
|
61
|
1
|
let new_pos = pos + ADDRESS_ENTRY_SIZE;
|
62
|
1
|
Ok(if new_pos > ADDRESS_PAGE_SIZE {
|
63
|
1
|
let mut pg = allocator.write_page(page)?;
|
64
|
|
// It can happen that the last page has all entry deleted, we never free the last
|
65
|
|
// page, so may be there a last page to free, but with no space, in that case is better
|
66
|
|
// reuse the page directly and avoid to allocate another page and push the last page in
|
67
|
|
// the back leaking it.
|
68
|
1
|
if pg.empty()? {
|
69
|
0
|
let prev = pg.get_prev()?;
|
70
|
0
|
pg.reset()?;
|
71
|
0
|
pg.set_next(0)?;
|
72
|
0
|
pg.set_prev(prev)?;
|
73
|
0
|
pg.set_segment_id(self.segment_id)?;
|
74
|
0
|
allocator.flush_page(pg)?;
|
75
|
0
|
self.alloc_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
|
76
|
0
|
(RecRef::new(self.alloc_page, SEGMENT_DATA_OFFSET), None)
|
77
|
|
} else {
|
78
|
1
|
let mut new_pg = allocator.allocate(ADDRESS_PAGE_EXP)?;
|
79
|
1
|
let new_page = new_pg.get_index();
|
80
|
1
|
pg.set_next(new_page)?;
|
81
|
1
|
allocator.flush_page(pg)?;
|
82
|
1
|
new_pg.set_next(0)?;
|
83
|
1
|
new_pg.set_prev(page)?;
|
84
|
1
|
new_pg.set_segment_id(self.segment_id)?;
|
85
|
1
|
allocator.flush_page(new_pg)?;
|
86
|
1
|
self.alloc_page = new_page;
|
87
|
1
|
self.alloc_pos = SEGMENT_DATA_OFFSET + ADDRESS_ENTRY_SIZE;
|
88
|
1
|
(
|
89
|
1
|
RecRef::new(self.alloc_page, SEGMENT_DATA_OFFSET),
|
90
|
1
|
Some(AllocatedSegmentPage {
|
91
|
|
new_page,
|
92
|
|
previus_page: page,
|
93
|
|
}),
|
94
|
|
)
|
95
|
1
|
}
|
96
|
1
|
} else {
|
97
|
1
|
self.alloc_pos = new_pos;
|
98
|
1
|
(RecRef::new(page, pos), None)
|
99
|
|
})
|
100
|
1
|
}
|
101
|
|
|
102
|
1
|
pub fn collect_segment_pages(&self, allocator: &Allocator) -> PRes<Vec<u64>> {
|
103
|
1
|
let mut pages = Vec::new();
|
104
|
1
|
let mut page = self.first_page;
|
105
|
1
|
let last = self.persistent_page;
|
106
|
0
|
loop {
|
107
|
1
|
let mut pag = allocator.load_page(page)?;
|
108
|
1
|
let next = pag.read_u64();
|
109
|
1
|
let mut pos = SEGMENT_DATA_OFFSET;
|
110
|
1
|
loop {
|
111
|
1
|
pag.seek(pos)?;
|
112
|
1
|
let data_page = pag.read_u64();
|
113
|
1
|
let flag = pag.read_u8();
|
114
|
1
|
if entry_exits(flag) {
|
115
|
1
|
pages.push(data_page);
|
116
|
|
}
|
117
|
1
|
pos += ADDRESS_ENTRY_SIZE;
|
118
|
1
|
if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
|
119
|
|
break;
|
120
|
|
}
|
121
|
|
}
|
122
|
1
|
pages.push(page);
|
123
|
1
|
if page == last {
|
124
|
|
break;
|
125
|
|
}
|
126
|
0
|
page = next;
|
127
|
0
|
}
|
128
|
1
|
Ok(pages)
|
129
|
1
|
}
|
130
|
|
}
|
131
|
|
|
132
|
|
pub(crate) trait SegmentPageRead: PageOps {
|
133
|
|
fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>>;
|
134
|
|
fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)>;
|
135
|
|
fn segment_scan_all_entries(&mut self) -> PRes<(u64, Vec<(u32, bool)>)>;
|
136
|
|
fn segment_first_available_pos(&mut self) -> PRes<u32>;
|
137
|
|
fn get_next(&mut self) -> PRes<u64>;
|
138
|
|
fn get_prev(&mut self) -> PRes<u64>;
|
139
|
|
fn empty(&mut self) -> PRes<bool>;
|
140
|
|
}
|
141
|
|
pub(crate) trait SegmentPage: SegmentPageRead {
|
142
|
|
fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
|
143
|
|
fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()>;
|
144
|
|
fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<bool>;
|
145
|
|
fn set_segment_id(&mut self, id: u32) -> PRes<()>;
|
146
|
|
fn set_next(&mut self, next: u64) -> PRes<()>;
|
147
|
|
fn set_prev(&mut self, prev: u64) -> PRes<()>;
|
148
|
|
fn recalc_count(&mut self) -> PRes<()>;
|
149
|
|
}
|
150
|
|
|
151
|
1
|
fn entry_exits(flags: u8) -> bool {
|
152
|
1
|
flags & FLAG_EXISTS == 1 && flags & FLAG_DELETED == 0
|
153
|
1
|
}
|
154
|
|
|
155
|
|
impl<T: InfallibleRead + PageOps> SegmentPageRead for T {
|
156
|
1
|
fn segment_read_entry(&mut self, segment_id: u32, pos: u32) -> PRes<Option<(u64, u16)>> {
|
157
|
1
|
self.seek(SEGMENT_HASH_OFFSET)?;
|
158
|
1
|
let persistent_id = self.read_u32();
|
159
|
1
|
if persistent_id != segment_id {
|
160
|
0
|
return Ok(None);
|
161
|
|
}
|
162
|
1
|
self.seek(pos)?;
|
163
|
1
|
let record = self.read_u64();
|
164
|
1
|
let flag = self.read_u8();
|
165
|
1
|
let version = self.read_u16();
|
166
|
1
|
Ok(if !entry_exits(flag) || record == 0 {
|
167
|
1
|
None
|
168
|
|
} else {
|
169
|
1
|
Some((record, version))
|
170
|
|
})
|
171
|
1
|
}
|
172
|
|
|
173
|
1
|
fn segment_scan_all_entries(&mut self) -> PRes<(u64, Vec<(u32, bool)>)> {
|
174
|
1
|
let next_page = self.read_u64();
|
175
|
1
|
let mut pos = SEGMENT_DATA_OFFSET;
|
176
|
1
|
let mut recs = Vec::new();
|
177
|
1
|
loop {
|
178
|
1
|
self.seek(pos + 8)?;
|
179
|
1
|
let flag = self.read_u8();
|
180
|
1
|
recs.push((pos, flag & FLAG_EXISTS == 1));
|
181
|
1
|
pos += ADDRESS_ENTRY_SIZE;
|
182
|
1
|
if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
|
183
|
|
break;
|
184
|
|
}
|
185
|
|
}
|
186
|
1
|
Ok((next_page, recs))
|
187
|
1
|
}
|
188
|
|
|
189
|
1
|
fn segment_scan_entries(&mut self) -> PRes<(u64, Vec<u32>)> {
|
190
|
1
|
let next_page = self.read_u64();
|
191
|
1
|
let mut pos = SEGMENT_DATA_OFFSET;
|
192
|
1
|
let mut recs = Vec::new();
|
193
|
1
|
loop {
|
194
|
1
|
self.seek(pos + 8)?;
|
195
|
1
|
let flag = self.read_u8();
|
196
|
1
|
if entry_exits(flag) {
|
197
|
1
|
recs.push(pos);
|
198
|
|
}
|
199
|
1
|
pos += ADDRESS_ENTRY_SIZE;
|
200
|
1
|
if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
|
201
|
|
break;
|
202
|
|
}
|
203
|
|
}
|
204
|
1
|
Ok((next_page, recs))
|
205
|
1
|
}
|
206
|
|
|
207
|
1
|
fn segment_first_available_pos(&mut self) -> PRes<u32> {
|
208
|
1
|
let elements = (ADDRESS_PAGE_SIZE - SEGMENT_DATA_OFFSET) / ADDRESS_ENTRY_SIZE;
|
209
|
1
|
let mut pos = SEGMENT_DATA_OFFSET + (elements - 1) * ADDRESS_ENTRY_SIZE;
|
210
|
1
|
loop {
|
211
|
1
|
self.seek(pos + 8)?;
|
212
|
1
|
let flag = self.read_u8();
|
213
|
1
|
if entry_exits(flag) {
|
214
|
1
|
pos += ADDRESS_ENTRY_SIZE;
|
215
|
|
break;
|
216
|
|
}
|
217
|
1
|
if pos == SEGMENT_DATA_OFFSET {
|
218
|
1
|
break;
|
219
|
|
}
|
220
|
1
|
pos -= ADDRESS_ENTRY_SIZE;
|
221
|
1
|
debug_assert!(pos >= SEGMENT_DATA_OFFSET);
|
222
|
|
}
|
223
|
1
|
Ok(pos)
|
224
|
1
|
}
|
225
|
|
|
226
|
1
|
fn get_next(&mut self) -> PRes<u64> {
|
227
|
1
|
self.seek(0)?;
|
228
|
1
|
Ok(self.read_u64())
|
229
|
1
|
}
|
230
|
|
|
231
|
1
|
fn get_prev(&mut self) -> PRes<u64> {
|
232
|
1
|
self.seek(8)?;
|
233
|
1
|
Ok(self.read_u64())
|
234
|
1
|
}
|
235
|
1
|
fn empty(&mut self) -> PRes<bool> {
|
236
|
1
|
self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET)?;
|
237
|
1
|
Ok(self.read_u16() as u32 == ADDRESS_PAGE_ENTRY_COUNT)
|
238
|
1
|
}
|
239
|
|
}
|
240
|
|
|
241
|
1
|
/// Returns the version that follows `version`.
///
/// Version `0` is never produced: after `u16::MAX` the sequence restarts from `1`.
/// The increment is checked explicitly so the wrap-around is handled the same way in
/// every build profile instead of panicking where overflow checks are enabled.
fn inc_version(version: u16) -> u16 {
    version.checked_add(1).unwrap_or(1)
}
|
249
|
|
|
250
|
|
impl<T: InfallibleRead + InfallibleWrite + PageOps> SegmentPage for T {
|
251
|
1
|
fn segment_insert_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
|
252
|
1
|
debug_assert!(pos >= SEGMENT_DATA_OFFSET, "invalid page position {}", pos);
|
253
|
1
|
self.seek(SEGMENT_HASH_OFFSET)?;
|
254
|
1
|
let persistent_id = self.read_u32();
|
255
|
1
|
if persistent_id != segment_id {
|
256
|
0
|
return Err(PersyError::SegmentNotFound);
|
257
|
|
}
|
258
|
|
// TODO: In case of restore this may get a wrong value, so on restore should be
|
259
|
|
// re-calculated from the persistent flags.
|
260
|
1
|
self.seek(pos)?;
|
261
|
1
|
self.write_u64(record_page);
|
262
|
1
|
self.write_u8(FLAG_EXISTS);
|
263
|
1
|
self.write_u16(1);
|
264
|
1
|
Ok(())
|
265
|
1
|
}
|
266
|
1
|
fn segment_update_entry(&mut self, segment_id: u32, pos: u32, record_page: u64) -> PRes<()> {
|
267
|
1
|
debug_assert!(pos >= SEGMENT_DATA_OFFSET);
|
268
|
1
|
self.seek(SEGMENT_HASH_OFFSET)?;
|
269
|
1
|
let persistent_id = self.read_u32();
|
270
|
1
|
if persistent_id != segment_id {
|
271
|
0
|
return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
|
272
|
|
}
|
273
|
1
|
self.seek(pos + 9)?;
|
274
|
1
|
let version = self.read_u16();
|
275
|
1
|
self.seek(pos)?;
|
276
|
1
|
self.write_u64(record_page);
|
277
|
1
|
self.seek(pos + 9)?;
|
278
|
1
|
self.write_u16(inc_version(version));
|
279
|
1
|
Ok(())
|
280
|
1
|
}
|
281
|
1
|
fn segment_delete_entry(&mut self, segment_id: u32, pos: u32) -> PRes<bool> {
|
282
|
1
|
debug_assert!(pos >= SEGMENT_DATA_OFFSET);
|
283
|
1
|
self.seek(SEGMENT_HASH_OFFSET)?;
|
284
|
1
|
let persistent_id = self.read_u32();
|
285
|
1
|
if persistent_id != segment_id {
|
286
|
0
|
return Err(PersyError::RecordNotFound(PersyId(RecRef::new(self.get_index(), pos))));
|
287
|
|
}
|
288
|
1
|
self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET)?;
|
289
|
1
|
let count = self.read_u16() + 1;
|
290
|
1
|
self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET)?;
|
291
|
1
|
self.write_u16(count);
|
292
|
1
|
self.seek(pos + 8)?;
|
293
|
1
|
let flag = self.read_u8();
|
294
|
1
|
self.seek(pos + 8)?;
|
295
|
1
|
self.write_u8(flag | FLAG_DELETED);
|
296
|
1
|
Ok(count as u32 == ADDRESS_PAGE_ENTRY_COUNT)
|
297
|
1
|
}
|
298
|
|
|
299
|
1
|
fn set_segment_id(&mut self, id: u32) -> PRes<()> {
|
300
|
1
|
self.seek(SEGMENT_HASH_OFFSET)?;
|
301
|
1
|
self.write_u32(id);
|
302
|
1
|
Ok(())
|
303
|
1
|
}
|
304
|
|
|
305
|
1
|
fn set_next(&mut self, next: u64) -> PRes<()> {
|
306
|
1
|
self.seek(0)?;
|
307
|
1
|
self.write_u64(next);
|
308
|
1
|
Ok(())
|
309
|
1
|
}
|
310
|
|
|
311
|
1
|
fn set_prev(&mut self, prev: u64) -> PRes<()> {
|
312
|
1
|
self.seek(8)?;
|
313
|
1
|
self.write_u64(prev);
|
314
|
1
|
Ok(())
|
315
|
1
|
}
|
316
|
|
|
317
|
1
|
fn recalc_count(&mut self) -> PRes<()> {
|
318
|
1
|
let mut pos = SEGMENT_DATA_OFFSET;
|
319
|
1
|
let mut count = 0;
|
320
|
1
|
loop {
|
321
|
1
|
self.seek(pos + 8)?;
|
322
|
1
|
let flag = self.read_u8();
|
323
|
1
|
if flag & FLAG_DELETED == FLAG_DELETED {
|
324
|
1
|
count += 1;
|
325
|
|
}
|
326
|
1
|
pos += ADDRESS_ENTRY_SIZE;
|
327
|
1
|
if pos > ADDRESS_PAGE_SIZE - ADDRESS_ENTRY_SIZE {
|
328
|
|
break;
|
329
|
|
}
|
330
|
|
}
|
331
|
1
|
self.seek(SEGMENT_PAGE_DELETE_COUNT_OFFSET)?;
|
332
|
1
|
self.write_u8(count as u8);
|
333
|
1
|
Ok(())
|
334
|
1
|
}
|
335
|
|
}
|
336
|
|
|
337
|
|
pub struct Segments {
|
338
|
|
pub root_page: u64,
|
339
|
|
content_page: u64,
|
340
|
|
other_page: u64,
|
341
|
|
last_flush: u8,
|
342
|
|
pub segments: HashMap<u32, Segment>,
|
343
|
|
pub segments_id: HashMap<String, u32>,
|
344
|
|
pub temp_segments: HashMap<u32, Segment>,
|
345
|
|
pub temp_segments_id: HashMap<String, u32>,
|
346
|
|
}
|
347
|
|
|
348
|
1
|
pub fn segment_hash(segment: &str) -> u32 {
|
349
|
|
let mut val: u32;
|
350
|
1
|
let hasher = &mut DefaultHasher::new();
|
351
|
1
|
segment.hash(hasher);
|
352
|
1
|
val = hasher.finish() as u32;
|
353
|
1
|
val <<= 16;
|
354
|
1
|
val |= u32::from(rand::random::<u16>());
|
355
|
|
val
|
356
|
1
|
}
|
357
|
|
|
358
|
|
impl Segments {
|
359
|
1
|
pub fn new(root_page: u64, allocator: &Arc<Allocator>) -> PRes<Segments> {
|
360
|
1
|
let mut buffer_0 = [0; 19];
|
361
|
1
|
let mut buffer_1 = [0; 19];
|
362
|
|
let page_id;
|
363
|
|
let other_page_id;
|
364
|
|
let last_flush;
|
365
|
|
{
|
366
|
1
|
let mut root = allocator.load_page(root_page)?;
|
367
|
1
|
root.read_exact(&mut buffer_0);
|
368
|
1
|
root.read_exact(&mut buffer_1);
|
369
|
1
|
let (flush, first) = double_buffer_check(&buffer_0, &buffer_1);
|
370
|
1
|
last_flush = flush;
|
371
|
1
|
if first {
|
372
|
1
|
page_id = read_u64(&buffer_0[0..8]);
|
373
|
1
|
other_page_id = read_u64(&buffer_0[8..16]);
|
374
|
|
} else {
|
375
|
1
|
page_id = read_u64(&buffer_1[0..8]);
|
376
|
1
|
other_page_id = read_u64(&buffer_1[8..16]);
|
377
|
|
}
|
378
|
1
|
}
|
379
|
1
|
let mut segments = HashMap::new();
|
380
|
1
|
let mut segments_id = HashMap::new();
|
381
|
1
|
if page_id != 0 {
|
382
|
1
|
let mut page = allocator.load_page(page_id)?;
|
383
|
1
|
loop {
|
384
|
1
|
let flag = page.read_u8();
|
385
|
1
|
if flag == 1 {
|
386
|
1
|
let first_page = page.read_u64();
|
387
|
1
|
let persistent_page = page.read_u64();
|
388
|
1
|
let persistent_pos = page.read_u32();
|
389
|
1
|
let pers_hash = page.read_u32();
|
390
|
1
|
let name_size = page.read_u16() as usize;
|
391
|
|
|
392
|
1
|
let mut slice: Vec<u8> = vec![0; name_size];
|
393
|
1
|
page.read_exact(&mut slice);
|
394
|
1
|
let name: String = str::from_utf8(&slice[0..name_size])?.into();
|
395
|
1
|
segments.insert(
|
396
|
|
pers_hash,
|
397
|
1
|
Segment::new(
|
398
|
|
first_page,
|
399
|
|
persistent_page,
|
400
|
|
persistent_pos,
|
401
|
|
persistent_page,
|
402
|
|
persistent_pos,
|
403
|
|
pers_hash,
|
404
|
1
|
&name,
|
405
|
|
),
|
406
|
1
|
);
|
407
|
1
|
segments_id.insert(name, pers_hash);
|
408
|
1
|
} else {
|
409
|
|
break;
|
410
|
|
}
|
411
|
|
}
|
412
|
1
|
}
|
413
|
1
|
Ok(Segments {
|
414
|
|
root_page,
|
415
|
1
|
content_page: page_id,
|
416
|
1
|
other_page: other_page_id,
|
417
|
|
last_flush,
|
418
|
1
|
segments,
|
419
|
1
|
segments_id,
|
420
|
1
|
temp_segments: HashMap::new(),
|
421
|
1
|
temp_segments_id: HashMap::new(),
|
422
|
0
|
})
|
423
|
1
|
}
|
424
|
|
|
425
|
1
|
pub fn init(mut root: Page, allocator: &Allocator) -> PRes<()> {
|
426
|
1
|
let mut buffer = [0; 19];
|
427
|
1
|
write_u64(&mut buffer[0..8], 0);
|
428
|
1
|
write_u64(&mut buffer[8..16], 0);
|
429
|
1
|
prepare_buffer_flush(&mut buffer, 0);
|
430
|
1
|
root.write_all(&buffer);
|
431
|
1
|
allocator.flush_page(root)?;
|
432
|
1
|
Ok(())
|
433
|
1
|
}
|
434
|
|
|
435
|
1
|
pub fn segment_id(&self, segment: &str) -> Option<u32> {
|
436
|
1
|
if let Some(id) = self.segments_id.get(segment) {
|
437
|
1
|
self.segments.get(id).map(|x| x.segment_id)
|
438
|
|
} else {
|
439
|
1
|
None
|
440
|
|
}
|
441
|
1
|
}
|
442
|
|
|
443
|
1
|
pub fn segment_by_id(&self, id: u32) -> Option<&Segment> {
|
444
|
1
|
self.segments.get(&id)
|
445
|
1
|
}
|
446
|
|
|
447
|
|
pub fn segment_name_by_id(&self, id: u32) -> Option<String> {
|
448
|
|
self.segments.get(&id).map(|s| s.name.clone())
|
449
|
|
}
|
450
|
|
|
451
|
1
|
pub fn segment_by_id_temp(&self, id: u32) -> Option<&Segment> {
|
452
|
1
|
self.temp_segments.get(&id)
|
453
|
1
|
}
|
454
|
|
|
455
|
1
|
pub fn has_segment(&self, segment: &str) -> bool {
|
456
|
1
|
self.segments_id.contains_key(segment)
|
457
|
1
|
}
|
458
|
|
|
459
|
1
|
pub fn create_temp_segment(&mut self, allocator: &Allocator, segment: &str) -> PRes<(u32, u64)> {
|
460
|
1
|
let mut allocated = allocator.allocate(ADDRESS_PAGE_EXP)?;
|
461
|
1
|
let allocated_id = allocated.get_index();
|
462
|
1
|
let segment_id = segment_hash(segment);
|
463
|
1
|
let seg = Segment::new(
|
464
|
|
allocated_id,
|
465
|
|
allocated_id,
|
466
|
|
SEGMENT_DATA_OFFSET,
|
467
|
|
allocated_id,
|
468
|
|
SEGMENT_DATA_OFFSET,
|
469
|
|
segment_id,
|
470
|
|
segment,
|
471
|
|
);
|
472
|
1
|
self.temp_segments.insert(segment_id, seg);
|
473
|
1
|
self.temp_segments_id.insert(segment.to_string(), segment_id);
|
474
|
1
|
allocated.write_u64(0);
|
475
|
1
|
allocated.write_u64(0);
|
476
|
1
|
allocated.write_u32(segment_id);
|
477
|
1
|
allocator.flush_page(allocated)?;
|
478
|
1
|
Ok((segment_id, allocated_id))
|
479
|
1
|
}
|
480
|
|
|
481
|
1
|
pub fn get_temp_segment_mut(&mut self, segment: u32) -> Option<&mut Segment> {
|
482
|
1
|
self.temp_segments.get_mut(&segment)
|
483
|
1
|
}
|
484
|
|
|
485
|
1
|
pub fn drop_temp_segment(&mut self, allocator: &Allocator, segment: u32) -> PRes<()> {
|
486
|
1
|
if let Some(segment) = self.temp_segments.remove(&segment) {
|
487
|
1
|
self.temp_segments_id.remove(&segment.name);
|
488
|
|
//TODO: to review this, may cause disc leaks in case of crash on rollback of segment
|
489
|
|
// creation
|
490
|
1
|
let pages = segment.collect_segment_pages(allocator)?;
|
491
|
1
|
for page in pages {
|
492
|
1
|
allocator.free(page)?;
|
493
|
1
|
}
|
494
|
1
|
}
|
495
|
1
|
Ok(())
|
496
|
1
|
}
|
497
|
|
|
498
|
1
|
pub fn exists_real_or_temp(&self, segment: u32) -> bool {
|
499
|
1
|
self.segments.contains_key(&segment) || self.temp_segments.contains_key(&segment)
|
500
|
1
|
}
|
501
|
|
|
502
|
1
|
pub fn create_segment(&mut self, segment: u32, first_page: u64) -> PRes<()> {
|
503
|
1
|
if let Some(mut s) = self.temp_segments.remove(&segment) {
|
504
|
|
// This is needed mainly for recover
|
505
|
1
|
s.first_page = first_page;
|
506
|
1
|
self.temp_segments_id.remove(&s.name);
|
507
|
1
|
self.segments_id.insert(s.name.clone(), s.segment_id);
|
508
|
1
|
self.segments.insert(s.segment_id, s);
|
509
|
1
|
}
|
510
|
1
|
Ok(())
|
511
|
1
|
}
|
512
|
|
|
513
|
1
|
pub fn drop_segment(&mut self, segment: &str) -> PRes<()> {
|
514
|
1
|
if let Some(seg) = self.segments_id.remove(segment) {
|
515
|
1
|
self.segments.remove(&seg);
|
516
|
|
}
|
517
|
|
|
518
|
1
|
Ok(())
|
519
|
1
|
}
|
520
|
1
|
pub fn collect_segment_pages(&self, allocator: &Allocator, segment: u32) -> PRes<Vec<u64>> {
|
521
|
1
|
if let Some(seg) = self.segments.get(&segment) {
|
522
|
1
|
seg.collect_segment_pages(allocator)
|
523
|
|
} else {
|
524
|
1
|
Ok(Vec::new())
|
525
|
|
}
|
526
|
1
|
}
|
527
|
|
|
528
|
1
|
pub fn set_first_page(&mut self, segment: u32, first_page: u64, allocator: &Allocator) -> PRes<()> {
|
529
|
1
|
if let Some(seg) = self.segments.get_mut(&segment) {
|
530
|
1
|
seg.first_page = first_page;
|
531
|
|
}
|
532
|
1
|
self.flush_segments(allocator)?;
|
533
|
1
|
Ok(())
|
534
|
1
|
}
|
535
|
|
|
536
|
1
|
pub fn confirm_allocations(&mut self, segments: &[u32], allocator: &Allocator, recover: bool) -> PRes<()> {
|
537
|
1
|
for seg in segments {
|
538
|
|
// If happen that a segment is not found something went wrong and is better crash
|
539
|
1
|
let segment = if let Some(s) = self.segments.get_mut(seg) {
|
540
|
1
|
s
|
541
|
1
|
} else if let Some(s) = self.temp_segments.get_mut(seg) {
|
542
|
|
s
|
543
|
|
} else {
|
544
|
0
|
panic!("segment operation when segment do not exists");
|
545
|
|
};
|
546
|
1
|
segment.persistent_page = segment.alloc_page;
|
547
|
1
|
if recover {
|
548
|
|
// In case of recover recalculate the last available position
|
549
|
1
|
let mut page = allocator.load_page(segment.alloc_page)?;
|
550
|
1
|
segment.alloc_pos = page.segment_first_available_pos()?;
|
551
|
1
|
}
|
552
|
1
|
segment.persistent_pos = segment.alloc_pos;
|
553
|
|
}
|
554
|
1
|
Ok(())
|
555
|
1
|
}
|
556
|
|
|
557
|
1
|
pub fn flush_segments(&mut self, allocator: &Allocator) -> PRes<()> {
|
558
|
1
|
let mut buffer = Vec::<u8>::new();
|
559
|
1
|
for segment in self.segments.values() {
|
560
|
1
|
buffer.write_u8(1);
|
561
|
1
|
buffer.write_u64(segment.first_page);
|
562
|
1
|
buffer.write_u64(segment.persistent_page);
|
563
|
1
|
buffer.write_u32(segment.persistent_pos);
|
564
|
1
|
buffer.write_u32(segment.segment_id);
|
565
|
1
|
buffer.write_u16(segment.name.len() as u16);
|
566
|
1
|
buffer.write_all(segment.name.as_bytes());
|
567
|
|
}
|
568
|
1
|
buffer.write_u8(0);
|
569
|
1
|
let exp = exp_from_content_size(buffer.len() as u64);
|
570
|
1
|
let other_page_id = self.content_page;
|
571
|
1
|
let mut content_page_id = self.other_page;
|
572
|
|
{
|
573
|
1
|
let mut content_page = if content_page_id == 0 {
|
574
|
1
|
allocator.allocate(exp)?
|
575
|
0
|
} else {
|
576
|
1
|
let mut page = allocator.write_page(content_page_id)?;
|
577
|
1
|
if page.get_size_exp() != exp {
|
578
|
1
|
allocator.free(content_page_id)?;
|
579
|
1
|
page = allocator.allocate(exp)?;
|
580
|
|
}
|
581
|
1
|
page
|
582
|
0
|
};
|
583
|
1
|
content_page_id = content_page.get_index();
|
584
|
1
|
content_page.write_all(&buffer);
|
585
|
1
|
allocator.flush_page(content_page)?;
|
586
|
1
|
}
|
587
|
|
|
588
|
1
|
let mut root_buffer = [0; 19];
|
589
|
1
|
write_u64(&mut root_buffer[0..8], content_page_id);
|
590
|
1
|
write_u64(&mut root_buffer[8..16], other_page_id);
|
591
|
1
|
let (last_flush, offset) = prepare_buffer_flush(&mut root_buffer, self.last_flush);
|
592
|
1
|
self.last_flush = last_flush;
|
593
|
|
{
|
594
|
1
|
let mut root = allocator.write_page(self.root_page)?;
|
595
|
1
|
root.seek(offset)?;
|
596
|
1
|
root.write_all(&root_buffer);
|
597
|
1
|
allocator.flush_page(root)?;
|
598
|
1
|
}
|
599
|
1
|
self.content_page = content_page_id;
|
600
|
1
|
self.other_page = other_page_id;
|
601
|
1
|
Ok(())
|
602
|
1
|
}
|
603
|
|
|
604
|
1
|
pub fn list(&self) -> Vec<(String, u32)> {
|
605
|
1
|
self.segments_id.iter().map(|(n, id)| (n.clone(), *id)).collect()
|
606
|
1
|
}
|
607
|
1
|
pub fn snapshot_list(&self) -> Vec<(String, u32, u64)> {
|
608
|
1
|
self.segments
|
609
|
|
.iter()
|
610
|
1
|
.map(|(id, seg)| (seg.name.clone(), *id, seg.first_page))
|
611
|
|
.collect()
|
612
|
1
|
}
|
613
|
|
}
|
614
|
|
|
615
|
|
/// Cursor over the live entries of a segment, walking its address pages one at a time.
pub struct SegmentPageIterator {
    /// Page the positions of `per_page_iterator` belong to.
    cur_page: u64,
    /// Page to scan once the current one is exhausted; `0` means no more pages.
    next_page: u64,
    /// Entry positions of `cur_page` still to be returned.
    per_page_iterator: vec::IntoIter<u32>,
}
|
620
|
|
|
621
|
|
impl SegmentPageIterator {
|
622
|
1
|
pub fn new(first_page: u64) -> SegmentPageIterator {
|
623
|
1
|
SegmentPageIterator {
|
624
|
|
cur_page: first_page,
|
625
|
|
next_page: first_page,
|
626
|
1
|
per_page_iterator: Vec::new().into_iter(),
|
627
|
|
}
|
628
|
1
|
}
|
629
|
|
|
630
|
1
|
pub fn next(&mut self, address: &Address) -> Option<RecRef> {
|
631
|
|
// This loop is needed because some pages may be empty
|
632
|
1
|
loop {
|
633
|
1
|
let iter = self.per_page_iterator.next();
|
634
|
1
|
if iter.is_none() && self.next_page != 0 {
|
635
|
1
|
self.cur_page = self.next_page;
|
636
|
1
|
if let Ok((next_page, elements)) = address.scan_page(self.cur_page) {
|
637
|
1
|
self.next_page = next_page;
|
638
|
1
|
self.per_page_iterator = elements.into_iter();
|
639
|
1
|
continue;
|
640
|
|
}
|
641
|
1
|
}
|
642
|
1
|
break iter;
|
643
|
|
}
|
644
|
1
|
.map(|pos| RecRef::new(self.cur_page, pos))
|
645
|
1
|
}
|
646
|
|
}
|
647
|
|
|
648
|
|
/// Cursor over all the entry slots of a segment, walking its address pages one at a time.
pub struct SnapshotSegmentPageIterator {
    /// Page the positions of `per_page_iterator` belong to.
    cur_page: u64,
    /// Page to scan once the current one is exhausted; `0` means no more pages.
    next_page: u64,
    /// Entry positions of `cur_page` still to be returned, each with its "exists" marker.
    per_page_iterator: vec::IntoIter<(u32, bool)>,
}
|
653
|
|
|
654
|
|
impl SnapshotSegmentPageIterator {
|
655
|
1
|
pub fn new(first_page: u64) -> SnapshotSegmentPageIterator {
|
656
|
1
|
SnapshotSegmentPageIterator {
|
657
|
|
cur_page: first_page,
|
658
|
|
next_page: first_page,
|
659
|
1
|
per_page_iterator: Vec::new().into_iter(),
|
660
|
|
}
|
661
|
1
|
}
|
662
|
|
|
663
|
1
|
pub fn next(&mut self, address: &Address) -> Option<RecRef> {
|
664
|
|
// This loop is needed because some pages may be empty
|
665
|
1
|
loop {
|
666
|
1
|
let iter = self.per_page_iterator.next();
|
667
|
1
|
if iter.is_none() && self.next_page != 0 {
|
668
|
1
|
self.cur_page = self.next_page;
|
669
|
1
|
if let Ok((next_page, elements)) = address.scan_page_all(self.cur_page) {
|
670
|
1
|
self.next_page = next_page;
|
671
|
1
|
self.per_page_iterator = elements.into_iter();
|
672
|
1
|
continue;
|
673
|
|
}
|
674
|
1
|
}
|
675
|
1
|
break iter;
|
676
|
|
}
|
677
|
1
|
.map(|(pos, _exist)| RecRef::new(self.cur_page, pos))
|
678
|
1
|
}
|
679
|
|
}
|
680
|
|
|
681
|
|
#[cfg(test)]
mod tests {
    use super::{segment_hash, SegmentPage, SegmentPageRead, Segments};
    use crate::{
        address::ADDRESS_PAGE_EXP,
        allocator::Allocator,
        config::Config,
        discref::{DiscRef, Page, PageOps},
    };
    use std::sync::Arc;
    use tempfile::Builder;

    /// Creates an allocator backed by a temporary `.persy` file.
    fn create_allocator(file_name: &str) -> Allocator {
        let file = Builder::new()
            .prefix(file_name)
            .suffix(".persy")
            .tempfile()
            .unwrap()
            .reopen()
            .unwrap();
        let config = Arc::new(Config::new());
        let disc = Box::new(DiscRef::new(file).unwrap());
        let (_, allocator) = Allocator::init(disc, &config).unwrap();
        // Just to be sure it does not start from 0, which cannot happen outside of tests.
        allocator.allocate(5).unwrap();

        allocator
    }

    /// Creates an allocator plus an initialised segments root page, returning the shared
    /// allocator and the index of the root page.
    fn setup(file_name: &str) -> (Arc<Allocator>, u64) {
        let all = Arc::new(create_allocator(file_name));
        let root = all.allocate(ADDRESS_PAGE_EXP).unwrap();
        let root_index = root.get_index();
        Segments::init(root, &all).unwrap();
        (all, root_index)
    }

    #[test]
    fn test_create_drop_segment() {
        let (all, root_index) = setup("./raw_segment_create_delete.persy");
        let mut segments = Segments::new(root_index, &all).unwrap();

        let (id, fp) = segments.create_temp_segment(&all, "some").unwrap();
        segments.create_segment(id, fp).unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(segments.segments_id.contains_key("some"));
        assert!(segments.segments.contains_key(&id));

        segments.drop_segment("some").unwrap();
        segments.flush_segments(&all).unwrap();
        assert!(!segments.segments_id.contains_key("some"));
        assert!(!segments.segments.contains_key(&id));
    }

    #[test]
    fn test_create_drop_temp_segment() {
        let (all, root_index) = setup("./segment_create_delete.persy");
        let mut segments = Segments::new(root_index, &all).unwrap();

        let (id, _) = segments.create_temp_segment(&all, "some").unwrap();
        assert!(segments.temp_segments.contains_key(&id));
        assert!(segments.temp_segments_id.contains_key("some"));

        segments.drop_temp_segment(&all, id).unwrap();
        assert!(!segments.temp_segments.contains_key(&id));
        assert!(!segments.temp_segments_id.contains_key("some"));
    }

    #[test]
    fn test_create_close_drop_close_segment() {
        let (all, root_index) = setup("./segment_pers_create_delete.persy");

        // First session: create the segment and persist the catalogue.
        let id = {
            let mut segments = Segments::new(root_index, &all).unwrap();
            let (c_id, fp) = segments.create_temp_segment(&all, "some").unwrap();
            segments.create_segment(c_id, fp).unwrap();
            segments.flush_segments(&all).unwrap();
            c_id
        };

        // Second session: the segment is reloaded, then dropped.
        {
            let mut segments = Segments::new(root_index, &all).unwrap();
            assert_eq!(segments.segments.len(), 1);
            assert!(segments.segments_id.contains_key("some"));
            assert!(segments.segments.contains_key(&id));
            segments.drop_segment("some").unwrap();
            segments.flush_segments(&all).unwrap();
        }

        // Third session: the segment is gone.
        {
            let segments = Segments::new(root_index, &all).unwrap();
            assert!(!segments.segments_id.contains_key("some"));
            assert!(!segments.segments.contains_key(&id));
        }
    }

    #[test]
    fn test_create_close_drop_close_segment_off_page() {
        let (all, root_index) = setup("./segment_pers_create_delete_off_page.persy");

        // First session: create enough segments to need a bigger catalogue page.
        {
            let mut segments = Segments::new(root_index, &all).unwrap();
            for i in 0..100 {
                let name = format!("some{}", i);
                let (id, fp) = segments.create_temp_segment(&all, &name).unwrap();
                segments.create_segment(id, fp).unwrap();
            }
            segments.flush_segments(&all).unwrap();
        }

        // Second session: all the segments are reloaded, then dropped.
        {
            let mut segments = Segments::new(root_index, &all).unwrap();
            for i in 0..100 {
                let name = format!("some{}", i);
                assert!(segments.segments_id.contains_key(&name));
                segments.drop_segment(&name).unwrap();
            }
            segments.flush_segments(&all).unwrap();
        }

        // Third session: no segment is left.
        {
            let segments = Segments::new(root_index, &all).unwrap();
            for i in 0..100 {
                assert!(!segments.segments_id.contains_key(&format!("some{}", i)));
            }
        }
    }

    #[test]
    fn test_seg_insert_read_pointer() {
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        assert_eq!(read.map(|entry| entry.0), Some(10));
    }

    #[test]
    fn test_seg_insert_update_read_pointer() {
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        page.segment_update_entry(0, 30, 15).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        assert_eq!(read.map(|entry| entry.0), Some(15));
    }

    #[test]
    fn test_seg_insert_delete_read_pointer() {
        let mut page = Page::new(vec![0; 1024], 0, 0, 10);
        page.segment_insert_entry(0, 30, 10).unwrap();
        page.segment_delete_entry(0, 30).unwrap();
        let read = page.segment_read_entry(0, 30).unwrap();
        assert!(read.is_none());
    }

    #[test]
    fn test_hash_id_generator() {
        assert_ne!(segment_hash("some"), 0);
    }
}
|