1
use crate::{
2
    config::Config,
3
    error::{PRes, PersyError},
4
    id::{index_id_to_segment_id_meta, IndexId, PersyId, RecRef},
5
    index::{
6
        keeper::{Extractor, IndexSegmentKeeper, IndexSegmentKeeperTx},
7
        serialization::IndexSerialization,
8
    },
9
    io::{InfallibleRead, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat},
10
    locks::RwLockManager,
11
    persy::PersyImpl,
12
    snapshot::SnapshotId,
13
    transaction::Transaction,
14
};
15
use std::{fmt::Display, io::Cursor, str, sync::Arc};
16

17
use data_encoding::BASE64URL_NOPAD;
18

19
/// Enum of all the possible Key or Value types for indexes
20
#[derive(Clone)]
21
pub enum IndexTypeId {
22
    U8,
23
    U16,
24
    U32,
25
    U64,
26
    U128,
27
    I8,
28
    I16,
29
    I32,
30
    I64,
31
    I128,
32
    F32W,
33
    F64W,
34
    STRING,
35
    PERSYID,
36
    BYTEVEC,
37
}
38

39
impl From<u8> for IndexTypeId {
40 1
    fn from(val: u8) -> IndexTypeId {
41 1
        match val {
42 1
            1 => IndexTypeId::U8,
43 0
            2 => IndexTypeId::U16,
44 1
            3 => IndexTypeId::U32,
45 0
            4 => IndexTypeId::U64,
46 0
            14 => IndexTypeId::U128,
47 0
            5 => IndexTypeId::I8,
48 0
            6 => IndexTypeId::I16,
49 0
            7 => IndexTypeId::I32,
50 0
            8 => IndexTypeId::I64,
51 0
            15 => IndexTypeId::I128,
52 0
            9 => IndexTypeId::F32W,
53 0
            10 => IndexTypeId::F64W,
54 0
            12 => IndexTypeId::STRING,
55 0
            13 => IndexTypeId::PERSYID,
56 0
            16 => IndexTypeId::BYTEVEC,
57 0
            _ => panic!("type node defined for {}", val),
58
        }
59 1
    }
60
}
61

62
pub trait WrapperType<T>: Clone + From<T> + Extractor + Ord {
63
    fn value(self) -> T;
64
}
65
/// Trait implemented by all supported types in the index
66
pub trait IndexType: Display + IndexOrd + Clone + IndexSerialization {
67
    type Wrapper: WrapperType<Self>;
68
    fn get_id() -> u8;
69
    fn get_type_id() -> IndexTypeId;
70
}
71

72
pub trait IndexOrd {
73
    fn cmp(&self, other: &Self) -> std::cmp::Ordering;
74
}
75

76
macro_rules! impl_index_ord {
77
    ($($t:ty),+) => {
78
        $(
79
        impl IndexOrd for $t {
80 1
            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
81 1
                std::cmp::Ord::cmp(self, other)
82 1
            }
83
        }
84
        )+
85
    };
86
}
87

88
impl_index_ord!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, String, PersyId, ByteVec);
89

90
impl IndexOrd for f32 {
91 1
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
92 1
        if self.is_nan() {
93 0
            if other.is_nan() {
94 0
                std::cmp::Ordering::Equal
95
            } else {
96 0
                std::cmp::Ordering::Less
97
            }
98 1
        } else if other.is_nan() {
99 0
            std::cmp::Ordering::Greater
100
        } else {
101 1
            std::cmp::PartialOrd::partial_cmp(self, other).unwrap()
102
        }
103 1
    }
104
}
105

106
impl IndexOrd for f64 {
107 1
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
108 1
        if self.is_nan() {
109 0
            if other.is_nan() {
110 0
                std::cmp::Ordering::Equal
111
            } else {
112 0
                std::cmp::Ordering::Less
113
            }
114 1
        } else if other.is_nan() {
115 0
            std::cmp::Ordering::Greater
116
        } else {
117 1
            std::cmp::PartialOrd::partial_cmp(self, other).unwrap()
118
        }
119 1
    }
120
}
121

122
/// Wrapper for `Vec<u8>` for use it in index keys or values
123 1
#[derive(Debug, PartialOrd, PartialEq, Clone, Ord, Eq)]
124 1
pub struct ByteVec(pub Vec<u8>);
125

126
impl From<Vec<u8>> for ByteVec {
127 1
    fn from(f: Vec<u8>) -> ByteVec {
128 1
        ByteVec(f)
129 1
    }
130
}
131
impl From<ByteVec> for Vec<u8> {
132
    fn from(f: ByteVec) -> Vec<u8> {
133
        f.0
134
    }
135
}
136

137
impl Display for ByteVec {
138 1
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 1
        write!(f, "{}", BASE64URL_NOPAD.encode(&self.0))
140 1
    }
141
}
142

143
impl std::str::FromStr for ByteVec {
144
    type Err = PersyError;
145

146 1
    fn from_str(s: &str) -> Result<Self, Self::Err> {
147 1
        Ok(ByteVec(BASE64URL_NOPAD.decode(s.as_bytes())?))
148 1
    }
149
}
150

151
pub const INDEX_META_PREFIX: &str = "+_M";
152
pub const INDEX_DATA_PREFIX: &str = "+_D";
153

154
pub fn index_name_from_meta_segment(segment_name: &str) -> String {
155
    let mut name = segment_name.to_string();
156
    name.drain(..INDEX_META_PREFIX.len());
157
    name
158
}
159 1
pub fn format_segment_name_meta(index_name: &str) -> String {
160 1
    format!("{}{}", INDEX_META_PREFIX, index_name)
161 1
}
162

163 1
pub fn format_segment_name_data(index_name: &str) -> String {
164 1
    format!("{}{}", INDEX_DATA_PREFIX, index_name)
165 1
}
166

167
/// Define the behavior of the index in case a key value pair already exists
168 1
#[derive(Clone, Debug, PartialEq, Eq)]
169
pub enum ValueMode {
170
    /// An error will return if a key value pair already exists
171
    EXCLUSIVE,
172
    /// The value will be add to a list of values for the key, duplicate value will be collapsed to
173
    /// only one entry
174
    CLUSTER,
175
    /// The existing value will be replaced with the new value if a key value pair already exists
176
    REPLACE,
177
}
178

179
impl From<u8> for ValueMode {
180 1
    fn from(value: u8) -> Self {
181 1
        match value {
182 1
            1 => ValueMode::EXCLUSIVE,
183 1
            2 => ValueMode::CLUSTER,
184 1
            3 => ValueMode::REPLACE,
185 0
            _ => unreachable!("is impossible to get a value mode from values not 1,2,3"),
186
        }
187 1
    }
188
}
189

190
impl ValueMode {
191 1
    fn to_u8(&self) -> u8 {
192 1
        match self {
193 1
            ValueMode::EXCLUSIVE => 1,
194 1
            ValueMode::CLUSTER => 2,
195 1
            ValueMode::REPLACE => 3,
196
        }
197 1
    }
198
}
199

200
pub struct Indexes {
201
    index_locks: RwLockManager<IndexId>,
202
    config: Arc<Config>,
203
}
204

205 1
#[derive(Clone, Debug, PartialEq, Eq)]
206
pub struct IndexConfig {
207 1
    name: String,
208 1
    root: Option<RecRef>,
209 1
    pub key_type: u8,
210 1
    pub value_type: u8,
211 1
    page_min: usize,
212 1
    page_max: usize,
213 1
    pub value_mode: ValueMode,
214
}
215

216
impl IndexConfig {
217 1
    fn serialize(&self, w: &mut dyn InfallibleWrite) -> PRes<()> {
218 1
        w.write_u8(0);
219 1
        self.serialize_v0(w)
220 1
    }
221 1
    fn serialize_v0(&self, w: &mut dyn InfallibleWrite) -> PRes<()> {
222 1
        if let Some(ref root) = self.root {
223 1
            w.write_u64(root.page);
224 1
            w.write_u32(root.pos);
225
        } else {
226 1
            w.write_u64(0);
227 1
            w.write_u32(0);
228
        }
229 1
        w.write_u8(self.key_type);
230 1
        w.write_u8(self.value_type);
231 1
        w.write_u32(self.page_min as u32);
232 1
        w.write_u32(self.page_max as u32);
233 1
        w.write_u8(self.value_mode.to_u8());
234 1
        w.write_u16(self.name.len() as u16);
235 1
        w.write_all(self.name.as_bytes());
236 1
        Ok(())
237 1
    }
238 1
    fn deserialize(r: &mut dyn InfallibleRead) -> PRes<IndexConfig> {
239 1
        let version = r.read_u8();
240
        match version {
241 1
            0u8 => IndexConfig::deserialize_v0(r),
242 0
            _ => panic!("unsupported disc format"),
243
        }
244 1
    }
245 1
    fn deserialize_v0(r: &mut dyn InfallibleRead) -> PRes<IndexConfig> {
246 1
        let index_root_page = r.read_u64();
247 1
        let index_root_pos = r.read_u32();
248 1
        let key_type = r.read_u8();
249 1
        let value_type = r.read_u8();
250 1
        let page_min = r.read_u32() as usize;
251 1
        let page_max = r.read_u32() as usize;
252 1
        let value_mode = ValueMode::from(r.read_u8());
253

254 1
        let name_size = r.read_u16() as usize;
255 1
        let mut slice: Vec<u8> = vec![0; name_size];
256 1
        r.read_exact(&mut slice);
257 1
        let name: String = str::from_utf8(&slice[0..name_size])?.into();
258 1
        let root = if index_root_page != 0 && index_root_pos != 0 {
259 1
            Some(RecRef::new(index_root_page, index_root_pos))
260
        } else {
261 1
            None
262
        };
263 1
        Ok(IndexConfig {
264 1
            name,
265 1
            root,
266
            key_type,
267
            value_type,
268
            page_min,
269
            page_max,
270
            value_mode,
271
        })
272 1
    }
273

274 1
    pub fn check<K: IndexType, V: IndexType>(&self) -> PRes<()> {
275 1
        if self.key_type != K::get_id() {
276 0
            Err(PersyError::IndexTypeMismatch("key type".into()))
277 1
        } else if self.value_type != V::get_id() {
278 0
            Err(PersyError::IndexTypeMismatch("value type".into()))
279
        } else {
280 1
            Ok(())
281
        }
282 1
    }
283 1
    pub fn get_root(&self) -> Option<RecRef> {
284 1
        self.root.clone()
285 1
    }
286
}
287

288 0
fn error_map(err: PersyError) -> PersyError {
289 0
    if let PersyError::SegmentNotFound = err {
290 0
        PersyError::IndexNotFound
291
    } else {
292 0
        err
293
    }
294 0
}
295

296
impl Indexes {
297 1
    pub fn new(config: &Arc<Config>) -> Indexes {
298 1
        Indexes {
299 1
            index_locks: Default::default(),
300 1
            config: config.clone(),
301 0
        }
302 1
    }
303

304 1
    pub fn create_index<K, V>(
305
        p: &PersyImpl,
306
        tx: &mut Transaction,
307
        name: &str,
308
        min: usize,
309
        max: usize,
310
        value_mode: ValueMode,
311
    ) -> PRes<()>
312
    where
313
        K: IndexType,
314
        V: IndexType,
315
    {
316 1
        if min > max / 2 {
317 0
            return Err(PersyError::IndexMinElementsShouldBeAtLeastDoubleOfMax);
318
        }
319 1
        let segment_name_meta = format_segment_name_meta(name);
320 1
        p.create_segment(tx, &segment_name_meta)?;
321 1
        let segment_name_data = format_segment_name_data(name);
322 1
        p.create_segment(tx, &segment_name_data)?;
323 1
        let cfg = IndexConfig {
324 1
            name: name.to_string(),
325 1
            root: None,
326 1
            key_type: K::get_id(),
327 1
            value_type: V::get_id(),
328
            page_min: min,
329
            page_max: max,
330
            value_mode,
331 0
        };
332 1
        let mut scfg = Vec::new();
333 1
        cfg.serialize(&mut scfg)?;
334 1
        p.insert_record(tx, &segment_name_meta, &scfg)?;
335 1
        Ok(())
336 1
    }
337

338 1
    pub fn drop_index(p: &PersyImpl, tx: &mut Transaction, name: &str) -> PRes<()> {
339 1
        let segment_name_meta = format_segment_name_meta(name);
340 1
        p.drop_segment(tx, &segment_name_meta)?;
341 1
        let segment_name_data = format_segment_name_data(name);
342 1
        p.drop_segment(tx, &segment_name_data)?;
343 1
        Ok(())
344 1
    }
345

346 1
    pub fn update_index_root(
347
        p: &PersyImpl,
348
        tx: &mut Transaction,
349
        index_id: &IndexId,
350
        root: Option<RecRef>,
351
    ) -> PRes<()> {
352 1
        let segment_meta = index_id_to_segment_id_meta(index_id);
353 1
        let (id, mut config) =
354 1
            if let Some((rid, content, _)) = p.scan_tx(tx, segment_meta).map_err(error_map)?.next(p, tx) {
355 1
                (rid, IndexConfig::deserialize(&mut Cursor::new(content))?)
356 1
            } else {
357 0
                return Err(PersyError::IndexNotFound);
358 1
            };
359

360 1
        if config.root != root {
361 1
            config.root = root;
362 1
            let mut scfg = Vec::new();
363 1
            config.serialize(&mut scfg)?;
364 1
            p.update(tx, segment_meta, &id.0, &scfg)?;
365 1
        }
366 1
        Ok(())
367 1
    }
368

369 1
    pub fn get_index_tx(p: &PersyImpl, tx: &Transaction, index_id: &IndexId) -> PRes<(IndexConfig, u16)> {
370 1
        let segment_meta = index_id_to_segment_id_meta(index_id);
371 1
        if let Some((_, content, version)) = p.scan_tx(tx, segment_meta).map_err(error_map)?.next(p, tx) {
372 1
            Ok((IndexConfig::deserialize(&mut Cursor::new(content))?, version))
373 1
        } else {
374 0
            Err(PersyError::IndexNotFound)
375
        }
376 1
    }
377

378 1
    pub fn get_config_id(p: &PersyImpl, tx: &mut Transaction, index_id: &IndexId) -> PRes<PersyId> {
379 1
        let segment_meta = index_id_to_segment_id_meta(index_id);
380 1
        if let Some((id, _, _)) = p.scan_tx(tx, segment_meta).map_err(error_map)?.next(p, tx) {
381 1
            Ok(id)
382
        } else {
383 0
            Err(PersyError::IndexNotFound)
384
        }
385 1
    }
386

387 1
    pub fn get_index(p: &PersyImpl, snapshot_id: SnapshotId, index_id: &IndexId) -> PRes<IndexConfig> {
388 1
        let segment_meta = index_id_to_segment_id_meta(index_id);
389 1
        p.scan_snapshot_index(segment_meta, snapshot_id)
390 1
            .map_err(error_map)?
391
            .next(p)
392 1
            .map(|(_, content)| IndexConfig::deserialize(&mut Cursor::new(content)))
393 1
            .unwrap_or(Err(PersyError::IndexNotFound))
394 1
    }
395

396 1
    pub fn get_index_keeper<'a, K: IndexType, V: IndexType>(
397
        p: &'a PersyImpl,
398
        snapshot: SnapshotId,
399
        index_id: &IndexId,
400
    ) -> PRes<IndexSegmentKeeper<'a>> {
401 1
        let config = Indexes::get_index(p, snapshot, index_id)?;
402 1
        config.check::<K, V>()?;
403 1
        Ok(IndexSegmentKeeper::new(
404 1
            &config.name,
405
            index_id,
406 1
            config.root,
407
            p,
408
            snapshot,
409
            config.value_mode,
410
        ))
411 1
    }
412

413 1
    pub fn get_index_keeper_tx<'a, K: IndexType, V: IndexType>(
414
        p: &'a PersyImpl,
415
        tx: &'a mut Transaction,
416
        index_id: &IndexId,
417
    ) -> PRes<IndexSegmentKeeperTx<'a, K, V>> {
418 1
        let (config, version) = Indexes::get_index_tx(p, tx, index_id)?;
419 1
        config.check::<K, V>()?;
420 1
        Ok(IndexSegmentKeeperTx::new(
421 1
            &config.name,
422
            index_id,
423 1
            config.root,
424
            version,
425
            p,
426
            tx,
427
            config.value_mode,
428 1
            config.page_min,
429 1
            config.page_max,
430
        ))
431 1
    }
432

433 1
    pub fn check_index<K: IndexType, V: IndexType>(
434
        p: &PersyImpl,
435
        tx: &mut Transaction,
436
        index_id: &IndexId,
437
    ) -> PRes<()> {
438 1
        let (config, _version) = Indexes::get_index_tx(p, tx, index_id)?;
439 1
        config.check::<K, V>()
440 1
    }
441

442
    #[allow(dead_code)]
443
    pub fn write_lock(&self, indexes: &[IndexId]) -> PRes<()> {
444
        self.index_locks
445
            .lock_all_write(indexes, *self.config.transaction_lock_timeout())
446
    }
447

448 1
    pub fn read_lock(&self, index: IndexId) -> PRes<()> {
449 1
        self.index_locks
450 1
            .lock_all_read(&[index], *self.config.transaction_lock_timeout())
451 1
    }
452

453 1
    pub fn read_lock_all(&self, indexes: &[IndexId]) -> PRes<()> {
454 1
        self.index_locks
455 1
            .lock_all_read(indexes, *self.config.transaction_lock_timeout())
456 1
    }
457

458 1
    pub fn read_unlock(&self, index: IndexId) -> PRes<()> {
459 1
        self.index_locks.unlock_all_read(&[index])
460 1
    }
461 1
    pub fn read_unlock_all(&self, indexes: &[IndexId]) -> PRes<()> {
462 1
        self.index_locks.unlock_all_read(indexes)
463 1
    }
464

465
    #[allow(dead_code)]
466
    pub fn write_unlock(&self, indexes: &[IndexId]) -> PRes<()> {
467
        self.index_locks.unlock_all_write(indexes)
468
    }
469
}
470

471
#[cfg(test)]
472
mod tests {
473
    use super::{ByteVec, IndexConfig, ValueMode};
474
    use std::io::Cursor;
475

476
    #[test]
477 1
    fn test_config_ser_des() {
478 1
        let cfg = IndexConfig {
479 1
            name: "abc".to_string(),
480 1
            root: None,
481
            key_type: 1,
482
            value_type: 1,
483
            page_min: 10,
484
            page_max: 30,
485 1
            value_mode: ValueMode::REPLACE,
486
        };
487

488 1
        let mut buff = Vec::new();
489 1
        cfg.serialize(&mut buff).expect("serialization works");
490 1
        let read = IndexConfig::deserialize(&mut Cursor::new(&mut buff)).expect("deserialization works");
491 1
        assert_eq!(cfg, read);
492 1
    }
493

494
    #[test]
495 1
    fn test_bytevec_to_from_string() {
496 1
        let bv = ByteVec(vec![10, 20]);
497 1
        let nbv = bv.to_string().parse().unwrap();
498 1
        assert_eq!(bv, nbv);
499 1
    }
500
}

Read our documentation on viewing source code .

Loading