import json
import os
import threading
from datetime import datetime
from queue import Queue
from typing import Dict

import cv2

from app.core.model import Model
from app.services.model import TaskStatus, TaskStore


class Worker:
    """Background consumer that runs detection on queued image tasks.

    Task ids are pushed onto ``task_queue``; the matching ``TaskStore``
    entries live in ``task_store``. A single daemon thread, started in
    ``__init__``, pulls ids off the queue and processes them one at a time.
    """

    def __init__(self) -> None:
        """Load the detection model and start the consumer thread."""
        self.detection = Model().getModel()
        self.task_queue = Queue()
        self.task_store: Dict[str, TaskStore] = {}
        # Daemon thread: it does not keep the interpreter alive at shutdown.
        threading.Thread(target=self.worker, daemon=True).start()

    def worker(self) -> None:
        """Consume task ids from ``task_queue`` until a ``None`` sentinel.

        Every item taken from the queue is acknowledged with ``task_done()``,
        including the sentinel and ids that have no entry in ``task_store``,
        so that ``task_queue.join()`` cannot block forever.
        """
        # Imported lazily inside the thread rather than at module load;
        # presumably to avoid a circular import with app.main -- TODO confirm.
        from app.main import UPLOAD_DIR

        while True:
            task_id = self.task_queue.get()
            try:
                if task_id is None:
                    # Shutdown sentinel.
                    break
                task = self.task_store.get(task_id)
                if not task:
                    # Unknown (or falsy) task entry: nothing to process.
                    continue
                self._process_task(task_id, task, UPLOAD_DIR)
            finally:
                # Runs on break/continue as well, keeping get()/task_done()
                # calls balanced for every dequeued item.
                self.task_queue.task_done()

    def _process_task(self, task_id, task, upload_dir) -> None:
        """Run detection on every image of ``task`` and record the results.

        Output images are written to ``<upload_dir>/<task_id>/outputs`` as
        ``<index>.jpg``. Any exception marks the task as FAILED and stores
        the exception text in ``task.message``; nothing is re-raised, so the
        consumer thread keeps running.
        """
        try:
            task.status = TaskStatus.PROCESSING.name
            task.progress = 0
            print(f"开始处理任务 {task_id}...")

            # Create the output directory.
            output_dir = os.path.join(upload_dir, task_id, "outputs")
            os.makedirs(output_dir, exist_ok=True)

            for idx, input_img_path in enumerate(task.images):
                print(f"处理任务 {task_id}, 处理图片 {input_img_path}...")
                # detect() is unpacked as (annotated image, iterable of
                # (name, coords) pairs).
                img_res, coords_res = self.detection.detect(input_img_path)
                coords_res = [
                    {"name": name, "coords": coords}
                    for name, coords in coords_res
                ]
                coords_json = json.dumps(coords_res, ensure_ascii=False)

                out_img_path = os.path.join(output_dir, f"{idx}.jpg")
                # cv2.imwrite reports failure through its return value, so
                # check it instead of recording a path that was never written.
                if not cv2.imwrite(out_img_path, img_res):
                    raise OSError(f"无法写入输出图片 {out_img_path}")

                task.result.append(
                    {
                        "input_img_path": input_img_path,
                        "output_img_path": out_img_path,
                        "coords": coords_json,
                    }
                )
                task.progress = int((idx + 1) / len(task.images) * 100)

            task.status = TaskStatus.COMPLETED.name
            task.completedAt = datetime.now()
            task.message = "处理完成"
            print(f"任务 {task_id} 处理完成")
        except Exception as e:
            # Top-level boundary of the worker thread: record the failure on
            # the task instead of letting the thread die.
            task.status = TaskStatus.FAILED.name
            task.message = str(e)
            print(f"任务 {task_id} 处理失败: {e}")