Boen_Shi 6a2e046884 feat(api): 添加图像分析功能和相关路由接口
- 新增 analyze、analyze_result、analyze_status 和 health 路由
- 实现图像上传和任务提交功能
- 添加任务状态查询和结果获取接口
- 集成 segformer 和 yolo 模型进行图像检测
- 实现 SAM3 预处理功能用于图像预处理判断
- 添加模型选择配置支持 segformer 和 yolo
- 实现任务队列管理和异步处理机制
- 添加 Dockerfile 用于容器化部署
- 配置环境变量和 gitignore 规则
- 创建数据模型定义 API 响应结构
2026-01-27 11:59:45 +08:00

292 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import cv2
import numpy as np
import onnxruntime as ort
# Class index -> defect name attached to each detection.
CLASS_NAMES = {0: "Hollowing", 1: "Water seepage", 2: "Cracking"}

# Class index -> fill color used when painting instance masks.
# Tuples are in BGR channel order (OpenCV convention).
CLASS_COLORS = {
    0: (0, 0, 255),  # Hollowing -> red
    1: (0, 255, 0),  # Water seepage -> green
    2: (255, 0, 0),  # Cracking -> blue
}

# Default side length (pixels) of the square network input.
IMG_SIZE = 640
class YOLOSeg:
    """Run a YOLO segmentation ONNX model and turn its output into masks.

    The public entry point is :meth:`detect`, which returns a color-coded
    mask image plus one ``(class_name, polygon)`` tuple per kept instance.
    """

    def __init__(self, onnx_path: str = "model.onnx", imgsz: int = IMG_SIZE):
        """Create the ONNX Runtime session.

        Args:
            onnx_path: Model file, joined onto the directory of this module.
            imgsz: Side length of the square letterboxed network input.
        """
        real_path = os.path.join(os.path.dirname(__file__), onnx_path)
        if ort.get_device() == "GPU":
            providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
        else:
            providers = ["CPUExecutionProvider"]
        self.session = ort.InferenceSession(real_path, providers=providers)
        # Feed the network the float width declared by its first input.
        self.ndtype = (
            np.half
            if self.session.get_inputs()[0].type == "tensor(float16)"
            else np.single
        )
        self.imgsz = imgsz
        self.classes = CLASS_NAMES

    # ---------- preprocessing (letterbox) ----------
    def _preprocess(self, img_bgr):
        """Letterbox ``img_bgr`` to ``imgsz x imgsz`` and build the input tensor.

        Args:
            img_bgr: Image array in HWC layout, BGR channel order.

        Returns:
            ``(tensor, ratio, (pad_w, pad_h))`` where ``tensor`` is the
            batched CHW RGB image cast to ``self.ndtype`` and divided by 255,
            ``ratio`` is ``(r, r)`` with ``r`` the resize factor, and
            ``pad_w`` / ``pad_h`` are the (possibly fractional) per-side
            paddings in pixels.
        """
        h0, w0 = img_bgr.shape[:2]
        new_shape = (self.imgsz, self.imgsz)
        r = min(new_shape[0] / h0, new_shape[1] / w0)
        ratio = (r, r)
        new_unpad = (int(round(w0 * r)), int(round(h0 * r)))
        pad_w = (new_shape[1] - new_unpad[0]) / 2
        pad_h = (new_shape[0] - new_unpad[1]) / 2

        if (w0, h0) != new_unpad:
            img = cv2.resize(img_bgr, new_unpad, interpolation=cv2.INTER_LINEAR)
        else:
            img = img_bgr.copy()

        # The -0.1 / +0.1 nudges split an odd padding total so that
        # top + bottom (and left + right) add up to the full amount.
        top, bottom = int(round(pad_h - 0.1)), int(round(pad_h + 0.1))
        left, right = int(round(pad_w - 0.1)), int(round(pad_w + 0.1))
        img = cv2.copyMakeBorder(
            img, top, bottom, left, right,
            borderType=cv2.BORDER_CONSTANT, value=(114, 114, 114)
        )

        # HWC -> CHW, then reverse the channel axis (BGR -> RGB), then /255.
        img = np.ascontiguousarray(
            np.einsum("HWC->CHW", img)[::-1], dtype=self.ndtype
        )
        img = img / 255.0
        if img.ndim == 3:
            img = img[None]  # add batch axis -> (1, 3, H, W)
        return img, ratio, (pad_w, pad_h)

    # ---------- mask -> polygon ----------
    @staticmethod
    def _masks2segments(masks):
        """Convert ``(N, H, W)`` masks into one polygon per instance.

        For each mask only the external contour with the most points is kept.
        An empty mask yields an array of shape ``(0, 2)``.

        Returns:
            List of ``float32`` arrays of shape ``(K, 2)`` holding x, y points.
        """
        segments = []
        for mask in masks.astype("uint8"):
            # [-2] selects the contour list on both OpenCV 3.x (3 return
            # values) and 4.x (2 return values).
            contours = cv2.findContours(
                mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE
            )[-2]
            if len(contours) > 0:
                contour = np.array(max(contours, key=len)).reshape(-1, 2)
            else:
                contour = np.zeros((0, 2))
            segments.append(contour.astype("float32"))
        return segments

    @staticmethod
    def _crop_mask(masks, boxes):
        """Zero every mask outside its own box.

        Args:
            masks: Array of shape ``(N, H, W)``.
            boxes: Array of shape ``(N, 4)`` in xyxy order, same pixel space.

        Returns:
            ``masks`` multiplied by the per-instance in-box indicator.
        """
        _, h, w = masks.shape
        x1, y1, x2, y2 = np.split(boxes[:, :, None], 4, 1)  # each (N, 1, 1)
        cols = np.arange(w, dtype=x1.dtype)[None, None, :]
        rows = np.arange(h, dtype=x1.dtype)[None, :, None]
        inside = (cols >= x1) * (cols < x2) * (rows >= y1) * (rows < y2)
        return masks * inside

    @staticmethod
    def _scale_mask(masks, im0_shape, ratio_pad=None):
        """Remove letterbox padding from ``masks`` and resize to the original image.

        Args:
            masks: Array in HW or HWN layout at the letterboxed aspect ratio.
            im0_shape: Shape of the original image; only ``[0]`` (height)
                and ``[1]`` (width) are read.
            ratio_pad: Optional pair whose second item is ``(pad_w, pad_h)``.
                When omitted the padding is derived from the two shapes.

        Returns:
            Array of shape ``(im0_h, im0_w, N)``; a 2-D resize result gets
            a trailing axis added.

        Raises:
            ValueError: If ``masks`` has fewer than 2 dimensions.
        """
        # Validate before any shape indexing so the intended error is raised.
        if masks.ndim < 2:
            raise ValueError("masks ndim 应该是 2 或 3")

        im1_shape = masks.shape[:2]
        if ratio_pad is None:
            gain = min(im1_shape[0] / im0_shape[0], im1_shape[1] / im0_shape[1])
            pad = (
                (im1_shape[1] - im0_shape[1] * gain) / 2,
                (im1_shape[0] - im0_shape[0] * gain) / 2,
            )
        else:
            pad = ratio_pad[1]

        top, left = int(round(pad[1] - 0.1)), int(round(pad[0] - 0.1))
        bottom = int(round(im1_shape[0] - pad[1] + 0.1))
        right = int(round(im1_shape[1] - pad[0] + 0.1))

        masks = masks[top:bottom, left:right]
        masks = cv2.resize(
            masks, (im0_shape[1], im0_shape[0]), interpolation=cv2.INTER_LINEAR
        )
        if masks.ndim == 2:
            # cv2.resize drops a single trailing channel; restore it.
            masks = masks[:, :, None]
        return masks

    def _process_mask(self, protos, masks_in, bboxes, im0_shape):
        """Combine prototypes and coefficients into per-instance boolean masks.

        Args:
            protos: Prototype masks of shape ``(C, Hm, Wm)``.
            masks_in: Per-instance coefficients of shape ``(N, C)``.
            bboxes: Boxes of shape ``(N, 4)``, xyxy, in original-image pixels.
            im0_shape: Shape of the original image.

        Returns:
            Boolean array of shape ``(N, H, W)`` at original-image size.
        """
        c, mh, mw = protos.shape
        masks = (
            np.matmul(masks_in, protos.reshape(c, -1))
            .reshape(-1, mh, mw)
            .transpose(1, 2, 0)
        )  # HWN
        masks = np.ascontiguousarray(masks)
        masks = self._scale_mask(masks, im0_shape)  # HWN at image size
        masks = np.einsum("HWN->NHW", masks)
        masks = self._crop_mask(masks, bboxes)
        return masks > 0.5

    @staticmethod
    def _get_cid(name):
        """Return the class index whose name equals ``name``, or ``None``."""
        return next((cid for cid, cname in CLASS_NAMES.items() if cname == name), None)

    @staticmethod
    def _make_color_mask(img_bgr, masks, coords):
        """Paint every instance mask onto a black canvas using its class color.

        The masks are not blended with the source image; ``img_bgr`` is used
        only for its height and width.

        Args:
            img_bgr: Image whose first two shape entries give the canvas size.
            masks: Boolean array of shape ``(N, H, W)``.
            coords: Sequence whose i-th item starts with the class name of
                the i-th mask.

        Returns:
            ``uint8`` array of shape ``(H, W, 3)`` in BGR order.
        """
        h, w = img_bgr.shape[:2]
        color_mask = np.zeros((h, w, 3), dtype=np.uint8)
        for mask, inst in zip(masks, coords):
            cid = YOLOSeg._get_cid(inst[0])
            # Classes without a configured color fall back to (0, 255, 255).
            color = CLASS_COLORS.get(cid, (0, 255, 255))
            color_mask[mask] = color
        return color_mask

    @staticmethod
    def _empty_result(h0, w0):
        """Return the ``(color_mask, coords)`` pair for an image with no detections."""
        return np.zeros((h0, w0, 3), dtype=np.uint8), []

    # ---------- inference entry point ----------
    def detect(self, img_input, conf_thres: float = 0.1, iou_thres: float = 0.1):
        """Segment one image.

        Args:
            img_input: Either a file path (``str``) or an already decoded BGR
                image array.
            conf_thres: Minimum best-class score for a candidate to be kept;
                also passed to NMS as its score threshold.
            iou_thres: IoU threshold passed to NMS.

        Returns:
            ``(color_mask, coords)``. ``color_mask`` is an ``(H, W, 3)`` BGR
            ``uint8`` image, black except for class-colored instance regions.
            ``coords`` is a list of ``(class_name, polygon)`` tuples, where
            ``polygon`` is a list of ``[x, y]`` points in original-image
            pixels. The same 2-tuple shape is returned when nothing is
            detected (all-black mask, empty list).

        Raises:
            ValueError: If the image is ``None`` (e.g. decoding failed).
        """
        if isinstance(img_input, str):
            img_bgr = cv2.imdecode(
                np.fromfile(img_input, dtype=np.uint8), cv2.IMREAD_COLOR
            )
        else:
            img_bgr = img_input
        if img_bgr is None:
            raise ValueError("img_bgr is None, 请检查图片读取是否成功")

        h0, w0 = img_bgr.shape[:2]
        im, ratio, (pad_w, pad_h) = self._preprocess(img_bgr)

        # ONNX inference. Expected layout: preds[0] is (1, C, N) candidates,
        # preds[1] is (1, nm, Hm, Wm) mask prototypes.
        input_name = self.session.get_inputs()[0].name
        preds = self.session.run(None, {input_name: im})
        x, protos = preds[0], preds[1]
        x = np.einsum("bcn->bnc", x)[0]  # (N, C)

        # Each row is [cx, cy, w, h, class scores..., mask coefficients...];
        # the coefficient count is taken from the prototype tensor.
        nm = int(protos.shape[1])
        nc = x.shape[1] - 4 - nm

        cls_scores = x[:, 4:4 + nc]
        cls_max = np.max(cls_scores, axis=-1)
        keep = cls_max > conf_thres
        x = x[keep]
        cls_scores = cls_scores[keep]
        if x.size == 0:
            return self._empty_result(h0, w0)

        conf = cls_max[keep]
        cls_id = np.argmax(cls_scores, axis=-1)
        # Repack rows as [cx, cy, w, h, conf, cls_id, mask coefficients...].
        x = np.c_[x[:, :4], conf, cls_id, x[:, -nm:]]

        # cv2.dnn.NMSBoxes expects top-left [x, y, w, h] boxes.
        bboxes_xywh = x[:, :4].copy()
        bboxes_xywh[:, 0] -= bboxes_xywh[:, 2] / 2
        bboxes_xywh[:, 1] -= bboxes_xywh[:, 3] / 2
        indices = cv2.dnn.NMSBoxes(
            bboxes_xywh.tolist(), x[:, 4].tolist(), conf_thres, iou_thres
        )
        # Depending on the OpenCV version the result may be (), [0, 1],
        # [[0], [1]] or an ndarray; handle empty and flatten the rest.
        if indices is None or len(indices) == 0:
            return self._empty_result(h0, w0)
        x = x[np.array(indices).reshape(-1)]

        # cxcywh -> xyxy
        x[:, 0:2] -= x[:, 2:4] / 2
        x[:, 2:4] += x[:, 0:2]
        # Undo the letterbox: remove padding, then rescale to the original image.
        x[:, [0, 2]] -= pad_w
        x[:, [1, 3]] -= pad_h
        x[:, :4] /= min(ratio)
        # Clamp to the image bounds.
        x[:, [0, 2]] = x[:, [0, 2]].clip(0, w0)
        x[:, [1, 3]] = x[:, [1, 3]].clip(0, h0)

        # Decode masks and convert them to polygons.
        masks = self._process_mask(protos[0], x[:, 6:], x[:, :4], img_bgr.shape)
        segments = self._masks2segments(masks)

        coords = []
        for cls_i, seg in zip(x[:, 5], segments):
            cid = int(cls_i)
            coords.append((self.classes.get(cid, str(cid)), seg.tolist()))

        color_mask = self._make_color_mask(img_bgr, masks, coords)
        return color_mask, coords
if __name__ == "__main__":
    # Manual smoke test: run the detector on one image and print the result.
    import sys

    # Sample image used when no path is supplied on the command line.
    default_img_path = r"D:\Projects\Python\wall\app\core\yolo\test.jpg"
    img_path = sys.argv[1] if len(sys.argv) > 1 else default_img_path

    # ====== load the model ======
    model = YOLOSeg()

    # Read the raw bytes with NumPy and decode them with OpenCV
    # (presumably so that non-ASCII paths work on Windows).
    img = cv2.imdecode(np.fromfile(str(img_path), dtype=np.uint8), cv2.IMREAD_COLOR)
    color_mask, coords = model.detect(img)
    print(color_mask.shape)
    print(coords)