diff --git a/src/lib.rs b/src/lib.rs index 9265f69..9636e48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,7 +25,7 @@ use tokio::{ fs::File, io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf}, net::TcpStream, - sync::RwLock, + sync::{mpsc, RwLock}, }; pub(crate) const MAX_MSG_LEN: usize = 512; @@ -299,13 +299,14 @@ impl Context { system.run(prefix, &mut *self.factory.write().await) } - pub async fn run_interval_tasks(&mut self) { + pub async fn run_interval_tasks(&mut self, tx: mpsc::Sender>) { for (task_duration, mut task) in std::mem::take(&mut self.interval_tasks) { let fact = self.factory.clone(); + let task_tx = tx.clone(); tokio::spawn(async move { loop { tokio::time::sleep(task_duration).await; - task.run( + let resp = task.run( &IrcPrefix { nick: "", user: None, @@ -313,6 +314,10 @@ impl Context { }, &mut *fact.write().await, ); + if resp.0.is_none() { + continue; + } + task_tx.send(resp.0.unwrap()).await.unwrap() } }); } @@ -495,16 +500,24 @@ impl Irc { pub async fn run(&mut self) -> std::io::Result<()> { self.connect().await?; info!("Ready!"); + let (tx, mut rx) = mpsc::channel::>(512); { let mut context = self.context.write().await; context.register(); - context.run_interval_tasks().await; + context.run_interval_tasks(tx).await; } let stream = self.stream.take().unwrap(); let (mut reader, mut writer) = tokio::io::split(stream); + let cloned_ctx = self.context.clone(); + tokio::spawn(async move { + loop { + handle_rx(&mut rx, &cloned_ctx).await; + } + }); + let cloned_ctx = self.context.clone(); tokio::spawn(async move { loop { @@ -519,6 +532,16 @@ impl Irc { } } +async fn handle_rx(rx: &mut mpsc::Receiver>, arc_context: &RwLock) { + while let Some(data) = rx.recv().await { + let mut context = arc_context.write().await; + + for line in data { + context.privmsg_all(&line); + } + } +} + async fn send( writer: &mut WriteHalf, arc_context: &RwLock,