1
use crate::PRes;
2
use std::{
3
    mem::replace,
4
    sync::{Arc, Condvar, Mutex},
5
    thread::{Builder, JoinHandle},
6
};
7

8
struct PendingOps<T> {
9
    ops: Vec<T>,
10
    running: bool,
11
}
12

13
impl<T> PendingOps<T> {
14 1
    fn new() -> Self {
15 1
        Self {
16 1
            ops: Vec::new(),
17
            running: true,
18
        }
19 1
    }
20

21 1
    fn push(&mut self, op: T) {
22 1
        self.ops.push(op);
23 1
    }
24

25 1
    fn terminate(&mut self) {
26 1
        self.running = false;
27 1
    }
28
}
29

30
pub(crate) struct BackgroundOps<T> {
31
    pending: Arc<(Mutex<PendingOps<T>>, Condvar)>,
32
    flush_thread: Option<JoinHandle<()>>,
33
}
34

35 1
fn sync_on_need<T, F, FO>(ops: &Mutex<PendingOps<T>>, cond: &Condvar, operation: FO, release_all: F) -> PRes<()>
36
where
37
    F: Fn(&[T]) -> PRes<()>,
38
    FO: Fn() -> PRes<()>,
39
{
40 1
    loop {
41 1
        let pending;
42
        let running;
43
        {
44 1
            let mut lock = ops.lock()?;
45 1
            lock = cond.wait_while(lock, |x| x.ops.is_empty() && x.running)?;
46 1
            pending = replace(&mut lock.ops, Vec::new());
47 1
            running = lock.running;
48 1
        }
49 1
        operation()?;
50 1
        release_all(&pending)?;
51 1
        if !running {
52 1
            break Ok(());
53
        }
54 1
    }
55 1
}
56

57
impl<T: 'static + Send> BackgroundOps<T> {
58 1
    pub fn new<F, FO>(operation: FO, release_all: F) -> PRes<Self>
59
    where
60
        F: Fn(&[T]) -> PRes<()>,
61
        F: Send + 'static,
62
        FO: Fn() -> PRes<()>,
63
        FO: Send + 'static,
64
    {
65 1
        let pending = Arc::new((Mutex::new(PendingOps::new()), Condvar::new()));
66 1
        let pass = pending.clone();
67 1
        let th = Builder::new()
68 1
            .name("Disc sync".into())
69 1
            .spawn(move || {
70 1
                sync_on_need(&pass.0, &pass.1, operation, release_all).unwrap();
71 1
            })
72
            .unwrap();
73 1
        Ok(Self {
74 1
            pending,
75 1
            flush_thread: Some(th),
76
        })
77 1
    }
78

79 1
    pub fn add_pending(&self, op: T) -> PRes<()> {
80 1
        let (ops, cond) = &*self.pending;
81 1
        let mut lock = ops.lock()?;
82 1
        lock.push(op);
83 1
        cond.notify_one();
84 1
        Ok(())
85 1
    }
86
}
87

88
impl<T> Drop for BackgroundOps<T> {
89 1
    fn drop(&mut self) {
90 1
        let (ops, cond) = &*self.pending;
91
        {
92 1
            let mut pend = ops.lock().unwrap();
93 1
            pend.terminate();
94 1
            cond.notify_one();
95 1
        }
96 1
        if let Some(handle) = replace(&mut self.flush_thread, None) {
97 1
            handle.join().unwrap();
98
        }
99 1
    }
100
}
101

102
#[cfg(test)]
103
mod tests {
104
    use super::BackgroundOps;
105
    use std::{
106
        sync::{
107
            atomic::{AtomicU64, Ordering},
108
            Arc,
109
        },
110
        thread::sleep,
111
        time::Duration,
112
    };
113

114
    #[test]
115 1
    fn test_execute_delayed() {
116 1
        let counter = Arc::new(AtomicU64::new(0));
117 1
        let cr = counter.clone();
118 1
        let bg = BackgroundOps::new(
119 1
            || {
120 1
                sleep(Duration::from_millis(10));
121 1
                Ok(())
122 1
            },
123 1
            move |ops| {
124 1
                cr.fetch_add(ops.len() as u64, Ordering::SeqCst);
125 1
                Ok(())
126 1
            },
127
        )
128
        .unwrap();
129

130 1
        bg.add_pending(1).unwrap();
131 1
        sleep(Duration::from_millis(1));
132 1
        bg.add_pending(2).unwrap();
133 1
        bg.add_pending(3).unwrap();
134 1
        sleep(Duration::from_millis(31));
135 1
        assert_eq!(counter.load(Ordering::SeqCst), 3);
136 1
    }
137
}

Read our documentation on viewing source code .

Loading