Skip to content

Commit

Permalink
add more mode for redis
Browse files Browse the repository at this point in the history
  • Loading branch information
markcty committed Jul 18, 2023
1 parent 339d935 commit 83e3610
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 21 deletions.
3 changes: 2 additions & 1 deletion dispatchers/wlf-kafka-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ impl KafkaDispatcher {
}
}

pub fn set_topic_template(&mut self, topic: impl Into<String>) {
pub fn set_topic_template(&mut self, topic: impl Into<String>) -> &mut Self {
self.topic = topic.into();
self
}

pub async fn start_dispatching(self, router: Arc<EventRouter>) -> Result<(), Error> {
Expand Down
2 changes: 1 addition & 1 deletion dispatchers/wlf-redis-dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
wlf-core = { path = "../../wlf-core" }
tokio_wasi = { version = "1", features = ["rt", "time", "test-util", "macros"] }
redis_wasi = { version = "0.22.3", features = ["tokio-comp"] }
redis_wasi = { version = "0.22.3", features = ["tokio-comp", "json"] }
thiserror = "1.0.40"
utils = { path = "../../utils" }
tracing = "0.1.37"
Expand Down
119 changes: 100 additions & 19 deletions dispatchers/wlf-redis-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::Arc;

use redis::{AsyncCommands, RedisError};
use redis::{
AsyncCommands, ConnectionAddr, ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo,
RedisError,
};
use thiserror::Error;
use tracing::info;
use utils::substitute_with_event;
Expand All @@ -21,35 +24,108 @@ pub enum Error {

pub struct RedisDispatcher {
id: String,
key: String,
url: String,
mode: Mode,
config: Config,
}

pub struct Config {
host: String,
port: u16,
auth: Option<String>,
database_number: u8,
}

impl IntoConnectionInfo for Config {
fn into_connection_info(self) -> redis::RedisResult<ConnectionInfo> {
Ok(ConnectionInfo {
addr: ConnectionAddr::Tcp(self.host, self.port),
redis: RedisConnectionInfo {
db: self.database_number as i64,
username: None,
password: self.auth,
},
})
}
}

impl Default for Config {
fn default() -> Self {
Self {
host: "localhost".to_string(),
port: 6379,
auth: None,
database_number: 0,
}
}
}

pub enum Mode {
LPush { key: String },
RPush { key: String },
Pub { channel: String },
XADD { key: String },
}

impl Default for Mode {
fn default() -> Self {
Self::RPush {
key: "wlf".to_string(),
}
}
}

impl RedisDispatcher {
pub fn new(id: impl Into<String>, url: impl Into<String>) -> Self {
pub fn new(id: impl Into<String>) -> Self {
Self {
id: id.into(),
key: default_redis_key().into(),
url: url.into(),
config: Default::default(),
mode: Default::default(),
}
}

pub fn set_key_template(&mut self, key: impl Into<String>) {
self.key = key.into();
pub fn set_mode(&mut self, mode: Mode) -> &mut Self {
self.mode = mode;
self
}

pub fn set_password(&mut self, password: impl Into<String>) -> &mut Self {
self.config.auth = Some(password.into());
self
}

pub async fn start_dispatching(self, router: Arc<EventRouter>) -> Result<(), Error> {
let mut redis_client = redis::Client::open(self.url)?
let mut redis_client = redis::Client::open(self.config)?
.get_async_connection()
.await?;

while let Ok(event) = router.poll_event(&self.id).await {
info!("receive new event:\n{event:#?}");

let key = substitute_with_event(&self.key, &event).map_err(Error::TopicName)?;
let value = serde_json::to_string(&event)?;

redis_client.rpush(key, value).await?;
match &self.mode {
Mode::LPush { key } => {
let key = substitute_with_event(key, &event).map_err(Error::TopicName)?;
let value = serde_json::to_string(&event)?;
redis_client.lpush(key, value).await?;
}
Mode::RPush { key } => {
let key = substitute_with_event(key, &event).map_err(Error::TopicName)?;
let value = serde_json::to_string(&event)?;
redis_client.rpush(key, value).await?;
}
Mode::Pub { channel } => {
let channel =
substitute_with_event(channel, &event).map_err(Error::TopicName)?;
let value = serde_json::to_string(&event)?;
redis_client.publish(channel, value).await?;
}
Mode::XADD { key } => {
let key = substitute_with_event(key, &event).map_err(Error::TopicName)?;
let value = serde_json::to_string(&event)?;
redis_client
.xadd(key, "*", &[("event".to_string(), value)])
.await?;
}
}
}

Ok(())
Expand All @@ -66,10 +142,6 @@ impl ComponentApi for RedisDispatcher {
}
}

pub const fn default_redis_key() -> &'static str {
"wlf"
}

#[cfg(test)]
mod tests {
use std::{error::Error, time::Duration};
Expand All @@ -83,8 +155,17 @@ mod tests {
async fn run() -> Result<(), Box<dyn Error>> {
let _ = tracing_subscriber::fmt::try_init();

let mut dispatcher = RedisDispatcher::new("redis_dispatcher", "redis://127.0.0.1/");
dispatcher.set_key_template(r"wlf_%{/file}");
let mut dispatcher = RedisDispatcher::new("redis_dispatcher");

// dispatcher.set_mode(Mode::RPush {
// key: r"wlf_%{/file}".to_string(),
// });
// dispatcher.set_mode(Mode::XADD {
// key: r"wlf_%{/file}".to_string(),
// });
dispatcher.set_mode(Mode::Pub {
channel: r"wlf_%{/file}".to_string(),
});

let mut router = EventRouter::new();
router.register_component(&dispatcher);
Expand Down

0 comments on commit 83e3610

Please sign in to comment.