actix / actix-extras

@@ -1,98 +1,141 @@
Loading
1 1
use std::collections::VecDeque;
2 -
use std::net::SocketAddr;
2 +
use std::io;
3 3
4 -
use redis_async::client::{paired_connect, PairedConnection};
5 -
use redis_async::resp::RespValue;
6 -
use tokio::sync::Mutex;
7 -
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
8 -
use trust_dns_resolver::TokioAsyncResolver as AsyncResolver;
4 +
use actix::prelude::*;
5 +
use actix_rt::net::TcpStream;
6 +
use actix_service::boxed::{service, BoxService};
7 +
use actix_tls::connect::{default_connector, Connect, ConnectError, Connection};
8 +
use backoff::backoff::Backoff;
9 +
use backoff::ExponentialBackoff;
10 +
use log::{error, info, warn};
11 +
use redis_async::error::Error as RespError;
12 +
use redis_async::resp::{RespCodec, RespValue};
13 +
use tokio::io::{split, WriteHalf};
14 +
use tokio::sync::oneshot;
15 +
use tokio_util::codec::FramedRead;
9 16
10 17
use crate::Error;
11 18
12 -
pub struct RedisClient {
19 +
/// Command for send data to Redis
20 +
#[derive(Debug)]
21 +
pub struct Command(pub RespValue);
22 +
23 +
impl Message for Command {
24 +
    type Result = Result<RespValue, Error>;
25 +
}
26 +
27 +
/// Redis communication actor
28 +
pub struct RedisActor {
13 29
    addr: String,
14 -
    connection: Mutex<Option<PairedConnection>>,
30 +
    connector: BoxService<Connect<String>, Connection<String, TcpStream>, ConnectError>,
31 +
    backoff: ExponentialBackoff,
32 +
    cell: Option<actix::io::FramedWrite<RespValue, WriteHalf<TcpStream>, RespCodec>>,
33 +
    queue: VecDeque<oneshot::Sender<Result<RespValue, Error>>>,
15 34
}
16 35
17 -
impl RedisClient {
18 -
    pub fn new(addr: impl Into<String>) -> Self {
19 -
        Self {
20 -
            addr: addr.into(),
21 -
            connection: Mutex::new(None),
22 -
        }
36 +
impl RedisActor {
37 +
    /// Start new `Supervisor` with `RedisActor`.
38 +
    pub fn start<S: Into<String>>(addr: S) -> Addr<RedisActor> {
39 +
        let addr = addr.into();
40 +
41 +
        let backoff = ExponentialBackoff {
42 +
            max_elapsed_time: None,
43 +
            ..Default::default()
44 +
        };
45 +
46 +
        Supervisor::start(|_| RedisActor {
47 +
            addr,
48 +
            connector: service(default_connector()),
49 +
            cell: None,
50 +
            backoff,
51 +
            queue: VecDeque::new(),
52 +
        })
23 53
    }
54 +
}
24 55
25 -
    async fn get_connection(&self) -> Result<PairedConnection, Error> {
26 -
        let mut connection = self.connection.lock().await;
27 -
        if let Some(ref connection) = *connection {
28 -
            return Ok(connection.clone());
29 -
        }
56 +
impl Actor for RedisActor {
57 +
    type Context = Context<Self>;
30 58
31 -
        let mut addrs = resolve(&self.addr).await?;
32 -
        loop {
33 -
            // try to connect
34 -
            let socket_addr = addrs.pop_front().ok_or_else(|| {
35 -
                log::warn!("Cannot connect to {}.", self.addr);
36 -
                Error::NotConnected
37 -
            })?;
38 -
            match paired_connect(socket_addr).await {
59 +
    fn started(&mut self, ctx: &mut Context<Self>) {
60 +
        let req = Connect::new(self.addr.to_owned());
61 +
        self.connector
62 +
            .call(req)
63 +
            .into_actor(self)
64 +
            .map(|res, act, ctx| match res {
39 65
                Ok(conn) => {
40 -
                    *connection = Some(conn.clone());
41 -
                    return Ok(conn);
66 +
                    let stream = conn.into_parts().0;
67 +
                    info!("Connected to redis server: {}", act.addr);
68 +
69 +
                    let (r, w) = split(stream);
70 +
71 +
                    // configure write side of the connection
72 +
                    let framed = actix::io::FramedWrite::new(w, RespCodec, ctx);
73 +
                    act.cell = Some(framed);
74 +
75 +
                    // read side of the connection
76 +
                    ctx.add_stream(FramedRead::new(r, RespCodec));
77 +
78 +
                    act.backoff.reset();
42 79
                }
43 -
                Err(err) => log::warn!(
44 -
                    "Attempt to connect to {} as {} failed: {}.",
45 -
                    self.addr,
46 -
                    socket_addr,
47 -
                    err
48 -
                ),
49 -
            }
50 -
        }
80 +
                Err(err) => {
81 +
                    error!("Can not connect to redis server: {}", err);
82 +
                    // re-connect with backoff time.
83 +
                    // we stop current context, supervisor will restart it.
84 +
                    if let Some(timeout) = act.backoff.next_backoff() {
85 +
                        ctx.run_later(timeout, |_, ctx| ctx.stop());
86 +
                    }
87 +
                }
88 +
            })
89 +
            .wait(ctx);
51 90
    }
91 +
}
52 92
53 -
    pub async fn send(&self, req: RespValue) -> Result<RespValue, Error> {
54 -
        let res = self.get_connection().await?.send(req).await?;
55 -
        Ok(res)
93 +
impl Supervised for RedisActor {
94 +
    fn restarting(&mut self, _: &mut Self::Context) {
95 +
        self.cell.take();
96 +
        for tx in self.queue.drain(..) {
97 +
            let _ = tx.send(Err(Error::Disconnected));
98 +
        }
56 99
    }
57 100
}
58 101
59 -
fn parse_addr(addr: &str, default_port: u16) -> Option<(&str, u16)> {
60 -
    // split the string by ':' and convert the second part to u16
61 -
    let mut parts_iter = addr.splitn(2, ':');
62 -
    let host = parts_iter.next()?;
63 -
    let port_str = parts_iter.next().unwrap_or("");
64 -
    let port: u16 = port_str.parse().unwrap_or(default_port);
65 -
    Some((host, port))
102 +
impl actix::io::WriteHandler<io::Error> for RedisActor {
103 +
    fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
104 +
        warn!("Redis connection dropped: {} error: {}", self.addr, err);
105 +
        Running::Stop
106 +
    }
66 107
}
67 108
68 -
async fn resolve(addr: &str) -> Result<VecDeque<SocketAddr>, Error> {
69 -
    // try to parse as a regular SocketAddr first
70 -
    if let Ok(addr) = addr.parse::<SocketAddr>() {
71 -
        let mut addrs = VecDeque::new();
72 -
        addrs.push_back(addr);
73 -
        return Ok(addrs);
109 +
impl StreamHandler<Result<RespValue, RespError>> for RedisActor {
110 +
    fn handle(&mut self, msg: Result<RespValue, RespError>, ctx: &mut Self::Context) {
111 +
        match msg {
112 +
            Err(e) => {
113 +
                if let Some(tx) = self.queue.pop_front() {
114 +
                    let _ = tx.send(Err(e.into()));
115 +
                }
116 +
                ctx.stop();
117 +
            }
118 +
            Ok(val) => {
119 +
                if let Some(tx) = self.queue.pop_front() {
120 +
                    let _ = tx.send(Ok(val));
121 +
                }
122 +
            }
123 +
        }
74 124
    }
125 +
}
75 126
76 -
    let (host, port) = parse_addr(addr, 6379).ok_or(Error::InvalidAddress)?;
127 +
impl Handler<Command> for RedisActor {
128 +
    type Result = ResponseFuture<Result<RespValue, Error>>;
77 129
78 -
    // we need to do dns resolution
79 -
    let resolver = AsyncResolver::tokio_from_system_conf()
80 -
        .or_else(|err| {
81 -
            log::warn!("Cannot create system DNS resolver: {}", err);
82 -
            AsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default())
83 -
        })
84 -
        .map_err(|err| {
85 -
            log::error!("Cannot create DNS resolver: {}", err);
86 -
            Error::ResolveError
87 -
        })?;
88 -
89 -
    let addrs = resolver
90 -
        .lookup_ip(host)
91 -
        .await
92 -
        .map_err(|_| Error::ResolveError)?
93 -
        .into_iter()
94 -
        .map(|ip| SocketAddr::new(ip, port))
95 -
        .collect();
96 -
97 -
    Ok(addrs)
130 +
    fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result {
131 +
        let (tx, rx) = oneshot::channel();
132 +
        if let Some(ref mut cell) = self.cell {
133 +
            self.queue.push_back(tx);
134 +
            cell.write(msg.0);
135 +
        } else {
136 +
            let _ = tx.send(Err(Error::NotConnected));
137 +
        }
138 +
139 +
        Box::pin(async move { rx.await.map_err(|_| Error::Disconnected)? })
140 +
    }
98 141
}

@@ -1,21 +1,19 @@
Loading
1 -
use std::cell::RefCell;
2 -
use std::pin::Pin;
3 -
use std::task::{Context, Poll};
4 1
use std::{collections::HashMap, iter, rc::Rc};
5 2
3 +
use actix::prelude::*;
6 4
use actix_service::{Service, Transform};
7 5
use actix_session::{Session, SessionStatus};
8 6
use actix_web::cookie::{Cookie, CookieJar, Key, SameSite};
9 7
use actix_web::dev::{ServiceRequest, ServiceResponse};
10 8
use actix_web::http::header::{self, HeaderValue};
11 9
use actix_web::{error, Error, HttpMessage};
12 -
use futures_util::future::{ok, Future, Ready};
10 +
use futures_core::future::LocalBoxFuture;
13 11
use rand::{distributions::Alphanumeric, rngs::OsRng, Rng};
14 12
use redis_async::resp::RespValue;
15 13
use redis_async::resp_array;
16 14
use time::{self, Duration, OffsetDateTime};
17 15
18 -
use crate::redis::RedisClient;
16 +
use crate::redis::{Command, RedisActor};
19 17
20 18
/// Use redis as session storage.
21 19
///
@@ -35,7 +33,7 @@
Loading
35 33
            key: Key::derive_from(key),
36 34
            cache_keygen: Box::new(|key: &str| format!("session:{}", &key)),
37 35
            ttl: "7200".to_owned(),
38 -
            redis_client: RedisClient::new(addr),
36 +
            addr: RedisActor::start(addr),
39 37
            name: "actix-session".to_owned(),
40 38
            path: "/".to_owned(),
41 39
            domain: None,
@@ -120,21 +118,24 @@
Loading
120 118
{
121 119
    type Response = ServiceResponse<B>;
122 120
    type Error = S::Error;
123 -
    type InitError = ();
124 121
    type Transform = RedisSessionMiddleware<S>;
125 -
    type Future = Ready<Result<Self::Transform, Self::InitError>>;
122 +
    type InitError = ();
123 +
    type Future = LocalBoxFuture<'static, Result<Self::Transform, Self::InitError>>;
126 124
127 125
    fn new_transform(&self, service: S) -> Self::Future {
128 -
        ok(RedisSessionMiddleware {
129 -
            service: Rc::new(RefCell::new(service)),
130 -
            inner: self.0.clone(),
126 +
        let inner = self.0.clone();
127 +
        Box::pin(async {
128 +
            Ok(RedisSessionMiddleware {
129 +
                service: Rc::new(service),
130 +
                inner,
131 +
            })
131 132
        })
132 133
    }
133 134
}
134 135
135 136
/// Cookie session middleware
136 137
pub struct RedisSessionMiddleware<S: 'static> {
137 -
    service: Rc<RefCell<S>>,
138 +
    service: Rc<S>,
138 139
    inner: Rc<Inner>,
139 140
}
140 141
@@ -146,12 +147,9 @@
Loading
146 147
{
147 148
    type Response = ServiceResponse<B>;
148 149
    type Error = Error;
149 -
    #[allow(clippy::type_complexity)]
150 -
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
150 +
    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
151 151
152 -
    fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153 -
        self.service.borrow_mut().poll_ready(cx)
154 -
    }
152 +
    actix_service::forward_ready!(service);
155 153
156 154
    fn call(&self, mut req: ServiceRequest) -> Self::Future {
157 155
        let srv = self.service.clone();
@@ -210,7 +208,7 @@
Loading
210 208
    key: Key,
211 209
    cache_keygen: Box<dyn Fn(&str) -> String>,
212 210
    ttl: String,
213 -
    redis_client: RedisClient,
211 +
    addr: Addr<RedisActor>,
214 212
    name: String,
215 213
    path: String,
216 214
    domain: Option<String>,
@@ -252,9 +250,11 @@
Loading
252 250
        };
253 251
254 252
        let val = self
255 -
            .redis_client
256 -
            .send(resp_array!["GET", cache_key])
257 -
            .await?;
253 +
            .addr
254 +
            .send(Command(resp_array!["GET", cache_key]))
255 +
            .await
256 +
            .map_err(error::ErrorInternalServerError)?
257 +
            .map_err(error::ErrorInternalServerError)?;
258 258
259 259
        match val {
260 260
            RespValue::Error(err) => {
@@ -285,11 +285,11 @@
Loading
285 285
        let (value, jar) = if let Some(value) = value {
286 286
            (value, None)
287 287
        } else {
288 -
            let value: String = iter::repeat(())
288 +
            let value = iter::repeat(())
289 289
                .map(|()| OsRng.sample(Alphanumeric))
290 -
                .map(char::from)
291 290
                .take(32)
292 -
                .collect();
291 +
                .collect::<Vec<_>>();
292 +
            let value = String::from_utf8(value).unwrap_or_default();
293 293
294 294
            // prepare session id cookie
295 295
            let mut cookie = Cookie::new(self.name.clone(), value.clone());
@@ -325,9 +325,13 @@
Loading
325 325
            Ok(body) => body,
326 326
        };
327 327
328 -
        self.redis_client
329 -
            .send(resp_array!["SET", cache_key, body, "EX", &self.ttl])
330 -
            .await?;
328 +
        let cmd = Command(resp_array!["SET", cache_key, body, "EX", &self.ttl]);
329 +
330 +
        self.addr
331 +
            .send(cmd)
332 +
            .await
333 +
            .map_err(error::ErrorInternalServerError)?
334 +
            .map_err(error::ErrorInternalServerError)?;
331 335
332 336
        if let Some(jar) = jar {
333 337
            for cookie in jar.delta() {
@@ -343,13 +347,15 @@
Loading
343 347
    async fn clear_cache(&self, key: String) -> Result<(), Error> {
344 348
        let cache_key = (self.cache_keygen)(&key);
345 349
346 -
        match self
347 -
            .redis_client
348 -
            .send(resp_array!["DEL", cache_key])
349 -
            .await?
350 -
        {
350 +
        let res = self
351 +
            .addr
352 +
            .send(Command(resp_array!["DEL", cache_key]))
353 +
            .await
354 +
            .map_err(error::ErrorInternalServerError)?;
355 +
356 +
        match res {
351 357
            // redis responds with number of deleted records
352 -
            RespValue::Integer(x) if x > 0 => Ok(()),
358 +
            Ok(RespValue::Integer(x)) if x > 0 => Ok(()),
353 359
            _ => Err(error::ErrorInternalServerError(
354 360
                "failed to remove session from cache",
355 361
            )),

@@ -1,31 +1,42 @@
Loading
1 1
#[macro_use]
2 2
extern crate redis_async;
3 3
4 -
use actix_redis::{Error, RedisClient, RespValue};
4 +
use actix_redis::{Command, Error, RedisActor, RespValue};
5 5
6 6
#[actix_rt::test]
7 7
async fn test_error_connect() {
8 -
    let addr = RedisClient::new("localhost:54000");
8 +
    let addr = RedisActor::start("localhost:54000");
9 +
    let _addr2 = addr.clone();
9 10
10 -
    let res = addr.send(resp_array!["GET", "test"]).await;
11 +
    let res = addr.send(Command(resp_array!["GET", "test"])).await;
11 12
    match res {
12 -
        Err(Error::NotConnected) => (),
13 +
        Ok(Err(Error::NotConnected)) => (),
13 14
        _ => panic!("Should not happen {:?}", res),
14 15
    }
15 16
}
16 17
17 18
#[actix_rt::test]
18 -
async fn test_redis() -> Result<(), Error> {
19 +
async fn test_redis() {
19 20
    env_logger::init();
20 21
21 -
    let addr = RedisClient::new("127.0.0.1:6379");
22 +
    let addr = RedisActor::start("127.0.0.1:6379");
23 +
    let res = addr
24 +
        .send(Command(resp_array!["SET", "test", "value"]))
25 +
        .await;
22 26
23 -
    let resp = addr.send(resp_array!["SET", "test", "value"]).await?;
24 -
25 -
    assert_eq!(resp, RespValue::SimpleString("OK".to_owned()));
26 -
27 -
    let resp = addr.send(resp_array!["GET", "test"]).await?;
28 -
    println!("RESP: {:?}", resp);
29 -
    assert_eq!(resp, RespValue::BulkString((&b"value"[..]).into()));
30 -
    Ok(())
27 +
    match res {
28 +
        Ok(Ok(resp)) => {
29 +
            assert_eq!(resp, RespValue::SimpleString("OK".to_owned()));
30 +
31 +
            let res = addr.send(Command(resp_array!["GET", "test"])).await;
32 +
            match res {
33 +
                Ok(Ok(resp)) => {
34 +
                    println!("RESP: {:?}", resp);
35 +
                    assert_eq!(resp, RespValue::BulkString((&b"value"[..]).into()));
36 +
                }
37 +
                _ => panic!("Should not happen {:?}", res),
38 +
            }
39 +
        }
40 +
        _ => panic!("Should not happen {:?}", res),
41 +
    }
31 42
}
Files Coverage
actix-cors 91.25%
actix-redis 82.47%
actix-session/src 89.07%
actix-web-httpauth/src 61.25%
actix-identity/src/lib.rs 95.96%
actix-protobuf/src/lib.rs 50.00%
Project Totals (29 files) 82.24%
1
comment: false
2

3
ignore: # ignore codecoverage on following paths
4
  - "**/examples"
5
  - ".github"
6
  - "**/*.md"
7
  - "**/*.toml"
Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading