actix / actix-extras
1
use std::collections::VecDeque;
2
use std::io;
3

4
use actix::prelude::*;
5
use actix_rt::net::TcpStream;
6
use actix_service::boxed::{service, BoxService};
7
use actix_tls::connect::{default_connector, Connect, ConnectError, Connection};
8
use backoff::backoff::Backoff;
9
use backoff::ExponentialBackoff;
10
use log::{error, info, warn};
11
use redis_async::error::Error as RespError;
12
use redis_async::resp::{RespCodec, RespValue};
13
use tokio::io::{split, WriteHalf};
14
use tokio::sync::oneshot;
15
use tokio_util::codec::FramedRead;
16

17
use crate::Error;
18

19
/// Command for send data to Redis
20
#[derive(Debug)]
21
pub struct Command(pub RespValue);
22

23
impl Message for Command {
24
    type Result = Result<RespValue, Error>;
25
}
26

27
/// Redis communication actor
28
pub struct RedisActor {
29
    addr: String,
30
    connector: BoxService<Connect<String>, Connection<String, TcpStream>, ConnectError>,
31
    backoff: ExponentialBackoff,
32
    cell: Option<actix::io::FramedWrite<RespValue, WriteHalf<TcpStream>, RespCodec>>,
33
    queue: VecDeque<oneshot::Sender<Result<RespValue, Error>>>,
34
}
35

36
impl RedisActor {
37
    /// Start new `Supervisor` with `RedisActor`.
38 1
    pub fn start<S: Into<String>>(addr: S) -> Addr<RedisActor> {
39 1
        let addr = addr.into();
40

41
        let backoff = ExponentialBackoff {
42
            max_elapsed_time: None,
43
            ..Default::default()
44
        };
45

46 1
        Supervisor::start(|_| RedisActor {
47 1
            addr,
48 1
            connector: service(default_connector()),
49 1
            cell: None,
50 1
            backoff,
51 1
            queue: VecDeque::new(),
52
        })
53
    }
54
}
55

56
impl Actor for RedisActor {
57
    type Context = Context<Self>;
58

59 1
    fn started(&mut self, ctx: &mut Context<Self>) {
60 1
        let req = Connect::new(self.addr.to_owned());
61 1
        self.connector
62 1
            .call(req)
63 0
            .into_actor(self)
64 1
            .map(|res, act, ctx| match res {
65 1
                Ok(conn) => {
66 1
                    let stream = conn.into_parts().0;
67 1
                    info!("Connected to redis server: {}", act.addr);
68

69 1
                    let (r, w) = split(stream);
70

71
                    // configure write side of the connection
72 1
                    let framed = actix::io::FramedWrite::new(w, RespCodec, ctx);
73 1
                    act.cell = Some(framed);
74

75
                    // read side of the connection
76 1
                    ctx.add_stream(FramedRead::new(r, RespCodec));
77

78 1
                    act.backoff.reset();
79
                }
80 1
                Err(err) => {
81 1
                    error!("Can not connect to redis server: {}", err);
82
                    // re-connect with backoff time.
83
                    // we stop current context, supervisor will restart it.
84 1
                    if let Some(timeout) = act.backoff.next_backoff() {
85 1
                        ctx.run_later(timeout, |_, ctx| ctx.stop());
86
                    }
87
                }
88
            })
89 0
            .wait(ctx);
90
    }
91
}
92

93
impl Supervised for RedisActor {
94 0
    fn restarting(&mut self, _: &mut Self::Context) {
95 0
        self.cell.take();
96 0
        for tx in self.queue.drain(..) {
97 0
            let _ = tx.send(Err(Error::Disconnected));
98
        }
99
    }
100
}
101

102
impl actix::io::WriteHandler<io::Error> for RedisActor {
103 0
    fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
104 0
        warn!("Redis connection dropped: {} error: {}", self.addr, err);
105 0
        Running::Stop
106
    }
107
}
108

109
impl StreamHandler<Result<RespValue, RespError>> for RedisActor {
110 1
    fn handle(&mut self, msg: Result<RespValue, RespError>, ctx: &mut Self::Context) {
111 1
        match msg {
112 1
            Err(e) => {
113 0
                if let Some(tx) = self.queue.pop_front() {
114 0
                    let _ = tx.send(Err(e.into()));
115
                }
116 0
                ctx.stop();
117
            }
118 1
            Ok(val) => {
119 1
                if let Some(tx) = self.queue.pop_front() {
120 1
                    let _ = tx.send(Ok(val));
121
                }
122
            }
123
        }
124
    }
125
}
126

127
impl Handler<Command> for RedisActor {
128
    type Result = ResponseFuture<Result<RespValue, Error>>;
129

130 1
    fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result {
131 1
        let (tx, rx) = oneshot::channel();
132 1
        if let Some(ref mut cell) = self.cell {
133 1
            self.queue.push_back(tx);
134 1
            cell.write(msg.0);
135
        } else {
136 1
            let _ = tx.send(Err(Error::NotConnected));
137
        }
138

139 1
        Box::pin(async move { rx.await.map_err(|_| Error::Disconnected)? })
140
    }
141
}

Read our documentation on viewing source code .

Loading