fixed interval tasks not outputing data

This commit is contained in:
wrk 2023-05-30 02:10:55 +02:00
parent 78349705f0
commit c8d132767a
1 changed files with 27 additions and 4 deletions

View File

@ -25,7 +25,7 @@ use tokio::{
fs::File, fs::File,
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf}, io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf},
net::TcpStream, net::TcpStream,
sync::RwLock, sync::{mpsc, RwLock},
}; };
pub(crate) const MAX_MSG_LEN: usize = 512; pub(crate) const MAX_MSG_LEN: usize = 512;
@ -299,13 +299,14 @@ impl Context {
system.run(prefix, &mut *self.factory.write().await) system.run(prefix, &mut *self.factory.write().await)
} }
pub async fn run_interval_tasks(&mut self) { pub async fn run_interval_tasks(&mut self, tx: mpsc::Sender<Vec<String>>) {
for (task_duration, mut task) in std::mem::take(&mut self.interval_tasks) { for (task_duration, mut task) in std::mem::take(&mut self.interval_tasks) {
let fact = self.factory.clone(); let fact = self.factory.clone();
let task_tx = tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
tokio::time::sleep(task_duration).await; tokio::time::sleep(task_duration).await;
task.run( let resp = task.run(
&IrcPrefix { &IrcPrefix {
nick: "", nick: "",
user: None, user: None,
@ -313,6 +314,10 @@ impl Context {
}, },
&mut *fact.write().await, &mut *fact.write().await,
); );
if resp.0.is_none() {
continue;
}
task_tx.send(resp.0.unwrap()).await.unwrap()
} }
}); });
} }
@ -495,16 +500,24 @@ impl Irc {
pub async fn run(&mut self) -> std::io::Result<()> { pub async fn run(&mut self) -> std::io::Result<()> {
self.connect().await?; self.connect().await?;
info!("Ready!"); info!("Ready!");
let (tx, mut rx) = mpsc::channel::<Vec<String>>(512);
{ {
let mut context = self.context.write().await; let mut context = self.context.write().await;
context.register(); context.register();
context.run_interval_tasks().await; context.run_interval_tasks(tx).await;
} }
let stream = self.stream.take().unwrap(); let stream = self.stream.take().unwrap();
let (mut reader, mut writer) = tokio::io::split(stream); let (mut reader, mut writer) = tokio::io::split(stream);
let cloned_ctx = self.context.clone();
tokio::spawn(async move {
loop {
handle_rx(&mut rx, &cloned_ctx).await;
}
});
let cloned_ctx = self.context.clone(); let cloned_ctx = self.context.clone();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
@ -519,6 +532,16 @@ impl Irc {
} }
} }
async fn handle_rx(rx: &mut mpsc::Receiver<Vec<String>>, arc_context: &RwLock<Context>) {
while let Some(data) = rx.recv().await {
let mut context = arc_context.write().await;
for line in data {
context.privmsg_all(&line);
}
}
}
async fn send<T: AsyncWrite>( async fn send<T: AsyncWrite>(
writer: &mut WriteHalf<T>, writer: &mut WriteHalf<T>,
arc_context: &RwLock<Context>, arc_context: &RwLock<Context>,