nazo6 notememo

Rustでジョブキュー的なもの

作成:2023/11/15

更新:2023/11/15

  • 実行するコマンド(EnqueueかClear)をチャネルで受け取る
  • キューの本体はMutex<Vec<QueueKind>>であり、別のチャネルを作ってEnqueueタスクの実行を通知する
  • 同時実行数をセマフォで管理
  • データの更新が通知されるとキューを取りにいっているがもっといい方法がある気がする。
use std::{
path::PathBuf,
sync::{Arc, Mutex},
};

use tokio::sync::{mpsc, oneshot};
use tracing::info;

#[derive(Debug, rspc::Type, serde::Serialize)]
pub struct QueueInfo {
pub tasks: Vec<JobTask>,
}

#[derive(Debug)]
pub enum JobCommand {
Scan {
path: PathBuf,
retry_count: u8,
}
}

#[derive(Debug, Clone, rspc::Type, serde::Serialize)]
pub enum JobTask {
Scan {
path: PathBuf,
retry_count: u8,
}
}

pub struct Queue {
pub queue: Mutex<Vec<JobTask>>,
pub channel: mpsc::UnboundedSender<()>,
}
impl Queue {
pub fn new() -> (Self, mpsc::UnboundedReceiver<()>) {
let channel = mpsc::unbounded_channel();
(
Self {
queue: Mutex::new(vec![]),
channel: channel.0,
},
channel.1,
)
}
pub fn enqueue(&self, item: JobTask) {
info!("Enqueue: {:?}", item);
self.queue.lock().unwrap().push(item);
self.channel.send(()).unwrap();
}
pub fn dequeue(&self) -> Option<JobTask> {
self.queue.lock().unwrap().pop()
}
pub fn clear(&self) {
self.queue.lock().unwrap().clear();
}
}

#[tracing::instrument(skip(job_receiver))]
pub async fn start_job(mut job_receiver: JobReceiver) {
let (queue, mut receiver) = Queue::new();
let queue = Arc::new(queue);

{
let queue = queue.clone();
tokio::spawn(async move {
while let Some(job) = job_receiver.recv().await {
match job {
JobCommand::Scan { path, retry_count } => {
queue.enqueue(JobTask::Scan { path, retry_count });
}
}
}
});
}

let semaphore = Arc::new(tokio::sync::Semaphore::new(1));

loop {
if receiver.recv().await.is_some() {
while let Some(item) = queue.dequeue() {
let semaphore = semaphore.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
match item {
JobTask::Scan { path, retry_count } => {
let q2 = queue.clone();
tokio::spawn(async move {
let _permit = permit;
scan_job::scan_job(&path, q2, retry_count).await;
});
}
}
}
}
}
}