added interval tasks

This commit is contained in:
wrk 2023-05-30 01:33:23 +02:00
parent 794a08e32c
commit 78349705f0
2 changed files with 55 additions and 11 deletions

View File

@ -126,11 +126,8 @@ impl Irc {
return; return;
} }
//TODO:
// MOVE RUN_SYSTEM BACK TO IRC
let mut context = self.context.write().await; let mut context = self.context.write().await;
let response = context.run_system(prefix, &sys_name); let response = context.run_system(prefix, &sys_name).await;
if response.0.is_none() { if response.0.is_none() {
return; return;

View File

@ -12,7 +12,7 @@ use std::{
net::ToSocketAddrs, net::ToSocketAddrs,
path::Path, path::Path,
sync::Arc, sync::Arc,
time::SystemTime, time::{Duration, SystemTime},
}; };
use async_native_tls::TlsStream; use async_native_tls::TlsStream;
@ -168,7 +168,8 @@ pub struct Context {
send_queue: VecDeque<String>, send_queue: VecDeque<String>,
systems: HashMap<String, StoredSystem>, systems: HashMap<String, StoredSystem>,
factory: Factory, interval_tasks: Vec<(Duration, StoredSystem)>,
factory: Arc<RwLock<Factory>>,
} }
impl Context { impl Context {
@ -274,16 +275,47 @@ impl Context {
self self
} }
pub fn add_resource<R: Send + Sync + 'static>(&mut self, res: R) -> &mut Self { pub fn add_interval_task<I, S: for<'a> System<'a> + Send + Sync + 'static>(
&mut self,
duration: Duration,
system_task: impl for<'a> IntoSystem<'a, I, System = S>,
) -> &mut Self {
self.interval_tasks
.push((duration, Box::new(system_task.into_system())));
self
}
pub async fn add_resource<R: Send + Sync + 'static>(&mut self, res: R) -> &mut Self {
self.factory self.factory
.write()
.await
.resources .resources
.insert(TypeId::of::<R>(), Box::new(res)); .insert(TypeId::of::<R>(), Box::new(res));
self self
} }
pub fn run_system<'a>(&mut self, prefix: &'a IrcPrefix, name: &str) -> Response { pub async fn run_system<'a>(&mut self, prefix: &'a IrcPrefix<'a>, name: &str) -> Response {
let system = self.systems.get_mut(name).unwrap(); let system = self.systems.get_mut(name).unwrap();
system.run(prefix, &mut self.factory) system.run(prefix, &mut *self.factory.write().await)
}
pub async fn run_interval_tasks(&mut self) {
for (task_duration, mut task) in std::mem::take(&mut self.interval_tasks) {
let fact = self.factory.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(task_duration).await;
task.run(
&IrcPrefix {
nick: "",
user: None,
host: None,
},
&mut *fact.write().await,
);
}
});
}
} }
} }
@ -311,7 +343,8 @@ impl Irc {
identified: false, identified: false,
send_queue: VecDeque::new(), send_queue: VecDeque::new(),
systems: HashMap::default(), systems: HashMap::default(),
factory: Factory::default(), interval_tasks: Vec::new(),
factory: Arc::new(RwLock::new(Factory::default())),
})); }));
Ok(Self { Ok(Self {
@ -334,10 +367,22 @@ impl Irc {
self self
} }
pub async fn add_interval_task<I, S: for<'a> System<'a> + Send + Sync + 'static>(
&mut self,
duration: Duration,
system: impl for<'a> IntoSystem<'a, I, System = S>,
) -> &mut Self {
{
let mut context = self.context.write().await;
context.add_interval_task(duration, system);
}
self
}
pub async fn add_resource<R: Send + Sync + 'static>(&mut self, res: R) -> &mut Self { pub async fn add_resource<R: Send + Sync + 'static>(&mut self, res: R) -> &mut Self {
{ {
let mut context = self.context.write().await; let mut context = self.context.write().await;
context.add_resource(res); context.add_resource(res).await;
} }
self self
} }
@ -448,10 +493,12 @@ impl Irc {
} }
pub async fn run(&mut self) -> std::io::Result<()> { pub async fn run(&mut self) -> std::io::Result<()> {
self.connect().await?;
info!("Ready!"); info!("Ready!");
{ {
let mut context = self.context.write().await; let mut context = self.context.write().await;
context.register(); context.register();
context.run_interval_tasks().await;
} }
let stream = self.stream.take().unwrap(); let stream = self.stream.take().unwrap();