//! The repository implements a low-level key-value store, upon which all
//! higher level structures in asuran are built.
//!
//! The repository stores individual chunks, arrays of bytes, that can be
//! compressed and encrypted. Chunks are addressed by their key, which,
//! with the exception of the repository manifest, is derived from an HMAC of
//! the plain text of the chunk.
//!
//! Asuran repositories currently only operate in append only mode
//!
//! # Encryption and Compression
//!
//! Encryption and compression algorithms can be swapped out on a chunk by
//! chunk basis, with `Encryption::NoEncryption` and `Compression::NoCompression`
//! providing pass through modes for those who do not wish to use those
//! features.
//!
//! # Authentication
//!
//! Asuran uses Hash based Message Authentication Codes (HMAC), with swappable
//! hash algorithms, for both deduplication and ensuring data integrity.
//!
//! The hash algorithm used for the HMAC can also be changed out on a chunk by
//! chunk basis, though this would not be wise to do. As deduplication is
//! performed based on plaintext HMAC, this would severely compromise the
//! effectiveness of deduplication.
//!
//! While the hash algorithm used for HMAC can be swapped out, unlike the
//! ones for encryption and compression, it can not be turned off. Asuran
//! always verifies the integrity of the data.
//!
//! # Deduplication
//!
//! The deduplication strategy in asuran is straightforward. Each chunk is
//! stored in the repository with the hash of its plaintext as its key.
//! As the hash function used is a cryptographically secure HMAC, we can be
//! sure within the limits of reason that if two chunks have the same key,
//! they have the same data, and if they have the same data, then they have the
//! same key.
//!
//! Asuran will not write a chunk whose key already exists in the repository,
//! effectively preventing the storage of duplicate chunks.
use crate::repository::backend::Manifest as ManifestTrait;
pub use crate::repository::backend::{Backend, BackendClone, Index, SegmentDescriptor};
use crate::repository::pipeline::Pipeline;

pub use asuran_core::repository::chunk::{Chunk, ChunkID, ChunkSettings};
pub use asuran_core::repository::compression::Compression;
pub use asuran_core::repository::encryption::Encryption;
pub use asuran_core::repository::hmac::HMAC;
pub use asuran_core::repository::key::{EncryptedKey, Key};

use semver::Version;
use thiserror::Error;
use tracing::{debug, info, instrument, span, trace, Level};
use uuid::Uuid;

use std::collections::HashSet;

pub mod backend;
pub mod pipeline;

/// An error for all the various things that can go wrong with handling chunks
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum RepositoryError {
    /// The requested chunk was not present in the repository's index
    #[error("Chunk Not in Repository")]
    ChunkNotFound,
    /// A chunk-level error (from asuran-core's chunk handling) bubbled up
    #[error("Chunker Error")]
    ChunkerError(#[from] asuran_core::repository::chunk::ChunkError),
    /// An error bubbled up from the storage backend
    #[error("Backend Error")]
    BackendError(#[from] backend::BackendError),
}
74

75
type Result<T> = std::result::Result<T, RepositoryError>;
76

77
/// Provides an interface to the storage-backed key value store
///
/// File access is abstracted behind a swappable backend, all backends should
/// use roughly the same format, but leeway is made for cases such as S3 having
/// a flat directory structure
#[derive(Clone)]
pub struct Repository<T> {
    /// The storage backend all chunk and index operations go through
    backend: T,
    /// Default compression for new chunks
    compression: Compression,
    /// Default MAC algorithm for new chunks
    hmac: HMAC,
    /// Default encryption algorithm for new chunks
    encryption: Encryption,
    /// Encryption key for this repo
    key: Key,
    /// Pipeline used for chunking
    pipeline: Pipeline,
    /// Depth of queues to build
    pub queue_depth: usize,
}

impl<T: BackendClone + 'static> Repository<T> {
100
    /// Creates a new repository with the specificed backend and defaults
101
    #[instrument(skip(key))]
102 0
    pub fn new(
103
        backend: T,
104
        compression: Compression,
105
        hmac: HMAC,
106
        encryption: Encryption,
107
        key: Key,
108
        pipeline_tasks: usize,
109
    ) -> Repository<T> {
110 0
        info!("Creating a repository with backend {:?}", backend);
111 0
        let pipeline = Pipeline::new(pipeline_tasks);
112
        Repository {
113
            backend,
114
            compression,
115
            hmac,
116
            encryption,
117
            key,
118
            pipeline,
119
            queue_depth: pipeline_tasks,
120
        }
121
    }
122

123
    /// Creates a new repository, accepting a ChunkSettings and a ThreadPool
124
    #[instrument(skip(key))]
125 0
    pub fn with(
126
        backend: T,
127
        settings: ChunkSettings,
128
        key: Key,
129
        pipeline_tasks: usize,
130
    ) -> Repository<T> {
131 1
        info!(
132
            "Creating a repository with backend {:?} and chunk settings {:?}",
133 0
            backend, settings
134
        );
135 1
        let pipeline = Pipeline::new(pipeline_tasks);
136
        Repository {
137
            backend,
138
            key,
139
            pipeline,
140 1
            compression: settings.compression,
141 1
            hmac: settings.hmac,
142 1
            encryption: settings.encryption,
143
            queue_depth: pipeline_tasks,
144
        }
145
    }
146

147
    /// Commits the index to storage
148
    ///
149
    /// This should be called every time an archive or manifest is written, at
150
    /// the very least
151
    #[instrument(skip(self))]
152 1
    pub async fn commit_index(&self) {
153 1
        debug!("Commiting Index");
154 1
        self.backend
155
            .get_index()
156
            .commit_index()
157 0
            .await
158
            .expect("Unable to commit index");
159
    }
160

161
    /// Writes a chunk directly to the repository
162
    ///
163
    /// Will return (`Chunk_ID`, `Already_Present`)
164
    ///
165
    /// `Already_Present` will be true if the chunk already exists in the
166
    /// repository.
167 1
    pub async fn write_raw(&mut self, chunk: Chunk) -> Result<(ChunkID, bool)> {
168 1
        let id = chunk.get_id();
169 1
        let span = span!(Level::DEBUG, "Writing Chunk", ?id);
170 1
        let _guard = span.enter();
171 1
        debug!("Writing chunk with id {:?}", id);
172

173
        // Check if chunk exists
174 1
        if self.has_chunk(id).await && id != ChunkID::manifest_id() {
175 1
            trace!("Chunk already existed, doing nothing.");
176 1
            Ok((id, true))
177
        } else {
178 1
            trace!("Chunk did not exist, continuning");
179

180
            // Get highest segment and check to see if has enough space
181 1
            let backend = &mut self.backend;
182 1
            let location = backend.write_chunk(chunk).await?;
183

184 1
            self.backend.get_index().set_chunk(id, location).await?;
185

186 1
            Ok((id, false))
187
        }
188
    }
189

190
    /// Writes a chunk to the repo
191
    ///
192
    /// Uses all defaults
193
    ///
194
    /// Will return None if writing the chunk fails.
195
    /// Will not write the chunk if it already exists.
196

197
    /// Bool in return value will be true if the chunk already existed in the
198
    /// Repository, and false otherwise
199
    #[instrument(skip(self, data))]
200 1
    pub async fn write_chunk(&mut self, data: Vec<u8>) -> Result<(ChunkID, bool)> {
201 1
        let chunk = self
202 0
            .pipeline
203
            .process(
204 1
                data,
205 1
                self.compression,
206 1
                self.encryption,
207 1
                self.hmac,
208 1
                self.key.clone(),
209
            )
210 0
            .await;
211 1
        self.write_raw(chunk).await
212
    }
213

214
    /// Writes a chunk to the repo
215
    ///
216
    /// Uses all defaults
217
    ///
218
    /// Will return None if writing the chunk fails.
219
    /// Will not write the chunk if it already exists.
220
    ///
221
    /// Manually sets the id of the written chunk.
222
    /// This should be used carefully, as it has potential to damage the repository.
223
    ///
224
    /// Primiarly intended for writing the manifest
225
    #[instrument(skip(self, data))]
226 0
    pub async fn write_chunk_with_id(
227
        &mut self,
228
        data: Vec<u8>,
229
        id: ChunkID,
230
    ) -> Result<(ChunkID, bool)> {
231 1
        let mut chunk = self
232 0
            .pipeline
233
            .process(
234 1
                data,
235 1
                self.compression,
236 1
                self.encryption,
237 1
                self.hmac,
238 1
                self.key.clone(),
239
            )
240 0
            .await;
241 1
        let mac = chunk.mac();
242 1
        let encryption = chunk.encryption();
243 1
        let data = (chunk.split().1).0;
244 1
        chunk = Chunk::from_parts(data, self.compression, encryption, self.hmac, mac, id);
245 1
        self.write_raw(chunk).await
246
    }
247

248
    /// Determines if a chunk exists in the index
249
    #[instrument(skip(self))]
250 1
    pub async fn has_chunk(&self, id: ChunkID) -> bool {
251 1
        self.backend.get_index().lookup_chunk(id).await.is_some()
252
    }
253

254
    /// Reads a chunk from the repo
255
    ///
256
    /// Returns none if reading the chunk fails
257
    #[instrument(skip(self))]
258 1
    pub async fn read_chunk(&mut self, id: ChunkID) -> Result<Vec<u8>> {
259
        // First, check if the chunk exists
260 1
        if self.has_chunk(id).await {
261 1
            let mut index = self.backend.get_index();
262 1
            let location = index.lookup_chunk(id).await.unwrap_or_else(|| {
263 0
                panic!("Index lied to us about having the chunk with ID {:?}", id)
264
            });
265 1
            let chunk = self.backend.read_chunk(location).await?;
266

267 1
            let data = chunk.unpack(&self.key)?;
268

269 1
            Ok(data)
270
        } else {
271 0
            Err(RepositoryError::ChunkNotFound)
272
        }
273
    }
274

275
    /// Provides a count of the number of chunks in the repository
276
    #[instrument(skip(self))]
277 1
    pub async fn count_chunk(&self) -> usize {
278 1
        self.backend.get_index().count_chunk().await
279
    }
280

281
    /// Returns the current default chunk settings for this repository
282
    #[instrument(skip(self))]
283 1
    pub fn chunk_settings(&self) -> ChunkSettings {
284
        ChunkSettings {
285 1
            encryption: self.encryption,
286 1
            compression: self.compression,
287 1
            hmac: self.hmac,
288
        }
289
    }
290

291
    /// Gets a refrence to the repository's key
292
    #[instrument(skip(self))]
293 0
    pub fn key(&self) -> &Key {
294 0
        &self.key
295
    }
296

297
    /// Provides a handle to the backend manifest
298
    #[instrument(skip(self))]
299 1
    pub fn backend_manifest(&self) -> T::Manifest {
300 1
        self.backend.get_manifest()
301
    }
302

303
    /// Performs any work that would normally be done in a drop impl, but needs to be done
304
    /// asyncronsyly.
305
    ///
306
    /// Calls into the backend's implementation
307
    #[instrument(skip(self))]
308 1
    pub async fn close(mut self) {
309 1
        self.backend.close().await;
310
    }
311

312
    /// Returns a set of all version and implementation pairs that have touched this repository in
313
    /// the past.
314 0
    pub async fn seen_versions(&self) -> HashSet<(Version, Uuid)> {
315 0
        self.backend_manifest().seen_versions().await
316
    }
317
}
318

319
#[cfg(test)]
mod tests {
    use super::*;
    use crate::repository::backend::common::sync_backend::BackendHandle;
    use crate::repository::backend::mem::*;
    use rand::prelude::*;

    /// Constructs an in-memory repository using ZStd level 1, Blake2b HMAC,
    /// and AES-256-CTR encryption.
    fn get_repo_mem(key: Key) -> Repository<BackendHandle<Mem>> {
        let settings = ChunkSettings {
            compression: Compression::ZStd { level: 1 },
            hmac: HMAC::Blake2b,
            encryption: Encryption::new_aes256ctr(),
        };
        let backend = Mem::new(settings, key.clone(), 4);
        Repository::with(backend, settings, key, 2)
    }

    /// Produces `len` bytes of thread-rng randomness.
    fn random_bytes(len: usize) -> Vec<u8> {
        let mut buf = vec![0_u8; len];
        thread_rng().fill_bytes(&mut buf);
        buf
    }

    // Chunks written to the repository must read back out bit-for-bit.
    #[test]
    fn repository_add_read() {
        smol::block_on(async {
            let key = Key::random(32);

            let size = (7 * 10_u64.pow(3)) as usize;
            let data1 = random_bytes(size);
            let data2 = random_bytes(size);
            let data3 = random_bytes(size);

            let mut repo = get_repo_mem(key);
            println!("Adding Chunks");
            let id1 = repo.write_chunk(data1.clone()).await.unwrap().0;
            let id2 = repo.write_chunk(data2.clone()).await.unwrap().0;
            let id3 = repo.write_chunk(data3.clone()).await.unwrap().0;

            println!("Reading Chunks");
            let out1 = repo.read_chunk(id1).await.unwrap();
            let out2 = repo.read_chunk(id2).await.unwrap();
            let out3 = repo.read_chunk(id3).await.unwrap();

            assert_eq!(data1, out1);
            assert_eq!(data2, out2);
            assert_eq!(data3, out3);
        });
    }

    // Adding the same chunk to the repository twice shouldn't result in two
    // chunks in the repository, and the second write must report the chunk as
    // already present.
    #[test]
    fn double_add() {
        smol::block_on(async {
            let mut repo = get_repo_mem(Key::random(32));
            assert_eq!(repo.count_chunk().await, 0);
            let data = [1_u8; 8192];

            let (id_first, already_present_first) =
                repo.write_chunk(data.to_vec()).await.unwrap();
            assert_eq!(already_present_first, false);
            assert_eq!(repo.count_chunk().await, 1);

            let (id_second, already_present_second) =
                repo.write_chunk(data.to_vec()).await.unwrap();
            assert_eq!(repo.count_chunk().await, 1);
            assert_eq!(already_present_second, true);
            assert_eq!(id_first, id_second);
            std::mem::drop(repo);
        });
    }

    // Ensure writing a chunk with a manually-assigned ID works
    #[test]
    fn chunk_with_id() {
        smol::block_on(async {
            let mut repo = get_repo_mem(Key::random(32));
            // generate our chunk
            let data = random_bytes((7 * 10_u64.pow(3)) as usize);
            let id = ChunkID::manifest_id();
            // write it
            repo.write_chunk_with_id(data.clone(), id)
                .await
                .expect("Unable to write with id");
            // Read it
            let data_restore = repo
                .read_chunk(id)
                .await
                .expect("Unable to read chunk back out");
            assert_eq!(data, data_restore);
        });
    }
}
