1
|
|
use crate::repository::{Chunk, Compression, Encryption, Key, HMAC};
|
2
|
|
|
3
|
|
use futures::channel::oneshot;
|
4
|
|
use std::thread;
|
5
|
|
use tracing::instrument;
|
6
|
|
|
7
|
|
#[derive(Debug)]
|
8
|
|
struct Message {
|
9
|
|
compression: Compression,
|
10
|
|
encryption: Encryption,
|
11
|
|
hmac: HMAC,
|
12
|
|
key: Key,
|
13
|
|
ret_chunk: oneshot::Sender<Chunk>,
|
14
|
|
}
|
15
|
|
|
16
|
|
#[derive(Clone)]
|
17
|
|
pub struct Pipeline {
|
18
|
|
input: flume::Sender<(Vec<u8>, Message)>,
|
19
|
|
}
|
20
|
|
|
21
|
|
impl Pipeline {
|
22
|
|
/// Spawns a new pipeline and populates it with a number of tasks
|
23
|
1
|
pub fn new(task_count: usize) -> Pipeline {
|
24
|
|
// A hacky approximation for the depth of the queue used
|
25
|
|
// roughly 1.5 times the number of tasks used, plus one extra to make sure its not zero
|
26
|
1
|
let queue_depth = (task_count * 3) / 2 + 1;
|
27
|
1
|
let (input, rx) = flume::bounded(queue_depth);
|
28
|
|
|
29
|
1
|
for _ in 0..task_count {
|
30
|
1
|
let rx = rx.clone();
|
31
|
1
|
thread::spawn(move || {
|
32
|
1
|
while let Ok(input) = rx.recv() {
|
33
|
1
|
let (chunk, message): (Vec<u8>, Message) = input;
|
34
|
1
|
let c = Chunk::pack(
|
35
|
1
|
chunk,
|
36
|
1
|
message.compression,
|
37
|
1
|
message.encryption,
|
38
|
0
|
message.hmac,
|
39
|
0
|
&message.key,
|
40
|
|
);
|
41
|
|
// If sending to this channel fails, we have no way to communicate to
|
42
|
|
// the outside anymore. Just let this task die.
|
43
|
1
|
message.ret_chunk.send(c).unwrap();
|
44
|
|
}
|
45
|
|
});
|
46
|
|
}
|
47
|
|
Pipeline { input }
|
48
|
|
}
|
49
|
|
|
50
|
|
#[instrument(skip(self, data))]
|
51
|
0
|
pub async fn process(
|
52
|
|
&self,
|
53
|
|
data: Vec<u8>,
|
54
|
|
compression: Compression,
|
55
|
|
encryption: Encryption,
|
56
|
|
hmac: HMAC,
|
57
|
|
key: Key,
|
58
|
|
) -> Chunk {
|
59
|
1
|
let (c_tx, c_rx) = oneshot::channel();
|
60
|
|
let message = Message {
|
61
|
|
compression,
|
62
|
|
encryption,
|
63
|
|
hmac,
|
64
|
|
key,
|
65
|
|
ret_chunk: c_tx,
|
66
|
|
};
|
67
|
1
|
let input = self.input.clone();
|
68
|
1
|
input
|
69
|
1
|
.send_async((data, message))
|
70
|
0
|
.await
|
71
|
|
.expect("Sending to processing thread failed");
|
72
|
|
|
73
|
1
|
c_rx.await
|
74
|
|
.expect("Not able to communicate with processing tasks. Unable to recover.")
|
75
|
|
}
|
76
|
|
}
|
77
|
|
|
78
|
|
impl Default for Pipeline {
|
79
|
0
|
fn default() -> Self {
|
80
|
0
|
Self::new(num_cpus::get_physical())
|
81
|
|
}
|
82
|
|
}
|