nazo6 knowledge

Rustでジョブキュー的なもの

作成:2023/11/15 0:00:00

更新:2023/11/15 0:00:00

  • 実行するコマンド(EnqueueかClear)をチャネルで受け取る
  • キューの本体はMutex<Vec<QueueKind>>であり、別のチャネルを作ってEnqueueタスクの実行を通知する
  • 同時実行数をセマフォで管理
  • データの更新が通知されるとキューを取りにいっているがもっといい方法がある気がする。
use std::{
    path::PathBuf,
    sync::{Arc, Mutex},
};

use tokio::sync::{mpsc, oneshot};
use tracing::info;

#[derive(Debug, rspc::Type, serde::Serialize)]
pub struct QueueInfo {
    pub tasks: Vec<JobTask>,
}

#[derive(Debug)]
pub enum JobCommand {
    Scan {
        path: PathBuf,
        retry_count: u8,
    }
}

#[derive(Debug, Clone, rspc::Type, serde::Serialize)]
pub enum JobTask {
    Scan {
        path: PathBuf,
        retry_count: u8,
    }
}

pub struct Queue {
    pub queue: Mutex<Vec<JobTask>>,
    pub channel: mpsc::UnboundedSender<()>,
}
impl Queue {
    pub fn new() -> (Self, mpsc::UnboundedReceiver<()>) {
        let channel = mpsc::unbounded_channel();
        (
            Self {
                queue: Mutex::new(vec![]),
                channel: channel.0,
            },
            channel.1,
        )
    }
    pub fn enqueue(&self, item: JobTask) {
        info!("Enqueue: {:?}", item);
        self.queue.lock().unwrap().push(item);
        self.channel.send(()).unwrap();
    }
    pub fn dequeue(&self) -> Option<JobTask> {
        self.queue.lock().unwrap().pop()
    }
    pub fn clear(&self) {
        self.queue.lock().unwrap().clear();
    }
}

#[tracing::instrument(skip(job_receiver))]
pub async fn start_job(mut job_receiver: JobReceiver) {
    let (queue, mut receiver) = Queue::new();
    let queue = Arc::new(queue);

    {
        let queue = queue.clone();
        tokio::spawn(async move {
            while let Some(job) = job_receiver.recv().await {
                match job {
                    JobCommand::Scan { path, retry_count } => {
                        queue.enqueue(JobTask::Scan { path, retry_count });
                    }
                }
            }
        });
    }

    let semaphore = Arc::new(tokio::sync::Semaphore::new(1));

    loop {
        if receiver.recv().await.is_some() {
            while let Some(item) = queue.dequeue() {
                let semaphore = semaphore.clone();
                let permit = semaphore.clone().acquire_owned().await.unwrap();
                match item {
                    JobTask::Scan { path, retry_count } => {
                        let q2 = queue.clone();
                        tokio::spawn(async move {
                            let _permit = permit;
                            scan_job::scan_job(&path, q2, retry_count).await;
                        });
                    }
                }
            }
        }
    }
}