1
use crate::repository::backend::common::{IndexTransaction, LockedFile};
2
use crate::repository::backend::{self, BackendError, Result, SegmentDescriptor};
3
use crate::repository::ChunkID;
4

5
use async_trait::async_trait;
6
use futures::channel::oneshot;
7
use serde_cbor as cbor;
8

9
use std::collections::{HashMap, HashSet};
10
use std::fs::{create_dir, read_dir, File};
11
use std::io::{BufWriter, Seek, SeekFrom};
12
use std::path::Path;
13
use std::thread;
14

15
#[derive(Debug)]
16
struct InternalIndex {
17
    state: HashMap<ChunkID, SegmentDescriptor>,
18
    file: LockedFile,
19
    changes: Vec<IndexTransaction>,
20
}
21

22
impl InternalIndex {
23
    /// Internal function for opening the index
24
    ///
25
    /// The index this creates is not thread safe, see `Index` for the thread safe implementation on
26
    /// top of this.
27 1
    fn open(repository_path: impl AsRef<Path>) -> Result<InternalIndex> {
28
        // construct the path of the index folder
29 1
        let index_path = repository_path.as_ref().join("index");
30
        // Check to see if it exists
31 1
        if Path::exists(&index_path) {
32
            // If it is a file, return failure
33 1
            if Path::is_file(&index_path) {
34 0
                return Err(BackendError::IndexError(format!(
35 0
                    "Failed to load index, {:?} is a file, not a directory",
36 0
                    index_path
37
                )));
38
            }
39
        } else {
40
            // Create the index directory
41 1
            create_dir(&index_path)?;
42
        }
43
        // Create the state map
44 1
        let mut state: HashMap<ChunkID, SegmentDescriptor> = HashMap::new();
45

46
        // Get the list of files, and sort them by ID
47 1
        let mut items = read_dir(&index_path)?
48 0
            .filter_map(std::result::Result::ok)
49 1
            .filter(|x| x.path().is_file())
50 1
            .filter_map(|x| {
51 1
                x.path()
52 1
                    .file_name()?
53 0
                    .to_str()
54 1
                    .and_then(|y| std::result::Result::ok(y.parse::<usize>()))
55 1
                    .map(|z| (z, x))
56
            })
57
            .collect::<Vec<_>>();
58 1
        items.sort_by(|a, b| a.0.cmp(&b.0));
59

60
        // Add all the seen transactions to our state hashmap
61 1
        for (_, file) in &items {
62
            // Open the file
63 1
            let mut file = File::open(file.path())?;
64
            // Keep deserializing transactions until we encouter an error
65 1
            let de = cbor::Deserializer::from_reader(&mut file);
66 1
            let mut de = de.into_iter::<IndexTransaction>();
67 1
            while let Some(tx) = de.next().and_then(std::result::Result::ok) {
68
                // Insert each item into the state
69 1
                state.insert(tx.chunk_id, tx.descriptor);
70
            }
71
        }
72

73
        // Check to see if there are any unlocked index files, and if so, use the first ones
74 1
        for (_, file) in &items {
75 1
            let locked_file = LockedFile::open_read_write(file.path())?;
76 1
            if let Some(file) = locked_file {
77 1
                return Ok(InternalIndex {
78 1
                    state,
79 1
                    file,
80 1
                    changes: Vec::new(),
81
                });
82
            }
83
        }
84

85
        // If we have gotten here there are no unlocked index files, creating one
86

87
        // Check the length of the items list, if it is empty, there are no index files,
88
        // so we must create the first
89 1
        let id = if items.is_empty() {
90 1
            0
91
        } else {
92 1
            items[items.len() - 1].0 + 1
93
        };
94

95 1
        let path = index_path.join(id.to_string());
96 1
        let file = LockedFile::open_read_write(path)?
97
            .expect("Somehow, our newly created index file is locked.");
98 1
        Ok(InternalIndex {
99 1
            state,
100 1
            file,
101 1
            changes: Vec::new(),
102
        })
103
    }
104

105
    /// Drains the changes out of the internal buffer and commits them to disk
106 1
    fn drain_changes(&mut self) -> Result<()> {
107 1
        let mut file = BufWriter::new(&mut self.file);
108 1
        file.seek(SeekFrom::End(0))?;
109 1
        for tx in self.changes.drain(0..self.changes.len()) {
110 1
            cbor::ser::to_writer(&mut file, &tx)?;
111
        }
112 1
        Ok(())
113
    }
114
}
115

116
#[derive(Debug)]
117
enum IndexCommand {
118
    Lookup(ChunkID, oneshot::Sender<Option<SegmentDescriptor>>),
119
    Set(ChunkID, SegmentDescriptor, oneshot::Sender<Result<()>>),
120
    KnownChunks(oneshot::Sender<HashSet<ChunkID>>),
121
    Commit(oneshot::Sender<Result<()>>),
122
    Count(oneshot::Sender<usize>),
123
    Close(oneshot::Sender<()>),
124
}
125

126
#[derive(Clone)]
127
pub struct Index {
128
    input: flume::Sender<IndexCommand>,
129
    path: String,
130
}
131

132
/// `MultiFile` index with lock free multithreading
133
///
134
/// # Warning
135
///
136
/// You must call `commit_index` for your changes to be committed to disk, the Index
137
/// will not do this for you
138
impl Index {
139
    /// Opens and reads the index, creating it if it does not exist.
140
    ///
141
    /// Note that the repository path is the root path of the repository, not the path of the index
142
    /// folder.
143
    ///
144
    /// This method will create the index folder if it does not exist.
145
    ///
146
    /// Files who's names are not strictly base 10 integers are ignored, and will not be added to the
147
    /// state or written to.
148
    ///
149
    /// This method only creates the event loop on its own, the actual index is created by
150
    /// `InternalIndex::open`
151
    ///
152
    /// # Errors
153
    ///
154
    /// Will return Err if
155
    ///
156
    /// 1. The index folder does not exist and creating it failed
157
    /// 2. There are no unlocked index files and creating one fails
158
    /// 3. There is a file called "index" in the repository folder
159
    /// 4. Some other IO error (such as lack of permissions) occurs
160
    /// 5. The path contains non-utf8 characters
161
    ///
162
    /// # TODOs:
163
    ///
164
    /// 1. Return an error if deserializing a transaction fails before the end of the file is reached
165
    /// 2. This function can currently panic if we have to create a new index file, but someone else
166
    ///    that while we were parsing the transaction. Resolution for this conflict needs to be
167
    ///    implemented.
168 1
    pub fn open(repository_path: impl AsRef<Path>, queue_depth: usize) -> Result<Index> {
169
        // Open the index
170 1
        let mut index = InternalIndex::open(&repository_path)?;
171
        // Create the communication channel and open the event processing loop in it own task
172 1
        let (input, output) = flume::bounded(queue_depth);
173 1
        thread::spawn(move || {
174 1
            let mut final_ret = None;
175 1
            while let Ok(command) = output.recv() {
176 1
                match command {
177 1
                    IndexCommand::Lookup(id, ret) => {
178 1
                        ret.send(index.state.get(&id).copied()).unwrap();
179
                    }
180 1
                    IndexCommand::Set(id, descriptor, ret) => {
181
                        // TODO: dont insert the item into the changes list if it its already in the index
182 1
                        index.state.insert(id, descriptor);
183 1
                        let transaction = IndexTransaction {
184 1
                            chunk_id: id,
185 0
                            descriptor,
186
                        };
187 1
                        index.changes.push(transaction);
188 1
                        ret.send(Ok(())).unwrap();
189
                    }
190 0
                    IndexCommand::KnownChunks(ret) => {
191 0
                        ret.send(index.state.keys().copied().collect::<HashSet<_>>())
192 0
                            .unwrap();
193
                    }
194 1
                    IndexCommand::Count(ret) => {
195 1
                        ret.send(index.state.len()).unwrap();
196
                    }
197 1
                    IndexCommand::Commit(ret) => {
198 1
                        ret.send(index.drain_changes()).unwrap();
199
                    }
200 1
                    IndexCommand::Close(ret) => {
201 1
                        final_ret = Some(ret);
202 1
                        break;
203
                    }
204
                }
205
            }
206
            // Make sure that our internals are dropped before sending the completion signal to a
207
            // possible close call
208 1
            std::mem::drop(index);
209 1
            std::mem::drop(output);
210 1
            if let Some(ret) = final_ret {
211 1
                ret.send(()).unwrap();
212
            };
213
        });
214

215 1
        Ok(Index {
216 1
            input,
217 1
            path: repository_path.as_ref().to_str().unwrap().to_string(),
218
        })
219
    }
220

221 1
    pub async fn close(&mut self) {
222 1
        let (tx, rx) = oneshot::channel();
223 1
        self.input
224 1
            .send_async(IndexCommand::Close(tx))
225 0
            .await
226
            .expect("Called close on an already closed repository.");
227 1
        rx.await
228
            .expect("Called close on an already closed repository.");
229
    }
230
}
231

232
impl std::fmt::Debug for Index {
233 0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 0
        write!(f, "Index: {:?}", self.path)
235
    }
236
}
237

238
#[async_trait]
239
impl backend::Index for Index {
240 1
    async fn lookup_chunk(&mut self, id: ChunkID) -> Option<SegmentDescriptor> {
241 1
        let (input, output) = oneshot::channel();
242 1
        self.input
243 1
            .send_async(IndexCommand::Lookup(id, input))
244 0
            .await
245
            .expect("Unable to communicate with index task.");
246 1
        output
247 0
            .await
248
            .expect("Unable to communicate with index task.")
249
    }
250 1
    async fn set_chunk(&mut self, id: ChunkID, location: SegmentDescriptor) -> Result<()> {
251 1
        let (input, output) = oneshot::channel();
252 1
        self.input
253 1
            .send_async(IndexCommand::Set(id, location, input))
254 0
            .await
255 1
            .map_err(|_| BackendError::ChannelDroppedSend)?;
256 1
        output.await?
257
    }
258 0
    async fn known_chunks(&mut self) -> HashSet<ChunkID> {
259 0
        let (input, output) = oneshot::channel();
260 0
        self.input
261 0
            .send_async(IndexCommand::KnownChunks(input))
262 0
            .await
263
            .expect("Unable to communicate with index task.");
264 0
        output
265 0
            .await
266
            .expect("Unable to communicate with index task.")
267
    }
268 1
    async fn commit_index(&mut self) -> Result<()> {
269 1
        let (input, output) = oneshot::channel();
270 1
        self.input
271 1
            .send_async(IndexCommand::Commit(input))
272 0
            .await
273 1
            .map_err(|_| BackendError::ChannelDroppedSend)?;
274 1
        output.await?
275
    }
276 1
    async fn count_chunk(&mut self) -> usize {
277 1
        let (input, output) = oneshot::channel();
278 1
        self.input
279 1
            .send_async(IndexCommand::Count(input))
280 0
            .await
281
            .expect("Unable to communicate with index task.");
282 1
        output
283 0
            .await
284
            .expect("Unable to communicate with index task.")
285
    }
286
}
287

288
#[cfg(test)]
mod tests {
    use super::*;
    use backend::Index as OtherIndex;
    use rand::prelude::*;
    use std::path::PathBuf;
    use tempfile::{tempdir, TempDir};
    use walkdir::WalkDir;

    /// Creates a fresh temporary directory, returning its guard and its path.
    ///
    /// The `TempDir` guard deletes the directory when dropped, so callers must keep it alive for
    /// the whole test by binding it to a named variable such as `_tempdir` (not `_`).
    fn setup() -> (TempDir, PathBuf) {
        let tempdir = tempdir().unwrap();
        let path = tempdir.path().to_path_buf();
        (tempdir, path)
    }

    /// Prints every entry below `path`, as debugging help when a test fails.
    fn print_tree(path: &Path) {
        for entry in WalkDir::new(path) {
            println!("{}", entry.unwrap().path().display());
        }
    }

    // Test to make sure creating an index in an empty folder
    // 1. Doesn't Panic or error
    // 2. Creates the index directory
    // 3. Creates the initial index file (index/0)
    // 4. Locks the initial index file (index/0.lock)
    #[test]
    fn creation_works() {
        smol::block_on(async {
            let (_tempdir, path) = setup();
            // Create the index
            let mut index = Index::open(&path, 4).expect("Index creation failed");
            print_tree(&path);
            // Check for the index directory
            let index_dir = path.join("index");
            assert!(index_dir.exists());
            assert!(index_dir.is_dir());
            // Check for the initial index file
            let index_file = index_dir.join("0");
            assert!(index_file.exists());
            assert!(index_file.is_file());
            // Check for the initial index lock file
            let index_lock = index_dir.join("0.lock");
            assert!(index_lock.exists());
            assert!(index_lock.is_file());
            // Shut the background thread down before the temporary directory is removed
            index.close().await;
        });
    }

    // Test to make sure creating a second index while the first is open
    // 1. Doesn't panic or error
    // 2. Creates and locks a second index file
    #[test]
    fn double_creation_works() {
        smol::block_on(async {
            let (_tempdir, path) = setup();
            // Open two indexes at the same time
            let mut index1 = Index::open(&path, 4).expect("Index 1 creation failed");
            let mut index2 = Index::open(&path, 4).expect("Index 2 creation failed");
            print_tree(&path);
            // Get index dir and check for index files
            let index_dir = path.join("index");
            let if1 = index_dir.join("0");
            let if2 = index_dir.join("1");
            let il1 = index_dir.join("0.lock");
            let il2 = index_dir.join("1.lock");
            assert!(if1.exists() && if1.is_file());
            assert!(if2.exists() && if2.is_file());
            assert!(il1.exists() && il1.is_file());
            assert!(il2.exists() && il2.is_file());
            // Shut both background threads down before the temporary directory is removed
            index1.close().await;
            index2.close().await;
        });
    }

    // Test to make sure that closing an Index unlocks the index file.
    // `close` only returns once the background thread has dropped the internal index, so the
    // lock must already be gone when it returns.
    #[test]
    fn unlock_on_drop() {
        smol::block_on(async {
            let (_tempdir, path) = setup();
            // Open an index and close it
            let mut index = Index::open(&path, 4).expect("Index creation failed");
            index.close().await;
            // check for the index file and the absence of the lock file
            let index_dir = path.join("index");
            let index_file = index_dir.join("0");
            let index_lock = index_dir.join("0.lock");
            assert!(index_file.exists() && index_file.is_file());
            assert!(!index_lock.exists());
        });
    }

    // Test to verify that:
    // 1. Writing to a properly setup index does not Err or Panic
    // 2. Reading keys we have inserted into a properly setup index does not Err or Panic
    // 3. Keys are still present in the index after closing and reloading from the same directory
    // 4. Chunk count increments properly
    #[test]
    fn write_drop_read() {
        smol::block_on(async {
            let (_tempdir, path) = setup();
            // Get some transactions to write to the repository
            let mut txs = HashMap::new();
            for _ in 0..10 {
                let mut raw_id = [0_u8; 32];
                rand::thread_rng().fill_bytes(&mut raw_id);
                let segment_id: u64 = rand::thread_rng().gen();
                let start: u64 = rand::thread_rng().gen();
                let chunk_id = ChunkID::new(&raw_id);
                let descriptor = SegmentDescriptor { segment_id, start };
                txs.insert(chunk_id, descriptor);
            }
            // Open the index
            let mut index = Index::open(&path, 4).expect("Index creation failed");
            // Insert the transactions
            for (id, desc) in &txs {
                index
                    .set_chunk(*id, *desc)
                    .await
                    .expect("Adding transaction failed");
            }
            // Commit the index
            index.commit_index().await.expect("commiting index failed");
            // Get the chunk count and check it
            let count = index.count_chunk().await;
            assert_eq!(count, txs.len());
            // Close the index and let it complete
            index.close().await;
            // Load the index back up
            let mut index = Index::open(&path, 4).expect("Index recreation failed");
            print_tree(&path);
            // Verify we still have the same number of chunks
            let count = index.count_chunk().await;
            assert_eq!(count, txs.len());
            // Confirm that each tx is in the index and has the correct value
            for (id, desc) in txs {
                let location = index.lookup_chunk(id).await.expect("Tx retrieve failed");
                assert_eq!(desc, location);
            }
            index.close().await;
        });
    }
}

Read our documentation on viewing source code.

Loading