use std::{
path::PathBuf,
sync::{Arc, Mutex},
};
use tokio::sync::{mpsc, oneshot};
use tracing::info;
/// Serializable container for a list of [`JobTask`]s.
///
/// NOTE(review): presumably a snapshot of the pending job queue handed to API
/// clients — confirm against the code that constructs it.
#[derive(Debug, rspc::Type, serde::Serialize)]
pub struct QueueInfo {
    /// The tasks carried by this value.
    pub tasks: Vec<JobTask>,
}
/// Command delivered to the job runner.
///
/// [`start_job`] converts every received command into the [`JobTask`] variant
/// of the same name and pushes it onto the queue.
#[derive(Debug)]
pub enum JobCommand {
    /// Request a scan of `path`.
    Scan {
        /// Path handed to the scan job.
        path: PathBuf,
        /// Retry counter forwarded unchanged to the scan job; its exact
        /// meaning is defined by `scan_job::scan_job`.
        retry_count: u8,
    },
}
/// A unit of work stored in the [`Queue`] until the job loop picks it up.
#[derive(Debug, Clone, rspc::Type, serde::Serialize)]
pub enum JobTask {
    /// Scan `path`; dispatched to `scan_job::scan_job` by [`start_job`].
    Scan {
        /// Path handed to the scan job.
        path: PathBuf,
        /// Retry counter forwarded unchanged to the scan job; its exact
        /// meaning is defined by `scan_job::scan_job`.
        retry_count: u8,
    },
}
/// Shared task store plus a wake-up channel for the job loop.
///
/// Tasks live in a mutex-protected vector; every push is accompanied by a
/// `()` message on `channel` so the consumer knows there is work to look at.
pub struct Queue {
    /// Pending tasks, guarded by a standard (blocking) mutex.
    pub queue: Mutex<Vec<JobTask>>,
    /// Sending half of the wake-up channel; the receiving half is returned by
    /// [`Queue::new`].
    pub channel: mpsc::UnboundedSender<()>,
}
impl Queue {
pub fn new() -> (Self, mpsc::UnboundedReceiver<()>) {
let channel = mpsc::unbounded_channel();
(
Self {
queue: Mutex::new(vec![]),
channel: channel.0,
},
channel.1,
)
}
pub fn enqueue(&self, item: JobTask) {
info!("Enqueue: {:?}", item);
self.queue.lock().unwrap().push(item);
self.channel.send(()).unwrap();
}
pub fn dequeue(&self) -> Option<JobTask> {
self.queue.lock().unwrap().pop()
}
pub fn clear(&self) {
self.queue.lock().unwrap().clear();
}
}
#[tracing::instrument(skip(job_receiver))]
pub async fn start_job(mut job_receiver: JobReceiver) {
let (queue, mut receiver) = Queue::new();
let queue = Arc::new(queue);
{
let queue = queue.clone();
tokio::spawn(async move {
while let Some(job) = job_receiver.recv().await {
match job {
JobCommand::Scan { path, retry_count } => {
queue.enqueue(JobTask::Scan { path, retry_count });
}
}
}
});
}
let semaphore = Arc::new(tokio::sync::Semaphore::new(1));
loop {
if receiver.recv().await.is_some() {
while let Some(item) = queue.dequeue() {
let semaphore = semaphore.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
match item {
JobTask::Scan { path, retry_count } => {
let q2 = queue.clone();
tokio::spawn(async move {
let _permit = permit;
scan_job::scan_job(&path, q2, retry_count).await;
});
}
}
}
}
}
}