tglman / persy
1
use crate::{
2
    config::Config,
3
    error::{CreateIndexError, DropIndexError, IndexChangeError, IndexError, IndexOpsError, PERes, TimeoutError},
4
    id::{index_id_to_segment_id_meta, IndexId, PersyId, RecRef},
5
    index::{
6
        keeper::{Extractor, IndexSegmentKeeper, IndexSegmentKeeperTx},
7
        serialization::IndexSerialization,
8
    },
9
    io::{InfallibleRead, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat},
10
    locks::RwLockManager,
11
    persy::PersyImpl,
12
    snapshots::SnapshotId,
13
    transaction_impl::TransactionImpl,
14
    GenericError,
15
};
16
use std::{fmt::Display, io::Cursor, str, sync::Arc};
17

18
use data_encoding::BASE64URL_NOPAD;
19

20
/// Enum of all the possible key or value types for indexes.
///
/// Every variant is associated with a numeric tag; the `From<u8>` implementation below holds the
/// tag-to-variant table.
#[derive(Clone)]
pub enum IndexTypeId {
    U8,
    U16,
    U32,
    U64,
    U128,
    I8,
    I16,
    I32,
    I64,
    I128,
    F32W,
    F64W,
    String,
    PersyId,
    ByteVec,
}

impl From<u8> for IndexTypeId {
    /// Maps a numeric type tag to the matching [`IndexTypeId`] variant.
    ///
    /// # Panics
    ///
    /// Panics when `val` is not a known tag. The known tags are 1 to 10 and 12 to 16; 0 and 11
    /// are not assigned in this table.
    fn from(val: u8) -> IndexTypeId {
        match val {
            1 => IndexTypeId::U8,
            2 => IndexTypeId::U16,
            3 => IndexTypeId::U32,
            4 => IndexTypeId::U64,
            5 => IndexTypeId::I8,
            6 => IndexTypeId::I16,
            7 => IndexTypeId::I32,
            8 => IndexTypeId::I64,
            9 => IndexTypeId::F32W,
            10 => IndexTypeId::F64W,
            12 => IndexTypeId::String,
            13 => IndexTypeId::PersyId,
            // The 128 bit integers and the byte vector use tags allocated after `PersyId`.
            14 => IndexTypeId::U128,
            15 => IndexTypeId::I128,
            16 => IndexTypeId::ByteVec,
            _ => panic!("type not defined for {}", val),
        }
    }
}
62

63
pub trait WrapperType<T>: Clone + From<T> + Extractor + Ord {
64
    fn value(self) -> T;
65
}
66
/// Trait implemented by all supported types in the index
67
pub trait IndexType: Display + IndexOrd + Clone + IndexSerialization {
68
    type Wrapper: WrapperType<Self>;
69
    fn get_id() -> u8;
70
    fn get_type_id() -> IndexTypeId;
71
}
72

73
/// Comparison required from every [`IndexType`].
///
/// It mirrors the shape of [`Ord::cmp`] but is a separate trait, so it can also be implemented
/// for types that are not [`Ord`] (this file implements it for `f32` and `f64`).
pub trait IndexOrd {
    /// Compares `self` with `other` and returns the resulting ordering.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering;
}
76

77
macro_rules! impl_index_ord {
78
    ($($t:ty),+) => {
79
        $(
80
        impl IndexOrd for $t {
81 1
            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
82 1
                std::cmp::Ord::cmp(self, other)
83 1
            }
84
        }
85
        )+
86
    };
87
}
88

89
impl_index_ord!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, String, PersyId, ByteVec);
90

91
impl IndexOrd for f32 {
92 1
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
93 1
        if self.is_nan() {
94 0
            if other.is_nan() {
95 0
                std::cmp::Ordering::Equal
96
            } else {
97 0
                std::cmp::Ordering::Less
98
            }
99 1
        } else if other.is_nan() {
100 0
            std::cmp::Ordering::Greater
101
        } else {
102 1
            std::cmp::PartialOrd::partial_cmp(self, other).unwrap()
103
        }
104 1
    }
105
}
106

107
impl IndexOrd for f64 {
108 1
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
109 1
        if self.is_nan() {
110 0
            if other.is_nan() {
111 0
                std::cmp::Ordering::Equal
112
            } else {
113 0
                std::cmp::Ordering::Less
114
            }
115 1
        } else if other.is_nan() {
116 0
            std::cmp::Ordering::Greater
117
        } else {
118 1
            std::cmp::PartialOrd::partial_cmp(self, other).unwrap()
119
        }
120 1
    }
121
}
122

123
/// Newtype around `Vec<u8>` that allows byte vectors to be used as index keys or values.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ByteVec(pub Vec<u8>);
126

127
impl From<Vec<u8>> for ByteVec {
128 1
    fn from(f: Vec<u8>) -> ByteVec {
129 1
        ByteVec(f)
130 1
    }
131
}
132
impl From<ByteVec> for Vec<u8> {
133
    fn from(f: ByteVec) -> Vec<u8> {
134
        f.0
135
    }
136
}
137

138
impl Display for ByteVec {
139 1
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 1
        write!(f, "{}", BASE64URL_NOPAD.encode(&self.0))
141 1
    }
142
}
143

144
impl std::str::FromStr for ByteVec {
145
    type Err = GenericError;
146

147 1
    fn from_str(s: &str) -> Result<Self, Self::Err> {
148 1
        Ok(ByteVec(BASE64URL_NOPAD.decode(s.as_bytes())?))
149 1
    }
150
}
151

152
/// Prefix of the name of the segment that stores the configuration of an index.
pub const INDEX_META_PREFIX: &str = "+_M";
/// Prefix of the name of the segment that stores the data of an index.
pub const INDEX_DATA_PREFIX: &str = "+_D";

/// Extracts the index name from the name of its meta segment.
///
/// The leading [`INDEX_META_PREFIX`] is removed. A name that does not start with the prefix is
/// not a meta segment name and is returned unchanged; this also covers names that are shorter
/// than the prefix or have a multi-byte character across the prefix length, which previously
/// caused a panic.
pub fn index_name_from_meta_segment(segment_name: &str) -> String {
    segment_name
        .strip_prefix(INDEX_META_PREFIX)
        .unwrap_or(segment_name)
        .to_string()
}
160 1
pub fn format_segment_name_meta(index_name: &str) -> String {
161 1
    format!("{}{}", INDEX_META_PREFIX, index_name)
162 1
}
163

164 1
pub fn format_segment_name_data(index_name: &str) -> String {
165 1
    format!("{}{}", INDEX_DATA_PREFIX, index_name)
166 1
}
167

168
/// Defines how the index behaves when a key value pair already exists.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ValueMode {
    /// An error is returned if a key value pair already exists.
    Exclusive,
    /// The value is added to the list of values of the key; duplicate values are collapsed to a
    /// single entry.
    Cluster,
    /// The existing value is replaced with the new value if a key value pair already exists.
    Replace,
}
179

180
impl From<u8> for ValueMode {
181 1
    fn from(value: u8) -> Self {
182 1
        match value {
183 1
            1 => ValueMode::Exclusive,
184 1
            2 => ValueMode::Cluster,
185 1
            3 => ValueMode::Replace,
186 0
            _ => unreachable!("is impossible to get a value mode from values not 1,2,3"),
187
        }
188 1
    }
189
}
190

191
impl ValueMode {
192 1
    fn to_u8(&self) -> u8 {
193 1
        match self {
194 1
            ValueMode::Exclusive => 1,
195 1
            ValueMode::Cluster => 2,
196 1
            ValueMode::Replace => 3,
197
        }
198 1
    }
199
}
200

201
pub struct Indexes {
202
    index_locks: RwLockManager<IndexId>,
203
    config: Arc<Config>,
204
}
205

206 1
#[derive(Clone, Debug, PartialEq, Eq)]
207
pub struct IndexConfig {
208 1
    name: String,
209 1
    root: Option<RecRef>,
210 1
    pub key_type: u8,
211 1
    pub value_type: u8,
212 1
    page_min: usize,
213 1
    page_max: usize,
214 1
    pub value_mode: ValueMode,
215
}
216

217
impl IndexConfig {
218 1
    fn serialize(&self, w: &mut dyn InfallibleWrite) {
219 1
        w.write_u8(0);
220 1
        self.serialize_v0(w)
221 1
    }
222 1
    fn serialize_v0(&self, w: &mut dyn InfallibleWrite) {
223 1
        if let Some(ref root) = self.root {
224 1
            w.write_u64(root.page);
225 1
            w.write_u32(root.pos);
226
        } else {
227 1
            w.write_u64(0);
228 1
            w.write_u32(0);
229
        }
230 1
        w.write_u8(self.key_type);
231 1
        w.write_u8(self.value_type);
232 1
        w.write_u32(self.page_min as u32);
233 1
        w.write_u32(self.page_max as u32);
234 1
        w.write_u8(self.value_mode.to_u8());
235 1
        w.write_u16(self.name.len() as u16);
236 1
        w.write_all(self.name.as_bytes());
237 1
    }
238 1
    fn deserialize(r: &mut dyn InfallibleRead) -> PERes<IndexConfig> {
239 1
        let version = r.read_u8();
240
        match version {
241 1
            0u8 => IndexConfig::deserialize_v0(r),
242 0
            _ => panic!("unsupported disc format"),
243
        }
244 1
    }
245 1
    fn deserialize_v0(r: &mut dyn InfallibleRead) -> PERes<IndexConfig> {
246 1
        let index_root_page = r.read_u64();
247 1
        let index_root_pos = r.read_u32();
248 1
        let key_type = r.read_u8();
249 1
        let value_type = r.read_u8();
250 1
        let page_min = r.read_u32() as usize;
251 1
        let page_max = r.read_u32() as usize;
252 1
        let value_mode = ValueMode::from(r.read_u8());
253

254 1
        let name_size = r.read_u16() as usize;
255 1
        let mut slice: Vec<u8> = vec![0; name_size];
256 1
        r.read_exact(&mut slice);
257 1
        let name: String = str::from_utf8(&slice[0..name_size])?.into();
258 1
        let root = if index_root_page != 0 && index_root_pos != 0 {
259 1
            Some(RecRef::new(index_root_page, index_root_pos))
260
        } else {
261 1
            None
262
        };
263 1
        Ok(IndexConfig {
264 1
            name,
265 1
            root,
266
            key_type,
267
            value_type,
268
            page_min,
269
            page_max,
270
            value_mode,
271
        })
272 1
    }
273

274 1
    pub fn check<K: IndexType, V: IndexType>(&self) -> Result<(), IndexOpsError> {
275 1
        if self.key_type != K::get_id() {
276 0
            Err(IndexOpsError::IndexTypeMismatch("key type".into()))
277 1
        } else if self.value_type != V::get_id() {
278 0
            Err(IndexOpsError::IndexTypeMismatch("value type".into()))
279
        } else {
280 1
            Ok(())
281
        }
282 1
    }
283 1
    pub fn get_root(&self) -> Option<RecRef> {
284 1
        self.root.clone()
285 1
    }
286
}
287

288
impl Indexes {
289 1
    pub fn new(config: &Arc<Config>) -> Indexes {
290 1
        Indexes {
291 1
            index_locks: Default::default(),
292 1
            config: config.clone(),
293 0
        }
294 1
    }
295

296 1
    pub fn create_index<K, V>(
297
        p: &PersyImpl,
298
        tx: &mut TransactionImpl,
299
        name: &str,
300
        min: usize,
301
        max: usize,
302
        value_mode: ValueMode,
303
    ) -> Result<(), CreateIndexError>
304
    where
305
        K: IndexType,
306
        V: IndexType,
307
    {
308 1
        debug_assert!(min <= max / 2);
309 1
        let segment_name_meta = format_segment_name_meta(name);
310 1
        p.create_segment(tx, &segment_name_meta)?;
311 1
        let segment_name_data = format_segment_name_data(name);
312 1
        p.create_segment(tx, &segment_name_data)?;
313 1
        let cfg = IndexConfig {
314 1
            name: name.to_string(),
315 1
            root: None,
316 1
            key_type: K::get_id(),
317 1
            value_type: V::get_id(),
318
            page_min: min,
319
            page_max: max,
320
            value_mode,
321 0
        };
322 1
        let mut scfg = Vec::new();
323 1
        cfg.serialize(&mut scfg);
324 1
        p.insert_record(tx, &segment_name_meta, &scfg)?;
325 1
        Ok(())
326 1
    }
327

328 1
    pub fn drop_index(p: &PersyImpl, tx: &mut TransactionImpl, name: &str) -> Result<(), DropIndexError> {
329 1
        let segment_name_meta = format_segment_name_meta(name);
330 1
        p.drop_segment(tx, &segment_name_meta)?;
331 1
        let segment_name_data = format_segment_name_data(name);
332 1
        p.drop_segment(tx, &segment_name_data)?;
333 1
        Ok(())
334 1
    }
335

336 1
    pub fn update_index_root(
337
        p: &PersyImpl,
338
        tx: &mut TransactionImpl,
339
        index_id: &IndexId,
340
        root: Option<RecRef>,
341
    ) -> Result<(), IndexChangeError> {
342 1
        let segment_meta = index_id_to_segment_id_meta(index_id);
343 1
        let mut scan = p.scan_tx(tx, segment_meta)?;
344 1
        let (id, mut config) = if let Some((rid, content, _)) = scan.next(p, tx) {
345 1
            (rid, IndexConfig::deserialize(&mut Cursor::new(content))?)
346 1
        } else {
347 0
            return Err(IndexChangeError::IndexNotFound);
348 1
        };
349 1
        scan.release(p)?;
350

351 1
        if config.root != root {
352 1
            config.root = root;
353 1
            let mut scfg = Vec::new();
354 1
            config.serialize(&mut scfg);
355 1
            p.update(tx, segment_meta, &id.0, &scfg)?;
356 1
        }
357 1
        Ok(())
358 1
    }
359

360 1
    pub fn get_index_tx(
361
        p: &PersyImpl,
362
        tx: &TransactionImpl,
363
        index_id: &IndexId,
364
    ) -> Result<(IndexConfig, u16), IndexError> {
365 1
        let segment_meta = index_id_to_segment_id_meta(index_id);
366 1
        let mut scan = p.scan_tx(tx, segment_meta)?;
367 1
        if let Some((_, content, version)) = scan.next(p, tx) {
368 1
            scan.release(p)?;
369 1
            Ok((IndexConfig::deserialize(&mut Cursor::new(content))?, version))
370 1
        } else {
371 0
            scan.release(p)?;
372 0
            Err(IndexError::IndexNotFound)
373
        }
374 1
    }
375

376 1
    pub fn get_config_id(p: &PersyImpl, tx: &mut TransactionImpl, index_id: &IndexId) -> Result<PersyId, IndexError> {
377 1
        let segment_meta = index_id_to_segment_id_meta(index_id);
378 1
        let mut scan = p.scan_tx(tx, segment_meta)?;
379 1
        if let Some((id, _, _)) = scan.next(p, tx) {
380 1
            scan.release(p)?;
381 1
            Ok(id)
382
        } else {
383 0
            scan.release(p)?;
384 0
            Err(IndexError::IndexNotFound)
385
        }
386 1
    }
387

388 1
    pub fn get_index(p: &PersyImpl, snapshot_id: SnapshotId, index_id: &IndexId) -> Result<IndexConfig, IndexError> {
389 1
        let segment_meta = index_id_to_segment_id_meta(index_id);
390 1
        p.scan_snapshot_index(segment_meta, snapshot_id)?
391
            .next(p)
392 1
            .map(|(_, content)| IndexConfig::deserialize(&mut Cursor::new(content)).map_err(IndexError::from))
393 1
            .unwrap_or(Err(IndexError::IndexNotFound))
394 1
    }
395

396 1
    pub fn get_index_keeper<'a, K: IndexType, V: IndexType>(
397
        p: &'a PersyImpl,
398
        snapshot: SnapshotId,
399
        index_id: &IndexId,
400
    ) -> Result<IndexSegmentKeeper<'a>, IndexOpsError> {
401 1
        let config = Indexes::get_index(p, snapshot, index_id)?;
402 1
        config.check::<K, V>()?;
403 1
        Ok(IndexSegmentKeeper::new(
404 1
            &config.name,
405
            index_id,
406 1
            config.root,
407
            p,
408
            snapshot,
409
            config.value_mode,
410
        ))
411 1
    }
412

413 1
    pub fn get_index_keeper_tx<'a, K: IndexType, V: IndexType>(
414
        p: &'a PersyImpl,
415
        tx: &'a mut TransactionImpl,
416
        index_id: &IndexId,
417
    ) -> Result<IndexSegmentKeeperTx<'a, K, V>, IndexOpsError> {
418 1
        let (config, version) = Indexes::get_index_tx(p, tx, index_id)?;
419 1
        config.check::<K, V>()?;
420 1
        Ok(IndexSegmentKeeperTx::new(
421 1
            &config.name,
422
            index_id,
423 1
            config.root,
424
            version,
425
            p,
426
            tx,
427
            config.value_mode,
428 1
            config.page_min,
429 1
            config.page_max,
430
        ))
431 1
    }
432

433 1
    pub fn check_index<K: IndexType, V: IndexType>(
434
        p: &PersyImpl,
435
        tx: &mut TransactionImpl,
436
        index_id: &IndexId,
437
    ) -> Result<(), IndexOpsError> {
438 1
        let (config, _version) = Indexes::get_index_tx(p, tx, index_id)?;
439 1
        config.check::<K, V>()
440 0
    }
441

442
    #[allow(dead_code)]
443
    pub fn write_lock(&self, indexes: &[IndexId]) -> Result<(), TimeoutError> {
444
        self.index_locks
445
            .lock_all_write(indexes, *self.config.transaction_lock_timeout())
446
    }
447

448 1
    pub fn read_lock(&self, index: IndexId) -> Result<(), TimeoutError> {
449 1
        self.index_locks
450 1
            .lock_all_read(&[index], *self.config.transaction_lock_timeout())
451 1
    }
452

453 1
    pub fn read_lock_all(&self, indexes: &[IndexId]) -> Result<(), TimeoutError> {
454 1
        self.index_locks
455 1
            .lock_all_read(indexes, *self.config.transaction_lock_timeout())
456 1
    }
457

458 1
    pub fn read_unlock(&self, index: IndexId) -> PERes<()> {
459 1
        self.index_locks.unlock_all_read(&[index])
460 1
    }
461 1
    pub fn read_unlock_all(&self, indexes: &[IndexId]) -> PERes<()> {
462 1
        self.index_locks.unlock_all_read(indexes)
463 1
    }
464

465
    #[allow(dead_code)]
466
    pub fn write_unlock(&self, indexes: &[IndexId]) -> PERes<()> {
467
        self.index_locks.unlock_all_write(indexes)
468
    }
469
}
470

471
#[cfg(test)]
mod tests {
    use super::{ByteVec, IndexConfig, ValueMode};
    use std::io::Cursor;

    /// A configuration must come back unchanged from a serialize/deserialize round trip.
    #[test]
    fn test_config_ser_des() {
        let original = IndexConfig {
            name: "abc".to_string(),
            root: None,
            key_type: 1,
            value_type: 1,
            page_min: 10,
            page_max: 30,
            value_mode: ValueMode::Replace,
        };

        let mut encoded = Vec::new();
        original.serialize(&mut encoded);
        let mut reader = Cursor::new(&mut encoded);
        let decoded = IndexConfig::deserialize(&mut reader).expect("deserialization works");
        assert_eq!(original, decoded);
    }

    /// The text form of a `ByteVec` must parse back to the same bytes.
    #[test]
    fn test_bytevec_to_from_string() {
        let original = ByteVec(vec![10, 20]);
        let text = original.to_string();
        let parsed: ByteVec = text.parse().unwrap();
        assert_eq!(original, parsed);
    }
}

Read our documentation on viewing source code.

Loading