1
use crate::repository::{Chunk, Compression, Encryption, Key, HMAC};
2

3
use futures::channel::oneshot;
4
use std::thread;
5
use tracing::instrument;
6

7
#[derive(Debug)]
8
struct Message {
9
    compression: Compression,
10
    encryption: Encryption,
11
    hmac: HMAC,
12
    key: Key,
13
    ret_chunk: oneshot::Sender<Chunk>,
14
}
15

16
#[derive(Clone)]
17
pub struct Pipeline {
18
    input: flume::Sender<(Vec<u8>, Message)>,
19
}
20

21
impl Pipeline {
22
    /// Spawns a new pipeline and populates it with a number of tasks
23 1
    pub fn new(task_count: usize) -> Pipeline {
24
        // A hacky approximation for the depth of the queue used
25
        // roughly 1.5 times the number of tasks used, plus one extra to make sure its not zero
26 1
        let queue_depth = (task_count * 3) / 2 + 1;
27 1
        let (input, rx) = flume::bounded(queue_depth);
28

29 1
        for _ in 0..task_count {
30 1
            let rx = rx.clone();
31 1
            thread::spawn(move || {
32 1
                while let Ok(input) = rx.recv() {
33 1
                    let (chunk, message): (Vec<u8>, Message) = input;
34 1
                    let c = Chunk::pack(
35 1
                        chunk,
36 1
                        message.compression,
37 1
                        message.encryption,
38 0
                        message.hmac,
39 0
                        &message.key,
40
                    );
41
                    // If sending to this channel fails, we have no way to communicate to
42
                    // the outside anymore. Just let this task die.
43 1
                    message.ret_chunk.send(c).unwrap();
44
                }
45
            });
46
        }
47
        Pipeline { input }
48
    }
49

50
    #[instrument(skip(self, data))]
51 0
    pub async fn process(
52
        &self,
53
        data: Vec<u8>,
54
        compression: Compression,
55
        encryption: Encryption,
56
        hmac: HMAC,
57
        key: Key,
58
    ) -> Chunk {
59 1
        let (c_tx, c_rx) = oneshot::channel();
60
        let message = Message {
61
            compression,
62
            encryption,
63
            hmac,
64
            key,
65
            ret_chunk: c_tx,
66
        };
67 1
        let input = self.input.clone();
68 1
        input
69 1
            .send_async((data, message))
70 0
            .await
71
            .expect("Sending to processing thread failed");
72

73 1
        c_rx.await
74
            .expect("Not able to communicate with processing tasks. Unable to recover.")
75
    }
76
}
77

78
impl Default for Pipeline {
79 0
    fn default() -> Self {
80 0
        Self::new(num_cpus::get_physical())
81
    }
82
}

Read our documentation on viewing source code.

Loading